Revert "Merge 'sstables: add versioning to the sstable_set ' from Wojciech Mitros"
This reverts commit 31909515b3, reversing changes made to ef97adc72a. The reverted merge shows many serious regressions in dtest. Fixes #8197.
This commit is contained in:
@@ -987,8 +987,8 @@ void set_storage_service(http_context& ctx, routes& r) {
|
||||
ss::table_sstables tst;
|
||||
tst.keyspace = schema->ks_name();
|
||||
tst.table = schema->cf_name();
|
||||
auto sstables = t->get_sstables_including_compacted_undeleted();
|
||||
for (auto sstable : *sstables) {
|
||||
|
||||
for (auto sstable : *t->get_sstables_including_compacted_undeleted()) {
|
||||
auto ts = db_clock::to_time_t(sstable->data_file_write_time());
|
||||
::tm t;
|
||||
::gmtime_r(&ts, &t);
|
||||
|
||||
@@ -392,7 +392,6 @@ scylla_tests = set([
|
||||
'test/boost/sstable_conforms_to_mutation_source_test',
|
||||
'test/boost/sstable_resharding_test',
|
||||
'test/boost/sstable_directory_test',
|
||||
'test/boost/sstable_set_test',
|
||||
'test/boost/sstable_test',
|
||||
'test/boost/sstable_move_test',
|
||||
'test/boost/storage_proxy_test',
|
||||
@@ -436,7 +435,6 @@ scylla_tests = set([
|
||||
'test/perf/perf_row_cache_reads',
|
||||
'test/perf/perf_simple_query',
|
||||
'test/perf/perf_sstable',
|
||||
'test/perf/perf_sstable_set',
|
||||
'test/unit/lsa_async_eviction_test',
|
||||
'test/unit/lsa_sync_eviction_test',
|
||||
'test/unit/row_cache_alloc_stress_test',
|
||||
|
||||
@@ -67,7 +67,7 @@ future<> view_update_generator::start() {
|
||||
// Exploit the fact that sstables in the staging directory
|
||||
// are usually non-overlapping and use a partitioned set for
|
||||
// the read.
|
||||
auto ssts = make_lw_shared<sstables::sstable_set>(sstables::make_partitioned_sstable_set(s, false));
|
||||
auto ssts = make_lw_shared<sstables::sstable_set>(sstables::make_partitioned_sstable_set(s, make_lw_shared<sstable_list>(sstable_list{}), false));
|
||||
for (auto& sst : sstables) {
|
||||
ssts->insert(sst);
|
||||
}
|
||||
|
||||
@@ -2607,7 +2607,8 @@ future<> storage_service::load_and_stream(sstring ks_name, sstring cf_name,
|
||||
size_t nr_sst_current = 0;
|
||||
while (!sstables.empty()) {
|
||||
auto ops_uuid = utils::make_random_uuid();
|
||||
auto sst_set = make_lw_shared<sstables::sstable_set>(sstables::make_partitioned_sstable_set(s, false));
|
||||
auto sst_set = make_lw_shared<sstables::sstable_set>(sstables::make_partitioned_sstable_set(s,
|
||||
make_lw_shared<sstable_list>(sstable_list{}), false));
|
||||
size_t batch_sst_nr = 16;
|
||||
std::vector<sstring> sst_names;
|
||||
std::vector<sstables::shared_sstable> sst_processed;
|
||||
|
||||
@@ -46,6 +46,7 @@ struct lw_shared_ptr_deleter<sstables::sstable> {
|
||||
namespace sstables {
|
||||
|
||||
using shared_sstable = seastar::lw_shared_ptr<sstable>;
|
||||
using sstable_list = std::unordered_set<shared_sstable>;
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -63,526 +63,93 @@ std::ostream& operator<<(std::ostream& os, const sstables::sstable_run& run) {
|
||||
return os;
|
||||
}
|
||||
|
||||
sstable_set_data::sstable_set_data(std::unique_ptr<sstable_set_impl> impl)
|
||||
: impl(std::move(impl)) {
|
||||
sstable_set::sstable_set(std::unique_ptr<sstable_set_impl> impl, schema_ptr s, lw_shared_ptr<sstable_list> all)
|
||||
: _impl(std::move(impl))
|
||||
, _schema(std::move(s))
|
||||
, _all(std::move(all)) {
|
||||
}
|
||||
|
||||
void sstable_set_data::insert(shared_sstable sst) {
|
||||
auto it = sstables_and_times_added.find(sst);
|
||||
if (it != sstables_and_times_added.end()) {
|
||||
// the sstable has already been added in some version
|
||||
it->second++;
|
||||
return;
|
||||
sstable_set::sstable_set(const sstable_set& x)
|
||||
: _impl(x._impl->clone())
|
||||
, _schema(x._schema)
|
||||
, _all(make_lw_shared<sstable_list>(*x._all))
|
||||
, _all_runs(x._all_runs) {
|
||||
}
|
||||
|
||||
sstable_set::sstable_set(sstable_set&&) noexcept = default;
|
||||
|
||||
// Copy assignment. A complete copy of x is built first and then moved into
// *this; assigning an object to itself does nothing.
sstable_set&
sstable_set::operator=(const sstable_set& x) {
    if (this == &x) {
        return *this;
    }
    sstable_set copy(x);
    *this = std::move(copy);
    return *this;
}
|
||||
|
||||
sstable_set&
|
||||
sstable_set::operator=(sstable_set&&) noexcept = default;
|
||||
|
||||
std::vector<shared_sstable>
|
||||
sstable_set::select(const dht::partition_range& range) const {
|
||||
return _impl->select(range);
|
||||
}
|
||||
|
||||
std::vector<sstable_run>
|
||||
sstable_set::select_sstable_runs(const std::vector<shared_sstable>& sstables) const {
|
||||
auto run_ids = boost::copy_range<std::unordered_set<utils::UUID>>(sstables | boost::adaptors::transformed(std::mem_fn(&sstable::run_identifier)));
|
||||
return boost::copy_range<std::vector<sstable_run>>(run_ids | boost::adaptors::transformed([this] (utils::UUID run_id) {
|
||||
return _all_runs.at(run_id);
|
||||
}));
|
||||
}
|
||||
|
||||
void
|
||||
sstable_set::insert(shared_sstable sst) {
|
||||
_impl->insert(sst);
|
||||
try {
|
||||
impl->insert(sst);
|
||||
all_runs[sst->run_identifier()].insert(sst);
|
||||
sstables_and_times_added.emplace(sst, 1);
|
||||
} catch (...) {
|
||||
impl->erase(sst);
|
||||
auto runs_it = all_runs.find(sst->run_identifier());
|
||||
if (runs_it != all_runs.end()) {
|
||||
runs_it->second.erase(sst);
|
||||
if (runs_it->second.empty()) {
|
||||
all_runs.erase(runs_it);
|
||||
}
|
||||
_all->insert(sst);
|
||||
try {
|
||||
_all_runs[sst->run_identifier()].insert(sst);
|
||||
} catch (...) {
|
||||
_all->erase(sst);
|
||||
throw;
|
||||
}
|
||||
} catch (...) {
|
||||
_impl->erase(sst);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<shared_sstable> sstable_set_data::select(const dht::partition_range& range) const {
|
||||
return impl->select(range);
|
||||
}
|
||||
|
||||
std::unordered_set<shared_sstable> sstable_set_data::select_by_run_id(utils::UUID run_id) const {
|
||||
return all_runs.at(run_id);
|
||||
}
|
||||
|
||||
// Called when a version that was adding an sstable was removed or when the sstable was later erased in that version.
|
||||
void sstable_set_data::remove(shared_sstable sst) {
|
||||
if (--sstables_and_times_added.at(sst) == 0) {
|
||||
impl->erase(sst);
|
||||
all_runs[sst->run_identifier()].erase(sst);
|
||||
sstables_and_times_added.erase(sst);
|
||||
}
|
||||
}
|
||||
|
||||
// Releases this reference's hold on its version (if any).
//
// After dropping the reference count, the version is either merged into its
// single child (which destroys it) or deleted outright when nothing refers to
// it any more.
//
// The pointer is cleared *before* the version is released, on every path.
// This destructor is also invoked explicitly by the assignment operators, so
// the object may be looked at again afterwards; leaving a stale pointer behind
// (as happened on the merge path and on the "still referenced" path) would make
// a second destruction release the same version twice.
sstable_set_version_reference::~sstable_set_version_reference() {
    auto* ver = std::exchange(_p, nullptr);
    if (!ver) {
        return;
    }
    ver->remove_reference();
    if (ver->can_merge_with_next()) {
        // merging will destroy the last reference to the version and the version will be deleted as a result
        ver->merge_with_next();
    } else if (ver->can_delete()) {
        delete ver;
    }
}
|
||||
|
||||
// Wraps p and registers one reference on it; a null p yields an empty reference.
sstable_set_version_reference::sstable_set_version_reference(sstable_set_version* p)
    : _p(p) {
    if (!_p) {
        return;
    }
    _p->add_reference();
}
|
||||
|
||||
// Copying shares the same version and registers one more reference on it.
sstable_set_version_reference::sstable_set_version_reference(const sstable_set_version_reference& ref)
    : _p(ref._p) {
    if (!_p) {
        return;
    }
    _p->add_reference();
}
|
||||
|
||||
// Moving takes over ref's pointer without touching the reference count and
// leaves ref empty.
sstable_set_version_reference::sstable_set_version_reference(sstable_set_version_reference&& ref) noexcept
    : _p(std::exchange(ref._p, nullptr)) {
}
|
||||
|
||||
// Copy assignment: makes a counted copy of ref and move-assigns it into *this.
sstable_set_version_reference& sstable_set_version_reference::operator=(const sstable_set_version_reference& ref) {
    sstable_set_version_reference copy(ref);
    *this = std::move(copy);
    return *this;
}
|
||||
|
||||
// Move assignment: releases the version currently held and adopts the one held by ref.
sstable_set_version_reference& sstable_set_version_reference::operator=(sstable_set_version_reference&& ref) noexcept {
    if (this == &ref) {
        return *this;
    }
    // Destroying this reference may invalidate other references, so the pointer managed by
    // the moved-from reference is taken over first and installed only after the destructor
    // has run.
    auto adopted = ref._p;
    ref._p = nullptr;
    this->~sstable_set_version_reference();
    _p = adopted;
    return *this;
}
|
||||
|
||||
// Allocates a fresh sstable_set_version from impl and s and returns a reference to it.
static sstable_set_version_reference make_sstable_set_version(std::unique_ptr<sstable_set_impl> impl, schema_ptr s) {
    auto* created = new sstable_set_version(std::move(impl), std::move(s));
    return created->get_reference_to_this();
}
|
||||
|
||||
sstable_list::sstable_list(std::unique_ptr<sstable_set_impl> impl, schema_ptr s)
|
||||
: _version(make_sstable_set_version(std::move(impl), std::move(s))) {
|
||||
}
|
||||
|
||||
sstable_list::sstable_list(const sstable_list& sstl)
|
||||
: _version(sstl._version->get_reference_to_new_copy()) {
|
||||
// copying an sstable_list creates a new sstable_set_version
|
||||
}
|
||||
|
||||
sstable_list::sstable_list(sstable_list&& sstl) noexcept = default;
|
||||
|
||||
// Copy assignment: copies sstl and move-assigns the copy into *this.
// Self-assignment is a no-op.
sstable_list& sstable_list::operator=(const sstable_list& sstl) {
    if (this == &sstl) {
        return *this;
    }
    sstable_list copy(sstl);
    *this = std::move(copy);
    return *this;
}
|
||||
|
||||
// Move assignment: releases the version this list currently refers to and
// takes over the one held by sstl. Self-assignment is a no-op.
//
// The version reference's own move assignment already releases the previously
// held version before adopting the new pointer, so no explicit destructor call
// is made here. Calling ~sstable_list() first would destroy _version and then
// assign to the destroyed member, which runs the reference's release logic a
// second time on the same version (double release / use-after-free).
// NOTE(review): assumes _version is the only state ~sstable_list() cleans up,
// as the constructors in this file suggest - confirm against the class definition.
sstable_list& sstable_list::operator=(sstable_list&& sstl) noexcept {
    if (this != &sstl) {
        _version = std::move(sstl._version);
    }
    return *this;
}
|
||||
|
||||
// Moves the iterator to the next sstable which is contained by the associated sstable_set, or to the end
|
||||
// If the iterator already references a satisfying sstable, no changes are made.
|
||||
void sstable_list::const_iterator::advance() {
|
||||
while (_it != (*_ver)->all().end() && !(*_ver)->contains(_it->first)) {
|
||||
_it++;
|
||||
}
|
||||
}
|
||||
|
||||
sstable_list::const_iterator::const_iterator(std::map<shared_sstable, unsigned>::const_iterator it, const sstable_set_version_reference* ver)
|
||||
: _it(std::move(it))
|
||||
, _ver(ver) {
|
||||
advance();
|
||||
}
|
||||
|
||||
// Pre-increment: steps past the current entry, then skips entries that are not
// contained in the associated version. Must not be called on the end iterator.
sstable_list::const_iterator& sstable_list::const_iterator::operator++() {
    assert(_it != (*_ver)->all().end());
    ++_it;
    advance();
    return *this;
}
|
||||
|
||||
// Post-increment: advances this iterator and returns a copy of its previous position.
sstable_list::const_iterator sstable_list::const_iterator::operator++(int) {
    const_iterator previous(*this);
    ++(*this);
    return previous;
}
|
||||
|
||||
const shared_sstable& sstable_list::const_iterator::operator*() const {
|
||||
return _it->first;
|
||||
}
|
||||
|
||||
bool sstable_list::const_iterator::operator==(const const_iterator& it) const {
|
||||
assert(_ver == it._ver);
|
||||
return _it == it._it;
|
||||
}
|
||||
|
||||
sstable_list::const_iterator sstable_list::begin() const {
|
||||
return const_iterator(_version->all().begin(), &_version);
|
||||
}
|
||||
|
||||
sstable_list::const_iterator sstable_list::end() const {
|
||||
return const_iterator(_version->all().end(), &_version);
|
||||
}
|
||||
|
||||
size_t sstable_list::size() const {
|
||||
return _version->size();
|
||||
}
|
||||
|
||||
void sstable_list::insert(shared_sstable sst) {
|
||||
_version = _version->insert(sst);
|
||||
}
|
||||
|
||||
void sstable_list::erase(shared_sstable sst) {
|
||||
_version = _version->erase(sst);
|
||||
}
|
||||
|
||||
bool sstable_list::contains(shared_sstable sst) const {
|
||||
return _version->contains(sst);
|
||||
}
|
||||
|
||||
bool sstable_list::empty() const {
|
||||
return _version->size() == 0;
|
||||
}
|
||||
|
||||
const sstable_set_version& sstable_list::version() const {
|
||||
return *_version;
|
||||
}
|
||||
|
||||
sstable_set::sstable_set(std::unique_ptr<sstable_set_impl> impl, schema_ptr s) {
|
||||
if (!impl->empty()) {
|
||||
throw std::logic_error("Can't create an sstable_set using a non-empty sstable_set_impl");
|
||||
}
|
||||
_all = make_lw_shared<sstable_list>(std::move(impl), std::move(s));
|
||||
}
|
||||
|
||||
sstable_set::sstable_set(const sstable_set& x)
|
||||
: _all(make_lw_shared<sstable_list>(*x._all)) {
|
||||
}
|
||||
|
||||
sstable_set::sstable_set(sstable_set&& x) noexcept = default;
|
||||
|
||||
// Copy assignment: copies ssts and move-assigns the copy into *this.
sstable_set& sstable_set::operator=(const sstable_set& ssts) {
    sstable_set copy(ssts);
    *this = std::move(copy);
    return *this;
}
|
||||
|
||||
sstable_set& sstable_set::operator=(sstable_set&& ssts) noexcept = default;
|
||||
|
||||
|
||||
std::vector<shared_sstable> sstable_set::select(const dht::partition_range& range) const {
|
||||
return _all->version().select(range);
|
||||
}
|
||||
|
||||
// Return all runs which contain any of the input sstables.
|
||||
std::vector<sstable_run> sstable_set::select_sstable_runs(const std::vector<shared_sstable>& sstables) const {
|
||||
return _all->version().select_sstable_runs(sstables);
|
||||
}
|
||||
|
||||
lw_shared_ptr<const sstable_list> sstable_set::all() const {
|
||||
return _all;
|
||||
}
|
||||
|
||||
void sstable_set::insert(shared_sstable sst) {
|
||||
_all->insert(sst);
|
||||
}
|
||||
|
||||
void sstable_set::erase(shared_sstable sst) {
|
||||
void
|
||||
sstable_set::erase(shared_sstable sst) {
|
||||
_impl->erase(sst);
|
||||
_all->erase(sst);
|
||||
_all_runs[sst->run_identifier()].erase(sst);
|
||||
}
|
||||
|
||||
sstable_set::~sstable_set() = default;
|
||||
|
||||
sstable_set::incremental_selector::incremental_selector(std::unique_ptr<incremental_selector_impl> impl, const schema& s)
|
||||
: _impl(std::move(impl))
|
||||
, _cmp(s) {
|
||||
}
|
||||
|
||||
sstable_set::incremental_selector::~incremental_selector() = default;
|
||||
|
||||
sstable_set::incremental_selector::incremental_selector(sstable_set::incremental_selector&&) noexcept = default;
|
||||
|
||||
sstable_set::incremental_selector::incremental_selector(std::unique_ptr<incremental_selector_impl> impl, const schema& s, lw_shared_ptr<sstable_list> sstl)
|
||||
: _impl(std::move(impl))
|
||||
, _cmp(s)
|
||||
, _sstl(std::move(sstl)) {
|
||||
}
|
||||
|
||||
sstable_set::incremental_selector::selection sstable_set::incremental_selector::select(const dht::ring_position_view& pos) const {
|
||||
sstable_set::incremental_selector::selection
|
||||
sstable_set::incremental_selector::select(const dht::ring_position_view& pos) const {
|
||||
if (!_current_range_view || !_current_range_view->contains(pos, _cmp)) {
|
||||
std::vector<shared_sstable> current_versioned_sstables;
|
||||
std::tie(_current_range, current_versioned_sstables, _current_next_position) = _impl->select(pos);
|
||||
_current_sstables = boost::copy_range<std::vector<shared_sstable>>(current_versioned_sstables
|
||||
| boost::adaptors::filtered([this] (shared_sstable sst) { return _sstl->contains(sst); })
|
||||
| boost::adaptors::transformed([] (shared_sstable sst) { return sst; }));
|
||||
std::tie(_current_range, _current_sstables, _current_next_position) = _impl->select(pos);
|
||||
_current_range_view = _current_range->transform([] (const dht::ring_position& rp) { return dht::ring_position_view(rp); });
|
||||
}
|
||||
return {_current_sstables, _current_next_position};
|
||||
}
|
||||
|
||||
sstables::sstable_set::incremental_selector sstable_set::make_incremental_selector() const {
|
||||
return incremental_selector(_all->version().make_incremental_selector(), *_all->version().get_schema(), _all);
|
||||
}
|
||||
|
||||
// Gives back to the shared base set every sstable this version added, then
// unregisters this version from its parent's list of children (if it has a parent).
sstable_set_version::~sstable_set_version() {
    for (const auto& sst : _added) {
        _base_set->remove(sst);
    }
    if (!_prev) {
        return;
    }
    _prev->_next.erase(this);
}
|
||||
|
||||
// the sstable_set_impl must be empty
|
||||
sstable_set_version::sstable_set_version(std::unique_ptr<sstable_set_impl> impl, schema_ptr schema)
|
||||
: _base_set(make_lw_shared<sstable_set_data>(std::move(impl)))
|
||||
, _schema(std::move(schema)) {
|
||||
}
|
||||
|
||||
// Creates a new version based on ver
|
||||
sstable_set_version::sstable_set_version(sstable_set_version* ver)
|
||||
: _base_set(ver->_base_set)
|
||||
, _schema(ver->_schema)
|
||||
, _prev(ver) {
|
||||
_prev->_next.insert(this);
|
||||
}
|
||||
|
||||
// Merges changes made in this version into the next version (can be called only when there is a single next version,
|
||||
// and no further changes can be made to this one, i.e. no sstable_list references this version).
|
||||
void sstable_set_version::merge_with_next() noexcept {
|
||||
auto next_version = *_next.begin();
|
||||
next_version->_added = std::move(_added);
|
||||
next_version->_erased = std::move(_erased);
|
||||
auto next_nh = _next.extract(*_next.begin());
|
||||
if (_prev) {
|
||||
_prev->_next.erase(this);
|
||||
_prev->_next.insert(std::move(next_nh));
|
||||
}
|
||||
next_version->_prev = std::move(_prev); // destroys this by overwriting the last reference
|
||||
}
|
||||
|
||||
void sstable_set_version::propagate_inserted_sstable(const shared_sstable& sst) noexcept {
|
||||
if (_reference_count > _next.size()) {
|
||||
// If there exists a reference outside child versions (from sstable_list), this version can still be read from, so we can't modify it
|
||||
return;
|
||||
}
|
||||
for (auto& ver_chck : _next) {
|
||||
if (!ver_chck->_added.contains(sst)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
// Remove the sstable from child versions and get a node handle to insert in this version
|
||||
auto sst_nh = (*_next.begin())->_added.extract(sst);
|
||||
for (auto& ver_chck : _next) {
|
||||
auto nh = ver_chck->_added.extract(sst_nh.value());
|
||||
if (!nh.empty()) {
|
||||
_base_set->remove(nh.value());
|
||||
}
|
||||
}
|
||||
auto it = _erased.find(sst_nh.value());
|
||||
if (it != _erased.end()) {
|
||||
// If the sstable was erased in this version and added in all its children, it's as if it weren't erased or inserted in any of them
|
||||
// because we won't read from this version anymore.
|
||||
_erased.erase(it);
|
||||
_base_set->remove(sst_nh.value());
|
||||
} else {
|
||||
auto added_it = _added.insert(std::move(sst_nh)).position;
|
||||
if (_prev) {
|
||||
_prev->propagate_inserted_sstable(*added_it);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void sstable_set_version::propagate_erased_sstable(const shared_sstable& sst) noexcept {
|
||||
if (_reference_count > _next.size()) {
|
||||
// If there exists a reference outside child versions (from sstable_list), this version can still be read from, so we can't modify it
|
||||
return;
|
||||
}
|
||||
for (auto& ver_chck : _next) {
|
||||
if (!ver_chck->_erased.contains(sst)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
// Remove the sstable from child versions and get a node handle to insert in this version
|
||||
auto sst_nh = (*_next.begin())->_erased.extract(sst);
|
||||
for (auto& ver_chck : _next) {
|
||||
ver_chck->_erased.extract(sst_nh.value());
|
||||
}
|
||||
auto it = _added.find(sst_nh.value());
|
||||
if (it != _added.end()) {
|
||||
// If the sstable was added in this version and erased in all its children, it's as if it weren't added or erased in any of them
|
||||
// because we won't read from this version anymore.
|
||||
_added.erase(it);
|
||||
_base_set->remove(sst_nh.value());
|
||||
} else {
|
||||
auto erased_it = _erased.insert(std::move(sst_nh)).position;
|
||||
if (_prev) {
|
||||
_prev->propagate_erased_sstable(*erased_it);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Called when a reference to the version gets removed - if the reference was from an sstable_list, it's the first time we can propagate any
|
||||
// changes, and if the reference was from another sstable_set_version, we want to check if there were any changes that were present in all
|
||||
// versions based on this one, but absent in the version that was just removed.
|
||||
void sstable_set_version::propagate_changes_from_next_versions() noexcept {
|
||||
if (_reference_count > _next.size() || _next.empty()) {
|
||||
// If there exists a reference outside child versions (from sstable_list), this version can still be read from, so we can't modify it
|
||||
// Or there are no child versions so there is nothing to propagate
|
||||
return;
|
||||
}
|
||||
sstable_set_version* next_ver = *_next.begin();
|
||||
// Propagate additions
|
||||
for (auto ver : _next) {
|
||||
if (ver->_added.size() < next_ver->_added.size()) {
|
||||
next_ver = ver;
|
||||
}
|
||||
}
|
||||
for (auto it = next_ver->_added.begin(); it != next_ver->_added.end();) {
|
||||
auto& sst = *it;
|
||||
it++;
|
||||
propagate_inserted_sstable(sst);
|
||||
}
|
||||
|
||||
next_ver = *_next.begin();
|
||||
// Propagate erasures
|
||||
for (auto ver : _next) {
|
||||
if (ver->_erased.size() < next_ver->_erased.size()) {
|
||||
next_ver = ver;
|
||||
}
|
||||
}
|
||||
for (auto it = next_ver->_erased.begin(); it != next_ver->_erased.end();) {
|
||||
auto& sst = *it;
|
||||
it++;
|
||||
propagate_erased_sstable(sst);
|
||||
}
|
||||
}
|
||||
|
||||
const sstable_set_version* sstable_set_version::get_previous_version() const {
|
||||
return _prev.get();
|
||||
}
|
||||
|
||||
bool sstable_set_version::can_merge_with_next() const noexcept {
|
||||
return _reference_count == 1 && _next.size() == 1;
|
||||
}
|
||||
|
||||
bool sstable_set_version::can_delete() const noexcept {
|
||||
return _reference_count == 0;
|
||||
}
|
||||
|
||||
void sstable_set_version::add_reference() noexcept {
|
||||
_reference_count++;
|
||||
}
|
||||
|
||||
void sstable_set_version::remove_reference() noexcept {
|
||||
_reference_count--;
|
||||
propagate_changes_from_next_versions();
|
||||
}
|
||||
|
||||
schema_ptr sstable_set_version::get_schema() const {
|
||||
return _schema;
|
||||
}
|
||||
|
||||
// Selects from the shared base set for the given range and keeps only the
// sstables that this version contains.
std::vector<shared_sstable> sstable_set_version::select(const dht::partition_range& range) const {
    std::vector<shared_sstable> visible;
    for (const auto& sst : _base_set->select(range)) {
        if (this->contains(sst)) {
            visible.push_back(sst);
        }
    }
    return visible;
}
|
||||
|
||||
// Return all runs which contain any of the input sstables.
|
||||
std::vector<sstable_run> sstable_set_version::select_sstable_runs(const std::vector<shared_sstable>& sstables) const {
|
||||
auto run_ids = boost::copy_range<std::unordered_set<utils::UUID>>(sstables | boost::adaptors::transformed(std::mem_fn(&sstable::run_identifier)));
|
||||
return boost::copy_range<std::vector<sstable_run>>(run_ids | boost::adaptors::transformed([this] (utils::UUID run_id) {
|
||||
return sstable_run(boost::copy_range<std::unordered_set<shared_sstable>>(_base_set->select_by_run_id(run_id)
|
||||
| boost::adaptors::filtered([this] (shared_sstable sst) { return this->contains(sst); })));
|
||||
}));
|
||||
}
|
||||
|
||||
const std::map<shared_sstable, unsigned>& sstable_set_version::all() const {
|
||||
return _base_set->sstables_and_times_added;
|
||||
}
|
||||
|
||||
// Provides strong exception guarantee
|
||||
sstable_set_version_reference sstable_set_version::insert(shared_sstable sst) {
|
||||
if (this->contains(sst)) {
|
||||
return get_reference_to_this();
|
||||
}
|
||||
if (_next.size()) {
|
||||
auto sstvr = get_reference_to_new_copy();
|
||||
// The new version has no copies based on it, so inserting into it doesn't create another version
|
||||
return sstvr->insert(sst);
|
||||
}
|
||||
auto it = _erased.find(sst);
|
||||
if (it != _erased.end()) {
|
||||
_erased.erase(it);
|
||||
} else {
|
||||
_base_set->insert(sst);
|
||||
try {
|
||||
_added.insert(sst);
|
||||
if (_prev) {
|
||||
_prev->propagate_inserted_sstable(sst);
|
||||
}
|
||||
} catch (...) {
|
||||
_base_set->remove(sst);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
return get_reference_to_this();
|
||||
}
|
||||
|
||||
// Provides strong exception guarantee
|
||||
sstable_set_version_reference sstable_set_version::erase(shared_sstable sst) {
|
||||
if (!this->contains(sst)) {
|
||||
return get_reference_to_this();
|
||||
}
|
||||
if (_next.size()) {
|
||||
auto sstvr = get_reference_to_new_copy();
|
||||
// The new version has no copies based on it, so erasing from it doesn't create another version
|
||||
return sstvr->erase(sst);
|
||||
}
|
||||
auto it = _added.find(sst);
|
||||
if (it != _added.end()) {
|
||||
_added.erase(it);
|
||||
_base_set->remove(sst);
|
||||
} else {
|
||||
_erased.insert(sst);
|
||||
if (_prev) {
|
||||
_prev->propagate_erased_sstable(sst);
|
||||
}
|
||||
}
|
||||
return get_reference_to_this();
|
||||
}
|
||||
|
||||
// An sstable is contained if it was added in this version, or if it was not
// erased in this version and the parent version (if any) contains it.
bool sstable_set_version::contains(shared_sstable sst) const {
    if (_added.contains(sst)) {
        return true;
    }
    if (_erased.contains(sst)) {
        return false;
    }
    return _prev && _prev->contains(sst);
}
|
||||
|
||||
size_t sstable_set_version::size() const {
|
||||
return _added.size() - _erased.size() + (_prev ? _prev->size() : 0);
|
||||
}
|
||||
|
||||
std::unique_ptr<incremental_selector_impl> sstable_set_version::make_incremental_selector() const {
|
||||
return _base_set->impl->make_incremental_selector();
|
||||
}
|
||||
|
||||
flat_mutation_reader
|
||||
sstable_set_version::create_single_key_sstable_reader(
|
||||
column_family* cf,
|
||||
schema_ptr schema,
|
||||
reader_permit permit,
|
||||
utils::estimated_histogram& sstable_histogram,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) const {
|
||||
return _base_set->impl->create_single_key_sstable_reader(cf, std::move(schema),
|
||||
std::move(permit), sstable_histogram, pr, slice, pc, std::move(trace_state), fwd, fwd_mr);
|
||||
}
|
||||
|
||||
sstable_set_version_reference sstable_set_version::get_reference_to_this() {
|
||||
return sstable_set_version_reference(this);
|
||||
}
|
||||
|
||||
sstable_set_version_reference sstable_set_version::get_reference_to_new_copy() {
|
||||
return sstable_set_version_reference(new sstable_set_version(this));
|
||||
sstable_set::incremental_selector
|
||||
sstable_set::make_incremental_selector() const {
|
||||
return incremental_selector(_impl->make_incremental_selector(), *_schema);
|
||||
}
|
||||
|
||||
std::unique_ptr<sstable_set_impl> bag_sstable_set::clone() const {
|
||||
@@ -604,10 +171,6 @@ void bag_sstable_set::erase(shared_sstable sst) {
|
||||
}
|
||||
}
|
||||
|
||||
bool bag_sstable_set::empty() const {
|
||||
return _sstables.empty();
|
||||
}
|
||||
|
||||
class bag_sstable_set::incremental_selector : public incremental_selector_impl {
|
||||
const std::vector<shared_sstable>& _sstables;
|
||||
public:
|
||||
@@ -736,10 +299,6 @@ void partitioned_sstable_set::erase(shared_sstable sst) {
|
||||
}
|
||||
}
|
||||
|
||||
bool partitioned_sstable_set::empty() const {
|
||||
return _unleveled_sstables.empty() && _leveled_sstables.empty();
|
||||
}
|
||||
|
||||
class partitioned_sstable_set::incremental_selector : public incremental_selector_impl {
|
||||
schema_ptr _schema;
|
||||
const std::vector<shared_sstable>& _unleveled_sstables;
|
||||
@@ -837,10 +396,6 @@ void time_series_sstable_set::erase(shared_sstable sst) {
|
||||
}
|
||||
}
|
||||
|
||||
bool time_series_sstable_set::empty() const {
|
||||
return _sstables->empty();
|
||||
}
|
||||
|
||||
std::unique_ptr<incremental_selector_impl> time_series_sstable_set::make_incremental_selector() const {
|
||||
struct selector : public incremental_selector_impl {
|
||||
const time_series_sstable_set& _set;
|
||||
@@ -980,15 +535,16 @@ std::unique_ptr<sstable_set_impl> time_window_compaction_strategy::make_sstable_
|
||||
return std::make_unique<time_series_sstable_set>(std::move(schema));
|
||||
}
|
||||
|
||||
sstable_set make_partitioned_sstable_set(schema_ptr schema, bool use_level_metadata) {
|
||||
return sstable_set(std::make_unique<partitioned_sstable_set>(schema, use_level_metadata), schema);
|
||||
sstable_set make_partitioned_sstable_set(schema_ptr schema, lw_shared_ptr<sstable_list> all, bool use_level_metadata) {
|
||||
return sstable_set(std::make_unique<partitioned_sstable_set>(schema, use_level_metadata), schema, std::move(all));
|
||||
}
|
||||
|
||||
sstable_set
|
||||
compaction_strategy::make_sstable_set(schema_ptr schema) const {
|
||||
return sstable_set(
|
||||
_compaction_strategy_impl->make_sstable_set(schema),
|
||||
schema);
|
||||
schema,
|
||||
make_lw_shared<sstable_list>());
|
||||
}
|
||||
|
||||
using sstable_reader_factory_type = std::function<flat_mutation_reader(shared_sstable&, const dht::partition_range& pr)>;
|
||||
@@ -1274,7 +830,7 @@ sstable_set::create_single_key_sstable_reader(
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) const {
|
||||
assert(pr.is_singular() && pr.start()->value().has_key());
|
||||
return _all->version().create_single_key_sstable_reader(cf, std::move(schema),
|
||||
return _impl->create_single_key_sstable_reader(cf, std::move(schema),
|
||||
std::move(permit), sstable_histogram, pr, slice, pc, std::move(trace_state), fwd, fwd_mr);
|
||||
}
|
||||
|
||||
|
||||
@@ -24,7 +24,6 @@
|
||||
#include "flat_mutation_reader.hh"
|
||||
#include "sstables/progress_monitor.hh"
|
||||
#include "shared_sstable.hh"
|
||||
#include "sstables/sstables.hh"
|
||||
#include "dht/i_partitioner.hh"
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
#include <vector>
|
||||
@@ -35,158 +34,31 @@ class estimated_histogram;
|
||||
|
||||
namespace sstables {
|
||||
|
||||
// This is an implementation of a set of sstables. The sstable_set allows:
|
||||
// - selecting sstables from a given partition_range
|
||||
// - selecting sstables with the same run identifiers
|
||||
// - creating incremental_selectors, used to incrementally select sstables from the set using ring-position
|
||||
// - many utilities of the std::set (inserting, erasing, checking membership, checking emptiness, iterating)
|
||||
// - creating copies
|
||||
|
||||
// The main use of the sstable_set is in the table class. The sstable_set that is stored there needs to be copied
|
||||
// every time it is modified, to allow existing sstable readers to continue using the old version. For this reason
|
||||
// the sstable_set is implemented in a way that allows fast copying.
|
||||
|
||||
// This is achieved by associating each copy of an sstable_set with an sstable_set_version which contains all changes
|
||||
// made to that copy. Each sstable_set_version has a reference to the sstable_set_version associated with the sstable_set
|
||||
// that was copied. We say that the copied version is a parent, and that the copy version is a child, or that it is "based"
|
||||
// on the copied version.
|
||||
// This allows easy checking if an sstable is an element of an sstable_set copy - the answer is yes if the sstable was added
|
||||
// in this copy or if it wasn't erased in it but it was an element of the parent version.
|
||||
// It's worth adding that this solution makes a copied sstable_set immutable. To support modifying a copied sstable_set anyway,
|
||||
// any modification applied to it replaces its associated sstable_set_version with a new one, based on the immutable version.
|
||||
// The new version hasn't been copied, so it can be modified.
|
||||
|
||||
// With the ability to check whether an sstable is an element of an sstable_set, all the methods that select sstables
|
||||
// from the set can follow the same rule: get results that include all sstables from any copy, and filter out those sstables
|
||||
// that aren't elements of the current copy. These results are obtained from the sstable_set_data class.
|
||||
|
||||
// This solution requires a special way of finding out when an sstable should be removed from sstable_set_data. We achieve this
|
||||
// by counting, for each sstable, in how many different versions it has been added. If that number is zero, the sstable should
|
||||
// be completely removed. This number is decreased when deleting versions, when erasing in the same version it was added, and
|
||||
// in one other case, as a result of propagating a change from a version's children to their parent, explained further in the following
|
||||
// paragraphs.
|
||||
|
||||
// With this approach, the length of the longest chain of sstable_set_versions that are based on one another should be as
|
||||
// short as possible. To achieve that, we are merging pairs of versions. If a version is not referenced by any sstable_sets or
|
||||
// sstable_lists, we know that it won't be modified or read from, so we can merge its changes into versions that are based on it.
|
||||
// We don't want to copy these changes though, so we wait with merging until there is only one child version.
|
||||
|
||||
// This waiting causes one more problem: an sstable may have been added in a version that has no reference from an sstable_set or
|
||||
// an sstable_list, and it may have been erased in all its child versions. In such a scenario, the sstable can't be selected from
|
||||
// any version, so it should be removed from the set completely. To handle such situations, if a change has been made to an
|
||||
// sstable in all children of some version that has no reference from an sstable_set or sstable_list, the change is removed
|
||||
// in all children and added to the parent instead.
|
||||
// This solution guarantees that when a version is ready for merging, the version it is being merged into contains no changes,
|
||||
// because if it had any, they would be propagated to this version instead. As a result, merging versions is simply reassigning
|
||||
// sets of changes. The number of times an sstable change gets propagated into a parent version is limited by the number of ancestors
|
||||
// of a version.
|
||||
|
||||
class incremental_selector_impl;
|
||||
class sstable_set_impl;
|
||||
class incremental_selector_impl;
|
||||
|
||||
// Structure holds all sstables (a.k.a. fragments) that belong to the same run identifier, which is a UUID.
|
||||
// SStables in that same run will not overlap with one another.
|
||||
class sstable_run {
|
||||
std::unordered_set<shared_sstable> _all;
|
||||
sstable_list _all;
|
||||
public:
|
||||
sstable_run() = default;
|
||||
sstable_run(std::unordered_set<shared_sstable> all) : _all(std::move(all)) {}
|
||||
void insert(shared_sstable sst);
|
||||
void erase(shared_sstable sst);
|
||||
// Data size of the whole run, meaning it's a sum of the data size of all its fragments.
|
||||
uint64_t data_size() const;
|
||||
const std::unordered_set<shared_sstable>& all() const { return _all; }
|
||||
const sstable_list& all() const { return _all; }
|
||||
};
|
||||
|
||||
// The very base class of an sstable_set. Stores all sstables that were added in
|
||||
// any version of the set. Its methods return supersets of values returned by
|
||||
// any single version.
|
||||
struct sstable_set_data {
|
||||
std::unique_ptr<sstable_set_impl> impl;
|
||||
std::unordered_map<utils::UUID, std::unordered_set<shared_sstable>> all_runs;
|
||||
// For each sstable, stores in how many different versions it has been inserted
|
||||
std::map<shared_sstable, unsigned> sstables_and_times_added;
|
||||
|
||||
sstable_set_data(std::unique_ptr<sstable_set_impl> impl);
|
||||
void insert(shared_sstable sst);
|
||||
void erase(shared_sstable sst);
|
||||
std::vector<shared_sstable> select(const dht::partition_range& range) const;
|
||||
std::unordered_set<shared_sstable> select_by_run_id(utils::UUID run_id) const;
|
||||
void remove(shared_sstable sst);
|
||||
};
|
||||
|
||||
class sstable_set_version;
|
||||
|
||||
// Manages a pointer to an sstable_set_version - the sstable_set_version gets removed when all
|
||||
// sstable_set_version_references are removed, or when there is only one sstable_set_version_reference
|
||||
// and that reference is owned by another sstable_set_version.
|
||||
// In the second case the data from the managed version is merged into the other version before removal.
|
||||
class sstable_set_version_reference {
|
||||
mutable sstable_set_version* _p = nullptr;
|
||||
|
||||
explicit sstable_set_version_reference(sstable_set_version* p);
|
||||
public:
|
||||
~sstable_set_version_reference();
|
||||
sstable_set_version_reference() = default;
|
||||
sstable_set_version_reference(const sstable_set_version_reference& ref);
|
||||
sstable_set_version_reference(sstable_set_version_reference&&) noexcept;
|
||||
sstable_set_version_reference& operator=(const sstable_set_version_reference& x);
|
||||
sstable_set_version_reference& operator=(sstable_set_version_reference&&) noexcept;
|
||||
sstable_set_version& operator*() const noexcept { return *_p; }
|
||||
sstable_set_version* operator->() const noexcept { return _p; }
|
||||
sstable_set_version* get() const noexcept { return _p; }
|
||||
explicit operator bool() const noexcept { return bool(_p); }
|
||||
friend class sstable_set_version;
|
||||
};
|
||||
|
||||
// The data storage for an sstable_set. Can be used like a std::set<shared_sstable> (although with slightly
|
||||
// costlier operations).
|
||||
class sstable_list {
|
||||
sstable_set_version_reference _version;
|
||||
public:
|
||||
sstable_list(std::unique_ptr<sstable_set_impl> impl, schema_ptr s);
|
||||
sstable_list(const sstable_list& sstl);
|
||||
sstable_list(sstable_list&& sstl) noexcept;
|
||||
sstable_list& operator=(const sstable_list& sstl);
|
||||
sstable_list& operator=(sstable_list&& sstl) noexcept;
|
||||
public:
|
||||
class const_iterator {
|
||||
public:
|
||||
using iterator_category = std::forward_iterator_tag;
|
||||
using value_type = const shared_sstable;
|
||||
using difference_type = std::ptrdiff_t;
|
||||
using pointer = const shared_sstable*;
|
||||
using reference = const shared_sstable&;
|
||||
private:
|
||||
std::map<shared_sstable, unsigned>::const_iterator _it;
|
||||
const sstable_set_version_reference* _ver;
|
||||
void advance();
|
||||
public:
|
||||
const_iterator(std::map<shared_sstable, unsigned>::const_iterator it, const sstable_set_version_reference* ver);
|
||||
const_iterator& operator++();
|
||||
const_iterator operator++(int);
|
||||
const shared_sstable& operator*() const;
|
||||
bool operator==(const const_iterator& it) const;
|
||||
};
|
||||
using iterator = const_iterator;
|
||||
const_iterator begin() const;
|
||||
const_iterator end() const;
|
||||
|
||||
size_t size() const;
|
||||
void insert(shared_sstable sst);
|
||||
void erase(shared_sstable sst);
|
||||
bool contains(shared_sstable sst) const;
|
||||
bool empty() const;
|
||||
const sstable_set_version& version() const;
|
||||
};
|
||||
|
||||
// A set of sstables associated with a table.
|
||||
class sstable_set : public enable_lw_shared_from_this<sstable_set> {
|
||||
std::unique_ptr<sstable_set_impl> _impl;
|
||||
schema_ptr _schema;
|
||||
// used to support column_family::get_sstable(), which wants to return an sstable_list
|
||||
// that has a reference somewhere
|
||||
lw_shared_ptr<sstable_list> _all;
|
||||
std::unordered_map<utils::UUID, sstable_run> _all_runs;
|
||||
public:
|
||||
sstable_set(std::unique_ptr<sstable_set_impl> impl, schema_ptr s);
|
||||
~sstable_set();
|
||||
sstable_set(std::unique_ptr<sstable_set_impl> impl, schema_ptr s, lw_shared_ptr<sstable_list> all);
|
||||
sstable_set(const sstable_set&);
|
||||
sstable_set(sstable_set&&) noexcept;
|
||||
sstable_set& operator=(const sstable_set&);
|
||||
@@ -194,7 +66,7 @@ public:
|
||||
std::vector<shared_sstable> select(const dht::partition_range& range) const;
|
||||
// Return all runs which contain any of the input sstables.
|
||||
std::vector<sstable_run> select_sstable_runs(const std::vector<shared_sstable>& sstables) const;
|
||||
lw_shared_ptr<const sstable_list> all() const;
|
||||
lw_shared_ptr<sstable_list> all() const { return _all; }
|
||||
void insert(shared_sstable sst);
|
||||
void erase(shared_sstable sst);
|
||||
|
||||
@@ -207,10 +79,9 @@ public:
|
||||
mutable std::optional<nonwrapping_range<dht::ring_position_view>> _current_range_view;
|
||||
mutable std::vector<shared_sstable> _current_sstables;
|
||||
mutable dht::ring_position_view _current_next_position = dht::ring_position_view::min();
|
||||
lw_shared_ptr<sstable_list> _sstl;
|
||||
public:
|
||||
~incremental_selector();
|
||||
incremental_selector(std::unique_ptr<incremental_selector_impl> impl, const schema& s, lw_shared_ptr<sstable_list> sstl);
|
||||
incremental_selector(std::unique_ptr<incremental_selector_impl> impl, const schema& s);
|
||||
incremental_selector(incremental_selector&&) noexcept;
|
||||
|
||||
struct selection {
|
||||
@@ -302,59 +173,7 @@ flat_mutation_reader make_restricted_range_sstable_reader(
|
||||
mutation_reader::forwarding,
|
||||
read_monitor_generator& rmg = default_read_monitor_generator());
|
||||
|
||||
class sstable_set_version {
|
||||
// shared by all sstable_set_versions that were based on the same original set
|
||||
lw_shared_ptr<sstable_set_data> _base_set;
|
||||
schema_ptr _schema;
|
||||
sstable_set_version_reference _prev;
|
||||
mutable std::unordered_set<sstable_set_version*> _next;
|
||||
std::unordered_set<shared_sstable> _added;
|
||||
std::unordered_set<shared_sstable> _erased;
|
||||
// is equal to the number of sstable_set_versions based on this version increased by one if there is
|
||||
// an sstable_list that references this version
|
||||
unsigned _reference_count = 0;
|
||||
public:
|
||||
~sstable_set_version();
|
||||
sstable_set_version(std::unique_ptr<sstable_set_impl> impl, schema_ptr schema);
|
||||
explicit sstable_set_version(sstable_set_version* ver);
|
||||
private:
|
||||
void propagate_inserted_sstable(const shared_sstable& sst) noexcept;
|
||||
void propagate_erased_sstable(const shared_sstable& sst) noexcept;
|
||||
void propagate_changes_from_next_versions() noexcept;
|
||||
public:
|
||||
const sstable_set_version* get_previous_version() const;
|
||||
bool can_merge_with_next() const noexcept;
|
||||
void merge_with_next() noexcept;
|
||||
bool can_delete() const noexcept;
|
||||
void add_reference() noexcept;
|
||||
void remove_reference() noexcept;
|
||||
schema_ptr get_schema() const;
|
||||
std::vector<shared_sstable> select(const dht::partition_range& range) const;
|
||||
// Return all runs which contain any of the input sstables.
|
||||
std::vector<sstable_run> select_sstable_runs(const std::vector<shared_sstable>& sstables) const;
|
||||
const std::map<shared_sstable, unsigned>& all() const;
|
||||
sstable_set_version_reference insert(shared_sstable sst);
|
||||
sstable_set_version_reference erase(shared_sstable sst);
|
||||
bool contains(shared_sstable sst) const;
|
||||
size_t size() const;
|
||||
std::unique_ptr<incremental_selector_impl> make_incremental_selector() const;
|
||||
flat_mutation_reader create_single_key_sstable_reader(
|
||||
column_family* cf,
|
||||
schema_ptr schema,
|
||||
reader_permit permit,
|
||||
utils::estimated_histogram& sstable_histogram,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) const;
|
||||
public:
|
||||
sstable_set_version_reference get_reference_to_this();
|
||||
sstable_set_version_reference get_reference_to_new_copy();
|
||||
};
|
||||
|
||||
sstable_set make_partitioned_sstable_set(schema_ptr schema, bool use_level_metadata = true);
|
||||
sstable_set make_partitioned_sstable_set(schema_ptr schema, lw_shared_ptr<sstable_list> all, bool use_level_metadata = true);
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const sstables::sstable_run& run);
|
||||
|
||||
|
||||
@@ -41,7 +41,6 @@ public:
|
||||
virtual std::vector<shared_sstable> select(const dht::partition_range& range) const = 0;
|
||||
virtual void insert(shared_sstable sst) = 0;
|
||||
virtual void erase(shared_sstable sst) = 0;
|
||||
virtual bool empty() const = 0;
|
||||
virtual std::unique_ptr<incremental_selector_impl> make_incremental_selector() const = 0;
|
||||
|
||||
virtual flat_mutation_reader create_single_key_sstable_reader(
|
||||
@@ -66,7 +65,6 @@ public:
|
||||
virtual std::vector<shared_sstable> select(const dht::partition_range& range = query::full_partition_range) const override;
|
||||
virtual void insert(shared_sstable sst) override;
|
||||
virtual void erase(shared_sstable sst) override;
|
||||
virtual bool empty() const override;
|
||||
virtual std::unique_ptr<incremental_selector_impl> make_incremental_selector() const override;
|
||||
class incremental_selector;
|
||||
};
|
||||
@@ -105,7 +103,6 @@ public:
|
||||
virtual std::vector<shared_sstable> select(const dht::partition_range& range) const override;
|
||||
virtual void insert(shared_sstable sst) override;
|
||||
virtual void erase(shared_sstable sst) override;
|
||||
virtual bool empty() const override;
|
||||
virtual std::unique_ptr<incremental_selector_impl> make_incremental_selector() const override;
|
||||
class incremental_selector;
|
||||
};
|
||||
@@ -127,7 +124,6 @@ public:
|
||||
virtual std::vector<shared_sstable> select(const dht::partition_range& range = query::full_partition_range) const override;
|
||||
virtual void insert(shared_sstable sst) override;
|
||||
virtual void erase(shared_sstable sst) override;
|
||||
virtual bool empty() const override;
|
||||
virtual std::unique_ptr<incremental_selector_impl> make_incremental_selector() const override;
|
||||
|
||||
std::unique_ptr<position_reader_queue> make_min_position_reader_queue(
|
||||
|
||||
20
table.cc
20
table.cc
@@ -718,17 +718,21 @@ void table::rebuild_statistics() {
|
||||
future<lw_shared_ptr<sstables::sstable_set>>
|
||||
table::build_new_sstable_list(const std::vector<sstables::shared_sstable>& new_sstables,
|
||||
const std::vector<sstables::shared_sstable>& old_sstables) {
|
||||
auto current_sstables = _sstables;
|
||||
auto new_sstable_list = _compaction_strategy.make_sstable_set(_schema);
|
||||
|
||||
auto new_sstable_set = *_sstables;
|
||||
for (auto& tab : new_sstables) {
|
||||
new_sstable_set.insert(tab);
|
||||
std::unordered_set<sstables::shared_sstable> s(old_sstables.begin(), old_sstables.end());
|
||||
|
||||
// this might seem dangerous, but "move" here just avoids constness,
|
||||
// making the two ranges compatible when compiling with boost 1.55.
|
||||
// No one is actually moving anything...
|
||||
for (auto&& tab : boost::range::join(new_sstables, std::move(*current_sstables->all()))) {
|
||||
if (!s.contains(tab)) {
|
||||
new_sstable_list.insert(tab);
|
||||
}
|
||||
co_await make_ready_future<>(); // yield if needed.
|
||||
}
|
||||
for (auto& tab : old_sstables) {
|
||||
new_sstable_set.erase(tab);
|
||||
co_await make_ready_future<>(); // yield if needed.
|
||||
}
|
||||
co_return make_lw_shared<sstables::sstable_set>(std::move(new_sstable_set));
|
||||
co_return make_lw_shared<sstables::sstable_set>(std::move(new_sstable_list));
|
||||
}
|
||||
|
||||
// Note: must run in a seastar thread
|
||||
|
||||
@@ -1,638 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2020 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <random>
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include <seastar/core/future-util.hh>
|
||||
#include <seastar/core/align.hh>
|
||||
#include <seastar/core/aligned_buffer.hh>
|
||||
#include "sstables/compaction.hh"
|
||||
#include <seastar/testing/test_case.hh>
|
||||
#include <seastar/testing/thread_test_case.hh>
|
||||
#include "test/boost/sstable_test.hh"
|
||||
#include <seastar/core/seastar.hh>
|
||||
#include <seastar/core/do_with.hh>
|
||||
#include "sstables/compaction_strategy_impl.hh"
|
||||
#include "sstables/date_tiered_compaction_strategy.hh"
|
||||
#include "sstables/time_window_compaction_strategy.hh"
|
||||
#include "sstables/leveled_compaction_strategy.hh"
|
||||
#include "sstables/sstable_set.hh"
|
||||
#include "sstables/sstable_set_impl.hh"
|
||||
|
||||
using namespace sstables;
|
||||
|
||||
static const sstring some_keyspace("ks");
|
||||
static const sstring some_column_family("cf");
|
||||
|
||||
static shared_sstable sstable_for_overlapping_test(test_env& env, const schema_ptr& schema, int64_t gen, sstring first_key, sstring last_key, uint32_t level = 0) {
|
||||
auto sst = env.make_sstable(schema, "", gen, la, big);
|
||||
sstables::test(sst).set_values_for_leveled_strategy(0, level, 0, std::move(first_key), std::move(last_key));
|
||||
return sst;
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(simple_versioned_sstable_set_test) {
|
||||
return test_env::do_with([] (test_env& env) {
|
||||
auto s = make_shared_schema({}, some_keyspace, some_column_family,
|
||||
{{"p1", utf8_type}}, {}, {}, {}, utf8_type);
|
||||
auto key_and_token_pair = token_generation_for_current_shard(8);
|
||||
auto decorated_keys = boost::copy_range<std::vector<dht::decorated_key>>(
|
||||
key_and_token_pair | boost::adaptors::transformed([&s] (const std::pair<sstring, dht::token>& key_and_token) {
|
||||
auto value = bytes(reinterpret_cast<const signed char*>(key_and_token.first.data()), key_and_token.first.size());
|
||||
auto pk = sstables::key::from_bytes(value).to_partition_key(*s);
|
||||
return dht::decorate_key(*s, std::move(pk));
|
||||
}));
|
||||
struct snapshot_and_selections {
|
||||
std::optional<sstable_set> ssts;
|
||||
std::vector<std::unordered_set<shared_sstable>> selections;
|
||||
std::unordered_map<utils::UUID, std::unordered_set<shared_sstable>> runs;
|
||||
std::unordered_set<shared_sstable> all;
|
||||
snapshot_and_selections(sstable_set&& ssts, const std::vector<std::unordered_set<shared_sstable>>& selections,
|
||||
const std::unordered_map<utils::UUID, std::unordered_set<shared_sstable>>& runs, const std::unordered_set<shared_sstable>& all)
|
||||
: ssts(std::move(ssts)), selections(selections), runs(runs), all(all) { }
|
||||
};
|
||||
auto check = [&decorated_keys] (snapshot_and_selections& version) {
|
||||
for (int j = 0; j < 8; j++) {
|
||||
auto sel = version.ssts->select(dht::partition_range::make_singular(decorated_keys[j]));
|
||||
BOOST_REQUIRE_EQUAL(sel.size(), version.selections[j].size());
|
||||
for (auto& sst : sel) {
|
||||
BOOST_REQUIRE(version.selections[j].contains(sst));
|
||||
}
|
||||
}
|
||||
for (auto& [uuid, run] : version.runs) {
|
||||
if (run.empty()) {
|
||||
continue;
|
||||
}
|
||||
std::vector<sstable_run> runs = version.ssts->select_sstable_runs({*run.begin()});
|
||||
// only one sstable -> only one run
|
||||
BOOST_REQUIRE_EQUAL(runs[0].all().size(), run.size());
|
||||
for (auto& sst : runs[0].all()) {
|
||||
BOOST_REQUIRE(run.contains(sst));
|
||||
}
|
||||
}
|
||||
BOOST_REQUIRE_EQUAL(version.ssts->all()->size(), version.all.size());
|
||||
for (auto& sst : *version.ssts->all()) {
|
||||
BOOST_REQUIRE(version.all.contains(sst));
|
||||
}
|
||||
};
|
||||
// check that selecting from older snapshots of an sstable_set gives correct results.
|
||||
{
|
||||
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options());
|
||||
std::optional<sstable_set> set = cs.make_sstable_set(s);
|
||||
std::vector<std::unordered_set<shared_sstable>> current_selections(8);
|
||||
std::unordered_map<utils::UUID, std::unordered_set<shared_sstable>> current_runs;
|
||||
std::unordered_set<shared_sstable> current_all;
|
||||
|
||||
std::vector<snapshot_and_selections> versions;
|
||||
versions.reserve(20);
|
||||
int token = 0;
|
||||
for (int i = 0; i < 20; i++) {
|
||||
for (int j = 0; j < 10; j++) {
|
||||
auto sst = sstable_for_overlapping_test(env, s, i, key_and_token_pair[token].first, key_and_token_pair[token].first, 1);
|
||||
set->insert(sst);
|
||||
current_selections[token].insert(sst);
|
||||
current_runs[sst->run_identifier()].insert(sst);
|
||||
current_all.insert(sst);
|
||||
++token %= 8;
|
||||
}
|
||||
for (int j = 0; j < 5; j++) {
|
||||
auto sst = *set->all()->begin();
|
||||
set->erase(sst);
|
||||
for (auto& sel : current_selections) {
|
||||
// actually erases only from one
|
||||
sel.erase(sst);
|
||||
}
|
||||
current_runs[sst->run_identifier()].erase(sst);
|
||||
current_all.erase(sst);
|
||||
}
|
||||
versions.emplace_back(std::move(*set), current_selections, current_runs, current_all);
|
||||
set = versions.back().ssts;
|
||||
}
|
||||
|
||||
for (unsigned i : {15, 12, 6, 9, 19, 14, 4, 5, 13, 16, 2, 7, 0, 1, 10, 11, 3, 17, 8, 18}) {
|
||||
check(versions[i]);
|
||||
// by removing the reference (by overwriting) we test that it has no influence on the results from other snapshots
|
||||
versions[i].ssts.reset();
|
||||
}
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(sstable_list_test) {
|
||||
return test_env::do_with([] (test_env& env) {
|
||||
auto s = make_shared_schema({}, some_keyspace, some_column_family,
|
||||
{{"p1", utf8_type}}, {}, {}, {}, utf8_type);
|
||||
auto key_and_token_pair = token_generation_for_current_shard(8);
|
||||
auto decorated_keys = boost::copy_range<std::vector<dht::decorated_key>>(
|
||||
key_and_token_pair | boost::adaptors::transformed([&s] (const std::pair<sstring, dht::token>& key_and_token) {
|
||||
auto value = bytes(reinterpret_cast<const signed char*>(key_and_token.first.data()), key_and_token.first.size());
|
||||
auto pk = sstables::key::from_bytes(value).to_partition_key(*s);
|
||||
return dht::decorate_key(*s, std::move(pk));
|
||||
}));
|
||||
|
||||
|
||||
struct list_and_sstables {
|
||||
std::optional<sstable_list> list;
|
||||
std::unordered_set<shared_sstable> sstables_in_list;
|
||||
|
||||
list_and_sstables(sstable_list&& sstl, std::unordered_set<shared_sstable> sstables_in_list)
|
||||
: list(std::move(sstl)), sstables_in_list(sstables_in_list) { }
|
||||
};
|
||||
auto check = [] (list_and_sstables& version) {
|
||||
BOOST_REQUIRE_EQUAL(version.list->size(), version.sstables_in_list.size());
|
||||
for (auto& sst : *version.list) {
|
||||
BOOST_REQUIRE(version.list->contains(sst));
|
||||
}
|
||||
};
|
||||
|
||||
{
|
||||
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options());
|
||||
std::optional<sstable_set> set = cs.make_sstable_set(s);
|
||||
std::optional<sstable_list> l = *set->all();
|
||||
std::unordered_set<shared_sstable> sstables_in_list;
|
||||
|
||||
|
||||
std::vector<list_and_sstables> versions;
|
||||
versions.reserve(20);
|
||||
int token = 0;
|
||||
for (int i = 0; i < 20; i++) {
|
||||
for (int j = 0; j < 10; j++) {
|
||||
auto sst = sstable_for_overlapping_test(env, s, i, key_and_token_pair[token].first, key_and_token_pair[token].first, 1);
|
||||
l->insert(sst);
|
||||
sstables_in_list.insert(sst);
|
||||
++token %= 8;
|
||||
}
|
||||
for (int j = 0; j < 5; j++) {
|
||||
auto sst = *l->begin();
|
||||
l->erase(sst);
|
||||
sstables_in_list.erase(sst);
|
||||
}
|
||||
versions.emplace_back(std::move(*l), sstables_in_list);
|
||||
l = versions.back().list;
|
||||
}
|
||||
|
||||
for (unsigned i : {15, 12, 6, 9, 19, 14, 4, 5, 13, 16, 2, 7, 0, 1, 10, 11, 3, 17, 8, 18}) {
|
||||
check(versions[i]);
|
||||
// by removing the reference (by overwriting) we test that it has no influence on the results from other snapshots
|
||||
versions[i].list.reset();
|
||||
}
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(sstable_set_version_merging_test) {
|
||||
return test_env::do_with([] (test_env& env) {
|
||||
auto s = make_shared_schema({}, some_keyspace, some_column_family,
|
||||
{{"p1", utf8_type}}, {}, {}, {}, utf8_type);
|
||||
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options());
|
||||
auto key_and_token_pair = token_generation_for_current_shard(2);
|
||||
std::optional<sstable_set> set = cs.make_sstable_set(s);
|
||||
std::optional<sstable_list> list = *set->all();
|
||||
// set -> list
|
||||
BOOST_REQUIRE_EQUAL(&set->all()->version(), list->version().get_previous_version());
|
||||
std::optional<sstable_list> list2 = *set->all();
|
||||
// set -> list
|
||||
// -> list2
|
||||
BOOST_REQUIRE_EQUAL(&set->all()->version(), list->version().get_previous_version());
|
||||
BOOST_REQUIRE_EQUAL(&set->all()->version(), list2->version().get_previous_version());
|
||||
auto sst = sstable_for_overlapping_test(env, s, 1, key_and_token_pair[0].first, key_and_token_pair[0].first, 1);
|
||||
set->insert(sst);
|
||||
// set' -> list
|
||||
// -> list2
|
||||
// -> set
|
||||
BOOST_REQUIRE_EQUAL(set->all()->version().get_previous_version(), list->version().get_previous_version());
|
||||
BOOST_REQUIRE_EQUAL(set->all()->version().get_previous_version(), list2->version().get_previous_version());
|
||||
BOOST_REQUIRE_NE(&set->all()->version(), list->version().get_previous_version());
|
||||
sst = sstable_for_overlapping_test(env, s, 1, key_and_token_pair[1].first, key_and_token_pair[1].first, 1);
|
||||
set->insert(sst);
|
||||
// set' -> list
|
||||
// -> list2
|
||||
// -> set
|
||||
BOOST_REQUIRE_EQUAL(set->all()->version().get_previous_version(), list->version().get_previous_version());
|
||||
BOOST_REQUIRE_EQUAL(set->all()->version().get_previous_version(), list2->version().get_previous_version());
|
||||
BOOST_REQUIRE_NE(&set->all()->version(), list->version().get_previous_version());
|
||||
std::optional<sstable_list> list3 = list;
|
||||
// set' -> list -> list3
|
||||
// -> list2
|
||||
// -> set
|
||||
BOOST_REQUIRE_EQUAL(&list->version(), list3->version().get_previous_version());
|
||||
list.reset();
|
||||
// set' -> list3
|
||||
// -> list2
|
||||
// -> set
|
||||
BOOST_REQUIRE_EQUAL(set->all()->version().get_previous_version(), list3->version().get_previous_version());
|
||||
BOOST_REQUIRE_EQUAL(list2->version().get_previous_version(), list3->version().get_previous_version());
|
||||
set.reset();
|
||||
// set' -> list3
|
||||
// -> list2
|
||||
BOOST_REQUIRE_EQUAL(list2->version().get_previous_version(), list3->version().get_previous_version());
|
||||
std::optional<sstable_list> list4 = list3;
|
||||
std::optional<sstable_list> list5 = list3;
|
||||
// set' -> list3 -> list4
|
||||
// -> list5
|
||||
// -> list2
|
||||
BOOST_REQUIRE_EQUAL(&list3->version(), list4->version().get_previous_version());
|
||||
BOOST_REQUIRE_EQUAL(&list3->version(), list5->version().get_previous_version());
|
||||
list3.reset();
|
||||
// set' -> list3' -> list4
|
||||
// -> list5
|
||||
// -> list2
|
||||
BOOST_REQUIRE_NE(list2->version().get_previous_version(), list4->version().get_previous_version());
|
||||
BOOST_REQUIRE_EQUAL(list4->version().get_previous_version(), list5->version().get_previous_version());
|
||||
list4.reset();
|
||||
// set' -> list5
|
||||
// -> list2
|
||||
BOOST_REQUIRE_EQUAL(list2->version().get_previous_version(), list5->version().get_previous_version());
|
||||
list2.reset();
|
||||
// list5
|
||||
BOOST_REQUIRE_EQUAL(nullptr, list5->version().get_previous_version());
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(sstable_set_propagate_erased_sstables_erased_in_last_descendant_test) {
|
||||
return test_env::do_with([] (test_env& env) {
|
||||
auto s = make_shared_schema({}, some_keyspace, some_column_family,
|
||||
{{"p1", utf8_type}}, {}, {}, {}, utf8_type);
|
||||
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options());
|
||||
auto key_and_token_pair = token_generation_for_current_shard(1);
|
||||
{
|
||||
std::optional<sstable_set> set = cs.make_sstable_set(s);
|
||||
auto sst = sstable_for_overlapping_test(env, s, 1, key_and_token_pair[0].first, key_and_token_pair[0].first, 1);
|
||||
bool is_sstable_removed = false;
|
||||
utils::observer<sstable&> observer = sst->add_on_closed_handler([&] (sstable& sst) {
|
||||
is_sstable_removed = true;
|
||||
});
|
||||
set->insert(sst);
|
||||
std::optional<sstable_set> set2 = set;
|
||||
std::optional<sstable_set> set3 = set;
|
||||
// set -> set2
|
||||
// -> set3
|
||||
set.reset();
|
||||
set2->erase(sst);
|
||||
set3->erase(sst);
|
||||
sst = nullptr;
|
||||
BOOST_REQUIRE(is_sstable_removed);
|
||||
}
|
||||
{
|
||||
std::optional<sstable_set> set = cs.make_sstable_set(s);
|
||||
auto sst = sstable_for_overlapping_test(env, s, 1, key_and_token_pair[0].first, key_and_token_pair[0].first, 1);
|
||||
bool is_sstable_removed = false;
|
||||
utils::observer<sstable&> observer = sst->add_on_closed_handler([&] (sstable& sst) {
|
||||
is_sstable_removed = true;
|
||||
});
|
||||
set->insert(sst);
|
||||
std::optional<sstable_set> set2 = set;
|
||||
std::optional<sstable_set> set3 = set;
|
||||
std::optional<sstable_set> set4 = set;
|
||||
// set -> set2
|
||||
// -> set3
|
||||
// -> set4
|
||||
set.reset();
|
||||
set2->erase(sst);
|
||||
set3.reset();
|
||||
set4->erase(sst);
|
||||
sst = nullptr;
|
||||
BOOST_REQUIRE(is_sstable_removed);
|
||||
}
|
||||
{
|
||||
std::optional<sstable_set> set = cs.make_sstable_set(s);
|
||||
auto sst = sstable_for_overlapping_test(env, s, 1, key_and_token_pair[0].first, key_and_token_pair[0].first, 1);
|
||||
bool is_sstable_removed = false;
|
||||
utils::observer<sstable&> observer = sst->add_on_closed_handler([&] (sstable& sst) {
|
||||
is_sstable_removed = true;
|
||||
});
|
||||
set->insert(sst);
|
||||
std::optional<sstable_set> set2 = set;
|
||||
std::optional<sstable_set> set3 = set;
|
||||
std::optional<sstable_set> set4 = set3;
|
||||
std::optional<sstable_set> set5 = set3;
|
||||
// set -> set2
|
||||
// -> set3 -> set4
|
||||
// -> set5
|
||||
set.reset();
|
||||
set2->erase(sst);
|
||||
set3.reset();
|
||||
set4->erase(sst);
|
||||
set5->erase(sst);
|
||||
sst = nullptr;
|
||||
BOOST_REQUIRE(is_sstable_removed);
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
|
||||
// Every scenario below follows the same recipe: insert a single sstable into
// a set, copy that set (the copy topology is sketched in a comment), erase the
// sstable from some of the copies and drop the local handle. At that point the
// on-closed handler must not have fired yet; it must have fired as soon as the
// last remaining set that still contains the sstable has been reset.
//
// The order of the statements inside each scenario is the thing under test.
SEASTAR_TEST_CASE(sstable_set_propagate_erased_sstables_remove_reference_to_ancestor_test) {
    return test_env::do_with([] (test_env& env) {
        auto schema = make_shared_schema({}, some_keyspace, some_column_family,
            {{"p1", utf8_type}}, {}, {}, {}, utf8_type);
        auto strategy = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, schema->compaction_strategy_options());
        auto keys = token_generation_for_current_shard(1);
        // The single generated key is used as both the first and the last key.
        auto& key = keys[0].first;

        {
            std::optional<sstable_set> set1 = strategy.make_sstable_set(schema);
            auto tracked = sstable_for_overlapping_test(env, schema, 1, key, key, 1);
            bool closed = false;
            utils::observer<sstable&> close_sub = tracked->add_on_closed_handler([&closed] (sstable&) {
                closed = true;
            });
            set1->insert(tracked);
            auto set2 = set1;
            // set1 -> set2
            set2->erase(tracked);
            tracked = nullptr;
            BOOST_REQUIRE(!closed);
            set1.reset();
            BOOST_REQUIRE(closed);
        }

        {
            std::optional<sstable_set> set1 = strategy.make_sstable_set(schema);
            auto tracked = sstable_for_overlapping_test(env, schema, 1, key, key, 1);
            bool closed = false;
            utils::observer<sstable&> close_sub = tracked->add_on_closed_handler([&closed] (sstable&) {
                closed = true;
            });
            set1->insert(tracked);
            auto set2 = set1;
            auto set3 = set1;
            // set1 -> set2
            //      -> set3
            set2->erase(tracked);
            set3->erase(tracked);
            tracked = nullptr;
            BOOST_REQUIRE(!closed);
            set1.reset();
            BOOST_REQUIRE(closed);
        }

        {
            std::optional<sstable_set> set1 = strategy.make_sstable_set(schema);
            auto tracked = sstable_for_overlapping_test(env, schema, 1, key, key, 1);
            bool closed = false;
            utils::observer<sstable&> close_sub = tracked->add_on_closed_handler([&closed] (sstable&) {
                closed = true;
            });
            set1->insert(tracked);
            auto set2 = set1;
            auto set3 = set1;
            auto set4 = set1;
            // set1 -> set2
            //      -> set3
            //      -> set4
            set2->erase(tracked);
            set3.reset();
            set4->erase(tracked);
            tracked = nullptr;
            BOOST_REQUIRE(!closed);
            set1.reset();
            BOOST_REQUIRE(closed);
        }

        {
            std::optional<sstable_set> set1 = strategy.make_sstable_set(schema);
            auto tracked = sstable_for_overlapping_test(env, schema, 1, key, key, 1);
            bool closed = false;
            utils::observer<sstable&> close_sub = tracked->add_on_closed_handler([&closed] (sstable&) {
                closed = true;
            });
            set1->insert(tracked);
            auto set2 = set1;
            auto set3 = set1;
            auto set4 = set3;
            auto set5 = set3;
            // set1 -> set2
            //      -> set3 -> set4
            //              -> set5
            set1.reset();
            set2->erase(tracked);
            set4->erase(tracked);
            set5->erase(tracked);
            tracked = nullptr;
            BOOST_REQUIRE(!closed);
            set3.reset();
            BOOST_REQUIRE(closed);
        }

        return make_ready_future<>();
    });
}
|
||||
|
||||
|
||||
// Every scenario below inserts a single sstable into a set, copies that set
// (the copy topology is sketched in a comment), resets the set the copies were
// taken from, erases the sstable from some of the copies and drops the local
// handle. The on-closed handler must not have fired at that point; it must
// have fired once the last copy that still contains the sstable is reset.
//
// The order of the statements inside each scenario is the thing under test.
SEASTAR_TEST_CASE(sstable_set_propagate_erased_sstables_remove_reference_to_descendant_test) {
    return test_env::do_with([] (test_env& env) {
        auto schema = make_shared_schema({}, some_keyspace, some_column_family,
            {{"p1", utf8_type}}, {}, {}, {}, utf8_type);
        auto strategy = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, schema->compaction_strategy_options());
        auto keys = token_generation_for_current_shard(1);
        // The single generated key is used as both the first and the last key.
        auto& key = keys[0].first;

        {
            std::optional<sstable_set> set1 = strategy.make_sstable_set(schema);
            auto tracked = sstable_for_overlapping_test(env, schema, 1, key, key, 1);
            bool closed = false;
            utils::observer<sstable&> close_sub = tracked->add_on_closed_handler([&closed] (sstable&) {
                closed = true;
            });
            set1->insert(tracked);
            auto set2 = set1;
            auto set3 = set1;
            // set1 -> set2
            //      -> set3
            set1.reset();
            set2->erase(tracked);
            tracked = nullptr;
            BOOST_REQUIRE(!closed);
            set3.reset();
            BOOST_REQUIRE(closed);
        }

        {
            std::optional<sstable_set> set1 = strategy.make_sstable_set(schema);
            auto tracked = sstable_for_overlapping_test(env, schema, 1, key, key, 1);
            bool closed = false;
            utils::observer<sstable&> close_sub = tracked->add_on_closed_handler([&closed] (sstable&) {
                closed = true;
            });
            set1->insert(tracked);
            auto set2 = set1;
            auto set3 = set1;
            auto set4 = set1;
            // set1 -> set2
            //      -> set3
            //      -> set4
            set1.reset();
            set2->erase(tracked);
            set3->erase(tracked);
            tracked = nullptr;
            BOOST_REQUIRE(!closed);
            set4.reset();
            BOOST_REQUIRE(closed);
        }

        {
            std::optional<sstable_set> set1 = strategy.make_sstable_set(schema);
            auto tracked = sstable_for_overlapping_test(env, schema, 1, key, key, 1);
            bool closed = false;
            utils::observer<sstable&> close_sub = tracked->add_on_closed_handler([&closed] (sstable&) {
                closed = true;
            });
            set1->insert(tracked);
            auto set2 = set1;
            auto set3 = set1;
            auto set4 = set3;
            auto set5 = set3;
            // set1 -> set2
            //      -> set3 -> set4
            //              -> set5
            set1.reset();
            set2->erase(tracked);
            set3.reset();
            set4->erase(tracked);
            tracked = nullptr;
            BOOST_REQUIRE(!closed);
            set5.reset();
            BOOST_REQUIRE(closed);
        }

        return make_ready_future<>();
    });
}
|
||||
|
||||
class simple_sstable_set {
|
||||
std::unique_ptr<sstable_set_impl> _impl;
|
||||
schema_ptr _schema;
|
||||
// used to support column_family::get_sstable(), which wants to return an sstable_list
|
||||
// that has a reference somewhere
|
||||
lw_shared_ptr<std::unordered_set<shared_sstable>> _all;
|
||||
std::unordered_map<utils::UUID, sstable_run> _all_runs;
|
||||
public:
|
||||
~simple_sstable_set() = default;
|
||||
|
||||
simple_sstable_set(std::unique_ptr<sstable_set_impl> impl, schema_ptr schema)
|
||||
: _impl(std::move(impl))
|
||||
, _schema(std::move(schema))
|
||||
, _all(make_lw_shared<std::unordered_set<shared_sstable>>()) {
|
||||
}
|
||||
|
||||
simple_sstable_set(const simple_sstable_set& x)
|
||||
: _impl(x._impl->clone())
|
||||
, _schema(x._schema)
|
||||
, _all(make_lw_shared<std::unordered_set<shared_sstable>>(*x._all))
|
||||
, _all_runs(x._all_runs) {
|
||||
}
|
||||
|
||||
simple_sstable_set(simple_sstable_set&& x) noexcept = default;
|
||||
|
||||
simple_sstable_set& operator=(const simple_sstable_set& x) {
|
||||
if (this != &x) {
|
||||
auto tmp = simple_sstable_set(x);
|
||||
*this = std::move(tmp);
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
simple_sstable_set& operator=(simple_sstable_set&&) noexcept = default;
|
||||
|
||||
std::vector<shared_sstable> select(const dht::partition_range& range) const {
|
||||
return _impl->select(range);
|
||||
}
|
||||
|
||||
// Return all runs which contain any of the input sstables.
|
||||
std::vector<sstable_run> select_sstable_runs(const std::vector<shared_sstable>& sstables) const {
|
||||
auto run_ids = boost::copy_range<std::unordered_set<utils::UUID>>(sstables | boost::adaptors::transformed(std::mem_fn(&sstable::run_identifier)));
|
||||
return boost::copy_range<std::vector<sstable_run>>(run_ids | boost::adaptors::transformed([this] (utils::UUID run_id) {
|
||||
return _all_runs.at(run_id);
|
||||
}));
|
||||
}
|
||||
|
||||
lw_shared_ptr<std::unordered_set<shared_sstable>> all() const { return _all; }
|
||||
|
||||
void insert(shared_sstable sst) {
|
||||
_impl->insert(sst);
|
||||
_all->insert(sst);
|
||||
_all_runs[sst->run_identifier()].insert(sst);
|
||||
}
|
||||
|
||||
void erase(shared_sstable sst) {
|
||||
_impl->erase(sst);
|
||||
_all->erase(sst);
|
||||
_all_runs[sst->run_identifier()].erase(sst);
|
||||
}
|
||||
};
|
||||
|
||||
// Random walk over a population of sstable_set objects, mirrored step by step
// on simple_sstable_set reference models. Each iteration picks one set and
// either deletes it, copies it, or toggles the membership of a random sstable
// in it; afterwards every set must hold as many sstables as its model.
//
// The random engine is default-constructed, so the sequence of operations is
// the same on every run.
SEASTAR_TEST_CASE(sstable_set_random_walk_test) {
    return test_env::do_with([] (test_env& env) {
        auto rand = std::default_random_engine();
        // 0 = delete, 1 = copy, 2..7 = modify.
        auto op_gen = std::uniform_int_distribution<unsigned>(0, 7);
        auto nr_dist = std::geometric_distribution<size_t>(0.7);
        auto s = make_shared_schema({}, some_keyspace, some_column_family,
            {{"p1", utf8_type}}, {}, {}, {}, utf8_type);
        auto leveled_cs = leveled_compaction_strategy(s->compaction_strategy_options());
        std::vector<sstable_set> sstable_sets;
        std::vector<simple_sstable_set> simple_sstable_sets;
        sstable_sets.emplace_back(leveled_cs.make_sstable_set(s), s);
        simple_sstable_sets.emplace_back(leveled_cs.make_sstable_set(s), s);
        auto key_and_token_pair = token_generation_for_current_shard(1000);
        std::vector<shared_sstable> sstables(1000);
        for (int i = 0; i < 1000; i++) {
            sstables[i] = sstable_for_overlapping_test(env, s, i, key_and_token_pair[i].first, key_and_token_pair[i].first, i);
        }
        for (auto i = 0; i != 100000; i++) {
            auto op = op_gen(rand);
            auto u = std::uniform_int_distribution<size_t>(0, sstable_sets.size() - 1);
            auto idx = u(rand);
            switch (op) {
            case 0: {
                // delete
                if (sstable_sets.size() > 1) {
                    sstable_sets.erase(sstable_sets.begin() + idx);
                    simple_sstable_sets.erase(simple_sstable_sets.begin() + idx);
                    break;
                }
                // if we can't remove the version, let's create one
                [[fallthrough]];
            }
            case 1: {
                // copy
                if (sstable_sets.size() < 100) {
                    sstable_sets.emplace_back(sstable_sets[idx]);
                    simple_sstable_sets.emplace_back(simple_sstable_sets[idx]);
                    for (auto& sst : *simple_sstable_sets.back().all()) {
                        BOOST_REQUIRE(sstable_sets.back().all()->contains(sst));
                    }
                }
                break;
            }
            default: {
                // modify: toggle the membership of a random sstable
                auto sst_u = std::uniform_int_distribution<size_t>(0, 999);
                auto sst_idx = sst_u(rand);
                if (simple_sstable_sets[idx].all()->contains(sstables[sst_idx])) {
                    sstable_sets[idx].erase(sstables[sst_idx]);
                    simple_sstable_sets[idx].erase(sstables[sst_idx]);
                    // Check the set under test and the reference model.
                    BOOST_REQUIRE(!sstable_sets[idx].all()->contains(sstables[sst_idx]));
                    BOOST_REQUIRE(!simple_sstable_sets[idx].all()->contains(sstables[sst_idx]));
                } else {
                    sstable_sets[idx].insert(sstables[sst_idx]);
                    simple_sstable_sets[idx].insert(sstables[sst_idx]);
                    // Check the set under test and the reference model.
                    BOOST_REQUIRE(sstable_sets[idx].all()->contains(sstables[sst_idx]));
                    BOOST_REQUIRE(simple_sstable_sets[idx].all()->contains(sstables[sst_idx]));
                }
            }
            }
            for (size_t j = 0; j < sstable_sets.size(); j++) {
                BOOST_REQUIRE_EQUAL(sstable_sets[j].all()->size(), simple_sstable_sets[j].all()->size());
            }
        }
        return make_ready_future<>();
    });
}
|
||||
@@ -37,6 +37,40 @@
|
||||
#include <boost/test/unit_test.hpp>
|
||||
#include <array>
|
||||
|
||||
constexpr auto la = sstables::sstable::version_types::la;
|
||||
constexpr auto big = sstables::sstable::format_types::big;
|
||||
|
||||
class column_family_test {
|
||||
lw_shared_ptr<column_family> _cf;
|
||||
public:
|
||||
column_family_test(lw_shared_ptr<column_family> cf) : _cf(cf) {}
|
||||
|
||||
void add_sstable(sstables::shared_sstable sstable) {
|
||||
_cf->_sstables->insert(std::move(sstable));
|
||||
}
|
||||
|
||||
// NOTE: must run in a thread
|
||||
void rebuild_sstable_list(const std::vector<sstables::shared_sstable>& new_sstables,
|
||||
const std::vector<sstables::shared_sstable>& sstables_to_remove) {
|
||||
_cf->_sstables = _cf->build_new_sstable_list(new_sstables, sstables_to_remove).get0();
|
||||
}
|
||||
|
||||
static void update_sstables_known_generation(column_family& cf, unsigned generation) {
|
||||
cf.update_sstables_known_generation(generation);
|
||||
}
|
||||
|
||||
static uint64_t calculate_generation_for_new_table(column_family& cf) {
|
||||
return cf.calculate_generation_for_new_table();
|
||||
}
|
||||
|
||||
static int64_t calculate_shard_from_sstable_generation(int64_t generation) {
|
||||
return column_family::calculate_shard_from_sstable_generation(generation);
|
||||
}
|
||||
|
||||
future<stop_iteration> try_flush_memtable_to_sstable(lw_shared_ptr<memtable> mt) {
|
||||
return _cf->try_flush_memtable_to_sstable(mt, sstable_write_permit::unconditional());
|
||||
}
|
||||
};
|
||||
|
||||
namespace sstables {
|
||||
|
||||
|
||||
@@ -31,41 +31,6 @@
|
||||
#include "test/lib/test_services.hh"
|
||||
#include "test/lib/log.hh"
|
||||
|
||||
constexpr auto la = sstables::sstable::version_types::la;
|
||||
constexpr auto big = sstables::sstable::format_types::big;
|
||||
|
||||
class column_family_test {
|
||||
lw_shared_ptr<column_family> _cf;
|
||||
public:
|
||||
column_family_test(lw_shared_ptr<column_family> cf) : _cf(cf) {}
|
||||
|
||||
void add_sstable(sstables::shared_sstable sstable) {
|
||||
_cf->_sstables->insert(std::move(sstable));
|
||||
}
|
||||
|
||||
// NOTE: must run in a thread
|
||||
void rebuild_sstable_list(const std::vector<sstables::shared_sstable>& new_sstables,
|
||||
const std::vector<sstables::shared_sstable>& sstables_to_remove) {
|
||||
_cf->_sstables = _cf->build_new_sstable_list(new_sstables, sstables_to_remove).get0();
|
||||
}
|
||||
|
||||
static void update_sstables_known_generation(column_family& cf, unsigned generation) {
|
||||
cf.update_sstables_known_generation(generation);
|
||||
}
|
||||
|
||||
static uint64_t calculate_generation_for_new_table(column_family& cf) {
|
||||
return cf.calculate_generation_for_new_table();
|
||||
}
|
||||
|
||||
static int64_t calculate_shard_from_sstable_generation(int64_t generation) {
|
||||
return column_family::calculate_shard_from_sstable_generation(generation);
|
||||
}
|
||||
|
||||
future<stop_iteration> try_flush_memtable_to_sstable(lw_shared_ptr<memtable> mt) {
|
||||
return _cf->try_flush_memtable_to_sstable(mt, sstable_write_permit::unconditional());
|
||||
}
|
||||
};
|
||||
|
||||
namespace sstables {
|
||||
|
||||
class test_env_sstables_manager : public sstables_manager {
|
||||
|
||||
@@ -1,111 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2020 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "database.hh"
|
||||
#include "test/lib/simple_schema.hh"
|
||||
#include "test/perf/perf.hh"
|
||||
#include <seastar/core/app-template.hh>
|
||||
#include <seastar/core/reactor.hh>
|
||||
#include "test/lib/sstable_test_env.hh"
|
||||
#include <iostream>
|
||||
|
||||
// Creates an sstable for `schema` with generation `gen` through the test
// environment (la version, big format, empty directory string) and tags it
// with the given level and first/last keys for the leveled strategy.
// NOTE(review): the meaning of the two literal 0 arguments passed to
// set_values_for_leveled_strategy() is not visible here - confirm against
// its declaration.
static shared_sstable sstable_for_overlapping_test(test_env& env, const schema_ptr& schema, int64_t gen, sstring first_key, sstring last_key, uint32_t level = 0) {
    auto created = env.make_sstable(schema, "", gen, la, big);
    sstables::test(created).set_values_for_leveled_strategy(
            0, level, 0, std::move(first_key), std::move(last_key));
    return created;
}
|
||||
|
||||
int main(int argc, char* argv[]) {
|
||||
app_template app;
|
||||
return app.run(argc, argv, [&app] {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
using namespace std::chrono;
|
||||
using namespace std::chrono_literals;
|
||||
auto start = high_resolution_clock::now();
|
||||
simple_schema ss;
|
||||
auto s = ss.schema();
|
||||
auto cm = make_lw_shared<compaction_manager>();
|
||||
column_family::config cfg;
|
||||
auto cl_stats = make_lw_shared<cell_locker_stats>();
|
||||
auto tracker = make_lw_shared<cache_tracker>();
|
||||
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), *cm, *cl_stats, *tracker);
|
||||
cf->set_compaction_strategy(sstables::compaction_strategy_type::leveled);
|
||||
|
||||
constexpr int ssts_in_level_0 = 10;
|
||||
std::function<int(int)> idx_to_level = [&] (int i) {
|
||||
if (i < ssts_in_level_0) {
|
||||
return 0;
|
||||
}
|
||||
return 1 + idx_to_level((i - ssts_in_level_0) / 10 + ssts_in_level_0 - 1);
|
||||
};
|
||||
auto level_to_size = [] (int level) {
|
||||
if (level == 0) {
|
||||
return ssts_in_level_0;
|
||||
}
|
||||
return int(pow(10, level));
|
||||
};
|
||||
|
||||
auto kt_pair = token_generation_for_current_shard(1120);
|
||||
auto min_max_keys = [&kt_pair, &level_to_size] (auto level, auto pos_in_level) -> std::pair<sstring, sstring> {
|
||||
auto last_key_idx = kt_pair.size() - 1;
|
||||
if (level == 0) {
|
||||
return { kt_pair[0].first, kt_pair[last_key_idx].first };
|
||||
}
|
||||
auto total_ranges = kt_pair.size();
|
||||
auto level_size_in_ssts = level_to_size(level);
|
||||
unsigned ranges_per_sst = std::max(1U, unsigned(floor(float(total_ranges) / level_size_in_ssts)));
|
||||
sstring min_key = kt_pair.at(pos_in_level).first;
|
||||
sstring max_key = kt_pair.at(std::min(pos_in_level + ranges_per_sst - 1, unsigned(last_key_idx))).first;
|
||||
return {min_key, max_key};
|
||||
};
|
||||
|
||||
std::vector<shared_sstable> inputs[3], outputs[3];
|
||||
|
||||
std::array<unsigned, 9> pos_in_levels{0};
|
||||
pos_in_levels.fill(0);
|
||||
auto start2 = high_resolution_clock::now();
|
||||
for (auto i = 0; i < 1120; i++) {
|
||||
auto level = idx_to_level(i);
|
||||
auto [min, max] = min_max_keys(level, pos_in_levels[level]++);
|
||||
auto sst = sstable_for_overlapping_test(env, s, i, min, max, uint32_t(level));
|
||||
column_family_test(cf).add_sstable(sst);
|
||||
if (level >= 1 && pos_in_levels[level] < 10) {
|
||||
inputs[level-1].push_back(sst);
|
||||
}
|
||||
seastar::thread::maybe_yield();
|
||||
}
|
||||
|
||||
for (auto i = 0; i < 30; i++) {
|
||||
auto [min, max] = min_max_keys(1 + i / 10, i % 10);
|
||||
auto sst = sstable_for_overlapping_test(env, s, i, min, max, 1 + i / 10);
|
||||
outputs[i / 10].push_back(sst);
|
||||
seastar::thread::maybe_yield();
|
||||
}
|
||||
for (auto i = 0; i < 3; i++) {
|
||||
auto t1 = high_resolution_clock::now();
|
||||
column_family_test(cf).rebuild_sstable_list(outputs[i], inputs[i]);
|
||||
auto t2 = high_resolution_clock::now();
|
||||
std::cout << "Replacing 10 L" << i + 1 <<" sstables took "
|
||||
<< std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1).count() << "ms to complete\n";
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
Reference in New Issue
Block a user