storage_proxy: Obtain shard from erm in the read path

dht::shard_of() does not use the correct sharder for tablet-based tables.
Code which is supposed to work with all kinds of tables should use erm::get_sharder().
This commit is contained in:
Tomasz Grabiec
2023-05-09 11:55:43 +02:00
parent e48ec6fed3
commit 10e05eec66
2 changed files with 26 additions and 17 deletions

View File

@@ -653,12 +653,13 @@ private:
// this function assumes singular queries but doesn't validate
throw std::runtime_error("READ_DATA called with wrapping range");
}
auto erm = s->table().get_effective_replication_map();
p->get_stats().replica_data_reads++;
auto da = oda.value_or(query::digest_algorithm::MD5);
query::result_options opts;
opts.digest_algo = da;
opts.request = da == query::digest_algorithm::none ? query::result_request::only_result : query::result_request::result_and_digest;
return p->query_result_local(std::move(s), cmd, std::move(pr2.first), opts, trace_state_ptr, timeout, rate_limit_info);
return p->query_result_local(erm, std::move(s), cmd, std::move(pr2.first), opts, trace_state_ptr, timeout, rate_limit_info);
} else if constexpr (verb == read_verb::read_mutation_data) {
p->get_stats().replica_mutation_data_reads++;
return p->query_mutations_locally(std::move(s), std::move(cmd), pr2, timeout, trace_state_ptr);
@@ -667,9 +668,10 @@ private:
// this function assumes singular queries but doesn't validate
throw std::runtime_error("READ_DIGEST called with wrapping range");
}
auto erm = s->table().get_effective_replication_map();
p->get_stats().replica_digest_reads++;
auto da = oda.value_or(query::digest_algorithm::MD5);
return p->query_result_local_digest(std::move(s), cmd, std::move(pr2.first), trace_state_ptr, timeout, da, rate_limit_info);
return p->query_result_local_digest(erm, std::move(s), cmd, std::move(pr2.first), trace_state_ptr, timeout, da, rate_limit_info);
} else {
static_assert(verb == static_cast<read_verb>(-1), "Unsupported verb");
}
@@ -870,6 +872,7 @@ static uint32_t random_variable_for_rate_limit() {
}
static result<db::per_partition_rate_limit::info> choose_rate_limit_info(
locator::effective_replication_map_ptr erm,
replica::database& db,
bool coordinator_in_replica_set,
db::operation_type op_type,
@@ -880,7 +883,7 @@ static result<db::per_partition_rate_limit::info> choose_rate_limit_info(
db::per_partition_rate_limit::account_and_enforce enforce_info{
.random_variable = random_variable_for_rate_limit(),
};
if (coordinator_in_replica_set && dht::shard_of(*s, token) == this_shard_id()) {
if (coordinator_in_replica_set && erm->get_sharder(*s).shard_of(token) == this_shard_id()) {
auto& cf = db.find_column_family(s);
auto decision = db.account_coordinator_operation_to_rate_limit(cf, token, enforce_info, op_type);
if (decision) {
@@ -2994,7 +2997,7 @@ storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::tok
db::per_partition_rate_limit::info rate_limit_info;
if (allow_limit && _db.local().can_apply_per_partition_rate_limit(*s, db::operation_type::write)) {
auto r_rate_limit_info = choose_rate_limit_info(_db.local(), coordinator_in_replica_set, db::operation_type::write, s, token, tr_state);
auto r_rate_limit_info = choose_rate_limit_info(erm, _db.local(), coordinator_in_replica_set, db::operation_type::write, s, token, tr_state);
if (!r_rate_limit_info) {
return std::move(r_rate_limit_info).as_failure();
}
@@ -4779,7 +4782,7 @@ protected:
: query::result_options{query::result_request::only_result, query::digest_algorithm::none};
if (fbu::is_me(ep)) {
tracing::trace(_trace_state, "read_data: querying locally");
return _proxy->apply_fence(_proxy->query_result_local(_schema, _cmd, _partition_range, opts, _trace_state, timeout, adjust_rate_limit_for_local_operation(_rate_limit_info)), get_fence(), utils::fb_utilities::get_broadcast_address());
return _proxy->apply_fence(_proxy->query_result_local(_effective_replication_map_ptr, _schema, _cmd, _partition_range, opts, _trace_state, timeout, adjust_rate_limit_for_local_operation(_rate_limit_info)), get_fence(), utils::fb_utilities::get_broadcast_address());
} else {
return _proxy->remote().send_read_data(netw::messaging_service::msg_addr{ep, 0}, timeout,
_trace_state, *_cmd, _partition_range, opts.digest_algo, _rate_limit_info,
@@ -4790,7 +4793,7 @@ protected:
++_proxy->get_stats().digest_read_attempts.get_ep_stat(get_topology(), ep);
if (fbu::is_me(ep)) {
tracing::trace(_trace_state, "read_digest: querying locally");
return _proxy->apply_fence(_proxy->query_result_local_digest(_schema, _cmd, _partition_range, _trace_state,
return _proxy->apply_fence(_proxy->query_result_local_digest(_effective_replication_map_ptr, _schema, _cmd, _partition_range, _trace_state,
timeout, digest_algorithm(*_proxy), adjust_rate_limit_for_local_operation(_rate_limit_info)), get_fence(), utils::fb_utilities::get_broadcast_address());
} else {
tracing::trace(_trace_state, "read_digest: sending a message to /{}", ep);
@@ -5250,7 +5253,7 @@ result<::shared_ptr<abstract_read_executor>> storage_proxy::get_read_executor(lw
db::per_partition_rate_limit::info rate_limit_info;
if (cmd->allow_limit && _db.local().can_apply_per_partition_rate_limit(*schema, db::operation_type::read)) {
auto r_rate_limit_info = choose_rate_limit_info(_db.local(), !is_read_non_local, db::operation_type::read, schema, token, trace_state);
auto r_rate_limit_info = choose_rate_limit_info(erm, _db.local(), !is_read_non_local, db::operation_type::read, schema, token, trace_state);
if (!r_rate_limit_info) {
slogger.debug("Read was rate limited");
get_stats().read_rate_limited_by_coordinator.mark();
@@ -5294,19 +5297,19 @@ result<::shared_ptr<abstract_read_executor>> storage_proxy::get_read_executor(lw
}
future<rpc::tuple<query::result_digest, api::timestamp_type, cache_temperature, std::optional<full_position>>>
storage_proxy::query_result_local_digest(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr, tracing::trace_state_ptr trace_state, storage_proxy::clock_type::time_point timeout, query::digest_algorithm da, db::per_partition_rate_limit::info rate_limit_info) {
return query_result_local(std::move(s), std::move(cmd), pr, query::result_options::only_digest(da), std::move(trace_state), timeout, rate_limit_info).then([] (rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature> result_and_hit_rate) {
storage_proxy::query_result_local_digest(locator::effective_replication_map_ptr erm, schema_ptr s, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr, tracing::trace_state_ptr trace_state, storage_proxy::clock_type::time_point timeout, query::digest_algorithm da, db::per_partition_rate_limit::info rate_limit_info) {
return query_result_local(std::move(erm), std::move(s), std::move(cmd), pr, query::result_options::only_digest(da), std::move(trace_state), timeout, rate_limit_info).then([] (rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature> result_and_hit_rate) {
auto&& [result, hit_rate] = result_and_hit_rate;
return make_ready_future<rpc::tuple<query::result_digest, api::timestamp_type, cache_temperature, std::optional<full_position>>>(rpc::tuple(*result->digest(), result->last_modified(), hit_rate, result->last_position()));
});
}
future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>>
storage_proxy::query_result_local(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr, query::result_options opts,
storage_proxy::query_result_local(locator::effective_replication_map_ptr erm, schema_ptr s, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr, query::result_options opts,
tracing::trace_state_ptr trace_state, storage_proxy::clock_type::time_point timeout, db::per_partition_rate_limit::info rate_limit_info) {
cmd->slice.options.set_if<query::partition_slice::option::with_digest>(opts.request != query::result_request::only_result);
if (pr.is_singular()) {
unsigned shard = dht::shard_of(*s, pr.start()->value().token());
auto shard = erm->get_sharder(*s).shard_of(pr.start()->value().token());
get_stats().replica_cross_shard_ops += shard != this_shard_id();
return _db.invoke_on(shard, _read_smp_service_group, [gs = global_schema_ptr(s), prv = dht::partition_range_vector({pr}) /* FIXME: pr is copied */, cmd, opts, timeout, gt = tracing::global_trace_state_ptr(std::move(trace_state)), rate_limit_info] (replica::database& db) mutable {
auto trace_state = gt.get();
@@ -6225,7 +6228,9 @@ storage_proxy::query_mutations_locally(schema_ptr s, lw_shared_ptr<query::read_c
storage_proxy::clock_type::time_point timeout,
tracing::trace_state_ptr trace_state) {
if (pr.is_singular()) {
unsigned shard = dht::shard_of(*s, pr.start()->value().token());
auto& table = local_db().find_column_family(s);
auto erm = table.get_effective_replication_map();
unsigned shard = erm->shard_of(*s, pr.start()->value().token());
get_stats().replica_cross_shard_ops += shard != this_shard_id();
return _db.invoke_on(shard, _read_smp_service_group, [cmd, &pr, gs=global_schema_ptr(s), timeout, gt = tracing::global_trace_state_ptr(std::move(trace_state))] (replica::database& db) mutable {
return db.query_mutations(gs, *cmd, pr, gt, timeout).then([] (std::tuple<reconcilable_result, cache_temperature> result_ht) {

View File

@@ -345,12 +345,16 @@ private:
const inet_address_vector_replica_set& preferred_endpoints,
bool& is_bounced_read,
service_permit permit);
future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>> query_result_local(schema_ptr, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr,
query::result_options opts,
tracing::trace_state_ptr trace_state,
clock_type::time_point timeout,
db::per_partition_rate_limit::info rate_limit_info);
future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>> query_result_local(
locator::effective_replication_map_ptr,
schema_ptr,
lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr,
query::result_options opts,
tracing::trace_state_ptr trace_state,
clock_type::time_point timeout,
db::per_partition_rate_limit::info rate_limit_info);
future<rpc::tuple<query::result_digest, api::timestamp_type, cache_temperature, std::optional<full_position>>> query_result_local_digest(
locator::effective_replication_map_ptr,
schema_ptr,
lw_shared_ptr<query::read_command> cmd,
const dht::partition_range& pr,