Merge ' reader_concurrency_semaphore: set_notify_handler(): disable timeout ' from Botond Dénes

`set_notify_handler()` is called after a querier has been inserted into the querier cache. It has two purposes: to set a callback for eviction and to set a TTL for the cache entry. The latter did not disable the permit's pre-existing timeout (if any), which led to premature eviction of the cache entry whenever the timeout was shorter than the TTL (which is typical).
Disable the timeout before setting the TTL to prevent premature eviction.

Fixes: https://github.com/scylladb/scylladb/issues/22629

Backport required to all active releases, they are all affected.

Closes scylladb/scylladb#22701

* github.com:scylladb/scylladb:
  reader_concurrency_semaphore: set_notify_handler(): disable timeout
  reader_permit: mark check_abort() as const
This commit is contained in:
Avi Kivity
2025-02-08 20:05:03 +02:00
3 changed files with 87 additions and 18 deletions

View File

@@ -495,7 +495,7 @@ public:
_trace_ptr = std::move(trace_ptr);
}
void check_abort() {
void check_abort() const {
if (_ex) {
std::rethrow_exception(_ex);
}
@@ -644,7 +644,7 @@ void reader_permit::set_trace_state(tracing::trace_state_ptr trace_ptr) noexcept
_impl->set_trace_state(std::move(trace_ptr));
}
void reader_permit::check_abort() {
// Forwards to the implementation object's check_abort(); const-qualified so
// it can be invoked through a const reader_permit.
void reader_permit::check_abort() const {
return _impl->check_abort();
}
@@ -1165,6 +1165,7 @@ void reader_concurrency_semaphore::set_notify_handler(inactive_read_handle& irh,
auto& ir = *(*irh._permit)->aux_data().ir;
ir.notify_handler = std::move(notify_handler);
if (ttl_opt) {
irh._permit->set_timeout(db::no_timeout);
ir.ttl_timer.set_callback([this, permit = *irh._permit] () mutable {
evict(*permit, evict_reason::time);
});

View File

@@ -174,7 +174,7 @@ public:
// If the read was aborted, throw the exception the read was aborted with.
// Otherwise no-op.
void check_abort();
void check_abort() const;
query::max_result_size max_result_size() const;
void set_max_result_size(query::max_result_size);

View File

@@ -110,10 +110,10 @@ private:
}
template <typename Querier>
Querier make_querier(const dht::partition_range& range) {
Querier make_querier(const dht::partition_range& range, db::timeout_clock::time_point timeout) {
return Querier(_mutation_source,
_s.schema(),
_sem.make_tracking_only_permit(_s.schema(), "make-querier", db::no_timeout, {}),
_sem.make_tracking_only_permit(_s.schema(), "make-querier", timeout, {}),
range,
_s.schema()->full_slice(),
nullptr);
@@ -218,10 +218,10 @@ public:
template <typename Querier>
entry_info produce_first_page_and_save_querier(void(query::querier_cache::*insert_mem_ptr)(query_id, Querier&&, tracing::trace_state_ptr), unsigned key,
const dht::partition_range& range, const query::partition_slice& slice, uint64_t row_limit) {
const dht::partition_range& range, const query::partition_slice& slice, uint64_t row_limit, db::timeout_clock::time_point timeout = db::no_timeout) {
const auto cache_key = make_cache_key(key);
auto querier = make_querier<Querier>(range);
auto querier = make_querier<Querier>(range, timeout);
auto dk_ck = querier.consume_page(dummy_result_builder{}, row_limit, std::numeric_limits<uint32_t>::max(), gc_clock::now()).get();
auto&& dk = dk_ck.first;
auto&& ck = dk_ck.second;
@@ -290,27 +290,29 @@ public:
}
entry_info produce_first_page_and_save_mutation_querier(unsigned key, const dht::partition_range& range,
const query::partition_slice& slice, uint64_t row_limit = 5) {
return produce_first_page_and_save_querier<query::querier>(&query::querier_cache::insert_mutation_querier, key, range, slice, row_limit);
const query::partition_slice& slice, uint64_t row_limit = 5, db::timeout_clock::time_point timeout = db::no_timeout) {
return produce_first_page_and_save_querier<query::querier>(&query::querier_cache::insert_mutation_querier, key, range, slice, row_limit, timeout);
}
entry_info produce_first_page_and_save_mutation_querier(unsigned key, const dht::partition_range& range, uint64_t row_limit = 5) {
return produce_first_page_and_save_mutation_querier(key, range, make_default_slice(), row_limit);
// Range overload without an explicit slice: uses make_default_slice() and
// forwards key, range, row_limit and timeout to the slice-taking overload.
entry_info produce_first_page_and_save_mutation_querier(unsigned key, const dht::partition_range& range, uint64_t row_limit = 5,
db::timeout_clock::time_point timeout = db::no_timeout) {
return produce_first_page_and_save_mutation_querier(key, range, make_default_slice(), row_limit, timeout);
}
// Singular overload
entry_info produce_first_page_and_save_mutation_querier(unsigned key, std::size_t i, uint64_t row_limit = 5) {
entry_info produce_first_page_and_save_mutation_querier(unsigned key, std::size_t i, uint64_t row_limit = 5,
        db::timeout_clock::time_point timeout = db::no_timeout) {
    // Builds a singular partition range from index `i`, reads with the
    // schema's full slice, and delegates to the range/slice overload.
    // `timeout` must be forwarded explicitly: otherwise the callee falls back
    // to its own default (db::no_timeout) and the caller's value is silently
    // ignored.
    return produce_first_page_and_save_mutation_querier(key, make_singular_partition_range(i), _s.schema()->full_slice(), row_limit, timeout);
}
// Use the whole range
entry_info produce_first_page_and_save_mutation_querier(unsigned key) {
return produce_first_page_and_save_mutation_querier(key, make_default_partition_range(), _s.schema()->full_slice());
// Whole-range overload: reads make_default_partition_range() with the
// schema's full slice. The row limit is spelled out as 5 here because the
// timeout argument follows it positionally in the callee's parameter list.
entry_info produce_first_page_and_save_mutation_querier(unsigned key, db::timeout_clock::time_point timeout = db::no_timeout) {
return produce_first_page_and_save_mutation_querier(key, make_default_partition_range(), _s.schema()->full_slice(), 5, timeout);
}
// For tests testing just one insert-lookup.
entry_info produce_first_page_and_save_mutation_querier() {
return produce_first_page_and_save_mutation_querier(1);
// Convenience overload using the fixed key 1; forwards the optional timeout
// to the key-taking overload.
entry_info produce_first_page_and_save_mutation_querier(db::timeout_clock::time_point timeout = db::no_timeout) {
return produce_first_page_and_save_mutation_querier(1, timeout);
}
test_querier_cache& assert_cache_lookup_data_querier(unsigned lookup_key,
@@ -337,9 +339,10 @@ public:
test_querier_cache& assert_cache_lookup_mutation_querier(unsigned lookup_key,
const schema& lookup_schema,
const dht::partition_range& lookup_range,
const query::partition_slice& lookup_slice) {
const query::partition_slice& lookup_slice,
db::timeout_clock::time_point timeout = db::no_timeout) {
auto querier_opt = _cache.lookup_mutation_querier(make_cache_key(lookup_key), lookup_schema, lookup_range, lookup_slice, get_semaphore(), nullptr, db::no_timeout);
auto querier_opt = _cache.lookup_mutation_querier(make_cache_key(lookup_key), lookup_schema, lookup_range, lookup_slice, get_semaphore(), nullptr, timeout);
if (querier_opt) {
querier_opt->close().get();
}
@@ -840,4 +843,69 @@ SEASTAR_THREAD_TEST_CASE(test_semaphore_mismatch) {
}
}
// Base duration shared by the TTL/timeout test cases in this file. Builds with
// SEASTAR_DEBUG set use 4s instead of 1s.
// NOTE(review): presumably the larger value gives slower debug builds more slack - confirm.
#if SEASTAR_DEBUG
static const std::chrono::seconds ttl_timeout_test_timeout = 4s;
#else
static const std::chrono::seconds ttl_timeout_test_timeout = 1s;
#endif
SEASTAR_THREAD_TEST_CASE(test_timeout_not_sticky_on_insert) {
    // A querier is saved with a permit timeout that elapses while it sits in
    // the cache; the later lookup must still find the entry, with no misses,
    // drops or evictions recorded.
    test_querier_cache cache_fixture;

    const auto permit_deadline = db::timeout_clock::now() + ttl_timeout_test_timeout;
    const auto saved = cache_fixture.produce_first_page_and_save_mutation_querier(permit_deadline);

    // Wait well past the deadline the permit was created with.
    const auto wait_time = ttl_timeout_test_timeout * 2;
    sleep(wait_time).get();

    auto& lookup_result = cache_fixture.assert_cache_lookup_mutation_querier(
            saved.key, *cache_fixture.get_schema(), saved.expected_range, saved.expected_slice);
    lookup_result.no_misses().no_drops().no_evictions();
}
SEASTAR_THREAD_TEST_CASE(test_ttl_not_sticky_on_lookup) {
    // The fixture is constructed with ttl_timeout_test_timeout
    // (NOTE(review): presumably the cache entry TTL - confirm against the
    // test_querier_cache constructor).
    test_querier_cache cache_fixture(ttl_timeout_test_timeout);
    auto& semaphore = cache_fixture.get_semaphore();

    // Held until the end of the test case.
    auto held_permit = semaphore.obtain_permit(cache_fixture.get_schema(), get_name(), 1024, db::no_timeout, {}).get();

    const auto saved = cache_fixture.produce_first_page_and_save_mutation_querier();

    // Look the querier up again with a far-away timeout.
    const auto lookup_deadline = db::timeout_clock::now() + 900s;
    auto& lookup_result = cache_fixture.assert_cache_lookup_mutation_querier(
            saved.key, *cache_fixture.get_schema(), saved.expected_range, saved.expected_slice, lookup_deadline);
    lookup_result.no_misses().no_drops().no_evictions();

    // The lookup must have installed the new timeout on the permit.
    BOOST_REQUIRE(saved.permit.timeout() == lookup_deadline);

    const auto wait_time = ttl_timeout_test_timeout * 2;
    sleep(wait_time).get();

    // Had the TTL stuck to the permit, it would have timed out during the
    // sleep above and check_abort() would throw.
    BOOST_REQUIRE_NO_THROW(saved.permit.check_abort());
}
SEASTAR_THREAD_TEST_CASE(test_timeout_is_applied_on_lookup) {
    // The timeout passed to the cache lookup must be installed on the permit
    // and must fire once it elapses.
    test_querier_cache cache_fixture;
    auto& semaphore = cache_fixture.get_semaphore();

    // Held until the end of the test case.
    auto held_permit = semaphore.obtain_permit(cache_fixture.get_schema(), get_name(), 1024, db::no_timeout, {}).get();

    const auto saved = cache_fixture.produce_first_page_and_save_mutation_querier();

    const auto lookup_deadline = db::timeout_clock::now() + ttl_timeout_test_timeout;
    auto& lookup_result = cache_fixture.assert_cache_lookup_mutation_querier(
            saved.key, *cache_fixture.get_schema(), saved.expected_range, saved.expected_slice, lookup_deadline);
    lookup_result.no_misses().no_drops().no_evictions();

    // Right after the lookup the permit carries the new timeout and is not aborted.
    BOOST_REQUIRE(saved.permit.timeout() == lookup_deadline);
    BOOST_REQUIRE_NO_THROW(saved.permit.check_abort());

    // Once the deadline has passed, check_abort() must report the timeout.
    const auto wait_time = ttl_timeout_test_timeout * 2;
    sleep(wait_time).get();
    BOOST_REQUIRE_THROW(saved.permit.check_abort(), seastar::named_semaphore_timed_out);
}
BOOST_AUTO_TEST_SUITE_END()