Merge "reader concurrency semaphore: dump permit diagnostics on timeout or queue overflow" from Botond
" The reader concurrency semaphore timing out or its queue overflowing are fairly common events both in production and in testing. At the same time it is a hard-to-diagnose problem that often has a benign cause (especially during testing), but it is equally possible that it points to something serious. So when this error starts to appear in logs, we usually want to investigate, and the investigation is lengthy: it involves looking at metrics, coredumps, or both. This patch intends to jumpstart this process by dumping diagnostics on semaphore timeout or queue overflow. The diagnostics are printed to the log with debug level to avoid excessive spamming. They contain a histogram of all the permits associated with the problematic semaphore, organized by table, operation and state. Example: DEBUG 2020-10-08 17:05:26,115 [shard 0] reader_concurrency_semaphore - Semaphore _read_concurrency_sem: timed out, dumping permit diagnostics: Permits with state admitted, sorted by memory memory count name 3499M 27 ks.test:data-query 3499M 27 total Permits with state waiting, sorted by count count memory name 1 0B ks.test:drain 7650 0B ks.test:data-query 7651 0B total Permits with state registered, sorted by count count memory name 0 0B total Total: permits: 7678, memory: 3499M This allows determining several things at a glance: * Which tables are involved * Which operations are involved * Where the memory is This can speed up a follow-up investigation greatly, or it can even be enough on its own to determine that the issue is benign. 
Tests: unit(dev, debug) " * 'dump-diagnostics-on-semaphore-timeout/v2' of https://github.com/denesb/scylla: reader_concurrency_semaphore: dump permit diagnostics on timeout or queue overflow utils: add to_hr_size() reader_concurrency_semaphore: link permits into an intrusive list reader_concurrency_semaphore: move expiry_handler::operator()() out-of-line reader_concurrency_semaphore: move constructors out-of-line reader_concurrency_semaphore: add state to permits reader_concurrency_semaphore: name permits querier_cache_test: test_immediate_evict_on_insert: use two permits multishard_combining_reader: reader_lifecycle_policy: add permit param to create_reader() multishard_combining_reader: add permit parameter multishard_combining_reader: shard_reader: use multishard reader's permit
This commit is contained in:
@@ -567,6 +567,7 @@ scylla_core = (['database.cc',
|
||||
'utils/directories.cc',
|
||||
'utils/generation-number.cc',
|
||||
'utils/rjson.cc',
|
||||
'utils/human_readable.cc',
|
||||
'mutation_partition.cc',
|
||||
'mutation_partition_view.cc',
|
||||
'mutation_partition_serializer.cc',
|
||||
|
||||
10
database.cc
10
database.cc
@@ -1379,7 +1379,8 @@ future<mutation> database::do_apply_counter_update(column_family& cf, const froz
|
||||
// counter state for each modified cell...
|
||||
|
||||
tracing::trace(trace_state, "Reading counter values from the CF");
|
||||
return counter_write_query(m_schema, cf.as_mutation_source(), get_reader_concurrency_semaphore().make_permit(), m.decorated_key(), slice, trace_state, timeout)
|
||||
auto permit = get_reader_concurrency_semaphore().make_permit(m_schema.get(), "counter-read-before-write");
|
||||
return counter_write_query(m_schema, cf.as_mutation_source(), std::move(permit), m.decorated_key(), slice, trace_state, timeout)
|
||||
.then([this, &cf, &m, m_schema, timeout, trace_state] (auto mopt) {
|
||||
// ...now, that we got existing state of all affected counter
|
||||
// cells we can look for our shard in each of them, increment
|
||||
@@ -2041,6 +2042,7 @@ flat_mutation_reader make_multishard_streaming_reader(distributed<database>& db,
|
||||
}
|
||||
virtual flat_mutation_reader create_reader(
|
||||
schema_ptr schema,
|
||||
reader_permit,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
@@ -2075,7 +2077,7 @@ flat_mutation_reader make_multishard_streaming_reader(distributed<database>& db,
|
||||
}
|
||||
};
|
||||
auto ms = mutation_source([&db] (schema_ptr s,
|
||||
reader_permit,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& ps,
|
||||
const io_priority_class& pc,
|
||||
@@ -2083,12 +2085,12 @@ flat_mutation_reader make_multishard_streaming_reader(distributed<database>& db,
|
||||
streamed_mutation::forwarding,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
auto table_id = s->id();
|
||||
return make_multishard_combining_reader(make_shared<streaming_reader_lifecycle_policy>(db, table_id), std::move(s), pr, ps, pc,
|
||||
return make_multishard_combining_reader(make_shared<streaming_reader_lifecycle_policy>(db, table_id), std::move(s), std::move(permit), pr, ps, pc,
|
||||
std::move(trace_state), fwd_mr);
|
||||
});
|
||||
auto&& full_slice = schema->full_slice();
|
||||
auto& cf = db.local().find_column_family(schema);
|
||||
return make_flat_multi_range_reader(std::move(schema), cf.streaming_read_concurrency_semaphore().make_permit(), std::move(ms),
|
||||
return make_flat_multi_range_reader(schema, cf.streaming_read_concurrency_semaphore().make_permit(schema.get(), "multishard-streaming-reader"), std::move(ms),
|
||||
std::move(range_generator), std::move(full_slice), service::get_local_streaming_priority(), {}, mutation_reader::forwarding::no);
|
||||
}
|
||||
|
||||
|
||||
@@ -1235,7 +1235,7 @@ view_builder::view_builder(database& db, db::system_distributed_keyspace& sys_di
|
||||
: _db(db)
|
||||
, _sys_dist_ks(sys_dist_ks)
|
||||
, _mnotifier(mn)
|
||||
, _permit(_db.get_reader_concurrency_semaphore().make_permit()) {
|
||||
, _permit(_db.get_reader_concurrency_semaphore().make_permit(nullptr, "view_builder")) {
|
||||
setup_metrics();
|
||||
}
|
||||
|
||||
|
||||
@@ -72,7 +72,7 @@ future<> view_update_generator::start() {
|
||||
ssts->insert(sst);
|
||||
}
|
||||
|
||||
auto permit = _db.get_reader_concurrency_semaphore().make_permit();
|
||||
auto permit = _db.get_reader_concurrency_semaphore().make_permit(s.get(), "view_update_generator");
|
||||
auto ms = mutation_source([this, ssts] (
|
||||
schema_ptr s,
|
||||
reader_permit permit,
|
||||
|
||||
@@ -689,11 +689,12 @@ memtable::make_flat_reader(schema_ptr s,
|
||||
|
||||
flat_mutation_reader
|
||||
memtable::make_flush_reader(schema_ptr s, const io_priority_class& pc) {
|
||||
auto permit = _flush_semaphore.make_permit(s.get(), "memtable-flush");
|
||||
if (group()) {
|
||||
return make_flat_mutation_reader<flush_reader>(s, _flush_semaphore.make_permit(), shared_from_this());
|
||||
return make_flat_mutation_reader<flush_reader>(std::move(s), std::move(permit), shared_from_this());
|
||||
} else {
|
||||
auto& full_slice = s->full_slice();
|
||||
return make_flat_mutation_reader<scanning_reader>(std::move(s), shared_from_this(), _flush_semaphore.make_permit(),
|
||||
return make_flat_mutation_reader<scanning_reader>(std::move(s), shared_from_this(), std::move(permit),
|
||||
query::full_partition_range, full_slice, pc, mutation_reader::forwarding::no);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -103,10 +103,6 @@ class read_context : public reader_lifecycle_policy {
|
||||
std::unique_ptr<const query::partition_slice> slice;
|
||||
utils::phased_barrier::operation read_operation;
|
||||
|
||||
explicit remote_parts(reader_concurrency_semaphore& semaphore)
|
||||
: permit(semaphore.make_permit()) {
|
||||
}
|
||||
|
||||
remote_parts(
|
||||
reader_permit permit,
|
||||
std::unique_ptr<const dht::partition_range> range = nullptr,
|
||||
@@ -128,10 +124,12 @@ class read_context : public reader_lifecycle_policy {
|
||||
reader_meta() = default;
|
||||
|
||||
// Remote constructor.
|
||||
reader_meta(reader_state s, remote_parts rp, reader_concurrency_semaphore::inactive_read_handle h = {})
|
||||
reader_meta(reader_state s, std::optional<remote_parts> rp = {}, reader_concurrency_semaphore::inactive_read_handle h = {})
|
||||
: state(s)
|
||||
, rparts(make_foreign(std::make_unique<remote_parts>(std::move(rp))))
|
||||
, handle(make_foreign(std::make_unique<reader_concurrency_semaphore::inactive_read_handle>(std::move(h)))) {
|
||||
if (rp) {
|
||||
rparts = make_foreign(std::make_unique<remote_parts>(std::move(*rp)));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@@ -216,7 +214,7 @@ public:
|
||||
tracing::trace_state_ptr trace_state)
|
||||
: _db(db)
|
||||
, _schema(std::move(s))
|
||||
, _permit(_db.local().get_reader_concurrency_semaphore().make_permit())
|
||||
, _permit(_db.local().get_reader_concurrency_semaphore().make_permit(_schema.get(), "multishard-mutation-query"))
|
||||
, _cmd(cmd)
|
||||
, _ranges(ranges)
|
||||
, _trace_state(std::move(trace_state))
|
||||
@@ -240,6 +238,7 @@ public:
|
||||
|
||||
virtual flat_mutation_reader create_reader(
|
||||
schema_ptr schema,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& ps,
|
||||
const io_priority_class& pc,
|
||||
@@ -282,6 +281,7 @@ std::string_view read_context::reader_state_to_string(reader_state rs) {
|
||||
|
||||
flat_mutation_reader read_context::create_reader(
|
||||
schema_ptr schema,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& ps,
|
||||
const io_priority_class& pc,
|
||||
@@ -309,7 +309,7 @@ flat_mutation_reader read_context::create_reader(
|
||||
auto& table = _db.local().find_column_family(schema);
|
||||
|
||||
if (!rm.rparts) {
|
||||
rm.rparts = make_foreign(std::make_unique<reader_meta::remote_parts>(semaphore()));
|
||||
rm.rparts = make_foreign(std::make_unique<reader_meta::remote_parts>(std::move(permit)));
|
||||
}
|
||||
|
||||
rm.rparts->range = std::make_unique<const dht::partition_range>(pr);
|
||||
@@ -527,7 +527,7 @@ future<> read_context::lookup_readers() {
|
||||
auto& semaphore = this->semaphore();
|
||||
|
||||
if (!querier_opt) {
|
||||
return reader_meta(reader_state::inexistent, reader_meta::remote_parts(semaphore));
|
||||
return reader_meta(reader_state::inexistent);
|
||||
}
|
||||
|
||||
auto& q = *querier_opt;
|
||||
@@ -630,7 +630,7 @@ static future<reconcilable_result> do_query_mutations(
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return make_multishard_combining_reader(ctx, std::move(s), pr, ps, pc, std::move(trace_state), fwd_mr);
|
||||
return make_multishard_combining_reader(ctx, std::move(s), std::move(permit), pr, ps, pc, std::move(trace_state), fwd_mr);
|
||||
});
|
||||
auto reader = make_flat_multi_range_reader(s, ctx->permit(), std::move(ms), ranges,
|
||||
cmd.slice, service::get_local_sstable_query_read_priority(), trace_state, mutation_reader::forwarding::no);
|
||||
|
||||
@@ -2246,7 +2246,8 @@ future<> data_query(
|
||||
auto querier_opt = cache_ctx.lookup_data_querier(*s, range, slice, trace_ptr);
|
||||
auto q = querier_opt
|
||||
? std::move(*querier_opt)
|
||||
: query::data_querier(source, s, class_config.semaphore.make_permit(), range, slice, service::get_local_sstable_query_read_priority(), trace_ptr);
|
||||
: query::data_querier(source, s, class_config.semaphore.make_permit(s.get(), "data-query"), range, slice,
|
||||
service::get_local_sstable_query_read_priority(), trace_ptr);
|
||||
|
||||
return do_with(std::move(q), [=, &builder, trace_ptr = std::move(trace_ptr), cache_ctx = std::move(cache_ctx)] (query::data_querier& q) mutable {
|
||||
auto qrb = query_result_builder(*s, builder);
|
||||
@@ -2359,7 +2360,8 @@ static do_mutation_query(schema_ptr s,
|
||||
auto querier_opt = cache_ctx.lookup_mutation_querier(*s, range, slice, trace_ptr);
|
||||
auto q = querier_opt
|
||||
? std::move(*querier_opt)
|
||||
: query::mutation_querier(source, s, class_config.semaphore.make_permit(), range, slice, service::get_local_sstable_query_read_priority(), trace_ptr);
|
||||
: query::mutation_querier(source, s, class_config.semaphore.make_permit(s.get(), "mutation-query"), range, slice,
|
||||
service::get_local_sstable_query_read_priority(), trace_ptr);
|
||||
|
||||
return do_with(std::move(q), [=, &slice, accounter = std::move(accounter), trace_ptr = std::move(trace_ptr), cache_ctx = std::move(cache_ctx)] (
|
||||
query::mutation_querier& q) mutable {
|
||||
|
||||
@@ -1560,6 +1560,7 @@ private:
|
||||
public:
|
||||
shard_reader(
|
||||
schema_ptr schema,
|
||||
reader_permit permit,
|
||||
shared_ptr<reader_lifecycle_policy> lifecycle_policy,
|
||||
unsigned shard,
|
||||
const dht::partition_range& pr,
|
||||
@@ -1567,7 +1568,7 @@ public:
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
mutation_reader::forwarding fwd_mr)
|
||||
: impl(std::move(schema), lifecycle_policy->semaphore().make_permit())
|
||||
: impl(std::move(schema), std::move(permit))
|
||||
, _lifecycle_policy(std::move(lifecycle_policy))
|
||||
, _shard(shard)
|
||||
, _pr(&pr)
|
||||
@@ -1644,17 +1645,18 @@ future<> shard_reader::do_fill_buffer(db::timeout_clock::time_point timeout) {
|
||||
fill_buf_fut = smp::submit_to(_shard, [this, gs = global_schema_ptr(_schema), timeout] {
|
||||
auto ms = mutation_source([lifecycle_policy = _lifecycle_policy.get()] (
|
||||
schema_ptr s,
|
||||
reader_permit,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& ps,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr ts,
|
||||
streamed_mutation::forwarding,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return lifecycle_policy->create_reader(std::move(s), pr, ps, pc, std::move(ts), fwd_mr);
|
||||
return lifecycle_policy->create_reader(std::move(s), std::move(permit), pr, ps, pc, std::move(ts), fwd_mr);
|
||||
});
|
||||
auto s = gs.get();
|
||||
auto rreader = make_foreign(std::make_unique<evictable_reader>(evictable_reader::auto_pause::yes, std::move(ms),
|
||||
gs.get(), _lifecycle_policy->semaphore().make_permit(), *_pr, _ps, _pc, _trace_state, _fwd_mr));
|
||||
s, _lifecycle_policy->semaphore().make_permit(s.get(), "shard-reader"), *_pr, _ps, _pc, _trace_state, _fwd_mr));
|
||||
tracing::trace(_trace_state, "Creating shard reader on shard: {}", this_shard_id());
|
||||
auto f = rreader->fill_buffer(timeout);
|
||||
return f.then([rreader = std::move(rreader)] () mutable {
|
||||
@@ -1767,6 +1769,7 @@ public:
|
||||
const dht::sharder& sharder,
|
||||
shared_ptr<reader_lifecycle_policy> lifecycle_policy,
|
||||
schema_ptr s,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& ps,
|
||||
const io_priority_class& pc,
|
||||
@@ -1861,18 +1864,19 @@ multishard_combining_reader::multishard_combining_reader(
|
||||
const dht::sharder& sharder,
|
||||
shared_ptr<reader_lifecycle_policy> lifecycle_policy,
|
||||
schema_ptr s,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& ps,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
mutation_reader::forwarding fwd_mr)
|
||||
: impl(std::move(s), lifecycle_policy->semaphore().make_permit()), _sharder(sharder) {
|
||||
: impl(std::move(s), std::move(permit)), _sharder(sharder) {
|
||||
|
||||
on_partition_range_change(pr);
|
||||
|
||||
_shard_readers.reserve(_sharder.shard_count());
|
||||
for (unsigned i = 0; i < _sharder.shard_count(); ++i) {
|
||||
_shard_readers.emplace_back(make_lw_shared<shard_reader>(_schema, lifecycle_policy, i, pr, ps, pc, trace_state, fwd_mr));
|
||||
_shard_readers.emplace_back(make_lw_shared<shard_reader>(_schema, _permit, lifecycle_policy, i, pr, ps, pc, trace_state, fwd_mr));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1949,13 +1953,14 @@ reader_lifecycle_policy::try_resume(reader_concurrency_semaphore::inactive_read_
|
||||
flat_mutation_reader make_multishard_combining_reader(
|
||||
shared_ptr<reader_lifecycle_policy> lifecycle_policy,
|
||||
schema_ptr schema,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& ps,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
const dht::sharder& sharder = schema->get_sharder();
|
||||
return make_flat_mutation_reader<multishard_combining_reader>(sharder, std::move(lifecycle_policy), std::move(schema), pr, ps, pc,
|
||||
return make_flat_mutation_reader<multishard_combining_reader>(sharder, std::move(lifecycle_policy), std::move(schema), std::move(permit), pr, ps, pc,
|
||||
std::move(trace_state), fwd_mr);
|
||||
}
|
||||
|
||||
@@ -1963,12 +1968,13 @@ flat_mutation_reader make_multishard_combining_reader_for_tests(
|
||||
const dht::sharder& sharder,
|
||||
shared_ptr<reader_lifecycle_policy> lifecycle_policy,
|
||||
schema_ptr schema,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& ps,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return make_flat_mutation_reader<multishard_combining_reader>(sharder, std::move(lifecycle_policy), std::move(schema), pr, ps, pc,
|
||||
return make_flat_mutation_reader<multishard_combining_reader>(sharder, std::move(lifecycle_policy), std::move(schema), std::move(permit), pr, ps, pc,
|
||||
std::move(trace_state), fwd_mr);
|
||||
}
|
||||
|
||||
|
||||
@@ -470,6 +470,7 @@ public:
|
||||
/// remote shard stay alive, during the lifetime of the created reader.
|
||||
virtual flat_mutation_reader create_reader(
|
||||
schema_ptr schema,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
@@ -546,6 +547,7 @@ public:
|
||||
flat_mutation_reader make_multishard_combining_reader(
|
||||
shared_ptr<reader_lifecycle_policy> lifecycle_policy,
|
||||
schema_ptr schema,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& ps,
|
||||
const io_priority_class& pc,
|
||||
@@ -556,6 +558,7 @@ flat_mutation_reader make_multishard_combining_reader_for_tests(
|
||||
const dht::sharder& sharder,
|
||||
shared_ptr<reader_lifecycle_policy> lifecycle_policy,
|
||||
schema_ptr schema,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& ps,
|
||||
const io_priority_class& pc,
|
||||
|
||||
@@ -111,7 +111,8 @@ future<> multishard_writer::make_shard_writer(unsigned shard) {
|
||||
return smp::submit_to(shard, [gs = global_schema_ptr(_s),
|
||||
consumer = _consumer,
|
||||
reader = make_foreign(std::make_unique<flat_mutation_reader>(std::move(reader)))] () mutable {
|
||||
auto this_shard_reader = make_foreign_reader(gs.get(), semaphore.make_permit(), std::move(reader));
|
||||
auto s = gs.get();
|
||||
auto this_shard_reader = make_foreign_reader(s, semaphore.make_permit(s.get(), "multishard-writer"), std::move(reader));
|
||||
return make_foreign(std::make_unique<shard_writer>(gs.get(), std::move(this_shard_reader), consumer));
|
||||
}).then([this, shard] (foreign_ptr<std::unique_ptr<shard_writer>> writer) {
|
||||
_shard_writers[shard] = std::move(writer);
|
||||
|
||||
@@ -25,6 +25,8 @@
|
||||
|
||||
#include "reader_concurrency_semaphore.hh"
|
||||
#include "utils/exceptions.hh"
|
||||
#include "schema.hh"
|
||||
#include "utils/human_readable.hh"
|
||||
|
||||
logger rcslog("reader_concurrency_semaphore");
|
||||
|
||||
@@ -67,17 +69,36 @@ void reader_permit::resource_units::reset(reader_resources res) {
|
||||
_resources = res;
|
||||
}
|
||||
|
||||
class reader_permit::impl {
|
||||
class reader_permit::impl : public boost::intrusive::list_base_hook<boost::intrusive::link_mode<boost::intrusive::auto_unlink>> {
|
||||
reader_concurrency_semaphore& _semaphore;
|
||||
const schema* _schema;
|
||||
sstring _op_name;
|
||||
std::string_view _op_name_view;
|
||||
reader_resources _resources;
|
||||
bool _admitted = false;
|
||||
reader_permit::state _state = reader_permit::state::registered;
|
||||
|
||||
public:
|
||||
impl(reader_concurrency_semaphore& semaphore) : _semaphore(semaphore) { }
|
||||
struct value_tag {};
|
||||
|
||||
impl(reader_concurrency_semaphore& semaphore, const schema* const schema, const std::string_view& op_name)
|
||||
: _semaphore(semaphore)
|
||||
, _schema(schema)
|
||||
, _op_name_view(op_name)
|
||||
{ }
|
||||
impl(reader_concurrency_semaphore& semaphore, const schema* const schema, sstring&& op_name)
|
||||
: _semaphore(semaphore)
|
||||
, _schema(schema)
|
||||
, _op_name(std::move(op_name))
|
||||
, _op_name_view(_op_name)
|
||||
{ }
|
||||
~impl() {
|
||||
if (_resources) {
|
||||
on_internal_error_noexcept(rcslog, format("reader_permit::impl::~impl(): detected a leak of {{count={}, memory={}}} resources",
|
||||
_resources.count, _resources.memory));
|
||||
on_internal_error_noexcept(rcslog, format("reader_permit::impl::~impl(): permit {}.{}:{} detected a leak of {{count={}, memory={}}} resources",
|
||||
_schema ? _schema->ks_name() : "*",
|
||||
_schema ? _schema->cf_name() : "*",
|
||||
_op_name_view,
|
||||
_resources.count,
|
||||
_resources.memory));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -85,21 +106,37 @@ public:
|
||||
return _semaphore;
|
||||
}
|
||||
|
||||
const ::schema* get_schema() const {
|
||||
return _schema;
|
||||
}
|
||||
|
||||
std::string_view get_op_name() const {
|
||||
return _op_name_view;
|
||||
}
|
||||
|
||||
reader_permit::state get_state() const {
|
||||
return _state;
|
||||
}
|
||||
|
||||
void on_waiting() {
|
||||
_state = reader_permit::state::waiting;
|
||||
}
|
||||
|
||||
void on_admission() {
|
||||
_admitted = true;
|
||||
_state = reader_permit::state::admitted;
|
||||
_semaphore.consume(_resources);
|
||||
}
|
||||
|
||||
void consume(reader_resources res) {
|
||||
_resources += res;
|
||||
if (_admitted) {
|
||||
if (_state == reader_permit::state::admitted) {
|
||||
_semaphore.consume(res);
|
||||
}
|
||||
}
|
||||
|
||||
void signal(reader_resources res) {
|
||||
_resources -= res;
|
||||
if (_admitted) {
|
||||
if (_state == reader_permit::state::admitted) {
|
||||
_semaphore.signal(res);
|
||||
}
|
||||
}
|
||||
@@ -109,8 +146,26 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
reader_permit::reader_permit(reader_concurrency_semaphore& semaphore)
|
||||
: _impl(make_shared<impl>(semaphore)) {
|
||||
struct reader_concurrency_semaphore::permit_list {
|
||||
using list_type = boost::intrusive::list<reader_permit::impl, boost::intrusive::constant_time_size<false>>;
|
||||
|
||||
list_type permits;
|
||||
};
|
||||
|
||||
/// Creates a permit for `semaphore`, tagged with the table `schema` (may be
/// null) and the operation name `op_name`, and links the new permit into the
/// semaphore's permit list.
reader_permit::reader_permit(reader_concurrency_semaphore& semaphore, const schema* const schema, std::string_view op_name)
    : _impl(::seastar::make_shared<reader_permit::impl>(semaphore, schema, op_name))
{
    auto& registered_permits = semaphore._permit_list->permits;
    registered_permits.push_back(*_impl);
}
|
||||
|
||||
/// Creates a permit for `semaphore`, tagged with the table `schema` (may be
/// null) and taking ownership of the operation name `op_name`, and links the
/// new permit into the semaphore's permit list.
reader_permit::reader_permit(reader_concurrency_semaphore& semaphore, const schema* const schema, sstring&& op_name)
    : _impl(::seastar::make_shared<reader_permit::impl>(semaphore, schema, std::move(op_name)))
{
    auto& registered_permits = semaphore._permit_list->permits;
    registered_permits.push_back(*_impl);
}
|
||||
|
||||
void reader_permit::on_waiting() {
|
||||
_impl->on_waiting();
|
||||
}
|
||||
|
||||
void reader_permit::on_admission() {
|
||||
@@ -148,6 +203,140 @@ reader_resources reader_permit::consumed_resources() const {
|
||||
return _impl->resources();
|
||||
}
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, reader_permit::state s) {
|
||||
switch (s) {
|
||||
case reader_permit::state::registered:
|
||||
os << "registered";
|
||||
break;
|
||||
case reader_permit::state::waiting:
|
||||
os << "waiting";
|
||||
break;
|
||||
case reader_permit::state::admitted:
|
||||
os << "admitted";
|
||||
break;
|
||||
}
|
||||
return os;
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
/// Accumulator for a group of permits: how many there are and how much
/// memory they account for in total.
struct permit_stats {
    uint64_t memory{0};
    uint64_t count{0};

    /// Accounts for one more permit holding `m` bytes.
    void add(uint64_t m) {
        count += 1;
        memory += m;
    }

    /// Merges the totals of another accumulator into this one.
    permit_stats& operator+=(const permit_stats& o) {
        count += o.count;
        memory += o.memory;
        return *this;
    }
};
|
||||
|
||||
using permit_group_key = std::tuple<const schema*, std::string_view, reader_permit::state>;
|
||||
|
||||
struct permit_group_key_hash {
|
||||
size_t operator()(const permit_group_key& k) const {
|
||||
using underlying_type = std::underlying_type_t<reader_permit::state>;
|
||||
return std::hash<uintptr_t>()(reinterpret_cast<uintptr_t>(std::get<0>(k)))
|
||||
^ std::hash<std::string_view>()(std::get<1>(k))
|
||||
^ std::hash<underlying_type>()(static_cast<underlying_type>(std::get<2>(k)));
|
||||
}
|
||||
};
|
||||
|
||||
using permit_groups = std::unordered_map<permit_group_key, permit_stats, permit_group_key_hash>;
|
||||
|
||||
/// Prints one section of the diagnostics report: every permit group from
/// `permits` whose state equals `state`, one row per group, followed by a
/// "total" row.
///
/// Rows are sorted ascending by memory when `sort_by_memory` is set and by
/// count otherwise; the sort column is printed first.
///
/// Returns the summed count and memory of the printed groups.
static permit_stats do_dump_reader_permit_diagnostics(std::ostream& os, const permit_groups& permits, reader_permit::state state, bool sort_by_memory) {
    struct permit_summary {
        const schema* s;
        std::string_view op_name;
        uint64_t memory;
        uint64_t count;
    };

    // Pick the groups that belong to the requested state.
    std::vector<permit_summary> rows;
    for (const auto& [key, stats] : permits) {
        if (std::get<2>(key) != state) {
            continue;
        }
        rows.push_back(permit_summary{std::get<0>(key), std::get<1>(key), stats.memory, stats.count});
    }

    std::ranges::sort(rows, [sort_by_memory] (const permit_summary& a, const permit_summary& b) {
        return sort_by_memory ? a.memory < b.memory : a.count < b.count;
    });

    // Callers always pass (count, memory, name); the first two columns are
    // swapped on output when sorting by memory.
    const auto print_line = [&os, sort_by_memory] (auto col1, auto col2, auto col3) {
        if (!sort_by_memory) {
            fmt::print(os, "{}\t{}\t{}\n", col1, col2, col3);
        } else {
            fmt::print(os, "{}\t{}\t{}\n", col2, col1, col3);
        }
    };

    fmt::print(os, "Permits with state {}, sorted by {}\n", state, sort_by_memory ? "memory" : "count");
    print_line("count", "memory", "name");

    permit_stats total;
    for (const auto& row : rows) {
        total.memory += row.memory;
        total.count += row.count;

        // Permits without a schema are reported as "*.*".
        const auto name = fmt::format("{}.{}:{}",
                row.s ? row.s->ks_name() : "*",
                row.s ? row.s->cf_name() : "*",
                row.op_name);
        print_line(row.count, utils::to_hr_size(row.memory), name);
    }

    fmt::print(os, "\n");
    print_line(total.count, utils::to_hr_size(total.memory), "total");

    return total;
}
|
||||
|
||||
static void do_dump_reader_permit_diagnostics(std::ostream& os, const reader_concurrency_semaphore& semaphore,
|
||||
const reader_concurrency_semaphore::permit_list& list, std::string_view problem) {
|
||||
permit_groups permits;
|
||||
|
||||
for (const auto& permit : list.permits) {
|
||||
permits[permit_group_key(permit.get_schema(), permit.get_op_name(), permit.get_state())].add(permit.resources().memory);
|
||||
}
|
||||
|
||||
permit_stats total;
|
||||
|
||||
fmt::print(os, "Semaphore {}: {}, dumping permit diagnostics:\n", semaphore.name(), problem);
|
||||
total += do_dump_reader_permit_diagnostics(os, permits, reader_permit::state::admitted, true);
|
||||
fmt::print(os, "\n");
|
||||
total += do_dump_reader_permit_diagnostics(os, permits, reader_permit::state::waiting, false);
|
||||
fmt::print(os, "\n");
|
||||
total += do_dump_reader_permit_diagnostics(os, permits, reader_permit::state::registered, false);
|
||||
fmt::print(os, "\n");
|
||||
fmt::print(os, "Total: permits: {}, memory: {}\n", total.count, utils::to_hr_size(total.memory));
|
||||
}
|
||||
|
||||
/// Logs the permit diagnostics report for `semaphore` at debug level.
///
/// Does nothing when the logger's level is below debug, so the report is
/// only built when it will actually be logged.
static void maybe_dump_reader_permit_diagnostics(const reader_concurrency_semaphore& semaphore, const reader_concurrency_semaphore::permit_list& list,
        std::string_view problem) {
    const bool debug_enabled = rcslog.level() >= log_level::debug;
    if (debug_enabled) {
        std::ostringstream report;
        do_dump_reader_permit_diagnostics(report, semaphore, list, problem);
        rcslog.debug("{}", report.str());
    }
}
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
/// Invoked for a wait-list entry whose timeout expired: fails the entry's
/// promise with a named_semaphore_timed_out exception carrying the
/// semaphore's name, then attempts to dump the permit diagnostics.
void reader_concurrency_semaphore::expiry_handler::operator()(entry& e) noexcept {
    e.pr.set_exception(named_semaphore_timed_out(_semaphore._name));

    // Building the diagnostics report allocates (string stream, containers,
    // formatted strings). This function is noexcept, so an exception escaping
    // from the best-effort dump would terminate the process; contain it.
    try {
        maybe_dump_reader_permit_diagnostics(_semaphore, *_semaphore._permit_list, "timed out");
    } catch (...) {
        rcslog.warn("Semaphore {}: failed to dump permit diagnostics: {}", _semaphore._name, std::current_exception());
    }
}
|
||||
|
||||
void reader_concurrency_semaphore::signal(const resources& r) noexcept {
|
||||
_resources += r;
|
||||
while (!_wait_list.empty() && has_available_units(_wait_list.front().res)) {
|
||||
@@ -162,6 +351,22 @@ void reader_concurrency_semaphore::signal(const resources& r) noexcept {
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a semaphore with `count` and `memory` as both the initial and the
/// currently available resources.
///
/// `name` is stored as the semaphore's name, `max_queue_length` as its wait
/// queue limit and `prethrow_action` as the optional pre-throw callback.
/// The wait list's expiry handler is bound to this semaphore, and an empty
/// permit list is allocated.
reader_concurrency_semaphore::reader_concurrency_semaphore(
        int count,
        ssize_t memory,
        sstring name,
        size_t max_queue_length,
        std::function<void()> prethrow_action)
    : _initial_resources(count, memory)
    , _resources(count, memory)
    , _wait_list(expiry_handler(*this))
    , _name(std::move(name))
    , _max_queue_length(max_queue_length)
    , _prethrow_action(std::move(prethrow_action))
    , _permit_list(std::make_unique<permit_list>())
{ }
|
||||
|
||||
/// Creates a semaphore without practical limits: delegates to the main
/// constructor with the largest representable count and memory, leaving the
/// remaining parameters at their defaults.
reader_concurrency_semaphore::reader_concurrency_semaphore(no_limits, sstring name)
    : reader_concurrency_semaphore(std::numeric_limits<int>::max(), std::numeric_limits<ssize_t>::max(), std::move(name))
{ }
|
||||
|
||||
/// Marks the semaphore as broken on destruction by passing a
/// broken_semaphore exception to broken().
reader_concurrency_semaphore::~reader_concurrency_semaphore() {
    auto broken_ex = std::make_exception_ptr(broken_semaphore{});
    broken(std::move(broken_ex));
}
|
||||
@@ -228,11 +433,13 @@ bool reader_concurrency_semaphore::may_proceed(const resources& r) const {
|
||||
return _wait_list.empty() && (has_available_units(r) || _resources.count == _initial_resources.count);
|
||||
}
|
||||
|
||||
future<reader_permit::resource_units> reader_concurrency_semaphore::do_wait_admission(reader_permit permit, size_t memory, db::timeout_clock::time_point timeout) {
|
||||
future<reader_permit::resource_units> reader_concurrency_semaphore::do_wait_admission(reader_permit permit, size_t memory,
|
||||
db::timeout_clock::time_point timeout) {
|
||||
if (_wait_list.size() >= _max_queue_length) {
|
||||
if (_prethrow_action) {
|
||||
_prethrow_action();
|
||||
}
|
||||
maybe_dump_reader_permit_diagnostics(*this, *_permit_list, "wait queue overloaded");
|
||||
return make_exception_future<reader_permit::resource_units>(
|
||||
std::make_exception_ptr(std::runtime_error(
|
||||
format("{}: restricted mutation reader queue overload", _name))));
|
||||
@@ -253,12 +460,17 @@ future<reader_permit::resource_units> reader_concurrency_semaphore::do_wait_admi
|
||||
}
|
||||
promise<reader_permit::resource_units> pr;
|
||||
auto fut = pr.get_future();
|
||||
permit.on_waiting();
|
||||
_wait_list.push_back(entry(std::move(pr), std::move(permit), r), timeout);
|
||||
return fut;
|
||||
}
|
||||
|
||||
reader_permit reader_concurrency_semaphore::make_permit() {
|
||||
return reader_permit(*this);
|
||||
/// Creates a permit on this semaphore for the given table (`schema`, may be
/// null) and operation.
///
/// `op_name` is only viewed, not copied, by this function.
/// NOTE(review): presumably callers are expected to pass string literals so
/// the view stays valid for the permit's lifetime; confirm against callers.
reader_permit reader_concurrency_semaphore::make_permit(const schema* const schema, const char* const op_name) {
    const std::string_view op_name_view(op_name);
    return reader_permit(*this, schema, op_name_view);
}
|
||||
|
||||
/// Creates a permit on this semaphore for the given table (`schema`, may be
/// null) and operation, handing ownership of the operation name `op_name`
/// over to the permit.
reader_permit reader_concurrency_semaphore::make_permit(const schema* const schema, sstring&& op_name) {
    reader_permit permit(*this, schema, std::move(op_name));
    return permit;
}
|
||||
|
||||
void reader_concurrency_semaphore::broken(std::exception_ptr ex) {
|
||||
|
||||
@@ -93,6 +93,8 @@ public:
|
||||
uint64_t total_failed_reads = 0;
|
||||
};
|
||||
|
||||
struct permit_list;
|
||||
|
||||
private:
|
||||
struct entry {
|
||||
promise<reader_permit::resource_units> pr;
|
||||
@@ -103,13 +105,11 @@ private:
|
||||
};
|
||||
|
||||
class expiry_handler {
|
||||
sstring _semaphore_name;
|
||||
reader_concurrency_semaphore& _semaphore;
|
||||
public:
|
||||
explicit expiry_handler(sstring semaphore_name)
|
||||
: _semaphore_name(std::move(semaphore_name)) {}
|
||||
void operator()(entry& e) noexcept {
|
||||
e.pr.set_exception(named_semaphore_timed_out(_semaphore_name));
|
||||
}
|
||||
explicit expiry_handler(reader_concurrency_semaphore& semaphore)
|
||||
: _semaphore(semaphore) {}
|
||||
void operator()(entry& e) noexcept;
|
||||
};
|
||||
|
||||
private:
|
||||
@@ -124,6 +124,7 @@ private:
|
||||
uint64_t _next_id = 1;
|
||||
std::map<uint64_t, std::unique_ptr<inactive_read>> _inactive_reads;
|
||||
stats _stats;
|
||||
std::unique_ptr<permit_list> _permit_list;
|
||||
|
||||
private:
|
||||
bool has_available_units(const resources& r) const;
|
||||
@@ -139,22 +140,12 @@ public:
|
||||
ssize_t memory,
|
||||
sstring name,
|
||||
size_t max_queue_length = std::numeric_limits<size_t>::max(),
|
||||
std::function<void()> prethrow_action = nullptr)
|
||||
: _initial_resources(count, memory)
|
||||
, _resources(count, memory)
|
||||
, _wait_list(expiry_handler(name))
|
||||
, _name(std::move(name))
|
||||
, _max_queue_length(max_queue_length)
|
||||
, _prethrow_action(std::move(prethrow_action)) {}
|
||||
std::function<void()> prethrow_action = nullptr);
|
||||
|
||||
/// Create a semaphore with practically unlimited count and memory.
|
||||
///
|
||||
/// Likewise, there is no queue limit either.
|
||||
explicit reader_concurrency_semaphore(no_limits, sstring name = "unlimited reader_concurrency_semaphore")
|
||||
: reader_concurrency_semaphore(
|
||||
std::numeric_limits<int>::max(),
|
||||
std::numeric_limits<ssize_t>::max(),
|
||||
std::move(name)) {}
|
||||
explicit reader_concurrency_semaphore(no_limits, sstring name = "unlimited reader_concurrency_semaphore");
|
||||
|
||||
~reader_concurrency_semaphore();
|
||||
|
||||
@@ -208,7 +199,18 @@ public:
|
||||
return _stats;
|
||||
}
|
||||
|
||||
reader_permit make_permit();
|
||||
/// Make a permit
|
||||
///
|
||||
/// The permit is associated with a schema, which is the schema of the table
|
||||
/// the read is executed against, and the operation name, which should be a
|
||||
/// name such that we can identify the operation which created this permit.
|
||||
/// Ideally this should be a unique enough name that we not only can identify
|
||||
/// the kind of read, but the exact code-path that was taken.
|
||||
///
|
||||
/// Some permits cannot be associated with any table, so passing nullptr as
|
||||
/// the schema parameter is allowed.
|
||||
reader_permit make_permit(const schema* const schema, const char* const op_name);
|
||||
reader_permit make_permit(const schema* const schema, sstring&& op_name);
|
||||
|
||||
const resources initial_resources() const {
|
||||
return _initial_resources;
|
||||
|
||||
@@ -26,6 +26,7 @@
|
||||
#include "seastarx.hh"
|
||||
|
||||
#include "db/timeout_clock.hh"
|
||||
#include "schema_fwd.hh"
|
||||
|
||||
struct reader_resources {
|
||||
int count = 0;
|
||||
@@ -89,13 +90,22 @@ class reader_permit {
|
||||
public:
|
||||
class resource_units;
|
||||
|
||||
private:
|
||||
/// Lifecycle states of a permit.
enum class state {
    /// The read is registered, but has not attempted admission yet.
    registered,
    /// The read is waiting for admission.
    waiting,
    /// The read has been admitted.
    admitted,
};
|
||||
|
||||
class impl;
|
||||
|
||||
private:
|
||||
shared_ptr<impl> _impl;
|
||||
|
||||
private:
|
||||
explicit reader_permit(reader_concurrency_semaphore& semaphore);
|
||||
explicit reader_permit(reader_concurrency_semaphore& semaphore, const schema* const schema, std::string_view op_name);
|
||||
explicit reader_permit(reader_concurrency_semaphore& semaphore, const schema* const schema, sstring&& op_name);
|
||||
|
||||
void on_waiting();
|
||||
void on_admission();
|
||||
|
||||
public:
|
||||
|
||||
@@ -797,7 +797,7 @@ public:
|
||||
, _messaging(ms)
|
||||
, _cf(cf)
|
||||
, _schema(s)
|
||||
, _permit(_cf.streaming_read_concurrency_semaphore().make_permit())
|
||||
, _permit(_cf.streaming_read_concurrency_semaphore().make_permit(_schema.get(), "repair-meta"))
|
||||
, _range(range)
|
||||
, _cmp(repair_sync_boundary::tri_compare(*_schema))
|
||||
, _algo(algo)
|
||||
|
||||
@@ -449,7 +449,7 @@ protected:
|
||||
: _cf(cf)
|
||||
, _sstable_creator(std::move(descriptor.creator))
|
||||
, _schema(cf.schema())
|
||||
, _permit(_cf.compaction_concurrency_semaphore().make_permit())
|
||||
, _permit(_cf.compaction_concurrency_semaphore().make_permit(_cf.schema().get(), "compaction"))
|
||||
, _sstables(std::move(descriptor.sstables))
|
||||
, _max_sstable_size(descriptor.max_sstable_bytes)
|
||||
, _sstable_level(descriptor.level)
|
||||
|
||||
@@ -741,7 +741,7 @@ public:
|
||||
: sstable_writer::writer_impl(sst, s, pc, cfg)
|
||||
, _enc_stats(enc_stats)
|
||||
, _shard(shard)
|
||||
, _range_tombstones(_schema, reader_semaphore.make_permit())
|
||||
, _range_tombstones(_schema, reader_semaphore.make_permit(&s, "mx-writer"))
|
||||
, _tmp_bufs(_sst.sstable_buffer_size)
|
||||
, _sst_schema(make_sstable_schema(s, _enc_stats, _cfg))
|
||||
, _run_identifier(cfg.run_identifier)
|
||||
|
||||
@@ -2023,7 +2023,7 @@ components_writer::components_writer(sstable& sst, const schema& s, file_writer&
|
||||
, _index_needs_close(true)
|
||||
, _max_sstable_size(cfg.max_sstable_size)
|
||||
, _tombstone_written(false)
|
||||
, _range_tombstones(s, reader_semaphore.make_permit())
|
||||
, _range_tombstones(s, reader_semaphore.make_permit(&_schema, "components-writer"))
|
||||
{
|
||||
// This can be 0 in some cases, which is albeit benign, can wreak havoc
|
||||
// in lower-level writer code, so clamp it to [1, +inf) here, which is
|
||||
@@ -2559,7 +2559,7 @@ future<> sstable::generate_summary(const io_priority_class& pc) {
|
||||
[this, &pc, options = std::move(options), index_file, index_size] (summary_generator& s) mutable {
|
||||
auto sem = std::make_unique<reader_concurrency_semaphore>(reader_concurrency_semaphore::no_limits{});
|
||||
auto ctx = make_lw_shared<index_consume_entry_context<summary_generator>>(
|
||||
sem->make_permit(), s, trust_promoted_index::yes, *_schema, "", index_file, std::move(options), 0, index_size,
|
||||
sem->make_permit(_schema.get(), "generate-summary"), s, trust_promoted_index::yes, *_schema, "", index_file, std::move(options), 0, index_size,
|
||||
(_version >= sstable_version_types::mc
|
||||
? std::make_optional(get_clustering_values_fixed_lengths(get_serialization_header()))
|
||||
: std::optional<column_values_fixed_lengths>{}));
|
||||
@@ -3296,7 +3296,7 @@ future<bool> sstable::has_partition_key(const utils::hashed_key& hk, const dht::
|
||||
return make_ready_future<bool>(false);
|
||||
}
|
||||
auto sem = std::make_unique<reader_concurrency_semaphore>(reader_concurrency_semaphore::no_limits{});
|
||||
auto lh_index_ptr = std::make_unique<sstables::index_reader>(s, sem->make_permit(), default_priority_class(), tracing::trace_state_ptr());
|
||||
auto lh_index_ptr = std::make_unique<sstables::index_reader>(s, sem->make_permit(_schema.get(), s->get_filename()), default_priority_class(), tracing::trace_state_ptr());
|
||||
auto& lh_index = *lh_index_ptr;
|
||||
return lh_index.advance_lower_and_check_if_present(dk).then([lh_index_ptr = std::move(lh_index_ptr), s, sem = std::move(sem)] (bool present) mutable {
|
||||
lh_index_ptr.reset(); // destroy before the semaphore
|
||||
|
||||
@@ -142,7 +142,7 @@ void stream_session::init_messaging_service_handler(netw::messaging_service& ms)
|
||||
bool got_end_of_stream = false;
|
||||
};
|
||||
auto cmd_status = make_lw_shared<stream_mutation_fragments_cmd_status>();
|
||||
auto permit = cf.streaming_read_concurrency_semaphore().make_permit();
|
||||
auto permit = cf.streaming_read_concurrency_semaphore().make_permit(s.get(), "stream-session");
|
||||
auto get_next_mutation_fragment = [source, plan_id, from, s, cmd_status, permit] () mutable {
|
||||
return source().then([plan_id, from, s, cmd_status, permit] (std::optional<std::tuple<frozen_mutation_fragment, rpc::optional<stream_mutation_fragments_cmd>>> opt) mutable {
|
||||
if (opt) {
|
||||
|
||||
8
table.cc
8
table.cc
@@ -440,7 +440,7 @@ sstables::shared_sstable table::make_streaming_sstable_for_write(std::optional<s
|
||||
flat_mutation_reader
|
||||
table::make_streaming_reader(schema_ptr s,
|
||||
const dht::partition_range_vector& ranges) const {
|
||||
auto permit = _config.streaming_read_concurrency_semaphore->make_permit();
|
||||
auto permit = _config.streaming_read_concurrency_semaphore->make_permit(s.get(), "stream-ranges");
|
||||
auto& slice = s->full_slice();
|
||||
auto& pc = service::get_local_streaming_priority();
|
||||
|
||||
@@ -460,7 +460,7 @@ table::make_streaming_reader(schema_ptr s,
|
||||
|
||||
flat_mutation_reader table::make_streaming_reader(schema_ptr schema, const dht::partition_range& range,
|
||||
const query::partition_slice& slice, mutation_reader::forwarding fwd_mr) const {
|
||||
auto permit = _config.streaming_read_concurrency_semaphore->make_permit();
|
||||
auto permit = _config.streaming_read_concurrency_semaphore->make_permit(schema.get(), "stream-range");
|
||||
const auto& pc = service::get_local_streaming_priority();
|
||||
auto trace_state = tracing::trace_state_ptr();
|
||||
const auto fwd = streamed_mutation::forwarding::no;
|
||||
@@ -2162,7 +2162,7 @@ future<row_locker::lock_holder> table::do_push_view_replica_updates(const schema
|
||||
auto cr_ranges = db::view::calculate_affected_clustering_ranges(*base, m.decorated_key(), m.partition(), views, now);
|
||||
if (cr_ranges.empty()) {
|
||||
tracing::trace(tr_state, "View updates do not require read-before-write");
|
||||
return generate_and_propagate_view_updates(base, sem.make_permit(), std::move(views), std::move(m), { }, std::move(tr_state), now).then([] {
|
||||
return generate_and_propagate_view_updates(base, sem.make_permit(s.get(), "push-view-updates-1"), std::move(views), std::move(m), { }, std::move(tr_state), now).then([] {
|
||||
// In this case we are not doing a read-before-write, just a
|
||||
// write, so no lock is needed.
|
||||
return make_ready_future<row_locker::lock_holder>();
|
||||
@@ -2193,7 +2193,7 @@ future<row_locker::lock_holder> table::do_push_view_replica_updates(const schema
|
||||
std::move(slice),
|
||||
std::move(m),
|
||||
[base, views = std::move(views), lock = std::move(lock), this, timeout, now, source = std::move(source), &sem, &io_priority, tr_state = std::move(tr_state)] (auto& pk, auto& slice, auto& m) mutable {
|
||||
auto permit = sem.make_permit();
|
||||
auto permit = sem.make_permit(base.get(), "push-view-updates-2");
|
||||
auto reader = source.make_reader(base, permit, pk, slice, io_priority, tr_state, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
|
||||
return this->generate_and_propagate_view_updates(base, std::move(permit), std::move(views), std::move(m), std::move(reader), tr_state, now)
|
||||
.then([base, tr_state = std::move(tr_state), lock = std::move(lock)] () mutable {
|
||||
|
||||
@@ -109,7 +109,8 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) {
|
||||
};
|
||||
|
||||
auto lifecycle_policy = seastar::make_shared<test_reader_lifecycle_policy>(std::move(factory), operations_gate, evict_paused_readers);
|
||||
auto mr = make_multishard_combining_reader_for_tests(keep_alive_sharder.back(), std::move(lifecycle_policy), s, range, slice, pc, trace_state, fwd_mr);
|
||||
auto mr = make_multishard_combining_reader_for_tests(keep_alive_sharder.back(), std::move(lifecycle_policy), s,
|
||||
tests::make_permit(), range, slice, pc, trace_state, fwd_mr);
|
||||
if (fwd_sm == streamed_mutation::forwarding::yes) {
|
||||
return make_forwardable(std::move(mr));
|
||||
}
|
||||
|
||||
@@ -974,13 +974,14 @@ SEASTAR_THREAD_TEST_CASE(fuzzy_test) {
|
||||
|
||||
const auto& partitions = pop_desc.partitions;
|
||||
smp::invoke_on_all([cfg, db = &env.db(), gs = global_schema_ptr(pop_desc.schema), &partitions] {
|
||||
auto s = gs.get();
|
||||
auto& sem = db->local().get_reader_concurrency_semaphore();
|
||||
|
||||
auto resources = sem.available_resources();
|
||||
resources -= reader_concurrency_semaphore::resources{1, 0};
|
||||
auto permit = sem.make_permit();
|
||||
auto permit = sem.make_permit(s.get(), "fuzzy-test");
|
||||
|
||||
return run_fuzzy_test_workload(cfg, *db, gs.get(), partitions).finally([units = permit.consume_resources(resources)] {});
|
||||
return run_fuzzy_test_workload(cfg, *db, std::move(s), partitions).finally([units = permit.consume_resources(resources)] {});
|
||||
}).handle_exception([seed] (std::exception_ptr e) {
|
||||
testlog.error("Test workload failed with exception {}."
|
||||
" To repeat this particular run, replace the random seed of the test, with that of this run ({})."
|
||||
|
||||
@@ -412,11 +412,11 @@ SEASTAR_TEST_CASE(test_schema_upgrader_is_equivalent_with_mutation_upgrade) {
|
||||
SEASTAR_TEST_CASE(test_mutation_fragment_mutate_exception_safety) {
|
||||
struct dummy_exception { };
|
||||
|
||||
reader_concurrency_semaphore sem(1, 100, get_name());
|
||||
auto permit = sem.make_permit();
|
||||
|
||||
simple_schema s;
|
||||
|
||||
reader_concurrency_semaphore sem(1, 100, get_name());
|
||||
auto permit = sem.make_permit(s.schema().get(), get_name());
|
||||
|
||||
const auto available_res = sem.available_resources();
|
||||
const sstring val(1024, 'a');
|
||||
|
||||
|
||||
@@ -986,7 +986,7 @@ public:
|
||||
return flat_mutation_reader(std::move(tracker_ptr));
|
||||
});
|
||||
|
||||
_reader = make_restricted_flat_reader(std::move(ms), schema, semaphore.make_permit());
|
||||
_reader = make_restricted_flat_reader(std::move(ms), schema, semaphore.make_permit(schema.get(), "reader-wrapper"));
|
||||
}
|
||||
|
||||
reader_wrapper(
|
||||
@@ -1082,7 +1082,7 @@ class dummy_file_impl : public file_impl {
|
||||
SEASTAR_TEST_CASE(reader_restriction_file_tracking) {
|
||||
return async([&] {
|
||||
reader_concurrency_semaphore semaphore(100, 4 * 1024, get_name());
|
||||
auto permit = semaphore.make_permit();
|
||||
auto permit = semaphore.make_permit(nullptr, get_name());
|
||||
permit.wait_admission(0, db::no_timeout).get();
|
||||
|
||||
{
|
||||
@@ -1973,6 +1973,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_reading_empty_table) {
|
||||
assert_that(make_multishard_combining_reader(
|
||||
seastar::make_shared<test_reader_lifecycle_policy>(std::move(factory), operations_gate),
|
||||
s.schema(),
|
||||
tests::make_permit(),
|
||||
query::full_partition_range,
|
||||
s.schema()->full_slice(),
|
||||
service::get_local_sstable_query_read_priority()))
|
||||
@@ -2227,7 +2228,7 @@ multishard_reader_for_read_ahead prepare_multishard_reader_for_read_ahead_test(s
|
||||
|
||||
auto sharder = std::make_unique<dummy_sharder>(s.schema()->get_sharder(), std::move(pkeys_by_tokens));
|
||||
auto reader = make_multishard_combining_reader_for_tests(*sharder, seastar::make_shared<test_reader_lifecycle_policy>(std::move(factory), operations_gate),
|
||||
s.schema(), *pr, s.schema()->full_slice(), service::get_local_sstable_query_read_priority());
|
||||
s.schema(), tests::make_permit(), *pr, s.schema()->full_slice(), service::get_local_sstable_query_read_priority());
|
||||
|
||||
return {std::move(reader), std::move(sharder), std::move(remote_controls), std::move(pr)};
|
||||
}
|
||||
@@ -2440,6 +2441,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_next_partition) {
|
||||
auto reader = make_multishard_combining_reader(
|
||||
seastar::make_shared<test_reader_lifecycle_policy>(std::move(factory), operations_gate),
|
||||
schema,
|
||||
tests::make_permit(),
|
||||
query::full_partition_range,
|
||||
schema->full_slice(),
|
||||
service::get_local_sstable_query_read_priority());
|
||||
@@ -2574,6 +2576,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_non_strictly_monotonic
|
||||
assert_that(make_multishard_combining_reader(
|
||||
seastar::make_shared<test_reader_lifecycle_policy>(std::move(factory), operations_gate, true),
|
||||
s.schema(),
|
||||
tests::make_permit(),
|
||||
query::full_partition_range,
|
||||
s.schema()->full_slice(),
|
||||
service::get_local_sstable_query_read_priority()))
|
||||
@@ -2638,7 +2641,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_streaming_reader) {
|
||||
};
|
||||
auto reference_reader = make_filtering_reader(
|
||||
make_multishard_combining_reader(seastar::make_shared<test_reader_lifecycle_policy>(std::move(reader_factory), operations_gate),
|
||||
schema, partition_range, schema->full_slice(), service::get_local_sstable_query_read_priority()),
|
||||
schema, tests::make_permit(), partition_range, schema->full_slice(), service::get_local_sstable_query_read_priority()),
|
||||
[&remote_partitioner] (const dht::decorated_key& pkey) {
|
||||
return remote_partitioner.shard_of(pkey.token()) == 0;
|
||||
});
|
||||
@@ -3101,8 +3104,8 @@ flat_mutation_reader create_evictable_reader_and_evict_after_first_buffer(
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_evictable_reader_trim_range_tombstones) {
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name());
|
||||
auto permit = semaphore.make_permit();
|
||||
simple_schema s;
|
||||
auto permit = semaphore.make_permit(s.schema().get(), get_name());
|
||||
|
||||
const auto pkey = s.make_pkey();
|
||||
size_t max_buffer_size = 512;
|
||||
@@ -3192,8 +3195,8 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_self_validation) {
|
||||
});
|
||||
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name());
|
||||
auto permit = semaphore.make_permit();
|
||||
simple_schema s;
|
||||
auto permit = semaphore.make_permit(s.schema().get(), get_name());
|
||||
|
||||
auto pkeys = s.make_pkeys(4);
|
||||
std::ranges::sort(pkeys, dht::decorated_key::less_comparator(s.schema()));
|
||||
|
||||
@@ -113,7 +113,7 @@ private:
|
||||
Querier make_querier(const dht::partition_range& range) {
|
||||
return Querier(_mutation_source,
|
||||
_s.schema(),
|
||||
_sem.make_permit(),
|
||||
_sem.make_permit(_s.schema().get(), "make-querier"),
|
||||
range,
|
||||
_s.schema()->full_slice(),
|
||||
service::get_local_sstable_query_read_priority(),
|
||||
@@ -749,15 +749,16 @@ SEASTAR_THREAD_TEST_CASE(test_immediate_evict_on_insert) {
|
||||
test_querier_cache t;
|
||||
|
||||
auto& sem = t.get_semaphore();
|
||||
auto permit = sem.make_permit();
|
||||
auto permit1 = sem.make_permit(t.get_schema().get(), get_name());
|
||||
auto permit2 = sem.make_permit(t.get_schema().get(), get_name());
|
||||
|
||||
permit.wait_admission(0, db::no_timeout).get();
|
||||
permit1.wait_admission(0, db::no_timeout).get();
|
||||
|
||||
auto resources = permit.consume_resources(reader_resources(sem.available_resources().count, 0));
|
||||
auto resources = permit1.consume_resources(reader_resources(sem.available_resources().count, 0));
|
||||
|
||||
BOOST_CHECK_EQUAL(sem.available_resources().count, 0);
|
||||
|
||||
auto fut = permit.wait_admission(1, db::no_timeout);
|
||||
auto fut = permit2.wait_admission(1, db::no_timeout);
|
||||
|
||||
BOOST_CHECK_EQUAL(sem.waiters(), 1);
|
||||
|
||||
|
||||
@@ -828,7 +828,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering) {
|
||||
return less(a.decorated_key(), b.decorated_key());
|
||||
});
|
||||
|
||||
auto permit = sem.make_permit();
|
||||
auto permit = sem.make_permit(schema.get(), get_name());
|
||||
|
||||
auto mt = make_lw_shared<memtable>(schema);
|
||||
for (const auto& mut : muts) {
|
||||
|
||||
@@ -111,6 +111,7 @@ public:
|
||||
}
|
||||
virtual flat_mutation_reader create_reader(
|
||||
schema_ptr schema,
|
||||
reader_permit,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
@@ -152,7 +153,7 @@ public:
|
||||
if (_evict_paused_readers) {
|
||||
_contexts[shard]->semaphore = std::make_unique<reader_concurrency_semaphore>(0, std::numeric_limits<ssize_t>::max(),
|
||||
format("reader_concurrency_semaphore @shard_id={}", shard));
|
||||
_contexts[shard]->permit = _contexts[shard]->semaphore->make_permit();
|
||||
_contexts[shard]->permit = _contexts[shard]->semaphore->make_permit(nullptr, "tests::reader_lifecycle_policy");
|
||||
// Add a waiter, so that all registered inactive reads are
|
||||
// immediately evicted.
|
||||
// We don't care about the returned future.
|
||||
|
||||
@@ -30,7 +30,7 @@ reader_concurrency_semaphore& semaphore() {
|
||||
}
|
||||
|
||||
reader_permit make_permit() {
|
||||
return the_semaphore.make_permit();
|
||||
return the_semaphore.make_permit(nullptr, "test");
|
||||
}
|
||||
|
||||
query::query_class_config make_query_class_config() {
|
||||
|
||||
@@ -155,7 +155,7 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
void execute_reads(reader_concurrency_semaphore& sem, unsigned reads, unsigned concurrency, std::function<future<>(unsigned)> read) {
|
||||
void execute_reads(const schema& s, reader_concurrency_semaphore& sem, unsigned reads, unsigned concurrency, std::function<future<>(unsigned)> read) {
|
||||
const reader_resources initial_res = sem.available_resources();
|
||||
unsigned n = 0;
|
||||
gate g;
|
||||
@@ -186,7 +186,7 @@ void execute_reads(reader_concurrency_semaphore& sem, unsigned reads, unsigned c
|
||||
|
||||
if (sem.waiters()) {
|
||||
testlog.trace("Waiting for queue to drain");
|
||||
auto permit = sem.make_permit();
|
||||
auto permit = sem.make_permit(&s, "drain");
|
||||
permit.wait_admission(1, db::no_timeout).get();
|
||||
}
|
||||
}
|
||||
@@ -279,7 +279,7 @@ void test_main_thread(cql_test_env& env) {
|
||||
try {
|
||||
auto _ = sc.collect();
|
||||
memory::set_heap_profiling_enabled(true);
|
||||
execute_reads(sem, reads, read_concurrency, [&] (unsigned i) {
|
||||
execute_reads(*s, sem, reads, read_concurrency, [&] (unsigned i) {
|
||||
return env.execute_cql(format("select * from ks.test where pk = 0 and ck > {} limit 100;",
|
||||
tests::random::get_int(rows / 2))).discard_result();
|
||||
});
|
||||
|
||||
63
utils/human_readable.cc
Normal file
63
utils/human_readable.cc
Normal file
@@ -0,0 +1,63 @@
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Copyright (C) 2020 ScyllaDB
|
||||
*/
|
||||
|
||||
#include "utils/human_readable.hh"
|
||||
|
||||
#include <array>
|
||||
#include <ostream>
|
||||
|
||||
namespace utils {
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const human_readable_value& val) {
|
||||
os << val.value;
|
||||
if (val.suffix) {
|
||||
os << val.suffix;
|
||||
}
|
||||
return os;
|
||||
}
|
||||
|
||||
/// Scale \p value down by repeated division by \p step and pair the scaled
/// number with the suffix of the magnitude that was reached.
///
/// \param value the raw number to convert
/// \param step the factor between two consecutive suffixes (e.g. 1024)
/// \param precision scaling continues for as long as the scaled number is
///        greater than or equal to this
/// \param suffixes one character per magnitude, smallest magnitude first
///
/// The scaled number is rounded up when the remainder of the last division
/// is at least half of \p step.
///
/// Scaling never goes past the last entry of \p suffixes. Inputs too large
/// for the last suffix are reported with that suffix, and the numeric part
/// saturates at UINT16_MAX instead of wrapping around.
static human_readable_value to_human_readable_value(uint64_t value, uint64_t step, uint64_t precision, const std::array<char, 5>& suffixes) {
    if (!value) {
        return {0, suffixes[0]};
    }

    const unsigned last_suffix = suffixes.size() - 1;

    uint64_t result = value;
    uint64_t remainder = 0;
    unsigned i = 0;
    // If there is no remainder we go below precision because we don't lose any.
    // The index is checked before dividing, so that `i` always stays a valid
    // index into `suffixes` and the number is never scaled further than the
    // suffix it is reported with.
    while (i < last_suffix && ((!remainder && result >= step) || result >= precision)) {
        remainder = result % step;
        result /= step;
        ++i;
    }

    const uint64_t rounded = remainder < (step / 2) ? result : result + 1;
    // Only reachable when the input does not fit the largest suffix: saturate
    // rather than silently truncating in the narrowing conversion.
    const uint64_t clamped = rounded > UINT16_MAX ? UINT16_MAX : rounded;
    return {uint16_t(clamped), suffixes[i]};
}
|
||||
|
||||
/// Convert a byte count to its human-readable form, using power-of-1024
/// magnitudes with the suffixes B, K, M, G and T.
human_readable_value to_hr_size(uint64_t size) {
    // Each suffix is 1024 times larger than the previous one.
    constexpr uint64_t step = 1024;
    // Keep scaling down while the number is at or above this threshold.
    constexpr uint64_t precision = 8192;
    const std::array<char, 5> size_suffixes = {'B', 'K', 'M', 'G', 'T'};

    return to_human_readable_value(size, step, precision, size_suffixes);
}
|
||||
|
||||
} // namespace utils
|
||||
53
utils/human_readable.hh
Normal file
53
utils/human_readable.hh
Normal file
@@ -0,0 +1,53 @@
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Copyright (C) 2020 ScyllaDB
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <cinttypes>
|
||||
#include <iosfwd>
|
||||
|
||||
namespace utils {
|
||||
|
||||
/// A number scaled down to a short magnitude, together with the character
/// naming that magnitude. Printed via operator<< as "<value><suffix>".
struct human_readable_value {
    /// The scaled-down numeric part.
    uint16_t value;
    /// The magnitude character; 0 means there is no suffix.
    char suffix;
};
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const human_readable_value& val);
|
||||
|
||||
/// Convert a size to a human readable representation.
|
||||
///
|
||||
/// The human-readable representation has at most 4 digits
/// and a letter denoting the power of two by which the number has to be
/// multiplied to arrive at the original number (with some loss of precision).
|
||||
/// The different powers of two are the conventional 2 ** (N * 10) variants:
|
||||
/// * N=0: (B)ytes
|
||||
/// * N=1: (K)bytes
|
||||
/// * N=2: (M)bytes
|
||||
/// * N=3: (G)bytes
|
||||
/// * N=4: (T)bytes
|
||||
///
|
||||
/// Examples:
|
||||
/// * 87665 will be converted to 87K
|
||||
/// * 1024 will be converted to 1K
|
||||
human_readable_value to_hr_size(uint64_t size);
|
||||
|
||||
} // namespace utils
|
||||
Reference in New Issue
Block a user