storage_proxy: Avoid multishard reader for tablets
Currently, the coordinator splits the partition range at vnode (or tablet) boundaries and then tries to merge adjacent ranges which target the same replica. This is an optimization which makes less sense with tablets, which are supposed to be of substantial size. If we don't merge the ranges, then with tablets we can avoid using the multishard reader on the replica side, since each tablet lives on a single shard. The main reason to avoid a multishard reader is avoiding its complexity, and avoiding adapting it to work with tablet sharding. Currently, the multishard reader implementation makes several assumptions about shard assignment which do not hold with tablets. It assumes that shards are assigned in a round-robin fashion.
This commit is contained in:
@@ -504,4 +504,33 @@ dht::token_range_vector split_token_range_msb(unsigned most_significant_bits) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
dht::token first_token(const dht::partition_range& pr) {
|
||||
auto start = dht::ring_position_view::for_range_start(pr);
|
||||
auto token = start.token();
|
||||
// Check if the range excludes "token".
|
||||
if (!start.key()
|
||||
&& start.get_token_bound() == dht::ring_position::token_bound::end
|
||||
&& token._kind == dht::token::kind::key
|
||||
&& !token.is_last()) {
|
||||
token = dht::next_token(token);
|
||||
}
|
||||
return token;
|
||||
}
|
||||
|
||||
std::optional<shard_id> is_single_shard(const dht::sharder& sharder, const schema& s, const dht::partition_range& pr) {
|
||||
auto token = first_token(pr);
|
||||
auto shard = sharder.shard_of(token);
|
||||
if (pr.is_singular()) {
|
||||
return shard;
|
||||
}
|
||||
if (auto s_a_t = sharder.next_shard(token)) {
|
||||
dht::ring_position_comparator cmp(s);
|
||||
auto end = dht::ring_position_view::for_range_end(pr);
|
||||
if (cmp(end, dht::ring_position_view::starting_at(s_a_t->token)) > 0) {
|
||||
return std::nullopt;
|
||||
}
|
||||
}
|
||||
return shard;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -653,6 +653,14 @@ future<dht::partition_range_vector> subtract_ranges(const schema& schema, const
|
||||
// Returns a token_range vector split based on the given number of most-significant bits
|
||||
dht::token_range_vector split_token_range_msb(unsigned most_significant_bits);
|
||||
|
||||
// Returns the first token included by a partition range.
|
||||
// May return tokens for which is_minimum() or is_maximum() is true.
|
||||
dht::token first_token(const dht::partition_range&);
|
||||
|
||||
// Returns true iff a given partition range is wholly owned by a single shard.
|
||||
// If so, returns that shard. Otherwise, return std::nullopt.
|
||||
std::optional<shard_id> is_single_shard(const dht::sharder&, const schema&, const dht::partition_range&);
|
||||
|
||||
} // dht
|
||||
|
||||
namespace std {
|
||||
|
||||
@@ -83,6 +83,12 @@ public:
|
||||
return _kind == kind::after_all_keys;
|
||||
}
|
||||
|
||||
// Returns true iff this is the largest token which can be associated with a partition key.
|
||||
// Note that this is different that is_maximum().
|
||||
bool is_last() const {
|
||||
return _kind == dht::token::kind::key && _data == std::numeric_limits<int64_t>::max();
|
||||
}
|
||||
|
||||
size_t external_memory_usage() const {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -222,6 +222,14 @@ public:
|
||||
, _semaphores(smp::count, nullptr) {
|
||||
_readers.resize(smp::count);
|
||||
_permit.set_max_result_size(get_max_result_size());
|
||||
|
||||
if (!_erm->get_replication_strategy().is_vnode_based()) {
|
||||
// The algorithm is full of assumptions about shard assignment being round-robin, static, and full.
|
||||
// This does not hold for tablets. We chose to avoid this algorithm rather than adapting it.
|
||||
// The coordinator should split ranges accordingly.
|
||||
on_internal_error(mmq_log, format("multishard reader cannot be used on tables with non-static sharding: {}.{}",
|
||||
_schema->ks_name(), _schema->cf_name()));
|
||||
}
|
||||
}
|
||||
|
||||
read_context(read_context&&) = delete;
|
||||
|
||||
@@ -2904,8 +2904,9 @@ future<foreign_ptr<lw_shared_ptr<reconcilable_result>>> query_mutations(
|
||||
auto max_size = db.local().get_unlimited_query_max_result_size();
|
||||
auto max_res_size = query::max_result_size(max_size.soft_limit, max_size.hard_limit, query::result_memory_limiter::maximum_result_size);
|
||||
auto cmd = query::read_command(s->id(), s->version(), ps, max_res_size, query::tombstone_limit::max);
|
||||
if (pr.is_singular()) {
|
||||
unsigned shard = dht::shard_of(*s, pr.start()->value().token());
|
||||
auto erm = s->table().get_effective_replication_map();
|
||||
if (auto shard_opt = dht::is_single_shard(erm->get_sharder(*s), *s, pr)) {
|
||||
auto shard = *shard_opt;
|
||||
co_return co_await db.invoke_on(shard, [gs = global_schema_ptr(s), &cmd, &pr, timeout] (replica::database& db) mutable {
|
||||
return db.query_mutations(gs, cmd, pr, {}, timeout).then([] (std::tuple<reconcilable_result, cache_temperature>&& res) {
|
||||
return make_foreign(make_lw_shared<reconcilable_result>(std::move(std::get<0>(res))));
|
||||
@@ -2929,8 +2930,9 @@ future<foreign_ptr<lw_shared_ptr<query::result>>> query_data(
|
||||
auto cmd = query::read_command(s->id(), s->version(), ps, max_res_size, query::tombstone_limit::max);
|
||||
auto prs = dht::partition_range_vector{pr};
|
||||
auto opts = query::result_options::only_result();
|
||||
if (pr.is_singular()) {
|
||||
unsigned shard = dht::shard_of(*s, pr.start()->value().token());
|
||||
auto erm = s->table().get_effective_replication_map();
|
||||
if (auto shard_opt = dht::is_single_shard(erm->get_sharder(*s), *s, pr)) {
|
||||
auto shard = *shard_opt;
|
||||
co_return co_await db.invoke_on(shard, [gs = global_schema_ptr(s), &cmd, opts, &prs, timeout] (replica::database& db) mutable {
|
||||
return db.query(gs, cmd, opts, prs, {}, timeout).then([] (std::tuple<lw_shared_ptr<query::result>, cache_temperature>&& res) {
|
||||
return make_foreign(std::move(std::get<0>(res)));
|
||||
|
||||
@@ -5308,8 +5308,8 @@ future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>>
|
||||
storage_proxy::query_result_local(locator::effective_replication_map_ptr erm, schema_ptr s, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr, query::result_options opts,
|
||||
tracing::trace_state_ptr trace_state, storage_proxy::clock_type::time_point timeout, db::per_partition_rate_limit::info rate_limit_info) {
|
||||
cmd->slice.options.set_if<query::partition_slice::option::with_digest>(opts.request != query::result_request::only_result);
|
||||
if (pr.is_singular()) {
|
||||
auto shard = erm->get_sharder(*s).shard_of(pr.start()->value().token());
|
||||
if (auto shard_opt = dht::is_single_shard(erm->get_sharder(*s), *s, pr)) {
|
||||
auto shard = *shard_opt;
|
||||
get_stats().replica_cross_shard_ops += shard != this_shard_id();
|
||||
return _db.invoke_on(shard, _read_smp_service_group, [gs = global_schema_ptr(s), prv = dht::partition_range_vector({pr}) /* FIXME: pr is copied */, cmd, opts, timeout, gt = tracing::global_trace_state_ptr(std::move(trace_state)), rate_limit_info] (replica::database& db) mutable {
|
||||
auto trace_state = gt.get();
|
||||
@@ -5540,6 +5540,7 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t
|
||||
// getRestrictedRange has broken the queried range into per-[vnode] token ranges, but this doesn't take
|
||||
// the replication factor into account. If the intersection of live endpoints for 2 consecutive ranges
|
||||
// still meets the CL requirements, then we can merge both ranges into the same RangeSliceCommand.
|
||||
if (!erm->get_replication_strategy().uses_tablets()) {
|
||||
while (i != ranges.end())
|
||||
{
|
||||
const auto current_range_preferred_replicas = preferred_replicas_for_range(*i);
|
||||
@@ -5638,6 +5639,7 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t
|
||||
++i;
|
||||
merged_ranges.push_back(to_token_range(next_range));
|
||||
}
|
||||
}
|
||||
slogger.trace("creating range read executor for range {} in table {}.{} with targets {}",
|
||||
range, schema->ks_name(), schema->cf_name(), filtered_endpoints);
|
||||
try {
|
||||
@@ -6227,10 +6229,10 @@ future<rpc::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_tempera
|
||||
storage_proxy::query_mutations_locally(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr,
|
||||
storage_proxy::clock_type::time_point timeout,
|
||||
tracing::trace_state_ptr trace_state) {
|
||||
if (pr.is_singular()) {
|
||||
auto& table = local_db().find_column_family(s);
|
||||
auto erm = table.get_effective_replication_map();
|
||||
unsigned shard = erm->shard_of(*s, pr.start()->value().token());
|
||||
auto& table = s->table();
|
||||
auto erm = table.get_effective_replication_map();
|
||||
if (auto shard_opt = dht::is_single_shard(erm->get_sharder(*s), *s, pr)) {
|
||||
auto shard = *shard_opt;
|
||||
get_stats().replica_cross_shard_ops += shard != this_shard_id();
|
||||
return _db.invoke_on(shard, _read_smp_service_group, [cmd, &pr, gs=global_schema_ptr(s), timeout, gt = tracing::global_trace_state_ptr(std::move(trace_state))] (replica::database& db) mutable {
|
||||
return db.query_mutations(gs, *cmd, pr, gt, timeout).then([] (std::tuple<reconcilable_result, cache_temperature> result_ht) {
|
||||
|
||||
Reference in New Issue
Block a user