From 04643fb223e4474a8867381940e8b426b314ca27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Mon, 30 Apr 2018 12:50:51 +0300 Subject: [PATCH] multishard_combining_reader: prepare for read-ahead otliving the reader When the multishard reader is destroyed there might be severeal pending read-aheads running in the background. These read-aheads need their associated reader to stay alive until after the read-ahead completes. To solve this move the flat_mutation_reader into a struct and manage this struct's lifetime through a shared pointer. Fibers associated with read-aheads that might outlive the multishard reader will hold on to a copy of the shard pointer keeping the underlying reader alive until they complete. To avoid doing any extra work a flag is added to this state which is set when the multishard reader is destroyed. When this flag is set, pending continuations will return early. All this is encapsulated in multishard_combining_reader::shard_reader the multishard reader code itself need not be changed. --- mutation_reader.cc | 80 ++++++++++++++++++++++++++++++---------------- 1 file changed, 53 insertions(+), 27 deletions(-) diff --git a/mutation_reader.cc b/mutation_reader.cc index f4dfbb4987..57c01e19f1 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -997,10 +997,23 @@ class multishard_combining_reader : public flat_mutation_reader::impl { // Thin wrapper around a flat_mutation_reader (foreign_reader) that // lazy-creates the reader when needed and transparently keeps track // of read-ahead. + // Shard reader instances have to stay alive until all pending read-ahead + // completes. But at the same time we don't want to do any additional work + // after the parent reader was destroyed. To solve this we do two things: + // * Move flat_mutation_reader instance into a struct managed through a + // shared pointer. Continuations using this internal state will share + // owhership of this struct with the shard reader instance. + // * Add an adandoned flag to the struct which will be set when the shard + // reader is destroyed. When this is set don't do any work in the + // pending continuations, just "run through them". class shard_reader { - multishard_combining_reader& _parent; - unsigned _shard; - std::optional _reader; + struct state { + std::optional reader; + bool abandoned = false; + }; + const multishard_combining_reader& _parent; + const unsigned _shard; + lw_shared_ptr _state; unsigned _pending_next_partition = 0; std::optional> _read_ahead; promise<> _reader_promise; @@ -1008,7 +1021,8 @@ class multishard_combining_reader : public flat_mutation_reader::impl { public: shard_reader(multishard_combining_reader& parent, unsigned shard) : _parent(parent) - , _shard(shard) { + , _shard(shard) + , _state(make_lw_shared()) { } shard_reader(shard_reader&&) = default; @@ -1018,24 +1032,25 @@ class multishard_combining_reader : public flat_mutation_reader::impl { shard_reader& operator=(const shard_reader&) = delete; ~shard_reader() { + _state->abandoned = true; if (_read_ahead) { - // Avoid errors in the logs about ignored exceptional future. - _read_ahead->finally([] {}); + // Keep state (the reader) alive until the read-ahead completes. + _read_ahead->finally([state = _state] {}); } } // These methods assume the reader is already created. bool is_end_of_stream() const { - return _reader->is_end_of_stream(); + return _state->reader->is_end_of_stream(); } bool is_buffer_empty() const { - return _reader->is_buffer_empty(); + return _state->reader->is_buffer_empty(); } mutation_fragment pop_mutation_fragment() { - return _reader->pop_mutation_fragment(); + return _state->reader->pop_mutation_fragment(); } const mutation_fragment& peek_buffer() const { - return _reader->peek_buffer(); + return _state->reader->peek_buffer(); } future<> fill_buffer(db::timeout_clock::time_point timeout); @@ -1045,10 +1060,10 @@ class multishard_combining_reader : public flat_mutation_reader::impl { future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout); future<> create_reader(); explicit operator bool() const { - return _reader.has_value(); + return _state->reader.has_value(); } bool done() const { - return _reader.has_value() && _reader->is_buffer_empty() && _reader->is_end_of_stream(); + return _state->reader.has_value() && _state->reader->is_buffer_empty() && _state->reader->is_end_of_stream(); } void read_ahead(db::timeout_clock::time_point timeout); bool is_read_ahead_in_progress() const { @@ -1089,20 +1104,20 @@ future<> multishard_combining_reader::shard_reader::fill_buffer(db::timeout_cloc if (_read_ahead) { return *std::exchange(_read_ahead, std::nullopt); } - return _reader->fill_buffer(); + return _state->reader->fill_buffer(); } void multishard_combining_reader::shard_reader::next_partition() { - if (_reader) { - _reader->next_partition(); + if (_state->reader) { + _state->reader->next_partition(); } else { ++_pending_next_partition; } } future<> multishard_combining_reader::shard_reader::fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) { - if (_reader) { - return _reader->fast_forward_to(pr, timeout); + if (_state->reader) { + return _state->reader->fast_forward_to(pr, timeout); } // No need to fast-forward uncreated readers, they will be passed the new // range when created. @@ -1110,37 +1125,48 @@ future<> multishard_combining_reader::shard_reader::fast_forward_to(const dht::p } future<> multishard_combining_reader::shard_reader::fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) { - if (_reader) { - return _reader->fast_forward_to(pr, timeout); + if (_state->reader) { + return _state->reader->fast_forward_to(pr, timeout); } return create_reader().then([this, pr = std::move(pr), timeout] { - return _reader->fast_forward_to(pr, timeout); + return _state->reader->fast_forward_to(pr, timeout); }); } future<> multishard_combining_reader::shard_reader::create_reader() { - if (_reader) { + if (_state->reader) { return make_ready_future<>(); } if (_read_ahead) { return _reader_promise.get_future(); } return _parent._reader_factory(_shard, *_parent._pr, _parent._fwd_sm, _parent._fwd_mr).then( - [this] (foreign_ptr>&& r) mutable { - _reader.emplace(make_foreign_reader(_parent._schema, std::move(r), _parent._fwd_sm)); + [this, state = _state] (foreign_ptr>&& r) mutable { + // Use the captured instance to check whether the reader is abdandoned. + // If the reader is abandoned we can't read members of this anymore. + if (state->abandoned) { + return; + } + + _state->reader.emplace(make_foreign_reader(_parent._schema, std::move(r), _parent._fwd_sm)); while (_pending_next_partition) { --_pending_next_partition; - _reader->next_partition(); + _state->reader->next_partition(); } _reader_promise.set_value(); }); } void multishard_combining_reader::shard_reader::read_ahead(db::timeout_clock::time_point timeout) { - if (_reader) { - _read_ahead.emplace(_reader->fill_buffer(timeout)); + if (_state->reader) { + _read_ahead.emplace(_state->reader->fill_buffer(timeout)); } else { - _read_ahead.emplace(create_reader().then([this, timeout] { return _reader->fill_buffer(timeout); })); + _read_ahead.emplace(create_reader().then([state = _state, timeout] () mutable { + if (state->abandoned) { + return make_ready_future<>(); + } + return state->reader->fill_buffer(timeout); + })); } }