streaming: Do not open rpc stream connection if ranges are not relevant to a shard

Given a list of ranges to stream, stream_transfer_task will create an
reader with the ranges and create a rpc stream connection on all the shards.

When user provides ranges to repair with -st -et options, e.g.,
using scylla-manger, such ranges can belong to only one shard, repair
will pass such ranges to streaming.

As a result, only one shard will have data to send while the rpc stream
connections are created on all the shards, which can cause the kernel
run out of ports in some systems.

To mitigate the problem, do not open the connection if the ranges do not
belong to the shard at all.

Refs: #4708
This commit is contained in:
Asias He
2019-07-15 10:29:13 +08:00
committed by Avi Kivity
parent 51cff8ad23
commit 64a4c0ede2

View File

@@ -105,6 +105,21 @@ struct send_info {
, prs(to_partition_ranges(ranges))
, reader(cf.make_streaming_reader(cf.schema(), prs)) {
}
future<bool> has_relevant_range_on_this_shard() {
return do_with(false, [this] (bool& found_relevant_range) {
return do_for_each(ranges, [this, &found_relevant_range] (dht::token_range range) {
if (!found_relevant_range) {
auto sharder = dht::selective_token_range_sharder(range, engine().cpu_id());
auto range_shard = sharder.next();
if (range_shard) {
found_relevant_range = true;
}
}
}).then([&found_relevant_range] {
return found_relevant_range;
});
});
}
future<size_t> estimate_partitions() {
return do_with(cf.get_sstables(), size_t(0), [this] (auto& sstables, size_t& partition_count) {
return do_for_each(*sstables, [this, &partition_count] (auto& sst) {
@@ -222,11 +237,18 @@ future<> stream_transfer_task::execute() {
auto reason = session->get_reason();
return session->get_db().invoke_on_all([plan_id, cf_id, id, dst_cpu_id, ranges=this->_ranges, streaming_with_rpc_stream, reason] (database& db) {
auto si = make_lw_shared<send_info>(db, plan_id, cf_id, std::move(ranges), id, dst_cpu_id, reason);
if (streaming_with_rpc_stream) {
return send_mutation_fragments(std::move(si));
} else {
return send_mutations(std::move(si));
}
return si->has_relevant_range_on_this_shard().then([si, plan_id, cf_id, streaming_with_rpc_stream] (bool has_relevant_range_on_this_shard) {
if (!has_relevant_range_on_this_shard) {
sslog.debug("[Stream #{}] stream_transfer_task: cf_id={}: ignore ranges on shard={}",
plan_id, cf_id, engine().cpu_id());
return make_ready_future<>();
}
if (streaming_with_rpc_stream) {
return send_mutation_fragments(std::move(si));
} else {
return send_mutations(std::move(si));
}
});
}).then([this, plan_id, cf_id, id, streaming_with_rpc_stream] {
sslog.debug("[Stream #{}] SEND STREAM_MUTATION_DONE to {}, cf_id={}", plan_id, id, cf_id);
return session->ms().send_stream_mutation_done(id, plan_id, _ranges,