diff --git a/multishard_mutation_query.cc b/multishard_mutation_query.cc index a7d0269cdf..fd6e471b81 100644 --- a/multishard_mutation_query.cc +++ b/multishard_mutation_query.cc @@ -215,7 +215,7 @@ public: : _db(db) , _schema(std::move(s)) , _erm(std::move(erm)) - , _permit(_db.local().get_reader_concurrency_semaphore().make_tracking_only_permit(_schema.get(), "multishard-mutation-query", timeout, trace_state)) + , _permit(_db.local().get_reader_concurrency_semaphore().make_tracking_only_permit(_schema, "multishard-mutation-query", timeout, trace_state)) , _cmd(cmd) , _ranges(ranges) , _trace_state(std::move(trace_state)) diff --git a/mutation_writer/multishard_writer.cc b/mutation_writer/multishard_writer.cc index 3a0d2c447a..ad123d70e0 100644 --- a/mutation_writer/multishard_writer.cc +++ b/mutation_writer/multishard_writer.cc @@ -118,7 +118,7 @@ future<> multishard_writer::make_shard_writer(unsigned shard) { auto s = gs.get(); auto semaphore = std::make_unique(reader_concurrency_semaphore::no_limits{}, "shard_writer", reader_concurrency_semaphore::register_metrics::no); - auto permit = semaphore->make_tracking_only_permit(s.get(), "multishard-writer", db::no_timeout, {}); + auto permit = semaphore->make_tracking_only_permit(s, "multishard-writer", db::no_timeout, {}); auto this_shard_reader = make_foreign_reader(s, std::move(permit), std::move(reader)); return make_foreign(std::make_unique(gs.get(), std::move(semaphore), std::move(this_shard_reader), consumer)); }).then([this, shard] (foreign_ptr> writer) { diff --git a/reader_concurrency_semaphore.cc b/reader_concurrency_semaphore.cc index da4904f1be..a3261430bb 100644 --- a/reader_concurrency_semaphore.cc +++ b/reader_concurrency_semaphore.cc @@ -150,7 +150,8 @@ public: private: reader_concurrency_semaphore& _semaphore; - const schema* _schema; + schema_ptr _schema; + sstring _op_name; std::string_view _op_name_view; reader_resources _base_resources; @@ -237,9 +238,9 @@ private: public: struct value_tag {}; - impl(reader_concurrency_semaphore& semaphore, const schema* const schema, const std::string_view& op_name, reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) + impl(reader_concurrency_semaphore& semaphore, schema_ptr schema, const std::string_view& op_name, reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) : _semaphore(semaphore) - , _schema(schema) + , _schema(std::move(schema)) , _op_name_view(op_name) , _base_resources(base_resources) , _ttl_timer([this] { on_timeout(); }) @@ -248,9 +249,9 @@ public: set_timeout(timeout); _semaphore.on_permit_created(*this); } - impl(reader_concurrency_semaphore& semaphore, const schema* const schema, sstring&& op_name, reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) + impl(reader_concurrency_semaphore& semaphore, schema_ptr schema, sstring&& op_name, reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) : _semaphore(semaphore) - , _schema(schema) + , _schema(std::move(schema)) , _op_name(std::move(op_name)) , _op_name_view(_op_name) , _base_resources(base_resources) @@ -302,7 +303,7 @@ public: return _semaphore; } - const ::schema* get_schema() const { + const schema_ptr& get_schema() const { return _schema; } @@ -524,15 +525,15 @@ reader_permit::reader_permit(shared_ptr impl) : _impl(std::move(impl)) { } -reader_permit::reader_permit(reader_concurrency_semaphore& semaphore, const schema* const schema, std::string_view op_name, +reader_permit::reader_permit(reader_concurrency_semaphore& semaphore, schema_ptr schema, std::string_view op_name, reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) - : _impl(::seastar::make_shared(semaphore, schema, op_name, base_resources, timeout, std::move(trace_ptr))) + : _impl(::seastar::make_shared(semaphore, std::move(schema), op_name, base_resources, timeout, std::move(trace_ptr))) { } -reader_permit::reader_permit(reader_concurrency_semaphore& semaphore, const schema* const schema, sstring&& op_name, +reader_permit::reader_permit(reader_concurrency_semaphore& semaphore, schema_ptr schema, sstring&& op_name, reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) - : _impl(::seastar::make_shared(semaphore, schema, std::move(op_name), base_resources, timeout, std::move(trace_ptr))) + : _impl(::seastar::make_shared(semaphore, std::move(schema), std::move(op_name), base_resources, timeout, std::move(trace_ptr))) { } @@ -543,7 +544,7 @@ reader_concurrency_semaphore& reader_permit::semaphore() { return _impl->semaphore(); } -const ::schema* reader_permit::get_schema() const { +const schema_ptr& reader_permit::get_schema() const { return _impl->get_schema(); } @@ -766,7 +767,7 @@ static void do_dump_reader_permit_diagnostics(std::ostream& os, const reader_con permit_groups permits; semaphore.foreach_permit([&] (const reader_permit::impl& permit) { - permits[permit_group_key(permit.get_schema(), permit.get_op_name(), permit.get_state())].add(permit); + permits[permit_group_key(permit.get_schema().get(), permit.get_op_name(), permit.get_state())].add(permit); }); permit_stats total; @@ -1518,35 +1519,35 @@ void reader_concurrency_semaphore::on_permit_not_awaits() noexcept { --_stats.awaits_permits; } -future reader_concurrency_semaphore::obtain_permit(const schema* const schema, const char* const op_name, size_t memory, +future reader_concurrency_semaphore::obtain_permit(schema_ptr schema, const char* const op_name, size_t memory, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) { - auto permit = reader_permit(*this, schema, std::string_view(op_name), {1, static_cast(memory)}, timeout, std::move(trace_ptr)); + auto permit = reader_permit(*this, std::move(schema), std::string_view(op_name), {1, static_cast(memory)}, timeout, std::move(trace_ptr)); return do_wait_admission(*permit).then([permit] () mutable { return std::move(permit); }); } -future reader_concurrency_semaphore::obtain_permit(const schema* const schema, sstring&& op_name, size_t memory, +future reader_concurrency_semaphore::obtain_permit(schema_ptr schema, sstring&& op_name, size_t memory, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) { - auto permit = reader_permit(*this, schema, std::move(op_name), {1, static_cast(memory)}, timeout, std::move(trace_ptr)); + auto permit = reader_permit(*this, std::move(schema), std::move(op_name), {1, static_cast(memory)}, timeout, std::move(trace_ptr)); return do_wait_admission(*permit).then([permit] () mutable { return std::move(permit); }); } -reader_permit reader_concurrency_semaphore::make_tracking_only_permit(const schema* const schema, const char* const op_name, +reader_permit reader_concurrency_semaphore::make_tracking_only_permit(schema_ptr schema, const char* const op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) { - return reader_permit(*this, schema, std::string_view(op_name), {}, timeout, std::move(trace_ptr)); + return reader_permit(*this, std::move(schema), std::string_view(op_name), {}, timeout, std::move(trace_ptr)); } -reader_permit reader_concurrency_semaphore::make_tracking_only_permit(const schema* const schema, sstring&& op_name, +reader_permit reader_concurrency_semaphore::make_tracking_only_permit(schema_ptr schema, sstring&& op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) { - return reader_permit(*this, schema, std::move(op_name), {}, timeout, std::move(trace_ptr)); + return reader_permit(*this, std::move(schema), std::move(op_name), {}, timeout, std::move(trace_ptr)); } -future<> reader_concurrency_semaphore::with_permit(const schema* const schema, const char* const op_name, size_t memory, +future<> reader_concurrency_semaphore::with_permit(schema_ptr schema, const char* const op_name, size_t memory, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr, read_func func) { - auto permit = reader_permit(*this, schema, std::string_view(op_name), {1, static_cast(memory)}, timeout, std::move(trace_ptr)); + auto permit = reader_permit(*this, std::move(schema), std::string_view(op_name), {1, static_cast(memory)}, timeout, std::move(trace_ptr)); permit->aux_data().func = std::move(func); permit->aux_data().permit_keepalive = permit; return do_wait_admission(*permit); diff --git a/reader_concurrency_semaphore.hh b/reader_concurrency_semaphore.hh index 752e8623a5..bdfded4587 100644 --- a/reader_concurrency_semaphore.hh +++ b/reader_concurrency_semaphore.hh @@ -395,8 +395,8 @@ public: /// /// Some permits cannot be associated with any table, so passing nullptr as /// the schema parameter is allowed. - future obtain_permit(const schema* const schema, const char* const op_name, size_t memory, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr); - future obtain_permit(const schema* const schema, sstring&& op_name, size_t memory, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr); + future obtain_permit(schema_ptr schema, const char* const op_name, size_t memory, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr); + future obtain_permit(schema_ptr schema, sstring&& op_name, size_t memory, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr); /// Make a tracking only permit /// @@ -411,8 +411,8 @@ public: /// /// Some permits cannot be associated with any table, so passing nullptr as /// the schema parameter is allowed. - reader_permit make_tracking_only_permit(const schema* const schema, const char* const op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr); - reader_permit make_tracking_only_permit(const schema* const schema, sstring&& op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr); + reader_permit make_tracking_only_permit(schema_ptr schema, const char* const op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr); + reader_permit make_tracking_only_permit(schema_ptr schema, sstring&& op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr); /// Run the function through the semaphore's execution stage with an admitted permit /// @@ -433,7 +433,7 @@ public: /// /// Some permits cannot be associated with any table, so passing nullptr as /// the schema parameter is allowed. - future<> with_permit(const schema* const schema, const char* const op_name, size_t memory, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr, read_func func); + future<> with_permit(schema_ptr schema, const char* const op_name, size_t memory, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr, read_func func); /// Run the function through the semaphore's execution stage with a pre-admitted permit /// diff --git a/reader_permit.hh b/reader_permit.hh index 0b633c746d..c96bd1f04a 100644 --- a/reader_permit.hh +++ b/reader_permit.hh @@ -104,9 +104,9 @@ private: private: reader_permit() = default; reader_permit(shared_ptr); - explicit reader_permit(reader_concurrency_semaphore& semaphore, const schema* const schema, std::string_view op_name, + explicit reader_permit(reader_concurrency_semaphore& semaphore, schema_ptr schema, std::string_view op_name, reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr); - explicit reader_permit(reader_concurrency_semaphore& semaphore, const schema* const schema, sstring&& op_name, + explicit reader_permit(reader_concurrency_semaphore& semaphore, schema_ptr schema, sstring&& op_name, reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr); reader_permit::impl& operator*() { return *_impl; } @@ -143,7 +143,7 @@ public: reader_concurrency_semaphore& semaphore(); - const ::schema* get_schema() const; + const schema_ptr& get_schema() const; std::string_view get_op_name() const; state get_state() const; diff --git a/replica/database.cc b/replica/database.cc index 4b6524c832..f928c7a2af 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -1470,7 +1470,7 @@ public: } virtual future obtain_reader_permit(schema_ptr schema, const char* const description, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) override { auto& cf = _db.local().find_column_family(_table_id); - return semaphore().obtain_permit(schema.get(), description, cf.estimate_read_memory_cost(), timeout, std::move(trace_ptr)); + return semaphore().obtain_permit(schema, description, cf.estimate_read_memory_cost(), timeout, std::move(trace_ptr)); } }; @@ -1571,7 +1571,7 @@ database::query(schema_ptr s, const query::read_command& cmd, query::result_opti querier_opt->permit().set_trace_state(trace_state); f = co_await coroutine::as_future(semaphore.with_ready_permit(querier_opt->permit(), read_func)); } else { - f = co_await coroutine::as_future(semaphore.with_permit(s.get(), "data-query", cf.estimate_read_memory_cost(), timeout, trace_state, read_func)); + f = co_await coroutine::as_future(semaphore.with_permit(s, "data-query", cf.estimate_read_memory_cost(), timeout, trace_state, read_func)); } if (!f.failed()) { @@ -1638,7 +1638,7 @@ database::query_mutations(schema_ptr s, const query::read_command& cmd, const dh querier_opt->permit().set_trace_state(trace_state); f = co_await coroutine::as_future(semaphore.with_ready_permit(querier_opt->permit(), read_func)); } else { - f = co_await coroutine::as_future(semaphore.with_permit(s.get(), "mutation-query", cf.estimate_read_memory_cost(), timeout, trace_state, read_func)); + f = co_await coroutine::as_future(semaphore.with_permit(s, "mutation-query", cf.estimate_read_memory_cost(), timeout, trace_state, read_func)); } if (!f.failed()) { @@ -1688,7 +1688,7 @@ reader_concurrency_semaphore& database::get_reader_concurrency_semaphore() { } future database::obtain_reader_permit(table& tbl, const char* const op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) { - return get_reader_concurrency_semaphore().obtain_permit(tbl.schema().get(), op_name, tbl.estimate_read_memory_cost(), timeout, std::move(trace_ptr)); + return get_reader_concurrency_semaphore().obtain_permit(tbl.schema(), op_name, tbl.estimate_read_memory_cost(), timeout, std::move(trace_ptr)); } future database::obtain_reader_permit(schema_ptr schema, const char* const op_name, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) { @@ -1758,7 +1758,7 @@ future database::do_apply_counter_update(column_family& cf, const froz // counter state for each modified cell... tracing::trace(trace_state, "Reading counter values from the CF"); - auto permit = get_reader_concurrency_semaphore().make_tracking_only_permit(m_schema.get(), "counter-read-before-write", timeout, trace_state); + auto permit = get_reader_concurrency_semaphore().make_tracking_only_permit(m_schema, "counter-read-before-write", timeout, trace_state); return counter_write_query(m_schema, cf.as_mutation_source(), std::move(permit), m.decorated_key(), slice, trace_state) .then([this, &cf, &m, m_schema, timeout, trace_state] (auto mopt) { // ...now, that we got existing state of all affected counter diff --git a/replica/table.cc b/replica/table.cc index 09b062233c..d88b28895b 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -1136,7 +1136,7 @@ table::try_flush_memtable_to_sstable(compaction_group& cg, lw_shared_ptrmake_flush_reader( old->schema(), - compaction_concurrency_semaphore().make_tracking_only_permit(old->schema().get(), "try_flush_memtable_to_sstable()", db::no_timeout, {}))); + compaction_concurrency_semaphore().make_tracking_only_permit(old->schema(), "try_flush_memtable_to_sstable()", db::no_timeout, {}))); // Switch back to default scheduling group for post-flush actions, to avoid them being staved by the memtable flush // controller. Cache update does not affect the input of the memtable cpu controller, so it can be subject to @@ -2612,7 +2612,7 @@ write_memtable_to_sstable(memtable& mt, sstables::shared_sstable sst) { std::exception_ptr ex; try { - auto permit = semaphore.make_tracking_only_permit(mt.schema().get(), "mt_to_sst", db::no_timeout, {}); + auto permit = semaphore.make_tracking_only_permit(mt.schema(), "mt_to_sst", db::no_timeout, {}); auto reader = mt.make_flush_reader(mt.schema(), std::move(permit)); co_await write_memtable_to_sstable(std::move(reader), mt, std::move(sst), mt.partition_count(), monitor, cfg); } catch (...) { @@ -2925,7 +2925,7 @@ future table::do_push_view_replica_updates(shared_ptr table::do_push_view_replica_updates(shared_ptrgenerate_and_propagate_view_updates(gen, base, std::move(permit), std::move(views), std::move(m), std::move(reader), tr_state, now); tracing::trace(tr_state, "View updates for {}.{} were generated and propagated", base->ks_name(), base->cf_name()); @@ -3057,7 +3057,7 @@ public: return _cg._compaction_strategy_state; } reader_permit make_compaction_reader_permit() const override { - return _t.compaction_concurrency_semaphore().make_tracking_only_permit(schema().get(), "compaction", db::no_timeout, {}); + return _t.compaction_concurrency_semaphore().make_tracking_only_permit(schema(), "compaction", db::no_timeout, {}); } sstables::sstables_manager& get_sstables_manager() noexcept override { return _t.get_sstables_manager(); diff --git a/scylla-gdb.py b/scylla-gdb.py index d1bf15e6da..677d72b5c5 100755 --- a/scylla-gdb.py +++ b/scylla-gdb.py @@ -5396,7 +5396,12 @@ class scylla_read_stats(gdb.Command): total = permit_stats() for permit in intrusive_list(permit_list): - schema = permit['_schema'] + try: + schema = permit['_schema']['_p'] + except: + # schema is already a raw pointer in older versions + schema = permit['_schema'] + if schema: raw_schema = schema.dereference()['_raw'] schema_name = "{}.{}".format(str(raw_schema['_ks_name']).replace('"', ''), str(raw_schema['_cf_name']).replace('"', '')) diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 2fed9107a8..de6bb81f02 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -1222,7 +1222,7 @@ future<> sstable::load_first_and_last_position_in_partition() { } auto& sem = _manager.sstable_metadata_concurrency_sem(); - reader_permit permit = co_await sem.obtain_permit(&*_schema, "sstable::load_first_and_last_position_range", sstable_buffer_size, db::no_timeout, {}); + reader_permit permit = co_await sem.obtain_permit(_schema, "sstable::load_first_and_last_position_range", sstable_buffer_size, db::no_timeout, {}); auto first_pos_opt = co_await find_first_position_in_partition(permit, get_first_decorated_key(), false); auto last_pos_opt = co_await find_first_position_in_partition(permit, get_last_decorated_key(), true); @@ -1859,7 +1859,7 @@ future<> sstable::generate_summary() { auto s = summary_generator(_schema->get_partitioner(), _components->summary, _manager.config().sstable_summary_ratio()); auto ctx = make_lw_shared>( - *this, sem.make_tracking_only_permit(_schema.get(), "generate-summary", db::no_timeout, {}), s, trust_promoted_index::yes, + *this, sem.make_tracking_only_permit(_schema, "generate-summary", db::no_timeout, {}), s, trust_promoted_index::yes, make_file_input_stream(index_file, 0, index_size, std::move(options)), 0, index_size, (_version >= sstable_version_types::mc ? std::make_optional(get_clustering_values_fixed_lengths(get_serialization_header())) @@ -2746,7 +2746,7 @@ future sstable::has_partition_key(const utils::hashed_key& hk, const dht:: auto sem = reader_concurrency_semaphore(reader_concurrency_semaphore::no_limits{}, "sstables::has_partition_key()", reader_concurrency_semaphore::register_metrics::no); try { - auto lh_index_ptr = std::make_unique(s, sem.make_tracking_only_permit(_schema.get(), s->get_filename(), db::no_timeout, {})); + auto lh_index_ptr = std::make_unique(s, sem.make_tracking_only_permit(_schema, s->get_filename(), db::no_timeout, {})); present = co_await lh_index_ptr->advance_lower_and_check_if_present(dk); } catch (...) { ex = std::current_exception(); diff --git a/test/boost/commitlog_test.cc b/test/boost/commitlog_test.cc index de4d274bf8..812f14198b 100644 --- a/test/boost/commitlog_test.cc +++ b/test/boost/commitlog_test.cc @@ -953,7 +953,7 @@ SEASTAR_TEST_CASE(test_commitlog_replay_invalid_key){ { std::vector readers; readers.reserve(memtables.size()); - auto permit = db.get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout, {}); + auto permit = db.get_reader_concurrency_semaphore().make_tracking_only_permit(s, "test", db::no_timeout, {}); for (auto mt : memtables) { readers.push_back(mt->make_flat_reader(s, permit)); } diff --git a/test/boost/continuous_data_consumer_test.cc b/test/boost/continuous_data_consumer_test.cc index 9b32804ccb..a95034c181 100644 --- a/test/boost/continuous_data_consumer_test.cc +++ b/test/boost/continuous_data_consumer_test.cc @@ -13,6 +13,7 @@ #include "utils/buffer_input_stream.hh" #include "test/lib/reader_concurrency_semaphore.hh" #include "test/lib/random_utils.hh" +#include "schema/schema.hh" #include "sstables/processing_result_generator.hh" #include diff --git a/test/boost/database_test.cc b/test/boost/database_test.cc index d82740b6d1..bdb2ef6401 100644 --- a/test/boost/database_test.cc +++ b/test/boost/database_test.cc @@ -1346,7 +1346,7 @@ SEASTAR_TEST_CASE(database_drop_column_family_clears_querier_cache) { auto q = query::querier( tbl.as_mutation_source(), tbl.schema(), - database_test(db).get_user_read_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout, {}), + database_test(db).get_user_read_concurrency_semaphore().make_tracking_only_permit(s, "test", db::no_timeout, {}), query::full_partition_range, s->full_slice(), nullptr); diff --git a/test/boost/hashers_test.cc b/test/boost/hashers_test.cc index 9b451e06c5..6d3e43b846 100644 --- a/test/boost/hashers_test.cc +++ b/test/boost/hashers_test.cc @@ -66,7 +66,7 @@ SEASTAR_THREAD_TEST_CASE(mutation_fragment_sanity_check) { reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, __FILE__, reader_concurrency_semaphore::register_metrics::no); auto stop_semaphore = deferred_stop(semaphore); simple_schema s; - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), "test", db::no_timeout, {}); + auto permit = semaphore.make_tracking_only_permit(s.schema(), "test", db::no_timeout, {}); gc_clock::time_point ts(gc_clock::duration(1234567890000)); auto check_hash = [&] (const mutation_fragment& mf, uint64_t expected) { diff --git a/test/boost/mutation_fragment_test.cc b/test/boost/mutation_fragment_test.cc index eb59d48214..c2730856c1 100644 --- a/test/boost/mutation_fragment_test.cc +++ b/test/boost/mutation_fragment_test.cc @@ -357,7 +357,7 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_mutate_exception_safety) { reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 100); auto stop_sem = deferred_stop(sem); - auto permit = sem.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}); + auto permit = sem.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {}); const auto available_res = sem.available_resources(); const sstring val(1024, 'a'); @@ -427,7 +427,7 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_stream_validator) { reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 100); auto stop_sem = deferred_stop(sem); - auto permit = sem.make_tracking_only_permit(ss.schema().get(), get_name(), db::no_timeout, {}); + auto permit = sem.make_tracking_only_permit(ss.schema(), get_name(), db::no_timeout, {}); auto expect = [&] (bool expect_valid, const char* desc, unsigned at, auto&& first_mf, auto&&... mf) { std::vector mfs; @@ -618,7 +618,7 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_stream_validator_mixed_api_usage reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 100); auto stop_sem = deferred_stop(sem); - auto permit = sem.make_tracking_only_permit(ss.schema().get(), get_name(), db::no_timeout, {}); + auto permit = sem.make_tracking_only_permit(ss.schema(), get_name(), db::no_timeout, {}); mutation_fragment_stream_validator validator(*ss.schema()); diff --git a/test/boost/mutation_reader_test.cc b/test/boost/mutation_reader_test.cc index 3646bdc71b..edeee58a9a 100644 --- a/test/boost/mutation_reader_test.cc +++ b/test/boost/mutation_reader_test.cc @@ -2942,7 +2942,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_self_validation) { reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name(), reader_concurrency_semaphore::register_metrics::no); auto stop_sem = deferred_stop(semaphore); simple_schema s; - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}); + auto permit = semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {}); auto pkeys = s.make_pkeys(4); std::ranges::sort(pkeys, dht::decorated_key::less_comparator(s.schema())); @@ -3220,7 +3220,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_recreate_before_fast_forward_to) reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name(), reader_concurrency_semaphore::register_metrics::no); auto stop_sem = deferred_stop(semaphore); simple_schema s; - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}); + auto permit = semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {}); auto pkeys = s.make_pkeys(6); boost::sort(pkeys, dht::decorated_key::less_comparator(s.schema())); @@ -3270,7 +3270,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_drop_flags) { reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 0); auto stop_sem = deferred_stop(semaphore); simple_schema s; - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}); + auto permit = semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {}); auto pkeys = s.make_pkeys(2); std::sort(pkeys.begin(), pkeys.end(), [&s] (const auto& pk1, const auto& pk2) { @@ -3497,7 +3497,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_non_monotonic_positions) { auto stop_sem = deferred_stop(semaphore); simple_schema s; auto schema = s.schema(); - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}); + auto permit = semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {}); auto pkey = s.make_pkey(); const auto prange = dht::partition_range::make_open_ended_both_sides(); @@ -3555,7 +3555,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_clear_tombstone_in_discontinued_p auto stop_sem = deferred_stop(semaphore); simple_schema s; auto schema = s.schema(); - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}); + auto permit = semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {}); auto pkeys = s.make_pkeys(2); std::sort(pkeys.begin(), pkeys.end(), [&s] (const auto& pk1, const auto& pk2) { @@ -3653,7 +3653,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_next_pos_is_partition_start) { auto stop_sem = deferred_stop(semaphore); simple_schema s; auto schema = s.schema(); - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}); + auto permit = semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {}); auto pk = s.make_pkey(); const auto prange = dht::partition_range::make_open_ended_both_sides(); diff --git a/test/boost/querier_cache_test.cc b/test/boost/querier_cache_test.cc index 1c9273c1f4..a3c9c100e9 100644 --- a/test/boost/querier_cache_test.cc +++ b/test/boost/querier_cache_test.cc @@ -107,7 +107,7 @@ private: Querier make_querier(const dht::partition_range& range) { return Querier(_mutation_source, _s.schema(), - _sem.make_tracking_only_permit(_s.schema().get(), "make-querier", db::no_timeout, {}), + _sem.make_tracking_only_permit(_s.schema(), "make-querier", db::no_timeout, {}), range, _s.schema()->full_slice(), nullptr); @@ -688,7 +688,7 @@ SEASTAR_THREAD_TEST_CASE(test_resources_based_cache_eviction) { BOOST_CHECK_EQUAL(db.get_querier_cache_stats().resource_based_evictions, 0); // Drain all resources of the semaphore - auto sponge_permit = semaphore.make_tracking_only_permit(s.get(), "sponge", db::no_timeout, {}); + auto sponge_permit = semaphore.make_tracking_only_permit(s, "sponge", db::no_timeout, {}); auto consumed_resources = sponge_permit.consume_resources(semaphore.available_resources()); auto cmd2 = query::read_command(s->id(), @@ -737,13 +737,13 @@ SEASTAR_THREAD_TEST_CASE(test_immediate_evict_on_insert) { test_querier_cache t; auto& sem = t.get_semaphore(); - auto permit1 = sem.obtain_permit(t.get_schema().get(), get_name(), 0, db::no_timeout, {}).get0(); + auto permit1 = sem.obtain_permit(t.get_schema(), get_name(), 0, db::no_timeout, {}).get0(); auto resources = permit1.consume_resources(reader_resources(sem.available_resources().count, 0)); BOOST_CHECK_EQUAL(sem.available_resources().count, 0); - auto fut = sem.obtain_permit(t.get_schema().get(), get_name(), 1, db::no_timeout, {}); + auto fut = sem.obtain_permit(t.get_schema(), get_name(), 1, db::no_timeout, {}); BOOST_CHECK_EQUAL(sem.get_stats().waiters, 1); @@ -769,8 +769,8 @@ SEASTAR_THREAD_TEST_CASE(test_unique_inactive_read_handle) { .with_column("v", int32_type) .build(); - auto sem1_h1 = sem1.register_inactive_read(make_empty_flat_reader_v2(schema, sem1.make_tracking_only_permit(schema.get(), get_name(), db::no_timeout, {}))); - auto sem2_h1 = sem2.register_inactive_read(make_empty_flat_reader_v2(schema, sem2.make_tracking_only_permit(schema.get(), get_name(), db::no_timeout, {}))); + auto sem1_h1 = sem1.register_inactive_read(make_empty_flat_reader_v2(schema, sem1.make_tracking_only_permit(schema, get_name(), db::no_timeout, {}))); + auto sem2_h1 = sem2.register_inactive_read(make_empty_flat_reader_v2(schema, sem2.make_tracking_only_permit(schema, get_name(), db::no_timeout, {}))); // Sanity check that lookup still works with empty handle. BOOST_REQUIRE(!sem1.unregister_inactive_read(reader_concurrency_semaphore::inactive_read_handle{})); diff --git a/test/boost/reader_concurrency_semaphore_test.cc b/test/boost/reader_concurrency_semaphore_test.cc index 66660d96af..6aba3ab840 100644 --- a/test/boost/reader_concurrency_semaphore_test.cc +++ b/test/boost/reader_concurrency_semaphore_test.cc @@ -41,7 +41,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_clear_inactive_reads) auto clear_permits = defer([&permits] { permits.clear(); }); for (int i = 0; i < 10; ++i) { - permits.emplace_back(semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {})); + permits.emplace_back(semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {})); handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permits.back()))); } @@ -59,7 +59,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_clear_inactive_reads) handles.clear(); for (int i = 0; i < 10; ++i) { - handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {})))); + handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {})))); } BOOST_REQUIRE(std::all_of(handles.begin(), handles.end(), [] (const reader_concurrency_semaphore::inactive_read_handle& handle) { return bool(handle); })); @@ -77,14 +77,14 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_destroyed_permit_rele // Not admitted, active { - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}); + auto permit = semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {}); auto units2 = permit.consume_memory(1024); } BOOST_REQUIRE_EQUAL(semaphore.available_resources(), initial_resources); // Not admitted, inactive { - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}); + auto permit = semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {}); auto units2 = permit.consume_memory(1024); auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permit)); @@ -94,14 +94,14 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_destroyed_permit_rele // Admitted, active { - auto permit = semaphore.obtain_permit(s.schema().get(), get_name(), 1024, db::no_timeout, {}).get0(); + auto permit = semaphore.obtain_permit(s.schema(), get_name(), 1024, db::no_timeout, {}).get0(); auto units1 = permit.consume_memory(1024); } BOOST_REQUIRE_EQUAL(semaphore.available_resources(), initial_resources); // Admitted, inactive { - auto permit = semaphore.obtain_permit(s.schema().get(), get_name(), 1024, db::no_timeout, {}).get0(); + auto permit = semaphore.obtain_permit(s.schema(), get_name(), 1024, db::no_timeout, {}).get0(); auto units1 = permit.consume_memory(1024); auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permit)); @@ -115,7 +115,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_abandoned_handle_clos reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name(), reader_concurrency_semaphore::register_metrics::no); auto stop_sem = deferred_stop(semaphore); - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}); + auto permit = semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {}); { auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permit)); // The handle is destroyed here, triggering the destrution of the inactive read. @@ -136,7 +136,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_readmission_preserves auto stop_sem = deferred_stop(semaphore); - reader_permit_opt permit = semaphore.obtain_permit(s.schema().get(), get_name(), 1024, db::no_timeout, {}).get(); + reader_permit_opt permit = semaphore.obtain_permit(s.schema(), get_name(), 1024, db::no_timeout, {}).get(); BOOST_REQUIRE_EQUAL(permit->consumed_resources(), base_resources); std::optional residue_units; @@ -152,7 +152,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_readmission_preserves BOOST_REQUIRE_EQUAL(semaphore.available_resources(), initial_resources - permit->consumed_resources()); if (i % 2) { - auto sponge_permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {}); + auto sponge_permit = semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {}); auto consumed_resources = sponge_permit.consume_resources(semaphore.available_resources()); auto fut = make_ready_future<>(); @@ -267,9 +267,9 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_forward_progress) { } future<> obtain_permit() { if (_memory_only) { - _permit = _semaphore.make_tracking_only_permit(_schema.get(), "reader_m", db::no_timeout, {}); + _permit = _semaphore.make_tracking_only_permit(_schema, "reader_m", db::no_timeout, {}); } else { - _permit = co_await _semaphore.obtain_permit(_schema.get(), fmt::format("reader_{}", _evictable ? 'e' : 'a'), 1024, db::no_timeout, {}); + _permit = co_await _semaphore.obtain_permit(_schema, fmt::format("reader_{}", _evictable ? 'e' : 'a'), 1024, db::no_timeout, {}); } _units = _permit->consume_memory(tests::random::get_int(128, 1024)); } @@ -564,7 +564,7 @@ SEASTAR_THREAD_TEST_CASE(reader_concurrency_semaphore_dump_reader_diganostics) { for (auto& schema : schemas) { const auto nr_permits = tests::random::get_int(2, 32); for (unsigned i = 0; i < nr_permits; ++i) { - auto permit = semaphore.make_tracking_only_permit(schema.get(), op_names.at(tests::random::get_int(0, nr_ops - 1)), db::no_timeout, {}); + auto permit = semaphore.make_tracking_only_permit(schema, op_names.at(tests::random::get_int(0, nr_ops - 1)), db::no_timeout, {}); if (tests::random::get_int(0, 4)) { auto units = permit.consume_resources(reader_resources(tests::random::get_int(0, 1), tests::random::get_int(1024, 16 * 1024 * 1024))); permits.push_back(std::pair(std::move(permit), std::move(units))); @@ -614,7 +614,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_stop_waits_on_permits SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { simple_schema s; - const auto schema_ptr = s.schema().get(); + const auto schema = s.schema(); const auto initial_resources = reader_concurrency_semaphore::resources{2, 2 * 1024}; reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory); auto stop_sem = deferred_stop(semaphore); @@ -624,7 +624,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { testlog.trace("Running admission scenario {}, with exepcted_can_admit={}", description, expected_can_admit); const auto stats_before = semaphore.get_stats(); - auto admit_fut = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}); + auto admit_fut = semaphore.obtain_permit(schema, get_name(), 1024, db::timeout_clock::now(), {}); admit_fut.wait(); const bool can_admit = !admit_fut.failed(); if (can_admit) { @@ -651,13 +651,13 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { // resources and waitlist { - reader_permit_opt permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get(); + reader_permit_opt permit = semaphore.obtain_permit(schema, get_name(), 1024, db::timeout_clock::now(), {}).get(); require_can_admit(true, "enough resources"); const auto stats_before = semaphore.get_stats(); - auto enqueued_permit_fut = semaphore.obtain_permit(schema_ptr, get_name(), 2 * 1024, db::no_timeout, {}); + auto enqueued_permit_fut = semaphore.obtain_permit(schema, get_name(), 2 * 1024, db::no_timeout, {}); { const auto stats_after = semaphore.get_stats(); BOOST_REQUIRE(!enqueued_permit_fut.available()); @@ -679,7 +679,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { // need_cpu and awaits { - auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get(); + auto permit = semaphore.obtain_permit(schema, get_name(), 1024, db::timeout_clock::now(), {}).get(); require_can_admit(true, "!need_cpu"); { @@ -708,7 +708,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { // forward progress -- readmission { - auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get(); + auto permit = semaphore.obtain_permit(schema, get_name(), 1024, db::timeout_clock::now(), {}).get(); auto irh = semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permit)); BOOST_REQUIRE(semaphore.try_evict_one_inactive_read()); @@ -734,7 +734,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { // inactive readers { - auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get(); + auto permit = semaphore.obtain_permit(schema, get_name(), 1024, db::timeout_clock::now(), {}).get(); require_can_admit(true, "!need_cpu"); { @@ -762,10 +762,10 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { // evicting inactive readers for admission { - auto permit1 = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get(); + auto permit1 = semaphore.obtain_permit(schema, get_name(), 1024, db::timeout_clock::now(), {}).get(); auto irh1 = semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permit1)); - auto permit2 = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get(); + auto permit2 = semaphore.obtain_permit(schema, get_name(), 1024, db::timeout_clock::now(), {}).get(); auto irh2 = semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), permit2)); require_can_admit(true, "evictable reads"); @@ -780,7 +780,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { const auto stats_before = semaphore.get_stats(); - auto permit2_fut = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::no_timeout, {}); + auto permit2_fut = semaphore.obtain_permit(schema, get_name(), 1024, db::no_timeout, {}); const auto stats_after = semaphore.get_stats(); BOOST_REQUIRE_EQUAL(stats_after.reads_admitted, stats_before.reads_admitted); @@ -801,7 +801,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { { check_admitting_enqueued_read( [&] { - return reader_permit_opt(semaphore.obtain_permit(schema_ptr, get_name(), 2 * 1024, db::timeout_clock::now(), {}).get()); + return reader_permit_opt(semaphore.obtain_permit(schema, get_name(), 2 * 1024, db::timeout_clock::now(), {}).get()); }, [] (reader_permit_opt& permit1) { permit1 = {}; @@ -816,7 +816,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { { check_admitting_enqueued_read( [&] { - return reader_permit_opt(semaphore.obtain_permit(schema_ptr, get_name(), 2 * 1024, db::timeout_clock::now(), {}).get()); + return reader_permit_opt(semaphore.obtain_permit(schema, get_name(), 2 * 1024, db::timeout_clock::now(), {}).get()); }, [&] (reader_permit_opt& permit1) { return semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), *permit1)); @@ -830,7 +830,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { { check_admitting_enqueued_read( [&] { - auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get(); + auto permit = semaphore.obtain_permit(schema, get_name(), 1024, db::timeout_clock::now(), {}).get(); require_can_admit(true, "enough resources"); return std::pair(permit, std::optional{permit}); }, [&] (std::pair>& permit_and_need_cpu_guard) { @@ -846,7 +846,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_admission) { { check_admitting_enqueued_read( [&] { - auto permit = semaphore.obtain_permit(schema_ptr, get_name(), 1024, db::timeout_clock::now(), {}).get(); + auto permit = semaphore.obtain_permit(schema, get_name(), 1024, db::timeout_clock::now(), {}).get(); require_can_admit(true, "enough resources"); return std::pair(permit, reader_permit::need_cpu_guard{permit}); }, [&] (std::pair& permit_and_need_cpu_guard) { @@ -954,7 +954,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_evict_inactive_reads_ for (auto& s : schemas) { auto& handles = schema_handles[&s]; for (int i = 0; i < 10; ++i) { - handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {})))); + handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {})))); } } @@ -981,7 +981,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_evict_inactive_reads_ // Reproduces https://github.com/scylladb/scylladb/issues/11770 SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_evict_inactive_reads_when_all_is_awaits) { simple_schema ss; - const auto& s = *ss.schema(); + const auto& s = ss.schema(); const auto initial_resources = reader_concurrency_semaphore::resources{2, 32 * 1024}; reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory); @@ -1009,10 +1009,10 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_evict_inactive_reads_ } }; - auto p1 = semaphore.obtain_permit(&s, get_name(), 1024, db::no_timeout, {}).get(); + auto p1 = semaphore.obtain_permit(s, get_name(), 1024, db::no_timeout, {}).get(); auto irh1 = semaphore.register_inactive_read(make_empty_flat_reader_v2(ss.schema(), p1)); - auto p2 = semaphore.obtain_permit(&s, get_name(), 1024, db::no_timeout, {}).get(); + auto p2 = semaphore.obtain_permit(s, get_name(), 1024, db::no_timeout, {}).get(); read rd2(p2); auto fut2 = semaphore.with_ready_permit(p2, rd2.get_read_func()); @@ -1021,7 +1021,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_evict_inactive_reads_ // * 1 need_cpu (but not awaiting) read on the ready list // * 1 waiter // * no more count resources left - auto p3_fut = semaphore.obtain_permit(&s, get_name(), 1024, db::no_timeout, {}); + auto p3_fut = semaphore.obtain_permit(s, get_name(), 1024, db::no_timeout, {}); BOOST_REQUIRE_EQUAL(semaphore.get_stats().waiters, 2); // (waiters includes _ready_list entries) BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_enqueued_for_admission, 1); BOOST_REQUIRE_EQUAL(semaphore.get_stats().need_cpu_permits, 0); // permit looses need_cpu status while waiting for execution @@ -1337,7 +1337,7 @@ memory_limit_table create_memory_limit_table(cql_test_env& env, uint64_t target_ auto sst = tbl.make_sstable(); auto writer_cfg = sst_man.configure_writer("test"); sst->write_components( - make_flat_mutation_reader_from_mutations_v2(s, semaphore.make_tracking_only_permit(s.get(), "test", db::no_timeout, {}), mut, s->full_slice()), + make_flat_mutation_reader_from_mutations_v2(s, semaphore.make_tracking_only_permit(s, "test", db::no_timeout, {}), mut, s->full_slice()), 1, s, writer_cfg, @@ -1561,7 +1561,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_blessed_read_goes_ina simple_schema ss; auto s = ss.schema(); - auto permit = semaphore.obtain_permit(s.get(), get_name(), 1024, db::no_timeout, {}).get(); + auto permit = semaphore.obtain_permit(s, get_name(), 1024, db::no_timeout, {}).get(); std::vector permit_res; @@ -1590,7 +1590,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_stop_with_inactive_re simple_schema ss; auto s = ss.schema(); - auto permit = reader_permit_opt(semaphore.obtain_permit(s.get(), get_name(), 1024, db::no_timeout, {}).get()); + auto permit = reader_permit_opt(semaphore.obtain_permit(s, get_name(), 1024, db::no_timeout, {}).get()); auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s, *permit)); diff --git a/test/boost/row_cache_test.cc b/test/boost/row_cache_test.cc index c44404dd5d..0e97a827b1 100644 --- a/test/boost/row_cache_test.cc +++ b/test/boost/row_cache_test.cc @@ -4697,7 +4697,7 @@ SEASTAR_THREAD_TEST_CASE(test_cache_reader_semaphore_oom_kill) { // Check different amounts of memory consumed before the read, so the OOM kill is triggered in different places. for (unsigned memory = 1; memory <= 512; memory *= 2) { semaphore.set_resources({1, memory}); - auto permit = semaphore.obtain_permit(s.schema().get(), "read", 0, db::no_timeout, {}).get(); + auto permit = semaphore.obtain_permit(s.schema(), "read", 0, db::no_timeout, {}).get(); auto create_reader_and_read_all = [&] { auto rd = cache.make_reader(s.schema(), permit, pr, &gc_state); auto close_rd = deferred_close(rd); diff --git a/test/boost/view_build_test.cc b/test/boost/view_build_test.cc index 1546db6469..ce63c8d785 100644 --- a/test/boost/view_build_test.cc +++ b/test/boost/view_build_test.cc @@ -441,7 +441,7 @@ SEASTAR_TEST_CASE(test_view_update_generator) { auto write_to_sstable = [&] (mutation m) { auto sst = t->make_streaming_staging_sstable(); sstables::sstable_writer_config sst_cfg = e.db().local().get_user_sstables_manager().configure_writer("test"); - auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout, {}); + auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s, "test", db::no_timeout, {}); sst->write_components(make_flat_mutation_reader_from_mutations_v2(m.schema(), std::move(permit), m), 1ul, s, sst_cfg, {}).get(); sst->open_data().get(); t->add_sstable_and_update_cache(sst).get(); @@ -553,7 +553,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_deadlock) { auto sst = t->make_streaming_staging_sstable(); sstables::sstable_writer_config sst_cfg = e.local_db().get_user_sstables_manager().configure_writer("test"); - auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout, {}); + auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s, "test", db::no_timeout, {}); sst->write_components(make_flat_mutation_reader_from_mutations_v2(m.schema(), std::move(permit), m), 1ul, s, sst_cfg, {}).get(); sst->open_data().get(); t->add_sstable_and_update_cache(sst).get(); @@ -563,7 +563,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_deadlock) { }).get0(); // consume all units except what is needed to admit a single reader. - auto sponge_permit = sem.make_tracking_only_permit(s.get(), "sponge", db::no_timeout, {}); + auto sponge_permit = sem.make_tracking_only_permit(s, "sponge", db::no_timeout, {}); auto resources = sponge_permit.consume_resources(sem.available_resources() - reader_resources{1, replica::new_reader_base_cost}); testlog.info("res = [.count={}, .memory={}]", sem.available_resources().count, sem.available_resources().memory); @@ -625,7 +625,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_register_semaphore_unit_leak auto sst = t->make_streaming_staging_sstable(); sstables::sstable_writer_config sst_cfg = e.local_db().get_user_sstables_manager().configure_writer("test"); - auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout, {}); + auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s, "test", db::no_timeout, {}); sst->write_components(make_flat_mutation_reader_from_mutations_v2(m.schema(), std::move(permit), m), 1ul, s, sst_cfg, {}).get(); sst->open_data().get(); t->add_sstable_and_update_cache(sst).get(); @@ -726,7 +726,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering) { void check(mutation mut) { // First we check that we would be able to create a reader, even // though the staging reader consumed all resources. - auto permit = _semaphore.obtain_permit(_schema.get(), "consumer_verifier", replica::new_reader_base_cost, db::timeout_clock::now(), {}).get0(); + auto permit = _semaphore.obtain_permit(_schema, "consumer_verifier", replica::new_reader_base_cost, db::timeout_clock::now(), {}).get0(); const size_t current_rows = rows_in_mut(mut); const auto total_rows = _partition_rows.at(mut.decorated_key()); @@ -835,7 +835,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering) { return less(a.decorated_key(), b.decorated_key()); }); - auto permit = sem.obtain_permit(schema.get(), get_name(), replica::new_reader_base_cost, db::no_timeout, {}).get0(); + auto permit = sem.obtain_permit(schema, get_name(), replica::new_reader_base_cost, db::no_timeout, {}).get0(); auto mt = make_memtable(schema, muts); auto p = make_manually_paused_evictable_reader_v2( @@ -936,7 +936,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering_with_random_mutati auto stop_sem = deferred_stop(sem); const abort_source as; auto mt = make_memtable(schema, {mut}); - auto permit = sem.obtain_permit(schema.get(), get_name(), replica::new_reader_base_cost, db::no_timeout, {}).get0(); + auto permit = sem.obtain_permit(schema, get_name(), replica::new_reader_base_cost, db::no_timeout, {}).get0(); auto p = make_manually_paused_evictable_reader_v2( mt->as_data_source(), schema, @@ -998,7 +998,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering_with_empty_mutatio auto schema = ss.schema(); reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, replica::new_reader_base_cost); auto stop_sem = deferred_stop(sem); - auto permit = sem.make_tracking_only_permit(schema.get(), "test", db::no_timeout, {}); + auto permit = sem.make_tracking_only_permit(schema, "test", db::no_timeout, {}); abort_source as; auto [staging_reader, staging_reader_handle] = make_manually_paused_evictable_reader_v2(make_empty_mutation_source(), schema, permit, query::full_partition_range, schema->full_slice(), {}, mutation_reader::forwarding::no); diff --git a/test/lib/reader_concurrency_semaphore.hh b/test/lib/reader_concurrency_semaphore.hh index 213a2b27a0..23648a6306 100644 --- a/test/lib/reader_concurrency_semaphore.hh +++ b/test/lib/reader_concurrency_semaphore.hh @@ -9,6 +9,7 @@ #pragma once #include "../../reader_concurrency_semaphore.hh" +#include "schema/schema.hh" namespace tests { diff --git a/test/lib/reader_lifecycle_policy.hh b/test/lib/reader_lifecycle_policy.hh index 5841d7a625..ea87043692 100644 --- a/test/lib/reader_lifecycle_policy.hh +++ b/test/lib/reader_lifecycle_policy.hh @@ -99,7 +99,7 @@ public: return *_contexts[shard]->semaphore; } virtual future obtain_reader_permit(schema_ptr schema, const char* const description, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) override { - return semaphore().obtain_permit(schema.get(), description, 128 * 1024, timeout, std::move(trace_ptr)); + return semaphore().obtain_permit(schema, description, 128 * 1024, timeout, std::move(trace_ptr)); } }; diff --git a/test/lib/sstable_test_env.hh b/test/lib/sstable_test_env.hh index 99c2ab7ff5..4ad8ef0cbd 100644 --- a/test/lib/sstable_test_env.hh +++ b/test/lib/sstable_test_env.hh @@ -194,7 +194,7 @@ public: tmpdir& tempdir() noexcept { return _impl->dir; } data_dictionary::storage_options get_storage_options() const noexcept { return _impl->storage; } - reader_permit make_reader_permit(const schema* const s, const char* n, db::timeout_clock::time_point timeout) { + reader_permit make_reader_permit(const schema_ptr &s, const char* n, db::timeout_clock::time_point timeout) { return _impl->semaphore.make_tracking_only_permit(s, n, timeout, {}); } reader_permit make_reader_permit(db::timeout_clock::time_point timeout = db::no_timeout) { diff --git a/test/lib/test_services.cc b/test/lib/test_services.cc index 3ebde78c89..856a134268 100644 --- a/test/lib/test_services.cc +++ b/test/lib/test_services.cc @@ -87,7 +87,7 @@ public: return _compaction_strategy_state; } reader_permit make_compaction_reader_permit() const override { - return table().compaction_concurrency_semaphore().make_tracking_only_permit(&*schema(), "table_for_tests::table_state", db::no_timeout, {}); + return table().compaction_concurrency_semaphore().make_tracking_only_permit(schema(), "table_for_tests::table_state", db::no_timeout, {}); } sstables::sstables_manager& get_sstables_manager() noexcept override { return _sstables_manager; diff --git a/test/manual/sstable_scan_footprint_test.cc b/test/manual/sstable_scan_footprint_test.cc index ec6da28765..2f54e9d72b 100644 --- a/test/manual/sstable_scan_footprint_test.cc +++ b/test/manual/sstable_scan_footprint_test.cc @@ -144,7 +144,7 @@ public: } }; -void execute_reads(const schema& s, reader_concurrency_semaphore& sem, unsigned reads, unsigned concurrency, std::function(unsigned)> read) { +void execute_reads(const schema_ptr& schema, reader_concurrency_semaphore& sem, unsigned reads, unsigned concurrency, std::function(unsigned)> read) { const reader_resources initial_res = sem.available_resources(); unsigned n = 0; gate g; @@ -175,7 +175,7 @@ void execute_reads(const schema& s, reader_concurrency_semaphore& sem, unsigned if (sem.get_stats().waiters) { testlog.trace("Waiting for queue to drain"); - sem.obtain_permit(&s, "drain", 1, db::no_timeout, {}).get(); + sem.obtain_permit(schema, "drain", 1, db::no_timeout, {}).get(); } } @@ -267,7 +267,7 @@ void test_main_thread(cql_test_env& env) { try { auto _ = sc.collect(); memory::set_heap_profiling_sampling_rate(100); - execute_reads(*s, sem, reads, read_concurrency, [&] (unsigned i) { + execute_reads(s, sem, reads, read_concurrency, [&] (unsigned i) { return env.execute_cql(format("select * from ks.test where pk = 0 and ck > {} limit 100;", tests::random::get_int(rows / 2))).discard_result(); }); diff --git a/test/perf/perf.cc b/test/perf/perf.cc index 76c02cb894..1e08519677 100644 --- a/test/perf/perf.cc +++ b/test/perf/perf.cc @@ -11,6 +11,7 @@ #include #include "seastarx.hh" #include "reader_concurrency_semaphore.hh" +#include "schema/schema.hh" uint64_t perf_mallocs() { diff --git a/test/unit/row_cache_stress_test.cc b/test/unit/row_cache_stress_test.cc index 7531f062be..a5a43c98fb 100644 --- a/test/unit/row_cache_stress_test.cc +++ b/test/unit/row_cache_stress_test.cc @@ -54,7 +54,7 @@ struct table { } reader_permit make_permit() { - return semaphore.make_tracking_only_permit(s.schema().get(), "test", db::no_timeout, {}); + return semaphore.make_tracking_only_permit(s.schema(), "test", db::no_timeout, {}); } future<> stop() noexcept { return semaphore.stop(); diff --git a/tools/schema_loader.cc b/tools/schema_loader.cc index 370eb5a182..81b4f50ac6 100644 --- a/tools/schema_loader.cc +++ b/tools/schema_loader.cc @@ -530,7 +530,7 @@ schema_ptr do_load_schema_from_schema_tables(const db::config& dbcfg, std::files sst_man, schema_tables_path / schema_table_table_dir[s], schema_factory, - rcs_sem.make_tracking_only_permit(s.get(), "schema_mutation", db::no_timeout, {}), + rcs_sem.make_tracking_only_permit(s, "schema_mutation", db::no_timeout, {}), keyspace, {table}); }; @@ -552,7 +552,7 @@ schema_ptr do_load_schema_from_schema_tables(const db::config& dbcfg, std::files sst_man, schema_tables_path / schema_table_table_dir[db::schema_tables::types()], db::schema_tables::types, - rcs_sem.make_tracking_only_permit(db::schema_tables::types().get(), "types_mutation", db::no_timeout, {}), + rcs_sem.make_tracking_only_permit(db::schema_tables::types(), "types_mutation", db::no_timeout, {}), keyspace, {}); if (types_mut) { diff --git a/tools/scylla-sstable.cc b/tools/scylla-sstable.cc index 530216885d..79910ecfd7 100644 --- a/tools/scylla-sstable.cc +++ b/tools/scylla-sstable.cc @@ -3058,7 +3058,7 @@ $ scylla sstable validate /path/to/md-123456-big-Data.db /path/to/md-123457-big- reader_concurrency_semaphore rcs_sem(reader_concurrency_semaphore::no_limits{}, app_name, reader_concurrency_semaphore::register_metrics::no); auto stop_semaphore = deferred_stop(rcs_sem); - const auto permit = rcs_sem.make_tracking_only_permit(schema.get(), app_name, db::no_timeout, {}); + const auto permit = rcs_sem.make_tracking_only_permit(schema, app_name, db::no_timeout, {}); try { operations_with_func.at(operation)(schema, permit, sstables, sst_man, app_config);