token_metadata: Split token_metadata::calculate_pending_ranges
token_metadata::calculate_pending_ranges is a complicated function. Split it into 3 parts for leaving operation, moving opeartion, bootstrap opeartion.
This commit is contained in:
@@ -458,21 +458,11 @@ token_metadata::get_pending_ranges(sstring keyspace_name, inet_address endpoint)
|
||||
return ret;
|
||||
}
|
||||
|
||||
future<> token_metadata::calculate_pending_ranges(abstract_replication_strategy& strategy, const sstring& keyspace_name) {
|
||||
std::unordered_multimap<range<token>, inet_address> new_pending_ranges;
|
||||
|
||||
if (_bootstrap_tokens.empty() && _leaving_endpoints.empty() && _moving_endpoints.empty()) {
|
||||
tlogger.debug("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", keyspace_name);
|
||||
set_pending_ranges(keyspace_name, std::move(new_pending_ranges));
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
void token_metadata::calculate_pending_ranges_for_leaving(
|
||||
abstract_replication_strategy& strategy,
|
||||
lw_shared_ptr<std::unordered_multimap<range<token>, inet_address>> new_pending_ranges,
|
||||
lw_shared_ptr<token_metadata> all_left_metadata) {
|
||||
std::unordered_multimap<inet_address, dht::token_range> address_ranges = strategy.get_address_ranges(*this);
|
||||
|
||||
// FIMXE
|
||||
// Copy of metadata reflecting the situation after all leave operations are finished.
|
||||
auto all_left_metadata = clone_after_all_left();
|
||||
|
||||
// get all ranges that will be affected by leaving nodes
|
||||
std::unordered_set<range<token>> affected_ranges;
|
||||
for (auto endpoint : _leaving_endpoints) {
|
||||
@@ -488,21 +478,23 @@ future<> token_metadata::calculate_pending_ranges(abstract_replication_strategy&
|
||||
for (const auto& r : affected_ranges) {
|
||||
auto t = r.end() ? r.end()->value() : dht::maximum_token();
|
||||
auto current_endpoints = strategy.calculate_natural_endpoints(t, metadata);
|
||||
auto new_endpoints = strategy.calculate_natural_endpoints(t, all_left_metadata);
|
||||
auto new_endpoints = strategy.calculate_natural_endpoints(t, *all_left_metadata);
|
||||
std::vector<inet_address> diff;
|
||||
std::sort(current_endpoints.begin(), current_endpoints.end());
|
||||
std::sort(new_endpoints.begin(), new_endpoints.end());
|
||||
std::set_difference(new_endpoints.begin(), new_endpoints.end(),
|
||||
current_endpoints.begin(), current_endpoints.end(), std::back_inserter(diff));
|
||||
for (auto& ep : diff) {
|
||||
new_pending_ranges.emplace(r, ep);
|
||||
new_pending_ranges->emplace(r, ep);
|
||||
}
|
||||
}
|
||||
tlogger.debug("In calculate_pending_ranges: affected_ranges.size={} ends", affected_ranges.size());
|
||||
}
|
||||
|
||||
// At this stage newPendingRanges has been updated according to leave operations. We can
|
||||
// now continue the calculation by checking bootstrapping nodes.
|
||||
|
||||
void token_metadata::calculate_pending_ranges_for_bootstrap(
|
||||
abstract_replication_strategy& strategy,
|
||||
lw_shared_ptr<std::unordered_multimap<range<token>, inet_address>> new_pending_ranges,
|
||||
lw_shared_ptr<token_metadata> all_left_metadata) {
|
||||
// For each of the bootstrapping nodes, simply add and remove them one by one to
|
||||
// allLeftMetadata and check in between what their ranges would be.
|
||||
std::unordered_multimap<inet_address, token> bootstrap_addresses;
|
||||
@@ -520,18 +512,20 @@ future<> token_metadata::calculate_pending_ranges(abstract_replication_strategy&
|
||||
for (auto& x : tmp) {
|
||||
auto& endpoint = x.first;
|
||||
auto& tokens = x.second;
|
||||
all_left_metadata.update_normal_tokens(tokens, endpoint);
|
||||
for (auto& x : strategy.get_address_ranges(all_left_metadata)) {
|
||||
all_left_metadata->update_normal_tokens(tokens, endpoint);
|
||||
for (auto& x : strategy.get_address_ranges(*all_left_metadata)) {
|
||||
if (x.first == endpoint) {
|
||||
new_pending_ranges.emplace(x.second, endpoint);
|
||||
new_pending_ranges->emplace(x.second, endpoint);
|
||||
}
|
||||
}
|
||||
all_left_metadata.remove_endpoint(endpoint);
|
||||
all_left_metadata->remove_endpoint(endpoint);
|
||||
}
|
||||
}
|
||||
|
||||
// At this stage newPendingRanges has been updated according to leaving and bootstrapping nodes.
|
||||
// We can now finish the calculation by checking moving nodes.
|
||||
|
||||
void token_metadata::calculate_pending_ranges_for_moving(
|
||||
abstract_replication_strategy& strategy,
|
||||
lw_shared_ptr<std::unordered_multimap<range<token>, inet_address>> new_pending_ranges,
|
||||
lw_shared_ptr<token_metadata> all_left_metadata) {
|
||||
// For each of the moving nodes, we do the same thing we did for bootstrapping:
|
||||
// simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be.
|
||||
for (auto& moving : _moving_endpoints) {
|
||||
@@ -539,18 +533,42 @@ future<> token_metadata::calculate_pending_ranges(abstract_replication_strategy&
|
||||
auto& endpoint = moving.second; // address of the moving node
|
||||
|
||||
// moving.left is a new token of the endpoint
|
||||
all_left_metadata.update_normal_token(t, endpoint);
|
||||
all_left_metadata->update_normal_token(t, endpoint);
|
||||
|
||||
for (auto& x : strategy.get_address_ranges(all_left_metadata)) {
|
||||
for (auto& x : strategy.get_address_ranges(*all_left_metadata)) {
|
||||
if (x.first == endpoint) {
|
||||
new_pending_ranges.emplace(x.second, endpoint);
|
||||
new_pending_ranges->emplace(x.second, endpoint);
|
||||
}
|
||||
}
|
||||
|
||||
all_left_metadata.remove_endpoint(endpoint);
|
||||
all_left_metadata->remove_endpoint(endpoint);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
future<> token_metadata::calculate_pending_ranges(abstract_replication_strategy& strategy, const sstring& keyspace_name) {
|
||||
auto new_pending_ranges = make_lw_shared<std::unordered_multimap<range<token>, inet_address>>();
|
||||
|
||||
if (_bootstrap_tokens.empty() && _leaving_endpoints.empty() && _moving_endpoints.empty()) {
|
||||
tlogger.debug("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", keyspace_name);
|
||||
set_pending_ranges(keyspace_name, std::move(*new_pending_ranges));
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
set_pending_ranges(keyspace_name, std::move(new_pending_ranges));
|
||||
// Copy of metadata reflecting the situation after all leave operations are finished.
|
||||
auto all_left_metadata = make_lw_shared<token_metadata>(clone_after_all_left());
|
||||
|
||||
calculate_pending_ranges_for_leaving(strategy, new_pending_ranges, all_left_metadata);
|
||||
// At this stage newPendingRanges has been updated according to leave operations. We can
|
||||
// now continue the calculation by checking bootstrapping nodes.
|
||||
|
||||
calculate_pending_ranges_for_bootstrap(strategy, new_pending_ranges, all_left_metadata);
|
||||
// At this stage newPendingRanges has been updated according to leaving and bootstrapping nodes.
|
||||
// We can now finish the calculation by checking moving nodes.
|
||||
|
||||
calculate_pending_ranges_for_moving(strategy, new_pending_ranges, all_left_metadata);
|
||||
|
||||
set_pending_ranges(keyspace_name, std::move(*new_pending_ranges));
|
||||
|
||||
if (tlogger.is_enabled(logging::log_level::debug)) {
|
||||
tlogger.debug("Pending ranges: {}", (_pending_ranges.empty() ? "<empty>" : print_pending_ranges()));
|
||||
|
||||
@@ -653,6 +653,18 @@ public:
|
||||
* changes state in the cluster, so it should be manageable.
|
||||
*/
|
||||
future<> calculate_pending_ranges(abstract_replication_strategy& strategy, const sstring& keyspace_name);
|
||||
void calculate_pending_ranges_for_leaving(
|
||||
abstract_replication_strategy& strategy,
|
||||
lw_shared_ptr<std::unordered_multimap<range<token>, inet_address>> new_pending_ranges,
|
||||
lw_shared_ptr<token_metadata> all_left_metadata);
|
||||
void calculate_pending_ranges_for_bootstrap(
|
||||
abstract_replication_strategy& strategy,
|
||||
lw_shared_ptr<std::unordered_multimap<range<token>, inet_address>> new_pending_ranges,
|
||||
lw_shared_ptr<token_metadata> all_left_metadata);
|
||||
void calculate_pending_ranges_for_moving(
|
||||
abstract_replication_strategy& strategy,
|
||||
lw_shared_ptr<std::unordered_multimap<range<token>, inet_address>> new_pending_ranges,
|
||||
lw_shared_ptr<token_metadata> all_left_metadata);
|
||||
public:
|
||||
|
||||
token get_predecessor(token t);
|
||||
|
||||
Reference in New Issue
Block a user