database: move total_reads* metrics to the concurrency semaphore

This commit is contained in:
Botond Dénes
2020-09-07 14:25:42 +03:00
parent 32ff524454
commit d7e794e565
2 changed files with 30 additions and 13 deletions

View File

@@ -379,12 +379,23 @@ database::setup_metrics() {
sm::make_derive("total_writes_timedout", _stats->total_writes_timedout,
sm::description("Counts write operations failed due to a timeout. A positive value is a sign of storage being overloaded.")),
sm::make_derive("total_reads", _stats->total_reads,
sm::description("Counts the total number of successful reads on this shard.")),
sm::make_derive("total_reads", _read_concurrency_sem.get_stats().total_successful_reads,
sm::description("Counts the total number of successful user reads on this shard."),
{user_label_instance}),
sm::make_derive("total_reads_failed", _stats->total_reads_failed,
sm::description("Counts the total number of failed read operations. "
"Add the total_reads to this value to get the total amount of reads issued on this shard.")),
sm::make_derive("total_reads_failed", _read_concurrency_sem.get_stats().total_failed_reads,
sm::description("Counts the total number of failed user read operations. "
"Add the total_reads to this value to get the total amount of reads issued on this shard."),
{user_label_instance}),
sm::make_derive("total_reads", _system_read_concurrency_sem.get_stats().total_successful_reads,
sm::description("Counts the total number of successful system reads on this shard."),
{system_label_instance}),
sm::make_derive("total_reads_failed", _system_read_concurrency_sem.get_stats().total_failed_reads,
sm::description("Counts the total number of failed system read operations. "
"Add the total_reads to this value to get the total amount of reads issued on this shard."),
{system_label_instance}),
sm::make_current_bytes("view_update_backlog", [this] { return get_view_update_backlog().current; },
sm::description("Holds the current size in bytes of the pending view updates for all tables")),
@@ -1192,7 +1203,8 @@ future<std::tuple<lw_shared_ptr<query::result>, cache_temperature>>
database::query(schema_ptr s, const query::read_command& cmd, query::result_options opts, const dht::partition_range_vector& ranges,
tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout) {
column_family& cf = find_column_family(cmd.cf_id);
auto class_config = query::query_class_config{.semaphore = get_reader_concurrency_semaphore(), .max_memory_for_unlimited_query = *cmd.max_result_size};
auto& semaphore = get_reader_concurrency_semaphore();
auto class_config = query::query_class_config{.semaphore = semaphore, .max_memory_for_unlimited_query = *cmd.max_result_size};
query::querier_cache_context cache_ctx(_querier_cache, cmd.query_uuid, cmd.is_first_page);
return _data_query_stage(&cf,
std::move(s),
@@ -1203,12 +1215,12 @@ database::query(schema_ptr s, const query::read_command& cmd, query::result_opti
std::move(trace_state),
seastar::ref(get_result_memory_limiter()),
timeout,
std::move(cache_ctx)).then_wrapped([this, s = _stats, hit_rate = cf.get_global_cache_hit_rate(), op = cf.read_in_progress()] (auto f) {
std::move(cache_ctx)).then_wrapped([this, s = _stats, &semaphore, hit_rate = cf.get_global_cache_hit_rate(), op = cf.read_in_progress()] (auto f) {
if (f.failed()) {
++s->total_reads_failed;
++semaphore.get_stats().total_failed_reads;
return make_exception_future<std::tuple<lw_shared_ptr<query::result>, cache_temperature>>(f.get_exception());
} else {
++s->total_reads;
++semaphore.get_stats().total_successful_reads;
auto result = f.get0();
s->short_data_queries += bool(result->is_short_read());
return make_ready_future<std::tuple<lw_shared_ptr<query::result>, cache_temperature>>(std::tuple(std::move(result), hit_rate));
@@ -1223,7 +1235,8 @@ database::query_mutations(schema_ptr s, const query::read_command& cmd, const dh
return get_result_memory_limiter().new_mutation_read(*cmd.max_result_size, short_read_allwoed).then(
[&, s = std::move(s), trace_state = std::move(trace_state), timeout] (query::result_memory_accounter accounter) {
column_family& cf = find_column_family(cmd.cf_id);
auto class_config = query::query_class_config{.semaphore = get_reader_concurrency_semaphore(), .max_memory_for_unlimited_query = *cmd.max_result_size};
auto& semaphore = get_reader_concurrency_semaphore();
auto class_config = query::query_class_config{.semaphore = semaphore, .max_memory_for_unlimited_query = *cmd.max_result_size};
query::querier_cache_context cache_ctx(_querier_cache, cmd.query_uuid, cmd.is_first_page);
return _mutation_query_stage(std::move(s),
cf.as_mutation_source(),
@@ -1236,12 +1249,12 @@ database::query_mutations(schema_ptr s, const query::read_command& cmd, const dh
class_config,
std::move(accounter),
std::move(trace_state),
std::move(cache_ctx)).then_wrapped([this, s = _stats, hit_rate = cf.get_global_cache_hit_rate(), op = cf.read_in_progress()] (auto f) {
std::move(cache_ctx)).then_wrapped([this, s = _stats, &semaphore, hit_rate = cf.get_global_cache_hit_rate(), op = cf.read_in_progress()] (auto f) {
if (f.failed()) {
++s->total_reads_failed;
++semaphore.get_stats().total_failed_reads;
return make_exception_future<std::tuple<reconcilable_result, cache_temperature>>(f.get_exception());
} else {
++s->total_reads;
++semaphore.get_stats().total_successful_reads;
auto result = f.get0();
s->short_mutation_queries += bool(result.is_short_read());
return make_ready_future<std::tuple<reconcilable_result, cache_temperature>>(std::tuple(std::move(result), hit_rate));

View File

@@ -87,6 +87,10 @@ public:
uint64_t permit_based_evictions = 0;
// The number of inactive reads currently registered.
uint64_t inactive_reads = 0;
// Total number of successful reads executed through this semaphore.
uint64_t total_successful_reads = 0;
// Total number of failed reads executed through this semaphore.
uint64_t total_failed_reads = 0;
};
private: