diff --git a/db/view/view_update_generator.cc b/db/view/view_update_generator.cc index 07bece8808..5021026856 100644 --- a/db/view/view_update_generator.cc +++ b/db/view/view_update_generator.cc @@ -13,6 +13,7 @@ #include "utils/error_injection.hh" #include "db/view/view_updating_consumer.hh" #include "sstables/sstables.hh" +#include "sstables/progress_monitor.hh" #include "readers/evictable.hh" static logging::logger vug_logger("view_update_generator"); @@ -24,6 +25,76 @@ static inline void inject_failure(std::string_view operation) { namespace db::view { +class view_update_generator::progress_tracker final : public sstables::read_monitor_generator { + class read_monitor final : public sstables::read_monitor { + sstables::shared_sstable _sst; + const sstables::reader_position_tracker* _tracker = nullptr; + uint64_t _last_position_seen = 0; + public: + virtual void on_read_started(const sstables::reader_position_tracker& tracker) override { + _tracker = &tracker; + } + + virtual void on_read_completed() override { + if (auto tracker = std::exchange(_tracker, nullptr)) { + _last_position_seen = tracker->position; + } + } + + uint64_t pending_work() const noexcept { + auto last_pos = (_tracker) ? _tracker->position : _last_position_seen; + return _sst->data_size() - last_pos; + } + + read_monitor& operator=(const read_monitor&) = delete; + read_monitor(const read_monitor&) = delete; + read_monitor& operator=(const read_monitor&&) = delete; + read_monitor(read_monitor&&) = delete; + + explicit read_monitor(sstables::shared_sstable sst) + : _sst(std::move(sst)) { + } + }; +private: + // Tracks SSTables that were registered in view_update_generator, but aren't being processed yet. + uint64_t _inactive_pending_work = 0; + // Tracks SSTables that are now being processed by view_update_generator's async loop + // using unordered_map to provide a stable address for read_monitor, so operator() can safely return a reference. + std::unordered_map _monitors; +public: + virtual sstables::read_monitor& operator()(sstables::shared_sstable sst) override { + auto p = _monitors.try_emplace(sst, sst); + _inactive_pending_work -= sst->data_size(); + return p.first->second; + } + + void on_sstable_registration(const sstables::shared_sstable& sst) { + _inactive_pending_work += sst->data_size(); + } + + void on_sstables_deregistration(const std::vector& ssts) { + for (auto& sst : ssts) { + if (_monitors.contains(sst)) { + _monitors.erase(sst); + } else { + _inactive_pending_work -= sst->data_size(); + } + } + } + + uint64_t sstables_pending_work() const noexcept { + return _inactive_pending_work + + boost::accumulate(_monitors | boost::adaptors::map_values | boost::adaptors::transformed(std::mem_fn(&read_monitor::pending_work)), uint64_t(0)); + } +}; + +view_update_generator::view_update_generator(replica::database& db) : _db(db), _progress_tracker(std::make_unique()) { + setup_metrics(); + discover_staging_sstables(); +} + +view_update_generator::~view_update_generator() {} + future<> view_update_generator::start() { thread_attributes attr; attr.sched_group = _db.get_streaming_scheduling_group(); @@ -34,6 +105,7 @@ future<> view_update_generator::start() { _sstables_to_move.size(), _sstables_with_tables.size()); _sstables_to_move.clear(); _sstables_with_tables.clear(); + _progress_tracker = {}; }); while (!_as.abort_requested()) { if (_sstables_with_tables.empty()) { @@ -73,7 +145,7 @@ future<> view_update_generator::start() { tracing::trace_state_ptr ts, streamed_mutation::forwarding fwd_ms, mutation_reader::forwarding fwd_mr) { - return ssts->make_range_sstable_reader(s, std::move(permit), pr, ps, pc, std::move(ts), fwd_ms, fwd_mr); + return ssts->make_range_sstable_reader(s, std::move(permit), pr, ps, pc, std::move(ts), fwd_ms, fwd_mr, *_progress_tracker); }); auto [staging_sstable_reader, staging_sstable_reader_handle] = make_manually_paused_evictable_reader_v2( std::move(ms), @@ -100,6 +172,7 @@ future<> view_update_generator::start() { } try { inject_failure("view_update_generator_collect_consumed_sstables"); + _progress_tracker->on_sstables_deregistration(sstables); // collect all staging sstables to move in a map, grouped by table. std::move(sstables.begin(), sstables.end(), std::back_inserter(_sstables_to_move[t])); } catch (...) { @@ -142,6 +215,7 @@ future<> view_update_generator::register_staging_sstable(sstables::shared_sstabl return make_ready_future<>(); } inject_failure("view_update_generator_registering_staging_sstable"); + _progress_tracker->on_sstable_registration(sst); _sstables_with_tables[table].push_back(std::move(sst)); _pending_sstables.signal(); @@ -166,7 +240,11 @@ void view_update_generator::setup_metrics() { sm::make_gauge("sstables_to_move_count", sm::description("Number of sets of sstables which are already processed and wait to be moved from their staging directory"), - [this] { return _sstables_to_move.size(); }) + [this] { return _sstables_to_move.size(); }), + + sm::make_gauge("sstables_pending_work", + sm::description("Number of bytes remaining to be processed from SSTables for view updates"), + [this] { return _progress_tracker ? _progress_tracker->sstables_pending_work() : 0; }) }); } @@ -175,6 +253,7 @@ void view_update_generator::discover_staging_sstables() { auto t = x.second->shared_from_this(); for (auto sstables = t->get_sstables(); sstables::shared_sstable sst : *sstables) { if (sst->requires_view_building()) { + _progress_tracker->on_sstable_registration(sst); _sstables_with_tables[t].push_back(std::move(sst)); // we're at early stage here, no need to kick _pending_sstables (the // bulding fiber is not running), neither we can wait on the semaphore diff --git a/db/view/view_update_generator.hh b/db/view/view_update_generator.hh index 002af019a2..6aae8177fb 100644 --- a/db/view/view_update_generator.hh +++ b/db/view/view_update_generator.hh @@ -30,11 +30,11 @@ private: std::unordered_map, std::vector> _sstables_with_tables; std::unordered_map, std::vector> _sstables_to_move; metrics::metric_groups _metrics; + class progress_tracker; + std::unique_ptr _progress_tracker; public: - view_update_generator(replica::database& db) : _db(db) { - setup_metrics(); - discover_staging_sstables(); - } + view_update_generator(replica::database& db); + ~view_update_generator(); future<> start(); future<> stop();