tablets: load_balancer: Generalize load tracking
This patch removes some duplication of logic and implicit assumptions by creating clear algebra for load impact calculation and its application to state of the load balancer. Will make adding new kinds of tablet transitions with different impact on load much easier.
This commit is contained in:
@@ -440,23 +440,50 @@ public:
|
|||||||
|
|
||||||
// Compute per-shard load and candidate tablets.
|
// Compute per-shard load and candidate tablets.
|
||||||
|
|
||||||
|
auto apply_load = [&] (const tablet_migration_streaming_info& info) {
|
||||||
|
for (auto&& replica : info.read_from) {
|
||||||
|
if (nodes.contains(replica.host)) {
|
||||||
|
nodes[replica.host].shards[replica.shard].streaming_read_load += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (auto&& replica : info.written_to) {
|
||||||
|
if (nodes.contains(replica.host)) {
|
||||||
|
nodes[replica.host].shards[replica.shard].streaming_write_load += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
auto can_accept_load = [&] (const tablet_migration_streaming_info& info) {
|
||||||
|
for (auto r : info.read_from) {
|
||||||
|
if (!nodes.contains(r.host)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
auto load = nodes[r.host].shards[r.shard].streaming_read_load;
|
||||||
|
if (load >= max_read_streaming_load) {
|
||||||
|
lblogger.debug("Migration skipped because of read load limit on {} ({})", r, load);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (auto r : info.written_to) {
|
||||||
|
if (!nodes.contains(r.host)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
auto load = nodes[r.host].shards[r.shard].streaming_write_load;
|
||||||
|
if (load >= max_write_streaming_load) {
|
||||||
|
lblogger.debug("Migration skipped because of write load limit on {} ({})", r, load);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
};
|
||||||
|
|
||||||
for (auto&& [table, tmap_] : _tm->tablets().all_tables()) {
|
for (auto&& [table, tmap_] : _tm->tablets().all_tables()) {
|
||||||
auto& tmap = tmap_;
|
auto& tmap = tmap_;
|
||||||
co_await tmap.for_each_tablet([&, table = table] (tablet_id tid, const tablet_info& ti) {
|
co_await tmap.for_each_tablet([&, table = table] (tablet_id tid, const tablet_info& ti) {
|
||||||
auto trinfo = tmap.get_tablet_transition_info(tid);
|
auto trinfo = tmap.get_tablet_transition_info(tid);
|
||||||
|
|
||||||
if (is_streaming(trinfo)) {
|
if (is_streaming(trinfo)) {
|
||||||
auto streaming_info = get_migration_streaming_info(ti, *trinfo);
|
apply_load(get_migration_streaming_info(ti, *trinfo));
|
||||||
for (auto&& replica : streaming_info.read_from) {
|
|
||||||
if (nodes.contains(replica.host)) {
|
|
||||||
nodes[replica.host].shards[replica.shard].streaming_read_load += 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for (auto&& replica : streaming_info.written_to) {
|
|
||||||
if (nodes.contains(replica.host)) {
|
|
||||||
nodes[replica.host].shards[replica.shard].streaming_write_load += 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for (auto&& replica : get_replicas_for_tablet_load(ti, trinfo)) {
|
for (auto&& replica : get_replicas_for_tablet_load(ti, trinfo)) {
|
||||||
@@ -723,10 +750,11 @@ public:
|
|||||||
auto dst = global_shard_id {target, target_load_sketch.next_shard(target)};
|
auto dst = global_shard_id {target, target_load_sketch.next_shard(target)};
|
||||||
auto mig = tablet_migration_info {tablet_transition_kind::migration, source_tablet, src, dst};
|
auto mig = tablet_migration_info {tablet_transition_kind::migration, source_tablet, src, dst};
|
||||||
|
|
||||||
if (target_info.shards[dst.shard].streaming_write_load < max_write_streaming_load
|
const locator::node& src_node = topo.get_node(src.host);
|
||||||
&& src_node_info.shards[src_shard].streaming_read_load < max_read_streaming_load) {
|
auto mig_streaming_info = get_migration_streaming_info(tmap.get_tablet_info(source_tablet.tablet), mig);
|
||||||
target_info.shards[dst.shard].streaming_write_load += 1;
|
|
||||||
src_node_info.shards[src_shard].streaming_read_load += 1;
|
if (can_accept_load(mig_streaming_info)) {
|
||||||
|
apply_load(mig_streaming_info);
|
||||||
lblogger.debug("Adding migration: {}", mig);
|
lblogger.debug("Adding migration: {}", mig);
|
||||||
_stats.for_dc(dc).migrations_produced++;
|
_stats.for_dc(dc).migrations_produced++;
|
||||||
plan.add(std::move(mig));
|
plan.add(std::move(mig));
|
||||||
@@ -737,9 +765,6 @@ public:
|
|||||||
// We should not just stop here because that can lead to underutilization of the cluster.
|
// We should not just stop here because that can lead to underutilization of the cluster.
|
||||||
// Just because the next migration is blocked doesn't mean we could not proceed with migrations
|
// Just because the next migration is blocked doesn't mean we could not proceed with migrations
|
||||||
// for other shards which are produced by the planner subsequently.
|
// for other shards which are produced by the planner subsequently.
|
||||||
lblogger.debug("Migration {} skipped because of load limit: src_load={}, dst_load={}", mig,
|
|
||||||
src_node_info.shards[src_shard].streaming_read_load,
|
|
||||||
target_info.shards[dst.shard].streaming_write_load);
|
|
||||||
skipped_migrations++;
|
skipped_migrations++;
|
||||||
_stats.for_dc(dc).migrations_skipped++;
|
_stats.for_dc(dc).migrations_skipped++;
|
||||||
if (skipped_migrations >= max_skipped_migrations) {
|
if (skipped_migrations >= max_skipped_migrations) {
|
||||||
|
|||||||
Reference in New Issue
Block a user