diff --git a/service/tablet_allocator.cc b/service/tablet_allocator.cc index 6a7bea0a90..f634324a6c 100644 --- a/service/tablet_allocator.cc +++ b/service/tablet_allocator.cc @@ -440,23 +440,50 @@ public: // Compute per-shard load and candidate tablets. + auto apply_load = [&] (const tablet_migration_streaming_info& info) { + for (auto&& replica : info.read_from) { + if (nodes.contains(replica.host)) { + nodes[replica.host].shards[replica.shard].streaming_read_load += 1; + } + } + for (auto&& replica : info.written_to) { + if (nodes.contains(replica.host)) { + nodes[replica.host].shards[replica.shard].streaming_write_load += 1; + } + } + }; + + auto can_accept_load = [&] (const tablet_migration_streaming_info& info) { + for (auto r : info.read_from) { + if (!nodes.contains(r.host)) { + continue; + } + auto load = nodes[r.host].shards[r.shard].streaming_read_load; + if (load >= max_read_streaming_load) { + lblogger.debug("Migration skipped because of read load limit on {} ({})", r, load); + return false; + } + } + for (auto r : info.written_to) { + if (!nodes.contains(r.host)) { + continue; + } + auto load = nodes[r.host].shards[r.shard].streaming_write_load; + if (load >= max_write_streaming_load) { + lblogger.debug("Migration skipped because of write load limit on {} ({})", r, load); + return false; + } + } + return true; + }; + for (auto&& [table, tmap_] : _tm->tablets().all_tables()) { auto& tmap = tmap_; co_await tmap.for_each_tablet([&, table = table] (tablet_id tid, const tablet_info& ti) { auto trinfo = tmap.get_tablet_transition_info(tid); if (is_streaming(trinfo)) { - auto streaming_info = get_migration_streaming_info(ti, *trinfo); - for (auto&& replica : streaming_info.read_from) { - if (nodes.contains(replica.host)) { - nodes[replica.host].shards[replica.shard].streaming_read_load += 1; - } - } - for (auto&& replica : streaming_info.written_to) { - if (nodes.contains(replica.host)) { - nodes[replica.host].shards[replica.shard].streaming_write_load += 1; - } - } + apply_load(get_migration_streaming_info(ti, *trinfo)); } for (auto&& replica : get_replicas_for_tablet_load(ti, trinfo)) { @@ -723,10 +750,11 @@ public: auto dst = global_shard_id {target, target_load_sketch.next_shard(target)}; auto mig = tablet_migration_info {tablet_transition_kind::migration, source_tablet, src, dst}; - if (target_info.shards[dst.shard].streaming_write_load < max_write_streaming_load - && src_node_info.shards[src_shard].streaming_read_load < max_read_streaming_load) { - target_info.shards[dst.shard].streaming_write_load += 1; - src_node_info.shards[src_shard].streaming_read_load += 1; + const locator::node& src_node = topo.get_node(src.host); + auto mig_streaming_info = get_migration_streaming_info(tmap.get_tablet_info(source_tablet.tablet), mig); + + if (can_accept_load(mig_streaming_info)) { + apply_load(mig_streaming_info); lblogger.debug("Adding migration: {}", mig); _stats.for_dc(dc).migrations_produced++; plan.add(std::move(mig)); @@ -737,9 +765,6 @@ public: // We should not just stop here because that can lead to underutilization of the cluster. // Just because the next migration is blocked doesn't mean we could not proceed with migrations // for other shards which are produced by the planner subsequently. - lblogger.debug("Migration {} skipped because of load limit: src_load={}, dst_load={}", mig, - src_node_info.shards[src_shard].streaming_read_load, - target_info.shards[dst.shard].streaming_write_load); skipped_migrations++; _stats.for_dc(dc).migrations_skipped++; if (skipped_migrations >= max_skipped_migrations) {