From 76f0d5e35bb24d7aac95de628d816ff7fc3abb62 Mon Sep 17 00:00:00 2001 From: Lakshmi Narayanan Sreethar Date: Fri, 5 Jan 2024 12:19:58 +0530 Subject: [PATCH] reader_permit: store schema_ptr instead of raw schema pointer Store schema_ptr in reader permit instead of storing a const pointer to schema to ensure that the schema doesn't get changed elsewhere when the permit is holding on to it. Also update the constructors and all the relevant callers to pass down schema_ptr instead of a raw pointer. Fixes #16180 Signed-off-by: Lakshmi Narayanan Sreethar Closes scylladb/scylladb#16658 --- multishard_mutation_query.cc | 2 +- mutation_writer/multishard_writer.cc | 2 +- reader_concurrency_semaphore.cc | 45 ++++++------ reader_concurrency_semaphore.hh | 10 +-- reader_permit.hh | 6 +- replica/database.cc | 10 +-- replica/table.cc | 10 +-- scylla-gdb.py | 7 +- sstables/sstables.cc | 6 +- test/boost/commitlog_test.cc | 2 +- test/boost/continuous_data_consumer_test.cc | 1 + test/boost/database_test.cc | 2 +- test/boost/hashers_test.cc | 2 +- test/boost/mutation_fragment_test.cc | 6 +- test/boost/mutation_reader_test.cc | 12 ++-- test/boost/querier_cache_test.cc | 12 ++-- .../reader_concurrency_semaphore_test.cc | 68 +++++++++---------- test/boost/row_cache_test.cc | 2 +- test/boost/view_build_test.cc | 16 ++--- test/lib/reader_concurrency_semaphore.hh | 1 + test/lib/reader_lifecycle_policy.hh | 2 +- test/lib/sstable_test_env.hh | 2 +- test/lib/test_services.cc | 2 +- test/manual/sstable_scan_footprint_test.cc | 6 +- test/perf/perf.cc | 1 + test/unit/row_cache_stress_test.cc | 2 +- tools/schema_loader.cc | 4 +- tools/scylla-sstable.cc | 2 +- 28 files changed, 126 insertions(+), 117 deletions(-) diff --git a/multishard_mutation_query.cc b/multishard_mutation_query.cc index a7d0269cdf..fd6e471b81 100644 --- a/multishard_mutation_query.cc +++ b/multishard_mutation_query.cc @@ -215,7 +215,7 @@ public: : _db(db) , _schema(std::move(s)) , _erm(std::move(erm)) - , _permit(_db.local().get_reader_concurrency_semaphore().make_tracking_only_permit(_schema.get(), "multishard-mutation-query", timeout, trace_state)) + , _permit(_db.local().get_reader_concurrency_semaphore().make_tracking_only_permit(_schema, "multishard-mutation-query", timeout, trace_state)) , _cmd(cmd) , _ranges(ranges) , _trace_state(std::move(trace_state)) diff --git a/mutation_writer/multishard_writer.cc b/mutation_writer/multishard_writer.cc index 3a0d2c447a..ad123d70e0 100644 --- a/mutation_writer/multishard_writer.cc +++ b/mutation_writer/multishard_writer.cc @@ -118,7 +118,7 @@ future<> multishard_writer::make_shard_writer(unsigned shard) { auto s = gs.get(); auto semaphore = std::make_unique(reader_concurrency_semaphore::no_limits{}, "shard_writer", reader_concurrency_semaphore::register_metrics::no); - auto permit = semaphore->make_tracking_only_permit(s.get(), "multishard-writer", db::no_timeout, {}); + auto permit = semaphore->make_tracking_only_permit(s, "multishard-writer", db::no_timeout, {}); auto this_shard_reader = make_foreign_reader(s, std::move(permit), std::move(reader)); return make_foreign(std::make_unique(gs.get(), std::move(semaphore), std::move(this_shard_reader), consumer)); }).then([this, shard] (foreign_ptr> writer) { diff --git a/reader_concurrency_semaphore.cc b/reader_concurrency_semaphore.cc index da4904f1be..a3261430bb 100644 --- a/reader_concurrency_semaphore.cc +++ b/reader_concurrency_semaphore.cc @@ -150,7 +150,8 @@ public: private: reader_concurrency_semaphore& _semaphore; - const schema* _schema; + schema_ptr _schema; + sstring _op_name; std::string_view _op_name_view; reader_resources _base_resources; @@ -237,9 +238,9 @@ private: public: struct value_tag {}; - impl(reader_concurrency_semaphore& semaphore, const schema* const schema, const std::string_view& op_name, reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) + impl(reader_concurrency_semaphore& semaphore, schema_ptr schema, const std::string_view& op_name, reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) : _semaphore(semaphore) - , _schema(schema) + , _schema(std::move(schema)) , _op_name_view(op_name) , _base_resources(base_resources) , _ttl_timer([this] { on_timeout(); }) @@ -248,9 +249,9 @@ public: set_timeout(timeout); _semaphore.on_permit_created(*this); } - impl(reader_concurrency_semaphore& semaphore, const schema* const schema, sstring&& op_name, reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) + impl(reader_concurrency_semaphore& semaphore, schema_ptr schema, sstring&& op_name, reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) : _semaphore(semaphore) - , _schema(schema) + , _schema(std::move(schema)) , _op_name(std::move(op_name)) , _op_name_view(_op_name) , _base_resources(base_resources) @@ -302,7 +303,7 @@ public: return _semaphore; } - const ::schema* get_schema() const { + const schema_ptr& get_schema() const { return _schema; } @@ -524,15 +525,15 @@ reader_permit::reader_permit(shared_ptr impl) : _impl(std::move(impl)) { } -reader_permit::reader_permit(reader_concurrency_semaphore& semaphore, const schema* const schema, std::string_view op_name, +reader_permit::reader_permit(reader_concurrency_semaphore& semaphore, schema_ptr schema, std::string_view op_name, reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) - : _impl(::seastar::make_shared(semaphore, schema, op_name, base_resources, timeout, std::move(trace_ptr))) + : _impl(::seastar::make_shared(semaphore, std::move(schema), op_name, base_resources, timeout, std::move(trace_ptr))) { } -reader_permit::reader_permit(reader_concurrency_semaphore& semaphore, const schema* const schema, sstring&& op_name, +reader_permit::reader_permit(reader_concurrency_semaphore& semaphore, schema_ptr schema, sstring&& op_name, reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) - : _impl(::seastar::make_shared(semaphore, schema, std::move(op_name), base_resources, timeout, std::move(trace_ptr))) + : _impl(::seastar::make_shared(semaphore, std::move(schema), std::move(op_name), base_resources, timeout, std::move(trace_ptr))) { } @@ -543,7 +544,7 @@ reader_concurrency_semaphore& reader_permit::semaphore() { return _impl->semaphore(); } -const ::schema* reader_permit::get_schema() const { +const schema_ptr& reader_permit::get_schema() const { return _impl->get_schema(); } @@ -766,7 +767,7 @@ static void do_dump_reader_permit_diagnostics(std::ostream& os, const reader_con permit_groups permits; semaphore.foreach_permit([&] (const reader_permit::impl& permit) { - permits[permit_group_key(permit.get_schema(), permit.get_op_name(), permit.get_state())].add(permit); + permits[permit_group_key(permit.get_schema().get(), permit.get_op_name(), permit.get_state())].add(permit); }); permit_stats total; @@ -1518,35 +1519,35 @@ void reader_concurrency_semaphore::on_permit_not_awaits() noexcept { --_stats.awaits_permits; } -future reader_concurrency_semaphore::obtain_permit(const schema* const schema, const char* const op_name, size_t memory, +future reader_concurrency_semaphore::obtain_permit(schema_ptr schema, const char* const op_name, size_t memory, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) { - auto permit = reader_permit(*this, schema, std::string_view(op_name), {1, static_cast(memory)}, timeout, std::move(trace_ptr)); + auto permit = reader_permit(*this, std::move(schema), std::string_view(op_name), {1, static_cast(memory)}, timeout, std::move(trace_ptr)); return do_wait_admission(*permit).then([permit] () mutable { return std::move(permit); }); } -future reader_concurrency_semaphore::obtain_permit(const schema* const schema, sstring&& op_name, size_t memory, +future reader_concurrency_semaphore::obtain_permit(schema_ptr schema, sstring&& op_name, size_t memory, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) { - auto permit = reader_permit(*this, schema, std::move(op_name), {1, static_cast(memory)}, timeout, std::move(trace_ptr)); + auto permit = reader_permit(*this, std::move(schema), std::move(op_name), {1, static_cast(memory)}, timeout, std::move(trace_ptr)); return do_wait_admission(*permit).then([permit] () mutable { return std::move(permit); }); } -reader_permit reader_concurrency_semaphore::make_tracking_only_permit(const schema* const schema, const char* const op_name, +reader_permit reader_concurrency_semaphore::make_tracking_only_permit(schema_ptr schema, const char* const op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) { - return reader_permit(*this, schema, std::string_view(op_name), {}, timeout, std::move(trace_ptr)); + return reader_permit(*this, std::move(schema), std::string_view(op_name), {}, timeout, std::move(trace_ptr)); } -reader_permit reader_concurrency_semaphore::make_tracking_only_permit(const schema* const schema, sstring&& op_name, +reader_permit reader_concurrency_semaphore::make_tracking_only_permit(schema_ptr schema, sstring&& op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) { - return reader_permit(*this, schema, std::move(op_name), {}, timeout, std::move(trace_ptr)); + return reader_permit(*this, std::move(schema), std::move(op_name), {}, timeout, std::move(trace_ptr)); } -future<> reader_concurrency_semaphore::with_permit(const schema* const schema, const char* const op_name, size_t memory, +future<> reader_concurrency_semaphore::with_permit(schema_ptr schema, const char* const op_name, size_t memory, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr, read_func func) { - auto permit = reader_permit(*this, schema, std::string_view(op_name), {1, static_cast(memory)}, timeout, std::move(trace_ptr)); + auto permit = reader_permit(*this, std::move(schema), std::string_view(op_name), {1, static_cast(memory)}, timeout, std::move(trace_ptr)); permit->aux_data().func = std::move(func); permit->aux_data().permit_keepalive = permit; return do_wait_admission(*permit); diff --git a/reader_concurrency_semaphore.hh b/reader_concurrency_semaphore.hh index 752e8623a5..bdfded4587 100644 --- a/reader_concurrency_semaphore.hh +++ b/reader_concurrency_semaphore.hh @@ -395,8 +395,8 @@ public: /// /// Some permits cannot be associated with any table, so passing nullptr as /// the schema parameter is allowed. - future obtain_permit(const schema* const schema, const char* const op_name, size_t memory, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr); - future obtain_permit(const schema* const schema, sstring&& op_name, size_t memory, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr); + future obtain_permit(schema_ptr schema, const char* const op_name, size_t memory, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr); + future obtain_permit(schema_ptr schema, sstring&& op_name, size_t memory, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr); /// Make a tracking only permit /// @@ -411,8 +411,8 @@ public: /// /// Some permits cannot be associated with any table, so passing nullptr as /// the schema parameter is allowed. - reader_permit make_tracking_only_permit(const schema* const schema, const char* const op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr); - reader_permit make_tracking_only_permit(const schema* const schema, sstring&& op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr); + reader_permit make_tracking_only_permit(schema_ptr schema, const char* const op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr); + reader_permit make_tracking_only_permit(schema_ptr schema, sstring&& op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr); /// Run the function through the semaphore's execution stage with an admitted permit /// @@ -433,7 +433,7 @@ public: /// /// Some permits cannot be associated with any table, so passing nullptr as /// the schema parameter is allowed. - future<> with_permit(const schema* const schema, const char* const op_name, size_t memory, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr, read_func func); + future<> with_permit(schema_ptr schema, const char* const op_name, size_t memory, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr, read_func func); /// Run the function through the semaphore's execution stage with a pre-admitted permit /// diff --git a/reader_permit.hh b/reader_permit.hh index 0b633c746d..c96bd1f04a 100644 --- a/reader_permit.hh +++ b/reader_permit.hh @@ -104,9 +104,9 @@ private: private: reader_permit() = default; reader_permit(shared_ptr); - explicit reader_permit(reader_concurrency_semaphore& semaphore, const schema* const schema, std::string_view op_name, + explicit reader_permit(reader_concurrency_semaphore& semaphore, schema_ptr schema, std::string_view op_name, reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr); - explicit reader_permit(reader_concurrency_semaphore& semaphore, const schema* const schema, sstring&& op_name, + explicit reader_permit(reader_concurrency_semaphore& semaphore, schema_ptr schema, sstring&& op_name, reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr); reader_permit::impl& operator*() { return *_impl; } @@ -143,7 +143,7 @@ public: reader_concurrency_semaphore& semaphore(); - const ::schema* get_schema() const; + const schema_ptr& get_schema() const; std::string_view get_op_name() const; state get_state() const; diff --git a/replica/database.cc b/replica/database.cc index 4b6524c832..f928c7a2af 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -1470,7 +1470,7 @@ public: } virtual future obtain_reader_permit(schema_ptr schema, const char* const description, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) override { auto& cf = _db.local().find_column_family(_table_id); - return semaphore().obtain_permit(schema.get(), description, cf.estimate_read_memory_cost(), timeout, std::move(trace_ptr)); + return semaphore().obtain_permit(schema, description, cf.estimate_read_memory_cost(), timeout, std::move(trace_ptr)); } }; @@ -1571,7 +1571,7 @@ database::query(schema_ptr s, const query::read_command& cmd, query::result_opti querier_opt->permit().set_trace_state(trace_state); f = co_await coroutine::as_future(semaphore.with_ready_permit(querier_opt->permit(), read_func)); } else { - f = co_await coroutine::as_future(semaphore.with_permit(s.get(), "data-query", cf.estimate_read_memory_cost(), timeout, trace_state, read_func)); + f = co_await coroutine::as_future(semaphore.with_permit(s, "data-query", cf.estimate_read_memory_cost(), timeout, trace_state, read_func)); } if (!f.failed()) { @@ -1638,7 +1638,7 @@ database::query_mutations(schema_ptr s, const query::read_command& cmd, const dh querier_opt->permit().set_trace_state(trace_state); f = co_await coroutine::as_future(semaphore.with_ready_permit(querier_opt->permit(), read_func)); } else { - f = co_await coroutine::as_future(semaphore.with_permit(s.get(), "mutation-query", cf.estimate_read_memory_cost(), timeout, trace_state, read_func)); + f = co_await coroutine::as_future(semaphore.with_permit(s, "mutation-query", cf.estimate_read_memory_cost(), timeout, trace_state, read_func)); } if (!f.failed()) { @@ -1688,7 +1688,7 @@ reader_concurrency_semaphore& database::get_reader_concurrency_semaphore() { } future database::obtain_reader_permit(table& tbl, const char* const op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) { - return get_reader_concurrency_semaphore().obtain_permit(tbl.schema().get(), op_name, tbl.estimate_read_memory_cost(), timeout, std::move(trace_ptr)); + return get_reader_concurrency_semaphore().obtain_permit(tbl.schema(), op_name, tbl.estimate_read_memory_cost(), timeout, std::move(trace_ptr)); } future database::obtain_reader_permit(schema_ptr schema, const char* const op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) { @@ -1758,7 +1758,7 @@ future database::do_apply_counter_update(column_family& cf, const froz // counter state for each modified cell... tracing::trace(trace_state, "Reading counter values from the CF"); - auto permit = get_reader_concurrency_semaphore().make_tracking_only_permit(m_schema.get(), "counter-read-before-write", timeout, trace_state); + auto permit = get_reader_concurrency_semaphore().make_tracking_only_permit(m_schema, "counter-read-before-write", timeout, trace_state); return counter_write_query(m_schema, cf.as_mutation_source(), std::move(permit), m.decorated_key(), slice, trace_state) .then([this, &cf, &m, m_schema, timeout, trace_state] (auto mopt) { // ...now, that we got existing state of all affected counter diff --git a/replica/table.cc b/replica/table.cc index 09b062233c..d88b28895b 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -1136,7 +1136,7 @@ table::try_flush_memtable_to_sstable(compaction_group& cg, lw_shared_ptrmake_flush_reader( old->schema(), - compaction_concurrency_semaphore().make_tracking_only_permit(old->schema().get(), "try_flush_memtable_to_sstable()", db::no_timeout, {}))); + compaction_concurrency_semaphore().make_tracking_only_permit(old->schema(), "try_flush_memtable_to_sstable()", db::no_timeout, {}))); // Switch back to default scheduling group for post-flush actions, to avoid them being staved by the memtable flush // controller. Cache update does not affect the input of the memtable cpu controller, so it can be subject to @@ -2612,7 +2612,7 @@ write_memtable_to_sstable(memtable& mt, sstables::shared_sstable sst) { std::exception_ptr ex; try { - auto permit = semaphore.make_tracking_only_permit(mt.schema().get(), "mt_to_sst", db::no_timeout, {}); + auto permit = semaphore.make_tracking_only_permit(mt.schema(), "mt_to_sst", db::no_timeout, {}); auto reader = mt.make_flush_reader(mt.schema(), std::move(permit)); co_await write_memtable_to_sstable(std::move(reader), mt, std::move(sst), mt.partition_count(), monitor, cfg); } catch (...) { @@ -2925,7 +2925,7 @@ future table::do_push_view_replica_updates(shared_ptr table::do_push_view_replica_updates(shared_ptrgenerate_and_propagate_view_updates(gen, base, std::move(permit), std::move(views), std::move(m), std::move(reader), tr_state, now); tracing::trace(tr_state, "View updates for {}.{} were generated and propagated", base->ks_name(), base->cf_name()); @@ -3057,7 +3057,7 @@ public: return _cg._compaction_strategy_state; } reader_permit make_compaction_reader_permit() const override { - return _t.compaction_concurrency_semaphore().make_tracking_only_permit(schema().get(), "compaction", db::no_timeout, {}); + return _t.compaction_concurrency_semaphore().make_tracking_only_permit(schema(), "compaction", db::no_timeout, {}); } sstables::sstables_manager& get_sstables_manager() noexcept override { return _t.get_sstables_manager(); diff --git a/scylla-gdb.py b/scylla-gdb.py index d1bf15e6da..677d72b5c5 100755 --- a/scylla-gdb.py +++ b/scylla-gdb.py @@ -5396,7 +5396,12 @@ class scylla_read_stats(gdb.Command): total = permit_stats() for permit in intrusive_list(permit_list): - schema = permit['_schema'] + try: + schema = permit['_schema']['_p'] + except: + # schema is already a raw pointer in older versions + schema = permit['_schema'] + if schema: raw_schema = schema.dereference()['_raw'] schema_name = "{}.{}".format(str(raw_schema['_ks_name']).replace('"', ''), str(raw_schema['_cf_name']).replace('"', '')) diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 2fed9107a8..de6bb81f02 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -1222,7 +1222,7 @@ future<> sstable::load_first_and_last_position_in_partition() { } auto& sem = _manager.sstable_metadata_concurrency_sem(); - reader_permit permit = co_await sem.obtain_permit(&*_schema, "sstable::load_first_and_last_position_range", sstable_buffer_size, db::no_timeout, {}); + reader_permit permit = co_await sem.obtain_permit(_schema, "sstable::load_first_and_last_position_range", sstable_buffer_size, db::no_timeout, {}); auto first_pos_opt = co_await find_first_position_in_partition(permit, get_first_decorated_key(), false); auto last_pos_opt = co_await find_first_position_in_partition(permit, get_last_decorated_key(), true); @@ -1859,7 +1859,7 @@ future<> sstable::generate_summary() { auto s = summary_generator(_schema->get_partitioner(), _components->summary, _manager.config().sstable_summary_ratio()); auto ctx = make_lw_shared>( - *this, sem.make_tracking_only_permit(_schema.get(), "generate-summary", db::no_timeout, {}), s, trust_promoted_index::yes, + *this, sem.make_tracking_only_permit(_schema, "generate-summary", db::no_timeout, {}), s, trust_promoted_index::yes, make_file_input_stream(index_file, 0, index_size, std::move(options)), 0, index_size, (_version >= sstable_version_types::mc ? std::make_optional(get_clustering_values_fixed_lengths(get_serialization_header())) @@ -2746,7 +2746,7 @@ future sstable::has_partition_key(const utils::hashed_key& hk, const dht:: auto sem = reader_concurrency_semaphore(reader_concurrency_semaphore::no_limits{}, "sstables::has_partition_key()", reader_concurrency_semaphore::register_metrics::no); try { - auto lh_index_ptr = std::make_unique(s, sem.make_tracking_only_permit(_schema.get(), s->get_filename(), db::no_timeout, {})); + auto lh_index_ptr = std::make_unique(s, sem.make_tracking_only_permit(_schema, s->get_filename(), db::no_timeout, {})); present = co_await lh_index_ptr->advance_lower_and_check_if_present(dk); } catch (...) { ex = std::current_exception(); diff --git a/test/boost/commitlog_test.cc b/test/boost/commitlog_test.cc index de4d274bf8..812f14198b 100644 --- a/test/boost/commitlog_test.cc +++ b/test/boost/commitlog_test.cc @@ -953,7 +953,7 @@ SEASTAR_TEST_CASE(test_commitlog_replay_invalid_key){ { std::vector readers; readers.reserve(memtables.size()); - auto permit = db.get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout, {}); + auto permit = db.get_reader_concurrency_semaphore().make_tracking_only_permit(s, "test", db::no_timeout, {}); for (auto mt : memtables) { readers.push_back(mt->make_flat_reader(s, permit)); } diff --git a/test/boost/continuous_data_consumer_test.cc b/test/boost/continuous_data_consumer_test.cc index 9b32804ccb..a95034c181 100644 --- a/test/boost/continuous_data_consumer_test.cc +++ b/test/boost/continuous_data_consumer_test.cc @@ -13,6 +13,7 @@ #include "utils/buffer_input_stream.hh" #include "test/lib/reader_concurrency_semaphore.hh" #include "test/lib/random_utils.hh" +#include "schema/schema.hh" #include "sstables/processing_result_generator.hh" #include diff --git a/test/boost/database_test.cc b/test/boost/database_test.cc index d82740b6d1..bdb2ef6401 100644 --- a/test/boost/database_test.cc +++ b/test/boost/database_test.cc @@ -1346,7 +1346,7 @@ SEASTAR_TEST_CASE(database_drop_column_family_clears_querier_cache) { auto q = query::querier( tbl.as_mutation_source(), tbl.schema(), - database_test(db).get_user_read_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout, {}), + database_test(db).get_user_read_concurrency_semaphore().make_tracking_only_permit(s, "test", db::no_timeout, {}), query::full_partition_range, s->full_slice(), nullptr); diff --git a/test/boost/hashers_test.cc b/test/boost/hashers_test.cc index 9b451e06c5..6d3e43b846 100644 --- a/test/boost/hashers_test.cc +++ b/test/boost/hashers_test.cc @@ -66,7 +66,7 @@ SEASTAR_THREAD_TEST_CASE(mutation_fragment_sanity_check) { reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, __FILE__, reader_concurrency_semaphore::register_metrics::no); auto stop_semaphore = deferred_stop(semaphore); simple_schema s; - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), "test", db::no_timeout, {}); + auto permit = semaphore.make_tracking_only_permit(s.schema(), "test", db::no_timeout, {}); gc_clock::time_point ts(gc_clock::duration(1234567890000)); auto check_hash = [&] (const mutation_fragment& mf, uint64_t expected) { diff --git a/test/boost/mutation_fragment_test.cc b/test/boost/mutation_fragment_test.cc index eb59d48214..c2730856c1 100644 --- a/test/boost/mutation_fragment_test.cc +++ b/test/boost/mutation_fragment_test.cc @@ -357,7 +357,7 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_mutate_exception_safety) { reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 100); auto stop_sem = deferred_stop(sem); - auto permit = sem.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}); + auto permit = sem.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {}); const auto available_res = sem.available_resources(); const sstring val(1024, 'a'); @@ -427,7 +427,7 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_stream_validator) { reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 100); auto stop_sem = deferred_stop(sem); - auto permit = sem.make_tracking_only_permit(ss.schema().get(), get_name(), db::no_timeout, {}); + auto permit = sem.make_tracking_only_permit(ss.schema(), get_name(), db::no_timeout, {}); auto expect = [&] (bool expect_valid, const char* desc, unsigned at, auto&& first_mf, auto&&... mf) { std::vector mfs; @@ -618,7 +618,7 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_stream_validator_mixed_api_usage reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 100); auto stop_sem = deferred_stop(sem); - auto permit = sem.make_tracking_only_permit(ss.schema().get(), get_name(), db::no_timeout, {}); + auto permit = sem.make_tracking_only_permit(ss.schema(), get_name(), db::no_timeout, {}); mutation_fragment_stream_validator validator(*ss.schema()); diff --git a/test/boost/mutation_reader_test.cc b/test/boost/mutation_reader_test.cc index 3646bdc71b..edeee58a9a 100644 --- a/test/boost/mutation_reader_test.cc +++ b/test/boost/mutation_reader_test.cc @@ -2942,7 +2942,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_self_validation) { reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name(), reader_concurrency_semaphore::register_metrics::no); auto stop_sem = deferred_stop(semaphore); simple_schema s; - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}); + auto permit = semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {}); auto pkeys = s.make_pkeys(4); std::ranges::sort(pkeys, dht::decorated_key::less_comparator(s.schema())); @@ -3220,7 +3220,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_recreate_before_fast_forward_to) reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name(), reader_concurrency_semaphore::register_metrics::no); auto stop_sem = deferred_stop(semaphore); simple_schema s; - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}); + auto permit = semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {}); auto pkeys = s.make_pkeys(6); boost::sort(pkeys, dht::decorated_key::less_comparator(s.schema())); @@ -3270,7 +3270,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_drop_flags) { reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 0); auto stop_sem = deferred_stop(semaphore); simple_schema s; - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}); + auto permit = semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {}); auto pkeys = s.make_pkeys(2); std::sort(pkeys.begin(), pkeys.end(), [&s] (const auto& pk1, const auto& pk2) { @@ -3497,7 +3497,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_non_monotonic_positions) { auto stop_sem = deferred_stop(semaphore); simple_schema s; auto schema = s.schema(); - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}); + auto permit = semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {}); auto pkey = s.make_pkey(); const auto prange = dht::partition_range::make_open_ended_both_sides(); @@ -3555,7 +3555,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_clear_tombstone_in_discontinued_p auto stop_sem = deferred_stop(semaphore); simple_schema s; auto schema = s.schema(); - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}); + auto permit = semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {}); auto pkeys = s.make_pkeys(2); std::sort(pkeys.begin(), pkeys.end(), [&s] (const auto& pk1, const auto& pk2) { @@ -3653,7 +3653,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_next_pos_is_partition_start) { auto stop_sem = deferred_stop(semaphore); simple_schema s; auto schema = s.schema(); - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}); + auto permit = semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {}); auto pk = s.make_pkey(); const auto prange = dht::partition_range::make_open_ended_both_sides(); diff --git a/test/boost/querier_cache_test.cc b/test/boost/querier_cache_test.cc index 1c9273c1f4..a3c9c100e9 100644 --- a/test/boost/querier_cache_test.cc +++ b/test/boost/querier_cache_test.cc @@ -107,7 +107,7 @@ private: Querier make_querier(const dht::partition_range& range) { return Querier(_mutation_source, _s.schema(), - _sem.make_tracking_only_permit(_s.schema().get(), "make-querier", db::no_timeout, {}), + _sem.make_tracking_only_permit(_s.schema(), "make-querier", db::no_timeout, {}), range, _s.schema()->full_slice(), nullptr); @@ -688,7 +688,7 @@ SEASTAR_THREAD_TEST_CASE(test_resources_based_cache_eviction) { BOOST_CHECK_EQUAL(db.get_querier_cache_stats().resource_based_evictions, 0); // Drain all resources of the semaphore - auto sponge_permit = semaphore.make_tracking_only_permit(s.get(), "sponge", db::no_timeout, {}); + auto sponge_permit = semaphore.make_tracking_only_permit(s, "sponge", db::no_timeout, {}); auto consumed_resources = sponge_permit.consume_resources(semaphore.available_resources()); auto cmd2 = query::read_command(s->id(), @@ -737,13 +737,13 @@ SEASTAR_THREAD_TEST_CASE(test_immediate_evict_on_insert) { test_querier_cache t; auto& sem = t.get_semaphore(); - auto permit1 = sem.obtain_permit(t.get_schema().get(), get_name(), 0, db::no_timeout, {}).get0(); + auto permit1 = sem.obtain_permit(t.get_schema(), get_name(), 0, db::no_timeout, {}).get0(); auto resources = permit1.consume_resources(reader_resources(sem.available_resources().count, 0)); BOOST_CHECK_EQUAL(sem.available_resources().count, 0); - auto fut = sem.obtain_permit(t.get_schema().get(), get_name(), 1, db::no_timeout, {}); + auto fut = sem.obtain_permit(t.get_schema(), get_name(), 1, db::no_timeout, {}); BOOST_CHECK_EQUAL(sem.get_stats().waiters, 1); @@ -769,8 +769,8 @@ SEASTAR_THREAD_TEST_CASE(test_unique_inactive_read_handle) { .with_column("v", int32_type) .build(); - auto sem1_h1 = sem1.register_inactive_read(make_empty_flat_reader_v2(schema, sem1.make_tracking_only_permit(schema.get(), get_name(), db::no_timeout, {}))); - auto sem2_h1 = sem2.register_inactive_read(make_empty_flat_reader_v2(schema, sem2.make_tracking_only_permit(schema.get(), get_name(), db::no_timeout, {}))); + auto sem1_h1 = sem1.register_inactive_read(make_empty_flat_reader_v2(schema, sem1.make_tracking_only_permit(schema, get_name(), db::no_timeout, {}))); + auto sem2_h1 = sem2.register_inactive_read(make_empty_flat_reader_v2(schema, sem2.make_tracking_only_permit(schema, get_name(), db::no_timeout, {}))); // Sanity check that lookup still works with empty handle. BOOST_REQUIRE(!sem1.unregister_inactive_read(reader_concurrency_semaphore::inactive_read_handle{})); diff --git a/test/boost/reader_concurrency_semaphore_test.cc b/test/boost/reader_concurrency_semaphore_test.cc index 66660d96af..6aba3ab840 100644 --- a/test/boost/reader_concurrency_semaphore_test.cc +++ b/test/boost/reader_concurrency_semaphore_test.cc @@ -41,7 +41,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_clear_inactive_reads) auto clear_permits = defer([&permits] { permits.clear(); }); for (int i = 0; i < 10; ++i) { - permits.emplace_back(semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {})); + permits.emplace_back(semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {})); handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permits.back()))); } @@ -59,7 +59,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_clear_inactive_reads) handles.clear(); for (int i = 0; i < 10; ++i) { - handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {})))); + handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {})))); } BOOST_REQUIRE(std::all_of(handles.begin(), handles.end(), [] (const reader_concurrency_semaphore::inactive_read_handle& handle) { return bool(handle); })); @@ -77,14 +77,14 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_destroyed_permit_rele // Not admitted, active { - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}); + auto permit = semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {}); auto units2 = permit.consume_memory(1024); } BOOST_REQUIRE_EQUAL(semaphore.available_resources(), initial_resources); // Not admitted, inactive { - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}); + auto permit = semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {}); auto units2 = permit.consume_memory(1024); auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permit)); @@ -94,14 +94,14 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_destroyed_permit_rele // Admitted, active { - auto permit = semaphore.obtain_permit(s.schema().get(), get_name(), 1024, db::no_timeout, {}).get0(); + auto permit = semaphore.obtain_permit(s.schema(), get_name(), 1024, db::no_timeout, {}).get0(); auto units1 = permit.consume_memory(1024); } BOOST_REQUIRE_EQUAL(semaphore.available_resources(), initial_resources); // Admitted, inactive { - auto permit = semaphore.obtain_permit(s.schema().get(), get_name(), 1024, db::no_timeout, {}).get0(); + auto permit = semaphore.obtain_permit(s.schema(), get_name(), 1024, db::no_timeout, {}).get0(); auto units1 = permit.consume_memory(1024); auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permit)); @@ -115,7 +115,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_abandoned_handle_clos reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name(), reader_concurrency_semaphore::register_metrics::no); auto stop_sem = deferred_stop(semaphore); - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}); + auto permit = semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {}); { auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permit)); // The handle is destroyed here, triggering the destrution of the inactive read. @@ -136,7 +136,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_readmission_preserves auto stop_sem = deferred_stop(semaphore); - reader_permit_opt permit = semaphore.obtain_permit(s.schema().get(), get_name(), 1024, db::no_timeout, {}).get(); + reader_permit_opt permit = semaphore.obtain_permit(s.schema(), get_name(), 1024, db::no_timeout, {}).get(); BOOST_REQUIRE_EQUAL(permit->consumed_resources(), base_resources); std::optional residue_units; @@ -152,7 +152,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_readmission_preserves BOOST_REQUIRE_EQUAL(semaphore.available_resources(), initial_resources - permit->consumed_resources()); if (i % 2) { - auto sponge_permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}); + auto sponge_permit = semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {}); auto consumed_resources = sponge_permit.consume_resources(semaphore.available_resources()); auto fut = make_ready_future<>(); @@ -267,9 +267,9 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_forward_progress) { } future<> obtain_permit() { if (_memory_only) { - _permit = _semaphore.make_tracking_only_permit(_schema.get(), "reader_m", db::no_timeout, {}); + _permit = _semaphore.make_tracking_only_permit(_schema, "reader_m", db::no_timeout, {}); } else { - _permit = co_await _semaphore.obtain_permit(_schema.get(), fmt::format("reader_{}", _evictable ? 'e' : 'a'), 1024, db::no_timeout, {}); + _permit = co_await _semaphore.obtain_permit(_schema, fmt::format("reader_{}", _evictable ? 'e' : 'a'), 1024, db::no_timeout, {}); } _units = _permit->consume_memory(tests::random::get_int(128, 1024)); } @@ -564,7 +564,7 @@ SEASTAR_THREAD_TEST_CASE(reader_concurrency_semaphore_dump_reader_diganostics) { for (auto& schema : schemas) { const auto nr_permits = tests::random::get_int(2, 32); for (unsigned i = 0; i < nr_permits; ++i) { - auto permit = semaphore.make_tracking_only_permit(schema.get(), op_names.at(tests::random::get_int(0, nr_ops - 1)), db::no_timeout, {}); + auto permit = semaphore.make_tracking_only_permit(schema, op_names.at(tests::random::get_int(0, nr_ops - 1)), db::no_timeout, {}); if (tests::random::get_int(0, 4)) { auto units = permit.consume_resources(reader_resources(tests::random::get_int(0, 1), tests::random::get_int(1024, 16 * 1024 * 1024))); permits.push_back(std::pair(std::move(permit), std::move(units))); @@ -614,7 +614,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_stop_waits_on_permits SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { simple_schema s; - const auto schema_ptr = s.schema().get(); + const auto schema = s.schema(); const auto initial_resources = reader_concurrency_semaphore::resources{2, 2 * 1024}; reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory); auto stop_sem = deferred_stop(semaphore); @@ -624,7 +624,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { testlog.trace("Running admission scenario {}, with exepcted_can_admit={}", description, expected_can_admit); const auto stats_before = semaphore.get_stats(); - auto admit_fut = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}); + auto admit_fut = semaphore.obtain_permit(schema, get_name(), 1024, db::timeout_clock::now(), {}); admit_fut.wait(); const bool can_admit = !admit_fut.failed(); if (can_admit) { @@ -651,13 +651,13 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { // resources and waitlist { - reader_permit_opt permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get(); + reader_permit_opt permit = semaphore.obtain_permit(schema, get_name(), 1024, db::timeout_clock::now(), {}).get(); require_can_admit(true, "enough resources"); const auto stats_before = semaphore.get_stats(); - auto enqueued_permit_fut = semaphore.obtain_permit(schema_ptr, get_name(), 2 * 1024, db::no_timeout, {}); + auto enqueued_permit_fut = semaphore.obtain_permit(schema, get_name(), 2 * 1024, db::no_timeout, {}); { const auto stats_after = semaphore.get_stats(); BOOST_REQUIRE(!enqueued_permit_fut.available()); @@ -679,7 +679,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { // need_cpu and awaits { - auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get(); + auto permit = semaphore.obtain_permit(schema, get_name(), 1024, db::timeout_clock::now(), {}).get(); require_can_admit(true, "!need_cpu"); { @@ -708,7 +708,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { // forward progress -- readmission { - auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get(); + auto permit = semaphore.obtain_permit(schema, get_name(), 1024, db::timeout_clock::now(), {}).get(); auto irh = semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permit)); BOOST_REQUIRE(semaphore.try_evict_one_inactive_read()); @@ -734,7 +734,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { // inactive readers { - auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get(); + auto permit = semaphore.obtain_permit(schema, get_name(), 1024, db::timeout_clock::now(), {}).get(); require_can_admit(true, "!need_cpu"); { @@ -762,10 +762,10 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { // evicting inactive readers for admission { - auto permit1 = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get(); + auto permit1 = semaphore.obtain_permit(schema, get_name(), 1024, db::timeout_clock::now(), {}).get(); auto irh1 = semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permit1)); - auto permit2 = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get(); + auto permit2 = semaphore.obtain_permit(schema, get_name(), 1024, db::timeout_clock::now(), {}).get(); auto irh2 = semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permit2)); require_can_admit(true, "evictable reads"); @@ -780,7 +780,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { const auto stats_before = semaphore.get_stats(); - auto permit2_fut = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::no_timeout, {}); + auto permit2_fut = semaphore.obtain_permit(schema, get_name(), 1024, db::no_timeout, {}); const auto stats_after = semaphore.get_stats(); BOOST_REQUIRE_EQUAL(stats_after.reads_admitted, stats_before.reads_admitted); @@ -801,7 +801,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { { check_admitting_enqueued_read( [&] { - return reader_permit_opt(semaphore.obtain_permit(schema_ptr, get_name(), 2 * 1024, db::timeout_clock::now(), {}).get()); + return reader_permit_opt(semaphore.obtain_permit(schema, get_name(), 2 * 1024, db::timeout_clock::now(), {}).get()); }, [] (reader_permit_opt& permit1) { permit1 = {}; @@ -816,7 +816,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { { check_admitting_enqueued_read( [&] { - return reader_permit_opt(semaphore.obtain_permit(schema_ptr, get_name(), 2 * 1024, db::timeout_clock::now(), {}).get()); + return reader_permit_opt(semaphore.obtain_permit(schema, get_name(), 2 * 1024, db::timeout_clock::now(), {}).get()); }, [&] (reader_permit_opt& permit1) { return semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), *permit1)); @@ -830,7 +830,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { { check_admitting_enqueued_read( [&] { - auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get(); + auto permit = semaphore.obtain_permit(schema, get_name(), 1024, db::timeout_clock::now(), {}).get(); require_can_admit(true, "enough resources"); return std::pair(permit, std::optional{permit}); }, [&] (std::pair>& permit_and_need_cpu_guard) { @@ -846,7 +846,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { { check_admitting_enqueued_read( [&] { - auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get(); + auto permit = semaphore.obtain_permit(schema, get_name(), 1024, db::timeout_clock::now(), {}).get(); require_can_admit(true, "enough resources"); return std::pair(permit, reader_permit::need_cpu_guard{permit}); }, [&] (std::pair& permit_and_need_cpu_guard) { @@ -954,7 +954,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_evict_inactive_reads_ for (auto& s : schemas) { auto& handles = schema_handles[&s]; for (int i = 0; i < 10; ++i) { - handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {})))); + handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {})))); } } @@ -981,7 +981,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_evict_inactive_reads_ // Reproduces https://github.com/scylladb/scylladb/issues/11770 SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_evict_inactive_reads_when_all_is_awaits) { simple_schema ss; - const auto& s = *ss.schema(); + const auto& s = ss.schema(); const auto initial_resources = reader_concurrency_semaphore::resources{2, 32 * 1024}; reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory); @@ -1009,10 +1009,10 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_evict_inactive_reads_ } }; - auto p1 = semaphore.obtain_permit(&s, get_name(), 1024, db::no_timeout, {}).get(); + auto p1 = semaphore.obtain_permit(s, get_name(), 1024, db::no_timeout, {}).get(); auto irh1 = semaphore.register_inactive_read(make_empty_flat_reader_v2(ss.schema(), p1)); - auto p2 = semaphore.obtain_permit(&s, get_name(), 1024, db::no_timeout, {}).get(); + auto p2 = semaphore.obtain_permit(s, get_name(), 1024, db::no_timeout, {}).get(); read rd2(p2); auto fut2 = semaphore.with_ready_permit(p2, rd2.get_read_func()); @@ -1021,7 +1021,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_evict_inactive_reads_ // * 1 need_cpu (but not awaiting) read on the ready list // * 1 waiter // * no more count resources left - auto p3_fut = semaphore.obtain_permit(&s, get_name(), 1024, db::no_timeout, {}); + auto p3_fut = semaphore.obtain_permit(s, get_name(), 1024, db::no_timeout, {}); BOOST_REQUIRE_EQUAL(semaphore.get_stats().waiters, 2); // (waiters includes _ready_list entries) BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_enqueued_for_admission, 1); BOOST_REQUIRE_EQUAL(semaphore.get_stats().need_cpu_permits, 0); // permit looses need_cpu status while waiting for execution @@ -1337,7 +1337,7 @@ memory_limit_table create_memory_limit_table(cql_test_env& env, uint64_t target_ auto sst = tbl.make_sstable(); auto writer_cfg = sst_man.configure_writer("test"); sst->write_components( - make_flat_mutation_reader_from_mutations_v2(s, semaphore.make_tracking_only_permit(s.get(), "test", db::no_timeout, {}), mut, s->full_slice()), + make_flat_mutation_reader_from_mutations_v2(s, semaphore.make_tracking_only_permit(s, "test", db::no_timeout, {}), mut, s->full_slice()), 1, s, writer_cfg, @@ -1561,7 +1561,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_blessed_read_goes_ina simple_schema ss; auto s = ss.schema(); - auto permit = semaphore.obtain_permit(s.get(), get_name(), 1024, db::no_timeout, {}).get(); + auto permit = semaphore.obtain_permit(s, get_name(), 1024, db::no_timeout, {}).get(); std::vector permit_res; @@ -1590,7 +1590,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_stop_with_inactive_re simple_schema ss; auto s = ss.schema(); - auto permit = reader_permit_opt(semaphore.obtain_permit(s.get(), get_name(), 1024, db::no_timeout, {}).get()); + auto permit = reader_permit_opt(semaphore.obtain_permit(s, get_name(), 1024, db::no_timeout, {}).get()); auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s, *permit)); diff --git a/test/boost/row_cache_test.cc b/test/boost/row_cache_test.cc index c44404dd5d..0e97a827b1 100644 --- a/test/boost/row_cache_test.cc +++ b/test/boost/row_cache_test.cc @@ -4697,7 +4697,7 @@ SEASTAR_THREAD_TEST_CASE(test_cache_reader_semaphore_oom_kill) { // Check different amounts of memory consumed before the read, so the OOM kill is triggered in different places. for (unsigned memory = 1; memory <= 512; memory *= 2) { semaphore.set_resources({1, memory}); - auto permit = semaphore.obtain_permit(s.schema().get(), "read", 0, db::no_timeout, {}).get(); + auto permit = semaphore.obtain_permit(s.schema(), "read", 0, db::no_timeout, {}).get(); auto create_reader_and_read_all = [&] { auto rd = cache.make_reader(s.schema(), permit, pr, &gc_state); auto close_rd = deferred_close(rd); diff --git a/test/boost/view_build_test.cc b/test/boost/view_build_test.cc index 1546db6469..ce63c8d785 100644 --- a/test/boost/view_build_test.cc +++ b/test/boost/view_build_test.cc @@ -441,7 +441,7 @@ SEASTAR_TEST_CASE(test_view_update_generator) { auto write_to_sstable = [&] (mutation m) { auto sst = t->make_streaming_staging_sstable(); sstables::sstable_writer_config sst_cfg = e.db().local().get_user_sstables_manager().configure_writer("test"); - auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout, {}); + auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s, "test", db::no_timeout, {}); sst->write_components(make_flat_mutation_reader_from_mutations_v2(m.schema(), std::move(permit), m), 1ul, s, sst_cfg, {}).get(); sst->open_data().get(); t->add_sstable_and_update_cache(sst).get(); @@ -553,7 +553,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_deadlock) { auto sst = t->make_streaming_staging_sstable(); sstables::sstable_writer_config sst_cfg = e.local_db().get_user_sstables_manager().configure_writer("test"); - auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout, {}); + auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s, "test", db::no_timeout, {}); sst->write_components(make_flat_mutation_reader_from_mutations_v2(m.schema(), std::move(permit), m), 1ul, s, sst_cfg, {}).get(); sst->open_data().get(); t->add_sstable_and_update_cache(sst).get(); @@ -563,7 +563,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_deadlock) { }).get0(); // consume all units except what is needed to admit a single reader. - auto sponge_permit = sem.make_tracking_only_permit(s.get(), "sponge", db::no_timeout, {}); + auto sponge_permit = sem.make_tracking_only_permit(s, "sponge", db::no_timeout, {}); auto resources = sponge_permit.consume_resources(sem.available_resources() - reader_resources{1, replica::new_reader_base_cost}); testlog.info("res = [.count={}, .memory={}]", sem.available_resources().count, sem.available_resources().memory); @@ -625,7 +625,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_register_semaphore_unit_leak auto sst = t->make_streaming_staging_sstable(); sstables::sstable_writer_config sst_cfg = e.local_db().get_user_sstables_manager().configure_writer("test"); - auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout, {}); + auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s, "test", db::no_timeout, {}); sst->write_components(make_flat_mutation_reader_from_mutations_v2(m.schema(), std::move(permit), m), 1ul, s, sst_cfg, {}).get(); sst->open_data().get(); t->add_sstable_and_update_cache(sst).get(); @@ -726,7 +726,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering) { void check(mutation mut) { // First we check that we would be able to create a reader, even // though the staging reader consumed all resources. - auto permit = _semaphore.obtain_permit(_schema.get(), "consumer_verifier", replica::new_reader_base_cost, db::timeout_clock::now(), {}).get0(); + auto permit = _semaphore.obtain_permit(_schema, "consumer_verifier", replica::new_reader_base_cost, db::timeout_clock::now(), {}).get0(); const size_t current_rows = rows_in_mut(mut); const auto total_rows = _partition_rows.at(mut.decorated_key()); @@ -835,7 +835,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering) { return less(a.decorated_key(), b.decorated_key()); }); - auto permit = sem.obtain_permit(schema.get(), get_name(), replica::new_reader_base_cost, db::no_timeout, {}).get0(); + auto permit = sem.obtain_permit(schema, get_name(), replica::new_reader_base_cost, db::no_timeout, {}).get0(); auto mt = make_memtable(schema, muts); auto p = make_manually_paused_evictable_reader_v2( @@ -936,7 +936,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering_with_random_mutati auto stop_sem = deferred_stop(sem); const abort_source as; auto mt = make_memtable(schema, {mut}); - auto permit = sem.obtain_permit(schema.get(), get_name(), replica::new_reader_base_cost, db::no_timeout, {}).get0(); + auto permit = sem.obtain_permit(schema, get_name(), replica::new_reader_base_cost, db::no_timeout, {}).get0(); auto p = make_manually_paused_evictable_reader_v2( mt->as_data_source(), schema, @@ -998,7 +998,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering_with_empty_mutatio auto schema = ss.schema(); reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, replica::new_reader_base_cost); auto stop_sem = deferred_stop(sem); - auto permit = sem.make_tracking_only_permit(schema.get(), "test", db::no_timeout, {}); + auto permit = sem.make_tracking_only_permit(schema, "test", db::no_timeout, {}); abort_source as; auto [staging_reader, staging_reader_handle] = make_manually_paused_evictable_reader_v2(make_empty_mutation_source(), schema, permit, query::full_partition_range, schema->full_slice(), {}, mutation_reader::forwarding::no); diff --git a/test/lib/reader_concurrency_semaphore.hh b/test/lib/reader_concurrency_semaphore.hh index 213a2b27a0..23648a6306 100644 --- a/test/lib/reader_concurrency_semaphore.hh +++ b/test/lib/reader_concurrency_semaphore.hh @@ -9,6 +9,7 @@ #pragma once #include "../../reader_concurrency_semaphore.hh" +#include "schema/schema.hh" namespace tests { diff --git a/test/lib/reader_lifecycle_policy.hh b/test/lib/reader_lifecycle_policy.hh index 5841d7a625..ea87043692 100644 --- a/test/lib/reader_lifecycle_policy.hh +++ b/test/lib/reader_lifecycle_policy.hh @@ -99,7 +99,7 @@ public: return *_contexts[shard]->semaphore; } virtual future obtain_reader_permit(schema_ptr schema, const char* const description, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) override { - return semaphore().obtain_permit(schema.get(), description, 128 * 1024, timeout, std::move(trace_ptr)); + return semaphore().obtain_permit(schema, description, 128 * 1024, timeout, std::move(trace_ptr)); } }; diff --git a/test/lib/sstable_test_env.hh b/test/lib/sstable_test_env.hh index 99c2ab7ff5..4ad8ef0cbd 100644 --- a/test/lib/sstable_test_env.hh +++ b/test/lib/sstable_test_env.hh @@ -194,7 +194,7 @@ public: tmpdir& tempdir() noexcept { return _impl->dir; } data_dictionary::storage_options get_storage_options() const noexcept { return _impl->storage; } - reader_permit make_reader_permit(const schema* const s, const char* n, db::timeout_clock::time_point timeout) { + reader_permit make_reader_permit(const schema_ptr &s, const char* n, db::timeout_clock::time_point timeout) { return _impl->semaphore.make_tracking_only_permit(s, n, timeout, {}); } reader_permit make_reader_permit(db::timeout_clock::time_point timeout = db::no_timeout) { diff --git a/test/lib/test_services.cc b/test/lib/test_services.cc index 3ebde78c89..856a134268 100644 --- a/test/lib/test_services.cc +++ b/test/lib/test_services.cc @@ -87,7 +87,7 @@ public: return _compaction_strategy_state; } reader_permit make_compaction_reader_permit() const override { - return table().compaction_concurrency_semaphore().make_tracking_only_permit(&*schema(), "table_for_tests::table_state", db::no_timeout, {}); + return table().compaction_concurrency_semaphore().make_tracking_only_permit(schema(), "table_for_tests::table_state", db::no_timeout, {}); } sstables::sstables_manager& get_sstables_manager() noexcept override { return _sstables_manager; diff --git a/test/manual/sstable_scan_footprint_test.cc b/test/manual/sstable_scan_footprint_test.cc index ec6da28765..2f54e9d72b 100644 --- a/test/manual/sstable_scan_footprint_test.cc +++ b/test/manual/sstable_scan_footprint_test.cc @@ -144,7 +144,7 @@ public: } }; -void execute_reads(const schema& s, reader_concurrency_semaphore& sem, unsigned reads, unsigned concurrency, std::function(unsigned)> read) { +void execute_reads(const schema_ptr& schema, reader_concurrency_semaphore& sem, unsigned reads, unsigned concurrency, std::function(unsigned)> read) { const reader_resources initial_res = sem.available_resources(); unsigned n = 0; gate g; @@ -175,7 +175,7 @@ void execute_reads(const schema& s, reader_concurrency_semaphore& sem, unsigned if (sem.get_stats().waiters) { testlog.trace("Waiting for queue to drain"); - sem.obtain_permit(&s, "drain", 1, db::no_timeout, {}).get(); + sem.obtain_permit(schema, "drain", 1, db::no_timeout, {}).get(); } } @@ -267,7 +267,7 @@ void test_main_thread(cql_test_env& env) { try { auto _ = sc.collect(); memory::set_heap_profiling_sampling_rate(100); - execute_reads(*s, sem, reads, read_concurrency, [&] (unsigned i) { + execute_reads(s, sem, reads, read_concurrency, [&] (unsigned i) { return env.execute_cql(format("select * from ks.test where pk = 0 and ck > {} limit 100;", tests::random::get_int(rows / 2))).discard_result(); }); diff --git a/test/perf/perf.cc b/test/perf/perf.cc index 76c02cb894..1e08519677 100644 --- a/test/perf/perf.cc +++ b/test/perf/perf.cc @@ -11,6 +11,7 @@ #include #include "seastarx.hh" #include "reader_concurrency_semaphore.hh" +#include "schema/schema.hh" uint64_t perf_mallocs() { diff --git a/test/unit/row_cache_stress_test.cc b/test/unit/row_cache_stress_test.cc index 7531f062be..a5a43c98fb 100644 --- a/test/unit/row_cache_stress_test.cc +++ b/test/unit/row_cache_stress_test.cc @@ -54,7 +54,7 @@ struct table { } reader_permit make_permit() { - return semaphore.make_tracking_only_permit(s.schema().get(), "test", db::no_timeout, {}); + return semaphore.make_tracking_only_permit(s.schema(), "test", db::no_timeout, {}); } future<> stop() noexcept { return semaphore.stop(); diff --git a/tools/schema_loader.cc b/tools/schema_loader.cc index 370eb5a182..81b4f50ac6 100644 --- a/tools/schema_loader.cc +++ b/tools/schema_loader.cc @@ -530,7 +530,7 @@ schema_ptr do_load_schema_from_schema_tables(const db::config& dbcfg, std::files sst_man, schema_tables_path / schema_table_table_dir[s], schema_factory, - rcs_sem.make_tracking_only_permit(s.get(), "schema_mutation", db::no_timeout, {}), + rcs_sem.make_tracking_only_permit(s, "schema_mutation", db::no_timeout, {}), keyspace, {table}); }; @@ -552,7 +552,7 @@ schema_ptr do_load_schema_from_schema_tables(const db::config& dbcfg, std::files sst_man, schema_tables_path / schema_table_table_dir[db::schema_tables::types()], db::schema_tables::types, - rcs_sem.make_tracking_only_permit(db::schema_tables::types().get(), "types_mutation", db::no_timeout, {}), + rcs_sem.make_tracking_only_permit(db::schema_tables::types(), "types_mutation", db::no_timeout, {}), keyspace, {}); if (types_mut) { diff --git a/tools/scylla-sstable.cc b/tools/scylla-sstable.cc index 530216885d..79910ecfd7 100644 --- a/tools/scylla-sstable.cc +++ b/tools/scylla-sstable.cc @@ -3058,7 +3058,7 @@ $ scylla sstable validate /path/to/md-123456-big-Data.db /path/to/md-123457-big- reader_concurrency_semaphore rcs_sem(reader_concurrency_semaphore::no_limits{}, app_name, reader_concurrency_semaphore::register_metrics::no); auto stop_semaphore = deferred_stop(rcs_sem); - const auto permit = rcs_sem.make_tracking_only_permit(schema.get(), app_name, db::no_timeout, {}); + const auto permit = rcs_sem.make_tracking_only_permit(schema, app_name, db::no_timeout, {}); try { operations_with_func.at(operation)(schema, permit, sstables, sst_man, app_config);