From 10e05eec667e22097cd9fb43da18317aff5aa81e Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Tue, 9 May 2023 11:55:43 +0200 Subject: [PATCH] storage_proxy: Obtain shard from erm in the read path dht::shard_of() does not use the correct sharder for tablet-based tables. Code which is supposed to work with all kinds of tables should use erm::get_sharder(). --- service/storage_proxy.cc | 29 +++++++++++++++++------------ service/storage_proxy.hh | 14 +++++++++----- 2 files changed, 26 insertions(+), 17 deletions(-) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 9085376ec8..683f37c833 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -653,12 +653,13 @@ private: // this function assumes singular queries but doesn't validate throw std::runtime_error("READ_DATA called with wrapping range"); } + auto erm = s->table().get_effective_replication_map(); p->get_stats().replica_data_reads++; auto da = oda.value_or(query::digest_algorithm::MD5); query::result_options opts; opts.digest_algo = da; opts.request = da == query::digest_algorithm::none ? query::result_request::only_result : query::result_request::result_and_digest; - return p->query_result_local(std::move(s), cmd, std::move(pr2.first), opts, trace_state_ptr, timeout, rate_limit_info); + return p->query_result_local(erm, std::move(s), cmd, std::move(pr2.first), opts, trace_state_ptr, timeout, rate_limit_info); } else if constexpr (verb == read_verb::read_mutation_data) { p->get_stats().replica_mutation_data_reads++; return p->query_mutations_locally(std::move(s), std::move(cmd), pr2, timeout, trace_state_ptr); @@ -667,9 +668,10 @@ private: // this function assumes singular queries but doesn't validate throw std::runtime_error("READ_DIGEST called with wrapping range"); } + auto erm = s->table().get_effective_replication_map(); p->get_stats().replica_digest_reads++; auto da = oda.value_or(query::digest_algorithm::MD5); - return p->query_result_local_digest(std::move(s), cmd, std::move(pr2.first), trace_state_ptr, timeout, da, rate_limit_info); + return p->query_result_local_digest(erm, std::move(s), cmd, std::move(pr2.first), trace_state_ptr, timeout, da, rate_limit_info); } else { static_assert(verb == static_cast(-1), "Unsupported verb"); } @@ -870,6 +872,7 @@ static uint32_t random_variable_for_rate_limit() { } static result choose_rate_limit_info( + locator::effective_replication_map_ptr erm, replica::database& db, bool coordinator_in_replica_set, db::operation_type op_type, @@ -880,7 +883,7 @@ static result choose_rate_limit_info( db::per_partition_rate_limit::account_and_enforce enforce_info{ .random_variable = random_variable_for_rate_limit(), }; - if (coordinator_in_replica_set && dht::shard_of(*s, token) == this_shard_id()) { + if (coordinator_in_replica_set && erm->get_sharder(*s).shard_of(token) == this_shard_id()) { auto& cf = db.find_column_family(s); auto decision = db.account_coordinator_operation_to_rate_limit(cf, token, enforce_info, op_type); if (decision) { @@ -2994,7 +2997,7 @@ storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::tok db::per_partition_rate_limit::info rate_limit_info; if (allow_limit && _db.local().can_apply_per_partition_rate_limit(*s, db::operation_type::write)) { - auto r_rate_limit_info = choose_rate_limit_info(_db.local(), coordinator_in_replica_set, db::operation_type::write, s, token, tr_state); + auto r_rate_limit_info = choose_rate_limit_info(erm, _db.local(), coordinator_in_replica_set, db::operation_type::write, s, token, tr_state); if (!r_rate_limit_info) { return std::move(r_rate_limit_info).as_failure(); } @@ -4779,7 +4782,7 @@ protected: : query::result_options{query::result_request::only_result, query::digest_algorithm::none}; if (fbu::is_me(ep)) { tracing::trace(_trace_state, "read_data: querying locally"); - return _proxy->apply_fence(_proxy->query_result_local(_schema, _cmd, _partition_range, opts, _trace_state, timeout, adjust_rate_limit_for_local_operation(_rate_limit_info)), get_fence(), utils::fb_utilities::get_broadcast_address()); + return _proxy->apply_fence(_proxy->query_result_local(_effective_replication_map_ptr, _schema, _cmd, _partition_range, opts, _trace_state, timeout, adjust_rate_limit_for_local_operation(_rate_limit_info)), get_fence(), utils::fb_utilities::get_broadcast_address()); } else { return _proxy->remote().send_read_data(netw::messaging_service::msg_addr{ep, 0}, timeout, _trace_state, *_cmd, _partition_range, opts.digest_algo, _rate_limit_info, @@ -4790,7 +4793,7 @@ protected: ++_proxy->get_stats().digest_read_attempts.get_ep_stat(get_topology(), ep); if (fbu::is_me(ep)) { tracing::trace(_trace_state, "read_digest: querying locally"); - return _proxy->apply_fence(_proxy->query_result_local_digest(_schema, _cmd, _partition_range, _trace_state, + return _proxy->apply_fence(_proxy->query_result_local_digest(_effective_replication_map_ptr, _schema, _cmd, _partition_range, _trace_state, timeout, digest_algorithm(*_proxy), adjust_rate_limit_for_local_operation(_rate_limit_info)), get_fence(), utils::fb_utilities::get_broadcast_address()); } else { tracing::trace(_trace_state, "read_digest: sending a message to /{}", ep); @@ -5250,7 +5253,7 @@ result<::shared_ptr> storage_proxy::get_read_executor(lw db::per_partition_rate_limit::info rate_limit_info; if (cmd->allow_limit && _db.local().can_apply_per_partition_rate_limit(*schema, db::operation_type::read)) { - auto r_rate_limit_info = choose_rate_limit_info(_db.local(), !is_read_non_local, db::operation_type::read, schema, token, trace_state); + auto r_rate_limit_info = choose_rate_limit_info(erm, _db.local(), !is_read_non_local, db::operation_type::read, schema, token, trace_state); if (!r_rate_limit_info) { slogger.debug("Read was rate limited"); get_stats().read_rate_limited_by_coordinator.mark(); @@ -5294,19 +5297,19 @@ result<::shared_ptr> storage_proxy::get_read_executor(lw } future>> -storage_proxy::query_result_local_digest(schema_ptr s, lw_shared_ptr cmd, const dht::partition_range& pr, tracing::trace_state_ptr trace_state, storage_proxy::clock_type::time_point timeout, query::digest_algorithm da, db::per_partition_rate_limit::info rate_limit_info) { - return query_result_local(std::move(s), std::move(cmd), pr, query::result_options::only_digest(da), std::move(trace_state), timeout, rate_limit_info).then([] (rpc::tuple>, cache_temperature> result_and_hit_rate) { +storage_proxy::query_result_local_digest(locator::effective_replication_map_ptr erm, schema_ptr s, lw_shared_ptr cmd, const dht::partition_range& pr, tracing::trace_state_ptr trace_state, storage_proxy::clock_type::time_point timeout, query::digest_algorithm da, db::per_partition_rate_limit::info rate_limit_info) { + return query_result_local(std::move(erm), std::move(s), std::move(cmd), pr, query::result_options::only_digest(da), std::move(trace_state), timeout, rate_limit_info).then([] (rpc::tuple>, cache_temperature> result_and_hit_rate) { auto&& [result, hit_rate] = result_and_hit_rate; return make_ready_future>>(rpc::tuple(*result->digest(), result->last_modified(), hit_rate, result->last_position())); }); } future>, cache_temperature>> -storage_proxy::query_result_local(schema_ptr s, lw_shared_ptr cmd, const dht::partition_range& pr, query::result_options opts, +storage_proxy::query_result_local(locator::effective_replication_map_ptr erm, schema_ptr s, lw_shared_ptr cmd, const dht::partition_range& pr, query::result_options opts, tracing::trace_state_ptr trace_state, storage_proxy::clock_type::time_point timeout, db::per_partition_rate_limit::info rate_limit_info) { cmd->slice.options.set_if(opts.request != query::result_request::only_result); if (pr.is_singular()) { - unsigned shard = dht::shard_of(*s, pr.start()->value().token()); + auto shard = erm->get_sharder(*s).shard_of(pr.start()->value().token()); get_stats().replica_cross_shard_ops += shard != this_shard_id(); return _db.invoke_on(shard, _read_smp_service_group, [gs = global_schema_ptr(s), prv = dht::partition_range_vector({pr}) /* FIXME: pr is copied */, cmd, opts, timeout, gt = tracing::global_trace_state_ptr(std::move(trace_state)), rate_limit_info] (replica::database& db) mutable { auto trace_state = gt.get(); @@ -6225,7 +6228,9 @@ storage_proxy::query_mutations_locally(schema_ptr s, lw_shared_ptrvalue().token()); + auto& table = local_db().find_column_family(s); + auto erm = table.get_effective_replication_map(); + unsigned shard = erm->shard_of(*s, pr.start()->value().token()); get_stats().replica_cross_shard_ops += shard != this_shard_id(); return _db.invoke_on(shard, _read_smp_service_group, [cmd, &pr, gs=global_schema_ptr(s), timeout, gt = tracing::global_trace_state_ptr(std::move(trace_state))] (replica::database& db) mutable { return db.query_mutations(gs, *cmd, pr, gt, timeout).then([] (std::tuple result_ht) { diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 4a31a5d83b..e8f3a40e72 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -345,12 +345,16 @@ private: const inet_address_vector_replica_set& preferred_endpoints, bool& is_bounced_read, service_permit permit); - future>, cache_temperature>> query_result_local(schema_ptr, lw_shared_ptr cmd, const dht::partition_range& pr, - query::result_options opts, - tracing::trace_state_ptr trace_state, - clock_type::time_point timeout, - db::per_partition_rate_limit::info rate_limit_info); + future>, cache_temperature>> query_result_local( + locator::effective_replication_map_ptr, + schema_ptr, + lw_shared_ptr cmd, const dht::partition_range& pr, + query::result_options opts, + tracing::trace_state_ptr trace_state, + clock_type::time_point timeout, + db::per_partition_rate_limit::info rate_limit_info); future>> query_result_local_digest( + locator::effective_replication_map_ptr, schema_ptr, lw_shared_ptr cmd, const dht::partition_range& pr,