reader_permit: store schema_ptr instead of raw schema pointer
Store schema_ptr in reader permit instead of storing a const pointer to schema to ensure that the schema doesn't get changed elsewhere when the permit is holding on to it. Also update the constructors and all the relevant callers to pass down schema_ptr instead of a raw pointer. Fixes #16180 Signed-off-by: Lakshmi Narayanan Sreethar <lakshmi.sreethar@scylladb.com> Closes scylladb/scylladb#16658
This commit is contained in:
committed by
Botond Dénes
parent
f4e311e871
commit
76f0d5e35b
@@ -215,7 +215,7 @@ public:
|
||||
: _db(db)
|
||||
, _schema(std::move(s))
|
||||
, _erm(std::move(erm))
|
||||
, _permit(_db.local().get_reader_concurrency_semaphore().make_tracking_only_permit(_schema.get(), "multishard-mutation-query", timeout, trace_state))
|
||||
, _permit(_db.local().get_reader_concurrency_semaphore().make_tracking_only_permit(_schema, "multishard-mutation-query", timeout, trace_state))
|
||||
, _cmd(cmd)
|
||||
, _ranges(ranges)
|
||||
, _trace_state(std::move(trace_state))
|
||||
|
||||
@@ -118,7 +118,7 @@ future<> multishard_writer::make_shard_writer(unsigned shard) {
|
||||
auto s = gs.get();
|
||||
auto semaphore = std::make_unique<reader_concurrency_semaphore>(reader_concurrency_semaphore::no_limits{}, "shard_writer",
|
||||
reader_concurrency_semaphore::register_metrics::no);
|
||||
auto permit = semaphore->make_tracking_only_permit(s.get(), "multishard-writer", db::no_timeout, {});
|
||||
auto permit = semaphore->make_tracking_only_permit(s, "multishard-writer", db::no_timeout, {});
|
||||
auto this_shard_reader = make_foreign_reader(s, std::move(permit), std::move(reader));
|
||||
return make_foreign(std::make_unique<shard_writer>(gs.get(), std::move(semaphore), std::move(this_shard_reader), consumer));
|
||||
}).then([this, shard] (foreign_ptr<std::unique_ptr<shard_writer>> writer) {
|
||||
|
||||
@@ -150,7 +150,8 @@ public:
|
||||
|
||||
private:
|
||||
reader_concurrency_semaphore& _semaphore;
|
||||
const schema* _schema;
|
||||
schema_ptr _schema;
|
||||
|
||||
sstring _op_name;
|
||||
std::string_view _op_name_view;
|
||||
reader_resources _base_resources;
|
||||
@@ -237,9 +238,9 @@ private:
|
||||
public:
|
||||
struct value_tag {};
|
||||
|
||||
impl(reader_concurrency_semaphore& semaphore, const schema* const schema, const std::string_view& op_name, reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr)
|
||||
impl(reader_concurrency_semaphore& semaphore, schema_ptr schema, const std::string_view& op_name, reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr)
|
||||
: _semaphore(semaphore)
|
||||
, _schema(schema)
|
||||
, _schema(std::move(schema))
|
||||
, _op_name_view(op_name)
|
||||
, _base_resources(base_resources)
|
||||
, _ttl_timer([this] { on_timeout(); })
|
||||
@@ -248,9 +249,9 @@ public:
|
||||
set_timeout(timeout);
|
||||
_semaphore.on_permit_created(*this);
|
||||
}
|
||||
impl(reader_concurrency_semaphore& semaphore, const schema* const schema, sstring&& op_name, reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr)
|
||||
impl(reader_concurrency_semaphore& semaphore, schema_ptr schema, sstring&& op_name, reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr)
|
||||
: _semaphore(semaphore)
|
||||
, _schema(schema)
|
||||
, _schema(std::move(schema))
|
||||
, _op_name(std::move(op_name))
|
||||
, _op_name_view(_op_name)
|
||||
, _base_resources(base_resources)
|
||||
@@ -302,7 +303,7 @@ public:
|
||||
return _semaphore;
|
||||
}
|
||||
|
||||
const ::schema* get_schema() const {
|
||||
const schema_ptr& get_schema() const {
|
||||
return _schema;
|
||||
}
|
||||
|
||||
@@ -524,15 +525,15 @@ reader_permit::reader_permit(shared_ptr<impl> impl) : _impl(std::move(impl))
|
||||
{
|
||||
}
|
||||
|
||||
reader_permit::reader_permit(reader_concurrency_semaphore& semaphore, const schema* const schema, std::string_view op_name,
|
||||
reader_permit::reader_permit(reader_concurrency_semaphore& semaphore, schema_ptr schema, std::string_view op_name,
|
||||
reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr)
|
||||
: _impl(::seastar::make_shared<reader_permit::impl>(semaphore, schema, op_name, base_resources, timeout, std::move(trace_ptr)))
|
||||
: _impl(::seastar::make_shared<reader_permit::impl>(semaphore, std::move(schema), op_name, base_resources, timeout, std::move(trace_ptr)))
|
||||
{
|
||||
}
|
||||
|
||||
reader_permit::reader_permit(reader_concurrency_semaphore& semaphore, const schema* const schema, sstring&& op_name,
|
||||
reader_permit::reader_permit(reader_concurrency_semaphore& semaphore, schema_ptr schema, sstring&& op_name,
|
||||
reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr)
|
||||
: _impl(::seastar::make_shared<reader_permit::impl>(semaphore, schema, std::move(op_name), base_resources, timeout, std::move(trace_ptr)))
|
||||
: _impl(::seastar::make_shared<reader_permit::impl>(semaphore, std::move(schema), std::move(op_name), base_resources, timeout, std::move(trace_ptr)))
|
||||
{
|
||||
}
|
||||
|
||||
@@ -543,7 +544,7 @@ reader_concurrency_semaphore& reader_permit::semaphore() {
|
||||
return _impl->semaphore();
|
||||
}
|
||||
|
||||
const ::schema* reader_permit::get_schema() const {
|
||||
const schema_ptr& reader_permit::get_schema() const {
|
||||
return _impl->get_schema();
|
||||
}
|
||||
|
||||
@@ -766,7 +767,7 @@ static void do_dump_reader_permit_diagnostics(std::ostream& os, const reader_con
|
||||
permit_groups permits;
|
||||
|
||||
semaphore.foreach_permit([&] (const reader_permit::impl& permit) {
|
||||
permits[permit_group_key(permit.get_schema(), permit.get_op_name(), permit.get_state())].add(permit);
|
||||
permits[permit_group_key(permit.get_schema().get(), permit.get_op_name(), permit.get_state())].add(permit);
|
||||
});
|
||||
|
||||
permit_stats total;
|
||||
@@ -1518,35 +1519,35 @@ void reader_concurrency_semaphore::on_permit_not_awaits() noexcept {
|
||||
--_stats.awaits_permits;
|
||||
}
|
||||
|
||||
future<reader_permit> reader_concurrency_semaphore::obtain_permit(const schema* const schema, const char* const op_name, size_t memory,
|
||||
future<reader_permit> reader_concurrency_semaphore::obtain_permit(schema_ptr schema, const char* const op_name, size_t memory,
|
||||
db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) {
|
||||
auto permit = reader_permit(*this, schema, std::string_view(op_name), {1, static_cast<ssize_t>(memory)}, timeout, std::move(trace_ptr));
|
||||
auto permit = reader_permit(*this, std::move(schema), std::string_view(op_name), {1, static_cast<ssize_t>(memory)}, timeout, std::move(trace_ptr));
|
||||
return do_wait_admission(*permit).then([permit] () mutable {
|
||||
return std::move(permit);
|
||||
});
|
||||
}
|
||||
|
||||
future<reader_permit> reader_concurrency_semaphore::obtain_permit(const schema* const schema, sstring&& op_name, size_t memory,
|
||||
future<reader_permit> reader_concurrency_semaphore::obtain_permit(schema_ptr schema, sstring&& op_name, size_t memory,
|
||||
db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) {
|
||||
auto permit = reader_permit(*this, schema, std::move(op_name), {1, static_cast<ssize_t>(memory)}, timeout, std::move(trace_ptr));
|
||||
auto permit = reader_permit(*this, std::move(schema), std::move(op_name), {1, static_cast<ssize_t>(memory)}, timeout, std::move(trace_ptr));
|
||||
return do_wait_admission(*permit).then([permit] () mutable {
|
||||
return std::move(permit);
|
||||
});
|
||||
}
|
||||
|
||||
reader_permit reader_concurrency_semaphore::make_tracking_only_permit(const schema* const schema, const char* const op_name,
|
||||
reader_permit reader_concurrency_semaphore::make_tracking_only_permit(schema_ptr schema, const char* const op_name,
|
||||
db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) {
|
||||
return reader_permit(*this, schema, std::string_view(op_name), {}, timeout, std::move(trace_ptr));
|
||||
return reader_permit(*this, std::move(schema), std::string_view(op_name), {}, timeout, std::move(trace_ptr));
|
||||
}
|
||||
|
||||
reader_permit reader_concurrency_semaphore::make_tracking_only_permit(const schema* const schema, sstring&& op_name,
|
||||
reader_permit reader_concurrency_semaphore::make_tracking_only_permit(schema_ptr schema, sstring&& op_name,
|
||||
db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) {
|
||||
return reader_permit(*this, schema, std::move(op_name), {}, timeout, std::move(trace_ptr));
|
||||
return reader_permit(*this, std::move(schema), std::move(op_name), {}, timeout, std::move(trace_ptr));
|
||||
}
|
||||
|
||||
future<> reader_concurrency_semaphore::with_permit(const schema* const schema, const char* const op_name, size_t memory,
|
||||
future<> reader_concurrency_semaphore::with_permit(schema_ptr schema, const char* const op_name, size_t memory,
|
||||
db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr, read_func func) {
|
||||
auto permit = reader_permit(*this, schema, std::string_view(op_name), {1, static_cast<ssize_t>(memory)}, timeout, std::move(trace_ptr));
|
||||
auto permit = reader_permit(*this, std::move(schema), std::string_view(op_name), {1, static_cast<ssize_t>(memory)}, timeout, std::move(trace_ptr));
|
||||
permit->aux_data().func = std::move(func);
|
||||
permit->aux_data().permit_keepalive = permit;
|
||||
return do_wait_admission(*permit);
|
||||
|
||||
@@ -395,8 +395,8 @@ public:
|
||||
///
|
||||
/// Some permits cannot be associated with any table, so passing nullptr as
|
||||
/// the schema parameter is allowed.
|
||||
future<reader_permit> obtain_permit(const schema* const schema, const char* const op_name, size_t memory, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr);
|
||||
future<reader_permit> obtain_permit(const schema* const schema, sstring&& op_name, size_t memory, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr);
|
||||
future<reader_permit> obtain_permit(schema_ptr schema, const char* const op_name, size_t memory, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr);
|
||||
future<reader_permit> obtain_permit(schema_ptr schema, sstring&& op_name, size_t memory, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr);
|
||||
|
||||
/// Make a tracking only permit
|
||||
///
|
||||
@@ -411,8 +411,8 @@ public:
|
||||
///
|
||||
/// Some permits cannot be associated with any table, so passing nullptr as
|
||||
/// the schema parameter is allowed.
|
||||
reader_permit make_tracking_only_permit(const schema* const schema, const char* const op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr);
|
||||
reader_permit make_tracking_only_permit(const schema* const schema, sstring&& op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr);
|
||||
reader_permit make_tracking_only_permit(schema_ptr schema, const char* const op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr);
|
||||
reader_permit make_tracking_only_permit(schema_ptr schema, sstring&& op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr);
|
||||
|
||||
/// Run the function through the semaphore's execution stage with an admitted permit
|
||||
///
|
||||
@@ -433,7 +433,7 @@ public:
|
||||
///
|
||||
/// Some permits cannot be associated with any table, so passing nullptr as
|
||||
/// the schema parameter is allowed.
|
||||
future<> with_permit(const schema* const schema, const char* const op_name, size_t memory, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr, read_func func);
|
||||
future<> with_permit(schema_ptr schema, const char* const op_name, size_t memory, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr, read_func func);
|
||||
|
||||
/// Run the function through the semaphore's execution stage with a pre-admitted permit
|
||||
///
|
||||
|
||||
@@ -104,9 +104,9 @@ private:
|
||||
private:
|
||||
reader_permit() = default;
|
||||
reader_permit(shared_ptr<impl>);
|
||||
explicit reader_permit(reader_concurrency_semaphore& semaphore, const schema* const schema, std::string_view op_name,
|
||||
explicit reader_permit(reader_concurrency_semaphore& semaphore, schema_ptr schema, std::string_view op_name,
|
||||
reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr);
|
||||
explicit reader_permit(reader_concurrency_semaphore& semaphore, const schema* const schema, sstring&& op_name,
|
||||
explicit reader_permit(reader_concurrency_semaphore& semaphore, schema_ptr schema, sstring&& op_name,
|
||||
reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr);
|
||||
|
||||
reader_permit::impl& operator*() { return *_impl; }
|
||||
@@ -143,7 +143,7 @@ public:
|
||||
|
||||
reader_concurrency_semaphore& semaphore();
|
||||
|
||||
const ::schema* get_schema() const;
|
||||
const schema_ptr& get_schema() const;
|
||||
std::string_view get_op_name() const;
|
||||
state get_state() const;
|
||||
|
||||
|
||||
@@ -1470,7 +1470,7 @@ public:
|
||||
}
|
||||
virtual future<reader_permit> obtain_reader_permit(schema_ptr schema, const char* const description, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) override {
|
||||
auto& cf = _db.local().find_column_family(_table_id);
|
||||
return semaphore().obtain_permit(schema.get(), description, cf.estimate_read_memory_cost(), timeout, std::move(trace_ptr));
|
||||
return semaphore().obtain_permit(schema, description, cf.estimate_read_memory_cost(), timeout, std::move(trace_ptr));
|
||||
}
|
||||
};
|
||||
|
||||
@@ -1571,7 +1571,7 @@ database::query(schema_ptr s, const query::read_command& cmd, query::result_opti
|
||||
querier_opt->permit().set_trace_state(trace_state);
|
||||
f = co_await coroutine::as_future(semaphore.with_ready_permit(querier_opt->permit(), read_func));
|
||||
} else {
|
||||
f = co_await coroutine::as_future(semaphore.with_permit(s.get(), "data-query", cf.estimate_read_memory_cost(), timeout, trace_state, read_func));
|
||||
f = co_await coroutine::as_future(semaphore.with_permit(s, "data-query", cf.estimate_read_memory_cost(), timeout, trace_state, read_func));
|
||||
}
|
||||
|
||||
if (!f.failed()) {
|
||||
@@ -1638,7 +1638,7 @@ database::query_mutations(schema_ptr s, const query::read_command& cmd, const dh
|
||||
querier_opt->permit().set_trace_state(trace_state);
|
||||
f = co_await coroutine::as_future(semaphore.with_ready_permit(querier_opt->permit(), read_func));
|
||||
} else {
|
||||
f = co_await coroutine::as_future(semaphore.with_permit(s.get(), "mutation-query", cf.estimate_read_memory_cost(), timeout, trace_state, read_func));
|
||||
f = co_await coroutine::as_future(semaphore.with_permit(s, "mutation-query", cf.estimate_read_memory_cost(), timeout, trace_state, read_func));
|
||||
}
|
||||
|
||||
if (!f.failed()) {
|
||||
@@ -1688,7 +1688,7 @@ reader_concurrency_semaphore& database::get_reader_concurrency_semaphore() {
|
||||
}
|
||||
|
||||
future<reader_permit> database::obtain_reader_permit(table& tbl, const char* const op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) {
|
||||
return get_reader_concurrency_semaphore().obtain_permit(tbl.schema().get(), op_name, tbl.estimate_read_memory_cost(), timeout, std::move(trace_ptr));
|
||||
return get_reader_concurrency_semaphore().obtain_permit(tbl.schema(), op_name, tbl.estimate_read_memory_cost(), timeout, std::move(trace_ptr));
|
||||
}
|
||||
|
||||
future<reader_permit> database::obtain_reader_permit(schema_ptr schema, const char* const op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) {
|
||||
@@ -1758,7 +1758,7 @@ future<mutation> database::do_apply_counter_update(column_family& cf, const froz
|
||||
// counter state for each modified cell...
|
||||
|
||||
tracing::trace(trace_state, "Reading counter values from the CF");
|
||||
auto permit = get_reader_concurrency_semaphore().make_tracking_only_permit(m_schema.get(), "counter-read-before-write", timeout, trace_state);
|
||||
auto permit = get_reader_concurrency_semaphore().make_tracking_only_permit(m_schema, "counter-read-before-write", timeout, trace_state);
|
||||
return counter_write_query(m_schema, cf.as_mutation_source(), std::move(permit), m.decorated_key(), slice, trace_state)
|
||||
.then([this, &cf, &m, m_schema, timeout, trace_state] (auto mopt) {
|
||||
// ...now, that we got existing state of all affected counter
|
||||
|
||||
@@ -1136,7 +1136,7 @@ table::try_flush_memtable_to_sstable(compaction_group& cg, lw_shared_ptr<memtabl
|
||||
|
||||
auto f = consumer(old->make_flush_reader(
|
||||
old->schema(),
|
||||
compaction_concurrency_semaphore().make_tracking_only_permit(old->schema().get(), "try_flush_memtable_to_sstable()", db::no_timeout, {})));
|
||||
compaction_concurrency_semaphore().make_tracking_only_permit(old->schema(), "try_flush_memtable_to_sstable()", db::no_timeout, {})));
|
||||
|
||||
// Switch back to default scheduling group for post-flush actions, to avoid them being staved by the memtable flush
|
||||
// controller. Cache update does not affect the input of the memtable cpu controller, so it can be subject to
|
||||
@@ -2612,7 +2612,7 @@ write_memtable_to_sstable(memtable& mt, sstables::shared_sstable sst) {
|
||||
std::exception_ptr ex;
|
||||
|
||||
try {
|
||||
auto permit = semaphore.make_tracking_only_permit(mt.schema().get(), "mt_to_sst", db::no_timeout, {});
|
||||
auto permit = semaphore.make_tracking_only_permit(mt.schema(), "mt_to_sst", db::no_timeout, {});
|
||||
auto reader = mt.make_flush_reader(mt.schema(), std::move(permit));
|
||||
co_await write_memtable_to_sstable(std::move(reader), mt, std::move(sst), mt.partition_count(), monitor, cfg);
|
||||
} catch (...) {
|
||||
@@ -2925,7 +2925,7 @@ future<row_locker::lock_holder> table::do_push_view_replica_updates(shared_ptr<d
|
||||
const bool need_static = db::view::needs_static_row(m.partition(), views);
|
||||
if (!need_regular && !need_static) {
|
||||
tracing::trace(tr_state, "View updates do not require read-before-write");
|
||||
co_await generate_and_propagate_view_updates(gen, base, sem.make_tracking_only_permit(s.get(), "push-view-updates-1", timeout, tr_state), std::move(views), std::move(m), { }, tr_state, now);
|
||||
co_await generate_and_propagate_view_updates(gen, base, sem.make_tracking_only_permit(s, "push-view-updates-1", timeout, tr_state), std::move(views), std::move(m), { }, tr_state, now);
|
||||
// In this case we are not doing a read-before-write, just a
|
||||
// write, so no lock is needed.
|
||||
co_return row_locker::lock_holder();
|
||||
@@ -2958,7 +2958,7 @@ future<row_locker::lock_holder> table::do_push_view_replica_updates(shared_ptr<d
|
||||
co_await utils::get_local_injector().inject("table_push_view_replica_updates_timeout", timeout);
|
||||
auto lock = co_await std::move(lockf);
|
||||
auto pk = dht::partition_range::make_singular(m.decorated_key());
|
||||
auto permit = sem.make_tracking_only_permit(base.get(), "push-view-updates-2", timeout, tr_state);
|
||||
auto permit = sem.make_tracking_only_permit(base, "push-view-updates-2", timeout, tr_state);
|
||||
auto reader = source.make_reader_v2(base, permit, pk, slice, tr_state, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
|
||||
co_await this->generate_and_propagate_view_updates(gen, base, std::move(permit), std::move(views), std::move(m), std::move(reader), tr_state, now);
|
||||
tracing::trace(tr_state, "View updates for {}.{} were generated and propagated", base->ks_name(), base->cf_name());
|
||||
@@ -3057,7 +3057,7 @@ public:
|
||||
return _cg._compaction_strategy_state;
|
||||
}
|
||||
reader_permit make_compaction_reader_permit() const override {
|
||||
return _t.compaction_concurrency_semaphore().make_tracking_only_permit(schema().get(), "compaction", db::no_timeout, {});
|
||||
return _t.compaction_concurrency_semaphore().make_tracking_only_permit(schema(), "compaction", db::no_timeout, {});
|
||||
}
|
||||
sstables::sstables_manager& get_sstables_manager() noexcept override {
|
||||
return _t.get_sstables_manager();
|
||||
|
||||
@@ -5396,7 +5396,12 @@ class scylla_read_stats(gdb.Command):
|
||||
total = permit_stats()
|
||||
|
||||
for permit in intrusive_list(permit_list):
|
||||
schema = permit['_schema']
|
||||
try:
|
||||
schema = permit['_schema']['_p']
|
||||
except:
|
||||
# schema is already a raw pointer in older versions
|
||||
schema = permit['_schema']
|
||||
|
||||
if schema:
|
||||
raw_schema = schema.dereference()['_raw']
|
||||
schema_name = "{}.{}".format(str(raw_schema['_ks_name']).replace('"', ''), str(raw_schema['_cf_name']).replace('"', ''))
|
||||
|
||||
@@ -1222,7 +1222,7 @@ future<> sstable::load_first_and_last_position_in_partition() {
|
||||
}
|
||||
|
||||
auto& sem = _manager.sstable_metadata_concurrency_sem();
|
||||
reader_permit permit = co_await sem.obtain_permit(&*_schema, "sstable::load_first_and_last_position_range", sstable_buffer_size, db::no_timeout, {});
|
||||
reader_permit permit = co_await sem.obtain_permit(_schema, "sstable::load_first_and_last_position_range", sstable_buffer_size, db::no_timeout, {});
|
||||
auto first_pos_opt = co_await find_first_position_in_partition(permit, get_first_decorated_key(), false);
|
||||
auto last_pos_opt = co_await find_first_position_in_partition(permit, get_last_decorated_key(), true);
|
||||
|
||||
@@ -1859,7 +1859,7 @@ future<> sstable::generate_summary() {
|
||||
|
||||
auto s = summary_generator(_schema->get_partitioner(), _components->summary, _manager.config().sstable_summary_ratio());
|
||||
auto ctx = make_lw_shared<index_consume_entry_context<summary_generator>>(
|
||||
*this, sem.make_tracking_only_permit(_schema.get(), "generate-summary", db::no_timeout, {}), s, trust_promoted_index::yes,
|
||||
*this, sem.make_tracking_only_permit(_schema, "generate-summary", db::no_timeout, {}), s, trust_promoted_index::yes,
|
||||
make_file_input_stream(index_file, 0, index_size, std::move(options)), 0, index_size,
|
||||
(_version >= sstable_version_types::mc
|
||||
? std::make_optional(get_clustering_values_fixed_lengths(get_serialization_header()))
|
||||
@@ -2746,7 +2746,7 @@ future<bool> sstable::has_partition_key(const utils::hashed_key& hk, const dht::
|
||||
auto sem = reader_concurrency_semaphore(reader_concurrency_semaphore::no_limits{}, "sstables::has_partition_key()",
|
||||
reader_concurrency_semaphore::register_metrics::no);
|
||||
try {
|
||||
auto lh_index_ptr = std::make_unique<sstables::index_reader>(s, sem.make_tracking_only_permit(_schema.get(), s->get_filename(), db::no_timeout, {}));
|
||||
auto lh_index_ptr = std::make_unique<sstables::index_reader>(s, sem.make_tracking_only_permit(_schema, s->get_filename(), db::no_timeout, {}));
|
||||
present = co_await lh_index_ptr->advance_lower_and_check_if_present(dk);
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
|
||||
@@ -953,7 +953,7 @@ SEASTAR_TEST_CASE(test_commitlog_replay_invalid_key){
|
||||
{
|
||||
std::vector<flat_mutation_reader_v2> readers;
|
||||
readers.reserve(memtables.size());
|
||||
auto permit = db.get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout, {});
|
||||
auto permit = db.get_reader_concurrency_semaphore().make_tracking_only_permit(s, "test", db::no_timeout, {});
|
||||
for (auto mt : memtables) {
|
||||
readers.push_back(mt->make_flat_reader(s, permit));
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
#include "utils/buffer_input_stream.hh"
|
||||
#include "test/lib/reader_concurrency_semaphore.hh"
|
||||
#include "test/lib/random_utils.hh"
|
||||
#include "schema/schema.hh"
|
||||
#include "sstables/processing_result_generator.hh"
|
||||
|
||||
#include <boost/test/unit_test.hpp>
|
||||
|
||||
@@ -1346,7 +1346,7 @@ SEASTAR_TEST_CASE(database_drop_column_family_clears_querier_cache) {
|
||||
auto q = query::querier(
|
||||
tbl.as_mutation_source(),
|
||||
tbl.schema(),
|
||||
database_test(db).get_user_read_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout, {}),
|
||||
database_test(db).get_user_read_concurrency_semaphore().make_tracking_only_permit(s, "test", db::no_timeout, {}),
|
||||
query::full_partition_range,
|
||||
s->full_slice(),
|
||||
nullptr);
|
||||
|
||||
@@ -66,7 +66,7 @@ SEASTAR_THREAD_TEST_CASE(mutation_fragment_sanity_check) {
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, __FILE__, reader_concurrency_semaphore::register_metrics::no);
|
||||
auto stop_semaphore = deferred_stop(semaphore);
|
||||
simple_schema s;
|
||||
auto permit = semaphore.make_tracking_only_permit(s.schema().get(), "test", db::no_timeout, {});
|
||||
auto permit = semaphore.make_tracking_only_permit(s.schema(), "test", db::no_timeout, {});
|
||||
gc_clock::time_point ts(gc_clock::duration(1234567890000));
|
||||
|
||||
auto check_hash = [&] (const mutation_fragment& mf, uint64_t expected) {
|
||||
|
||||
@@ -357,7 +357,7 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_mutate_exception_safety) {
|
||||
|
||||
reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 100);
|
||||
auto stop_sem = deferred_stop(sem);
|
||||
auto permit = sem.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {});
|
||||
auto permit = sem.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {});
|
||||
|
||||
const auto available_res = sem.available_resources();
|
||||
const sstring val(1024, 'a');
|
||||
@@ -427,7 +427,7 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_stream_validator) {
|
||||
|
||||
reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 100);
|
||||
auto stop_sem = deferred_stop(sem);
|
||||
auto permit = sem.make_tracking_only_permit(ss.schema().get(), get_name(), db::no_timeout, {});
|
||||
auto permit = sem.make_tracking_only_permit(ss.schema(), get_name(), db::no_timeout, {});
|
||||
|
||||
auto expect = [&] (bool expect_valid, const char* desc, unsigned at, auto&& first_mf, auto&&... mf) {
|
||||
std::vector<mutation_fragment_v2> mfs;
|
||||
@@ -618,7 +618,7 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_stream_validator_mixed_api_usage
|
||||
|
||||
reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 100);
|
||||
auto stop_sem = deferred_stop(sem);
|
||||
auto permit = sem.make_tracking_only_permit(ss.schema().get(), get_name(), db::no_timeout, {});
|
||||
auto permit = sem.make_tracking_only_permit(ss.schema(), get_name(), db::no_timeout, {});
|
||||
|
||||
mutation_fragment_stream_validator validator(*ss.schema());
|
||||
|
||||
|
||||
@@ -2942,7 +2942,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_self_validation) {
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name(), reader_concurrency_semaphore::register_metrics::no);
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
simple_schema s;
|
||||
auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {});
|
||||
auto permit = semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {});
|
||||
|
||||
auto pkeys = s.make_pkeys(4);
|
||||
std::ranges::sort(pkeys, dht::decorated_key::less_comparator(s.schema()));
|
||||
@@ -3220,7 +3220,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_recreate_before_fast_forward_to)
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name(), reader_concurrency_semaphore::register_metrics::no);
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
simple_schema s;
|
||||
auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {});
|
||||
auto permit = semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {});
|
||||
auto pkeys = s.make_pkeys(6);
|
||||
boost::sort(pkeys, dht::decorated_key::less_comparator(s.schema()));
|
||||
|
||||
@@ -3270,7 +3270,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_drop_flags) {
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 0);
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
simple_schema s;
|
||||
auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {});
|
||||
auto permit = semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {});
|
||||
|
||||
auto pkeys = s.make_pkeys(2);
|
||||
std::sort(pkeys.begin(), pkeys.end(), [&s] (const auto& pk1, const auto& pk2) {
|
||||
@@ -3497,7 +3497,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_non_monotonic_positions) {
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
simple_schema s;
|
||||
auto schema = s.schema();
|
||||
auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {});
|
||||
auto permit = semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {});
|
||||
|
||||
auto pkey = s.make_pkey();
|
||||
const auto prange = dht::partition_range::make_open_ended_both_sides();
|
||||
@@ -3555,7 +3555,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_clear_tombstone_in_discontinued_p
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
simple_schema s;
|
||||
auto schema = s.schema();
|
||||
auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {});
|
||||
auto permit = semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {});
|
||||
|
||||
auto pkeys = s.make_pkeys(2);
|
||||
std::sort(pkeys.begin(), pkeys.end(), [&s] (const auto& pk1, const auto& pk2) {
|
||||
@@ -3653,7 +3653,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_next_pos_is_partition_start) {
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
simple_schema s;
|
||||
auto schema = s.schema();
|
||||
auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {});
|
||||
auto permit = semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {});
|
||||
|
||||
auto pk = s.make_pkey();
|
||||
const auto prange = dht::partition_range::make_open_ended_both_sides();
|
||||
|
||||
@@ -107,7 +107,7 @@ private:
|
||||
Querier make_querier(const dht::partition_range& range) {
|
||||
return Querier(_mutation_source,
|
||||
_s.schema(),
|
||||
_sem.make_tracking_only_permit(_s.schema().get(), "make-querier", db::no_timeout, {}),
|
||||
_sem.make_tracking_only_permit(_s.schema(), "make-querier", db::no_timeout, {}),
|
||||
range,
|
||||
_s.schema()->full_slice(),
|
||||
nullptr);
|
||||
@@ -688,7 +688,7 @@ SEASTAR_THREAD_TEST_CASE(test_resources_based_cache_eviction) {
|
||||
BOOST_CHECK_EQUAL(db.get_querier_cache_stats().resource_based_evictions, 0);
|
||||
|
||||
// Drain all resources of the semaphore
|
||||
auto sponge_permit = semaphore.make_tracking_only_permit(s.get(), "sponge", db::no_timeout, {});
|
||||
auto sponge_permit = semaphore.make_tracking_only_permit(s, "sponge", db::no_timeout, {});
|
||||
auto consumed_resources = sponge_permit.consume_resources(semaphore.available_resources());
|
||||
|
||||
auto cmd2 = query::read_command(s->id(),
|
||||
@@ -737,13 +737,13 @@ SEASTAR_THREAD_TEST_CASE(test_immediate_evict_on_insert) {
|
||||
test_querier_cache t;
|
||||
|
||||
auto& sem = t.get_semaphore();
|
||||
auto permit1 = sem.obtain_permit(t.get_schema().get(), get_name(), 0, db::no_timeout, {}).get0();
|
||||
auto permit1 = sem.obtain_permit(t.get_schema(), get_name(), 0, db::no_timeout, {}).get0();
|
||||
|
||||
auto resources = permit1.consume_resources(reader_resources(sem.available_resources().count, 0));
|
||||
|
||||
BOOST_CHECK_EQUAL(sem.available_resources().count, 0);
|
||||
|
||||
auto fut = sem.obtain_permit(t.get_schema().get(), get_name(), 1, db::no_timeout, {});
|
||||
auto fut = sem.obtain_permit(t.get_schema(), get_name(), 1, db::no_timeout, {});
|
||||
|
||||
BOOST_CHECK_EQUAL(sem.get_stats().waiters, 1);
|
||||
|
||||
@@ -769,8 +769,8 @@ SEASTAR_THREAD_TEST_CASE(test_unique_inactive_read_handle) {
|
||||
.with_column("v", int32_type)
|
||||
.build();
|
||||
|
||||
auto sem1_h1 = sem1.register_inactive_read(make_empty_flat_reader_v2(schema, sem1.make_tracking_only_permit(schema.get(), get_name(), db::no_timeout, {})));
|
||||
auto sem2_h1 = sem2.register_inactive_read(make_empty_flat_reader_v2(schema, sem2.make_tracking_only_permit(schema.get(), get_name(), db::no_timeout, {})));
|
||||
auto sem1_h1 = sem1.register_inactive_read(make_empty_flat_reader_v2(schema, sem1.make_tracking_only_permit(schema, get_name(), db::no_timeout, {})));
|
||||
auto sem2_h1 = sem2.register_inactive_read(make_empty_flat_reader_v2(schema, sem2.make_tracking_only_permit(schema, get_name(), db::no_timeout, {})));
|
||||
|
||||
// Sanity check that lookup still works with empty handle.
|
||||
BOOST_REQUIRE(!sem1.unregister_inactive_read(reader_concurrency_semaphore::inactive_read_handle{}));
|
||||
|
||||
@@ -41,7 +41,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_clear_inactive_reads)
|
||||
auto clear_permits = defer([&permits] { permits.clear(); });
|
||||
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
permits.emplace_back(semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}));
|
||||
permits.emplace_back(semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {}));
|
||||
handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permits.back())));
|
||||
}
|
||||
|
||||
@@ -59,7 +59,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_clear_inactive_reads)
|
||||
handles.clear();
|
||||
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}))));
|
||||
handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {}))));
|
||||
}
|
||||
|
||||
BOOST_REQUIRE(std::all_of(handles.begin(), handles.end(), [] (const reader_concurrency_semaphore::inactive_read_handle& handle) { return bool(handle); }));
|
||||
@@ -77,14 +77,14 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_destroyed_permit_rele
|
||||
|
||||
// Not admitted, active
|
||||
{
|
||||
auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {});
|
||||
auto permit = semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {});
|
||||
auto units2 = permit.consume_memory(1024);
|
||||
}
|
||||
BOOST_REQUIRE_EQUAL(semaphore.available_resources(), initial_resources);
|
||||
|
||||
// Not admitted, inactive
|
||||
{
|
||||
auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {});
|
||||
auto permit = semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {});
|
||||
auto units2 = permit.consume_memory(1024);
|
||||
|
||||
auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permit));
|
||||
@@ -94,14 +94,14 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_destroyed_permit_rele
|
||||
|
||||
// Admitted, active
|
||||
{
|
||||
auto permit = semaphore.obtain_permit(s.schema().get(), get_name(), 1024, db::no_timeout, {}).get0();
|
||||
auto permit = semaphore.obtain_permit(s.schema(), get_name(), 1024, db::no_timeout, {}).get0();
|
||||
auto units1 = permit.consume_memory(1024);
|
||||
}
|
||||
BOOST_REQUIRE_EQUAL(semaphore.available_resources(), initial_resources);
|
||||
|
||||
// Admitted, inactive
|
||||
{
|
||||
auto permit = semaphore.obtain_permit(s.schema().get(), get_name(), 1024, db::no_timeout, {}).get0();
|
||||
auto permit = semaphore.obtain_permit(s.schema(), get_name(), 1024, db::no_timeout, {}).get0();
|
||||
auto units1 = permit.consume_memory(1024);
|
||||
|
||||
auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permit));
|
||||
@@ -115,7 +115,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_abandoned_handle_clos
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name(), reader_concurrency_semaphore::register_metrics::no);
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {});
|
||||
auto permit = semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {});
|
||||
{
|
||||
auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permit));
|
||||
// The handle is destroyed here, triggering the destrution of the inactive read.
|
||||
@@ -136,7 +136,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_readmission_preserves
|
||||
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
reader_permit_opt permit = semaphore.obtain_permit(s.schema().get(), get_name(), 1024, db::no_timeout, {}).get();
|
||||
reader_permit_opt permit = semaphore.obtain_permit(s.schema(), get_name(), 1024, db::no_timeout, {}).get();
|
||||
BOOST_REQUIRE_EQUAL(permit->consumed_resources(), base_resources);
|
||||
|
||||
std::optional<reader_permit::resource_units> residue_units;
|
||||
@@ -152,7 +152,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_readmission_preserves
|
||||
BOOST_REQUIRE_EQUAL(semaphore.available_resources(), initial_resources - permit->consumed_resources());
|
||||
|
||||
if (i % 2) {
|
||||
auto sponge_permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {});
|
||||
auto sponge_permit = semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {});
|
||||
auto consumed_resources = sponge_permit.consume_resources(semaphore.available_resources());
|
||||
|
||||
auto fut = make_ready_future<>();
|
||||
@@ -267,9 +267,9 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_forward_progress) {
|
||||
}
|
||||
future<> obtain_permit() {
|
||||
if (_memory_only) {
|
||||
_permit = _semaphore.make_tracking_only_permit(_schema.get(), "reader_m", db::no_timeout, {});
|
||||
_permit = _semaphore.make_tracking_only_permit(_schema, "reader_m", db::no_timeout, {});
|
||||
} else {
|
||||
_permit = co_await _semaphore.obtain_permit(_schema.get(), fmt::format("reader_{}", _evictable ? 'e' : 'a'), 1024, db::no_timeout, {});
|
||||
_permit = co_await _semaphore.obtain_permit(_schema, fmt::format("reader_{}", _evictable ? 'e' : 'a'), 1024, db::no_timeout, {});
|
||||
}
|
||||
_units = _permit->consume_memory(tests::random::get_int(128, 1024));
|
||||
}
|
||||
@@ -564,7 +564,7 @@ SEASTAR_THREAD_TEST_CASE(reader_concurrency_semaphore_dump_reader_diganostics) {
|
||||
for (auto& schema : schemas) {
|
||||
const auto nr_permits = tests::random::get_int<unsigned>(2, 32);
|
||||
for (unsigned i = 0; i < nr_permits; ++i) {
|
||||
auto permit = semaphore.make_tracking_only_permit(schema.get(), op_names.at(tests::random::get_int<unsigned>(0, nr_ops - 1)), db::no_timeout, {});
|
||||
auto permit = semaphore.make_tracking_only_permit(schema, op_names.at(tests::random::get_int<unsigned>(0, nr_ops - 1)), db::no_timeout, {});
|
||||
if (tests::random::get_int<unsigned>(0, 4)) {
|
||||
auto units = permit.consume_resources(reader_resources(tests::random::get_int<unsigned>(0, 1), tests::random::get_int<unsigned>(1024, 16 * 1024 * 1024)));
|
||||
permits.push_back(std::pair(std::move(permit), std::move(units)));
|
||||
@@ -614,7 +614,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_stop_waits_on_permits
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
|
||||
simple_schema s;
|
||||
const auto schema_ptr = s.schema().get();
|
||||
const auto schema = s.schema();
|
||||
const auto initial_resources = reader_concurrency_semaphore::resources{2, 2 * 1024};
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory);
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
@@ -624,7 +624,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
|
||||
testlog.trace("Running admission scenario {}, with exepcted_can_admit={}", description, expected_can_admit);
|
||||
const auto stats_before = semaphore.get_stats();
|
||||
|
||||
auto admit_fut = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {});
|
||||
auto admit_fut = semaphore.obtain_permit(schema, get_name(), 1024, db::timeout_clock::now(), {});
|
||||
admit_fut.wait();
|
||||
const bool can_admit = !admit_fut.failed();
|
||||
if (can_admit) {
|
||||
@@ -651,13 +651,13 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
|
||||
|
||||
// resources and waitlist
|
||||
{
|
||||
reader_permit_opt permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get();
|
||||
reader_permit_opt permit = semaphore.obtain_permit(schema, get_name(), 1024, db::timeout_clock::now(), {}).get();
|
||||
|
||||
require_can_admit(true, "enough resources");
|
||||
|
||||
const auto stats_before = semaphore.get_stats();
|
||||
|
||||
auto enqueued_permit_fut = semaphore.obtain_permit(schema_ptr, get_name(), 2 * 1024, db::no_timeout, {});
|
||||
auto enqueued_permit_fut = semaphore.obtain_permit(schema, get_name(), 2 * 1024, db::no_timeout, {});
|
||||
{
|
||||
const auto stats_after = semaphore.get_stats();
|
||||
BOOST_REQUIRE(!enqueued_permit_fut.available());
|
||||
@@ -679,7 +679,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
|
||||
|
||||
// need_cpu and awaits
|
||||
{
|
||||
auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get();
|
||||
auto permit = semaphore.obtain_permit(schema, get_name(), 1024, db::timeout_clock::now(), {}).get();
|
||||
|
||||
require_can_admit(true, "!need_cpu");
|
||||
{
|
||||
@@ -708,7 +708,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
|
||||
|
||||
// forward progress -- readmission
|
||||
{
|
||||
auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get();
|
||||
auto permit = semaphore.obtain_permit(schema, get_name(), 1024, db::timeout_clock::now(), {}).get();
|
||||
|
||||
auto irh = semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permit));
|
||||
BOOST_REQUIRE(semaphore.try_evict_one_inactive_read());
|
||||
@@ -734,7 +734,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
|
||||
|
||||
// inactive readers
|
||||
{
|
||||
auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get();
|
||||
auto permit = semaphore.obtain_permit(schema, get_name(), 1024, db::timeout_clock::now(), {}).get();
|
||||
|
||||
require_can_admit(true, "!need_cpu");
|
||||
{
|
||||
@@ -762,10 +762,10 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
|
||||
|
||||
// evicting inactive readers for admission
|
||||
{
|
||||
auto permit1 = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get();
|
||||
auto permit1 = semaphore.obtain_permit(schema, get_name(), 1024, db::timeout_clock::now(), {}).get();
|
||||
auto irh1 = semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permit1));
|
||||
|
||||
auto permit2 = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get();
|
||||
auto permit2 = semaphore.obtain_permit(schema, get_name(), 1024, db::timeout_clock::now(), {}).get();
|
||||
auto irh2 = semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permit2));
|
||||
|
||||
require_can_admit(true, "evictable reads");
|
||||
@@ -780,7 +780,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
|
||||
|
||||
const auto stats_before = semaphore.get_stats();
|
||||
|
||||
auto permit2_fut = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::no_timeout, {});
|
||||
auto permit2_fut = semaphore.obtain_permit(schema, get_name(), 1024, db::no_timeout, {});
|
||||
|
||||
const auto stats_after = semaphore.get_stats();
|
||||
BOOST_REQUIRE_EQUAL(stats_after.reads_admitted, stats_before.reads_admitted);
|
||||
@@ -801,7 +801,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
|
||||
{
|
||||
check_admitting_enqueued_read(
|
||||
[&] {
|
||||
return reader_permit_opt(semaphore.obtain_permit(schema_ptr, get_name(), 2 * 1024, db::timeout_clock::now(), {}).get());
|
||||
return reader_permit_opt(semaphore.obtain_permit(schema, get_name(), 2 * 1024, db::timeout_clock::now(), {}).get());
|
||||
},
|
||||
[] (reader_permit_opt& permit1) {
|
||||
permit1 = {};
|
||||
@@ -816,7 +816,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
|
||||
{
|
||||
check_admitting_enqueued_read(
|
||||
[&] {
|
||||
return reader_permit_opt(semaphore.obtain_permit(schema_ptr, get_name(), 2 * 1024, db::timeout_clock::now(), {}).get());
|
||||
return reader_permit_opt(semaphore.obtain_permit(schema, get_name(), 2 * 1024, db::timeout_clock::now(), {}).get());
|
||||
},
|
||||
[&] (reader_permit_opt& permit1) {
|
||||
return semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), *permit1));
|
||||
@@ -830,7 +830,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
|
||||
{
|
||||
check_admitting_enqueued_read(
|
||||
[&] {
|
||||
auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get();
|
||||
auto permit = semaphore.obtain_permit(schema, get_name(), 1024, db::timeout_clock::now(), {}).get();
|
||||
require_can_admit(true, "enough resources");
|
||||
return std::pair(permit, std::optional<reader_permit::need_cpu_guard>{permit});
|
||||
}, [&] (std::pair<reader_permit, std::optional<reader_permit::need_cpu_guard>>& permit_and_need_cpu_guard) {
|
||||
@@ -846,7 +846,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) {
|
||||
{
|
||||
check_admitting_enqueued_read(
|
||||
[&] {
|
||||
auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get();
|
||||
auto permit = semaphore.obtain_permit(schema, get_name(), 1024, db::timeout_clock::now(), {}).get();
|
||||
require_can_admit(true, "enough resources");
|
||||
return std::pair(permit, reader_permit::need_cpu_guard{permit});
|
||||
}, [&] (std::pair<reader_permit, reader_permit::need_cpu_guard>& permit_and_need_cpu_guard) {
|
||||
@@ -954,7 +954,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_evict_inactive_reads_
|
||||
for (auto& s : schemas) {
|
||||
auto& handles = schema_handles[&s];
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}))));
|
||||
handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {}))));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -981,7 +981,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_evict_inactive_reads_
|
||||
// Reproduces https://github.com/scylladb/scylladb/issues/11770
|
||||
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_evict_inactive_reads_when_all_is_awaits) {
|
||||
simple_schema ss;
|
||||
const auto& s = *ss.schema();
|
||||
const auto& s = ss.schema();
|
||||
|
||||
const auto initial_resources = reader_concurrency_semaphore::resources{2, 32 * 1024};
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory);
|
||||
@@ -1009,10 +1009,10 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_evict_inactive_reads_
|
||||
}
|
||||
};
|
||||
|
||||
auto p1 = semaphore.obtain_permit(&s, get_name(), 1024, db::no_timeout, {}).get();
|
||||
auto p1 = semaphore.obtain_permit(s, get_name(), 1024, db::no_timeout, {}).get();
|
||||
auto irh1 = semaphore.register_inactive_read(make_empty_flat_reader_v2(ss.schema(), p1));
|
||||
|
||||
auto p2 = semaphore.obtain_permit(&s, get_name(), 1024, db::no_timeout, {}).get();
|
||||
auto p2 = semaphore.obtain_permit(s, get_name(), 1024, db::no_timeout, {}).get();
|
||||
read rd2(p2);
|
||||
auto fut2 = semaphore.with_ready_permit(p2, rd2.get_read_func());
|
||||
|
||||
@@ -1021,7 +1021,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_evict_inactive_reads_
|
||||
// * 1 need_cpu (but not awaiting) read on the ready list
|
||||
// * 1 waiter
|
||||
// * no more count resources left
|
||||
auto p3_fut = semaphore.obtain_permit(&s, get_name(), 1024, db::no_timeout, {});
|
||||
auto p3_fut = semaphore.obtain_permit(s, get_name(), 1024, db::no_timeout, {});
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().waiters, 2); // (waiters includes _ready_list entries)
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_enqueued_for_admission, 1);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().need_cpu_permits, 0); // permit looses need_cpu status while waiting for execution
|
||||
@@ -1337,7 +1337,7 @@ memory_limit_table create_memory_limit_table(cql_test_env& env, uint64_t target_
|
||||
auto sst = tbl.make_sstable();
|
||||
auto writer_cfg = sst_man.configure_writer("test");
|
||||
sst->write_components(
|
||||
make_flat_mutation_reader_from_mutations_v2(s, semaphore.make_tracking_only_permit(s.get(), "test", db::no_timeout, {}), mut, s->full_slice()),
|
||||
make_flat_mutation_reader_from_mutations_v2(s, semaphore.make_tracking_only_permit(s, "test", db::no_timeout, {}), mut, s->full_slice()),
|
||||
1,
|
||||
s,
|
||||
writer_cfg,
|
||||
@@ -1561,7 +1561,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_blessed_read_goes_ina
|
||||
simple_schema ss;
|
||||
auto s = ss.schema();
|
||||
|
||||
auto permit = semaphore.obtain_permit(s.get(), get_name(), 1024, db::no_timeout, {}).get();
|
||||
auto permit = semaphore.obtain_permit(s, get_name(), 1024, db::no_timeout, {}).get();
|
||||
|
||||
std::vector<reader_permit::resource_units> permit_res;
|
||||
|
||||
@@ -1590,7 +1590,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_stop_with_inactive_re
|
||||
simple_schema ss;
|
||||
auto s = ss.schema();
|
||||
|
||||
auto permit = reader_permit_opt(semaphore.obtain_permit(s.get(), get_name(), 1024, db::no_timeout, {}).get());
|
||||
auto permit = reader_permit_opt(semaphore.obtain_permit(s, get_name(), 1024, db::no_timeout, {}).get());
|
||||
|
||||
auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s, *permit));
|
||||
|
||||
|
||||
@@ -4697,7 +4697,7 @@ SEASTAR_THREAD_TEST_CASE(test_cache_reader_semaphore_oom_kill) {
|
||||
// Check different amounts of memory consumed before the read, so the OOM kill is triggered in different places.
|
||||
for (unsigned memory = 1; memory <= 512; memory *= 2) {
|
||||
semaphore.set_resources({1, memory});
|
||||
auto permit = semaphore.obtain_permit(s.schema().get(), "read", 0, db::no_timeout, {}).get();
|
||||
auto permit = semaphore.obtain_permit(s.schema(), "read", 0, db::no_timeout, {}).get();
|
||||
auto create_reader_and_read_all = [&] {
|
||||
auto rd = cache.make_reader(s.schema(), permit, pr, &gc_state);
|
||||
auto close_rd = deferred_close(rd);
|
||||
|
||||
@@ -441,7 +441,7 @@ SEASTAR_TEST_CASE(test_view_update_generator) {
|
||||
auto write_to_sstable = [&] (mutation m) {
|
||||
auto sst = t->make_streaming_staging_sstable();
|
||||
sstables::sstable_writer_config sst_cfg = e.db().local().get_user_sstables_manager().configure_writer("test");
|
||||
auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout, {});
|
||||
auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s, "test", db::no_timeout, {});
|
||||
sst->write_components(make_flat_mutation_reader_from_mutations_v2(m.schema(), std::move(permit), m), 1ul, s, sst_cfg, {}).get();
|
||||
sst->open_data().get();
|
||||
t->add_sstable_and_update_cache(sst).get();
|
||||
@@ -553,7 +553,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_deadlock) {
|
||||
|
||||
auto sst = t->make_streaming_staging_sstable();
|
||||
sstables::sstable_writer_config sst_cfg = e.local_db().get_user_sstables_manager().configure_writer("test");
|
||||
auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout, {});
|
||||
auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s, "test", db::no_timeout, {});
|
||||
sst->write_components(make_flat_mutation_reader_from_mutations_v2(m.schema(), std::move(permit), m), 1ul, s, sst_cfg, {}).get();
|
||||
sst->open_data().get();
|
||||
t->add_sstable_and_update_cache(sst).get();
|
||||
@@ -563,7 +563,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_deadlock) {
|
||||
}).get0();
|
||||
|
||||
// consume all units except what is needed to admit a single reader.
|
||||
auto sponge_permit = sem.make_tracking_only_permit(s.get(), "sponge", db::no_timeout, {});
|
||||
auto sponge_permit = sem.make_tracking_only_permit(s, "sponge", db::no_timeout, {});
|
||||
auto resources = sponge_permit.consume_resources(sem.available_resources() - reader_resources{1, replica::new_reader_base_cost});
|
||||
|
||||
testlog.info("res = [.count={}, .memory={}]", sem.available_resources().count, sem.available_resources().memory);
|
||||
@@ -625,7 +625,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_register_semaphore_unit_leak
|
||||
|
||||
auto sst = t->make_streaming_staging_sstable();
|
||||
sstables::sstable_writer_config sst_cfg = e.local_db().get_user_sstables_manager().configure_writer("test");
|
||||
auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout, {});
|
||||
auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s, "test", db::no_timeout, {});
|
||||
sst->write_components(make_flat_mutation_reader_from_mutations_v2(m.schema(), std::move(permit), m), 1ul, s, sst_cfg, {}).get();
|
||||
sst->open_data().get();
|
||||
t->add_sstable_and_update_cache(sst).get();
|
||||
@@ -726,7 +726,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering) {
|
||||
void check(mutation mut) {
|
||||
// First we check that we would be able to create a reader, even
|
||||
// though the staging reader consumed all resources.
|
||||
auto permit = _semaphore.obtain_permit(_schema.get(), "consumer_verifier", replica::new_reader_base_cost, db::timeout_clock::now(), {}).get0();
|
||||
auto permit = _semaphore.obtain_permit(_schema, "consumer_verifier", replica::new_reader_base_cost, db::timeout_clock::now(), {}).get0();
|
||||
|
||||
const size_t current_rows = rows_in_mut(mut);
|
||||
const auto total_rows = _partition_rows.at(mut.decorated_key());
|
||||
@@ -835,7 +835,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering) {
|
||||
return less(a.decorated_key(), b.decorated_key());
|
||||
});
|
||||
|
||||
auto permit = sem.obtain_permit(schema.get(), get_name(), replica::new_reader_base_cost, db::no_timeout, {}).get0();
|
||||
auto permit = sem.obtain_permit(schema, get_name(), replica::new_reader_base_cost, db::no_timeout, {}).get0();
|
||||
|
||||
auto mt = make_memtable(schema, muts);
|
||||
auto p = make_manually_paused_evictable_reader_v2(
|
||||
@@ -936,7 +936,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering_with_random_mutati
|
||||
auto stop_sem = deferred_stop(sem);
|
||||
const abort_source as;
|
||||
auto mt = make_memtable(schema, {mut});
|
||||
auto permit = sem.obtain_permit(schema.get(), get_name(), replica::new_reader_base_cost, db::no_timeout, {}).get0();
|
||||
auto permit = sem.obtain_permit(schema, get_name(), replica::new_reader_base_cost, db::no_timeout, {}).get0();
|
||||
auto p = make_manually_paused_evictable_reader_v2(
|
||||
mt->as_data_source(),
|
||||
schema,
|
||||
@@ -998,7 +998,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering_with_empty_mutatio
|
||||
auto schema = ss.schema();
|
||||
reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, replica::new_reader_base_cost);
|
||||
auto stop_sem = deferred_stop(sem);
|
||||
auto permit = sem.make_tracking_only_permit(schema.get(), "test", db::no_timeout, {});
|
||||
auto permit = sem.make_tracking_only_permit(schema, "test", db::no_timeout, {});
|
||||
abort_source as;
|
||||
auto [staging_reader, staging_reader_handle] = make_manually_paused_evictable_reader_v2(make_empty_mutation_source(), schema, permit,
|
||||
query::full_partition_range, schema->full_slice(), {}, mutation_reader::forwarding::no);
|
||||
|
||||
@@ -9,6 +9,7 @@
|
||||
#pragma once
|
||||
|
||||
#include "../../reader_concurrency_semaphore.hh"
|
||||
#include "schema/schema.hh"
|
||||
|
||||
namespace tests {
|
||||
|
||||
|
||||
@@ -99,7 +99,7 @@ public:
|
||||
return *_contexts[shard]->semaphore;
|
||||
}
|
||||
virtual future<reader_permit> obtain_reader_permit(schema_ptr schema, const char* const description, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) override {
|
||||
return semaphore().obtain_permit(schema.get(), description, 128 * 1024, timeout, std::move(trace_ptr));
|
||||
return semaphore().obtain_permit(schema, description, 128 * 1024, timeout, std::move(trace_ptr));
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -194,7 +194,7 @@ public:
|
||||
tmpdir& tempdir() noexcept { return _impl->dir; }
|
||||
data_dictionary::storage_options get_storage_options() const noexcept { return _impl->storage; }
|
||||
|
||||
reader_permit make_reader_permit(const schema* const s, const char* n, db::timeout_clock::time_point timeout) {
|
||||
reader_permit make_reader_permit(const schema_ptr &s, const char* n, db::timeout_clock::time_point timeout) {
|
||||
return _impl->semaphore.make_tracking_only_permit(s, n, timeout, {});
|
||||
}
|
||||
reader_permit make_reader_permit(db::timeout_clock::time_point timeout = db::no_timeout) {
|
||||
|
||||
@@ -87,7 +87,7 @@ public:
|
||||
return _compaction_strategy_state;
|
||||
}
|
||||
reader_permit make_compaction_reader_permit() const override {
|
||||
return table().compaction_concurrency_semaphore().make_tracking_only_permit(&*schema(), "table_for_tests::table_state", db::no_timeout, {});
|
||||
return table().compaction_concurrency_semaphore().make_tracking_only_permit(schema(), "table_for_tests::table_state", db::no_timeout, {});
|
||||
}
|
||||
sstables::sstables_manager& get_sstables_manager() noexcept override {
|
||||
return _sstables_manager;
|
||||
|
||||
@@ -144,7 +144,7 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
void execute_reads(const schema& s, reader_concurrency_semaphore& sem, unsigned reads, unsigned concurrency, std::function<future<>(unsigned)> read) {
|
||||
void execute_reads(const schema_ptr& schema, reader_concurrency_semaphore& sem, unsigned reads, unsigned concurrency, std::function<future<>(unsigned)> read) {
|
||||
const reader_resources initial_res = sem.available_resources();
|
||||
unsigned n = 0;
|
||||
gate g;
|
||||
@@ -175,7 +175,7 @@ void execute_reads(const schema& s, reader_concurrency_semaphore& sem, unsigned
|
||||
|
||||
if (sem.get_stats().waiters) {
|
||||
testlog.trace("Waiting for queue to drain");
|
||||
sem.obtain_permit(&s, "drain", 1, db::no_timeout, {}).get();
|
||||
sem.obtain_permit(schema, "drain", 1, db::no_timeout, {}).get();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -267,7 +267,7 @@ void test_main_thread(cql_test_env& env) {
|
||||
try {
|
||||
auto _ = sc.collect();
|
||||
memory::set_heap_profiling_sampling_rate(100);
|
||||
execute_reads(*s, sem, reads, read_concurrency, [&] (unsigned i) {
|
||||
execute_reads(s, sem, reads, read_concurrency, [&] (unsigned i) {
|
||||
return env.execute_cql(format("select * from ks.test where pk = 0 and ck > {} limit 100;",
|
||||
tests::random::get_int(rows / 2))).discard_result();
|
||||
});
|
||||
|
||||
@@ -11,6 +11,7 @@
|
||||
#include <seastar/core/memory.hh>
|
||||
#include "seastarx.hh"
|
||||
#include "reader_concurrency_semaphore.hh"
|
||||
#include "schema/schema.hh"
|
||||
|
||||
|
||||
uint64_t perf_mallocs() {
|
||||
|
||||
@@ -54,7 +54,7 @@ struct table {
|
||||
}
|
||||
|
||||
reader_permit make_permit() {
|
||||
return semaphore.make_tracking_only_permit(s.schema().get(), "test", db::no_timeout, {});
|
||||
return semaphore.make_tracking_only_permit(s.schema(), "test", db::no_timeout, {});
|
||||
}
|
||||
future<> stop() noexcept {
|
||||
return semaphore.stop();
|
||||
|
||||
@@ -530,7 +530,7 @@ schema_ptr do_load_schema_from_schema_tables(const db::config& dbcfg, std::files
|
||||
sst_man,
|
||||
schema_tables_path / schema_table_table_dir[s],
|
||||
schema_factory,
|
||||
rcs_sem.make_tracking_only_permit(s.get(), "schema_mutation", db::no_timeout, {}),
|
||||
rcs_sem.make_tracking_only_permit(s, "schema_mutation", db::no_timeout, {}),
|
||||
keyspace,
|
||||
{table});
|
||||
};
|
||||
@@ -552,7 +552,7 @@ schema_ptr do_load_schema_from_schema_tables(const db::config& dbcfg, std::files
|
||||
sst_man,
|
||||
schema_tables_path / schema_table_table_dir[db::schema_tables::types()],
|
||||
db::schema_tables::types,
|
||||
rcs_sem.make_tracking_only_permit(db::schema_tables::types().get(), "types_mutation", db::no_timeout, {}),
|
||||
rcs_sem.make_tracking_only_permit(db::schema_tables::types(), "types_mutation", db::no_timeout, {}),
|
||||
keyspace,
|
||||
{});
|
||||
if (types_mut) {
|
||||
|
||||
@@ -3058,7 +3058,7 @@ $ scylla sstable validate /path/to/md-123456-big-Data.db /path/to/md-123457-big-
|
||||
reader_concurrency_semaphore rcs_sem(reader_concurrency_semaphore::no_limits{}, app_name, reader_concurrency_semaphore::register_metrics::no);
|
||||
auto stop_semaphore = deferred_stop(rcs_sem);
|
||||
|
||||
const auto permit = rcs_sem.make_tracking_only_permit(schema.get(), app_name, db::no_timeout, {});
|
||||
const auto permit = rcs_sem.make_tracking_only_permit(schema, app_name, db::no_timeout, {});
|
||||
|
||||
try {
|
||||
operations_with_func.at(operation)(schema, permit, sstables, sst_man, app_config);
|
||||
|
||||
Reference in New Issue
Block a user