From 61532eb53b91abd9d48f068a1ccfae51e60b2796 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Sat, 1 Feb 2025 00:04:27 +0100 Subject: [PATCH 1/6] topology_state_machine: Introduce lock transition Will be used in load balancer tests to prevent concurrent topology operations, in particular background load balancing. The load balancer will be invoked explicitly by the test. Disabling load balancer in topology is not a solution, because we want the explicit call to perform the load balancing. --- docs/dev/topology-over-raft.md | 3 +++ service/storage_service.cc | 2 ++ service/topology_coordinator.cc | 4 ++++ service/topology_state_machine.cc | 1 + service/topology_state_machine.hh | 1 + 5 files changed, 11 insertions(+) diff --git a/docs/dev/topology-over-raft.md b/docs/dev/topology-over-raft.md index 5195ed2754..e3e39d6ab9 100644 --- a/docs/dev/topology-over-raft.md +++ b/docs/dev/topology-over-raft.md @@ -124,6 +124,9 @@ Additionally to specific node states, there entire topology can also be in a tra it from group 0. We also use this state to rollback a failed bootstrap or decommission. - `rollback_to_normal` - the decommission or removenode operation failed. Rollback the operation by moving the node we tried to decommission/remove back to the normal state. +- `lock` - the topology stays in this state until externally changed (to null state), preventing topology + requests from starting. Intended to be used in tests which want to prevent internally-triggered topology + operations during the test. When a node bootstraps, we create new tokens for it and a new CDC generation and enter the `commit_cdc_generation` state.
Once the generation is committed, diff --git a/service/storage_service.cc b/service/storage_service.cc index 7a367a13fc..bbd4af7091 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -744,6 +744,8 @@ future<> storage_service::topology_state_load(state_change_hint hint) { return read_new_t::no; } switch (*state) { + case topology::transition_state::lock: + [[fallthrough]]; case topology::transition_state::join_group0: [[fallthrough]]; case topology::transition_state::tablet_migration: diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index f886b5fbea..5b9f907577 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -2341,6 +2341,10 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { case topology::transition_state::tablet_resize_finalization: co_await handle_tablet_resize_finalization(std::move(guard)); break; + case topology::transition_state::lock: + release_guard(std::move(guard)); + co_await await_event(); + break; case topology::transition_state::left_token_ring: { auto node = get_node_to_work_on(std::move(guard)); diff --git a/service/topology_state_machine.cc b/service/topology_state_machine.cc index 190ffaab66..57b99e807a 100644 --- a/service/topology_state_machine.cc +++ b/service/topology_state_machine.cc @@ -153,6 +153,7 @@ static std::unordered_map transition_state_ {topology::transition_state::left_token_ring, "left token ring"}, {topology::transition_state::rollback_to_normal, "rollback to normal"}, {topology::transition_state::truncate_table, "truncate table"}, + {topology::transition_state::lock, "lock"}, }; // Allows old deprecated names to be recognized and point to the correct transition. 
diff --git a/service/topology_state_machine.hh b/service/topology_state_machine.hh index 8e7014c8ff..5e03e3d1d9 100644 --- a/service/topology_state_machine.hh +++ b/service/topology_state_machine.hh @@ -120,6 +120,7 @@ struct topology { left_token_ring, rollback_to_normal, truncate_table, + lock, }; std::optional tstate; From 3bb9d2fbdb79f9920c1e5e8c66544bae36f69d61 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Sat, 1 Feb 2025 00:07:24 +0100 Subject: [PATCH 2/6] test: cql_test_env: Expose topology_state_machine --- test/lib/cql_test_env.cc | 4 ++++ test/lib/cql_test_env.hh | 2 ++ 2 files changed, 6 insertions(+) diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index 4335784dd9..9b77149850 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -411,6 +411,10 @@ public: return _token_metadata; } + virtual sharded& get_topology_state_machine() override { + return _topology_state_machine; + } + virtual future<> refresh_client_state() override { return _core_local.invoke_on_all([] (core_local_state& state) { return state.client_state.maybe_update_per_service_level_params(); diff --git a/test/lib/cql_test_env.hh b/test/lib/cql_test_env.hh index f313aae003..dced13e400 100644 --- a/test/lib/cql_test_env.hh +++ b/test/lib/cql_test_env.hh @@ -189,6 +189,8 @@ public: virtual sharded& get_shared_token_metadata() = 0; + virtual sharded& get_topology_state_machine() = 0; + data_dictionary::database data_dictionary(); virtual sharded& service_level_controller_service() = 0; From 0d259bb1757c0c7ae0b435276d8f2c89434451f2 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Sat, 1 Feb 2025 00:08:26 +0100 Subject: [PATCH 3/6] test: lib: Introduce topology_builder Will be used by load balancer tests which need more than a single-node topology, and which want to create proper schema in the database which depends on that topology, in particular creating keyspaces with replication factor > 1. 
We need to do that because load balancer will use replication strategy from the database as part of plan making. --- test/lib/topology_builder.hh | 210 +++++++++++++++++++++++++++++++++++ 1 file changed, 210 insertions(+) create mode 100644 test/lib/topology_builder.hh diff --git a/test/lib/topology_builder.hh b/test/lib/topology_builder.hh new file mode 100644 index 0000000000..743ff2f01e --- /dev/null +++ b/test/lib/topology_builder.hh @@ -0,0 +1,210 @@ +/* + * Copyright (C) 2025-present ScyllaDB + * + */ + +/* + * SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0) + */ + +#pragma once + +#include "cql_test_env.hh" +#include "locator/topology.hh" +#include "gms/inet_address.hh" +#include "service/topology_mutation.hh" +#include "service/topology_state_machine.hh" +#include "service/raft/raft_group0_client.hh" +#include "locator/host_id.hh" +#include "test/lib/log.hh" +#include "version.hh" + +#include + + +/// Modifies topology inside a given cql_test_env. +/// Local node's membership is not affected, but it belongs to a different DC than those produced by this builder. +/// +/// Creating the builder locks the topology state machine so there are no concurrent topology operations +/// and load balancing. +/// The built topology is not removed when the builder is destroyed and the state machine is left locked. +/// +/// All methods expect to be run in a seastar thread. 
+/// +/// Example usage: +/// +/// topology_builder topo(e); +/// auto host1 = topo.add_node(); // dc1 rack1 +/// auto host2 = topo.add_node(); // dc1 rack1 +/// topo.start_new_dc(); +/// auto host3 = topo.add_node(); // dc2 rack1 +/// auto host4 = topo.add_node(); // dc2 rack1 +/// topo.start_new_rack(); +/// auto host5 = topo.add_node(); // dc2 rack2 +/// auto host6 = topo.add_node(); // dc2 rack2 +/// +class topology_builder { +public: + using inet_address = locator::inet_address; + using endpoint_dc_rack = locator::endpoint_dc_rack; +private: + cql_test_env& _env; + int _nr_nodes = 0; + int _rack_id; + sstring _dc; + sstring _rack; +private: + inet_address make_node_address(int n) { + assert(n > 0); + int a = n % 256; + n /= 256; + int b = n % 256; + n /= 256; + assert(n < 256); + return inet_address(fmt::format("10.{}.{}.{}", n, b, a)); + } + + // Locks the topology to prevent concurrent topology operations and load balancing. + // Setting transition_state to "lock" blocks background load-balancing which could interfere with the test + // and prevents errors from load_topology_state() complaining about nodes in transition with no transition state.
+ void lock_topology() { + abort_source as; + auto& client = _env.get_raft_group0_client(); + service::topology& topo = _env.get_topology_state_machine().local()._topology; + + while (true) { + if (topo.tstate && *topo.tstate == service::topology::transition_state::lock) { + testlog.info("Topology is locked"); + return; + } + + auto guard = client.start_operation(as).get(); + + if (topo.tstate) { + testlog.info("Waiting for topology state machine to be idle"); + release_guard(std::move(guard)); + _env.get_topology_state_machine().local().await_not_busy().get(); + testlog.info("Woken up"); + continue; + } + + service::topology_change change({service::topology_mutation_builder(guard.write_timestamp()) + .set_transition_state(service::topology::transition_state::lock) + .build()}); + service::group0_command g0_cmd = client.prepare_command(std::move(change), guard, "locking topology"); + try { + client.add_entry(std::move(g0_cmd), std::move(guard), as).get(); + } catch (service::group0_concurrent_modification&) { + testlog.info("Concurrent modification detected while locking topology, retrying"); + } + } + } +public: + topology_builder(cql_test_env& e) + : _env(e) + { + start_new_dc(); + lock_topology(); + } + + // Returns a new token from some sequence of unique tokens. + // Uniqueness is in the scope of the process, not just this object. + dht::token new_token() { + static std::atomic next_token = 1; + return dht::token(next_token.fetch_add(1)); + } + + // Returns the name of the currently built DC. + const sstring& dc() const { + return _dc; + } + + // Returns location of the currently built rack. + endpoint_dc_rack rack() const { + return {_dc, _rack}; + } + + // Starts building a new rack in the current DC. + // Returns location of the new rack. + endpoint_dc_rack start_new_rack() { + _rack_id++; + _rack = fmt::format("rack{}", _rack_id); + return rack(); + } + + // Starts building a new DC. 
+ // DC is named uniquely in the scope of the process, not just this object. + endpoint_dc_rack start_new_dc() { + static std::atomic next_id = 1; + _dc = fmt::format("dc{}", next_id.fetch_add(1)); + _rack_id = 0; + return start_new_rack(); + } + + locator::host_id add_node(service::node_state state = service::node_state::normal, + unsigned shard_count = 1, + std::optional rack_override = {}) + { + ++_nr_nodes; + + auto ip = make_node_address(_nr_nodes); + auto id = locator::host_id(utils::UUID_gen::get_time_UUID()); + auto dc_rack = rack_override.value_or(rack()); + dht::token token = new_token(); + std::unordered_set tokens({token}); + + abort_source as; + auto& client = _env.get_raft_group0_client(); + + while (true) { + auto guard = client.start_operation(as).get(); + + service::topology_mutation_builder builder(guard.write_timestamp()); + builder.with_node(raft::server_id(id.uuid())) + .set("datacenter", dc_rack.dc) + .set("rack", dc_rack.rack) + .set("node_state", state) + .set("shard_count", (uint32_t) shard_count) + .set("cleanup_status", service::cleanup_status::clean) + .set("release_version", version::release()) + .set("num_tokens", (uint32_t) 1) + .set("tokens_string", fmt::format("{}", token)) + .set("tokens", tokens) + .set("supported_features", std::set()) + .set("request_id", utils::UUID()) + .set("ignore_msb", (uint32_t) 0); + service::topology_change change({builder.build()}); + service::group0_command g0_cmd = client.prepare_command(std::move(change), guard, + format("adding node {} to topology", id)); + testlog.info("Adding node {}/{} dc={} rack={} to topology", id, ip, dc_rack.dc, dc_rack.rack); + try { + client.add_entry(std::move(g0_cmd), std::move(guard), as).get(); + break; + } catch (service::group0_concurrent_modification&) { + testlog.warn("Concurrent modification detected, retrying"); + } + } + return id; + } + + void set_node_state(locator::host_id id, service::node_state state) { + abort_source as; + auto& client = 
_env.get_raft_group0_client(); + while (true) { + auto guard = client.start_operation(as).get(); + service::topology_mutation_builder builder(guard.write_timestamp()); + builder.with_node(raft::server_id(id.uuid())) + .set("node_state", state); + service::topology_change change({builder.build()}); + service::group0_command g0_cmd = client.prepare_command(std::move(change), guard, + format("node {} state={}", id, state)); + testlog.info("Changing node {} state={}", id, state); + try { + client.add_entry(std::move(g0_cmd), std::move(guard), as).get(); + break; + } catch (service::group0_concurrent_modification&) { + testlog.warn("Concurrent modification detected, retrying"); + } + } + } +}; From ca6159fbe2a664c04cfb86a73c67d272063f0ddd Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 30 Jan 2025 13:25:11 +0100 Subject: [PATCH 4/6] test: tablets_test: Create proper schema in load balancer tests This is in preparation for load balancer changes needed to respect per-table tablet hints and the per-shard tablet count goal. After those changes, load balancer consults with the replication strategy in the database, so we need to create proper schema in the database. To do that, we need proper topology for replication strategies which use RF > 1, otherwise keyspace creation will fail.
--- test/boost/tablets_test.cc | 990 +++++++++++++------------------------ 1 file changed, 348 insertions(+), 642 deletions(-) diff --git a/test/boost/tablets_test.cc b/test/boost/tablets_test.cc index 1f9d3d1274..3fd6a23033 100644 --- a/test/boost/tablets_test.cc +++ b/test/boost/tablets_test.cc @@ -12,6 +12,8 @@ #undef SEASTAR_TESTING_MAIN #include #include "test/lib/random_utils.hh" +#include "service/topology_mutation.hh" +#include "service/storage_service.hh" #include #include #include "test/lib/cql_test_env.hh" @@ -19,6 +21,7 @@ #include "test/lib/simple_schema.hh" #include "test/lib/key_utils.hh" #include "test/lib/test_utils.hh" +#include "test/lib/topology_builder.hh" #include "db/config.hh" #include "db/schema_tables.hh" #include "schema/schema_builder.hh" @@ -38,6 +41,7 @@ #include "service/topology_state_machine.hh" #include +#include BOOST_AUTO_TEST_SUITE(tablets_test) @@ -51,13 +55,6 @@ static api::timestamp_type current_timestamp(cql_test_env& e) { return utils::UUID_gen::micros_timestamp(e.get_system_keyspace().local().get_last_group0_state_id().get()) + 1; } -static utils::UUID next_uuid() { - static uint64_t counter = 1; - return utils::UUID_gen::get_time_UUID(std::chrono::system_clock::time_point( - std::chrono::duration_cast( - std::chrono::seconds(counter++)))); -} - static void verify_tablet_metadata_persistence(cql_test_env& env, const tablet_metadata& tm, api::timestamp_type& ts) { save_tablet_metadata(env.local_db(), tm, ts++).get(); @@ -109,6 +106,37 @@ future add_table(cql_test_env& e, sstring test_ks_name = "") { co_return id; } +// Run in a seastar thread +static +sstring add_keyspace(cql_test_env& e, std::unordered_map dc_rf, int initial_tablets = 0) { + static std::atomic ks_id = 0; + auto ks_name = fmt::format("keyspace{}", ks_id.fetch_add(1)); + sstring rf_options; + for (auto& [dc, rf] : dc_rf) { + rf_options += format(", '{}': {}", dc, rf); + } + e.execute_cql(fmt::format("create keyspace {} with replication = {{'class': 
'NetworkTopologyStrategy'{}}}" + " and tablets = {{'enabled': true, 'initial': {}}}", + ks_name, rf_options, initial_tablets)).get(); + return ks_name; +} + +// Run in a seastar thread +void mutate_tablets(cql_test_env& e, const group0_guard& guard, seastar::noncopyable_function(tablet_metadata&)> mutator) { + auto& stm = e.shared_token_metadata().local(); + stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { + return mutator(tm.tablets()); + }).get(); + save_tablet_metadata(e.local_db(), stm.get()->tablets(), guard.write_timestamp()).get(); +} + +// Run in a seastar thread +void mutate_tablets(cql_test_env& e, seastar::noncopyable_function(tablet_metadata&)> mutator) { + abort_source as; + auto guard = e.get_raft_group0_client().start_operation(as).get(); + mutate_tablets(e, guard, std::move(mutator)); +} + SEASTAR_TEST_CASE(test_tablet_metadata_persistence) { return do_with_cql_env_thread([] (cql_test_env& e) { auto h1 = host_id(utils::UUID_gen::get_time_UUID()); @@ -1370,7 +1398,11 @@ void apply_resize_plan(token_metadata& tm, const migration_plan& plan) { } static -future<> handle_resize_finalize(tablet_allocator& talloc, shared_token_metadata& stm, const migration_plan& plan) { +future<> handle_resize_finalize(cql_test_env& e, group0_guard& guard, const migration_plan& plan) { + auto& talloc = e.get_tablet_allocator().local(); + auto& stm = e.shared_token_metadata().local(); + bool changed = false; + for (auto table_id : plan.resize_plan().finalize_resize) { auto tm = stm.get(); const auto& old_tmap = tm->tablets().get_tablet_map(table_id); @@ -1380,11 +1412,23 @@ future<> handle_resize_finalize(tablet_allocator& talloc, shared_token_metadata& new_resize_decision.sequence_number = old_tmap.resize_decision().next_sequence_number(); new_tmap.set_resize_decision(std::move(new_resize_decision)); - co_await stm.mutate_token_metadata([table_id, &new_tmap] (token_metadata& tm) { + co_await stm.mutate_token_metadata([table_id, &new_tmap, &changed] 
(token_metadata& tm) { + changed = true; tm.tablets().set_tablet_map(table_id, std::move(new_tmap)); return make_ready_future<>(); }); } + + if (changed) { + // Need to reload on each resize because table object expects tablet count to change by a factor of 2. + co_await save_tablet_metadata(e.local_db(), stm.get()->tablets(), guard.write_timestamp()); + co_await e.get_storage_service().local().load_tablet_metadata({}); + + // Need a new guard to make sure later changes use later timestamp. + release_guard(std::move(guard)); + abort_source as; + guard = co_await e.get_raft_group0_client().start_operation(as); + } } // Reflects the plan in a given token metadata as if the migrations were fully executed. @@ -1429,11 +1473,15 @@ static void check_tablet_invariants(const tablet_metadata& tmeta); static -void rebalance_tablets(tablet_allocator& talloc, - shared_token_metadata& stm, - locator::load_stats_ptr load_stats = {}, - std::unordered_set skiplist = {}, - std::function stop = nullptr) { +void do_rebalance_tablets(cql_test_env& e, + group0_guard& guard, + locator::load_stats_ptr load_stats = {}, + std::unordered_set skiplist = {}, + std::function stop = nullptr) +{ + auto& talloc = e.get_tablet_allocator().local(); + auto& stm = e.shared_token_metadata().local(); + // Sanity limit to avoid infinite loops. // The x10 factor is arbitrary, it's there to account for more complex schedules than direct migration. auto max_iterations = 1 + get_tablet_count(stm.get()->tablets()) * 10; @@ -1450,11 +1498,38 @@ void rebalance_tablets(tablet_allocator& talloc, apply_plan(tm, plan); return make_ready_future<>(); }).get(); - handle_resize_finalize(talloc, stm, plan).get(); + handle_resize_finalize(e, guard, plan).get(); } throw std::runtime_error("rebalance_tablets(): convergence not reached within limit"); } +// Invokes the tablet scheduler and executes its plan, continuously until it emits an empty plan. 
+// Simulates topology coordinator but doesn't perform actual migration, +// only reflects it in the metadata. +// Run in a seastar thread. +void rebalance_tablets(cql_test_env& e, + load_stats_ptr load_stats = nullptr, + std::unordered_set skiplist = {}, + std::function stop = nullptr) { + abort_source as; + testlog.debug("rebalance_tablets(): start"); + + auto guard = e.get_raft_group0_client().start_operation(as).get(); + testlog.debug("rebalance_tablets(): took group0 guard"); + + do_rebalance_tablets(e, guard, std::move(load_stats), std::move(skiplist), std::move(stop)); + testlog.debug("rebalance_tablets(): rebalanced"); + + // We should not introduce inconsistency between on-disk state and in-memory state + // as that may violate invariants and cause failures in later operations + // causing test flakiness. + auto& stm = e.shared_token_metadata().local(); + save_tablet_metadata(e.local_db(), stm.get()->tablets(), guard.write_timestamp()).get(); + e.get_storage_service().local().load_tablet_metadata({}).get(); + + testlog.debug("rebalance_tablets(): done"); +} + static void rebalance_tablets_as_in_progress(tablet_allocator& talloc, shared_token_metadata& stm) { while (true) { @@ -1491,36 +1566,17 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_empty_node) { do_with_cql_env_thread([] (auto& e) { // Tests the scenario of bootstrapping a single node // Verifies that load balancer sees it and moves tablets to that node. 
- - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); - inet_address ip3("192.168.0.3"); - - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); - auto host3 = host_id(next_uuid()); - - auto table1 = table_id(next_uuid()); + topology_builder topo(e); unsigned shard_count = 2; + auto host1 = topo.add_node(node_state::normal, shard_count); + auto host2 = topo.add_node(node_state::normal, shard_count); + auto host3 = topo.add_node(node_state::normal, shard_count); - semaphore sem(1); - shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ - locator::topology::config{ - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = locator::endpoint_dc_rack::default_location - } - }); - - stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { - tm.update_topology(host1, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - tm.update_topology(host2, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - tm.update_topology(host3, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 3))}, host1); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 3))}, host2); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(3. 
/ 3))}, host3); + auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 4); + auto table1 = add_table(e, ks_name).get(); + mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(4); auto tid = tmap.first_tablet(); tmap.set_tablet(tid, tablet_info { @@ -1550,11 +1606,11 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_empty_node) { tablet_replica {host2, 0}, } }); - tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); co_return; - }).get(); + }); + + auto& stm = e.shared_token_metadata().local(); // Sanity check { @@ -1568,7 +1624,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_empty_node) { BOOST_REQUIRE_EQUAL(load.get_avg_shard_load(host3), 0); } - rebalance_tablets(e.get_tablet_allocator().local(), stm); + rebalance_tablets(e); { load_sketch load(stm.get()); @@ -1590,32 +1646,17 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_skiplist) { // Tests the scenario of balacning cluster with DOWN node // Verifies that load balancer doesn't moves tablets to that node. 
- inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); - inet_address ip3("192.168.0.3"); - - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); - auto host3 = host_id(next_uuid()); - - auto table1 = table_id(next_uuid()); - unsigned shard_count = 2; - semaphore sem(1); - shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ - locator::topology::config{ - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = locator::endpoint_dc_rack::default_location - } - }); + topology_builder topo(e); + auto host1 = topo.add_node(node_state::normal, shard_count); + auto host2 = topo.add_node(node_state::normal, shard_count); + auto host3 = topo.add_node(node_state::normal, shard_count); - stm.mutate_token_metadata([&] (token_metadata& tm) { - tm.update_topology(host1, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - tm.update_topology(host2, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - tm.update_topology(host3, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); + auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 4); + auto table1 = add_table(e, ks_name).get(); + mutate_tablets(e, [&] (tablet_metadata& tmeta) { tablet_map tmap(4); auto tid = tmap.first_tablet(); tmap.set_tablet(tid, tablet_info { @@ -1645,11 +1686,11 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_skiplist) { tablet_replica {host2, 0}, } }); - tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); return make_ready_future<>(); - }).get(); + }); + + auto& stm = e.shared_token_metadata().local(); // Sanity check { @@ -1663,7 +1704,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_skiplist) { BOOST_REQUIRE_EQUAL(load.get_avg_shard_load(host3), 0); } - rebalance_tablets(e.get_tablet_allocator().local(), stm, {}, {host3}); + rebalance_tablets(e, {}, {host3}); 
{ load_sketch load(stm.get()); @@ -1678,36 +1719,17 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_rf_met) { // Verifies that load balancer moves tablets out of the decommissioned node. // The scenario is such that replication factor of tablets can be satisfied after decommission. do_with_cql_env_thread([](auto& e) { - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); - inet_address ip3("192.168.0.3"); + unsigned shard_count = 2; - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); - auto host3 = host_id(next_uuid()); + topology_builder topo(e); + auto host1 = topo.add_node(node_state::normal, shard_count); + auto host2 = topo.add_node(node_state::normal, shard_count); + auto host3 = topo.add_node(node_state::decommissioning, shard_count); - auto table1 = table_id(next_uuid()); - - semaphore sem(1); - shared_token_metadata stm([&sem]() noexcept { return get_units(sem, 1); }, locator::token_metadata::config { - locator::topology::config { - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = locator::endpoint_dc_rack::default_location - } - }); - - stm.mutate_token_metadata([&](token_metadata& tm) -> future<> { - const unsigned shard_count = 2; - - tm.update_topology(host1, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - tm.update_topology(host2, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - tm.update_topology(host3, locator::endpoint_dc_rack::default_location, node::state::being_decommissioned, - shard_count); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 3))}, host1); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 3))}, host2); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(3. 
/ 3))}, host3); + auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 4); + auto table1 = add_table(e, ks_name).get(); + mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(4); auto tid = tmap.first_tablet(); tmap.set_tablet(tid, tablet_info { @@ -1737,13 +1759,13 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_rf_met) { tablet_replica {host3, 1}, } }); - tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); co_return; - }).get(); + }); - rebalance_tablets(e.get_tablet_allocator().local(), stm); + rebalance_tablets(e); + + auto& stm = e.shared_token_metadata().local(); { load_sketch load(stm.get()); @@ -1753,12 +1775,9 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_rf_met) { BOOST_REQUIRE_EQUAL(load.get_avg_shard_load(host3), 0); } - stm.mutate_token_metadata([&](token_metadata& tm) { - tm.update_topology(host3, locator::endpoint_dc_rack::default_location, node::state::left); - return make_ready_future<>(); - }).get(); + topo.set_node_state(host3, node_state::left); - rebalance_tablets(e.get_tablet_allocator().local(), stm); + rebalance_tablets(e); { load_sketch load(stm.get()); @@ -1778,49 +1797,18 @@ SEASTAR_THREAD_TEST_CASE(test_table_creation_during_decommission) { auto cfg = tablet_cql_test_config(); cfg.db_config->tablets_initial_scale_factor(1); do_with_cql_env_thread([](auto& e) { - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); - inet_address ip3("192.168.0.3"); - inet_address ip4("192.168.0.4"); + topology_builder topo(e); + topo.add_node(node_state::normal); + topo.add_node(node_state::normal); + auto host3 = topo.add_node(node_state::decommissioning); + auto host4 = topo.add_node(node_state::left); - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); - auto host3 = host_id(next_uuid()); - auto host4 = host_id(next_uuid()); - locator::endpoint_dc_rack dcrack = { "datacenter1", "rack1" }; + auto ks_name = add_keyspace(e, {{topo.dc(), 1}}); + 
auto table1 = add_table(e, ks_name).get(); + auto s = e.local_db().find_schema(table1); - semaphore sem(1); - shared_token_metadata stm([&sem]() noexcept { return get_units(sem, 1); }, locator::token_metadata::config { - locator::topology::config { - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = dcrack - } - }); - - stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { - tm.update_topology(host1, dcrack, node::state::normal, 1); - tm.update_topology(host2, dcrack, node::state::normal, 1); - tm.update_topology(host3, dcrack, node::state::being_decommissioned, 16); - tm.update_topology(host4, dcrack, node::state::left, 16); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 4))}, host1); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 4))}, host2); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(3. / 4))}, host3); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(4. / 4))}, host4); - co_return; - }).get(); - - sstring ks_name = "test_ks"; - sstring table_name = "table1"; - e.execute_cql(format("create keyspace {} with replication = " - "{{'class': 'NetworkTopologyStrategy', '{}': 1}} " - "and tablets = {{'enabled': true}}", ks_name, dcrack.dc)).get(); - e.execute_cql(fmt::format("CREATE TABLE {}.{} (p1 text, r1 int, PRIMARY KEY (p1))", ks_name, table_name)).get(); - auto s = e.local_db().find_schema(ks_name, table_name); - - auto* rs = e.local_db().find_keyspace(ks_name).get_replication_strategy().maybe_as_tablet_aware(); - BOOST_REQUIRE(rs); - auto tmap = rs->allocate_tablets_for_new_table(s, stm.get(), 1).get(); + auto& stm = e.shared_token_metadata().local(); + auto& tmap = stm.get()->tablets().get_tablet_map(table1); // Verify we do not treat leaving nodes as having capacity. 
BOOST_REQUIRE_EQUAL(tmap.tablet_count(), 2); @@ -1840,54 +1828,20 @@ SEASTAR_THREAD_TEST_CASE(test_table_creation_during_rack_decommission) { // The problematic scenario happens when allocating tablets for a new table // when there is a rack with only non-normal nodes. do_with_cql_env_thread([](auto& e) { - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); - inet_address ip3("192.168.0.3"); - inet_address ip4("192.168.0.4"); + topology_builder topo(e); + topo.add_node(); + topo.add_node(); + topo.start_new_rack(); + auto host3 = topo.add_node(node_state::decommissioning); + auto host4 = topo.add_node(node_state::left); - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); - auto host3 = host_id(next_uuid()); - auto host4 = host_id(next_uuid()); + auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 8); + auto table1 = add_table(e, ks_name).get(); - auto dc = "datacenter1"; - locator::endpoint_dc_rack dcrack = { dc, "rack1" }; - locator::endpoint_dc_rack dcrack2 = { dc, "rack2" }; + rebalance_tablets(e); - semaphore sem(1); - shared_token_metadata stm([&sem]() noexcept { return get_units(sem, 1); }, locator::token_metadata::config { - locator::topology::config { - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = dcrack - } - }); - - const unsigned shard_count = 1; - - stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { - tm.update_topology(host1, dcrack, node::state::normal, shard_count); - tm.update_topology(host2, dcrack, node::state::normal, shard_count); - tm.update_topology(host3, dcrack2, node::state::being_decommissioned, shard_count); - tm.update_topology(host4, dcrack2, node::state::left, shard_count); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 4))}, host1); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 4))}, host2); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(3. 
/ 4))}, host3); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(4. / 4))}, host4); - co_return; - }).get(); - - sstring ks_name = "test_ks"; - sstring table_name = "table1"; - e.execute_cql(format("create keyspace {} with replication = " - "{{'class': 'NetworkTopologyStrategy', '{}': 1}} " - "and tablets = {{'enabled': true, 'initial': 8}}", ks_name, dcrack.dc)).get(); - e.execute_cql(fmt::format("CREATE TABLE {}.{} (p1 text, r1 int, PRIMARY KEY (p1))", ks_name, table_name)).get(); - auto s = e.local_db().find_schema(ks_name, table_name); - - auto* rs = e.local_db().find_keyspace(ks_name).get_replication_strategy().maybe_as_tablet_aware(); - BOOST_REQUIRE(rs); - auto tmap = rs->allocate_tablets_for_new_table(s, stm.get(), 8).get(); + auto& stm = e.shared_token_metadata().local(); + auto& tmap = stm.get()->tablets().get_tablet_map(table1); tmap.for_each_tablet([&](auto tid, auto& tinfo) { for (auto& replica : tinfo.replicas) { @@ -1903,45 +1857,21 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_two_racks) { // Verifies that load balancer moves tablets out of the decommissioned node. // The scenario is such that replication constraints of tablets can be satisfied after decommission. 
do_with_cql_env_thread([](auto& e) { - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); - inet_address ip3("192.168.0.3"); - inet_address ip4("192.168.0.4"); + std::vector racks; - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); - auto host3 = host_id(next_uuid()); - auto host4 = host_id(next_uuid()); + topology_builder topo(e); + racks.push_back(topo.rack()); + auto host1 = topo.add_node(node_state::normal); + auto host3 = topo.add_node(node_state::normal); + topo.start_new_rack(); + racks.push_back(topo.rack()); + auto host2 = topo.add_node(node_state::normal); + auto host4 = topo.add_node(node_state::decommissioning); - std::vector racks = { - endpoint_dc_rack{ "dc1", "rack-1" }, - endpoint_dc_rack{ "dc1", "rack-2" } - }; - - auto table1 = table_id(next_uuid()); - - semaphore sem(1); - shared_token_metadata stm([&sem]() noexcept { return get_units(sem, 1); }, locator::token_metadata::config { - locator::topology::config { - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = racks[0] - } - }); - - stm.mutate_token_metadata([&](token_metadata& tm) -> future<> { - const unsigned shard_count = 1; - - tm.update_topology(host1, racks[0], node::state::normal, shard_count); - tm.update_topology(host2, racks[1], node::state::normal, shard_count); - tm.update_topology(host3, racks[0], node::state::normal, shard_count); - tm.update_topology(host4, racks[1], node::state::being_decommissioned, - shard_count); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 4))}, host1); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 4))}, host2); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(3. / 4))}, host3); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(4. 
/ 4))}, host4); + auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 4); + auto table1 = add_table(e, ks_name).get(); + mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(4); auto tid = tmap.first_tablet(); tmap.set_tablet(tid, tablet_info { @@ -1971,13 +1901,13 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_two_racks) { tablet_replica {host2, 0}, } }); - tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); co_return; - }).get(); + }); - rebalance_tablets(e.get_tablet_allocator().local(), stm); + rebalance_tablets(e); + + auto& stm = e.shared_token_metadata().local(); { load_sketch load(stm.get()); @@ -2006,45 +1936,21 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_rack_load_failure) { // Verifies that load balancer moves tablets out of the decommissioned node. // The scenario is such that it is impossible to distribute replicas without violating rack uniqueness. do_with_cql_env_thread([](auto& e) { - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); - inet_address ip3("192.168.0.3"); - inet_address ip4("192.168.0.4"); + std::vector racks; - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); - auto host3 = host_id(next_uuid()); - auto host4 = host_id(next_uuid()); + topology_builder topo(e); + racks.push_back(topo.rack()); + auto host1 = topo.add_node(node_state::normal); + auto host2 = topo.add_node(node_state::normal); + auto host3 = topo.add_node(node_state::normal); + topo.start_new_rack(); + racks.push_back(topo.rack()); + auto host4 = topo.add_node(node_state::decommissioning); - std::vector racks = { - endpoint_dc_rack{ "dc1", "rack-1" }, - endpoint_dc_rack{ "dc1", "rack-2" } - }; - - auto table1 = table_id(next_uuid()); - - semaphore sem(1); - shared_token_metadata stm([&sem]() noexcept { return get_units(sem, 1); }, locator::token_metadata::config { - locator::topology::config { - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack 
= racks[0] - } - }); - - stm.mutate_token_metadata([&](token_metadata& tm) -> future<> { - const unsigned shard_count = 1; - - tm.update_topology(host1, racks[0], node::state::normal, shard_count); - tm.update_topology(host2, racks[0], node::state::normal, shard_count); - tm.update_topology(host3, racks[0], node::state::normal, shard_count); - tm.update_topology(host4, racks[1], node::state::being_decommissioned, - shard_count); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 4))}, host1); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 4))}, host2); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(3. / 4))}, host3); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(4. / 4))}, host4); + auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 4); + auto table1 = add_table(e, ks_name).get(); + mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(4); auto tid = tmap.first_tablet(); tmap.set_tablet(tid, tablet_info { @@ -2074,13 +1980,11 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_rack_load_failure) { tablet_replica {host4, 0}, } }); - tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); co_return; - }).get(); + }); - BOOST_REQUIRE_THROW(rebalance_tablets(e.get_tablet_allocator().local(), stm), std::runtime_error); + BOOST_REQUIRE_THROW(rebalance_tablets(e), std::runtime_error); }).get(); } @@ -2088,36 +1992,15 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_rf_not_met) { // Verifies that load balancer moves tablets out of the decommissioned node. // The scenario is such that replication factor of tablets can be satisfied after decommission. 
do_with_cql_env_thread([](auto& e) { - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); - inet_address ip3("192.168.0.3"); + topology_builder topo(e); + auto host1 = topo.add_node(node_state::normal, 2); + auto host2 = topo.add_node(node_state::normal, 2); + auto host3 = topo.add_node(node_state::decommissioning, 2); - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); - auto host3 = host_id(next_uuid()); - - auto table1 = table_id(next_uuid()); - - semaphore sem(1); - shared_token_metadata stm([&sem]() noexcept { return get_units(sem, 1); }, locator::token_metadata::config { - locator::topology::config { - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = locator::endpoint_dc_rack::default_location - } - }); - - stm.mutate_token_metadata([&](token_metadata& tm) -> future<> { - const unsigned shard_count = 2; - - tm.update_topology(host1, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - tm.update_topology(host2, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - tm.update_topology(host3, locator::endpoint_dc_rack::default_location, node::state::being_decommissioned, - shard_count); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 3))}, host1); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 3))}, host2); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(3. 
/ 3))}, host3); + auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 1); + auto table1 = add_table(e, ks_name).get(); + mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(1); auto tid = tmap.first_tablet(); tmap.set_tablet(tid, tablet_info { @@ -2127,13 +2010,11 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_rf_not_met) { tablet_replica {host3, 0}, } }); - tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); co_return; - }).get(); + }); - BOOST_REQUIRE_THROW(rebalance_tablets(e.get_tablet_allocator().local(), stm), std::runtime_error); + BOOST_REQUIRE_THROW(rebalance_tablets(e), std::runtime_error); }).get(); } @@ -2146,33 +2027,15 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_works_with_in_progress_transitions) // which when executed will achieve perfect balance, // which is a proof that it doesn't stop due to active migrations. - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); - inet_address ip3("192.168.0.3"); + topology_builder topo(e); + auto host1 = topo.add_node(node_state::normal, 1); + auto host2 = topo.add_node(node_state::normal, 1); + auto host3 = topo.add_node(node_state::normal, 2); - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); - auto host3 = host_id(next_uuid()); - - auto table1 = table_id(next_uuid()); - - semaphore sem(1); - shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ - locator::topology::config{ - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = locator::endpoint_dc_rack::default_location - } - }); - - stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { - tm.update_topology(host1, locator::endpoint_dc_rack::default_location, node::state::normal, 1); - tm.update_topology(host2, locator::endpoint_dc_rack::default_location, node::state::normal, 1); - tm.update_topology(host3, locator::endpoint_dc_rack::default_location, 
node::state::normal, 2); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 3))}, host1); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 3))}, host2); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(3. / 3))}, host3); + auto ks_name = add_keyspace(e, {{topo.dc(), 2}}, 4); + auto table1 = add_table(e, ks_name).get(); + mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(4); std::optional tid = tmap.first_tablet(); for (int i = 0; i < 4; ++i) { @@ -2193,11 +2056,13 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_works_with_in_progress_transitions) }, tablet_replica {host3, 0} }); - tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); co_return; - }).get(); + }); + + abort_source as; + auto guard = e.get_raft_group0_client().start_operation(as).get(); + auto& stm = e.shared_token_metadata().local(); rebalance_tablets_as_in_progress(e.get_tablet_allocator().local(), stm); execute_transitions(stm); @@ -2211,39 +2076,25 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_works_with_in_progress_transitions) BOOST_REQUIRE_EQUAL(load.get_avg_shard_load(h), 2); } } + + // Restore consistency between stm and system tables before releasing group0 guard. 
+ save_tablet_metadata(e.local_db(), stm.get()->tablets(), guard.write_timestamp()).get(); }).get(); } #ifdef SCYLLA_ENABLE_ERROR_INJECTION SEASTAR_THREAD_TEST_CASE(test_load_balancer_shuffle_mode) { do_with_cql_env_thread([] (auto& e) { - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); - inet_address ip3("192.168.0.3"); + topology_builder topo(e); - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); - auto host3 = host_id(next_uuid()); + auto host1 = topo.add_node(node_state::normal, 1); + auto host2 = topo.add_node(node_state::normal, 1); + topo.add_node(node_state::normal, 2); - auto table1 = table_id(next_uuid()); - - semaphore sem(1); - shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ - locator::topology::config{ - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = locator::endpoint_dc_rack::default_location - } - }); - - stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { - tm.update_topology(host1, locator::endpoint_dc_rack::default_location, node::state::normal, 1); - tm.update_topology(host2, locator::endpoint_dc_rack::default_location, node::state::normal, 1); - tm.update_topology(host3, locator::endpoint_dc_rack::default_location, node::state::normal, 2); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 3))}, host1); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 3))}, host2); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(3. 
/ 3))}, host3); + auto ks_name = add_keyspace(e, {{topo.dc(), 2}}, 4); + auto table1 = add_table(e, ks_name).get(); + mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(4); std::optional tid = tmap.first_tablet(); for (int i = 0; i < 4; ++i) { @@ -2255,14 +2106,13 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancer_shuffle_mode) { }); tid = tmap.next_tablet(*tid); } - tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); co_return; - }).get(); + }); - rebalance_tablets(e.get_tablet_allocator().local(), stm); + rebalance_tablets(e); + auto& stm = e.shared_token_metadata().local(); BOOST_REQUIRE(e.get_tablet_allocator().local().balance_tablets(stm.get()).get().empty()); utils::get_local_injector().enable("tablet_allocator_shuffle"); @@ -2277,39 +2127,18 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancer_shuffle_mode) { SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_two_empty_nodes) { do_with_cql_env_thread([] (auto& e) { - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); - inet_address ip3("192.168.0.3"); - inet_address ip4("192.168.0.4"); + topology_builder topo(e); - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); - auto host3 = host_id(next_uuid()); - auto host4 = host_id(next_uuid()); + const auto shard_count = 2; + auto host1 = topo.add_node(node_state::normal, shard_count); + auto host2 = topo.add_node(node_state::normal, shard_count); + auto host3 = topo.add_node(node_state::normal, shard_count); + auto host4 = topo.add_node(node_state::normal, shard_count); - auto table1 = table_id(next_uuid()); - - unsigned shard_count = 2; - - semaphore sem(1); - shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ - locator::topology::config{ - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = locator::endpoint_dc_rack::default_location - } - }); - - stm.mutate_token_metadata([&] 
(token_metadata& tm) -> future<> { - tm.update_topology(host1, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - tm.update_topology(host2, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - tm.update_topology(host3, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - tm.update_topology(host4, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 4))}, host1); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 4))}, host2); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(3. / 4))}, host3); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(4. / 4))}, host4); + auto ks_name = add_keyspace(e, {{topo.dc(), 2}}, 16); + auto table1 = add_table(e, ks_name).get(); + mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(16); for (auto tid : tmap.tablet_ids()) { tmap.set_tablet(tid, tablet_info { @@ -2319,13 +2148,13 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_two_empty_nodes) { } }); } - tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); co_return; - }).get(); + }); - rebalance_tablets(e.get_tablet_allocator().local(), stm); + rebalance_tablets(e); + + auto& stm = e.shared_token_metadata().local(); { load_sketch load(stm.get()); @@ -2342,33 +2171,16 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_two_empty_nodes) { SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_asymmetric_node_capacity) { do_with_cql_env_thread([](auto& e) { - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); - inet_address ip3("192.168.0.3"); + topology_builder topo(e); - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); - auto host3 = host_id(next_uuid()); + auto host1 = topo.add_node(node_state::decommissioning, 
8); + auto host2 = topo.add_node(node_state::normal, 1); + auto host3 = topo.add_node(node_state::normal, 7); - auto table1 = table_id(next_uuid()); - - semaphore sem(1); - shared_token_metadata stm([&sem]() noexcept { return get_units(sem, 1); }, locator::token_metadata::config { - locator::topology::config { - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = locator::endpoint_dc_rack::default_location - } - }); - - stm.mutate_token_metadata([&](token_metadata& tm) -> future<> { - tm.update_topology(host1, locator::endpoint_dc_rack::default_location, node::state::being_decommissioned, 8); - tm.update_topology(host2, locator::endpoint_dc_rack::default_location, node::state::normal, 1); - tm.update_topology(host3, locator::endpoint_dc_rack::default_location, node::state::normal, 7); - co_await tm.update_normal_tokens(std::unordered_set {token(tests::d2t(1. / 4))}, host1); - co_await tm.update_normal_tokens(std::unordered_set {token(tests::d2t(2. / 4))}, host2); - co_await tm.update_normal_tokens(std::unordered_set {token(tests::d2t(3. 
/ 4))}, host3); + auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 16); + auto table1 = add_table(e, ks_name).get(); + mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(16); for (auto tid: tmap.tablet_ids()) { tmap.set_tablet(tid, tablet_info { @@ -2377,17 +2189,17 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_asymmetric_node_capacity) { } }); } - tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); co_return; - }).get(); + }); auto until_nodes_drained = [] (const migration_plan& plan) { return !plan.has_nodes_to_drain(); }; - rebalance_tablets(e.get_tablet_allocator().local(), stm, {}, {}, until_nodes_drained); + rebalance_tablets(e, {}, {}, until_nodes_drained); + + auto& stm = e.shared_token_metadata().local(); { load_sketch load(stm.get()); @@ -2404,33 +2216,20 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_asymmetric_node_capacity) { SEASTAR_THREAD_TEST_CASE(test_load_balancer_disabling) { do_with_cql_env_thread([] (auto& e) { - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); + topology_builder topo(e); + auto host1 = topo.add_node(node_state::normal, 2); + topo.add_node(node_state::normal, 2); - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); + auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 16); + auto table1 = add_table(e, ks_name).get(); - auto table1 = table_id(next_uuid()); - - unsigned shard_count = 2; - - semaphore sem(1); - shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ - locator::topology::config{ - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = locator::endpoint_dc_rack::default_location - } - }); + abort_source as; + auto guard = e.get_raft_group0_client().start_operation(as).get(); + auto& stm = e.shared_token_metadata().local(); // host1 is loaded and host2 is empty, resulting in an imbalance. 
// host1's shard 0 is loaded and shard 1 is empty, resulting in intra-node imbalance. - stm.mutate_token_metadata([&] (auto& tm) -> future<> { - tm.update_topology(host1, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - tm.update_topology(host2, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 2))}, host1); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 2))}, host2); - + mutate_tablets(e, guard, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(16); for (auto tid : tmap.tablet_ids()) { tmap.set_tablet(tid, tablet_info { @@ -2439,11 +2238,9 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancer_disabling) { } }); } - tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); co_return; - }).get(); + }); { auto plan = e.get_tablet_allocator().local().balance_tablets(stm.get()).get(); @@ -2496,31 +2293,18 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancer_disabling) { SEASTAR_THREAD_TEST_CASE(test_drained_node_is_not_balanced_internally) { do_with_cql_env_thread([] (auto& e) { - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); + topology_builder topo(e); + auto host1 = topo.add_node(node_state::removing, 2); + topo.add_node(node_state::normal, 2); - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); + auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 16); + auto table1 = add_table(e, ks_name).get(); - auto table1 = table_id(next_uuid()); - - unsigned shard_count = 2; - - semaphore sem(1); - shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ - locator::topology::config{ - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = locator::endpoint_dc_rack::default_location - } - }); - - stm.mutate_token_metadata([&] (locator::token_metadata& tm) -> future<> 
{ - tm.update_topology(host1, locator::endpoint_dc_rack::default_location, locator::node::state::being_removed, shard_count); - tm.update_topology(host2, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 2))}, host1); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 2))}, host2); + abort_source as; + auto guard = e.get_raft_group0_client().start_operation(as).get(); + auto& stm = e.shared_token_metadata().local(); + mutate_tablets(e, guard, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(16); for (auto tid : tmap.tablet_ids()) { tmap.set_tablet(tid, tablet_info { @@ -2529,11 +2313,9 @@ SEASTAR_THREAD_TEST_CASE(test_drained_node_is_not_balanced_internally) { } }); } - tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); co_return; - }).get(); + }); migration_plan plan = e.get_tablet_allocator().local().balance_tablets(stm.get()).get(); BOOST_REQUIRE(plan.has_nodes_to_drain()); @@ -2545,42 +2327,27 @@ SEASTAR_THREAD_TEST_CASE(test_drained_node_is_not_balanced_internally) { SEASTAR_THREAD_TEST_CASE(test_plan_fails_when_removing_last_replica) { do_with_cql_env_thread([] (auto& e) { - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); + topology_builder topo(e); + auto host1 = topo.add_node(); - auto host1 = host_id(next_uuid()); + auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 1); + auto table1 = add_table(e, ks_name).get(); - auto table1 = table_id(next_uuid()); - - unsigned shard_count = 1; - - semaphore sem(1); - shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ - locator::topology::config{ - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = locator::endpoint_dc_rack::default_location - } - }); - - stm.mutate_token_metadata([&] (locator::token_metadata& tm) -> future<> { - 
tm.update_topology(host1, locator::endpoint_dc_rack::default_location, locator::node::state::being_removed, shard_count); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 2))}, host1); + topo.set_node_state(host1, node_state::removing); + mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(1); for (auto tid : tmap.tablet_ids()) { tmap.set_tablet(tid, tablet_info { tablet_replica_set{tablet_replica{host1, 0}} }); } - tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); co_return; - }).get(); + }); std::unordered_set skiplist = {host1}; - BOOST_REQUIRE_THROW(rebalance_tablets(e.get_tablet_allocator().local(), stm, {}, skiplist), std::runtime_error); + BOOST_REQUIRE_THROW(rebalance_tablets(e, {}, skiplist), std::runtime_error); }).get(); } @@ -2593,35 +2360,16 @@ SEASTAR_THREAD_TEST_CASE(test_skiplist_is_ignored_when_draining) { // all the tablets of the drained node, doubling its load. // It's safer to let the drain fail/stall. 
do_with_cql_env_thread([] (auto& e) { - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); - inet_address ip3("192.168.0.3"); + topology_builder topo(e); - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); - auto host3 = host_id(next_uuid()); + auto host1 = topo.add_node(node_state::removing); + auto host2 = topo.add_node(node_state::normal); + auto host3 = topo.add_node(node_state::normal); - auto table1 = table_id(next_uuid()); - - unsigned shard_count = 1; - - semaphore sem(1); - shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ - locator::topology::config{ - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = locator::endpoint_dc_rack::default_location - } - }); - - stm.mutate_token_metadata([&] (locator::token_metadata& tm) -> future<> { - tm.update_topology(host1, locator::endpoint_dc_rack::default_location, locator::node::state::being_removed, shard_count); - tm.update_topology(host2, locator::endpoint_dc_rack::default_location, locator::node::state::normal, shard_count); - tm.update_topology(host3, locator::endpoint_dc_rack::default_location, locator::node::state::normal, shard_count); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 3))}, host1); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 3))}, host2); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(3. 
/ 3))}, host3); + auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 2); + auto table1 = add_table(e, ks_name).get(); + mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(2); auto tid = tmap.first_tablet(); tmap.set_tablet(tid, tablet_info { @@ -2631,14 +2379,13 @@ SEASTAR_THREAD_TEST_CASE(test_skiplist_is_ignored_when_draining) { tmap.set_tablet(tid, tablet_info { tablet_replica_set{tablet_replica{host1, 0}} }); - tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); co_return; - }).get(); + }); + auto& stm = e.shared_token_metadata().local(); std::unordered_set skiplist = {host2}; - rebalance_tablets(e.get_tablet_allocator().local(), stm, {}, skiplist); + rebalance_tablets(e, {}, skiplist); { load_sketch load(stm.get()); @@ -2692,59 +2439,48 @@ allocate_replicas_in_racks(const std::vector& racks, int rf, SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_random_load) { do_with_cql_env_thread([] (auto& e) { + topology_builder topo(e); const int n_hosts = 6; + auto shard_count = 2; std::vector hosts; - for (int i = 0; i < n_hosts; ++i) { - hosts.push_back(host_id(next_uuid())); - } + std::unordered_map> hosts_by_rack; - std::vector racks = { - endpoint_dc_rack{ "dc1", "rack-1" }, - endpoint_dc_rack{ "dc1", "rack-2" } + std::vector racks { + topo.rack(), + topo.start_new_rack(), }; + for (int i = 0; i < n_hosts; ++i) { + auto rack = racks[(i + 1) % racks.size()]; + auto h = topo.add_node(node_state::normal, shard_count, rack); + if (i) { + // Leave the first host empty by making it invisible to allocation algorithm. 
+ hosts_by_rack[rack.rack].push_back(h); + } + } + + auto& stm = e.shared_token_metadata().local(); + for (int i = 0; i < 13; ++i) { - std::unordered_map> hosts_by_rack; - - semaphore sem(1); - shared_token_metadata stm([&sem]() noexcept { return get_units(sem, 1); }, locator::token_metadata::config { - locator::topology::config { - .this_endpoint = inet_address("192.168.0.1"), - .this_host_id = hosts[0], - .local_dc_rack = racks[1] - } - }); - size_t total_tablet_count = 0; - stm.mutate_token_metadata([&](token_metadata& tm) { - tablet_metadata tmeta; - - int i = 0; - for (auto h : hosts) { - auto shard_count = 2; - auto rack = racks[++i % racks.size()]; - tm.update_topology(h, rack, node::state::normal, shard_count); - if (h != hosts[0]) { - // Leave the first host empty by making it invisible to allocation algorithm. - hosts_by_rack[rack.rack].push_back(h); - } + std::vector keyspaces; + size_t tablet_count_bits = 8; + int rf = tests::random::get_int(2, 4); + for (size_t log2_tablets = 0; log2_tablets < tablet_count_bits; ++log2_tablets) { + if (tests::random::get_bool()) { + continue; } - - size_t tablet_count_bits = 8; - int rf = tests::random::get_int(2, 4); - for (size_t log2_tablets = 0; log2_tablets < tablet_count_bits; ++log2_tablets) { - if (tests::random::get_bool()) { - continue; - } - auto table = table_id(next_uuid()); - tablet_map tmap(1 << log2_tablets); + auto initial_tablets = 1 << log2_tablets; + keyspaces.push_back(add_keyspace(e, {{topo.dc(), rf}}, initial_tablets)); + auto table = add_table(e, keyspaces.back()).get(); + mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { + tablet_map tmap(initial_tablets); for (auto tid : tmap.tablet_ids()) { // Choose replicas randomly while loading racks evenly. 
std::vector replica_hosts = allocate_replicas_in_racks(racks, rf, hosts_by_rack); tablet_replica_set replicas; for (auto h : replica_hosts) { - auto shard_count = tm.get_topology().find_node(h)->get_shard_count(); auto shard = tests::random::get_int(0, shard_count - 1); replicas.push_back(tablet_replica {h, shard}); } @@ -2752,17 +2488,16 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_random_load) { } total_tablet_count += tmap.tablet_count(); tmeta.set_tablet_map(table, std::move(tmap)); - } - tm.set_tablets(std::move(tmeta)); - return make_ready_future<>(); - }).get(); + return make_ready_future<>(); + }); + } testlog.debug("tablet metadata: {}", stm.get()->tablets()); testlog.info("Total tablet count: {}, hosts: {}", total_tablet_count, hosts.size()); check_tablet_invariants(stm.get()->tablets()); - rebalance_tablets(e.get_tablet_allocator().local(), stm); + rebalance_tablets(e); check_tablet_invariants(stm.get()->tablets()); @@ -2788,6 +2523,10 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_random_load) { // Uncomment the following line when the algorithm is improved. 
// BOOST_REQUIRE(min_max_load.max() - min_max_load.min() <= 1); } + + seastar::parallel_for_each(keyspaces, [&] (const sstring& ks) { + return e.execute_cql(fmt::format("DROP KEYSPACE {}", ks)).discard_result(); + }).get(); } }).get(); } @@ -2863,57 +2602,52 @@ using hosts_by_rack_map = std::unordered_map>; static void do_test_load_balancing_merge_colocation(cql_test_env& e, const int n_racks, const int rf, const int n_hosts, const unsigned shard_count, const unsigned initial_tablets, std::function set_tablets) { + topology_builder topo(e); + rack_vector racks; for (int i = 0; i < n_racks; i++) { - racks.push_back(endpoint_dc_rack{"dc1", format("rack-{}", i + 1)}); + racks.push_back(topo.rack()); + topo.start_new_rack(); } testlog.info("merge colocation test - hosts={}, racks={}, rf={}, shard_count={}, initial_tablets={}", n_hosts, racks.size(), rf, shard_count, initial_tablets); - std::vector hosts; + hosts_by_rack_map hosts_by_rack; + for (int i = 0; i < n_hosts; ++i) { - hosts.push_back(host_id(next_uuid())); + auto rack = racks[i % racks.size()]; + auto h = topo.add_node(node_state::normal, shard_count, rack); + hosts_by_rack[rack.rack].push_back(h); } - auto table1 = add_table(e).get(); + auto ks_name = add_keyspace(e, {{topo.dc(), rf}}, initial_tablets); + auto table1 = add_table(e, ks_name).get(); + auto& stm = e.shared_token_metadata().local(); - hosts_by_rack_map hosts_by_rack; - semaphore sem(1); - shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ - locator::topology::config{ - .this_endpoint = inet_address("192.168.0.1"), - .this_host_id = hosts[0], - .local_dc_rack = racks[std::min(1, n_racks - 1)] - } - }); + { + abort_source as; + auto guard = e.get_raft_group0_client().start_operation(as).get(); + stm.mutate_token_metadata([&](token_metadata& tm) -> future<> { + tablet_metadata& tmeta = tm.tablets(); + tablet_map tmap(initial_tablets); + locator::resize_decision decision; + // leaves growing 
mode, allowing for merge decision. + decision.sequence_number = decision.next_sequence_number(); + tmap.set_resize_decision(std::move(decision)); + set_tablets(tm, tmap, racks, hosts_by_rack); + tmeta.set_tablet_map(table1, std::move(tmap)); + tm.set_tablets(std::move(tmeta)); + return make_ready_future < > (); + }).get(); + save_tablet_metadata(e.local_db(), stm.get()->tablets(), guard.write_timestamp()).get(); + } - stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { - tablet_metadata tmeta; - - int i = 0; - for (auto h : hosts) { - auto rack = racks[++i % racks.size()]; - hosts_by_rack[rack.rack].push_back(h); - tm.update_topology(h, rack, node::state::normal, shard_count); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(float(i) / hosts.size()))}, h); - testlog.debug("adding host {}, rack {}, token {}", h, rack.rack, token(tests::d2t(1. / hosts.size()))); - } - - tablet_map tmap(initial_tablets); - locator::resize_decision decision; - // leaves growing mode, allowing for merge decision. 
- decision.sequence_number = decision.next_sequence_number(); - tmap.set_resize_decision(std::move(decision)); - set_tablets(tm, tmap, racks, hosts_by_rack); - tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); - }).get(); auto tablet_count = [&] { return stm.get()->tablets().get_tablet_map(table1).tablet_count(); }; auto do_rebalance_tablets = [&] (locator::load_stats load_stats) { - rebalance_tablets(e.get_tablet_allocator().local(), stm, make_lw_shared(std::move(load_stats))); + rebalance_tablets(e, make_lw_shared(std::move(load_stats))); }; const uint64_t target_tablet_size = service::default_target_tablet_size; @@ -3055,57 +2789,29 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_merge_colocation_with_decomission) SEASTAR_THREAD_TEST_CASE(test_load_balancing_resize_requests) { do_with_cql_env_thread([] (auto& e) { - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); + topology_builder topo(e); - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); + topo.add_node(node_state::normal, 2); + topo.add_node(node_state::normal, 2); - auto table1 = add_table(e).get(); + const size_t initial_tablets = 2; + auto ks_name = add_keyspace(e, {{topo.dc(), 2}}, initial_tablets); + auto table1 = add_table(e, ks_name).get(); - unsigned shard_count = 2; - - semaphore sem(1); - shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ - locator::topology::config{ - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = locator::endpoint_dc_rack::default_location - } - }); - - stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { - tm.update_topology(host1, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - tm.update_topology(host2, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. 
/ 2))}, host1); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 2))}, host2); - - tablet_map tmap(2); - for (auto tid : tmap.tablet_ids()) { - tmap.set_tablet(tid, tablet_info { - tablet_replica_set { - tablet_replica {host1, tests::random::get_int(0, shard_count - 1)}, - tablet_replica {host2, tests::random::get_int(0, shard_count - 1)}, - } - }); - } - tablet_metadata tmeta; - tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); - }).get(); + auto& stm = e.shared_token_metadata().local(); auto tablet_count = [&] { return stm.get()->tablets().get_tablet_map(table1).tablet_count(); }; + auto resize_decision = [&] { return stm.get()->tablets().get_tablet_map(table1).resize_decision(); }; auto do_rebalance_tablets = [&] (locator::load_stats load_stats) { - rebalance_tablets(e.get_tablet_allocator().local(), stm, make_lw_shared(std::move(load_stats))); + rebalance_tablets(e, make_lw_shared(std::move(load_stats))); }; - const size_t initial_tablets = tablet_count(); const uint64_t max_tablet_size = service::default_target_tablet_size * 2; auto to_size_in_bytes = [&] (double max_tablet_size_pctg) -> uint64_t { return (max_tablet_size * max_tablet_size_pctg) * tablet_count(); From 58460a8863449062904b3abde2f7838dd1d877fd Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Sun, 2 Feb 2025 11:50:29 +0100 Subject: [PATCH 5/6] tests: tablets: Set initial tablets to 1 to exit growing mode After tablet hints, there is no notion of leaving growing mode and tablet count is sustained continuously by initial tablet option, so we need to lower it for merge to happen. 
--- test/boost/tablets_test.cc | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/test/boost/tablets_test.cc b/test/boost/tablets_test.cc index 3fd6a23033..0e809b0611 100644 --- a/test/boost/tablets_test.cc +++ b/test/boost/tablets_test.cc @@ -2642,6 +2642,8 @@ static void do_test_load_balancing_merge_colocation(cql_test_env& e, const int n save_tablet_metadata(e.local_db(), stm.get()->tablets(), guard.write_timestamp()).get(); } + // Lower "initial" tablets option, allowing for merge decision. + e.execute_cql(fmt::format("alter keyspace {} with tablets = {{'enabled': true, 'initial': 1}}", ks_name)).get(); auto tablet_count = [&] { return stm.get()->tablets().get_tablet_map(table1).tablet_count(); @@ -2833,6 +2835,9 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_resize_requests) { BOOST_REQUIRE(std::holds_alternative(resize_decision().way)); } + // Drop initial tablet count to 1 so merge can happen. + e.execute_cql(fmt::format("alter keyspace {} with tablets = {{'enabled': true, 'initial': 1}}", ks_name)).get(); + // avg size hits split threshold, and balancer emits split request { locator::load_stats load_stats = { From 1854ea216578b3aedf02ecd9df0602ba7b074a4f Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Mon, 3 Feb 2025 10:33:28 +0100 Subject: [PATCH 6/6] test: tablets: Drop keyspace after do_test_load_balancing_merge_colocation() scenario This scenario is invoked in a loop in the test_load_balancing_merge_colocation_with_random_load test case, which causes tablet maps to accumulate, making each reload slower in subsequent iterations. It wasn't a problem before because we overwrote tablet_metadata in each iteration to contain only tablets for the current table, but now we need to keep it consistent with the schema, so we no longer do that.
--- test/boost/tablets_test.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/boost/tablets_test.cc b/test/boost/tablets_test.cc index 0e809b0611..e70af0ee92 100644 --- a/test/boost/tablets_test.cc +++ b/test/boost/tablets_test.cc @@ -2670,6 +2670,8 @@ static void do_test_load_balancing_merge_colocation(cql_test_env& e, const int n check_tablet_invariants(stm.get()->tablets()); BOOST_REQUIRE_LT(tablet_count(), old_tablet_count); } + + e.execute_cql(fmt::format("drop keyspace {}", ks_name)).get(); } SEASTAR_THREAD_TEST_CASE(test_load_balancing_merge_colocation_with_random_load) {