multishard_combining_reader: make shard_reader a shared pointer

Some members of the shard reader have to be accessed even after it is
destroyed. This is required by background work that might still be
pending when the reader is destroyed. This was solved by creating a
special `state` struct, which contained all the members of the shard
reader that had to be accessed even after it was destroyed. This state
struct was managed through a shared pointer, and each continuation that
was expected to outlive the reader held a copy of it. This, however,
created a minefield, where each line of the code had to be carefully
audited to make sure it accessed only fields that were guaranteed to
remain valid.
Fix this mess by managing the whole class through a shared pointer, with
`enable_shared_from_this`. Now each continuation just has to make sure
to keep `this` alive, and the code can access all members freely (well,
almost).
This commit is contained in:
Botond Dénes
2019-01-10 16:30:09 +02:00
parent f1c3421eb4
commit da0c01c68b

View File

@@ -937,22 +937,7 @@ namespace {
// Thin wrapper around a flat_mutation_reader (foreign_reader) that
// lazy-creates the reader when needed and transparently keeps track
// of read-ahead.
// Shard reader instances have to stay alive until all pending read-ahead
// completes. But at the same time we don't want to do any additional work
// after the parent reader was destroyed. To solve this we do two things:
// * Move flat_mutation_reader instance into a struct managed through a
// shared pointer. Continuations using this internal state will share
// ownership of this struct with the shard reader instance.
// * Add a stopped flag to the struct which will be set when the shard
// reader is destroyed. When this is set don't do any work in the
// pending continuations, just "run through them".
class shard_reader {
struct state {
std::unique_ptr<foreign_reader> reader;
bool stopped = false;
bool drop_partition_start = false;
bool drop_static_row = false;
};
class shard_reader : public enable_lw_shared_from_this<shard_reader> {
schema_ptr _schema;
shared_ptr<reader_lifecycle_policy> _lifecycle_policy;
const unsigned _shard;
@@ -961,7 +946,10 @@ class shard_reader {
const io_priority_class& _pc;
tracing::global_trace_state_ptr _trace_state;
const mutation_reader::forwarding _fwd_mr;
lw_shared_ptr<state> _state;
std::unique_ptr<foreign_reader> _reader;
bool _stopped = false;
bool _drop_partition_start = false;
bool _drop_static_row = false;
std::optional<future<>> _read_ahead;
std::optional<future<>> _pause;
@@ -997,30 +985,29 @@ public:
, _ps(ps)
, _pc(pc)
, _trace_state(std::move(trace_state))
, _fwd_mr(fwd_mr)
, _state(make_lw_shared<state>()) {
, _fwd_mr(fwd_mr) {
}
shard_reader(shard_reader&&) = default;
shard_reader(shard_reader&&) = delete;
shard_reader& operator=(shard_reader&&) = delete;
shard_reader(const shard_reader&) = delete;
shard_reader& operator=(const shard_reader&) = delete;
~shard_reader();
void stop() noexcept;
// These methods assume the reader is already created.
bool is_end_of_stream() const {
return _state->reader->is_end_of_stream();
return _reader->is_end_of_stream();
}
bool is_buffer_empty() const {
return _state->reader->is_buffer_empty();
return _reader->is_buffer_empty();
}
mutation_fragment pop_mutation_fragment() {
return _state->reader->pop_mutation_fragment();
return _reader->pop_mutation_fragment();
}
const mutation_fragment& peek_buffer() const {
return _state->reader->peek_buffer();
return _reader->peek_buffer();
}
future<> fill_buffer(db::timeout_clock::time_point timeout);
@@ -1029,10 +1016,10 @@ public:
future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout);
future<> create_reader();
explicit operator bool() const {
return bool(_state->reader);
return bool(_reader);
}
bool done() const {
return _state->reader && _state->reader->is_buffer_empty() && _state->reader->is_end_of_stream();
return _reader && _reader->is_buffer_empty() && _reader->is_end_of_stream();
}
void read_ahead(db::timeout_clock::time_point timeout);
bool is_read_ahead_in_progress() const {
@@ -1041,14 +1028,14 @@ public:
void pause();
};
shard_reader::~shard_reader() {
void shard_reader::stop() noexcept {
// Nothing to do if there was no reader created, nor is there a background
// read ahead in progress which will create one.
if (!_state->reader && !_read_ahead) {
if (!_reader && !_read_ahead) {
return;
}
_state->stopped = true;
_stopped = true;
auto f = [this] {
if (_read_ahead) {
@@ -1060,13 +1047,13 @@ shard_reader::~shard_reader() {
}
}();
_lifecycle_policy->destroy_reader(_shard, f.then([state = _state.get()] {
return state->reader->stop();
}).finally([state = _state] {}));
_lifecycle_policy->destroy_reader(_shard, f.then([this] {
return _reader->stop();
}).finally([zis = shared_from_this()] {}));
}
void shard_reader::update_last_position() {
auto& reader = *_state->reader;
auto& reader = *_reader;
if (reader.is_buffer_empty()) {
return;
}
@@ -1120,15 +1107,15 @@ future<foreign_ptr<std::unique_ptr<flat_mutation_reader>>> shard_reader::recreat
if (_last_position_in_partition) {
switch (_last_position_in_partition->region()) {
case partition_region::partition_start:
_state->drop_partition_start = true;
_drop_partition_start = true;
break;
case partition_region::static_row:
_state->drop_partition_start = true;
_state->drop_static_row = true;
_drop_partition_start = true;
_drop_static_row = true;
break;
case partition_region::clustered:
_state->drop_partition_start = true;
_state->drop_static_row = true;
_drop_partition_start = true;
_drop_static_row = true;
adjust_partition_slice();
slice = &*_slice_override;
break;
@@ -1164,35 +1151,34 @@ future<foreign_ptr<std::unique_ptr<flat_mutation_reader>>> shard_reader::recreat
}
future<> shard_reader::resume() {
return std::exchange(_pause, std::nullopt)->then([this, state = _state] {
if (state->stopped) {
return std::exchange(_pause, std::nullopt)->then([this] {
if (_stopped) {
return make_ready_future<>();
}
return _lifecycle_policy->try_resume(_shard).then(
[this, state = std::move(state)] (foreign_ptr<std::unique_ptr<flat_mutation_reader>> reader) mutable {
return _lifecycle_policy->try_resume(_shard).then([this] (foreign_ptr<std::unique_ptr<flat_mutation_reader>> reader) mutable {
if (reader) {
state->reader->resume(std::move(reader));
_reader->resume(std::move(reader));
return make_ready_future<>();
} else if (state->stopped) {
} else if (_stopped) {
return make_ready_future<>();
} else {
return recreate_reader().then([this, state = std::move(state)] (foreign_ptr<std::unique_ptr<flat_mutation_reader>> reader) {
state->reader->resume(std::move(reader));
return recreate_reader().then([this] (foreign_ptr<std::unique_ptr<flat_mutation_reader>> reader) {
_reader->resume(std::move(reader));
});
}
});
});
}).finally([zis = shared_from_this()] {});
}
future<> shard_reader::do_fill_buffer(db::timeout_clock::time_point timeout) {
return _state->reader->fill_buffer(timeout).then([this, state = _state] {
auto& reader = *state->reader;
return _reader->fill_buffer(timeout).then([this, zis = shared_from_this()] {
auto& reader = *_reader;
if (reader.is_buffer_empty()) {
return;
}
if (state->drop_partition_start) {
state->drop_partition_start = false;
if (_drop_partition_start) {
_drop_partition_start = false;
if (reader.peek_buffer().is_partition_start()) {
reader.pop_mutation_fragment();
}
@@ -1201,14 +1187,14 @@ future<> shard_reader::do_fill_buffer(db::timeout_clock::time_point timeout) {
if (reader.is_buffer_empty()) {
return;
}
if (state->drop_static_row) {
state->drop_static_row = false;
if (_drop_static_row) {
_drop_static_row = false;
if (reader.peek_buffer().is_static_row()) {
reader.pop_mutation_fragment();
}
}
if (!state->stopped) {
if (!_stopped) {
update_last_position();
}
});
@@ -1218,7 +1204,7 @@ future<> shard_reader::fill_buffer(db::timeout_clock::time_point timeout) {
if (_read_ahead) {
return *std::exchange(_read_ahead, std::nullopt);
}
if (!_state->reader->is_buffer_empty()) {
if (!_reader->is_buffer_empty()) {
return make_ready_future<>();
}
if (_pause) {
@@ -1236,20 +1222,20 @@ void shard_reader::next_partition() {
// `next_partition()` is called on the multishard reader before the
// first `fill_buffer()` call. In this case we are right before the first
// partition so this call has no effect, hence we can ignore it.
if (_state->reader) {
_state->reader->next_partition();
if (_reader) {
_reader->next_partition();
}
}
future<> shard_reader::fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) {
_pr = &pr;
if (_state->reader) {
if (_reader) {
_last_pkey.reset();
_last_position_in_partition.reset();
auto do_fast_forward = [this, &pr, timeout] {
return _state->reader->fast_forward_to(pr, timeout);
return _reader->fast_forward_to(pr, timeout);
};
if (_pause) {
@@ -1268,34 +1254,34 @@ future<> shard_reader::fast_forward_to(const dht::partition_range& pr, db::timeo
}
future<> shard_reader::create_reader() {
if (_state->reader) {
if (_reader) {
return make_ready_future<>();
}
if (_read_ahead) {
return *std::exchange(_read_ahead, std::nullopt);
}
return _lifecycle_policy->create_reader(_shard, _schema, *_pr, _ps, _pc, _trace_state, _fwd_mr).then(
[schema = _schema, state = _state] (foreign_ptr<std::unique_ptr<flat_mutation_reader>>&& r) mutable {
state->reader = std::make_unique<foreign_reader>(std::move(schema), std::move(r));
[this, zis = shared_from_this()] (foreign_ptr<std::unique_ptr<flat_mutation_reader>>&& r) mutable {
_reader = std::make_unique<foreign_reader>(_schema, std::move(r));
});
}
void shard_reader::read_ahead(db::timeout_clock::time_point timeout) {
if (_read_ahead || (_state->reader && (_state->reader->is_end_of_stream() || !_state->reader->is_buffer_empty()))) {
if (_read_ahead || (_reader && (_reader->is_end_of_stream() || !_reader->is_buffer_empty()))) {
return;
}
auto f = _state->reader
auto f = _reader
? (_pause ? resume() : make_ready_future<>())
: create_reader();
_read_ahead.emplace(f.then([this, state = _state, timeout] () mutable {
if (state->stopped) {
_read_ahead.emplace(f.then([this, timeout, zis = shared_from_this()] () mutable {
if (_stopped) {
return make_ready_future<>();
}
return do_fill_buffer(timeout).then([this, state = std::move(state)] {
return do_fill_buffer(timeout).then([this, zis = std::move(zis)] {
// Read ahead is still in the background, so pause the reader.
if (!state->stopped && _read_ahead) {
if (!_stopped && _read_ahead) {
pause();
}
});
@@ -1307,13 +1293,13 @@ void shard_reader::pause() {
return;
}
auto f = _read_ahead ? *std::exchange(_read_ahead, std::nullopt) : make_ready_future<>();
_pause = f.then([this, state = _state] () mutable {
if (state->stopped) {
_pause = f.then([this] () mutable {
if (_stopped) {
return make_ready_future<>();
}
return state->reader->pause().then([this, state = std::move(state)] (foreign_ptr<std::unique_ptr<flat_mutation_reader>> reader) {
if (state->stopped) {
state->reader->resume(std::move(reader));
return _reader->pause().then([this] (foreign_ptr<std::unique_ptr<flat_mutation_reader>> reader) {
if (_stopped) {
_reader->resume(std::move(reader));
return make_ready_future<>();
}
@@ -1323,7 +1309,7 @@ void shard_reader::pause() {
return _lifecycle_policy->pause(std::move(reader));
});
});
}).finally([zis = shared_from_this()] {});
}
} // anonymous namespace
@@ -1341,7 +1327,7 @@ class multishard_combining_reader : public flat_mutation_reader::impl {
};
const dht::i_partitioner& _partitioner;
std::vector<shard_reader> _shard_readers;
std::vector<lw_shared_ptr<shard_reader>> _shard_readers;
// Contains the position of each shard with token granularity, organized
// into a min-heap. Used to select the shard with the smallest token each
// time a shard reader produces a new partition.
@@ -1365,6 +1351,8 @@ public:
tracing::trace_state_ptr trace_state,
mutation_reader::forwarding fwd_mr);
~multishard_combining_reader();
// `this` is captured by lambdas (e.g. in fill_buffer()), so copying is disabled.
multishard_combining_reader(const multishard_combining_reader&) = delete;
multishard_combining_reader& operator=(const multishard_combining_reader&) = delete;
@@ -1418,7 +1406,7 @@ bool multishard_combining_reader::maybe_move_to_next_shard(const dht::token* con
}
future<> multishard_combining_reader::handle_empty_reader_buffer(db::timeout_clock::time_point timeout) {
auto& reader = _shard_readers[_current_shard];
auto& reader = *_shard_readers[_current_shard];
if (reader.is_end_of_stream()) {
if (_shard_selection_min_heap.empty()) {
@@ -1441,7 +1429,7 @@ future<> multishard_combining_reader::handle_empty_reader_buffer(db::timeout_clo
// background. They will be brought to the foreground when we move
// to their respective shard.
for (unsigned i = 1; i < _concurrency; ++i) {
_shard_readers[(_current_shard + i) % _partitioner.shard_count()].read_ahead(timeout);
_shard_readers[(_current_shard + i) % _partitioner.shard_count()]->read_ahead(timeout);
}
}
return reader.fill_buffer(timeout);
@@ -1464,14 +1452,20 @@ multishard_combining_reader::multishard_combining_reader(
_shard_readers.reserve(_partitioner.shard_count());
for (unsigned i = 0; i < _partitioner.shard_count(); ++i) {
_shard_readers.emplace_back(_schema, lifecycle_policy, i, pr, ps, pc, trace_state, fwd_mr);
_shard_readers.emplace_back(make_lw_shared<shard_reader>(_schema, lifecycle_policy, i, pr, ps, pc, trace_state, fwd_mr));
}
}
// Stop every shard reader when the combining reader goes away.
// shard_reader::stop() captures shared_from_this() in the cleanup it hands
// to the lifecycle policy, so a reader with work still pending can outlive
// this object instead of being destroyed here.
multishard_combining_reader::~multishard_combining_reader() {
for (auto& sr : _shard_readers) {
sr->stop();
}
}
future<> multishard_combining_reader::fill_buffer(db::timeout_clock::time_point timeout) {
_crossed_shards = false;
return do_until([this] { return is_buffer_full() || is_end_of_stream(); }, [this, timeout] {
auto& reader = _shard_readers[_current_shard];
auto& reader = *_shard_readers[_current_shard];
if (!reader) {
return reader.create_reader();
}
@@ -1494,7 +1488,7 @@ future<> multishard_combining_reader::fill_buffer(db::timeout_clock::time_point
void multishard_combining_reader::next_partition() {
clear_buffer_to_next_partition();
if (is_buffer_empty()) {
_shard_readers[_current_shard].next_partition();
_shard_readers[_current_shard]->next_partition();
}
}
@@ -1502,8 +1496,8 @@ future<> multishard_combining_reader::fast_forward_to(const dht::partition_range
clear_buffer();
_end_of_stream = false;
on_partition_range_change(pr);
return parallel_for_each(_shard_readers, [&pr, timeout] (shard_reader& sr) {
return sr.fast_forward_to(pr, timeout);
return parallel_for_each(_shard_readers, [&pr, timeout] (lw_shared_ptr<shard_reader>& sr) {
return sr->fast_forward_to(pr, timeout);
});
}