diff --git a/compaction/compaction.cc b/compaction/compaction.cc index 4bb0e005d0..027d9a5f29 100644 --- a/compaction/compaction.cc +++ b/compaction/compaction.cc @@ -551,7 +551,7 @@ protected: : _cf(cf) , _sstable_creator(std::move(descriptor.creator)) , _schema(cf.schema()) - , _permit(_cf.compaction_concurrency_semaphore().make_tracking_only_permit(_cf.schema().get(), "compaction")) + , _permit(_cf.compaction_concurrency_semaphore().make_tracking_only_permit(_cf.schema().get(), "compaction", db::no_timeout)) , _sstables(std::move(descriptor.sstables)) , _max_sstable_size(descriptor.max_sstable_bytes) , _sstable_level(descriptor.level) @@ -1718,7 +1718,7 @@ static future scrub_sstables_validate_mode(sstables::compaction clogger.info("Scrubbing in validate mode {}", sstables_list_msg); - auto permit = cf.compaction_concurrency_semaphore().make_tracking_only_permit(schema.get(), "scrub:validate"); + auto permit = cf.compaction_concurrency_semaphore().make_tracking_only_permit(schema.get(), "scrub:validate", db::no_timeout); auto reader = sstables->make_local_shard_sstable_reader(schema, permit, query::full_partition_range, schema->full_slice(), descriptor.io_priority, tracing::trace_state_ptr(), ::streamed_mutation::forwarding::no, ::mutation_reader::forwarding::no, default_read_monitor_generator()); diff --git a/database.cc b/database.cc index 5f64bfa3f3..07b6a43a3b 100644 --- a/database.cc +++ b/database.cc @@ -1649,7 +1649,7 @@ future database::do_apply_counter_update(column_family& cf, const froz // counter state for each modified cell... tracing::trace(trace_state, "Reading counter values from the CF"); - auto permit = get_reader_concurrency_semaphore().make_tracking_only_permit(m_schema.get(), "counter-read-before-write"); + auto permit = get_reader_concurrency_semaphore().make_tracking_only_permit(m_schema.get(), "counter-read-before-write", timeout); return counter_write_query(m_schema, cf.as_mutation_source(), std::move(permit), m.decorated_key(), slice, trace_state, timeout) .then([this, &cf, &m, m_schema, timeout, trace_state] (auto mopt) { // ...now, that we got existing state of all affected counter diff --git a/db/view/view.cc b/db/view/view.cc index 545376f965..d4e18b5243 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1387,7 +1387,7 @@ view_builder::view_builder(database& db, db::system_distributed_keyspace& sys_di : _db(db) , _sys_dist_ks(sys_dist_ks) , _mnotifier(mn) - , _permit(_db.get_reader_concurrency_semaphore().make_tracking_only_permit(nullptr, "view_builder")) { + , _permit(_db.get_reader_concurrency_semaphore().make_tracking_only_permit(nullptr, "view_builder", db::no_timeout)) { setup_metrics(); } diff --git a/multishard_mutation_query.cc b/multishard_mutation_query.cc index 83a909b9cc..37f34cd3b1 100644 --- a/multishard_mutation_query.cc +++ b/multishard_mutation_query.cc @@ -220,10 +220,10 @@ class read_context : public reader_lifecycle_policy { public: read_context(distributed& db, schema_ptr s, const query::read_command& cmd, const dht::partition_range_vector& ranges, - tracing::trace_state_ptr trace_state) + tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout) : _db(db) , _schema(std::move(s)) - , _permit(_db.local().get_reader_concurrency_semaphore().make_tracking_only_permit(_schema.get(), "multishard-mutation-query")) + , _permit(_db.local().get_reader_concurrency_semaphore().make_tracking_only_permit(_schema.get(), "multishard-mutation-query", timeout)) , _cmd(cmd) , _ranges(ranges) , _trace_state(std::move(trace_state)) @@ -670,7 +670,7 @@ future do_query( tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout, ResultBuilder&& result_builder) { - auto ctx = seastar::make_shared(db, s, cmd, ranges, trace_state); + auto ctx = seastar::make_shared(db, s, cmd, ranges, trace_state, timeout); co_await ctx->lookup_readers(); diff --git a/mutation_writer/multishard_writer.cc b/mutation_writer/multishard_writer.cc index b69ecc099a..bbac741e11 100644 --- a/mutation_writer/multishard_writer.cc +++ b/mutation_writer/multishard_writer.cc @@ -124,7 +124,7 @@ future<> multishard_writer::make_shard_writer(unsigned shard) { reader = make_foreign(std::make_unique(std::move(reader)))] () mutable { auto s = gs.get(); auto semaphore = std::make_unique(reader_concurrency_semaphore::no_limits{}, "shard_writer"); - auto permit = semaphore->make_tracking_only_permit(s.get(), "multishard-writer"); + auto permit = semaphore->make_tracking_only_permit(s.get(), "multishard-writer", db::no_timeout); auto this_shard_reader = make_foreign_reader(s, std::move(permit), std::move(reader)); return make_foreign(std::make_unique(gs.get(), std::move(semaphore), std::move(this_shard_reader), consumer)); }).then([this, shard] (foreign_ptr> writer) { diff --git a/reader_concurrency_semaphore.cc b/reader_concurrency_semaphore.cc index 19810dac31..7f34efa639 100644 --- a/reader_concurrency_semaphore.cc +++ b/reader_concurrency_semaphore.cc @@ -92,6 +92,7 @@ class reader_permit::impl bool _marked_as_used = false; uint64_t _blocked_branches = 0; bool _marked_as_blocked = false; + db::timeout_clock::time_point _timeout; private: void on_permit_used() { @@ -129,20 +130,22 @@ private: public: struct value_tag {}; - impl(reader_concurrency_semaphore& semaphore, const schema* const schema, const std::string_view& op_name, reader_resources base_resources) + impl(reader_concurrency_semaphore& semaphore, const schema* const schema, const std::string_view& op_name, reader_resources base_resources, db::timeout_clock::time_point timeout) : _semaphore(semaphore) , _schema(schema) , _op_name_view(op_name) , _base_resources(base_resources) + , _timeout(timeout) { _semaphore.on_permit_created(*this); } - impl(reader_concurrency_semaphore& semaphore, const schema* const schema, sstring&& op_name, reader_resources base_resources) + impl(reader_concurrency_semaphore& semaphore, const schema* const schema, sstring&& op_name, reader_resources base_resources, db::timeout_clock::time_point timeout) : _semaphore(semaphore) , _schema(schema) , _op_name(std::move(op_name)) , _op_name_view(_op_name) , _base_resources(base_resources) + , _timeout(timeout) { _semaphore.on_permit_created(*this); } @@ -301,6 +304,10 @@ public: } return _semaphore.do_wait_admission(shared_from_this(), timeout); } + + db::timeout_clock::time_point timeout() const noexcept { + return _timeout; + } }; static_assert(std::is_nothrow_copy_constructible_v); @@ -311,14 +318,14 @@ reader_permit::reader_permit(shared_ptr impl) : _impl(std::move(impl)) } reader_permit::reader_permit(reader_concurrency_semaphore& semaphore, const schema* const schema, std::string_view op_name, - reader_resources base_resources) - : _impl(::seastar::make_shared(semaphore, schema, op_name, base_resources)) + reader_resources base_resources, db::timeout_clock::time_point timeout) + : _impl(::seastar::make_shared(semaphore, schema, op_name, base_resources, timeout)) { } reader_permit::reader_permit(reader_concurrency_semaphore& semaphore, const schema* const schema, sstring&& op_name, - reader_resources base_resources) - : _impl(::seastar::make_shared(semaphore, schema, std::move(op_name), base_resources)) + reader_resources base_resources, db::timeout_clock::time_point timeout) + : _impl(::seastar::make_shared(semaphore, schema, std::move(op_name), base_resources, timeout)) { } @@ -385,6 +392,10 @@ void reader_permit::mark_unblocked() noexcept { _impl->mark_unblocked(); } +db::timeout_clock::time_point reader_permit::timeout() const noexcept { + return _impl->timeout(); +} + std::ostream& operator<<(std::ostream& os, reader_permit::state s) { switch (s) { case reader_permit::state::waiting: @@ -881,7 +892,7 @@ void reader_concurrency_semaphore::on_permit_unblocked() noexcept { future reader_concurrency_semaphore::obtain_permit(const schema* const schema, const char* const op_name, size_t memory, db::timeout_clock::time_point timeout) { - auto permit = reader_permit(*this, schema, std::string_view(op_name), {1, static_cast(memory)}); + auto permit = reader_permit(*this, schema, std::string_view(op_name), {1, static_cast(memory)}, timeout); return do_wait_admission(permit, timeout).then([permit] () mutable { return std::move(permit); }); @@ -889,23 +900,23 @@ future reader_concurrency_semaphore::obtain_permit(const schema* future reader_concurrency_semaphore::obtain_permit(const schema* const schema, sstring&& op_name, size_t memory, db::timeout_clock::time_point timeout) { - auto permit = reader_permit(*this, schema, std::move(op_name), {1, static_cast(memory)}); + auto permit = reader_permit(*this, schema, std::move(op_name), {1, static_cast(memory)}, timeout); return do_wait_admission(permit, timeout).then([permit] () mutable { return std::move(permit); }); } -reader_permit reader_concurrency_semaphore::make_tracking_only_permit(const schema* const schema, const char* const op_name) { - return reader_permit(*this, schema, std::string_view(op_name), {}); +reader_permit reader_concurrency_semaphore::make_tracking_only_permit(const schema* const schema, const char* const op_name, db::timeout_clock::time_point timeout) { + return reader_permit(*this, schema, std::string_view(op_name), {}, timeout); } -reader_permit reader_concurrency_semaphore::make_tracking_only_permit(const schema* const schema, sstring&& op_name) { - return reader_permit(*this, schema, std::move(op_name), {}); +reader_permit reader_concurrency_semaphore::make_tracking_only_permit(const schema* const schema, sstring&& op_name, db::timeout_clock::time_point timeout) { + return reader_permit(*this, schema, std::move(op_name), {}, timeout); } future<> reader_concurrency_semaphore::with_permit(const schema* const schema, const char* const op_name, size_t memory, db::timeout_clock::time_point timeout, read_func func) { - return do_wait_admission(reader_permit(*this, schema, std::string_view(op_name), {1, static_cast(memory)}), timeout, std::move(func)); + return do_wait_admission(reader_permit(*this, schema, std::string_view(op_name), {1, static_cast(memory)}, timeout), timeout, std::move(func)); } future<> reader_concurrency_semaphore::with_ready_permit(reader_permit permit, read_func func) { diff --git a/reader_concurrency_semaphore.hh b/reader_concurrency_semaphore.hh index 42170806de..5685b3a45f 100644 --- a/reader_concurrency_semaphore.hh +++ b/reader_concurrency_semaphore.hh @@ -344,8 +344,8 @@ public: /// /// Some permits cannot be associated with any table, so passing nullptr as /// the schema parameter is allowed. - reader_permit make_tracking_only_permit(const schema* const schema, const char* const op_name); - reader_permit make_tracking_only_permit(const schema* const schema, sstring&& op_name); + reader_permit make_tracking_only_permit(const schema* const schema, const char* const op_name, db::timeout_clock::time_point timeout); + reader_permit make_tracking_only_permit(const schema* const schema, sstring&& op_name, db::timeout_clock::time_point timeout); /// Run the function through the semaphore's execution stage with an admitted permit /// diff --git a/reader_permit.hh b/reader_permit.hh index c9a1cf7d45..9e14186c4f 100644 --- a/reader_permit.hh +++ b/reader_permit.hh @@ -112,9 +112,9 @@ private: reader_permit() = default; reader_permit(shared_ptr); explicit reader_permit(reader_concurrency_semaphore& semaphore, const schema* const schema, std::string_view op_name, - reader_resources base_resources); + reader_resources base_resources, db::timeout_clock::time_point timeout); explicit reader_permit(reader_concurrency_semaphore& semaphore, const schema* const schema, sstring&& op_name, - reader_resources base_resources); + reader_resources base_resources, db::timeout_clock::time_point timeout); void on_waiting(); void on_admission(); @@ -161,6 +161,8 @@ public: reader_resources base_resources() const; sstring description() const; + + db::timeout_clock::time_point timeout() const noexcept; }; using reader_permit_opt = optimized_optional; diff --git a/sstables/mx/writer.cc b/sstables/mx/writer.cc index 97ee6fb9f0..2ab1757aa1 100644 --- a/sstables/mx/writer.cc +++ b/sstables/mx/writer.cc @@ -808,7 +808,7 @@ public: // Initialize at the end of the constructor body, so we can delay making // the semaphore used until we know that no more exceptions can be thrown. - _range_tombstones.emplace(range_tombstone_stream(_schema, _semaphore.make_tracking_only_permit(&s, "mx-writer"))); + _range_tombstones.emplace(range_tombstone_stream(_schema, _semaphore.make_tracking_only_permit(&s, "mx-writer", db::no_timeout))); } ~writer(); diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 040e1f824f..c46f2bf249 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -1768,7 +1768,7 @@ future<> sstable::generate_summary(const io_priority_class& pc) { auto s = summary_generator(_schema->get_partitioner(), _components->summary, _manager.config().sstable_summary_ratio()); auto ctx = make_lw_shared>( - sem.make_tracking_only_permit(_schema.get(), "generate-summary"), s, trust_promoted_index::yes, *_schema, index_file, std::move(options), 0, index_size, + sem.make_tracking_only_permit(_schema.get(), "generate-summary", db::no_timeout), s, trust_promoted_index::yes, *_schema, index_file, std::move(options), 0, index_size, (_version >= sstable_version_types::mc ? std::make_optional(get_clustering_values_fixed_lengths(get_serialization_header())) : std::optional{})); @@ -2696,7 +2696,7 @@ future sstable::has_partition_key(const utils::hashed_key& hk, const dht:: std::exception_ptr ex; auto sem = reader_concurrency_semaphore(reader_concurrency_semaphore::no_limits{}, "sstables::has_partition_key()"); try { - auto lh_index_ptr = std::make_unique(s, sem.make_tracking_only_permit(_schema.get(), s->get_filename()), default_priority_class(), tracing::trace_state_ptr(), use_caching::yes); + auto lh_index_ptr = std::make_unique(s, sem.make_tracking_only_permit(_schema.get(), s->get_filename(), db::no_timeout), default_priority_class(), tracing::trace_state_ptr(), use_caching::yes); present = co_await lh_index_ptr->advance_lower_and_check_if_present(dk); } catch (...) { ex = std::current_exception(); diff --git a/table.cc b/table.cc index 4d055a8b14..e8fa94747a 100644 --- a/table.cc +++ b/table.cc @@ -601,7 +601,7 @@ table::try_flush_memtable_to_sstable(lw_shared_ptr old, sstable_write_ auto f = consumer(old->make_flush_reader( old->schema(), - compaction_concurrency_semaphore().make_tracking_only_permit(old->schema().get(), "try_flush_memtable_to_sstable()"), + compaction_concurrency_semaphore().make_tracking_only_permit(old->schema().get(), "try_flush_memtable_to_sstable()", db::no_timeout), service::get_local_memtable_flush_priority())); // Switch back to default scheduling group for post-flush actions, to avoid them being staved by the memtable flush @@ -1911,7 +1911,7 @@ write_memtable_to_sstable(memtable& mt, sstables::shared_sstable sst, sstables:: std::make_unique(reader_concurrency_semaphore::no_limits{}, "write_memtable_to_sstable"), cfg, [&mt, sst] (auto& monitor, auto& semaphore, auto& cfg) { - return write_memtable_to_sstable(semaphore->make_tracking_only_permit(mt.schema().get(), "mt_to_sst"), mt, std::move(sst), monitor, cfg) + return write_memtable_to_sstable(semaphore->make_tracking_only_permit(mt.schema().get(), "mt_to_sst", db::no_timeout), mt, std::move(sst), monitor, cfg) .finally([&semaphore] { return semaphore->stop(); }); @@ -2243,7 +2243,7 @@ future table::do_push_view_replica_updates(schema_ptr s auto cr_ranges = co_await db::view::calculate_affected_clustering_ranges(*base, m.decorated_key(), m.partition(), views); if (cr_ranges.empty()) { tracing::trace(tr_state, "View updates do not require read-before-write"); - co_await generate_and_propagate_view_updates(base, sem.make_tracking_only_permit(s.get(), "push-view-updates-1"), std::move(views), std::move(m), { }, std::move(tr_state), now); + co_await generate_and_propagate_view_updates(base, sem.make_tracking_only_permit(s.get(), "push-view-updates-1", timeout), std::move(views), std::move(m), { }, std::move(tr_state), now); // In this case we are not doing a read-before-write, just a // write, so no lock is needed. co_return row_locker::lock_holder(); @@ -2268,7 +2268,7 @@ future table::do_push_view_replica_updates(schema_ptr s co_await utils::get_local_injector().inject("table_push_view_replica_updates_timeout", timeout); auto lock = co_await std::move(lockf); auto pk = dht::partition_range::make_singular(m.decorated_key()); - auto permit = sem.make_tracking_only_permit(base.get(), "push-view-updates-2"); + auto permit = sem.make_tracking_only_permit(base.get(), "push-view-updates-2", timeout); auto reader = source.make_reader(base, permit, pk, slice, io_priority, tr_state, streamed_mutation::forwarding::no, mutation_reader::forwarding::no); co_await this->generate_and_propagate_view_updates(base, std::move(permit), std::move(views), std::move(m), std::move(reader), tr_state, now); tracing::trace(tr_state, "View updates for {}.{} were generated and propagated", base->ks_name(), base->cf_name()); diff --git a/test/boost/commitlog_test.cc b/test/boost/commitlog_test.cc index 14688ee69e..d7602edecc 100644 --- a/test/boost/commitlog_test.cc +++ b/test/boost/commitlog_test.cc @@ -647,7 +647,7 @@ SEASTAR_TEST_CASE(test_commitlog_replay_invalid_key){ } { - auto rd = mt.make_flat_reader(s, db.get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test")); + auto rd = mt.make_flat_reader(s, db.get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout)); auto close_rd = deferred_close(rd); auto mopt = read_mutation_from_flat_mutation_reader(rd, db::no_timeout).get0(); BOOST_REQUIRE(mopt); diff --git a/test/boost/hashers_test.cc b/test/boost/hashers_test.cc index 8889caa05f..1a9735c8ff 100644 --- a/test/boost/hashers_test.cc +++ b/test/boost/hashers_test.cc @@ -19,6 +19,7 @@ * along with Scylla. If not, see . */ +#include "db/timeout_clock.hh" #define BOOST_TEST_MODULE core #include @@ -74,7 +75,7 @@ BOOST_AUTO_TEST_CASE(bytes_view_hasher_sanity_check) { BOOST_AUTO_TEST_CASE(mutation_fragment_sanity_check) { reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, __FILE__); simple_schema s; - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), "test"); + auto permit = semaphore.make_tracking_only_permit(s.schema().get(), "test", db::no_timeout); gc_clock::time_point ts(gc_clock::duration(1234567890000)); auto check_hash = [&] (const mutation_fragment& mf, uint64_t expected) { diff --git a/test/boost/mutation_fragment_test.cc b/test/boost/mutation_fragment_test.cc index da19735ba5..992c72355d 100644 --- a/test/boost/mutation_fragment_test.cc +++ b/test/boost/mutation_fragment_test.cc @@ -427,7 +427,7 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_mutate_exception_safety) { reader_concurrency_semaphore sem(1, 100, get_name()); auto stop_sem = deferred_stop(sem); - auto permit = sem.make_tracking_only_permit(s.schema().get(), get_name()); + auto permit = sem.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout); const auto available_res = sem.available_resources(); const sstring val(1024, 'a'); diff --git a/test/boost/mutation_reader_test.cc b/test/boost/mutation_reader_test.cc index 3023f3d5f0..35909184ab 100644 --- a/test/boost/mutation_reader_test.cc +++ b/test/boost/mutation_reader_test.cc @@ -2919,7 +2919,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_trim_range_tombstones) { reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name()); auto stop_sem = deferred_stop(semaphore); simple_schema s; - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name()); + auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout); const auto pkey = s.make_pkey(); size_t max_buffer_size = 512; @@ -3013,7 +3013,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_self_validation) { reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name()); auto stop_sem = deferred_stop(semaphore); simple_schema s; - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name()); + auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout); auto pkeys = s.make_pkeys(4); std::ranges::sort(pkeys, dht::decorated_key::less_comparator(s.schema())); @@ -3371,7 +3371,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_recreate_before_fast_forward_to) reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name()); auto stop_sem = deferred_stop(semaphore); simple_schema s; - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name()); + auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout); auto pkeys = s.make_pkeys(6); boost::sort(pkeys, dht::decorated_key::less_comparator(s.schema())); @@ -3422,7 +3422,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_drop_flags) { reader_concurrency_semaphore semaphore(1, 0, get_name()); auto stop_sem = deferred_stop(semaphore); simple_schema s; - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name()); + auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout); auto pkeys = s.make_pkeys(2); std::sort(pkeys.begin(), pkeys.end(), [&s] (const auto& pk1, const auto& pk2) { diff --git a/test/boost/querier_cache_test.cc b/test/boost/querier_cache_test.cc index 1d26c0db12..7be487d00c 100644 --- a/test/boost/querier_cache_test.cc +++ b/test/boost/querier_cache_test.cc @@ -116,7 +116,7 @@ private: Querier make_querier(const dht::partition_range& range) { return Querier(_mutation_source, _s.schema(), - _sem.make_tracking_only_permit(_s.schema().get(), "make-querier"), + _sem.make_tracking_only_permit(_s.schema().get(), "make-querier", db::no_timeout), range, _s.schema()->full_slice(), service::get_local_sstable_query_read_priority(), @@ -789,8 +789,8 @@ SEASTAR_THREAD_TEST_CASE(test_unique_inactive_read_handle) { .with_column("v", int32_type) .build(); - auto sem1_h1 = sem1.register_inactive_read(make_empty_flat_reader(schema, sem1.make_tracking_only_permit(schema.get(), get_name()))); - auto sem2_h1 = sem2.register_inactive_read(make_empty_flat_reader(schema, sem2.make_tracking_only_permit(schema.get(), get_name()))); + auto sem1_h1 = sem1.register_inactive_read(make_empty_flat_reader(schema, sem1.make_tracking_only_permit(schema.get(), get_name(), db::no_timeout))); + auto sem2_h1 = sem2.register_inactive_read(make_empty_flat_reader(schema, sem2.make_tracking_only_permit(schema.get(), get_name(), db::no_timeout))); // Sanity check that lookup still works with empty handle. BOOST_REQUIRE(!sem1.unregister_inactive_read(reader_concurrency_semaphore::inactive_read_handle{})); diff --git a/test/boost/reader_concurrency_semaphore_test.cc b/test/boost/reader_concurrency_semaphore_test.cc index 2d6ab2226d..d5357cb2fb 100644 --- a/test/boost/reader_concurrency_semaphore_test.cc +++ b/test/boost/reader_concurrency_semaphore_test.cc @@ -38,7 +38,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_clear_inactive_reads) auto stop_sem = deferred_stop(semaphore); for (int i = 0; i < 10; ++i) { - handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader(s.schema(), semaphore.make_tracking_only_permit(s.schema().get(), get_name())))); + handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader(s.schema(), semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout)))); } BOOST_REQUIRE(std::all_of(handles.begin(), handles.end(), [] (const reader_concurrency_semaphore::inactive_read_handle& handle) { return bool(handle); })); @@ -50,7 +50,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_clear_inactive_reads) handles.clear(); for (int i = 0; i < 10; ++i) { - handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader(s.schema(), semaphore.make_tracking_only_permit(s.schema().get(), get_name())))); + handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader(s.schema(), semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout)))); } BOOST_REQUIRE(std::all_of(handles.begin(), handles.end(), [] (const reader_concurrency_semaphore::inactive_read_handle& handle) { return bool(handle); })); @@ -68,14 +68,14 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_destroyed_permit_rele // Not admitted, active { - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name()); + auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout); auto units2 = permit.consume_memory(1024); } BOOST_REQUIRE_EQUAL(semaphore.available_resources(), initial_resources); // Not admitted, inactive { - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name()); + auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout); auto units2 = permit.consume_memory(1024); auto handle = semaphore.register_inactive_read(make_empty_flat_reader(s.schema(), permit)); @@ -106,7 +106,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_abandoned_handle_clos reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name()); auto stop_sem = deferred_stop(semaphore); - auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name()); + auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout); { auto handle = semaphore.register_inactive_read(make_empty_flat_reader(s.schema(), permit)); // The handle is destroyed here, triggering the destrution of the inactive read. @@ -251,7 +251,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_forward_progress) { } future<> obtain_permit() { if (_memory_only) { - _permit = _semaphore.make_tracking_only_permit(_schema.get(), "reader_m"); + _permit = _semaphore.make_tracking_only_permit(_schema.get(), "reader_m", db::no_timeout); } else { _permit = co_await _semaphore.obtain_permit(_schema.get(), fmt::format("reader_{}", _evictable ? 'e' : 'a'), 1024, db::no_timeout); } @@ -548,7 +548,7 @@ SEASTAR_THREAD_TEST_CASE(reader_concurrency_semaphore_dump_reader_diganostics) { for (auto& schema : schemas) { const auto nr_permits = tests::random::get_int(2, 32); for (unsigned i = 0; i < nr_permits; ++i) { - auto permit = semaphore.make_tracking_only_permit(schema.get(), op_names.at(tests::random::get_int(0, nr_ops - 1))); + auto permit = semaphore.make_tracking_only_permit(schema.get(), op_names.at(tests::random::get_int(0, nr_ops - 1)), db::no_timeout); if (tests::random::get_int(0, 4)) { auto units = permit.consume_resources(reader_resources(tests::random::get_int(0, 1), tests::random::get_int(1024, 16 * 1024 * 1024))); permits.push_back(std::pair(std::move(permit), std::move(units))); @@ -581,7 +581,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_stop_waits_on_permits BOOST_TEST_MESSAGE("1 permit"); { auto semaphore = std::make_unique(reader_concurrency_semaphore::no_limits{}, get_name()); - auto permit = std::make_unique(semaphore->make_tracking_only_permit(nullptr, "permit1")); + auto permit = std::make_unique(semaphore->make_tracking_only_permit(nullptr, "permit1", db::no_timeout)); // Test will fail via use-after-free auto f = semaphore->stop().then([semaphore = std::move(semaphore)] { }); diff --git a/test/boost/view_build_test.cc b/test/boost/view_build_test.cc index 849212c1ae..13f92fa5b7 100644 --- a/test/boost/view_build_test.cc +++ b/test/boost/view_build_test.cc @@ -440,7 +440,7 @@ SEASTAR_TEST_CASE(test_view_update_generator) { sstables::sstable_writer_config sst_cfg = e.db().local().get_user_sstables_manager().configure_writer("test"); auto& pc = service::get_local_streaming_priority(); - auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test"); + auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout); sst->write_components(flat_mutation_reader_from_mutations(std::move(permit), {m}), 1ul, s, sst_cfg, {}, pc).get(); sst->open_data().get(); t->add_sstable_and_update_cache(sst).get(); @@ -550,7 +550,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_deadlock) { sstables::sstable_writer_config sst_cfg = e.local_db().get_user_sstables_manager().configure_writer("test"); auto& pc = service::get_local_streaming_priority(); - auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test"); + auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout); sst->write_components(flat_mutation_reader_from_mutations(std::move(permit), {m}), 1ul, s, sst_cfg, {}, pc).get(); sst->open_data().get(); t->add_sstable_and_update_cache(sst).get(); @@ -627,7 +627,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_register_semaphore_unit_leak sstables::sstable_writer_config sst_cfg = e.local_db().get_user_sstables_manager().configure_writer("test"); auto& pc = service::get_local_streaming_priority(); - auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test"); + auto permit = e.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout); sst->write_components(flat_mutation_reader_from_mutations(std::move(permit), {m}), 1ul, s, sst_cfg, {}, pc).get(); sst->open_data().get(); t->add_sstable_and_update_cache(sst).get(); diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index 92d6b998c8..2f03df6b8c 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -340,7 +340,7 @@ public: table_name = std::move(table_name)] (database& db) mutable { auto& cf = db.find_column_family(ks_name, table_name); auto schema = cf.schema(); - auto permit = db.get_reader_concurrency_semaphore().make_tracking_only_permit(schema.get(), "require_column_has_value()"); + auto permit = db.get_reader_concurrency_semaphore().make_tracking_only_permit(schema.get(), "require_column_has_value()", db::no_timeout); return cf.find_partition_slow(schema, permit, pkey) .then([schema, ckey, column_name, exp] (column_family::const_mutation_partition_ptr p) { assert(p != nullptr); @@ -808,7 +808,7 @@ future<> do_with_cql_env_thread(std::function func, cql_tes } reader_permit make_reader_permit(cql_test_env& env) { - return env.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(nullptr, "test"); + return env.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(nullptr, "test", db::no_timeout); } namespace debug { diff --git a/test/lib/reader_concurrency_semaphore.hh b/test/lib/reader_concurrency_semaphore.hh index 76e198634c..f483bfddfa 100644 --- a/test/lib/reader_concurrency_semaphore.hh +++ b/test/lib/reader_concurrency_semaphore.hh @@ -39,7 +39,7 @@ public: } reader_concurrency_semaphore& semaphore() { return *_semaphore; }; - reader_permit make_permit() { return _semaphore->make_tracking_only_permit(nullptr, "test"); } + reader_permit make_permit() { return _semaphore->make_tracking_only_permit(nullptr, "test", db::no_timeout); } }; } // namespace tests diff --git a/test/lib/sstable_test_env.hh b/test/lib/sstable_test_env.hh index 9b8d11f610..b7750d27ee 100644 --- a/test/lib/sstable_test_env.hh +++ b/test/lib/sstable_test_env.hh @@ -82,7 +82,12 @@ public: test_env_sstables_manager& manager() { return *_mgr; } reader_concurrency_semaphore& semaphore() { return *_semaphore; } - reader_permit make_reader_permit(const schema* const s = nullptr, const char* n = "test") { return _semaphore->make_tracking_only_permit(s, n); } + reader_permit make_reader_permit(const schema* const s, const char* n, db::timeout_clock::time_point timeout) { + return _semaphore->make_tracking_only_permit(s, n, timeout); + } + reader_permit make_reader_permit(db::timeout_clock::time_point timeout = db::no_timeout) { + return _semaphore->make_tracking_only_permit(nullptr, "test", timeout); + } future<> working_sst(schema_ptr schema, sstring dir, unsigned long generation) { return reusable_sst(std::move(schema), dir, generation).then([] (auto ptr) { return make_ready_future<>(); }); diff --git a/test/perf/perf.cc b/test/perf/perf.cc index 21e29077cc..b970042403 100644 --- a/test/perf/perf.cc +++ b/test/perf/perf.cc @@ -79,7 +79,7 @@ reader_concurrency_semaphore_wrapper::~reader_concurrency_semaphore_wrapper() { } reader_permit reader_concurrency_semaphore_wrapper::make_permit() { - return _semaphore->make_tracking_only_permit(nullptr, "perf"); + return _semaphore->make_tracking_only_permit(nullptr, "perf", db::no_timeout); } } // namespace perf diff --git a/test/unit/row_cache_stress_test.cc b/test/unit/row_cache_stress_test.cc index eff83f68ff..ae43bde86c 100644 --- a/test/unit/row_cache_stress_test.cc +++ b/test/unit/row_cache_stress_test.cc @@ -67,7 +67,7 @@ struct table { } reader_permit make_permit() { - return semaphore.make_tracking_only_permit(s.schema().get(), "test"); + return semaphore.make_tracking_only_permit(s.schema().get(), "test", db::no_timeout); } future<> stop() noexcept { return semaphore.stop(); diff --git a/tools/scylla-sstable-index.cc b/tools/scylla-sstable-index.cc index a71bf16743..49e4857f0c 100644 --- a/tools/scylla-sstable-index.cc +++ b/tools/scylla-sstable-index.cc @@ -179,7 +179,7 @@ Note: UDT is not supported for now. auto stop_semaphore = deferred_stop(rcs_sem); { - sstables::index_reader idx_reader(sst, rcs_sem.make_tracking_only_permit(primary_key_schema.get(), "idx"), default_priority_class(), {}, + sstables::index_reader idx_reader(sst, rcs_sem.make_tracking_only_permit(primary_key_schema.get(), "idx", db::no_timeout), default_priority_class(), {}, sstables::use_caching::yes); list_partitions(*primary_key_schema, idx_reader);