diff --git a/dht/range_streamer.cc b/dht/range_streamer.cc index 769e395daf..1246aadb06 100644 --- a/dht/range_streamer.cc +++ b/dht/range_streamer.cc @@ -243,6 +243,7 @@ future<> range_streamer::add_ranges(const sstring& keyspace_name, locator::effec future<> range_streamer::stream_async() { auto nr_ranges_remaining = nr_ranges_to_stream(); + _nr_total_ranges = nr_ranges_remaining; logger.info("{} starts, nr_ranges_remaining={}", _description, nr_ranges_remaining); auto start = lowres_clock::now(); return do_for_each(_to_stream, [this, start, description = _description] (auto& stream) { @@ -279,6 +280,12 @@ future<> range_streamer::stream_async() { } sp.execute().discard_result().get(); ranges_to_stream.clear(); + // Update finished percentage + auto remaining = nr_ranges_to_stream(); + float percentage = _nr_total_ranges == 0 ? 1 : (_nr_total_ranges - remaining) / (float)_nr_total_ranges; + _stream_manager.local().update_finished_percentage(_reason, percentage); + logger.info("Finished {} out of {} ranges for {}, finished percentage={}", + _nr_total_ranges - remaining, _nr_total_ranges, _reason, percentage); }; try { for (auto it = range_vec.begin(); it < range_vec.end();) { diff --git a/dht/range_streamer.hh b/dht/range_streamer.hh index fb5c92c808..c949bafa8b 100644 --- a/dht/range_streamer.hh +++ b/dht/range_streamer.hh @@ -162,6 +162,7 @@ private: unsigned _nr_rx_added = 0; // Limit the number of nodes to stream in parallel to reduce memory pressure with large cluster. seastar::semaphore _limiter{16}; + size_t _nr_total_ranges = 0; }; } // dht diff --git a/streaming/stream_manager.cc b/streaming/stream_manager.cc index 00000f6028..006175975b 100644 --- a/streaming/stream_manager.cc +++ b/streaming/stream_manager.cc @@ -45,12 +45,38 @@ stream_manager::stream_manager(db::config& cfg, (void)_io_throughput_updater.trigger_later(); } + _finished_percentage[streaming::stream_reason::bootstrap] = 1; + _finished_percentage[streaming::stream_reason::decommission] = 1; + _finished_percentage[streaming::stream_reason::removenode] = 1; + _finished_percentage[streaming::stream_reason::rebuild] = 1; + _finished_percentage[streaming::stream_reason::repair] = 1; + _finished_percentage[streaming::stream_reason::replace] = 1; + + auto ops_label_type = sm::label("ops"); _metrics.add_group("streaming", { sm::make_counter("total_incoming_bytes", [this] { return _total_incoming_bytes; }, sm::description("Total number of bytes received on this shard.")), sm::make_counter("total_outgoing_bytes", [this] { return _total_outgoing_bytes; }, sm::description("Total number of bytes sent on this shard.")), + + sm::make_gauge("finished_percentage", [this] { return _finished_percentage[streaming::stream_reason::bootstrap]; }, + sm::description("Finished percentage of node operation on this shard"), {ops_label_type("bootstrap")}), + + sm::make_gauge("finished_percentage", [this] { return _finished_percentage[streaming::stream_reason::decommission]; }, + sm::description("Finished percentage of node operation on this shard"), {ops_label_type("decommission")}), + + sm::make_gauge("finished_percentage", [this] { return _finished_percentage[streaming::stream_reason::removenode]; }, + sm::description("Finished percentage of node operation on this shard"), {ops_label_type("removenode")}), + + sm::make_gauge("finished_percentage", [this] { return _finished_percentage[streaming::stream_reason::rebuild]; }, + sm::description("Finished percentage of node operation on this shard"), {ops_label_type("rebuild")}), + + sm::make_gauge("finished_percentage", [this] { return _finished_percentage[streaming::stream_reason::repair]; }, + sm::description("Finished percentage of node operation on this shard"), {ops_label_type("repair")}), + + sm::make_gauge("finished_percentage", [this] { return _finished_percentage[streaming::stream_reason::replace]; }, + sm::description("Finished percentage of node operation on this shard"), {ops_label_type("replace")}), }); } @@ -365,4 +391,8 @@ shared_ptr stream_manager::get_session(streaming::plan_id plan_i return coordinator->get_or_create_session(*this, from); } +void stream_manager::update_finished_percentage(streaming::stream_reason reason, float percentage) { + _finished_percentage[reason] = percentage; +} + } // namespace streaming diff --git a/streaming/stream_manager.hh b/streaming/stream_manager.hh index 6dd6cfa0e5..c2724b84fe 100644 --- a/streaming/stream_manager.hh +++ b/streaming/stream_manager.hh @@ -11,6 +11,7 @@ #pragma once #include "streaming/stream_fwd.hh" #include "streaming/progress_info.hh" +#include "streaming/stream_reason.hh" #include #include #include "utils/updateable_value.hh" @@ -96,6 +97,7 @@ private: uint64_t _total_outgoing_bytes{0}; semaphore _mutation_send_limiter{256}; seastar::metrics::metric_groups _metrics; + std::unordered_map _finished_percentage; utils::updateable_value _io_throughput_mbs; serialized_action _io_throughput_updater = serialized_action([this] { return update_io_throughput(_io_throughput_mbs()); }); @@ -184,6 +186,9 @@ private: void init_messaging_service_handler(); future<> uninit_messaging_service_handler(); future<> update_io_throughput(uint32_t value_mbs); + +public: + void update_finished_percentage(streaming::stream_reason reason, float percentage); }; } // namespace streaming