From d9283bd025b4a4e525137fa8f077ff7517ba0173 Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Thu, 2 Nov 2023 13:15:32 +0400 Subject: [PATCH] tablets: switch to token_metadata2 locator_topology_test, network_topology_strategy_test and tablets_test are fully switched to the host_id-based token_metadata, meaning they no longer populate the old token_metadata. All the boost and topology tests pass with this change. --- locator/abstract_replication_strategy.cc | 4 +- locator/load_sketch.hh | 4 +- locator/network_topology_strategy.cc | 15 +- locator/tablet_metadata_guard.hh | 2 +- locator/tablet_sharder.hh | 4 +- locator/tablets.cc | 20 +- replica/table.cc | 2 +- service/storage_service.cc | 8 +- service/tablet_allocator.cc | 10 +- service/tablet_allocator.hh | 2 +- test/boost/locator_topology_test.cc | 19 +- test/boost/network_topology_strategy_test.cc | 190 +++++++++---------- test/boost/tablets_test.cc | 185 +++++++++--------- 13 files changed, 225 insertions(+), 240 deletions(-) diff --git a/locator/abstract_replication_strategy.cc b/locator/abstract_replication_strategy.cc index f66e10d8bc..f06a4924e4 100644 --- a/locator/abstract_replication_strategy.cc +++ b/locator/abstract_replication_strategy.cc @@ -221,7 +221,7 @@ insert_token_range_to_sorted_container_while_unwrapping( dht::token_range_vector vnode_effective_replication_map::do_get_ranges(noncopyable_function consider_range_for_endpoint) const { dht::token_range_vector ret; - const auto& tm = *_tmptr; + const auto& tm = *_tmptr->get_new(); const auto& sorted_tokens = tm.sorted_tokens(); if (sorted_tokens.empty()) { on_internal_error(rslogger, "Token metadata is empty"); @@ -305,7 +305,7 @@ vnode_effective_replication_map::get_primary_ranges(inet_address ep) const { dht::token_range_vector vnode_effective_replication_map::get_primary_ranges_within_dc(inet_address ep) const { - const topology& topo = _tmptr->get_topology(); + const topology& topo = _tmptr->get_new()->get_topology(); sstring local_dc = topo.get_datacenter(ep); std::unordered_set local_dc_nodes = topo.get_datacenter_endpoints().at(local_dc); // The callback function below is called for each endpoint diff --git a/locator/load_sketch.hh b/locator/load_sketch.hh index df4ac4bbfe..fb663ad5aa 100644 --- a/locator/load_sketch.hh +++ b/locator/load_sketch.hh @@ -56,7 +56,7 @@ class load_sketch { } }; std::unordered_map _nodes; - token_metadata_ptr _tm; + token_metadata2_ptr _tm; private: tablet_replica_set get_replicas_for_tablet_load(const tablet_info& ti, const tablet_transition_info* trinfo) const { // We reflect migrations in the load as if they already happened, @@ -65,7 +65,7 @@ private: } public: - load_sketch(token_metadata_ptr tm) + load_sketch(token_metadata2_ptr tm) : _tm(std::move(tm)) { } diff --git a/locator/network_topology_strategy.cc b/locator/network_topology_strategy.cc index b2f7cc31ac..4a18963820 100644 --- a/locator/network_topology_strategy.cc +++ b/locator/network_topology_strategy.cc @@ -300,23 +300,23 @@ future network_topology_strategy::allocate_tablets_for_new_table(sch } tablet_map tablets(tablet_count); - load_sketch load(tm); + load_sketch load(tm->get_new_strong()); co_await load.populate(); // FIXME: Don't use tokens to distribute nodes. // The following reuses the existing token-based algorithm used by NetworkTopologyStrategy. - assert(!tm->sorted_tokens().empty()); - auto token_range = tm->ring_range(dht::token::get_random_token()); + assert(!tm->get_new()->sorted_tokens().empty()); + auto token_range = tm->get_new()->ring_range(dht::token::get_random_token()); for (tablet_id tb : tablets.tablet_ids()) { - natural_endpoints_tracker tracker(*tm, _dc_rep_factor); + natural_endpoints_tracker tracker(*tm->get_new(), _dc_rep_factor); while (true) { co_await coroutine::maybe_yield(); if (token_range.begin() == token_range.end()) { - token_range = tm->ring_range(dht::minimum_token()); + token_range = tm->get_new()->ring_range(dht::minimum_token()); } - inet_address ep = *tm->get_endpoint(*token_range.begin()); + locator::host_id ep = *tm->get_new()->get_endpoint(*token_range.begin()); token_range.drop_front(); if (tracker.add_endpoint_and_check_if_done(ep)) { break; @@ -325,8 +325,7 @@ future network_topology_strategy::allocate_tablets_for_new_table(sch tablet_replica_set replicas; for (auto&& ep : tracker.replicas()) { - auto host = tm->get_host_id(ep); - replicas.emplace_back(tablet_replica{host, load.next_shard(host)}); + replicas.emplace_back(tablet_replica{ep, load.next_shard(ep)}); } tablets.set_tablet(tb, tablet_info{std::move(replicas)}); diff --git a/locator/tablet_metadata_guard.hh b/locator/tablet_metadata_guard.hh index 0731ea8faa..127a0ce137 100644 --- a/locator/tablet_metadata_guard.hh +++ b/locator/tablet_metadata_guard.hh @@ -52,7 +52,7 @@ public: /// Returns tablet_map for the table of the tablet associated with this guard. /// The result is valid until the next deferring point. const locator::tablet_map& get_tablet_map() { - return get_token_metadata()->tablets().get_tablet_map(_tablet.table); + return get_token_metadata()->get_new()->tablets().get_tablet_map(_tablet.table); } }; diff --git a/locator/tablet_sharder.hh b/locator/tablet_sharder.hh index b133d272db..74a5c1fdd5 100644 --- a/locator/tablet_sharder.hh +++ b/locator/tablet_sharder.hh @@ -17,7 +17,7 @@ namespace locator { /// Implements sharder object which reflects assignment of tablets of a given table to local shards. /// Token ranges which don't have local tablets are reported to belong to shard 0. class tablet_sharder : public dht::sharder { - const token_metadata& _tm; + const token_metadata2& _tm; table_id _table; mutable const tablet_map* _tmap = nullptr; private: @@ -29,7 +29,7 @@ private: } } public: - tablet_sharder(const token_metadata& tm, table_id table) + tablet_sharder(const token_metadata2& tm, table_id table) : _tm(tm) , _table(table) { } diff --git a/locator/tablets.cc b/locator/tablets.cc index 3b731324b0..38a55852fa 100644 --- a/locator/tablets.cc +++ b/locator/tablets.cc @@ -115,7 +115,7 @@ const tablet_map& tablet_metadata::get_tablet_map(table_id id) const { try { return _tablets.at(id); } catch (const std::out_of_range&) { - throw std::runtime_error(format("Tablet map not found for table {}", id)); + throw_with_backtrace(format("Tablet map not found for table {}", id)); } } @@ -338,12 +338,12 @@ private: inet_address_vector_replica_set result; result.reserve(replicas.size()); for (auto&& replica : replicas) { - result.emplace_back(_tmptr->get_endpoint_for_host_id(replica.host)); + result.emplace_back(_tmptr->get_new()->get_endpoint_for_host_id(replica.host)); } return result; } const tablet_map& get_tablet_map() const { - return _tmptr->tablets().get_tablet_map(_table); + return _tmptr->get_new()->tablets().get_tablet_map(_table); } public: tablet_effective_replication_map(table_id table, @@ -352,7 +352,7 @@ public: size_t replication_factor) : effective_replication_map(std::move(rs), std::move(tmptr), replication_factor) , _table(table) - , _sharder(*_tmptr, table) + , _sharder(*_tmptr->get_new(), table) { } virtual ~tablet_effective_replication_map() = default; @@ -399,7 +399,7 @@ public: case write_replica_set_selector::both: tablet_logger.trace("get_pending_endpoints({}): table={}, tablet={}, replica={}", search_token, _table, tablet, info->pending_replica); - return {_tmptr->get_endpoint_for_host_id(info->pending_replica.host)}; + return {_tmptr->get_new()->get_endpoint_for_host_id(info->pending_replica.host)}; case write_replica_set_selector::next: return {}; } @@ -466,7 +466,7 @@ public: } virtual bool has_pending_ranges(inet_address endpoint) const override { - const auto host_id = _tmptr->get_host_id_if_known(endpoint); + const auto host_id = _tmptr->get_new()->get_host_id_if_known(endpoint); if (!host_id.has_value()) { return false; } @@ -480,11 +480,11 @@ public: virtual std::unique_ptr make_splitter() const override { class splitter : public token_range_splitter { - token_metadata_ptr _tmptr; // To keep the tablet map alive. + token_metadata2_ptr _tmptr; // To keep the tablet map alive. const tablet_map& _tmap; std::optional _next; public: - splitter(token_metadata_ptr tmptr, const tablet_map& tmap) + splitter(token_metadata2_ptr tmptr, const tablet_map& tmap) : _tmptr(std::move(tmptr)) , _tmap(tmap) { } @@ -502,7 +502,7 @@ public: return t; } }; - return std::make_unique(_tmptr, get_tablet_map()); + return std::make_unique(_tmptr->get_new_strong(), get_tablet_map()); } const dht::sharder& get_sharder(const schema& s) const override { @@ -554,7 +554,7 @@ effective_replication_map_ptr tablet_aware_replication_strategy::do_make_replica void tablet_metadata_guard::check() noexcept { auto erm = _table->get_effective_replication_map(); - auto& tmap = erm->get_token_metadata_ptr()->tablets().get_tablet_map(_tablet.table); + auto& tmap = erm->get_token_metadata_ptr()->get_new()->tablets().get_tablet_map(_tablet.table); auto* trinfo = tmap.get_tablet_transition_info(_tablet.tablet); if (bool(_stage) != bool(trinfo) || (_stage && _stage != trinfo->stage)) { _abort_source.request_abort(); diff --git a/replica/table.cc b/replica/table.cc index 2488a507b7..2da718f9fd 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -569,7 +569,7 @@ private: const locator::tablet_map& tablet_map() const { // FIXME: cheaper way to retrieve tablet_map than looking up every time in tablet_metadata's map. auto& tm = erm()->get_token_metadata(); - return tm.tablets().get_tablet_map(schema()->id()); + return tm.get_new()->tablets().get_tablet_map(schema()->id()); } public: tablet_compaction_group_manager(replica::table& t) : _t(t) {} diff --git a/service/storage_service.cc b/service/storage_service.cc index a7afc1069e..ee1f63a677 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1608,7 +1608,7 @@ class topology_coordinator { schema_ptr, locator::global_tablet_id, const locator::tablet_transition_info&)> func) { - auto tm = get_token_metadata_ptr(); + auto tm = get_token_metadata_ptr()->get_new(); for (auto&& [table, tmap] : tm->tablets().all_tables()) { co_await coroutine::maybe_yield(); auto s = _db.find_schema(table); @@ -1622,7 +1622,7 @@ class topology_coordinator { void generate_migration_update(std::vector& out, const group0_guard& guard, const tablet_migration_info& mig) { auto s = _db.find_schema(mig.tablet.table); - auto& tmap = get_token_metadata_ptr()->tablets().get_tablet_map(mig.tablet.table); + auto& tmap = get_token_metadata_ptr()->get_new()->tablets().get_tablet_map(mig.tablet.table); auto last_token = tmap.get_last_token(mig.tablet.tablet); if (tmap.get_tablet_transition_info(mig.tablet.tablet)) { slogger.warn("Tablet already in transition, ignoring migration: {}", mig); @@ -1781,7 +1781,7 @@ class topology_coordinator { } } if (!preempt) { - auto plan = co_await _tablet_allocator.balance_tablets(get_token_metadata_ptr()); + auto plan = co_await _tablet_allocator.balance_tablets(get_token_metadata_ptr()->get_new_strong()); if (!drain || plan.has_nodes_to_drain()) { co_await generate_migration_updates(updates, guard, plan); } @@ -2562,7 +2562,7 @@ future topology_coordinator::maybe_start_tablet_migration(group0_guard gua slogger.debug("raft topology: Evaluating tablet balance"); auto tm = get_token_metadata_ptr(); - auto plan = co_await _tablet_allocator.balance_tablets(tm); + auto plan = co_await _tablet_allocator.balance_tablets(tm->get_new_strong()); if (plan.empty()) { slogger.debug("raft topology: Tablets are balanced"); co_return false; diff --git a/service/tablet_allocator.cc b/service/tablet_allocator.cc index b29e825ede..e66f9127c9 100644 --- a/service/tablet_allocator.cc +++ b/service/tablet_allocator.cc @@ -199,7 +199,7 @@ class load_balancer { std::optional target_load_sketch; - future get_load_sketch(const token_metadata_ptr& tm) { + future get_load_sketch(const token_metadata2_ptr& tm) { if (!target_load_sketch) { target_load_sketch.emplace(tm); co_await target_load_sketch->populate(id); @@ -255,7 +255,7 @@ class load_balancer { const size_t max_write_streaming_load = 2; const size_t max_read_streaming_load = 4; - token_metadata_ptr _tm; + token_metadata2_ptr _tm; load_balancer_stats_manager& _stats; private: tablet_replica_set get_replicas_for_tablet_load(const tablet_info& ti, const tablet_transition_info* trinfo) const { @@ -290,7 +290,7 @@ private: } public: - load_balancer(token_metadata_ptr tm, load_balancer_stats_manager& stats) + load_balancer(token_metadata2_ptr tm, load_balancer_stats_manager& stats) : _tm(std::move(tm)) , _stats(stats) { } @@ -819,7 +819,7 @@ public: _stopped = true; } - future balance_tablets(token_metadata_ptr tm) { + future balance_tablets(token_metadata2_ptr tm) { load_balancer lb(tm, _load_balancer_stats); co_return co_await lb.make_plan(); } @@ -869,7 +869,7 @@ future<> tablet_allocator::stop() { return impl().stop(); } -future tablet_allocator::balance_tablets(locator::token_metadata_ptr tm) { +future tablet_allocator::balance_tablets(locator::token_metadata2_ptr tm) { return impl().balance_tablets(tm); } diff --git a/service/tablet_allocator.hh b/service/tablet_allocator.hh index 4402f40cb1..3b90e6d208 100644 --- a/service/tablet_allocator.hh +++ b/service/tablet_allocator.hh @@ -90,7 +90,7 @@ public: /// /// The algorithm takes care of limiting the streaming load on the system, also by taking active migrations into account. /// - future balance_tablets(locator::token_metadata_ptr); + future balance_tablets(locator::token_metadata2_ptr); /// Should be called when the node is no longer a leader. void on_leadership_lost(); diff --git a/test/boost/locator_topology_test.cc b/test/boost/locator_topology_test.cc index 5245e96825..730f33b8dd 100644 --- a/test/boost/locator_topology_test.cc +++ b/test/boost/locator_topology_test.cc @@ -263,23 +263,24 @@ SEASTAR_THREAD_TEST_CASE(test_load_sketch) { shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ topology::config{ .this_endpoint = ip1, + .this_host_id = host1 } }); stm.mutate_token_metadata([&] (token_metadata& tm) { - tm.update_host_id(host1, ip1); - tm.update_host_id(host2, ip2); - tm.update_host_id(host3, ip3); - tm.update_topology(ip1, locator::endpoint_dc_rack::default_location, std::nullopt, node1_shard_count); - tm.update_topology(ip2, locator::endpoint_dc_rack::default_location, std::nullopt, node2_shard_count); - tm.update_topology(ip3, locator::endpoint_dc_rack::default_location, std::nullopt, node3_shard_count); + tm.get_new()->update_host_id(host1, ip1); + tm.get_new()->update_host_id(host2, ip2); + tm.get_new()->update_host_id(host3, ip3); + tm.get_new()->update_topology(host1, locator::endpoint_dc_rack::default_location, std::nullopt, node1_shard_count); + tm.get_new()->update_topology(host2, locator::endpoint_dc_rack::default_location, std::nullopt, node2_shard_count); + tm.get_new()->update_topology(host3, locator::endpoint_dc_rack::default_location, std::nullopt, node3_shard_count); return make_ready_future<>(); }).get(); // Check that allocation is even when starting from empty state { auto tm = stm.get(); - load_sketch load(tm); + load_sketch load(tm->get_new_strong()); load.populate().get(); std::vector node1_shards(node1_shard_count, 0); @@ -341,13 +342,13 @@ SEASTAR_THREAD_TEST_CASE(test_load_sketch) { auto table = table_id(utils::make_random_uuid()); tab_meta.set_tablet_map(table, tmap); - tm.set_tablets(std::move(tab_meta)); + tm.get_new()->set_tablets(std::move(tab_meta)); return make_ready_future<>(); }).get(); { auto tm = stm.get(); - load_sketch load(tm); + load_sketch load(tm->get_new_strong()); load.populate().get(); // host3 has max shard load of 3 and 3 shards, and 4 tablets allocated. diff --git a/test/boost/network_topology_strategy_test.cc b/test/boost/network_topology_strategy_test.cc index c37016ddc5..eeb6ed8896 100644 --- a/test/boost/network_topology_strategy_test.cc +++ b/test/boost/network_topology_strategy_test.cc @@ -72,7 +72,7 @@ static void check_ranges_are_sorted(vnode_effective_replication_map_ptr erm, gms void strategy_sanity_check( replication_strategy_ptr ars_ptr, - const token_metadata& tm, + const token_metadata2_ptr& tm, const std::map& options) { const network_topology_strategy* nts_ptr = @@ -90,16 +90,16 @@ void strategy_sanity_check( total_rf += rf; } - BOOST_CHECK(ars_ptr->get_replication_factor(tm) == total_rf); + BOOST_CHECK(ars_ptr->get_replication_factor(token_metadata(tm)) == total_rf); } void endpoints_check( replication_strategy_ptr ars_ptr, - const token_metadata& tm, + const token_metadata2_ptr& tm, const inet_address_vector_replica_set& endpoints, const locator::topology& topo) { - auto&& nodes_per_dc = tm.get_topology().get_datacenter_endpoints(); + auto&& nodes_per_dc = tm->get_topology().get_datacenter_endpoints(); const network_topology_strategy* nts_ptr = dynamic_cast(ars_ptr.get()); @@ -111,7 +111,7 @@ void endpoints_check( // Check the total RF BOOST_CHECK(endpoints.size() == total_rf); - BOOST_CHECK(total_rf <= ars_ptr->get_replication_factor(tm)); + BOOST_CHECK(total_rf <= ars_ptr->get_replication_factor(token_metadata(tm))); // Check the uniqueness std::unordered_set ep_set(endpoints.begin(), endpoints.end()); @@ -156,19 +156,19 @@ auto d2t = [](double d) -> int64_t { void full_ring_check(const std::vector& ring_points, const std::map& options, replication_strategy_ptr ars_ptr, - locator::token_metadata_ptr tmptr) { + locator::token_metadata2_ptr tmptr) { auto& tm = *tmptr; const auto& topo = tm.get_topology(); - strategy_sanity_check(ars_ptr, tm, options); + strategy_sanity_check(ars_ptr, tmptr, options); - auto erm = calculate_effective_replication_map(ars_ptr, tmptr).get0(); + auto erm = calculate_effective_replication_map(ars_ptr, make_token_metadata_ptr(tmptr)).get0(); for (auto& rp : ring_points) { double cur_point1 = rp.point - 0.5; token t1(dht::token::kind::key, d2t(cur_point1 / ring_points.size())); auto endpoints1 = erm->get_natural_endpoints(t1); - endpoints_check(ars_ptr, tm, endpoints1, topo); + endpoints_check(ars_ptr, tmptr, endpoints1, topo); print_natural_endpoints(cur_point1, endpoints1); @@ -181,7 +181,7 @@ void full_ring_check(const std::vector& ring_points, token t2(dht::token::kind::key, d2t(cur_point2 / ring_points.size())); auto endpoints2 = erm->get_natural_endpoints(t2); - endpoints_check(ars_ptr, tm, endpoints2, topo); + endpoints_check(ars_ptr, tmptr, endpoints2, topo); check_ranges_are_sorted(erm, rp.host); BOOST_CHECK(endpoints1 == endpoints2); } @@ -190,7 +190,7 @@ void full_ring_check(const std::vector& ring_points, void full_ring_check(const tablet_map& tmap, const std::map& options, replication_strategy_ptr rs_ptr, - locator::token_metadata_ptr tmptr) { + locator::token_metadata2_ptr tmptr) { auto& tm = *tmptr; const auto& topo = tm.get_topology(); @@ -204,7 +204,7 @@ void full_ring_check(const tablet_map& tmap, }; for (tablet_id tb : tmap.tablet_ids()) { - endpoints_check(rs_ptr, tm, to_endpoint_set(tmap.get_tablet_info(tb).replicas), topo); + endpoints_check(rs_ptr, tmptr, to_endpoint_set(tmap.get_tablet_info(tb).replicas), topo); } } @@ -251,21 +251,13 @@ void simple_test() { // Initialize the token_metadata stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { - auto update_tm = [&](generic_token_metadata& tm) -> future<> { - auto& topo = tm.get_topology(); - for (const auto& [ring_point, endpoint, id] : ring_points) { - std::unordered_set tokens; - tokens.insert({dht::token::kind::key, d2t(ring_point / ring_points.size())}); - topo.add_node(id, endpoint, make_endpoint_dc_rack(endpoint), locator::node::state::normal); - if constexpr(std::is_same_v) { - co_await tm.update_normal_tokens(std::move(tokens), endpoint); - } else { - co_await tm.update_normal_tokens(std::move(tokens), id); - } - } - }; - co_await update_tm(tm); - co_await update_tm(*tm.get_new()); + auto& topo = tm.get_new()->get_topology(); + for (const auto& [ring_point, endpoint, id] : ring_points) { + std::unordered_set tokens; + tokens.insert({dht::token::kind::key, d2t(ring_point / ring_points.size())}); + topo.add_node(id, endpoint, make_endpoint_dc_rack(endpoint), locator::node::state::normal); + co_await tm.get_new()->update_normal_tokens(std::move(tokens), id); + } }).get(); ///////////////////////////////////// @@ -279,7 +271,7 @@ void simple_test() { auto ars_ptr = abstract_replication_strategy::create_replication_strategy( "NetworkTopologyStrategy", options323); - full_ring_check(ring_points, options323, ars_ptr, stm.get()); + full_ring_check(ring_points, options323, ars_ptr, stm.get()->get_new_strong()); /////////////// // Create the replication strategy @@ -292,7 +284,7 @@ void simple_test() { ars_ptr = abstract_replication_strategy::create_replication_strategy( "NetworkTopologyStrategy", options320); - full_ring_check(ring_points, options320, ars_ptr, stm.get()); + full_ring_check(ring_points, options320, ars_ptr, stm.get()->get_new_strong()); // // Check cache invalidation: invalidate the cache and run a full ring @@ -301,11 +293,10 @@ void simple_test() { // corresponding check will fail. // stm.mutate_token_metadata([] (token_metadata& tm) { - tm.invalidate_cached_rings(); tm.get_new()->invalidate_cached_rings(); return make_ready_future<>(); }).get(); - full_ring_check(ring_points, options320, ars_ptr, stm.get()); + full_ring_check(ring_points, options320, ars_ptr, stm.get()->get_new_strong()); } // Run in a seastar thread. @@ -367,25 +358,17 @@ void heavy_origin_test() { } stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { - auto update_tm = [&](generic_token_metadata& tm) -> future<> { - auto& topo = tm.get_topology(); - for (const auto& [ring_point, endpoint, id] : ring_points) { - topo.add_node(id, endpoint, make_endpoint_dc_rack(endpoint), locator::node::state::normal); - if constexpr (std::is_same_v) { - co_await tm.update_normal_tokens(tokens[endpoint], endpoint); - } else { - co_await tm.update_normal_tokens(tokens[endpoint], id); - } - } - }; - co_await update_tm(tm); - co_await update_tm(*tm.get_new()); + auto& topo = tm.get_new()->get_topology(); + for (const auto& [ring_point, endpoint, id] : ring_points) { + topo.add_node(id, endpoint, make_endpoint_dc_rack(endpoint), locator::node::state::normal); + co_await tm.get_new()->update_normal_tokens(tokens[endpoint], id); + } }).get(); auto ars_ptr = abstract_replication_strategy::create_replication_strategy( "NetworkTopologyStrategy", config_options); - full_ring_check(ring_points, config_options, ars_ptr, stm.get()); + full_ring_check(ring_points, config_options, ars_ptr, stm.get()->get_new_strong()); } @@ -431,13 +414,13 @@ SEASTAR_THREAD_TEST_CASE(NetworkTopologyStrategy_tablets_test) { // Initialize the token_metadata stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { - auto& topo = tm.get_topology(); + auto& topo = tm.get_new()->get_topology(); for (const auto& [ring_point, endpoint, id] : ring_points) { std::unordered_set tokens; tokens.insert({dht::token::kind::key, d2t(ring_point / ring_points.size())}); topo.add_node(id, endpoint, make_endpoint_dc_rack(endpoint), locator::node::state::normal, 1); - tm.update_host_id(id, endpoint); - co_await tm.update_normal_tokens(std::move(tokens), endpoint); + tm.get_new()->update_host_id(id, endpoint); + co_await tm.get_new()->update_normal_tokens(std::move(tokens), id); } }).get(); @@ -462,7 +445,7 @@ SEASTAR_THREAD_TEST_CASE(NetworkTopologyStrategy_tablets_test) { .build(); auto tmap = tab_awr_ptr->allocate_tablets_for_new_table(s, stm.get()).get0(); - full_ring_check(tmap, options323, ars_ptr, stm.get()); + full_ring_check(tmap, options323, ars_ptr, stm.get()->get_new_strong()); /////////////// // Create the replication strategy @@ -479,7 +462,7 @@ SEASTAR_THREAD_TEST_CASE(NetworkTopologyStrategy_tablets_test) { BOOST_REQUIRE(tab_awr_ptr); tmap = tab_awr_ptr->allocate_tablets_for_new_table(s, stm.get()).get0(); - full_ring_check(tmap, options320, ars_ptr, stm.get()); + full_ring_check(tmap, options320, ars_ptr, stm.get()->get_new_strong()); // Test the case of not enough nodes to meet RF in DC 102 std::map options324 = { @@ -495,7 +478,7 @@ SEASTAR_THREAD_TEST_CASE(NetworkTopologyStrategy_tablets_test) { BOOST_REQUIRE(tab_awr_ptr); tmap = tab_awr_ptr->allocate_tablets_for_new_table(s, stm.get()).get0(); - full_ring_check(tmap, options324, ars_ptr, stm.get()); + full_ring_check(tmap, options324, ars_ptr, stm.get()->get_new_strong()); } /** @@ -508,7 +491,7 @@ static size_t get_replication_factor(const sstring& dc, } static bool has_sufficient_replicas(const sstring& dc, - const std::unordered_map>& dc_replicas, + const std::unordered_map>& dc_replicas, const std::unordered_map>& all_endpoints, const std::unordered_map& datacenters) noexcept { auto dc_replicas_it = dc_replicas.find(dc); @@ -526,7 +509,7 @@ static bool has_sufficient_replicas(const sstring& dc, } static bool has_sufficient_replicas( - const std::unordered_map>& dc_replicas, + const std::unordered_map>& dc_replicas, const std::unordered_map>& all_endpoints, const std::unordered_map& datacenters) noexcept { @@ -540,18 +523,18 @@ static bool has_sufficient_replicas( return true; } -static locator::endpoint_set calculate_natural_endpoints( - const token& search_token, const token_metadata& tm, +static locator::host_id_set calculate_natural_endpoints( + const token& search_token, const token_metadata2& tm, const locator::topology& topo, const std::unordered_map& datacenters) { // // We want to preserve insertion order so that the first added endpoint // becomes primary. // - locator::endpoint_set replicas; + locator::host_id_set replicas; // replicas we have found in each DC - std::unordered_map> dc_replicas; + std::unordered_map> dc_replicas; // tracks the racks we have already placed replicas in std::unordered_map> seen_racks; // @@ -559,7 +542,7 @@ static locator::endpoint_set calculate_natural_endpoints( // when we relax the rack uniqueness we can append this to the current // result so we don't have to wind back the iterator // - std::unordered_map + std::unordered_map skipped_dc_endpoints; // @@ -600,7 +583,7 @@ static locator::endpoint_set calculate_natural_endpoints( break; } - inet_address ep = *tm.get_endpoint(next); + host_id ep = *tm.get_endpoint(next); sstring dc = topo.get_location(ep).dc; auto& seen_racks_dc_set = seen_racks[dc]; @@ -639,7 +622,7 @@ static locator::endpoint_set calculate_natural_endpoints( auto skipped_it = skipped_dc_endpoints_set.begin(); while (skipped_it != skipped_dc_endpoints_set.end() && !has_sufficient_replicas(dc, dc_replicas, all_endpoints, datacenters)) { - inet_address skipped = *skipped_it++; + host_id skipped = *skipped_it++; dc_replicas_dc_set.insert(skipped); replicas.push_back(skipped); } @@ -667,25 +650,25 @@ static void test_equivalence(const shared_token_metadata& stm, const locator::to return std::make_pair(p.first, to_sstring(p.second)); }))); - const token_metadata& tm = *stm.get(); + const token_metadata2& tm = *stm.get()->get_new(); for (size_t i = 0; i < 1000; ++i) { auto token = dht::token::get_random_token(); auto expected = calculate_natural_endpoints(token, tm, topo, datacenters); - auto actual = get(nts.calculate_natural_endpoints(token, tm, false).get0()); + auto actual = get(nts.calculate_natural_endpoints(token, token_metadata(stm.get()->get_new_strong()), true).get0()); // Because the old algorithm does not put the nodes in the correct order in the case where more replicas // are required than there are racks in a dc, we accept different order as long as the primary // replica is the same. BOOST_REQUIRE_EQUAL(expected[0], actual[0]); - BOOST_REQUIRE_EQUAL(std::set(expected.begin(), expected.end()), - std::set(actual.begin(), actual.end())); + BOOST_REQUIRE_EQUAL(std::set(expected.begin(), expected.end()), + std::set(actual.begin(), actual.end())); } } -void generate_topology(topology& topo, const std::unordered_map datacenters, const std::vector& nodes) { +void generate_topology(topology& topo, const std::unordered_map datacenters, const std::vector& nodes) { auto& e1 = seastar::testing::local_random_engine; std::unordered_map racks_per_dc; @@ -705,11 +688,12 @@ void generate_topology(topology& topo, const std::unordered_map out = std::fill_n(out, rf, std::cref(dc)); } + unsigned i = 0; for (auto& node : nodes) { const sstring& dc = dcs[udist(0, dcs.size() - 1)(e1)]; auto rc = racks_per_dc.at(dc); auto r = udist(0, rc)(e1); - topo.add_node(host_id::create_random_id(), node, {dc, to_sstring(r)}, locator::node::state::normal); + topo.add_node(node, inet_address((127u << 24) | ++i), {dc, to_sstring(r)}, locator::node::state::normal); } } @@ -730,10 +714,10 @@ SEASTAR_THREAD_TEST_CASE(testCalculateEndpoints) { { "rf5_2", 5 }, { "rf5_3", 5 }, }; - std::vector nodes; + std::vector nodes; nodes.reserve(NODES); std::generate_n(std::back_inserter(nodes), NODES, [i = 0u]() mutable { - return inet_address((127u << 24) | ++i); + return host_id{utils::UUID(0, ++i)}; }); for (size_t run = 0; run < RUNS; ++run) { @@ -744,7 +728,7 @@ SEASTAR_THREAD_TEST_CASE(testCalculateEndpoints) { while (random_tokens.size() < nodes.size() * VNODES) { random_tokens.insert(dht::token::get_random_token()); } - std::unordered_map> endpoint_tokens; + std::unordered_map> endpoint_tokens; auto next_token_it = random_tokens.begin(); for (auto& node : nodes) { for (size_t i = 0; i < VNODES; ++i) { @@ -752,14 +736,14 @@ SEASTAR_THREAD_TEST_CASE(testCalculateEndpoints) { next_token_it++; } } - + stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { - generate_topology(tm.get_topology(), datacenters, nodes); + generate_topology(tm.get_new()->get_topology(), datacenters, nodes); for (auto&& i : endpoint_tokens) { - co_await tm.update_normal_tokens(std::move(i.second), i.first); + co_await tm.get_new()->update_normal_tokens(std::move(i.second), i.first); } }).get(); - test_equivalence(stm, stm.get()->get_topology(), datacenters); + test_equivalence(stm, stm.get()->get_new()->get_topology(), datacenters); } } @@ -837,27 +821,27 @@ SEASTAR_THREAD_TEST_CASE(test_topology_compare_endpoints) { { "rf2", 2 }, { "rf3", 3 }, }; - std::vector nodes; + std::vector nodes; nodes.reserve(NODES); auto make_address = [] (unsigned i) { - return inet_address((127u << 24) | i); + return host_id{utils::UUID(0, i)}; }; std::generate_n(std::back_inserter(nodes), NODES, [&, i = 0u]() mutable { return make_address(++i); }); - auto bogus_address = make_address(NODES + 1); + auto bogus_address = inet_address((127u << 24) | static_cast(NODES + 1)); semaphore sem(1); shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, tm_cfg); stm.mutate_token_metadata([&] (token_metadata& tm) { - auto& topo = tm.get_topology(); + auto& topo = tm.get_new()->get_topology(); generate_topology(topo, datacenters, nodes); - const auto& address = nodes[tests::random::get_int(0, NODES-1)]; - const auto& a1 = nodes[tests::random::get_int(0, NODES-1)]; - const auto& a2 = nodes[tests::random::get_int(0, NODES-1)]; + const auto& address = tm.get_new()->get_endpoint_for_host_id(nodes[tests::random::get_int(0, NODES-1)]); + const auto& a1 = tm.get_new()->get_endpoint_for_host_id(nodes[tests::random::get_int(0, NODES-1)]); + const auto& a2 = tm.get_new()->get_endpoint_for_host_id(nodes[tests::random::get_int(0, NODES-1)]); topo.test_compare_endpoints(address, address, address); topo.test_compare_endpoints(address, address, a1); @@ -896,23 +880,23 @@ SEASTAR_THREAD_TEST_CASE(test_topology_tracks_local_node) { // get_location() should work before any node is added - BOOST_REQUIRE(stm.get()->get_topology().get_location() == ip1_dc_rack); + BOOST_REQUIRE(stm.get()->get_new()->get_topology().get_location() == ip1_dc_rack); stm.mutate_token_metadata([&] (token_metadata& tm) { - tm.update_host_id(host2, ip2); - tm.update_host_id(host1, ip1); // this_node added last on purpose + tm.get_new()->update_host_id(host2, ip2); + tm.get_new()->update_host_id(host1, ip1); // this_node added last on purpose return make_ready_future<>(); }).get(); - const node* n1 = stm.get()->get_topology().find_node(host1); + const node* n1 = stm.get()->get_new()->get_topology().find_node(host1); BOOST_REQUIRE(n1); BOOST_REQUIRE(bool(n1->is_this_node())); BOOST_REQUIRE_EQUAL(n1->host_id(), host1); BOOST_REQUIRE_EQUAL(n1->endpoint(), ip1); BOOST_REQUIRE(n1->dc_rack() == ip1_dc_rack); - BOOST_REQUIRE(stm.get()->get_topology().get_location() == ip1_dc_rack); + BOOST_REQUIRE(stm.get()->get_new()->get_topology().get_location() == ip1_dc_rack); - const node* n2 = stm.get()->get_topology().find_node(host2); + const node* n2 = stm.get()->get_new()->get_topology().find_node(host2); BOOST_REQUIRE(n2); BOOST_REQUIRE(!bool(n2->is_this_node())); BOOST_REQUIRE_EQUAL(n2->host_id(), host2); @@ -922,45 +906,45 @@ SEASTAR_THREAD_TEST_CASE(test_topology_tracks_local_node) { // Removing local node stm.mutate_token_metadata([&] (token_metadata& tm) { - tm.remove_endpoint(ip1); - tm.update_host_id(host3, ip3); + tm.get_new()->remove_endpoint(host1); + tm.get_new()->update_host_id(host3, ip3); return make_ready_future<>(); }).get(); - n1 = stm.get()->get_topology().find_node(host1); + n1 = stm.get()->get_new()->get_topology().find_node(host1); BOOST_REQUIRE(!n1); - n1 = stm.get()->get_topology().find_node(ip1); + n1 = stm.get()->get_new()->get_topology().find_node(ip1); BOOST_REQUIRE(!n1); // Removing node with no local node stm.mutate_token_metadata([&] (token_metadata& tm) { - tm.remove_endpoint(ip2); + tm.get_new()->remove_endpoint(host2); return make_ready_future<>(); }).get(); - n2 = stm.get()->get_topology().find_node(host2); + n2 = stm.get()->get_new()->get_topology().find_node(host2); BOOST_REQUIRE(!n2); - n2 = stm.get()->get_topology().find_node(ip2); + n2 = stm.get()->get_new()->get_topology().find_node(ip2); BOOST_REQUIRE(!n2); // Repopulate after clear_gently() stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { - co_await tm.clear_gently(); - tm.update_host_id(host2, ip2); - tm.update_host_id(host1, ip1); // this_node added last on purpose + co_await tm.get_new()->clear_gently(); + tm.get_new()->update_host_id(host2, ip2); + tm.get_new()->update_host_id(host1, ip1); // this_node added last on purpose }).get(); - n1 = stm.get()->get_topology().find_node(host1); + n1 = stm.get()->get_new()->get_topology().find_node(host1); BOOST_REQUIRE(n1); BOOST_REQUIRE(bool(n1->is_this_node())); BOOST_REQUIRE_EQUAL(n1->host_id(), host1); BOOST_REQUIRE_EQUAL(n1->endpoint(), ip1); BOOST_REQUIRE(n1->dc_rack() == ip1_dc_rack); - BOOST_REQUIRE(stm.get()->get_topology().get_location() == ip1_dc_rack); + BOOST_REQUIRE(stm.get()->get_new()->get_topology().get_location() == ip1_dc_rack); - n2 = stm.get()->get_topology().find_node(host2); + n2 = stm.get()->get_new()->get_topology().find_node(host2); BOOST_REQUIRE(n2); BOOST_REQUIRE(!bool(n2->is_this_node())); BOOST_REQUIRE_EQUAL(n2->host_id(), host2); @@ -970,15 +954,15 @@ SEASTAR_THREAD_TEST_CASE(test_topology_tracks_local_node) { // get_location() should pick up endpoint_dc_rack from node info stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { - co_await tm.clear_gently(); - tm.get_topology().add_or_update_endpoint(ip1, host1, ip1_dc_rack_v2, node::state::being_decommissioned); + co_await tm.get_new()->clear_gently(); + tm.get_new()->get_topology().add_or_update_endpoint(ip1, host1, ip1_dc_rack_v2, node::state::being_decommissioned); }).get(); - n1 = stm.get()->get_topology().find_node(host1); + n1 = stm.get()->get_new()->get_topology().find_node(host1); BOOST_REQUIRE(n1); BOOST_REQUIRE(bool(n1->is_this_node())); BOOST_REQUIRE_EQUAL(n1->host_id(), host1); BOOST_REQUIRE_EQUAL(n1->endpoint(), ip1); BOOST_REQUIRE(n1->dc_rack() == ip1_dc_rack_v2); - BOOST_REQUIRE(stm.get()->get_topology().get_location() == ip1_dc_rack_v2); + BOOST_REQUIRE(stm.get()->get_new()->get_topology().get_location() == ip1_dc_rack_v2); } diff --git a/test/boost/tablets_test.cc b/test/boost/tablets_test.cc index 754b216849..75430f33f5 100644 --- a/test/boost/tablets_test.cc +++ b/test/boost/tablets_test.cc @@ -433,7 +433,7 @@ SEASTAR_TEST_CASE(test_sharder) { auto table1 = table_id(utils::UUID_gen::get_time_UUID()); - token_metadata tokm(token_metadata::config{ .topo_cfg{ .this_host_id = h1 } }); + token_metadata2 tokm(token_metadata::config{ .topo_cfg{ .this_host_id = h1 } }); tokm.get_topology().add_or_update_endpoint(tokm.get_topology().my_address(), h1); std::vector tablet_ids; @@ -591,7 +591,7 @@ SEASTAR_THREAD_TEST_CASE(test_token_ownership_splitting) { // Reflects the plan in a given token metadata as if the migrations were fully executed. static -void apply_plan(token_metadata& tm, const migration_plan& plan) { +void apply_plan(token_metadata2& tm, const migration_plan& plan) { for (auto&& mig : plan.migrations()) { tablet_map& tmap = tm.tablets().get_tablet_map(mig.tablet.table); auto tinfo = tmap.get_tablet_info(mig.tablet.tablet); @@ -611,7 +611,7 @@ tablet_transition_info migration_to_transition_info(const tablet_migration_info& // Reflects the plan in a given token metadata as if the migrations were started but not yet executed. static -void apply_plan_as_in_progress(token_metadata& tm, const migration_plan& plan) { +void apply_plan_as_in_progress(token_metadata2& tm, const migration_plan& plan) { for (auto&& mig : plan.migrations()) { tablet_map& tmap = tm.tablets().get_tablet_map(mig.tablet.table); auto tinfo = tmap.get_tablet_info(mig.tablet.tablet); @@ -622,12 +622,12 @@ void apply_plan_as_in_progress(token_metadata& tm, const migration_plan& plan) { static void rebalance_tablets(tablet_allocator& talloc, shared_token_metadata& stm) { while (true) { - auto plan = talloc.balance_tablets(stm.get()).get0(); + auto plan = talloc.balance_tablets(stm.get()->get_new_strong()).get0(); if (plan.empty()) { break; } stm.mutate_token_metadata([&] (token_metadata& tm) { - apply_plan(tm, plan); + apply_plan(*tm.get_new(), plan); return make_ready_future<>(); }).get(); } @@ -636,12 +636,12 @@ void rebalance_tablets(tablet_allocator& talloc, shared_token_metadata& stm) { static void rebalance_tablets_as_in_progress(tablet_allocator& talloc, shared_token_metadata& stm) { while (true) { - auto plan = talloc.balance_tablets(stm.get()).get0(); + auto plan = talloc.balance_tablets(stm.get()->get_new_strong()).get0(); if (plan.empty()) { break; } stm.mutate_token_metadata([&] (token_metadata& tm) { - apply_plan_as_in_progress(tm, plan); + apply_plan_as_in_progress(*tm.get_new(), plan); return make_ready_future<>(); }).get(); } @@ -651,7 +651,7 @@ void rebalance_tablets_as_in_progress(tablet_allocator& talloc, shared_token_met static void execute_transitions(shared_token_metadata& stm) { stm.mutate_token_metadata([&] (token_metadata& tm) { - for (auto&& [tablet, tmap_] : tm.tablets().all_tables()) { + for (auto&& [tablet, tmap_] : tm.get_new()->tablets().all_tables()) { auto& tmap = tmap_; for (auto&& [tablet, trinfo]: tmap.transitions()) { auto ti = tmap.get_tablet_info(tablet); @@ -690,12 +690,12 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_empty_node) { }); stm.mutate_token_metadata([&] (auto& tm) { - tm.update_host_id(host1, ip1); - tm.update_host_id(host2, ip2); - tm.update_host_id(host3, ip3); - tm.update_topology(ip1, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count); - tm.update_topology(ip2, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count); - tm.update_topology(ip3, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count); + tm.get_new()->update_host_id(host1, ip1); + tm.get_new()->update_host_id(host2, ip2); + tm.get_new()->update_host_id(host3, ip3); + tm.get_new()->update_topology(host1, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count); + tm.get_new()->update_topology(host2, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count); + tm.get_new()->update_topology(host3, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count); tablet_map tmap(4); auto tid = tmap.first_tablet(); @@ -728,13 +728,13 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_empty_node) { }); tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); + tm.get_new()->set_tablets(std::move(tmeta)); return make_ready_future<>(); }).get(); // Sanity check { - load_sketch load(stm.get()); + load_sketch load(stm.get()->get_new_strong()); load.populate().get(); BOOST_REQUIRE_EQUAL(load.get_load(host1), 4); BOOST_REQUIRE_EQUAL(load.get_avg_shard_load(host1), 2); @@ -747,7 +747,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_empty_node) { rebalance_tablets(e.get_tablet_allocator().local(), stm); { - load_sketch load(stm.get()); + load_sketch load(stm.get()->template get_new_strong()); load.populate().get(); for (auto h : {host1, host2, host3}) { @@ -786,12 +786,12 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_rf_met) { stm.mutate_token_metadata([&](auto& tm) { const unsigned shard_count = 2; - tm.update_host_id(host1, ip1); - tm.update_host_id(host2, ip2); - tm.update_host_id(host3, ip3); - tm.update_topology(ip1, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count); - tm.update_topology(ip2, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count); - tm.update_topology(ip3, locator::endpoint_dc_rack::default_location, node::state::being_decommissioned, + tm.get_new()->update_host_id(host1, ip1); + tm.get_new()->update_host_id(host2, ip2); + tm.get_new()->update_host_id(host3, ip3); + tm.get_new()->update_topology(host1, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count); + tm.get_new()->update_topology(host2, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count); + tm.get_new()->update_topology(host3, locator::endpoint_dc_rack::default_location, node::state::being_decommissioned, shard_count); tablet_map tmap(4); @@ -825,14 +825,14 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_rf_met) { }); tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); + tm.get_new()->set_tablets(std::move(tmeta)); return make_ready_future<>(); }).get(); rebalance_tablets(e.get_tablet_allocator().local(), stm); { - load_sketch load(stm.get()); + load_sketch load(stm.get()->get_new_strong()); load.populate().get(); BOOST_REQUIRE(load.get_avg_shard_load(host1) == 2); BOOST_REQUIRE(load.get_avg_shard_load(host2) == 2); @@ -840,14 +840,14 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_rf_met) { } stm.mutate_token_metadata([&](auto& tm) { - tm.update_topology(ip3, locator::endpoint_dc_rack::default_location, node::state::left); + tm.get_new()->update_topology(host3, locator::endpoint_dc_rack::default_location, node::state::left); return make_ready_future<>(); }).get(); rebalance_tablets(e.get_tablet_allocator().local(), stm); { - load_sketch load(stm.get()); + load_sketch load(stm.get()->get_new_strong()); load.populate().get(); BOOST_REQUIRE(load.get_avg_shard_load(host1) == 2); BOOST_REQUIRE(load.get_avg_shard_load(host2) == 2); @@ -888,14 +888,14 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_two_racks) { stm.mutate_token_metadata([&](auto& tm) { const unsigned shard_count = 1; - tm.update_host_id(host1, ip1); - tm.update_host_id(host2, ip2); - tm.update_host_id(host3, ip3); - tm.update_host_id(host4, ip4); - tm.update_topology(ip1, racks[0], std::nullopt, shard_count); - tm.update_topology(ip2, racks[1], std::nullopt, shard_count); - tm.update_topology(ip3, racks[0], std::nullopt, shard_count); - tm.update_topology(ip4, racks[1], node::state::being_decommissioned, + tm.get_new()->update_host_id(host1, ip1); + tm.get_new()->update_host_id(host2, ip2); + tm.get_new()->update_host_id(host3, ip3); + tm.get_new()->update_host_id(host4, ip4); + tm.get_new()->update_topology(host1, racks[0], std::nullopt, shard_count); + tm.get_new()->update_topology(host2, racks[1], std::nullopt, shard_count); + tm.get_new()->update_topology(host3, racks[0], std::nullopt, shard_count); + tm.get_new()->update_topology(host4, racks[1], node::state::being_decommissioned, shard_count); tablet_map tmap(4); @@ -929,14 +929,14 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_two_racks) { }); tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); + tm.get_new()->set_tablets(std::move(tmeta)); return make_ready_future<>(); }).get(); rebalance_tablets(e.get_tablet_allocator().local(), stm); { - load_sketch load(stm.get()); + load_sketch load(stm.get()->get_new_strong()); load.populate().get(); BOOST_REQUIRE(load.get_avg_shard_load(host1) >= 2); BOOST_REQUIRE(load.get_avg_shard_load(host2) >= 2); @@ -947,10 +947,10 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_two_racks) { // Verify replicas are not collocated on racks { auto tm = stm.get(); - auto& tmap = tm->tablets().get_tablet_map(table1); + auto& tmap = tm->get_new()->tablets().get_tablet_map(table1); tmap.for_each_tablet([&](auto tid, auto& tinfo) { - auto rack1 = tm->get_topology().get_rack(tinfo.replicas[0].host); - auto rack2 = tm->get_topology().get_rack(tinfo.replicas[1].host); + auto rack1 = tm->get_new()->get_topology().get_rack(tinfo.replicas[0].host); + auto rack2 = tm->get_new()->get_topology().get_rack(tinfo.replicas[1].host); BOOST_REQUIRE(rack1 != rack2); }).get(); } @@ -989,14 +989,14 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_rack_load_failure) { stm.mutate_token_metadata([&](auto& tm) { const unsigned shard_count = 1; - tm.update_host_id(host1, ip1); - tm.update_host_id(host2, ip2); - tm.update_host_id(host3, ip3); - tm.update_host_id(host4, ip4); - tm.update_topology(ip1, racks[0], std::nullopt, shard_count); - tm.update_topology(ip2, racks[0], std::nullopt, shard_count); - tm.update_topology(ip3, racks[0], std::nullopt, shard_count); - tm.update_topology(ip4, racks[1], node::state::being_decommissioned, + tm.get_new()->update_host_id(host1, ip1); + tm.get_new()->update_host_id(host2, ip2); + tm.get_new()->update_host_id(host3, ip3); + tm.get_new()->update_host_id(host4, ip4); + tm.get_new()->update_topology(host1, racks[0], std::nullopt, shard_count); + tm.get_new()->update_topology(host2, racks[0], std::nullopt, shard_count); + tm.get_new()->update_topology(host3, racks[0], std::nullopt, shard_count); + tm.get_new()->update_topology(host4, racks[1], node::state::being_decommissioned, shard_count); tablet_map tmap(4); @@ -1030,7 +1030,7 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_rack_load_failure) { }); tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); + tm.get_new()->set_tablets(std::move(tmeta)); return make_ready_future<>(); }).get(); @@ -1063,12 +1063,12 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_rf_not_met) { stm.mutate_token_metadata([&](auto& tm) { const unsigned shard_count = 2; - tm.update_host_id(host1, ip1); - tm.update_host_id(host2, ip2); - tm.update_host_id(host3, ip3); - tm.update_topology(ip1, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count); - tm.update_topology(ip2, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count); - tm.update_topology(ip3, locator::endpoint_dc_rack::default_location, node::state::being_decommissioned, + tm.get_new()->update_host_id(host1, ip1); + tm.get_new()->update_host_id(host2, ip2); + tm.get_new()->update_host_id(host3, ip3); + tm.get_new()->update_topology(host1, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count); + tm.get_new()->update_topology(host2, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count); + tm.get_new()->update_topology(host3, locator::endpoint_dc_rack::default_location, node::state::being_decommissioned, shard_count); tablet_map tmap(1); @@ -1082,7 +1082,7 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_rf_not_met) { }); tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); + tm.get_new()->set_tablets(std::move(tmeta)); return make_ready_future<>(); }).get(); @@ -1118,12 +1118,12 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_works_with_in_progress_transitions) }); stm.mutate_token_metadata([&] (auto& tm) { - tm.update_host_id(host1, ip1); - tm.update_host_id(host2, ip2); - tm.update_host_id(host3, ip3); - tm.update_topology(ip1, locator::endpoint_dc_rack::default_location, std::nullopt, 1); - tm.update_topology(ip2, locator::endpoint_dc_rack::default_location, std::nullopt, 1); - tm.update_topology(ip3, locator::endpoint_dc_rack::default_location, std::nullopt, 2); + tm.get_new()->update_host_id(host1, ip1); + tm.get_new()->update_host_id(host2, ip2); + tm.get_new()->update_host_id(host3, ip3); + tm.get_new()->update_topology(host1, locator::endpoint_dc_rack::default_location, std::nullopt, 1); + tm.get_new()->update_topology(host2, locator::endpoint_dc_rack::default_location, std::nullopt, 1); + tm.get_new()->update_topology(host3, locator::endpoint_dc_rack::default_location, std::nullopt, 2); tablet_map tmap(4); std::optional tid = tmap.first_tablet(); @@ -1146,7 +1146,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_works_with_in_progress_transitions) }); tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); + tm.get_new()->set_tablets(std::move(tmeta)); return make_ready_future<>(); }).get(); @@ -1154,7 +1154,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_works_with_in_progress_transitions) execute_transitions(stm); { - load_sketch load(stm.get()); + load_sketch load(stm.get()->get_new_strong()); load.populate().get(); for (auto h : {host1, host2, host3}) { @@ -1187,12 +1187,12 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancer_shuffle_mode) { }); stm.mutate_token_metadata([&] (auto& tm) { - tm.update_host_id(host1, ip1); - tm.update_host_id(host2, ip2); - tm.update_host_id(host3, ip3); - tm.update_topology(ip1, locator::endpoint_dc_rack::default_location, std::nullopt, 1); - tm.update_topology(ip2, locator::endpoint_dc_rack::default_location, std::nullopt, 1); - tm.update_topology(ip3, locator::endpoint_dc_rack::default_location, std::nullopt, 2); + tm.get_new()->update_host_id(host1, ip1); + tm.get_new()->update_host_id(host2, ip2); + tm.get_new()->update_host_id(host3, ip3); + tm.get_new()->update_topology(host1, locator::endpoint_dc_rack::default_location, std::nullopt, 1); + tm.get_new()->update_topology(host2, locator::endpoint_dc_rack::default_location, std::nullopt, 1); + tm.get_new()->update_topology(host3, locator::endpoint_dc_rack::default_location, std::nullopt, 2); tablet_map tmap(4); std::optional tid = tmap.first_tablet(); @@ -1207,20 +1207,20 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancer_shuffle_mode) { } tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); + tm.get_new()->set_tablets(std::move(tmeta)); return make_ready_future<>(); }).get(); rebalance_tablets(e.get_tablet_allocator().local(), stm); - BOOST_REQUIRE(e.get_tablet_allocator().local().balance_tablets(stm.get()).get0().empty()); + BOOST_REQUIRE(e.get_tablet_allocator().local().balance_tablets(stm.get()->get_new_strong()).get0().empty()); utils::get_local_injector().enable("tablet_allocator_shuffle"); auto disable_injection = seastar::defer([&] { utils::get_local_injector().disable("tablet_allocator_shuffle"); }); - BOOST_REQUIRE(!e.get_tablet_allocator().local().balance_tablets(stm.get()).get0().empty()); + BOOST_REQUIRE(!e.get_tablet_allocator().local().balance_tablets(stm.get()->get_new_strong()).get0().empty()); }).get(); } #endif @@ -1250,14 +1250,14 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_two_empty_nodes) { }); stm.mutate_token_metadata([&] (auto& tm) { - tm.update_host_id(host1, ip1); - tm.update_host_id(host2, ip2); - tm.update_host_id(host3, ip3); - tm.update_host_id(host4, ip4); - tm.update_topology(ip1, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count); - tm.update_topology(ip2, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count); - tm.update_topology(ip3, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count); - tm.update_topology(ip4, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count); + tm.get_new()->update_host_id(host1, ip1); + tm.get_new()->update_host_id(host2, ip2); + tm.get_new()->update_host_id(host3, ip3); + tm.get_new()->update_host_id(host4, ip4); + tm.get_new()->update_topology(host1, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count); + tm.get_new()->update_topology(host2, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count); + tm.get_new()->update_topology(host3, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count); + tm.get_new()->update_topology(host4, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count); tablet_map tmap(16); for (auto tid : tmap.tablet_ids()) { @@ -1270,14 +1270,14 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_two_empty_nodes) { } tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); + tm.get_new()->set_tablets(std::move(tmeta)); return make_ready_future<>(); }).get(); rebalance_tablets(e.get_tablet_allocator().local(), stm); { - load_sketch load(stm.get()); + load_sketch load(stm.get()->get_new_strong()); load.populate().get(); for (auto h : {host1, host2, host3, host4}) { @@ -1312,8 +1312,8 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancer_disabling) { stm.mutate_token_metadata([&] (auto& tm) { tm.update_host_id(host1, ip1); tm.update_host_id(host2, ip2); - tm.update_topology(ip1, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count); - tm.update_topology(ip2, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count); + tm.update_topology(host1, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count); + tm.update_topology(host2, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count); tablet_map tmap(16); for (auto tid : tmap.tablet_ids()) { @@ -1399,6 +1399,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_random_load) { shared_token_metadata stm([&sem]() noexcept { return get_units(sem, 1); }, locator::token_metadata::config { locator::topology::config { .this_endpoint = inet_address("192.168.0.1"), + .this_host_id = hosts[0], .local_dc_rack = racks[1] } }); @@ -1411,9 +1412,9 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_random_load) { for (auto h : hosts) { auto ip = inet_address(format("192.168.0.{}", ++i)); auto shard_count = 2; - tm.update_host_id(h, ip); + tm.get_new()->update_host_id(h, ip); auto rack = racks[i % racks.size()]; - tm.update_topology(ip, rack, std::nullopt, shard_count); + tm.get_new()->update_topology(h, rack, std::nullopt, shard_count); if (h != hosts[0]) { // Leave the first host empty by making it invisible to allocation algorithm. hosts_by_rack[rack.rack].push_back(h); @@ -1444,7 +1445,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_random_load) { } tablet_replica_set replicas; for (auto h : replica_hosts) { - auto shard_count = tm.get_topology().find_node(h)->get_shard_count(); + auto shard_count = tm.get_new()->get_topology().find_node(h)->get_shard_count(); auto shard = tests::random::get_int(0, shard_count - 1); replicas.push_back(tablet_replica {h, shard}); } @@ -1453,17 +1454,17 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_random_load) { total_tablet_count += tmap.tablet_count(); tmeta.set_tablet_map(table, std::move(tmap)); } - tm.set_tablets(std::move(tmeta)); + tm.get_new()->set_tablets(std::move(tmeta)); return make_ready_future<>(); }).get(); - testlog.debug("tablet metadata: {}", stm.get()->tablets()); + testlog.debug("tablet metadata: {}", stm.get()->get_new()->tablets()); testlog.info("Total tablet count: {}, hosts: {}", total_tablet_count, hosts.size()); rebalance_tablets(e.get_tablet_allocator().local(), stm); { - load_sketch load(stm.get()); + load_sketch load(stm.get()->get_new_strong()); load.populate().get(); min_max_tracker min_max_load; @@ -1473,7 +1474,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_random_load) { min_max_load.update(l); } - testlog.debug("tablet metadata: {}", stm.get()->tablets()); + testlog.debug("tablet metadata: {}", stm.get()->get_new()->tablets()); testlog.debug("Min load: {}, max load: {}", min_max_load.min(), min_max_load.max()); // FIXME: The algorithm cannot achieve balance in all cases yet, so we only check that it stops.