sstables: Automatically close exhausted SSTable readers in cleanup
Add a reader that will automatically close the underlying sstable reader if fast forward is called with a range past the range spanned by the SSTable. This is only to be used in the context of fast forward calls in cleanup, as combined reader in full scans can proactively close the readers that returned EOS. Regular reads that go through cache enable fast forwarding to position range, therefore won't enable auto-closed reader. Compactions don't enable any kind of forward, and they won't have it enabled either. The overhead is minimal, with cleanup being able to reach the same 38MB/s as before this patch. Refs #12998. Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
@@ -1224,6 +1224,67 @@ sstable_set::create_single_key_sstable_reader(
|
||||
std::move(permit), sstable_histogram, pr, slice, std::move(trace_state), fwd, fwd_mr, predicate);
|
||||
}
|
||||
|
||||
class auto_closed_sstable_reader final : public flat_mutation_reader_v2::impl {
|
||||
shared_sstable _sst;
|
||||
flat_mutation_reader_v2_opt _reader;
|
||||
private:
|
||||
future<> maybe_auto_close_sstable_reader(const dht::partition_range& pr) {
|
||||
if (!_sst) {
|
||||
co_return;
|
||||
}
|
||||
|
||||
auto pos = dht::ring_position_view::for_range_start(pr);
|
||||
auto last_pos_in_reader = dht::ring_position_view(_sst->get_last_decorated_key());
|
||||
|
||||
// If we're fast forwarding past the underlying reader, let's close it
|
||||
// and replace it by an empty reader.
|
||||
if (dht::ring_position_tri_compare(*_schema, pos, last_pos_in_reader) > 0) {
|
||||
co_await _reader->close();
|
||||
_reader = make_empty_flat_reader_v2(_schema, _permit);
|
||||
_sst = nullptr;
|
||||
}
|
||||
}
|
||||
public:
|
||||
auto_closed_sstable_reader(shared_sstable sst,
|
||||
flat_mutation_reader_v2 sst_reader,
|
||||
reader_permit permit)
|
||||
: impl(sst_reader.schema(), std::move(permit))
|
||||
, _sst(std::move(sst))
|
||||
, _reader(std::move(sst_reader)) {
|
||||
}
|
||||
virtual future<> fill_buffer() override {
|
||||
return _reader->fill_buffer().then([this] {
|
||||
_reader->move_buffer_content_to(*this);
|
||||
_end_of_stream = _reader->is_end_of_stream();
|
||||
});
|
||||
}
|
||||
future<> fast_forward_to(const dht::partition_range& pr) override {
|
||||
clear_buffer();
|
||||
|
||||
co_await maybe_auto_close_sstable_reader(pr);
|
||||
|
||||
_end_of_stream = false;
|
||||
co_await _reader->fast_forward_to(pr);
|
||||
}
|
||||
virtual future<> fast_forward_to(position_range pr) override {
|
||||
return make_exception_future<>(make_backtraced_exception_ptr<std::bad_function_call>());
|
||||
}
|
||||
virtual future<> next_partition() override {
|
||||
clear_buffer_to_next_partition();
|
||||
if (is_buffer_empty() && !is_end_of_stream()) {
|
||||
return _reader->next_partition();
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
virtual future<> close() noexcept override {
|
||||
return _reader->close();
|
||||
}
|
||||
};
|
||||
|
||||
flat_mutation_reader_v2 make_auto_closed_sstable_reader(shared_sstable sst, flat_mutation_reader_v2 sst_reader, reader_permit permit) {
|
||||
return make_flat_mutation_reader_v2<auto_closed_sstable_reader>(std::move(sst), std::move(sst_reader), std::move(permit));
|
||||
}
|
||||
|
||||
flat_mutation_reader_v2
|
||||
sstable_set::make_range_sstable_reader(
|
||||
schema_ptr s,
|
||||
@@ -1266,7 +1327,14 @@ sstable_set::make_local_shard_sstable_reader(
|
||||
if (!predicate(*sst)) {
|
||||
return make_empty_flat_reader_v2(s, permit);
|
||||
}
|
||||
return sst->make_reader(s, permit, pr, slice, trace_state, fwd, fwd_mr, monitor_generator(sst));
|
||||
auto make_reader = [&] () -> flat_mutation_reader_v2 {
|
||||
return sst->make_reader(s, permit, pr, slice, trace_state, fwd, fwd_mr, monitor_generator(sst));
|
||||
};
|
||||
// Auto-closed sstable reader is only enabled in the context of fast-forward to partition ranges
|
||||
if (!fwd && fwd_mr) {
|
||||
return make_auto_closed_sstable_reader(sst, make_reader(), permit);
|
||||
}
|
||||
return make_reader();
|
||||
};
|
||||
if (_impl->size() == 1) [[unlikely]] {
|
||||
auto sstables = _impl->all();
|
||||
|
||||
Reference in New Issue
Block a user