token_metadata: Implement calculate_pending_ranges

This commit is contained in:
Asias He
2015-10-19 16:08:24 +08:00
parent a6065397d9
commit c96bc8bbd2
3 changed files with 106 additions and 84 deletions

View File

@@ -80,6 +80,7 @@ protected:
void validate_replication_factor(sstring rf) const;
virtual std::vector<inet_address> calculate_natural_endpoints(const token& search_token, token_metadata& tm) const = 0;
friend token_metadata;
public:
abstract_replication_strategy(

View File

@@ -23,9 +23,15 @@
#include "token_metadata.hh"
#include <experimental/optional>
#include "locator/snitch_base.hh"
#include "locator/abstract_replication_strategy.hh"
#include "log.hh"
#include <unordered_map>
#include <algorithm>
namespace locator {
static logging::logger logger("token_metadata");
template <typename C, typename V>
static void remove_by_value(C& container, V value) {
for (auto it = container.begin(); it != container.end();) {
@@ -362,6 +368,102 @@ token_metadata::get_pending_ranges(sstring keyspace_name, inet_address endpoint)
return ret;
}
void token_metadata::calculate_pending_ranges(abstract_replication_strategy& strategy, const sstring& keyspace_name) {
std::unordered_multimap<range<token>, inet_address> new_pending_ranges;
if (_bootstrap_tokens.empty() && _leaving_endpoints.empty() && _moving_endpoints.empty()) {
logger.debug("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", keyspace_name);
_pending_ranges[keyspace_name] = std::move(new_pending_ranges);
return;
}
std::unordered_multimap<inet_address, range<token>> address_ranges = strategy.get_address_ranges(*this);
// FIMXE
// Copy of metadata reflecting the situation after all leave operations are finished.
auto all_left_metadata = clone_after_all_left();
// get all ranges that will be affected by leaving nodes
std::unordered_set<range<token>> affected_ranges;
for (auto endpoint : _leaving_endpoints) {
auto r = address_ranges.equal_range(endpoint);
for (auto x = r.first; x != r.second; x++) {
affected_ranges.emplace(x->second);
}
}
// for each of those ranges, find what new nodes will be responsible for the range when
// all leaving nodes are gone.
auto metadata = clone_only_token_map(); // don't do this in the loop! #7758
for (const auto& r : affected_ranges) {
auto t = r.end()->value();
auto current_endpoints = strategy.calculate_natural_endpoints(t, metadata);
auto new_endpoints = strategy.calculate_natural_endpoints(t, all_left_metadata);
std::vector<inet_address> diff;
std::set_difference(new_endpoints.begin(), new_endpoints.end(),
current_endpoints.begin(), current_endpoints.end(), std::back_inserter(diff));
for (auto& ep : diff) {
new_pending_ranges.emplace(r, ep);
}
}
// At this stage newPendingRanges has been updated according to leave operations. We can
// now continue the calculation by checking bootstrapping nodes.
// For each of the bootstrapping nodes, simply add and remove them one by one to
// allLeftMetadata and check in between what their ranges would be.
std::unordered_multimap<inet_address, token> bootstrap_addresses;
for (auto& x : _bootstrap_tokens) {
bootstrap_addresses.emplace(x.second, x.first);
}
// TODO: share code with unordered_multimap_to_unordered_map
std::unordered_map<inet_address, std::unordered_set<token>> tmp;
for (auto& x : bootstrap_addresses) {
auto& addr = x.first;
auto& t = x.second;
tmp[addr].insert(t);
}
for (auto& x : tmp) {
auto& endpoint = x.first;
auto& tokens = x.second;
all_left_metadata.update_normal_tokens(tokens, endpoint);
for (auto& x : strategy.get_address_ranges(all_left_metadata)) {
if (x.first == endpoint) {
new_pending_ranges.emplace(x.second, endpoint);
}
}
all_left_metadata.remove_endpoint(endpoint);
}
// At this stage newPendingRanges has been updated according to leaving and bootstrapping nodes.
// We can now finish the calculation by checking moving nodes.
// For each of the moving nodes, we do the same thing we did for bootstrapping:
// simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be.
for (auto& moving : _moving_endpoints) {
auto& t = moving.first;
auto& endpoint = moving.second; // address of the moving node
// moving.left is a new token of the endpoint
all_left_metadata.update_normal_token(t, endpoint);
for (auto& x : strategy.get_address_ranges(all_left_metadata)) {
if (x.first == endpoint) {
new_pending_ranges.emplace(x.second, endpoint);
}
}
all_left_metadata.remove_endpoint(endpoint);
}
_pending_ranges[keyspace_name] = std::move(new_pending_ranges);
if (logger.is_enabled(logging::log_level::debug)) {
// TODO: Enable printPendingRanges
// logger.debug("Pending ranges: {}", (_pending_ranges.empty() ? "<empty>" : printPendingRanges()));
}
}
/////////////////// class topology /////////////////////////////////////////////
inline void topology::clear() {
_dc_endpoints.clear();

View File

@@ -54,6 +54,8 @@ class keyspace;
namespace locator {
class abstract_replication_strategy;
using inet_address = gms::inet_address;
using token = dht::token;
@@ -654,7 +656,6 @@ public:
std::unordered_map<range<token>, std::unordered_set<inet_address>> get_pending_ranges(sstring keyspace_name);
std::vector<range<token>> get_pending_ranges(sstring keyspace_name, inet_address endpoint);
#if 0
/**
* Calculate pending ranges according to bootsrapping and leaving nodes. Reasoning is:
*
@@ -678,89 +679,7 @@ public:
* NOTE: This is heavy and ineffective operation. This will be done only once when a node
* changes state in the cluster, so it should be manageable.
*/
public void calculatePendingRanges(AbstractReplicationStrategy strategy, String keyspaceName)
{
lock.readLock().lock();
try
{
Multimap<Range<Token>, InetAddress> newPendingRanges = HashMultimap.create();
if (_bootstrap_tokens.isEmpty() && _leaving_endpoints.isEmpty() && _moving_endpoints.isEmpty())
{
if (logger.isDebugEnabled())
logger.debug("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", keyspaceName);
_pending_ranges.put(keyspaceName, newPendingRanges);
return;
}
Multimap<InetAddress, Range<Token>> addressRanges = strategy.getAddressRanges();
// Copy of metadata reflecting the situation after all leave operations are finished.
TokenMetadata allLeftMetadata = cloneAfterAllLeft();
// get all ranges that will be affected by leaving nodes
Set<Range<Token>> affectedRanges = new HashSet<Range<Token>>();
for (InetAddress endpoint : _leaving_endpoints)
affectedRanges.addAll(addressRanges.get(endpoint));
// for each of those ranges, find what new nodes will be responsible for the range when
// all leaving nodes are gone.
TokenMetadata metadata = cloneOnlyTokenMap(); // don't do this in the loop! #7758
for (Range<Token> range : affectedRanges)
{
Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
newPendingRanges.putAll(range, Sets.difference(newEndpoints, currentEndpoints));
}
// At this stage newPendingRanges has been updated according to leave operations. We can
// now continue the calculation by checking bootstrapping nodes.
// For each of the bootstrapping nodes, simply add and remove them one by one to
// allLeftMetadata and check in between what their ranges would be.
Multimap<InetAddress, Token> bootstrapAddresses = _bootstrap_tokens.inverse();
for (InetAddress endpoint : bootstrapAddresses.keySet())
{
Collection<Token> tokens = bootstrapAddresses.get(endpoint);
allLeftMetadata.updateNormalTokens(tokens, endpoint);
for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
newPendingRanges.put(range, endpoint);
allLeftMetadata.removeEndpoint(endpoint);
}
// At this stage newPendingRanges has been updated according to leaving and bootstrapping nodes.
// We can now finish the calculation by checking moving nodes.
// For each of the moving nodes, we do the same thing we did for bootstrapping:
// simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be.
for (Pair<Token, InetAddress> moving : _moving_endpoints)
{
InetAddress endpoint = moving.right; // address of the moving node
// moving.left is a new token of the endpoint
allLeftMetadata.updateNormalToken(moving.left, endpoint);
for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
{
newPendingRanges.put(range, endpoint);
}
allLeftMetadata.removeEndpoint(endpoint);
}
_pending_ranges.put(keyspaceName, newPendingRanges);
if (logger.isDebugEnabled())
logger.debug("Pending ranges:\n{}", (_pending_ranges.isEmpty() ? "<empty>" : printPendingRanges()));
}
finally
{
lock.readLock().unlock();
}
}
#endif
void calculate_pending_ranges(abstract_replication_strategy& strategy, const sstring& keyspace_name);
public:
token get_predecessor(token t);