diff --git a/locator/load_sketch.hh b/locator/load_sketch.hh index 47da1adc80..850247233a 100644 --- a/locator/load_sketch.hh +++ b/locator/load_sketch.hh @@ -12,6 +12,7 @@ #include "locator/token_metadata.hh" #include "locator/tablets.hh" #include "utils/stall_free.hh" +#include "utils/div_ceil.hh" #include #include @@ -45,6 +46,14 @@ class load_sketch { s.load = 0; } } + + uint64_t load() const { + uint64_t result = 0; + for (auto&& s : _shards) { + result += s.load; + } + return result; + } }; std::unordered_map _nodes; token_metadata_ptr _tm; @@ -95,6 +104,21 @@ public: std::push_heap(n._shards.begin(), n._shards.end(), shard_load_cmp()); return shard; } + + uint64_t get_load(host_id node) const { + if (!_nodes.contains(node)) { + return 0; + } + return _nodes.at(node).load(); + } + + uint64_t get_avg_shard_load(host_id node) const { + if (!_nodes.contains(node)) { + return 0; + } + auto& n = _nodes.at(node); + return div_ceil(n.load(), n._shards.size()); + } }; } // namespace locator diff --git a/locator/tablets.hh b/locator/tablets.hh index b2029bcf26..b40df63b59 100644 --- a/locator/tablets.hh +++ b/locator/tablets.hh @@ -80,6 +80,22 @@ std::ostream& operator<<(std::ostream&, const tablet_replica&); using tablet_replica_set = utils::small_vector; +/// Creates a new replica set with old_replica replaced by new_replica. +/// If there is no old_replica, the set is returned unchanged. +inline +tablet_replica_set replace_replica(const tablet_replica_set& rs, tablet_replica old_replica, tablet_replica new_replica) { + tablet_replica_set result; + result.reserve(rs.size()); + for (auto&& r : rs) { + if (r == old_replica) { + result.push_back(new_replica); + } else { + result.push_back(r); + } + } + return result; +} + /// Stores information about a single tablet. struct tablet_info { tablet_replica_set replicas; diff --git a/service/tablet_allocator.cc b/service/tablet_allocator.cc index a0af4746e3..3b7359b574 100644 --- a/service/tablet_allocator.cc +++ b/service/tablet_allocator.cc @@ -12,12 +12,426 @@ #include "replica/database.hh" #include "service/migration_manager.hh" #include "service/tablet_allocator.hh" +#include "utils/stall_free.hh" +#include "locator/load_sketch.hh" +#include "utils/div_ceil.hh" using namespace locator; using namespace replica; namespace service { +seastar::logger lblogger("load_balancer"); + +/// The algorithm aims to equalize tablet count on each shard. +/// This goal is based on the assumption that every shard has similar processing power and space capacity, +/// and that each tablet has equal consumption of those resources. So by equalizing tablet count per shard we +/// equalize resource utilization. +/// +/// The algorithm produces a migration plan which is a set of instructions about which tablets to move +/// where. The plan is a small increment, not a complete plan. To achieve balance, the algorithm should +/// be invoked iteratively until an empty plan is returned. +/// +/// The algorithm keeps track of load at two levels, per node and per shard. The reason for this is that +/// we want to equalize the per-node score first, by moving tablets across nodes. Tablets are moved away +/// from the most loaded node first. We also track load per shard, so that we move tablets from the most +/// loaded shard on a given node first. +/// +/// The metric for node load is (number of tablets / shard count) which is the average +/// per-shard load. If we achieve balance according to this metric, and then rebalance the nodes internally, +/// we will achieve global balance on all shards in the cluster. +/// +/// The reason why we focus on nodes first before rebalancing them internally is that this results +/// in less tablet movements than looking at shards only. +/// +/// It would be still beneficial to rebalance tablet-receiving nodes internally before moving tablets +/// to them so that we can distribute load equally without overloading shards which are out of balance, +/// but this is not implemented yet. +/// +/// The outline of the algorithm is as follows: +/// +/// 1. Determine the set of nodes whose load should be balanced. +/// 2. Pick the least-loaded node (target) +/// 3. Keep moving tablets to the target until balance is achieved with the highest-loaded node, +/// or we hit a limit for plan size: +/// 3.1. Pick the most-loaded node (source) +/// 3.2. Pick the most-loaded shard on the source +/// 3.3. Pick one of the candidate tablets on the source shard +/// 3.4. Evaluate collocation constraints for tablet replicas, if they pass: +/// 3.4.1 Pick the least-loaded shard on the target +/// 3.4.2 Generate a migration for the candidate tablet from the source shard to the target shard +/// +/// Even though the algorithm focuses on a single target, the fact the the produced plan is just an increment +/// means that many under-loaded nodes can be driven forward to balance concurrently because the load balancer +/// will alternate between them across make_plan() calls. +/// +/// The cost of make_plan() is relatively heavy in terms of preparing data structures, so the current +/// implementation is not efficient if the scheduler would like to call make_plan() multiple times +/// to parallelize execution. This will be addressed in the future by keeping the data structures +/// valid across calls and only recalculating them when starting a new round with a new token metadata version. +/// +class load_balancer { + using global_shard_id = tablet_replica; + using shard_id = seastar::shard_id; + + // Represents metric for per-node load which we want to equalize between nodes. + // It's an average per-shard load in terms of tablet count. + using load_type = double; + + struct shard_load { + size_t tablet_count; + + // Tablets which still have a replica on this shard which are candidates for migrating away from this shard. + std::unordered_set candidates; + + future<> clear_gently() { + return utils::clear_gently(candidates); + } + }; + + struct node_load { + host_id node; + uint64_t shard_count = 0; + uint64_t tablet_count = 0; + + // The average shard load on this node. + load_type avg_load = 0; + + std::vector shards_by_load; // heap which tracks most-loaded shards using shards_by_load_cmp(). + std::vector shards; // Indexed by shard_id to which a given shard_load corresponds. + + // Call when tablet_count changes. + void update() { + avg_load = get_avg_load(tablet_count); + } + + load_type get_avg_load(uint64_t tablets) const { + return double(tablets) / shard_count; + } + + auto shards_by_load_cmp() { + return [this] (const auto& a, const auto& b) { + return shards[a].tablet_count < shards[b].tablet_count; + }; + } + + future<> clear_gently() { + return utils::clear_gently(shards); + } + }; + + token_metadata_ptr _tm; +public: + load_balancer(token_metadata_ptr tm) + : _tm(std::move(tm)) { + } + + future make_plan() { + const locator::topology& topo = _tm->get_topology(); + migration_plan plan; + + // Prepare plans for each DC separately and combine them to be executed in parallel. + for (auto&& dc : topo.get_datacenters()) { + auto dc_plan = co_await make_plan(dc); + lblogger.info("Prepared {} migrations in DC {}", dc_plan.size(), dc); + std::move(dc_plan.begin(), dc_plan.end(), std::back_inserter(plan)); + } + + lblogger.info("Prepared {} migrations", plan.size()); + co_return std::move(plan); + } + + future make_plan(sstring dc) { + lblogger.info("Examining DC {}", dc); + + const locator::topology& topo = _tm->get_topology(); + + // Select subset of nodes to balance. + + std::unordered_map nodes; + topo.for_each_node([&] (const locator::node* node_ptr) { + if (node_ptr->get_state() == locator::node::state::normal && node_ptr->dc_rack().dc == dc) { + node_load& load = nodes[node_ptr->host_id()]; + load.shard_count = node_ptr->get_shard_count(); + load.shards.resize(load.shard_count); + if (!load.shard_count) { + throw std::runtime_error(format("Shard count of {} not found in topology", node_ptr->host_id())); + } + } + }); + + // Compute tablet load on nodes. + + for (auto&& [table, tmap] : _tm->tablets().all_tables()) { + co_await tmap.for_each_tablet([&, table = table] (tablet_id tid, const tablet_info& ti) { + for (auto&& replica : ti.replicas) { + if (nodes.contains(replica.host)) { + nodes[replica.host].tablet_count += 1; + // This invariant is assumed later. + if (replica.shard >= nodes[replica.host].shard_count) { + auto gtid = global_tablet_id{table, tid}; + on_internal_error(lblogger, format("Tablet {} replica {} targets non-existent shard", gtid, replica)); + } + } + } + }); + } + + // Compute load imbalance. + + load_type max_load = 0; + load_type min_load = 0; + std::optional min_load_node = std::nullopt; + for (auto&& [host, load] : nodes) { + load.update(); + if (!min_load_node || load.avg_load < min_load) { + min_load = load.avg_load; + min_load_node = host; + } + if (load.avg_load > max_load) { + max_load = load.avg_load; + } + } + + if (max_load == min_load) { + // load is balanced. + // TODO: Evaluate and fix intra-node balance. + co_return migration_plan(); + } + + for (auto&& [host, load] : nodes) { + lblogger.info("Node {}: rack={} avg_load={}, tablets={}, shards={}", + host, topo.find_node(host)->dc_rack().rack, load.avg_load, load.tablet_count, load.shard_count); + } + lblogger.info("target node: {}, avg_load: {}, max: {}", *min_load_node, min_load, max_load); + auto target = *min_load_node; + + // We want to saturate the target node so we migrate several tablets in parallel, one for each shard + // on the target node. This assumes that the target node is well-balanced and that tablet migrations + // complete at the same time. Both assumptions are not generally true in practice, which we currently ignore. + // If target node is not balanced across shards, we will overload some shards. + // If tablets are not balanced in size, throughput will suffer because some shards will be idle sooner than others. + // + // FIXME: To handle the above, we should (1) rebalance the target node + // before migrating tablets from other nodes. If shards are balanced on the target node, the balancer + // will naturally distribute tablets to different shards. Also, (2) we should change this algorithm + // to be a generator for migrations and have a scheduler in the execution layer which pulls migrations + // from this algorithm, batches them and decides how many to execute. + // + // The scheduler decides in which order to execute the plan based on current activity in the system. + // We cannot just ask the planner for the next migration and stop when we hit overload on some shard, + // because that can lead to underutilization of the cluster. Just because the next migration is blocked + // by the target shard being busy doesn't mean we could not proceed with migrations for other shards + // which would be produced by the planner subsequently. + + auto target_node = topo.find_node(target); + auto batch_size = target_node->get_shard_count(); + + // Compute per-shard load and candidate tablets. + + for (auto&& [table, tmap] : _tm->tablets().all_tables()) { + if (!tmap.transitions().empty()) { + // FIXME: The algorithm doesn't support balancing with active transitions yet. They must finish first. + lblogger.warn("Pending transitions active."); + co_return migration_plan(); + } + + co_await tmap.for_each_tablet([&, table = table] (tablet_id tid, const tablet_info& ti) { + for (auto&& replica : ti.replicas) { + if (!nodes.contains(replica.host)) { + continue; + } + auto& node_load_info = nodes[replica.host]; + auto&& shard_load_info = node_load_info.shards[replica.shard]; + if (shard_load_info.tablet_count == 0) { + node_load_info.shards_by_load.push_back(replica.shard); + } + shard_load_info.tablet_count += 1; + shard_load_info.candidates.emplace(global_tablet_id{table, tid}); + } + }); + } + + // Prepare candidate nodes and shards for heap-based balancing. + + // heap which tracks most-loaded nodes in terms of avg_load. + std::vector nodes_by_load; + nodes_by_load.reserve(nodes.size()); + auto nodes_cmp = [&] (const host_id& a, const host_id& b) { + return nodes[a].avg_load < nodes[b].avg_load; + }; + + for (auto&& [host, node_load] : nodes) { + if (lblogger.is_enabled(seastar::log_level::debug)) { + shard_id shard = 0; + for (auto&& shard_load : node_load.shards) { + lblogger.debug("shard {}: all tablets: {}, candidates: {}", tablet_replica{host, shard}, + shard_load.tablet_count, shard_load.candidates.size()); + shard++; + } + } + + nodes_by_load.push_back(host); + std::make_heap(node_load.shards_by_load.begin(), node_load.shards_by_load.end(), node_load.shards_by_load_cmp()); + } + + std::make_heap(nodes_by_load.begin(), nodes_by_load.end(), nodes_cmp); + + locator::load_sketch target_load(_tm); + co_await target_load.populate(target); + migration_plan plan; + const tablet_metadata& tmeta = _tm->tablets(); + load_type max_off_candidate_load = 0; // max load among nodes which ran out of candidates. + auto& target_info = nodes[target]; + while (plan.size() < batch_size && !nodes_by_load.empty()) { + co_await coroutine::maybe_yield(); + + std::pop_heap(nodes_by_load.begin(), nodes_by_load.end(), nodes_cmp); + auto src_host = nodes_by_load.back(); + auto& src_node_info = nodes[src_host]; + + // Check if all nodes reached the same avg_load. There are three sets of nodes: target, candidates (nodes_by_load) + // and off-candidates (removed from nodes_by_load). At any time, the avg_load for target is not greater than + // that of any candidate, and avg_load of any candidate is not greater than that of any in the off-candidates set. + // This is ensured by the fact that we remove candidates in the order of avg_load from the heap, and + // because we prevent load inversion between candidate and target in the next check. + // So the max avg_load of candidates is that of the current src_node_info, and max avg_load of off-candidates + // is tracked in max_off_candidate_load. If max_off_candidate_load is equal to target's avg_load, + // it means that all nodes have equal avg_load. We take the maximum with the current candidate in src_node_info + // to handle the case of off-candidates being empty. In that case, max_off_candidate_load is 0. + if (std::max(max_off_candidate_load, src_node_info.avg_load) == target_info.avg_load) { + lblogger.debug("Balance achieved."); + break; + } + + // If balance is not achieved, still consider migrating from candidate nodes which have higher load than the target. + // max_off_candidate_load may be higher than the load of current candidate. + if (src_node_info.avg_load <= target_info.avg_load) { + lblogger.debug("No more candidate nodes."); + lblogger.debug("No more candidate nodes. Next candidate is {} with avg_load={}, target's avg_load={}", + src_host, src_node_info.avg_load, target_info.avg_load); + break; + } + + // Prevent load inversion which can lead to oscillations. + if (src_node_info.get_avg_load(nodes[src_host].tablet_count - 1) < + target_info.get_avg_load(target_info.tablet_count + 1)) { + lblogger.debug("No more candidate nodes, load would be inverted. Next candidate is {} with avg_load={}, target's avg_load={}", + src_host, src_node_info.avg_load, target_info.avg_load); + break; + } + + if (src_node_info.shards_by_load.empty()) { + lblogger.debug("candidate node {} ran out of candidate shards with {} tablets remaining.", + src_host, src_node_info.tablet_count); + max_off_candidate_load = std::max(max_off_candidate_load, src_node_info.avg_load); + nodes_by_load.pop_back(); + continue; + } + auto push_back_node_candidate = seastar::defer([&] { + std::push_heap(nodes_by_load.begin(), nodes_by_load.end(), nodes_cmp); + }); + + std::pop_heap(src_node_info.shards_by_load.begin(), src_node_info.shards_by_load.end(), src_node_info.shards_by_load_cmp()); + auto src_shard = src_node_info.shards_by_load.back(); + auto src = tablet_replica{src_host, src_shard}; + auto&& src_shard_info = src_node_info.shards[src_shard]; + if (src_shard_info.candidates.empty()) { + lblogger.debug("shard {} ran out of candidates with {} tablets remaining.", src, src_shard_info.tablet_count); + src_node_info.shards_by_load.pop_back(); + continue; + } + auto push_back_shard_candidate = seastar::defer([&] { + std::push_heap(src_node_info.shards_by_load.begin(), src_node_info.shards_by_load.end(), src_node_info.shards_by_load_cmp()); + }); + + auto source_tablet = *src_shard_info.candidates.begin(); + src_shard_info.candidates.erase(source_tablet); + + // Check replication strategy constraints. + + auto same_rack = target_node->dc_rack().rack == topo.get_node(src.host).dc_rack().rack; + std::unordered_map rack_load; // Will be built if !same_rack + bool has_replica_on_target = false; + auto& tmap = tmeta.get_tablet_map(source_tablet.table); + for (auto&& r : tmap.get_tablet_info(source_tablet.tablet).replicas) { + if (r.host == target) { + has_replica_on_target = true; + break; + } + if (!same_rack) { + const locator::node& node = topo.get_node(r.host); + if (node.dc_rack().dc == dc) { + rack_load[node.dc_rack().rack] += 1; + } + } + } + + if (has_replica_on_target) { + lblogger.debug("candidate tablet {} skipped because it has a replica on target node", source_tablet); + continue; + } + + // Make sure we don't increase level of duplication of racks in the replica list. + if (!same_rack) { + auto max_rack_load = std::max_element(rack_load.begin(), rack_load.end(), + [] (auto& a, auto& b) { return a.second < b.second; })->second; + auto new_rack_load = rack_load[target_node->dc_rack().rack] + 1; + if (new_rack_load > max_rack_load) { + lblogger.debug("candidate tablet {} skipped because it would increase load on rack {} to {}, max={}", + source_tablet, target_node->dc_rack().rack, new_rack_load, max_rack_load); + continue; + } + } + + auto dst = global_shard_id {target, target_load.next_shard(target)}; + lblogger.debug("Select {} to move from {} to {}", source_tablet, src, dst); + plan.push_back(tablet_migration_info {source_tablet, src, dst}); + + target_info.tablet_count += 1; + target_info.update(); + + src_shard_info.tablet_count -= 1; + if (src_shard_info.tablet_count == 0) { + push_back_shard_candidate.cancel(); + src_node_info.shards_by_load.pop_back(); + } + + src_node_info.tablet_count -= 1; + src_node_info.update(); + if (src_node_info.tablet_count == 0) { + push_back_node_candidate.cancel(); + nodes_by_load.pop_back(); + } + } + + if (plan.empty()) { + // Due to replica collocation constraints, it may not be possible to balance the cluster evenly. + // For example, if nodes have different number of shards. Nodes which have more shards will be + // replicas for more tablets which rules out more candidates on other nodes with a higher per-shard load. + // + // Example: + // + // node1: 1 shard + // node2: 1 shard + // node3: 7 shard + // + // If there are 7 tablets and RF=3, each node must have 1 tablet replica. + // So node3 will have average load of 1, and node1 and node2 will have + // average shard load of 7. + lblogger.info("Not possible to achieve balance."); + } + + co_await utils::clear_gently(nodes); + co_return std::move(plan); + } +}; + +future balance_tablets(token_metadata_ptr tm) { + load_balancer lb(tm); + co_return co_await lb.make_plan(); +} + class tablet_allocator_impl : public tablet_allocator::impl , public service::migration_listener::empty_listener { service::migration_notifier& _migration_notifier; diff --git a/service/tablet_allocator.hh b/service/tablet_allocator.hh index 31df1c1c3a..a6546de921 100644 --- a/service/tablet_allocator.hh +++ b/service/tablet_allocator.hh @@ -10,10 +10,39 @@ #include "replica/database.hh" #include "service/migration_manager.hh" +#include "locator/tablets.hh" #include namespace service { +/// Represents intention to move a single tablet replica from src to dst. +struct tablet_migration_info { + locator::global_tablet_id tablet; + locator::tablet_replica src; + locator::tablet_replica dst; +}; + +using migration_plan = utils::chunked_vector; + +/// Returns a tablet migration plan that aims to achieve better load balance in the whole cluster. +/// The plan is computed based on information in the given token_metadata snapshot +/// and thus should be executed and reflected, at least as pending tablet transitions, in token_metadata +/// before this is called again. +/// +/// For any given global_tablet_id there is at most one tablet_migration_info in the returned plan. +/// +/// To achieve full balance, do: +/// +/// while (true) { +/// auto plan = co_await balance_tablets(get_token_metadata()); +/// if (plan.empty()) { +/// break; +/// } +/// co_await execute(plan); +/// } +/// +future balance_tablets(locator::token_metadata_ptr); + class tablet_allocator_impl; class tablet_allocator { diff --git a/test/boost/tablets_test.cc b/test/boost/tablets_test.cc index 3244bf148a..9a1aae0697 100644 --- a/test/boost/tablets_test.cc +++ b/test/boost/tablets_test.cc @@ -9,6 +9,7 @@ #include "test/lib/scylla_test_case.hh" +#include "test/lib/random_utils.hh" #include #include "test/lib/cql_test_env.hh" #include "test/lib/log.hh" @@ -17,15 +18,26 @@ #include "replica/tablets.hh" #include "locator/tablets.hh" +#include "service/tablet_allocator.hh" #include "locator/tablet_sharder.hh" +#include "locator/load_sketch.hh" #include "locator/tablet_replication_strategy.hh" #include "utils/fb_utilities.hh" +#include "utils/UUID_gen.hh" using namespace locator; using namespace replica; +using namespace service; static api::timestamp_type next_timestamp = api::new_timestamp(); +static utils::UUID next_uuid() { + static uint64_t counter = 1; + return utils::UUID_gen::get_time_UUID(std::chrono::system_clock::time_point( + std::chrono::duration_cast( + std::chrono::seconds(counter++)))); +} + static void verify_tablet_metadata_persistence(cql_test_env& env, const tablet_metadata& tm) { save_tablet_metadata(env.local_db(), tm, next_timestamp++).get(); @@ -431,3 +443,290 @@ SEASTAR_THREAD_TEST_CASE(test_token_ownership_splitting) { } } } + +static +void apply_plan(token_metadata& tm, const migration_plan& plan) { + for (auto&& mig : plan) { + tablet_map& tmap = tm.tablets().get_tablet_map(mig.tablet.table); + auto tinfo = tmap.get_tablet_info(mig.tablet.tablet); + tinfo.replicas = replace_replica(tinfo.replicas, mig.src, mig.dst); + tmap.set_tablet(mig.tablet.tablet, tinfo); + } +} + +static +void rebalance_tablets(shared_token_metadata& stm) { + while (true) { + auto plan = balance_tablets(stm.get()).get0(); + if (plan.empty()) { + break; + } + stm.mutate_token_metadata([&] (token_metadata& tm) { + apply_plan(tm, plan); + return make_ready_future<>(); + }).get(); + } +} + +SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_empty_node) { + // Tests the scenario of bootstrapping a single node + // Verifies that load balancer sees it and moves tablets to that node. + + inet_address ip1("192.168.0.1"); + inet_address ip2("192.168.0.2"); + inet_address ip3("192.168.0.3"); + + auto host1 = host_id(next_uuid()); + auto host2 = host_id(next_uuid()); + auto host3 = host_id(next_uuid()); + + auto table1 = table_id(next_uuid()); + + unsigned shard_count = 2; + + semaphore sem(1); + shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ + locator::topology::config{ + .this_endpoint = ip1, + .local_dc_rack = locator::endpoint_dc_rack::default_location + } + }); + + stm.mutate_token_metadata([&] (auto& tm) { + tm.update_host_id(host1, ip1); + tm.update_host_id(host2, ip2); + tm.update_host_id(host3, ip3); + tm.update_topology(ip1, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count); + tm.update_topology(ip2, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count); + tm.update_topology(ip3, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count); + + tablet_map tmap(4); + auto tid = tmap.first_tablet(); + tmap.set_tablet(tid, tablet_info { + tablet_replica_set { + tablet_replica {host1, 0}, + tablet_replica {host2, 1}, + } + }); + tid = *tmap.next_tablet(tid); + tmap.set_tablet(tid, tablet_info { + tablet_replica_set { + tablet_replica {host1, 0}, + tablet_replica {host2, 1}, + } + }); + tid = *tmap.next_tablet(tid); + tmap.set_tablet(tid, tablet_info { + tablet_replica_set { + tablet_replica {host1, 0}, + tablet_replica {host2, 0}, + } + }); + tid = *tmap.next_tablet(tid); + tmap.set_tablet(tid, tablet_info { + tablet_replica_set { + tablet_replica {host1, 1}, + tablet_replica {host2, 0}, + } + }); + tablet_metadata tmeta; + tmeta.set_tablet_map(table1, std::move(tmap)); + tm.set_tablets(std::move(tmeta)); + return make_ready_future<>(); + }).get(); + + // Sanity check + { + load_sketch load(stm.get()); + load.populate().get(); + BOOST_REQUIRE_EQUAL(load.get_load(host1), 4); + BOOST_REQUIRE_EQUAL(load.get_avg_shard_load(host1), 2); + BOOST_REQUIRE_EQUAL(load.get_load(host2), 4); + BOOST_REQUIRE_EQUAL(load.get_avg_shard_load(host2), 2); + BOOST_REQUIRE_EQUAL(load.get_load(host3), 0); + BOOST_REQUIRE_EQUAL(load.get_avg_shard_load(host3), 0); + } + + rebalance_tablets(stm); + + { + load_sketch load(stm.get()); + load.populate().get(); + + for (auto h : {host1, host2, host3}) { + testlog.debug("Checking host {}", h); + BOOST_REQUIRE(load.get_load(h) <= 3); + BOOST_REQUIRE(load.get_load(h) > 1); + BOOST_REQUIRE(load.get_avg_shard_load(h) <= 2); + BOOST_REQUIRE(load.get_avg_shard_load(h) > 0); + } + } +} + +SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_two_empty_nodes) { + inet_address ip1("192.168.0.1"); + inet_address ip2("192.168.0.2"); + inet_address ip3("192.168.0.3"); + inet_address ip4("192.168.0.4"); + + auto host1 = host_id(next_uuid()); + auto host2 = host_id(next_uuid()); + auto host3 = host_id(next_uuid()); + auto host4 = host_id(next_uuid()); + + auto table1 = table_id(next_uuid()); + + unsigned shard_count = 2; + + semaphore sem(1); + shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ + locator::topology::config{ + .this_endpoint = ip1, + .local_dc_rack = locator::endpoint_dc_rack::default_location + } + }); + + stm.mutate_token_metadata([&] (auto& tm) { + tm.update_host_id(host1, ip1); + tm.update_host_id(host2, ip2); + tm.update_host_id(host3, ip3); + tm.update_host_id(host4, ip4); + tm.update_topology(ip1, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count); + tm.update_topology(ip2, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count); + tm.update_topology(ip3, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count); + tm.update_topology(ip4, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count); + + tablet_map tmap(16); + for (auto tid : tmap.tablet_ids()) { + tmap.set_tablet(tid, tablet_info { + tablet_replica_set { + tablet_replica {host1, tests::random::get_int(0, shard_count - 1)}, + tablet_replica {host2, tests::random::get_int(0, shard_count - 1)}, + } + }); + } + tablet_metadata tmeta; + tmeta.set_tablet_map(table1, std::move(tmap)); + tm.set_tablets(std::move(tmeta)); + return make_ready_future<>(); + }).get(); + + rebalance_tablets(stm); + + { + load_sketch load(stm.get()); + load.populate().get(); + + for (auto h : {host1, host2, host3, host4}) { + testlog.debug("Checking host {}", h); + BOOST_REQUIRE(load.get_avg_shard_load(h) == 4); + } + } +} + +SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_random_load) { + const int n_hosts = 6; + + std::vector hosts; + for (int i = 0; i < n_hosts; ++i) { + hosts.push_back(host_id(next_uuid())); + } + + std::vector racks = { + endpoint_dc_rack{ "dc1", "rack-1" }, + endpoint_dc_rack{ "dc1", "rack-2" } + }; + + for (int i = 0; i < 13; ++i) { + std::unordered_map> hosts_by_rack; + + semaphore sem(1); + shared_token_metadata stm([&sem]() noexcept { return get_units(sem, 1); }, locator::token_metadata::config { + locator::topology::config { + .this_endpoint = inet_address("192.168.0.1"), + .local_dc_rack = racks[1] + } + }); + + size_t total_tablet_count = 0; + stm.mutate_token_metadata([&](auto& tm) { + tablet_metadata tmeta; + + int i = 0; + for (auto h : hosts) { + auto ip = inet_address(format("192.168.0.{}", ++i)); + auto shard_count = 2; + tm.update_host_id(h, ip); + auto rack = racks[i % racks.size()]; + tm.update_topology(ip, rack, std::nullopt, shard_count); + if (h != hosts[0]) { + // Leave the first host empty by making it invisible to allocation algorithm. + hosts_by_rack[rack.rack].push_back(h); + } + } + + size_t tablet_count_bits = 8; + int rf = tests::random::get_int(2, 4); + for (int log2_tablets = 0; log2_tablets < tablet_count_bits; ++log2_tablets) { + if (tests::random::get_bool()) { + continue; + } + auto table = table_id(next_uuid()); + tablet_map tmap(1 << log2_tablets); + for (auto tid : tmap.tablet_ids()) { + // Choose replicas randomly while loading racks evenly. + std::vector replica_hosts; + for (int i = 0; i < rf; ++i) { + auto rack = racks[i % racks.size()]; + auto& rack_hosts = hosts_by_rack[rack.rack]; + while (true) { + auto candidate_host = rack_hosts[tests::random::get_int(0, rack_hosts.size() - 1)]; + if (std::find(replica_hosts.begin(), replica_hosts.end(), candidate_host) == replica_hosts.end()) { + replica_hosts.push_back(candidate_host); + break; + } + } + } + tablet_replica_set replicas; + for (auto h : replica_hosts) { + auto shard_count = tm.get_topology().find_node(h)->get_shard_count(); + auto shard = tests::random::get_int(0, shard_count - 1); + replicas.push_back(tablet_replica {h, shard}); + } + tmap.set_tablet(tid, tablet_info {std::move(replicas)}); + } + total_tablet_count += tmap.tablet_count(); + tmeta.set_tablet_map(table, std::move(tmap)); + } + tm.set_tablets(std::move(tmeta)); + return make_ready_future<>(); + }).get(); + + testlog.debug("tablet metadata: {}", stm.get()->tablets()); + testlog.info("Total tablet count: {}, hosts: {}", total_tablet_count, hosts.size()); + + rebalance_tablets(stm); + + { + load_sketch load(stm.get()); + load.populate().get(); + + min_max_tracker min_max_load; + for (auto h: hosts) { + auto l = load.get_avg_shard_load(h); + testlog.info("Load on host {}: {}", h, l); + min_max_load.update(l); + } + + testlog.debug("tablet metadata: {}", stm.get()->tablets()); + testlog.debug("Min load: {}, max load: {}", min_max_load.min(), min_max_load.max()); + +// FIXME: The algorithm cannot achieve balance in all cases yet, so we only check that it stops. +// For example, if we have an overloaded node in one rack and target underloaded node in a different rack, +// we won't be able to reduce the load gap by moving tablets between the two. We have to balance the overloaded +// rack first, which is unconstrained. +// Uncomment the following line when the algorithm is improved. +// BOOST_REQUIRE(min_max_load.max() - min_max_load.min() <= 1); + } + } +}