merge: Add metrics to semaphores
Merged pull request https://github.com/scylladb/scylla/pull/7018 by Piotr Sarna: This series addresses various issues with metrics and semaphores - it mainly adds missing metrics, which makes it possible to see the length of the queues attached to the semaphores. In case of view building and view update generation, metrics was not present in these services at all, so a first, basic implementation is added. More precise semaphore metrics would ease the testing and development of load shedding and admission control. view_builder: add metrics db, view: add view update generator metrics hints: track resource_manager sending queue length hints: add drain queue length to metrics table: add metrics for sstable deletion semaphore database: remove unused semaphore
This commit is contained in:
@@ -1246,7 +1246,6 @@ private:
|
||||
size_t max_memory_streaming_concurrent_reads() { return _dbcfg.available_memory * 0.02; }
|
||||
static constexpr size_t max_count_system_concurrent_reads{10};
|
||||
size_t max_memory_system_concurrent_reads() { return _dbcfg.available_memory * 0.02; };
|
||||
static constexpr size_t max_concurrent_sstable_loads() { return 3; }
|
||||
size_t max_memory_pending_view_updates() const { return _dbcfg.available_memory * 0.1; }
|
||||
|
||||
struct db_stats {
|
||||
@@ -1283,8 +1282,6 @@ private:
|
||||
reader_concurrency_semaphore _compaction_concurrency_sem;
|
||||
reader_concurrency_semaphore _system_read_concurrency_sem;
|
||||
|
||||
named_semaphore _sstable_load_concurrency_sem{max_concurrent_sstable_loads(), named_semaphore_exception_factory{"sstable load concurrency"}};
|
||||
|
||||
db::timeout_semaphore _view_update_concurrency_sem{max_memory_pending_view_updates()};
|
||||
|
||||
cache_tracker _row_cache_tracker;
|
||||
@@ -1558,9 +1555,6 @@ public:
|
||||
std::unordered_set<sstring> get_initial_tokens();
|
||||
std::optional<gms::inet_address> get_replace_address();
|
||||
bool is_replacing();
|
||||
named_semaphore& sstable_load_concurrency_sem() {
|
||||
return _sstable_load_concurrency_sem;
|
||||
}
|
||||
void register_connection_drop_notifier(netw::messaging_service& ms);
|
||||
|
||||
db_stats& get_stats() {
|
||||
|
||||
@@ -87,6 +87,14 @@ void manager::register_metrics(const sstring& group_name) {
|
||||
|
||||
sm::make_derive("corrupted_files", _stats.corrupted_files,
|
||||
sm::description("Number of hints files that were discarded during sending because the file was corrupted.")),
|
||||
|
||||
sm::make_gauge("pending_drains",
|
||||
sm::description("Number of tasks waiting in the queue for draining hints"),
|
||||
[this] { return _drain_lock.waiters(); }),
|
||||
|
||||
sm::make_gauge("pending_sends",
|
||||
sm::description("Number of tasks waiting in the queue for sending a hint"),
|
||||
[this] { return _resource_manager.sending_queue_length(); })
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -59,6 +59,10 @@ future<semaphore_units<named_semaphore::exception_factory>> resource_manager::ge
|
||||
return get_units(_send_limiter, hint_memory_budget);
|
||||
}
|
||||
|
||||
size_t resource_manager::sending_queue_length() const {
|
||||
return _send_limiter.waiters();
|
||||
}
|
||||
|
||||
const std::chrono::seconds space_watchdog::_watchdog_period = std::chrono::seconds(1);
|
||||
|
||||
space_watchdog::space_watchdog(shard_managers_set& managers, per_device_limits_map& per_device_limits_map)
|
||||
|
||||
@@ -140,6 +140,7 @@ public:
|
||||
resource_manager& operator=(resource_manager&&) = delete;
|
||||
|
||||
future<semaphore_units<named_semaphore::exception_factory>> get_send_units_for(size_t buf_size);
|
||||
size_t sending_queue_length() const;
|
||||
|
||||
future<> start(shared_ptr<service::storage_proxy> proxy_ptr, shared_ptr<gms::gossiper> gossiper_ptr, shared_ptr<service::storage_service> ss_ptr);
|
||||
void allow_replaying() noexcept;
|
||||
|
||||
@@ -1213,6 +1213,29 @@ view_builder::view_builder(database& db, db::system_distributed_keyspace& sys_di
|
||||
: _db(db)
|
||||
, _sys_dist_ks(sys_dist_ks)
|
||||
, _mnotifier(mn) {
|
||||
setup_metrics();
|
||||
}
|
||||
|
||||
void view_builder::setup_metrics() {
|
||||
namespace sm = seastar::metrics;
|
||||
|
||||
_metrics.add_group("view_builder", {
|
||||
sm::make_gauge("pending_bookkeeping_ops",
|
||||
sm::description("Number of tasks waiting to perform bookkeeping operations"),
|
||||
[this] { return _sem.waiters(); }),
|
||||
|
||||
sm::make_derive("steps_performed",
|
||||
sm::description("Number of performed build steps."),
|
||||
_stats.steps_performed),
|
||||
|
||||
sm::make_derive("steps_failed",
|
||||
sm::description("Number of failed build steps."),
|
||||
_stats.steps_failed),
|
||||
|
||||
sm::make_gauge("builds_in_progress",
|
||||
sm::description("Number of currently active view builds."),
|
||||
[this] { return _base_to_build_step.size(); })
|
||||
});
|
||||
}
|
||||
|
||||
future<> view_builder::start(service::migration_manager& mm) {
|
||||
@@ -1599,6 +1622,7 @@ future<> view_builder::do_build_step() {
|
||||
exponential_backoff_retry r(1s, 1min);
|
||||
while (!_base_to_build_step.empty() && !_as.abort_requested()) {
|
||||
auto units = get_units(_sem, 1).get0();
|
||||
++_stats.steps_performed;
|
||||
try {
|
||||
execute(_current_step->second, exponential_backoff_retry(1s, 1min));
|
||||
r.reset();
|
||||
@@ -1606,6 +1630,7 @@ future<> view_builder::do_build_step() {
|
||||
return;
|
||||
} catch (...) {
|
||||
++_current_step->second.base->cf_stats()->view_building_paused;
|
||||
++_stats.steps_failed;
|
||||
auto base = _current_step->second.base->schema();
|
||||
vlogger.warn("Error executing build step for base {}.{}: {}", base->ks_name(), base->cf_name(), std::current_exception());
|
||||
r.retry(_as).get();
|
||||
|
||||
@@ -115,6 +115,11 @@ class view_builder final : public service::migration_listener::only_view_notific
|
||||
std::optional<dht::token> next_token;
|
||||
};
|
||||
|
||||
struct stats {
|
||||
uint64_t steps_performed = 0;
|
||||
uint64_t steps_failed = 0;
|
||||
};
|
||||
|
||||
/**
|
||||
* Keeps track of the build progress for all the views of a particular
|
||||
* base table. Each execution of the build step comprises a query of
|
||||
@@ -164,6 +169,8 @@ class view_builder final : public service::migration_listener::only_view_notific
|
||||
seastar::shared_promise<> _shards_finished_read_promise;
|
||||
// Used for testing.
|
||||
std::unordered_map<std::pair<sstring, sstring>, seastar::shared_promise<>, utils::tuple_hash> _build_notifiers;
|
||||
stats _stats;
|
||||
metrics::metric_groups _metrics;
|
||||
|
||||
public:
|
||||
// The view builder processes the base table in steps of batch_size rows.
|
||||
@@ -206,6 +213,7 @@ private:
|
||||
future<> do_build_step();
|
||||
void execute(build_step&, exponential_backoff_retry);
|
||||
future<> maybe_mark_view_as_built(view_ptr, dht::token);
|
||||
void setup_metrics();
|
||||
|
||||
struct consumer;
|
||||
};
|
||||
|
||||
@@ -152,4 +152,21 @@ future<> view_update_generator::register_staging_sstable(sstables::shared_sstabl
|
||||
}
|
||||
}
|
||||
|
||||
void view_update_generator::setup_metrics() {
|
||||
namespace sm = seastar::metrics;
|
||||
|
||||
_metrics.add_group("view_update_generator", {
|
||||
sm::make_gauge("pending_registrations", sm::description("Number of tasks waiting to register staging sstables"),
|
||||
[this] { return _registration_sem.waiters(); }),
|
||||
|
||||
sm::make_gauge("queued_batches_count",
|
||||
sm::description("Number of sets of sstables queued for view update generation"),
|
||||
[this] { return _sstables_with_tables.size(); }),
|
||||
|
||||
sm::make_gauge("sstables_to_move_count",
|
||||
sm::description("Number of sets of sstables which are already processed and wait to be moved from their staging directory"),
|
||||
[this] { return _sstables_to_move.size(); })
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -48,8 +48,11 @@ private:
|
||||
};
|
||||
std::unordered_map<lw_shared_ptr<table>, std::vector<sstables::shared_sstable>> _sstables_with_tables;
|
||||
std::unordered_map<lw_shared_ptr<table>, std::vector<sstables::shared_sstable>> _sstables_to_move;
|
||||
metrics::metric_groups _metrics;
|
||||
public:
|
||||
view_update_generator(database& db) : _db(db) { }
|
||||
view_update_generator(database& db) : _db(db) {
|
||||
setup_metrics();
|
||||
}
|
||||
|
||||
future<> start();
|
||||
future<> stop();
|
||||
@@ -58,6 +61,7 @@ public:
|
||||
ssize_t available_register_units() const { return _registration_sem.available_units(); }
|
||||
private:
|
||||
bool should_throttle() const;
|
||||
void setup_metrics();
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
5
table.cc
5
table.cc
@@ -885,7 +885,10 @@ void table::set_metrics() {
|
||||
ms::make_gauge("live_disk_space", ms::description("Live disk space used"), _stats.live_disk_space_used)(cf)(ks),
|
||||
ms::make_gauge("total_disk_space", ms::description("Total disk space used"), _stats.total_disk_space_used)(cf)(ks),
|
||||
ms::make_gauge("live_sstable", ms::description("Live sstable count"), _stats.live_sstable_count)(cf)(ks),
|
||||
ms::make_gauge("pending_compaction", ms::description("Estimated number of compactions pending for this column family"), _stats.pending_compactions)(cf)(ks)
|
||||
ms::make_gauge("pending_compaction", ms::description("Estimated number of compactions pending for this column family"), _stats.pending_compactions)(cf)(ks),
|
||||
ms::make_gauge("pending_sstable_deletions",
|
||||
ms::description("Number of tasks waiting to delete sstables from a table"),
|
||||
[this] { return _sstable_deletion_sem.waiters(); })(cf)(ks)
|
||||
});
|
||||
|
||||
// Metrics related to row locking
|
||||
|
||||
Reference in New Issue
Block a user