diff --git a/db/view/view.cc b/db/view/view.cc index 945b2aa6fa..c0f56b1805 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1747,7 +1747,7 @@ view_builder::view_builder(replica::database& db, db::system_keyspace& sys_ks, d , _sys_ks(sys_ks) , _sys_dist_ks(sys_dist_ks) , _mnotifier(mn) - , _permit(_db.get_reader_concurrency_semaphore().make_tracking_only_permit(nullptr, "view_builder", db::no_timeout)) { + , _permit(_db.get_reader_concurrency_semaphore().make_tracking_only_permit(nullptr, "view_builder", db::no_timeout, {})) { setup_metrics(); } diff --git a/db/view/view_update_generator.cc b/db/view/view_update_generator.cc index 08022b63bb..4f88206b8e 100644 --- a/db/view/view_update_generator.cc +++ b/db/view/view_update_generator.cc @@ -137,7 +137,7 @@ future<> view_update_generator::start() { ssts->insert(sst); } - auto permit = _db.obtain_reader_permit(*t, "view_update_generator", db::no_timeout).get0(); + auto permit = _db.obtain_reader_permit(*t, "view_update_generator", db::no_timeout, {}).get0(); auto ms = mutation_source([this, ssts] ( schema_ptr s, reader_permit permit, diff --git a/multishard_mutation_query.cc b/multishard_mutation_query.cc index 0a35ffea52..bbd6fde54c 100644 --- a/multishard_mutation_query.cc +++ b/multishard_mutation_query.cc @@ -213,7 +213,7 @@ public: tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout) : _db(db) , _schema(std::move(s)) - , _permit(_db.local().get_reader_concurrency_semaphore().make_tracking_only_permit(_schema.get(), "multishard-mutation-query", timeout)) + , _permit(_db.local().get_reader_concurrency_semaphore().make_tracking_only_permit(_schema.get(), "multishard-mutation-query", timeout, trace_state)) , _cmd(cmd) , _ranges(ranges) , _trace_state(std::move(trace_state)) @@ -261,14 +261,14 @@ public: return *_semaphores[shard]; } - virtual future obtain_reader_permit(schema_ptr schema, const char* const description, db::timeout_clock::time_point timeout) override { + virtual future obtain_reader_permit(schema_ptr schema, const char* const description, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) override { const auto shard = this_shard_id(); auto& rm = _readers[shard]; if (rm.state == reader_state::successful_lookup) { rm.rparts->permit.set_max_result_size(get_max_result_size()); co_return rm.rparts->permit; } - auto permit = co_await _db.local().obtain_reader_permit(std::move(schema), description, timeout); + auto permit = co_await _db.local().obtain_reader_permit(std::move(schema), description, timeout, std::move(trace_ptr)); permit.set_max_result_size(get_max_result_size()); co_return permit; } diff --git a/mutation_writer/multishard_writer.cc b/mutation_writer/multishard_writer.cc index 6d11eba48f..e8be1d9a7c 100644 --- a/mutation_writer/multishard_writer.cc +++ b/mutation_writer/multishard_writer.cc @@ -113,7 +113,7 @@ future<> multishard_writer::make_shard_writer(unsigned shard) { reader = make_foreign(std::make_unique(std::move(reader)))] () mutable { auto s = gs.get(); auto semaphore = std::make_unique(reader_concurrency_semaphore::no_limits{}, "shard_writer"); - auto permit = semaphore->make_tracking_only_permit(s.get(), "multishard-writer", db::no_timeout); + auto permit = semaphore->make_tracking_only_permit(s.get(), "multishard-writer", db::no_timeout, {}); auto this_shard_reader = make_foreign_reader(s, std::move(permit), std::move(reader)); return make_foreign(std::make_unique(gs.get(), std::move(semaphore), std::move(this_shard_reader), consumer)); }).then([this, shard] (foreign_ptr> writer) { diff --git a/reader_concurrency_semaphore.cc b/reader_concurrency_semaphore.cc index 7dbbcc8508..780fb6b426 100644 --- a/reader_concurrency_semaphore.cc +++ b/reader_concurrency_semaphore.cc @@ -140,6 +140,7 @@ private: uint64_t _sstables_read = 0; size_t _requested_memory = 0; uint64_t _oom_kills = 0; + tracing::trace_state_ptr _trace_ptr; // Not strictly related to the permit. // Used by the semaphore to to manage the permit. @@ -204,23 +205,25 @@ private: public: struct value_tag {}; - impl(reader_concurrency_semaphore& semaphore, const schema* const schema, const std::string_view& op_name, reader_resources base_resources, db::timeout_clock::time_point timeout) + impl(reader_concurrency_semaphore& semaphore, const schema* const schema, const std::string_view& op_name, reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) : _semaphore(semaphore) , _schema(schema) , _op_name_view(op_name) , _base_resources(base_resources) , _ttl_timer([this] { on_timeout(); }) + , _trace_ptr(std::move(trace_ptr)) { set_timeout(timeout); _semaphore.on_permit_created(*this); } - impl(reader_concurrency_semaphore& semaphore, const schema* const schema, sstring&& op_name, reader_resources base_resources, db::timeout_clock::time_point timeout) + impl(reader_concurrency_semaphore& semaphore, const schema* const schema, sstring&& op_name, reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) : _semaphore(semaphore) , _schema(schema) , _op_name(std::move(op_name)) , _op_name_view(_op_name) , _base_resources(base_resources) , _ttl_timer([this] { on_timeout(); }) + , _trace_ptr(std::move(trace_ptr)) { set_timeout(timeout); _semaphore.on_permit_created(*this); @@ -478,14 +481,14 @@ reader_permit::reader_permit(shared_ptr impl) : _impl(std::move(impl)) } reader_permit::reader_permit(reader_concurrency_semaphore& semaphore, const schema* const schema, std::string_view op_name, - reader_resources base_resources, db::timeout_clock::time_point timeout) - : _impl(::seastar::make_shared(semaphore, schema, op_name, base_resources, timeout)) + reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) + : _impl(::seastar::make_shared(semaphore, schema, op_name, base_resources, timeout, std::move(trace_ptr))) { } reader_permit::reader_permit(reader_concurrency_semaphore& semaphore, const schema* const schema, sstring&& op_name, - reader_resources base_resources, db::timeout_clock::time_point timeout) - : _impl(::seastar::make_shared(semaphore, schema, std::move(op_name), base_resources, timeout)) + reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) + : _impl(::seastar::make_shared(semaphore, schema, std::move(op_name), base_resources, timeout, std::move(trace_ptr))) { } @@ -1383,32 +1386,34 @@ void reader_concurrency_semaphore::on_permit_unblocked() noexcept { } future reader_concurrency_semaphore::obtain_permit(const schema* const schema, const char* const op_name, size_t memory, - db::timeout_clock::time_point timeout) { - auto permit = reader_permit(*this, schema, std::string_view(op_name), {1, static_cast(memory)}, timeout); + db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) { + auto permit = reader_permit(*this, schema, std::string_view(op_name), {1, static_cast(memory)}, timeout, std::move(trace_ptr)); return do_wait_admission(*permit).then([permit] () mutable { return std::move(permit); }); } future reader_concurrency_semaphore::obtain_permit(const schema* const schema, sstring&& op_name, size_t memory, - db::timeout_clock::time_point timeout) { - auto permit = reader_permit(*this, schema, std::move(op_name), {1, static_cast(memory)}, timeout); + db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) { + auto permit = reader_permit(*this, schema, std::move(op_name), {1, static_cast(memory)}, timeout, std::move(trace_ptr)); return do_wait_admission(*permit).then([permit] () mutable { return std::move(permit); }); } -reader_permit reader_concurrency_semaphore::make_tracking_only_permit(const schema* const schema, const char* const op_name, db::timeout_clock::time_point timeout) { - return reader_permit(*this, schema, std::string_view(op_name), {}, timeout); +reader_permit reader_concurrency_semaphore::make_tracking_only_permit(const schema* const schema, const char* const op_name, + db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) { + return reader_permit(*this, schema, std::string_view(op_name), {}, timeout, std::move(trace_ptr)); } -reader_permit reader_concurrency_semaphore::make_tracking_only_permit(const schema* const schema, sstring&& op_name, db::timeout_clock::time_point timeout) { - return reader_permit(*this, schema, std::move(op_name), {}, timeout); +reader_permit reader_concurrency_semaphore::make_tracking_only_permit(const schema* const schema, sstring&& op_name, + db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) { + return reader_permit(*this, schema, std::move(op_name), {}, timeout, std::move(trace_ptr)); } future<> reader_concurrency_semaphore::with_permit(const schema* const schema, const char* const op_name, size_t memory, - db::timeout_clock::time_point timeout, read_func func) { - auto permit = reader_permit(*this, schema, std::string_view(op_name), {1, static_cast(memory)}, timeout); + db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr, read_func func) { + auto permit = reader_permit(*this, schema, std::string_view(op_name), {1, static_cast(memory)}, timeout, std::move(trace_ptr)); permit->aux_data().func = std::move(func); permit->aux_data().permit_keepalive = permit; return do_wait_admission(*permit); diff --git a/reader_concurrency_semaphore.hh b/reader_concurrency_semaphore.hh index 9eda3e375d..2aac743bf3 100644 --- a/reader_concurrency_semaphore.hh +++ b/reader_concurrency_semaphore.hh @@ -386,8 +386,8 @@ public: /// /// Some permits cannot be associated with any table, so passing nullptr as /// the schema parameter is allowed. - future obtain_permit(const schema* const schema, const char* const op_name, size_t memory, db::timeout_clock::time_point timeout); - future obtain_permit(const schema* const schema, sstring&& op_name, size_t memory, db::timeout_clock::time_point timeout); + future obtain_permit(const schema* const schema, const char* const op_name, size_t memory, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr); + future obtain_permit(const schema* const schema, sstring&& op_name, size_t memory, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr); /// Make a tracking only permit /// @@ -402,8 +402,8 @@ public: /// /// Some permits cannot be associated with any table, so passing nullptr as /// the schema parameter is allowed. - reader_permit make_tracking_only_permit(const schema* const schema, const char* const op_name, db::timeout_clock::time_point timeout); - reader_permit make_tracking_only_permit(const schema* const schema, sstring&& op_name, db::timeout_clock::time_point timeout); + reader_permit make_tracking_only_permit(const schema* const schema, const char* const op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr); + reader_permit make_tracking_only_permit(const schema* const schema, sstring&& op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr); /// Run the function through the semaphore's execution stage with an admitted permit /// @@ -424,7 +424,7 @@ public: /// /// Some permits cannot be associated with any table, so passing nullptr as /// the schema parameter is allowed. - future<> with_permit(const schema* const schema, const char* const op_name, size_t memory, db::timeout_clock::time_point timeout, read_func func); + future<> with_permit(const schema* const schema, const char* const op_name, size_t memory, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr, read_func func); /// Run the function through the semaphore's execution stage with a pre-admitted permit /// diff --git a/reader_permit.hh b/reader_permit.hh index f927b776ad..759a9c1ad1 100644 --- a/reader_permit.hh +++ b/reader_permit.hh @@ -13,6 +13,7 @@ #include "db/timeout_clock.hh" #include "schema/schema_fwd.hh" +#include "tracing/trace_state.hh" #include "query_class_config.hh" namespace seastar { @@ -98,9 +99,9 @@ private: reader_permit() = default; reader_permit(shared_ptr); explicit reader_permit(reader_concurrency_semaphore& semaphore, const schema* const schema, std::string_view op_name, - reader_resources base_resources, db::timeout_clock::time_point timeout); + reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr); explicit reader_permit(reader_concurrency_semaphore& semaphore, const schema* const schema, sstring&& op_name, - reader_resources base_resources, db::timeout_clock::time_point timeout); + reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr); reader_permit::impl& operator*() { return *_impl; } reader_permit::impl* operator->() { return _impl.get(); } diff --git a/readers/multishard.cc b/readers/multishard.cc index b34f78757f..8f8c597e9c 100644 --- a/readers/multishard.cc +++ b/readers/multishard.cc @@ -819,7 +819,7 @@ future<> shard_reader_v2::do_fill_buffer() { return lifecycle_policy->create_reader(std::move(s), std::move(permit), pr, ps, pc, std::move(ts), fwd_mr); }); auto s = gs.get(); - auto permit = co_await _lifecycle_policy->obtain_reader_permit(s, "shard-reader", timeout()); + auto permit = co_await _lifecycle_policy->obtain_reader_permit(s, "shard-reader", timeout(), _trace_state); auto rreader = make_foreign(std::make_unique(evictable_reader_v2::auto_pause::yes, std::move(ms), s, std::move(permit), *_pr, _ps, _pc, _trace_state, _fwd_mr)); diff --git a/readers/multishard.hh b/readers/multishard.hh index 48f9c3dc20..5716a5d555 100644 --- a/readers/multishard.hh +++ b/readers/multishard.hh @@ -97,7 +97,8 @@ public: /// `semaphore()`. /// /// This method will be called on the shard where the relevant reader lives. - virtual future obtain_reader_permit(schema_ptr schema, const char* const description, db::timeout_clock::time_point timeout) = 0; + virtual future obtain_reader_permit(schema_ptr schema, const char* const description, db::timeout_clock::time_point timeout, + tracing::trace_state_ptr trace_ptr) = 0; }; /// Make a multishard_combining_reader. diff --git a/repair/row_level.cc b/repair/row_level.cc index d4f172c6c5..851def085c 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -2779,7 +2779,7 @@ public: rlogger.trace("repair[{}]: Finished to get memory budget, wanted={}, available={}, max_repair_memory={}", _shard_task.global_repair_id.uuid(), wanted, mem_sem.current(), max); - auto permit = _shard_task.db.local().obtain_reader_permit(_cf, "repair-meta", db::no_timeout).get0(); + auto permit = _shard_task.db.local().obtain_reader_permit(_cf, "repair-meta", db::no_timeout, {}).get0(); repair_meta master(_shard_task.rs, _cf, @@ -3100,7 +3100,7 @@ repair_service::insert_repair_meta( reason] (schema_ptr s) { auto& db = get_db(); auto& cf = db.local().find_column_family(s->id()); - return db.local().obtain_reader_permit(cf, "repair-meta", db::no_timeout).then([s = std::move(s), + return db.local().obtain_reader_permit(cf, "repair-meta", db::no_timeout, {}).then([s = std::move(s), &cf, this, from, diff --git a/replica/database.cc b/replica/database.cc index 18ce73cf2a..7260f155b5 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -1521,7 +1521,7 @@ database::query(schema_ptr s, const query::read_command& cmd, query::result_opti if (querier_opt) { f = co_await coroutine::as_future(semaphore.with_ready_permit(querier_opt->permit(), read_func)); } else { - f = co_await coroutine::as_future(semaphore.with_permit(s.get(), "data-query", cf.estimate_read_memory_cost(), timeout, read_func)); + f = co_await coroutine::as_future(semaphore.with_permit(s.get(), "data-query", cf.estimate_read_memory_cost(), timeout, trace_state, read_func)); } if (!f.failed()) { @@ -1587,7 +1587,7 @@ database::query_mutations(schema_ptr s, const query::read_command& cmd, const dh if (querier_opt) { f = co_await coroutine::as_future(semaphore.with_ready_permit(querier_opt->permit(), read_func)); } else { - f = co_await coroutine::as_future(semaphore.with_permit(s.get(), "mutation-query", cf.estimate_read_memory_cost(), timeout, read_func)); + f = co_await coroutine::as_future(semaphore.with_permit(s.get(), "mutation-query", cf.estimate_read_memory_cost(), timeout, trace_state, read_func)); } if (!f.failed()) { @@ -1636,12 +1636,12 @@ reader_concurrency_semaphore& database::get_reader_concurrency_semaphore() { std::abort(); } -future database::obtain_reader_permit(table& tbl, const char* const op_name, db::timeout_clock::time_point timeout) { - return get_reader_concurrency_semaphore().obtain_permit(tbl.schema().get(), op_name, tbl.estimate_read_memory_cost(), timeout); +future database::obtain_reader_permit(table& tbl, const char* const op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) { + return get_reader_concurrency_semaphore().obtain_permit(tbl.schema().get(), op_name, tbl.estimate_read_memory_cost(), timeout, std::move(trace_ptr)); } -future database::obtain_reader_permit(schema_ptr schema, const char* const op_name, db::timeout_clock::time_point timeout) { - return obtain_reader_permit(find_column_family(std::move(schema)), op_name, timeout); +future database::obtain_reader_permit(schema_ptr schema, const char* const op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) { + return obtain_reader_permit(find_column_family(std::move(schema)), op_name, timeout, std::move(trace_ptr)); } std::ostream& operator<<(std::ostream& out, const column_family& cf) { @@ -1701,7 +1701,7 @@ future database::do_apply_counter_update(column_family& cf, const froz // counter state for each modified cell... tracing::trace(trace_state, "Reading counter values from the CF"); - auto permit = get_reader_concurrency_semaphore().make_tracking_only_permit(m_schema.get(), "counter-read-before-write", timeout); + auto permit = get_reader_concurrency_semaphore().make_tracking_only_permit(m_schema.get(), "counter-read-before-write", timeout, trace_state); return counter_write_query(m_schema, cf.as_mutation_source(), std::move(permit), m.decorated_key(), slice, trace_state) .then([this, &cf, &m, m_schema, timeout, trace_state] (auto mopt) { // ...now, that we got existing state of all affected counter @@ -2771,9 +2771,9 @@ flat_mutation_reader_v2 make_multishard_streaming_reader(distributed obtain_reader_permit(schema_ptr schema, const char* const description, db::timeout_clock::time_point timeout) override { + virtual future obtain_reader_permit(schema_ptr schema, const char* const description, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) override { auto& cf = _db.local().find_column_family(_table_id); - return semaphore().obtain_permit(schema.get(), description, cf.estimate_read_memory_cost(), timeout); + return semaphore().obtain_permit(schema.get(), description, cf.estimate_read_memory_cost(), timeout, std::move(trace_ptr)); } }; auto ms = mutation_source([&db] (schema_ptr s, diff --git a/replica/database.hh b/replica/database.hh index 702db92a74..5ea8204cc2 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1716,8 +1716,8 @@ public: reader_concurrency_semaphore& get_reader_concurrency_semaphore(); // Convenience method to obtain an admitted permit. See reader_concurrency_semaphore::obtain_permit(). - future obtain_reader_permit(table& tbl, const char* const op_name, db::timeout_clock::time_point timeout); - future obtain_reader_permit(schema_ptr schema, const char* const op_name, db::timeout_clock::time_point timeout); + future obtain_reader_permit(table& tbl, const char* const op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr); + future obtain_reader_permit(schema_ptr schema, const char* const op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr); bool is_internal_query() const; diff --git a/replica/table.cc b/replica/table.cc index 8165d17974..79e004c95c 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -874,7 +874,7 @@ table::try_flush_memtable_to_sstable(compaction_group& cg, lw_shared_ptrmake_flush_reader( old->schema(), - compaction_concurrency_semaphore().make_tracking_only_permit(old->schema().get(), "try_flush_memtable_to_sstable()", db::no_timeout), + compaction_concurrency_semaphore().make_tracking_only_permit(old->schema().get(), "try_flush_memtable_to_sstable()", db::no_timeout, {}), service::get_local_memtable_flush_priority())); // Switch back to default scheduling group for post-flush actions, to avoid them being staved by the memtable flush @@ -2271,7 +2271,7 @@ write_memtable_to_sstable(memtable& mt, sstables::shared_sstable sst, sstables:: std::make_unique(reader_concurrency_semaphore::no_limits{}, "write_memtable_to_sstable"), cfg, [&mt, sst] (auto& monitor, auto& semaphore, auto& cfg) { - return write_memtable_to_sstable(semaphore->make_tracking_only_permit(mt.schema().get(), "mt_to_sst", db::no_timeout), mt, std::move(sst), monitor, cfg) + return write_memtable_to_sstable(semaphore->make_tracking_only_permit(mt.schema().get(), "mt_to_sst", db::no_timeout, {}), mt, std::move(sst), monitor, cfg) .finally([&semaphore] { return semaphore->stop(); }); @@ -2605,7 +2605,7 @@ future table::do_push_view_replica_updates(schema_ptr s const bool need_static = db::view::needs_static_row(m.partition(), views); if (!need_regular && !need_static) { tracing::trace(tr_state, "View updates do not require read-before-write"); - co_await generate_and_propagate_view_updates(base, sem.make_tracking_only_permit(s.get(), "push-view-updates-1", timeout), std::move(views), std::move(m), { }, std::move(tr_state), now); + co_await generate_and_propagate_view_updates(base, sem.make_tracking_only_permit(s.get(), "push-view-updates-1", timeout, tr_state), std::move(views), std::move(m), { }, tr_state, now); // In this case we are not doing a read-before-write, just a // write, so no lock is needed. co_return row_locker::lock_holder(); @@ -2638,7 +2638,7 @@ future table::do_push_view_replica_updates(schema_ptr s co_await utils::get_local_injector().inject("table_push_view_replica_updates_timeout", timeout); auto lock = co_await std::move(lockf); auto pk = dht::partition_range::make_singular(m.decorated_key()); - auto permit = sem.make_tracking_only_permit(base.get(), "push-view-updates-2", timeout); + auto permit = sem.make_tracking_only_permit(base.get(), "push-view-updates-2", timeout, tr_state); auto reader = source.make_reader_v2(base, permit, pk, slice, io_priority, tr_state, streamed_mutation::forwarding::no, mutation_reader::forwarding::no); co_await this->generate_and_propagate_view_updates(base, std::move(permit), std::move(views), std::move(m), std::move(reader), tr_state, now); tracing::trace(tr_state, "View updates for {}.{} were generated and propagated", base->ks_name(), base->cf_name()); @@ -2723,7 +2723,7 @@ public: return _t.get_compaction_strategy(); } reader_permit make_compaction_reader_permit() const override { - return _t.compaction_concurrency_semaphore().make_tracking_only_permit(schema().get(), "compaction", db::no_timeout); + return _t.compaction_concurrency_semaphore().make_tracking_only_permit(schema().get(), "compaction", db::no_timeout, {}); } sstables::sstables_manager& get_sstables_manager() noexcept override { return _t.get_sstables_manager(); diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 46e41326c3..f23f78a5ec 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -1291,7 +1291,7 @@ future<> sstable::load_first_and_last_position_in_partition() { } auto& sem = _manager.sstable_metadata_concurrency_sem(); - reader_permit permit = co_await sem.obtain_permit(&*_schema, "sstable::load_first_and_last_position_range", sstable_buffer_size, db::no_timeout); + reader_permit permit = co_await sem.obtain_permit(&*_schema, "sstable::load_first_and_last_position_range", sstable_buffer_size, db::no_timeout, {}); auto first_pos_opt = co_await find_first_position_in_partition(permit, get_first_decorated_key(), false); auto last_pos_opt = co_await find_first_position_in_partition(permit, get_last_decorated_key(), true); @@ -1888,7 +1888,7 @@ future<> sstable::generate_summary(const io_priority_class& pc) { auto s = summary_generator(_schema->get_partitioner(), _components->summary, _manager.config().sstable_summary_ratio()); auto ctx = make_lw_shared>( - *this, sem.make_tracking_only_permit(_schema.get(), "generate-summary", db::no_timeout), s, trust_promoted_index::yes, + *this, sem.make_tracking_only_permit(_schema.get(), "generate-summary", db::no_timeout, {}), s, trust_promoted_index::yes, make_file_input_stream(index_file, 0, index_size, std::move(options)), 0, index_size, (_version >= sstable_version_types::mc ? std::make_optional(get_clustering_values_fixed_lengths(get_serialization_header())) @@ -3032,7 +3032,7 @@ future sstable::has_partition_key(const utils::hashed_key& hk, const dht:: std::exception_ptr ex; auto sem = reader_concurrency_semaphore(reader_concurrency_semaphore::no_limits{}, "sstables::has_partition_key()"); try { - auto lh_index_ptr = std::make_unique(s, sem.make_tracking_only_permit(_schema.get(), s->get_filename(), db::no_timeout), default_priority_class(), tracing::trace_state_ptr(), use_caching::yes); + auto lh_index_ptr = std::make_unique(s, sem.make_tracking_only_permit(_schema.get(), s->get_filename(), db::no_timeout, {}), default_priority_class(), tracing::trace_state_ptr(), use_caching::yes); present = co_await lh_index_ptr->advance_lower_and_check_if_present(dk); } catch (...) { ex = std::current_exception(); diff --git a/sstables_loader.cc b/sstables_loader.cc index 5ac9585121..c95e58b2e3 100644 --- a/sstables_loader.cc +++ b/sstables_loader.cc @@ -162,7 +162,7 @@ future<> sstables_loader::load_and_stream(sstring ks_name, sstring cf_name, size_t num_partitions_processed = 0; size_t num_bytes_read = 0; nr_sst_current += sst_processed.size(); - auto permit = co_await _db.local().obtain_reader_permit(table, "sstables_loader::load_and_stream()", db::no_timeout); + auto permit = co_await _db.local().obtain_reader_permit(table, "sstables_loader::load_and_stream()", db::no_timeout, {}); auto reader = mutation_fragment_v1_stream(table.make_streaming_reader(s, std::move(permit), full_partition_range, sst_set)); std::exception_ptr eptr; bool failed = false; diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index c10ad793e9..5882e55bc2 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -121,7 +121,7 @@ void stream_manager::init_messaging_service_handler() { utils::fb_utilities::get_broadcast_address()))); } return _mm.local().get_schema_for_write(schema_id, from, _ms.local()).then([this, from, estimated_partitions, plan_id, cf_id, source, reason] (schema_ptr s) mutable { - return _db.local().obtain_reader_permit(s, "stream-session", db::no_timeout).then([this, from, estimated_partitions, plan_id, cf_id, source, reason, s] (reader_permit permit) mutable { + return _db.local().obtain_reader_permit(s, "stream-session", db::no_timeout, {}).then([this, from, estimated_partitions, plan_id, cf_id, source, reason, s] (reader_permit permit) mutable { auto sink = _ms.local().make_sink_for_stream_mutation_fragments(source); struct stream_mutation_fragments_cmd_status { bool got_cmd = false; diff --git a/streaming/stream_transfer_task.cc b/streaming/stream_transfer_task.cc index 854c23a834..59ec7f62ee 100644 --- a/streaming/stream_transfer_task.cc +++ b/streaming/stream_transfer_task.cc @@ -197,7 +197,7 @@ future<> stream_transfer_task::execute() { auto& sm = session->manager(); return sm.container().invoke_on_all([plan_id, cf_id, id, dst_cpu_id, ranges=this->_ranges, reason] (stream_manager& sm) mutable { auto& tbl = sm.db().find_column_family(cf_id); - return sm.db().obtain_reader_permit(tbl, "stream-transfer-task", db::no_timeout).then([&sm, &tbl, plan_id, cf_id, id, dst_cpu_id, ranges=std::move(ranges), reason] (reader_permit permit) mutable { + return sm.db().obtain_reader_permit(tbl, "stream-transfer-task", db::no_timeout, {}).then([&sm, &tbl, plan_id, cf_id, id, dst_cpu_id, ranges=std::move(ranges), reason] (reader_permit permit) mutable { auto si = make_lw_shared(sm.ms(), plan_id, tbl, std::move(permit), std::move(ranges), id, dst_cpu_id, reason, [&sm, plan_id, addr = id.addr] (size_t sz) { sm.update_progress(plan_id, addr, streaming::progress_info::direction::OUT, sz); }); diff --git a/test/boost/commitlog_test.cc b/test/boost/commitlog_test.cc index c1096dbb4f..9fdc4b2487 100644 --- a/test/boost/commitlog_test.cc +++ b/test/boost/commitlog_test.cc @@ -644,7 +644,7 @@ SEASTAR_TEST_CASE(test_commitlog_replay_invalid_key){ { std::vector readers; readers.reserve(memtables.size()); - auto permit = db.get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout); + auto permit = db.get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout, {}); for (auto mt : memtables) { readers.push_back(mt->make_flat_reader(s, permit)); } diff --git a/test/boost/database_test.cc b/test/boost/database_test.cc index a882e8fa46..786f8394ed 100644 --- a/test/boost/database_test.cc +++ b/test/boost/database_test.cc @@ -1231,7 +1231,7 @@ SEASTAR_TEST_CASE(database_drop_column_family_clears_querier_cache) { auto q = query::querier( tbl.as_mutation_source(), tbl.schema(), - database_test(db).get_user_read_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout), + database_test(db).get_user_read_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout, {}), query::full_partition_range, s->full_slice(), default_priority_class(), diff --git a/test/boost/hashers_test.cc b/test/boost/hashers_test.cc index 5e1256f8f2..169e641130 100644 --- a/test/boost/hashers_test.cc +++ b/test/boost/hashers_test.cc @@ -65,7 +65,7 @@ SEASTAR_THREAD_TEST_CASE(mutation_fragment_sanity_check) { reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, __FILE__); auto stop_semaphore = deferred_stop(semaphore); simple_schema s; - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), "test", db::no_timeout); + auto permit = semaphore.make_tracking_only_permit(s.schema().get(), "test", db::no_timeout, {}); gc_clock::time_point ts(gc_clock::duration(1234567890000)); auto check_hash = [&] (const mutation_fragment& mf, uint64_t expected) { diff --git a/test/boost/mutation_fragment_test.cc b/test/boost/mutation_fragment_test.cc index f35739c888..ff413f94ff 100644 --- a/test/boost/mutation_fragment_test.cc +++ b/test/boost/mutation_fragment_test.cc @@ -358,7 +358,7 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_mutate_exception_safety) { reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 100); auto stop_sem = deferred_stop(sem); - auto permit = sem.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout); + auto permit = sem.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}); const auto available_res = sem.available_resources(); const sstring val(1024, 'a'); @@ -428,7 +428,7 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_stream_validator) { reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 100); auto stop_sem = deferred_stop(sem); - auto permit = sem.make_tracking_only_permit(ss.schema().get(), get_name(), db::no_timeout); + auto permit = sem.make_tracking_only_permit(ss.schema().get(), get_name(), db::no_timeout, {}); auto expect = [&] (bool expect_valid, const char* desc, unsigned at, auto&& first_mf, auto&&... mf) { std::vector mfs; @@ -619,7 +619,7 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_stream_validator_mixed_api_usage reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 100); auto stop_sem = deferred_stop(sem); - auto permit = sem.make_tracking_only_permit(ss.schema().get(), get_name(), db::no_timeout); + auto permit = sem.make_tracking_only_permit(ss.schema().get(), get_name(), db::no_timeout, {}); mutation_fragment_stream_validator validator(*ss.schema()); diff --git a/test/boost/mutation_reader_test.cc b/test/boost/mutation_reader_test.cc index 749d90a6e7..98419b873b 100644 --- a/test/boost/mutation_reader_test.cc +++ b/test/boost/mutation_reader_test.cc @@ -2921,7 +2921,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_self_validation) { reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name()); auto stop_sem = deferred_stop(semaphore); simple_schema s; - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout); + auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}); auto pkeys = s.make_pkeys(4); std::ranges::sort(pkeys, dht::decorated_key::less_comparator(s.schema())); @@ -3199,7 +3199,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_recreate_before_fast_forward_to) reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name()); auto stop_sem = deferred_stop(semaphore); simple_schema s; - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout); + auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}); auto pkeys = s.make_pkeys(6); boost::sort(pkeys, dht::decorated_key::less_comparator(s.schema())); @@ -3250,7 +3250,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_drop_flags) { reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 0); auto stop_sem = deferred_stop(semaphore); simple_schema s; - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout); + auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}); auto pkeys = s.make_pkeys(2); std::sort(pkeys.begin(), pkeys.end(), [&s] (const auto& pk1, const auto& pk2) { @@ -3477,7 +3477,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_non_monotonic_positions) { auto stop_sem = deferred_stop(semaphore); simple_schema s; auto schema = s.schema(); - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout); + auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}); auto pkey = s.make_pkey(); const auto prange = dht::partition_range::make_open_ended_both_sides(); @@ -3536,7 +3536,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_clear_tombstone_in_discontinued_p auto stop_sem = deferred_stop(semaphore); simple_schema s; auto schema = s.schema(); - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout); + auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}); auto pkeys = s.make_pkeys(2); std::sort(pkeys.begin(), pkeys.end(), [&s] (const auto& pk1, const auto& pk2) { diff --git a/test/boost/querier_cache_test.cc b/test/boost/querier_cache_test.cc index 9ec6819c30..ffdfd6687a 100644 --- a/test/boost/querier_cache_test.cc +++ b/test/boost/querier_cache_test.cc @@ -105,7 +105,7 @@ private: Querier make_querier(const dht::partition_range& range) { return Querier(_mutation_source, _s.schema(), - _sem.make_tracking_only_permit(_s.schema().get(), "make-querier", db::no_timeout), + _sem.make_tracking_only_permit(_s.schema().get(), "make-querier", db::no_timeout, {}), range, _s.schema()->full_slice(), service::get_local_sstable_query_read_priority(), @@ -674,7 +674,7 @@ SEASTAR_THREAD_TEST_CASE(test_resources_based_cache_eviction) { BOOST_CHECK_EQUAL(db.get_querier_cache_stats().resource_based_evictions, 0); // Drain all resources of the semaphore - auto sponge_permit = semaphore.make_tracking_only_permit(s.get(), "sponge", db::no_timeout); + auto sponge_permit = semaphore.make_tracking_only_permit(s.get(), "sponge", db::no_timeout, {}); auto consumed_resources = sponge_permit.consume_resources(semaphore.available_resources()); auto cmd2 = query::read_command(s->id(), @@ -723,13 +723,13 @@ SEASTAR_THREAD_TEST_CASE(test_immediate_evict_on_insert) { test_querier_cache t; auto& sem = t.get_semaphore(); - auto permit1 = sem.obtain_permit(t.get_schema().get(), get_name(), 0, db::no_timeout).get0(); + auto permit1 = sem.obtain_permit(t.get_schema().get(), get_name(), 0, db::no_timeout, {}).get0(); auto resources = permit1.consume_resources(reader_resources(sem.available_resources().count, 0)); BOOST_CHECK_EQUAL(sem.available_resources().count, 0); - auto fut = sem.obtain_permit(t.get_schema().get(), get_name(), 1, db::no_timeout); + auto fut = sem.obtain_permit(t.get_schema().get(), get_name(), 1, db::no_timeout, {}); BOOST_CHECK_EQUAL(sem.get_stats().waiters, 1); @@ -755,8 +755,8 @@ SEASTAR_THREAD_TEST_CASE(test_unique_inactive_read_handle) { .with_column("v", int32_type) .build(); - auto sem1_h1 = sem1.register_inactive_read(make_empty_flat_reader_v2(schema, sem1.make_tracking_only_permit(schema.get(), get_name(), db::no_timeout))); - auto sem2_h1 = sem2.register_inactive_read(make_empty_flat_reader_v2(schema, sem2.make_tracking_only_permit(schema.get(), get_name(), db::no_timeout))); + auto sem1_h1 = sem1.register_inactive_read(make_empty_flat_reader_v2(schema, sem1.make_tracking_only_permit(schema.get(), get_name(), db::no_timeout, {}))); + auto sem2_h1 = sem2.register_inactive_read(make_empty_flat_reader_v2(schema, sem2.make_tracking_only_permit(schema.get(), get_name(), db::no_timeout, {}))); // Sanity check that lookup still works with empty handle. BOOST_REQUIRE(!sem1.unregister_inactive_read(reader_concurrency_semaphore::inactive_read_handle{})); diff --git a/test/boost/reader_concurrency_semaphore_test.cc b/test/boost/reader_concurrency_semaphore_test.cc index 60c3838600..51c7027d19 100644 --- a/test/boost/reader_concurrency_semaphore_test.cc +++ b/test/boost/reader_concurrency_semaphore_test.cc @@ -41,7 +41,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_clear_inactive_reads) auto clear_permits = defer([&permits] { permits.clear(); }); for (int i = 0; i < 10; ++i) { - permits.emplace_back(semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout)); + permits.emplace_back(semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {})); handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permits.back()))); } @@ -59,7 +59,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_clear_inactive_reads) handles.clear(); for (int i = 0; i < 10; ++i) { - handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout)))); + handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {})))); } BOOST_REQUIRE(std::all_of(handles.begin(), handles.end(), [] (const reader_concurrency_semaphore::inactive_read_handle& handle) { return bool(handle); })); @@ -77,14 +77,14 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_destroyed_permit_rele // Not admitted, active { - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout); + auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}); auto units2 = permit.consume_memory(1024); } BOOST_REQUIRE_EQUAL(semaphore.available_resources(), initial_resources); // Not admitted, inactive { - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout); + auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}); auto units2 = permit.consume_memory(1024); auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permit)); @@ -94,14 +94,14 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_destroyed_permit_rele // Admitted, active { - auto permit = semaphore.obtain_permit(s.schema().get(), get_name(), 1024, db::no_timeout).get0(); + auto permit = semaphore.obtain_permit(s.schema().get(), get_name(), 1024, db::no_timeout, {}).get0(); auto units1 = permit.consume_memory(1024); } BOOST_REQUIRE_EQUAL(semaphore.available_resources(), initial_resources); // Admitted, inactive { - auto permit = semaphore.obtain_permit(s.schema().get(), get_name(), 1024, db::no_timeout).get0(); + auto permit = semaphore.obtain_permit(s.schema().get(), get_name(), 1024, db::no_timeout, {}).get0(); auto units1 = permit.consume_memory(1024); auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permit)); @@ -115,7 +115,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_abandoned_handle_clos reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name()); auto stop_sem = deferred_stop(semaphore); - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout); + auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}); { auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permit)); // The handle is destroyed here, triggering the destrution of the inactive read. @@ -136,7 +136,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_readmission_preserves auto stop_sem = deferred_stop(semaphore); - reader_permit_opt permit = semaphore.obtain_permit(s.schema().get(), get_name(), 1024, db::no_timeout).get(); + reader_permit_opt permit = semaphore.obtain_permit(s.schema().get(), get_name(), 1024, db::no_timeout, {}).get(); BOOST_REQUIRE_EQUAL(permit->consumed_resources(), base_resources); std::optional residue_units; @@ -152,7 +152,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_readmission_preserves BOOST_REQUIRE_EQUAL(semaphore.available_resources(), initial_resources - permit->consumed_resources()); if (i % 2) { - auto sponge_permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout); + auto sponge_permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}); auto consumed_resources = sponge_permit.consume_resources(semaphore.available_resources()); auto fut = make_ready_future<>(); @@ -267,9 +267,9 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_forward_progress) { } future<> obtain_permit() { if (_memory_only) { - _permit = _semaphore.make_tracking_only_permit(_schema.get(), "reader_m", db::no_timeout); + _permit = _semaphore.make_tracking_only_permit(_schema.get(), "reader_m", db::no_timeout, {}); } else { - _permit = co_await _semaphore.obtain_permit(_schema.get(), fmt::format("reader_{}", _evictable ? 'e' : 'a'), 1024, db::no_timeout); + _permit = co_await _semaphore.obtain_permit(_schema.get(), fmt::format("reader_{}", _evictable ? 'e' : 'a'), 1024, db::no_timeout, {}); } _units = _permit->consume_memory(tests::random::get_int(128, 1024)); } @@ -414,7 +414,7 @@ SEASTAR_TEST_CASE(reader_restriction_file_tracking) { return async([&] { reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 100, 4 * 1024); auto stop_sem = deferred_stop(semaphore); - auto permit = semaphore.obtain_permit(nullptr, get_name(), 0, db::no_timeout).get(); + auto permit = semaphore.obtain_permit(nullptr, get_name(), 0, db::no_timeout, {}).get(); { auto tracked_file = make_tracked_file(file(shared_ptr(make_shared())), permit); @@ -474,11 +474,11 @@ SEASTAR_TEST_CASE(reader_concurrency_semaphore_timeout) { { auto timeout = db::timeout_clock::now() + std::chrono::duration_cast(std::chrono::milliseconds{1}); - reader_permit_opt permit1 = semaphore.obtain_permit(nullptr, "permit1", replica::new_reader_base_cost, timeout).get(); + reader_permit_opt permit1 = semaphore.obtain_permit(nullptr, "permit1", replica::new_reader_base_cost, timeout, {}).get(); - auto permit2_fut = semaphore.obtain_permit(nullptr, "permit2", replica::new_reader_base_cost, timeout); + auto permit2_fut = semaphore.obtain_permit(nullptr, "permit2", replica::new_reader_base_cost, timeout, {}); - auto permit3_fut = semaphore.obtain_permit(nullptr, "permit3", replica::new_reader_base_cost, timeout); + auto permit3_fut = semaphore.obtain_permit(nullptr, "permit3", replica::new_reader_base_cost, timeout, {}); BOOST_REQUIRE_EQUAL(semaphore.get_stats().waiters, 2); @@ -516,15 +516,15 @@ SEASTAR_TEST_CASE(reader_concurrency_semaphore_max_queue_length) { auto stop_sem = deferred_stop(semaphore); { - reader_permit_opt permit1 = semaphore.obtain_permit(nullptr, "permit1", replica::new_reader_base_cost, db::no_timeout).get(); + reader_permit_opt permit1 = semaphore.obtain_permit(nullptr, "permit1", replica::new_reader_base_cost, db::no_timeout, {}).get(); - auto permit2_fut = semaphore.obtain_permit(nullptr, "permit2", replica::new_reader_base_cost, db::no_timeout); + auto permit2_fut = semaphore.obtain_permit(nullptr, "permit2", replica::new_reader_base_cost, db::no_timeout, {}); - auto permit3_fut = semaphore.obtain_permit(nullptr, "permit3", replica::new_reader_base_cost, db::no_timeout); + auto permit3_fut = semaphore.obtain_permit(nullptr, "permit3", replica::new_reader_base_cost, db::no_timeout, {}); BOOST_REQUIRE_EQUAL(semaphore.get_stats().waiters, 2); - auto permit4_fut = semaphore.obtain_permit(nullptr, "permit4", replica::new_reader_base_cost, db::no_timeout); + auto permit4_fut = semaphore.obtain_permit(nullptr, "permit4", replica::new_reader_base_cost, db::no_timeout, {}); // The queue should now be full. BOOST_REQUIRE_THROW(permit4_fut.get(), std::runtime_error); @@ -564,7 +564,7 @@ SEASTAR_THREAD_TEST_CASE(reader_concurrency_semaphore_dump_reader_diganostics) { for (auto& schema : schemas) { const auto nr_permits = tests::random::get_int(2, 32); for (unsigned i = 0; i < nr_permits; ++i) { - auto permit = semaphore.make_tracking_only_permit(schema.get(), op_names.at(tests::random::get_int(0, nr_ops - 1)), db::no_timeout); + auto permit = semaphore.make_tracking_only_permit(schema.get(), op_names.at(tests::random::get_int(0, nr_ops - 1)), db::no_timeout, {}); if (tests::random::get_int(0, 4)) { auto units = permit.consume_resources(reader_resources(tests::random::get_int(0, 1), tests::random::get_int(1024, 16 * 1024 * 1024))); permits.push_back(std::pair(std::move(permit), std::move(units))); @@ -597,7 +597,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_stop_waits_on_permits BOOST_TEST_MESSAGE("1 permit"); { auto semaphore = std::make_unique(reader_concurrency_semaphore::no_limits{}, get_name()); - auto permit = std::make_unique(semaphore->make_tracking_only_permit(nullptr, "permit1", db::no_timeout)); + auto permit = std::make_unique(semaphore->make_tracking_only_permit(nullptr, "permit1", db::no_timeout, {})); // Test will fail via use-after-free auto f = semaphore->stop().then([semaphore = std::move(semaphore)] { }); @@ -623,7 +623,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { testlog.trace("Running admission scenario {}, with exepcted_can_admit={}", description, expected_can_admit); const auto stats_before = semaphore.get_stats(); - auto admit_fut = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now()); + auto admit_fut = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}); admit_fut.wait(); const bool can_admit = !admit_fut.failed(); if (can_admit) { @@ -650,13 +650,13 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { // resources and waitlist { - reader_permit_opt permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now()).get(); + reader_permit_opt permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get(); require_can_admit(true, "enough resources"); const auto stats_before = semaphore.get_stats(); - auto enqueued_permit_fut = semaphore.obtain_permit(schema_ptr, get_name(), 2 * 1024, db::no_timeout); + auto enqueued_permit_fut = semaphore.obtain_permit(schema_ptr, get_name(), 2 * 1024, db::no_timeout, {}); { const auto stats_after = semaphore.get_stats(); BOOST_REQUIRE(!enqueued_permit_fut.available()); @@ -678,7 +678,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { // used and blocked { - auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now()).get(); + auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get(); require_can_admit(true, "!used"); { @@ -698,7 +698,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { // forward progress -- resources { - auto sponge_permit = semaphore.make_tracking_only_permit(nullptr, "sponge", db::no_timeout); + auto sponge_permit = semaphore.make_tracking_only_permit(nullptr, "sponge", db::no_timeout, {}); sponge_permit.consume_resources(reader_resources::with_memory(semaphore.available_resources().memory)); require_can_admit(true, "semaphore with no memory but all count available"); } @@ -707,7 +707,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { // forward progress -- readmission { - auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now()).get(); + auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get(); auto irh = semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permit)); BOOST_REQUIRE(semaphore.try_evict_one_inactive_read()); @@ -733,7 +733,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { // inactive readers { - auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now()).get(); + auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get(); require_can_admit(true, "!used"); { @@ -761,10 +761,10 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { // evicting inactive readers for admission { - auto permit1 = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now()).get(); + auto permit1 = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get(); auto irh1 = semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permit1)); - auto permit2 = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now()).get(); + auto permit2 = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get(); auto irh2 = semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permit2)); require_can_admit(true, "evictable reads"); @@ -779,7 +779,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { const auto stats_before = semaphore.get_stats(); - auto permit2_fut = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::no_timeout); + auto permit2_fut = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::no_timeout, {}); const auto stats_after = semaphore.get_stats(); BOOST_REQUIRE_EQUAL(stats_after.reads_admitted, stats_before.reads_admitted); @@ -800,7 +800,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { { check_admitting_enqueued_read( [&] { - return reader_permit_opt(semaphore.obtain_permit(schema_ptr, get_name(), 2 * 1024, db::timeout_clock::now()).get()); + return reader_permit_opt(semaphore.obtain_permit(schema_ptr, get_name(), 2 * 1024, db::timeout_clock::now(), {}).get()); }, [] (reader_permit_opt& permit1) { permit1 = {}; @@ -815,7 +815,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { { check_admitting_enqueued_read( [&] { - return reader_permit_opt(semaphore.obtain_permit(schema_ptr, get_name(), 2 * 1024, db::timeout_clock::now()).get()); + return reader_permit_opt(semaphore.obtain_permit(schema_ptr, get_name(), 2 * 1024, db::timeout_clock::now(), {}).get()); }, [&] (reader_permit_opt& permit1) { return semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), *permit1)); @@ -829,7 +829,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { { check_admitting_enqueued_read( [&] { - auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now()).get(); + auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get(); require_can_admit(true, "enough resources"); return std::pair(permit, std::optional{permit}); }, [&] (std::pair>& permit_and_used_guard) { @@ -845,7 +845,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { { check_admitting_enqueued_read( [&] { - auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now()).get(); + auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get(); require_can_admit(true, "enough resources"); return std::pair(permit, reader_permit::used_guard{permit}); }, [&] (std::pair& permit_and_used_guard) { @@ -866,7 +866,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_used_blocked) { BOOST_REQUIRE_EQUAL(semaphore.get_stats().used_permits, 0); BOOST_REQUIRE_EQUAL(semaphore.get_stats().blocked_permits, 0); - auto permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get0(); + auto permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get0(); for (auto scenario = 0; scenario < 5; ++scenario) { testlog.info("Running scenario {}", scenario); @@ -953,7 +953,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_evict_inactive_reads_ for (auto& s : schemas) { auto& handles = schema_handles[&s]; for (int i = 0; i < 10; ++i) { - handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout)))); + handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {})))); } } @@ -1008,10 +1008,10 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_evict_inactive_reads_ } }; - auto p1 = semaphore.obtain_permit(&s, get_name(), 1024, db::no_timeout).get(); + auto p1 = semaphore.obtain_permit(&s, get_name(), 1024, db::no_timeout, {}).get(); auto irh1 = semaphore.register_inactive_read(make_empty_flat_reader_v2(ss.schema(), p1)); - auto p2 = semaphore.obtain_permit(&s, get_name(), 1024, db::no_timeout).get(); + auto p2 = semaphore.obtain_permit(&s, get_name(), 1024, db::no_timeout, {}).get(); read rd2(p2); auto fut2 = semaphore.with_ready_permit(p2, rd2.get_read_func()); @@ -1020,7 +1020,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_evict_inactive_reads_ // * 1 used (but not blocked) read on the ready list // * 1 waiter // * no more count resources left - auto p3_fut = semaphore.obtain_permit(&s, get_name(), 1024, db::no_timeout); + auto p3_fut = semaphore.obtain_permit(&s, get_name(), 1024, db::no_timeout, {}); BOOST_REQUIRE_EQUAL(semaphore.get_stats().waiters, 2); // (waiters includes _ready_list entries) BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_enqueued_for_admission, 1); BOOST_REQUIRE_EQUAL(semaphore.get_stats().used_permits, 0); // permit looses used status while waiting for execution @@ -1061,8 +1061,8 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_set_resources) { reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory); auto stop_sem = deferred_stop(semaphore); - auto permit1 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get0(); - auto permit2 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get0(); + auto permit1 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get0(); + auto permit2 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get0(); BOOST_REQUIRE_EQUAL(semaphore.available_resources(), reader_resources(2, 2 * 1024)); BOOST_REQUIRE_EQUAL(semaphore.initial_resources(), reader_resources(4, 4 * 1024)); @@ -1082,7 +1082,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_set_resources) { BOOST_REQUIRE_EQUAL(semaphore.available_resources(), reader_resources(-1, 1024)); BOOST_REQUIRE_EQUAL(semaphore.initial_resources(), reader_resources(1, 3 * 1024)); - auto permit3_fut = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout); + auto permit3_fut = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}); BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_enqueued_for_admission, 1); BOOST_REQUIRE_EQUAL(semaphore.get_stats().waiters, 1); @@ -1128,7 +1128,7 @@ private: public: explicit allocating_reader(reader_concurrency_semaphore& sem) : _sem(sem) { testlog.debug("[{}] allocating_reader created", fmt::ptr(this)); - _admission_fut = sem.obtain_permit(nullptr, "reader", admission_cost, db::no_timeout).then_wrapped([this] (future&& permit_fut) { + _admission_fut = sem.obtain_permit(nullptr, "reader", admission_cost, db::no_timeout, {}).then_wrapped([this] (future&& permit_fut) { try { _permit = std::move(permit_fut.get()); _state = state::request_memory; @@ -1335,7 +1335,7 @@ memory_limit_table create_memory_limit_table(cql_test_env& env, uint64_t target_ auto sst = sst_man.make_sstable(s, sstables_dir.path().string(), sstables::generation_type{num_sstables}); auto writer_cfg = sst_man.configure_writer("test"); sst->write_components( - make_flat_mutation_reader_from_mutations_v2(s, semaphore.make_tracking_only_permit(s.get(), "test", db::no_timeout), mut, s->full_slice()), + make_flat_mutation_reader_from_mutations_v2(s, semaphore.make_tracking_only_permit(s.get(), "test", db::no_timeout, {}), mut, s->full_slice()), 1, s, writer_cfg, @@ -1493,7 +1493,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_request_memory_preser utils::updateable_value(serialize_multiplier), utils::updateable_value(kill_multiplier)); auto stop_sem = deferred_stop(semaphore); - auto sponge_permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get0(); + auto sponge_permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get0(); uint64_t reads_enqueued_for_memory = 0; @@ -1528,20 +1528,20 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_request_memory_preser // unused { - auto permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get0(); + auto permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get0(); do_check(permit, 0, 0, std::source_location::current()); } // used { - auto permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get0(); + auto permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get0(); reader_permit::used_guard ug{permit}; do_check(permit, 1, 0, std::source_location::current()); } // blocked { - auto permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get0(); + auto permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get0(); reader_permit::used_guard ug{permit}; reader_permit::blocked_guard bg{permit}; do_check(permit, 1, 1, std::source_location::current()); @@ -1559,7 +1559,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_blessed_read_goes_ina simple_schema ss; auto s = ss.schema(); - auto permit = semaphore.obtain_permit(s.get(), get_name(), 1024, db::no_timeout).get(); + auto permit = semaphore.obtain_permit(s.get(), get_name(), 1024, db::no_timeout, {}).get(); std::vector permit_res; @@ -1588,7 +1588,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_stop_with_inactive_re simple_schema ss; auto s = ss.schema(); - auto permit = reader_permit_opt(semaphore.obtain_permit(s.get(), get_name(), 1024, db::no_timeout).get()); + auto permit = reader_permit_opt(semaphore.obtain_permit(s.get(), get_name(), 1024, db::no_timeout, {}).get()); auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s, *permit)); @@ -1618,8 +1618,8 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_permit_waiting_for_me utils::updateable_value(serialize_multiplier), utils::updateable_value(kill_multiplier)); auto stop_sem = deferred_stop(semaphore); - auto permit1 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get0(); - auto permit2 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get0(); + auto permit1 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get0(); + auto permit2 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get0(); std::vector res; diff --git a/test/boost/view_build_test.cc b/test/boost/view_build_test.cc index 9f74ec52bc..f0af35f2ce 100644 --- a/test/boost/view_build_test.cc +++ b/test/boost/view_build_test.cc @@ -442,7 +442,7 @@ SEASTAR_TEST_CASE(test_view_update_generator) { sstables::sstable_writer_config sst_cfg = e.db().local().get_user_sstables_manager().configure_writer("test"); auto& pc = service::get_local_streaming_priority(); - auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout); + auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout, {}); sst->write_components(make_flat_mutation_reader_from_mutations_v2(m.schema(), std::move(permit), {m}), 1ul, s, sst_cfg, {}, pc).get(); sst->open_data().get(); t->add_sstable_and_update_cache(sst).get(); @@ -556,7 +556,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_deadlock) { sstables::sstable_writer_config sst_cfg = e.local_db().get_user_sstables_manager().configure_writer("test"); auto& pc = service::get_local_streaming_priority(); - auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout); + auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout, {}); sst->write_components(make_flat_mutation_reader_from_mutations_v2(m.schema(), std::move(permit), {m}), 1ul, s, sst_cfg, {}, pc).get(); sst->open_data().get(); t->add_sstable_and_update_cache(sst).get(); @@ -566,7 +566,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_deadlock) { }).get0(); // consume all units except what is needed to admit a single reader. - auto sponge_permit = sem.make_tracking_only_permit(s.get(), "sponge", db::no_timeout); + auto sponge_permit = sem.make_tracking_only_permit(s.get(), "sponge", db::no_timeout, {}); auto resources = sponge_permit.consume_resources(sem.available_resources() - reader_resources{1, replica::new_reader_base_cost}); testlog.info("res = [.count={}, .memory={}]", sem.available_resources().count, sem.available_resources().memory); @@ -630,7 +630,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_register_semaphore_unit_leak sstables::sstable_writer_config sst_cfg = e.local_db().get_user_sstables_manager().configure_writer("test"); auto& pc = service::get_local_streaming_priority(); - auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout); + auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout, {}); sst->write_components(make_flat_mutation_reader_from_mutations_v2(m.schema(), std::move(permit), {m}), 1ul, s, sst_cfg, {}, pc).get(); sst->open_data().get(); t->add_sstable_and_update_cache(sst).get(); @@ -731,7 +731,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering) { void check(mutation mut) { // First we check that we would be able to create a reader, even // though the staging reader consumed all resources. - auto permit = _semaphore.obtain_permit(_schema.get(), "consumer_verifier", replica::new_reader_base_cost, db::timeout_clock::now()).get0(); + auto permit = _semaphore.obtain_permit(_schema.get(), "consumer_verifier", replica::new_reader_base_cost, db::timeout_clock::now(), {}).get0(); const size_t current_rows = rows_in_mut(mut); const auto total_rows = _partition_rows.at(mut.decorated_key()); @@ -836,7 +836,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering) { return less(a.decorated_key(), b.decorated_key()); }); - auto permit = sem.obtain_permit(schema.get(), get_name(), replica::new_reader_base_cost, db::no_timeout).get0(); + auto permit = sem.obtain_permit(schema.get(), get_name(), replica::new_reader_base_cost, db::no_timeout, {}).get0(); auto mt = make_lw_shared(schema); for (const auto& mut : muts) { diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index 23eff1c2c8..98b71bed8e 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -349,7 +349,7 @@ public: table_name = std::move(table_name)] (replica::database& db) mutable { auto& cf = db.find_column_family(ks_name, table_name); auto schema = cf.schema(); - auto permit = db.get_reader_concurrency_semaphore().make_tracking_only_permit(schema.get(), "require_column_has_value()", db::no_timeout); + auto permit = db.get_reader_concurrency_semaphore().make_tracking_only_permit(schema.get(), "require_column_has_value()", db::no_timeout, {}); return cf.find_partition_slow(schema, permit, pkey) .then([schema, ckey, column_name, exp] (replica::column_family::const_mutation_partition_ptr p) { assert(p != nullptr); @@ -997,7 +997,7 @@ future<> do_with_cql_env_thread(std::function func, cql_tes } reader_permit make_reader_permit(cql_test_env& env) { - return env.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(nullptr, "test", db::no_timeout); + return env.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(nullptr, "test", db::no_timeout, {}); } cql_test_config raft_cql_test_config() { diff --git a/test/lib/reader_concurrency_semaphore.hh b/test/lib/reader_concurrency_semaphore.hh index 13b1758183..eb105fbf84 100644 --- a/test/lib/reader_concurrency_semaphore.hh +++ b/test/lib/reader_concurrency_semaphore.hh @@ -26,7 +26,7 @@ public: } reader_concurrency_semaphore& semaphore() { return *_semaphore; }; - reader_permit make_permit() { return _semaphore->make_tracking_only_permit(nullptr, "test", db::no_timeout); } + reader_permit make_permit() { return _semaphore->make_tracking_only_permit(nullptr, "test", db::no_timeout, {}); } }; } // namespace tests diff --git a/test/lib/reader_lifecycle_policy.hh b/test/lib/reader_lifecycle_policy.hh index d32012d710..ffd71cbb36 100644 --- a/test/lib/reader_lifecycle_policy.hh +++ b/test/lib/reader_lifecycle_policy.hh @@ -95,8 +95,8 @@ public: } return *_contexts[shard]->semaphore; } - virtual future obtain_reader_permit(schema_ptr schema, const char* const description, db::timeout_clock::time_point timeout) override { - return semaphore().obtain_permit(schema.get(), description, 128 * 1024, timeout); + virtual future obtain_reader_permit(schema_ptr schema, const char* const description, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) override { + return semaphore().obtain_permit(schema.get(), description, 128 * 1024, timeout, std::move(trace_ptr)); } }; diff --git a/test/lib/sstable_test_env.hh b/test/lib/sstable_test_env.hh index 85e9faa6c9..1fb634ff58 100644 --- a/test/lib/sstable_test_env.hh +++ b/test/lib/sstable_test_env.hh @@ -171,10 +171,10 @@ public: tmpdir& tempdir() noexcept { return _impl->dir; } reader_permit make_reader_permit(const schema* const s, const char* n, db::timeout_clock::time_point timeout) { - return _impl->semaphore.make_tracking_only_permit(s, n, timeout); + return _impl->semaphore.make_tracking_only_permit(s, n, timeout, {}); } reader_permit make_reader_permit(db::timeout_clock::time_point timeout = db::no_timeout) { - return _impl->semaphore.make_tracking_only_permit(nullptr, "test", timeout); + return _impl->semaphore.make_tracking_only_permit(nullptr, "test", timeout, {}); } replica::table::config make_table_config() { diff --git a/test/lib/test_services.cc b/test/lib/test_services.cc index 84eb421174..2966fcdaa9 100644 --- a/test/lib/test_services.cc +++ b/test/lib/test_services.cc @@ -84,7 +84,7 @@ public: return table().get_compaction_strategy(); } reader_permit make_compaction_reader_permit() const override { - return _data.semaphore.make_tracking_only_permit(&*schema(), "table_for_tests::table_state", db::no_timeout); + return _data.semaphore.make_tracking_only_permit(&*schema(), "table_for_tests::table_state", db::no_timeout, {}); } sstables::sstables_manager& get_sstables_manager() noexcept override { return _sstables_manager; diff --git a/test/manual/sstable_scan_footprint_test.cc b/test/manual/sstable_scan_footprint_test.cc index 33f48a74d0..0374f7919e 100644 --- a/test/manual/sstable_scan_footprint_test.cc +++ b/test/manual/sstable_scan_footprint_test.cc @@ -175,7 +175,7 @@ void execute_reads(const schema& s, reader_concurrency_semaphore& sem, unsigned if (sem.get_stats().waiters) { testlog.trace("Waiting for queue to drain"); - sem.obtain_permit(&s, "drain", 1, db::no_timeout).get(); + sem.obtain_permit(&s, "drain", 1, db::no_timeout, {}).get(); } } diff --git a/test/perf/perf.cc b/test/perf/perf.cc index e18974d257..0e0ab74d66 100644 --- a/test/perf/perf.cc +++ b/test/perf/perf.cc @@ -82,7 +82,7 @@ reader_concurrency_semaphore_wrapper::~reader_concurrency_semaphore_wrapper() { } reader_permit reader_concurrency_semaphore_wrapper::make_permit() { - return _semaphore->make_tracking_only_permit(nullptr, "perf", db::no_timeout); + return _semaphore->make_tracking_only_permit(nullptr, "perf", db::no_timeout, {}); } } // namespace perf diff --git a/test/unit/row_cache_stress_test.cc b/test/unit/row_cache_stress_test.cc index f7b925cd2c..01f7b90c19 100644 --- a/test/unit/row_cache_stress_test.cc +++ b/test/unit/row_cache_stress_test.cc @@ -54,7 +54,7 @@ struct table { } reader_permit make_permit() { - return semaphore.make_tracking_only_permit(s.schema().get(), "test", db::no_timeout); + return semaphore.make_tracking_only_permit(s.schema().get(), "test", db::no_timeout, {}); } future<> stop() noexcept { return semaphore.stop(); diff --git a/tools/scylla-sstable.cc b/tools/scylla-sstable.cc index e8b2c83f70..f95e427882 100644 --- a/tools/scylla-sstable.cc +++ b/tools/scylla-sstable.cc @@ -2854,7 +2854,7 @@ $ scylla sstable validate /path/to/md-123456-big-Data.db /path/to/md-123457-big- reader_concurrency_semaphore rcs_sem(reader_concurrency_semaphore::no_limits{}, app_name); auto stop_semaphore = deferred_stop(rcs_sem); - const auto permit = rcs_sem.make_tracking_only_permit(schema.get(), app_name, db::no_timeout); + const auto permit = rcs_sem.make_tracking_only_permit(schema.get(), app_name, db::no_timeout, {}); operation(schema, permit, sstables, sst_man, app_config);