db/view: Add visibility to view updating of Staging SSTables
Today, we're completely blind about the progress of view updating on Staging files. We don't know how long it will take, nor how much progress we've made. This patch adds visibility with a new metric that reports the number of bytes still to be processed from Staging files. Before any work is done, the metric tells us the total size to be processed. As view updating progresses, the metric value is expected to decrease, unless work is being produced faster than we can consume it. We're piggybacking on sstables::read_monitor, which allows the progress metric to be updated whenever the SSTable reader makes progress. Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com> Closes #11751
This commit is contained in:
committed by
Botond Dénes
parent
2e79bb431c
commit
ec79ac46c9
@@ -13,6 +13,7 @@
|
||||
#include "utils/error_injection.hh"
|
||||
#include "db/view/view_updating_consumer.hh"
|
||||
#include "sstables/sstables.hh"
|
||||
#include "sstables/progress_monitor.hh"
|
||||
#include "readers/evictable.hh"
|
||||
|
||||
static logging::logger vug_logger("view_update_generator");
|
||||
@@ -24,6 +25,76 @@ static inline void inject_failure(std::string_view operation) {
|
||||
|
||||
namespace db::view {
|
||||
|
||||
// Accounts for the number of bytes of SSTable data that view_update_generator
// still has to read. An SSTable contributes its full data_size() to
// _inactive_pending_work from registration until a read monitor is requested
// for it; from then on its remaining work is derived from the position of the
// reader attached to its monitor.
class view_update_generator::progress_tracker final : public sstables::read_monitor_generator {
    // Per-SSTable monitor that remembers how far the reader has progressed.
    class read_monitor final : public sstables::read_monitor {
        sstables::shared_sstable _sst;
        // Position tracker of the reader currently attached, nullptr when none is.
        const sstables::reader_position_tracker* _tracker = nullptr;
        // Position captured when the last attached reader completed.
        uint64_t _last_position_seen = 0;
    public:
        // Starts following the live position of the reader that just started.
        virtual void on_read_started(const sstables::reader_position_tracker& tracker) override {
            _tracker = &tracker;
        }

        // Detaches from the reader's tracker, saving its final position so that
        // pending_work() keeps reporting it afterwards.
        virtual void on_read_completed() override {
            if (auto tracker = std::exchange(_tracker, nullptr)) {
                _last_position_seen = tracker->position;
            }
        }

        // Returns the number of bytes of this SSTable not yet consumed.
        uint64_t pending_work() const noexcept {
            const uint64_t total = _sst->data_size();
            const uint64_t last_pos = _tracker ? _tracker->position : _last_position_seen;
            // Clamp instead of subtracting blindly: both operands are unsigned, so a
            // position beyond data_size() would otherwise wrap around to a huge value.
            // NOTE(review): assumes position and data_size() use the same units - confirm.
            return last_pos < total ? total - last_pos : 0;
        }

        // Monitors are handed out by reference, so they must never be copied or moved.
        read_monitor& operator=(const read_monitor&) = delete;
        read_monitor(const read_monitor&) = delete;
        read_monitor& operator=(read_monitor&&) = delete;
        read_monitor(read_monitor&&) = delete;

        explicit read_monitor(sstables::shared_sstable sst)
            : _sst(std::move(sst)) {
        }
    };
private:
    // Tracks SSTables that were registered in view_update_generator, but aren't being processed yet.
    uint64_t _inactive_pending_work = 0;
    // Tracks SSTables that are now being processed by view_update_generator's async loop
    // using unordered_map to provide a stable address for read_monitor, so operator() can safely return a reference.
    std::unordered_map<sstables::shared_sstable, read_monitor> _monitors;
public:
    // Returns the monitor for `sst`, creating it on first use.
    virtual sstables::read_monitor& operator()(sstables::shared_sstable sst) override {
        auto [it, inserted] = _monitors.try_emplace(sst, sst);
        // Move the SSTable from the inactive to the active accounting only once.
        // If a monitor already exists (presumably possible when a reader for the
        // same SSTable is created again), its work is already reported through that
        // monitor, and subtracting again would wrap the unsigned counter.
        if (inserted) {
            _inactive_pending_work -= sst->data_size();
        }
        return it->second;
    }

    // Accounts for a newly registered SSTable that is not being read yet.
    void on_sstable_registration(const sstables::shared_sstable& sst) {
        _inactive_pending_work += sst->data_size();
    }

    // Stops accounting for `ssts`: drops the monitor of each SSTable that has one,
    // otherwise removes its size from the inactive counter.
    void on_sstables_deregistration(const std::vector<sstables::shared_sstable>& ssts) {
        for (const auto& sst : ssts) {
            // erase() returns the number of removed entries; 0 means no monitor existed.
            if (_monitors.erase(sst) == 0) {
                _inactive_pending_work -= sst->data_size();
            }
        }
    }

    // Returns the total bytes pending across inactive and monitored SSTables.
    uint64_t sstables_pending_work() const noexcept {
        return _inactive_pending_work +
            boost::accumulate(_monitors | boost::adaptors::map_values | boost::adaptors::transformed(std::mem_fn(&read_monitor::pending_work)), uint64_t(0));
    }
};
|
||||
|
||||
view_update_generator::view_update_generator(replica::database& db) : _db(db), _progress_tracker(std::make_unique<progress_tracker>()) {
|
||||
setup_metrics();
|
||||
discover_staging_sstables();
|
||||
}
|
||||
|
||||
// Defined out of line because the header only forward-declares progress_tracker,
// while destroying the std::unique_ptr<progress_tracker> member needs the complete type.
view_update_generator::~view_update_generator() = default;
|
||||
|
||||
future<> view_update_generator::start() {
|
||||
thread_attributes attr;
|
||||
attr.sched_group = _db.get_streaming_scheduling_group();
|
||||
@@ -34,6 +105,7 @@ future<> view_update_generator::start() {
|
||||
_sstables_to_move.size(), _sstables_with_tables.size());
|
||||
_sstables_to_move.clear();
|
||||
_sstables_with_tables.clear();
|
||||
_progress_tracker = {};
|
||||
});
|
||||
while (!_as.abort_requested()) {
|
||||
if (_sstables_with_tables.empty()) {
|
||||
@@ -73,7 +145,7 @@ future<> view_update_generator::start() {
|
||||
tracing::trace_state_ptr ts,
|
||||
streamed_mutation::forwarding fwd_ms,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return ssts->make_range_sstable_reader(s, std::move(permit), pr, ps, pc, std::move(ts), fwd_ms, fwd_mr);
|
||||
return ssts->make_range_sstable_reader(s, std::move(permit), pr, ps, pc, std::move(ts), fwd_ms, fwd_mr, *_progress_tracker);
|
||||
});
|
||||
auto [staging_sstable_reader, staging_sstable_reader_handle] = make_manually_paused_evictable_reader_v2(
|
||||
std::move(ms),
|
||||
@@ -100,6 +172,7 @@ future<> view_update_generator::start() {
|
||||
}
|
||||
try {
|
||||
inject_failure("view_update_generator_collect_consumed_sstables");
|
||||
_progress_tracker->on_sstables_deregistration(sstables);
|
||||
// collect all staging sstables to move in a map, grouped by table.
|
||||
std::move(sstables.begin(), sstables.end(), std::back_inserter(_sstables_to_move[t]));
|
||||
} catch (...) {
|
||||
@@ -142,6 +215,7 @@ future<> view_update_generator::register_staging_sstable(sstables::shared_sstabl
|
||||
return make_ready_future<>();
|
||||
}
|
||||
inject_failure("view_update_generator_registering_staging_sstable");
|
||||
_progress_tracker->on_sstable_registration(sst);
|
||||
_sstables_with_tables[table].push_back(std::move(sst));
|
||||
|
||||
_pending_sstables.signal();
|
||||
@@ -166,7 +240,11 @@ void view_update_generator::setup_metrics() {
|
||||
|
||||
sm::make_gauge("sstables_to_move_count",
|
||||
sm::description("Number of sets of sstables which are already processed and wait to be moved from their staging directory"),
|
||||
[this] { return _sstables_to_move.size(); })
|
||||
[this] { return _sstables_to_move.size(); }),
|
||||
|
||||
sm::make_gauge("sstables_pending_work",
|
||||
sm::description("Number of bytes remaining to be processed from SSTables for view updates"),
|
||||
[this] { return _progress_tracker ? _progress_tracker->sstables_pending_work() : 0; })
|
||||
});
|
||||
}
|
||||
|
||||
@@ -175,6 +253,7 @@ void view_update_generator::discover_staging_sstables() {
|
||||
auto t = x.second->shared_from_this();
|
||||
for (auto sstables = t->get_sstables(); sstables::shared_sstable sst : *sstables) {
|
||||
if (sst->requires_view_building()) {
|
||||
_progress_tracker->on_sstable_registration(sst);
|
||||
_sstables_with_tables[t].push_back(std::move(sst));
|
||||
// we're at early stage here, no need to kick _pending_sstables (the
|
||||
// building fiber is not running), nor can we wait on the semaphore
|
||||
|
||||
@@ -30,11 +30,11 @@ private:
|
||||
std::unordered_map<lw_shared_ptr<replica::table>, std::vector<sstables::shared_sstable>> _sstables_with_tables;
|
||||
std::unordered_map<lw_shared_ptr<replica::table>, std::vector<sstables::shared_sstable>> _sstables_to_move;
|
||||
metrics::metric_groups _metrics;
|
||||
class progress_tracker;
|
||||
std::unique_ptr<progress_tracker> _progress_tracker;
|
||||
public:
|
||||
view_update_generator(replica::database& db) : _db(db) {
|
||||
setup_metrics();
|
||||
discover_staging_sstables();
|
||||
}
|
||||
view_update_generator(replica::database& db);
|
||||
~view_update_generator();
|
||||
|
||||
future<> start();
|
||||
future<> stop();
|
||||
|
||||
Reference in New Issue
Block a user