Merge 'database, reader_concurrency_semaphore: deduplicate reader_concurrency_semaphore metrics ' from Botond Dénes

Reduce code duplication by defining each metric just once, instead of three times, by having the semaphore register metrics by itself. This also makes the lifecycle of metrics contained in that of the semaphore. This is important on enterprise where semaphores are added and removed, together with service levels.
We don't want all semaphores to export metrics, so a new parameter is introduced and all call-sites make a call whether they opt-in or not.

Fixes: https://github.com/scylladb/scylladb/issues/16402

Closes scylladb/scylladb#16383

* github.com:scylladb/scylladb:
  database, reader_concurrency_sempaphore: deduplicate reader_concurrency_sempaphore metrics
  reader_concurrency_semaphore: add register_metrics constructor parameter
  sstables: name sstables_manager
This commit is contained in:
Avi Kivity
2023-12-14 18:26:24 +02:00
21 changed files with 137 additions and 189 deletions

View File

@@ -116,7 +116,8 @@ future<> multishard_writer::make_shard_writer(unsigned shard) {
consumer = _consumer,
reader = make_foreign(std::make_unique<flat_mutation_reader_v2>(std::move(reader)))] () mutable {
auto s = gs.get();
auto semaphore = std::make_unique<reader_concurrency_semaphore>(reader_concurrency_semaphore::no_limits{}, "shard_writer");
auto semaphore = std::make_unique<reader_concurrency_semaphore>(reader_concurrency_semaphore::no_limits{}, "shard_writer",
reader_concurrency_semaphore::register_metrics::no);
auto permit = semaphore->make_tracking_only_permit(s.get(), "multishard-writer", db::no_timeout, {});
auto this_shard_reader = make_foreign_reader(s, std::move(permit), std::move(reader));
return make_foreign(std::make_unique<shard_writer>(gs.get(), std::move(semaphore), std::move(this_shard_reader), consumer));

View File

@@ -13,6 +13,7 @@
#include <seastar/util/log.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/core/metrics.hh>
#include <utility>
#include "reader_concurrency_semaphore.hh"
@@ -967,24 +968,82 @@ void reader_concurrency_semaphore::signal(const resources& r) noexcept {
maybe_admit_waiters();
}
namespace sm = seastar::metrics;
static const sm::label class_label("class");
reader_concurrency_semaphore::reader_concurrency_semaphore(int count, ssize_t memory, sstring name, size_t max_queue_length,
utils::updateable_value<uint32_t> serialize_limit_multiplier, utils::updateable_value<uint32_t> kill_limit_multiplier)
utils::updateable_value<uint32_t> serialize_limit_multiplier, utils::updateable_value<uint32_t> kill_limit_multiplier, register_metrics metrics)
: _initial_resources(count, memory)
, _resources(count, memory)
, _name(std::move(name))
, _max_queue_length(max_queue_length)
, _serialize_limit_multiplier(std::move(serialize_limit_multiplier))
, _kill_limit_multiplier(std::move(kill_limit_multiplier))
{ }
{
if (metrics == register_metrics::yes) {
_metrics.emplace();
_metrics->add_group("database", {
sm::make_counter("sstable_read_queue_overloads", _stats.total_reads_shed_due_to_overload,
sm::description("Counts the number of times the sstable read queue was overloaded. "
"A non-zero value indicates that we have to drop read requests because they arrive faster than we can serve them."),
{class_label(_name)}),
reader_concurrency_semaphore::reader_concurrency_semaphore(no_limits, sstring name)
sm::make_gauge("active_reads", [this] { return active_reads(); },
sm::description("Holds the number of currently active read operations. "),
{class_label(_name)}),
sm::make_gauge("reads_memory_consumption", [this] { return consumed_resources().memory; },
sm::description("Holds the amount of memory consumed by current read operations. "),
{class_label(_name)}),
sm::make_gauge("queued_reads", [this] { return _stats.waiters; },
sm::description("Holds the number of currently queued read operations."),
{class_label(_name)}),
sm::make_gauge("paused_reads", _stats.inactive_reads,
sm::description("The number of currently active reads that are temporarily paused."),
{class_label(_name)}),
sm::make_counter("paused_reads_permit_based_evictions", _stats.permit_based_evictions,
sm::description("The number of paused reads evicted to free up permits."
" Permits are required for new reads to start, and the database will evict paused reads (if any)"
" to be able to admit new ones, if there is a shortage of permits."),
{class_label(_name)}),
sm::make_counter("reads_shed_due_to_overload", _stats.total_reads_shed_due_to_overload,
sm::description("The number of reads shed because the admission queue reached its max capacity."
" When the queue is full, excessive reads are shed to avoid overload."),
{class_label(_name)}),
sm::make_gauge("disk_reads", [this] { return _stats.disk_reads; },
sm::description("Holds the number of currently active disk read operations. "),
{class_label(_name)}),
sm::make_gauge("sstables_read", [this] { return _stats.sstables_read; },
sm::description("Holds the number of currently read sstables. "),
{class_label(_name)}),
sm::make_counter("total_reads", _stats.total_successful_reads,
sm::description("Counts the total number of successful user reads on this shard."),
{class_label(_name)}),
sm::make_counter("total_reads_failed", _stats.total_failed_reads,
sm::description("Counts the total number of failed user read operations. "
"Add the total_reads to this value to get the total amount of reads issued on this shard."),
{class_label(_name)}),
});
}
}
reader_concurrency_semaphore::reader_concurrency_semaphore(no_limits, sstring name, register_metrics metrics)
: reader_concurrency_semaphore(
std::numeric_limits<int>::max(),
std::numeric_limits<ssize_t>::max(),
std::move(name),
std::numeric_limits<size_t>::max(),
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
utils::updateable_value(std::numeric_limits<uint32_t>::max())) {}
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
metrics) {}
reader_concurrency_semaphore::~reader_concurrency_semaphore() {
assert(!_stats.waiters);

View File

@@ -12,6 +12,7 @@
#include <seastar/core/future.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/condition-variable.hh>
#include <seastar/core/metrics_registration.hh>
#include "reader_permit.hh"
#include "utils/updateable_value.hh"
@@ -187,6 +188,7 @@ private:
utils::updateable_value<uint32_t> _serialize_limit_multiplier;
utils::updateable_value<uint32_t> _kill_limit_multiplier;
stats _stats;
std::optional<seastar::metrics::metric_groups> _metrics;
bool _stopped = false;
bool _evicting = false;
gate _close_readers_gate;
@@ -265,6 +267,7 @@ private:
public:
struct no_limits { };
using register_metrics = bool_class<class register_metrics_clas>;
/// Create a semaphore with the specified limits
///
@@ -274,13 +277,14 @@ public:
sstring name,
size_t max_queue_length,
utils::updateable_value<uint32_t> serialize_limit_multiplier,
utils::updateable_value<uint32_t> kill_limit_multiplier);
utils::updateable_value<uint32_t> kill_limit_multiplier,
register_metrics metrics);
/// Create a semaphore with practically unlimited count and memory.
///
/// And conversely, no queue limit either.
/// The semaphore's name has to be unique!
explicit reader_concurrency_semaphore(no_limits, sstring name);
explicit reader_concurrency_semaphore(no_limits, sstring name, register_metrics metrics);
/// A helper constructor *only for tests* that supplies default arguments.
/// The other constructors have default values removed so 'production-code'
@@ -291,8 +295,10 @@ public:
ssize_t memory = std::numeric_limits<ssize_t>::max(),
size_t max_queue_length = std::numeric_limits<size_t>::max(),
utils::updateable_value<uint32_t> serialize_limit_multipler = utils::updateable_value(std::numeric_limits<uint32_t>::max()),
utils::updateable_value<uint32_t> kill_limit_multipler = utils::updateable_value(std::numeric_limits<uint32_t>::max()))
: reader_concurrency_semaphore(count, memory, std::move(name), max_queue_length, std::move(serialize_limit_multipler), std::move(kill_limit_multipler))
utils::updateable_value<uint32_t> kill_limit_multipler = utils::updateable_value(std::numeric_limits<uint32_t>::max()),
register_metrics metrics = register_metrics::no)
: reader_concurrency_semaphore(count, memory, std::move(name), max_queue_length, std::move(serialize_limit_multipler),
std::move(kill_limit_multipler), register_metrics::no)
{}
virtual ~reader_concurrency_semaphore();

View File

@@ -329,29 +329,32 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat
}))
, _read_concurrency_sem(max_count_concurrent_reads,
max_memory_concurrent_reads(),
"_read_concurrency_sem",
"user",
max_inactive_queue_length(),
_cfg.reader_concurrency_semaphore_serialize_limit_multiplier,
_cfg.reader_concurrency_semaphore_kill_limit_multiplier)
_cfg.reader_concurrency_semaphore_kill_limit_multiplier,
reader_concurrency_semaphore::register_metrics::yes)
// No timeouts or queue length limits - a failure here can kill an entire repair.
// Trust the caller to limit concurrency.
, _streaming_concurrency_sem(
max_count_streaming_concurrent_reads,
max_memory_streaming_concurrent_reads(),
"_streaming_concurrency_sem",
"streaming",
std::numeric_limits<size_t>::max(),
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
utils::updateable_value(std::numeric_limits<uint32_t>::max()))
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
reader_concurrency_semaphore::register_metrics::yes)
// No limits, just for accounting.
, _compaction_concurrency_sem(reader_concurrency_semaphore::no_limits{}, "compaction")
, _compaction_concurrency_sem(reader_concurrency_semaphore::no_limits{}, "compaction", reader_concurrency_semaphore::register_metrics::no)
, _system_read_concurrency_sem(
// Using higher initial concurrency, see revert_initial_system_read_concurrency_boost().
max_count_concurrent_reads,
max_memory_system_concurrent_reads(),
"_system_read_concurrency_sem",
"system",
std::numeric_limits<size_t>::max(),
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
utils::updateable_value(std::numeric_limits<uint32_t>::max()))
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
reader_concurrency_semaphore::register_metrics::yes)
, _row_cache_tracker(_cfg.index_cache_fraction.operator utils::updateable_value<double>(), cache_tracker::register_metrics::yes)
, _apply_stage("db_apply", &database::do_apply)
, _version(empty_version)
@@ -367,8 +370,8 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat
_cfg.compaction_rows_count_warning_threshold,
_cfg.compaction_collection_elements_count_warning_threshold))
, _nop_large_data_handler(std::make_unique<db::nop_large_data_handler>())
, _user_sstables_manager(std::make_unique<sstables::sstables_manager>(*_large_data_handler, _cfg, feat, _row_cache_tracker, dbcfg.available_memory, sst_dir_sem.local(), [&stm]{ return stm.get()->get_my_id(); }, &sstm))
, _system_sstables_manager(std::make_unique<sstables::sstables_manager>(*_nop_large_data_handler, _cfg, feat, _row_cache_tracker, dbcfg.available_memory, sst_dir_sem.local(), [&stm]{ return stm.get()->get_my_id(); }))
, _user_sstables_manager(std::make_unique<sstables::sstables_manager>("user", *_large_data_handler, _cfg, feat, _row_cache_tracker, dbcfg.available_memory, sst_dir_sem.local(), [&stm]{ return stm.get()->get_my_id(); }, &sstm))
, _system_sstables_manager(std::make_unique<sstables::sstables_manager>("system", *_nop_large_data_handler, _cfg, feat, _row_cache_tracker, dbcfg.available_memory, sst_dir_sem.local(), [&stm]{ return stm.get()->get_my_id(); }))
, _result_memory_limiter(dbcfg.available_memory / 10)
, _data_listeners(std::make_unique<db::data_listeners>())
, _mnotifier(mn)
@@ -476,10 +479,6 @@ database::setup_metrics() {
namespace sm = seastar::metrics;
auto user_label_instance = class_label("user");
auto streaming_label_instance = class_label("streaming");
auto system_label_instance = class_label("system");
_metrics.add_group("memory", {
sm::make_gauge("dirty_bytes", [this] { return _dirty_memory_manager.real_dirty_memory() + _system_dirty_memory_manager.real_dirty_memory(); },
sm::description("Holds the current size of all (\"regular\" and \"system\") non-free memory in bytes: used memory + released memory that hasn't been returned to a free memory pool yet. "
@@ -546,23 +545,6 @@ database::setup_metrics() {
sm::make_counter("total_writes_rate_limited", _stats->total_writes_rate_limited,
sm::description("Counts write operations which were rejected on the replica side because the per-partition limit was reached.")),
sm::make_counter("total_reads", _read_concurrency_sem.get_stats().total_successful_reads,
sm::description("Counts the total number of successful user reads on this shard."),
{user_label_instance}),
sm::make_counter("total_reads_failed", _read_concurrency_sem.get_stats().total_failed_reads,
sm::description("Counts the total number of failed user read operations. "
"Add the total_reads to this value to get the total amount of reads issued on this shard."),
{user_label_instance}),
sm::make_counter("total_reads", _system_read_concurrency_sem.get_stats().total_successful_reads,
sm::description("Counts the total number of successful system reads on this shard."),
{system_label_instance}),
sm::make_counter("total_reads_failed", _system_read_concurrency_sem.get_stats().total_failed_reads,
sm::description("Counts the total number of failed system read operations. "
"Add the total_reads to this value to get the total amount of reads issued on this shard."),
{system_label_instance}),
sm::make_counter("total_reads_rate_limited", _stats->total_reads_rate_limited,
sm::description("Counts read operations which were rejected on the replica side because the per-partition limit was reached.")),
@@ -578,7 +560,7 @@ database::setup_metrics() {
sm::make_counter("querier_cache_drops", _querier_cache.get_stats().drops,
sm::description("Counts querier cache lookups that found a cached querier but had to drop it")),
sm::make_counter("querier_cache_scheduling_group_mismatches", _querier_cache.get_stats().scheduling_group_mismatches,
sm::description("Counts querier cache lookups that found a cached querier but had to drop it due to scheduling group mismatch")),
@@ -592,118 +574,10 @@ database::setup_metrics() {
sm::make_gauge("querier_cache_population", _querier_cache.get_stats().population,
sm::description("The number of entries currently in the querier cache.")),
sm::make_counter("sstable_read_queue_overloads", _read_concurrency_sem.get_stats().total_reads_shed_due_to_overload,
sm::description("Counts the number of times the sstable read queue was overloaded. "
"A non-zero value indicates that we have to drop read requests because they arrive faster than we can serve them.")),
sm::make_gauge("active_reads", [this] { return _read_concurrency_sem.active_reads(); },
sm::description("Holds the number of currently active read operations. "),
{user_label_instance}),
});
// Registering all the metrics with a single call causes the stack size to blow up.
_metrics.add_group("database", {
sm::make_gauge("reads_memory_consumption", [this] { return _read_concurrency_sem.consumed_resources().memory; },
sm::description("Holds the amount of memory consumed by current read operations. "),
{user_label_instance}),
sm::make_gauge("queued_reads", [this] { return _read_concurrency_sem.get_stats().waiters; },
sm::description("Holds the number of currently queued read operations."),
{user_label_instance}),
sm::make_gauge("paused_reads", _read_concurrency_sem.get_stats().inactive_reads,
sm::description("The number of currently active reads that are temporarily paused."),
{user_label_instance}),
sm::make_counter("paused_reads_permit_based_evictions", _read_concurrency_sem.get_stats().permit_based_evictions,
sm::description("The number of paused reads evicted to free up permits."
" Permits are required for new reads to start, and the database will evict paused reads (if any)"
" to be able to admit new ones, if there is a shortage of permits."),
{user_label_instance}),
sm::make_counter("reads_shed_due_to_overload", _read_concurrency_sem.get_stats().total_reads_shed_due_to_overload,
sm::description("The number of reads shed because the admission queue reached its max capacity."
" When the queue is full, excessive reads are shed to avoid overload."),
{user_label_instance}),
sm::make_gauge("disk_reads", [this] { return _read_concurrency_sem.get_stats().disk_reads; },
sm::description("Holds the number of currently active disk read operations. "),
{user_label_instance}),
sm::make_gauge("sstables_read", [this] { return _read_concurrency_sem.get_stats().sstables_read; },
sm::description("Holds the number of currently read sstables. "),
{user_label_instance}),
sm::make_gauge("active_reads", [this] { return _streaming_concurrency_sem.active_reads(); },
sm::description("Holds the number of currently active read operations issued on behalf of streaming "),
{streaming_label_instance}),
sm::make_gauge("reads_memory_consumption", [this] { return _streaming_concurrency_sem.consumed_resources().memory; },
sm::description("Holds the amount of memory consumed by current read operations issued on behalf of streaming "),
{streaming_label_instance}),
sm::make_gauge("queued_reads", [this] { return _streaming_concurrency_sem.get_stats().waiters; },
sm::description("Holds the number of currently queued read operations on behalf of streaming."),
{streaming_label_instance}),
sm::make_gauge("paused_reads", _streaming_concurrency_sem.get_stats().inactive_reads,
sm::description("The number of currently ongoing streaming reads that are temporarily paused."),
{streaming_label_instance}),
sm::make_counter("paused_reads_permit_based_evictions", _streaming_concurrency_sem.get_stats().permit_based_evictions,
sm::description("The number of inactive streaming reads evicted to free up permits"
" Permits are required for new reads to start, and the database will evict paused reads (if any)"
" to be able to admit new ones, if there is a shortage of permits."),
{streaming_label_instance}),
sm::make_counter("reads_shed_due_to_overload", _streaming_concurrency_sem.get_stats().total_reads_shed_due_to_overload,
sm::description("The number of reads shed because the admission queue reached its max capacity."
" When the queue is full, excessive reads are shed to avoid overload."),
{streaming_label_instance}),
sm::make_gauge("disk_reads", [this] { return _streaming_concurrency_sem.get_stats().disk_reads; },
sm::description("Holds the number of currently active disk read operations. "),
{streaming_label_instance}),
sm::make_gauge("sstables_read", [this] { return _streaming_concurrency_sem.get_stats().sstables_read; },
sm::description("Holds the number of currently read sstables. "),
{streaming_label_instance}),
sm::make_gauge("active_reads", [this] { return _system_read_concurrency_sem.active_reads(); },
sm::description("Holds the number of currently active read operations from \"system\" keyspace tables. "),
{system_label_instance}),
sm::make_gauge("reads_memory_consumption", [this] { return _system_read_concurrency_sem.consumed_resources().memory; },
sm::description("Holds the amount of memory consumed by all read operations from \"system\" keyspace tables. "),
{system_label_instance}),
sm::make_gauge("queued_reads", [this] { return _system_read_concurrency_sem.get_stats().waiters; },
sm::description("Holds the number of currently queued read operations from \"system\" keyspace tables."),
{system_label_instance}),
sm::make_gauge("paused_reads", _system_read_concurrency_sem.get_stats().inactive_reads,
sm::description("The number of currently ongoing system reads that are temporarily paused."),
{system_label_instance}),
sm::make_counter("paused_reads_permit_based_evictions", _system_read_concurrency_sem.get_stats().permit_based_evictions,
sm::description("The number of paused system reads evicted to free up permits"
" Permits are required for new reads to start, and the database will evict inactive reads (if any)"
" to be able to admit new ones, if there is a shortage of permits."),
{system_label_instance}),
sm::make_counter("reads_shed_due_to_overload", _system_read_concurrency_sem.get_stats().total_reads_shed_due_to_overload,
sm::description("The number of reads shed because the admission queue reached its max capacity."
" When the queue is full, excessive reads are shed to avoid overload."),
{system_label_instance}),
sm::make_gauge("disk_reads", [this] { return _system_read_concurrency_sem.get_stats().disk_reads; },
sm::description("Holds the number of currently active disk read operations. "),
{system_label_instance}),
sm::make_gauge("sstables_read", [this] { return _system_read_concurrency_sem.get_stats().sstables_read; },
sm::description("Holds the number of currently read sstables. "),
{system_label_instance}),
sm::make_gauge("total_result_bytes", [this] { return get_result_memory_limiter().total_used_memory(); },
sm::description("Holds the current amount of memory used for results.")),

View File

@@ -2481,7 +2481,8 @@ future<>
write_memtable_to_sstable(memtable& mt, sstables::shared_sstable sst) {
auto cfg = sst->manager().configure_writer("memtable");
auto monitor = replica::permit_monitor(make_lw_shared(sstable_write_permit::unconditional()));
auto semaphore = reader_concurrency_semaphore(reader_concurrency_semaphore::no_limits{}, "write_memtable_to_sstable");
auto semaphore = reader_concurrency_semaphore(reader_concurrency_semaphore::no_limits{}, "write_memtable_to_sstable",
reader_concurrency_semaphore::register_metrics::no);
std::exception_ptr ex;
try {

View File

@@ -2168,7 +2168,8 @@ class scylla_memory(gdb.Command):
@staticmethod
def format_semaphore_stats(semaphore):
semaphore_name = "{}:".format(str(semaphore['_name'])[1:-1].split("_")[1])
# older versions had names like "_user_concurrency_semaphore"
semaphore_name = "{}:".format(str(semaphore['_name']).removeprefix('_').removesuffix('_concurrency_semaphore'))
initial_count = int(semaphore["_initial_resources"]["count"])
initial_memory = int(semaphore["_initial_resources"]["memory"])
used_count = initial_count - int(semaphore["_resources"]["count"])

View File

@@ -1837,7 +1837,8 @@ future<> sstable::generate_summary() {
};
auto index_file = co_await new_sstable_component_file(_read_error_handler, component_type::Index, open_flags::ro);
auto sem = reader_concurrency_semaphore(reader_concurrency_semaphore::no_limits{}, "sstables::generate_summary()");
auto sem = reader_concurrency_semaphore(reader_concurrency_semaphore::no_limits{}, "sstables::generate_summary()",
reader_concurrency_semaphore::register_metrics::no);
std::exception_ptr ex;
@@ -2736,7 +2737,8 @@ future<bool> sstable::has_partition_key(const utils::hashed_key& hk, const dht::
}
bool present;
std::exception_ptr ex;
auto sem = reader_concurrency_semaphore(reader_concurrency_semaphore::no_limits{}, "sstables::has_partition_key()");
auto sem = reader_concurrency_semaphore(reader_concurrency_semaphore::no_limits{}, "sstables::has_partition_key()",
reader_concurrency_semaphore::register_metrics::no);
try {
auto lh_index_ptr = std::make_unique<sstables::index_reader>(s, sem.make_tracking_only_permit(_schema.get(), s->get_filename(), db::no_timeout, {}));
present = co_await lh_index_ptr->advance_lower_and_check_if_present(dk);

View File

@@ -22,16 +22,17 @@ namespace sstables {
logging::logger smlogger("sstables_manager");
sstables_manager::sstables_manager(
db::large_data_handler& large_data_handler, const db::config& dbcfg, gms::feature_service& feat, cache_tracker& ct, size_t available_memory, directory_semaphore& dir_sem, noncopyable_function<locator::host_id()>&& resolve_host_id, storage_manager* shared)
sstring name, db::large_data_handler& large_data_handler, const db::config& dbcfg, gms::feature_service& feat, cache_tracker& ct, size_t available_memory, directory_semaphore& dir_sem, noncopyable_function<locator::host_id()>&& resolve_host_id, storage_manager* shared)
: _storage(shared)
, _large_data_handler(large_data_handler), _db_config(dbcfg), _features(feat), _cache_tracker(ct)
, _sstable_metadata_concurrency_sem(
max_count_sstable_metadata_concurrent_reads,
max_memory_sstable_metadata_concurrent_reads(available_memory),
"sstable_metadata_concurrency_sem",
fmt::format("sstables_manager_{}", name),
std::numeric_limits<size_t>::max(),
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
utils::updateable_value(std::numeric_limits<uint32_t>::max()))
utils::updateable_value(std::numeric_limits<uint32_t>::max()),
reader_concurrency_semaphore::register_metrics::no)
, _dir_semaphore(dir_sem)
, _resolve_host_id(std::move(resolve_host_id))
{

View File

@@ -111,7 +111,7 @@ private:
noncopyable_function<locator::host_id()> _resolve_host_id;
public:
explicit sstables_manager(db::large_data_handler& large_data_handler, const db::config& dbcfg, gms::feature_service& feat, cache_tracker&, size_t available_memory, directory_semaphore& dir_sem, noncopyable_function<locator::host_id()>&& resolve_host_id, storage_manager* shared = nullptr);
explicit sstables_manager(sstring name, db::large_data_handler& large_data_handler, const db::config& dbcfg, gms::feature_service& feat, cache_tracker&, size_t available_memory, directory_semaphore& dir_sem, noncopyable_function<locator::host_id()>&& resolve_host_id, storage_manager* shared = nullptr);
virtual ~sstables_manager();
shared_sstable make_sstable(schema_ptr schema, sstring table_dir,

View File

@@ -63,7 +63,7 @@ BOOST_AUTO_TEST_CASE(bytes_view_hasher_sanity_check) {
}
SEASTAR_THREAD_TEST_CASE(mutation_fragment_sanity_check) {
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, __FILE__);
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, __FILE__, reader_concurrency_semaphore::register_metrics::no);
auto stop_semaphore = deferred_stop(semaphore);
simple_schema s;
auto permit = semaphore.make_tracking_only_permit(s.schema().get(), "test", db::no_timeout, {});

View File

@@ -2939,7 +2939,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_self_validation) {
set_abort_on_internal_error(true);
});
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name());
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name(), reader_concurrency_semaphore::register_metrics::no);
auto stop_sem = deferred_stop(semaphore);
simple_schema s;
auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {});
@@ -3217,7 +3217,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_recreate_before_fast_forward_to)
};
};
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name());
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name(), reader_concurrency_semaphore::register_metrics::no);
auto stop_sem = deferred_stop(semaphore);
simple_schema s;
auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {});

View File

@@ -759,9 +759,9 @@ SEASTAR_THREAD_TEST_CASE(test_immediate_evict_on_insert) {
}
SEASTAR_THREAD_TEST_CASE(test_unique_inactive_read_handle) {
reader_concurrency_semaphore sem1(reader_concurrency_semaphore::no_limits{}, "sem1");
reader_concurrency_semaphore sem1(reader_concurrency_semaphore::no_limits{}, "sem1", reader_concurrency_semaphore::register_metrics::no);
auto stop_sem1 = deferred_stop(sem1);
reader_concurrency_semaphore sem2(reader_concurrency_semaphore::no_limits{}, ""); // to see the message for an unnamed semaphore
reader_concurrency_semaphore sem2(reader_concurrency_semaphore::no_limits{}, "", reader_concurrency_semaphore::register_metrics::no); // to see the message for an unnamed semaphore
auto stop_sem2 = deferred_stop(sem2);
auto schema = schema_builder("ks", "cf")
@@ -784,7 +784,7 @@ SEASTAR_THREAD_TEST_CASE(test_unique_inactive_read_handle) {
}
SEASTAR_THREAD_TEST_CASE(test_semaphore_mismatch) {
reader_concurrency_semaphore other_sem(reader_concurrency_semaphore::no_limits{}, "other_semaphore");
reader_concurrency_semaphore other_sem(reader_concurrency_semaphore::no_limits{}, "other_semaphore", reader_concurrency_semaphore::register_metrics::no);
auto stop_sem1 = deferred_stop(other_sem);
bool is_user_semaphore = true;

View File

@@ -36,7 +36,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_clear_inactive_reads)
std::vector<reader_concurrency_semaphore::inactive_read_handle> handles;
{
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name());
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name(), reader_concurrency_semaphore::register_metrics::no);
auto stop_sem = deferred_stop(semaphore);
auto clear_permits = defer([&permits] { permits.clear(); });
@@ -112,7 +112,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_destroyed_permit_rele
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_abandoned_handle_closes_reader) {
simple_schema s;
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name());
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name(), reader_concurrency_semaphore::register_metrics::no);
auto stop_sem = deferred_stop(semaphore);
auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {});
@@ -543,7 +543,7 @@ SEASTAR_TEST_CASE(reader_concurrency_semaphore_max_queue_length) {
}
SEASTAR_THREAD_TEST_CASE(reader_concurrency_semaphore_dump_reader_diganostics) {
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name());
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name(), reader_concurrency_semaphore::register_metrics::no);
auto stop_sem = deferred_stop(semaphore);
const auto nr_tables = tests::random::get_int<unsigned>(2, 4);
@@ -583,20 +583,21 @@ SEASTAR_THREAD_TEST_CASE(reader_concurrency_semaphore_dump_reader_diganostics) {
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_stop_waits_on_permits) {
BOOST_TEST_MESSAGE("unused");
{
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name());
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name(), reader_concurrency_semaphore::register_metrics::no);
// Checks for stop() should not be triggered.
}
BOOST_TEST_MESSAGE("0 permits");
{
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name());
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name(), reader_concurrency_semaphore::register_metrics::no);
// Test will fail by timing out.
semaphore.stop().get();
}
BOOST_TEST_MESSAGE("1 permit");
{
auto semaphore = std::make_unique<reader_concurrency_semaphore>(reader_concurrency_semaphore::no_limits{}, get_name());
auto semaphore = std::make_unique<reader_concurrency_semaphore>(reader_concurrency_semaphore::no_limits{}, get_name(),
reader_concurrency_semaphore::register_metrics::no);
auto permit = std::make_unique<reader_permit>(semaphore->make_tracking_only_permit(nullptr, "permit1", db::no_timeout, {}));
// Test will fail via use-after-free
@@ -947,7 +948,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_evict_inactive_reads_
schema_handles.emplace(&s, std::vector<reader_concurrency_semaphore::inactive_read_handle>{});
}
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name());
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name(), reader_concurrency_semaphore::register_metrics::no);
auto stop_sem = deferred_stop(semaphore);
for (auto& s : schemas) {
@@ -1236,7 +1237,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_memory_limit_no_leaks
const auto serialize_multiplier = 2;
const auto kill_multiplier = 3;
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100,
utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier));
utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier), reader_concurrency_semaphore::register_metrics::no);
auto stop_sem = deferred_stop(semaphore);
const size_t reader_count_target = 6;
@@ -1491,7 +1492,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_request_memory_preser
const auto serialize_multiplier = 2;
const auto kill_multiplier = std::numeric_limits<uint32_t>::max(); // we don't want this to interfere with our test
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100,
utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier));
utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier), reader_concurrency_semaphore::register_metrics::no);
auto stop_sem = deferred_stop(semaphore);
auto sponge_permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get0();
@@ -1554,7 +1555,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_blessed_read_goes_ina
const auto serialize_multiplier = 2;
const auto kill_multiplier = std::numeric_limits<uint32_t>::max(); // we don't want this to interfere with our test
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100,
utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier));
utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier), reader_concurrency_semaphore::register_metrics::no);
auto stop_sem = deferred_stop(semaphore);
simple_schema ss;
@@ -1584,7 +1585,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_blessed_read_goes_ina
// Check that `stop()` correctly evicts all inactive reads.
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_stop_with_inactive_reads) {
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name());
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name(), reader_concurrency_semaphore::register_metrics::no);
simple_schema ss;
auto s = ss.schema();
@@ -1616,7 +1617,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_permit_waiting_for_me
const auto serialize_multiplier = 2;
const auto kill_multiplier = std::numeric_limits<uint32_t>::max(); // we don't want this to interfere with our test
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100,
utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier));
utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier), reader_concurrency_semaphore::register_metrics::no);
auto stop_sem = deferred_stop(semaphore);
auto permit1 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get0();
@@ -1663,7 +1664,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_no_unnecessary_evicti
const auto serialize_multiplier = std::numeric_limits<uint32_t>::max();
const auto kill_multiplier = std::numeric_limits<uint32_t>::max();
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100,
utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier));
utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier), reader_concurrency_semaphore::register_metrics::no);
auto stop_sem = deferred_stop(semaphore);
simple_schema ss;
@@ -1756,7 +1757,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_necessary_evicting) {
const auto serialize_multiplier = std::numeric_limits<uint32_t>::max();
const auto kill_multiplier = std::numeric_limits<uint32_t>::max();
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100,
utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier));
utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier), reader_concurrency_semaphore::register_metrics::no);
auto stop_sem = deferred_stop(semaphore);
simple_schema ss;
@@ -1914,7 +1915,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_execution_stage_wakeu
const auto serialize_multiplier = std::numeric_limits<uint32_t>::max();
const auto kill_multiplier = std::numeric_limits<uint32_t>::max();
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100,
utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier));
utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier), reader_concurrency_semaphore::register_metrics::no);
auto stop_sem = deferred_stop(semaphore);
auto permit1 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get();

View File

@@ -4675,7 +4675,7 @@ SEASTAR_TEST_CASE(test_compact_range_tombstones_on_read) {
SEASTAR_THREAD_TEST_CASE(test_cache_reader_semaphore_oom_kill) {
simple_schema s;
reader_concurrency_semaphore semaphore(100, 1, get_name(), std::numeric_limits<size_t>::max(), utils::updateable_value<uint32_t>(1),
utils::updateable_value<uint32_t>(1));
utils::updateable_value<uint32_t>(1), reader_concurrency_semaphore::register_metrics::no);
auto stop_semaphore = deferred_stop(semaphore);
cache_tracker tracker;

View File

@@ -18,8 +18,9 @@ class reader_concurrency_semaphore_wrapper {
public:
reader_concurrency_semaphore_wrapper(const char* name = nullptr)
: _semaphore(std::make_unique<::reader_concurrency_semaphore>(::reader_concurrency_semaphore::no_limits{}, name ? name : "test")) {
}
: _semaphore(std::make_unique<::reader_concurrency_semaphore>(::reader_concurrency_semaphore::no_limits{}, name ? name : "test",
reader_concurrency_semaphore::register_metrics::no))
{ }
~reader_concurrency_semaphore_wrapper() {
_semaphore->stop().get();
}

View File

@@ -94,7 +94,7 @@ public:
// Create with no memory, so all inactive reads are immediately evicted.
_contexts[shard]->semaphore.emplace(reader_concurrency_semaphore::for_tests{}, std::move(name), 1, 0);
} else {
_contexts[shard]->semaphore.emplace(reader_concurrency_semaphore::no_limits{}, std::move(name));
_contexts[shard]->semaphore.emplace(reader_concurrency_semaphore::no_limits{}, std::move(name), reader_concurrency_semaphore::register_metrics::no);
}
return *_contexts[shard]->semaphore;
}

View File

@@ -186,10 +186,10 @@ test_env::impl::impl(test_env_config cfg, sstables::storage_manager* sstm)
, db_config(make_db_config(dir.path().native(), cfg.storage))
, dir_sem(1)
, feature_service(gms::feature_config_from_db_config(*db_config))
, mgr(cfg.large_data_handler == nullptr ? nop_ld_handler : *cfg.large_data_handler, *db_config,
, mgr("test_env", cfg.large_data_handler == nullptr ? nop_ld_handler : *cfg.large_data_handler, *db_config,
feature_service, cache_tracker, memory::stats().total_memory(), dir_sem,
[host_id = locator::host_id::create_random_id()]{ return host_id; }, sstm)
, semaphore(reader_concurrency_semaphore::no_limits{}, "sstables::test_env")
, semaphore(reader_concurrency_semaphore::no_limits{}, "sstables::test_env", reader_concurrency_semaphore::register_metrics::no)
, use_uuid(cfg.use_uuid)
, storage(std::move(cfg.storage))
{

View File

@@ -73,7 +73,8 @@ std::ostream& operator<<(std::ostream& os, const perf_result_with_aio_writes& re
namespace perf {
reader_concurrency_semaphore_wrapper::reader_concurrency_semaphore_wrapper(sstring name)
: _semaphore(std::make_unique<reader_concurrency_semaphore>(reader_concurrency_semaphore::no_limits{}, std::move(name)))
: _semaphore(std::make_unique<reader_concurrency_semaphore>(reader_concurrency_semaphore::no_limits{}, std::move(name),
reader_concurrency_semaphore::register_metrics::no))
{
}

View File

@@ -43,7 +43,7 @@ struct table {
row_cache cache;
table(unsigned partitions, unsigned rows)
: semaphore(reader_concurrency_semaphore::no_limits{}, __FILE__)
: semaphore(reader_concurrency_semaphore::no_limits{}, __FILE__, reader_concurrency_semaphore::register_metrics::no)
, mt(make_lw_shared<replica::memtable>(s.schema()))
, underlying(s.schema())
, cache(s.schema(), snapshot_source([this] { return underlying(); }), tracker)

View File

@@ -341,7 +341,7 @@ struct sstable_manager_service {
explicit sstable_manager_service()
: feature_service(gms::feature_config_from_db_config(dbcfg))
, dir_sem(1)
, sst_man(large_data_handler, dbcfg, feature_service, tracker, memory::stats().total_memory(), dir_sem, []{ return locator::host_id{}; }) {
, sst_man("schema_loader", large_data_handler, dbcfg, feature_service, tracker, memory::stats().total_memory(), dir_sem, []{ return locator::host_id{}; }) {
}
future<> stop() {
@@ -469,7 +469,7 @@ std::unordered_map<schema_ptr, std::string> get_schema_table_directories(std::fi
}
schema_ptr do_load_schema_from_schema_tables(std::filesystem::path scylla_data_path, std::string_view keyspace, std::string_view table) {
reader_concurrency_semaphore rcs_sem(reader_concurrency_semaphore::no_limits{}, __FUNCTION__);
reader_concurrency_semaphore rcs_sem(reader_concurrency_semaphore::no_limits{}, __FUNCTION__, reader_concurrency_semaphore::register_metrics::no);
auto stop_semaphore = deferred_stop(rcs_sem);
sharded<sstable_manager_service> sst_man;

View File

@@ -2983,7 +2983,7 @@ $ scylla sstable validate /path/to/md-123456-big-Data.db /path/to/md-123457-big-
gms::feature_service feature_service(gms::feature_config_from_db_config(dbcfg));
cache_tracker tracker;
sstables::directory_semaphore dir_sem(1);
sstables::sstables_manager sst_man(large_data_handler, dbcfg, feature_service, tracker,
sstables::sstables_manager sst_man("scylla_sstable", large_data_handler, dbcfg, feature_service, tracker,
memory::stats().total_memory(), dir_sem,
[host_id = locator::host_id::create_random_id()] { return host_id; });
auto close_sst_man = deferred_close(sst_man);
@@ -3003,7 +3003,7 @@ $ scylla sstable validate /path/to/md-123456-big-Data.db /path/to/md-123457-big-
}
}
reader_concurrency_semaphore rcs_sem(reader_concurrency_semaphore::no_limits{}, app_name);
reader_concurrency_semaphore rcs_sem(reader_concurrency_semaphore::no_limits{}, app_name, reader_concurrency_semaphore::register_metrics::no);
auto stop_semaphore = deferred_stop(rcs_sem);
const auto permit = rcs_sem.make_tracking_only_permit(schema.get(), app_name, db::no_timeout, {});