token_metadata: Calculate pending ranges for replacing node

It will be needed soon for making replace node take writes.

Refs: #5482
This commit is contained in:
Asias He
2020-04-08 14:05:35 +08:00
parent 75cf1d18b5
commit bd6691301e
2 changed files with 94 additions and 2 deletions

View File

@@ -64,6 +64,8 @@ private:
std::unordered_map<token, inet_address> _bootstrap_tokens;
std::unordered_set<inet_address> _leaving_endpoints;
// The map between the existing node to be replaced and the replacing node
std::unordered_map<inet_address, inet_address> _replacing_endpoints;
std::unordered_map<sstring, std::unordered_multimap<range<token>, inet_address>> _pending_ranges;
std::unordered_map<sstring, std::unordered_map<range<token>, std::unordered_set<inet_address>>> _pending_ranges_map;
@@ -359,6 +361,15 @@ public:
bool is_leaving(inet_address endpoint);
// Is this node being replaced by another node
bool is_being_replaced(inet_address endpoint);
// Is any node being replaced by another node
bool is_any_node_being_replaced();
void add_replacing_endpoint(inet_address existing_node, inet_address replacing_node);
void del_replacing_endpoint(inet_address existing_node);
#if 0
private final AtomicReference<TokenMetadata> cachedTokenMap = new AtomicReference<TokenMetadata>();
#endif
@@ -487,6 +498,10 @@ public:
abstract_replication_strategy& strategy,
lw_shared_ptr<std::unordered_multimap<range<token>, inet_address>> new_pending_ranges,
lw_shared_ptr<token_metadata> all_left_metadata);
future<> calculate_pending_ranges_for_replacing(
token_metadata& unpimplified_this,
abstract_replication_strategy& strategy,
lw_shared_ptr<std::unordered_multimap<range<token>, inet_address>> new_pending_ranges);
public:
token get_predecessor(token t);
@@ -1221,11 +1236,20 @@ bool token_metadata_impl::is_leaving(inet_address endpoint) {
return _leaving_endpoints.count(endpoint);
}
bool token_metadata_impl::is_being_replaced(inet_address endpoint) {
return _replacing_endpoints.count(endpoint);
}
bool token_metadata_impl::is_any_node_being_replaced() {
return !_replacing_endpoints.empty();
}
void token_metadata_impl::remove_endpoint(inet_address endpoint) {
remove_by_value(_bootstrap_tokens, endpoint);
remove_by_value(_token_to_endpoint_map, endpoint);
_topology.remove_endpoint(endpoint);
_leaving_endpoints.erase(endpoint);
_replacing_endpoints.erase(endpoint);
_endpoint_to_host_id_map.erase(endpoint);
_sorted_tokens = sort_tokens();
invalidate_cached_rings();
@@ -1399,6 +1423,31 @@ future<> token_metadata_impl::calculate_pending_ranges_for_leaving(
});
}
future<> token_metadata_impl::calculate_pending_ranges_for_replacing(
token_metadata& unpimplified_this,
abstract_replication_strategy& strategy,
lw_shared_ptr<std::unordered_multimap<range<token>, inet_address>> new_pending_ranges) {
if (_replacing_endpoints.empty()) {
return make_ready_future<>();
}
return do_with(_replacing_endpoints, strategy.get_address_ranges(unpimplified_this),
[this, new_pending_ranges] (std::unordered_map<inet_address, inet_address>& replacing_endpoints,
std::unordered_multimap<inet_address, dht::token_range>& address_ranges) {
return do_for_each(replacing_endpoints, [this, new_pending_ranges, &address_ranges]
(const std::pair<gms::inet_address, gms::inet_address>& node) {
auto existing_node = node.first;
auto replacing_node = node.second;
return do_for_each(address_ranges, [this, new_pending_ranges, existing_node, replacing_node]
(const std::pair<gms::inet_address, dht::token_range>& x) {
if (x.first == existing_node) {
tlogger.debug("Node {} replaces {} for range {}", replacing_node, existing_node, x.second);
new_pending_ranges->emplace(x.second, replacing_node);
}
});
});
});
}
void token_metadata_impl::calculate_pending_ranges_for_bootstrap(
abstract_replication_strategy& strategy,
lw_shared_ptr<std::unordered_multimap<range<token>, inet_address>> new_pending_ranges,
@@ -1435,12 +1484,15 @@ future<> token_metadata_impl::calculate_pending_ranges(
abstract_replication_strategy& strategy, const sstring& keyspace_name) {
auto new_pending_ranges = make_lw_shared<std::unordered_multimap<range<token>, inet_address>>();
if (_bootstrap_tokens.empty() && _leaving_endpoints.empty()) {
tlogger.debug("No bootstrapping, leaving nodes -> empty pending ranges for {}", keyspace_name);
tlogger.debug("calculate_pending_ranges: keyspace_name={}, bootstrap_tokens={}, leaving nodes={}, replacing_endpoints={}",
keyspace_name, _bootstrap_tokens, _leaving_endpoints, _replacing_endpoints);
if (_bootstrap_tokens.empty() && _leaving_endpoints.empty() && _replacing_endpoints.empty()) {
tlogger.debug("No bootstrapping, leaving nodes, replacing nodes -> empty pending ranges for {}", keyspace_name);
set_pending_ranges(keyspace_name, std::move(*new_pending_ranges));
return make_ready_future<>();
}
return calculate_pending_ranges_for_replacing(unpimplified_this, strategy, new_pending_ranges).then([&unpimplified_this, this, keyspace_name, &strategy, new_pending_ranges] {
// Copy of metadata reflecting the situation after all leave operations are finished.
auto all_left_metadata = make_lw_shared<token_metadata>(std::make_unique<token_metadata_impl>(clone_after_all_left()));
@@ -1457,6 +1509,7 @@ future<> token_metadata_impl::calculate_pending_ranges(
}
return make_ready_future<>();
});
});
}
@@ -1487,6 +1540,14 @@ void token_metadata_impl::add_leaving_endpoint(inet_address endpoint) {
_leaving_endpoints.emplace(endpoint);
}
void token_metadata_impl::add_replacing_endpoint(inet_address existing_node, inet_address replacing_node) {
_replacing_endpoints[existing_node] = replacing_node;
}
void token_metadata_impl::del_replacing_endpoint(inet_address existing_node) {
_replacing_endpoints.erase(existing_node);
}
token_metadata_impl token_metadata_impl::clone_after_all_settled() {
token_metadata_impl metadata = clone_only_token_map();
@@ -1756,6 +1817,24 @@ token_metadata::is_leaving(inet_address endpoint) {
return _impl->is_leaving(endpoint);
}
bool
token_metadata::is_being_replaced(inet_address endpoint) {
return _impl->is_being_replaced(endpoint);
}
bool
token_metadata::is_any_node_being_replaced() {
return _impl->is_any_node_being_replaced();
}
void token_metadata::add_replacing_endpoint(inet_address existing_node, inet_address replacing_node) {
_impl->add_replacing_endpoint(existing_node, replacing_node);
}
void token_metadata::del_replacing_endpoint(inet_address existing_node) {
_impl->del_replacing_endpoint(existing_node);
}
token_metadata
token_metadata::clone_only_token_map() {
return token_metadata(std::make_unique<token_metadata_impl>(_impl->clone_only_token_map()));

View File

@@ -227,6 +227,16 @@ public:
bool is_leaving(inet_address endpoint);
// Is this node being replaced by another node
bool is_being_replaced(inet_address endpoint);
// Is any node being replaced by another node
bool is_any_node_being_replaced();
void add_replacing_endpoint(inet_address existing_node, inet_address replacing_node);
void del_replacing_endpoint(inet_address existing_node);
/**
* Create a copy of TokenMetadata with only tokenToEndpointMap. That is, pending ranges,
* bootstrap tokens and leaving endpoints are not included in the copy.
@@ -288,6 +298,9 @@ public:
abstract_replication_strategy& strategy,
lw_shared_ptr<std::unordered_multimap<range<token>, inet_address>> new_pending_ranges,
lw_shared_ptr<token_metadata> all_left_metadata);
future<> calculate_pending_ranges_for_replacing(
abstract_replication_strategy& strategy,
lw_shared_ptr<std::unordered_multimap<range<token>, inet_address>> new_pending_ranges);
token get_predecessor(token t);