replica: add abort polling to memtable and cache readers
Continuing the read once it is aborted (e.g. due to timeout) is a waste of resources, as the produced results will be discarded. Poll the permit's abort exception in the memtable and cache reader's fill_buffer(). This results in one poll per buffer filled (8KB of data). We already have similar poll for sstable readers, as disk reads are usually much heavier and therefore it is more important to stop them ASAP after abort. Cache and memtable reads are usually quick but not always, hence it is important to also have polling in the cache and memtable readers. Refs: #11469 Fixes: #28148 Closes scylladb/scylladb#28149
This commit is contained in:
committed by
Tomasz Grabiec
parent
0aebc17c4c
commit
1e09a34686
@@ -323,6 +323,9 @@ void cache_mutation_reader::touch_partition() {
|
||||
|
||||
inline
|
||||
future<> cache_mutation_reader::fill_buffer() {
|
||||
if (const auto& ex = get_abort_exception(); ex) {
|
||||
return make_exception_future<>(ex);
|
||||
}
|
||||
if (_state == state::before_static_row) {
|
||||
touch_partition();
|
||||
auto after_static_row = [this] {
|
||||
|
||||
@@ -250,6 +250,9 @@ public:
|
||||
}
|
||||
|
||||
virtual future<> fill_buffer() override {
|
||||
if (const auto& ex = get_abort_exception(); ex) {
|
||||
return make_exception_future<>(ex);
|
||||
}
|
||||
return do_until([this] { return is_end_of_stream() || is_buffer_full(); }, [this] {
|
||||
_reader.with_reserve([&] {
|
||||
if (!_static_row_done) {
|
||||
|
||||
@@ -1602,4 +1602,39 @@ SEASTAR_TEST_CASE(memtable_reader_after_tablet_migration) {
|
||||
}, cfg);
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_memtable_reader_abort) {
|
||||
simple_schema ss;
|
||||
const auto s = ss.schema();
|
||||
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
|
||||
replica::table_stats tbl_stats;
|
||||
replica::memtable_table_shared_data table_shared_data;
|
||||
replica::dirty_memory_manager mgr;
|
||||
|
||||
auto mt = make_lw_shared<replica::memtable>(s, mgr, table_shared_data, tbl_stats);
|
||||
|
||||
auto pk = ss.make_pkey(0);
|
||||
auto pr = dht::partition_range::make_singular(pk);
|
||||
|
||||
mutation m(s, pk);
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
ss.add_row(m, ss.make_ckey(i), "v1");
|
||||
}
|
||||
mt->apply(m);
|
||||
|
||||
auto permit = semaphore.make_permit();
|
||||
|
||||
auto reader_opt = mt->make_mutation_reader_opt(s, permit, pr, s->full_slice());
|
||||
BOOST_REQUIRE(reader_opt);
|
||||
auto close_reader = deferred_close(*reader_opt);
|
||||
|
||||
permit.set_timeout(db::timeout_clock::now());
|
||||
|
||||
// Wait for timer to fire so the permit is timed out.
|
||||
BOOST_REQUIRE(eventually_true([&] { return bool(permit.get_abort_exception()); }));
|
||||
|
||||
BOOST_REQUIRE_THROW((*reader_opt)().get(), named_semaphore_timed_out);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_SUITE_END()
|
||||
|
||||
@@ -30,6 +30,7 @@
|
||||
#include "mutation/mutation_rebuilder.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
#include "test/lib/cql_test_env.hh"
|
||||
#include "test/lib/eventually.hh"
|
||||
#include "test/lib/memtable_snapshot_source.hh"
|
||||
#include "test/lib/log.hh"
|
||||
#include "test/lib/reader_concurrency_semaphore.hh"
|
||||
@@ -5569,4 +5570,31 @@ SEASTAR_TEST_CASE(test_cache_tombstone_gc_memtable_overlap_check_elision) {
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_cache_reader_abort) {
|
||||
auto s = make_schema();
|
||||
auto m = make_new_mutation(s);
|
||||
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
|
||||
cache_tracker tracker;
|
||||
row_cache cache(s, snapshot_source_from_snapshot(make_source_with(m)), tracker);
|
||||
|
||||
// make sure the data is cached
|
||||
assert_that(cache.make_reader(s, semaphore.make_permit(), query::full_partition_range))
|
||||
.produces(m)
|
||||
.produces_end_of_stream();
|
||||
|
||||
auto permit = semaphore.make_permit();
|
||||
|
||||
auto reader = cache.make_reader(s, permit, query::full_partition_range);
|
||||
auto close_reader = deferred_close(reader);
|
||||
|
||||
permit.set_timeout(db::timeout_clock::now());
|
||||
|
||||
// Wait for timer to fire so the permit is timed out.
|
||||
BOOST_REQUIRE(eventually_true([&] { return bool(permit.get_abort_exception()); }));
|
||||
|
||||
BOOST_REQUIRE_THROW(reader().get(), named_semaphore_timed_out);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_SUITE_END()
|
||||
|
||||
Reference in New Issue
Block a user