mutation_reader: evictable_reader: futurize resume_or_recreate_reader()
In preparation for waiting for readmission after eviction in a later patch.
This commit is contained in:
@@ -1088,7 +1088,7 @@ private:
|
||||
void update_next_position(flat_mutation_reader& reader);
|
||||
void adjust_partition_slice();
|
||||
flat_mutation_reader recreate_reader();
|
||||
flat_mutation_reader resume_or_create_reader();
|
||||
future<flat_mutation_reader> resume_or_create_reader(db::timeout_clock::time_point timeout);
|
||||
void maybe_validate_partition_start(const flat_mutation_reader::tracked_buffer& buffer);
|
||||
void validate_position_in_partition(position_in_partition_view pos) const;
|
||||
bool should_drop_fragment(const mutation_fragment& mf);
|
||||
@@ -1257,14 +1257,14 @@ flat_mutation_reader evictable_reader::recreate_reader() {
|
||||
_fwd_mr);
|
||||
}
|
||||
|
||||
flat_mutation_reader evictable_reader::resume_or_create_reader() {
|
||||
future<flat_mutation_reader> evictable_reader::resume_or_create_reader(db::timeout_clock::time_point timeout) {
|
||||
if (_reader) {
|
||||
return std::move(*_reader);
|
||||
co_return std::move(*_reader);
|
||||
}
|
||||
if (auto reader_opt = try_resume()) {
|
||||
return std::move(*reader_opt);
|
||||
co_return std::move(*reader_opt);
|
||||
}
|
||||
return recreate_reader();
|
||||
co_return recreate_reader();
|
||||
}
|
||||
|
||||
template <typename... Arg>
|
||||
@@ -1509,14 +1509,12 @@ evictable_reader::evictable_reader(
|
||||
|
||||
future<> evictable_reader::fill_buffer(db::timeout_clock::time_point timeout) {
|
||||
if (is_end_of_stream()) {
|
||||
return make_ready_future<>();
|
||||
co_return;
|
||||
}
|
||||
return with_closeable(resume_or_create_reader(), [this, timeout] (flat_mutation_reader& reader) mutable {
|
||||
return fill_buffer(reader, timeout).then([this, &reader] {
|
||||
_end_of_stream = reader.is_end_of_stream() && reader.is_buffer_empty();
|
||||
maybe_pause(std::move(reader));
|
||||
});
|
||||
});
|
||||
_reader = co_await resume_or_create_reader(timeout);
|
||||
co_await fill_buffer(*_reader, timeout);
|
||||
_end_of_stream = _reader->is_end_of_stream() && _reader->is_buffer_empty();
|
||||
maybe_pause(std::move(*_reader));
|
||||
}
|
||||
|
||||
future<> evictable_reader::next_partition() {
|
||||
@@ -1525,7 +1523,7 @@ future<> evictable_reader::next_partition() {
|
||||
if (!is_buffer_empty()) {
|
||||
co_return;
|
||||
}
|
||||
auto reader = resume_or_create_reader();
|
||||
auto reader = co_await resume_or_create_reader(db::no_timeout);
|
||||
co_await reader.next_partition();
|
||||
maybe_pause(std::move(reader));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user