From 92f01674f2ee4476c9b7c9130ac3248bb4c5ea9e Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Fri, 19 Jan 2024 12:19:00 +0100 Subject: [PATCH] tablets: load_balancer: Generalize load tracking This patch removes some duplication of logic and implicit assumptions by creating clear algebra for load impact calculation and its application to state of the load balancer. Will make adding new kinds of tablet transitions with different impact on load much easier. --- service/tablet_allocator.cc | 61 ++++++++++++++++++++++++++----------- 1 file changed, 43 insertions(+), 18 deletions(-) diff --git a/service/tablet_allocator.cc b/service/tablet_allocator.cc index 6a7bea0a90..f634324a6c 100644 --- a/service/tablet_allocator.cc +++ b/service/tablet_allocator.cc @@ -440,23 +440,50 @@ public: // Compute per-shard load and candidate tablets. + auto apply_load = [&] (const tablet_migration_streaming_info& info) { + for (auto&& replica : info.read_from) { + if (nodes.contains(replica.host)) { + nodes[replica.host].shards[replica.shard].streaming_read_load += 1; + } + } + for (auto&& replica : info.written_to) { + if (nodes.contains(replica.host)) { + nodes[replica.host].shards[replica.shard].streaming_write_load += 1; + } + } + }; + + auto can_accept_load = [&] (const tablet_migration_streaming_info& info) { + for (auto r : info.read_from) { + if (!nodes.contains(r.host)) { + continue; + } + auto load = nodes[r.host].shards[r.shard].streaming_read_load; + if (load >= max_read_streaming_load) { + lblogger.debug("Migration skipped because of read load limit on {} ({})", r, load); + return false; + } + } + for (auto r : info.written_to) { + if (!nodes.contains(r.host)) { + continue; + } + auto load = nodes[r.host].shards[r.shard].streaming_write_load; + if (load >= max_write_streaming_load) { + lblogger.debug("Migration skipped because of write load limit on {} ({})", r, load); + return false; + } + } + return true; + }; + for (auto&& [table, tmap_] : _tm->tablets().all_tables()) { auto& tmap = tmap_; co_await tmap.for_each_tablet([&, table = table] (tablet_id tid, const tablet_info& ti) { auto trinfo = tmap.get_tablet_transition_info(tid); if (is_streaming(trinfo)) { - auto streaming_info = get_migration_streaming_info(ti, *trinfo); - for (auto&& replica : streaming_info.read_from) { - if (nodes.contains(replica.host)) { - nodes[replica.host].shards[replica.shard].streaming_read_load += 1; - } - } - for (auto&& replica : streaming_info.written_to) { - if (nodes.contains(replica.host)) { - nodes[replica.host].shards[replica.shard].streaming_write_load += 1; - } - } + apply_load(get_migration_streaming_info(ti, *trinfo)); } for (auto&& replica : get_replicas_for_tablet_load(ti, trinfo)) { @@ -723,10 +750,11 @@ public: auto dst = global_shard_id {target, target_load_sketch.next_shard(target)}; auto mig = tablet_migration_info {tablet_transition_kind::migration, source_tablet, src, dst}; - if (target_info.shards[dst.shard].streaming_write_load < max_write_streaming_load - && src_node_info.shards[src_shard].streaming_read_load < max_read_streaming_load) { - target_info.shards[dst.shard].streaming_write_load += 1; - src_node_info.shards[src_shard].streaming_read_load += 1; + const locator::node& src_node = topo.get_node(src.host); + auto mig_streaming_info = get_migration_streaming_info(tmap.get_tablet_info(source_tablet.tablet), mig); + + if (can_accept_load(mig_streaming_info)) { + apply_load(mig_streaming_info); lblogger.debug("Adding migration: {}", mig); _stats.for_dc(dc).migrations_produced++; plan.add(std::move(mig)); @@ -737,9 +765,6 @@ public: // We should not just stop here because that can lead to underutilization of the cluster. // Just because the next migration is blocked doesn't mean we could not proceed with migrations // for other shards which are produced by the planner subsequently. - lblogger.debug("Migration {} skipped because of load limit: src_load={}, dst_load={}", mig, - src_node_info.shards[src_shard].streaming_read_load, - target_info.shards[dst.shard].streaming_write_load); skipped_migrations++; _stats.for_dc(dc).migrations_skipped++; if (skipped_migrations >= max_skipped_migrations) {