diff --git a/mutation_reader.cc b/mutation_reader.cc index 5ee501635f..631a01c2ed 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -1088,7 +1088,7 @@ private: void update_next_position(flat_mutation_reader& reader); void adjust_partition_slice(); flat_mutation_reader recreate_reader(); - flat_mutation_reader resume_or_create_reader(); + future resume_or_create_reader(db::timeout_clock::time_point timeout); void maybe_validate_partition_start(const flat_mutation_reader::tracked_buffer& buffer); void validate_position_in_partition(position_in_partition_view pos) const; bool should_drop_fragment(const mutation_fragment& mf); @@ -1257,14 +1257,14 @@ flat_mutation_reader evictable_reader::recreate_reader() { _fwd_mr); } -flat_mutation_reader evictable_reader::resume_or_create_reader() { +future evictable_reader::resume_or_create_reader(db::timeout_clock::time_point timeout) { if (_reader) { - return std::move(*_reader); + co_return std::move(*_reader); } if (auto reader_opt = try_resume()) { - return std::move(*reader_opt); + co_return std::move(*reader_opt); } - return recreate_reader(); + co_return recreate_reader(); } template @@ -1509,14 +1509,12 @@ evictable_reader::evictable_reader( future<> evictable_reader::fill_buffer(db::timeout_clock::time_point timeout) { if (is_end_of_stream()) { - return make_ready_future<>(); + co_return; } - return with_closeable(resume_or_create_reader(), [this, timeout] (flat_mutation_reader& reader) mutable { - return fill_buffer(reader, timeout).then([this, &reader] { - _end_of_stream = reader.is_end_of_stream() && reader.is_buffer_empty(); - maybe_pause(std::move(reader)); - }); - }); + _reader = co_await resume_or_create_reader(timeout); + co_await fill_buffer(*_reader, timeout); + _end_of_stream = _reader->is_end_of_stream() && _reader->is_buffer_empty(); + maybe_pause(std::move(*_reader)); } future<> evictable_reader::next_partition() { @@ -1525,7 +1523,7 @@ future<> evictable_reader::next_partition() { if (!is_buffer_empty()) { co_return; } - auto reader = resume_or_create_reader(); + auto reader = co_await resume_or_create_reader(db::no_timeout); co_await reader.next_partition(); maybe_pause(std::move(reader)); }