foreign_reader: prepare for read-ahead outliving the reader
The foreign reader keeps track of ongoing read-aheads via a foreign_ptr to the read-ahead's future on the remote shard. This pointer is overwritten after each "remote call" to the remote reader with a pointer to the future of the new read-ahead's future. There are severeal problems with the current implementation: 1) There is a new read-ahead launched after each "remote call" unconditionally, even if the remote reader is at EOS. This will start unecessary read-ahead when the reader is already finished and may be soon destroyed (legally) by the client. 2) The pointer to the remote read-ahead future is not set to nullptr when a remote call is issued. Thus in the destructor, where we attach a continuation to the read-ahead's future to extend the reader's lifetime until after the read-ahead finishes, we migh attach a continuation to a future that already has one and run into a failed assert(). To fix this issues reset the read-ahead pointer to nullptr each time a remote call is issued and don't start a new read-ahead if the remote reader is at EOS. This way we can ensure that when the reader is destroyed we either have a valid and non-stale read-aead future or none at all and can reliably make a decision about whether we need to extend the lifetime of the remote reader or not.
This commit is contained in:
@@ -862,17 +862,20 @@ class foreign_reader : public flat_mutation_reader::impl {
|
||||
// we don't have to wait on the remote reader filling its buffer.
|
||||
template <typename Operation, typename Result = futurize_t<std::result_of_t<Operation()>>>
|
||||
Result forward_operation(db::timeout_clock::time_point timeout, Operation op) {
|
||||
auto read_ahead_future = _read_ahead_future ? _read_ahead_future.get() : nullptr;
|
||||
return smp::submit_to(_reader.get_owner_shard(), [reader = _reader.get(), read_ahead_future,
|
||||
pending_next_partition = std::exchange(_pending_next_partition, 0), timeout, op = std::move(op)] () mutable {
|
||||
return smp::submit_to(_reader.get_owner_shard(), [reader = _reader.get(),
|
||||
read_ahead_future = std::exchange(_read_ahead_future, nullptr),
|
||||
pending_next_partition = std::exchange(_pending_next_partition, 0),
|
||||
timeout,
|
||||
op = std::move(op)] () mutable {
|
||||
auto exec_op_and_read_ahead = [=] () mutable {
|
||||
while (pending_next_partition) {
|
||||
--pending_next_partition;
|
||||
reader->next_partition();
|
||||
}
|
||||
return op().then([=] (auto... results) {
|
||||
auto f = reader->is_end_of_stream() ? nullptr : std::make_unique<future<>>(reader->fill_buffer(timeout));
|
||||
return make_ready_future<foreign_unique_ptr<future<>>, decltype(results)...>(
|
||||
std::make_unique<future<>>(reader->fill_buffer(timeout)), std::move(results)...);
|
||||
make_foreign(std::move(f)), std::move(results)...);
|
||||
});
|
||||
};
|
||||
if (read_ahead_future) {
|
||||
|
||||
Reference in New Issue
Block a user