flat_mutation_reader: partition_reversing_mutation_reader: implement no-op close

We don't own _source therefore do not close it.
That said, we still need to make sure that the reversing reader
itself is closed to calm down the check when it's destroyed.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2021-01-26 23:55:52 +02:00
parent f4dfaaa6c9
commit 978501c336
3 changed files with 16 additions and 3 deletions

View File

@@ -206,6 +206,11 @@ flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, query
virtual future<> fast_forward_to(position_range, db::timeout_clock::time_point) override {
return make_exception_future<>(make_backtraced_exception_ptr<std::bad_function_call>());
}
virtual future<> close() noexcept override {
// we don't own _source therefore do not close it
return make_ready_future<>();
}
};
return make_flat_mutation_reader<partition_reversing_mutation_reader>(original, max_size);

View File

@@ -21,6 +21,8 @@
#pragma once
#include <seastar/util/closeable.hh>
#include "mutation_compactor.hh"
#include "mutation_reader.hh"
@@ -96,7 +98,7 @@ auto consume_page(flat_mutation_reader& reader,
auto consume = [&reader, &slice, reader_consumer = std::move(reader_consumer), timeout, max_size] () mutable {
if (slice.options.contains(query::partition_slice::option::reversed)) {
return do_with(make_reversing_reader(reader, max_size),
return with_closeable(make_reversing_reader(reader, max_size),
[reader_consumer = std::move(reader_consumer), timeout] (flat_mutation_reader& reversing_reader) mutable {
return reversing_reader.consume(std::move(reader_consumer), timeout);
});

View File

@@ -23,6 +23,7 @@
#include <seastar/core/thread.hh>
#include <seastar/testing/test_case.hh>
#include <seastar/testing/thread_test_case.hh>
#include <seastar/util/closeable.hh>
#include "mutation.hh"
#include "mutation_fragment.hh"
@@ -542,8 +543,9 @@ void test_flat_stream(schema_ptr s, std::vector<mutation> muts, reversed_partiti
return fmr.consume_in_thread(std::move(fsc), db::no_timeout);
} else {
if (reversed) {
auto reverse_reader = make_reversing_reader(fmr, query::max_result_size(size_t(1) << 20));
return reverse_reader.consume(std::move(fsc), db::no_timeout).get0();
return with_closeable(make_reversing_reader(fmr, query::max_result_size(size_t(1) << 20)), [fsc = std::move(fsc)] (flat_mutation_reader& reverse_reader) mutable {
return reverse_reader.consume(std::move(fsc), db::no_timeout);
}).get0();
}
return fmr.consume(std::move(fsc), db::no_timeout).get0();
}
@@ -770,7 +772,11 @@ SEASTAR_THREAD_TEST_CASE(test_reverse_reader_memory_limit) {
const uint64_t hard_limit = size_t(1) << 18;
auto reader = flat_mutation_reader_from_mutations(tests::make_permit(), {mut});
// need to close both readers since the reverse_reader
// doesn't own the reader passed to it by ref.
auto close_reader = deferred_close(reader);
auto reverse_reader = make_reversing_reader(reader, query::max_result_size(size_t(1) << 10, hard_limit));
auto close_reverse_reader = deferred_close(reverse_reader);
try {
reverse_reader.consume(phony_consumer{}, db::no_timeout).get();