From fb0bdcec0c86b5e34b6962708b791a9bacc610e7 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 25 May 2023 12:06:21 +0200 Subject: [PATCH] storage_proxy: Avoid multishard reader for tablets Currently, the coordinator splits the partition range at vnode (or tablet) boundaries and then tries to merge adjacent ranges which target the same replica. This is an optimization which makes less sense with tablets, which are supposed to be of substantial size. If we don't merge the ranges, then with tablets we can avoid using the multishard reader on the replica side, since each tablet lives on a single shard. The main reason to avoid a multishard reader is avoiding its complexity, and avoiding adapting it to work with tablet sharding. Currently, the multishard reader implementation makes several assumptions about shard assignment which do not hold with tablets. It assumes that shards are assigned in a round-robin fashion. --- dht/i_partitioner.cc | 29 +++++++++++++++++++++++++++++ dht/i_partitioner.hh | 8 ++++++++ dht/token.hh | 6 ++++++ multishard_mutation_query.cc | 8 ++++++++ replica/database.cc | 10 ++++++---- service/storage_proxy.cc | 14 ++++++++------ 6 files changed, 65 insertions(+), 10 deletions(-) diff --git a/dht/i_partitioner.cc b/dht/i_partitioner.cc index c136a10775..96786ea06e 100644 --- a/dht/i_partitioner.cc +++ b/dht/i_partitioner.cc @@ -504,4 +504,33 @@ dht::token_range_vector split_token_range_msb(unsigned most_significant_bits) { return ret; } +dht::token first_token(const dht::partition_range& pr) { + auto start = dht::ring_position_view::for_range_start(pr); + auto token = start.token(); + // Check if the range excludes "token". + if (!start.key() + && start.get_token_bound() == dht::ring_position::token_bound::end + && token._kind == dht::token::kind::key + && !token.is_last()) { + token = dht::next_token(token); + } + return token; +} + +std::optional is_single_shard(const dht::sharder& sharder, const schema& s, const dht::partition_range& pr) { + auto token = first_token(pr); + auto shard = sharder.shard_of(token); + if (pr.is_singular()) { + return shard; + } + if (auto s_a_t = sharder.next_shard(token)) { + dht::ring_position_comparator cmp(s); + auto end = dht::ring_position_view::for_range_end(pr); + if (cmp(end, dht::ring_position_view::starting_at(s_a_t->token)) > 0) { + return std::nullopt; + } + } + return shard; +} + } diff --git a/dht/i_partitioner.hh b/dht/i_partitioner.hh index 8887cf3a06..e1cdaf4304 100644 --- a/dht/i_partitioner.hh +++ b/dht/i_partitioner.hh @@ -653,6 +653,14 @@ future subtract_ranges(const schema& schema, const // Returns a token_range vector split based on the given number of most-significant bits dht::token_range_vector split_token_range_msb(unsigned most_significant_bits); +// Returns the first token included by a partition range. +// May return tokens for which is_minimum() or is_maximum() is true. +dht::token first_token(const dht::partition_range&); + +// Returns true iff a given partition range is wholly owned by a single shard. +// If so, returns that shard. Otherwise, return std::nullopt. +std::optional is_single_shard(const dht::sharder&, const schema&, const dht::partition_range&); + } // dht namespace std { diff --git a/dht/token.hh b/dht/token.hh index 6a4a616ae0..98c646cccb 100644 --- a/dht/token.hh +++ b/dht/token.hh @@ -83,6 +83,12 @@ public: return _kind == kind::after_all_keys; } + // Returns true iff this is the largest token which can be associated with a partition key. + // Note that this is different that is_maximum(). + bool is_last() const { + return _kind == dht::token::kind::key && _data == std::numeric_limits::max(); + } + size_t external_memory_usage() const { return 0; } diff --git a/multishard_mutation_query.cc b/multishard_mutation_query.cc index 09a430ddb3..87665e19db 100644 --- a/multishard_mutation_query.cc +++ b/multishard_mutation_query.cc @@ -222,6 +222,14 @@ public: , _semaphores(smp::count, nullptr) { _readers.resize(smp::count); _permit.set_max_result_size(get_max_result_size()); + + if (!_erm->get_replication_strategy().is_vnode_based()) { + // The algorithm is full of assumptions about shard assignment being round-robin, static, and full. + // This does not hold for tablets. We chose to avoid this algorithm rather than adapting it. + // The coordinator should split ranges accordingly. + on_internal_error(mmq_log, format("multishard reader cannot be used on tables with non-static sharding: {}.{}", + _schema->ks_name(), _schema->cf_name())); + } } read_context(read_context&&) = delete; diff --git a/replica/database.cc b/replica/database.cc index ba3c0c5a56..dc558d08b3 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -2904,8 +2904,9 @@ future>> query_mutations( auto max_size = db.local().get_unlimited_query_max_result_size(); auto max_res_size = query::max_result_size(max_size.soft_limit, max_size.hard_limit, query::result_memory_limiter::maximum_result_size); auto cmd = query::read_command(s->id(), s->version(), ps, max_res_size, query::tombstone_limit::max); - if (pr.is_singular()) { - unsigned shard = dht::shard_of(*s, pr.start()->value().token()); + auto erm = s->table().get_effective_replication_map(); + if (auto shard_opt = dht::is_single_shard(erm->get_sharder(*s), *s, pr)) { + auto shard = *shard_opt; co_return co_await db.invoke_on(shard, [gs = global_schema_ptr(s), &cmd, &pr, timeout] (replica::database& db) mutable { return db.query_mutations(gs, cmd, pr, {}, timeout).then([] (std::tuple&& res) { return make_foreign(make_lw_shared(std::move(std::get<0>(res)))); @@ -2929,8 +2930,9 @@ future>> query_data( auto cmd = query::read_command(s->id(), s->version(), ps, max_res_size, query::tombstone_limit::max); auto prs = dht::partition_range_vector{pr}; auto opts = query::result_options::only_result(); - if (pr.is_singular()) { - unsigned shard = dht::shard_of(*s, pr.start()->value().token()); + auto erm = s->table().get_effective_replication_map(); + if (auto shard_opt = dht::is_single_shard(erm->get_sharder(*s), *s, pr)) { + auto shard = *shard_opt; co_return co_await db.invoke_on(shard, [gs = global_schema_ptr(s), &cmd, opts, &prs, timeout] (replica::database& db) mutable { return db.query(gs, cmd, opts, prs, {}, timeout).then([] (std::tuple, cache_temperature>&& res) { return make_foreign(std::move(std::get<0>(res))); diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 683f37c833..0edfd30eef 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -5308,8 +5308,8 @@ future>, cache_temperature>> storage_proxy::query_result_local(locator::effective_replication_map_ptr erm, schema_ptr s, lw_shared_ptr cmd, const dht::partition_range& pr, query::result_options opts, tracing::trace_state_ptr trace_state, storage_proxy::clock_type::time_point timeout, db::per_partition_rate_limit::info rate_limit_info) { cmd->slice.options.set_if(opts.request != query::result_request::only_result); - if (pr.is_singular()) { - auto shard = erm->get_sharder(*s).shard_of(pr.start()->value().token()); + if (auto shard_opt = dht::is_single_shard(erm->get_sharder(*s), *s, pr)) { + auto shard = *shard_opt; get_stats().replica_cross_shard_ops += shard != this_shard_id(); return _db.invoke_on(shard, _read_smp_service_group, [gs = global_schema_ptr(s), prv = dht::partition_range_vector({pr}) /* FIXME: pr is copied */, cmd, opts, timeout, gt = tracing::global_trace_state_ptr(std::move(trace_state)), rate_limit_info] (replica::database& db) mutable { auto trace_state = gt.get(); @@ -5540,6 +5540,7 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t // getRestrictedRange has broken the queried range into per-[vnode] token ranges, but this doesn't take // the replication factor into account. If the intersection of live endpoints for 2 consecutive ranges // still meets the CL requirements, then we can merge both ranges into the same RangeSliceCommand. + if (!erm->get_replication_strategy().uses_tablets()) { while (i != ranges.end()) { const auto current_range_preferred_replicas = preferred_replicas_for_range(*i); @@ -5638,6 +5639,7 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t ++i; merged_ranges.push_back(to_token_range(next_range)); } + } slogger.trace("creating range read executor for range {} in table {}.{} with targets {}", range, schema->ks_name(), schema->cf_name(), filtered_endpoints); try { @@ -6227,10 +6229,10 @@ future>, cache_tempera storage_proxy::query_mutations_locally(schema_ptr s, lw_shared_ptr cmd, const dht::partition_range& pr, storage_proxy::clock_type::time_point timeout, tracing::trace_state_ptr trace_state) { - if (pr.is_singular()) { - auto& table = local_db().find_column_family(s); - auto erm = table.get_effective_replication_map(); - unsigned shard = erm->shard_of(*s, pr.start()->value().token()); + auto& table = s->table(); + auto erm = table.get_effective_replication_map(); + if (auto shard_opt = dht::is_single_shard(erm->get_sharder(*s), *s, pr)) { + auto shard = *shard_opt; get_stats().replica_cross_shard_ops += shard != this_shard_id(); return _db.invoke_on(shard, _read_smp_service_group, [cmd, &pr, gs=global_schema_ptr(s), timeout, gt = tracing::global_trace_state_ptr(std::move(trace_state))] (replica::database& db) mutable { return db.query_mutations(gs, *cmd, pr, gt, timeout).then([] (std::tuple result_ht) {