replication strategy

This patch converts (for very small value of 'converts') some
replication related classes. Only static topology is supported (it is
created in keyspace::create_replication_strategy()). During mutation
no replication is done, since messaging service is not ready yet,
only endpoints are calculated.
This commit is contained in:
Gleb Natapov
2015-04-02 11:09:08 +03:00
committed by Tomasz Grabiec
parent eee72251d0
commit 47ac784425
15 changed files with 425 additions and 20 deletions

View File

@@ -0,0 +1,35 @@
/*
* Copyright (C) 2015 Cloudius Systems, Ltd.
*/
#include "abstract_replication_strategy.hh"
namespace locator {
abstract_replication_strategy::abstract_replication_strategy(const sstring& ks_name, token_metadata& token_metadata, std::unordered_map<sstring, sstring>& config_options) :
_ks_name(ks_name), _config_options(config_options), _token_metadata(token_metadata) {}
std::unique_ptr<abstract_replication_strategy> abstract_replication_strategy::create_replication_strategy(const sstring& ks_name, const sstring& strategy_name, token_metadata& token_metadata, std::unordered_map<sstring, sstring>& config_options) {
return replication_strategy_registry::create(strategy_name, ks_name, token_metadata, config_options);
}
std::vector<inet_address> abstract_replication_strategy::get_natural_endpoints(token& search_token) {
const token& key_token = _token_metadata.first_token(search_token);
return calculate_natural_endpoints(key_token);
}
std::unordered_map<sstring, strategy_creator_type> replication_strategy_registry::_strategies;
void replication_strategy_registry::register_strategy(sstring name, strategy_creator_type creator) {
_strategies.emplace(name, std::move(creator));
}
std::unique_ptr<abstract_replication_strategy> replication_strategy_registry::create(const sstring& name, const sstring& keyspace_name, token_metadata& token_metadata, std::unordered_map<sstring, sstring>& config_options) {
return _strategies[name](keyspace_name, token_metadata, config_options);
}
replication_strategy_registrator::replication_strategy_registrator(sstring name, strategy_creator_type creator) {
replication_strategy_registry::register_strategy(name, creator);
}
}

View File

@@ -0,0 +1,50 @@
/*
* Copyright (C) 2015 Cloudius Systems, Ltd.
*/
#pragma once
#include <memory>
#include <functional>
#include <unordered_map>
#include "gms/inet_address.hh"
#include "dht/i_partitioner.hh"
#include "token_metadata.hh"
// forward declaration since database.hh includes this file
class keyspace;
namespace locator {
using inet_address = gms::inet_address;
using token = dht::token;
class abstract_replication_strategy {
protected:
sstring _ks_name;
keyspace* _keyspace = nullptr;
std::unordered_map<sstring, sstring> _config_options;
token_metadata& _token_metadata;
virtual std::vector<inet_address> calculate_natural_endpoints(const token& search_token) = 0;
public:
abstract_replication_strategy(const sstring& keyspace_name, token_metadata& token_metadata, std::unordered_map<sstring, sstring>& config_options);
virtual ~abstract_replication_strategy() {}
static std::unique_ptr<abstract_replication_strategy> create_replication_strategy(const sstring& ks_name, const sstring& strategy_name, token_metadata& token_metadata, std::unordered_map<sstring, sstring>& config_options);
std::vector<inet_address> get_natural_endpoints(token& search_token);
};
using strategy_creator_type = std::function<std::unique_ptr<abstract_replication_strategy>(const sstring&, token_metadata&, std::unordered_map<sstring, sstring>&)>;
class replication_strategy_registry {
static std::unordered_map<sstring, strategy_creator_type> _strategies;
public:
static void register_strategy(sstring name, strategy_creator_type creator);
static std::unique_ptr<abstract_replication_strategy> create(const sstring& name, const sstring& keyspace_name, token_metadata& token_metadata, std::unordered_map<sstring, sstring>& config_options);
};
class replication_strategy_registrator {
public:
explicit replication_strategy_registrator(sstring name, strategy_creator_type creator);
};
}

View File

@@ -0,0 +1,53 @@
/*
* Copyright (C) 2015 Cloudius Systems, Ltd.
*/
#include <algorithm>
#include "simple_strategy.hh"
namespace locator {
simple_strategy::simple_strategy(const sstring& keyspace_name, token_metadata& token_metadata, std::unordered_map<sstring, sstring>& config_options) :
abstract_replication_strategy(keyspace_name, token_metadata, config_options) {}
std::vector<inet_address> simple_strategy::calculate_natural_endpoints(const token& t) {
size_t replicas = get_replication_factor();
const std::vector<token>& tokens = _token_metadata.sorted_tokens();
std::vector<inet_address> endpoints;
endpoints.reserve(replicas);
if (tokens.empty()) {
return endpoints;
}
auto it = tokens.begin() + _token_metadata.first_token_index(t);
auto c = tokens.size();
while(endpoints.size() < replicas && c) {
inet_address ep = _token_metadata.get_endpoint(*(it++));
if (std::find(endpoints.begin(), endpoints.end(), ep) == endpoints.end()) {
endpoints.push_back(ep);
}
c--;
// wrap around
if (it == tokens.end()) {
it = tokens.begin();
}
}
return endpoints;
}
size_t simple_strategy::get_replication_factor() const {
auto it = _config_options.find("replication_factor");
if (it == _config_options.end()) {
return 1;
}
return std::stol(it->second);
}
static replication_strategy_registrator registerator("org.apache.cassandra.locator.SimpleStrategy",
[] (const sstring& keyspace_name, token_metadata& token_metadata, std::unordered_map<sstring, sstring>& config_options) {
return std::make_unique<simple_strategy>(keyspace_name, token_metadata, config_options);
});
}

View File

@@ -0,0 +1,20 @@
/*
* Copyright (C) 2015 Cloudius Systems, Ltd.
*/
#pragma once
#include "abstract_replication_strategy.hh"
namespace locator {
class simple_strategy : public abstract_replication_strategy {
protected:
virtual std::vector<inet_address> calculate_natural_endpoints(const token& search_token) override;
public:
simple_strategy(const sstring& keyspace_name, token_metadata& token_metadata, std::unordered_map<sstring, sstring>& config_options);
virtual ~simple_strategy() {};
size_t get_replication_factor() const;
};
}

112
locator/token_metadata.cc Normal file
View File

@@ -0,0 +1,112 @@
/*
* Copyright (C) 2015 Cloudius Systems, Ltd.
*/
#include "token_metadata.hh"
namespace locator {
token_metadata::token_metadata(std::map<token, inet_address> token_to_endpoint_map, boost::bimap<inet_address, utils::UUID> endpoints_map, topology topology) :
_token_to_endpoint_map(token_to_endpoint_map), _endpoint_to_host_id_map(endpoints_map), _topology(topology) {
_sorted_tokens = sorted_tokens();
}
std::vector<token> token_metadata::sort_tokens() {
std::vector<token> sorted;
sorted.reserve(_token_to_endpoint_map.size());
for (auto&& i : _token_to_endpoint_map) {
sorted.push_back(i.first);
}
return sorted;
}
const std::vector<token>& token_metadata::sorted_tokens() const {
return _sorted_tokens;
}
/**
* Update token map with a single token/endpoint pair in normal state.
*/
void token_metadata::update_normal_token(token t, inet_address endpoint)
{
update_normal_tokens(std::unordered_set<token>({t}), endpoint);
}
void token_metadata::update_normal_tokens(std::unordered_set<token> tokens, inet_address endpoint) {
std::unordered_map<inet_address, std::unordered_set<token>> endpoint_tokens ({{endpoint, tokens}});
update_normal_tokens(endpoint_tokens);
}
/**
* Update token map with a set of token/endpoint pairs in normal state.
*
* Prefer this whenever there are multiple pairs to update, as each update (whether a single or multiple)
* is expensive (CASSANDRA-3831).
*
* @param endpointTokens
*/
void token_metadata::update_normal_tokens(std::unordered_map<inet_address, std::unordered_set<token>>& endpoint_tokens) {
if (endpoint_tokens.empty()) {
return;
}
bool should_sort_tokens = false;
for (auto&& i : endpoint_tokens)
{
inet_address endpoint = i.first;
std::unordered_set<token>& tokens = i.second;
assert(!tokens.empty());
for(auto it = _token_to_endpoint_map.begin(), ite = _token_to_endpoint_map.end(); it != ite;) {
if(it->second == endpoint) {
it = _token_to_endpoint_map.erase(it);
} else {
++it;
}
}
#if 0
bootstrapTokens.removeValue(endpoint);
topology.addEndpoint(endpoint);
leavingEndpoints.remove(endpoint);
removeFromMoving(endpoint); // also removing this endpoint from moving
#endif
for (const token& t : tokens)
{
auto prev = _token_to_endpoint_map.insert(std::pair<token, inet_address>(t, endpoint));
should_sort_tokens |= prev.second; // new token inserted -> sort
if (prev.first->second != endpoint) {
//logger.warn("Token {} changing ownership from {} to {}", token, prev.first->second, endpoint);
prev.first->second = endpoint;
}
}
}
if (should_sort_tokens) {
_sorted_tokens = sort_tokens();
}
}
size_t token_metadata::first_token_index(const token& start) {
assert(_sorted_tokens.size() > 0);
auto it = std::lower_bound(_sorted_tokens.begin(), _sorted_tokens.end(), start);
if (it == _sorted_tokens.end()) {
return 0;
} else {
return std::distance(_sorted_tokens.begin(), it);
}
}
const token& token_metadata::first_token(const token& start) {
return _sorted_tokens[first_token_index(start)];
}
inet_address token_metadata::get_endpoint(const token& token) const {
auto it = _token_to_endpoint_map.find(token);
assert (it != _token_to_endpoint_map.end());
return it->second;
}
}

53
locator/token_metadata.hh Normal file
View File

@@ -0,0 +1,53 @@
/*
* Copyright (C) 2015 Cloudius Systems, Ltd.
*/
#pragma once
#include <map>
#include <unordered_set>
#include <boost/bimap.hpp>
#include "gms/inet_address.hh"
#include "dht/i_partitioner.hh"
#include "utils/UUID.hh"
namespace locator {
using inet_address = gms::inet_address;
using token = dht::token;
class topology {
};
class token_metadata final {
/**
* Maintains token to endpoint map of every node in the cluster.
* Each Token is associated with exactly one Address, but each Address may have
* multiple tokens. Hence, the BiMultiValMap collection.
*/
// FIXME: have to be BiMultiValMap
std::map<token, inet_address> _token_to_endpoint_map;
/** Maintains endpoint to host ID map of every node in the cluster */
boost::bimap<inet_address, utils::UUID> _endpoint_to_host_id_map;
std::vector<token> _sorted_tokens;
topology _topology;
std::vector<token> sort_tokens();
token_metadata(std::map<token, inet_address> token_to_endpoint_map, boost::bimap<inet_address, utils::UUID> endpoints_map, topology topology);
public:
token_metadata() {};
const std::vector<token>& sorted_tokens() const;
void update_normal_token(token token, inet_address endpoint);
void update_normal_tokens(std::unordered_set<token> tokens, inet_address endpoint);
void update_normal_tokens(std::unordered_map<inet_address, std::unordered_set<token>>& endpoint_tokens);
const token& first_token(const token& start);
size_t first_token_index(const token& start);
inet_address get_endpoint(const token& token) const;
};
}