dht/range_streamer: stream_async: move ranges_to_stream to do_streaming

Currently the ranges_to_stream variable lives
on the caller state, and do_streaming() moves its
contents down to request_ranges/transfer_ranges
and then calls clear() to make it ready for reuse.

This works in principle but it makes it harder
for an occasional reader of this code to figure out
what going on.

This change transfers control of the ranges_to_stream vector
to do_streaming, by calling it with (std::exchange(do_streaming, {}))
and with that that moved vector doesn't need to be cleared by
do_streaming, and the caller is reponsible for readying
the variable for reuse in its for loop.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2022-12-18 15:16:26 +02:00
parent 1392c7e1cf
commit 06a0902708

View File

@@ -266,8 +266,7 @@ future<> range_streamer::stream_async() {
unsigned nr_ranges_streamed = 0;
size_t nr_ranges_total = range_vec.size();
size_t nr_ranges_per_stream_plan = nr_ranges_total / 10;
dht::token_range_vector ranges_to_stream;
auto do_streaming = [&] {
auto do_streaming = [&] (dht::token_range_vector&& ranges_to_stream) {
auto sp = stream_plan(_stream_manager.local(), format("{}-{}-index-{:d}", description, keyspace, sp_index++), _reason);
auto abort_listener = _abort_source.subscribe([&] () noexcept { sp.abort(); });
_abort_source.check();
@@ -281,7 +280,6 @@ future<> range_streamer::stream_async() {
sp.transfer_ranges(source, keyspace, std::move(ranges_to_stream));
}
sp.execute().discard_result().get();
ranges_to_stream.clear();
// Update finished percentage
nr_ranges_streamed += ranges_streamed;
_nr_ranges_remaining -= ranges_streamed;
@@ -290,6 +288,7 @@ future<> range_streamer::stream_async() {
logger.info("Finished {} out of {} ranges for {}, finished percentage={}",
_nr_total_ranges - _nr_ranges_remaining, _nr_total_ranges, _reason, percentage);
};
dht::token_range_vector ranges_to_stream;
try {
for (auto it = range_vec.begin(); it < range_vec.end();) {
ranges_to_stream.push_back(*it);
@@ -297,12 +296,12 @@ future<> range_streamer::stream_async() {
if (ranges_to_stream.size() < nr_ranges_per_stream_plan) {
continue;
} else {
do_streaming();
do_streaming(std::exchange(ranges_to_stream, {}));
it = range_vec.erase(range_vec.begin(), it);
}
}
if (ranges_to_stream.size() > 0) {
do_streaming();
do_streaming(std::exchange(ranges_to_stream, {}));
range_vec.clear();
}
} catch (...) {