dht: add dht::get_token
and replace all calls to dht::global_partitioner().get_token dht::get_token is better because it takes schema and uses it to obtain partitioner instead of using a global partitioner. Signed-off-by: Piotr Jastrzebski <piotr@scylladb.com>
This commit is contained in:
@@ -1131,7 +1131,7 @@ std::optional<shard_id> rmw_operation::shard_for_execute(bool needs_read_before_
|
||||
}
|
||||
// If we're still here, cas() *will* be called by execute(), so let's
|
||||
// find the appropriate shard to run it on:
|
||||
auto token = dht::global_partitioner().get_token(*_schema, _pk);
|
||||
auto token = dht::get_token(*_schema, _pk);
|
||||
auto desired_shard = service::storage_proxy::cas_shard(*_schema, token);
|
||||
if (desired_shard == engine().cpu_id()) {
|
||||
return {};
|
||||
|
||||
@@ -172,7 +172,7 @@ topology_description generate_topology_description(
|
||||
repeat([&] {
|
||||
for (int i = 0; i < 500; ++i) {
|
||||
auto stream_id = make_random_stream_id();
|
||||
auto token = partitioner.get_token(*schema, stream_id.to_partition_key(*schema));
|
||||
auto token = dht::get_token(*schema, stream_id.to_partition_key(*schema));
|
||||
|
||||
// Find the token range into which our stream_id's token landed.
|
||||
auto it = std::lower_bound(tokens.begin(), tokens.end(), token);
|
||||
|
||||
@@ -63,7 +63,7 @@ public:
|
||||
|
||||
bytes_opt execute(cql_serialization_format sf, const std::vector<bytes_opt>& parameters) override {
|
||||
auto key = partition_key::from_optional_exploded(*_schema, parameters);
|
||||
auto tok = dht::global_partitioner().get_token(*_schema, key);
|
||||
auto tok = dht::get_token(*_schema, key);
|
||||
warn(unimplemented::cause::VALIDATION);
|
||||
return tok.data();
|
||||
}
|
||||
|
||||
@@ -421,7 +421,7 @@ single_column_primary_key_restrictions<partition_key>::bounds_ranges(const query
|
||||
}
|
||||
ranges.emplace_back(std::move(r).transform(
|
||||
[this] (partition_key&& k) -> query::ring_position {
|
||||
auto token = dht::global_partitioner().get_token(*_schema, k);
|
||||
auto token = dht::get_token(*_schema, k);
|
||||
return { std::move(token), std::move(k) };
|
||||
}));
|
||||
}
|
||||
|
||||
@@ -1041,7 +1041,7 @@ query::partition_slice indexed_table_select_statement::get_partition_slice_for_g
|
||||
// Computed token column needs to be added to index view restrictions
|
||||
const column_definition& token_cdef = *_view_schema->clustering_key_columns().begin();
|
||||
auto base_pk = partition_key::from_optional_exploded(*_schema, _restrictions->get_partition_key_restrictions()->values(options));
|
||||
bytes token_value = dht::global_partitioner().get_token(*_schema, base_pk).data();
|
||||
bytes token_value = dht::get_token(*_schema, base_pk).data();
|
||||
auto token_restriction = ::make_shared<restrictions::single_column_restriction::EQ>(token_cdef, ::make_shared<cql3::constants::value>(cql3::raw_value::make_value(token_value)));
|
||||
clustering_restrictions->merge_with(token_restriction);
|
||||
|
||||
|
||||
@@ -651,7 +651,7 @@ database::shard_of(const frozen_mutation& m) {
|
||||
// sent the partition key in legacy form or together
|
||||
// with token.
|
||||
schema_ptr schema = find_schema(m.column_family_id());
|
||||
return dht::shard_of(*schema, dht::global_partitioner().get_token(*schema, m.key(*schema)));
|
||||
return dht::shard_of(*schema, dht::get_token(*schema, m.key(*schema)));
|
||||
}
|
||||
|
||||
void database::add_keyspace(sstring name, keyspace k) {
|
||||
|
||||
@@ -679,7 +679,7 @@ void manager::end_point_hints_manager::sender::start() {
|
||||
future<> manager::end_point_hints_manager::sender::send_one_mutation(frozen_mutation_and_schema m) {
|
||||
keyspace& ks = _db.find_keyspace(m.s->ks_name());
|
||||
auto& rs = ks.get_replication_strategy();
|
||||
auto token = dht::global_partitioner().get_token(*m.s, m.fm.key(*m.s));
|
||||
auto token = dht::get_token(*m.s, m.fm.key(*m.s));
|
||||
std::vector<gms::inet_address> natural_endpoints = rs.get_natural_endpoints(std::move(token));
|
||||
|
||||
return do_send_one_mutation(std::move(m), natural_endpoints);
|
||||
|
||||
@@ -141,7 +141,7 @@ static std::vector<sstring> get_keyspaces(const schema& s, const database& db, d
|
||||
return boost::copy_range<std::vector<sstring>>(
|
||||
range.slice(keyspaces, std::move(cmp)) | boost::adaptors::filtered([&s] (const auto& ks) {
|
||||
// If this is a range query, results are divided between shards by the partition key (keyspace_name).
|
||||
return shard_of(s, dht::global_partitioner().get_token(s,
|
||||
return shard_of(s, dht::get_token(s,
|
||||
partition_key::from_single_value(s, utf8_type->decompose(ks))))
|
||||
== engine().cpu_id();
|
||||
})
|
||||
|
||||
@@ -671,6 +671,10 @@ inline decorated_key decorate_key(const schema& s, partition_key&& key) {
|
||||
return s.get_partitioner().decorate_key(s, std::move(key));
|
||||
}
|
||||
|
||||
inline token get_token(const schema& s, partition_key_view key) {
|
||||
return s.get_partitioner().get_token(s, key);
|
||||
}
|
||||
|
||||
dht::partition_range to_partition_range(dht::token_range);
|
||||
|
||||
// Each shard gets a sorted, disjoint vector of ranges
|
||||
|
||||
4
keys.cc
4
keys.cc
@@ -76,8 +76,8 @@ partition_key_view::legacy_tri_compare(const schema& s, partition_key_view o) co
|
||||
|
||||
int
|
||||
partition_key_view::ring_order_tri_compare(const schema& s, partition_key_view k2) const {
|
||||
auto t1 = dht::global_partitioner().get_token(s, *this);
|
||||
auto t2 = dht::global_partitioner().get_token(s, k2);
|
||||
auto t1 = dht::get_token(s, *this);
|
||||
auto t2 = dht::get_token(s, k2);
|
||||
if (t1 != t2) {
|
||||
return t1 < t2 ? -1 : 1;
|
||||
}
|
||||
|
||||
@@ -50,7 +50,7 @@ void paxos_state::key_lock_map::release_semaphore_for_key(const dht::token& key)
|
||||
future<prepare_response> paxos_state::prepare(tracing::trace_state_ptr tr_state, schema_ptr schema,
|
||||
const query::read_command& cmd, const partition_key& key, utils::UUID ballot,
|
||||
bool only_digest, query::digest_algorithm da, clock_type::time_point timeout) {
|
||||
dht::token token = dht::global_partitioner().get_token(*schema, key);
|
||||
dht::token token = dht::get_token(*schema, key);
|
||||
utils::latency_counter lc;
|
||||
lc.start();
|
||||
return with_locked_key(token, timeout, [&cmd, token, &key, ballot, tr_state, schema, only_digest, da, timeout] () mutable {
|
||||
|
||||
@@ -4819,7 +4819,7 @@ void storage_proxy::init_messaging_service() {
|
||||
|
||||
return get_schema_for_read(cmd.schema_version, src_addr).then([this, cmd = std::move(cmd), key = std::move(key), ballot,
|
||||
only_digest, da, timeout, tr_state = std::move(tr_state), src_ip] (schema_ptr schema) mutable {
|
||||
dht::token token = dht::global_partitioner().get_token(*schema, key);
|
||||
dht::token token = dht::get_token(*schema, key);
|
||||
unsigned shard = dht::shard_of(*schema, token);
|
||||
bool local = shard == engine().cpu_id();
|
||||
get_stats().replica_cross_shard_ops += !local;
|
||||
|
||||
4
table.cc
4
table.cc
@@ -1322,8 +1322,8 @@ static bool needs_cleanup(const sstables::shared_sstable& sst,
|
||||
schema_ptr s) {
|
||||
auto first = sst->get_first_partition_key();
|
||||
auto last = sst->get_last_partition_key();
|
||||
auto first_token = dht::global_partitioner().get_token(*s, first);
|
||||
auto last_token = dht::global_partitioner().get_token(*s, last);
|
||||
auto first_token = dht::get_token(*s, first);
|
||||
auto last_token = dht::get_token(*s, last);
|
||||
dht::token_range sst_token_range = dht::token_range::make(first_token, last_token);
|
||||
|
||||
// return true iff sst partition range isn't fully contained in any of the owned ranges.
|
||||
|
||||
Reference in New Issue
Block a user