Merge 'Rack aware view pairing' from Benny Halevy

Enabled with the tablets_rack_aware_view_pairing cluster feature
rack-aware pairing pairs base to view replicas that are in the
same dc and rack, using their ordinality in the replica map

We distinguish between 2 cases:
- Simple rack-aware pairing: when the replication factor in the dc
  is a multiple of the number of racks and the minimum number of nodes
  per rack in the dc is greater than or equal to rf / nr_racks.

  In this case (that includes the single rack case), all racks would
  have the same number of replicas, so we first filter all replicas
  by dc and rack, retaining their ordinality in the process, and
  finally, we pair between the base replicas and view replicas,
  that are in the same rack, using their original order in the
  tablet-map replica set.

  For example, nr_racks=2, rf=4:
  base_replicas = { N00, N01, N10, N11 }
  view_replicas = { N11, N12, N01, N02 }
  pairing would be: { N00, N01 }, { N01, N02 }, { N10, N11 }, { N11, N12 }
  Note that we don't optimize for self-pairing if it breaks pairing ordinality.

- Complex rack-aware pairing: when the replication factor is not
  a multiple of nr_racks.  In this case, we attempt best-match
  pairing in all racks, using the minimum number of base or view replicas
  in each rack (given their global ordinality), while pairing all the other
  replicas, across racks, sorted by their ordinality.

  For example, nr_racks=4, rf=3:
  base_replicas = { N00, N10, N20 }
  view_replicas = { N11, N21, N31 }
  pairing would be: { N00, N31 }\*, { N10, N11 }, { N20, N21 }
  \* cross-rack pair

  If we'd simply stable-sort both base and view replicas by rack,
  we might end up with much worse pairing across racks:
  { N00, N11 }\*, { N10, N21 }\*, { N20, N31 }\*
  \* cross-rack pair

Fixes scylladb/scylladb#17147

* This is an improvement so no backport is required

Closes scylladb/scylladb#21453

* github.com:scylladb/scylladb:
  network_topology_strategy_test: add tablets rack_aware_view_pairing tests
  view: get_view_natural_endpoint: implement rack-aware pairing for tablets
  view: get_view_natural_endpoint: handle case when there are too few view replicas
  view: get_view_natural_endpoint: track replica locator::nodes
  locator: topology: consult local_dc_rack if node not found by host_id
  locator: node: add dc and rack getters
  feature_service: add tablet_rack_aware_view_pairing feature
  view: get_view_natural_endpoint: refactor predicate function
  view: get_view_natural_endpoint: clarify documentation
  view: mutate_MV: optimize remote_endpoints filtering check
  view: mutate_MV: lookup base and view erms synchronously
  view: mutate_MV: calculate keyspace-dependent flags once
This commit is contained in:
Botond Dénes
2025-01-30 11:32:19 +02:00
8 changed files with 569 additions and 44 deletions

View File

@@ -12,6 +12,7 @@
#include <deque>
#include <functional>
#include <optional>
#include <ranges>
#include <unordered_set>
#include <vector>
#include <algorithm>
@@ -45,6 +46,7 @@
#include "gms/inet_address.hh"
#include "gms/feature_service.hh"
#include "keys.hh"
#include "locator/abstract_replication_strategy.hh"
#include "locator/network_topology_strategy.hh"
#include "mutation/mutation.hh"
#include "mutation/mutation_partition.hh"
@@ -1723,39 +1725,94 @@ bool should_generate_view_updates_on_this_shard(const schema_ptr& base, const lo
//
// If the keyspace's replication strategy is a NetworkTopologyStrategy,
// we pair only nodes in the same datacenter.
// If one of the base replicas also happens to be a view replica, it is
// paired with itself (with the other nodes paired by order in the list
//
// When use_legacy_self_pairing is enabled, if one of the base replicas
// also happens to be a view replica, it is paired with itself
// (with the other nodes paired by order in the list
// after taking this node out).
//
// If the table uses tablets and the replication strategy is NetworkTopologyStrategy
// and the replication factor in the node's datacenter is a multiple of the number
// of racks in the datacenter, then pairing is rack-aware. In this case,
// all racks have the same number of replicas, and those are never migrated
// outside their racks. Therefore, the base replicas are naturally paired with the
// view replicas that are in the same rack, based on the ordinal position.
// Note that typically, there is a single replica per rack and pairing is trivial.
//
// If the assumption that the given base token belongs to this replica
// does not hold, we return an empty optional.
static std::optional<locator::host_id>
std::optional<locator::host_id>
get_view_natural_endpoint(
locator::host_id me,
const locator::effective_replication_map_ptr& base_erm,
const locator::effective_replication_map_ptr& view_erm,
bool network_topology,
const locator::abstract_replication_strategy& replication_strategy,
const dht::token& base_token,
const dht::token& view_token,
bool use_legacy_self_pairing,
bool use_tablets_rack_aware_view_pairing,
replica::cf_stats& cf_stats) {
auto& topology = base_erm->get_token_metadata_ptr()->get_topology();
auto me = topology.my_host_id();
auto my_datacenter = topology.get_datacenter();
std::vector<locator::host_id> base_endpoints, view_endpoints;
auto& my_location = topology.get_location(me);
auto& my_datacenter = my_location.dc;
auto* network_topology = dynamic_cast<const locator::network_topology_strategy*>(&replication_strategy);
auto rack_aware_pairing = use_tablets_rack_aware_view_pairing && network_topology;
bool simple_rack_aware_pairing = false;
using node_vector = std::vector<std::reference_wrapper<const locator::node>>;
node_vector orig_base_endpoints, orig_view_endpoints;
node_vector base_endpoints, view_endpoints;
if (rack_aware_pairing) {
auto dc_rf = network_topology->get_replication_factor(my_datacenter);
const auto& racks = topology.get_datacenter_rack_nodes().at(my_datacenter);
// Simple rack-aware pairing is possible when the datacenter replication factor
// is a multiple of the number of racks in the datacenter.
if (dc_rf % racks.size() == 0) {
simple_rack_aware_pairing = true;
size_t rack_rf = dc_rf / racks.size();
// If any rack doesn't have enough nodes to satisfy the per-rack rf
// simple rack-aware pairing is disabled.
for (const auto& [rack, nodes] : racks) {
if (nodes.size() < rack_rf) {
simple_rack_aware_pairing = false;
break;
}
}
}
}
auto resolve = [&] (const locator::topology& topology, const locator::host_id& ep, bool is_view) -> const locator::node& {
if (auto* np = topology.find_node(ep)) {
return *np;
}
throw std::runtime_error(format("get_view_natural_endpoint: {} replica {} not found in topology", is_view ? "view" : "base", ep));
};
std::function<bool(const locator::node&)> is_candidate;
if (simple_rack_aware_pairing) {
is_candidate = [&] (const locator::node& node) { return node.dc_rack() == my_location; };
} else if (network_topology) {
// Also for the (rack_aware_pairing && !simple_rack_aware_pairing) case
is_candidate = [&] (const locator::node& node) { return node.dc() == my_datacenter; };
} else {
is_candidate = [&] (const locator::node&) { return true; };
}
auto process_candidate = [&] (node_vector& nodes, const locator::topology& topology, const locator::host_id& ep, bool is_view) {
auto& node = resolve(topology, ep, is_view);
if (is_candidate(node)) {
nodes.emplace_back(node);
}
};
// We need to use get_replicas() for pairing to be stable in case base or view tablet
// is rebuilding a replica which has left the ring. get_natural_endpoints() filters such replicas.
for (auto&& base_endpoint : base_erm->get_replicas(base_token)) {
if (!network_topology || topology.get_datacenter(base_endpoint) == my_datacenter) {
base_endpoints.push_back(base_endpoint);
}
process_candidate(base_endpoints, topology, base_endpoint, false);
}
auto& view_topology = view_erm->get_token_metadata_ptr()->get_topology();
for (auto&& view_endpoint : view_erm->get_replicas(view_token)) {
if (use_legacy_self_pairing) {
auto it = std::find(base_endpoints.begin(), base_endpoints.end(),
view_endpoint);
if (use_legacy_self_pairing) {
for (auto&& view_endpoint : view_erm->get_replicas(view_token)) {
auto it = std::ranges::find(base_endpoints, view_endpoint, std::mem_fn(&locator::node::host_id));
// If this base replica is also one of the view replicas, we use
// ourselves as the view replica.
if (view_endpoint == me && it != base_endpoints.end()) {
@@ -1766,26 +1823,103 @@ get_view_natural_endpoint(
// otherwise.
if (it != base_endpoints.end()) {
base_endpoints.erase(it);
} else if (!network_topology || view_topology.get_datacenter(view_endpoint) == my_datacenter) {
view_endpoints.push_back(view_endpoint);
}
} else {
if (!network_topology || view_topology.get_datacenter(view_endpoint) == my_datacenter) {
view_endpoints.push_back(view_endpoint);
} else {
auto& node = resolve(view_topology, view_endpoint, true);
if (!network_topology || node.dc() == my_datacenter) {
view_endpoints.push_back(node);
}
}
}
} else {
for (auto&& view_endpoint : view_erm->get_replicas(view_token)) {
process_candidate(view_endpoints, view_topology, view_endpoint, true);
}
}
SCYLLA_ASSERT(base_endpoints.size() == view_endpoints.size());
auto base_it = std::find(base_endpoints.begin(), base_endpoints.end(), me);
orig_base_endpoints = base_endpoints;
orig_view_endpoints = view_endpoints;
// For the complex rack_aware_pairing case, nodes are already filtered by datacenter
// Use best-match, for the minimum number of base and view replicas in each rack,
// and ordinal match for the rest.
if (rack_aware_pairing && !simple_rack_aware_pairing) {
struct indexed_replica {
size_t idx;
std::reference_wrapper<const locator::node> node;
};
std::unordered_map<sstring, std::vector<indexed_replica>> base_racks, view_racks;
// First, index all replicas by rack
auto index_replica_set = [] (std::unordered_map<sstring, std::vector<indexed_replica>>& racks, const node_vector& replicas) {
size_t idx = 0;
for (const auto& r: replicas) {
racks[r.get().rack()].emplace_back(idx++, r);
}
};
index_replica_set(base_racks, base_endpoints);
index_replica_set(view_racks, view_endpoints);
// Try optimistically pairing `me` first
const auto& my_base_replicas = base_racks[my_location.rack];
auto base_it = std::ranges::find(my_base_replicas, me, [] (const indexed_replica& ir) { return ir.node.get().host_id(); });
if (base_it == my_base_replicas.end()) {
return std::nullopt;
}
const auto& my_view_replicas = view_racks[my_location.rack];
size_t idx = base_it - my_base_replicas.begin();
if (idx < my_view_replicas.size()) {
return my_view_replicas[idx].node.get().host_id();
}
// Collect all unpaired base and view replicas,
// where the number of replicas in the base rack is different than the respective view rack
std::vector<indexed_replica> unpaired_base_replicas, unpaired_view_replicas;
for (const auto& [rack, base_replicas] : base_racks) {
const auto& view_replicas = view_racks[rack];
for (auto i = view_replicas.size(); i < base_replicas.size(); ++i) {
unpaired_base_replicas.emplace_back(base_replicas[i]);
}
}
for (const auto& [rack, view_replicas] : view_racks) {
const auto& base_replicas = base_racks[rack];
for (auto i = base_replicas.size(); i < view_replicas.size(); ++i) {
unpaired_view_replicas.emplace_back(view_replicas[i]);
}
}
// Sort by the original ordinality, and copy the sorted results
// back into {base,view}_endpoints, for backward compatible processing below.
std::ranges::sort(unpaired_base_replicas, std::less(), std::mem_fn(&indexed_replica::idx));
base_endpoints.clear();
std::ranges::transform(unpaired_base_replicas, std::back_inserter(base_endpoints), std::mem_fn(&indexed_replica::node));
std::ranges::sort(unpaired_view_replicas, std::less(), std::mem_fn(&indexed_replica::idx));
view_endpoints.clear();
std::ranges::transform(unpaired_view_replicas, std::back_inserter(view_endpoints), std::mem_fn(&indexed_replica::node));
}
auto base_it = std::ranges::find(base_endpoints, me, std::mem_fn(&locator::node::host_id));
if (base_it == base_endpoints.end()) {
// This node is not a base replica of this key, so we return empty
// FIXME: This case shouldn't happen, and if it happens, a view update
// would be lost.
++cf_stats.total_view_updates_on_wrong_node;
vlogger.warn("Could not find {} in base_endpoints={}", me,
orig_base_endpoints | std::views::transform(std::mem_fn(&locator::node::host_id)));
return {};
}
size_t idx = base_it - base_endpoints.begin();
if (idx >= view_endpoints.size()) {
// There are fewer view replicas than base replicas
// FIXME: This might still happen when reducing replication factor with tablets,
// see https://github.com/scylladb/scylladb/issues/21492
++cf_stats.total_view_updates_failed_pairing;
vlogger.warn("Could not pair {}: rack_aware={} base_endpoints={} view_endpoints={}", me,
rack_aware_pairing ? (simple_rack_aware_pairing ? "simple" : "complex") : "none",
orig_base_endpoints | std::views::transform(std::mem_fn(&locator::node::host_id)),
orig_view_endpoints | std::views::transform(std::mem_fn(&locator::node::host_id)));
return {};
}
auto replica = view_endpoints[base_it - base_endpoints.begin()];
// https://github.com/scylladb/scylladb/issues/19439
// With tablets, a node being replaced might transition to "left" state
@@ -1794,8 +1928,9 @@ get_view_natural_endpoint(
// but are still replicas. Therefore, there is no other sensible option
// right now but to give up attempt to send the update or write a hint
// to the paired, permanently down replica.
if (!view_topology.get_node(replica).left()) {
return replica;
const auto& node = view_endpoints[idx].get();
if (!node.left()) {
return node.host_id();
} else {
return std::nullopt;
}
@@ -1856,22 +1991,38 @@ future<> view_update_generator::mutate_MV(
service::allow_hints allow_hints,
wait_for_all_updates wait_for_all)
{
auto base_ermp = base->table().get_effective_replication_map();
auto& ks = _db.find_keyspace(base->ks_name());
auto& replication = ks.get_replication_strategy();
// We set legacy self-pairing for old vnode-based tables (for backward
// compatibility), and unset it for tablets - where range movements
// are more frequent and backward compatibility is less important.
// TODO: Maybe allow users to set use_legacy_self_pairing explicitly
// on a view, like we have the synchronous_updates_flag.
bool use_legacy_self_pairing = !ks.uses_tablets();
std::unordered_map<table_id, locator::effective_replication_map_ptr> erms;
auto get_erm = [&] (table_id id) {
auto it = erms.find(id);
if (it == erms.end()) {
it = erms.emplace(id, _db.find_column_family(id).get_effective_replication_map()).first;
}
return it->second;
};
auto base_ermp = get_erm(base->id());
for (const auto& mut : view_updates) {
(void)get_erm(mut.s->id());
}
// Enable rack-aware view updates pairing for tablets
// when the cluster feature is enabled so that all replicas agree
// on the pairing algorithm.
bool use_tablets_rack_aware_view_pairing = _db.features().tablet_rack_aware_view_pairing && ks.uses_tablets();
auto me = base_ermp->get_topology().my_host_id();
static constexpr size_t max_concurrent_updates = 128;
co_await utils::get_local_injector().inject("delay_before_get_view_natural_endpoint", 8000ms);
co_await max_concurrent_for_each(view_updates, max_concurrent_updates,
[this, base_token, &stats, &cf_stats, tr_state, &pending_view_updates, allow_hints, wait_for_all, base_ermp] (frozen_mutation_and_schema mut) mutable -> future<> {
co_await max_concurrent_for_each(view_updates, max_concurrent_updates, [&] (frozen_mutation_and_schema mut) mutable -> future<> {
auto view_token = dht::get_token(*mut.s, mut.fm.key());
auto view_ermp = mut.s->table().get_effective_replication_map();
auto& ks = _proxy.local().local_db().find_keyspace(mut.s->ks_name());
bool network_topology = dynamic_cast<const locator::network_topology_strategy*>(&ks.get_replication_strategy());
// We set legacy self-pairing for old vnode-based tables (for backward
// compatibility), and unset it for tablets - where range movements
// are more frequent and backward compatibility is less important.
// TODO: Maybe allow users to set use_legacy_self_pairing explicitly
// on a view, like we have the synchronous_updates_flag.
bool use_legacy_self_pairing = !ks.uses_tablets();
auto target_endpoint = get_view_natural_endpoint(base_ermp, view_ermp, network_topology, base_token, view_token, use_legacy_self_pairing, cf_stats);
auto view_ermp = erms.at(mut.s->id());
auto target_endpoint = get_view_natural_endpoint(me, base_ermp, view_ermp, replication, base_token, view_token,
use_legacy_self_pairing, use_tablets_rack_aware_view_pairing, cf_stats);
auto remote_endpoints = view_ermp->get_pending_replicas(view_token);
auto sem_units = seastar::make_lw_shared<db::timeout_semaphore_units>(pending_view_updates.split(memory_usage_of(mut)));
@@ -1898,13 +2049,18 @@ future<> view_update_generator::mutate_MV(
if (*target_endpoint == *remote_it) {
remote_endpoints.erase(remote_it);
} else {
std::swap(*target_endpoint, *remote_it);
auto target_remote_it = std::find(remote_endpoints.begin(), remote_endpoints.end(), *target_endpoint);
if (target_remote_it != remote_endpoints.end()) {
target_endpoint = *remote_it;
remote_endpoints.erase(remote_it);
} else {
std::swap(*target_endpoint, *remote_it);
}
}
}
}
// It's still possible that a target endpoint is duplicated in the remote endpoints list,
// so let's get rid of the duplicate if it exists
if (target_endpoint) {
} else if (target_endpoint) {
// It's still possible that a target endpoint is duplicated in the remote endpoints list,
// so let's get rid of the duplicate if it exists
auto remote_it = std::find(remote_endpoints.begin(), remote_endpoints.end(), *target_endpoint);
if (remote_it != remote_endpoints.end()) {
remote_endpoints.erase(remote_it);

View File

@@ -350,6 +350,17 @@ size_t memory_usage_of(const frozen_mutation_and_schema& mut);
*/
std::vector<view_and_base> with_base_info_snapshot(std::vector<view_ptr>);
std::optional<locator::host_id> get_view_natural_endpoint(
locator::host_id node,
const locator::effective_replication_map_ptr& base_erm,
const locator::effective_replication_map_ptr& view_erm,
const locator::abstract_replication_strategy& replication_strategy,
const dht::token& base_token,
const dht::token& view_token,
bool use_legacy_self_pairing,
bool use_tablets_basic_rack_aware_view_pairing,
replica::cf_stats& cf_stats);
}
}

View File

@@ -148,6 +148,7 @@ public:
gms::feature tablet_repair_scheduler { *this, "TABLET_REPAIR_SCHEDULER"sv };
gms::feature tablet_merge { *this, "TABLET_MERGE"sv };
gms::feature tablet_rack_aware_view_pairing { *this, "TABLET_RACK_AWARE_VIEW_PAIRING"sv };
gms::feature tablet_migration_virtual_task { *this, "TABLET_MIGRATION_VIRTUAL_TASK"sv };
gms::feature tablet_resize_virtual_task { *this, "TABLET_RESIZE_VIRTUAL_TASK"sv };

View File

@@ -477,6 +477,17 @@ bool topology::has_node(host_id id) const noexcept {
return bool(node);
}
const endpoint_dc_rack& topology::get_location_slow(host_id id) const {
// We should do the following check after lookup in nodes.
// In tests, there may be no config for local node, so fall back to get_location()
// only if no mapping is found. Otherwise, get_location() will return empty location
// from config or random node, neither of which is correct.
if (id == _cfg.this_host_id) {
return get_location();
}
throw std::runtime_error(format("Requested location for node {} not in topology. backtrace {}", id, lazy_backtrace()));
}
void topology::sort_by_proximity(locator::host_id address, host_id_vector_replica_set& addresses) const {
if (can_sort_by_proximity()) {
do_sort_by_proximity(address, addresses);

View File

@@ -96,6 +96,14 @@ public:
return _dc_rack;
}
const sstring& dc() const noexcept {
return _dc_rack.dc;
}
const sstring& rack() const noexcept {
return _dc_rack.rack;
}
// Is this "localhost"?
this_node is_this_node() const noexcept { return _is_this_node; }
@@ -289,7 +297,10 @@ public:
// Get dc/rack location of a node identified by host_id
// The specified node must exist.
const endpoint_dc_rack& get_location(host_id id) const {
return find_node(id)->dc_rack();
if (auto node = find_node(id)) {
return node->dc_rack();
}
return get_location_slow(id);
}
// Get datacenter of this node
@@ -392,6 +403,7 @@ private:
}
void seed_random_engine(random_engine_type::result_type);
const endpoint_dc_rack& get_location_slow(host_id id) const;
unsigned _shard;
config _cfg;

View File

@@ -677,6 +677,9 @@ database::setup_metrics() {
sm::make_total_operations("total_view_updates_on_wrong_node", _cf_stats.total_view_updates_on_wrong_node,
sm::description("Total number of view updates which are computed on the wrong node.")).set_skip_when_empty(),
sm::make_total_operations("total_view_updates_failed_pairing", _cf_stats.total_view_updates_failed_pairing,
sm::description("Total number of view updates for which we failed base/view pairing.")).set_skip_when_empty(),
});
if (this_shard_id() == 0) {
_metrics.add_group("database", {

View File

@@ -332,6 +332,9 @@ struct cf_stats {
// How many times we build view updates only to realize it's the wrong node and drop the update
uint64_t total_view_updates_on_wrong_node = 0;
// How many times we failed to resolve base/view pairing
uint64_t total_view_updates_failed_pairing = 0;
};
class table;

View File

@@ -10,6 +10,7 @@
#include <fmt/ranges.h>
#include "gms/inet_address.hh"
#include "inet_address_vectors.hh"
#include "locator/host_id.hh"
#include "locator/types.hh"
#include "locator/snitch_base.hh"
#include "utils/assert.hh"
@@ -32,6 +33,7 @@
#include <iostream>
#include <sstream>
#include <compare>
#include <ranges>
#include <boost/range/algorithm/adjacent_find.hpp>
#include <boost/algorithm/cxx11/iota.hpp>
#include "test/lib/log.hh"
@@ -1206,4 +1208,330 @@ SEASTAR_THREAD_TEST_CASE(test_topology_tracks_local_node) {
BOOST_REQUIRE(stm.get()->get_topology().get_location() == ip1_dc_rack_v2);
}
SEASTAR_THREAD_TEST_CASE(tablets_simple_rack_aware_view_pairing_test) {
auto my_address = gms::inet_address("localhost");
// Create the RackInferringSnitch
snitch_config cfg;
cfg.listen_address = my_address;
cfg.broadcast_address = my_address;
cfg.name = "RackInferringSnitch";
sharded<snitch_ptr> snitch;
snitch.start(cfg).get();
auto stop_snitch = defer([&snitch] { snitch.stop().get(); });
snitch.invoke_on_all(&snitch_ptr::start).get();
locator::token_metadata::config tm_cfg;
tm_cfg.topo_cfg.this_endpoint = my_address;
tm_cfg.topo_cfg.local_dc_rack = { snitch.local()->get_datacenter(), snitch.local()->get_rack() };
std::map<sstring, size_t> node_count_per_dc;
std::map<sstring, std::map<sstring, size_t>> node_count_per_rack;
std::vector<ring_point> ring_points;
auto& random_engine = seastar::testing::local_random_engine;
unsigned shard_count = 2;
size_t num_dcs = 1 + tests::random::get_int(3);
// Generate a random cluster
double point = 1;
for (size_t dc = 0; dc < num_dcs; ++dc) {
sstring dc_name = fmt::format("{}", 100 + dc);
size_t num_racks = 1 + tests::random::get_int(5);
for (size_t rack = 0; rack < num_racks; ++rack) {
sstring rack_name = fmt::format("{}", 10 + rack);
size_t rack_nodes = 1 + tests::random::get_int(2);
for (size_t i = 1; i <= rack_nodes; ++i) {
ring_points.emplace_back(point, inet_address(format("192.{}.{}.{}", dc_name, rack_name, i)));
node_count_per_dc[dc_name]++;
node_count_per_rack[dc_name][rack_name]++;
point++;
}
}
}
testlog.debug("node_count_per_rack={}", node_count_per_rack);
// Initialize the token_metadata
locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg);
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
auto& topo = tm.get_topology();
for (const auto& [ring_point, endpoint, id] : ring_points) {
std::unordered_set<token> tokens;
tokens.insert(token{tests::d2t(ring_point / ring_points.size())});
topo.add_node(id, make_endpoint_dc_rack(endpoint), locator::node::state::normal, shard_count);
co_await tm.update_normal_tokens(std::move(tokens), id);
}
}).get();
auto base_schema = schema_builder("ks", "base")
.with_column("k", utf8_type, column_kind::partition_key)
.with_column("v", utf8_type)
.build();
auto view_schema = schema_builder("ks", "view")
.with_column("v", utf8_type, column_kind::partition_key)
.with_column("k", utf8_type)
.build();
auto tmptr = stm.get();
// Create the replication strategy
auto make_random_options = [&] () {
auto option_dcs = node_count_per_dc | std::views::keys | std::ranges::to<std::vector>();
std::shuffle(option_dcs.begin(), option_dcs.end(), random_engine);
std::map<sstring, sstring> options;
for (const auto& dc : option_dcs) {
auto num_racks = node_count_per_rack.at(dc).size();
auto max_rf_factor = std::ranges::min(std::ranges::views::transform(node_count_per_rack.at(dc), [] (auto& x) { return x.second; }));
auto rf = num_racks * tests::random::get_int(1UL, max_rf_factor);
options.emplace(dc, fmt::to_string(rf));
}
return options;
};
auto options = make_random_options();
size_t tablet_count = 1 + tests::random::get_int(99);
testlog.debug("tablet_count={} rf_options={}", tablet_count, options);
locator::replication_strategy_params params(options, tablet_count);
auto ars_ptr = abstract_replication_strategy::create_replication_strategy(
"NetworkTopologyStrategy", params);
auto tab_awr_ptr = ars_ptr->maybe_as_tablet_aware();
BOOST_REQUIRE(tab_awr_ptr);
auto base_tmap = tab_awr_ptr->allocate_tablets_for_new_table(base_schema, tmptr, 1).get();
auto base_table_id = base_schema->id();
testlog.debug("base_table_id={}", base_table_id);
auto view_table_id = view_schema->id();
auto view_tmap = tab_awr_ptr->allocate_tablets_for_new_table(view_schema, tmptr, 1).get();
testlog.debug("view_table_id={}", view_table_id);
stm.mutate_token_metadata([&] (token_metadata& tm) {
tm.tablets().set_tablet_map(base_table_id, base_tmap);
tm.tablets().set_tablet_map(view_table_id, view_tmap);
return make_ready_future();
}).get();
tmptr = stm.get();
auto base_erm = tab_awr_ptr->make_replication_map(base_table_id, tmptr);
auto view_erm = tab_awr_ptr->make_replication_map(view_table_id, tmptr);
auto& topology = tmptr->get_topology();
testlog.debug("topology: {}", topology.get_datacenter_racks());
// Test tablets rack-aware base-view pairing
auto base_token = dht::token::get_random_token();
auto view_token = dht::token::get_random_token();
bool use_legacy_self_pairing = false;
bool use_tablets_basic_rack_aware_view_pairing = true;
const auto& base_replicas = base_tmap.get_tablet_info(base_tmap.get_tablet_id(base_token)).replicas;
replica::cf_stats cf_stats;
std::unordered_map<locator::host_id, locator::host_id> base_to_view_pairing;
std::unordered_map<locator::host_id, locator::host_id> view_to_base_pairing;
for (const auto& base_replica : base_replicas) {
auto& base_host = base_replica.host;
auto view_ep_opt = db::view::get_view_natural_endpoint(
base_host,
base_erm,
view_erm,
*ars_ptr,
base_token,
view_token,
use_legacy_self_pairing,
use_tablets_basic_rack_aware_view_pairing,
cf_stats);
// view pair must be found
BOOST_REQUIRE(view_ep_opt);
auto& view_ep = *view_ep_opt;
// Assert pairing uniqueness
auto [base_it, inserted_base_pair] = base_to_view_pairing.emplace(base_host, view_ep);
BOOST_REQUIRE(inserted_base_pair);
auto [view_it, inserted_view_pair] = view_to_base_pairing.emplace(view_ep, base_host);
BOOST_REQUIRE(inserted_view_pair);
auto& base_location = topology.find_node(base_host)->dc_rack();
auto& view_location = topology.find_node(view_ep)->dc_rack();
// Assert dc- and rack- aware pairing
BOOST_REQUIRE_EQUAL(base_location.dc, view_location.dc);
BOOST_REQUIRE_EQUAL(base_location.rack, view_location.rack);
}
}
// Called in a seastar thread
void test_complex_rack_aware_view_pairing_test(bool more_or_less) {
auto my_address = gms::inet_address("localhost");
// Create the RackInferringSnitch
snitch_config cfg;
cfg.listen_address = my_address;
cfg.broadcast_address = my_address;
cfg.name = "RackInferringSnitch";
sharded<snitch_ptr> snitch;
snitch.start(cfg).get();
auto stop_snitch = defer([&snitch] { snitch.stop().get(); });
snitch.invoke_on_all(&snitch_ptr::start).get();
locator::token_metadata::config tm_cfg;
tm_cfg.topo_cfg.this_endpoint = my_address;
tm_cfg.topo_cfg.local_dc_rack = { snitch.local()->get_datacenter(), snitch.local()->get_rack() };
std::map<sstring, size_t> node_count_per_dc;
std::map<sstring, std::map<sstring, size_t>> node_count_per_rack;
std::vector<ring_point> ring_points;
auto& random_engine = seastar::testing::local_random_engine;
unsigned shard_count = 2;
size_t num_dcs = 1 + tests::random::get_int(3);
// Generate a random cluster
double point = 1;
for (size_t dc = 0; dc < num_dcs; ++dc) {
sstring dc_name = fmt::format("{}", 100 + dc);
size_t num_racks = 2 + tests::random::get_int(4);
for (size_t rack = 0; rack < num_racks; ++rack) {
sstring rack_name = fmt::format("{}", 10 + rack);
size_t rack_nodes = 1 + tests::random::get_int(2);
for (size_t i = 1; i <= rack_nodes; ++i) {
ring_points.emplace_back(point, inet_address(format("192.{}.{}.{}", dc_name, rack_name, i)));
node_count_per_dc[dc_name]++;
node_count_per_rack[dc_name][rack_name]++;
point++;
}
}
}
testlog.debug("node_count_per_rack={}", node_count_per_rack);
// Initialize the token_metadata
locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg);
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
auto& topo = tm.get_topology();
for (const auto& [ring_point, endpoint, id] : ring_points) {
std::unordered_set<token> tokens;
tokens.insert(token{tests::d2t(ring_point / ring_points.size())});
topo.add_node(id, make_endpoint_dc_rack(endpoint), locator::node::state::normal, shard_count);
co_await tm.update_normal_tokens(std::move(tokens), id);
}
}).get();
auto base_schema = schema_builder("ks", "base")
.with_column("k", utf8_type, column_kind::partition_key)
.with_column("v", utf8_type)
.build();
auto view_schema = schema_builder("ks", "view")
.with_column("v", utf8_type, column_kind::partition_key)
.with_column("k", utf8_type)
.build();
auto tmptr = stm.get();
// Create the replication strategy
auto make_random_options = [&] () {
auto option_dcs = node_count_per_dc | std::views::keys | std::ranges::to<std::vector>();
std::shuffle(option_dcs.begin(), option_dcs.end(), random_engine);
std::map<sstring, sstring> options;
for (const auto& dc : option_dcs) {
auto num_racks = node_count_per_rack.at(dc).size();
auto rf = more_or_less ?
tests::random::get_int(num_racks, node_count_per_dc[dc]) :
tests::random::get_int(1UL, num_racks);
options.emplace(dc, fmt::to_string(rf));
}
return options;
};
auto options = make_random_options();
size_t tablet_count = 1 + tests::random::get_int(99);
testlog.debug("tablet_count={} rf_options={}", tablet_count, options);
locator::replication_strategy_params params(options, tablet_count);
auto ars_ptr = abstract_replication_strategy::create_replication_strategy(
"NetworkTopologyStrategy", params);
auto tab_awr_ptr = ars_ptr->maybe_as_tablet_aware();
BOOST_REQUIRE(tab_awr_ptr);
auto base_tmap = tab_awr_ptr->allocate_tablets_for_new_table(base_schema, tmptr, 1).get();
auto base_table_id = base_schema->id();
testlog.debug("base_table_id={}", base_table_id);
auto view_table_id = view_schema->id();
auto view_tmap = tab_awr_ptr->allocate_tablets_for_new_table(view_schema, tmptr, 1).get();
testlog.debug("view_table_id={}", view_table_id);
stm.mutate_token_metadata([&] (token_metadata& tm) {
tm.tablets().set_tablet_map(base_table_id, base_tmap);
tm.tablets().set_tablet_map(view_table_id, view_tmap);
return make_ready_future();
}).get();
tmptr = stm.get();
auto base_erm = tab_awr_ptr->make_replication_map(base_table_id, tmptr);
auto view_erm = tab_awr_ptr->make_replication_map(view_table_id, tmptr);
auto& topology = tmptr->get_topology();
testlog.debug("topology: {}", topology.get_datacenter_racks());
// Test tablets rack-aware base-view pairing
auto base_token = dht::token::get_random_token();
auto view_token = dht::token::get_random_token();
bool use_legacy_self_pairing = false;
bool use_tablets_basic_rack_aware_view_pairing = true;
const auto& base_replicas = base_tmap.get_tablet_info(base_tmap.get_tablet_id(base_token)).replicas;
replica::cf_stats cf_stats;
std::unordered_map<locator::host_id, locator::host_id> base_to_view_pairing;
std::unordered_map<locator::host_id, locator::host_id> view_to_base_pairing;
std::unordered_map<sstring, size_t> same_rack_pairs;
std::unordered_map<sstring, size_t> cross_rack_pairs;
for (const auto& base_replica : base_replicas) {
auto& base_host = base_replica.host;
auto view_ep_opt = db::view::get_view_natural_endpoint(
base_host,
base_erm,
view_erm,
*ars_ptr,
base_token,
view_token,
use_legacy_self_pairing,
use_tablets_basic_rack_aware_view_pairing,
cf_stats);
// view pair must be found
if (!view_ep_opt) {
BOOST_FAIL(format("Could not pair base_host={} base_token={} view_token={}", base_host, base_token, view_token));
}
BOOST_REQUIRE(view_ep_opt);
auto& view_ep = *view_ep_opt;
// Assert pairing uniqueness
auto [base_it, inserted_base_pair] = base_to_view_pairing.emplace(base_host, view_ep);
BOOST_REQUIRE(inserted_base_pair);
auto [view_it, inserted_view_pair] = view_to_base_pairing.emplace(view_ep, base_host);
BOOST_REQUIRE(inserted_view_pair);
auto& base_location = topology.find_node(base_host)->dc_rack();
auto& view_location = topology.find_node(view_ep)->dc_rack();
// Assert dc- and rack- aware pairing
BOOST_REQUIRE_EQUAL(base_location.dc, view_location.dc);
if (base_location.rack == view_location.rack) {
same_rack_pairs[base_location.dc]++;
} else {
cross_rack_pairs[base_location.dc]++;
}
}
for (const auto& [dc, rf_opt] : options) {
auto rf = std::stol(rf_opt);
BOOST_REQUIRE_EQUAL(same_rack_pairs[dc] + cross_rack_pairs[dc], rf);
}
}
SEASTAR_THREAD_TEST_CASE(tablets_complex_rack_aware_view_pairing_test_rf_lt_racks) {
test_complex_rack_aware_view_pairing_test(false);
}
SEASTAR_THREAD_TEST_CASE(tablets_complex_rack_aware_view_pairing_test_rf_gt_racks) {
test_complex_rack_aware_view_pairing_test(true);
}
BOOST_AUTO_TEST_SUITE_END()