From 5f4bf183874a3ff6492d85d27d5371f8cc428678 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Tue, 2 Mar 2021 13:21:22 +0200 Subject: [PATCH] Revert "Merge 'sstables: add versioning to the sstable_set ' from Wojciech Mitros" This reverts commit 31909515b37a096e62cbc23b63e4e3e4e574bfe7, reversing changes made to ef97adc72a0d5e8877885810b1a7e0fcbb11dc29. It shows many serious regressions in dtest. Fixes #8197. --- api/storage_service.cc | 4 +- configure.py | 2 - db/view/view_update_generator.cc | 2 +- service/storage_service.cc | 3 +- sstables/shared_sstable.hh | 1 + sstables/sstable_set.cc | 584 ++++------------------------ sstables/sstable_set.hh | 203 +--------- sstables/sstable_set_impl.hh | 4 - table.cc | 20 +- test/boost/sstable_set_test.cc | 638 ------------------------------- test/boost/sstable_test.hh | 34 ++ test/lib/sstable_test_env.hh | 35 -- test/perf/perf_sstable_set.cc | 111 ------ 13 files changed, 133 insertions(+), 1508 deletions(-) delete mode 100644 test/boost/sstable_set_test.cc delete mode 100644 test/perf/perf_sstable_set.cc diff --git a/api/storage_service.cc b/api/storage_service.cc index 67d530f491..f5cdbbc5c7 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -987,8 +987,8 @@ void set_storage_service(http_context& ctx, routes& r) { ss::table_sstables tst; tst.keyspace = schema->ks_name(); tst.table = schema->cf_name(); - auto sstables = t->get_sstables_including_compacted_undeleted(); - for (auto sstable : *sstables) { + + for (auto sstable : *t->get_sstables_including_compacted_undeleted()) { auto ts = db_clock::to_time_t(sstable->data_file_write_time()); ::tm t; ::gmtime_r(&ts, &t); diff --git a/configure.py b/configure.py index d26089fa7d..3b79b6c42a 100755 --- a/configure.py +++ b/configure.py @@ -392,7 +392,6 @@ scylla_tests = set([ 'test/boost/sstable_conforms_to_mutation_source_test', 'test/boost/sstable_resharding_test', 'test/boost/sstable_directory_test', - 'test/boost/sstable_set_test', 'test/boost/sstable_test', 'test/boost/sstable_move_test', 'test/boost/storage_proxy_test', @@ -436,7 +435,6 @@ scylla_tests = set([ 'test/perf/perf_row_cache_reads', 'test/perf/perf_simple_query', 'test/perf/perf_sstable', - 'test/perf/perf_sstable_set', 'test/unit/lsa_async_eviction_test', 'test/unit/lsa_sync_eviction_test', 'test/unit/row_cache_alloc_stress_test', diff --git a/db/view/view_update_generator.cc b/db/view/view_update_generator.cc index 150b593965..703d8ce454 100644 --- a/db/view/view_update_generator.cc +++ b/db/view/view_update_generator.cc @@ -67,7 +67,7 @@ future<> view_update_generator::start() { // Exploit the fact that sstables in the staging directory // are usually non-overlapping and use a partitioned set for // the read. - auto ssts = make_lw_shared(sstables::make_partitioned_sstable_set(s, false)); + auto ssts = make_lw_shared(sstables::make_partitioned_sstable_set(s, make_lw_shared(sstable_list{}), false)); for (auto& sst : sstables) { ssts->insert(sst); } diff --git a/service/storage_service.cc b/service/storage_service.cc index 279abb14e0..c8531c6d91 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2607,7 +2607,8 @@ future<> storage_service::load_and_stream(sstring ks_name, sstring cf_name, size_t nr_sst_current = 0; while (!sstables.empty()) { auto ops_uuid = utils::make_random_uuid(); - auto sst_set = make_lw_shared(sstables::make_partitioned_sstable_set(s, false)); + auto sst_set = make_lw_shared(sstables::make_partitioned_sstable_set(s, + make_lw_shared(sstable_list{}), false)); size_t batch_sst_nr = 16; std::vector sst_names; std::vector sst_processed; diff --git a/sstables/shared_sstable.hh b/sstables/shared_sstable.hh index 7071db9f62..736fa06244 100644 --- a/sstables/shared_sstable.hh +++ b/sstables/shared_sstable.hh @@ -46,6 +46,7 @@ struct lw_shared_ptr_deleter { namespace sstables { using shared_sstable = seastar::lw_shared_ptr; +using sstable_list = std::unordered_set; } diff --git a/sstables/sstable_set.cc b/sstables/sstable_set.cc index 69124810db..4b89b4a9ec 100644 --- a/sstables/sstable_set.cc +++ b/sstables/sstable_set.cc @@ -63,526 +63,93 @@ std::ostream& operator<<(std::ostream& os, const sstables::sstable_run& run) { return os; } -sstable_set_data::sstable_set_data(std::unique_ptr impl) - : impl(std::move(impl)) { +sstable_set::sstable_set(std::unique_ptr impl, schema_ptr s, lw_shared_ptr all) + : _impl(std::move(impl)) + , _schema(std::move(s)) + , _all(std::move(all)) { } -void sstable_set_data::insert(shared_sstable sst) { - auto it = sstables_and_times_added.find(sst); - if (it != sstables_and_times_added.end()) { - // the sstable has already been added in some version - it->second++; - return; +sstable_set::sstable_set(const sstable_set& x) + : _impl(x._impl->clone()) + , _schema(x._schema) + , _all(make_lw_shared(*x._all)) + , _all_runs(x._all_runs) { +} + +sstable_set::sstable_set(sstable_set&&) noexcept = default; + +sstable_set& +sstable_set::operator=(const sstable_set& x) { + if (this != &x) { + auto tmp = sstable_set(x); + *this = std::move(tmp); } + return *this; +} + +sstable_set& +sstable_set::operator=(sstable_set&&) noexcept = default; + +std::vector +sstable_set::select(const dht::partition_range& range) const { + return _impl->select(range); +} + +std::vector +sstable_set::select_sstable_runs(const std::vector& sstables) const { + auto run_ids = boost::copy_range>(sstables | boost::adaptors::transformed(std::mem_fn(&sstable::run_identifier))); + return boost::copy_range>(run_ids | boost::adaptors::transformed([this] (utils::UUID run_id) { + return _all_runs.at(run_id); + })); +} + +void +sstable_set::insert(shared_sstable sst) { + _impl->insert(sst); try { - impl->insert(sst); - all_runs[sst->run_identifier()].insert(sst); - sstables_and_times_added.emplace(sst, 1); - } catch (...) { - impl->erase(sst); - auto runs_it = all_runs.find(sst->run_identifier()); - if (runs_it != all_runs.end()) { - runs_it->second.erase(sst); - if (runs_it->second.empty()) { - all_runs.erase(runs_it); - } + _all->insert(sst); + try { + _all_runs[sst->run_identifier()].insert(sst); + } catch (...) { + _all->erase(sst); + throw; } + } catch (...) { + _impl->erase(sst); throw; } } -std::vector sstable_set_data::select(const dht::partition_range& range) const { - return impl->select(range); -} - -std::unordered_set sstable_set_data::select_by_run_id(utils::UUID run_id) const { - return all_runs.at(run_id); -} - -// Called when a version that was adding an sstable was removed or when the sstable was later erased in that version. -void sstable_set_data::remove(shared_sstable sst) { - if (--sstables_and_times_added.at(sst) == 0) { - impl->erase(sst); - all_runs[sst->run_identifier()].erase(sst); - sstables_and_times_added.erase(sst); - } -} - -sstable_set_version_reference::~sstable_set_version_reference() { - if (_p) { - _p->remove_reference(); - if (_p->can_merge_with_next()) { - // merging will destroy the last reference to the version and the version will be deleted as a result - _p->merge_with_next(); - } else if (_p->can_delete()) { - delete _p; - _p = nullptr; - } - } -} - -sstable_set_version_reference::sstable_set_version_reference(sstable_set_version* p) : _p(p) { - if (_p) { - _p->add_reference(); - } -} - -sstable_set_version_reference::sstable_set_version_reference(const sstable_set_version_reference& ref) : _p(ref._p) { - if (_p) { - _p->add_reference(); - } -} - -sstable_set_version_reference::sstable_set_version_reference(sstable_set_version_reference&& ref) noexcept : _p(ref._p) { - ref._p = nullptr; -} - -sstable_set_version_reference& sstable_set_version_reference::operator=(const sstable_set_version_reference& ref) { - *this = sstable_set_version_reference(ref); - return *this; -} - -sstable_set_version_reference& sstable_set_version_reference::operator=(sstable_set_version_reference&& ref) noexcept { - if (this != &ref) { - // Destroying this reference may invalide other references, so we're taking over the pointer managed by - // the moved reference, and reassigning it after calling the destructor - auto ptr = ref._p; - ref._p = nullptr; - this->~sstable_set_version_reference(); - _p = ptr; - } - return *this; -} - -static sstable_set_version_reference make_sstable_set_version(std::unique_ptr impl, schema_ptr s) { - sstable_set_version* new_version = new sstable_set_version(std::move(impl), std::move(s)); - return new_version->get_reference_to_this(); -} - -sstable_list::sstable_list(std::unique_ptr impl, schema_ptr s) - : _version(make_sstable_set_version(std::move(impl), std::move(s))) { -} - -sstable_list::sstable_list(const sstable_list& sstl) - : _version(sstl._version->get_reference_to_new_copy()) { - // copying an sstable_list creates a new sstable_set_version -} - -sstable_list::sstable_list(sstable_list&& sstl) noexcept = default; - -sstable_list& sstable_list::operator=(const sstable_list& sstl) { - if (this != &sstl) { - *this = sstable_list(sstl); - } - return *this; -} - -sstable_list& sstable_list::operator=(sstable_list&& sstl) noexcept { - if (this != &sstl) { - this->~sstable_list(); - _version = std::move(sstl._version); - } - return *this; -} - -// Moves the iterator to the next sstable which is contained by the associated sstable_set, or to the end -// If the iterator already references a satisfying sstable, no changes are made. -void sstable_list::const_iterator::advance() { - while (_it != (*_ver)->all().end() && !(*_ver)->contains(_it->first)) { - _it++; - } -} - -sstable_list::const_iterator::const_iterator(std::map::const_iterator it, const sstable_set_version_reference* ver) - : _it(std::move(it)) - , _ver(ver) { - advance(); -} - -sstable_list::const_iterator& sstable_list::const_iterator::operator++() { - assert(_it != (*_ver)->all().end()); - _it++; - advance(); - return *this; -} - -sstable_list::const_iterator sstable_list::const_iterator::operator++(int) { - const_iterator it = *this; - operator++(); - return it; -} - -const shared_sstable& sstable_list::const_iterator::operator*() const { - return _it->first; -} - -bool sstable_list::const_iterator::operator==(const const_iterator& it) const { - assert(_ver == it._ver); - return _it == it._it; -} - -sstable_list::const_iterator sstable_list::begin() const { - return const_iterator(_version->all().begin(), &_version); -} - -sstable_list::const_iterator sstable_list::end() const { - return const_iterator(_version->all().end(), &_version); -} - -size_t sstable_list::size() const { - return _version->size(); -} - -void sstable_list::insert(shared_sstable sst) { - _version = _version->insert(sst); -} - -void sstable_list::erase(shared_sstable sst) { - _version = _version->erase(sst); -} - -bool sstable_list::contains(shared_sstable sst) const { - return _version->contains(sst); -} - -bool sstable_list::empty() const { - return _version->size() == 0; -} - -const sstable_set_version& sstable_list::version() const { - return *_version; -} - -sstable_set::sstable_set(std::unique_ptr impl, schema_ptr s) { - if (!impl->empty()) { - throw std::logic_error("Can't create an sstable_set using a non-empty sstable_set_impl"); - } - _all = make_lw_shared(std::move(impl), std::move(s)); -} - -sstable_set::sstable_set(const sstable_set& x) - : _all(make_lw_shared(*x._all)) { -} - -sstable_set::sstable_set(sstable_set&& x) noexcept = default; - -sstable_set& sstable_set::operator=(const sstable_set& ssts) { - *this = sstable_set(ssts); - return *this; -} - -sstable_set& sstable_set::operator=(sstable_set&& ssts) noexcept = default; - - -std::vector sstable_set::select(const dht::partition_range& range) const { - return _all->version().select(range); -} - -// Return all runs which contain any of the input sstables. -std::vector sstable_set::select_sstable_runs(const std::vector& sstables) const { - return _all->version().select_sstable_runs(sstables); -} - -lw_shared_ptr sstable_set::all() const { - return _all; -} - -void sstable_set::insert(shared_sstable sst) { - _all->insert(sst); -} - -void sstable_set::erase(shared_sstable sst) { +void +sstable_set::erase(shared_sstable sst) { + _impl->erase(sst); _all->erase(sst); + _all_runs[sst->run_identifier()].erase(sst); +} + +sstable_set::~sstable_set() = default; + +sstable_set::incremental_selector::incremental_selector(std::unique_ptr impl, const schema& s) + : _impl(std::move(impl)) + , _cmp(s) { } sstable_set::incremental_selector::~incremental_selector() = default; sstable_set::incremental_selector::incremental_selector(sstable_set::incremental_selector&&) noexcept = default; -sstable_set::incremental_selector::incremental_selector(std::unique_ptr impl, const schema& s, lw_shared_ptr sstl) - : _impl(std::move(impl)) - , _cmp(s) - , _sstl(std::move(sstl)) { -} - -sstable_set::incremental_selector::selection sstable_set::incremental_selector::select(const dht::ring_position_view& pos) const { +sstable_set::incremental_selector::selection +sstable_set::incremental_selector::select(const dht::ring_position_view& pos) const { if (!_current_range_view || !_current_range_view->contains(pos, _cmp)) { - std::vector current_versioned_sstables; - std::tie(_current_range, current_versioned_sstables, _current_next_position) = _impl->select(pos); - _current_sstables = boost::copy_range>(current_versioned_sstables - | boost::adaptors::filtered([this] (shared_sstable sst) { return _sstl->contains(sst); }) - | boost::adaptors::transformed([] (shared_sstable sst) { return sst; })); + std::tie(_current_range, _current_sstables, _current_next_position) = _impl->select(pos); _current_range_view = _current_range->transform([] (const dht::ring_position& rp) { return dht::ring_position_view(rp); }); } return {_current_sstables, _current_next_position}; } -sstables::sstable_set::incremental_selector sstable_set::make_incremental_selector() const { - return incremental_selector(_all->version().make_incremental_selector(), *_all->version().get_schema(), _all); -} - -sstable_set_version::~sstable_set_version() { - for (auto& added_sst : _added) { - _base_set->remove(added_sst); - } - if (_prev) { - _prev->_next.erase(this); - } -} - -// the sstable_set_impl must be empty -sstable_set_version::sstable_set_version(std::unique_ptr impl, schema_ptr schema) - : _base_set(make_lw_shared(std::move(impl))) - , _schema(std::move(schema)) { -} - -// Creates a new version based on ver -sstable_set_version::sstable_set_version(sstable_set_version* ver) - : _base_set(ver->_base_set) - , _schema(ver->_schema) - , _prev(ver) { - _prev->_next.insert(this); -} - -// Merges changes made in this version into the next version (can be called only when there is a single next version, -// and no further changes can be made to this one, i.e. no sstable_list references this version). -void sstable_set_version::merge_with_next() noexcept { - auto next_version = *_next.begin(); - next_version->_added = std::move(_added); - next_version->_erased = std::move(_erased); - auto next_nh = _next.extract(*_next.begin()); - if (_prev) { - _prev->_next.erase(this); - _prev->_next.insert(std::move(next_nh)); - } - next_version->_prev = std::move(_prev); // destroys this by overwriting the last reference -} - -void sstable_set_version::propagate_inserted_sstable(const shared_sstable& sst) noexcept { - if (_reference_count > _next.size()) { - // If there exists a reference outside child versions (from sstable_list), this version can still be read from, so we can't modify it - return; - } - for (auto& ver_chck : _next) { - if (!ver_chck->_added.contains(sst)) { - return; - } - } - // Remove the sstable from child versions and get a node handle to insert in this version - auto sst_nh = (*_next.begin())->_added.extract(sst); - for (auto& ver_chck : _next) { - auto nh = ver_chck->_added.extract(sst_nh.value()); - if (!nh.empty()) { - _base_set->remove(nh.value()); - } - } - auto it = _erased.find(sst_nh.value()); - if (it != _erased.end()) { - // If the sstable was erased in this version and added in all its children, its as if it weren't added or inserted in any of them - // because we won't read from this version anymore. - _erased.erase(it); - _base_set->remove(sst_nh.value()); - } else { - auto added_it = _added.insert(std::move(sst_nh)).position; - if (_prev) { - _prev->propagate_inserted_sstable(*added_it); - } - } -} - -void sstable_set_version::propagate_erased_sstable(const shared_sstable& sst) noexcept { - if (_reference_count > _next.size()) { - // If there exists a reference outside child versions (from sstable_list), this version can still be read from, so we can't modify it - return; - } - for (auto& ver_chck : _next) { - if (!ver_chck->_erased.contains(sst)) { - return; - } - } - // Remove the sstable from child versions and get a node handle to insert in this version - auto sst_nh = (*_next.begin())->_erased.extract(sst); - for (auto& ver_chck : _next) { - ver_chck->_erased.extract(sst_nh.value()); - } - auto it = _added.find(sst_nh.value()); - if (it != _added.end()) { - // If the sstable was added in this version and erased in all its children, its as if it weren't added or inserted in any of them - // because we won't read from this version anymore. - _added.erase(it); - _base_set->remove(sst_nh.value()); - } else { - auto erased_it = _erased.insert(std::move(sst_nh)).position; - if (_prev) { - _prev->propagate_erased_sstable(*erased_it); - } - } -} - -// Called when a reference to the version gets removed - if the reference was from an sstable_list, it's the first time we can propagate any -// changes, and if the reference was from another sstable_set_version, we want to check if there were any changes that were present in all -// versions based on this one, but absent in the version that was just removed. -void sstable_set_version::propagate_changes_from_next_versions() noexcept { - if (_reference_count > _next.size() || _next.empty()) { - // If there exists a reference outside child versions (from sstable_list), this version can still be read from, so we can't modify it - // Or there are no child versions so there is nothing to propagate - return; - } - sstable_set_version* next_ver = *_next.begin(); - // Propagate additions - for (auto ver : _next) { - if (ver->_added.size() < next_ver->_added.size()) { - next_ver = ver; - } - } - for (auto it = next_ver->_added.begin(); it != next_ver->_added.end();) { - auto& sst = *it; - it++; - propagate_inserted_sstable(sst); - } - - next_ver = *_next.begin(); - // Propagate erasures - for (auto ver : _next) { - if (ver->_erased.size() < next_ver->_erased.size()) { - next_ver = ver; - } - } - for (auto it = next_ver->_erased.begin(); it != next_ver->_erased.end();) { - auto& sst = *it; - it++; - propagate_erased_sstable(sst); - } -} - -const sstable_set_version* sstable_set_version::get_previous_version() const { - return _prev.get(); -} - -bool sstable_set_version::can_merge_with_next() const noexcept { - return _reference_count == 1 && _next.size() == 1; -} - -bool sstable_set_version::can_delete() const noexcept { - return _reference_count == 0; -} - -void sstable_set_version::add_reference() noexcept { - _reference_count++; -} - -void sstable_set_version::remove_reference() noexcept { - _reference_count--; - propagate_changes_from_next_versions(); -} - -schema_ptr sstable_set_version::get_schema() const { - return _schema; -} - -std::vector sstable_set_version::select(const dht::partition_range& range) const { - return boost::copy_range>(_base_set->select(range) - | boost::adaptors::filtered([this] (shared_sstable sst) { return this->contains(sst); })); -} - -// Return all runs which contain any of the input sstables. -std::vector sstable_set_version::select_sstable_runs(const std::vector& sstables) const { - auto run_ids = boost::copy_range>(sstables | boost::adaptors::transformed(std::mem_fn(&sstable::run_identifier))); - return boost::copy_range>(run_ids | boost::adaptors::transformed([this] (utils::UUID run_id) { - return sstable_run(boost::copy_range>(_base_set->select_by_run_id(run_id) - | boost::adaptors::filtered([this] (shared_sstable sst) { return this->contains(sst); }))); - })); -} - -const std::map& sstable_set_version::all() const { - return _base_set->sstables_and_times_added; -} - -// Provides strong exception guarantee -sstable_set_version_reference sstable_set_version::insert(shared_sstable sst) { - if (this->contains(sst)) { - return get_reference_to_this(); - } - if (_next.size()) { - auto sstvr = get_reference_to_new_copy(); - // The new version has no copies based on it, so inserting into it doesn't create another version - return sstvr->insert(sst); - } - auto it = _erased.find(sst); - if (it != _erased.end()) { - _erased.erase(it); - } else { - _base_set->insert(sst); - try { - _added.insert(sst); - if (_prev) { - _prev->propagate_inserted_sstable(sst); - } - } catch (...) { - _base_set->remove(sst); - throw; - } - } - return get_reference_to_this(); -} - -// Provides strong exception guarantee -sstable_set_version_reference sstable_set_version::erase(shared_sstable sst) { - if (!this->contains(sst)) { - return get_reference_to_this(); - } - if (_next.size()) { - auto sstvr = get_reference_to_new_copy(); - // The new version has no copies based on it, so erasing from it doesn't create another version - return sstvr->erase(sst); - } - auto it = _added.find(sst); - if (it != _added.end()) { - _added.erase(it); - _base_set->remove(sst); - } else { - _erased.insert(sst); - if (_prev) { - _prev->propagate_erased_sstable(sst); - } - } - return get_reference_to_this(); -} - -bool sstable_set_version::contains(shared_sstable sst) const { - return _added.contains(sst) || (!_erased.contains(sst) && _prev && _prev->contains(sst)); -} - -size_t sstable_set_version::size() const { - return _added.size() - _erased.size() + (_prev ? _prev->size() : 0); -} - -std::unique_ptr sstable_set_version::make_incremental_selector() const { - return _base_set->impl->make_incremental_selector(); -} - -flat_mutation_reader -sstable_set_version::create_single_key_sstable_reader( - column_family* cf, - schema_ptr schema, - reader_permit permit, - utils::estimated_histogram& sstable_histogram, - const dht::partition_range& pr, - const query::partition_slice& slice, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr) const { - return _base_set->impl->create_single_key_sstable_reader(cf, std::move(schema), - std::move(permit), sstable_histogram, pr, slice, pc, std::move(trace_state), fwd, fwd_mr); -} - -sstable_set_version_reference sstable_set_version::get_reference_to_this() { - return sstable_set_version_reference(this); -} - -sstable_set_version_reference sstable_set_version::get_reference_to_new_copy() { - return sstable_set_version_reference(new sstable_set_version(this)); +sstable_set::incremental_selector +sstable_set::make_incremental_selector() const { + return incremental_selector(_impl->make_incremental_selector(), *_schema); } std::unique_ptr bag_sstable_set::clone() const { @@ -604,10 +171,6 @@ void bag_sstable_set::erase(shared_sstable sst) { } } -bool bag_sstable_set::empty() const { - return _sstables.empty(); -} - class bag_sstable_set::incremental_selector : public incremental_selector_impl { const std::vector& _sstables; public: @@ -736,10 +299,6 @@ void partitioned_sstable_set::erase(shared_sstable sst) { } } -bool partitioned_sstable_set::empty() const { - return _unleveled_sstables.empty() && _leveled_sstables.empty(); -} - class partitioned_sstable_set::incremental_selector : public incremental_selector_impl { schema_ptr _schema; const std::vector& _unleveled_sstables; @@ -837,10 +396,6 @@ void time_series_sstable_set::erase(shared_sstable sst) { } } -bool time_series_sstable_set::empty() const { - return _sstables->empty(); -} - std::unique_ptr time_series_sstable_set::make_incremental_selector() const { struct selector : public incremental_selector_impl { const time_series_sstable_set& _set; @@ -980,15 +535,16 @@ std::unique_ptr time_window_compaction_strategy::make_sstable_ return std::make_unique(std::move(schema)); } -sstable_set make_partitioned_sstable_set(schema_ptr schema, bool use_level_metadata) { - return sstable_set(std::make_unique(schema, use_level_metadata), schema); +sstable_set make_partitioned_sstable_set(schema_ptr schema, lw_shared_ptr all, bool use_level_metadata) { + return sstable_set(std::make_unique(schema, use_level_metadata), schema, std::move(all)); } sstable_set compaction_strategy::make_sstable_set(schema_ptr schema) const { return sstable_set( _compaction_strategy_impl->make_sstable_set(schema), - schema); + schema, + make_lw_shared()); } using sstable_reader_factory_type = std::function; @@ -1274,7 +830,7 @@ sstable_set::create_single_key_sstable_reader( streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) const { assert(pr.is_singular() && pr.start()->value().has_key()); - return _all->version().create_single_key_sstable_reader(cf, std::move(schema), + return _impl->create_single_key_sstable_reader(cf, std::move(schema), std::move(permit), sstable_histogram, pr, slice, pc, std::move(trace_state), fwd, fwd_mr); } diff --git a/sstables/sstable_set.hh b/sstables/sstable_set.hh index ebe1ae1214..8fbcf4c13a 100644 --- a/sstables/sstable_set.hh +++ b/sstables/sstable_set.hh @@ -24,7 +24,6 @@ #include "flat_mutation_reader.hh" #include "sstables/progress_monitor.hh" #include "shared_sstable.hh" -#include "sstables/sstables.hh" #include "dht/i_partitioner.hh" #include #include @@ -35,158 +34,31 @@ class estimated_histogram; namespace sstables { -// This is an implementation of a set of sstables. The sstable_set allows: -// - selecting sstables from a given partition_range -// - selecting sstables with the same run identifiers -// - creating incremental_selectors, used to incrementally select sstables from the set using ring-position -// - many utilities of the std::set (inserting, erasing, checking membership, checking emptiness, iterating) -// - creating copies - -// The main use of the sstable_set is in the table class. The sstable_set that is stored there needs to be copied -// every time it is modified, to allow existing sstable readers to continue using the old version. For this reason -// the sstable_set is implemented in a way that allows fast copying. - -// This is achieved by associating each copy of an sstable_set with an sstable_set_version which contains all changes -// made to that copy. Each sstable_set_version has a reference to the sstable_set_version associated with the sstable_set -// that was copied. We say that the copied version is a parent, and that the copy version is a child, or that it is "based" -// on the copied version. -// This allows easy checking if an sstable is an element of an sstable_set copy - the answer is yes if the sstable was added -// in this copy or if it wasn't erased in it but it was an element of the parent version. -// It's worth adding that this solution makes a copied sstable_set immutable. To support modifying a copied sstable_set anyway, -// any modification applied to it replaces its associated sstable_set_version with a new one, based on the immutable version. -// The new version hasn't been copied, co it can be modified. - -// With the ability to check whether an sstable is an element of an sstable_set, all the methods that select sstables -// from the set can follow the same rule: get results that include all sstables from any copy, and filter out those sstables -// that aren't elements of current copy. These results are received from the class called sstable_set_data structure. - -// This solution requires a special way of finding out when an sstable should be removed from sstable_set_data. We achieve this -// by counting, for each sstable, in how many different versions has it been added. If that number is zero - the sstable should -// be completely removed. This number is decreased when deleting versions, when erasing in the same version it was added, and -// in one other case, as a result of propagating a change from a versions children to their parent, explained further in following -// paragraphs. - -// With this approach, the length of the longest chain of sstable_set_versions that are based on one another should be as -// short as possible. To achieve that, we are merging pairs of versions. If a version is not referenced by any sstable_sets or -// sstable_lists, we know that it won't be modified or read from, so we can merge its changes into versions that are based on it. -// We don't want to copy these changes though, so we wait with merging until there is only one child version. - -// This waiting causes one more problem: an sstable may have been added in a version that has no reference from an sstable_set or -// an sstable_list, and it may have been erased in all its child versions. In such scenario, the sstable can't be selected from -// any version, so it should be removed from the set completely. To handle such situations, if a change has been made to an -// sstable in all children of some version that has no referenece from an sstable_set or sstable_list, the change is removed -// in all children and added to the parent instead. -// This solution guarantees that when a version is ready for merging, the version it is being merged into contains no changes, -// because if it had any, they would be propagated to this version instead. As a result, merging versions is simply reassigning -// sets of changes. The number of times an sstable change gets propagated into a parent version is limited by the number of ancestors -// of a version. - -class incremental_selector_impl; class sstable_set_impl; +class incremental_selector_impl; // Structure holds all sstables (a.k.a. fragments) that belong to same run identifier, which is an UUID. // SStables in that same run will not overlap with one another. class sstable_run { - std::unordered_set _all; + sstable_list _all; public: - sstable_run() = default; - sstable_run(std::unordered_set all) : _all(std::move(all)) {} void insert(shared_sstable sst); void erase(shared_sstable sst); // Data size of the whole run, meaning it's a sum of the data size of all its fragments. uint64_t data_size() const; - const std::unordered_set& all() const { return _all; } + const sstable_list& all() const { return _all; } }; -// The very base class of an sstable_set. Stores all sstables that were added in -// any version of the set. Its methods return supersets of values returned by -// any single version. -struct sstable_set_data { - std::unique_ptr impl; - std::unordered_map> all_runs; - // For each sstable, stores in how many different versions has it been inserted - std::map sstables_and_times_added; - - sstable_set_data(std::unique_ptr impl); - void insert(shared_sstable sst); - void erase(shared_sstable sst); - std::vector select(const dht::partition_range& range) const; - std::unordered_set select_by_run_id(utils::UUID run_id) const; - void remove(shared_sstable sst); -}; - -class sstable_set_version; - -// Manages a pointer to an sstable_set_version - the when sstable_set_version gets removed when all -// sstable_set_version_references are removed, or when there is only one sstable_set_version_reference -// and that reference is owned by another sstable_set_version. -// In the second case the data from the managed version is merged into the other version before removal. -class sstable_set_version_reference { - mutable sstable_set_version* _p = nullptr; - - explicit sstable_set_version_reference(sstable_set_version* p); -public: - ~sstable_set_version_reference(); - sstable_set_version_reference() = default; - sstable_set_version_reference(const sstable_set_version_reference& ref); - sstable_set_version_reference(sstable_set_version_reference&&) noexcept; - sstable_set_version_reference& operator=(const sstable_set_version_reference& x); - sstable_set_version_reference& operator=(sstable_set_version_reference&&) noexcept; - sstable_set_version& operator*() const noexcept { return *_p; } - sstable_set_version* operator->() const noexcept { return _p; } - sstable_set_version* get() const noexcept { return _p; } - explicit operator bool() const noexcept { return bool(_p); } - friend class sstable_set_version; -}; - -// The data storage for an sstable_set. Can be used like a std::set (although with slightly -// costlier operations). -class sstable_list { - sstable_set_version_reference _version; -public: - sstable_list(std::unique_ptr impl, schema_ptr s); - sstable_list(const sstable_list& sstl); - sstable_list(sstable_list&& sstl) noexcept; - sstable_list& operator=(const sstable_list& sstl); - sstable_list& operator=(sstable_list&& sstl) noexcept; -public: - class const_iterator { - public: - using iterator_category = std::forward_iterator_tag; - using value_type = const shared_sstable; - using difference_type = std::ptrdiff_t; - using pointer = const shared_sstable*; - using reference = const shared_sstable&; - private: - std::map::const_iterator _it; - const sstable_set_version_reference* _ver; - void advance(); - public: - const_iterator(std::map::const_iterator it, const sstable_set_version_reference* ver); - const_iterator& operator++(); - const_iterator operator++(int); - const shared_sstable& operator*() const; - bool operator==(const const_iterator& it) const; - }; - using iterator = const_iterator; - const_iterator begin() const; - const_iterator end() const; - - size_t size() const; - void insert(shared_sstable sst); - void erase(shared_sstable sst); - bool contains(shared_sstable sst) const; - bool empty() const; - const sstable_set_version& version() const; -}; - -// A set of sstables associated with a table. class sstable_set : public enable_lw_shared_from_this { + std::unique_ptr _impl; + schema_ptr _schema; // used to support column_family::get_sstable(), which wants to return an sstable_list // that has a reference somewhere lw_shared_ptr _all; + std::unordered_map _all_runs; public: - sstable_set(std::unique_ptr impl, schema_ptr s); + ~sstable_set(); + sstable_set(std::unique_ptr impl, schema_ptr s, lw_shared_ptr all); sstable_set(const sstable_set&); sstable_set(sstable_set&&) noexcept; sstable_set& operator=(const sstable_set&); @@ -194,7 +66,7 @@ public: std::vector select(const dht::partition_range& range) const; // Return all runs which contain any of the input sstables. std::vector select_sstable_runs(const std::vector& sstables) const; - lw_shared_ptr all() const; + lw_shared_ptr all() const { return _all; } void insert(shared_sstable sst); void erase(shared_sstable sst); @@ -207,10 +79,9 @@ public: mutable std::optional> _current_range_view; mutable std::vector _current_sstables; mutable dht::ring_position_view _current_next_position = dht::ring_position_view::min(); - lw_shared_ptr _sstl; public: ~incremental_selector(); - incremental_selector(std::unique_ptr impl, const schema& s, lw_shared_ptr sstl); + incremental_selector(std::unique_ptr impl, const schema& s); incremental_selector(incremental_selector&&) noexcept; struct selection { @@ -302,59 +173,7 @@ flat_mutation_reader make_restricted_range_sstable_reader( mutation_reader::forwarding, read_monitor_generator& rmg = default_read_monitor_generator()); -class sstable_set_version { - // shared by all sstable_set_versions that were based on the same original set - lw_shared_ptr _base_set; - schema_ptr _schema; - sstable_set_version_reference _prev; - mutable std::unordered_set _next; - std::unordered_set _added; - std::unordered_set _erased; - // is equal to the number of sstable_set_versions based on this version increased by one if there is - // an sstable_list that references this version - unsigned _reference_count = 0; -public: - ~sstable_set_version(); - sstable_set_version(std::unique_ptr impl, schema_ptr schema); - explicit sstable_set_version(sstable_set_version* ver); -private: - void propagate_inserted_sstable(const shared_sstable& sst) noexcept; - void propagate_erased_sstable(const shared_sstable& sst) noexcept; - void propagate_changes_from_next_versions() noexcept; -public: - const sstable_set_version* get_previous_version() const; - bool can_merge_with_next() const noexcept; - void merge_with_next() noexcept; - bool can_delete() const noexcept; - void add_reference() noexcept; - void remove_reference() noexcept; - schema_ptr get_schema() const; - std::vector select(const dht::partition_range& range) const; - // Return all runs which contain any of the input sstables. - std::vector select_sstable_runs(const std::vector& sstables) const; - const std::map& all() const; - sstable_set_version_reference insert(shared_sstable sst); - sstable_set_version_reference erase(shared_sstable sst); - bool contains(shared_sstable sst) const; - size_t size() const; - std::unique_ptr make_incremental_selector() const; - flat_mutation_reader create_single_key_sstable_reader( - column_family* cf, - schema_ptr schema, - reader_permit permit, - utils::estimated_histogram& sstable_histogram, - const dht::partition_range& pr, - const query::partition_slice& slice, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr) const; -public: - sstable_set_version_reference get_reference_to_this(); - sstable_set_version_reference get_reference_to_new_copy(); -}; - -sstable_set make_partitioned_sstable_set(schema_ptr schema, bool use_level_metadata = true); +sstable_set make_partitioned_sstable_set(schema_ptr schema, lw_shared_ptr all, bool use_level_metadata = true); std::ostream& operator<<(std::ostream& os, const sstables::sstable_run& run); diff --git a/sstables/sstable_set_impl.hh b/sstables/sstable_set_impl.hh index 785e46cc77..28e3cf77fd 100644 --- a/sstables/sstable_set_impl.hh +++ b/sstables/sstable_set_impl.hh @@ -41,7 +41,6 @@ public: virtual std::vector select(const dht::partition_range& range) const = 0; virtual void insert(shared_sstable sst) = 0; virtual void erase(shared_sstable sst) = 0; - virtual bool empty() const = 0; virtual std::unique_ptr make_incremental_selector() const = 0; virtual flat_mutation_reader create_single_key_sstable_reader( @@ -66,7 +65,6 @@ public: virtual std::vector select(const dht::partition_range& range = query::full_partition_range) const override; virtual void insert(shared_sstable sst) override; virtual void erase(shared_sstable sst) override; - virtual bool empty() const override; virtual std::unique_ptr make_incremental_selector() const override; class incremental_selector; }; @@ -105,7 +103,6 @@ public: virtual std::vector select(const dht::partition_range& range) const override; virtual void insert(shared_sstable sst) override; virtual void erase(shared_sstable sst) override; - virtual bool empty() const override; virtual std::unique_ptr make_incremental_selector() const override; class incremental_selector; }; @@ -127,7 +124,6 @@ public: virtual std::vector select(const dht::partition_range& range = query::full_partition_range) const override; virtual void insert(shared_sstable sst) override; virtual void erase(shared_sstable sst) override; - virtual bool empty() const override; virtual std::unique_ptr make_incremental_selector() const override; std::unique_ptr make_min_position_reader_queue( diff --git a/table.cc b/table.cc index ac9a3df716..416881a65e 100644 --- a/table.cc +++ b/table.cc @@ -718,17 +718,21 @@ void table::rebuild_statistics() { future> table::build_new_sstable_list(const std::vector& new_sstables, const std::vector& old_sstables) { + auto current_sstables = _sstables; + auto new_sstable_list = _compaction_strategy.make_sstable_set(_schema); - auto new_sstable_set = *_sstables; - for (auto& tab : new_sstables) { - new_sstable_set.insert(tab); + std::unordered_set s(old_sstables.begin(), old_sstables.end()); + + // this might seem dangerous, but "move" here just avoids constness, + // making the two ranges compatible when compiling with boost 1.55. + // Noone is actually moving anything... + for (auto&& tab : boost::range::join(new_sstables, std::move(*current_sstables->all()))) { + if (!s.contains(tab)) { + new_sstable_list.insert(tab); + } co_await make_ready_future<>(); // yield if needed. } - for (auto& tab : old_sstables) { - new_sstable_set.erase(tab); - co_await make_ready_future<>(); // yield if needed. - } - co_return make_lw_shared(std::move(new_sstable_set)); + co_return make_lw_shared(std::move(new_sstable_list)); } // Note: must run in a seastar thread diff --git a/test/boost/sstable_set_test.cc b/test/boost/sstable_set_test.cc deleted file mode 100644 index 887bb9bb94..0000000000 --- a/test/boost/sstable_set_test.cc +++ /dev/null @@ -1,638 +0,0 @@ -/* - * Copyright (C) 2020 ScyllaDB - */ - -/* - * This file is part of Scylla. - * - * Scylla is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * Scylla is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Scylla. If not, see . - */ - -#include -#include -#include -#include -#include -#include "sstables/compaction.hh" -#include -#include -#include "test/boost/sstable_test.hh" -#include -#include -#include "sstables/compaction_strategy_impl.hh" -#include "sstables/date_tiered_compaction_strategy.hh" -#include "sstables/time_window_compaction_strategy.hh" -#include "sstables/leveled_compaction_strategy.hh" -#include "sstables/sstable_set.hh" -#include "sstables/sstable_set_impl.hh" - -using namespace sstables; - -static const sstring some_keyspace("ks"); -static const sstring some_column_family("cf"); - -static shared_sstable sstable_for_overlapping_test(test_env& env, const schema_ptr& schema, int64_t gen, sstring first_key, sstring last_key, uint32_t level = 0) { - auto sst = env.make_sstable(schema, "", gen, la, big); - sstables::test(sst).set_values_for_leveled_strategy(0, level, 0, std::move(first_key), std::move(last_key)); - return sst; -} - -SEASTAR_TEST_CASE(simple_versioned_sstable_set_test) { - return test_env::do_with([] (test_env& env) { - auto s = make_shared_schema({}, some_keyspace, some_column_family, - {{"p1", utf8_type}}, {}, {}, {}, utf8_type); - auto key_and_token_pair = token_generation_for_current_shard(8); - auto decorated_keys = boost::copy_range>( - key_and_token_pair | boost::adaptors::transformed([&s] (const std::pair& key_and_token) { - auto value = bytes(reinterpret_cast(key_and_token.first.data()), key_and_token.first.size()); - auto pk = sstables::key::from_bytes(value).to_partition_key(*s); - return dht::decorate_key(*s, std::move(pk)); - })); - struct snapshot_and_selections { - std::optional ssts; - std::vector> selections; - std::unordered_map> runs; - std::unordered_set all; - snapshot_and_selections(sstable_set&& ssts, const std::vector>& selections, - const std::unordered_map>& runs, const std::unordered_set& all) - : ssts(std::move(ssts)), selections(selections), runs(runs), all(all) { } - }; - auto check = [&decorated_keys] (snapshot_and_selections& version) { - for (int j = 0; j < 8; j++) { - auto sel = version.ssts->select(dht::partition_range::make_singular(decorated_keys[j])); - BOOST_REQUIRE_EQUAL(sel.size(), version.selections[j].size()); - for (auto& sst : sel) { - BOOST_REQUIRE(version.selections[j].contains(sst)); - } - } - for (auto& [uuid, run] : version.runs) { - if (run.empty()) { - continue; - } - std::vector runs = version.ssts->select_sstable_runs({*run.begin()}); - // only one sstable -> only one run - BOOST_REQUIRE_EQUAL(runs[0].all().size(), run.size()); - for (auto& sst : runs[0].all()) { - BOOST_REQUIRE(run.contains(sst)); - } - } - BOOST_REQUIRE_EQUAL(version.ssts->all()->size(), version.all.size()); - for (auto& sst : *version.ssts->all()) { - BOOST_REQUIRE(version.all.contains(sst)); - } - }; - // check that selecting from older snapshots of an sstable_set gives correct results. - { - auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options()); - std::optional set = cs.make_sstable_set(s); - std::vector> current_selections(8); - std::unordered_map> current_runs; - std::unordered_set current_all; - - std::vector versions; - versions.reserve(20); - int token = 0; - for (int i = 0; i < 20; i++) { - for (int j = 0; j < 10; j++) { - auto sst = sstable_for_overlapping_test(env, s, i, key_and_token_pair[token].first, key_and_token_pair[token].first, 1); - set->insert(sst); - current_selections[token].insert(sst); - current_runs[sst->run_identifier()].insert(sst); - current_all.insert(sst); - ++token %= 8; - } - for (int j = 0; j < 5; j++) { - auto sst = *set->all()->begin(); - set->erase(sst); - for (auto& sel : current_selections) { - // actually erases only from one - sel.erase(sst); - } - current_runs[sst->run_identifier()].erase(sst); - current_all.erase(sst); - } - versions.emplace_back(std::move(*set), current_selections, current_runs, current_all); - set = versions.back().ssts; - } - - for (unsigned i : {15, 12, 6, 9, 19, 14, 4, 5, 13, 16, 2, 7, 0, 1, 10, 11, 3, 17, 8, 18}) { - check(versions[i]); - // by removing the reference (by overwriting) we test if it doesn't have influence on results on other snapshots - versions[i].ssts.reset(); - } - } - return make_ready_future<>(); - }); -} - -SEASTAR_TEST_CASE(sstable_list_test) { - return test_env::do_with([] (test_env& env) { - auto s = make_shared_schema({}, some_keyspace, some_column_family, - {{"p1", utf8_type}}, {}, {}, {}, utf8_type); - auto key_and_token_pair = token_generation_for_current_shard(8); - auto decorated_keys = boost::copy_range>( - key_and_token_pair | boost::adaptors::transformed([&s] (const std::pair& key_and_token) { - auto value = bytes(reinterpret_cast(key_and_token.first.data()), key_and_token.first.size()); - auto pk = sstables::key::from_bytes(value).to_partition_key(*s); - return dht::decorate_key(*s, std::move(pk)); - })); - - - struct list_and_sstables { - std::optional list; - std::unordered_set sstables_in_list; - - list_and_sstables(sstable_list&& sstl, std::unordered_set sstables_in_list) - : list(std::move(sstl)), sstables_in_list(sstables_in_list) { } - }; - auto check = [] (list_and_sstables& version) { - BOOST_REQUIRE_EQUAL(version.list->size(), version.sstables_in_list.size()); - for (auto& sst : *version.list) { - BOOST_REQUIRE(version.list->contains(sst)); - } - }; - - { - auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options()); - std::optional set = cs.make_sstable_set(s); - std::optional l = *set->all(); - std::unordered_set sstables_in_list; - - - std::vector versions; - versions.reserve(20); - int token = 0; - for (int i = 0; i < 20; i++) { - for (int j = 0; j < 10; j++) { - auto sst = sstable_for_overlapping_test(env, s, i, key_and_token_pair[token].first, key_and_token_pair[token].first, 1); - l->insert(sst); - sstables_in_list.insert(sst); - ++token %= 8; - } - for (int j = 0; j < 5; j++) { - auto sst = *l->begin(); - l->erase(sst); - sstables_in_list.erase(sst); - } - versions.emplace_back(std::move(*l), sstables_in_list); - l = versions.back().list; - } - - for (unsigned i : {15, 12, 6, 9, 19, 14, 4, 5, 13, 16, 2, 7, 0, 1, 10, 11, 3, 17, 8, 18}) { - check(versions[i]); - // by removing the reference (by overwriting) we test if it doesn't have influence on results on other snapshots - versions[i].list.reset(); - } - } - return make_ready_future<>(); - }); -} - -SEASTAR_TEST_CASE(sstable_set_version_merging_test) { - return test_env::do_with([] (test_env& env) { - auto s = make_shared_schema({}, some_keyspace, some_column_family, - {{"p1", utf8_type}}, {}, {}, {}, utf8_type); - auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options()); - auto key_and_token_pair = token_generation_for_current_shard(2); - std::optional set = cs.make_sstable_set(s); - std::optional list = *set->all(); - // set -> list - BOOST_REQUIRE_EQUAL(&set->all()->version(), list->version().get_previous_version()); - std::optional list2 = *set->all(); - // set -> list - // -> list2 - BOOST_REQUIRE_EQUAL(&set->all()->version(), list->version().get_previous_version()); - BOOST_REQUIRE_EQUAL(&set->all()->version(), list2->version().get_previous_version()); - auto sst = sstable_for_overlapping_test(env, s, 1, key_and_token_pair[0].first, key_and_token_pair[0].first, 1); - set->insert(sst); - // set' -> list - // -> list2 - // -> set - BOOST_REQUIRE_EQUAL(set->all()->version().get_previous_version(), list->version().get_previous_version()); - BOOST_REQUIRE_EQUAL(set->all()->version().get_previous_version(), list2->version().get_previous_version()); - BOOST_REQUIRE_NE(&set->all()->version(), list->version().get_previous_version()); - sst = sstable_for_overlapping_test(env, s, 1, key_and_token_pair[1].first, key_and_token_pair[1].first, 1); - set->insert(sst); - // set' -> list - // -> list2 - // -> set - BOOST_REQUIRE_EQUAL(set->all()->version().get_previous_version(), list->version().get_previous_version()); - BOOST_REQUIRE_EQUAL(set->all()->version().get_previous_version(), list2->version().get_previous_version()); - BOOST_REQUIRE_NE(&set->all()->version(), list->version().get_previous_version()); - std::optional list3 = list; - // set' -> list -> list3 - // -> list2 - // -> set - BOOST_REQUIRE_EQUAL(&list->version(), list3->version().get_previous_version()); - list.reset(); - // set' -> list3 - // -> list2 - // -> set - BOOST_REQUIRE_EQUAL(set->all()->version().get_previous_version(), list3->version().get_previous_version()); - BOOST_REQUIRE_EQUAL(list2->version().get_previous_version(), list3->version().get_previous_version()); - set.reset(); - // set' -> list3 - // -> list2 - BOOST_REQUIRE_EQUAL(list2->version().get_previous_version(), list3->version().get_previous_version()); - std::optional list4 = list3; - std::optional list5 = list3; - // set' -> list3 -> list4 - // -> list5 - // -> list2 - BOOST_REQUIRE_EQUAL(&list3->version(), list4->version().get_previous_version()); - BOOST_REQUIRE_EQUAL(&list3->version(), list5->version().get_previous_version()); - list3.reset(); - // set' -> list3' -> list4 - // -> list5 - // -> list2 - BOOST_REQUIRE_NE(list2->version().get_previous_version(), list4->version().get_previous_version()); - BOOST_REQUIRE_EQUAL(list4->version().get_previous_version(), list5->version().get_previous_version()); - list4.reset(); - // set' -> list5 - // -> list2 - BOOST_REQUIRE_EQUAL(list2->version().get_previous_version(), list5->version().get_previous_version()); - list2.reset(); - // list5 - BOOST_REQUIRE_EQUAL(nullptr, list5->version().get_previous_version()); - return make_ready_future<>(); - }); -} - -SEASTAR_TEST_CASE(sstable_set_propagate_erased_sstables_erased_in_last_descendant_test) { - return test_env::do_with([] (test_env& env) { - auto s = make_shared_schema({}, some_keyspace, some_column_family, - {{"p1", utf8_type}}, {}, {}, {}, utf8_type); - auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options()); - auto key_and_token_pair = token_generation_for_current_shard(1); - { - std::optional set = cs.make_sstable_set(s); - auto sst = sstable_for_overlapping_test(env, s, 1, key_and_token_pair[0].first, key_and_token_pair[0].first, 1); - bool is_sstable_removed = false; - utils::observer observer = sst->add_on_closed_handler([&] (sstable& sst) { - is_sstable_removed = true; - }); - set->insert(sst); - std::optional set2 = set; - std::optional set3 = set; - // set -> set2 - // -> set3 - set.reset(); - set2->erase(sst); - set3->erase(sst); - sst = nullptr; - BOOST_REQUIRE(is_sstable_removed); - } - { - std::optional set = cs.make_sstable_set(s); - auto sst = sstable_for_overlapping_test(env, s, 1, key_and_token_pair[0].first, key_and_token_pair[0].first, 1); - bool is_sstable_removed = false; - utils::observer observer = sst->add_on_closed_handler([&] (sstable& sst) { - is_sstable_removed = true; - }); - set->insert(sst); - std::optional set2 = set; - std::optional set3 = set; - std::optional set4 = set; - // set -> set2 - // -> set3 - // -> set4 - set.reset(); - set2->erase(sst); - set3.reset(); - set4->erase(sst); - sst = nullptr; - BOOST_REQUIRE(is_sstable_removed); - } - { - std::optional set = cs.make_sstable_set(s); - auto sst = sstable_for_overlapping_test(env, s, 1, key_and_token_pair[0].first, key_and_token_pair[0].first, 1); - bool is_sstable_removed = false; - utils::observer observer = sst->add_on_closed_handler([&] (sstable& sst) { - is_sstable_removed = true; - }); - set->insert(sst); - std::optional set2 = set; - std::optional set3 = set; - std::optional set4 = set3; - std::optional set5 = set3; - // set -> set2 - // -> set3 -> set4 - // -> set5 - set.reset(); - set2->erase(sst); - set3.reset(); - set4->erase(sst); - set5->erase(sst); - sst = nullptr; - BOOST_REQUIRE(is_sstable_removed); - } - return make_ready_future<>(); - }); -} - -SEASTAR_TEST_CASE(sstable_set_propagate_erased_sstables_remove_reference_to_ancestor_test) { - return test_env::do_with([] (test_env& env) { - auto s = make_shared_schema({}, some_keyspace, some_column_family, - {{"p1", utf8_type}}, {}, {}, {}, utf8_type); - auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options()); - auto key_and_token_pair = token_generation_for_current_shard(1); - { - std::optional set = cs.make_sstable_set(s); - auto sst = sstable_for_overlapping_test(env, s, 1, key_and_token_pair[0].first, key_and_token_pair[0].first, 1); - bool is_sstable_removed = false; - utils::observer observer = sst->add_on_closed_handler([&] (sstable& sst) { - is_sstable_removed = true; - }); - set->insert(sst); - std::optional set2 = set; - // set -> set2 - set2->erase(sst); - sst = nullptr; - BOOST_REQUIRE(!is_sstable_removed); - set.reset(); - BOOST_REQUIRE(is_sstable_removed); - } - { - std::optional set = cs.make_sstable_set(s); - auto sst = sstable_for_overlapping_test(env, s, 1, key_and_token_pair[0].first, key_and_token_pair[0].first, 1); - bool is_sstable_removed = false; - utils::observer observer = sst->add_on_closed_handler([&] (sstable& sst) { - is_sstable_removed = true; - }); - set->insert(sst); - std::optional set2 = set; - std::optional set3 = set; - // set -> set2 - // -> set3 - set2->erase(sst); - set3->erase(sst); - sst = nullptr; - BOOST_REQUIRE(!is_sstable_removed); - set.reset(); - BOOST_REQUIRE(is_sstable_removed); - } - { - std::optional set = cs.make_sstable_set(s); - auto sst = sstable_for_overlapping_test(env, s, 1, key_and_token_pair[0].first, key_and_token_pair[0].first, 1); - bool is_sstable_removed = false; - utils::observer observer = sst->add_on_closed_handler([&] (sstable& sst) { - is_sstable_removed = true; - }); - set->insert(sst); - std::optional set2 = set; - std::optional set3 = set; - std::optional set4 = set; - // set -> set2 - // -> set3 - // -> set4 - set2->erase(sst); - set3.reset(); - set4->erase(sst); - sst = nullptr; - BOOST_REQUIRE(!is_sstable_removed); - set.reset(); - BOOST_REQUIRE(is_sstable_removed); - } - { - std::optional set = cs.make_sstable_set(s); - auto sst = sstable_for_overlapping_test(env, s, 1, key_and_token_pair[0].first, key_and_token_pair[0].first, 1); - bool is_sstable_removed = false; - utils::observer observer = sst->add_on_closed_handler([&] (sstable& sst) { - is_sstable_removed = true; - }); - set->insert(sst); - std::optional set2 = set; - std::optional set3 = set; - std::optional set4 = set3; - std::optional set5 = set3; - // set -> set2 - // -> set3 -> set4 - // -> set5 - set.reset(); - set2->erase(sst); - set4->erase(sst); - set5->erase(sst); - sst = nullptr; - BOOST_REQUIRE(!is_sstable_removed); - set3.reset(); - BOOST_REQUIRE(is_sstable_removed); - } - return make_ready_future<>(); - }); -} - - -SEASTAR_TEST_CASE(sstable_set_propagate_erased_sstables_remove_reference_to_descendant_test) { - return test_env::do_with([] (test_env& env) { - auto s = make_shared_schema({}, some_keyspace, some_column_family, - {{"p1", utf8_type}}, {}, {}, {}, utf8_type); - auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options()); - auto key_and_token_pair = token_generation_for_current_shard(1); - { - std::optional set = cs.make_sstable_set(s); - auto sst = sstable_for_overlapping_test(env, s, 1, key_and_token_pair[0].first, key_and_token_pair[0].first, 1); - bool is_sstable_removed = false; - utils::observer observer = sst->add_on_closed_handler([&] (sstable& sst) { - is_sstable_removed = true; - }); - set->insert(sst); - std::optional set2 = set; - std::optional set3 = set; - // set -> set2 - // -> set3 - set.reset(); - set2->erase(sst); - sst = nullptr; - BOOST_REQUIRE(!is_sstable_removed); - set3.reset(); - BOOST_REQUIRE(is_sstable_removed); - } - { - std::optional set = cs.make_sstable_set(s); - auto sst = sstable_for_overlapping_test(env, s, 1, key_and_token_pair[0].first, key_and_token_pair[0].first, 1); - bool is_sstable_removed = false; - utils::observer observer = sst->add_on_closed_handler([&] (sstable& sst) { - is_sstable_removed = true; - }); - set->insert(sst); - std::optional set2 = set; - std::optional set3 = set; - std::optional set4 = set; - // set -> set2 - // -> set3 - // -> set4 - set.reset(); - set2->erase(sst); - set3->erase(sst); - sst = nullptr; - BOOST_REQUIRE(!is_sstable_removed); - set4.reset(); - BOOST_REQUIRE(is_sstable_removed); - } - { - std::optional set = cs.make_sstable_set(s); - auto sst = sstable_for_overlapping_test(env, s, 1, key_and_token_pair[0].first, key_and_token_pair[0].first, 1); - bool is_sstable_removed = false; - utils::observer observer = sst->add_on_closed_handler([&] (sstable& sst) { - is_sstable_removed = true; - }); - set->insert(sst); - std::optional set2 = set; - std::optional set3 = set; - std::optional set4 = set3; - std::optional set5 = set3; - // set -> set2 - // -> set3 -> set4 - // -> set5 - set.reset(); - set2->erase(sst); - set3.reset(); - set4->erase(sst); - sst = nullptr; - BOOST_REQUIRE(!is_sstable_removed); - set5.reset(); - BOOST_REQUIRE(is_sstable_removed); - } - return make_ready_future<>(); - }); -} - -class simple_sstable_set { - std::unique_ptr _impl; - schema_ptr _schema; - // used to support column_family::get_sstable(), which wants to return an sstable_list - // that has a reference somewhere - lw_shared_ptr> _all; - std::unordered_map _all_runs; -public: - ~simple_sstable_set() = default; - - simple_sstable_set(std::unique_ptr impl, schema_ptr schema) - : _impl(std::move(impl)) - , _schema(std::move(schema)) - , _all(make_lw_shared>()) { - } - - simple_sstable_set(const simple_sstable_set& x) - : _impl(x._impl->clone()) - , _schema(x._schema) - , _all(make_lw_shared>(*x._all)) - , _all_runs(x._all_runs) { - } - - simple_sstable_set(simple_sstable_set&& x) noexcept = default; - - simple_sstable_set& operator=(const simple_sstable_set& x) { - if (this != &x) { - auto tmp = simple_sstable_set(x); - *this = std::move(tmp); - } - return *this; - } - - simple_sstable_set& operator=(simple_sstable_set&&) noexcept = default; - - std::vector select(const dht::partition_range& range) const { - return _impl->select(range); - } - - // Return all runs which contain any of the input sstables. - std::vector select_sstable_runs(const std::vector& sstables) const { - auto run_ids = boost::copy_range>(sstables | boost::adaptors::transformed(std::mem_fn(&sstable::run_identifier))); - return boost::copy_range>(run_ids | boost::adaptors::transformed([this] (utils::UUID run_id) { - return _all_runs.at(run_id); - })); - } - - lw_shared_ptr> all() const { return _all; } - - void insert(shared_sstable sst) { - _impl->insert(sst); - _all->insert(sst); - _all_runs[sst->run_identifier()].insert(sst); - } - - void erase(shared_sstable sst) { - _impl->erase(sst); - _all->erase(sst); - _all_runs[sst->run_identifier()].erase(sst); - } -}; - -SEASTAR_TEST_CASE(sstable_set_random_walk_test) { - return test_env::do_with([] (test_env& env) { - auto rand = std::default_random_engine(); - auto op_gen = std::uniform_int_distribution(0, 7); - auto nr_dist = std::geometric_distribution(0.7); - auto s = make_shared_schema({}, some_keyspace, some_column_family, - {{"p1", utf8_type}}, {}, {}, {}, utf8_type); - auto leveled_cs = leveled_compaction_strategy(s->compaction_strategy_options()); - std::vector sstable_sets; - std::vector simple_sstable_sets; - sstable_sets.emplace_back(leveled_cs.make_sstable_set(s), s); - simple_sstable_sets.emplace_back(leveled_cs.make_sstable_set(s), s); - auto key_and_token_pair = token_generation_for_current_shard(1000); - std::vector sstables(1000); - for (int i = 0; i < 1000; i++) { - sstables[i] = sstable_for_overlapping_test(env, s, i, key_and_token_pair[i].first, key_and_token_pair[i].first, i); - } - for (auto i = 0; i != 100000; i++) { - auto op = op_gen(rand); - auto u = std::uniform_int_distribution(0, sstable_sets.size() - 1); - auto idx = u(rand); - switch (op) { - case 0: { - // delete - if (sstable_sets.size() > 1) { - sstable_sets.erase(sstable_sets.begin() + idx); - simple_sstable_sets.erase(simple_sstable_sets.begin() + idx); - break; - } - // if we can't remove the version, let's create one - [[fallthrough]]; - } - case 1: { - // copy - if (sstable_sets.size() < 100) { - sstable_sets.emplace_back(sstable_sets[idx]); - simple_sstable_sets.emplace_back(simple_sstable_sets[idx]); - for (auto& sst : *simple_sstable_sets.back().all()) { - BOOST_REQUIRE(sstable_sets.back().all()->contains(sst)); - } - } - break; - } - default: - // modify - auto sst_u = std::uniform_int_distribution(0, 999); - auto sst_idx = sst_u(rand); - if (simple_sstable_sets[idx].all()->contains(sstables[sst_idx])) { - sstable_sets[idx].erase(sstables[sst_idx]); - simple_sstable_sets[idx].erase(sstables[sst_idx]); - BOOST_REQUIRE(!sstable_sets[idx].all()->contains(sstables[sst_idx])); - BOOST_REQUIRE(!sstable_sets[idx].all()->contains(sstables[sst_idx])); - } else { - sstable_sets[idx].insert(sstables[sst_idx]); - simple_sstable_sets[idx].insert(sstables[sst_idx]); - BOOST_REQUIRE(sstable_sets[idx].all()->contains(sstables[sst_idx])); - BOOST_REQUIRE(sstable_sets[idx].all()->contains(sstables[sst_idx])); - } - } - for (int j = 0; j < sstable_sets.size(); j++) { - BOOST_REQUIRE_EQUAL(sstable_sets[j].all()->size(), simple_sstable_sets[j].all()->size()); - } - } - return make_ready_future<>(); - }); -} diff --git a/test/boost/sstable_test.hh b/test/boost/sstable_test.hh index efc0eba8df..30790ab29c 100644 --- a/test/boost/sstable_test.hh +++ b/test/boost/sstable_test.hh @@ -37,6 +37,40 @@ #include #include +constexpr auto la = sstables::sstable::version_types::la; +constexpr auto big = sstables::sstable::format_types::big; + +class column_family_test { + lw_shared_ptr _cf; +public: + column_family_test(lw_shared_ptr cf) : _cf(cf) {} + + void add_sstable(sstables::shared_sstable sstable) { + _cf->_sstables->insert(std::move(sstable)); + } + + // NOTE: must run in a thread + void rebuild_sstable_list(const std::vector& new_sstables, + const std::vector& sstables_to_remove) { + _cf->_sstables = _cf->build_new_sstable_list(new_sstables, sstables_to_remove).get0(); + } + + static void update_sstables_known_generation(column_family& cf, unsigned generation) { + cf.update_sstables_known_generation(generation); + } + + static uint64_t calculate_generation_for_new_table(column_family& cf) { + return cf.calculate_generation_for_new_table(); + } + + static int64_t calculate_shard_from_sstable_generation(int64_t generation) { + return column_family::calculate_shard_from_sstable_generation(generation); + } + + future try_flush_memtable_to_sstable(lw_shared_ptr mt) { + return _cf->try_flush_memtable_to_sstable(mt, sstable_write_permit::unconditional()); + } +}; namespace sstables { diff --git a/test/lib/sstable_test_env.hh b/test/lib/sstable_test_env.hh index e2a3a1a26a..347ddb543a 100644 --- a/test/lib/sstable_test_env.hh +++ b/test/lib/sstable_test_env.hh @@ -31,41 +31,6 @@ #include "test/lib/test_services.hh" #include "test/lib/log.hh" -constexpr auto la = sstables::sstable::version_types::la; -constexpr auto big = sstables::sstable::format_types::big; - -class column_family_test { - lw_shared_ptr _cf; -public: - column_family_test(lw_shared_ptr cf) : _cf(cf) {} - - void add_sstable(sstables::shared_sstable sstable) { - _cf->_sstables->insert(std::move(sstable)); - } - - // NOTE: must run in a thread - void rebuild_sstable_list(const std::vector& new_sstables, - const std::vector& sstables_to_remove) { - _cf->_sstables = _cf->build_new_sstable_list(new_sstables, sstables_to_remove).get0(); - } - - static void update_sstables_known_generation(column_family& cf, unsigned generation) { - cf.update_sstables_known_generation(generation); - } - - static uint64_t calculate_generation_for_new_table(column_family& cf) { - return cf.calculate_generation_for_new_table(); - } - - static int64_t calculate_shard_from_sstable_generation(int64_t generation) { - return column_family::calculate_shard_from_sstable_generation(generation); - } - - future try_flush_memtable_to_sstable(lw_shared_ptr mt) { - return _cf->try_flush_memtable_to_sstable(mt, sstable_write_permit::unconditional()); - } -}; - namespace sstables { class test_env_sstables_manager : public sstables_manager { diff --git a/test/perf/perf_sstable_set.cc b/test/perf/perf_sstable_set.cc deleted file mode 100644 index 02cc128a9e..0000000000 --- a/test/perf/perf_sstable_set.cc +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Copyright (C) 2020 ScyllaDB - */ - -/* - * This file is part of Scylla. - * - * Scylla is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * Scylla is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Scylla. If not, see . - */ - -#include "database.hh" -#include "test/lib/simple_schema.hh" -#include "test/perf/perf.hh" -#include -#include -#include "test/lib/sstable_test_env.hh" -#include - -static shared_sstable sstable_for_overlapping_test(test_env& env, const schema_ptr& schema, int64_t gen, sstring first_key, sstring last_key, uint32_t level = 0) { - auto sst = env.make_sstable(schema, "", gen, la, big); - sstables::test(sst).set_values_for_leveled_strategy(0, level, 0, std::move(first_key), std::move(last_key)); - return sst; -} - -int main(int argc, char* argv[]) { - app_template app; - return app.run(argc, argv, [&app] { - return test_env::do_with_async([] (test_env& env) { - using namespace std::chrono; - using namespace std::chrono_literals; - auto start = high_resolution_clock::now(); - simple_schema ss; - auto s = ss.schema(); - auto cm = make_lw_shared(); - column_family::config cfg; - auto cl_stats = make_lw_shared(); - auto tracker = make_lw_shared(); - auto cf = make_lw_shared(s, cfg, column_family::no_commitlog(), *cm, *cl_stats, *tracker); - cf->set_compaction_strategy(sstables::compaction_strategy_type::leveled); - - constexpr int ssts_in_level_0 = 10; - std::function idx_to_level = [&] (int i) { - if (i < ssts_in_level_0) { - return 0; - } - return 1 + idx_to_level((i - ssts_in_level_0) / 10 + ssts_in_level_0 - 1); - }; - auto level_to_size = [] (int level) { - if (level == 0) { - return ssts_in_level_0; - } - return int(pow(10, level)); - }; - - auto kt_pair = token_generation_for_current_shard(1120); - auto min_max_keys = [&kt_pair, &level_to_size] (auto level, auto pos_in_level) -> std::pair { - auto last_key_idx = kt_pair.size() - 1; - if (level == 0) { - return { kt_pair[0].first, kt_pair[last_key_idx].first }; - } - auto total_ranges = kt_pair.size(); - auto level_size_in_ssts = level_to_size(level); - unsigned ranges_per_sst = std::max(1U, unsigned(floor(float(total_ranges) / level_size_in_ssts))); - sstring min_key = kt_pair.at(pos_in_level).first; - sstring max_key = kt_pair.at(std::min(pos_in_level + ranges_per_sst - 1, unsigned(last_key_idx))).first; - return {min_key, max_key}; - }; - - std::vector inputs[3], outputs[3]; - - std::array pos_in_levels{0}; - pos_in_levels.fill(0); - auto start2 = high_resolution_clock::now(); - for (auto i = 0; i < 1120; i++) { - auto level = idx_to_level(i); - auto [min, max] = min_max_keys(level, pos_in_levels[level]++); - auto sst = sstable_for_overlapping_test(env, s, i, min, max, uint32_t(level)); - column_family_test(cf).add_sstable(sst); - if (level >= 1 && pos_in_levels[level] < 10) { - inputs[level-1].push_back(sst); - } - seastar::thread::maybe_yield(); - } - - for (auto i = 0; i < 30; i++) { - auto [min, max] = min_max_keys(1 + i / 10, i % 10); - auto sst = sstable_for_overlapping_test(env, s, i, min, max, 1 + i / 10); - outputs[i / 10].push_back(sst); - seastar::thread::maybe_yield(); - } - for (auto i = 0; i < 3; i++) { - auto t1 = high_resolution_clock::now(); - column_family_test(cf).rebuild_sstable_list(outputs[i], inputs[i]); - auto t2 = high_resolution_clock::now(); - std::cout << "Replacing 10 L" << i + 1 <<" sstables took " - << std::chrono::duration_cast(t2 - t1).count() << "ms to complete\n"; - } - }); - }); -}