tablets: Balance tablets concurrently with active migrations

After this change, the load balancer can make progress with active
migrations. If the algorithm is called with active tablet migrations
in tablet metadata, those are treated by load balancer as if they were
already completed. This allows the algorithm to incrementally make
decision which when executed with active migrations will produce the
desired result.

Overload of shards is limited by the fact that the algorithm tracks
streaming concurrency on both source and target shards of active
migrations and takes concurrency limit into account when producing new
migrations.

The coordinator executes the load balancer on edges of tablet state
machine stransitions. This allows new migrations to be started as soon
as tablets finish streaming.

The load balancer is also continuously invoked as long as it produces
a non-empty plan. This is in order to saturate the cluster with
streaming. A single make_plan() call is still not saturating, due
to the way algorithm is implemented.
This commit is contained in:
Tomasz Grabiec
2023-07-24 23:55:27 +02:00
parent c9ea215ce1
commit fe181b3bac
7 changed files with 214 additions and 51 deletions

View File

@@ -78,20 +78,20 @@ check if we need to rebalance. If so, it computes an incremental tablet migratio
plan, persists it by moving tablets into transitional states, and moves the state machine
into the tablet migration track. All this happens atomically form the perspective
of group0 state machine.
The tablet migration track also invokes the load balancer and starts new migrations
to keep the cluster saturated with streaming. The load balancer is invoked
on transition of tablet stages, and also continuously as long as it generates
new migrations.
If there is a pending topology change request, the load balancer
will not be invoked to allow for current migrations to drain, after which the
state machine will exit the tablet migration track and allow pending topology
operation to start.
The tablet migration track excludes with other topology changes, so node operations
will have to wait for the plan to finish before they can take over the state machine.
The tablet balancing track migrates a small bunch of tablets, decided by the
loaded balancer, and then moves back the state machine to the idle state.
This gives other topology changes a chance to start, and if there aren't any, the
load balancer will be called again to check the conditions. This way
we can avoid blocking topology changes for too long, but also drive the cluster
to eventually achieve balance in the absence of other requests.
The load balancer is always invoked with no pending tablet migrations. This
allows for simplicity in the implementation, but may lead to underutilization
of cluster resources if different tablets migrate with different speeds,
and thus limit the speed of load balancing.
will have to wait for tablet migration track to finish before they can take over
the state machine.
The reason why the load balancer is part of the main state machine and excludes with other topology
changes is that we want to share the infrastructure for fencing between vnode-based topology
@@ -101,10 +101,7 @@ don't interfere with each other. The simplest is to make them part of the same s
When the topology state machine is not in the tablet_migration track, it is guaranteed
that there are no tablet transitions in the system.
Currently, all tablets in a batch decided by the load balancer are migrated in parallel and
their state machines are advanced at the same time. This means that streaming has to complete
for all tablets in a batch before any of them can move to the next phase. This is suboptimal
and will be changed later to allow for independent transitions.
Tablets are migrated in parallel and independently.
# Tablet migration
@@ -115,7 +112,7 @@ these properties of a tablet:
- stage: determines which replicas should be used by requests on the coordinator side, and which
action should be taken by the state machine executor.
Currently, the tablet state machine is driven forward by the tablet balancing track of the
Currently, the tablet state machine is driven forward by the tablet migration track of the
topology state machine.
The "stage" serves two major purposes:

View File

@@ -57,6 +57,13 @@ class load_sketch {
};
std::unordered_map<host_id, node_load> _nodes;
token_metadata_ptr _tm;
private:
tablet_replica_set get_replicas_for_tablet_load(const tablet_info& ti, const tablet_transition_info* trinfo) const {
// We reflect migrations in the load as if they already happened,
// optimistically assuming that they will succeed.
return trinfo ? trinfo->next : ti.replicas;
}
public:
load_sketch(token_metadata_ptr tm)
: _tm(std::move(tm)) {
@@ -65,10 +72,10 @@ public:
future<> populate(std::optional<host_id> host = std::nullopt) {
const topology& topo = _tm->get_topology();
co_await utils::clear_gently(_nodes);
for (auto&& [table, tmap] : _tm->tablets().all_tables()) {
for (const tablet_info& ti : tmap.tablets()) {
co_await coroutine::maybe_yield();
for (auto&& replica : ti.replicas) {
for (auto&& [table, tmap_] : _tm->tablets().all_tables()) {
auto& tmap = tmap_;
co_await tmap.for_each_tablet([&] (tablet_id tid, const tablet_info& ti) {
for (auto&& replica : get_replicas_for_tablet_load(ti, tmap.get_tablet_transition_info(tid))) {
if (host && *host != replica.host) {
continue;
}
@@ -80,7 +87,7 @@ public:
n._shards[replica.shard].load += 1;
}
}
}
});
}
for (auto&& n : _nodes) {
std::make_heap(n.second._shards.begin(), n.second._shards.end(), shard_load_cmp());

View File

@@ -75,6 +75,20 @@ tablet_transition_info::tablet_transition_info(tablet_transition_stage stage, ta
, reads(get_selector_for_reads(stage))
{ }
tablet_migration_streaming_info get_migration_streaming_info(const tablet_info& tinfo, const tablet_transition_info& trinfo) {
tablet_migration_streaming_info result = {
.read_from = std::unordered_set<tablet_replica>(tinfo.replicas.begin(), tinfo.replicas.end()),
.written_to = std::unordered_set<tablet_replica>(trinfo.next.begin(), trinfo.next.end())
};
for (auto&& r : trinfo.next) {
result.read_from.erase(r);
}
for (auto&& r : tinfo.replicas) {
result.written_to.erase(r);
}
return result;
}
tablet_replica get_leaving_replica(const tablet_info& tinfo, const tablet_transition_info& trinfo) {
std::unordered_set<tablet_replica> leaving(tinfo.replicas.begin(), tinfo.replicas.end());
for (auto&& r : trinfo.next) {

View File

@@ -171,6 +171,14 @@ struct tablet_transition_info {
// Returns the leaving replica for a given transition.
tablet_replica get_leaving_replica(const tablet_info&, const tablet_transition_info&);
/// Describes streaming required for a given tablet transition.
struct tablet_migration_streaming_info {
std::unordered_set<tablet_replica> read_from;
std::unordered_set<tablet_replica> written_to;
};
tablet_migration_streaming_info get_migration_streaming_info(const tablet_info&, const tablet_transition_info&);
/// Stores information about tablets of a single table.
///
/// The map contains a constant number of tablets, tablet_count().

View File

@@ -1334,6 +1334,15 @@ class topology_coordinator {
guard = co_await global_tablet_token_metadata_barrier(std::move(guard));
}
// In order to keep the cluster saturated, ask the load balancer for more transitions.
// Unless there is a pending topology change operation.
auto [preempt, new_guard] = should_preempt_balancing(std::move(guard));
guard = std::move(new_guard);
if (!preempt) {
auto plan = co_await balance_tablets(get_token_metadata_ptr());
co_await generate_migration_updates(updates, guard, plan);
}
// It's ok to execute planned updates after retaking the guard because as long
// as topology is in tablet_migration state only this coordinator has a right
// to advance the state machine of tablets.
@@ -1365,10 +1374,26 @@ class topology_coordinator {
co_await update_topology_state(std::move(guard), std::move(updates), "Finished tablet migration");
}
std::pair<bool, group0_guard> should_preempt_balancing(group0_guard guard) {
auto node_or_guard = get_node_to_work_on_opt(std::move(guard));
if (auto* node = std::get_if<node_to_work_on>(&node_or_guard)) {
return std::make_pair(true, std::move(node->guard));
}
guard = std::get<group0_guard>(std::move(node_or_guard));
if (_topo_sm._topology.global_request) {
return std::make_pair(true, std::move(guard));
}
return std::make_pair(false, std::move(guard));
}
// Returns `true` iff there was work to do.
future<bool> handle_topology_transition(group0_guard guard) {
auto tstate = _topo_sm._topology.tstate;
if (!tstate) {
// When adding a new source of work, make sure to update should_preempt_balancing() as well.
auto node_or_guard = get_node_to_work_on_opt(std::move(guard));
if (auto* node = std::get_if<node_to_work_on>(&node_or_guard)) {
co_await handle_node_transition(std::move(*node));

View File

@@ -65,6 +65,13 @@ seastar::logger lblogger("load_balancer");
/// means that many under-loaded nodes can be driven forward to balance concurrently because the load balancer
/// will alternate between them across make_plan() calls.
///
/// If the algorithm is called with active tablet migrations in tablet metadata, those are treated
/// by load balancer as if they were already completed. This allows the algorithm to incrementally
/// make decision which when executed with active migrations will produce the desired result.
/// Overload of shards which still contain migrated-away tablets is limited by the fact
/// that the algorithm tracks streaming concurrency on both source and target shards of active
/// migrations and takes concurrency limit into account when producing new migrations.
///
/// The cost of make_plan() is relatively heavy in terms of preparing data structures, so the current
/// implementation is not efficient if the scheduler would like to call make_plan() multiple times
/// to parallelize execution. This will be addressed in the future by keeping the data structures
@@ -79,7 +86,13 @@ class load_balancer {
using load_type = double;
struct shard_load {
size_t tablet_count;
size_t tablet_count = 0;
// Number of tablets which are streamed from this shard.
size_t streaming_read_load = 0;
// Number of tablets which are streamed to this shard.
size_t streaming_write_load = 0;
// Tablets which still have a replica on this shard which are candidates for migrating away from this shard.
std::unordered_set<global_tablet_id> candidates;
@@ -120,7 +133,65 @@ class load_balancer {
}
};
// Per-shard limits for active tablet streaming sessions.
//
// There is no hard reason for these values being what they are other than
// the guidelines below.
//
// We want to limit concurrency of active streaming for several reasons.
// One is that we want to prevent over-utilization of memory required to carry out streaming,
// as that may lead to OOM or excessive cache eviction.
//
// There is no network scheduler yet, so we want to avoid over-utilization of network bandwidth.
// Limiting per-shard concurrency is a lame way to achieve that, but it's better than nothing.
//
// Scheduling groups should limit impact of streaming on other kinds of processes on the same node,
// so this aspect is not the reason for limiting concurrency.
//
// We don't want too much parallelism because it means that we have plenty of migrations
// which progress slowly. It's better to have fewer which complete faster because
// less user requests suffer from double-quorum overhead, and under-loaded nodes can take
// the load sooner. At the same time, we want to have enough concurrency to fully utilize resources.
//
// Streaming speed is supposed to be I/O bound and writes are more expensive in terms of IO than reads,
// so we allow more read concurrency.
//
// We allow at least two sessions per shard so that there is less chance for idling until load balancer
// makes the next decision after streaming is finished.
const size_t max_write_streaming_load = 2;
const size_t max_read_streaming_load = 4;
token_metadata_ptr _tm;
private:
tablet_replica_set get_replicas_for_tablet_load(const tablet_info& ti, const tablet_transition_info* trinfo) const {
// We reflect migrations in the load as if they already happened,
// optimistically assuming that they will succeed.
return trinfo ? trinfo->next : ti.replicas;
}
// Whether to count the tablet as putting streaming load on the system.
// Tablets which are streaming or are yet-to-stream are counted.
bool is_streaming(const tablet_transition_info* trinfo) {
if (!trinfo) {
return false;
}
switch (trinfo->stage) {
case tablet_transition_stage::allow_write_both_read_old:
return true;
case tablet_transition_stage::write_both_read_old:
return true;
case tablet_transition_stage::streaming:
return true;
case tablet_transition_stage::write_both_read_new:
return false;
case tablet_transition_stage::use_new:
return false;
case tablet_transition_stage::cleanup:
return false;
}
on_internal_error(lblogger, format("Invalid transition stage: {}", static_cast<int>(trinfo->stage)));
}
public:
load_balancer(token_metadata_ptr tm)
: _tm(std::move(tm)) {
@@ -162,9 +233,14 @@ public:
// Compute tablet load on nodes.
for (auto&& [table, tmap] : _tm->tablets().all_tables()) {
for (auto&& [table, tmap_] : _tm->tablets().all_tables()) {
auto& tmap = tmap_;
co_await tmap.for_each_tablet([&, table = table] (tablet_id tid, const tablet_info& ti) {
for (auto&& replica : ti.replicas) {
auto trinfo = tmap.get_tablet_transition_info(tid);
// We reflect migrations in the load as if they already happened,
// optimistically assuming that they will succeed.
for (auto&& replica : get_replicas_for_tablet_load(ti, trinfo)) {
if (nodes.contains(replica.host)) {
nodes[replica.host].tablet_count += 1;
// This invariant is assumed later.
@@ -209,45 +285,50 @@ public:
// We want to saturate the target node so we migrate several tablets in parallel, one for each shard
// on the target node. This assumes that the target node is well-balanced and that tablet migrations
// complete at the same time. Both assumptions are not generally true in practice, which we currently ignore.
// If target node is not balanced across shards, we will overload some shards.
// If tablets are not balanced in size, throughput will suffer because some shards will be idle sooner than others.
// But they will be true typically, because we fill shards starting from least-loaded shards,
// so we naturally strive towards balance between shards.
//
// FIXME: To handle the above, we should (1) rebalance the target node
// before migrating tablets from other nodes. If shards are balanced on the target node, the balancer
// will naturally distribute tablets to different shards. Also, (2) we should change this algorithm
// to be a generator for migrations and have a scheduler in the execution layer which pulls migrations
// from this algorithm, batches them and decides how many to execute.
//
// The scheduler decides in which order to execute the plan based on current activity in the system.
// We cannot just ask the planner for the next migration and stop when we hit overload on some shard,
// because that can lead to underutilization of the cluster. Just because the next migration is blocked
// by the target shard being busy doesn't mean we could not proceed with migrations for other shards
// which would be produced by the planner subsequently.
// If target node is not balanced across shards, we will overload some shards. Streaming concurrency
// will suffer because more loaded shards will not participate, which will under-utilize the node.
// FIXME: To handle the above, we should rebalance the target node before migrating tablets from other nodes.
auto target_node = topo.find_node(target);
auto batch_size = target_node->get_shard_count();
// Compute per-shard load and candidate tablets.
for (auto&& [table, tmap] : _tm->tablets().all_tables()) {
if (!tmap.transitions().empty()) {
// FIXME: The algorithm doesn't support balancing with active transitions yet. They must finish first.
lblogger.warn("Pending transitions active.");
co_return migration_plan();
}
for (auto&& [table, tmap_] : _tm->tablets().all_tables()) {
auto& tmap = tmap_;
co_await tmap.for_each_tablet([&, table = table] (tablet_id tid, const tablet_info& ti) {
for (auto&& replica : ti.replicas) {
auto trinfo = tmap.get_tablet_transition_info(tid);
if (is_streaming(trinfo)) {
auto streaming_info = get_migration_streaming_info(ti, *trinfo);
for (auto&& replica : streaming_info.read_from) {
if (nodes.contains(replica.host)) {
nodes[replica.host].shards[replica.shard].streaming_read_load += 1;
}
}
for (auto&& replica : streaming_info.written_to) {
if (nodes.contains(replica.host)) {
nodes[replica.host].shards[replica.shard].streaming_write_load += 1;
}
}
}
for (auto&& replica : get_replicas_for_tablet_load(ti, trinfo)) {
if (!nodes.contains(replica.host)) {
continue;
}
auto& node_load_info = nodes[replica.host];
auto&& shard_load_info = node_load_info.shards[replica.shard];
shard_load& shard_load_info = node_load_info.shards[replica.shard];
if (shard_load_info.tablet_count == 0) {
node_load_info.shards_by_load.push_back(replica.shard);
}
shard_load_info.tablet_count += 1;
shard_load_info.candidates.emplace(global_tablet_id{table, tid});
if (!trinfo) { // migrating tablets are not candidates
shard_load_info.candidates.emplace(global_tablet_id {table, tid});
}
}
});
}
@@ -283,6 +364,8 @@ public:
const tablet_metadata& tmeta = _tm->tablets();
load_type max_off_candidate_load = 0; // max load among nodes which ran out of candidates.
auto& target_info = nodes[target];
const size_t max_skipped_migrations = target_info.shards.size() * 2;
size_t skipped_migrations = 0;
while (plan.size() < batch_size && !nodes_by_load.empty()) {
co_await coroutine::maybe_yield();
@@ -385,8 +468,30 @@ public:
}
auto dst = global_shard_id {target, target_load.next_shard(target)};
lblogger.debug("Select {} to move from {} to {}", source_tablet, src, dst);
plan.push_back(tablet_migration_info {source_tablet, src, dst});
auto mig = tablet_migration_info {source_tablet, src, dst};
if (target_info.shards[dst.shard].streaming_write_load < max_write_streaming_load
&& src_node_info.shards[src_shard].streaming_read_load < max_read_streaming_load) {
target_info.shards[dst.shard].streaming_write_load += 1;
src_node_info.shards[src_shard].streaming_read_load += 1;
lblogger.debug("Adding migration: {}", mig);
plan.push_back(std::move(mig));
} else {
// Shards are overloaded with streaming. Do not include the migration in the plan, but
// continue as if it was in the hope that we will find a migration which can be executed without
// violating the load. Next make_plan() invocation will notice that the migration was not executed.
// We should not just stop here because that can lead to underutilization of the cluster.
// Just because the next migration is blocked doesn't mean we could not proceed with migrations
// for other shards which are produced by the planner subsequently.
lblogger.debug("Migration {} skipped because of load limit: src_load={}, dst_load={}", mig,
src_node_info.shards[src_shard].streaming_read_load,
target_info.shards[dst.shard].streaming_write_load);
skipped_migrations++;
if (skipped_migrations >= max_skipped_migrations) {
lblogger.debug("Too many migrations skipped, aborting balancing");
break;
}
}
target_info.tablet_count += 1;
target_info.update();

View File

@@ -41,6 +41,13 @@ using migration_plan = utils::chunked_vector<tablet_migration_info>;
/// co_await execute(plan);
/// }
///
/// It is ok to invoke the algorithm with already active tablet migrations. The algorithm will take them into account
/// when balancing the load as if they already succeeded. This means that applying a series of migration plans
/// produced by this function will give the same result regardless of whether applying they are fully executed or
/// only initiated by creating corresponding transitions in tablet metadata.
///
/// The algorithm takes care of limiting the streaming load on the system, also by taking active migrations into account.
///
future<migration_plan> balance_tablets(locator::token_metadata_ptr);
class tablet_allocator_impl;