From 0c2c00f01b52357e94dfed0c4c549f3fb6701087 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Sun, 4 Feb 2024 14:15:09 +0200 Subject: [PATCH] table: tablet_storage_group_manager: make tablet_sstable_set Make a specialized sstable_set for tablets via tablet_storage_group_manager::make_sstable_set. This sstable set takes a snapshot of the storage_groups (compound) sstable_sets and maps the selected tokens directly into the tablet compound_sstable_set. Refs #16876 Signed-off-by: Benny Halevy --- replica/database.hh | 2 + replica/table.cc | 5 +- replica/tablets.cc | 287 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 290 insertions(+), 4 deletions(-) diff --git a/replica/database.hh b/replica/database.hh index f0324aae06..756a3e4f76 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1240,6 +1240,8 @@ public: friend class compaction_group; }; +lw_shared_ptr make_tablet_sstable_set(schema_ptr, const storage_group_manager& sgm, const locator::tablet_map&); + using user_types_metadata = data_dictionary::user_types_metadata; using keyspace_metadata = data_dictionary::keyspace_metadata; diff --git a/replica/table.cc b/replica/table.cc index 1e815cf257..70de939243 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -688,11 +688,8 @@ public: future<> maybe_split_compaction_group_of(size_t idx) override; lw_shared_ptr make_sstable_set() const override { - // TODO: switch to a specialized set for groups which assumes disjointness across compound sets and incrementally read from them. // FIXME: avoid recreation of compound_set for groups which had no change. usually, only one group will be changed at a time. - auto sstable_sets = boost::copy_range>>(compaction_groups() - | boost::adaptors::transformed(std::mem_fn(&compaction_group::make_compound_sstable_set))); - return make_lw_shared(sstables::make_compound_sstable_set(schema(), std::move(sstable_sets))); + return make_tablet_sstable_set(schema(), *this, *_tablet_map); } }; diff --git a/replica/tablets.cc b/replica/tablets.cc index d9528881df..6d668dc008 100644 --- a/replica/tablets.cc +++ b/replica/tablets.cc @@ -17,6 +17,8 @@ #include "replica/database.hh" #include "replica/tablets.hh" #include "replica/tablet_mutation_builder.hh" +#include "sstables/sstable_set.hh" +#include "dht/token.hh" namespace replica { @@ -308,4 +310,289 @@ future> read_tablet_mutations(seastar::sharded> _sstable_sets; + size_t _size = 0; + uint64_t _bytes_on_disk = 0; + +public: + tablet_sstable_set(schema_ptr s, const storage_group_manager& sgm, const locator::tablet_map& tmap) + : _schema(std::move(s)) + , _tablet_map(tmap.tablet_count()) + { + _sstable_sets.reserve(sgm.storage_groups().size()); + for (const auto& sg : sgm.storage_groups()) { + if (sg) { + auto set = sg->make_sstable_set(); + _size += set->size(); + _bytes_on_disk += set->bytes_on_disk(); + _sstable_sets.emplace_back(std::move(set)); + } else { + _sstable_sets.emplace_back(); + } + } + } + + tablet_sstable_set(const tablet_sstable_set& o) + : _schema(o._schema) + , _tablet_map(o._tablet_map.tablet_count()) + , _sstable_sets(o._sstable_sets) + , _size(o._size) + , _bytes_on_disk(o._bytes_on_disk) + {} + + static lw_shared_ptr make(schema_ptr s, const storage_group_manager& sgm, const locator::tablet_map& tmap) { + return make_lw_shared(std::make_unique(std::move(s), sgm, tmap)); + } + + const schema_ptr& schema() const noexcept { + return _schema; + } + + virtual std::unique_ptr clone() const override { + return std::make_unique(*this); + } + + virtual std::vector select(const dht::partition_range& range = query::full_partition_range) const override; + virtual lw_shared_ptr all() const override; + virtual stop_iteration for_each_sstable_until(std::function func) const override; + virtual future for_each_sstable_gently_until(std::function(const sstables::shared_sstable&)> func) const override; + virtual bool insert(sstables::shared_sstable sst) override; + virtual bool erase(sstables::shared_sstable sst) override; + virtual size_t size() const noexcept override { + return _size; + } + virtual uint64_t bytes_on_disk() const noexcept override { + return _bytes_on_disk; + } + virtual selector_and_schema_t make_incremental_selector() const override; + + virtual flat_mutation_reader_v2 create_single_key_sstable_reader( + replica::column_family*, + schema_ptr, + reader_permit, + utils::estimated_histogram&, + const dht::partition_range&, + const query::partition_slice&, + tracing::trace_state_ptr, + streamed_mutation::forwarding, + mutation_reader::forwarding, + const sstables::sstable_predicate&) const override; + +private: + size_t group_of(const dht::token& t) const noexcept { + return _tablet_map.get_tablet_id(t).id; + } + dht::token first_token_of(size_t idx) const noexcept { +#ifndef SCYLLA_BUILD_MODE_RELEASE + if (idx >= _tablet_map.tablet_count()) { + on_fatal_internal_error(tablet_logger, format("first_token_of: idx={} out of range", idx)); + } +#endif + return _tablet_map.get_first_token(tablet_id(idx)); + } + dht::token last_token_of(size_t idx) const noexcept { +#ifndef SCYLLA_BUILD_MODE_RELEASE + if (idx >= _tablet_map.tablet_count()) { + on_fatal_internal_error(tablet_logger, format("last_token_of: idx={} out of range", idx)); + } +#endif + return _tablet_map.get_last_token(tablet_id(idx)); + } + stop_iteration for_each_sstable_set_until(const dht::partition_range&, std::function)>) const; + future for_each_sstable_set_gently_until(const dht::partition_range&, std::function(lw_shared_ptr)>) const; + + friend class tablet_incremental_selector; +}; + +lw_shared_ptr make_tablet_sstable_set(schema_ptr s, const storage_group_manager& sgm, const locator::tablet_map& tmap) { + return tablet_sstable_set::make(std::move(s), sgm, tmap); } + +stop_iteration tablet_sstable_set::for_each_sstable_set_until(const dht::partition_range& pr, std::function)> func) const { + size_t candidate_start = pr.start() ? group_of(pr.start()->value().token()) : size_t(0); + size_t candidate_end = pr.end() ? group_of(pr.end()->value().token()) : (_sstable_sets.size() - 1); + for (auto i = candidate_start; i <= candidate_end; i++) { + if (const auto& set = _sstable_sets[i]) { + if (func(set) == stop_iteration::yes) { + return stop_iteration::yes; + } + } + } + return stop_iteration::no; +} + +future tablet_sstable_set::for_each_sstable_set_gently_until(const dht::partition_range& pr, std::function(lw_shared_ptr)> func) const { + size_t candidate_start = pr.start() ? group_of(pr.start()->value().token()) : size_t(0); + size_t candidate_end = pr.end() ? group_of(pr.end()->value().token()) : (_sstable_sets.size() - 1); + for (auto i = candidate_start; i <= candidate_end; i++) { + if (const auto& set = _sstable_sets[i]) { + if (co_await func(set) == stop_iteration::yes) { + co_return stop_iteration::yes; + } + } + } + co_return stop_iteration::no; +} + +std::vector tablet_sstable_set::select(const dht::partition_range& range) const { + std::vector ret; + for_each_sstable_set_until(range, [&] (lw_shared_ptr set) { + auto ssts = set->select(range); + if (ret.empty()) { + ret = std::move(ssts); + } else { + std::move(ssts.begin(), ssts.end(), std::back_inserter(ret)); + } + return stop_iteration::no; + }); + tablet_logger.debug("tablet_sstable_set::select: range={} ret={}", range, ret.size()); + return ret; +} + +lw_shared_ptr tablet_sstable_set::all() const { + auto ret = make_lw_shared(); + for_each_sstable_set_until(query::full_partition_range, [&] (lw_shared_ptr set) { + set->for_each_sstable([&] (const sstables::shared_sstable& sst) { + ret->insert(sst); + }); + return stop_iteration::no; + }); + return ret; +} + +stop_iteration tablet_sstable_set::for_each_sstable_until(std::function func) const { + return for_each_sstable_set_until(query::full_partition_range, [func = std::move(func)] (lw_shared_ptr set) { + return set->for_each_sstable_until(func); + }); +} + +future tablet_sstable_set::for_each_sstable_gently_until(std::function(const sstables::shared_sstable&)> func) const { + return for_each_sstable_set_gently_until(query::full_partition_range, [func = std::move(func)] (lw_shared_ptr set) { + return set->for_each_sstable_gently_until(func); + }); +} + +bool tablet_sstable_set::insert(sstables::shared_sstable sst) { + throw_with_backtrace(); +} +bool tablet_sstable_set::erase(sstables::shared_sstable sst) { + throw_with_backtrace(); +} + +class tablet_incremental_selector : public sstables::incremental_selector_impl { + const tablet_sstable_set& _tset; + + // _cur_set and _cur_selector contain a snapshot + // for the currently selected compaction_group. + lw_shared_ptr _cur_set; + std::optional _cur_selector; + dht::token _lowest_next_token = dht::maximum_token(); + +public: + tablet_incremental_selector(const tablet_sstable_set& tset) + : _tset(tset) + {} + + virtual std::tuple, dht::ring_position_ext> select(const dht::ring_position_view& pos) override { + // Always return minimum singular range, such that incremental_selector::select() will always call this function, + // which in turn will find the next sstable set to select sstables from. + const dht::partition_range current_range = dht::partition_range::make_singular(dht::ring_position::min()); + + // pos must be monotonically increasing in the weak sense + // but caller can skip to a position outside the current set + auto token = pos.token(); + if (!_cur_set || pos.token() >= _lowest_next_token) { + auto idx = _tset.group_of(token); + if (!token.is_maximum()) { + _cur_set = _tset._sstable_sets[idx]; + } + // Set the next token to point to the next engaged storage group. + // It will be considered later on when the _cur_set is exhausted + _lowest_next_token = find_lowest_next_token(idx+1); + } + + if (!_cur_set) { + auto lowest_next_position = _lowest_next_token.is_maximum() + ? dht::ring_position_ext::max() + : dht::ring_position_ext::starting_at(_lowest_next_token); + tablet_logger.debug("tablet_incremental_selector {}.{}: select pos={}: returning 0 sstables, next_pos={}", + _tset.schema()->ks_name(), _tset.schema()->cf_name(), pos, lowest_next_position); + return std::make_tuple(std::move(current_range), std::vector{}, lowest_next_position); + } + + _cur_selector.emplace(_cur_set->make_incremental_selector()); + + auto res = _cur_selector->select(pos); + // Return all sstables selected on the requested position from the first matching sstable set. + // This assumes that the underlying sstable sets are disjoint in their token ranges so + // only one of them contain any given token. + auto sstables = std::move(res.sstables); + // Return the lowest next position, such that this function will be called again to select the + // lowest next position from the selector which previously returned it. + // Until the current selector is exhausted. In that case, + // jump to the next compaction_group sstable set. + dht::ring_position_ext next_position = res.next_position; + if (next_position.is_max()) { + // _cur_selector is exhausted. + // Return a position starting at `_lowest_next_token` + // that was calculated for the _cur_set + // (unless it's already maximum_token in which case we just return next_position == ring_position::max()). + _cur_set = {}; + _cur_selector.reset(); + if (!_lowest_next_token.is_maximum()) { + next_position = dht::ring_position_ext::starting_at(_lowest_next_token); + } + } + + tablet_logger.debug("tablet_incremental_selector {}.{}: select pos={}: returning {} sstables, next_pos={}", + _tset.schema()->ks_name(), _tset.schema()->cf_name(), pos, sstables.size(), next_position); + return std::make_tuple(std::move(current_range), std::move(sstables), std::move(next_position)); + } + +private: + // Find the start token of the first engaged sstable_set + // starting the search from `idx`. + dht::token find_lowest_next_token(size_t idx) { + while (idx < _tset._sstable_sets.size()) { + if (_tset._sstable_sets[idx]) { + return _tset.first_token_of(idx); + } + ++idx; + } + return dht::maximum_token(); + } +}; + +flat_mutation_reader_v2 +tablet_sstable_set::create_single_key_sstable_reader( + replica::column_family* cf, + schema_ptr schema, + reader_permit permit, + utils::estimated_histogram& sstable_histogram, + const dht::partition_range& pr, + const query::partition_slice& slice, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr, + const sstables::sstable_predicate& predicate) const { + // The singular partition_range start bound must be engaged. + auto idx = group_of(pr.start()->value().token()); + const auto& set = _sstable_sets[idx]; + if (!set) { + return make_empty_flat_reader_v2(cf->schema(), std::move(permit)); + } + return set->create_single_key_sstable_reader(cf, std::move(schema), std::move(permit), sstable_histogram, pr, slice, trace_state, fwd, fwd_mr, predicate); +} + +sstables::sstable_set_impl::selector_and_schema_t tablet_sstable_set::make_incremental_selector() const { + return std::make_tuple(std::make_unique(*this), *_schema); +} + +} // namespace replica