dht, storage_proxy: Abstract token space splitting
Currently, scans split partition ranges around tokens. This will have to change with tablets, where we should split at tablet boundaries. This patch introduces token_range_splitter, which abstracts this task. It is provided by the effective_replication_map implementation.
This commit is contained in:
@@ -107,6 +107,10 @@ inet_address_vector_topology_change vnode_effective_replication_map::get_pending
|
||||
return _tmptr->pending_endpoints_for(search_token, ks_name);
|
||||
}
|
||||
|
||||
std::unique_ptr<token_range_splitter> vnode_effective_replication_map::make_splitter() const {
|
||||
return locator::make_splitter(_tmptr);
|
||||
}
|
||||
|
||||
const per_table_replication_strategy* abstract_replication_strategy::maybe_as_per_table() const {
|
||||
if (!_per_table) {
|
||||
return nullptr;
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
#include "gms/inet_address.hh"
|
||||
#include "gms/feature_service.hh"
|
||||
#include "locator/snitch_base.hh"
|
||||
#include "locator/token_range_splitter.hh"
|
||||
#include "dht/i_partitioner.hh"
|
||||
#include "token_metadata.hh"
|
||||
#include "snitch_base.hh"
|
||||
@@ -190,6 +191,10 @@ public:
|
||||
/// Pending replica is a replica which gains ownership of data.
|
||||
/// Non-empty only during topology change.
|
||||
virtual inet_address_vector_topology_change get_pending_endpoints(const token& search_token, const sstring& ks_name) const = 0;
|
||||
|
||||
/// Returns a token_range_splitter which is in line with the replica assignment of this replication map.
|
||||
/// The splitter can live longer than this instance.
|
||||
virtual std::unique_ptr<token_range_splitter> make_splitter() const = 0;
|
||||
};
|
||||
|
||||
using effective_replication_map_ptr = seastar::shared_ptr<const effective_replication_map>;
|
||||
@@ -248,6 +253,7 @@ public: // effective_replication_map
|
||||
inet_address_vector_replica_set get_natural_endpoints(const token& search_token) const override;
|
||||
inet_address_vector_replica_set get_natural_endpoints_without_node_being_replaced(const token& search_token) const override;
|
||||
inet_address_vector_topology_change get_pending_endpoints(const token& search_token, const sstring& ks_name) const override;
|
||||
std::unique_ptr<token_range_splitter> make_splitter() const override;
|
||||
public:
|
||||
explicit vnode_effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr, replication_map replication_map, size_t replication_factor) noexcept
|
||||
: effective_replication_map(std::move(rs), std::move(tmptr), replication_factor)
|
||||
|
||||
@@ -113,8 +113,7 @@ public:
|
||||
*/
|
||||
boost::iterator_range<token_metadata::tokens_iterator> ring_range(const token& start) const;
|
||||
|
||||
boost::iterator_range<token_metadata::tokens_iterator> ring_range(
|
||||
const std::optional<dht::partition_range::bound>& start) const;
|
||||
boost::iterator_range<token_metadata::tokens_iterator> ring_range(dht::ring_position_view pos) const;
|
||||
|
||||
topology& get_topology() {
|
||||
return _topology;
|
||||
@@ -559,21 +558,8 @@ void token_metadata_impl::add_bootstrap_token(token t, inet_address endpoint) {
|
||||
}
|
||||
|
||||
boost::iterator_range<token_metadata::tokens_iterator>
|
||||
token_metadata_impl::ring_range(const std::optional<dht::partition_range::bound>& start) const {
|
||||
auto r = ring_range(start ? start->value().token() : dht::minimum_token());
|
||||
|
||||
if (!r.empty()) {
|
||||
// We should skip the first token if it's excluded by the range.
|
||||
if (start
|
||||
&& !start->is_inclusive()
|
||||
&& !start->value().has_key()
|
||||
&& start->value().token() == *r.begin())
|
||||
{
|
||||
r.pop_front();
|
||||
}
|
||||
}
|
||||
|
||||
return r;
|
||||
token_metadata_impl::ring_range(const dht::ring_position_view start) const {
|
||||
return ring_range(start.token());
|
||||
}
|
||||
|
||||
void token_metadata_impl::add_bootstrap_tokens(std::unordered_set<token> tokens, inet_address endpoint) {
|
||||
@@ -1026,10 +1012,39 @@ token_metadata::ring_range(const token& start) const {
|
||||
}
|
||||
|
||||
boost::iterator_range<token_metadata::tokens_iterator>
|
||||
token_metadata::ring_range(const std::optional<dht::partition_range::bound>& start) const {
|
||||
token_metadata::ring_range(dht::ring_position_view start) const {
|
||||
return _impl->ring_range(start);
|
||||
}
|
||||
|
||||
class token_metadata_ring_splitter : public locator::token_range_splitter {
|
||||
token_metadata_ptr _tmptr;
|
||||
boost::iterator_range<token_metadata::tokens_iterator> _range;
|
||||
public:
|
||||
token_metadata_ring_splitter(token_metadata_ptr tmptr)
|
||||
: _tmptr(std::move(tmptr))
|
||||
, _range(_tmptr->sorted_tokens().empty() // ring_range() throws if the ring is empty
|
||||
? boost::make_iterator_range(token_metadata::tokens_iterator(), token_metadata::tokens_iterator())
|
||||
: _tmptr->ring_range(dht::minimum_token()))
|
||||
{ }
|
||||
|
||||
void reset(dht::ring_position_view pos) override {
|
||||
_range = _tmptr->ring_range(pos);
|
||||
}
|
||||
|
||||
std::optional<dht::token> next_token() override {
|
||||
if (_range.empty()) {
|
||||
return std::nullopt;
|
||||
}
|
||||
auto t = *_range.begin();
|
||||
_range.drop_front();
|
||||
return t;
|
||||
}
|
||||
};
|
||||
|
||||
/// Creates a token_metadata_ring_splitter over tmptr and returns it through
/// the locator::token_range_splitter interface.
std::unique_ptr<locator::token_range_splitter> make_splitter(token_metadata_ptr tmptr) {
    std::unique_ptr<locator::token_range_splitter> splitter =
            std::make_unique<token_metadata_ring_splitter>(std::move(tmptr));
    return splitter;
}
|
||||
|
||||
topology&
|
||||
token_metadata::get_topology() {
|
||||
return _impl->get_topology();
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
#include <unordered_map>
|
||||
#include "gms/inet_address.hh"
|
||||
#include "dht/i_partitioner.hh"
|
||||
#include "locator/token_range_splitter.hh"
|
||||
#include "inet_address_vectors.hh"
|
||||
#include <optional>
|
||||
#include <memory>
|
||||
@@ -76,6 +77,7 @@ public:
|
||||
};
|
||||
using inet_address = gms::inet_address;
|
||||
private:
|
||||
friend class token_metadata_ring_splitter;
|
||||
class tokens_iterator {
|
||||
public:
|
||||
using iterator_category = std::input_iterator_tag;
|
||||
@@ -131,8 +133,11 @@ public:
|
||||
* @return The requested range (see the description above)
|
||||
*/
|
||||
boost::iterator_range<tokens_iterator> ring_range(const token& start) const;
|
||||
boost::iterator_range<tokens_iterator> ring_range(
|
||||
const std::optional<dht::partition_range::bound>& start) const;
|
||||
|
||||
/**
|
||||
* Returns a range of tokens such that the first token t satisfies dht::ring_position_view::ending_at(t) >= start.
|
||||
*/
|
||||
boost::iterator_range<tokens_iterator> ring_range(dht::ring_position_view start) const;
|
||||
|
||||
topology& get_topology();
|
||||
const topology& get_topology() const;
|
||||
@@ -345,4 +350,6 @@ public:
|
||||
static future<> mutate_on_all_shards(sharded<shared_token_metadata>& stm, seastar::noncopyable_function<future<> (token_metadata&)> func);
|
||||
};
|
||||
|
||||
std::unique_ptr<locator::token_range_splitter> make_splitter(token_metadata_ptr);
|
||||
|
||||
}
|
||||
|
||||
42
locator/token_range_splitter.hh
Normal file
42
locator/token_range_splitter.hh
Normal file
@@ -0,0 +1,42 @@
|
||||
/*
|
||||
* Copyright (C) 2023-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "dht/i_partitioner.hh"
|
||||
#include "dht/token.hh"
|
||||
|
||||
#include <optional>
|
||||
|
||||
namespace locator {
|
||||
|
||||
/// Generates split points which divide the ring into ranges which share the same replica set.
|
||||
///
|
||||
/// Initially the ring space the splitter works with is set to the whole ring.
|
||||
/// The space can be changed using reset().
|
||||
// Abstract interface: both operations are pure virtual and are supplied by implementations.
class token_range_splitter {
|
||||
public:
|
||||
// Virtual so that an implementation can be destroyed through a token_range_splitter pointer.
virtual ~token_range_splitter() = default;
|
||||
|
||||
/// Resets the splitter to work with the ring range [pos, +inf).
/// The next call to next_token() is then treated as the first call for that ring space.
|
||||
virtual void reset(dht::ring_position_view pos) = 0;
|
||||
|
||||
/// Each token t returned by next_token() means that keys in the range:
|
||||
///
|
||||
/// [prev_pos, dht::ring_position_view::ending_at(t))
|
||||
///
|
||||
/// share the same replica set.
|
||||
///
|
||||
/// If this is the first call to next_token() after construction or reset() then prev_pos is the
|
||||
/// beginning of the ring space. Otherwise, it is dht::ring_position_view::ending_at(prev_t)
|
||||
/// where prev_t is the token returned by the previous call to next_token().
|
||||
/// If std::nullopt is returned it means that the ring space was exhausted.
|
||||
virtual std::optional<dht::token> next_token() = 0;
|
||||
};
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user