readers: evictable_reader: skip progress guarantee when next pos is partition start

The evictable reader must ensure that each buffer fill makes forward
progress, i.e. the last fragment in the buffer has a position larger
than the last fragment from the last buffer-fill. Otherwise, the reader
could get stuck in an infinite loop between buffer fills, if the reader
is evicted in-between.
The code guranteeing this forward change has a bug: when the next
expected position is a partition-start (another partition), the code
would loop forever, effectively reading all there is from the underlying
reader.
To avoid this, add a special case to ignore the progress guarantee loop
altogether when the next expected position is a partition start. In this
case, progress is garanteed anyway, because there is exactly one
partition-start fragment in each partition.

Fixes: #13491

Closes #13563
This commit is contained in:
Botond Dénes
2023-04-18 07:52:01 -04:00
committed by Avi Kivity
parent 7baa2d9cb2
commit 72003dc35c
2 changed files with 43 additions and 1 deletions

View File

@@ -609,7 +609,9 @@ future<> evictable_reader_v2::fill_buffer() {
auto* next_mf = co_await _reader->peek();
// First make sure we've made progress w.r.t. _next_position_in_partition.
while (next_mf && _tri_cmp(_next_position_in_partition, buffer().back().position()) <= 0) {
// This loop becomes inifinite when next pos is a partition start.
// In that case progress is guranteed anyway, so skip this loop entirely.
while (!_next_position_in_partition.is_partition_start() && next_mf && _tri_cmp(_next_position_in_partition, buffer().back().position()) <= 0) {
push_mutation_fragment(_reader->pop_mutation_fragment());
next_mf = co_await _reader->peek();
}

View File

@@ -3631,6 +3631,46 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_clear_tombstone_in_discontinued_p
check(empty_buffer, "end of stream");
}
SEASTAR_THREAD_TEST_CASE(test_evictable_reader_next_pos_is_partition_start) {
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 0);
auto stop_sem = deferred_stop(semaphore);
simple_schema s;
auto schema = s.schema();
auto permit = semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout, {});
auto pk = s.make_pkey();
const auto prange = dht::partition_range::make_open_ended_both_sides();
std::deque<mutation_fragment_v2> frags;
frags.emplace_back(*schema, permit, partition_start(pk, {}));
for (size_t ck = 0; ck < 1000; ++ck) {
frags.emplace_back(*schema, permit, range_tombstone_change(position_in_partition::before_key(s.make_ckey(ck)), tombstone(s.new_timestamp(), {})));
}
frags.emplace_back(*schema, permit, range_tombstone_change(position_in_partition::before_key(s.make_ckey(1001)), tombstone()));
frags.emplace_back(*schema, permit, partition_end{});
const auto max_buf_size = frags[0].memory_usage() + frags[1].memory_usage() + frags[2].memory_usage();
auto ms = mutation_source([&frags, max_buf_size] (
schema_ptr schema,
reader_permit permit,
const dht::partition_range& pr,
const query::partition_slice& ps) {
auto rd = make_flat_mutation_reader_from_fragments(std::move(schema), std::move(permit), std::move(frags), pr, ps);
rd.set_max_buffer_size(max_buf_size);
return rd;
});
auto [rd, handle] = make_manually_paused_evictable_reader_v2(ms, schema, permit, prange, schema->full_slice(), default_priority_class(), {},
mutation_reader::forwarding::no);
auto stop_rd = deferred_close(rd);
rd.set_max_buffer_size(max_buf_size);
rd.fill_buffer().get();
auto buf1 = rd.detach_buffer();
BOOST_REQUIRE_EQUAL(buf1.size(), 3);
}
struct mutation_bounds {
std::optional<mutation> m;
position_in_partition lower;