From 156e5d346d427abc4c79075cb2e71d68ed35d703 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Fri, 17 Mar 2023 07:59:24 -0400 Subject: [PATCH] reader_permit: keep trace_state pointer on permit And propagate it down to where it is created. This will be used to add trace points for semaphore related events, but this will come in the next patches. --- db/view/view.cc | 2 +- db/view/view_update_generator.cc | 2 +- multishard_mutation_query.cc | 6 +- mutation_writer/multishard_writer.cc | 2 +- reader_concurrency_semaphore.cc | 37 +++--- reader_concurrency_semaphore.hh | 10 +- reader_permit.hh | 5 +- readers/multishard.cc | 2 +- readers/multishard.hh | 3 +- repair/row_level.cc | 4 +- replica/database.cc | 18 +-- replica/database.hh | 4 +- replica/table.cc | 10 +- sstables/sstables.cc | 6 +- sstables_loader.cc | 2 +- streaming/stream_session.cc | 2 +- streaming/stream_transfer_task.cc | 2 +- test/boost/commitlog_test.cc | 2 +- test/boost/database_test.cc | 2 +- test/boost/hashers_test.cc | 2 +- test/boost/mutation_fragment_test.cc | 6 +- test/boost/mutation_reader_test.cc | 10 +- test/boost/querier_cache_test.cc | 12 +- .../reader_concurrency_semaphore_test.cc | 106 +++++++++--------- test/boost/view_build_test.cc | 12 +- test/lib/cql_test_env.cc | 4 +- test/lib/reader_concurrency_semaphore.hh | 2 +- test/lib/reader_lifecycle_policy.hh | 4 +- test/lib/sstable_test_env.hh | 4 +- test/lib/test_services.cc | 2 +- test/manual/sstable_scan_footprint_test.cc | 2 +- test/perf/perf.cc | 2 +- test/unit/row_cache_stress_test.cc | 2 +- tools/scylla-sstable.cc | 2 +- 34 files changed, 150 insertions(+), 143 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index 945b2aa6fa..c0f56b1805 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1747,7 +1747,7 @@ view_builder::view_builder(replica::database& db, db::system_keyspace& sys_ks, d , _sys_ks(sys_ks) , _sys_dist_ks(sys_dist_ks) , _mnotifier(mn) - , _permit(_db.get_reader_concurrency_semaphore().make_tracking_only_permit(nullptr, "view_builder", db::no_timeout)) { + , _permit(_db.get_reader_concurrency_semaphore().make_tracking_only_permit(nullptr, "view_builder", db::no_timeout, {})) { setup_metrics(); } diff --git a/db/view/view_update_generator.cc b/db/view/view_update_generator.cc index 08022b63bb..4f88206b8e 100644 --- a/db/view/view_update_generator.cc +++ b/db/view/view_update_generator.cc @@ -137,7 +137,7 @@ future<> view_update_generator::start() { ssts->insert(sst); } - auto permit = _db.obtain_reader_permit(*t, "view_update_generator", db::no_timeout).get0(); + auto permit = _db.obtain_reader_permit(*t, "view_update_generator", db::no_timeout, {}).get0(); auto ms = mutation_source([this, ssts] ( schema_ptr s, reader_permit permit, diff --git a/multishard_mutation_query.cc b/multishard_mutation_query.cc index 0a35ffea52..bbd6fde54c 100644 --- a/multishard_mutation_query.cc +++ b/multishard_mutation_query.cc @@ -213,7 +213,7 @@ public: tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout) : _db(db) , _schema(std::move(s)) - , _permit(_db.local().get_reader_concurrency_semaphore().make_tracking_only_permit(_schema.get(), "multishard-mutation-query", timeout)) + , _permit(_db.local().get_reader_concurrency_semaphore().make_tracking_only_permit(_schema.get(), "multishard-mutation-query", timeout, trace_state)) , _cmd(cmd) , _ranges(ranges) , _trace_state(std::move(trace_state)) @@ -261,14 +261,14 @@ public: return *_semaphores[shard]; } - virtual future obtain_reader_permit(schema_ptr schema, const char* const description, db::timeout_clock::time_point timeout) override { + virtual future obtain_reader_permit(schema_ptr schema, const char* const description, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) override { const auto shard = this_shard_id(); auto& rm = _readers[shard]; if (rm.state == reader_state::successful_lookup) { rm.rparts->permit.set_max_result_size(get_max_result_size()); co_return rm.rparts->permit; } - auto permit = co_await _db.local().obtain_reader_permit(std::move(schema), description, timeout); + auto permit = co_await _db.local().obtain_reader_permit(std::move(schema), description, timeout, std::move(trace_ptr)); permit.set_max_result_size(get_max_result_size()); co_return permit; } diff --git a/mutation_writer/multishard_writer.cc b/mutation_writer/multishard_writer.cc index 6d11eba48f..e8be1d9a7c 100644 --- a/mutation_writer/multishard_writer.cc +++ b/mutation_writer/multishard_writer.cc @@ -113,7 +113,7 @@ future<> multishard_writer::make_shard_writer(unsigned shard) { reader = make_foreign(std::make_unique(std::move(reader)))] () mutable { auto s = gs.get(); auto semaphore = std::make_unique(reader_concurrency_semaphore::no_limits{}, "shard_writer"); - auto permit = semaphore->make_tracking_only_permit(s.get(), "multishard-writer", db::no_timeout); + auto permit = semaphore->make_tracking_only_permit(s.get(), "multishard-writer", db::no_timeout, {}); auto this_shard_reader = make_foreign_reader(s, std::move(permit), std::move(reader)); return make_foreign(std::make_unique(gs.get(), std::move(semaphore), std::move(this_shard_reader), consumer)); }).then([this, shard] (foreign_ptr> writer) { diff --git a/reader_concurrency_semaphore.cc b/reader_concurrency_semaphore.cc index 7dbbcc8508..780fb6b426 100644 --- a/reader_concurrency_semaphore.cc +++ b/reader_concurrency_semaphore.cc @@ -140,6 +140,7 @@ private: uint64_t _sstables_read = 0; size_t _requested_memory = 0; uint64_t _oom_kills = 0; + tracing::trace_state_ptr _trace_ptr; // Not strictly related to the permit. // Used by the semaphore to to manage the permit. @@ -204,23 +205,25 @@ private: public: struct value_tag {}; - impl(reader_concurrency_semaphore& semaphore, const schema* const schema, const std::string_view& op_name, reader_resources base_resources, db::timeout_clock::time_point timeout) + impl(reader_concurrency_semaphore& semaphore, const schema* const schema, const std::string_view& op_name, reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) : _semaphore(semaphore) , _schema(schema) , _op_name_view(op_name) , _base_resources(base_resources) , _ttl_timer([this] { on_timeout(); }) + , _trace_ptr(std::move(trace_ptr)) { set_timeout(timeout); _semaphore.on_permit_created(*this); } - impl(reader_concurrency_semaphore& semaphore, const schema* const schema, sstring&& op_name, reader_resources base_resources, db::timeout_clock::time_point timeout) + impl(reader_concurrency_semaphore& semaphore, const schema* const schema, sstring&& op_name, reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) : _semaphore(semaphore) , _schema(schema) , _op_name(std::move(op_name)) , _op_name_view(_op_name) , _base_resources(base_resources) , _ttl_timer([this] { on_timeout(); }) + , _trace_ptr(std::move(trace_ptr)) { set_timeout(timeout); _semaphore.on_permit_created(*this); @@ -478,14 +481,14 @@ reader_permit::reader_permit(shared_ptr impl) : _impl(std::move(impl)) } reader_permit::reader_permit(reader_concurrency_semaphore& semaphore, const schema* const schema, std::string_view op_name, - reader_resources base_resources, db::timeout_clock::time_point timeout) - : _impl(::seastar::make_shared(semaphore, schema, op_name, base_resources, timeout)) + reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) + : _impl(::seastar::make_shared(semaphore, schema, op_name, base_resources, timeout, std::move(trace_ptr))) { } reader_permit::reader_permit(reader_concurrency_semaphore& semaphore, const schema* const schema, sstring&& op_name, - reader_resources base_resources, db::timeout_clock::time_point timeout) - : _impl(::seastar::make_shared(semaphore, schema, std::move(op_name), base_resources, timeout)) + reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) + : _impl(::seastar::make_shared(semaphore, schema, std::move(op_name), base_resources, timeout, std::move(trace_ptr))) { } @@ -1383,32 +1386,34 @@ void reader_concurrency_semaphore::on_permit_unblocked() noexcept { } future reader_concurrency_semaphore::obtain_permit(const schema* const schema, const char* const op_name, size_t memory, - db::timeout_clock::time_point timeout) { - auto permit = reader_permit(*this, schema, std::string_view(op_name), {1, static_cast(memory)}, timeout); + db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) { + auto permit = reader_permit(*this, schema, std::string_view(op_name), {1, static_cast(memory)}, timeout, std::move(trace_ptr)); return do_wait_admission(*permit).then([permit] () mutable { return std::move(permit); }); } future reader_concurrency_semaphore::obtain_permit(const schema* const schema, sstring&& op_name, size_t memory, - db::timeout_clock::time_point timeout) { - auto permit = reader_permit(*this, schema, std::move(op_name), {1, static_cast(memory)}, timeout); + db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) { + auto permit = reader_permit(*this, schema, std::move(op_name), {1, static_cast(memory)}, timeout, std::move(trace_ptr)); return do_wait_admission(*permit).then([permit] () mutable { return std::move(permit); }); } -reader_permit reader_concurrency_semaphore::make_tracking_only_permit(const schema* const schema, const char* const op_name, db::timeout_clock::time_point timeout) { - return reader_permit(*this, schema, std::string_view(op_name), {}, timeout); +reader_permit reader_concurrency_semaphore::make_tracking_only_permit(const schema* const schema, const char* const op_name, + db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) { + return reader_permit(*this, schema, std::string_view(op_name), {}, timeout, std::move(trace_ptr)); } -reader_permit reader_concurrency_semaphore::make_tracking_only_permit(const schema* const schema, sstring&& op_name, db::timeout_clock::time_point timeout) { - return reader_permit(*this, schema, std::move(op_name), {}, timeout); +reader_permit reader_concurrency_semaphore::make_tracking_only_permit(const schema* const schema, sstring&& op_name, + db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) { + return reader_permit(*this, schema, std::move(op_name), {}, timeout, std::move(trace_ptr)); } future<> reader_concurrency_semaphore::with_permit(const schema* const schema, const char* const op_name, size_t memory, - db::timeout_clock::time_point timeout, read_func func) { - auto permit = reader_permit(*this, schema, std::string_view(op_name), {1, static_cast(memory)}, timeout); + db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr, read_func func) { + auto permit = reader_permit(*this, schema, std::string_view(op_name), {1, static_cast(memory)}, timeout, std::move(trace_ptr)); permit->aux_data().func = std::move(func); permit->aux_data().permit_keepalive = permit; return do_wait_admission(*permit); diff --git a/reader_concurrency_semaphore.hh b/reader_concurrency_semaphore.hh index 9eda3e375d..2aac743bf3 100644 --- a/reader_concurrency_semaphore.hh +++ b/reader_concurrency_semaphore.hh @@ -386,8 +386,8 @@ public: /// /// Some permits cannot be associated with any table, so passing nullptr as /// the schema parameter is allowed. - future obtain_permit(const schema* const schema, const char* const op_name, size_t memory, db::timeout_clock::time_point timeout); - future obtain_permit(const schema* const schema, sstring&& op_name, size_t memory, db::timeout_clock::time_point timeout); + future obtain_permit(const schema* const schema, const char* const op_name, size_t memory, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr); + future obtain_permit(const schema* const schema, sstring&& op_name, size_t memory, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr); /// Make a tracking only permit /// @@ -402,8 +402,8 @@ public: /// /// Some permits cannot be associated with any table, so passing nullptr as /// the schema parameter is allowed. - reader_permit make_tracking_only_permit(const schema* const schema, const char* const op_name, db::timeout_clock::time_point timeout); - reader_permit make_tracking_only_permit(const schema* const schema, sstring&& op_name, db::timeout_clock::time_point timeout); + reader_permit make_tracking_only_permit(const schema* const schema, const char* const op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr); + reader_permit make_tracking_only_permit(const schema* const schema, sstring&& op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr); /// Run the function through the semaphore's execution stage with an admitted permit /// @@ -424,7 +424,7 @@ public: /// /// Some permits cannot be associated with any table, so passing nullptr as /// the schema parameter is allowed. - future<> with_permit(const schema* const schema, const char* const op_name, size_t memory, db::timeout_clock::time_point timeout, read_func func); + future<> with_permit(const schema* const schema, const char* const op_name, size_t memory, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr, read_func func); /// Run the function through the semaphore's execution stage with a pre-admitted permit /// diff --git a/reader_permit.hh b/reader_permit.hh index f927b776ad..759a9c1ad1 100644 --- a/reader_permit.hh +++ b/reader_permit.hh @@ -13,6 +13,7 @@ #include "db/timeout_clock.hh" #include "schema/schema_fwd.hh" +#include "tracing/trace_state.hh" #include "query_class_config.hh" namespace seastar { @@ -98,9 +99,9 @@ private: reader_permit() = default; reader_permit(shared_ptr); explicit reader_permit(reader_concurrency_semaphore& semaphore, const schema* const schema, std::string_view op_name, - reader_resources base_resources, db::timeout_clock::time_point timeout); + reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr); explicit reader_permit(reader_concurrency_semaphore& semaphore, const schema* const schema, sstring&& op_name, - reader_resources base_resources, db::timeout_clock::time_point timeout); + reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr); reader_permit::impl& operator*() { return *_impl; } reader_permit::impl* operator->() { return _impl.get(); } diff --git a/readers/multishard.cc b/readers/multishard.cc index b34f78757f..8f8c597e9c 100644 --- a/readers/multishard.cc +++ b/readers/multishard.cc @@ -819,7 +819,7 @@ future<> shard_reader_v2::do_fill_buffer() { return lifecycle_policy->create_reader(std::move(s), std::move(permit), pr, ps, pc, std::move(ts), fwd_mr); }); auto s = gs.get(); - auto permit = co_await _lifecycle_policy->obtain_reader_permit(s, "shard-reader", timeout()); + auto permit = co_await _lifecycle_policy->obtain_reader_permit(s, "shard-reader", timeout(), _trace_state); auto rreader = make_foreign(std::make_unique(evictable_reader_v2::auto_pause::yes, std::move(ms), s, std::move(permit), *_pr, _ps, _pc, _trace_state, _fwd_mr)); diff --git a/readers/multishard.hh b/readers/multishard.hh index 48f9c3dc20..5716a5d555 100644 --- a/readers/multishard.hh +++ b/readers/multishard.hh @@ -97,7 +97,8 @@ public: /// `semaphore()`. /// /// This method will be called on the shard where the relevant reader lives. - virtual future obtain_reader_permit(schema_ptr schema, const char* const description, db::timeout_clock::time_point timeout) = 0; + virtual future obtain_reader_permit(schema_ptr schema, const char* const description, db::timeout_clock::time_point timeout, + tracing::trace_state_ptr trace_ptr) = 0; }; /// Make a multishard_combining_reader. diff --git a/repair/row_level.cc b/repair/row_level.cc index d4f172c6c5..851def085c 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -2779,7 +2779,7 @@ public: rlogger.trace("repair[{}]: Finished to get memory budget, wanted={}, available={}, max_repair_memory={}", _shard_task.global_repair_id.uuid(), wanted, mem_sem.current(), max); - auto permit = _shard_task.db.local().obtain_reader_permit(_cf, "repair-meta", db::no_timeout).get0(); + auto permit = _shard_task.db.local().obtain_reader_permit(_cf, "repair-meta", db::no_timeout, {}).get0(); repair_meta master(_shard_task.rs, _cf, @@ -3100,7 +3100,7 @@ repair_service::insert_repair_meta( reason] (schema_ptr s) { auto& db = get_db(); auto& cf = db.local().find_column_family(s->id()); - return db.local().obtain_reader_permit(cf, "repair-meta", db::no_timeout).then([s = std::move(s), + return db.local().obtain_reader_permit(cf, "repair-meta", db::no_timeout, {}).then([s = std::move(s), &cf, this, from, diff --git a/replica/database.cc b/replica/database.cc index 18ce73cf2a..7260f155b5 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -1521,7 +1521,7 @@ database::query(schema_ptr s, const query::read_command& cmd, query::result_opti if (querier_opt) { f = co_await coroutine::as_future(semaphore.with_ready_permit(querier_opt->permit(), read_func)); } else { - f = co_await coroutine::as_future(semaphore.with_permit(s.get(), "data-query", cf.estimate_read_memory_cost(), timeout, read_func)); + f = co_await coroutine::as_future(semaphore.with_permit(s.get(), "data-query", cf.estimate_read_memory_cost(), timeout, trace_state, read_func)); } if (!f.failed()) { @@ -1587,7 +1587,7 @@ database::query_mutations(schema_ptr s, const query::read_command& cmd, const dh if (querier_opt) { f = co_await coroutine::as_future(semaphore.with_ready_permit(querier_opt->permit(), read_func)); } else { - f = co_await coroutine::as_future(semaphore.with_permit(s.get(), "mutation-query", cf.estimate_read_memory_cost(), timeout, read_func)); + f = co_await coroutine::as_future(semaphore.with_permit(s.get(), "mutation-query", cf.estimate_read_memory_cost(), timeout, trace_state, read_func)); } if (!f.failed()) { @@ -1636,12 +1636,12 @@ reader_concurrency_semaphore& database::get_reader_concurrency_semaphore() { std::abort(); } -future database::obtain_reader_permit(table& tbl, const char* const op_name, db::timeout_clock::time_point timeout) { - return get_reader_concurrency_semaphore().obtain_permit(tbl.schema().get(), op_name, tbl.estimate_read_memory_cost(), timeout); +future database::obtain_reader_permit(table& tbl, const char* const op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) { + return get_reader_concurrency_semaphore().obtain_permit(tbl.schema().get(), op_name, tbl.estimate_read_memory_cost(), timeout, std::move(trace_ptr)); } -future database::obtain_reader_permit(schema_ptr schema, const char* const op_name, db::timeout_clock::time_point timeout) { - return obtain_reader_permit(find_column_family(std::move(schema)), op_name, timeout); +future database::obtain_reader_permit(schema_ptr schema, const char* const op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) { + return obtain_reader_permit(find_column_family(std::move(schema)), op_name, timeout, std::move(trace_ptr)); } std::ostream& operator<<(std::ostream& out, const column_family& cf) { @@ -1701,7 +1701,7 @@ future database::do_apply_counter_update(column_family& cf, const froz // counter state for each modified cell... tracing::trace(trace_state, "Reading counter values from the CF"); - auto permit = get_reader_concurrency_semaphore().make_tracking_only_permit(m_schema.get(), "counter-read-before-write", timeout); + auto permit = get_reader_concurrency_semaphore().make_tracking_only_permit(m_schema.get(), "counter-read-before-write", timeout, trace_state); return counter_write_query(m_schema, cf.as_mutation_source(), std::move(permit), m.decorated_key(), slice, trace_state) .then([this, &cf, &m, m_schema, timeout, trace_state] (auto mopt) { // ...now, that we got existing state of all affected counter @@ -2771,9 +2771,9 @@ flat_mutation_reader_v2 make_multishard_streaming_reader(distributed obtain_reader_permit(schema_ptr schema, const char* const description, db::timeout_clock::time_point timeout) override { + virtual future obtain_reader_permit(schema_ptr schema, const char* const description, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) override { auto& cf = _db.local().find_column_family(_table_id); - return semaphore().obtain_permit(schema.get(), description, cf.estimate_read_memory_cost(), timeout); + return semaphore().obtain_permit(schema.get(), description, cf.estimate_read_memory_cost(), timeout, std::move(trace_ptr)); } }; auto ms = mutation_source([&db] (schema_ptr s, diff --git a/replica/database.hh b/replica/database.hh index 702db92a74..5ea8204cc2 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1716,8 +1716,8 @@ public: reader_concurrency_semaphore& get_reader_concurrency_semaphore(); // Convenience method to obtain an admitted permit. See reader_concurrency_semaphore::obtain_permit(). - future obtain_reader_permit(table& tbl, const char* const op_name, db::timeout_clock::time_point timeout); - future obtain_reader_permit(schema_ptr schema, const char* const op_name, db::timeout_clock::time_point timeout); + future obtain_reader_permit(table& tbl, const char* const op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr); + future obtain_reader_permit(schema_ptr schema, const char* const op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr); bool is_internal_query() const; diff --git a/replica/table.cc b/replica/table.cc index 8165d17974..79e004c95c 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -874,7 +874,7 @@ table::try_flush_memtable_to_sstable(compaction_group& cg, lw_shared_ptrmake_flush_reader( old->schema(), - compaction_concurrency_semaphore().make_tracking_only_permit(old->schema().get(), "try_flush_memtable_to_sstable()", db::no_timeout), + compaction_concurrency_semaphore().make_tracking_only_permit(old->schema().get(), "try_flush_memtable_to_sstable()", db::no_timeout, {}), service::get_local_memtable_flush_priority())); // Switch back to default scheduling group for post-flush actions, to avoid them being staved by the memtable flush @@ -2271,7 +2271,7 @@ write_memtable_to_sstable(memtable& mt, sstables::shared_sstable sst, sstables:: std::make_unique(reader_concurrency_semaphore::no_limits{}, "write_memtable_to_sstable"), cfg, [&mt, sst] (auto& monitor, auto& semaphore, auto& cfg) { - return write_memtable_to_sstable(semaphore->make_tracking_only_permit(mt.schema().get(), "mt_to_sst", db::no_timeout), mt, std::move(sst), monitor, cfg) + return write_memtable_to_sstable(semaphore->make_tracking_only_permit(mt.schema().get(), "mt_to_sst", db::no_timeout, {}), mt, std::move(sst), monitor, cfg) .finally([&semaphore] { return semaphore->stop(); }); @@ -2605,7 +2605,7 @@ future table::do_push_view_replica_updates(schema_ptr s const bool need_static = db::view::needs_static_row(m.partition(), views); if (!need_regular && !need_static) { tracing::trace(tr_state, "View updates do not require read-before-write"); - co_await generate_and_propagate_view_updates(base, sem.make_tracking_only_permit(s.get(), "push-view-updates-1", timeout), std::move(views), std::move(m), { }, std::move(tr_state), now); + co_await generate_and_propagate_view_updates(base, sem.make_tracking_only_permit(s.get(), "push-view-updates-1", timeout, tr_state), std::move(views), std::move(m), { }, tr_state, now); // In this case we are not doing a read-before-write, just a // write, so no lock is needed. co_return row_locker::lock_holder(); @@ -2638,7 +2638,7 @@ future table::do_push_view_replica_updates(schema_ptr s co_await utils::get_local_injector().inject("table_push_view_replica_updates_timeout", timeout); auto lock = co_await std::move(lockf); auto pk = dht::partition_range::make_singular(m.decorated_key()); - auto permit = sem.make_tracking_only_permit(base.get(), "push-view-updates-2", timeout); + auto permit = sem.make_tracking_only_permit(base.get(), "push-view-updates-2", timeout, tr_state); auto reader = source.make_reader_v2(base, permit, pk, slice, io_priority, tr_state, streamed_mutation::forwarding::no, mutation_reader::forwarding::no); co_await this->generate_and_propagate_view_updates(base, std::move(permit), std::move(views), std::move(m), std::move(reader), tr_state, now); tracing::trace(tr_state, "View updates for {}.{} were generated and propagated", base->ks_name(), base->cf_name()); @@ -2723,7 +2723,7 @@ public: return _t.get_compaction_strategy(); } reader_permit make_compaction_reader_permit() const override { - return _t.compaction_concurrency_semaphore().make_tracking_only_permit(schema().get(), "compaction", db::no_timeout); + return _t.compaction_concurrency_semaphore().make_tracking_only_permit(schema().get(), "compaction", db::no_timeout, {}); } sstables::sstables_manager& get_sstables_manager() noexcept override { return _t.get_sstables_manager(); diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 46e41326c3..f23f78a5ec 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -1291,7 +1291,7 @@ future<> sstable::load_first_and_last_position_in_partition() { } auto& sem = _manager.sstable_metadata_concurrency_sem(); - reader_permit permit = co_await sem.obtain_permit(&*_schema, "sstable::load_first_and_last_position_range", sstable_buffer_size, db::no_timeout); + reader_permit permit = co_await sem.obtain_permit(&*_schema, "sstable::load_first_and_last_position_range", sstable_buffer_size, db::no_timeout, {}); auto first_pos_opt = co_await find_first_position_in_partition(permit, get_first_decorated_key(), false); auto last_pos_opt = co_await find_first_position_in_partition(permit, get_last_decorated_key(), true); @@ -1888,7 +1888,7 @@ future<> sstable::generate_summary(const io_priority_class& pc) { auto s = summary_generator(_schema->get_partitioner(), _components->summary, _manager.config().sstable_summary_ratio()); auto ctx = make_lw_shared>( - *this, sem.make_tracking_only_permit(_schema.get(), "generate-summary", db::no_timeout), s, trust_promoted_index::yes, + *this, sem.make_tracking_only_permit(_schema.get(), "generate-summary", db::no_timeout, {}), s, trust_promoted_index::yes, make_file_input_stream(index_file, 0, index_size, std::move(options)), 0, index_size, (_version >= sstable_version_types::mc ? std::make_optional(get_clustering_values_fixed_lengths(get_serialization_header())) @@ -3032,7 +3032,7 @@ future sstable::has_partition_key(const utils::hashed_key& hk, const dht:: std::exception_ptr ex; auto sem = reader_concurrency_semaphore(reader_concurrency_semaphore::no_limits{}, "sstables::has_partition_key()"); try { - auto lh_index_ptr = std::make_unique(s, sem.make_tracking_only_permit(_schema.get(), s->get_filename(), db::no_timeout), default_priority_class(), tracing::trace_state_ptr(), use_caching::yes); + auto lh_index_ptr = std::make_unique(s, sem.make_tracking_only_permit(_schema.get(), s->get_filename(), db::no_timeout, {}), default_priority_class(), tracing::trace_state_ptr(), use_caching::yes); present = co_await lh_index_ptr->advance_lower_and_check_if_present(dk); } catch (...) { ex = std::current_exception(); diff --git a/sstables_loader.cc b/sstables_loader.cc index 5ac9585121..c95e58b2e3 100644 --- a/sstables_loader.cc +++ b/sstables_loader.cc @@ -162,7 +162,7 @@ future<> sstables_loader::load_and_stream(sstring ks_name, sstring cf_name, size_t num_partitions_processed = 0; size_t num_bytes_read = 0; nr_sst_current += sst_processed.size(); - auto permit = co_await _db.local().obtain_reader_permit(table, "sstables_loader::load_and_stream()", db::no_timeout); + auto permit = co_await _db.local().obtain_reader_permit(table, "sstables_loader::load_and_stream()", db::no_timeout, {}); auto reader = mutation_fragment_v1_stream(table.make_streaming_reader(s, std::move(permit), full_partition_range, sst_set)); std::exception_ptr eptr; bool failed = false; diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index c10ad793e9..5882e55bc2 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -121,7 +121,7 @@ void stream_manager::init_messaging_service_handler() { utils::fb_utilities::get_broadcast_address()))); } return _mm.local().get_schema_for_write(schema_id, from, _ms.local()).then([this, from, estimated_partitions, plan_id, cf_id, source, reason] (schema_ptr s) mutable { - return _db.local().obtain_reader_permit(s, "stream-session", db::no_timeout).then([this, from, estimated_partitions, plan_id, cf_id, source, reason, s] (reader_permit permit) mutable { + return _db.local().obtain_reader_permit(s, "stream-session", db::no_timeout, {}).then([this, from, estimated_partitions, plan_id, cf_id, source, reason, s] (reader_permit permit) mutable { auto sink = _ms.local().make_sink_for_stream_mutation_fragments(source); struct stream_mutation_fragments_cmd_status { bool got_cmd = false; diff --git a/streaming/stream_transfer_task.cc b/streaming/stream_transfer_task.cc index 854c23a834..59ec7f62ee 100644 --- a/streaming/stream_transfer_task.cc +++ b/streaming/stream_transfer_task.cc @@ -197,7 +197,7 @@ future<> stream_transfer_task::execute() { auto& sm = session->manager(); return sm.container().invoke_on_all([plan_id, cf_id, id, dst_cpu_id, ranges=this->_ranges, reason] (stream_manager& sm) mutable { auto& tbl = sm.db().find_column_family(cf_id); - return sm.db().obtain_reader_permit(tbl, "stream-transfer-task", db::no_timeout).then([&sm, &tbl, plan_id, cf_id, id, dst_cpu_id, ranges=std::move(ranges), reason] (reader_permit permit) mutable { + return sm.db().obtain_reader_permit(tbl, "stream-transfer-task", db::no_timeout, {}).then([&sm, &tbl, plan_id, cf_id, id, dst_cpu_id, ranges=std::move(ranges), reason] (reader_permit permit) mutable { auto si = make_lw_shared(sm.ms(), plan_id, tbl, std::move(permit), std::move(ranges), id, dst_cpu_id, reason, [&sm, plan_id, addr = id.addr] (size_t sz) { sm.update_progress(plan_id, addr, streaming::progress_info::direction::OUT, sz); }); diff --git a/test/boost/commitlog_test.cc b/test/boost/commitlog_test.cc index c1096dbb4f..9fdc4b2487 100644 --- a/test/boost/commitlog_test.cc +++ b/test/boost/commitlog_test.cc @@ -644,7 +644,7 @@ SEASTAR_TEST_CASE(test_commitlog_replay_invalid_key){ { std::vector readers; readers.reserve(memtables.size()); - auto permit = db.get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout); + auto permit = db.get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout, {}); for (auto mt : memtables) { readers.push_back(mt->make_flat_reader(s, permit)); } diff --git a/test/boost/database_test.cc b/test/boost/database_test.cc index a882e8fa46..786f8394ed 100644 --- a/test/boost/database_test.cc +++ b/test/boost/database_test.cc @@ -1231,7 +1231,7 @@ SEASTAR_TEST_CASE(database_drop_column_family_clears_querier_cache) { auto q = query::querier( tbl.as_mutation_source(), tbl.schema(), - database_test(db).get_user_read_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout), + database_test(db).get_user_read_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout, {}), query::full_partition_range, s->full_slice(), default_priority_class(), diff --git a/test/boost/hashers_test.cc b/test/boost/hashers_test.cc index 5e1256f8f2..169e641130 100644 --- a/test/boost/hashers_test.cc +++ b/test/boost/hashers_test.cc @@ -65,7 +65,7 @@ SEASTAR_THREAD_TEST_CASE(mutation_fragment_sanity_check) { reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, __FILE__); auto stop_semaphore = deferred_stop(semaphore); simple_schema s; - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), "test", db::no_timeout); + auto permit = semaphore.make_tracking_only_permit(s.schema().get(), "test", db::no_timeout, {}); gc_clock::time_point ts(gc_clock::duration(1234567890000)); auto check_hash = [&] (const mutation_fragment& mf, uint64_t expected) { diff --git a/test/boost/mutation_fragment_test.cc b/test/boost/mutation_fragment_test.cc index f35739c888..ff413f94ff 100644 --- a/test/boost/mutation_fragment_test.cc +++ b/test/boost/mutation_fragment_test.cc @@ -358,7 +358,7 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_mutate_exception_safety) { reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 100); auto stop_sem = deferred_stop(sem); - auto permit = sem.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout); + auto permit = sem.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}); const auto available_res = sem.available_resources(); const sstring val(1024, 'a'); @@ -428,7 +428,7 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_stream_validator) { reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 100); auto stop_sem = deferred_stop(sem); - auto permit = sem.make_tracking_only_permit(ss.schema().get(), get_name(), db::no_timeout); + auto permit = sem.make_tracking_only_permit(ss.schema().get(), get_name(), db::no_timeout, {}); auto expect = [&] (bool expect_valid, const char* desc, unsigned at, auto&& first_mf, auto&&... mf) { std::vector mfs; @@ -619,7 +619,7 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_stream_validator_mixed_api_usage reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 100); auto stop_sem = deferred_stop(sem); - auto permit = sem.make_tracking_only_permit(ss.schema().get(), get_name(), db::no_timeout); + auto permit = sem.make_tracking_only_permit(ss.schema().get(), get_name(), db::no_timeout, {}); mutation_fragment_stream_validator validator(*ss.schema()); diff --git a/test/boost/mutation_reader_test.cc b/test/boost/mutation_reader_test.cc index 749d90a6e7..98419b873b 100644 --- a/test/boost/mutation_reader_test.cc +++ b/test/boost/mutation_reader_test.cc @@ -2921,7 +2921,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_self_validation) { reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name()); auto stop_sem = deferred_stop(semaphore); simple_schema s; - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout); + auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}); auto pkeys = s.make_pkeys(4); std::ranges::sort(pkeys, dht::decorated_key::less_comparator(s.schema())); @@ -3199,7 +3199,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_recreate_before_fast_forward_to) reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name()); auto stop_sem = deferred_stop(semaphore); simple_schema s; - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout); + auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}); auto pkeys = s.make_pkeys(6); boost::sort(pkeys, dht::decorated_key::less_comparator(s.schema())); @@ -3250,7 +3250,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_drop_flags) { reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 0); auto stop_sem = deferred_stop(semaphore); simple_schema s; - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout); + auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}); auto pkeys = s.make_pkeys(2); std::sort(pkeys.begin(), pkeys.end(), [&s] (const auto& pk1, const auto& pk2) { @@ -3477,7 +3477,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_non_monotonic_positions) { auto stop_sem = deferred_stop(semaphore); simple_schema s; auto schema = s.schema(); - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout); + auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}); auto pkey = s.make_pkey(); const auto prange = dht::partition_range::make_open_ended_both_sides(); @@ -3536,7 +3536,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_clear_tombstone_in_discontinued_p auto stop_sem = deferred_stop(semaphore); simple_schema s; auto schema = s.schema(); - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout); + auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}); auto pkeys = s.make_pkeys(2); std::sort(pkeys.begin(), pkeys.end(), [&s] (const auto& pk1, const auto& pk2) { diff --git a/test/boost/querier_cache_test.cc b/test/boost/querier_cache_test.cc index 9ec6819c30..ffdfd6687a 100644 --- a/test/boost/querier_cache_test.cc +++ b/test/boost/querier_cache_test.cc @@ -105,7 +105,7 @@ private: Querier make_querier(const dht::partition_range& range) { return Querier(_mutation_source, _s.schema(), - _sem.make_tracking_only_permit(_s.schema().get(), "make-querier", db::no_timeout), + _sem.make_tracking_only_permit(_s.schema().get(), "make-querier", db::no_timeout, {}), range, _s.schema()->full_slice(), service::get_local_sstable_query_read_priority(), @@ -674,7 +674,7 @@ SEASTAR_THREAD_TEST_CASE(test_resources_based_cache_eviction) { BOOST_CHECK_EQUAL(db.get_querier_cache_stats().resource_based_evictions, 0); // Drain all resources of the semaphore - auto sponge_permit = semaphore.make_tracking_only_permit(s.get(), "sponge", db::no_timeout); + auto sponge_permit = semaphore.make_tracking_only_permit(s.get(), "sponge", db::no_timeout, {}); auto consumed_resources = sponge_permit.consume_resources(semaphore.available_resources()); auto cmd2 = query::read_command(s->id(), @@ -723,13 +723,13 @@ SEASTAR_THREAD_TEST_CASE(test_immediate_evict_on_insert) { test_querier_cache t; auto& sem = t.get_semaphore(); - auto permit1 = sem.obtain_permit(t.get_schema().get(), get_name(), 0, db::no_timeout).get0(); + auto permit1 = sem.obtain_permit(t.get_schema().get(), get_name(), 0, db::no_timeout, {}).get0(); auto resources = permit1.consume_resources(reader_resources(sem.available_resources().count, 0)); BOOST_CHECK_EQUAL(sem.available_resources().count, 0); - auto fut = sem.obtain_permit(t.get_schema().get(), get_name(), 1, db::no_timeout); + auto fut = sem.obtain_permit(t.get_schema().get(), get_name(), 1, db::no_timeout, {}); BOOST_CHECK_EQUAL(sem.get_stats().waiters, 1); @@ -755,8 +755,8 @@ SEASTAR_THREAD_TEST_CASE(test_unique_inactive_read_handle) { .with_column("v", int32_type) .build(); - auto sem1_h1 = sem1.register_inactive_read(make_empty_flat_reader_v2(schema, sem1.make_tracking_only_permit(schema.get(), get_name(), db::no_timeout))); - auto sem2_h1 = sem2.register_inactive_read(make_empty_flat_reader_v2(schema, sem2.make_tracking_only_permit(schema.get(), get_name(), db::no_timeout))); + auto sem1_h1 = sem1.register_inactive_read(make_empty_flat_reader_v2(schema, sem1.make_tracking_only_permit(schema.get(), get_name(), db::no_timeout, {}))); + auto sem2_h1 = sem2.register_inactive_read(make_empty_flat_reader_v2(schema, sem2.make_tracking_only_permit(schema.get(), get_name(), db::no_timeout, {}))); // Sanity check that lookup still works with empty handle. BOOST_REQUIRE(!sem1.unregister_inactive_read(reader_concurrency_semaphore::inactive_read_handle{})); diff --git a/test/boost/reader_concurrency_semaphore_test.cc b/test/boost/reader_concurrency_semaphore_test.cc index 60c3838600..51c7027d19 100644 --- a/test/boost/reader_concurrency_semaphore_test.cc +++ b/test/boost/reader_concurrency_semaphore_test.cc @@ -41,7 +41,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_clear_inactive_reads) auto clear_permits = defer([&permits] { permits.clear(); }); for (int i = 0; i < 10; ++i) { - permits.emplace_back(semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout)); + permits.emplace_back(semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {})); handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permits.back()))); } @@ -59,7 +59,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_clear_inactive_reads) handles.clear(); for (int i = 0; i < 10; ++i) { - handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout)))); + handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {})))); } BOOST_REQUIRE(std::all_of(handles.begin(), handles.end(), [] (const reader_concurrency_semaphore::inactive_read_handle& handle) { return bool(handle); })); @@ -77,14 +77,14 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_destroyed_permit_rele // Not admitted, active { - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout); + auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}); auto units2 = permit.consume_memory(1024); } BOOST_REQUIRE_EQUAL(semaphore.available_resources(), initial_resources); // Not admitted, inactive { - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout); + auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}); auto units2 = permit.consume_memory(1024); auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permit)); @@ -94,14 +94,14 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_destroyed_permit_rele // Admitted, active { - auto permit = semaphore.obtain_permit(s.schema().get(), get_name(), 1024, db::no_timeout).get0(); + auto permit = semaphore.obtain_permit(s.schema().get(), get_name(), 1024, db::no_timeout, {}).get0(); auto units1 = permit.consume_memory(1024); } BOOST_REQUIRE_EQUAL(semaphore.available_resources(), initial_resources); // Admitted, inactive { - auto permit = semaphore.obtain_permit(s.schema().get(), get_name(), 1024, db::no_timeout).get0(); + auto permit = semaphore.obtain_permit(s.schema().get(), get_name(), 1024, db::no_timeout, {}).get0(); auto units1 = permit.consume_memory(1024); auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permit)); @@ -115,7 +115,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_abandoned_handle_clos reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name()); auto stop_sem = deferred_stop(semaphore); - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout); + auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}); { auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permit)); // The handle is destroyed here, triggering the destrution of the inactive read. @@ -136,7 +136,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_readmission_preserves auto stop_sem = deferred_stop(semaphore); - reader_permit_opt permit = semaphore.obtain_permit(s.schema().get(), get_name(), 1024, db::no_timeout).get(); + reader_permit_opt permit = semaphore.obtain_permit(s.schema().get(), get_name(), 1024, db::no_timeout, {}).get(); BOOST_REQUIRE_EQUAL(permit->consumed_resources(), base_resources); std::optional residue_units; @@ -152,7 +152,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_readmission_preserves BOOST_REQUIRE_EQUAL(semaphore.available_resources(), initial_resources - permit->consumed_resources()); if (i % 2) { - auto sponge_permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout); + auto sponge_permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}); auto consumed_resources = sponge_permit.consume_resources(semaphore.available_resources()); auto fut = make_ready_future<>(); @@ -267,9 +267,9 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_forward_progress) { } future<> obtain_permit() { if (_memory_only) { - _permit = _semaphore.make_tracking_only_permit(_schema.get(), "reader_m", db::no_timeout); + _permit = _semaphore.make_tracking_only_permit(_schema.get(), "reader_m", db::no_timeout, {}); } else { - _permit = co_await _semaphore.obtain_permit(_schema.get(), fmt::format("reader_{}", _evictable ? 'e' : 'a'), 1024, db::no_timeout); + _permit = co_await _semaphore.obtain_permit(_schema.get(), fmt::format("reader_{}", _evictable ? 'e' : 'a'), 1024, db::no_timeout, {}); } _units = _permit->consume_memory(tests::random::get_int(128, 1024)); } @@ -414,7 +414,7 @@ SEASTAR_TEST_CASE(reader_restriction_file_tracking) { return async([&] { reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 100, 4 * 1024); auto stop_sem = deferred_stop(semaphore); - auto permit = semaphore.obtain_permit(nullptr, get_name(), 0, db::no_timeout).get(); + auto permit = semaphore.obtain_permit(nullptr, get_name(), 0, db::no_timeout, {}).get(); { auto tracked_file = make_tracked_file(file(shared_ptr(make_shared())), permit); @@ -474,11 +474,11 @@ SEASTAR_TEST_CASE(reader_concurrency_semaphore_timeout) { { auto timeout = db::timeout_clock::now() + std::chrono::duration_cast(std::chrono::milliseconds{1}); - reader_permit_opt permit1 = semaphore.obtain_permit(nullptr, "permit1", replica::new_reader_base_cost, timeout).get(); + reader_permit_opt permit1 = semaphore.obtain_permit(nullptr, "permit1", replica::new_reader_base_cost, timeout, {}).get(); - auto permit2_fut = semaphore.obtain_permit(nullptr, "permit2", replica::new_reader_base_cost, timeout); + auto permit2_fut = semaphore.obtain_permit(nullptr, "permit2", replica::new_reader_base_cost, timeout, {}); - auto permit3_fut = semaphore.obtain_permit(nullptr, "permit3", replica::new_reader_base_cost, timeout); + auto permit3_fut = semaphore.obtain_permit(nullptr, "permit3", replica::new_reader_base_cost, timeout, {}); BOOST_REQUIRE_EQUAL(semaphore.get_stats().waiters, 2); @@ -516,15 +516,15 @@ SEASTAR_TEST_CASE(reader_concurrency_semaphore_max_queue_length) { auto stop_sem = deferred_stop(semaphore); { - reader_permit_opt permit1 = semaphore.obtain_permit(nullptr, "permit1", replica::new_reader_base_cost, db::no_timeout).get(); + reader_permit_opt permit1 = semaphore.obtain_permit(nullptr, "permit1", replica::new_reader_base_cost, db::no_timeout, {}).get(); - auto permit2_fut = semaphore.obtain_permit(nullptr, "permit2", replica::new_reader_base_cost, db::no_timeout); + auto permit2_fut = semaphore.obtain_permit(nullptr, "permit2", replica::new_reader_base_cost, db::no_timeout, {}); - auto permit3_fut = semaphore.obtain_permit(nullptr, "permit3", replica::new_reader_base_cost, db::no_timeout); + auto permit3_fut = semaphore.obtain_permit(nullptr, "permit3", replica::new_reader_base_cost, db::no_timeout, {}); BOOST_REQUIRE_EQUAL(semaphore.get_stats().waiters, 2); - auto permit4_fut = semaphore.obtain_permit(nullptr, "permit4", replica::new_reader_base_cost, db::no_timeout); + auto permit4_fut = semaphore.obtain_permit(nullptr, "permit4", replica::new_reader_base_cost, db::no_timeout, {}); // The queue should now be full. BOOST_REQUIRE_THROW(permit4_fut.get(), std::runtime_error); @@ -564,7 +564,7 @@ SEASTAR_THREAD_TEST_CASE(reader_concurrency_semaphore_dump_reader_diganostics) { for (auto& schema : schemas) { const auto nr_permits = tests::random::get_int(2, 32); for (unsigned i = 0; i < nr_permits; ++i) { - auto permit = semaphore.make_tracking_only_permit(schema.get(), op_names.at(tests::random::get_int(0, nr_ops - 1)), db::no_timeout); + auto permit = semaphore.make_tracking_only_permit(schema.get(), op_names.at(tests::random::get_int(0, nr_ops - 1)), db::no_timeout, {}); if (tests::random::get_int(0, 4)) { auto units = permit.consume_resources(reader_resources(tests::random::get_int(0, 1), tests::random::get_int(1024, 16 * 1024 * 1024))); permits.push_back(std::pair(std::move(permit), std::move(units))); @@ -597,7 +597,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_stop_waits_on_permits BOOST_TEST_MESSAGE("1 permit"); { auto semaphore = std::make_unique(reader_concurrency_semaphore::no_limits{}, get_name()); - auto permit = std::make_unique(semaphore->make_tracking_only_permit(nullptr, "permit1", db::no_timeout)); + auto permit = std::make_unique(semaphore->make_tracking_only_permit(nullptr, "permit1", db::no_timeout, {})); // Test will fail via use-after-free auto f = semaphore->stop().then([semaphore = std::move(semaphore)] { }); @@ -623,7 +623,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { testlog.trace("Running admission scenario {}, with exepcted_can_admit={}", description, expected_can_admit); const auto stats_before = semaphore.get_stats(); - auto admit_fut = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now()); + auto admit_fut = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}); admit_fut.wait(); const bool can_admit = !admit_fut.failed(); if (can_admit) { @@ -650,13 +650,13 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { // resources and waitlist { - reader_permit_opt permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now()).get(); + reader_permit_opt permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get(); require_can_admit(true, "enough resources"); const auto stats_before = semaphore.get_stats(); - auto enqueued_permit_fut = semaphore.obtain_permit(schema_ptr, get_name(), 2 * 1024, db::no_timeout); + auto enqueued_permit_fut = semaphore.obtain_permit(schema_ptr, get_name(), 2 * 1024, db::no_timeout, {}); { const auto stats_after = semaphore.get_stats(); BOOST_REQUIRE(!enqueued_permit_fut.available()); @@ -678,7 +678,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { // used and blocked { - auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now()).get(); + auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get(); require_can_admit(true, "!used"); { @@ -698,7 +698,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { // forward progress -- resources { - auto sponge_permit = semaphore.make_tracking_only_permit(nullptr, "sponge", db::no_timeout); + auto sponge_permit = semaphore.make_tracking_only_permit(nullptr, "sponge", db::no_timeout, {}); sponge_permit.consume_resources(reader_resources::with_memory(semaphore.available_resources().memory)); require_can_admit(true, "semaphore with no memory but all count available"); } @@ -707,7 +707,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { // forward progress -- readmission { - auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now()).get(); + auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get(); auto irh = semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permit)); BOOST_REQUIRE(semaphore.try_evict_one_inactive_read()); @@ -733,7 +733,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { // inactive readers { - auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now()).get(); + auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get(); require_can_admit(true, "!used"); { @@ -761,10 +761,10 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { // evicting inactive readers for admission { - auto permit1 = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now()).get(); + auto permit1 = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get(); auto irh1 = semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permit1)); - auto permit2 = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now()).get(); + auto permit2 = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get(); auto irh2 = semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permit2)); require_can_admit(true, "evictable reads"); @@ -779,7 +779,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { const auto stats_before = semaphore.get_stats(); - auto permit2_fut = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::no_timeout); + auto permit2_fut = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::no_timeout, {}); const auto stats_after = semaphore.get_stats(); BOOST_REQUIRE_EQUAL(stats_after.reads_admitted, stats_before.reads_admitted); @@ -800,7 +800,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { { check_admitting_enqueued_read( [&] { - return reader_permit_opt(semaphore.obtain_permit(schema_ptr, get_name(), 2 * 1024, db::timeout_clock::now()).get()); + return reader_permit_opt(semaphore.obtain_permit(schema_ptr, get_name(), 2 * 1024, db::timeout_clock::now(), {}).get()); }, [] (reader_permit_opt& permit1) { permit1 = {}; @@ -815,7 +815,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { { check_admitting_enqueued_read( [&] { - return reader_permit_opt(semaphore.obtain_permit(schema_ptr, get_name(), 2 * 1024, db::timeout_clock::now()).get()); + return reader_permit_opt(semaphore.obtain_permit(schema_ptr, get_name(), 2 * 1024, db::timeout_clock::now(), {}).get()); }, [&] (reader_permit_opt& permit1) { return semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), *permit1)); @@ -829,7 +829,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { { check_admitting_enqueued_read( [&] { - auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now()).get(); + auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get(); require_can_admit(true, "enough resources"); return std::pair(permit, std::optional{permit}); }, [&] (std::pair>& permit_and_used_guard) { @@ -845,7 +845,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { { check_admitting_enqueued_read( [&] { - auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now()).get(); + auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get(); require_can_admit(true, "enough resources"); return std::pair(permit, reader_permit::used_guard{permit}); }, [&] (std::pair& permit_and_used_guard) { @@ -866,7 +866,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_used_blocked) { BOOST_REQUIRE_EQUAL(semaphore.get_stats().used_permits, 0); BOOST_REQUIRE_EQUAL(semaphore.get_stats().blocked_permits, 0); - auto permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get0(); + auto permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get0(); for (auto scenario = 0; scenario < 5; ++scenario) { testlog.info("Running scenario {}", scenario); @@ -953,7 +953,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_evict_inactive_reads_ for (auto& s : schemas) { auto& handles = schema_handles[&s]; for (int i = 0; i < 10; ++i) { - handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout)))); + handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {})))); } } @@ -1008,10 +1008,10 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_evict_inactive_reads_ } }; - auto p1 = semaphore.obtain_permit(&s, get_name(), 1024, db::no_timeout).get(); + auto p1 = semaphore.obtain_permit(&s, get_name(), 1024, db::no_timeout, {}).get(); auto irh1 = semaphore.register_inactive_read(make_empty_flat_reader_v2(ss.schema(), p1)); - auto p2 = semaphore.obtain_permit(&s, get_name(), 1024, db::no_timeout).get(); + auto p2 = semaphore.obtain_permit(&s, get_name(), 1024, db::no_timeout, {}).get(); read rd2(p2); auto fut2 = semaphore.with_ready_permit(p2, rd2.get_read_func()); @@ -1020,7 +1020,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_evict_inactive_reads_ // * 1 used (but not blocked) read on the ready list // * 1 waiter // * no more count resources left - auto p3_fut = semaphore.obtain_permit(&s, get_name(), 1024, db::no_timeout); + auto p3_fut = semaphore.obtain_permit(&s, get_name(), 1024, db::no_timeout, {}); BOOST_REQUIRE_EQUAL(semaphore.get_stats().waiters, 2); // (waiters includes _ready_list entries) BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_enqueued_for_admission, 1); BOOST_REQUIRE_EQUAL(semaphore.get_stats().used_permits, 0); // permit looses used status while waiting for execution @@ -1061,8 +1061,8 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_set_resources) { reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory); auto stop_sem = deferred_stop(semaphore); - auto permit1 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get0(); - auto permit2 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get0(); + auto permit1 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get0(); + auto permit2 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get0(); BOOST_REQUIRE_EQUAL(semaphore.available_resources(), reader_resources(2, 2 * 1024)); BOOST_REQUIRE_EQUAL(semaphore.initial_resources(), reader_resources(4, 4 * 1024)); @@ -1082,7 +1082,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_set_resources) { BOOST_REQUIRE_EQUAL(semaphore.available_resources(), reader_resources(-1, 1024)); BOOST_REQUIRE_EQUAL(semaphore.initial_resources(), reader_resources(1, 3 * 1024)); - auto permit3_fut = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout); + auto permit3_fut = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}); BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_enqueued_for_admission, 1); BOOST_REQUIRE_EQUAL(semaphore.get_stats().waiters, 1); @@ -1128,7 +1128,7 @@ private: public: explicit allocating_reader(reader_concurrency_semaphore& sem) : _sem(sem) { testlog.debug("[{}] allocating_reader created", fmt::ptr(this)); - _admission_fut = sem.obtain_permit(nullptr, "reader", admission_cost, db::no_timeout).then_wrapped([this] (future&& permit_fut) { + _admission_fut = sem.obtain_permit(nullptr, "reader", admission_cost, db::no_timeout, {}).then_wrapped([this] (future&& permit_fut) { try { _permit = std::move(permit_fut.get()); _state = state::request_memory; @@ -1335,7 +1335,7 @@ memory_limit_table create_memory_limit_table(cql_test_env& env, uint64_t target_ auto sst = sst_man.make_sstable(s, sstables_dir.path().string(), sstables::generation_type{num_sstables}); auto writer_cfg = sst_man.configure_writer("test"); sst->write_components( - make_flat_mutation_reader_from_mutations_v2(s, semaphore.make_tracking_only_permit(s.get(), "test", db::no_timeout), mut, s->full_slice()), + make_flat_mutation_reader_from_mutations_v2(s, semaphore.make_tracking_only_permit(s.get(), "test", db::no_timeout, {}), mut, s->full_slice()), 1, s, writer_cfg, @@ -1493,7 +1493,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_request_memory_preser utils::updateable_value(serialize_multiplier), utils::updateable_value(kill_multiplier)); auto stop_sem = deferred_stop(semaphore); - auto sponge_permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get0(); + auto sponge_permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get0(); uint64_t reads_enqueued_for_memory = 0; @@ -1528,20 +1528,20 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_request_memory_preser // unused { - auto permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get0(); + auto permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get0(); do_check(permit, 0, 0, std::source_location::current()); } // used { - auto permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get0(); + auto permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get0(); reader_permit::used_guard ug{permit}; do_check(permit, 1, 0, std::source_location::current()); } // blocked { - auto permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get0(); + auto permit = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get0(); reader_permit::used_guard ug{permit}; reader_permit::blocked_guard bg{permit}; do_check(permit, 1, 1, std::source_location::current()); @@ -1559,7 +1559,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_blessed_read_goes_ina simple_schema ss; auto s = ss.schema(); - auto permit = semaphore.obtain_permit(s.get(), get_name(), 1024, db::no_timeout).get(); + auto permit = semaphore.obtain_permit(s.get(), get_name(), 1024, db::no_timeout, {}).get(); std::vector permit_res; @@ -1588,7 +1588,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_stop_with_inactive_re simple_schema ss; auto s = ss.schema(); - auto permit = reader_permit_opt(semaphore.obtain_permit(s.get(), get_name(), 1024, db::no_timeout).get()); + auto permit = reader_permit_opt(semaphore.obtain_permit(s.get(), get_name(), 1024, db::no_timeout, {}).get()); auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s, *permit)); @@ -1618,8 +1618,8 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_permit_waiting_for_me utils::updateable_value(serialize_multiplier), utils::updateable_value(kill_multiplier)); auto stop_sem = deferred_stop(semaphore); - auto permit1 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get0(); - auto permit2 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get0(); + auto permit1 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get0(); + auto permit2 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout, {}).get0(); std::vector res; diff --git a/test/boost/view_build_test.cc b/test/boost/view_build_test.cc index 9f74ec52bc..f0af35f2ce 100644 --- a/test/boost/view_build_test.cc +++ b/test/boost/view_build_test.cc @@ -442,7 +442,7 @@ SEASTAR_TEST_CASE(test_view_update_generator) { sstables::sstable_writer_config sst_cfg = e.db().local().get_user_sstables_manager().configure_writer("test"); auto& pc = service::get_local_streaming_priority(); - auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout); + auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout, {}); sst->write_components(make_flat_mutation_reader_from_mutations_v2(m.schema(), std::move(permit), {m}), 1ul, s, sst_cfg, {}, pc).get(); sst->open_data().get(); t->add_sstable_and_update_cache(sst).get(); @@ -556,7 +556,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_deadlock) { sstables::sstable_writer_config sst_cfg = e.local_db().get_user_sstables_manager().configure_writer("test"); auto& pc = service::get_local_streaming_priority(); - auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout); + auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout, {}); sst->write_components(make_flat_mutation_reader_from_mutations_v2(m.schema(), std::move(permit), {m}), 1ul, s, sst_cfg, {}, pc).get(); sst->open_data().get(); t->add_sstable_and_update_cache(sst).get(); @@ -566,7 +566,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_deadlock) { }).get0(); // consume all units except what is needed to admit a single reader. - auto sponge_permit = sem.make_tracking_only_permit(s.get(), "sponge", db::no_timeout); + auto sponge_permit = sem.make_tracking_only_permit(s.get(), "sponge", db::no_timeout, {}); auto resources = sponge_permit.consume_resources(sem.available_resources() - reader_resources{1, replica::new_reader_base_cost}); testlog.info("res = [.count={}, .memory={}]", sem.available_resources().count, sem.available_resources().memory); @@ -630,7 +630,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_register_semaphore_unit_leak sstables::sstable_writer_config sst_cfg = e.local_db().get_user_sstables_manager().configure_writer("test"); auto& pc = service::get_local_streaming_priority(); - auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout); + auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout, {}); sst->write_components(make_flat_mutation_reader_from_mutations_v2(m.schema(), std::move(permit), {m}), 1ul, s, sst_cfg, {}, pc).get(); sst->open_data().get(); t->add_sstable_and_update_cache(sst).get(); @@ -731,7 +731,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering) { void check(mutation mut) { // First we check that we would be able to create a reader, even // though the staging reader consumed all resources. - auto permit = _semaphore.obtain_permit(_schema.get(), "consumer_verifier", replica::new_reader_base_cost, db::timeout_clock::now()).get0(); + auto permit = _semaphore.obtain_permit(_schema.get(), "consumer_verifier", replica::new_reader_base_cost, db::timeout_clock::now(), {}).get0(); const size_t current_rows = rows_in_mut(mut); const auto total_rows = _partition_rows.at(mut.decorated_key()); @@ -836,7 +836,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering) { return less(a.decorated_key(), b.decorated_key()); }); - auto permit = sem.obtain_permit(schema.get(), get_name(), replica::new_reader_base_cost, db::no_timeout).get0(); + auto permit = sem.obtain_permit(schema.get(), get_name(), replica::new_reader_base_cost, db::no_timeout, {}).get0(); auto mt = make_lw_shared(schema); for (const auto& mut : muts) { diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index 23eff1c2c8..98b71bed8e 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -349,7 +349,7 @@ public: table_name = std::move(table_name)] (replica::database& db) mutable { auto& cf = db.find_column_family(ks_name, table_name); auto schema = cf.schema(); - auto permit = db.get_reader_concurrency_semaphore().make_tracking_only_permit(schema.get(), "require_column_has_value()", db::no_timeout); + auto permit = db.get_reader_concurrency_semaphore().make_tracking_only_permit(schema.get(), "require_column_has_value()", db::no_timeout, {}); return cf.find_partition_slow(schema, permit, pkey) .then([schema, ckey, column_name, exp] (replica::column_family::const_mutation_partition_ptr p) { assert(p != nullptr); @@ -997,7 +997,7 @@ future<> do_with_cql_env_thread(std::function func, cql_tes } reader_permit make_reader_permit(cql_test_env& env) { - return env.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(nullptr, "test", db::no_timeout); + return env.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(nullptr, "test", db::no_timeout, {}); } cql_test_config raft_cql_test_config() { diff --git a/test/lib/reader_concurrency_semaphore.hh b/test/lib/reader_concurrency_semaphore.hh index 13b1758183..eb105fbf84 100644 --- a/test/lib/reader_concurrency_semaphore.hh +++ b/test/lib/reader_concurrency_semaphore.hh @@ -26,7 +26,7 @@ public: } reader_concurrency_semaphore& semaphore() { return *_semaphore; }; - reader_permit make_permit() { return _semaphore->make_tracking_only_permit(nullptr, "test", db::no_timeout); } + reader_permit make_permit() { return _semaphore->make_tracking_only_permit(nullptr, "test", db::no_timeout, {}); } }; } // namespace tests diff --git a/test/lib/reader_lifecycle_policy.hh b/test/lib/reader_lifecycle_policy.hh index d32012d710..ffd71cbb36 100644 --- a/test/lib/reader_lifecycle_policy.hh +++ b/test/lib/reader_lifecycle_policy.hh @@ -95,8 +95,8 @@ public: } return *_contexts[shard]->semaphore; } - virtual future obtain_reader_permit(schema_ptr schema, const char* const description, db::timeout_clock::time_point timeout) override { - return semaphore().obtain_permit(schema.get(), description, 128 * 1024, timeout); + virtual future obtain_reader_permit(schema_ptr schema, const char* const description, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) override { + return semaphore().obtain_permit(schema.get(), description, 128 * 1024, timeout, std::move(trace_ptr)); } }; diff --git a/test/lib/sstable_test_env.hh b/test/lib/sstable_test_env.hh index 85e9faa6c9..1fb634ff58 100644 --- a/test/lib/sstable_test_env.hh +++ b/test/lib/sstable_test_env.hh @@ -171,10 +171,10 @@ public: tmpdir& tempdir() noexcept { return _impl->dir; } reader_permit make_reader_permit(const schema* const s, const char* n, db::timeout_clock::time_point timeout) { - return _impl->semaphore.make_tracking_only_permit(s, n, timeout); + return _impl->semaphore.make_tracking_only_permit(s, n, timeout, {}); } reader_permit make_reader_permit(db::timeout_clock::time_point timeout = db::no_timeout) { - return _impl->semaphore.make_tracking_only_permit(nullptr, "test", timeout); + return _impl->semaphore.make_tracking_only_permit(nullptr, "test", timeout, {}); } replica::table::config make_table_config() { diff --git a/test/lib/test_services.cc b/test/lib/test_services.cc index 84eb421174..2966fcdaa9 100644 --- a/test/lib/test_services.cc +++ b/test/lib/test_services.cc @@ -84,7 +84,7 @@ public: return table().get_compaction_strategy(); } reader_permit make_compaction_reader_permit() const override { - return _data.semaphore.make_tracking_only_permit(&*schema(), "table_for_tests::table_state", db::no_timeout); + return _data.semaphore.make_tracking_only_permit(&*schema(), "table_for_tests::table_state", db::no_timeout, {}); } sstables::sstables_manager& get_sstables_manager() noexcept override { return _sstables_manager; diff --git a/test/manual/sstable_scan_footprint_test.cc b/test/manual/sstable_scan_footprint_test.cc index 33f48a74d0..0374f7919e 100644 --- a/test/manual/sstable_scan_footprint_test.cc +++ b/test/manual/sstable_scan_footprint_test.cc @@ -175,7 +175,7 @@ void execute_reads(const schema& s, reader_concurrency_semaphore& sem, unsigned if (sem.get_stats().waiters) { testlog.trace("Waiting for queue to drain"); - sem.obtain_permit(&s, "drain", 1, db::no_timeout).get(); + sem.obtain_permit(&s, "drain", 1, db::no_timeout, {}).get(); } } diff --git a/test/perf/perf.cc b/test/perf/perf.cc index e18974d257..0e0ab74d66 100644 --- a/test/perf/perf.cc +++ b/test/perf/perf.cc @@ -82,7 +82,7 @@ reader_concurrency_semaphore_wrapper::~reader_concurrency_semaphore_wrapper() { } reader_permit reader_concurrency_semaphore_wrapper::make_permit() { - return _semaphore->make_tracking_only_permit(nullptr, "perf", db::no_timeout); + return _semaphore->make_tracking_only_permit(nullptr, "perf", db::no_timeout, {}); } } // namespace perf diff --git a/test/unit/row_cache_stress_test.cc b/test/unit/row_cache_stress_test.cc index f7b925cd2c..01f7b90c19 100644 --- a/test/unit/row_cache_stress_test.cc +++ b/test/unit/row_cache_stress_test.cc @@ -54,7 +54,7 @@ struct table { } reader_permit make_permit() { - return semaphore.make_tracking_only_permit(s.schema().get(), "test", db::no_timeout); + return semaphore.make_tracking_only_permit(s.schema().get(), "test", db::no_timeout, {}); } future<> stop() noexcept { return semaphore.stop(); diff --git a/tools/scylla-sstable.cc b/tools/scylla-sstable.cc index e8b2c83f70..f95e427882 100644 --- a/tools/scylla-sstable.cc +++ b/tools/scylla-sstable.cc @@ -2854,7 +2854,7 @@ $ scylla sstable validate /path/to/md-123456-big-Data.db /path/to/md-123457-big- reader_concurrency_semaphore rcs_sem(reader_concurrency_semaphore::no_limits{}, app_name); auto stop_semaphore = deferred_stop(rcs_sem); - const auto permit = rcs_sem.make_tracking_only_permit(schema.get(), app_name, db::no_timeout); + const auto permit = rcs_sem.make_tracking_only_permit(schema.get(), app_name, db::no_timeout, {}); operation(schema, permit, sstables, sst_man, app_config);