service: tablet_allocator: Introduce tablet load balancer

Will be invoked by the topology coordinator later to decide
which tablets to migrate.
This commit is contained in:
Tomasz Grabiec
2023-07-07 17:51:37 +02:00
parent d59b8d316c
commit 6f4a35f9ae
5 changed files with 782 additions and 0 deletions

View File

@@ -12,6 +12,7 @@
#include "locator/token_metadata.hh"
#include "locator/tablets.hh"
#include "utils/stall_free.hh"
#include "utils/div_ceil.hh"
#include <seastar/core/smp.hh>
#include <seastar/coroutine/maybe_yield.hh>
@@ -45,6 +46,14 @@ class load_sketch {
s.load = 0;
}
}
/// Returns the sum of the `load` field over every entry in _shards.
uint64_t load() const {
    uint64_t total = 0;
    for (auto it = _shards.begin(); it != _shards.end(); ++it) {
        total += it->load;
    }
    return total;
}
};
std::unordered_map<host_id, node_load> _nodes;
token_metadata_ptr _tm;
@@ -95,6 +104,21 @@ public:
std::push_heap(n._shards.begin(), n._shards.end(), shard_load_cmp());
return shard;
}
/// Returns the total load recorded for the given node,
/// or 0 when the node has no entry in _nodes.
uint64_t get_load(host_id node) const {
    const auto entry = _nodes.find(node);
    if (entry == _nodes.end()) {
        return 0;
    }
    return entry->second.load();
}
/// Returns the node's total load divided by its number of shard entries, as computed by div_ceil(),
/// or 0 when the node has no entry in _nodes.
uint64_t get_avg_shard_load(host_id node) const {
    const auto entry = _nodes.find(node);
    if (entry == _nodes.end()) {
        return 0;
    }
    const auto& info = entry->second;
    return div_ceil(info.load(), info._shards.size());
}
};
} // namespace locator

View File

@@ -80,6 +80,22 @@ std::ostream& operator<<(std::ostream&, const tablet_replica&);
using tablet_replica_set = utils::small_vector<tablet_replica, 3>;
/// Builds a copy of rs in which every element equal to old_replica is substituted by new_replica.
/// Element order is preserved. When old_replica does not occur in rs, the result is a plain copy of rs.
inline
tablet_replica_set replace_replica(const tablet_replica_set& rs, tablet_replica old_replica, tablet_replica new_replica) {
    tablet_replica_set replaced;
    replaced.reserve(rs.size());
    for (const auto& current : rs) {
        replaced.push_back(current == old_replica ? new_replica : current);
    }
    return replaced;
}
/// Stores information about a single tablet.
struct tablet_info {
tablet_replica_set replicas;

View File

@@ -12,12 +12,426 @@
#include "replica/database.hh"
#include "service/migration_manager.hh"
#include "service/tablet_allocator.hh"
#include "utils/stall_free.hh"
#include "locator/load_sketch.hh"
#include "utils/div_ceil.hh"
using namespace locator;
using namespace replica;
namespace service {
seastar::logger lblogger("load_balancer");
/// The algorithm aims to equalize tablet count on each shard.
/// This goal is based on the assumption that every shard has similar processing power and space capacity,
/// and that each tablet has equal consumption of those resources. So by equalizing tablet count per shard we
/// equalize resource utilization.
///
/// The algorithm produces a migration plan which is a set of instructions about which tablets to move
/// where. The plan is a small increment, not a complete plan. To achieve balance, the algorithm should
/// be invoked iteratively until an empty plan is returned.
///
/// The algorithm keeps track of load at two levels, per node and per shard. The reason for this is that
/// we want to equalize the per-node score first, by moving tablets across nodes. Tablets are moved away
/// from the most loaded node first. We also track load per shard, so that we move tablets from the most
/// loaded shard on a given node first.
///
/// The metric for node load is (number of tablets / shard count) which is the average
/// per-shard load. If we achieve balance according to this metric, and then rebalance the nodes internally,
/// we will achieve global balance on all shards in the cluster.
///
/// The reason why we focus on nodes first before rebalancing them internally is that this results
/// in less tablet movements than looking at shards only.
///
/// It would be still beneficial to rebalance tablet-receiving nodes internally before moving tablets
/// to them so that we can distribute load equally without overloading shards which are out of balance,
/// but this is not implemented yet.
///
/// The outline of the algorithm is as follows:
///
/// 1. Determine the set of nodes whose load should be balanced.
/// 2. Pick the least-loaded node (target)
/// 3. Keep moving tablets to the target until balance is achieved with the highest-loaded node,
/// or we hit a limit for plan size:
/// 3.1. Pick the most-loaded node (source)
/// 3.2. Pick the most-loaded shard on the source
/// 3.3. Pick one of the candidate tablets on the source shard
/// 3.4. Evaluate collocation constraints for tablet replicas, if they pass:
/// 3.4.1 Pick the least-loaded shard on the target
/// 3.4.2 Generate a migration for the candidate tablet from the source shard to the target shard
///
/// Even though the algorithm focuses on a single target, the fact that the produced plan is just an increment
/// means that many under-loaded nodes can be driven forward to balance concurrently because the load balancer
/// will alternate between them across make_plan() calls.
///
/// The cost of make_plan() is relatively heavy in terms of preparing data structures, so the current
/// implementation is not efficient if the scheduler would like to call make_plan() multiple times
/// to parallelize execution. This will be addressed in the future by keeping the data structures
/// valid across calls and only recalculating them when starting a new round with a new token metadata version.
///
class load_balancer {
    using global_shard_id = tablet_replica;
    using shard_id = seastar::shard_id;

    // Represents metric for per-node load which we want to equalize between nodes.
    // It's an average per-shard load in terms of tablet count.
    using load_type = double;

    // Per-shard bookkeeping used while a plan is being built.
    struct shard_load {
        // Number of tablet replicas accounted to this shard. Explicitly zero-initialized,
        // consistently with the counters in node_load.
        size_t tablet_count = 0;

        // Tablets which still have a replica on this shard which are candidates for migrating away from this shard.
        std::unordered_set<global_tablet_id> candidates;

        future<> clear_gently() {
            return utils::clear_gently(candidates);
        }
    };

    // Per-node bookkeeping used while a plan is being built.
    struct node_load {
        host_id node;
        uint64_t shard_count = 0;
        uint64_t tablet_count = 0;

        // The average shard load on this node.
        load_type avg_load = 0;

        std::vector<shard_id> shards_by_load; // heap which tracks most-loaded shards using shards_by_load_cmp().
        std::vector<shard_load> shards; // Indexed by shard_id to which a given shard_load corresponds.

        // Call when tablet_count changes.
        void update() {
            avg_load = get_avg_load(tablet_count);
        }

        // Returns the average per-shard load this node would have if it held the given number of tablets.
        // shard_count must be non-zero; make_plan(dc) rejects nodes with a zero shard count.
        load_type get_avg_load(uint64_t tablets) const {
            return double(tablets) / shard_count;
        }

        // Comparator for the shards_by_load heap: orders shard ids by their tablet_count,
        // so that the heap top is the most-loaded shard.
        auto shards_by_load_cmp() {
            return [this] (const auto& a, const auto& b) {
                return shards[a].tablet_count < shards[b].tablet_count;
            };
        }

        future<> clear_gently() {
            return utils::clear_gently(shards);
        }
    };

    token_metadata_ptr _tm;

    // Tablets which already have a migration in the plan being built by make_plan().
    // _tm is not updated while planning, so without this a tablet could be picked again
    // through another of its replicas (in the same DC or in another one), which would
    // produce more than one migration for the same tablet.
    std::unordered_set<global_tablet_id> _scheduled_tablets;
public:
    load_balancer(token_metadata_ptr tm)
        : _tm(std::move(tm)) {
    }

    /// Builds a migration plan for the whole cluster by planning every datacenter
    /// separately and concatenating the per-DC plans.
    /// The combined plan contains at most one migration per tablet.
    future<migration_plan> make_plan() {
        const locator::topology& topo = _tm->get_topology();
        migration_plan plan;

        // Start every round with a clean slate so that the balancer object can be reused.
        _scheduled_tablets.clear();

        // Prepare plans for each DC separately and combine them to be executed in parallel.
        for (auto&& dc : topo.get_datacenters()) {
            auto dc_plan = co_await make_plan(dc);
            lblogger.info("Prepared {} migrations in DC {}", dc_plan.size(), dc);
            std::move(dc_plan.begin(), dc_plan.end(), std::back_inserter(plan));
        }

        lblogger.info("Prepared {} migrations", plan.size());
        co_return std::move(plan);
    }

    /// Builds an incremental migration plan for a single datacenter.
    ///
    /// Only nodes in the normal state which belong to the given DC take part in balancing.
    /// The least-loaded node is chosen as the target and tablets are moved to it from the
    /// most-loaded nodes, up to one migration per shard of the target.
    ///
    /// Returns an empty plan when the DC is already balanced, when any table has active
    /// tablet transitions, or when no candidate passes the collocation constraints.
    ///
    /// Throws std::runtime_error when a participating node has a zero shard count in topology.
    future<migration_plan> make_plan(sstring dc) {
        lblogger.info("Examining DC {}", dc);

        const locator::topology& topo = _tm->get_topology();

        // Select subset of nodes to balance.

        std::unordered_map<host_id, node_load> nodes;
        topo.for_each_node([&] (const locator::node* node_ptr) {
            if (node_ptr->get_state() == locator::node::state::normal && node_ptr->dc_rack().dc == dc) {
                node_load& load = nodes[node_ptr->host_id()];
                load.shard_count = node_ptr->get_shard_count();
                load.shards.resize(load.shard_count);
                if (!load.shard_count) {
                    throw std::runtime_error(format("Shard count of {} not found in topology", node_ptr->host_id()));
                }
            }
        });

        // Compute tablet load on nodes.

        for (auto&& [table, tmap] : _tm->tablets().all_tables()) {
            co_await tmap.for_each_tablet([&, table = table] (tablet_id tid, const tablet_info& ti) {
                for (auto&& replica : ti.replicas) {
                    if (nodes.contains(replica.host)) {
                        nodes[replica.host].tablet_count += 1;
                        // This invariant is assumed later.
                        if (replica.shard >= nodes[replica.host].shard_count) {
                            auto gtid = global_tablet_id{table, tid};
                            on_internal_error(lblogger, format("Tablet {} replica {} targets non-existent shard", gtid, replica));
                        }
                    }
                }
            });
        }

        // Compute load imbalance.

        load_type max_load = 0;
        load_type min_load = 0;
        std::optional<host_id> min_load_node = std::nullopt;
        for (auto&& [host, load] : nodes) {
            load.update();
            if (!min_load_node || load.avg_load < min_load) {
                min_load = load.avg_load;
                min_load_node = host;
            }
            if (load.avg_load > max_load) {
                max_load = load.avg_load;
            }
        }

        // Also covers the case of no participating nodes, in which both values are still 0.
        if (max_load == min_load) {
            // load is balanced.
            // TODO: Evaluate and fix intra-node balance.
            co_return migration_plan();
        }

        for (auto&& [host, load] : nodes) {
            lblogger.info("Node {}: rack={} avg_load={}, tablets={}, shards={}",
                          host, topo.find_node(host)->dc_rack().rack, load.avg_load, load.tablet_count, load.shard_count);
        }
        lblogger.info("target node: {}, avg_load: {}, max: {}", *min_load_node, min_load, max_load);

        auto target = *min_load_node;

        // We want to saturate the target node so we migrate several tablets in parallel, one for each shard
        // on the target node. This assumes that the target node is well-balanced and that tablet migrations
        // complete at the same time. Both assumptions are not generally true in practice, which we currently ignore.
        // If target node is not balanced across shards, we will overload some shards.
        // If tablets are not balanced in size, throughput will suffer because some shards will be idle sooner than others.
        //
        // FIXME: To handle the above, we should (1) rebalance the target node
        // before migrating tablets from other nodes. If shards are balanced on the target node, the balancer
        // will naturally distribute tablets to different shards. Also, (2) we should change this algorithm
        // to be a generator for migrations and have a scheduler in the execution layer which pulls migrations
        // from this algorithm, batches them and decides how many to execute.
        //
        // The scheduler decides in which order to execute the plan based on current activity in the system.
        // We cannot just ask the planner for the next migration and stop when we hit overload on some shard,
        // because that can lead to underutilization of the cluster. Just because the next migration is blocked
        // by the target shard being busy doesn't mean we could not proceed with migrations for other shards
        // which would be produced by the planner subsequently.
        auto target_node = topo.find_node(target);
        auto batch_size = target_node->get_shard_count();

        // Compute per-shard load and candidate tablets.

        for (auto&& [table, tmap] : _tm->tablets().all_tables()) {
            if (!tmap.transitions().empty()) {
                // FIXME: The algorithm doesn't support balancing with active transitions yet. They must finish first.
                lblogger.warn("Pending transitions active.");
                co_return migration_plan();
            }

            co_await tmap.for_each_tablet([&, table = table] (tablet_id tid, const tablet_info& ti) {
                for (auto&& replica : ti.replicas) {
                    if (!nodes.contains(replica.host)) {
                        continue;
                    }
                    auto& node_load_info = nodes[replica.host];
                    auto&& shard_load_info = node_load_info.shards[replica.shard];
                    // A shard enters the heap vector the first time a tablet is seen on it.
                    if (shard_load_info.tablet_count == 0) {
                        node_load_info.shards_by_load.push_back(replica.shard);
                    }
                    shard_load_info.tablet_count += 1;
                    shard_load_info.candidates.emplace(global_tablet_id{table, tid});
                }
            });
        }

        // Prepare candidate nodes and shards for heap-based balancing.

        // heap which tracks most-loaded nodes in terms of avg_load.
        std::vector<host_id> nodes_by_load;
        nodes_by_load.reserve(nodes.size());
        auto nodes_cmp = [&] (const host_id& a, const host_id& b) {
            return nodes[a].avg_load < nodes[b].avg_load;
        };

        for (auto&& [host, node_load] : nodes) {
            if (lblogger.is_enabled(seastar::log_level::debug)) {
                shard_id shard = 0;
                for (auto&& shard_load : node_load.shards) {
                    lblogger.debug("shard {}: all tablets: {}, candidates: {}", tablet_replica{host, shard},
                                   shard_load.tablet_count, shard_load.candidates.size());
                    shard++;
                }
            }

            nodes_by_load.push_back(host);
            std::make_heap(node_load.shards_by_load.begin(), node_load.shards_by_load.end(), node_load.shards_by_load_cmp());
        }

        std::make_heap(nodes_by_load.begin(), nodes_by_load.end(), nodes_cmp);

        locator::load_sketch target_load(_tm);
        co_await target_load.populate(target);

        migration_plan plan;
        const tablet_metadata& tmeta = _tm->tablets();
        load_type max_off_candidate_load = 0; // max load among nodes which ran out of candidates.
        auto& target_info = nodes[target];
        while (plan.size() < batch_size && !nodes_by_load.empty()) {
            co_await coroutine::maybe_yield();

            // Moves the most-loaded node to the back of the vector. It is either pushed back
            // into the heap by push_back_node_candidate below or removed with pop_back().
            std::pop_heap(nodes_by_load.begin(), nodes_by_load.end(), nodes_cmp);
            auto src_host = nodes_by_load.back();
            auto& src_node_info = nodes[src_host];

            // Check if all nodes reached the same avg_load. There are three sets of nodes: target, candidates (nodes_by_load)
            // and off-candidates (removed from nodes_by_load). At any time, the avg_load for target is not greater than
            // that of any candidate, and avg_load of any candidate is not greater than that of any in the off-candidates set.
            // This is ensured by the fact that we remove candidates in the order of avg_load from the heap, and
            // because we prevent load inversion between candidate and target in the next check.
            // So the max avg_load of candidates is that of the current src_node_info, and max avg_load of off-candidates
            // is tracked in max_off_candidate_load. If max_off_candidate_load is equal to target's avg_load,
            // it means that all nodes have equal avg_load. We take the maximum with the current candidate in src_node_info
            // to handle the case of off-candidates being empty. In that case, max_off_candidate_load is 0.
            if (std::max(max_off_candidate_load, src_node_info.avg_load) == target_info.avg_load) {
                lblogger.debug("Balance achieved.");
                break;
            }

            // If balance is not achieved, still consider migrating from candidate nodes which have higher load than the target.
            // max_off_candidate_load may be higher than the load of current candidate.
            if (src_node_info.avg_load <= target_info.avg_load) {
                lblogger.debug("No more candidate nodes. Next candidate is {} with avg_load={}, target's avg_load={}",
                               src_host, src_node_info.avg_load, target_info.avg_load);
                break;
            }

            // Prevent load inversion which can lead to oscillations.
            if (src_node_info.get_avg_load(nodes[src_host].tablet_count - 1) <
                    target_info.get_avg_load(target_info.tablet_count + 1)) {
                lblogger.debug("No more candidate nodes, load would be inverted. Next candidate is {} with avg_load={}, target's avg_load={}",
                               src_host, src_node_info.avg_load, target_info.avg_load);
                break;
            }

            if (src_node_info.shards_by_load.empty()) {
                lblogger.debug("candidate node {} ran out of candidate shards with {} tablets remaining.",
                               src_host, src_node_info.tablet_count);
                max_off_candidate_load = std::max(max_off_candidate_load, src_node_info.avg_load);
                nodes_by_load.pop_back();
                continue;
            }
            // Restores the node into the heap on every exit from this iteration unless cancelled.
            auto push_back_node_candidate = seastar::defer([&] {
                std::push_heap(nodes_by_load.begin(), nodes_by_load.end(), nodes_cmp);
            });

            std::pop_heap(src_node_info.shards_by_load.begin(), src_node_info.shards_by_load.end(), src_node_info.shards_by_load_cmp());
            auto src_shard = src_node_info.shards_by_load.back();
            auto src = tablet_replica{src_host, src_shard};
            auto&& src_shard_info = src_node_info.shards[src_shard];
            if (src_shard_info.candidates.empty()) {
                lblogger.debug("shard {} ran out of candidates with {} tablets remaining.", src, src_shard_info.tablet_count);
                src_node_info.shards_by_load.pop_back();
                continue;
            }
            // Restores the shard into the node's heap on every exit from this iteration unless cancelled.
            auto push_back_shard_candidate = seastar::defer([&] {
                std::push_heap(src_node_info.shards_by_load.begin(), src_node_info.shards_by_load.end(), src_node_info.shards_by_load_cmp());
            });

            // The candidate is consumed regardless of whether it ends up in the plan,
            // so every iteration which reaches this point makes progress.
            auto source_tablet = *src_shard_info.candidates.begin();
            src_shard_info.candidates.erase(source_tablet);

            // A tablet is a candidate on every shard which holds one of its replicas. Once it has a migration
            // in the plan it must not be picked again through another replica: the checks below consult
            // the token metadata snapshot, which does not reflect the plan being built.
            if (_scheduled_tablets.contains(source_tablet)) {
                lblogger.debug("candidate tablet {} skipped because it is already scheduled for migration", source_tablet);
                continue;
            }

            // Check replication strategy constraints.

            auto same_rack = target_node->dc_rack().rack == topo.get_node(src.host).dc_rack().rack;
            std::unordered_map<sstring, int> rack_load; // Will be built if !same_rack
            bool has_replica_on_target = false;
            auto& tmap = tmeta.get_tablet_map(source_tablet.table);
            for (auto&& r : tmap.get_tablet_info(source_tablet.tablet).replicas) {
                if (r.host == target) {
                    has_replica_on_target = true;
                    break;
                }
                if (!same_rack) {
                    const locator::node& node = topo.get_node(r.host);
                    if (node.dc_rack().dc == dc) {
                        rack_load[node.dc_rack().rack] += 1;
                    }
                }
            }

            if (has_replica_on_target) {
                lblogger.debug("candidate tablet {} skipped because it has a replica on target node", source_tablet);
                continue;
            }

            // Make sure we don't increase level of duplication of racks in the replica list.
            if (!same_rack) {
                // rack_load is not empty here: the replica on src_host belongs to this DC and was counted above.
                auto max_rack_load = std::max_element(rack_load.begin(), rack_load.end(),
                                                 [] (auto& a, auto& b) { return a.second < b.second; })->second;
                auto new_rack_load = rack_load[target_node->dc_rack().rack] + 1;
                if (new_rack_load > max_rack_load) {
                    lblogger.debug("candidate tablet {} skipped because it would increase load on rack {} to {}, max={}",
                                   source_tablet, target_node->dc_rack().rack, new_rack_load, max_rack_load);
                    continue;
                }
            }

            auto dst = global_shard_id {target, target_load.next_shard(target)};

            lblogger.debug("Select {} to move from {} to {}", source_tablet, src, dst);
            plan.push_back(tablet_migration_info {source_tablet, src, dst});
            _scheduled_tablets.insert(source_tablet);

            target_info.tablet_count += 1;
            target_info.update();

            src_shard_info.tablet_count -= 1;
            if (src_shard_info.tablet_count == 0) {
                // Nothing left on the shard, so it leaves the heap for good.
                push_back_shard_candidate.cancel();
                src_node_info.shards_by_load.pop_back();
            }

            src_node_info.tablet_count -= 1;
            src_node_info.update();
            if (src_node_info.tablet_count == 0) {
                // Nothing left on the node, so it leaves the heap for good.
                push_back_node_candidate.cancel();
                nodes_by_load.pop_back();
            }
        }

        if (plan.empty()) {
            // Due to replica collocation constraints, it may not be possible to balance the cluster evenly.
            // For example, if nodes have different number of shards. Nodes which have more shards will be
            // replicas for more tablets which rules out more candidates on other nodes with a higher per-shard load.
            //
            // Example:
            //
            //   node1: 1 shard
            //   node2: 1 shard
            //   node3: 7 shard
            //
            // If there are 7 tablets and RF=3, each node must have 1 tablet replica.
            // So node3 will have average load of 1, and node1 and node2 will have
            // average shard load of 7.
            lblogger.info("Not possible to achieve balance.");
        }

        co_await utils::clear_gently(nodes);
        co_return std::move(plan);
    }
};
/// Runs a load_balancer constructed over the given token metadata
/// and returns the plan produced by its make_plan().
future<migration_plan> balance_tablets(token_metadata_ptr tm) {
    load_balancer planner(tm);
    migration_plan plan = co_await planner.make_plan();
    co_return std::move(plan);
}
class tablet_allocator_impl : public tablet_allocator::impl
, public service::migration_listener::empty_listener {
service::migration_notifier& _migration_notifier;

View File

@@ -10,10 +10,39 @@
#include "replica/database.hh"
#include "service/migration_manager.hh"
#include "locator/tablets.hh"
#include <any>
namespace service {
/// Represents intention to move a single tablet replica from src to dst.
/// Plain aggregate; the field order matters because instances are brace-initialized
/// as {tablet, src, dst}.
struct tablet_migration_info {
// The tablet whose replica is to be moved.
locator::global_tablet_id tablet;
// The replica (host and shard) the tablet is moved away from.
locator::tablet_replica src;
// The replica (host and shard) the tablet is moved to.
locator::tablet_replica dst;
};
using migration_plan = utils::chunked_vector<tablet_migration_info>;
/// Returns a tablet migration plan that aims to achieve better load balance in the whole cluster.
/// The plan is computed based on information in the given token_metadata snapshot
/// and thus should be executed and reflected, at least as pending tablet transitions, in token_metadata
/// before this is called again.
///
/// For any given global_tablet_id there is at most one tablet_migration_info in the returned plan.
///
/// To achieve full balance, do:
///
/// while (true) {
/// auto plan = co_await balance_tablets(get_token_metadata());
/// if (plan.empty()) {
/// break;
/// }
/// co_await execute(plan);
/// }
///
future<migration_plan> balance_tablets(locator::token_metadata_ptr);
class tablet_allocator_impl;
class tablet_allocator {

View File

@@ -9,6 +9,7 @@
#include "test/lib/scylla_test_case.hh"
#include "test/lib/random_utils.hh"
#include <seastar/testing/thread_test_case.hh>
#include "test/lib/cql_test_env.hh"
#include "test/lib/log.hh"
@@ -17,15 +18,26 @@
#include "replica/tablets.hh"
#include "locator/tablets.hh"
#include "service/tablet_allocator.hh"
#include "locator/tablet_sharder.hh"
#include "locator/load_sketch.hh"
#include "locator/tablet_replication_strategy.hh"
#include "utils/fb_utilities.hh"
#include "utils/UUID_gen.hh"
using namespace locator;
using namespace replica;
using namespace service;
static api::timestamp_type next_timestamp = api::new_timestamp();
// Produces a new time-UUID on every call. The UUID is derived from a time point that is
// a process-wide counter of seconds past the system_clock epoch; the counter starts at 1
// and is incremented on each call.
static utils::UUID next_uuid() {
    static uint64_t seconds_seq = 1;
    const auto offset = std::chrono::seconds(seconds_seq++);
    const auto since_epoch = std::chrono::duration_cast<std::chrono::system_clock::duration>(offset);
    const std::chrono::system_clock::time_point when(since_epoch);
    return utils::UUID_gen::get_time_UUID(when);
}
static
void verify_tablet_metadata_persistence(cql_test_env& env, const tablet_metadata& tm) {
save_tablet_metadata(env.local_db(), tm, next_timestamp++).get();
@@ -431,3 +443,290 @@ SEASTAR_THREAD_TEST_CASE(test_token_ownership_splitting) {
}
}
}
// Applies every migration of the plan to the tablet metadata held by tm:
// in the replica set of the migrated tablet, mig.src is replaced by mig.dst.
static
void apply_plan(token_metadata& tm, const migration_plan& plan) {
    for (const auto& migration : plan) {
        const auto& moved = migration.tablet;
        tablet_map& table_tablets = tm.tablets().get_tablet_map(moved.table);
        // Work on a copy of the tablet info and store it back once updated.
        auto updated = table_tablets.get_tablet_info(moved.tablet);
        updated.replicas = replace_replica(updated.replicas, migration.src, migration.dst);
        table_tablets.set_tablet(moved.tablet, updated);
    }
}
// Keeps asking the balancer for a plan and applying it to the shared token metadata
// until the balancer returns an empty plan.
static
void rebalance_tablets(shared_token_metadata& stm) {
    for (;;) {
        const auto plan = balance_tablets(stm.get()).get0();
        if (plan.empty()) {
            return;
        }
        stm.mutate_token_metadata([&plan] (token_metadata& tm) {
            apply_plan(tm, plan);
            return make_ready_future<>();
        }).get();
    }
}
SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_empty_node) {
    // Scenario: two nodes hold all the tablet replicas and a third, empty node is present.
    // The load balancer is expected to notice the empty node and move tablets onto it.

    inet_address ip1("192.168.0.1");
    inet_address ip2("192.168.0.2");
    inet_address ip3("192.168.0.3");

    auto host1 = host_id(next_uuid());
    auto host2 = host_id(next_uuid());
    auto host3 = host_id(next_uuid());

    auto table1 = table_id(next_uuid());

    unsigned shard_count = 2;

    semaphore sem(1);
    shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{
        locator::topology::config{
            .this_endpoint = ip1,
            .local_dc_rack = locator::endpoint_dc_rack::default_location
        }
    });

    stm.mutate_token_metadata([&] (auto& tm) {
        tm.update_host_id(host1, ip1);
        tm.update_host_id(host2, ip2);
        tm.update_host_id(host3, ip3);
        tm.update_topology(ip1, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count);
        tm.update_topology(ip2, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count);
        tm.update_topology(ip3, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count);

        tablet_map tmap(4);

        // Gives the tablet two replicas: one on host1 and one on host2, on the given shards.
        auto place = [&] (auto tablet, shard_id shard_on_host1, shard_id shard_on_host2) {
            tmap.set_tablet(tablet, tablet_info {
                tablet_replica_set {
                    tablet_replica {host1, shard_on_host1},
                    tablet_replica {host2, shard_on_host2},
                }
            });
        };

        auto tid = tmap.first_tablet();
        place(tid, 0, 1);
        tid = *tmap.next_tablet(tid);
        place(tid, 0, 1);
        tid = *tmap.next_tablet(tid);
        place(tid, 0, 0);
        tid = *tmap.next_tablet(tid);
        place(tid, 1, 0);

        tablet_metadata tmeta;
        tmeta.set_tablet_map(table1, std::move(tmap));
        tm.set_tablets(std::move(tmeta));
        return make_ready_future<>();
    }).get();

    // Sanity check: host1 and host2 hold four replicas each, host3 holds none.
    {
        load_sketch before(stm.get());
        before.populate().get();

        for (auto h : {host1, host2}) {
            BOOST_REQUIRE_EQUAL(before.get_load(h), 4);
            BOOST_REQUIRE_EQUAL(before.get_avg_shard_load(h), 2);
        }
        BOOST_REQUIRE_EQUAL(before.get_load(host3), 0);
        BOOST_REQUIRE_EQUAL(before.get_avg_shard_load(host3), 0);
    }

    rebalance_tablets(stm);

    // After balancing, every host must hold 2 or 3 replicas.
    {
        load_sketch after(stm.get());
        after.populate().get();

        for (auto h : {host1, host2, host3}) {
            testlog.debug("Checking host {}", h);
            BOOST_REQUIRE(after.get_load(h) <= 3);
            BOOST_REQUIRE(after.get_load(h) > 1);
            BOOST_REQUIRE(after.get_avg_shard_load(h) <= 2);
            BOOST_REQUIRE(after.get_avg_shard_load(h) > 0);
        }
    }
}
SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_two_empty_nodes) {
    // Scenario: four nodes with two shards each. All 16 tablets have their two replicas
    // on the first two nodes; the other two nodes start empty.

    const inet_address ips[] = {
        inet_address("192.168.0.1"),
        inet_address("192.168.0.2"),
        inet_address("192.168.0.3"),
        inet_address("192.168.0.4"),
    };

    // Braced initializers are evaluated left to right, so the UUIDs are generated in host order.
    const host_id hosts[] = {
        host_id(next_uuid()),
        host_id(next_uuid()),
        host_id(next_uuid()),
        host_id(next_uuid()),
    };
    const size_t n_nodes = 4;

    auto table1 = table_id(next_uuid());

    unsigned shard_count = 2;

    semaphore sem(1);
    shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{
        locator::topology::config{
            .this_endpoint = ips[0],
            .local_dc_rack = locator::endpoint_dc_rack::default_location
        }
    });

    stm.mutate_token_metadata([&] (auto& tm) {
        // Register all host ids first, then all topology entries.
        for (size_t n = 0; n < n_nodes; ++n) {
            tm.update_host_id(hosts[n], ips[n]);
        }
        for (size_t n = 0; n < n_nodes; ++n) {
            tm.update_topology(ips[n], locator::endpoint_dc_rack::default_location, std::nullopt, shard_count);
        }

        tablet_map tmap(16);
        for (auto tid : tmap.tablet_ids()) {
            // The shard for the first host is drawn before the shard for the second host.
            const auto shard_on_first = tests::random::get_int<shard_id>(0, shard_count - 1);
            const auto shard_on_second = tests::random::get_int<shard_id>(0, shard_count - 1);
            tmap.set_tablet(tid, tablet_info {
                tablet_replica_set {
                    tablet_replica {hosts[0], shard_on_first},
                    tablet_replica {hosts[1], shard_on_second},
                }
            });
        }

        tablet_metadata tmeta;
        tmeta.set_tablet_map(table1, std::move(tmap));
        tm.set_tablets(std::move(tmeta));
        return make_ready_future<>();
    }).get();

    rebalance_tablets(stm);

    // 32 replicas over 4 nodes with 2 shards each: every node must end up with an average shard load of 4.
    {
        load_sketch after(stm.get());
        after.populate().get();

        for (auto h : hosts) {
            testlog.debug("Checking host {}", h);
            BOOST_REQUIRE(after.get_avg_shard_load(h) == 4);
        }
    }
}
// Exercises the balancer on randomly generated tablet layouts: 6 hosts spread over two racks of one DC,
// with the first host left without any replicas. The scenario is generated and rebalanced 13 times.
// The test currently only verifies that rebalance_tablets() terminates; see the FIXME at the end.
SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_random_load) {
const int n_hosts = 6;
std::vector<host_id> hosts;
for (int i = 0; i < n_hosts; ++i) {
hosts.push_back(host_id(next_uuid()));
}
std::vector<endpoint_dc_rack> racks = {
endpoint_dc_rack{ "dc1", "rack-1" },
endpoint_dc_rack{ "dc1", "rack-2" }
};
// Each iteration builds a fresh shared_token_metadata with a new random layout.
for (int i = 0; i < 13; ++i) {
// Hosts eligible to receive replicas during generation, keyed by rack name.
std::unordered_map<sstring, std::vector<host_id>> hosts_by_rack;
semaphore sem(1);
shared_token_metadata stm([&sem]() noexcept { return get_units(sem, 1); }, locator::token_metadata::config {
locator::topology::config {
.this_endpoint = inet_address("192.168.0.1"),
.local_dc_rack = racks[1]
}
});
size_t total_tablet_count = 0;
stm.mutate_token_metadata([&](auto& tm) {
tablet_metadata tmeta;
// Note: this counter shadows the outer loop variable. It is incremented before use,
// so it drives both the host's IP suffix and its rack assignment.
int i = 0;
for (auto h : hosts) {
auto ip = inet_address(format("192.168.0.{}", ++i));
auto shard_count = 2;
tm.update_host_id(h, ip);
auto rack = racks[i % racks.size()];
tm.update_topology(ip, rack, std::nullopt, shard_count);
if (h != hosts[0]) {
// Leave the first host empty by making it invisible to allocation algorithm.
hosts_by_rack[rack.rack].push_back(h);
}
}
size_t tablet_count_bits = 8;
// Replication factor shared by all tables generated in this iteration.
int rf = tests::random::get_int<shard_id>(2, 4);
// Each power-of-two table size below 2^tablet_count_bits is either skipped or gets one table,
// depending on a random boolean.
for (int log2_tablets = 0; log2_tablets < tablet_count_bits; ++log2_tablets) {
if (tests::random::get_bool()) {
continue;
}
auto table = table_id(next_uuid());
tablet_map tmap(1 << log2_tablets);
for (auto tid : tmap.tablet_ids()) {
// Choose replicas randomly while loading racks evenly.
std::vector<host_id> replica_hosts;
// Replicas alternate between the racks; this counter shadows the enclosing ones.
for (int i = 0; i < rf; ++i) {
auto rack = racks[i % racks.size()];
auto& rack_hosts = hosts_by_rack[rack.rack];
// Redraw until a host is found which is not yet a replica of this tablet.
while (true) {
auto candidate_host = rack_hosts[tests::random::get_int<shard_id>(0, rack_hosts.size() - 1)];
if (std::find(replica_hosts.begin(), replica_hosts.end(), candidate_host) == replica_hosts.end()) {
replica_hosts.push_back(candidate_host);
break;
}
}
}
// Place each replica on a random shard of its host.
tablet_replica_set replicas;
for (auto h : replica_hosts) {
auto shard_count = tm.get_topology().find_node(h)->get_shard_count();
auto shard = tests::random::get_int<shard_id>(0, shard_count - 1);
replicas.push_back(tablet_replica {h, shard});
}
tmap.set_tablet(tid, tablet_info {std::move(replicas)});
}
total_tablet_count += tmap.tablet_count();
tmeta.set_tablet_map(table, std::move(tmap));
}
tm.set_tablets(std::move(tmeta));
return make_ready_future<>();
}).get();
testlog.debug("tablet metadata: {}", stm.get()->tablets());
testlog.info("Total tablet count: {}, hosts: {}", total_tablet_count, hosts.size());
rebalance_tablets(stm);
// Collect the resulting per-host average shard load. The values are only logged, not asserted.
{
load_sketch load(stm.get());
load.populate().get();
min_max_tracker<unsigned> min_max_load;
for (auto h: hosts) {
auto l = load.get_avg_shard_load(h);
testlog.info("Load on host {}: {}", h, l);
min_max_load.update(l);
}
testlog.debug("tablet metadata: {}", stm.get()->tablets());
testlog.debug("Min load: {}, max load: {}", min_max_load.min(), min_max_load.max());
// FIXME: The algorithm cannot achieve balance in all cases yet, so we only check that it stops.
// For example, if we have an overloaded node in one rack and target underloaded node in a different rack,
// we won't be able to reduce the load gap by moving tablets between the two. We have to balance the overloaded
// rack first, which is unconstrained.
// Uncomment the following line when the algorithm is improved.
// BOOST_REQUIRE(min_max_load.max() - min_max_load.min() <= 1);
}
}
}