abstract_replication_strategy: calculate_natural_endpoints: make it work with both versions of token_metadata

We've updated all the places where token_metadata
is mutated, and now we can progress to the next stage
of the refactoring - gradually switching the read
code paths.

The calculate_natural_endpoints function
is at the core of all of them. It decides to what nodes
the given token should be replicated to for the given
token_metadata. It has a lot of usages in various contexts,
we can't switch them all in one commit, so instead we
allowed the function to behave in both ways. If
use_host_id parameter is false, the function uses the provided
token_metadata as is and returns endpoint_set as a result.
If it's true, it uses get_new() on the provided token_metadata
and returns host_id_set as a result.

The scope of the whole refactoring is limited to the erm data
structure, its interface will be kept inet_address based for now.
This means we'll often need to resolve host_ids to inet_address-es
as soon as we got a result from calculated_natural_endpoints.
A new calculate_natural_ips function is added for convenience.
It uses the new token_metadata and immediately resolves
returned host_id-s to inet_address-es.

The auxiliary declarations natural_ep_type, set_type, vector_type,
get_self_id, select_tm are introduced only for the sake of
migration, they will be removed later.
This commit is contained in:
Petr Gusev
2023-10-24 12:15:45 +04:00
parent 1960436d93
commit d5b4b02b28
14 changed files with 93 additions and 40 deletions

View File

@@ -9,8 +9,13 @@
#pragma once
#include "gms/inet_address.hh"
#include "locator/host_id.hh"
#include "utils/small_vector.hh"
using inet_address_vector_replica_set = utils::small_vector<gms::inet_address, 3>;
using inet_address_vector_topology_change = utils::small_vector<gms::inet_address, 1>;
using host_id_vector_replica_set = utils::small_vector<locator::host_id, 3>;
using host_id_vector_topology_change = utils::small_vector<locator::host_id, 1>;

View File

@@ -19,6 +19,18 @@
namespace locator {
static endpoint_set resolve_endpoints(const host_id_set& host_ids, const token_metadata2& tm) {
endpoint_set result{};
result.reserve(host_ids.size());
for (const auto& host_id: host_ids) {
// Empty host_id is used as a marker for local address.
// The reason for this hack is that we need local_strategy to
// work before the local host_id is loaded from the system.local table.
result.push_back(host_id ? tm.get_endpoint_for_host_id(host_id) : tm.get_topology().my_address());
}
return result;
}
logging::logger rslogger("replication_strategy");
abstract_replication_strategy::abstract_replication_strategy(
@@ -56,6 +68,11 @@ void abstract_replication_strategy::validate_replication_strategy(const sstring&
}
}
future<endpoint_set> abstract_replication_strategy::calculate_natural_ips(const token& search_token, const token_metadata2_ptr& tm) const {
const auto host_ids = co_await calculate_natural_endpoints(search_token, token_metadata(tm), true);
co_return resolve_endpoints(get<host_id_set>(host_ids), *tm);
}
using strategy_class_registry = class_registry<
locator::abstract_replication_strategy,
const locator::replication_strategy_config_options&>;
@@ -261,7 +278,7 @@ abstract_replication_strategy::get_ranges(inet_address ep, const token_metadata&
// Using the common path would make the function quadratic in the number of endpoints.
should_add = true;
} else {
auto eps = co_await calculate_natural_endpoints(tok, tm);
auto eps = get<endpoint_set>(co_await calculate_natural_endpoints(tok, tm, false));
should_add = eps.contains(ep);
}
if (should_add) {
@@ -326,7 +343,7 @@ abstract_replication_strategy::get_range_addresses(const token_metadata& tm) con
std::unordered_map<dht::token_range, inet_address_vector_replica_set> ret;
for (auto& t : tm.sorted_tokens()) {
dht::token_range_vector ranges = tm.get_primary_ranges_for(t);
auto eps = co_await calculate_natural_endpoints(t, tm);
auto eps = get<endpoint_set>(co_await calculate_natural_endpoints(t, tm, false));
for (auto& r : ranges) {
ret.emplace(r, eps.get_vector());
}
@@ -341,7 +358,7 @@ abstract_replication_strategy::get_pending_address_ranges(const token_metadata_p
temp.update_topology(pending_address, std::move(dr));
co_await temp.update_normal_tokens(pending_tokens, pending_address);
for (const auto& t : temp.sorted_tokens()) {
auto eps = co_await calculate_natural_endpoints(t, temp);
auto eps = get<endpoint_set>(co_await calculate_natural_endpoints(t, temp, false));
if (eps.contains(pending_address)) {
dht::token_range_vector r = temp.get_primary_ranges_for(t);
rslogger.debug("get_pending_address_ranges: token={} primary_range={} endpoint={}", t, r, pending_address);
@@ -372,8 +389,8 @@ future<mutable_vnode_effective_replication_map_ptr> calculate_effective_replicat
const auto token = all_tokens[i];
auto current_endpoints = co_await rs->calculate_natural_endpoints(token, base_token_metadata);
auto target_endpoints = co_await rs->calculate_natural_endpoints(token, *topology_changes->target_token_metadata);
auto current_endpoints = get<endpoint_set>(co_await rs->calculate_natural_endpoints(token, base_token_metadata, false));
auto target_endpoints = get<endpoint_set>(co_await rs->calculate_natural_endpoints(token, *topology_changes->target_token_metadata, false));
auto add_mapping = [&](ring_mapping& target, std::unordered_set<inet_address>&& endpoints) {
using interval = ring_mapping::interval_type;
@@ -422,11 +439,11 @@ future<mutable_vnode_effective_replication_map_ptr> calculate_effective_replicat
}
} else if (depend_on_token) {
for (const auto &t : sorted_tokens) {
auto eps = co_await rs->calculate_natural_endpoints(t, *tmptr);
auto eps = get<endpoint_set>(co_await rs->calculate_natural_endpoints(t, *tmptr, false));
replication_map.emplace(t, std::move(eps).extract_vector());
}
} else {
auto eps = co_await rs->calculate_natural_endpoints(default_replication_map_key, *tmptr);
auto eps = get<endpoint_set>(co_await rs->calculate_natural_endpoints(default_replication_map_key, *tmptr, false));
replication_map.emplace(default_replication_map_key, std::move(eps).extract_vector());
}

View File

@@ -53,12 +53,19 @@ using replication_strategy_config_options = std::map<sstring, sstring>;
using replication_map = std::unordered_map<token, inet_address_vector_replica_set>;
using endpoint_set = utils::basic_sequenced_set<inet_address, inet_address_vector_replica_set>;
using host_id_set = utils::basic_sequenced_set<locator::host_id, host_id_vector_replica_set>;
using natural_ep_type = std::variant<endpoint_set, host_id_set>;
template <typename NodeId>
using set_type = std::conditional_t<std::is_same_v<NodeId, inet_address>, endpoint_set, host_id_set>;
template <typename NodeId>
using vector_type = std::conditional_t<std::is_same_v<NodeId, inet_address>, inet_address_vector_replica_set, host_id_vector_replica_set>;
class vnode_effective_replication_map;
class effective_replication_map_factory;
class per_table_replication_strategy;
class tablet_aware_replication_strategy;
class abstract_replication_strategy : public seastar::enable_shared_from_this<abstract_replication_strategy> {
friend class vnode_effective_replication_map;
friend class per_table_replication_strategy;
@@ -85,6 +92,20 @@ protected:
rslogger.debug(fmt, std::forward<Args>(args)...);
}
template <typename NodeId>
static NodeId get_self_id(const generic_token_metadata<NodeId>& tm) {
if constexpr(std::is_same_v<NodeId, gms::inet_address>) {
return tm.get_topology().my_address();
} else {
return NodeId{};
}
}
template <typename Func>
static future<natural_ep_type> select_tm(Func&& func, const token_metadata& tm, bool use_host_id) {
return use_host_id ? func(*tm.template get_new()) : func(tm);
}
public:
using ptr_type = seastar::shared_ptr<abstract_replication_strategy>;
@@ -101,7 +122,8 @@ public:
// is small, that implementation may not yield since by itself it won't cause a reactor stall (assuming practical
// cluster sizes and number of tokens per node). The caller is responsible for yielding if they call this function
// in a loop.
virtual future<endpoint_set> calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const = 0;
virtual future<natural_ep_type> calculate_natural_endpoints(const token& search_token, const token_metadata& tm, bool use_host_id) const = 0;
future<endpoint_set> calculate_natural_ips(const token& search_token, const token_metadata2_ptr& tm) const;
virtual ~abstract_replication_strategy() {}
static ptr_type create_replication_strategy(const sstring& strategy_name, const replication_strategy_config_options& config_options);

View File

@@ -20,13 +20,15 @@ everywhere_replication_strategy::everywhere_replication_strategy(const replicati
_natural_endpoints_depend_on_token = false;
}
future<endpoint_set> everywhere_replication_strategy::calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const {
future<natural_ep_type> everywhere_replication_strategy::calculate_natural_endpoints(const token& search_token, const token_metadata& tm, bool use_host_id) const {
return select_tm([this]<typename NodeId>(const generic_token_metadata<NodeId>& tm) -> future<natural_ep_type> {
if (tm.sorted_tokens().empty()) {
endpoint_set result{inet_address_vector_replica_set({tm.get_topology().my_address()})};
return make_ready_future<endpoint_set>(std::move(result));
set_type<NodeId> result{vector_type<NodeId>({this->get_self_id<NodeId>(tm)})};
return make_ready_future<natural_ep_type>(std::move(result));
}
const auto& all_endpoints = tm.get_all_endpoints();
return make_ready_future<endpoint_set>(endpoint_set(all_endpoints.begin(), all_endpoints.end()));
return make_ready_future<natural_ep_type>(set_type<NodeId>(all_endpoints.begin(), all_endpoints.end()));
}, tm, use_host_id);
}
size_t everywhere_replication_strategy::get_replication_factor(const token_metadata& tm) const {

View File

@@ -18,7 +18,7 @@ class everywhere_replication_strategy : public abstract_replication_strategy {
public:
everywhere_replication_strategy(const replication_strategy_config_options& config_options);
virtual future<endpoint_set> calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const override;
virtual future<natural_ep_type> calculate_natural_endpoints(const token& search_token, const token_metadata& tm, bool host_id) const override;
virtual void validate_options(const gms::feature_service&) const override { /* noop */ }

View File

@@ -18,8 +18,10 @@ local_strategy::local_strategy(const replication_strategy_config_options& config
_natural_endpoints_depend_on_token = false;
}
future<endpoint_set> local_strategy::calculate_natural_endpoints(const token& t, const token_metadata& tm) const {
return make_ready_future<endpoint_set>(endpoint_set({tm.get_topology().my_address()}));
future<natural_ep_type> local_strategy::calculate_natural_endpoints(const token& t, const token_metadata& tm, bool use_host_id) const {
return select_tm([this]<typename NodeId>(const generic_token_metadata<NodeId>& tm) -> future<natural_ep_type> {
return make_ready_future<natural_ep_type>(set_type<NodeId>({this->get_self_id<NodeId>(tm)}));
}, tm, use_host_id);
}
void local_strategy::validate_options(const gms::feature_service&) const {

View File

@@ -27,7 +27,7 @@ public:
virtual ~local_strategy() {};
virtual size_t get_replication_factor(const token_metadata&) const override;
virtual future<endpoint_set> calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const override;
virtual future<natural_ep_type> calculate_natural_endpoints(const token& search_token, const token_metadata& tm, bool host_id) const override;
virtual void validate_options(const gms::feature_service&) const override;

View File

@@ -76,13 +76,14 @@ network_topology_strategy::network_topology_strategy(
using endpoint_dc_rack_set = std::unordered_set<endpoint_dc_rack>;
template <typename NodeId>
class natural_endpoints_tracker {
/**
* Endpoint adder applying the replication rules for a given DC.
*/
struct data_center_endpoints {
/** List accepted endpoints get pushed into. */
endpoint_set& _endpoints;
set_type<NodeId>& _endpoints;
/**
* Racks encountered so far. Replicas are put into separate racks while possible.
@@ -95,7 +96,7 @@ class natural_endpoints_tracker {
size_t _rf_left;
ssize_t _acceptable_rack_repeats;
data_center_endpoints(size_t rf, size_t rack_count, size_t node_count, endpoint_set& endpoints, endpoint_dc_rack_set& racks)
data_center_endpoints(size_t rf, size_t rack_count, size_t node_count, set_type<NodeId>& endpoints, endpoint_dc_rack_set& racks)
: _endpoints(endpoints)
, _racks(racks)
// If there aren't enough nodes in this DC to fill the RF, the number of nodes is the effective RF.
@@ -109,7 +110,7 @@ class natural_endpoints_tracker {
* Attempts to add an endpoint to the replicas for this datacenter, adding to the endpoints set if successful.
* Returns true if the endpoint was added, and this datacenter does not require further replicas.
*/
bool add_endpoint_and_check_if_done(const inet_address& ep, const endpoint_dc_rack& location) {
bool add_endpoint_and_check_if_done(const NodeId& ep, const endpoint_dc_rack& location) {
if (done()) {
return false;
}
@@ -160,7 +161,7 @@ class natural_endpoints_tracker {
}
};
const token_metadata& _tm;
const generic_token_metadata<NodeId>& _tm;
const topology& _tp;
std::unordered_map<sstring, size_t> _dc_rep_factor;
@@ -168,7 +169,7 @@ class natural_endpoints_tracker {
// We want to preserve insertion order so that the first added endpoint
// becomes primary.
//
endpoint_set _replicas;
set_type<NodeId> _replicas;
// tracks the racks we have already placed replicas in
endpoint_dc_rack_set _seen_racks;
@@ -189,7 +190,7 @@ class natural_endpoints_tracker {
size_t _dcs_to_fill;
public:
natural_endpoints_tracker(const token_metadata& tm, const std::unordered_map<sstring, size_t>& dc_rep_factor)
natural_endpoints_tracker(const generic_token_metadata<NodeId>& tm, const std::unordered_map<sstring, size_t>& dc_rep_factor)
: _tm(tm)
, _tp(_tm.get_topology())
, _dc_rep_factor(dc_rep_factor)
@@ -219,7 +220,7 @@ public:
}
}
bool add_endpoint_and_check_if_done(inet_address ep) {
bool add_endpoint_and_check_if_done(NodeId ep) {
auto& loc = _tp.get_location(ep);
auto i = _dcs.find(loc.dc);
if (i != _dcs.end() && i->second.add_endpoint_and_check_if_done(ep, loc)) {
@@ -232,27 +233,29 @@ public:
return _dcs_to_fill == 0;
}
endpoint_set& replicas() noexcept {
set_type<NodeId>& replicas() noexcept {
return _replicas;
}
};
future<endpoint_set>
future<natural_ep_type>
network_topology_strategy::calculate_natural_endpoints(
const token& search_token, const token_metadata& tm) const {
const token& search_token, const token_metadata& tm, bool use_host_id) const {
natural_endpoints_tracker tracker(tm, _dc_rep_factor);
return select_tm([&]<typename NodeId>(const generic_token_metadata<NodeId>& tm) -> future<natural_ep_type> {
natural_endpoints_tracker<NodeId> tracker(tm, _dc_rep_factor);
for (auto& next : tm.ring_range(search_token)) {
co_await coroutine::maybe_yield();
inet_address ep = *tm.get_endpoint(next);
NodeId ep = *tm.get_endpoint(next);
if (tracker.add_endpoint_and_check_if_done(ep)) {
break;
}
}
co_return std::move(tracker.replicas());
}, tm, use_host_id);
}
void network_topology_strategy::validate_options(const gms::feature_service& fs) const {
@@ -306,7 +309,7 @@ future<tablet_map> network_topology_strategy::allocate_tablets_for_new_table(sch
auto token_range = tm->ring_range(dht::token::get_random_token());
for (tablet_id tb : tablets.tablet_ids()) {
natural_endpoints_tracker tracker(*tm, _dc_rep_factor);
natural_endpoints_tracker<gms::inet_address> tracker(*tm, _dc_rep_factor);
while (true) {
co_await coroutine::maybe_yield();

View File

@@ -50,8 +50,8 @@ protected:
* calculate endpoints in one pass through the tokens by tracking our
* progress in each DC, rack etc.
*/
virtual future<endpoint_set> calculate_natural_endpoints(
const token& search_token, const token_metadata& tm) const override;
virtual future<natural_ep_type> calculate_natural_endpoints(
const token& search_token, const token_metadata& tm, bool host_id) const override;
virtual void validate_options(const gms::feature_service&) const override;

View File

@@ -33,15 +33,16 @@ simple_strategy::simple_strategy(const replication_strategy_config_options& conf
}
}
future<endpoint_set> simple_strategy::calculate_natural_endpoints(const token& t, const token_metadata& tm) const {
future<natural_ep_type> simple_strategy::calculate_natural_endpoints(const token& t, const token_metadata& tm, bool use_host_id) const {
return select_tm([&]<typename NodeId>(const generic_token_metadata<NodeId>& tm) -> future<natural_ep_type> {
const std::vector<token>& tokens = tm.sorted_tokens();
if (tokens.empty()) {
co_return endpoint_set();
co_return set_type<NodeId>{};
}
size_t replicas = _replication_factor;
endpoint_set endpoints;
set_type<NodeId> endpoints;
endpoints.reserve(replicas);
for (auto& token : tm.ring_range(t)) {
@@ -61,6 +62,7 @@ future<endpoint_set> simple_strategy::calculate_natural_endpoints(const token& t
}
co_return endpoints;
}, tm, use_host_id);
}
size_t simple_strategy::get_replication_factor(const token_metadata&) const {

View File

@@ -26,7 +26,7 @@ public:
return true;
}
virtual future<endpoint_set> calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const override;
virtual future<natural_ep_type> calculate_natural_endpoints(const token& search_token, const token_metadata& tm, bool host_id) const override;
private:
size_t _replication_factor = 1;
};

View File

@@ -1719,7 +1719,7 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m
// Find (for each range) all nodes that store replicas for these ranges as well
for (auto& r : ranges) {
auto end_token = r.end() ? r.end()->value() : dht::maximum_token();
auto eps = strat.calculate_natural_endpoints(end_token, *tmptr).get0();
auto eps = get<locator::endpoint_set>(strat.calculate_natural_endpoints(end_token, *tmptr, false).get0());
current_replica_endpoints.emplace(r, std::move(eps));
seastar::thread::maybe_yield();
}
@@ -1738,7 +1738,7 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m
ops->check_abort();
}
auto end_token = r.end() ? r.end()->value() : dht::maximum_token();
const auto new_eps = strat.calculate_natural_endpoints(end_token, temp).get0();
const auto new_eps = get<locator::endpoint_set>(strat.calculate_natural_endpoints(end_token, temp, false).get0());
const auto& current_eps = current_replica_endpoints[r];
std::unordered_set<inet_address> neighbors_set = new_eps.get_set();
bool skip_this_range = false;
@@ -1929,7 +1929,7 @@ future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_
auto& r = *it;
seastar::thread::maybe_yield();
auto end_token = r.end() ? r.end()->value() : dht::maximum_token();
auto neighbors = boost::copy_range<std::vector<gms::inet_address>>(strat.calculate_natural_endpoints(end_token, *tmptr).get0() |
auto neighbors = boost::copy_range<std::vector<gms::inet_address>>(get<locator::endpoint_set>(strat.calculate_natural_endpoints(end_token, *tmptr, false).get0()) |
boost::adaptors::filtered([myip, &source_dc, &topology, &ignore_nodes] (const gms::inet_address& node) {
if (node == myip) {
return false;

View File

@@ -5960,7 +5960,7 @@ storage_service::get_changed_ranges_for_leaving(locator::vnode_effective_replica
const auto& rs = erm->get_replication_strategy();
for (auto& r : ranges) {
auto end_token = r.end() ? r.end()->value() : dht::maximum_token();
auto new_replica_endpoints = co_await rs.calculate_natural_endpoints(end_token, temp);
auto new_replica_endpoints = get<locator::endpoint_set>(co_await rs.calculate_natural_endpoints(end_token, temp, false));
auto rg = current_replica_endpoints.equal_range(r);
for (auto it = rg.first; it != rg.second; it++) {

View File

@@ -671,7 +671,7 @@ static void test_equivalence(const shared_token_metadata& stm, const locator::to
for (size_t i = 0; i < 1000; ++i) {
auto token = dht::token::get_random_token();
auto expected = calculate_natural_endpoints(token, tm, topo, datacenters);
auto actual = nts.calculate_natural_endpoints(token, tm).get0();
auto actual = get<endpoint_set>(nts.calculate_natural_endpoints(token, tm, false).get0());
// Because the old algorithm does not put the nodes in the correct order in the case where more replicas
// are required than there are racks in a dc, we accept different order as long as the primary