From ec79ac46c9bb2b01742ad43d0787ee756449690d Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Fri, 7 Oct 2022 15:46:31 -0300 Subject: [PATCH] db/view: Add visibility to view updating of Staging SSTables Today, we're completely blind about the progress of view updating on Staging files. We don't know how long it will take, nor how much progress we've made. This patch adds visibility with a new metric that will inform the number of bytes to be processed from Staging files. Before any work is done, the metric tell us the total size to be processed. As view updating progresses, the metric value is expected to decrease, unless work is being produced faster than we can consume them. We're piggybacking on sstables::read_monitor, which allows the progress metric to be updated whenever the SSTable reader makes progress. Signed-off-by: Raphael S. Carvalho Closes #11751 --- db/view/view_update_generator.cc | 83 +++++++++++++++++++++++++++++++- db/view/view_update_generator.hh | 8 +-- 2 files changed, 85 insertions(+), 6 deletions(-) diff --git a/db/view/view_update_generator.cc b/db/view/view_update_generator.cc index 07bece8808..5021026856 100644 --- a/db/view/view_update_generator.cc +++ b/db/view/view_update_generator.cc @@ -13,6 +13,7 @@ #include "utils/error_injection.hh" #include "db/view/view_updating_consumer.hh" #include "sstables/sstables.hh" +#include "sstables/progress_monitor.hh" #include "readers/evictable.hh" static logging::logger vug_logger("view_update_generator"); @@ -24,6 +25,76 @@ static inline void inject_failure(std::string_view operation) { namespace db::view { +class view_update_generator::progress_tracker final : public sstables::read_monitor_generator { + class read_monitor final : public sstables::read_monitor { + sstables::shared_sstable _sst; + const sstables::reader_position_tracker* _tracker = nullptr; + uint64_t _last_position_seen = 0; + public: + virtual void on_read_started(const sstables::reader_position_tracker& tracker) override { + _tracker = &tracker; + } + + virtual void on_read_completed() override { + if (auto tracker = std::exchange(_tracker, nullptr)) { + _last_position_seen = tracker->position; + } + } + + uint64_t pending_work() const noexcept { + auto last_pos = (_tracker) ? _tracker->position : _last_position_seen; + return _sst->data_size() - last_pos; + } + + read_monitor& operator=(const read_monitor&) = delete; + read_monitor(const read_monitor&) = delete; + read_monitor& operator=(const read_monitor&&) = delete; + read_monitor(read_monitor&&) = delete; + + explicit read_monitor(sstables::shared_sstable sst) + : _sst(std::move(sst)) { + } + }; +private: + // Tracks SSTables that were registered in view_update_generator, but aren't being processed yet. + uint64_t _inactive_pending_work = 0; + // Tracks SSTables that are now being processed by view_update_generator's async loop + // using unordered_map to provide a stable address for read_monitor, so operator() can safely return a reference. + std::unordered_map _monitors; +public: + virtual sstables::read_monitor& operator()(sstables::shared_sstable sst) override { + auto p = _monitors.try_emplace(sst, sst); + _inactive_pending_work -= sst->data_size(); + return p.first->second; + } + + void on_sstable_registration(const sstables::shared_sstable& sst) { + _inactive_pending_work += sst->data_size(); + } + + void on_sstables_deregistration(const std::vector& ssts) { + for (auto& sst : ssts) { + if (_monitors.contains(sst)) { + _monitors.erase(sst); + } else { + _inactive_pending_work -= sst->data_size(); + } + } + } + + uint64_t sstables_pending_work() const noexcept { + return _inactive_pending_work + + boost::accumulate(_monitors | boost::adaptors::map_values | boost::adaptors::transformed(std::mem_fn(&read_monitor::pending_work)), uint64_t(0)); + } +}; + +view_update_generator::view_update_generator(replica::database& db) : _db(db), _progress_tracker(std::make_unique()) { + setup_metrics(); + discover_staging_sstables(); +} + +view_update_generator::~view_update_generator() {} + future<> view_update_generator::start() { thread_attributes attr; attr.sched_group = _db.get_streaming_scheduling_group(); @@ -34,6 +105,7 @@ future<> view_update_generator::start() { _sstables_to_move.size(), _sstables_with_tables.size()); _sstables_to_move.clear(); _sstables_with_tables.clear(); + _progress_tracker = {}; }); while (!_as.abort_requested()) { if (_sstables_with_tables.empty()) { @@ -73,7 +145,7 @@ future<> view_update_generator::start() { tracing::trace_state_ptr ts, streamed_mutation::forwarding fwd_ms, mutation_reader::forwarding fwd_mr) { - return ssts->make_range_sstable_reader(s, std::move(permit), pr, ps, pc, std::move(ts), fwd_ms, fwd_mr); + return ssts->make_range_sstable_reader(s, std::move(permit), pr, ps, pc, std::move(ts), fwd_ms, fwd_mr, *_progress_tracker); }); auto [staging_sstable_reader, staging_sstable_reader_handle] = make_manually_paused_evictable_reader_v2( std::move(ms), @@ -100,6 +172,7 @@ future<> view_update_generator::start() { } try { inject_failure("view_update_generator_collect_consumed_sstables"); + _progress_tracker->on_sstables_deregistration(sstables); // collect all staging sstables to move in a map, grouped by table. std::move(sstables.begin(), sstables.end(), std::back_inserter(_sstables_to_move[t])); } catch (...) { @@ -142,6 +215,7 @@ future<> view_update_generator::register_staging_sstable(sstables::shared_sstabl return make_ready_future<>(); } inject_failure("view_update_generator_registering_staging_sstable"); + _progress_tracker->on_sstable_registration(sst); _sstables_with_tables[table].push_back(std::move(sst)); _pending_sstables.signal(); @@ -166,7 +240,11 @@ void view_update_generator::setup_metrics() { sm::make_gauge("sstables_to_move_count", sm::description("Number of sets of sstables which are already processed and wait to be moved from their staging directory"), - [this] { return _sstables_to_move.size(); }) + [this] { return _sstables_to_move.size(); }), + + sm::make_gauge("sstables_pending_work", + sm::description("Number of bytes remaining to be processed from SSTables for view updates"), + [this] { return _progress_tracker ? _progress_tracker->sstables_pending_work() : 0; }) }); } @@ -175,6 +253,7 @@ void view_update_generator::discover_staging_sstables() { auto t = x.second->shared_from_this(); for (auto sstables = t->get_sstables(); sstables::shared_sstable sst : *sstables) { if (sst->requires_view_building()) { + _progress_tracker->on_sstable_registration(sst); _sstables_with_tables[t].push_back(std::move(sst)); // we're at early stage here, no need to kick _pending_sstables (the // bulding fiber is not running), neither we can wait on the semaphore diff --git a/db/view/view_update_generator.hh b/db/view/view_update_generator.hh index 002af019a2..6aae8177fb 100644 --- a/db/view/view_update_generator.hh +++ b/db/view/view_update_generator.hh @@ -30,11 +30,11 @@ private: std::unordered_map, std::vector> _sstables_with_tables; std::unordered_map, std::vector> _sstables_to_move; metrics::metric_groups _metrics; + class progress_tracker; + std::unique_ptr _progress_tracker; public: - view_update_generator(replica::database& db) : _db(db) { - setup_metrics(); - discover_staging_sstables(); - } + view_update_generator(replica::database& db); + ~view_update_generator(); future<> start(); future<> stop();