diff --git a/test/boost/tablets_test.cc b/test/boost/tablets_test.cc index 1f9d3d1274..3fd6a23033 100644 --- a/test/boost/tablets_test.cc +++ b/test/boost/tablets_test.cc @@ -12,6 +12,8 @@ #undef SEASTAR_TESTING_MAIN #include #include "test/lib/random_utils.hh" +#include "service/topology_mutation.hh" +#include "service/storage_service.hh" #include #include #include "test/lib/cql_test_env.hh" @@ -19,6 +21,7 @@ #include "test/lib/simple_schema.hh" #include "test/lib/key_utils.hh" #include "test/lib/test_utils.hh" +#include "test/lib/topology_builder.hh" #include "db/config.hh" #include "db/schema_tables.hh" #include "schema/schema_builder.hh" @@ -38,6 +41,7 @@ #include "service/topology_state_machine.hh" #include +#include BOOST_AUTO_TEST_SUITE(tablets_test) @@ -51,13 +55,6 @@ static api::timestamp_type current_timestamp(cql_test_env& e) { return utils::UUID_gen::micros_timestamp(e.get_system_keyspace().local().get_last_group0_state_id().get()) + 1; } -static utils::UUID next_uuid() { - static uint64_t counter = 1; - return utils::UUID_gen::get_time_UUID(std::chrono::system_clock::time_point( - std::chrono::duration_cast( - std::chrono::seconds(counter++)))); -} - static void verify_tablet_metadata_persistence(cql_test_env& env, const tablet_metadata& tm, api::timestamp_type& ts) { save_tablet_metadata(env.local_db(), tm, ts++).get(); @@ -109,6 +106,37 @@ future add_table(cql_test_env& e, sstring test_ks_name = "") { co_return id; } +// Run in a seastar thread +static +sstring add_keyspace(cql_test_env& e, std::unordered_map dc_rf, int initial_tablets = 0) { + static std::atomic ks_id = 0; + auto ks_name = fmt::format("keyspace{}", ks_id.fetch_add(1)); + sstring rf_options; + for (auto& [dc, rf] : dc_rf) { + rf_options += format(", '{}': {}", dc, rf); + } + e.execute_cql(fmt::format("create keyspace {} with replication = {{'class': 'NetworkTopologyStrategy'{}}}" + " and tablets = {{'enabled': true, 'initial': {}}}", + ks_name, rf_options, initial_tablets)).get(); + 
return ks_name; +} + +// Run in a seastar thread +void mutate_tablets(cql_test_env& e, const group0_guard& guard, seastar::noncopyable_function(tablet_metadata&)> mutator) { + auto& stm = e.shared_token_metadata().local(); + stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { + return mutator(tm.tablets()); + }).get(); + save_tablet_metadata(e.local_db(), stm.get()->tablets(), guard.write_timestamp()).get(); +} + +// Run in a seastar thread +void mutate_tablets(cql_test_env& e, seastar::noncopyable_function(tablet_metadata&)> mutator) { + abort_source as; + auto guard = e.get_raft_group0_client().start_operation(as).get(); + mutate_tablets(e, guard, std::move(mutator)); +} + SEASTAR_TEST_CASE(test_tablet_metadata_persistence) { return do_with_cql_env_thread([] (cql_test_env& e) { auto h1 = host_id(utils::UUID_gen::get_time_UUID()); @@ -1370,7 +1398,11 @@ void apply_resize_plan(token_metadata& tm, const migration_plan& plan) { } static -future<> handle_resize_finalize(tablet_allocator& talloc, shared_token_metadata& stm, const migration_plan& plan) { +future<> handle_resize_finalize(cql_test_env& e, group0_guard& guard, const migration_plan& plan) { + auto& talloc = e.get_tablet_allocator().local(); + auto& stm = e.shared_token_metadata().local(); + bool changed = false; + for (auto table_id : plan.resize_plan().finalize_resize) { auto tm = stm.get(); const auto& old_tmap = tm->tablets().get_tablet_map(table_id); @@ -1380,11 +1412,23 @@ future<> handle_resize_finalize(tablet_allocator& talloc, shared_token_metadata& new_resize_decision.sequence_number = old_tmap.resize_decision().next_sequence_number(); new_tmap.set_resize_decision(std::move(new_resize_decision)); - co_await stm.mutate_token_metadata([table_id, &new_tmap] (token_metadata& tm) { + co_await stm.mutate_token_metadata([table_id, &new_tmap, &changed] (token_metadata& tm) { + changed = true; tm.tablets().set_tablet_map(table_id, std::move(new_tmap)); return make_ready_future<>(); }); } + + if 
(changed) { + // Need to reload on each resize because the table object expects the tablet count to change by a factor of 2. + co_await save_tablet_metadata(e.local_db(), stm.get()->tablets(), guard.write_timestamp()); + co_await e.get_storage_service().local().load_tablet_metadata({}); + + // Need a new guard to make sure later changes use a later timestamp. + release_guard(std::move(guard)); + abort_source as; + guard = co_await e.get_raft_group0_client().start_operation(as); + } } // Reflects the plan in a given token metadata as if the migrations were fully executed. @@ -1429,11 +1473,15 @@ static void check_tablet_invariants(const tablet_metadata& tmeta); static -void rebalance_tablets(tablet_allocator& talloc, - shared_token_metadata& stm, - locator::load_stats_ptr load_stats = {}, - std::unordered_set skiplist = {}, - std::function stop = nullptr) { +void do_rebalance_tablets(cql_test_env& e, + group0_guard& guard, + locator::load_stats_ptr load_stats = {}, + std::unordered_set skiplist = {}, + std::function stop = nullptr) +{ + auto& talloc = e.get_tablet_allocator().local(); + auto& stm = e.shared_token_metadata().local(); + // Sanity limit to avoid infinite loops. // The x10 factor is arbitrary, it's there to account for more complex schedules than direct migration. auto max_iterations = 1 + get_tablet_count(stm.get()->tablets()) * 10; @@ -1450,11 +1498,38 @@ void rebalance_tablets(tablet_allocator& talloc, apply_plan(tm, plan); return make_ready_future<>(); }).get(); - handle_resize_finalize(talloc, stm, plan).get(); + handle_resize_finalize(e, guard, plan).get(); } throw std::runtime_error("rebalance_tablets(): convergence not reached within limit"); } +// Invokes the tablet scheduler and executes its plan, continuously until it emits an empty plan. +// Simulates topology coordinator but doesn't perform actual migration, +// only reflects it in the metadata. +// Run in a seastar thread. 
+void rebalance_tablets(cql_test_env& e, + load_stats_ptr load_stats = nullptr, + std::unordered_set skiplist = {}, + std::function stop = nullptr) { + abort_source as; + testlog.debug("rebalance_tablets(): start"); + + auto guard = e.get_raft_group0_client().start_operation(as).get(); + testlog.debug("rebalance_tablets(): took group0 guard"); + + do_rebalance_tablets(e, guard, std::move(load_stats), std::move(skiplist), std::move(stop)); + testlog.debug("rebalance_tablets(): rebalanced"); + + // We should not introduce inconsistency between on-disk state and in-memory state + // as that may violate invariants and cause failures in later operations + // causing test flakiness. + auto& stm = e.shared_token_metadata().local(); + save_tablet_metadata(e.local_db(), stm.get()->tablets(), guard.write_timestamp()).get(); + e.get_storage_service().local().load_tablet_metadata({}).get(); + + testlog.debug("rebalance_tablets(): done"); +} + static void rebalance_tablets_as_in_progress(tablet_allocator& talloc, shared_token_metadata& stm) { while (true) { @@ -1491,36 +1566,17 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_empty_node) { do_with_cql_env_thread([] (auto& e) { // Tests the scenario of bootstrapping a single node // Verifies that load balancer sees it and moves tablets to that node. 
- - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); - inet_address ip3("192.168.0.3"); - - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); - auto host3 = host_id(next_uuid()); - - auto table1 = table_id(next_uuid()); + topology_builder topo(e); unsigned shard_count = 2; + auto host1 = topo.add_node(node_state::normal, shard_count); + auto host2 = topo.add_node(node_state::normal, shard_count); + auto host3 = topo.add_node(node_state::normal, shard_count); - semaphore sem(1); - shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ - locator::topology::config{ - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = locator::endpoint_dc_rack::default_location - } - }); - - stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { - tm.update_topology(host1, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - tm.update_topology(host2, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - tm.update_topology(host3, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 3))}, host1); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 3))}, host2); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(3. 
/ 3))}, host3); + auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 4); + auto table1 = add_table(e, ks_name).get(); + mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(4); auto tid = tmap.first_tablet(); tmap.set_tablet(tid, tablet_info { @@ -1550,11 +1606,11 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_empty_node) { tablet_replica {host2, 0}, } }); - tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); co_return; - }).get(); + }); + + auto& stm = e.shared_token_metadata().local(); // Sanity check { @@ -1568,7 +1624,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_empty_node) { BOOST_REQUIRE_EQUAL(load.get_avg_shard_load(host3), 0); } - rebalance_tablets(e.get_tablet_allocator().local(), stm); + rebalance_tablets(e); { load_sketch load(stm.get()); @@ -1590,32 +1646,17 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_skiplist) { // Tests the scenario of balacning cluster with DOWN node // Verifies that load balancer doesn't moves tablets to that node. 
- inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); - inet_address ip3("192.168.0.3"); - - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); - auto host3 = host_id(next_uuid()); - - auto table1 = table_id(next_uuid()); - unsigned shard_count = 2; - semaphore sem(1); - shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ - locator::topology::config{ - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = locator::endpoint_dc_rack::default_location - } - }); + topology_builder topo(e); + auto host1 = topo.add_node(node_state::normal, shard_count); + auto host2 = topo.add_node(node_state::normal, shard_count); + auto host3 = topo.add_node(node_state::normal, shard_count); - stm.mutate_token_metadata([&] (token_metadata& tm) { - tm.update_topology(host1, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - tm.update_topology(host2, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - tm.update_topology(host3, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); + auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 4); + auto table1 = add_table(e, ks_name).get(); + mutate_tablets(e, [&] (tablet_metadata& tmeta) { tablet_map tmap(4); auto tid = tmap.first_tablet(); tmap.set_tablet(tid, tablet_info { @@ -1645,11 +1686,11 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_skiplist) { tablet_replica {host2, 0}, } }); - tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); return make_ready_future<>(); - }).get(); + }); + + auto& stm = e.shared_token_metadata().local(); // Sanity check { @@ -1663,7 +1704,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_skiplist) { BOOST_REQUIRE_EQUAL(load.get_avg_shard_load(host3), 0); } - rebalance_tablets(e.get_tablet_allocator().local(), stm, {}, {host3}); + rebalance_tablets(e, {}, {host3}); 
{ load_sketch load(stm.get()); @@ -1678,36 +1719,17 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_rf_met) { // Verifies that load balancer moves tablets out of the decommissioned node. // The scenario is such that replication factor of tablets can be satisfied after decommission. do_with_cql_env_thread([](auto& e) { - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); - inet_address ip3("192.168.0.3"); + unsigned shard_count = 2; - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); - auto host3 = host_id(next_uuid()); + topology_builder topo(e); + auto host1 = topo.add_node(node_state::normal, shard_count); + auto host2 = topo.add_node(node_state::normal, shard_count); + auto host3 = topo.add_node(node_state::decommissioning, shard_count); - auto table1 = table_id(next_uuid()); - - semaphore sem(1); - shared_token_metadata stm([&sem]() noexcept { return get_units(sem, 1); }, locator::token_metadata::config { - locator::topology::config { - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = locator::endpoint_dc_rack::default_location - } - }); - - stm.mutate_token_metadata([&](token_metadata& tm) -> future<> { - const unsigned shard_count = 2; - - tm.update_topology(host1, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - tm.update_topology(host2, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - tm.update_topology(host3, locator::endpoint_dc_rack::default_location, node::state::being_decommissioned, - shard_count); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 3))}, host1); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 3))}, host2); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(3. 
/ 3))}, host3); + auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 4); + auto table1 = add_table(e, ks_name).get(); + mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(4); auto tid = tmap.first_tablet(); tmap.set_tablet(tid, tablet_info { @@ -1737,13 +1759,13 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_rf_met) { tablet_replica {host3, 1}, } }); - tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); co_return; - }).get(); + }); - rebalance_tablets(e.get_tablet_allocator().local(), stm); + rebalance_tablets(e); + + auto& stm = e.shared_token_metadata().local(); { load_sketch load(stm.get()); @@ -1753,12 +1775,9 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_rf_met) { BOOST_REQUIRE_EQUAL(load.get_avg_shard_load(host3), 0); } - stm.mutate_token_metadata([&](token_metadata& tm) { - tm.update_topology(host3, locator::endpoint_dc_rack::default_location, node::state::left); - return make_ready_future<>(); - }).get(); + topo.set_node_state(host3, node_state::left); - rebalance_tablets(e.get_tablet_allocator().local(), stm); + rebalance_tablets(e); { load_sketch load(stm.get()); @@ -1778,49 +1797,18 @@ SEASTAR_THREAD_TEST_CASE(test_table_creation_during_decommission) { auto cfg = tablet_cql_test_config(); cfg.db_config->tablets_initial_scale_factor(1); do_with_cql_env_thread([](auto& e) { - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); - inet_address ip3("192.168.0.3"); - inet_address ip4("192.168.0.4"); + topology_builder topo(e); + topo.add_node(node_state::normal); + topo.add_node(node_state::normal); + auto host3 = topo.add_node(node_state::decommissioning); + auto host4 = topo.add_node(node_state::left); - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); - auto host3 = host_id(next_uuid()); - auto host4 = host_id(next_uuid()); - locator::endpoint_dc_rack dcrack = { "datacenter1", "rack1" }; + auto ks_name = add_keyspace(e, {{topo.dc(), 1}}); + 
auto table1 = add_table(e, ks_name).get(); + auto s = e.local_db().find_schema(table1); - semaphore sem(1); - shared_token_metadata stm([&sem]() noexcept { return get_units(sem, 1); }, locator::token_metadata::config { - locator::topology::config { - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = dcrack - } - }); - - stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { - tm.update_topology(host1, dcrack, node::state::normal, 1); - tm.update_topology(host2, dcrack, node::state::normal, 1); - tm.update_topology(host3, dcrack, node::state::being_decommissioned, 16); - tm.update_topology(host4, dcrack, node::state::left, 16); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 4))}, host1); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 4))}, host2); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(3. / 4))}, host3); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(4. / 4))}, host4); - co_return; - }).get(); - - sstring ks_name = "test_ks"; - sstring table_name = "table1"; - e.execute_cql(format("create keyspace {} with replication = " - "{{'class': 'NetworkTopologyStrategy', '{}': 1}} " - "and tablets = {{'enabled': true}}", ks_name, dcrack.dc)).get(); - e.execute_cql(fmt::format("CREATE TABLE {}.{} (p1 text, r1 int, PRIMARY KEY (p1))", ks_name, table_name)).get(); - auto s = e.local_db().find_schema(ks_name, table_name); - - auto* rs = e.local_db().find_keyspace(ks_name).get_replication_strategy().maybe_as_tablet_aware(); - BOOST_REQUIRE(rs); - auto tmap = rs->allocate_tablets_for_new_table(s, stm.get(), 1).get(); + auto& stm = e.shared_token_metadata().local(); + auto& tmap = stm.get()->tablets().get_tablet_map(table1); // Verify we do not treat leaving nodes as having capacity. 
BOOST_REQUIRE_EQUAL(tmap.tablet_count(), 2); @@ -1840,54 +1828,20 @@ SEASTAR_THREAD_TEST_CASE(test_table_creation_during_rack_decommission) { // The problematic scenario happens when allocating tablets for a new table // when there is a rack with only non-normal nodes. do_with_cql_env_thread([](auto& e) { - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); - inet_address ip3("192.168.0.3"); - inet_address ip4("192.168.0.4"); + topology_builder topo(e); + topo.add_node(); + topo.add_node(); + topo.start_new_rack(); + auto host3 = topo.add_node(node_state::decommissioning); + auto host4 = topo.add_node(node_state::left); - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); - auto host3 = host_id(next_uuid()); - auto host4 = host_id(next_uuid()); + auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 8); + auto table1 = add_table(e, ks_name).get(); - auto dc = "datacenter1"; - locator::endpoint_dc_rack dcrack = { dc, "rack1" }; - locator::endpoint_dc_rack dcrack2 = { dc, "rack2" }; + rebalance_tablets(e); - semaphore sem(1); - shared_token_metadata stm([&sem]() noexcept { return get_units(sem, 1); }, locator::token_metadata::config { - locator::topology::config { - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = dcrack - } - }); - - const unsigned shard_count = 1; - - stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { - tm.update_topology(host1, dcrack, node::state::normal, shard_count); - tm.update_topology(host2, dcrack, node::state::normal, shard_count); - tm.update_topology(host3, dcrack2, node::state::being_decommissioned, shard_count); - tm.update_topology(host4, dcrack2, node::state::left, shard_count); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 4))}, host1); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 4))}, host2); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(3. 
/ 4))}, host3); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(4. / 4))}, host4); - co_return; - }).get(); - - sstring ks_name = "test_ks"; - sstring table_name = "table1"; - e.execute_cql(format("create keyspace {} with replication = " - "{{'class': 'NetworkTopologyStrategy', '{}': 1}} " - "and tablets = {{'enabled': true, 'initial': 8}}", ks_name, dcrack.dc)).get(); - e.execute_cql(fmt::format("CREATE TABLE {}.{} (p1 text, r1 int, PRIMARY KEY (p1))", ks_name, table_name)).get(); - auto s = e.local_db().find_schema(ks_name, table_name); - - auto* rs = e.local_db().find_keyspace(ks_name).get_replication_strategy().maybe_as_tablet_aware(); - BOOST_REQUIRE(rs); - auto tmap = rs->allocate_tablets_for_new_table(s, stm.get(), 8).get(); + auto& stm = e.shared_token_metadata().local(); + auto& tmap = stm.get()->tablets().get_tablet_map(table1); tmap.for_each_tablet([&](auto tid, auto& tinfo) { for (auto& replica : tinfo.replicas) { @@ -1903,45 +1857,21 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_two_racks) { // Verifies that load balancer moves tablets out of the decommissioned node. // The scenario is such that replication constraints of tablets can be satisfied after decommission. 
do_with_cql_env_thread([](auto& e) { - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); - inet_address ip3("192.168.0.3"); - inet_address ip4("192.168.0.4"); + std::vector racks; - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); - auto host3 = host_id(next_uuid()); - auto host4 = host_id(next_uuid()); + topology_builder topo(e); + racks.push_back(topo.rack()); + auto host1 = topo.add_node(node_state::normal); + auto host3 = topo.add_node(node_state::normal); + topo.start_new_rack(); + racks.push_back(topo.rack()); + auto host2 = topo.add_node(node_state::normal); + auto host4 = topo.add_node(node_state::decommissioning); - std::vector racks = { - endpoint_dc_rack{ "dc1", "rack-1" }, - endpoint_dc_rack{ "dc1", "rack-2" } - }; - - auto table1 = table_id(next_uuid()); - - semaphore sem(1); - shared_token_metadata stm([&sem]() noexcept { return get_units(sem, 1); }, locator::token_metadata::config { - locator::topology::config { - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = racks[0] - } - }); - - stm.mutate_token_metadata([&](token_metadata& tm) -> future<> { - const unsigned shard_count = 1; - - tm.update_topology(host1, racks[0], node::state::normal, shard_count); - tm.update_topology(host2, racks[1], node::state::normal, shard_count); - tm.update_topology(host3, racks[0], node::state::normal, shard_count); - tm.update_topology(host4, racks[1], node::state::being_decommissioned, - shard_count); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 4))}, host1); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 4))}, host2); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(3. / 4))}, host3); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(4. 
/ 4))}, host4); + auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 4); + auto table1 = add_table(e, ks_name).get(); + mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(4); auto tid = tmap.first_tablet(); tmap.set_tablet(tid, tablet_info { @@ -1971,13 +1901,13 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_two_racks) { tablet_replica {host2, 0}, } }); - tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); co_return; - }).get(); + }); - rebalance_tablets(e.get_tablet_allocator().local(), stm); + rebalance_tablets(e); + + auto& stm = e.shared_token_metadata().local(); { load_sketch load(stm.get()); @@ -2006,45 +1936,21 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_rack_load_failure) { // Verifies that load balancer moves tablets out of the decommissioned node. // The scenario is such that it is impossible to distribute replicas without violating rack uniqueness. do_with_cql_env_thread([](auto& e) { - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); - inet_address ip3("192.168.0.3"); - inet_address ip4("192.168.0.4"); + std::vector racks; - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); - auto host3 = host_id(next_uuid()); - auto host4 = host_id(next_uuid()); + topology_builder topo(e); + racks.push_back(topo.rack()); + auto host1 = topo.add_node(node_state::normal); + auto host2 = topo.add_node(node_state::normal); + auto host3 = topo.add_node(node_state::normal); + topo.start_new_rack(); + racks.push_back(topo.rack()); + auto host4 = topo.add_node(node_state::decommissioning); - std::vector racks = { - endpoint_dc_rack{ "dc1", "rack-1" }, - endpoint_dc_rack{ "dc1", "rack-2" } - }; - - auto table1 = table_id(next_uuid()); - - semaphore sem(1); - shared_token_metadata stm([&sem]() noexcept { return get_units(sem, 1); }, locator::token_metadata::config { - locator::topology::config { - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack 
= racks[0] - } - }); - - stm.mutate_token_metadata([&](token_metadata& tm) -> future<> { - const unsigned shard_count = 1; - - tm.update_topology(host1, racks[0], node::state::normal, shard_count); - tm.update_topology(host2, racks[0], node::state::normal, shard_count); - tm.update_topology(host3, racks[0], node::state::normal, shard_count); - tm.update_topology(host4, racks[1], node::state::being_decommissioned, - shard_count); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 4))}, host1); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 4))}, host2); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(3. / 4))}, host3); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(4. / 4))}, host4); + auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 4); + auto table1 = add_table(e, ks_name).get(); + mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(4); auto tid = tmap.first_tablet(); tmap.set_tablet(tid, tablet_info { @@ -2074,13 +1980,11 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_rack_load_failure) { tablet_replica {host4, 0}, } }); - tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); co_return; - }).get(); + }); - BOOST_REQUIRE_THROW(rebalance_tablets(e.get_tablet_allocator().local(), stm), std::runtime_error); + BOOST_REQUIRE_THROW(rebalance_tablets(e), std::runtime_error); }).get(); } @@ -2088,36 +1992,15 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_rf_not_met) { // Verifies that load balancer moves tablets out of the decommissioned node. // The scenario is such that replication factor of tablets can be satisfied after decommission. 
do_with_cql_env_thread([](auto& e) { - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); - inet_address ip3("192.168.0.3"); + topology_builder topo(e); + auto host1 = topo.add_node(node_state::normal, 2); + auto host2 = topo.add_node(node_state::normal, 2); + auto host3 = topo.add_node(node_state::decommissioning, 2); - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); - auto host3 = host_id(next_uuid()); - - auto table1 = table_id(next_uuid()); - - semaphore sem(1); - shared_token_metadata stm([&sem]() noexcept { return get_units(sem, 1); }, locator::token_metadata::config { - locator::topology::config { - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = locator::endpoint_dc_rack::default_location - } - }); - - stm.mutate_token_metadata([&](token_metadata& tm) -> future<> { - const unsigned shard_count = 2; - - tm.update_topology(host1, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - tm.update_topology(host2, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - tm.update_topology(host3, locator::endpoint_dc_rack::default_location, node::state::being_decommissioned, - shard_count); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 3))}, host1); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 3))}, host2); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(3. 
/ 3))}, host3); + auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 1); + auto table1 = add_table(e, ks_name).get(); + mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(1); auto tid = tmap.first_tablet(); tmap.set_tablet(tid, tablet_info { @@ -2127,13 +2010,11 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_rf_not_met) { tablet_replica {host3, 0}, } }); - tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); co_return; - }).get(); + }); - BOOST_REQUIRE_THROW(rebalance_tablets(e.get_tablet_allocator().local(), stm), std::runtime_error); + BOOST_REQUIRE_THROW(rebalance_tablets(e), std::runtime_error); }).get(); } @@ -2146,33 +2027,15 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_works_with_in_progress_transitions) // which when executed will achieve perfect balance, // which is a proof that it doesn't stop due to active migrations. - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); - inet_address ip3("192.168.0.3"); + topology_builder topo(e); + auto host1 = topo.add_node(node_state::normal, 1); + auto host2 = topo.add_node(node_state::normal, 1); + auto host3 = topo.add_node(node_state::normal, 2); - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); - auto host3 = host_id(next_uuid()); - - auto table1 = table_id(next_uuid()); - - semaphore sem(1); - shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ - locator::topology::config{ - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = locator::endpoint_dc_rack::default_location - } - }); - - stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { - tm.update_topology(host1, locator::endpoint_dc_rack::default_location, node::state::normal, 1); - tm.update_topology(host2, locator::endpoint_dc_rack::default_location, node::state::normal, 1); - tm.update_topology(host3, locator::endpoint_dc_rack::default_location, 
node::state::normal, 2); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 3))}, host1); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 3))}, host2); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(3. / 3))}, host3); + auto ks_name = add_keyspace(e, {{topo.dc(), 2}}, 4); + auto table1 = add_table(e, ks_name).get(); + mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(4); std::optional tid = tmap.first_tablet(); for (int i = 0; i < 4; ++i) { @@ -2193,11 +2056,13 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_works_with_in_progress_transitions) }, tablet_replica {host3, 0} }); - tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); co_return; - }).get(); + }); + + abort_source as; + auto guard = e.get_raft_group0_client().start_operation(as).get(); + auto& stm = e.shared_token_metadata().local(); rebalance_tablets_as_in_progress(e.get_tablet_allocator().local(), stm); execute_transitions(stm); @@ -2211,39 +2076,25 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_works_with_in_progress_transitions) BOOST_REQUIRE_EQUAL(load.get_avg_shard_load(h), 2); } } + + // Restore consistency between stm and system tables before releasing group0 guard. 
+ save_tablet_metadata(e.local_db(), stm.get()->tablets(), guard.write_timestamp()).get(); }).get(); } #ifdef SCYLLA_ENABLE_ERROR_INJECTION SEASTAR_THREAD_TEST_CASE(test_load_balancer_shuffle_mode) { do_with_cql_env_thread([] (auto& e) { - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); - inet_address ip3("192.168.0.3"); + topology_builder topo(e); - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); - auto host3 = host_id(next_uuid()); + auto host1 = topo.add_node(node_state::normal, 1); + auto host2 = topo.add_node(node_state::normal, 1); + topo.add_node(node_state::normal, 2); - auto table1 = table_id(next_uuid()); - - semaphore sem(1); - shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ - locator::topology::config{ - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = locator::endpoint_dc_rack::default_location - } - }); - - stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { - tm.update_topology(host1, locator::endpoint_dc_rack::default_location, node::state::normal, 1); - tm.update_topology(host2, locator::endpoint_dc_rack::default_location, node::state::normal, 1); - tm.update_topology(host3, locator::endpoint_dc_rack::default_location, node::state::normal, 2); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 3))}, host1); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 3))}, host2); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(3. 
/ 3))}, host3); + auto ks_name = add_keyspace(e, {{topo.dc(), 2}}, 4); + auto table1 = add_table(e, ks_name).get(); + mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(4); std::optional tid = tmap.first_tablet(); for (int i = 0; i < 4; ++i) { @@ -2255,14 +2106,13 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancer_shuffle_mode) { }); tid = tmap.next_tablet(*tid); } - tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); co_return; - }).get(); + }); - rebalance_tablets(e.get_tablet_allocator().local(), stm); + rebalance_tablets(e); + auto& stm = e.shared_token_metadata().local(); BOOST_REQUIRE(e.get_tablet_allocator().local().balance_tablets(stm.get()).get().empty()); utils::get_local_injector().enable("tablet_allocator_shuffle"); @@ -2277,39 +2127,18 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancer_shuffle_mode) { SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_two_empty_nodes) { do_with_cql_env_thread([] (auto& e) { - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); - inet_address ip3("192.168.0.3"); - inet_address ip4("192.168.0.4"); + topology_builder topo(e); - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); - auto host3 = host_id(next_uuid()); - auto host4 = host_id(next_uuid()); + const auto shard_count = 2; + auto host1 = topo.add_node(node_state::normal, shard_count); + auto host2 = topo.add_node(node_state::normal, shard_count); + auto host3 = topo.add_node(node_state::normal, shard_count); + auto host4 = topo.add_node(node_state::normal, shard_count); - auto table1 = table_id(next_uuid()); - - unsigned shard_count = 2; - - semaphore sem(1); - shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ - locator::topology::config{ - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = locator::endpoint_dc_rack::default_location - } - }); - - stm.mutate_token_metadata([&] 
(token_metadata& tm) -> future<> { - tm.update_topology(host1, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - tm.update_topology(host2, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - tm.update_topology(host3, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - tm.update_topology(host4, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 4))}, host1); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 4))}, host2); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(3. / 4))}, host3); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(4. / 4))}, host4); + auto ks_name = add_keyspace(e, {{topo.dc(), 2}}, 16); + auto table1 = add_table(e, ks_name).get(); + mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(16); for (auto tid : tmap.tablet_ids()) { tmap.set_tablet(tid, tablet_info { @@ -2319,13 +2148,13 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_two_empty_nodes) { } }); } - tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); co_return; - }).get(); + }); - rebalance_tablets(e.get_tablet_allocator().local(), stm); + rebalance_tablets(e); + + auto& stm = e.shared_token_metadata().local(); { load_sketch load(stm.get()); @@ -2342,33 +2171,16 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_two_empty_nodes) { SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_asymmetric_node_capacity) { do_with_cql_env_thread([](auto& e) { - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); - inet_address ip3("192.168.0.3"); + topology_builder topo(e); - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); - auto host3 = host_id(next_uuid()); + auto host1 = topo.add_node(node_state::decommissioning, 
8); + auto host2 = topo.add_node(node_state::normal, 1); + auto host3 = topo.add_node(node_state::normal, 7); - auto table1 = table_id(next_uuid()); - - semaphore sem(1); - shared_token_metadata stm([&sem]() noexcept { return get_units(sem, 1); }, locator::token_metadata::config { - locator::topology::config { - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = locator::endpoint_dc_rack::default_location - } - }); - - stm.mutate_token_metadata([&](token_metadata& tm) -> future<> { - tm.update_topology(host1, locator::endpoint_dc_rack::default_location, node::state::being_decommissioned, 8); - tm.update_topology(host2, locator::endpoint_dc_rack::default_location, node::state::normal, 1); - tm.update_topology(host3, locator::endpoint_dc_rack::default_location, node::state::normal, 7); - co_await tm.update_normal_tokens(std::unordered_set {token(tests::d2t(1. / 4))}, host1); - co_await tm.update_normal_tokens(std::unordered_set {token(tests::d2t(2. / 4))}, host2); - co_await tm.update_normal_tokens(std::unordered_set {token(tests::d2t(3. 
/ 4))}, host3); + auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 16); + auto table1 = add_table(e, ks_name).get(); + mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(16); for (auto tid: tmap.tablet_ids()) { tmap.set_tablet(tid, tablet_info { @@ -2377,17 +2189,17 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_asymmetric_node_capacity) { } }); } - tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); co_return; - }).get(); + }); auto until_nodes_drained = [] (const migration_plan& plan) { return !plan.has_nodes_to_drain(); }; - rebalance_tablets(e.get_tablet_allocator().local(), stm, {}, {}, until_nodes_drained); + rebalance_tablets(e, {}, {}, until_nodes_drained); + + auto& stm = e.shared_token_metadata().local(); { load_sketch load(stm.get()); @@ -2404,33 +2216,20 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_asymmetric_node_capacity) { SEASTAR_THREAD_TEST_CASE(test_load_balancer_disabling) { do_with_cql_env_thread([] (auto& e) { - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); + topology_builder topo(e); + auto host1 = topo.add_node(node_state::normal, 2); + topo.add_node(node_state::normal, 2); - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); + auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 16); + auto table1 = add_table(e, ks_name).get(); - auto table1 = table_id(next_uuid()); - - unsigned shard_count = 2; - - semaphore sem(1); - shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ - locator::topology::config{ - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = locator::endpoint_dc_rack::default_location - } - }); + abort_source as; + auto guard = e.get_raft_group0_client().start_operation(as).get(); + auto& stm = e.shared_token_metadata().local(); // host1 is loaded and host2 is empty, resulting in an imbalance. 
// host1's shard 0 is loaded and shard 1 is empty, resulting in intra-node imbalance. - stm.mutate_token_metadata([&] (auto& tm) -> future<> { - tm.update_topology(host1, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - tm.update_topology(host2, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 2))}, host1); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 2))}, host2); - + mutate_tablets(e, guard, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(16); for (auto tid : tmap.tablet_ids()) { tmap.set_tablet(tid, tablet_info { @@ -2439,11 +2238,9 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancer_disabling) { } }); } - tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); co_return; - }).get(); + }); { auto plan = e.get_tablet_allocator().local().balance_tablets(stm.get()).get(); @@ -2496,31 +2293,18 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancer_disabling) { SEASTAR_THREAD_TEST_CASE(test_drained_node_is_not_balanced_internally) { do_with_cql_env_thread([] (auto& e) { - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); + topology_builder topo(e); + auto host1 = topo.add_node(node_state::removing, 2); + topo.add_node(node_state::normal, 2); - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); + auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 16); + auto table1 = add_table(e, ks_name).get(); - auto table1 = table_id(next_uuid()); - - unsigned shard_count = 2; - - semaphore sem(1); - shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ - locator::topology::config{ - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = locator::endpoint_dc_rack::default_location - } - }); - - stm.mutate_token_metadata([&] (locator::token_metadata& tm) -> future<> 
{ - tm.update_topology(host1, locator::endpoint_dc_rack::default_location, locator::node::state::being_removed, shard_count); - tm.update_topology(host2, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 2))}, host1); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 2))}, host2); + abort_source as; + auto guard = e.get_raft_group0_client().start_operation(as).get(); + auto& stm = e.shared_token_metadata().local(); + mutate_tablets(e, guard, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(16); for (auto tid : tmap.tablet_ids()) { tmap.set_tablet(tid, tablet_info { @@ -2529,11 +2313,9 @@ SEASTAR_THREAD_TEST_CASE(test_drained_node_is_not_balanced_internally) { } }); } - tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); co_return; - }).get(); + }); migration_plan plan = e.get_tablet_allocator().local().balance_tablets(stm.get()).get(); BOOST_REQUIRE(plan.has_nodes_to_drain()); @@ -2545,42 +2327,27 @@ SEASTAR_THREAD_TEST_CASE(test_drained_node_is_not_balanced_internally) { SEASTAR_THREAD_TEST_CASE(test_plan_fails_when_removing_last_replica) { do_with_cql_env_thread([] (auto& e) { - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); + topology_builder topo(e); + auto host1 = topo.add_node(); - auto host1 = host_id(next_uuid()); + auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 1); + auto table1 = add_table(e, ks_name).get(); - auto table1 = table_id(next_uuid()); - - unsigned shard_count = 1; - - semaphore sem(1); - shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ - locator::topology::config{ - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = locator::endpoint_dc_rack::default_location - } - }); - - stm.mutate_token_metadata([&] (locator::token_metadata& tm) -> future<> { - 
tm.update_topology(host1, locator::endpoint_dc_rack::default_location, locator::node::state::being_removed, shard_count); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 2))}, host1); + topo.set_node_state(host1, node_state::removing); + mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(1); for (auto tid : tmap.tablet_ids()) { tmap.set_tablet(tid, tablet_info { tablet_replica_set{tablet_replica{host1, 0}} }); } - tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); co_return; - }).get(); + }); std::unordered_set skiplist = {host1}; - BOOST_REQUIRE_THROW(rebalance_tablets(e.get_tablet_allocator().local(), stm, {}, skiplist), std::runtime_error); + BOOST_REQUIRE_THROW(rebalance_tablets(e, {}, skiplist), std::runtime_error); }).get(); } @@ -2593,35 +2360,16 @@ SEASTAR_THREAD_TEST_CASE(test_skiplist_is_ignored_when_draining) { // all the tablets of the drained node, doubling its load. // It's safer to let the drain fail/stall. 
do_with_cql_env_thread([] (auto& e) { - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); - inet_address ip3("192.168.0.3"); + topology_builder topo(e); - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); - auto host3 = host_id(next_uuid()); + auto host1 = topo.add_node(node_state::removing); + auto host2 = topo.add_node(node_state::normal); + auto host3 = topo.add_node(node_state::normal); - auto table1 = table_id(next_uuid()); - - unsigned shard_count = 1; - - semaphore sem(1); - shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ - locator::topology::config{ - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = locator::endpoint_dc_rack::default_location - } - }); - - stm.mutate_token_metadata([&] (locator::token_metadata& tm) -> future<> { - tm.update_topology(host1, locator::endpoint_dc_rack::default_location, locator::node::state::being_removed, shard_count); - tm.update_topology(host2, locator::endpoint_dc_rack::default_location, locator::node::state::normal, shard_count); - tm.update_topology(host3, locator::endpoint_dc_rack::default_location, locator::node::state::normal, shard_count); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. / 3))}, host1); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 3))}, host2); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(3. 
/ 3))}, host3); + auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 2); + auto table1 = add_table(e, ks_name).get(); + mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(2); auto tid = tmap.first_tablet(); tmap.set_tablet(tid, tablet_info { @@ -2631,14 +2379,13 @@ SEASTAR_THREAD_TEST_CASE(test_skiplist_is_ignored_when_draining) { tmap.set_tablet(tid, tablet_info { tablet_replica_set{tablet_replica{host1, 0}} }); - tablet_metadata tmeta; tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); co_return; - }).get(); + }); + auto& stm = e.shared_token_metadata().local(); std::unordered_set skiplist = {host2}; - rebalance_tablets(e.get_tablet_allocator().local(), stm, {}, skiplist); + rebalance_tablets(e, {}, skiplist); { load_sketch load(stm.get()); @@ -2692,59 +2439,48 @@ allocate_replicas_in_racks(const std::vector& racks, int rf, SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_random_load) { do_with_cql_env_thread([] (auto& e) { + topology_builder topo(e); const int n_hosts = 6; + auto shard_count = 2; std::vector hosts; - for (int i = 0; i < n_hosts; ++i) { - hosts.push_back(host_id(next_uuid())); - } + std::unordered_map> hosts_by_rack; - std::vector racks = { - endpoint_dc_rack{ "dc1", "rack-1" }, - endpoint_dc_rack{ "dc1", "rack-2" } + std::vector racks { + topo.rack(), + topo.start_new_rack(), }; + for (int i = 0; i < n_hosts; ++i) { + auto rack = racks[(i + 1) % racks.size()]; + auto h = topo.add_node(node_state::normal, shard_count, rack); + if (i) { + // Leave the first host empty by making it invisible to allocation algorithm. 
+ hosts_by_rack[rack.rack].push_back(h); + } + } + + auto& stm = e.shared_token_metadata().local(); + for (int i = 0; i < 13; ++i) { - std::unordered_map> hosts_by_rack; - - semaphore sem(1); - shared_token_metadata stm([&sem]() noexcept { return get_units(sem, 1); }, locator::token_metadata::config { - locator::topology::config { - .this_endpoint = inet_address("192.168.0.1"), - .this_host_id = hosts[0], - .local_dc_rack = racks[1] - } - }); - size_t total_tablet_count = 0; - stm.mutate_token_metadata([&](token_metadata& tm) { - tablet_metadata tmeta; - - int i = 0; - for (auto h : hosts) { - auto shard_count = 2; - auto rack = racks[++i % racks.size()]; - tm.update_topology(h, rack, node::state::normal, shard_count); - if (h != hosts[0]) { - // Leave the first host empty by making it invisible to allocation algorithm. - hosts_by_rack[rack.rack].push_back(h); - } + std::vector keyspaces; + size_t tablet_count_bits = 8; + int rf = tests::random::get_int(2, 4); + for (size_t log2_tablets = 0; log2_tablets < tablet_count_bits; ++log2_tablets) { + if (tests::random::get_bool()) { + continue; } - - size_t tablet_count_bits = 8; - int rf = tests::random::get_int(2, 4); - for (size_t log2_tablets = 0; log2_tablets < tablet_count_bits; ++log2_tablets) { - if (tests::random::get_bool()) { - continue; - } - auto table = table_id(next_uuid()); - tablet_map tmap(1 << log2_tablets); + auto initial_tablets = 1 << log2_tablets; + keyspaces.push_back(add_keyspace(e, {{topo.dc(), rf}}, initial_tablets)); + auto table = add_table(e, keyspaces.back()).get(); + mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { + tablet_map tmap(initial_tablets); for (auto tid : tmap.tablet_ids()) { // Choose replicas randomly while loading racks evenly. 
std::vector replica_hosts = allocate_replicas_in_racks(racks, rf, hosts_by_rack); tablet_replica_set replicas; for (auto h : replica_hosts) { - auto shard_count = tm.get_topology().find_node(h)->get_shard_count(); auto shard = tests::random::get_int(0, shard_count - 1); replicas.push_back(tablet_replica {h, shard}); } @@ -2752,17 +2488,16 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_random_load) { } total_tablet_count += tmap.tablet_count(); tmeta.set_tablet_map(table, std::move(tmap)); - } - tm.set_tablets(std::move(tmeta)); - return make_ready_future<>(); - }).get(); + return make_ready_future<>(); + }); + } testlog.debug("tablet metadata: {}", stm.get()->tablets()); testlog.info("Total tablet count: {}, hosts: {}", total_tablet_count, hosts.size()); check_tablet_invariants(stm.get()->tablets()); - rebalance_tablets(e.get_tablet_allocator().local(), stm); + rebalance_tablets(e); check_tablet_invariants(stm.get()->tablets()); @@ -2788,6 +2523,10 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_random_load) { // Uncomment the following line when the algorithm is improved. 
// BOOST_REQUIRE(min_max_load.max() - min_max_load.min() <= 1); } + + seastar::parallel_for_each(keyspaces, [&] (const sstring& ks) { + return e.execute_cql(fmt::format("DROP KEYSPACE {}", ks)).discard_result(); + }).get(); } }).get(); } @@ -2863,57 +2602,52 @@ using hosts_by_rack_map = std::unordered_map>; static void do_test_load_balancing_merge_colocation(cql_test_env& e, const int n_racks, const int rf, const int n_hosts, const unsigned shard_count, const unsigned initial_tablets, std::function set_tablets) { + topology_builder topo(e); + rack_vector racks; for (int i = 0; i < n_racks; i++) { - racks.push_back(endpoint_dc_rack{"dc1", format("rack-{}", i + 1)}); + racks.push_back(topo.rack()); + topo.start_new_rack(); } testlog.info("merge colocation test - hosts={}, racks={}, rf={}, shard_count={}, initial_tablets={}", n_hosts, racks.size(), rf, shard_count, initial_tablets); - std::vector hosts; + hosts_by_rack_map hosts_by_rack; + for (int i = 0; i < n_hosts; ++i) { - hosts.push_back(host_id(next_uuid())); + auto rack = racks[i % racks.size()]; + auto h = topo.add_node(node_state::normal, shard_count, rack); + hosts_by_rack[rack.rack].push_back(h); } - auto table1 = add_table(e).get(); + auto ks_name = add_keyspace(e, {{topo.dc(), rf}}, initial_tablets); + auto table1 = add_table(e, ks_name).get(); + auto& stm = e.shared_token_metadata().local(); - hosts_by_rack_map hosts_by_rack; - semaphore sem(1); - shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ - locator::topology::config{ - .this_endpoint = inet_address("192.168.0.1"), - .this_host_id = hosts[0], - .local_dc_rack = racks[std::min(1, n_racks - 1)] - } - }); + { + abort_source as; + auto guard = e.get_raft_group0_client().start_operation(as).get(); + stm.mutate_token_metadata([&](token_metadata& tm) -> future<> { + tablet_metadata& tmeta = tm.tablets(); + tablet_map tmap(initial_tablets); + locator::resize_decision decision; + // leaves growing 
mode, allowing for merge decision. + decision.sequence_number = decision.next_sequence_number(); + tmap.set_resize_decision(std::move(decision)); + set_tablets(tm, tmap, racks, hosts_by_rack); + tmeta.set_tablet_map(table1, std::move(tmap)); + tm.set_tablets(std::move(tmeta)); + return make_ready_future < > (); + }).get(); + save_tablet_metadata(e.local_db(), stm.get()->tablets(), guard.write_timestamp()).get(); + } - stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { - tablet_metadata tmeta; - - int i = 0; - for (auto h : hosts) { - auto rack = racks[++i % racks.size()]; - hosts_by_rack[rack.rack].push_back(h); - tm.update_topology(h, rack, node::state::normal, shard_count); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(float(i) / hosts.size()))}, h); - testlog.debug("adding host {}, rack {}, token {}", h, rack.rack, token(tests::d2t(1. / hosts.size()))); - } - - tablet_map tmap(initial_tablets); - locator::resize_decision decision; - // leaves growing mode, allowing for merge decision. 
- decision.sequence_number = decision.next_sequence_number(); - tmap.set_resize_decision(std::move(decision)); - set_tablets(tm, tmap, racks, hosts_by_rack); - tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); - }).get(); auto tablet_count = [&] { return stm.get()->tablets().get_tablet_map(table1).tablet_count(); }; auto do_rebalance_tablets = [&] (locator::load_stats load_stats) { - rebalance_tablets(e.get_tablet_allocator().local(), stm, make_lw_shared(std::move(load_stats))); + rebalance_tablets(e, make_lw_shared(std::move(load_stats))); }; const uint64_t target_tablet_size = service::default_target_tablet_size; @@ -3055,57 +2789,29 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_merge_colocation_with_decomission) SEASTAR_THREAD_TEST_CASE(test_load_balancing_resize_requests) { do_with_cql_env_thread([] (auto& e) { - inet_address ip1("192.168.0.1"); - inet_address ip2("192.168.0.2"); + topology_builder topo(e); - auto host1 = host_id(next_uuid()); - auto host2 = host_id(next_uuid()); + topo.add_node(node_state::normal, 2); + topo.add_node(node_state::normal, 2); - auto table1 = add_table(e).get(); + const size_t initial_tablets = 2; + auto ks_name = add_keyspace(e, {{topo.dc(), 2}}, initial_tablets); + auto table1 = add_table(e, ks_name).get(); - unsigned shard_count = 2; - - semaphore sem(1); - shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ - locator::topology::config{ - .this_endpoint = ip1, - .this_host_id = host1, - .local_dc_rack = locator::endpoint_dc_rack::default_location - } - }); - - stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { - tm.update_topology(host1, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - tm.update_topology(host2, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(1. 
/ 2))}, host1); - co_await tm.update_normal_tokens(std::unordered_set{token(tests::d2t(2. / 2))}, host2); - - tablet_map tmap(2); - for (auto tid : tmap.tablet_ids()) { - tmap.set_tablet(tid, tablet_info { - tablet_replica_set { - tablet_replica {host1, tests::random::get_int(0, shard_count - 1)}, - tablet_replica {host2, tests::random::get_int(0, shard_count - 1)}, - } - }); - } - tablet_metadata tmeta; - tmeta.set_tablet_map(table1, std::move(tmap)); - tm.set_tablets(std::move(tmeta)); - }).get(); + auto& stm = e.shared_token_metadata().local(); auto tablet_count = [&] { return stm.get()->tablets().get_tablet_map(table1).tablet_count(); }; + auto resize_decision = [&] { return stm.get()->tablets().get_tablet_map(table1).resize_decision(); }; auto do_rebalance_tablets = [&] (locator::load_stats load_stats) { - rebalance_tablets(e.get_tablet_allocator().local(), stm, make_lw_shared(std::move(load_stats))); + rebalance_tablets(e, make_lw_shared(std::move(load_stats))); }; - const size_t initial_tablets = tablet_count(); const uint64_t max_tablet_size = service::default_target_tablet_size * 2; auto to_size_in_bytes = [&] (double max_tablet_size_pctg) -> uint64_t { return (max_tablet_size * max_tablet_size_pctg) * tablet_count();