reader_concurrency_semaphore: set_notify_handler(): disable timeout
set_notify_handler() is called after a querier was inserted into the querier cache. It has two purposes: set a callback for eviction and set a TTL for the cache entry. This latter was not disabling the pre-existing timeout of the permit (if any) and this would lead to premature eviction of the cache entry if the timeout was shorter than TTL (which his typical). Disable the timeout before setting the TTL to prevent premature eviction. Fixes: #scylladb/scylladb#22629
This commit is contained in:
@@ -1165,6 +1165,7 @@ void reader_concurrency_semaphore::set_notify_handler(inactive_read_handle& irh,
|
|||||||
auto& ir = *(*irh._permit)->aux_data().ir;
|
auto& ir = *(*irh._permit)->aux_data().ir;
|
||||||
ir.notify_handler = std::move(notify_handler);
|
ir.notify_handler = std::move(notify_handler);
|
||||||
if (ttl_opt) {
|
if (ttl_opt) {
|
||||||
|
irh._permit->set_timeout(db::no_timeout);
|
||||||
ir.ttl_timer.set_callback([this, permit = *irh._permit] () mutable {
|
ir.ttl_timer.set_callback([this, permit = *irh._permit] () mutable {
|
||||||
evict(*permit, evict_reason::time);
|
evict(*permit, evict_reason::time);
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -110,10 +110,10 @@ private:
|
|||||||
}
|
}
|
||||||
|
|
||||||
template <typename Querier>
|
template <typename Querier>
|
||||||
Querier make_querier(const dht::partition_range& range) {
|
Querier make_querier(const dht::partition_range& range, db::timeout_clock::time_point timeout) {
|
||||||
return Querier(_mutation_source,
|
return Querier(_mutation_source,
|
||||||
_s.schema(),
|
_s.schema(),
|
||||||
_sem.make_tracking_only_permit(_s.schema(), "make-querier", db::no_timeout, {}),
|
_sem.make_tracking_only_permit(_s.schema(), "make-querier", timeout, {}),
|
||||||
range,
|
range,
|
||||||
_s.schema()->full_slice(),
|
_s.schema()->full_slice(),
|
||||||
nullptr);
|
nullptr);
|
||||||
@@ -218,10 +218,10 @@ public:
|
|||||||
|
|
||||||
template <typename Querier>
|
template <typename Querier>
|
||||||
entry_info produce_first_page_and_save_querier(void(query::querier_cache::*insert_mem_ptr)(query_id, Querier&&, tracing::trace_state_ptr), unsigned key,
|
entry_info produce_first_page_and_save_querier(void(query::querier_cache::*insert_mem_ptr)(query_id, Querier&&, tracing::trace_state_ptr), unsigned key,
|
||||||
const dht::partition_range& range, const query::partition_slice& slice, uint64_t row_limit) {
|
const dht::partition_range& range, const query::partition_slice& slice, uint64_t row_limit, db::timeout_clock::time_point timeout = db::no_timeout) {
|
||||||
const auto cache_key = make_cache_key(key);
|
const auto cache_key = make_cache_key(key);
|
||||||
|
|
||||||
auto querier = make_querier<Querier>(range);
|
auto querier = make_querier<Querier>(range, timeout);
|
||||||
auto dk_ck = querier.consume_page(dummy_result_builder{}, row_limit, std::numeric_limits<uint32_t>::max(), gc_clock::now()).get();
|
auto dk_ck = querier.consume_page(dummy_result_builder{}, row_limit, std::numeric_limits<uint32_t>::max(), gc_clock::now()).get();
|
||||||
auto&& dk = dk_ck.first;
|
auto&& dk = dk_ck.first;
|
||||||
auto&& ck = dk_ck.second;
|
auto&& ck = dk_ck.second;
|
||||||
@@ -290,27 +290,29 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
entry_info produce_first_page_and_save_mutation_querier(unsigned key, const dht::partition_range& range,
|
entry_info produce_first_page_and_save_mutation_querier(unsigned key, const dht::partition_range& range,
|
||||||
const query::partition_slice& slice, uint64_t row_limit = 5) {
|
const query::partition_slice& slice, uint64_t row_limit = 5, db::timeout_clock::time_point timeout = db::no_timeout) {
|
||||||
return produce_first_page_and_save_querier<query::querier>(&query::querier_cache::insert_mutation_querier, key, range, slice, row_limit);
|
return produce_first_page_and_save_querier<query::querier>(&query::querier_cache::insert_mutation_querier, key, range, slice, row_limit, timeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
entry_info produce_first_page_and_save_mutation_querier(unsigned key, const dht::partition_range& range, uint64_t row_limit = 5) {
|
entry_info produce_first_page_and_save_mutation_querier(unsigned key, const dht::partition_range& range, uint64_t row_limit = 5,
|
||||||
return produce_first_page_and_save_mutation_querier(key, range, make_default_slice(), row_limit);
|
db::timeout_clock::time_point timeout = db::no_timeout) {
|
||||||
|
return produce_first_page_and_save_mutation_querier(key, range, make_default_slice(), row_limit, timeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Singular overload
|
// Singular overload
|
||||||
entry_info produce_first_page_and_save_mutation_querier(unsigned key, std::size_t i, uint64_t row_limit = 5) {
|
entry_info produce_first_page_and_save_mutation_querier(unsigned key, std::size_t i, uint64_t row_limit = 5,
|
||||||
|
db::timeout_clock::time_point timeout = db::no_timeout) {
|
||||||
return produce_first_page_and_save_mutation_querier(key, make_singular_partition_range(i), _s.schema()->full_slice(), row_limit);
|
return produce_first_page_and_save_mutation_querier(key, make_singular_partition_range(i), _s.schema()->full_slice(), row_limit);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Use the whole range
|
// Use the whole range
|
||||||
entry_info produce_first_page_and_save_mutation_querier(unsigned key) {
|
entry_info produce_first_page_and_save_mutation_querier(unsigned key, db::timeout_clock::time_point timeout = db::no_timeout) {
|
||||||
return produce_first_page_and_save_mutation_querier(key, make_default_partition_range(), _s.schema()->full_slice());
|
return produce_first_page_and_save_mutation_querier(key, make_default_partition_range(), _s.schema()->full_slice(), 5, timeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
// For tests testing just one insert-lookup.
|
// For tests testing just one insert-lookup.
|
||||||
entry_info produce_first_page_and_save_mutation_querier() {
|
entry_info produce_first_page_and_save_mutation_querier(db::timeout_clock::time_point timeout = db::no_timeout) {
|
||||||
return produce_first_page_and_save_mutation_querier(1);
|
return produce_first_page_and_save_mutation_querier(1, timeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
test_querier_cache& assert_cache_lookup_data_querier(unsigned lookup_key,
|
test_querier_cache& assert_cache_lookup_data_querier(unsigned lookup_key,
|
||||||
@@ -337,9 +339,10 @@ public:
|
|||||||
test_querier_cache& assert_cache_lookup_mutation_querier(unsigned lookup_key,
|
test_querier_cache& assert_cache_lookup_mutation_querier(unsigned lookup_key,
|
||||||
const schema& lookup_schema,
|
const schema& lookup_schema,
|
||||||
const dht::partition_range& lookup_range,
|
const dht::partition_range& lookup_range,
|
||||||
const query::partition_slice& lookup_slice) {
|
const query::partition_slice& lookup_slice,
|
||||||
|
db::timeout_clock::time_point timeout = db::no_timeout) {
|
||||||
|
|
||||||
auto querier_opt = _cache.lookup_mutation_querier(make_cache_key(lookup_key), lookup_schema, lookup_range, lookup_slice, get_semaphore(), nullptr, db::no_timeout);
|
auto querier_opt = _cache.lookup_mutation_querier(make_cache_key(lookup_key), lookup_schema, lookup_range, lookup_slice, get_semaphore(), nullptr, timeout);
|
||||||
if (querier_opt) {
|
if (querier_opt) {
|
||||||
querier_opt->close().get();
|
querier_opt->close().get();
|
||||||
}
|
}
|
||||||
@@ -840,4 +843,69 @@ SEASTAR_THREAD_TEST_CASE(test_semaphore_mismatch) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if SEASTAR_DEBUG
|
||||||
|
static const std::chrono::seconds ttl_timeout_test_timeout = 4s;
|
||||||
|
#else
|
||||||
|
static const std::chrono::seconds ttl_timeout_test_timeout = 1s;
|
||||||
|
#endif
|
||||||
|
|
||||||
|
SEASTAR_THREAD_TEST_CASE(test_timeout_not_sticky_on_insert) {
|
||||||
|
test_querier_cache t;
|
||||||
|
|
||||||
|
const auto entry = t.produce_first_page_and_save_mutation_querier(db::timeout_clock::now() + ttl_timeout_test_timeout);
|
||||||
|
|
||||||
|
sleep(ttl_timeout_test_timeout * 2).get();
|
||||||
|
|
||||||
|
t.assert_cache_lookup_mutation_querier(entry.key, *t.get_schema(), entry.expected_range, entry.expected_slice)
|
||||||
|
.no_misses()
|
||||||
|
.no_drops()
|
||||||
|
.no_evictions();
|
||||||
|
}
|
||||||
|
|
||||||
|
SEASTAR_THREAD_TEST_CASE(test_ttl_not_sticky_on_lookup) {
|
||||||
|
test_querier_cache t(ttl_timeout_test_timeout);
|
||||||
|
|
||||||
|
auto& sem = t.get_semaphore();
|
||||||
|
auto permit1 = sem.obtain_permit(t.get_schema(), get_name(), 1024, db::no_timeout, {}).get();
|
||||||
|
|
||||||
|
const auto entry = t.produce_first_page_and_save_mutation_querier();
|
||||||
|
|
||||||
|
const auto new_timeout = db::timeout_clock::now() + 900s;
|
||||||
|
|
||||||
|
t.assert_cache_lookup_mutation_querier(entry.key, *t.get_schema(), entry.expected_range, entry.expected_slice, new_timeout)
|
||||||
|
.no_misses()
|
||||||
|
.no_drops()
|
||||||
|
.no_evictions();
|
||||||
|
|
||||||
|
BOOST_REQUIRE(entry.permit.timeout() == new_timeout);
|
||||||
|
|
||||||
|
sleep(ttl_timeout_test_timeout * 2).get();
|
||||||
|
|
||||||
|
// check_abort() will throw if the permit timed out due to sticky TTL during the above sleep.
|
||||||
|
BOOST_REQUIRE_NO_THROW(entry.permit.check_abort());
|
||||||
|
}
|
||||||
|
|
||||||
|
SEASTAR_THREAD_TEST_CASE(test_timeout_is_applied_on_lookup) {
|
||||||
|
test_querier_cache t;
|
||||||
|
|
||||||
|
auto& sem = t.get_semaphore();
|
||||||
|
auto permit1 = sem.obtain_permit(t.get_schema(), get_name(), 1024, db::no_timeout, {}).get();
|
||||||
|
|
||||||
|
const auto entry = t.produce_first_page_and_save_mutation_querier();
|
||||||
|
|
||||||
|
const auto new_timeout = db::timeout_clock::now() + ttl_timeout_test_timeout;
|
||||||
|
|
||||||
|
t.assert_cache_lookup_mutation_querier(entry.key, *t.get_schema(), entry.expected_range, entry.expected_slice, new_timeout)
|
||||||
|
.no_misses()
|
||||||
|
.no_drops()
|
||||||
|
.no_evictions();
|
||||||
|
|
||||||
|
BOOST_REQUIRE(entry.permit.timeout() == new_timeout);
|
||||||
|
BOOST_REQUIRE_NO_THROW(entry.permit.check_abort());
|
||||||
|
|
||||||
|
sleep(ttl_timeout_test_timeout * 2).get();
|
||||||
|
|
||||||
|
BOOST_REQUIRE_THROW(entry.permit.check_abort(), seastar::named_semaphore_timed_out);
|
||||||
|
}
|
||||||
|
|
||||||
BOOST_AUTO_TEST_SUITE_END()
|
BOOST_AUTO_TEST_SUITE_END()
|
||||||
|
|||||||
Reference in New Issue
Block a user