streaming: Add finished percentage metrics for node ops using streaming

We have added the finished percentage for repair based node operations.

This patch adds the finished percentage for node ops using the old
streaming.

Example output:

scylla_streaming_finished_percentage{ops="bootstrap",shard="0"} 1.000000
scylla_streaming_finished_percentage{ops="decommission",shard="0"} 1.000000
scylla_streaming_finished_percentage{ops="rebuild",shard="0"} 0.561945
scylla_streaming_finished_percentage{ops="removenode",shard="0"} 1.000000
scylla_streaming_finished_percentage{ops="repair",shard="0"} 1.000000
scylla_streaming_finished_percentage{ops="replace",shard="0"} 1.000000

In addition to the metrics, the finished percentage is now also written to the log:

[shard 0] range_streamer - Finished 2698 out of 2817 ranges for rebuild, finished percentage=0.95775646

Fixes #11600

Closes #11601
This commit is contained in:
Asias He
2022-09-22 10:21:36 +08:00
committed by Botond Dénes
parent 517c1529aa
commit 9ed401c4b2
4 changed files with 43 additions and 0 deletions

View File

@@ -243,6 +243,7 @@ future<> range_streamer::add_ranges(const sstring& keyspace_name, locator::effec
future<> range_streamer::stream_async() {
auto nr_ranges_remaining = nr_ranges_to_stream();
_nr_total_ranges = nr_ranges_remaining;
logger.info("{} starts, nr_ranges_remaining={}", _description, nr_ranges_remaining);
auto start = lowres_clock::now();
return do_for_each(_to_stream, [this, start, description = _description] (auto& stream) {
@@ -279,6 +280,12 @@ future<> range_streamer::stream_async() {
}
sp.execute().discard_result().get();
ranges_to_stream.clear();
// Update finished percentage
auto remaining = nr_ranges_to_stream();
float percentage = _nr_total_ranges == 0 ? 1 : (_nr_total_ranges - remaining) / (float)_nr_total_ranges;
_stream_manager.local().update_finished_percentage(_reason, percentage);
logger.info("Finished {} out of {} ranges for {}, finished percentage={}",
_nr_total_ranges - remaining, _nr_total_ranges, _reason, percentage);
};
try {
for (auto it = range_vec.begin(); it < range_vec.end();) {

View File

@@ -162,6 +162,7 @@ private:
unsigned _nr_rx_added = 0;
// Limit the number of nodes to stream in parallel to reduce memory pressure with large cluster.
seastar::semaphore _limiter{16};
size_t _nr_total_ranges = 0;
};
} // dht

View File

@@ -45,12 +45,38 @@ stream_manager::stream_manager(db::config& cfg,
(void)_io_throughput_updater.trigger_later();
}
_finished_percentage[streaming::stream_reason::bootstrap] = 1;
_finished_percentage[streaming::stream_reason::decommission] = 1;
_finished_percentage[streaming::stream_reason::removenode] = 1;
_finished_percentage[streaming::stream_reason::rebuild] = 1;
_finished_percentage[streaming::stream_reason::repair] = 1;
_finished_percentage[streaming::stream_reason::replace] = 1;
auto ops_label_type = sm::label("ops");
_metrics.add_group("streaming", {
sm::make_counter("total_incoming_bytes", [this] { return _total_incoming_bytes; },
sm::description("Total number of bytes received on this shard.")),
sm::make_counter("total_outgoing_bytes", [this] { return _total_outgoing_bytes; },
sm::description("Total number of bytes sent on this shard.")),
sm::make_gauge("finished_percentage", [this] { return _finished_percentage[streaming::stream_reason::bootstrap]; },
sm::description("Finished percentage of node operation on this shard"), {ops_label_type("bootstrap")}),
sm::make_gauge("finished_percentage", [this] { return _finished_percentage[streaming::stream_reason::decommission]; },
sm::description("Finished percentage of node operation on this shard"), {ops_label_type("decommission")}),
sm::make_gauge("finished_percentage", [this] { return _finished_percentage[streaming::stream_reason::removenode]; },
sm::description("Finished percentage of node operation on this shard"), {ops_label_type("removenode")}),
sm::make_gauge("finished_percentage", [this] { return _finished_percentage[streaming::stream_reason::rebuild]; },
sm::description("Finished percentage of node operation on this shard"), {ops_label_type("rebuild")}),
sm::make_gauge("finished_percentage", [this] { return _finished_percentage[streaming::stream_reason::repair]; },
sm::description("Finished percentage of node operation on this shard"), {ops_label_type("repair")}),
sm::make_gauge("finished_percentage", [this] { return _finished_percentage[streaming::stream_reason::replace]; },
sm::description("Finished percentage of node operation on this shard"), {ops_label_type("replace")}),
});
}
@@ -365,4 +391,8 @@ shared_ptr<stream_session> stream_manager::get_session(streaming::plan_id plan_i
return coordinator->get_or_create_session(*this, from);
}
// Stores `percentage` as the finished percentage for `reason`, replacing any
// value already recorded for that reason in _finished_percentage.
void stream_manager::update_finished_percentage(streaming::stream_reason reason, float percentage) {
    _finished_percentage.insert_or_assign(reason, percentage);
}
} // namespace streaming

View File

@@ -11,6 +11,7 @@
#pragma once
#include "streaming/stream_fwd.hh"
#include "streaming/progress_info.hh"
#include "streaming/stream_reason.hh"
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/distributed.hh>
#include "utils/updateable_value.hh"
@@ -96,6 +97,7 @@ private:
uint64_t _total_outgoing_bytes{0};
semaphore _mutation_send_limiter{256};
seastar::metrics::metric_groups _metrics;
std::unordered_map<streaming::stream_reason, float> _finished_percentage;
utils::updateable_value<uint32_t> _io_throughput_mbs;
serialized_action _io_throughput_updater = serialized_action([this] { return update_io_throughput(_io_throughput_mbs()); });
@@ -184,6 +186,9 @@ private:
void init_messaging_service_handler();
future<> uninit_messaging_service_handler();
future<> update_io_throughput(uint32_t value_mbs);
public:
void update_finished_percentage(streaming::stream_reason reason, float percentage);
};
} // namespace streaming