token_metadata_impl: introduce migration_info
We are going to store read_endpoints in a way similar to pending ranges, so in this commit we add migration_info - a container for two boost::icl::interval_map. Also, _pending_ranges_interval_map is renamed to _keyspace_to_migration_info, since it captures the meaning better.
This commit is contained in:
@@ -60,7 +60,17 @@ private:
|
||||
std::unordered_map<inet_address, inet_address> _replacing_endpoints;
|
||||
|
||||
using ring_mapping = boost::icl::interval_map<token, std::unordered_set<inet_address>>;
|
||||
std::unordered_map<sstring, ring_mapping> _pending_ranges_interval_map;
|
||||
// For each keyspace, migration_info contains ranges of tokens and
|
||||
// corresponding replicas to which writes will be directed:
|
||||
// - pending_endpoints - will be appended to normal endpoints for writes;
|
||||
// This data structure is filled only during data migration between nodes
|
||||
// when they are added or removed from the cluster.
|
||||
// During normal operation, token mapping to nodes is
|
||||
// implemented in the effective_replication_map.
|
||||
struct migration_info {
|
||||
ring_mapping pending_endpoints;
|
||||
};
|
||||
std::unordered_map<sstring, migration_info> _keyspace_to_migration_info;
|
||||
|
||||
std::vector<token> _sorted_tokens;
|
||||
|
||||
@@ -332,8 +342,8 @@ future<std::unique_ptr<token_metadata_impl>> token_metadata_impl::clone_async()
|
||||
}
|
||||
ret->_leaving_endpoints = _leaving_endpoints;
|
||||
ret->_replacing_endpoints = _replacing_endpoints;
|
||||
for (const auto& p : _pending_ranges_interval_map) {
|
||||
ret->_pending_ranges_interval_map.emplace(p);
|
||||
for (const auto& p : _keyspace_to_migration_info) {
|
||||
ret->_keyspace_to_migration_info.emplace(p);
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
ret->_ring_version = _ring_version;
|
||||
@@ -363,7 +373,7 @@ future<> token_metadata_impl::clear_gently() noexcept {
|
||||
co_await utils::clear_gently(_bootstrap_tokens);
|
||||
co_await utils::clear_gently(_leaving_endpoints);
|
||||
co_await utils::clear_gently(_replacing_endpoints);
|
||||
co_await utils::clear_gently(_pending_ranges_interval_map);
|
||||
co_await utils::clear_gently(_keyspace_to_migration_info);
|
||||
co_await utils::clear_gently(_sorted_tokens);
|
||||
co_await _topology.clear_gently();
|
||||
co_await _tablets.clear_gently();
|
||||
@@ -697,11 +707,11 @@ token_metadata_impl::interval_to_range(boost::icl::interval<token>::interval_typ
|
||||
|
||||
bool
|
||||
token_metadata_impl::has_pending_ranges(sstring keyspace_name, inet_address endpoint) const {
|
||||
const auto it = _pending_ranges_interval_map.find(keyspace_name);
|
||||
if (it == _pending_ranges_interval_map.end()) {
|
||||
const auto it = _keyspace_to_migration_info.find(keyspace_name);
|
||||
if (it == _keyspace_to_migration_info.end()) {
|
||||
return false;
|
||||
}
|
||||
for (const auto& item : it->second) {
|
||||
for (const auto& item : it->second.pending_endpoints) {
|
||||
const auto& nodes = item.second;
|
||||
if (nodes.contains(endpoint)) {
|
||||
return true;
|
||||
@@ -717,7 +727,7 @@ future<> token_metadata_impl::update_pending_ranges(
|
||||
keyspace_name, _bootstrap_tokens, _leaving_endpoints, _replacing_endpoints);
|
||||
if (_bootstrap_tokens.empty() && _leaving_endpoints.empty() && _replacing_endpoints.empty()) {
|
||||
tlogger.debug("No bootstrapping, leaving nodes, replacing nodes -> empty pending ranges for {}", keyspace_name);
|
||||
_pending_ranges_interval_map.erase(keyspace_name);
|
||||
_keyspace_to_migration_info.erase(keyspace_name);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
@@ -799,8 +809,8 @@ future<> token_metadata_impl::update_pending_ranges(
|
||||
return tokens;
|
||||
});
|
||||
|
||||
_pending_ranges_interval_map[keyspace_name] = std::invoke([&]() -> ring_mapping {
|
||||
ring_mapping pending_ranges;
|
||||
_keyspace_to_migration_info[keyspace_name] = std::invoke([&]() -> migration_info {
|
||||
migration_info migration_info;
|
||||
for (size_t i = 0, size = tokens.size(); i < size; ++i) {
|
||||
seastar::thread::maybe_yield();
|
||||
|
||||
@@ -832,10 +842,10 @@ future<> token_metadata_impl::update_pending_ranges(
|
||||
}
|
||||
}
|
||||
if (!pending_endpoints.empty()) {
|
||||
add_mapping(pending_ranges, std::move(pending_endpoints));
|
||||
add_mapping(migration_info.pending_endpoints, std::move(pending_endpoints));
|
||||
}
|
||||
}
|
||||
return pending_ranges;
|
||||
return migration_info;
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -877,13 +887,13 @@ void token_metadata_impl::del_replacing_endpoint(inet_address existing_node) {
|
||||
|
||||
inet_address_vector_topology_change token_metadata_impl::pending_endpoints_for(const token& token, const sstring& keyspace_name) const {
|
||||
// Fast path 0: pending ranges not found for this keyspace_name
|
||||
const auto pr_it = _pending_ranges_interval_map.find(keyspace_name);
|
||||
if (pr_it == _pending_ranges_interval_map.end()) {
|
||||
const auto pr_it = _keyspace_to_migration_info.find(keyspace_name);
|
||||
if (pr_it == _keyspace_to_migration_info.end()) {
|
||||
return {};
|
||||
}
|
||||
|
||||
// Fast path 1: empty pending ranges for this keyspace_name
|
||||
const auto& ks_map = pr_it->second;
|
||||
const auto& ks_map = pr_it->second.pending_endpoints;
|
||||
if (ks_map.empty()) {
|
||||
return {};
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user