From ca6159fbe2a664c04cfb86a73c67d272063f0ddd Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 30 Jan 2025 13:25:11 +0100 Subject: [PATCH] test: tablets_test: Create proper schema in load balancer tests This is in preparation for load balancer changes needed to respect per-table tablet hints and respecting per-shard tablet count goal. After those changes, load balancer consults with the replication strategy in the database, so we need to create proper schema in the database. To do that, we need proper topology for replication strategies which use RF > 1, otherwise keyspace creation will fail. --- test/boost/tablets_test.cc | 990 +++++++++++++------------------------ 1 file changed, 348 insertions(+), 642 deletions(-) diff --git a/test/boost/tablets_test.cc b/test/boost/tablets_test.cc index 1f9d3d1274..3fd6a23033 100644 --- a/test/boost/tablets_test.cc +++ b/test/boost/tablets_test.cc @@ -12,6 +12,8 @@ #undef SEASTAR_TESTING_MAIN #include #include "test/lib/random_utils.hh" +#include "service/topology_mutation.hh" +#include "service/storage_service.hh" #include #include #include "test/lib/cql_test_env.hh" @@ -19,6 +21,7 @@ #include "test/lib/simple_schema.hh" #include "test/lib/key_utils.hh" #include "test/lib/test_utils.hh" +#include "test/lib/topology_builder.hh" #include "db/config.hh" #include "db/schema_tables.hh" #include "schema/schema_builder.hh" @@ -38,6 +41,7 @@ #include "service/topology_state_machine.hh" #include +#include BOOST_AUTO_TEST_SUITE(tablets_test) @@ -51,13 +55,6 @@ static api::timestamp_type current_timestamp(cql_test_env& e) { return utils::UUID_gen::micros_timestamp(e.get_system_keyspace().local().get_last_group0_state_id().get()) + 1; } -static utils::UUID next_uuid() { - static uint64_t counter = 1; - return utils::UUID_gen::get_time_UUID(std::chrono::system_clock::time_point( - std::chrono::duration_cast( - std::chrono::seconds(counter++)))); -} - static void verify_tablet_metadata_persistence(cql_test_env& env, const tablet_metadata& tm, api::timestamp_type& ts) { save_tablet_metadata(env.local_db(), tm, ts++).get(); @@ -109,6 +106,37 @@ future add_table(cql_test_env& e, sstring test_ks_name = "") { co_return id; } +// Run in a seastar thread +static +sstring add_keyspace(cql_test_env& e, std::unordered_map dc_rf, int initial_tablets = 0) { + static std::atomic ks_id = 0; + auto ks_name = fmt::format("keyspace{}", ks_id.fetch_add(1)); + sstring rf_options; + for (auto& [dc, rf] : dc_rf) { + rf_options += format(", '{}': {}", dc, rf); + } + e.execute_cql(fmt::format("create keyspace {} with replication = {{'class': 'NetworkTopologyStrategy'{}}}" + " and tablets = {{'enabled': true, 'initial': {}}}", + ks_name, rf_options, initial_tablets)).get(); + return ks_name; +} + +// Run in a seastar thread +void mutate_tablets(cql_test_env& e, const group0_guard& guard, seastar::noncopyable_function(tablet_metadata&)> mutator) { + auto& stm = e.shared_token_metadata().local(); + stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { + return mutator(tm.tablets()); + }).get(); + save_tablet_metadata(e.local_db(), stm.get()->tablets(), guard.write_timestamp()).get(); +} + +// Run in a seastar thread +void mutate_tablets(cql_test_env& e, seastar::noncopyable_function(tablet_metadata&)> mutator) { + abort_source as; + auto guard = e.get_raft_group0_client().start_operation(as).get(); + mutate_tablets(e, guard, std::move(mutator)); +} + SEASTAR_TEST_CASE(test_tablet_metadata_persistence) { return do_with_cql_env_thread([] (cql_test_env& e) { auto h1 = host_id(utils::UUID_gen::get_time_UUID()); @@ -1370,7 +1398,11 @@ void apply_resize_plan(token_metadata& tm, const migration_plan& plan) { } static -future<> handle_resize_finalize(tablet_allocator& talloc, shared_token_metadata& stm, const migration_plan& plan) { +future<> handle_resize_finalize(cql_test_env& e, group0_guard& guard, const migration_plan& plan) { + auto& talloc = e.get_tablet_allocator().local(); + auto& stm = e.shared_token_metadata().local(); + bool changed = false; + for (auto table_id : plan.resize_plan().finalize_resize) { auto tm = stm.get(); const auto& old_tmap = tm->tablets().get_tablet_map(table_id); @@ -1380,11 +1412,23 @@ future<> handle_resize_finalize(tablet_allocator& talloc, shared_token_metadata& new_resize_decision.sequence_number = old_tmap.resize_decision().next_sequence_number(); new_tmap.set_resize_decision(std::move(new_resize_decision)); - co_await stm.mutate_token_metadata([table_id, &new_tmap] (token_metadata& tm) { + co_await stm.mutate_token_metadata([table_id, &new_tmap, &changed] (token_metadata& tm) { + changed = true; tm.tablets().set_tablet_map(table_id, std::move(new_tmap)); return make_ready_future<>(); }); } + + if (changed) { + // Need to reload on each resize because table object expects tablet count to change by a factor of 2. + co_await save_tablet_metadata(e.local_db(), stm.get()->tablets(), guard.write_timestamp()); + co_await e.get_storage_service().local().load_tablet_metadata({}); + + // Need a new guard to make sure later changes use later timestamp. + release_guard(std::move(guard)); + abort_source as; + guard = co_await e.get_raft_group0_client().start_operation(as); + } } // Reflects the plan in a given token metadata as if the migrations were fully executed. @@ -1429,11 +1473,15 @@ static void check_tablet_invariants(const tablet_metadata& tmeta); static -void rebalance_tablets(tablet_allocator& talloc, - shared_token_metadata& stm, - locator::load_stats_ptr load_stats = {}, - std::unordered_set skiplist = {}, - std::function stop = nullptr) { +void do_rebalance_tablets(cql_test_env& e, + group0_guard& guard, + locator::load_stats_ptr load_stats = {}, + std::unordered_set skiplist = {}, + std::function stop = nullptr) +{ + auto& talloc = e.get_tablet_allocator().local(); + auto& stm = e.shared_token_metadata().local(); + // Sanity limit to avoid infinite loops. // The x10 factor is arbitrary, it's there to account for more complex schedules than direct migration. auto max_iterations = 1 + get_tablet_count(stm.get()->tablets()) * 10; @@ -1450,11 +1498,38 @@ void rebalance_tablets(tablet_allocator& talloc, apply_plan(tm, plan); return make_ready_future<>(); }).get(); - handle_resize_finalize(talloc, stm, plan).get(); + handle_resize_finalize(e, guard, plan).get(); } throw std::runtime_error("rebalance_tablets(): convergence not reached within limit"); } +// Invokes the tablet scheduler and executes its plan, continuously until it emits an empty plan. +// Simulates topology coordinator but doesn't perform actual migration, +// only reflects it in the metadata. +// Run in a seastar thread. +void rebalance_tablets(cql_test_env& e, + load_stats_ptr load_stats = nullptr, + std::unordered_set skiplist = {}, + std::function stop = nullptr) { + abort_source as; + testlog.debug("rebalance_tablets(): start"); + + auto guard = e.get_raft_group0_client().start_operation(as).get(); + testlog.debug("rebalance_tablets(): took group0 guard"); + + do_rebalance_tablets(e, guard, std::move(load_stats), std::move(skiplist), std::move(stop)); + testlog.debug("rebalance_tablets(): rebalanced"); + + // We should not introduce inconsistency between on-disk state and in-memory state + // as that may violate invariants and cause failures in later operations + // causing test flakiness. + auto& stm = e.shared_token_metadata().local(); + save_tablet_metadata(e.local_db(), stm.get()->tablets(), guard.write_timestamp()).get(); + e.get_storage_service().local().load_tablet_metadata({}).get(); + + testlog.debug("rebalance_tablets(): done"); +} + static void rebalance_tablets_as_in_progress(tablet_allocator& talloc, shared_token_metadata& stm) { while (true) { @@ -1491,36 +1566,17 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_empty_node) { do_with_cql_env_thread([] (auto& e) { // Tests the scenario of bootstrapping a single node // Verifies that load balancer sees it and moves tablets to that node. - - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); - inet_address ip3("192.168.0.3"); - - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); - auto host3 = host_id(next_uuid()); - - auto table1 = table_id(next_uuid()); + topology_builder topo(e); unsigned shard_count = 2; + auto host1 = topo.add_node(node_state::normal, shard_count); + auto host2 = topo.add_node(node_state::normal, shard_count); + auto host3 = topo.add_node(node_state::normal, shard_count); - semaphore sem(1); - shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ - locator::topology::config{ - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = locator::endpoint_dc_rack::default_location - } - }); - - stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { - tm.update_topology(host1, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - tm.update_topology(host2, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - tm.update_topology(host3, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 3))}, host1); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 3))}, host2); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(3. / 3))}, host3); + auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 4); + auto table1 = add_table(e, ks_name).get(); + mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(4); auto tid = tmap.first_tablet(); tmap.set_tablet(tid, tablet_info { @@ -1550,11 +1606,11 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_empty_node) { tablet_replica {host2, 0}, } }); - tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); co_return; - }).get(); + }); + + auto& stm = e.shared_token_metadata().local(); // Sanity check { @@ -1568,7 +1624,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_empty_node) { BOOST_REQUIRE_EQUAL(load.get_avg_shard_load(host3), 0); } - rebalance_tablets(e.get_tablet_allocator().local(), stm); + rebalance_tablets(e); { load_sketch load(stm.get()); @@ -1590,32 +1646,17 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_skiplist) { // Tests the scenario of balacning cluster with DOWN node // Verifies that load balancer doesn't moves tablets to that node. - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); - inet_address ip3("192.168.0.3"); - - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); - auto host3 = host_id(next_uuid()); - - auto table1 = table_id(next_uuid()); - unsigned shard_count = 2; - semaphore sem(1); - shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ - locator::topology::config{ - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = locator::endpoint_dc_rack::default_location - } - }); + topology_builder topo(e); + auto host1 = topo.add_node(node_state::normal, shard_count); + auto host2 = topo.add_node(node_state::normal, shard_count); + auto host3 = topo.add_node(node_state::normal, shard_count); - stm.mutate_token_metadata([&] (token_metadata& tm) { - tm.update_topology(host1, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - tm.update_topology(host2, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - tm.update_topology(host3, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); + auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 4); + auto table1 = add_table(e, ks_name).get(); + mutate_tablets(e, [&] (tablet_metadata& tmeta) { tablet_map tmap(4); auto tid = tmap.first_tablet(); tmap.set_tablet(tid, tablet_info { @@ -1645,11 +1686,11 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_skiplist) { tablet_replica {host2, 0}, } }); - tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); return make_ready_future<>(); - }).get(); + }); + + auto& stm = e.shared_token_metadata().local(); // Sanity check { @@ -1663,7 +1704,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_skiplist) { BOOST_REQUIRE_EQUAL(load.get_avg_shard_load(host3), 0); } - rebalance_tablets(e.get_tablet_allocator().local(), stm, {}, {host3}); + rebalance_tablets(e, {}, {host3}); { load_sketch load(stm.get()); @@ -1678,36 +1719,17 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_rf_met) { // Verifies that load balancer moves tablets out of the decommissioned node. // The scenario is such that replication factor of tablets can be satisfied after decommission. do_with_cql_env_thread([](auto& e) { - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); - inet_address ip3("192.168.0.3"); + unsigned shard_count = 2; - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); - auto host3 = host_id(next_uuid()); + topology_builder topo(e); + auto host1 = topo.add_node(node_state::normal, shard_count); + auto host2 = topo.add_node(node_state::normal, shard_count); + auto host3 = topo.add_node(node_state::decommissioning, shard_count); - auto table1 = table_id(next_uuid()); - - semaphore sem(1); - shared_token_metadata stm([&sem]() noexcept { return get_units(sem, 1); }, locator::token_metadata::config { - locator::topology::config { - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = locator::endpoint_dc_rack::default_location - } - }); - - stm.mutate_token_metadata([&](token_metadata& tm) -> future<> { - const unsigned shard_count = 2; - - tm.update_topology(host1, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - tm.update_topology(host2, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - tm.update_topology(host3, locator::endpoint_dc_rack::default_location, node::state::being_decommissioned, - shard_count); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 3))}, host1); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 3))}, host2); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(3. / 3))}, host3); + auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 4); + auto table1 = add_table(e, ks_name).get(); + mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(4); auto tid = tmap.first_tablet(); tmap.set_tablet(tid, tablet_info { @@ -1737,13 +1759,13 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_rf_met) { tablet_replica {host3, 1}, } }); - tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); co_return; - }).get(); + }); - rebalance_tablets(e.get_tablet_allocator().local(), stm); + rebalance_tablets(e); + + auto& stm = e.shared_token_metadata().local(); { load_sketch load(stm.get()); @@ -1753,12 +1775,9 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_rf_met) { BOOST_REQUIRE_EQUAL(load.get_avg_shard_load(host3), 0); } - stm.mutate_token_metadata([&](token_metadata& tm) { - tm.update_topology(host3, locator::endpoint_dc_rack::default_location, node::state::left); - return make_ready_future<>(); - }).get(); + topo.set_node_state(host3, node_state::left); - rebalance_tablets(e.get_tablet_allocator().local(), stm); + rebalance_tablets(e); { load_sketch load(stm.get()); @@ -1778,49 +1797,18 @@ SEASTAR_THREAD_TEST_CASE(test_table_creation_during_decommission) { auto cfg = tablet_cql_test_config(); cfg.db_config->tablets_initial_scale_factor(1); do_with_cql_env_thread([](auto& e) { - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); - inet_address ip3("192.168.0.3"); - inet_address ip4("192.168.0.4"); + topology_builder topo(e); + topo.add_node(node_state::normal); + topo.add_node(node_state::normal); + auto host3 = topo.add_node(node_state::decommissioning); + auto host4 = topo.add_node(node_state::left); - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); - auto host3 = host_id(next_uuid()); - auto host4 = host_id(next_uuid()); - locator::endpoint_dc_rack dcrack = { "datacenter1", "rack1" }; + auto ks_name = add_keyspace(e, {{topo.dc(), 1}}); + auto table1 = add_table(e, ks_name).get(); + auto s = e.local_db().find_schema(table1); - semaphore sem(1); - shared_token_metadata stm([&sem]() noexcept { return get_units(sem, 1); }, locator::token_metadata::config { - locator::topology::config { - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = dcrack - } - }); - - stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { - tm.update_topology(host1, dcrack, node::state::normal, 1); - tm.update_topology(host2, dcrack, node::state::normal, 1); - tm.update_topology(host3, dcrack, node::state::being_decommissioned, 16); - tm.update_topology(host4, dcrack, node::state::left, 16); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 4))}, host1); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 4))}, host2); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(3. / 4))}, host3); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(4. / 4))}, host4); - co_return; - }).get(); - - sstring ks_name = "test_ks"; - sstring table_name = "table1"; - e.execute_cql(format("create keyspace {} with replication = " - "{{'class': 'NetworkTopologyStrategy', '{}': 1}} " - "and tablets = {{'enabled': true}}", ks_name, dcrack.dc)).get(); - e.execute_cql(fmt::format("CREATE TABLE {}.{} (p1 text, r1 int, PRIMARY KEY (p1))", ks_name, table_name)).get(); - auto s = e.local_db().find_schema(ks_name, table_name); - - auto* rs = e.local_db().find_keyspace(ks_name).get_replication_strategy().maybe_as_tablet_aware(); - BOOST_REQUIRE(rs); - auto tmap = rs->allocate_tablets_for_new_table(s, stm.get(), 1).get(); + auto& stm = e.shared_token_metadata().local(); + auto& tmap = stm.get()->tablets().get_tablet_map(table1); // Verify we do not treat leaving nodes as having capacity. BOOST_REQUIRE_EQUAL(tmap.tablet_count(), 2); @@ -1840,54 +1828,20 @@ SEASTAR_THREAD_TEST_CASE(test_table_creation_during_rack_decommission) { // The problematic scenario happens when allocating tablets for a new table // when there is a rack with only non-normal nodes. do_with_cql_env_thread([](auto& e) { - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); - inet_address ip3("192.168.0.3"); - inet_address ip4("192.168.0.4"); + topology_builder topo(e); + topo.add_node(); + topo.add_node(); + topo.start_new_rack(); + auto host3 = topo.add_node(node_state::decommissioning); + auto host4 = topo.add_node(node_state::left); - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); - auto host3 = host_id(next_uuid()); - auto host4 = host_id(next_uuid()); + auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 8); + auto table1 = add_table(e, ks_name).get(); - auto dc = "datacenter1"; - locator::endpoint_dc_rack dcrack = { dc, "rack1" }; - locator::endpoint_dc_rack dcrack2 = { dc, "rack2" }; + rebalance_tablets(e); - semaphore sem(1); - shared_token_metadata stm([&sem]() noexcept { return get_units(sem, 1); }, locator::token_metadata::config { - locator::topology::config { - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = dcrack - } - }); - - const unsigned shard_count = 1; - - stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { - tm.update_topology(host1, dcrack, node::state::normal, shard_count); - tm.update_topology(host2, dcrack, node::state::normal, shard_count); - tm.update_topology(host3, dcrack2, node::state::being_decommissioned, shard_count); - tm.update_topology(host4, dcrack2, node::state::left, shard_count); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 4))}, host1); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 4))}, host2); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(3. / 4))}, host3); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(4. / 4))}, host4); - co_return; - }).get(); - - sstring ks_name = "test_ks"; - sstring table_name = "table1"; - e.execute_cql(format("create keyspace {} with replication = " - "{{'class': 'NetworkTopologyStrategy', '{}': 1}} " - "and tablets = {{'enabled': true, 'initial': 8}}", ks_name, dcrack.dc)).get(); - e.execute_cql(fmt::format("CREATE TABLE {}.{} (p1 text, r1 int, PRIMARY KEY (p1))", ks_name, table_name)).get(); - auto s = e.local_db().find_schema(ks_name, table_name); - - auto* rs = e.local_db().find_keyspace(ks_name).get_replication_strategy().maybe_as_tablet_aware(); - BOOST_REQUIRE(rs); - auto tmap = rs->allocate_tablets_for_new_table(s, stm.get(), 8).get(); + auto& stm = e.shared_token_metadata().local(); + auto& tmap = stm.get()->tablets().get_tablet_map(table1); tmap.for_each_tablet([&](auto tid, auto& tinfo) { for (auto& replica : tinfo.replicas) { @@ -1903,45 +1857,21 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_two_racks) { // Verifies that load balancer moves tablets out of the decommissioned node. // The scenario is such that replication constraints of tablets can be satisfied after decommission. do_with_cql_env_thread([](auto& e) { - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); - inet_address ip3("192.168.0.3"); - inet_address ip4("192.168.0.4"); + std::vector racks; - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); - auto host3 = host_id(next_uuid()); - auto host4 = host_id(next_uuid()); + topology_builder topo(e); + racks.push_back(topo.rack()); + auto host1 = topo.add_node(node_state::normal); + auto host3 = topo.add_node(node_state::normal); + topo.start_new_rack(); + racks.push_back(topo.rack()); + auto host2 = topo.add_node(node_state::normal); + auto host4 = topo.add_node(node_state::decommissioning); - std::vector racks = { - endpoint_dc_rack{ "dc1", "rack-1" }, - endpoint_dc_rack{ "dc1", "rack-2" } - }; - - auto table1 = table_id(next_uuid()); - - semaphore sem(1); - shared_token_metadata stm([&sem]() noexcept { return get_units(sem, 1); }, locator::token_metadata::config { - locator::topology::config { - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = racks[0] - } - }); - - stm.mutate_token_metadata([&](token_metadata& tm) -> future<> { - const unsigned shard_count = 1; - - tm.update_topology(host1, racks[0], node::state::normal, shard_count); - tm.update_topology(host2, racks[1], node::state::normal, shard_count); - tm.update_topology(host3, racks[0], node::state::normal, shard_count); - tm.update_topology(host4, racks[1], node::state::being_decommissioned, - shard_count); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 4))}, host1); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 4))}, host2); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(3. / 4))}, host3); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(4. / 4))}, host4); + auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 4); + auto table1 = add_table(e, ks_name).get(); + mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(4); auto tid = tmap.first_tablet(); tmap.set_tablet(tid, tablet_info { @@ -1971,13 +1901,13 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_two_racks) { tablet_replica {host2, 0}, } }); - tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); co_return; - }).get(); + }); - rebalance_tablets(e.get_tablet_allocator().local(), stm); + rebalance_tablets(e); + + auto& stm = e.shared_token_metadata().local(); { load_sketch load(stm.get()); @@ -2006,45 +1936,21 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_rack_load_failure) { // Verifies that load balancer moves tablets out of the decommissioned node. // The scenario is such that it is impossible to distribute replicas without violating rack uniqueness. do_with_cql_env_thread([](auto& e) { - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); - inet_address ip3("192.168.0.3"); - inet_address ip4("192.168.0.4"); + std::vector racks; - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); - auto host3 = host_id(next_uuid()); - auto host4 = host_id(next_uuid()); + topology_builder topo(e); + racks.push_back(topo.rack()); + auto host1 = topo.add_node(node_state::normal); + auto host2 = topo.add_node(node_state::normal); + auto host3 = topo.add_node(node_state::normal); + topo.start_new_rack(); + racks.push_back(topo.rack()); + auto host4 = topo.add_node(node_state::decommissioning); - std::vector racks = { - endpoint_dc_rack{ "dc1", "rack-1" }, - endpoint_dc_rack{ "dc1", "rack-2" } - }; - - auto table1 = table_id(next_uuid()); - - semaphore sem(1); - shared_token_metadata stm([&sem]() noexcept { return get_units(sem, 1); }, locator::token_metadata::config { - locator::topology::config { - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = racks[0] - } - }); - - stm.mutate_token_metadata([&](token_metadata& tm) -> future<> { - const unsigned shard_count = 1; - - tm.update_topology(host1, racks[0], node::state::normal, shard_count); - tm.update_topology(host2, racks[0], node::state::normal, shard_count); - tm.update_topology(host3, racks[0], node::state::normal, shard_count); - tm.update_topology(host4, racks[1], node::state::being_decommissioned, - shard_count); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 4))}, host1); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 4))}, host2); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(3. / 4))}, host3); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(4. / 4))}, host4); + auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 4); + auto table1 = add_table(e, ks_name).get(); + mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(4); auto tid = tmap.first_tablet(); tmap.set_tablet(tid, tablet_info { @@ -2074,13 +1980,11 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_rack_load_failure) { tablet_replica {host4, 0}, } }); - tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); co_return; - }).get(); + }); - BOOST_REQUIRE_THROW(rebalance_tablets(e.get_tablet_allocator().local(), stm), std::runtime_error); + BOOST_REQUIRE_THROW(rebalance_tablets(e), std::runtime_error); }).get(); } @@ -2088,36 +1992,15 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_rf_not_met) { // Verifies that load balancer moves tablets out of the decommissioned node. // The scenario is such that replication factor of tablets can be satisfied after decommission. do_with_cql_env_thread([](auto& e) { - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); - inet_address ip3("192.168.0.3"); + topology_builder topo(e); + auto host1 = topo.add_node(node_state::normal, 2); + auto host2 = topo.add_node(node_state::normal, 2); + auto host3 = topo.add_node(node_state::decommissioning, 2); - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); - auto host3 = host_id(next_uuid()); - - auto table1 = table_id(next_uuid()); - - semaphore sem(1); - shared_token_metadata stm([&sem]() noexcept { return get_units(sem, 1); }, locator::token_metadata::config { - locator::topology::config { - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = locator::endpoint_dc_rack::default_location - } - }); - - stm.mutate_token_metadata([&](token_metadata& tm) -> future<> { - const unsigned shard_count = 2; - - tm.update_topology(host1, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - tm.update_topology(host2, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - tm.update_topology(host3, locator::endpoint_dc_rack::default_location, node::state::being_decommissioned, - shard_count); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 3))}, host1); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 3))}, host2); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(3. / 3))}, host3); + auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 1); + auto table1 = add_table(e, ks_name).get(); + mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(1); auto tid = tmap.first_tablet(); tmap.set_tablet(tid, tablet_info { @@ -2127,13 +2010,11 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_rf_not_met) { tablet_replica {host3, 0}, } }); - tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); co_return; - }).get(); + }); - BOOST_REQUIRE_THROW(rebalance_tablets(e.get_tablet_allocator().local(), stm), std::runtime_error); + BOOST_REQUIRE_THROW(rebalance_tablets(e), std::runtime_error); }).get(); } @@ -2146,33 +2027,15 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_works_with_in_progress_transitions) // which when executed will achieve perfect balance, // which is a proof that it doesn't stop due to active migrations. - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); - inet_address ip3("192.168.0.3"); + topology_builder topo(e); + auto host1 = topo.add_node(node_state::normal, 1); + auto host2 = topo.add_node(node_state::normal, 1); + auto host3 = topo.add_node(node_state::normal, 2); - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); - auto host3 = host_id(next_uuid()); - - auto table1 = table_id(next_uuid()); - - semaphore sem(1); - shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ - locator::topology::config{ - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = locator::endpoint_dc_rack::default_location - } - }); - - stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { - tm.update_topology(host1, locator::endpoint_dc_rack::default_location, node::state::normal, 1); - tm.update_topology(host2, locator::endpoint_dc_rack::default_location, node::state::normal, 1); - tm.update_topology(host3, locator::endpoint_dc_rack::default_location, node::state::normal, 2); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 3))}, host1); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 3))}, host2); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(3. / 3))}, host3); + auto ks_name = add_keyspace(e, {{topo.dc(), 2}}, 4); + auto table1 = add_table(e, ks_name).get(); + mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(4); std::optional tid = tmap.first_tablet(); for (int i = 0; i < 4; ++i) { @@ -2193,11 +2056,13 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_works_with_in_progress_transitions) }, tablet_replica {host3, 0} }); - tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); co_return; - }).get(); + }); + + abort_source as; + auto guard = e.get_raft_group0_client().start_operation(as).get(); + auto& stm = e.shared_token_metadata().local(); rebalance_tablets_as_in_progress(e.get_tablet_allocator().local(), stm); execute_transitions(stm); @@ -2211,39 +2076,25 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_works_with_in_progress_transitions) BOOST_REQUIRE_EQUAL(load.get_avg_shard_load(h), 2); } } + + // Restore consistency between stm and system tables before releasing group0 guard. + save_tablet_metadata(e.local_db(), stm.get()->tablets(), guard.write_timestamp()).get(); }).get(); } #ifdef SCYLLA_ENABLE_ERROR_INJECTION SEASTAR_THREAD_TEST_CASE(test_load_balancer_shuffle_mode) { do_with_cql_env_thread([] (auto& e) { - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); - inet_address ip3("192.168.0.3"); + topology_builder topo(e); - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); - auto host3 = host_id(next_uuid()); + auto host1 = topo.add_node(node_state::normal, 1); + auto host2 = topo.add_node(node_state::normal, 1); + topo.add_node(node_state::normal, 2); - auto table1 = table_id(next_uuid()); - - semaphore sem(1); - shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ - locator::topology::config{ - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = locator::endpoint_dc_rack::default_location - } - }); - - stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { - tm.update_topology(host1, locator::endpoint_dc_rack::default_location, node::state::normal, 1); - tm.update_topology(host2, locator::endpoint_dc_rack::default_location, node::state::normal, 1); - tm.update_topology(host3, locator::endpoint_dc_rack::default_location, node::state::normal, 2); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 3))}, host1); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 3))}, host2); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(3. / 3))}, host3); + auto ks_name = add_keyspace(e, {{topo.dc(), 2}}, 4); + auto table1 = add_table(e, ks_name).get(); + mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(4); std::optional tid = tmap.first_tablet(); for (int i = 0; i < 4; ++i) { @@ -2255,14 +2106,13 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancer_shuffle_mode) { }); tid = tmap.next_tablet(*tid); } - tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); co_return; - }).get(); + }); - rebalance_tablets(e.get_tablet_allocator().local(), stm); + rebalance_tablets(e); + auto& stm = e.shared_token_metadata().local(); BOOST_REQUIRE(e.get_tablet_allocator().local().balance_tablets(stm.get()).get().empty()); utils::get_local_injector().enable("tablet_allocator_shuffle"); @@ -2277,39 +2127,18 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancer_shuffle_mode) { SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_two_empty_nodes) { do_with_cql_env_thread([] (auto& e) { - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); - inet_address ip3("192.168.0.3"); - inet_address ip4("192.168.0.4"); + topology_builder topo(e); - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); - auto host3 = host_id(next_uuid()); - auto host4 = host_id(next_uuid()); + const auto shard_count = 2; + auto host1 = topo.add_node(node_state::normal, shard_count); + auto host2 = topo.add_node(node_state::normal, shard_count); + auto host3 = topo.add_node(node_state::normal, shard_count); + auto host4 = topo.add_node(node_state::normal, shard_count); - auto table1 = table_id(next_uuid()); - - unsigned shard_count = 2; - - semaphore sem(1); - shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ - locator::topology::config{ - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = locator::endpoint_dc_rack::default_location - } - }); - - stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { - tm.update_topology(host1, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - tm.update_topology(host2, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - tm.update_topology(host3, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - tm.update_topology(host4, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 4))}, host1); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 4))}, host2); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(3. / 4))}, host3); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(4. / 4))}, host4); + auto ks_name = add_keyspace(e, {{topo.dc(), 2}}, 16); + auto table1 = add_table(e, ks_name).get(); + mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(16); for (auto tid : tmap.tablet_ids()) { tmap.set_tablet(tid, tablet_info { @@ -2319,13 +2148,13 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_two_empty_nodes) { } }); } - tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); co_return; - }).get(); + }); - rebalance_tablets(e.get_tablet_allocator().local(), stm); + rebalance_tablets(e); + + auto& stm = e.shared_token_metadata().local(); { load_sketch load(stm.get()); @@ -2342,33 +2171,16 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_two_empty_nodes) { SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_asymmetric_node_capacity) { do_with_cql_env_thread([](auto& e) { - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); - inet_address ip3("192.168.0.3"); + topology_builder topo(e); - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); - auto host3 = host_id(next_uuid()); + auto host1 = topo.add_node(node_state::decommissioning, 8); + auto host2 = topo.add_node(node_state::normal, 1); + auto host3 = topo.add_node(node_state::normal, 7); - auto table1 = table_id(next_uuid()); - - semaphore sem(1); - shared_token_metadata stm([&sem]() noexcept { return get_units(sem, 1); }, locator::token_metadata::config { - locator::topology::config { - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = locator::endpoint_dc_rack::default_location - } - }); - - stm.mutate_token_metadata([&](token_metadata& tm) -> future<> { - tm.update_topology(host1, locator::endpoint_dc_rack::default_location, node::state::being_decommissioned, 8); - tm.update_topology(host2, locator::endpoint_dc_rack::default_location, node::state::normal, 1); - tm.update_topology(host3, locator::endpoint_dc_rack::default_location, node::state::normal, 7); - co_await tm.update_normal_tokens(std::unordered_set {token(tests::d2t(1. / 4))}, host1); - co_await tm.update_normal_tokens(std::unordered_set {token(tests::d2t(2. / 4))}, host2); - co_await tm.update_normal_tokens(std::unordered_set {token(tests::d2t(3. / 4))}, host3); + auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 16); + auto table1 = add_table(e, ks_name).get(); + mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(16); for (auto tid: tmap.tablet_ids()) { tmap.set_tablet(tid, tablet_info { @@ -2377,17 +2189,17 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_asymmetric_node_capacity) { } }); } - tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); co_return; - }).get(); + }); auto until_nodes_drained = [] (const migration_plan& plan) { return !plan.has_nodes_to_drain(); }; - rebalance_tablets(e.get_tablet_allocator().local(), stm, {}, {}, until_nodes_drained); + rebalance_tablets(e, {}, {}, until_nodes_drained); + + auto& stm = e.shared_token_metadata().local(); { load_sketch load(stm.get()); @@ -2404,33 +2216,20 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_asymmetric_node_capacity) { SEASTAR_THREAD_TEST_CASE(test_load_balancer_disabling) { do_with_cql_env_thread([] (auto& e) { - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); + topology_builder topo(e); + auto host1 = topo.add_node(node_state::normal, 2); + topo.add_node(node_state::normal, 2); - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); + auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 16); + auto table1 = add_table(e, ks_name).get(); - auto table1 = table_id(next_uuid()); - - unsigned shard_count = 2; - - semaphore sem(1); - shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ - locator::topology::config{ - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = locator::endpoint_dc_rack::default_location - } - }); + abort_source as; + auto guard = e.get_raft_group0_client().start_operation(as).get(); + auto& stm = e.shared_token_metadata().local(); // host1 is loaded and host2 is empty, resulting in an imbalance. // host1's shard 0 is loaded and shard 1 is empty, resulting in intra-node imbalance. - stm.mutate_token_metadata([&] (auto& tm) -> future<> { - tm.update_topology(host1, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - tm.update_topology(host2, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 2))}, host1); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 2))}, host2); - + mutate_tablets(e, guard, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(16); for (auto tid : tmap.tablet_ids()) { tmap.set_tablet(tid, tablet_info { @@ -2439,11 +2238,9 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancer_disabling) { } }); } - tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); co_return; - }).get(); + }); { auto plan = e.get_tablet_allocator().local().balance_tablets(stm.get()).get(); @@ -2496,31 +2293,18 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancer_disabling) { SEASTAR_THREAD_TEST_CASE(test_drained_node_is_not_balanced_internally) { do_with_cql_env_thread([] (auto& e) { - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); + topology_builder topo(e); + auto host1 = topo.add_node(node_state::removing, 2); + topo.add_node(node_state::normal, 2); - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); + auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 16); + auto table1 = add_table(e, ks_name).get(); - auto table1 = table_id(next_uuid()); - - unsigned shard_count = 2; - - semaphore sem(1); - shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ - locator::topology::config{ - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = locator::endpoint_dc_rack::default_location - } - }); - - stm.mutate_token_metadata([&] (locator::token_metadata& tm) -> future<> { - tm.update_topology(host1, locator::endpoint_dc_rack::default_location, locator::node::state::being_removed, shard_count); - tm.update_topology(host2, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 2))}, host1); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 2))}, host2); + abort_source as; + auto guard = e.get_raft_group0_client().start_operation(as).get(); + auto& stm = e.shared_token_metadata().local(); + mutate_tablets(e, guard, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(16); for (auto tid : tmap.tablet_ids()) { tmap.set_tablet(tid, tablet_info { @@ -2529,11 +2313,9 @@ SEASTAR_THREAD_TEST_CASE(test_drained_node_is_not_balanced_internally) { } }); } - tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); co_return; - }).get(); + }); migration_plan plan = e.get_tablet_allocator().local().balance_tablets(stm.get()).get(); BOOST_REQUIRE(plan.has_nodes_to_drain()); @@ -2545,42 +2327,27 @@ SEASTAR_THREAD_TEST_CASE(test_drained_node_is_not_balanced_internally) { SEASTAR_THREAD_TEST_CASE(test_plan_fails_when_removing_last_replica) { do_with_cql_env_thread([] (auto& e) { - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); + topology_builder topo(e); + auto host1 = topo.add_node(); - auto host1 = host_id(next_uuid()); + auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 1); + auto table1 = add_table(e, ks_name).get(); - auto table1 = table_id(next_uuid()); - - unsigned shard_count = 1; - - semaphore sem(1); - shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ - locator::topology::config{ - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = locator::endpoint_dc_rack::default_location - } - }); - - stm.mutate_token_metadata([&] (locator::token_metadata& tm) -> future<> { - tm.update_topology(host1, locator::endpoint_dc_rack::default_location, locator::node::state::being_removed, shard_count); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 2))}, host1); + topo.set_node_state(host1, node_state::removing); + mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(1); for (auto tid : tmap.tablet_ids()) { tmap.set_tablet(tid, tablet_info { tablet_replica_set{tablet_replica{host1, 0}} }); } - tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); co_return; - }).get(); + }); std::unordered_set skiplist = {host1}; - BOOST_REQUIRE_THROW(rebalance_tablets(e.get_tablet_allocator().local(), stm, {}, skiplist), std::runtime_error); + BOOST_REQUIRE_THROW(rebalance_tablets(e, {}, skiplist), std::runtime_error); }).get(); } @@ -2593,35 +2360,16 @@ SEASTAR_THREAD_TEST_CASE(test_skiplist_is_ignored_when_draining) { // all the tablets of the drained node, doubling its load. // It's safer to let the drain fail/stall. do_with_cql_env_thread([] (auto& e) { - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); - inet_address ip3("192.168.0.3"); + topology_builder topo(e); - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); - auto host3 = host_id(next_uuid()); + auto host1 = topo.add_node(node_state::removing); + auto host2 = topo.add_node(node_state::normal); + auto host3 = topo.add_node(node_state::normal); - auto table1 = table_id(next_uuid()); - - unsigned shard_count = 1; - - semaphore sem(1); - shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ - locator::topology::config{ - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = locator::endpoint_dc_rack::default_location - } - }); - - stm.mutate_token_metadata([&] (locator::token_metadata& tm) -> future<> { - tm.update_topology(host1, locator::endpoint_dc_rack::default_location, locator::node::state::being_removed, shard_count); - tm.update_topology(host2, locator::endpoint_dc_rack::default_location, locator::node::state::normal, shard_count); - tm.update_topology(host3, locator::endpoint_dc_rack::default_location, locator::node::state::normal, shard_count); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 3))}, host1); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 3))}, host2); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(3. / 3))}, host3); + auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 2); + auto table1 = add_table(e, ks_name).get(); + mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(2); auto tid = tmap.first_tablet(); tmap.set_tablet(tid, tablet_info { @@ -2631,14 +2379,13 @@ SEASTAR_THREAD_TEST_CASE(test_skiplist_is_ignored_when_draining) { tmap.set_tablet(tid, tablet_info { tablet_replica_set{tablet_replica{host1, 0}} }); - tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); co_return; - }).get(); + }); + auto& stm = e.shared_token_metadata().local(); std::unordered_set skiplist = {host2}; - rebalance_tablets(e.get_tablet_allocator().local(), stm, {}, skiplist); + rebalance_tablets(e, {}, skiplist); { load_sketch load(stm.get()); @@ -2692,59 +2439,48 @@ allocate_replicas_in_racks(const std::vector& racks, int rf, SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_random_load) { do_with_cql_env_thread([] (auto& e) { + topology_builder topo(e); const int n_hosts = 6; + auto shard_count = 2; std::vector hosts; - for (int i = 0; i < n_hosts; ++i) { - hosts.push_back(host_id(next_uuid())); - } + std::unordered_map> hosts_by_rack; - std::vector racks = { - endpoint_dc_rack{ "dc1", "rack-1" }, - endpoint_dc_rack{ "dc1", "rack-2" } + std::vector racks { + topo.rack(), + topo.start_new_rack(), }; + for (int i = 0; i < n_hosts; ++i) { + auto rack = racks[(i + 1) % racks.size()]; + auto h = topo.add_node(node_state::normal, shard_count, rack); + if (i) { + // Leave the first host empty by making it invisible to allocation algorithm. + hosts_by_rack[rack.rack].push_back(h); + } + } + + auto& stm = e.shared_token_metadata().local(); + for (int i = 0; i < 13; ++i) { - std::unordered_map> hosts_by_rack; - - semaphore sem(1); - shared_token_metadata stm([&sem]() noexcept { return get_units(sem, 1); }, locator::token_metadata::config { - locator::topology::config { - .this_endpoint = inet_address("192.168.0.1"), - .this_host_id = hosts[0], - .local_dc_rack = racks[1] - } - }); - size_t total_tablet_count = 0; - stm.mutate_token_metadata([&](token_metadata& tm) { - tablet_metadata tmeta; - - int i = 0; - for (auto h : hosts) { - auto shard_count = 2; - auto rack = racks[++i % racks.size()]; - tm.update_topology(h, rack, node::state::normal, shard_count); - if (h != hosts[0]) { - // Leave the first host empty by making it invisible to allocation algorithm. - hosts_by_rack[rack.rack].push_back(h); - } + std::vector keyspaces; + size_t tablet_count_bits = 8; + int rf = tests::random::get_int(2, 4); + for (size_t log2_tablets = 0; log2_tablets < tablet_count_bits; ++log2_tablets) { + if (tests::random::get_bool()) { + continue; } - - size_t tablet_count_bits = 8; - int rf = tests::random::get_int(2, 4); - for (size_t log2_tablets = 0; log2_tablets < tablet_count_bits; ++log2_tablets) { - if (tests::random::get_bool()) { - continue; - } - auto table = table_id(next_uuid()); - tablet_map tmap(1 << log2_tablets); + auto initial_tablets = 1 << log2_tablets; + keyspaces.push_back(add_keyspace(e, {{topo.dc(), rf}}, initial_tablets)); + auto table = add_table(e, keyspaces.back()).get(); + mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { + tablet_map tmap(initial_tablets); for (auto tid : tmap.tablet_ids()) { // Choose replicas randomly while loading racks evenly. std::vector replica_hosts = allocate_replicas_in_racks(racks, rf, hosts_by_rack); tablet_replica_set replicas; for (auto h : replica_hosts) { - auto shard_count = tm.get_topology().find_node(h)->get_shard_count(); auto shard = tests::random::get_int(0, shard_count - 1); replicas.push_back(tablet_replica {h, shard}); } @@ -2752,17 +2488,16 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_random_load) { } total_tablet_count += tmap.tablet_count(); tmeta.set_tablet_map(table, std::move(tmap)); - } - tm.set_tablets(std::move(tmeta)); - return make_ready_future<>(); - }).get(); + return make_ready_future<>(); + }); + } testlog.debug("tablet metadata: {}", stm.get()->tablets()); testlog.info("Total tablet count: {}, hosts: {}", total_tablet_count, hosts.size()); check_tablet_invariants(stm.get()->tablets()); - rebalance_tablets(e.get_tablet_allocator().local(), stm); + rebalance_tablets(e); check_tablet_invariants(stm.get()->tablets()); @@ -2788,6 +2523,10 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_random_load) { // Uncomment the following line when the algorithm is improved. // BOOST_REQUIRE(min_max_load.max() - min_max_load.min() <= 1); } + + seastar::parallel_for_each(keyspaces, [&] (const sstring& ks) { + return e.execute_cql(fmt::format("DROP KEYSPACE {}", ks)).discard_result(); + }).get(); } }).get(); } @@ -2863,57 +2602,52 @@ using hosts_by_rack_map = std::unordered_map>; static void do_test_load_balancing_merge_colocation(cql_test_env& e, const int n_racks, const int rf, const int n_hosts, const unsigned shard_count, const unsigned initial_tablets, std::function set_tablets) { + topology_builder topo(e); + rack_vector racks; for (int i = 0; i < n_racks; i++) { - racks.push_back(endpoint_dc_rack{"dc1", format("rack-{}", i + 1)}); + racks.push_back(topo.rack()); + topo.start_new_rack(); } testlog.info("merge colocation test - hosts={}, racks={}, rf={}, shard_count={}, initial_tablets={}", n_hosts, racks.size(), rf, shard_count, initial_tablets); - std::vector hosts; + hosts_by_rack_map hosts_by_rack; + for (int i = 0; i < n_hosts; ++i) { - hosts.push_back(host_id(next_uuid())); + auto rack = racks[i % racks.size()]; + auto h = topo.add_node(node_state::normal, shard_count, rack); + hosts_by_rack[rack.rack].push_back(h); } - auto table1 = add_table(e).get(); + auto ks_name = add_keyspace(e, {{topo.dc(), rf}}, initial_tablets); + auto table1 = add_table(e, ks_name).get(); + auto& stm = e.shared_token_metadata().local(); - hosts_by_rack_map hosts_by_rack; - semaphore sem(1); - shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ - locator::topology::config{ - .this_endpoint = inet_address("192.168.0.1"), - .this_host_id = hosts[0], - .local_dc_rack = racks[std::min(1, n_racks - 1)] - } - }); + { + abort_source as; + auto guard = e.get_raft_group0_client().start_operation(as).get(); + stm.mutate_token_metadata([&](token_metadata& tm) -> future<> { + tablet_metadata& tmeta = tm.tablets(); + tablet_map tmap(initial_tablets); + locator::resize_decision decision; + // leaves growing mode, allowing for merge decision. + decision.sequence_number = decision.next_sequence_number(); + tmap.set_resize_decision(std::move(decision)); + set_tablets(tm, tmap, racks, hosts_by_rack); + tmeta.set_tablet_map(table1, std::move(tmap)); + tm.set_tablets(std::move(tmeta)); + return make_ready_future < > (); + }).get(); + save_tablet_metadata(e.local_db(), stm.get()->tablets(), guard.write_timestamp()).get(); + } - stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { - tablet_metadata tmeta; - - int i = 0; - for (auto h : hosts) { - auto rack = racks[++i % racks.size()]; - hosts_by_rack[rack.rack].push_back(h); - tm.update_topology(h, rack, node::state::normal, shard_count); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(float(i) / hosts.size()))}, h); - testlog.debug("adding host {}, rack {}, token {}", h, rack.rack, token(tests::d2t(1. / hosts.size()))); - } - - tablet_map tmap(initial_tablets); - locator::resize_decision decision; - // leaves growing mode, allowing for merge decision. - decision.sequence_number = decision.next_sequence_number(); - tmap.set_resize_decision(std::move(decision)); - set_tablets(tm, tmap, racks, hosts_by_rack); - tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); - }).get(); auto tablet_count = [&] { return stm.get()->tablets().get_tablet_map(table1).tablet_count(); }; auto do_rebalance_tablets = [&] (locator::load_stats load_stats) { - rebalance_tablets(e.get_tablet_allocator().local(), stm, make_lw_shared(std::move(load_stats))); + rebalance_tablets(e, make_lw_shared(std::move(load_stats))); }; const uint64_t target_tablet_size = service::default_target_tablet_size; @@ -3055,57 +2789,29 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_merge_colocation_with_decomission) SEASTAR_THREAD_TEST_CASE(test_load_balancing_resize_requests) { do_with_cql_env_thread([] (auto& e) { - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); + topology_builder topo(e); - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); + topo.add_node(node_state::normal, 2); + topo.add_node(node_state::normal, 2); - auto table1 = add_table(e).get(); + const size_t initial_tablets = 2; + auto ks_name = add_keyspace(e, {{topo.dc(), 2}}, initial_tablets); + auto table1 = add_table(e, ks_name).get(); - unsigned shard_count = 2; - - semaphore sem(1); - shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ - locator::topology::config{ - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = locator::endpoint_dc_rack::default_location - } - }); - - stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { - tm.update_topology(host1, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - tm.update_topology(host2, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 2))}, host1); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 2))}, host2); - - tablet_map tmap(2); - for (auto tid : tmap.tablet_ids()) { - tmap.set_tablet(tid, tablet_info { - tablet_replica_set { - tablet_replica {host1, tests::random::get_int(0, shard_count - 1)}, - tablet_replica {host2, tests::random::get_int(0, shard_count - 1)}, - } - }); - } - tablet_metadata tmeta; - tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); - }).get(); + auto& stm = e.shared_token_metadata().local(); auto tablet_count = [&] { return stm.get()->tablets().get_tablet_map(table1).tablet_count(); }; + auto resize_decision = [&] { return stm.get()->tablets().get_tablet_map(table1).resize_decision(); }; auto do_rebalance_tablets = [&] (locator::load_stats load_stats) { - rebalance_tablets(e.get_tablet_allocator().local(), stm, make_lw_shared(std::move(load_stats))); + rebalance_tablets(e, make_lw_shared(std::move(load_stats))); }; - const size_t initial_tablets = tablet_count(); const uint64_t max_tablet_size = service::default_target_tablet_size * 2; auto to_size_in_bytes = [&] (double max_tablet_size_pctg) -> uint64_t { return (max_tablet_size * max_tablet_size_pctg) * tablet_count();