reader_permit: keep trace_state pointer on permit
And propagate it down to where it is created. This will be used to add trace points for semaphore related events, but this will come in the next patches.
This commit is contained in:
@@ -1747,7 +1747,7 @@ view_builder::view_builder(replica::database& db, db::system_keyspace& sys_ks, d
|
||||
, _sys_ks(sys_ks)
|
||||
, _sys_dist_ks(sys_dist_ks)
|
||||
, _mnotifier(mn)
|
||||
, _permit(_db.get_reader_concurrency_semaphore().make_tracking_only_permit(nullptr, "view_builder", db::no_timeout)) {
|
||||
, _permit(_db.get_reader_concurrency_semaphore().make_tracking_only_permit(nullptr, "view_builder", db::no_timeout, {})) {
|
||||
setup_metrics();
|
||||
}
|
||||
|
||||
|
||||
@@ -137,7 +137,7 @@ future<> view_update_generator::start() {
|
||||
ssts->insert(sst);
|
||||
}
|
||||
|
||||
auto permit = _db.obtain_reader_permit(*t, "view_update_generator", db::no_timeout).get0();
|
||||
auto permit = _db.obtain_reader_permit(*t, "view_update_generator", db::no_timeout, {}).get0();
|
||||
auto ms = mutation_source([this, ssts] (
|
||||
schema_ptr s,
|
||||
reader_permit permit,
|
||||
|
||||
@@ -213,7 +213,7 @@ public:
|
||||
tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout)
|
||||
: _db(db)
|
||||
, _schema(std::move(s))
|
||||
, _permit(_db.local().get_reader_concurrency_semaphore().make_tracking_only_permit(_schema.get(), "multishard-mutation-query", timeout))
|
||||
, _permit(_db.local().get_reader_concurrency_semaphore().make_tracking_only_permit(_schema.get(), "multishard-mutation-query", timeout, trace_state))
|
||||
, _cmd(cmd)
|
||||
, _ranges(ranges)
|
||||
, _trace_state(std::move(trace_state))
|
||||
@@ -261,14 +261,14 @@ public:
|
||||
return *_semaphores[shard];
|
||||
}
|
||||
|
||||
virtual future<reader_permit> obtain_reader_permit(schema_ptr schema, const char* const description, db::timeout_clock::time_point timeout) override {
|
||||
virtual future<reader_permit> obtain_reader_permit(schema_ptr schema, const char* const description, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) override {
|
||||
const auto shard = this_shard_id();
|
||||
auto& rm = _readers[shard];
|
||||
if (rm.state == reader_state::successful_lookup) {
|
||||
rm.rparts->permit.set_max_result_size(get_max_result_size());
|
||||
co_return rm.rparts->permit;
|
||||
}
|
||||
auto permit = co_await _db.local().obtain_reader_permit(std::move(schema), description, timeout);
|
||||
auto permit = co_await _db.local().obtain_reader_permit(std::move(schema), description, timeout, std::move(trace_ptr));
|
||||
permit.set_max_result_size(get_max_result_size());
|
||||
co_return permit;
|
||||
}
|
||||
|
||||
@@ -113,7 +113,7 @@ future<> multishard_writer::make_shard_writer(unsigned shard) {
|
||||
reader = make_foreign(std::make_unique<flat_mutation_reader_v2>(std::move(reader)))] () mutable {
|
||||
auto s = gs.get();
|
||||
auto semaphore = std::make_unique<reader_concurrency_semaphore>(reader_concurrency_semaphore::no_limits{}, "shard_writer");
|
||||
auto permit = semaphore->make_tracking_only_permit(s.get(), "multishard-writer", db::no_timeout);
|
||||
auto permit = semaphore->make_tracking_only_permit(s.get(), "multishard-writer", db::no_timeout, {});
|
||||
auto this_shard_reader = make_foreign_reader(s, std::move(permit), std::move(reader));
|
||||
return make_foreign(std::make_unique<shard_writer>(gs.get(), std::move(semaphore), std::move(this_shard_reader), consumer));
|
||||
}).then([this, shard] (foreign_ptr<std::unique_ptr<shard_writer>> writer) {
|
||||
|
||||
@@ -140,6 +140,7 @@ private:
|
||||
uint64_t _sstables_read = 0;
|
||||
size_t _requested_memory = 0;
|
||||
uint64_t _oom_kills = 0;
|
||||
tracing::trace_state_ptr _trace_ptr;
|
||||
|
||||
// Not strictly related to the permit.
|
||||
// Used by the semaphore to to manage the permit.
|
||||
@@ -204,23 +205,25 @@ private:
|
||||
public:
|
||||
struct value_tag {};
|
||||
|
||||
impl(reader_concurrency_semaphore& semaphore, const schema* const schema, const std::string_view& op_name, reader_resources base_resources, db::timeout_clock::time_point timeout)
|
||||
impl(reader_concurrency_semaphore& semaphore, const schema* const schema, const std::string_view& op_name, reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr)
|
||||
: _semaphore(semaphore)
|
||||
, _schema(schema)
|
||||
, _op_name_view(op_name)
|
||||
, _base_resources(base_resources)
|
||||
, _ttl_timer([this] { on_timeout(); })
|
||||
, _trace_ptr(std::move(trace_ptr))
|
||||
{
|
||||
set_timeout(timeout);
|
||||
_semaphore.on_permit_created(*this);
|
||||
}
|
||||
impl(reader_concurrency_semaphore& semaphore, const schema* const schema, sstring&& op_name, reader_resources base_resources, db::timeout_clock::time_point timeout)
|
||||
impl(reader_concurrency_semaphore& semaphore, const schema* const schema, sstring&& op_name, reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr)
|
||||
: _semaphore(semaphore)
|
||||
, _schema(schema)
|
||||
, _op_name(std::move(op_name))
|
||||
, _op_name_view(_op_name)
|
||||
, _base_resources(base_resources)
|
||||
, _ttl_timer([this] { on_timeout(); })
|
||||
, _trace_ptr(std::move(trace_ptr))
|
||||
{
|
||||
set_timeout(timeout);
|
||||
_semaphore.on_permit_created(*this);
|
||||
@@ -478,14 +481,14 @@ reader_permit::reader_permit(shared_ptr<impl> impl) : _impl(std::move(impl))
|
||||
}
|
||||
|
||||
reader_permit::reader_permit(reader_concurrency_semaphore& semaphore, const schema* const schema, std::string_view op_name,
|
||||
reader_resources base_resources, db::timeout_clock::time_point timeout)
|
||||
: _impl(::seastar::make_shared<reader_permit::impl>(semaphore, schema, op_name, base_resources, timeout))
|
||||
reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr)
|
||||
: _impl(::seastar::make_shared<reader_permit::impl>(semaphore, schema, op_name, base_resources, timeout, std::move(trace_ptr)))
|
||||
{
|
||||
}
|
||||
|
||||
reader_permit::reader_permit(reader_concurrency_semaphore& semaphore, const schema* const schema, sstring&& op_name,
|
||||
reader_resources base_resources, db::timeout_clock::time_point timeout)
|
||||
: _impl(::seastar::make_shared<reader_permit::impl>(semaphore, schema, std::move(op_name), base_resources, timeout))
|
||||
reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr)
|
||||
: _impl(::seastar::make_shared<reader_permit::impl>(semaphore, schema, std::move(op_name), base_resources, timeout, std::move(trace_ptr)))
|
||||
{
|
||||
}
|
||||
|
||||
@@ -1383,32 +1386,34 @@ void reader_concurrency_semaphore::on_permit_unblocked() noexcept {
|
||||
}
|
||||
|
||||
future<reader_permit> reader_concurrency_semaphore::obtain_permit(const schema* const schema, const char* const op_name, size_t memory,
|
||||
db::timeout_clock::time_point timeout) {
|
||||
auto permit = reader_permit(*this, schema, std::string_view(op_name), {1, static_cast<ssize_t>(memory)}, timeout);
|
||||
db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) {
|
||||
auto permit = reader_permit(*this, schema, std::string_view(op_name), {1, static_cast<ssize_t>(memory)}, timeout, std::move(trace_ptr));
|
||||
return do_wait_admission(*permit).then([permit] () mutable {
|
||||
return std::move(permit);
|
||||
});
|
||||
}
|
||||
|
||||
future<reader_permit> reader_concurrency_semaphore::obtain_permit(const schema* const schema, sstring&& op_name, size_t memory,
|
||||
db::timeout_clock::time_point timeout) {
|
||||
auto permit = reader_permit(*this, schema, std::move(op_name), {1, static_cast<ssize_t>(memory)}, timeout);
|
||||
db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) {
|
||||
auto permit = reader_permit(*this, schema, std::move(op_name), {1, static_cast<ssize_t>(memory)}, timeout, std::move(trace_ptr));
|
||||
return do_wait_admission(*permit).then([permit] () mutable {
|
||||
return std::move(permit);
|
||||
});
|
||||
}
|
||||
|
||||
reader_permit reader_concurrency_semaphore::make_tracking_only_permit(const schema* const schema, const char* const op_name, db::timeout_clock::time_point timeout) {
|
||||
return reader_permit(*this, schema, std::string_view(op_name), {}, timeout);
|
||||
reader_permit reader_concurrency_semaphore::make_tracking_only_permit(const schema* const schema, const char* const op_name,
|
||||
db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) {
|
||||
return reader_permit(*this, schema, std::string_view(op_name), {}, timeout, std::move(trace_ptr));
|
||||
}
|
||||
|
||||
reader_permit reader_concurrency_semaphore::make_tracking_only_permit(const schema* const schema, sstring&& op_name, db::timeout_clock::time_point timeout) {
|
||||
return reader_permit(*this, schema, std::move(op_name), {}, timeout);
|
||||
reader_permit reader_concurrency_semaphore::make_tracking_only_permit(const schema* const schema, sstring&& op_name,
|
||||
db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) {
|
||||
return reader_permit(*this, schema, std::move(op_name), {}, timeout, std::move(trace_ptr));
|
||||
}
|
||||
|
||||
future<> reader_concurrency_semaphore::with_permit(const schema* const schema, const char* const op_name, size_t memory,
|
||||
db::timeout_clock::time_point timeout, read_func func) {
|
||||
auto permit = reader_permit(*this, schema, std::string_view(op_name), {1, static_cast<ssize_t>(memory)}, timeout);
|
||||
db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr, read_func func) {
|
||||
auto permit = reader_permit(*this, schema, std::string_view(op_name), {1, static_cast<ssize_t>(memory)}, timeout, std::move(trace_ptr));
|
||||
permit->aux_data().func = std::move(func);
|
||||
permit->aux_data().permit_keepalive = permit;
|
||||
return do_wait_admission(*permit);
|
||||
|
||||
@@ -386,8 +386,8 @@ public:
|
||||
///
|
||||
/// Some permits cannot be associated with any table, so passing nullptr as
|
||||
/// the schema parameter is allowed.
|
||||
future<reader_permit> obtain_permit(const schema* const schema, const char* const op_name, size_t memory, db::timeout_clock::time_point timeout);
|
||||
future<reader_permit> obtain_permit(const schema* const schema, sstring&& op_name, size_t memory, db::timeout_clock::time_point timeout);
|
||||
future<reader_permit> obtain_permit(const schema* const schema, const char* const op_name, size_t memory, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr);
|
||||
future<reader_permit> obtain_permit(const schema* const schema, sstring&& op_name, size_t memory, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr);
|
||||
|
||||
/// Make a tracking only permit
|
||||
///
|
||||
@@ -402,8 +402,8 @@ public:
|
||||
///
|
||||
/// Some permits cannot be associated with any table, so passing nullptr as
|
||||
/// the schema parameter is allowed.
|
||||
reader_permit make_tracking_only_permit(const schema* const schema, const char* const op_name, db::timeout_clock::time_point timeout);
|
||||
reader_permit make_tracking_only_permit(const schema* const schema, sstring&& op_name, db::timeout_clock::time_point timeout);
|
||||
reader_permit make_tracking_only_permit(const schema* const schema, const char* const op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr);
|
||||
reader_permit make_tracking_only_permit(const schema* const schema, sstring&& op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr);
|
||||
|
||||
/// Run the function through the semaphore's execution stage with an admitted permit
|
||||
///
|
||||
@@ -424,7 +424,7 @@ public:
|
||||
///
|
||||
/// Some permits cannot be associated with any table, so passing nullptr as
|
||||
/// the schema parameter is allowed.
|
||||
future<> with_permit(const schema* const schema, const char* const op_name, size_t memory, db::timeout_clock::time_point timeout, read_func func);
|
||||
future<> with_permit(const schema* const schema, const char* const op_name, size_t memory, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr, read_func func);
|
||||
|
||||
/// Run the function through the semaphore's execution stage with a pre-admitted permit
|
||||
///
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
|
||||
#include "db/timeout_clock.hh"
|
||||
#include "schema/schema_fwd.hh"
|
||||
#include "tracing/trace_state.hh"
|
||||
#include "query_class_config.hh"
|
||||
|
||||
namespace seastar {
|
||||
@@ -98,9 +99,9 @@ private:
|
||||
reader_permit() = default;
|
||||
reader_permit(shared_ptr<impl>);
|
||||
explicit reader_permit(reader_concurrency_semaphore& semaphore, const schema* const schema, std::string_view op_name,
|
||||
reader_resources base_resources, db::timeout_clock::time_point timeout);
|
||||
reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr);
|
||||
explicit reader_permit(reader_concurrency_semaphore& semaphore, const schema* const schema, sstring&& op_name,
|
||||
reader_resources base_resources, db::timeout_clock::time_point timeout);
|
||||
reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr);
|
||||
|
||||
reader_permit::impl& operator*() { return *_impl; }
|
||||
reader_permit::impl* operator->() { return _impl.get(); }
|
||||
|
||||
@@ -819,7 +819,7 @@ future<> shard_reader_v2::do_fill_buffer() {
|
||||
return lifecycle_policy->create_reader(std::move(s), std::move(permit), pr, ps, pc, std::move(ts), fwd_mr);
|
||||
});
|
||||
auto s = gs.get();
|
||||
auto permit = co_await _lifecycle_policy->obtain_reader_permit(s, "shard-reader", timeout());
|
||||
auto permit = co_await _lifecycle_policy->obtain_reader_permit(s, "shard-reader", timeout(), _trace_state);
|
||||
auto rreader = make_foreign(std::make_unique<evictable_reader_v2>(evictable_reader_v2::auto_pause::yes, std::move(ms),
|
||||
s, std::move(permit), *_pr, _ps, _pc, _trace_state, _fwd_mr));
|
||||
|
||||
|
||||
@@ -97,7 +97,8 @@ public:
|
||||
/// `semaphore()`.
|
||||
///
|
||||
/// This method will be called on the shard where the relevant reader lives.
|
||||
virtual future<reader_permit> obtain_reader_permit(schema_ptr schema, const char* const description, db::timeout_clock::time_point timeout) = 0;
|
||||
virtual future<reader_permit> obtain_reader_permit(schema_ptr schema, const char* const description, db::timeout_clock::time_point timeout,
|
||||
tracing::trace_state_ptr trace_ptr) = 0;
|
||||
};
|
||||
|
||||
/// Make a multishard_combining_reader.
|
||||
|
||||
@@ -2779,7 +2779,7 @@ public:
|
||||
rlogger.trace("repair[{}]: Finished to get memory budget, wanted={}, available={}, max_repair_memory={}",
|
||||
_shard_task.global_repair_id.uuid(), wanted, mem_sem.current(), max);
|
||||
|
||||
auto permit = _shard_task.db.local().obtain_reader_permit(_cf, "repair-meta", db::no_timeout).get0();
|
||||
auto permit = _shard_task.db.local().obtain_reader_permit(_cf, "repair-meta", db::no_timeout, {}).get0();
|
||||
|
||||
repair_meta master(_shard_task.rs,
|
||||
_cf,
|
||||
@@ -3100,7 +3100,7 @@ repair_service::insert_repair_meta(
|
||||
reason] (schema_ptr s) {
|
||||
auto& db = get_db();
|
||||
auto& cf = db.local().find_column_family(s->id());
|
||||
return db.local().obtain_reader_permit(cf, "repair-meta", db::no_timeout).then([s = std::move(s),
|
||||
return db.local().obtain_reader_permit(cf, "repair-meta", db::no_timeout, {}).then([s = std::move(s),
|
||||
&cf,
|
||||
this,
|
||||
from,
|
||||
|
||||
@@ -1521,7 +1521,7 @@ database::query(schema_ptr s, const query::read_command& cmd, query::result_opti
|
||||
if (querier_opt) {
|
||||
f = co_await coroutine::as_future(semaphore.with_ready_permit(querier_opt->permit(), read_func));
|
||||
} else {
|
||||
f = co_await coroutine::as_future(semaphore.with_permit(s.get(), "data-query", cf.estimate_read_memory_cost(), timeout, read_func));
|
||||
f = co_await coroutine::as_future(semaphore.with_permit(s.get(), "data-query", cf.estimate_read_memory_cost(), timeout, trace_state, read_func));
|
||||
}
|
||||
|
||||
if (!f.failed()) {
|
||||
@@ -1587,7 +1587,7 @@ database::query_mutations(schema_ptr s, const query::read_command& cmd, const dh
|
||||
if (querier_opt) {
|
||||
f = co_await coroutine::as_future(semaphore.with_ready_permit(querier_opt->permit(), read_func));
|
||||
} else {
|
||||
f = co_await coroutine::as_future(semaphore.with_permit(s.get(), "mutation-query", cf.estimate_read_memory_cost(), timeout, read_func));
|
||||
f = co_await coroutine::as_future(semaphore.with_permit(s.get(), "mutation-query", cf.estimate_read_memory_cost(), timeout, trace_state, read_func));
|
||||
}
|
||||
|
||||
if (!f.failed()) {
|
||||
@@ -1636,12 +1636,12 @@ reader_concurrency_semaphore& database::get_reader_concurrency_semaphore() {
|
||||
std::abort();
|
||||
}
|
||||
|
||||
future<reader_permit> database::obtain_reader_permit(table& tbl, const char* const op_name, db::timeout_clock::time_point timeout) {
|
||||
return get_reader_concurrency_semaphore().obtain_permit(tbl.schema().get(), op_name, tbl.estimate_read_memory_cost(), timeout);
|
||||
future<reader_permit> database::obtain_reader_permit(table& tbl, const char* const op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) {
|
||||
return get_reader_concurrency_semaphore().obtain_permit(tbl.schema().get(), op_name, tbl.estimate_read_memory_cost(), timeout, std::move(trace_ptr));
|
||||
}
|
||||
|
||||
future<reader_permit> database::obtain_reader_permit(schema_ptr schema, const char* const op_name, db::timeout_clock::time_point timeout) {
|
||||
return obtain_reader_permit(find_column_family(std::move(schema)), op_name, timeout);
|
||||
future<reader_permit> database::obtain_reader_permit(schema_ptr schema, const char* const op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) {
|
||||
return obtain_reader_permit(find_column_family(std::move(schema)), op_name, timeout, std::move(trace_ptr));
|
||||
}
|
||||
|
||||
std::ostream& operator<<(std::ostream& out, const column_family& cf) {
|
||||
@@ -1701,7 +1701,7 @@ future<mutation> database::do_apply_counter_update(column_family& cf, const froz
|
||||
// counter state for each modified cell...
|
||||
|
||||
tracing::trace(trace_state, "Reading counter values from the CF");
|
||||
auto permit = get_reader_concurrency_semaphore().make_tracking_only_permit(m_schema.get(), "counter-read-before-write", timeout);
|
||||
auto permit = get_reader_concurrency_semaphore().make_tracking_only_permit(m_schema.get(), "counter-read-before-write", timeout, trace_state);
|
||||
return counter_write_query(m_schema, cf.as_mutation_source(), std::move(permit), m.decorated_key(), slice, trace_state)
|
||||
.then([this, &cf, &m, m_schema, timeout, trace_state] (auto mopt) {
|
||||
// ...now, that we got existing state of all affected counter
|
||||
@@ -2771,9 +2771,9 @@ flat_mutation_reader_v2 make_multishard_streaming_reader(distributed<replica::da
|
||||
}
|
||||
return *_contexts[shard].semaphore;
|
||||
}
|
||||
virtual future<reader_permit> obtain_reader_permit(schema_ptr schema, const char* const description, db::timeout_clock::time_point timeout) override {
|
||||
virtual future<reader_permit> obtain_reader_permit(schema_ptr schema, const char* const description, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) override {
|
||||
auto& cf = _db.local().find_column_family(_table_id);
|
||||
return semaphore().obtain_permit(schema.get(), description, cf.estimate_read_memory_cost(), timeout);
|
||||
return semaphore().obtain_permit(schema.get(), description, cf.estimate_read_memory_cost(), timeout, std::move(trace_ptr));
|
||||
}
|
||||
};
|
||||
auto ms = mutation_source([&db] (schema_ptr s,
|
||||
|
||||
@@ -1716,8 +1716,8 @@ public:
|
||||
reader_concurrency_semaphore& get_reader_concurrency_semaphore();
|
||||
|
||||
// Convenience method to obtain an admitted permit. See reader_concurrency_semaphore::obtain_permit().
|
||||
future<reader_permit> obtain_reader_permit(table& tbl, const char* const op_name, db::timeout_clock::time_point timeout);
|
||||
future<reader_permit> obtain_reader_permit(schema_ptr schema, const char* const op_name, db::timeout_clock::time_point timeout);
|
||||
future<reader_permit> obtain_reader_permit(table& tbl, const char* const op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr);
|
||||
future<reader_permit> obtain_reader_permit(schema_ptr schema, const char* const op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr);
|
||||
|
||||
bool is_internal_query() const;
|
||||
|
||||
|
||||
@@ -874,7 +874,7 @@ table::try_flush_memtable_to_sstable(compaction_group& cg, lw_shared_ptr<memtabl
|
||||
|
||||
auto f = consumer(old->make_flush_reader(
|
||||
old->schema(),
|
||||
compaction_concurrency_semaphore().make_tracking_only_permit(old->schema().get(), "try_flush_memtable_to_sstable()", db::no_timeout),
|
||||
compaction_concurrency_semaphore().make_tracking_only_permit(old->schema().get(), "try_flush_memtable_to_sstable()", db::no_timeout, {}),
|
||||
service::get_local_memtable_flush_priority()));
|
||||
|
||||
// Switch back to default scheduling group for post-flush actions, to avoid them being staved by the memtable flush
|
||||
@@ -2271,7 +2271,7 @@ write_memtable_to_sstable(memtable& mt, sstables::shared_sstable sst, sstables::
|
||||
std::make_unique<reader_concurrency_semaphore>(reader_concurrency_semaphore::no_limits{}, "write_memtable_to_sstable"),
|
||||
cfg,
|
||||
[&mt, sst] (auto& monitor, auto& semaphore, auto& cfg) {
|
||||
return write_memtable_to_sstable(semaphore->make_tracking_only_permit(mt.schema().get(), "mt_to_sst", db::no_timeout), mt, std::move(sst), monitor, cfg)
|
||||
return write_memtable_to_sstable(semaphore->make_tracking_only_permit(mt.schema().get(), "mt_to_sst", db::no_timeout, {}), mt, std::move(sst), monitor, cfg)
|
||||
.finally([&semaphore] {
|
||||
return semaphore->stop();
|
||||
});
|
||||
@@ -2605,7 +2605,7 @@ future<row_locker::lock_holder> table::do_push_view_replica_updates(schema_ptr s
|
||||
const bool need_static = db::view::needs_static_row(m.partition(), views);
|
||||
if (!need_regular && !need_static) {
|
||||
tracing::trace(tr_state, "View updates do not require read-before-write");
|
||||
co_await generate_and_propagate_view_updates(base, sem.make_tracking_only_permit(s.get(), "push-view-updates-1", timeout), std::move(views), std::move(m), { }, std::move(tr_state), now);
|
||||
co_await generate_and_propagate_view_updates(base, sem.make_tracking_only_permit(s.get(), "push-view-updates-1", timeout, tr_state), std::move(views), std::move(m), { }, tr_state, now);
|
||||
// In this case we are not doing a read-before-write, just a
|
||||
// write, so no lock is needed.
|
||||
co_return row_locker::lock_holder();
|
||||
@@ -2638,7 +2638,7 @@ future<row_locker::lock_holder> table::do_push_view_replica_updates(schema_ptr s
|
||||
co_await utils::get_local_injector().inject("table_push_view_replica_updates_timeout", timeout);
|
||||
auto lock = co_await std::move(lockf);
|
||||
auto pk = dht::partition_range::make_singular(m.decorated_key());
|
||||
auto permit = sem.make_tracking_only_permit(base.get(), "push-view-updates-2", timeout);
|
||||
auto permit = sem.make_tracking_only_permit(base.get(), "push-view-updates-2", timeout, tr_state);
|
||||
auto reader = source.make_reader_v2(base, permit, pk, slice, io_priority, tr_state, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
|
||||
co_await this->generate_and_propagate_view_updates(base, std::move(permit), std::move(views), std::move(m), std::move(reader), tr_state, now);
|
||||
tracing::trace(tr_state, "View updates for {}.{} were generated and propagated", base->ks_name(), base->cf_name());
|
||||
@@ -2723,7 +2723,7 @@ public:
|
||||
return _t.get_compaction_strategy();
|
||||
}
|
||||
reader_permit make_compaction_reader_permit() const override {
|
||||
return _t.compaction_concurrency_semaphore().make_tracking_only_permit(schema().get(), "compaction", db::no_timeout);
|
||||
return _t.compaction_concurrency_semaphore().make_tracking_only_permit(schema().get(), "compaction", db::no_timeout, {});
|
||||
}
|
||||
sstables::sstables_manager& get_sstables_manager() noexcept override {
|
||||
return _t.get_sstables_manager();
|
||||
|
||||
@@ -1291,7 +1291,7 @@ future<> sstable::load_first_and_last_position_in_partition() {
|
||||
}
|
||||
|
||||
auto& sem = _manager.sstable_metadata_concurrency_sem();
|
||||
reader_permit permit = co_await sem.obtain_permit(&*_schema, "sstable::load_first_and_last_position_range", sstable_buffer_size, db::no_timeout);
|
||||
reader_permit permit = co_await sem.obtain_permit(&*_schema, "sstable::load_first_and_last_position_range", sstable_buffer_size, db::no_timeout, {});
|
||||
auto first_pos_opt = co_await find_first_position_in_partition(permit, get_first_decorated_key(), false);
|
||||
auto last_pos_opt = co_await find_first_position_in_partition(permit, get_last_decorated_key(), true);
|
||||
|
||||
@@ -1888,7 +1888,7 @@ future<> sstable::generate_summary(const io_priority_class& pc) {
|
||||
|
||||
auto s = summary_generator(_schema->get_partitioner(), _components->summary, _manager.config().sstable_summary_ratio());
|
||||
auto ctx = make_lw_shared<index_consume_entry_context<summary_generator>>(
|
||||
*this, sem.make_tracking_only_permit(_schema.get(), "generate-summary", db::no_timeout), s, trust_promoted_index::yes,
|
||||
*this, sem.make_tracking_only_permit(_schema.get(), "generate-summary", db::no_timeout, {}), s, trust_promoted_index::yes,
|
||||
make_file_input_stream(index_file, 0, index_size, std::move(options)), 0, index_size,
|
||||
(_version >= sstable_version_types::mc
|
||||
? std::make_optional(get_clustering_values_fixed_lengths(get_serialization_header()))
|
||||
@@ -3032,7 +3032,7 @@ future<bool> sstable::has_partition_key(const utils::hashed_key& hk, const dht::
|
||||
std::exception_ptr ex;
|
||||
auto sem = reader_concurrency_semaphore(reader_concurrency_semaphore::no_limits{}, "sstables::has_partition_key()");
|
||||
try {
|
||||
auto lh_index_ptr = std::make_unique<sstables::index_reader>(s, sem.make_tracking_only_permit(_schema.get(), s->get_filename(), db::no_timeout), default_priority_class(), tracing::trace_state_ptr(), use_caching::yes);
|
||||
auto lh_index_ptr = std::make_unique<sstables::index_reader>(s, sem.make_tracking_only_permit(_schema.get(), s->get_filename(), db::no_timeout, {}), default_priority_class(), tracing::trace_state_ptr(), use_caching::yes);
|
||||
present = co_await lh_index_ptr->advance_lower_and_check_if_present(dk);
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
|
||||
@@ -162,7 +162,7 @@ future<> sstables_loader::load_and_stream(sstring ks_name, sstring cf_name,
|
||||
size_t num_partitions_processed = 0;
|
||||
size_t num_bytes_read = 0;
|
||||
nr_sst_current += sst_processed.size();
|
||||
auto permit = co_await _db.local().obtain_reader_permit(table, "sstables_loader::load_and_stream()", db::no_timeout);
|
||||
auto permit = co_await _db.local().obtain_reader_permit(table, "sstables_loader::load_and_stream()", db::no_timeout, {});
|
||||
auto reader = mutation_fragment_v1_stream(table.make_streaming_reader(s, std::move(permit), full_partition_range, sst_set));
|
||||
std::exception_ptr eptr;
|
||||
bool failed = false;
|
||||
|
||||
@@ -121,7 +121,7 @@ void stream_manager::init_messaging_service_handler() {
|
||||
utils::fb_utilities::get_broadcast_address())));
|
||||
}
|
||||
return _mm.local().get_schema_for_write(schema_id, from, _ms.local()).then([this, from, estimated_partitions, plan_id, cf_id, source, reason] (schema_ptr s) mutable {
|
||||
return _db.local().obtain_reader_permit(s, "stream-session", db::no_timeout).then([this, from, estimated_partitions, plan_id, cf_id, source, reason, s] (reader_permit permit) mutable {
|
||||
return _db.local().obtain_reader_permit(s, "stream-session", db::no_timeout, {}).then([this, from, estimated_partitions, plan_id, cf_id, source, reason, s] (reader_permit permit) mutable {
|
||||
auto sink = _ms.local().make_sink_for_stream_mutation_fragments(source);
|
||||
struct stream_mutation_fragments_cmd_status {
|
||||
bool got_cmd = false;
|
||||
|
||||
@@ -197,7 +197,7 @@ future<> stream_transfer_task::execute() {
|
||||
auto& sm = session->manager();
|
||||
return sm.container().invoke_on_all([plan_id, cf_id, id, dst_cpu_id, ranges=this->_ranges, reason] (stream_manager& sm) mutable {
|
||||
auto& tbl = sm.db().find_column_family(cf_id);
|
||||
return sm.db().obtain_reader_permit(tbl, "stream-transfer-task", db::no_timeout).then([&sm, &tbl, plan_id, cf_id, id, dst_cpu_id, ranges=std::move(ranges), reason] (reader_permit permit) mutable {
|
||||
return sm.db().obtain_reader_permit(tbl, "stream-transfer-task", db::no_timeout, {}).then([&sm, &tbl, plan_id, cf_id, id, dst_cpu_id, ranges=std::move(ranges), reason] (reader_permit permit) mutable {
|
||||
auto si = make_lw_shared<send_info>(sm.ms(), plan_id, tbl, std::move(permit), std::move(ranges), id, dst_cpu_id, reason, [&sm, plan_id, addr = id.addr] (size_t sz) {
|
||||
sm.update_progress(plan_id, addr, streaming::progress_info::direction::OUT, sz);
|
||||
});
|
||||
|
||||
@@ -644,7 +644,7 @@ SEASTAR_TEST_CASE(test_commitlog_replay_invalid_key){
|
||||
{
|
||||
std::vector<flat_mutation_reader_v2> readers;
|
||||
readers.reserve(memtables.size());
|
||||
auto permit = db.get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout);
|
||||
auto permit = db.get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout, {});
|
||||
for (auto mt : memtables) {
|
||||
readers.push_back(mt->make_flat_reader(s, permit));
|
||||
}
|
||||
|
||||
@@ -1231,7 +1231,7 @@ SEASTAR_TEST_CASE(database_drop_column_family_clears_querier_cache) {
|
||||
auto q = query::querier(
|
||||
tbl.as_mutation_source(),
|
||||
tbl.schema(),
|
||||
database_test(db).get_user_read_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout),
|
||||
database_test(db).get_user_read_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout, {}),
|
||||
query::full_partition_range,
|
||||
s->full_slice(),
|
||||
default_priority_class(),
|
||||
|
||||
@@ -65,7 +65,7 @@ SEASTAR_THREAD_TEST_CASE(mutation_fragment_sanity_check) {
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, __FILE__);
|
||||
auto stop_semaphore = deferred_stop(semaphore);
|
||||
simple_schema s;
|
||||
auto permit = semaphore.make_tracking_only_permit(s.schema().get(), "test", db::no_timeout);
|
||||
auto permit = semaphore.make_tracking_only_permit(s.schema().get(), "test", db::no_timeout, {});
|
||||
gc_clock::time_point ts(gc_clock::duration(1234567890000));
|
||||
|
||||
auto check_hash = [&] (const mutation_fragment& mf, uint64_t expected) {
|
||||
|
||||
@@ -358,7 +358,7 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_mutate_exception_safety) {
|
||||
|
||||
reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 100);
|
||||
auto stop_sem = deferred_stop(sem);
|
||||
auto permit = sem.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout);
|
||||
auto permit = sem.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {});
|
||||
|
||||
const auto available_res = sem.available_resources();
|
||||
const sstring val(1024, 'a');
|
||||
@@ -428,7 +428,7 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_stream_validator) {
|
||||
|
||||
reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 100);
|
||||
auto stop_sem = deferred_stop(sem);
|
||||
auto permit = sem.make_tracking_only_permit(ss.schema().get(), get_name(), db::no_timeout);
|
||||
auto permit = sem.make_tracking_only_permit(ss.schema().get(), get_name(), db::no_timeout, {});
|
||||
|
||||
auto expect = [&] (bool expect_valid, const char* desc, unsigned at, auto&& first_mf, auto&&... mf) {
|
||||
std::vector<mutation_fragment_v2> mfs;
|
||||
@@ -619,7 +619,7 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_stream_validator_mixed_api_usage
|
||||
|
||||
reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 100);
|
||||
auto stop_sem = deferred_stop(sem);
|
||||
auto permit = sem.make_tracking_only_permit(ss.schema().get(), get_name(), db::no_timeout);
|
||||
auto permit = sem.make_tracking_only_permit(ss.schema().get(), get_name(), db::no_timeout, {});
|
||||
|
||||
mutation_fragment_stream_validator validator(*ss.schema());
|
||||
|
||||
|
||||
@@ -2921,7 +2921,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_self_validation) {
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name());
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
simple_schema s;
|
||||
auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout);
|
||||
auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {});
|
||||
|
||||
auto pkeys = s.make_pkeys(4);
|
||||
std::ranges::sort(pkeys, dht::decorated_key::less_comparator(s.schema()));
|
||||
@@ -3199,7 +3199,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_recreate_before_fast_forward_to)
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name());
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
simple_schema s;
|
||||
auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout);
|
||||
auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {});
|
||||
auto pkeys = s.make_pkeys(6);
|
||||
boost::sort(pkeys, dht::decorated_key::less_comparator(s.schema()));
|
||||
|
||||
@@ -3250,7 +3250,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_drop_flags) {
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 0);
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
simple_schema s;
|
||||
auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout);
|
||||
auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {});
|
||||
|
||||
auto pkeys = s.make_pkeys(2);
|
||||
std::sort(pkeys.begin(), pkeys.end(), [&s] (const auto& pk1, const auto& pk2) {
|
||||
@@ -3477,7 +3477,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_non_monotonic_positions) {
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
simple_schema s;
|
||||
auto schema = s.schema();
|
||||
auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout);
|
||||
auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {});
|
||||
|
||||
auto pkey = s.make_pkey();
|
||||
const auto prange = dht::partition_range::make_open_ended_both_sides();
|
||||
@@ -3536,7 +3536,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_clear_tombstone_in_discontinued_p
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
simple_schema s;
|
||||
auto schema = s.schema();
|
||||
auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout);
|
||||
auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {});
|
||||
|
||||
auto pkeys = s.make_pkeys(2);
|
||||
std::sort(pkeys.begin(), pkeys.end(), [&s] (const auto& pk1, const auto& pk2) {
|
||||
|
||||
@@ -105,7 +105,7 @@ private:
|
||||
Querier make_querier(const dht::partition_range& range) {
|
||||
return Querier(_mutation_source,
|
||||
_s.schema(),
|
||||
_sem.make_tracking_only_permit(_s.schema().get(), "make-querier", db::no_timeout),
|
||||
_sem.make_tracking_only_permit(_s.schema().get(), "make-querier", db::no_timeout, {}),
|
||||
range,
|
||||
_s.schema()->full_slice(),
|
||||
service::get_local_sstable_query_read_priority(),
|
||||
@@ -674,7 +674,7 @@ SEASTAR_THREAD_TEST_CASE(test_resources_based_cache_eviction) {
|
||||
BOOST_CHECK_EQUAL(db.get_querier_cache_stats().resource_based_evictions, 0);
|
||||
|
||||
// Drain all resources of the semaphore
|
||||
auto sponge_permit = semaphore.make_tracking_only_permit(s.get(), "sponge", db::no_timeout);
|
||||
auto sponge_permit = semaphore.make_tracking_only_permit(s.get(), "sponge", db::no_timeout, {});
|
||||
auto consumed_resources = sponge_permit.consume_resources(semaphore.available_resources());
|
||||
|
||||
auto cmd2 = query::read_command(s->id(),
|
||||
@@ -723,13 +723,13 @@ SEASTAR_THREAD_TEST_CASE(test_immediate_evict_on_insert) {
|
||||
test_querier_cache t;
|
||||
|
||||
auto& sem = t.get_semaphore();
|
||||
auto permit1 = sem.obtain_permit(t.get_schema().get(), get_name(), 0, db::no_timeout).get0();
|
||||
auto permit1 = sem.obtain_permit(t.get_schema().get(), get_name(), 0, db::no_timeout, {}).get0();
|
||||
|
||||
auto resources = permit1.consume_resources(reader_resources(sem.available_resources().count, 0));
|
||||
|
||||
BOOST_CHECK_EQUAL(sem.available_resources().count, 0);
|
||||
|
||||
auto fut = sem.obtain_permit(t.get_schema().get(), get_name(), 1, db::no_timeout);
|
||||
auto fut = sem.obtain_permit(t.get_schema().get(), get_name(), 1, db::no_timeout, {});
|
||||
|
||||
BOOST_CHECK_EQUAL(sem.get_stats().waiters, 1);
|
||||
|
||||
@@ -755,8 +755,8 @@ SEASTAR_THREAD_TEST_CASE(test_unique_inactive_read_handle) {
|
||||
.with_column("v", int32_type)
|
||||
.build();
|
||||
|
||||
auto sem1_h1 = sem1.register_inactive_read(make_empty_flat_reader_v2(schema, sem1.make_tracking_only_permit(schema.get(), get_name(), db::no_timeout)));
|
||||
auto sem2_h1 = sem2.register_inactive_read(make_empty_flat_reader_v2(schema, sem2.make_tracking_only_permit(schema.get(), get_name(), db::no_timeout)));
|
||||
auto sem1_h1 = sem1.register_inactive_read(make_empty_flat_reader_v2(schema, sem1.make_tracking_only_permit(schema.get(), get_name(), db::no_timeout, {})));
|
||||
auto sem2_h1 = sem2.register_inactive_read(make_empty_flat_reader_v2(schema, sem2.make_tracking_only_permit(schema.get(), get_name(), db::no_timeout, {})));
|
||||
|
||||
// Sanity check that lookup still works with empty handle.
|
||||
BOOST_REQUIRE(!sem1.unregister_inactive_read(reader_concurrency_semaphore::inactive_read_handle{}));
|
||||
|
||||
@@ -41,7 +41,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_clear_inactive_reads)
|
||||
auto clear_permits = defer([&permits] { permits.clear(); });
|
||||
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
permits.emplace_back(semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout));
|
||||
permits.emplace_back(semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}));
|
||||
handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permits.back())));
|
||||
}
|
||||
|
||||
@@ -59,7 +59,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_clear_inactive_reads)
|
||||
handles.clear();
|
||||
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout))));
|
||||
handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}))));
|
||||
}
|
||||
|
||||
BOOST_REQUIRE(std::all_of(handles.begin(), handles.end(), [] (const reader_concurrency_semaphore::inactive_read_handle& handle) { return bool(handle); }));
|
||||
@@ -77,14 +77,14 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_destroyed_permit_rele
|
||||
|
||||
// Not admitted, active
|
||||
{
|
||||
auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout);
|
||||
auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {});
|
||||
auto units2 = permit.consume_memory(1024);
|
||||
}
|
||||
BOOST_REQUIRE_EQUAL(semaphore.available_resources(), initial_resources);
|
||||
|
||||
// Not admitted, inactive
|
||||
{
|
||||
auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout);
|
||||
auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {});
|
||||
auto units2 = permit.consume_memory(1024);
|
||||
|
||||
auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permit));
|
||||
@@ -94,14 +94,14 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_destroyed_permit_rele
|
||||
|
||||
// Admitted, active
|
||||
{
|
||||
auto permit = semaphore.obtain_permit(s.schema().get(), get_name(), 1024, db::no_timeout).get0();
|
||||
auto permit = semaphore.obtain_permit(s.schema().get(), get_name(), 1024, db::no_timeout, {}).get0();
|
||||
auto units1 = permit.consume_memory(1024);
|
||||
}
|
||||
BOOST_REQUIRE_EQUAL(semaphore.available_resources(), initial_resources);
|
||||
|
||||
// Admitted, inactive
|
||||
{
|
||||
auto permit = semaphore.obtain_permit(s.schema().get(), get_name(), 1024, db::no_timeout).get0();
|
||||
auto permit = semaphore.obtain_permit(s.schema().get(), get_name(), 1024, db::no_timeout, {}).get0();
|
||||
auto units1 = permit.consume_memory(1024);
|
||||
|
||||
auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permit));
|
||||
@@ -115,7 +115,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_abandoned_handle_clos
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name());
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout);
|
||||
auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {});
|
||||
{
|
||||
auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permit));
|
||||
// The handle is destroyed here, triggering the destrution of the inactive read.
|
||||
@@ -136,7 +136,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_readmission_preserves
|
||||
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
reader_permit_opt permit = semaphore.obtain_permit(s.schema().get(), get_name(), 1024, db::no_timeout).get();
|
||||
reader_permit_opt permit = semaphore.obtain_permit(s.schema().get(), get_name(), 1024, db::no_timeout, {}).get();
|
||||
BOOST_REQUIRE_EQUAL(permit->consumed_resources(), base_resources);
|
||||
|
||||
std::optional<reader_permit::resource_units> residue_units;
|
||||
@@ -152,7 +152,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_readmission_preserves
|
||||
BOOST_REQUIRE_EQUAL(semaphore.available_resources(), initial_resources - permit->consumed_resources());
|
||||
|
||||
if (i % 2) {
|
||||
auto sponge_permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout);
|
||||
auto sponge_permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {});
|
||||
auto consumed_resources = sponge_permit.consume_resources(semaphore.available_resources());
|
||||
|
||||
auto fut = make_ready_future<>();
|
||||
@@ -267,9 +267,9 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_forward_progress) {
|
||||
}
|
||||
future<> obtain_permit() {
|
||||
if (_memory_only) {
|
||||
_permit = _semaphore.make_tracking_only_permit(_schema.get(), "reader_m", db::no_timeout);
|
||||
_permit = _semaphore.make_tracking_only_permit(_schema.get(), "reader_m", db::no_timeout, {});
|
||||
} else {
|
||||
_permit = co_await _semaphore.obtain_permit(_schema.get(), fmt::format("reader_{}", _evictable ? 'e' : 'a'), 1024, db::no_timeout);
|
||||
_permit = co_await _semaphore.obtain_permit(_schema.get(), fmt::format("reader_{}", _evictable ? 'e' : 'a'), 1024, db::no_timeout, {});
|
||||
}
|
||||
_units = _permit->consume_memory(tests::random::get_int(128, 1024));
|
||||
}
|
||||
@@ -414,7 +414,7 @@ SEASTAR_TEST_CASE(reader_restriction_file_tracking) {
|
||||
return async([&] {
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 100, 4 * 1024);
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
auto permit = semaphore.obtain_permit(nullptr, get_name(), 0, db::no_timeout).get();
|
||||
auto permit = semaphore.obtain_permit(nullptr, get_name(), 0, db::no_timeout, {}).get();
|
||||
|
||||
{
|
||||
auto tracked_file = make_tracked_file(file(shared_ptr<file_impl>(make_shared<dummy_file_impl>())), permit);
|
||||
@@ -474,11 +474,11 @@ SEASTAR_TEST_CASE(reader_concurrency_semaphore_timeout) {
|
||||
{
|
||||
auto timeout = db::timeout_clock::now() + std::chrono::duration_cast<db::timeout_clock::time_point::duration>(std::chrono::milliseconds{1});
|
||||
|
||||
reader_permit_opt permit1 = semaphore.obtain_permit(nullptr, "permit1", replica::new_reader_base_cost, timeout).get();
|
||||
reader_permit_opt permit1 = semaphore.obtain_permit(nullptr, "permit1", replica::new_reader_base_cost, timeout, {}).get();
|
||||
|
||||
auto permit2_fut = semaphore.obtain_permit(nullptr, "permit2", replica::new_reader_base_cost, timeout);
|
||||
auto permit2_fut = semaphore.obtain_permit(nullptr, "permit2", replica::new_reader_base_cost, timeout, {});
|
||||
|
||||
auto permit3_fut = semaphore.obtain_permit(nullptr, "permit3", replica::new_reader_base_cost, timeout);
|
||||
auto permit3_fut = semaphore.obtain_permit(nullptr, "permit3", replica::new_reader_base_cost, timeout, {});
|
||||
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().waiters, 2);
|
||||
|
||||
@@ -516,15 +516,15 @@ SEASTAR_TEST_CASE(reader_concurrency_semaphore_max_queue_length) {
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
{
|
||||
reader_permit_opt permit1 = semaphore.obtain_permit(nullptr, "permit1", replica::new_reader_base_cost, db::no_timeout).get();
|
||||
reader_permit_opt permit1 = semaphore.obtain_permit(nullptr, "permit1", replica::new_reader_base_cost, db::no_timeout, {}).get();
|
||||
|
||||
auto permit2_fut = semaphore.obtain_permit(nullptr, "permit2", replica::new_reader_base_cost, db::no_timeout);
|
||||
auto permit2_fut = semaphore.obtain_permit(nullptr, "permit2", replica::new_reader_base_cost, db::no_timeout, {});
|
||||
|
||||
auto permit3_fut = semaphore.obtain_permit(nullptr, "permit3", replica::new_reader_base_cost, db::no_timeout);
|
||||
auto permit3_fut = semaphore.obtain_permit(nullptr, "permit3", replica::new_reader_base_cost, db::no_timeout, {});
|
||||
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().waiters, 2);
|
||||
|
||||
auto permit4_fut = semaphore.obtain_permit(nullptr, "permit4", replica::new_reader_base_cost, db::no_timeout);
|
||||
auto permit4_fut = semaphore.obtain_permit(nullptr, "permit4", replica::new_reader_base_cost, db::no_timeout, {});
|
||||
|
||||
// The queue should now be full.
|
||||
BOOST_REQUIRE_THROW(permit4_fut.get(), std::runtime_error);
|
||||
@@ -564,7 +564,7 @@ SEASTAR_THREAD_TEST_CASE(reader_concurrency_semaphore_dump_reader_diganostics) {
|
||||
for (auto& schema : schemas) {
|
||||
const auto nr_permits = tests::random::get_int<unsigned>(2, 32);
|
||||
for (unsigned i = 0; i < nr_permits; ++i) {
|
||||
auto permit = semaphore.make_tracking_only_permit(schema.get(), op_names.at(tests::random::get_int<unsigned>(0, nr_ops - 1)), db::no_timeout);
|
||||
auto permit = semaphore.make_tracking_only_permit(schema.get(), op_names.at(tests::random::get_int<unsigned>(0, nr_ops - 1)), db::no_timeout, {});
|
||||
if (tests::random::get_int<unsigned>(0, 4)) {
|
||||
auto units = permit.consume_resources(reader_resources(tests::random::get_int<unsigned>(0, 1), tests::random::get_int<unsigned>(1024, 16 * 1024 * 1024)));
|
||||
permits.push_back(std::pair(std::move(permit), std::move(units)));
|
||||
@@ -597,7 +597,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_stop_waits_on_permits
|
||||
BOOST_TEST_MESSAGE("1 permit");
|
||||
{
|
||||
auto semaphore = std::make_unique<reader_concurrency_semaphore>(reader_concurrency_semaphore::no_limits{}, get_name());
|
||||
auto permit = std::make_unique<reader_permit>(semaphore->make_tracking_only_permit(nullptr, "permit1", db::no_timeout));
|
||||
auto permit = std::make_unique<reader_permit>(semaphore->make_tracking_only_permit(nullptr, "permit1", db::no_timeout, {}));
|
||||
|
||||
// Test will fail via use-after-free
|
||||
auto f = semaphore->stop().then([semaphore = std::move(semaphore)] { });
|
||||
@@ -623,7 +623,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
|
||||
testlog.trace("Running admission scenario {}, with exepcted_can_admit={}", description, expected_can_admit);
|
||||
const auto stats_before = semaphore.get_stats();
|
||||
|
||||
auto admit_fut = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now());
|
||||
auto admit_fut = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {});
|
||||
admit_fut.wait();
|
||||
const bool can_admit = !admit_fut.failed();
|
||||
if (can_admit) {
|
||||
@@ -650,13 +650,13 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
|
||||
|
||||
// resources and waitlist
|
||||
{
|
||||
reader_permit_opt permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now()).get();
|
||||
reader_permit_opt permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get();
|
||||
|
||||
require_can_admit(true, "enough resources");
|
||||
|
||||
const auto stats_before = semaphore.get_stats();
|
||||
|
||||
auto enqueued_permit_fut = semaphore.obtain_permit(schema_ptr, get_name(), 2 * 1024, db::no_timeout);
|
||||
auto enqueued_permit_fut = semaphore.obtain_permit(schema_ptr, get_name(), 2 * 1024, db::no_timeout, {});
|
||||
{
|
||||
const auto stats_after = semaphore.get_stats();
|
||||
BOOST_REQUIRE(!enqueued_permit_fut.available());
|
||||
@@ -678,7 +678,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
|
||||
|
||||
// used and blocked
|
||||
{
|
||||
auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now()).get();
|
||||
auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get();
|
||||
|
||||
require_can_admit(true, "!used");
|
||||
{
|
||||
@@ -698,7 +698,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
|
||||
|
||||
// forward progress -- resources
|
||||
{
|
||||
auto sponge_permit = semaphore.make_tracking_only_permit(nullptr, "sponge", db::no_timeout);
|
||||
auto sponge_permit = semaphore.make_tracking_only_permit(nullptr, "sponge", db::no_timeout, {});
|
||||
sponge_permit.consume_resources(reader_resources::with_memory(semaphore.available_resources().memory));
|
||||
require_can_admit(true, "semaphore with no memory but all count available");
|
||||
}
|
||||
@@ -707,7 +707,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
|
||||
|
||||
// forward progress -- readmission
|
||||
{
|
||||
auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now()).get();
|
||||
auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get();
|
||||
|
||||
auto irh = semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permit));
|
||||
BOOST_REQUIRE(semaphore.try_evict_one_inactive_read());
|
||||
@@ -733,7 +733,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
|
||||
|
||||
// inactive readers
|
||||
{
|
||||
auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now()).get();
|
||||
auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get();
|
||||
|
||||
require_can_admit(true, "!used");
|
||||
{
|
||||
@@ -761,10 +761,10 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
|
||||
|
||||
// evicting inactive readers for admission
|
||||
{
|
||||
auto permit1 = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now()).get();
|
||||
auto permit1 = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get();
|
||||
auto irh1 = semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permit1));
|
||||
|
||||
auto permit2 = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now()).get();
|
||||
auto permit2 = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get();
|
||||
auto irh2 = semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permit2));
|
||||
|
||||
require_can_admit(true, "evictable reads");
|
||||
@@ -779,7 +779,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
|
||||
|
||||
const auto stats_before = semaphore.get_stats();
|
||||
|
||||
auto permit2_fut = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::no_timeout);
|
||||
auto permit2_fut = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::no_timeout, {});
|
||||
|
||||
const auto stats_after = semaphore.get_stats();
|
||||
BOOST_REQUIRE_EQUAL(stats_after.reads_admitted, stats_before.reads_admitted);
|
||||
@@ -800,7 +800,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
|
||||
{
|
||||
check_admitting_enqueued_read(
|
||||
[&] {
|
||||
return reader_permit_opt(semaphore.obtain_permit(schema_ptr, get_name(), 2 * 1024, db::timeout_clock::now()).get());
|
||||
return reader_permit_opt(semaphore.obtain_permit(schema_ptr, get_name(), 2 * 1024, db::timeout_clock::now(), {}).get());
|
||||
},
|
||||
[] (reader_permit_opt& permit1) {
|
||||
permit1 = {};
|
||||
@@ -815,7 +815,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
|
||||
{
|
||||
check_admitting_enqueued_read(
|
||||
[&] {
|
||||
return reader_permit_opt(semaphore.obtain_permit(schema_ptr, get_name(), 2 * 1024, db::timeout_clock::now()).get());
|
||||
return reader_permit_opt(semaphore.obtain_permit(schema_ptr, get_name(), 2 * 1024, db::timeout_clock::now(), {}).get());
|
||||
},
|
||||
[&] (reader_permit_opt& permit1) {
|
||||
return semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), *permit1));
|
||||
@@ -829,7 +829,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
|
||||
{
|
||||
check_admitting_enqueued_read(
|
||||
[&] {
|
||||
auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now()).get();
|
||||
auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get();
|
||||
require_can_admit(true, "enough resources");
|
||||
return std::pair(permit, std::optional<reader_permit::used_guard>{permit});
|
||||
}, [&] (std::pair<reader_permit, std::optional<reader_permit::used_guard>>& permit_and_used_guard) {
|
||||
@@ -845,7 +845,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
|
||||
{
|
||||
check_admitting_enqueued_read(
|
||||
[&] {
|
||||
auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now()).get();
|
||||
auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get();
|
||||
require_can_admit(true, "enough resources");
|
||||
return std::pair(permit, reader_permit::used_guard{permit});
|
||||
}, [&] (std::pair<reader_permit, reader_permit::used_guard>& permit_and_used_guard) {
|
||||
@@ -866,7 +866,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_used_blocked) {
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().used_permits, 0);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().blocked_permits, 0);
|
||||
|
||||
auto permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get0();
|
||||
auto permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get0();
|
||||
|
||||
for (auto scenario = 0; scenario < 5; ++scenario) {
|
||||
testlog.info("Running scenario {}", scenario);
|
||||
@@ -953,7 +953,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_evict_inactive_reads_
|
||||
for (auto& s : schemas) {
|
||||
auto& handles = schema_handles[&s];
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout))));
|
||||
handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}))));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1008,10 +1008,10 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_evict_inactive_reads_
|
||||
}
|
||||
};
|
||||
|
||||
auto p1 = semaphore.obtain_permit(&s, get_name(), 1024, db::no_timeout).get();
|
||||
auto p1 = semaphore.obtain_permit(&s, get_name(), 1024, db::no_timeout, {}).get();
|
||||
auto irh1 = semaphore.register_inactive_read(make_empty_flat_reader_v2(ss.schema(), p1));
|
||||
|
||||
auto p2 = semaphore.obtain_permit(&s, get_name(), 1024, db::no_timeout).get();
|
||||
auto p2 = semaphore.obtain_permit(&s, get_name(), 1024, db::no_timeout, {}).get();
|
||||
read rd2(p2);
|
||||
auto fut2 = semaphore.with_ready_permit(p2, rd2.get_read_func());
|
||||
|
||||
@@ -1020,7 +1020,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_evict_inactive_reads_
|
||||
// * 1 used (but not blocked) read on the ready list
|
||||
// * 1 waiter
|
||||
// * no more count resources left
|
||||
auto p3_fut = semaphore.obtain_permit(&s, get_name(), 1024, db::no_timeout);
|
||||
auto p3_fut = semaphore.obtain_permit(&s, get_name(), 1024, db::no_timeout, {});
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().waiters, 2); // (waiters includes _ready_list entries)
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_enqueued_for_admission, 1);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().used_permits, 0); // permit looses used status while waiting for execution
|
||||
@@ -1061,8 +1061,8 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_set_resources) {
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory);
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
auto permit1 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get0();
|
||||
auto permit2 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get0();
|
||||
auto permit1 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get0();
|
||||
auto permit2 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get0();
|
||||
BOOST_REQUIRE_EQUAL(semaphore.available_resources(), reader_resources(2, 2 * 1024));
|
||||
BOOST_REQUIRE_EQUAL(semaphore.initial_resources(), reader_resources(4, 4 * 1024));
|
||||
|
||||
@@ -1082,7 +1082,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_set_resources) {
|
||||
BOOST_REQUIRE_EQUAL(semaphore.available_resources(), reader_resources(-1, 1024));
|
||||
BOOST_REQUIRE_EQUAL(semaphore.initial_resources(), reader_resources(1, 3 * 1024));
|
||||
|
||||
auto permit3_fut = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout);
|
||||
auto permit3_fut = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {});
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_enqueued_for_admission, 1);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().waiters, 1);
|
||||
|
||||
@@ -1128,7 +1128,7 @@ private:
|
||||
public:
|
||||
explicit allocating_reader(reader_concurrency_semaphore& sem) : _sem(sem) {
|
||||
testlog.debug("[{}] allocating_reader created", fmt::ptr(this));
|
||||
_admission_fut = sem.obtain_permit(nullptr, "reader", admission_cost, db::no_timeout).then_wrapped([this] (future<reader_permit>&& permit_fut) {
|
||||
_admission_fut = sem.obtain_permit(nullptr, "reader", admission_cost, db::no_timeout, {}).then_wrapped([this] (future<reader_permit>&& permit_fut) {
|
||||
try {
|
||||
_permit = std::move(permit_fut.get());
|
||||
_state = state::request_memory;
|
||||
@@ -1335,7 +1335,7 @@ memory_limit_table create_memory_limit_table(cql_test_env& env, uint64_t target_
|
||||
auto sst = sst_man.make_sstable(s, sstables_dir.path().string(), sstables::generation_type{num_sstables});
|
||||
auto writer_cfg = sst_man.configure_writer("test");
|
||||
sst->write_components(
|
||||
make_flat_mutation_reader_from_mutations_v2(s, semaphore.make_tracking_only_permit(s.get(), "test", db::no_timeout), mut, s->full_slice()),
|
||||
make_flat_mutation_reader_from_mutations_v2(s, semaphore.make_tracking_only_permit(s.get(), "test", db::no_timeout, {}), mut, s->full_slice()),
|
||||
1,
|
||||
s,
|
||||
writer_cfg,
|
||||
@@ -1493,7 +1493,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_request_memory_preser
|
||||
utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier));
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
auto sponge_permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get0();
|
||||
auto sponge_permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get0();
|
||||
|
||||
uint64_t reads_enqueued_for_memory = 0;
|
||||
|
||||
@@ -1528,20 +1528,20 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_request_memory_preser
|
||||
|
||||
// unused
|
||||
{
|
||||
auto permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get0();
|
||||
auto permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get0();
|
||||
do_check(permit, 0, 0, std::source_location::current());
|
||||
}
|
||||
|
||||
// used
|
||||
{
|
||||
auto permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get0();
|
||||
auto permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get0();
|
||||
reader_permit::used_guard ug{permit};
|
||||
do_check(permit, 1, 0, std::source_location::current());
|
||||
}
|
||||
|
||||
// blocked
|
||||
{
|
||||
auto permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get0();
|
||||
auto permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get0();
|
||||
reader_permit::used_guard ug{permit};
|
||||
reader_permit::blocked_guard bg{permit};
|
||||
do_check(permit, 1, 1, std::source_location::current());
|
||||
@@ -1559,7 +1559,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_blessed_read_goes_ina
|
||||
simple_schema ss;
|
||||
auto s = ss.schema();
|
||||
|
||||
auto permit = semaphore.obtain_permit(s.get(), get_name(), 1024, db::no_timeout).get();
|
||||
auto permit = semaphore.obtain_permit(s.get(), get_name(), 1024, db::no_timeout, {}).get();
|
||||
|
||||
std::vector<reader_permit::resource_units> permit_res;
|
||||
|
||||
@@ -1588,7 +1588,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_stop_with_inactive_re
|
||||
simple_schema ss;
|
||||
auto s = ss.schema();
|
||||
|
||||
auto permit = reader_permit_opt(semaphore.obtain_permit(s.get(), get_name(), 1024, db::no_timeout).get());
|
||||
auto permit = reader_permit_opt(semaphore.obtain_permit(s.get(), get_name(), 1024, db::no_timeout, {}).get());
|
||||
|
||||
auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s, *permit));
|
||||
|
||||
@@ -1618,8 +1618,8 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_permit_waiting_for_me
|
||||
utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier));
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
auto permit1 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get0();
|
||||
auto permit2 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get0();
|
||||
auto permit1 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get0();
|
||||
auto permit2 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get0();
|
||||
|
||||
std::vector<reader_permit::resource_units> res;
|
||||
|
||||
|
||||
@@ -442,7 +442,7 @@ SEASTAR_TEST_CASE(test_view_update_generator) {
|
||||
sstables::sstable_writer_config sst_cfg = e.db().local().get_user_sstables_manager().configure_writer("test");
|
||||
auto& pc = service::get_local_streaming_priority();
|
||||
|
||||
auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout);
|
||||
auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout, {});
|
||||
sst->write_components(make_flat_mutation_reader_from_mutations_v2(m.schema(), std::move(permit), {m}), 1ul, s, sst_cfg, {}, pc).get();
|
||||
sst->open_data().get();
|
||||
t->add_sstable_and_update_cache(sst).get();
|
||||
@@ -556,7 +556,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_deadlock) {
|
||||
sstables::sstable_writer_config sst_cfg = e.local_db().get_user_sstables_manager().configure_writer("test");
|
||||
auto& pc = service::get_local_streaming_priority();
|
||||
|
||||
auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout);
|
||||
auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout, {});
|
||||
sst->write_components(make_flat_mutation_reader_from_mutations_v2(m.schema(), std::move(permit), {m}), 1ul, s, sst_cfg, {}, pc).get();
|
||||
sst->open_data().get();
|
||||
t->add_sstable_and_update_cache(sst).get();
|
||||
@@ -566,7 +566,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_deadlock) {
|
||||
}).get0();
|
||||
|
||||
// consume all units except what is needed to admit a single reader.
|
||||
auto sponge_permit = sem.make_tracking_only_permit(s.get(), "sponge", db::no_timeout);
|
||||
auto sponge_permit = sem.make_tracking_only_permit(s.get(), "sponge", db::no_timeout, {});
|
||||
auto resources = sponge_permit.consume_resources(sem.available_resources() - reader_resources{1, replica::new_reader_base_cost});
|
||||
|
||||
testlog.info("res = [.count={}, .memory={}]", sem.available_resources().count, sem.available_resources().memory);
|
||||
@@ -630,7 +630,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_register_semaphore_unit_leak
|
||||
sstables::sstable_writer_config sst_cfg = e.local_db().get_user_sstables_manager().configure_writer("test");
|
||||
auto& pc = service::get_local_streaming_priority();
|
||||
|
||||
auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout);
|
||||
auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout, {});
|
||||
sst->write_components(make_flat_mutation_reader_from_mutations_v2(m.schema(), std::move(permit), {m}), 1ul, s, sst_cfg, {}, pc).get();
|
||||
sst->open_data().get();
|
||||
t->add_sstable_and_update_cache(sst).get();
|
||||
@@ -731,7 +731,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering) {
|
||||
void check(mutation mut) {
|
||||
// First we check that we would be able to create a reader, even
|
||||
// though the staging reader consumed all resources.
|
||||
auto permit = _semaphore.obtain_permit(_schema.get(), "consumer_verifier", replica::new_reader_base_cost, db::timeout_clock::now()).get0();
|
||||
auto permit = _semaphore.obtain_permit(_schema.get(), "consumer_verifier", replica::new_reader_base_cost, db::timeout_clock::now(), {}).get0();
|
||||
|
||||
const size_t current_rows = rows_in_mut(mut);
|
||||
const auto total_rows = _partition_rows.at(mut.decorated_key());
|
||||
@@ -836,7 +836,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering) {
|
||||
return less(a.decorated_key(), b.decorated_key());
|
||||
});
|
||||
|
||||
auto permit = sem.obtain_permit(schema.get(), get_name(), replica::new_reader_base_cost, db::no_timeout).get0();
|
||||
auto permit = sem.obtain_permit(schema.get(), get_name(), replica::new_reader_base_cost, db::no_timeout, {}).get0();
|
||||
|
||||
auto mt = make_lw_shared<replica::memtable>(schema);
|
||||
for (const auto& mut : muts) {
|
||||
|
||||
@@ -349,7 +349,7 @@ public:
|
||||
table_name = std::move(table_name)] (replica::database& db) mutable {
|
||||
auto& cf = db.find_column_family(ks_name, table_name);
|
||||
auto schema = cf.schema();
|
||||
auto permit = db.get_reader_concurrency_semaphore().make_tracking_only_permit(schema.get(), "require_column_has_value()", db::no_timeout);
|
||||
auto permit = db.get_reader_concurrency_semaphore().make_tracking_only_permit(schema.get(), "require_column_has_value()", db::no_timeout, {});
|
||||
return cf.find_partition_slow(schema, permit, pkey)
|
||||
.then([schema, ckey, column_name, exp] (replica::column_family::const_mutation_partition_ptr p) {
|
||||
assert(p != nullptr);
|
||||
@@ -997,7 +997,7 @@ future<> do_with_cql_env_thread(std::function<void(cql_test_env&)> func, cql_tes
|
||||
}
|
||||
|
||||
reader_permit make_reader_permit(cql_test_env& env) {
|
||||
return env.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(nullptr, "test", db::no_timeout);
|
||||
return env.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(nullptr, "test", db::no_timeout, {});
|
||||
}
|
||||
|
||||
cql_test_config raft_cql_test_config() {
|
||||
|
||||
@@ -26,7 +26,7 @@ public:
|
||||
}
|
||||
|
||||
reader_concurrency_semaphore& semaphore() { return *_semaphore; };
|
||||
reader_permit make_permit() { return _semaphore->make_tracking_only_permit(nullptr, "test", db::no_timeout); }
|
||||
reader_permit make_permit() { return _semaphore->make_tracking_only_permit(nullptr, "test", db::no_timeout, {}); }
|
||||
};
|
||||
|
||||
} // namespace tests
|
||||
|
||||
@@ -95,8 +95,8 @@ public:
|
||||
}
|
||||
return *_contexts[shard]->semaphore;
|
||||
}
|
||||
virtual future<reader_permit> obtain_reader_permit(schema_ptr schema, const char* const description, db::timeout_clock::time_point timeout) override {
|
||||
return semaphore().obtain_permit(schema.get(), description, 128 * 1024, timeout);
|
||||
virtual future<reader_permit> obtain_reader_permit(schema_ptr schema, const char* const description, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) override {
|
||||
return semaphore().obtain_permit(schema.get(), description, 128 * 1024, timeout, std::move(trace_ptr));
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -171,10 +171,10 @@ public:
|
||||
tmpdir& tempdir() noexcept { return _impl->dir; }
|
||||
|
||||
reader_permit make_reader_permit(const schema* const s, const char* n, db::timeout_clock::time_point timeout) {
|
||||
return _impl->semaphore.make_tracking_only_permit(s, n, timeout);
|
||||
return _impl->semaphore.make_tracking_only_permit(s, n, timeout, {});
|
||||
}
|
||||
reader_permit make_reader_permit(db::timeout_clock::time_point timeout = db::no_timeout) {
|
||||
return _impl->semaphore.make_tracking_only_permit(nullptr, "test", timeout);
|
||||
return _impl->semaphore.make_tracking_only_permit(nullptr, "test", timeout, {});
|
||||
}
|
||||
|
||||
replica::table::config make_table_config() {
|
||||
|
||||
@@ -84,7 +84,7 @@ public:
|
||||
return table().get_compaction_strategy();
|
||||
}
|
||||
reader_permit make_compaction_reader_permit() const override {
|
||||
return _data.semaphore.make_tracking_only_permit(&*schema(), "table_for_tests::table_state", db::no_timeout);
|
||||
return _data.semaphore.make_tracking_only_permit(&*schema(), "table_for_tests::table_state", db::no_timeout, {});
|
||||
}
|
||||
sstables::sstables_manager& get_sstables_manager() noexcept override {
|
||||
return _sstables_manager;
|
||||
|
||||
@@ -175,7 +175,7 @@ void execute_reads(const schema& s, reader_concurrency_semaphore& sem, unsigned
|
||||
|
||||
if (sem.get_stats().waiters) {
|
||||
testlog.trace("Waiting for queue to drain");
|
||||
sem.obtain_permit(&s, "drain", 1, db::no_timeout).get();
|
||||
sem.obtain_permit(&s, "drain", 1, db::no_timeout, {}).get();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -82,7 +82,7 @@ reader_concurrency_semaphore_wrapper::~reader_concurrency_semaphore_wrapper() {
|
||||
}
|
||||
|
||||
reader_permit reader_concurrency_semaphore_wrapper::make_permit() {
|
||||
return _semaphore->make_tracking_only_permit(nullptr, "perf", db::no_timeout);
|
||||
return _semaphore->make_tracking_only_permit(nullptr, "perf", db::no_timeout, {});
|
||||
}
|
||||
|
||||
} // namespace perf
|
||||
|
||||
@@ -54,7 +54,7 @@ struct table {
|
||||
}
|
||||
|
||||
reader_permit make_permit() {
|
||||
return semaphore.make_tracking_only_permit(s.schema().get(), "test", db::no_timeout);
|
||||
return semaphore.make_tracking_only_permit(s.schema().get(), "test", db::no_timeout, {});
|
||||
}
|
||||
future<> stop() noexcept {
|
||||
return semaphore.stop();
|
||||
|
||||
@@ -2854,7 +2854,7 @@ $ scylla sstable validate /path/to/md-123456-big-Data.db /path/to/md-123457-big-
|
||||
reader_concurrency_semaphore rcs_sem(reader_concurrency_semaphore::no_limits{}, app_name);
|
||||
auto stop_semaphore = deferred_stop(rcs_sem);
|
||||
|
||||
const auto permit = rcs_sem.make_tracking_only_permit(schema.get(), app_name, db::no_timeout);
|
||||
const auto permit = rcs_sem.make_tracking_only_permit(schema.get(), app_name, db::no_timeout, {});
|
||||
|
||||
operation(schema, permit, sstables, sst_man, app_config);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user