From e4865bd4d108202178f42c41133689711bb2fa06 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 30 Mar 2023 18:21:55 +0200 Subject: [PATCH] dht, storage_proxy: Abstract token space splitting Currently, scans are splitting partition ranges around tokens. This will have to change with tablets, where we should split at tablet boundaries. This patch introduces token_range_splitter which abstracts this task. It is provided by effective_replication_map implementation. --- cql3/statements/select_statement.cc | 5 ++- locator/abstract_replication_strategy.cc | 4 ++ locator/abstract_replication_strategy.hh | 6 +++ locator/token_metadata.cc | 51 +++++++++++++++--------- locator/token_metadata.hh | 11 ++++- locator/token_range_splitter.hh | 42 +++++++++++++++++++ query_ranges_to_vnodes.cc | 19 +++++---- query_ranges_to_vnodes.hh | 5 ++- service/forward_service.cc | 2 +- service/storage_proxy.cc | 2 +- test/boost/storage_proxy_test.cc | 2 +- 11 files changed, 113 insertions(+), 36 deletions(-) create mode 100644 locator/token_range_splitter.hh diff --git a/cql3/statements/select_statement.cc b/cql3/statements/select_statement.cc index bcb4b985be..fce131d90c 100644 --- a/cql3/statements/select_statement.cc +++ b/cql3/statements/select_statement.cc @@ -46,6 +46,7 @@ #include "utils/result_combinators.hh" #include "utils/result_loop.hh" #include "service/forward_service.hh" +#include "replica/database.hh" template using coordinator_result = cql3::statements::select_statement::coordinator_result; @@ -560,7 +561,9 @@ indexed_table_select_statement::do_execute_base_query( auto cmd = prepare_command_for_base_query(qp, options, state, now, bool(paging_state)); auto timeout = db::timeout_clock::now() + get_timeout(state.get_client_state(), options); uint32_t queried_ranges_count = partition_ranges.size(); - query_ranges_to_vnodes_generator ranges_to_vnodes(qp.proxy().get_token_metadata_ptr(), _schema, std::move(partition_ranges)); + auto&& table = qp.proxy().local_db().find_column_family(_schema); + auto erm = table.get_effective_replication_map(); + query_ranges_to_vnodes_generator ranges_to_vnodes(erm->make_splitter(), _schema, std::move(partition_ranges)); struct base_query_state { query::result_merger merger; diff --git a/locator/abstract_replication_strategy.cc b/locator/abstract_replication_strategy.cc index e6f27e277c..cf1d614490 100644 --- a/locator/abstract_replication_strategy.cc +++ b/locator/abstract_replication_strategy.cc @@ -107,6 +107,10 @@ inet_address_vector_topology_change vnode_effective_replication_map::get_pending return _tmptr->pending_endpoints_for(search_token, ks_name); } +std::unique_ptr vnode_effective_replication_map::make_splitter() const { + return locator::make_splitter(_tmptr); +} + const per_table_replication_strategy* abstract_replication_strategy::maybe_as_per_table() const { if (!_per_table) { return nullptr; diff --git a/locator/abstract_replication_strategy.hh b/locator/abstract_replication_strategy.hh index 81e2b82751..3ca30b5e74 100644 --- a/locator/abstract_replication_strategy.hh +++ b/locator/abstract_replication_strategy.hh @@ -14,6 +14,7 @@ #include "gms/inet_address.hh" #include "gms/feature_service.hh" #include "locator/snitch_base.hh" +#include "locator/token_range_splitter.hh" #include "dht/i_partitioner.hh" #include "token_metadata.hh" #include "snitch_base.hh" @@ -190,6 +191,10 @@ public: /// Pending replica is a replica which gains ownership of data. /// Non-empty only during topology change. virtual inet_address_vector_topology_change get_pending_endpoints(const token& search_token, const sstring& ks_name) const = 0; + + /// Returns a token_range_splitter which is line with the replica assignment of this replication map. + /// The splitter can live longer than this instance. + virtual std::unique_ptr make_splitter() const = 0; }; using effective_replication_map_ptr = seastar::shared_ptr; @@ -248,6 +253,7 @@ public: // effective_replication_map inet_address_vector_replica_set get_natural_endpoints(const token& search_token) const override; inet_address_vector_replica_set get_natural_endpoints_without_node_being_replaced(const token& search_token) const override; inet_address_vector_topology_change get_pending_endpoints(const token& search_token, const sstring& ks_name) const override; + std::unique_ptr make_splitter() const override; public: explicit vnode_effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr, replication_map replication_map, size_t replication_factor) noexcept : effective_replication_map(std::move(rs), std::move(tmptr), replication_factor) diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index 32254ca33c..27f59ad7a5 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -113,8 +113,7 @@ public: */ boost::iterator_range ring_range(const token& start) const; - boost::iterator_range ring_range( - const std::optional& start) const; + boost::iterator_range ring_range(dht::ring_position_view pos) const; topology& get_topology() { return _topology; @@ -559,21 +558,8 @@ void token_metadata_impl::add_bootstrap_token(token t, inet_address endpoint) { } boost::iterator_range -token_metadata_impl::ring_range(const std::optional& start) const { - auto r = ring_range(start ? start->value().token() : dht::minimum_token()); - - if (!r.empty()) { - // We should skip the first token if it's excluded by the range. - if (start - && !start->is_inclusive() - && !start->value().has_key() - && start->value().token() == *r.begin()) - { - r.pop_front(); - } - } - - return r; +token_metadata_impl::ring_range(const dht::ring_position_view start) const { + return ring_range(start.token()); } void token_metadata_impl::add_bootstrap_tokens(std::unordered_set tokens, inet_address endpoint) { @@ -1026,10 +1012,39 @@ token_metadata::ring_range(const token& start) const { } boost::iterator_range -token_metadata::ring_range(const std::optional& start) const { +token_metadata::ring_range(dht::ring_position_view start) const { return _impl->ring_range(start); } +class token_metadata_ring_splitter : public locator::token_range_splitter { + token_metadata_ptr _tmptr; + boost::iterator_range _range; +public: + token_metadata_ring_splitter(token_metadata_ptr tmptr) + : _tmptr(std::move(tmptr)) + , _range(_tmptr->sorted_tokens().empty() // ring_range() throws if the ring is empty + ? boost::make_iterator_range(token_metadata::tokens_iterator(), token_metadata::tokens_iterator()) + : _tmptr->ring_range(dht::minimum_token())) + { } + + void reset(dht::ring_position_view pos) override { + _range = _tmptr->ring_range(pos); + } + + std::optional next_token() override { + if (_range.empty()) { + return std::nullopt; + } + auto t = *_range.begin(); + _range.drop_front(); + return t; + } +}; + +std::unique_ptr make_splitter(token_metadata_ptr tmptr) { + return std::make_unique(std::move(tmptr)); +} + topology& token_metadata::get_topology() { return _impl->get_topology(); diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index d9eccac12f..e3dffa17f4 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -15,6 +15,7 @@ #include #include "gms/inet_address.hh" #include "dht/i_partitioner.hh" +#include "locator/token_range_splitter.hh" #include "inet_address_vectors.hh" #include #include @@ -76,6 +77,7 @@ public: }; using inet_address = gms::inet_address; private: + friend class token_metadata_ring_splitter; class tokens_iterator { public: using iterator_category = std::input_iterator_tag; @@ -131,8 +133,11 @@ public: * @return The requested range (see the description above) */ boost::iterator_range ring_range(const token& start) const; - boost::iterator_range ring_range( - const std::optional& start) const; + + /** + * Returns a range of tokens such that the first token t satisfies dht::ring_position_view::ending_at(t) >= start. + */ + boost::iterator_range ring_range(dht::ring_position_view start) const; topology& get_topology(); const topology& get_topology() const; @@ -345,4 +350,6 @@ public: static future<> mutate_on_all_shards(sharded& stm, seastar::noncopyable_function (token_metadata&)> func); }; +std::unique_ptr make_splitter(token_metadata_ptr); + } diff --git a/locator/token_range_splitter.hh b/locator/token_range_splitter.hh new file mode 100644 index 0000000000..3f1509dbd5 --- /dev/null +++ b/locator/token_range_splitter.hh @@ -0,0 +1,42 @@ +/* + * Copyright (C) 2023-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#pragma once + +#include "dht/i_partitioner.hh" +#include "dht/token.hh" + +#include + +namespace locator { + +/// Generates split points which divide the ring into ranges which share the same replica set. +/// +/// Initially the ring space the splitter works with is set to the whole ring. +/// The space can be changed using reset(). +class token_range_splitter { +public: + virtual ~token_range_splitter() = default; + + /// Resets the splitter to work with the ring range [pos, +inf). + virtual void reset(dht::ring_position_view pos) = 0; + + /// Each token t returned by next_token() means that keys in the range: + /// + /// [prev_pos, dht::ring_position_view::ending_at(t)) + /// + /// share the same replica set. + /// + /// If this is the first call to next_token() after construction or reset() then prev_pos is the + /// beginning of the ring space. Otherwise, it is dht::ring_position_view::ending_at(prev_t) + /// where prev_t is the token returned by the previous call to next_token(). + /// If std::nullopt is returned it means that the ring space was exhausted. + virtual std::optional next_token() = 0; +}; + +} \ No newline at end of file diff --git a/query_ranges_to_vnodes.cc b/query_ranges_to_vnodes.cc index 18cd60d28a..f35591ff55 100644 --- a/query_ranges_to_vnodes.cc +++ b/query_ranges_to_vnodes.cc @@ -20,8 +20,8 @@ const dht::token& end_token(const dht::partition_range& r) { return r.end() ? r.end()->value().token() : max_token; } -query_ranges_to_vnodes_generator::query_ranges_to_vnodes_generator(const locator::token_metadata_ptr tmptr, schema_ptr s, dht::partition_range_vector ranges, bool local) : - _s(s), _ranges(std::move(ranges)), _i(_ranges.begin()), _local(local), _tmptr(std::move(tmptr)) {} +query_ranges_to_vnodes_generator::query_ranges_to_vnodes_generator(std::unique_ptr splitter, schema_ptr s, dht::partition_range_vector ranges, bool local) : + _s(s), _ranges(std::move(ranges)), _i(_ranges.begin()), _local(local), _splitter(std::move(splitter)) {} dht::partition_range_vector query_ranges_to_vnodes_generator::operator()(size_t n) { n = std::min(n, size_t(1024)); @@ -71,8 +71,9 @@ void query_ranges_to_vnodes_generator::process_one_range(size_t n, dht::partitio } // divide the queryRange into pieces delimited by the ring - auto ring_iter = _tmptr->ring_range(cr.start()); - for (const dht::token& upper_bound_token : ring_iter) { + _splitter->reset(dht::ring_position_view::for_range_start(cr)); + while (auto token_opt = _splitter->next_token()) { + auto upper_bound_token = *token_opt; /* * remainder can be a range/bounds of token _or_ keys and we want to split it with a token: * - if remainder is tokens, then we'll just split using the provided token. @@ -85,15 +86,13 @@ void query_ranges_to_vnodes_generator::process_one_range(size_t n, dht::partitio */ dht::ring_position split_point(upper_bound_token, dht::ring_position::token_bound::end); - if (!cr.contains(split_point, cmp)) { + + if (cmp(dht::ring_position_view::for_range_end(cr), split_point) <= 0) { break; // no more splits } - - // We shouldn't attempt to split on upper bound, because it may result in - // an ambiguous range of the form (x; x] - if (end_token(cr) == upper_bound_token) { - break; + if (cmp(dht::ring_position_view::for_range_start(cr), split_point) >= 0) { + continue; // avoid empty splits } std::pair splits = diff --git a/query_ranges_to_vnodes.hh b/query_ranges_to_vnodes.hh index 8bca4e30dc..5e852230f1 100644 --- a/query_ranges_to_vnodes.hh +++ b/query_ranges_to_vnodes.hh @@ -10,6 +10,7 @@ #include "dht/i_partitioner.hh" #include "locator/token_metadata.hh" +#include "locator/token_range_splitter.hh" #include "schema/schema.hh" class query_ranges_to_vnodes_generator { @@ -17,10 +18,10 @@ class query_ranges_to_vnodes_generator { dht::partition_range_vector _ranges; dht::partition_range_vector::iterator _i; // iterator to current range in _ranges bool _local; - const locator::token_metadata_ptr _tmptr; + std::unique_ptr _splitter; void process_one_range(size_t n, dht::partition_range_vector& ranges); public: - query_ranges_to_vnodes_generator(const locator::token_metadata_ptr tmptr, schema_ptr s, dht::partition_range_vector ranges, bool local = false); + query_ranges_to_vnodes_generator(std::unique_ptr splitter, schema_ptr s, dht::partition_range_vector ranges, bool local = false); query_ranges_to_vnodes_generator(const query_ranges_to_vnodes_generator&) = delete; query_ranges_to_vnodes_generator(query_ranges_to_vnodes_generator&&) = default; // generate next 'n' vnodes, may return less than requested number of ranges diff --git a/service/forward_service.cc b/service/forward_service.cc index 8a2e871d96..7115f00c6b 100644 --- a/service/forward_service.cc +++ b/service/forward_service.cc @@ -513,7 +513,7 @@ future forward_service::dispatch(query::forward_request r // next_vnode is used to iterate through all vnodes produced by // query_ranges_to_vnodes_generator. auto next_vnode = [ - generator = query_ranges_to_vnodes_generator(get_token_metadata_ptr(), schema, req.pr) + generator = query_ranges_to_vnodes_generator(erm->make_splitter(), schema, req.pr) ] () mutable -> std::optional { if (auto vnode = generator(1); !vnode.empty()) { return vnode[0]; diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 964bfe76e0..3ab3b5db41 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -5580,7 +5580,7 @@ storage_proxy::query_partition_key_range(lw_shared_ptr cmd, // (which can be expensive in clusters with vnodes) auto merge_tokens = !erm->get_replication_strategy().natural_endpoints_depend_on_token(); - query_ranges_to_vnodes_generator ranges_to_vnodes(erm->get_token_metadata_ptr(), schema, std::move(partition_ranges), merge_tokens); + query_ranges_to_vnodes_generator ranges_to_vnodes(erm->make_splitter(), schema, std::move(partition_ranges), merge_tokens); int result_rows_per_range = 0; int concurrency_factor = 1; diff --git a/test/boost/storage_proxy_test.cc b/test/boost/storage_proxy_test.cc index 5e6cfc8fef..9668900a33 100644 --- a/test/boost/storage_proxy_test.cc +++ b/test/boost/storage_proxy_test.cc @@ -43,7 +43,7 @@ SEASTAR_TEST_CASE(test_get_restricted_ranges) { auto check = [&s](locator::token_metadata_ptr tmptr, dht::partition_range input, dht::partition_range_vector expected) { - query_ranges_to_vnodes_generator ranges_to_vnodes(tmptr, s, {input}); + query_ranges_to_vnodes_generator ranges_to_vnodes(locator::make_splitter(tmptr), s, {input}); auto actual = ranges_to_vnodes(1000); if (!std::equal(actual.begin(), actual.end(), expected.begin(), [&s](auto&& r1, auto&& r2) { return r1.equal(r2, dht::ring_position_comparator(*s));