multishard_combining_reader: prepare for read-ahead outliving the reader

When the multishard reader is destroyed there might be several pending
read-aheads running in the background. These read-aheads need their
associated reader to stay alive until after the read-ahead completes.
To solve this, move the flat_mutation_reader into a struct and manage
this struct's lifetime through a shared pointer. Fibers associated with
read-aheads that might outlive the multishard reader will hold on to a
copy of the shared pointer, keeping the underlying reader alive until they
complete. To avoid doing any extra work a flag is added to this state
which is set when the multishard reader is destroyed. When this flag is
set, pending continuations will return early. All this is encapsulated
in multishard_combining_reader::shard_reader, so the multishard reader code
itself need not be changed.
This commit is contained in:
Botond Dénes
2018-04-30 12:50:51 +03:00
parent a05d398be7
commit 04643fb223

View File

@@ -997,10 +997,23 @@ class multishard_combining_reader : public flat_mutation_reader::impl {
// Thin wrapper around a flat_mutation_reader (foreign_reader) that
// lazy-creates the reader when needed and transparently keeps track
// of read-ahead.
// Shard reader instances have to stay alive until all pending read-ahead
// completes. But at the same time we don't want to do any additional work
// after the parent reader was destroyed. To solve this we do two things:
// * Move flat_mutation_reader instance into a struct managed through a
// shared pointer. Continuations using this internal state will share
// ownership of this struct with the shard reader instance.
// * Add an abandoned flag to the struct which will be set when the shard
// reader is destroyed. When this is set don't do any work in the
// pending continuations, just "run through them".
class shard_reader {
multishard_combining_reader& _parent;
unsigned _shard;
std::optional<flat_mutation_reader> _reader;
struct state {
std::optional<flat_mutation_reader> reader;
bool abandoned = false;
};
const multishard_combining_reader& _parent;
const unsigned _shard;
lw_shared_ptr<state> _state;
unsigned _pending_next_partition = 0;
std::optional<future<>> _read_ahead;
promise<> _reader_promise;
@@ -1008,7 +1021,8 @@ class multishard_combining_reader : public flat_mutation_reader::impl {
public:
shard_reader(multishard_combining_reader& parent, unsigned shard)
: _parent(parent)
, _shard(shard) {
, _shard(shard)
, _state(make_lw_shared<state>()) {
}
shard_reader(shard_reader&&) = default;
@@ -1018,24 +1032,25 @@ class multishard_combining_reader : public flat_mutation_reader::impl {
shard_reader& operator=(const shard_reader&) = delete;
~shard_reader() {
_state->abandoned = true;
if (_read_ahead) {
// Avoid errors in the logs about ignored exceptional future.
_read_ahead->finally([] {});
// Keep state (the reader) alive until the read-ahead completes.
_read_ahead->finally([state = _state] {});
}
}
// These methods assume the reader is already created.
bool is_end_of_stream() const {
return _reader->is_end_of_stream();
return _state->reader->is_end_of_stream();
}
bool is_buffer_empty() const {
return _reader->is_buffer_empty();
return _state->reader->is_buffer_empty();
}
mutation_fragment pop_mutation_fragment() {
return _reader->pop_mutation_fragment();
return _state->reader->pop_mutation_fragment();
}
const mutation_fragment& peek_buffer() const {
return _reader->peek_buffer();
return _state->reader->peek_buffer();
}
future<> fill_buffer(db::timeout_clock::time_point timeout);
@@ -1045,10 +1060,10 @@ class multishard_combining_reader : public flat_mutation_reader::impl {
future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout);
future<> create_reader();
explicit operator bool() const {
return _reader.has_value();
return _state->reader.has_value();
}
bool done() const {
return _reader.has_value() && _reader->is_buffer_empty() && _reader->is_end_of_stream();
return _state->reader.has_value() && _state->reader->is_buffer_empty() && _state->reader->is_end_of_stream();
}
void read_ahead(db::timeout_clock::time_point timeout);
bool is_read_ahead_in_progress() const {
@@ -1089,20 +1104,20 @@ future<> multishard_combining_reader::shard_reader::fill_buffer(db::timeout_cloc
if (_read_ahead) {
return *std::exchange(_read_ahead, std::nullopt);
}
return _reader->fill_buffer();
return _state->reader->fill_buffer();
}
// Forwards next_partition() to the underlying reader when it exists.
// When the reader has not been created yet the call is only counted in
// _pending_next_partition; create_reader() applies the counted calls once
// the reader has been constructed.
// NOTE(review): this listing looks like a diff rendered without +/- markers,
// i.e. each `_reader` line appears to be the removed version of the
// `_state->reader` line that follows it -- confirm against the repository.
void multishard_combining_reader::shard_reader::next_partition() {
if (_reader) {
_reader->next_partition();
if (_state->reader) {
_state->reader->next_partition();
} else {
// No reader yet: remember the request instead of dropping it.
++_pending_next_partition;
}
}
future<> multishard_combining_reader::shard_reader::fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) {
if (_reader) {
return _reader->fast_forward_to(pr, timeout);
if (_state->reader) {
return _state->reader->fast_forward_to(pr, timeout);
}
// No need to fast-forward uncreated readers, they will be passed the new
// range when created.
@@ -1110,37 +1125,48 @@ future<> multishard_combining_reader::shard_reader::fast_forward_to(const dht::p
}
// Fast-forwards the underlying reader to the position range `pr`.
// If the reader already exists the call is forwarded directly. Otherwise the
// reader is created first and the fast-forward is chained onto the future
// returned by create_reader().
// NOTE(review): this listing looks like a diff rendered without +/- markers,
// i.e. each `_reader` line appears to be the removed version of the
// `_state->reader` line that follows it -- confirm against the repository.
future<> multishard_combining_reader::shard_reader::fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) {
if (_reader) {
return _reader->fast_forward_to(pr, timeout);
if (_state->reader) {
return _state->reader->fast_forward_to(pr, timeout);
}
// NOTE(review): this continuation captures `this` and has no abandoned
// check, unlike the continuations in create_reader() and read_ahead().
// Presumably the caller keeps the shard reader alive until the returned
// future resolves -- confirm.
return create_reader().then([this, pr = std::move(pr), timeout] {
return _reader->fast_forward_to(pr, timeout);
return _state->reader->fast_forward_to(pr, timeout);
});
}
// Creates the underlying reader if it does not exist yet.
// Returns a future that resolves:
// * immediately, when the reader already exists;
// * when _reader_promise is fulfilled, if a read-ahead is stored in
//   _read_ahead (presumably because that read-ahead is already creating the
//   reader -- confirm);
// * otherwise, when the continuation attached to the parent's reader factory
//   has run.
// NOTE(review): this listing looks like a diff rendered without +/- markers,
// i.e. each `_reader` line appears to be the removed version of the
// `_state->reader` line that follows it -- confirm against the repository.
future<> multishard_combining_reader::shard_reader::create_reader() {
if (_reader) {
if (_state->reader) {
return make_ready_future<>();
}
if (_read_ahead) {
return _reader_promise.get_future();
}
return _parent._reader_factory(_shard, *_parent._pr, _parent._fwd_sm, _parent._fwd_mr).then(
[this] (foreign_ptr<std::unique_ptr<flat_mutation_reader>>&& r) mutable {
_reader.emplace(make_foreign_reader(_parent._schema, std::move(r), _parent._fwd_sm));
[this, state = _state] (foreign_ptr<std::unique_ptr<flat_mutation_reader>>&& r) mutable {
// Use the captured `state` copy (not `this->_state`) to check whether the
// reader is abandoned. If the reader is abandoned the shard_reader has been
// destroyed, so members of `this` must not be read anymore.
if (state->abandoned) {
return;
}
_state->reader.emplace(make_foreign_reader(_parent._schema, std::move(r), _parent._fwd_sm));
// Apply the next_partition() calls that were counted while the reader did
// not exist yet.
while (_pending_next_partition) {
--_pending_next_partition;
_reader->next_partition();
_state->reader->next_partition();
}
// Resolve the futures handed out by the `_read_ahead` branch above.
_reader_promise.set_value();
});
}
// Starts filling the underlying reader's buffer without waiting for the
// result; the resulting future is stored in _read_ahead.
// If the reader does not exist yet it is created first and the fill is
// chained onto the creation future.
// NOTE(review): this listing looks like a diff rendered without +/- markers,
// i.e. each `_reader` line appears to be the removed version of the
// `_state->reader` line(s) that follow it -- confirm against the repository.
void multishard_combining_reader::shard_reader::read_ahead(db::timeout_clock::time_point timeout) {
if (_reader) {
_read_ahead.emplace(_reader->fill_buffer(timeout));
if (_state->reader) {
_read_ahead.emplace(_state->reader->fill_buffer(timeout));
} else {
_read_ahead.emplace(create_reader().then([this, timeout] { return _reader->fill_buffer(timeout); }));
// This continuation captures a copy of the shared state instead of `this`,
// so it only touches `state`, never members of the shard_reader itself.
_read_ahead.emplace(create_reader().then([state = _state, timeout] () mutable {
// The abandoned flag is set by ~shard_reader(); skip the fill in that case.
if (state->abandoned) {
return make_ready_future<>();
}
return state->reader->fill_buffer(timeout);
}));
}
}