From d04cdce0fc56d524e7e5bc6be4dfda1ecdb86822 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Tue, 12 Nov 2024 16:22:18 +0200 Subject: [PATCH 01/12] view: mutate_MV: calculate keyspace-dependent flags once All view live in the same keyspace as their base table, so calculate the keyspace-dependent flags once, outside the per-view update loop. Signed-off-by: Benny Halevy --- db/view/view.cc | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index e34297bed6..11be889b26 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1856,21 +1856,20 @@ future<> view_update_generator::mutate_MV( service::allow_hints allow_hints, wait_for_all_updates wait_for_all) { + auto& ks = _db.find_keyspace(base->ks_name()); + bool network_topology = dynamic_cast(&ks.get_replication_strategy()); + // We set legacy self-pairing for old vnode-based tables (for backward + // compatibility), and unset it for tablets - where range movements + // are more frequent and backward compatibility is less important. + // TODO: Maybe allow users to set use_legacy_self_pairing explicitly + // on a view, like we have the synchronous_updates_flag. + bool use_legacy_self_pairing = !ks.uses_tablets(); auto base_ermp = base->table().get_effective_replication_map(); static constexpr size_t max_concurrent_updates = 128; co_await utils::get_local_injector().inject("delay_before_get_view_natural_endpoint", 8000ms); - co_await max_concurrent_for_each(view_updates, max_concurrent_updates, - [this, base_token, &stats, &cf_stats, tr_state, &pending_view_updates, allow_hints, wait_for_all, base_ermp] (frozen_mutation_and_schema mut) mutable -> future<> { + co_await max_concurrent_for_each(view_updates, max_concurrent_updates, [&] (frozen_mutation_and_schema mut) mutable -> future<> { auto view_token = dht::get_token(*mut.s, mut.fm.key()); auto view_ermp = mut.s->table().get_effective_replication_map(); - auto& ks = _proxy.local().local_db().find_keyspace(mut.s->ks_name()); - bool network_topology = dynamic_cast(&ks.get_replication_strategy()); - // We set legacy self-pairing for old vnode-based tables (for backward - // compatibility), and unset it for tablets - where range movements - // are more frequent and backward compatibility is less important. - // TODO: Maybe allow users to set use_legacy_self_pairing explicitly - // on a view, like we have the synchronous_updates_flag. - bool use_legacy_self_pairing = !ks.uses_tablets(); auto target_endpoint = get_view_natural_endpoint(base_ermp, view_ermp, network_topology, base_token, view_token, use_legacy_self_pairing, cf_stats); auto remote_endpoints = view_ermp->get_pending_replicas(view_token); auto sem_units = seastar::make_lw_shared(pending_view_updates.split(memory_usage_of(mut))); From 91d3bf8ebc2c8fdac5fb6750af18010fbb88c2d0 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Tue, 12 Nov 2024 16:22:18 +0200 Subject: [PATCH 02/12] view: mutate_MV: lookup base and view erms synchronously Although at the moment storage_service::replicate_to_all_cores may yield between updating the base and view tables with a new effective_replication_map, scylladb/scylladb#21781 was submitted to change that so that they are updated atomically together. This change prepares for the above change, and is harmless at the moment. Signed-off-by: Benny Halevy --- db/view/view.cc | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index 11be889b26..ed36667cb8 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1864,12 +1864,23 @@ future<> view_update_generator::mutate_MV( // TODO: Maybe allow users to set use_legacy_self_pairing explicitly // on a view, like we have the synchronous_updates_flag. bool use_legacy_self_pairing = !ks.uses_tablets(); - auto base_ermp = base->table().get_effective_replication_map(); + std::unordered_map erms; + auto get_erm = [&] (table_id id) { + auto it = erms.find(id); + if (it == erms.end()) { + it = erms.emplace(id, _db.find_column_family(id).get_effective_replication_map()).first; + } + return it->second; + }; + auto base_ermp = get_erm(base->id()); + for (const auto& mut : view_updates) { + (void)get_erm(mut.s->id()); + } static constexpr size_t max_concurrent_updates = 128; co_await utils::get_local_injector().inject("delay_before_get_view_natural_endpoint", 8000ms); co_await max_concurrent_for_each(view_updates, max_concurrent_updates, [&] (frozen_mutation_and_schema mut) mutable -> future<> { auto view_token = dht::get_token(*mut.s, mut.fm.key()); - auto view_ermp = mut.s->table().get_effective_replication_map(); + auto view_ermp = erms.at(mut.s->id()); auto target_endpoint = get_view_natural_endpoint(base_ermp, view_ermp, network_topology, base_token, view_token, use_legacy_self_pairing, cf_stats); auto remote_endpoints = view_ermp->get_pending_replicas(view_token); auto sem_units = seastar::make_lw_shared(pending_view_updates.split(memory_usage_of(mut))); From 6d4de30a3ac3987c6900c934b2e4136aa503b95e Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Wed, 13 Nov 2024 10:33:00 +0200 Subject: [PATCH 03/12] view: mutate_MV: optimize remote_endpoints filtering check Currently we always lookup both `my_address` and *target_endpoint in remote_endpoints. But if my_address is in remote_endpoints in some cases the second lookup is not needed, so do it only to decide whether to swap target_endpoint with my_address, if found in remote_endpoints, or to remove that match, if *target_endpoint is already pending as well. Signed-off-by: Benny Halevy --- db/view/view.cc | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index ed36667cb8..049fdfa534 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1908,13 +1908,18 @@ future<> view_update_generator::mutate_MV( if (*target_endpoint == *remote_it) { remote_endpoints.erase(remote_it); } else { - std::swap(*target_endpoint, *remote_it); + auto target_remote_it = std::find(remote_endpoints.begin(), remote_endpoints.end(), *target_endpoint); + if (target_remote_it != remote_endpoints.end()) { + target_endpoint = *remote_it; + remote_endpoints.erase(remote_it); + } else { + std::swap(*target_endpoint, *remote_it); + } } } - } - // It's still possible that a target endpoint is duplicated in the remote endpoints list, - // so let's get rid of the duplicate if it exists - if (target_endpoint) { + } else if (target_endpoint) { + // It's still possible that a target endpoint is duplicated in the remote endpoints list, + // so let's get rid of the duplicate if it exists auto remote_it = std::find(remote_endpoints.begin(), remote_endpoints.end(), *target_endpoint); if (remote_it != remote_endpoints.end()) { remote_endpoints.erase(remote_it); From 97f85e52f7aebf5c52fa2ae8967da1972dcc289d Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Tue, 5 Nov 2024 13:41:57 +0200 Subject: [PATCH 04/12] view: get_view_natural_endpoint: clarify documentation "self-pairing" is enabled only when use_legacy_self_pairing is enabled. That is currently unclear in the documentation comment for this function. Signed-off-by: Benny Halevy --- db/view/view.cc | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index 049fdfa534..c775f921e2 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1723,8 +1723,10 @@ bool should_generate_view_updates_on_this_shard(const schema_ptr& base, const lo // // If the keyspace's replication strategy is a NetworkTopologyStrategy, // we pair only nodes in the same datacenter. -// If one of the base replicas also happens to be a view replica, it is -// paired with itself (with the other nodes paired by order in the list +// +// When use_legacy_self_pairing is enabled, if one of the base replicas +// also happens to be a view replica, it is paired with itself +// (with the other nodes paired by order in the list // after taking this node out). // // If the assumption that the given base token belongs to this replica From cadd33bdf6263f21add0a4bfd08d6fd053e181e6 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Wed, 6 Nov 2024 16:37:03 +0200 Subject: [PATCH 05/12] view: get_view_natural_endpoint: refactor predicate function Simplify the function logic by calculating the predicate function once, before scanning all base and view replicas, rather than testing the different options in the inner loop. Signed-off-by: Benny Halevy --- db/view/view.cc | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index c775f921e2..712fb81531 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1745,17 +1745,24 @@ get_view_natural_endpoint( auto my_datacenter = topology.get_datacenter(); std::vector base_endpoints, view_endpoints; + std::function is_candidate; + if (network_topology) { + is_candidate = [&] (const locator::host_id& ep) { return topology.get_datacenter(ep) == my_datacenter; }; + } else { + is_candidate = [&] (const locator::host_id&) { return true; }; + } + // We need to use get_replicas() for pairing to be stable in case base or view tablet // is rebuilding a replica which has left the ring. get_natural_endpoints() filters such replicas. for (auto&& base_endpoint : base_erm->get_replicas(base_token)) { - if (!network_topology || topology.get_datacenter(base_endpoint) == my_datacenter) { + if (is_candidate(base_endpoint)) { base_endpoints.push_back(base_endpoint); } } auto& view_topology = view_erm->get_token_metadata_ptr()->get_topology(); - for (auto&& view_endpoint : view_erm->get_replicas(view_token)) { - if (use_legacy_self_pairing) { + if (use_legacy_self_pairing) { + for (auto&& view_endpoint : view_erm->get_replicas(view_token)) { auto it = std::find(base_endpoints.begin(), base_endpoints.end(), view_endpoint); // If this base replica is also one of the view replicas, we use @@ -1771,8 +1778,10 @@ get_view_natural_endpoint( } else if (!network_topology || view_topology.get_datacenter(view_endpoint) == my_datacenter) { view_endpoints.push_back(view_endpoint); } - } else { - if (!network_topology || view_topology.get_datacenter(view_endpoint) == my_datacenter) { + } + } else { + for (auto&& view_endpoint : view_erm->get_replicas(view_token)) { + if (is_candidate(view_endpoint)) { view_endpoints.push_back(view_endpoint); } } From 6f8f03f5934ce3da8967833371be61a3d95f8c33 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Tue, 5 Nov 2024 19:16:12 +0200 Subject: [PATCH 06/12] feature_service: add tablet_rack_aware_view_pairing feature Signed-off-by: Benny Halevy --- gms/feature_service.hh | 1 + 1 file changed, 1 insertion(+) diff --git a/gms/feature_service.hh b/gms/feature_service.hh index 8b1f7a78a4..6bbc1f1106 100644 --- a/gms/feature_service.hh +++ b/gms/feature_service.hh @@ -148,6 +148,7 @@ public: gms::feature tablet_repair_scheduler { *this, "TABLET_REPAIR_SCHEDULER"sv }; gms::feature tablet_merge { *this, "TABLET_MERGE"sv }; + gms::feature tablet_rack_aware_view_pairing { *this, "TABLET_RACK_AWARE_VIEW_PAIRING"sv }; gms::feature tablet_migration_virtual_task { *this, "TABLET_MIGRATION_VIRTUAL_TASK"sv }; gms::feature tablet_resize_virtual_task { *this, "TABLET_RESIZE_VIRTUAL_TASK"sv }; From 2bfebc1f625c5d7138b43419d6b991e8e4f2534f Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Thu, 14 Nov 2024 10:48:17 +0200 Subject: [PATCH 07/12] locator: node: add dc and rack getters To simplify searching and sorting using std::ranges projection using std::mem_fn. Signed-off-by: Benny Halevy --- locator/topology.hh | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/locator/topology.hh b/locator/topology.hh index d1c141b506..4dbbf09158 100644 --- a/locator/topology.hh +++ b/locator/topology.hh @@ -96,6 +96,14 @@ public: return _dc_rack; } + const sstring& dc() const noexcept { + return _dc_rack.dc; + } + + const sstring& rack() const noexcept { + return _dc_rack.rack; + } + // Is this "localhost"? this_node is_this_node() const noexcept { return _is_this_node; } From 2589115337395d2adec3152aba70a47686d76e86 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Mon, 18 Nov 2024 16:40:46 +0200 Subject: [PATCH 08/12] locator: topology: consult local_dc_rack if node not found by host_id Like get_location by inet_address, there is a case, when a node is replaced that the node cannot be found by host_id. Currently get_location would return a reference based on the nullptr which might cause a segfault as seen in testing. Instead, if the host_id is of the location, revert to calling get_location() which consults this_node or _cfg.local_dc_rack. Otherwise, throw a runtime_error. Signed-off-by: Benny Halevy --- locator/topology.cc | 11 +++++++++++ locator/topology.hh | 6 +++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/locator/topology.cc b/locator/topology.cc index caa4f1d2e0..375e68077e 100644 --- a/locator/topology.cc +++ b/locator/topology.cc @@ -477,6 +477,17 @@ bool topology::has_node(host_id id) const noexcept { return bool(node); } +const endpoint_dc_rack& topology::get_location_slow(host_id id) const { + // We should do the following check after lookup in nodes. + // In tests, there may be no config for local node, so fall back to get_location() + // only if no mapping is found. Otherwise, get_location() will return empty location + // from config or random node, neither of which is correct. + if (id == _cfg.this_host_id) { + return get_location(); + } + throw std::runtime_error(format("Requested location for node {} not in topology. backtrace {}", id, lazy_backtrace())); +} + void topology::sort_by_proximity(locator::host_id address, host_id_vector_replica_set& addresses) const { if (can_sort_by_proximity()) { do_sort_by_proximity(address, addresses); diff --git a/locator/topology.hh b/locator/topology.hh index 4dbbf09158..b1f284ff8a 100644 --- a/locator/topology.hh +++ b/locator/topology.hh @@ -297,7 +297,10 @@ public: // Get dc/rack location of a node identified by host_id // The specified node must exist. const endpoint_dc_rack& get_location(host_id id) const { - return find_node(id)->dc_rack(); + if (auto node = find_node(id)) { + return node->dc_rack(); + } + return get_location_slow(id); } // Get datacenter of this node @@ -400,6 +403,7 @@ private: } void seed_random_engine(random_engine_type::result_type); + const endpoint_dc_rack& get_location_slow(host_id id) const; unsigned _shard; config _cfg; From 858b0a51f815a447d551cbce9059d500224b6785 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Thu, 14 Nov 2024 11:01:41 +0200 Subject: [PATCH 09/12] view: get_view_natural_endpoint: track replica locator::nodes Rather than tracking only the replica host_id, keep track of the locator:::node& to prepare for rack-aware pairing. Signed-off-by: Benny Halevy --- db/view/view.cc | 48 ++++++++++++++++++++++++++++++------------------ 1 file changed, 30 insertions(+), 18 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index 712fb81531..f94ba587f7 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1743,28 +1743,38 @@ get_view_natural_endpoint( auto& topology = base_erm->get_token_metadata_ptr()->get_topology(); auto me = topology.my_host_id(); auto my_datacenter = topology.get_datacenter(); - std::vector base_endpoints, view_endpoints; + using node_vector = std::vector>; + node_vector base_endpoints, view_endpoints; - std::function is_candidate; + auto resolve = [&] (const locator::topology& topology, const locator::host_id& ep, bool is_view) -> const locator::node& { + if (auto* np = topology.find_node(ep)) { + return *np; + } + throw std::runtime_error(format("get_view_natural_endpoint: {} replica {} not found in topology", is_view ? "view" : "base", ep)); + }; + std::function is_candidate; if (network_topology) { - is_candidate = [&] (const locator::host_id& ep) { return topology.get_datacenter(ep) == my_datacenter; }; + is_candidate = [&] (const locator::node& node) { return node.dc() == my_datacenter; }; } else { - is_candidate = [&] (const locator::host_id&) { return true; }; + is_candidate = [&] (const locator::node&) { return true; }; } + auto process_candidate = [&] (node_vector& nodes, const locator::topology& topology, const locator::host_id& ep, bool is_view) { + auto& node = resolve(topology, ep, is_view); + if (is_candidate(node)) { + nodes.emplace_back(node); + } + }; // We need to use get_replicas() for pairing to be stable in case base or view tablet // is rebuilding a replica which has left the ring. get_natural_endpoints() filters such replicas. for (auto&& base_endpoint : base_erm->get_replicas(base_token)) { - if (is_candidate(base_endpoint)) { - base_endpoints.push_back(base_endpoint); - } + process_candidate(base_endpoints, topology, base_endpoint, false); } auto& view_topology = view_erm->get_token_metadata_ptr()->get_topology(); if (use_legacy_self_pairing) { for (auto&& view_endpoint : view_erm->get_replicas(view_token)) { - auto it = std::find(base_endpoints.begin(), base_endpoints.end(), - view_endpoint); + auto it = std::ranges::find(base_endpoints, view_endpoint, std::mem_fn(&locator::node::host_id)); // If this base replica is also one of the view replicas, we use // ourselves as the view replica. if (view_endpoint == me && it != base_endpoints.end()) { @@ -1775,20 +1785,21 @@ get_view_natural_endpoint( // otherwise. if (it != base_endpoints.end()) { base_endpoints.erase(it); - } else if (!network_topology || view_topology.get_datacenter(view_endpoint) == my_datacenter) { - view_endpoints.push_back(view_endpoint); + } else { + auto& node = resolve(view_topology, view_endpoint, true); + if (!network_topology || node.dc() == my_datacenter) { + view_endpoints.push_back(node); + } } } } else { for (auto&& view_endpoint : view_erm->get_replicas(view_token)) { - if (is_candidate(view_endpoint)) { - view_endpoints.push_back(view_endpoint); - } + process_candidate(view_endpoints, view_topology, view_endpoint, true); } } SCYLLA_ASSERT(base_endpoints.size() == view_endpoints.size()); - auto base_it = std::find(base_endpoints.begin(), base_endpoints.end(), me); + auto base_it = std::ranges::find(base_endpoints, me, std::mem_fn(&locator::node::host_id)); if (base_it == base_endpoints.end()) { // This node is not a base replica of this key, so we return empty // FIXME: This case shouldn't happen, and if it happens, a view update @@ -1796,7 +1807,7 @@ get_view_natural_endpoint( ++cf_stats.total_view_updates_on_wrong_node; return {}; } - auto replica = view_endpoints[base_it - base_endpoints.begin()]; + size_t idx = base_it - base_endpoints.begin(); // https://github.com/scylladb/scylladb/issues/19439 // With tablets, a node being replaced might transition to "left" state @@ -1805,8 +1816,9 @@ get_view_natural_endpoint( // but are still replicas. Therefore, there is no other sensible option // right now but to give up attempt to send the update or write a hint // to the paired, permanently down replica. - if (!view_topology.get_node(replica).left()) { - return replica; + const auto& node = view_endpoints[idx].get(); + if (!node.left()) { + return node.host_id(); } else { return std::nullopt; } From 0e388a1594cc1eeda09ee0727831801d4400d9aa Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Thu, 14 Nov 2024 11:09:53 +0200 Subject: [PATCH 10/12] view: get_view_natural_endpoint: handle case when there are too few view replicas Currently, when reducing RF, we may drop replicas from the view before dropping replicas from the base table. Since get_view_natural_endpoint is allowed to return a disengaged optional if it can't find a pair for the base replica, replcace the exiting assertion with code handling this case, and count those events in a new table metric: total_view_updates_failed_pairing. Note that this does not fix the root cause for the issue which is the unsynchronized dropping of replicas, that should be atomic, using a single group0 transaction. Refs scylladb/scylladb#21492 Signed-off-by: Benny Halevy --- db/view/view.cc | 8 +++++++- replica/database.cc | 3 +++ replica/database.hh | 3 +++ 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/db/view/view.cc b/db/view/view.cc index f94ba587f7..7ae89ba888 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1798,7 +1798,6 @@ get_view_natural_endpoint( } } - SCYLLA_ASSERT(base_endpoints.size() == view_endpoints.size()); auto base_it = std::ranges::find(base_endpoints, me, std::mem_fn(&locator::node::host_id)); if (base_it == base_endpoints.end()) { // This node is not a base replica of this key, so we return empty @@ -1808,6 +1807,13 @@ get_view_natural_endpoint( return {}; } size_t idx = base_it - base_endpoints.begin(); + if (idx >= view_endpoints.size()) { + // There are fewer view replicas than base replicas + // FIXME: This might still happen when reducing replication factor with tablets, + // see https://github.com/scylladb/scylladb/issues/21492 + ++cf_stats.total_view_updates_failed_pairing; + return {}; + } // https://github.com/scylladb/scylladb/issues/19439 // With tablets, a node being replaced might transition to "left" state diff --git a/replica/database.cc b/replica/database.cc index 4a8579446b..ed97a53696 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -677,6 +677,9 @@ database::setup_metrics() { sm::make_total_operations("total_view_updates_on_wrong_node", _cf_stats.total_view_updates_on_wrong_node, sm::description("Total number of view updates which are computed on the wrong node.")).set_skip_when_empty(), + + sm::make_total_operations("total_view_updates_failed_pairing", _cf_stats.total_view_updates_failed_pairing, + sm::description("Total number of view updates for which we failed base/view pairing.")).set_skip_when_empty(), }); if (this_shard_id() == 0) { _metrics.add_group("database", { diff --git a/replica/database.hh b/replica/database.hh index 1109e4890f..7444c0dbcb 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -332,6 +332,9 @@ struct cf_stats { // How many times we build view updates only to realize it's the wrong node and drop the update uint64_t total_view_updates_on_wrong_node = 0; + + // How many times we failed to resolve base/view pairing + uint64_t total_view_updates_failed_pairing = 0; }; class table; From 249b793674bea924ffdabf3912463d5735f850ae Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Tue, 5 Nov 2024 10:17:17 +0200 Subject: [PATCH 11/12] view: get_view_natural_endpoint: implement rack-aware pairing for tablets Enabled with the tablets_rack_aware_view_pairing cluster feature rack-aware pairing pairs base to view replicas that are in the same dc and rack, using their ordinality in the replica map We distinguish between 2 cases: - Simple rack-aware pairing: when the replication factor in the dc is a multiple of the number of racks and the minimum number of nodes per rack in the dc is greater than or equal to rf / nr_racks. In this case (that includes the single rack case), all racks would have the same number of replicas, so we first filter all replicas by dc and rack, retaining their ordinality in the process, and finally, we pair between the base replicas and view replicas, that are in the same rack, using their original order in the tablet-map replica set. For example, nr_racks=2, rf=4: base_replicas = { N00, N01, N10, N11 } view_replicas = { N11, N12, N01, N02 } pairing would be: { N00, N01 }, { N01, N02 }, { N10, N11 }, { N11, N12 } Note that we don't optimize for self-pairing if it breaks pairing ordinality. - Complex rack-aware pairing: when the replication factor is not a multiple of nr_racks. In this case, we attempt best-match pairing in all racks, using the minimum number of base or view replicas in each rack (given their global ordinality), while pairing all the other replicas, across racks, sorted by their ordinality. For example, nr_racks=4, rf=3: base_replicas = { N00, N10, N20 } view_replicas = { N11, N21, N31 } pairing would be: { N00, N31 }*, { N10, N11 }, { N20, N21 } * cross-rack pair If we'd simply stable-sort both base and view replicas by rack, we might end up with much worse pairing across racks: { N00, N11 }*, { N10, N21 }*, { N20, N31 }* * cross-rack pair Fixes scylladb/scylladb#17147 Signed-off-by: Benny Halevy --- db/view/view.cc | 120 ++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 115 insertions(+), 5 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index 7ae89ba888..4bf3b9e468 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -45,6 +45,7 @@ #include "gms/inet_address.hh" #include "gms/feature_service.hh" #include "keys.hh" +#include "locator/abstract_replication_strategy.hh" #include "locator/network_topology_strategy.hh" #include "mutation/mutation.hh" #include "mutation/mutation_partition.hh" @@ -1729,23 +1730,56 @@ bool should_generate_view_updates_on_this_shard(const schema_ptr& base, const lo // (with the other nodes paired by order in the list // after taking this node out). // +// If the table uses tablets and the replication strategy is NetworkTopologyStrategy +// and the replication factor in the node's datacenter is a multiple of the number +// of racks in the datacenter, then pairing is rack-aware. In this case, +// all racks have the same number of replicas, and those are never migrated +// outside their racks. Therefore, the base replicas are naturally paired with the +// view replicas that are in the same rack, based on the ordinal position. +// Note that typically, there is a single replica per rack and pairing is trivial. +// // If the assumption that the given base token belongs to this replica // does not hold, we return an empty optional. static std::optional get_view_natural_endpoint( const locator::effective_replication_map_ptr& base_erm, const locator::effective_replication_map_ptr& view_erm, - bool network_topology, + const locator::abstract_replication_strategy& replication_strategy, const dht::token& base_token, const dht::token& view_token, bool use_legacy_self_pairing, + bool use_tablets_rack_aware_view_pairing, replica::cf_stats& cf_stats) { auto& topology = base_erm->get_token_metadata_ptr()->get_topology(); auto me = topology.my_host_id(); - auto my_datacenter = topology.get_datacenter(); + auto& my_location = topology.get_location(me); + auto& my_datacenter = my_location.dc; + auto* network_topology = dynamic_cast(&replication_strategy); + auto rack_aware_pairing = use_tablets_rack_aware_view_pairing && network_topology; + bool simple_rack_aware_pairing = false; using node_vector = std::vector>; + node_vector orig_base_endpoints, orig_view_endpoints; node_vector base_endpoints, view_endpoints; + if (rack_aware_pairing) { + auto dc_rf = network_topology->get_replication_factor(my_datacenter); + const auto& racks = topology.get_datacenter_rack_nodes().at(my_datacenter); + // Simple rack-aware pairing is possible when the datacenter replication factor + // is a multiple of the number of racks in the datacenter. + if (dc_rf % racks.size() == 0) { + simple_rack_aware_pairing = true; + size_t rack_rf = dc_rf / racks.size(); + // If any rack doesn't have enough nodes to satisfy the per-rack rf + // simple rack-aware pairing is disabled. + for (const auto& [rack, nodes] : racks) { + if (nodes.size() < rack_rf) { + simple_rack_aware_pairing = false; + break; + } + } + } + } + auto resolve = [&] (const locator::topology& topology, const locator::host_id& ep, bool is_view) -> const locator::node& { if (auto* np = topology.find_node(ep)) { return *np; @@ -1753,7 +1787,10 @@ get_view_natural_endpoint( throw std::runtime_error(format("get_view_natural_endpoint: {} replica {} not found in topology", is_view ? "view" : "base", ep)); }; std::function is_candidate; - if (network_topology) { + if (simple_rack_aware_pairing) { + is_candidate = [&] (const locator::node& node) { return node.dc_rack() == my_location; }; + } else if (network_topology) { + // Also for the (rack_aware_pairing && !simple_rack_aware_pairing) case is_candidate = [&] (const locator::node& node) { return node.dc() == my_datacenter; }; } else { is_candidate = [&] (const locator::node&) { return true; }; @@ -1798,12 +1835,76 @@ get_view_natural_endpoint( } } + orig_base_endpoints = base_endpoints; + orig_view_endpoints = view_endpoints; + + // For the complex rack_aware_pairing case, nodes are already filtered by datacenter + // Use best-match, for the minimum number of base and view replicas in each rack, + // and ordinal match for the rest. + if (rack_aware_pairing && !simple_rack_aware_pairing) { + struct indexed_replica { + size_t idx; + std::reference_wrapper node; + }; + std::unordered_map> base_racks, view_racks; + + // First, index all replicas by rack + auto index_replica_set = [] (std::unordered_map>& racks, const node_vector& replicas) { + size_t idx = 0; + for (const auto& r: replicas) { + racks[r.get().rack()].emplace_back(idx++, r); + } + }; + index_replica_set(base_racks, base_endpoints); + index_replica_set(view_racks, view_endpoints); + + // Try optimistically pairing `me` first + const auto& my_base_replicas = base_racks[my_location.rack]; + auto base_it = std::ranges::find(my_base_replicas, me, [] (const indexed_replica& ir) { return ir.node.get().host_id(); }); + if (base_it == my_base_replicas.end()) { + return std::nullopt; + } + const auto& my_view_replicas = view_racks[my_location.rack]; + size_t idx = base_it - my_base_replicas.begin(); + if (idx < my_view_replicas.size()) { + return my_view_replicas[idx].node.get().host_id(); + } + + // Collect all unpaired base and view replicas, + // where the number of replicas in the base rack is different than the respective view rack + std::vector unpaired_base_replicas, unpaired_view_replicas; + for (const auto& [rack, base_replicas] : base_racks) { + const auto& view_replicas = view_racks[rack]; + for (auto i = view_replicas.size(); i < base_replicas.size(); ++i) { + unpaired_base_replicas.emplace_back(base_replicas[i]); + } + } + for (const auto& [rack, view_replicas] : view_racks) { + const auto& base_replicas = base_racks[rack]; + for (auto i = base_replicas.size(); i < view_replicas.size(); ++i) { + unpaired_view_replicas.emplace_back(view_replicas[i]); + } + } + + // Sort by the original ordinality, and copy the sorted results + // back into {base,view}_endpoints, for backward compatible processing below. + std::ranges::sort(unpaired_base_replicas, std::less(), std::mem_fn(&indexed_replica::idx)); + base_endpoints.clear(); + std::ranges::transform(unpaired_base_replicas, std::back_inserter(base_endpoints), std::mem_fn(&indexed_replica::node)); + + std::ranges::sort(unpaired_view_replicas, std::less(), std::mem_fn(&indexed_replica::idx)); + view_endpoints.clear(); + std::ranges::transform(unpaired_view_replicas, std::back_inserter(view_endpoints), std::mem_fn(&indexed_replica::node)); + } + auto base_it = std::ranges::find(base_endpoints, me, std::mem_fn(&locator::node::host_id)); if (base_it == base_endpoints.end()) { // This node is not a base replica of this key, so we return empty // FIXME: This case shouldn't happen, and if it happens, a view update // would be lost. ++cf_stats.total_view_updates_on_wrong_node; + vlogger.warn("Could not find {} in base_endpoints={}", me, + orig_base_endpoints | std::views::transform(std::mem_fn(&locator::node::host_id))); return {}; } size_t idx = base_it - base_endpoints.begin(); @@ -1812,6 +1913,10 @@ get_view_natural_endpoint( // FIXME: This might still happen when reducing replication factor with tablets, // see https://github.com/scylladb/scylladb/issues/21492 ++cf_stats.total_view_updates_failed_pairing; + vlogger.warn("Could not pair {}: rack_aware={} base_endpoints={} view_endpoints={}", me, + rack_aware_pairing ? (simple_rack_aware_pairing ? "simple" : "complex") : "none", + orig_base_endpoints | std::views::transform(std::mem_fn(&locator::node::host_id)), + orig_view_endpoints | std::views::transform(std::mem_fn(&locator::node::host_id))); return {}; } @@ -1886,7 +1991,7 @@ future<> view_update_generator::mutate_MV( wait_for_all_updates wait_for_all) { auto& ks = _db.find_keyspace(base->ks_name()); - bool network_topology = dynamic_cast(&ks.get_replication_strategy()); + auto& replication = ks.get_replication_strategy(); // We set legacy self-pairing for old vnode-based tables (for backward // compatibility), and unset it for tablets - where range movements // are more frequent and backward compatibility is less important. @@ -1905,12 +2010,17 @@ future<> view_update_generator::mutate_MV( for (const auto& mut : view_updates) { (void)get_erm(mut.s->id()); } + // Enable rack-aware view updates pairing for tablets + // when the cluster feature is enabled so that all replicas agree + // on the pairing algorithm. + bool use_tablets_rack_aware_view_pairing = _db.features().tablet_rack_aware_view_pairing && ks.uses_tablets(); static constexpr size_t max_concurrent_updates = 128; co_await utils::get_local_injector().inject("delay_before_get_view_natural_endpoint", 8000ms); co_await max_concurrent_for_each(view_updates, max_concurrent_updates, [&] (frozen_mutation_and_schema mut) mutable -> future<> { auto view_token = dht::get_token(*mut.s, mut.fm.key()); auto view_ermp = erms.at(mut.s->id()); - auto target_endpoint = get_view_natural_endpoint(base_ermp, view_ermp, network_topology, base_token, view_token, use_legacy_self_pairing, cf_stats); + auto target_endpoint = get_view_natural_endpoint(base_ermp, view_ermp, replication, base_token, view_token, + use_legacy_self_pairing, use_tablets_rack_aware_view_pairing, cf_stats); auto remote_endpoints = view_ermp->get_pending_replicas(view_token); auto sem_units = seastar::make_lw_shared(pending_view_updates.split(memory_usage_of(mut))); From dd21d591f65c9f5bc1c2d2d1f5038ce529a19bba Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Thu, 7 Nov 2024 11:41:44 +0200 Subject: [PATCH 12/12] network_topology_strategy_test: add tablets rack_aware_view_pairing tests Test the simple case of base/view pairing with replication_factor that is a multiple of the number of racks. As well as the complex case when simple_tablets_rack_aware_view_pairing is not possible. Signed-off-by: Benny Halevy --- db/view/view.cc | 8 +- db/view/view.hh | 11 + test/boost/network_topology_strategy_test.cc | 328 +++++++++++++++++++ 3 files changed, 344 insertions(+), 3 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index 4bf3b9e468..b53f2220ed 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -1740,8 +1741,9 @@ bool should_generate_view_updates_on_this_shard(const schema_ptr& base, const lo // // If the assumption that the given base token belongs to this replica // does not hold, we return an empty optional. -static std::optional +std::optional get_view_natural_endpoint( + locator::host_id me, const locator::effective_replication_map_ptr& base_erm, const locator::effective_replication_map_ptr& view_erm, const locator::abstract_replication_strategy& replication_strategy, @@ -1751,7 +1753,6 @@ get_view_natural_endpoint( bool use_tablets_rack_aware_view_pairing, replica::cf_stats& cf_stats) { auto& topology = base_erm->get_token_metadata_ptr()->get_topology(); - auto me = topology.my_host_id(); auto& my_location = topology.get_location(me); auto& my_datacenter = my_location.dc; auto* network_topology = dynamic_cast(&replication_strategy); @@ -2014,12 +2015,13 @@ future<> view_update_generator::mutate_MV( // when the cluster feature is enabled so that all replicas agree // on the pairing algorithm. bool use_tablets_rack_aware_view_pairing = _db.features().tablet_rack_aware_view_pairing && ks.uses_tablets(); + auto me = base_ermp->get_topology().my_host_id(); static constexpr size_t max_concurrent_updates = 128; co_await utils::get_local_injector().inject("delay_before_get_view_natural_endpoint", 8000ms); co_await max_concurrent_for_each(view_updates, max_concurrent_updates, [&] (frozen_mutation_and_schema mut) mutable -> future<> { auto view_token = dht::get_token(*mut.s, mut.fm.key()); auto view_ermp = erms.at(mut.s->id()); - auto target_endpoint = get_view_natural_endpoint(base_ermp, view_ermp, replication, base_token, view_token, + auto target_endpoint = get_view_natural_endpoint(me, base_ermp, view_ermp, replication, base_token, view_token, use_legacy_self_pairing, use_tablets_rack_aware_view_pairing, cf_stats); auto remote_endpoints = view_ermp->get_pending_replicas(view_token); auto sem_units = seastar::make_lw_shared(pending_view_updates.split(memory_usage_of(mut))); diff --git a/db/view/view.hh b/db/view/view.hh index f35a07a63a..59a17d5174 100644 --- a/db/view/view.hh +++ b/db/view/view.hh @@ -350,6 +350,17 @@ size_t memory_usage_of(const frozen_mutation_and_schema& mut); */ std::vector with_base_info_snapshot(std::vector); +std::optional get_view_natural_endpoint( + locator::host_id node, + const locator::effective_replication_map_ptr& base_erm, + const locator::effective_replication_map_ptr& view_erm, + const locator::abstract_replication_strategy& replication_strategy, + const dht::token& base_token, + const dht::token& view_token, + bool use_legacy_self_pairing, + bool use_tablets_basic_rack_aware_view_pairing, + replica::cf_stats& cf_stats); + } } diff --git a/test/boost/network_topology_strategy_test.cc b/test/boost/network_topology_strategy_test.cc index 312f79a954..f0019ea46a 100644 --- a/test/boost/network_topology_strategy_test.cc +++ b/test/boost/network_topology_strategy_test.cc @@ -10,6 +10,7 @@ #include #include "gms/inet_address.hh" #include "inet_address_vectors.hh" +#include "locator/host_id.hh" #include "locator/types.hh" #include "locator/snitch_base.hh" #include "utils/assert.hh" @@ -32,6 +33,7 @@ #include #include #include +#include #include #include #include "test/lib/log.hh" @@ -1206,4 +1208,330 @@ SEASTAR_THREAD_TEST_CASE(test_topology_tracks_local_node) { BOOST_REQUIRE(stm.get()->get_topology().get_location() == ip1_dc_rack_v2); } +SEASTAR_THREAD_TEST_CASE(tablets_simple_rack_aware_view_pairing_test) { + auto my_address = gms::inet_address("localhost"); + + // Create the RackInferringSnitch + snitch_config cfg; + cfg.listen_address = my_address; + cfg.broadcast_address = my_address; + cfg.name = "RackInferringSnitch"; + sharded snitch; + snitch.start(cfg).get(); + auto stop_snitch = defer([&snitch] { snitch.stop().get(); }); + snitch.invoke_on_all(&snitch_ptr::start).get(); + + locator::token_metadata::config tm_cfg; + tm_cfg.topo_cfg.this_endpoint = my_address; + tm_cfg.topo_cfg.local_dc_rack = { snitch.local()->get_datacenter(), snitch.local()->get_rack() }; + + std::map node_count_per_dc; + std::map> node_count_per_rack; + std::vector ring_points; + + auto& random_engine = seastar::testing::local_random_engine; + unsigned shard_count = 2; + size_t num_dcs = 1 + tests::random::get_int(3); + + // Generate a random cluster + double point = 1; + for (size_t dc = 0; dc < num_dcs; ++dc) { + sstring dc_name = fmt::format("{}", 100 + dc); + size_t num_racks = 1 + tests::random::get_int(5); + for (size_t rack = 0; rack < num_racks; ++rack) { + sstring rack_name = fmt::format("{}", 10 + rack); + size_t rack_nodes = 1 + tests::random::get_int(2); + for (size_t i = 1; i <= rack_nodes; ++i) { + ring_points.emplace_back(point, inet_address(format("192.{}.{}.{}", dc_name, rack_name, i))); + node_count_per_dc[dc_name]++; + node_count_per_rack[dc_name][rack_name]++; + point++; + } + } + } + + testlog.debug("node_count_per_rack={}", node_count_per_rack); + + // Initialize the token_metadata + locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg); + stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { + auto& topo = tm.get_topology(); + for (const auto& [ring_point, endpoint, id] : ring_points) { + std::unordered_set tokens; + tokens.insert(token{tests::d2t(ring_point / ring_points.size())}); + topo.add_node(id, make_endpoint_dc_rack(endpoint), locator::node::state::normal, shard_count); + co_await tm.update_normal_tokens(std::move(tokens), id); + } + }).get(); + + auto base_schema = schema_builder("ks", "base") + .with_column("k", utf8_type, column_kind::partition_key) + .with_column("v", utf8_type) + .build(); + + auto view_schema = schema_builder("ks", "view") + .with_column("v", utf8_type, column_kind::partition_key) + .with_column("k", utf8_type) + .build(); + + auto tmptr = stm.get(); + + // Create the replication strategy + auto make_random_options = [&] () { + auto option_dcs = node_count_per_dc | std::views::keys | std::ranges::to(); + std::shuffle(option_dcs.begin(), option_dcs.end(), random_engine); + std::map options; + for (const auto& dc : option_dcs) { + auto num_racks = node_count_per_rack.at(dc).size(); + auto max_rf_factor = std::ranges::min(std::ranges::views::transform(node_count_per_rack.at(dc), [] (auto& x) { return x.second; })); + auto rf = num_racks * tests::random::get_int(1UL, max_rf_factor); + options.emplace(dc, fmt::to_string(rf)); + } + return options; + }; + + auto options = make_random_options(); + size_t tablet_count = 1 + tests::random::get_int(99); + testlog.debug("tablet_count={} rf_options={}", tablet_count, options); + locator::replication_strategy_params params(options, tablet_count); + auto ars_ptr = abstract_replication_strategy::create_replication_strategy( + "NetworkTopologyStrategy", params); + auto tab_awr_ptr = ars_ptr->maybe_as_tablet_aware(); + BOOST_REQUIRE(tab_awr_ptr); + auto base_tmap = tab_awr_ptr->allocate_tablets_for_new_table(base_schema, tmptr, 1).get(); + auto base_table_id = base_schema->id(); + testlog.debug("base_table_id={}", base_table_id); + auto view_table_id = view_schema->id(); + auto view_tmap = tab_awr_ptr->allocate_tablets_for_new_table(view_schema, tmptr, 1).get(); + testlog.debug("view_table_id={}", view_table_id); + + stm.mutate_token_metadata([&] (token_metadata& tm) { + tm.tablets().set_tablet_map(base_table_id, base_tmap); + tm.tablets().set_tablet_map(view_table_id, view_tmap); + return make_ready_future(); + }).get(); + + tmptr = stm.get(); + auto base_erm = tab_awr_ptr->make_replication_map(base_table_id, tmptr); + auto view_erm = tab_awr_ptr->make_replication_map(view_table_id, tmptr); + + auto& topology = tmptr->get_topology(); + testlog.debug("topology: {}", topology.get_datacenter_racks()); + + // Test tablets rack-aware base-view pairing + auto base_token = dht::token::get_random_token(); + auto view_token = dht::token::get_random_token(); + bool use_legacy_self_pairing = false; + bool use_tablets_basic_rack_aware_view_pairing = true; + const auto& base_replicas = base_tmap.get_tablet_info(base_tmap.get_tablet_id(base_token)).replicas; + replica::cf_stats cf_stats; + std::unordered_map base_to_view_pairing; + std::unordered_map view_to_base_pairing; + for (const auto& base_replica : base_replicas) { + auto& base_host = base_replica.host; + auto view_ep_opt = db::view::get_view_natural_endpoint( + base_host, + base_erm, + view_erm, + *ars_ptr, + base_token, + view_token, + use_legacy_self_pairing, + use_tablets_basic_rack_aware_view_pairing, + cf_stats); + + // view pair must be found + BOOST_REQUIRE(view_ep_opt); + auto& view_ep = *view_ep_opt; + + // Assert pairing uniqueness + auto [base_it, inserted_base_pair] = base_to_view_pairing.emplace(base_host, view_ep); + BOOST_REQUIRE(inserted_base_pair); + auto [view_it, inserted_view_pair] = view_to_base_pairing.emplace(view_ep, base_host); + BOOST_REQUIRE(inserted_view_pair); + + auto& base_location = topology.find_node(base_host)->dc_rack(); + auto& view_location = topology.find_node(view_ep)->dc_rack(); + + // Assert dc- and rack- aware pairing + BOOST_REQUIRE_EQUAL(base_location.dc, view_location.dc); + BOOST_REQUIRE_EQUAL(base_location.rack, view_location.rack); + } +} + +// Called in a seastar thread +void test_complex_rack_aware_view_pairing_test(bool more_or_less) { + auto my_address = gms::inet_address("localhost"); + + // Create the RackInferringSnitch + snitch_config cfg; + cfg.listen_address = my_address; + cfg.broadcast_address = my_address; + cfg.name = "RackInferringSnitch"; + sharded snitch; + snitch.start(cfg).get(); + auto stop_snitch = defer([&snitch] { snitch.stop().get(); }); + snitch.invoke_on_all(&snitch_ptr::start).get(); + + locator::token_metadata::config tm_cfg; + tm_cfg.topo_cfg.this_endpoint = my_address; + tm_cfg.topo_cfg.local_dc_rack = { snitch.local()->get_datacenter(), snitch.local()->get_rack() }; + + std::map node_count_per_dc; + std::map> node_count_per_rack; + std::vector ring_points; + + auto& random_engine = seastar::testing::local_random_engine; + unsigned shard_count = 2; + size_t num_dcs = 1 + tests::random::get_int(3); + + // Generate a random cluster + double point = 1; + for (size_t dc = 0; dc < num_dcs; ++dc) { + sstring dc_name = fmt::format("{}", 100 + dc); + size_t num_racks = 2 + tests::random::get_int(4); + for (size_t rack = 0; rack < num_racks; ++rack) { + sstring rack_name = fmt::format("{}", 10 + rack); + size_t rack_nodes = 1 + tests::random::get_int(2); + for (size_t i = 1; i <= rack_nodes; ++i) { + ring_points.emplace_back(point, inet_address(format("192.{}.{}.{}", dc_name, rack_name, i))); + node_count_per_dc[dc_name]++; + node_count_per_rack[dc_name][rack_name]++; + point++; + } + } + } + + testlog.debug("node_count_per_rack={}", node_count_per_rack); + + // Initialize the token_metadata + locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg); + stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { + auto& topo = tm.get_topology(); + for (const auto& [ring_point, endpoint, id] : ring_points) { + std::unordered_set tokens; + tokens.insert(token{tests::d2t(ring_point / ring_points.size())}); + topo.add_node(id, make_endpoint_dc_rack(endpoint), locator::node::state::normal, shard_count); + co_await tm.update_normal_tokens(std::move(tokens), id); + } + }).get(); + + auto base_schema = schema_builder("ks", "base") + .with_column("k", utf8_type, column_kind::partition_key) + .with_column("v", utf8_type) + .build(); + + auto view_schema = schema_builder("ks", "view") + .with_column("v", utf8_type, column_kind::partition_key) + .with_column("k", utf8_type) + .build(); + + auto tmptr = stm.get(); + + // Create the replication strategy + auto make_random_options = [&] () { + auto option_dcs = node_count_per_dc | std::views::keys | std::ranges::to(); + std::shuffle(option_dcs.begin(), option_dcs.end(), random_engine); + std::map options; + for (const auto& dc : option_dcs) { + auto num_racks = node_count_per_rack.at(dc).size(); + auto rf = more_or_less ? + tests::random::get_int(num_racks, node_count_per_dc[dc]) : + tests::random::get_int(1UL, num_racks); + options.emplace(dc, fmt::to_string(rf)); + } + return options; + }; + + auto options = make_random_options(); + size_t tablet_count = 1 + tests::random::get_int(99); + testlog.debug("tablet_count={} rf_options={}", tablet_count, options); + locator::replication_strategy_params params(options, tablet_count); + auto ars_ptr = abstract_replication_strategy::create_replication_strategy( + "NetworkTopologyStrategy", params); + auto tab_awr_ptr = ars_ptr->maybe_as_tablet_aware(); + BOOST_REQUIRE(tab_awr_ptr); + auto base_tmap = tab_awr_ptr->allocate_tablets_for_new_table(base_schema, tmptr, 1).get(); + auto base_table_id = base_schema->id(); + testlog.debug("base_table_id={}", base_table_id); + auto view_table_id = view_schema->id(); + auto view_tmap = tab_awr_ptr->allocate_tablets_for_new_table(view_schema, tmptr, 1).get(); + testlog.debug("view_table_id={}", view_table_id); + + stm.mutate_token_metadata([&] (token_metadata& tm) { + tm.tablets().set_tablet_map(base_table_id, base_tmap); + tm.tablets().set_tablet_map(view_table_id, view_tmap); + return make_ready_future(); + }).get(); + + tmptr = stm.get(); + auto base_erm = tab_awr_ptr->make_replication_map(base_table_id, tmptr); + auto view_erm = tab_awr_ptr->make_replication_map(view_table_id, tmptr); + + auto& topology = tmptr->get_topology(); + testlog.debug("topology: {}", topology.get_datacenter_racks()); + + // Test tablets rack-aware base-view pairing + auto base_token = dht::token::get_random_token(); + auto view_token = dht::token::get_random_token(); + bool use_legacy_self_pairing = false; + bool use_tablets_basic_rack_aware_view_pairing = true; + const auto& base_replicas = base_tmap.get_tablet_info(base_tmap.get_tablet_id(base_token)).replicas; + replica::cf_stats cf_stats; + std::unordered_map base_to_view_pairing; + std::unordered_map view_to_base_pairing; + std::unordered_map same_rack_pairs; + std::unordered_map cross_rack_pairs; + for (const auto& base_replica : base_replicas) { + auto& base_host = base_replica.host; + auto view_ep_opt = db::view::get_view_natural_endpoint( + base_host, + base_erm, + view_erm, + *ars_ptr, + base_token, + view_token, + use_legacy_self_pairing, + use_tablets_basic_rack_aware_view_pairing, + cf_stats); + + // view pair must be found + if (!view_ep_opt) { + BOOST_FAIL(format("Could not pair base_host={} base_token={} view_token={}", base_host, base_token, view_token)); + } + BOOST_REQUIRE(view_ep_opt); + auto& view_ep = *view_ep_opt; + + // Assert pairing uniqueness + auto [base_it, inserted_base_pair] = base_to_view_pairing.emplace(base_host, view_ep); + BOOST_REQUIRE(inserted_base_pair); + auto [view_it, inserted_view_pair] = view_to_base_pairing.emplace(view_ep, base_host); + BOOST_REQUIRE(inserted_view_pair); + + auto& base_location = topology.find_node(base_host)->dc_rack(); + auto& view_location = topology.find_node(view_ep)->dc_rack(); + + // Assert dc- and rack- aware pairing + BOOST_REQUIRE_EQUAL(base_location.dc, view_location.dc); + + if (base_location.rack == view_location.rack) { + same_rack_pairs[base_location.dc]++; + } else { + cross_rack_pairs[base_location.dc]++; + } + } + for (const auto& [dc, rf_opt] : options) { + auto rf = std::stol(rf_opt); + BOOST_REQUIRE_EQUAL(same_rack_pairs[dc] + cross_rack_pairs[dc], rf); + } +} + +SEASTAR_THREAD_TEST_CASE(tablets_complex_rack_aware_view_pairing_test_rf_lt_racks) { + test_complex_rack_aware_view_pairing_test(false); +} + +SEASTAR_THREAD_TEST_CASE(tablets_complex_rack_aware_view_pairing_test_rf_gt_racks) { + test_complex_rack_aware_view_pairing_test(true); +} + BOOST_AUTO_TEST_SUITE_END()