Merge "repair: row_level: prevent deadlocks when repairing homogenous nodes" from Botond

"
This series backports the series "repair: row_level: prevent deadlocks
when repairing homogenous nodes" (merged as a9c7a1a86) to branch-4.1.
"

Fixes #6272

* 'repair-row-level-evictable-local-reader/branch-4.1' of https://github.com/denesb/scylla:
  repair: row_level: destroy reader on EOS or error
  repair: row_level: use evictable_reader for local reads
  mutation_reader: expose evictable_reader
  mutation_reader: evictable_reader: add auto_pause flag
  mutation_reader: make evictable_reader a flat_mutation_reader
  mutation_reader: s/inactive_shard_read/inactive_evictable_reader/
  mutation_reader: move inactive_shard_reader code up
  mutation_reader: fix indentation
  mutation_reader: shard_reader: extract remote_reader as evictable_reader
  mutation_reader: reader_lifecycle_policy: make semaphore() available early

(cherry picked from commit 59aa1834a7)
This commit is contained in:
Avi Kivity
2020-08-23 18:06:12 +03:00
parent e31ffbf2e6
commit 2708b0d664
6 changed files with 708 additions and 367 deletions

View File

@@ -2009,9 +2009,10 @@ flat_mutation_reader make_multishard_streaming_reader(distributed<database>& db,
reader_concurrency_semaphore* semaphore;
};
distributed<database>& _db;
utils::UUID _table_id;
std::vector<reader_context> _contexts;
public:
explicit streaming_reader_lifecycle_policy(distributed<database>& db) : _db(db), _contexts(smp::count) {
streaming_reader_lifecycle_policy(distributed<database>& db, utils::UUID table_id) : _db(db), _table_id(table_id), _contexts(smp::count) {
}
virtual flat_mutation_reader create_reader(
schema_ptr schema,
@@ -2040,7 +2041,12 @@ flat_mutation_reader make_multishard_streaming_reader(distributed<database>& db,
});
}
virtual reader_concurrency_semaphore& semaphore() override {
return *_contexts[engine().cpu_id()].semaphore;
const auto shard = engine().cpu_id();
if (!_contexts[shard].semaphore) {
auto& cf = _db.local().find_column_family(_table_id);
_contexts[shard].semaphore = &cf.streaming_read_concurrency_semaphore();
}
return *_contexts[shard].semaphore;
}
};
auto ms = mutation_source([&db] (schema_ptr s,
@@ -2051,7 +2057,8 @@ flat_mutation_reader make_multishard_streaming_reader(distributed<database>& db,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding,
mutation_reader::forwarding fwd_mr) {
return make_multishard_combining_reader(make_shared<streaming_reader_lifecycle_policy>(db), std::move(s), pr, ps, pc,
auto table_id = s->id();
return make_multishard_combining_reader(make_shared<streaming_reader_lifecycle_policy>(db, table_id), std::move(s), pr, ps, pc,
std::move(trace_state), fwd_mr);
});
auto&& full_slice = schema->full_slice();

View File

@@ -195,6 +195,7 @@ class read_context : public reader_lifecycle_policy {
// One for each shard. Index is shard id.
std::vector<reader_meta> _readers;
std::vector<reader_concurrency_semaphore*> _semaphores;
gate _dismantling_gate;
@@ -211,7 +212,8 @@ public:
, _schema(std::move(s))
, _cmd(cmd)
, _ranges(ranges)
, _trace_state(std::move(trace_state)) {
, _trace_state(std::move(trace_state))
, _semaphores(smp::count, nullptr) {
_readers.resize(smp::count);
}
@@ -236,7 +238,12 @@ public:
virtual void destroy_reader(shard_id shard, future<stopped_reader> reader_fut) noexcept override;
virtual reader_concurrency_semaphore& semaphore() override {
return _readers[engine().cpu_id()].rparts->semaphore;
const auto shard = engine().cpu_id();
if (!_semaphores[shard]) {
auto& table = _db.local().find_column_family(_schema);
_semaphores[shard] = &table.read_concurrency_semaphore();
}
return *_semaphores[shard];
}
future<> lookup_readers();

View File

@@ -982,6 +982,435 @@ flat_mutation_reader make_foreign_reader(schema_ptr schema,
namespace {
struct fill_buffer_result {
foreign_ptr<std::unique_ptr<const circular_buffer<mutation_fragment>>> buffer;
bool end_of_stream = false;
fill_buffer_result() = default;
fill_buffer_result(circular_buffer<mutation_fragment> buffer, bool end_of_stream)
: buffer(make_foreign(std::make_unique<const circular_buffer<mutation_fragment>>(std::move(buffer))))
, end_of_stream(end_of_stream) {
}
};
class inactive_evictable_reader : public reader_concurrency_semaphore::inactive_read {
flat_mutation_reader_opt _reader;
public:
inactive_evictable_reader(flat_mutation_reader reader)
: _reader(std::move(reader)) {
}
flat_mutation_reader reader() && {
return std::move(*_reader);
}
virtual void evict() override {
_reader = {};
}
};
}
// Encapsulates all data and logic that is local to the remote shard the
// reader lives on.
class evictable_reader : public flat_mutation_reader::impl {
public:
using auto_pause = bool_class<class auto_pause_tag>;
private:
auto_pause _auto_pause;
mutation_source _ms;
reader_concurrency_semaphore& _semaphore;
const dht::partition_range* _pr;
const query::partition_slice& _ps;
const io_priority_class& _pc;
tracing::global_trace_state_ptr _trace_state;
const mutation_reader::forwarding _fwd_mr;
reader_concurrency_semaphore::inactive_read_handle _irh;
bool _reader_created = false;
bool _drop_partition_start = false;
bool _drop_static_row = false;
position_in_partition::tri_compare _tri_cmp;
std::optional<dht::decorated_key> _last_pkey;
position_in_partition _next_position_in_partition = position_in_partition::for_partition_start();
// These are used when the reader has to be recreated (after having been
// evicted while paused) and the range and/or slice it is recreated with
// differs from the original ones.
std::optional<dht::partition_range> _range_override;
std::optional<query::partition_slice> _slice_override;
bool _pending_next_partition = false;
flat_mutation_reader_opt _reader;
private:
void do_pause(flat_mutation_reader reader);
void maybe_pause(flat_mutation_reader reader);
flat_mutation_reader_opt try_resume();
void update_next_position(flat_mutation_reader& reader);
void adjust_partition_slice();
flat_mutation_reader recreate_reader();
flat_mutation_reader resume_or_create_reader();
bool should_drop_fragment(const mutation_fragment& mf);
future<> do_fill_buffer(flat_mutation_reader& reader, db::timeout_clock::time_point timeout);
future<> fill_buffer(flat_mutation_reader& reader, db::timeout_clock::time_point timeout);
public:
evictable_reader(
auto_pause ap,
mutation_source ms,
schema_ptr schema,
reader_concurrency_semaphore& semaphore,
const dht::partition_range& pr,
const query::partition_slice& ps,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
mutation_reader::forwarding fwd_mr);
~evictable_reader();
virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override;
virtual void next_partition() override;
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override;
virtual future<> fast_forward_to(position_range, db::timeout_clock::time_point timeout) override {
throw_with_backtrace<std::bad_function_call>();
}
reader_concurrency_semaphore::inactive_read_handle inactive_read_handle() && {
return std::move(_irh);
}
void pause() {
if (_reader) {
do_pause(std::move(*_reader));
}
}
};
void evictable_reader::do_pause(flat_mutation_reader reader) {
_irh = _semaphore.register_inactive_read(std::make_unique<inactive_evictable_reader>(std::move(reader)));
}
void evictable_reader::maybe_pause(flat_mutation_reader reader) {
if (_auto_pause) {
do_pause(std::move(reader));
} else {
_reader = std::move(reader);
}
}
flat_mutation_reader_opt evictable_reader::try_resume() {
auto ir_ptr = _semaphore.unregister_inactive_read(std::move(_irh));
if (!ir_ptr) {
return {};
}
auto& ir = static_cast<inactive_evictable_reader&>(*ir_ptr);
return std::move(ir).reader();
}
void evictable_reader::update_next_position(flat_mutation_reader& reader) {
if (is_buffer_empty()) {
return;
}
auto rbegin = std::reverse_iterator(buffer().end());
auto rend = std::reverse_iterator(buffer().begin());
if (auto pk_it = std::find_if(rbegin, rend, std::mem_fn(&mutation_fragment::is_partition_start)); pk_it != rend) {
_last_pkey = pk_it->as_partition_start().key();
}
const auto last_pos = buffer().back().position();
switch (last_pos.region()) {
case partition_region::partition_start:
_next_position_in_partition = position_in_partition::for_static_row();
break;
case partition_region::static_row:
_next_position_in_partition = position_in_partition::before_all_clustered_rows();
break;
case partition_region::clustered:
if (reader.is_buffer_empty()) {
_next_position_in_partition = position_in_partition::after_key(last_pos);
} else {
const auto& next_frag = reader.peek_buffer();
if (next_frag.is_end_of_partition()) {
push_mutation_fragment(reader.pop_mutation_fragment());
_next_position_in_partition = position_in_partition::for_partition_start();
} else {
_next_position_in_partition = position_in_partition(next_frag.position());
}
}
break;
case partition_region::partition_end:
_next_position_in_partition = position_in_partition::for_partition_start();
break;
}
}
void evictable_reader::adjust_partition_slice() {
if (!_slice_override) {
_slice_override = _ps;
}
auto ranges = _slice_override->default_row_ranges();
query::trim_clustering_row_ranges_to(*_schema, ranges, _next_position_in_partition);
_slice_override->clear_ranges();
_slice_override->set_range(*_schema, _last_pkey->key(), std::move(ranges));
}
flat_mutation_reader evictable_reader::recreate_reader() {
const dht::partition_range* range = _pr;
const query::partition_slice* slice = &_ps;
if (_last_pkey) {
bool partition_range_is_inclusive = true;
switch (_next_position_in_partition.region()) {
case partition_region::partition_start:
partition_range_is_inclusive = false;
break;
case partition_region::static_row:
_drop_partition_start = true;
break;
case partition_region::clustered:
_drop_partition_start = true;
_drop_static_row = true;
adjust_partition_slice();
slice = &*_slice_override;
break;
case partition_region::partition_end:
partition_range_is_inclusive = false;
break;
}
// The original range contained a single partition and we've read it
// all. We'd have to create a reader with an empty range that would
// immediately be at EOS. This is not possible so just create an empty
// reader instead.
// This should be extremely rare (who'd create a multishard reader to
// read a single partition) but still, let's make sure we handle it
// correctly.
if (_pr->is_singular() && !partition_range_is_inclusive) {
return make_empty_flat_reader(_schema);
}
_range_override = dht::partition_range({dht::partition_range::bound(*_last_pkey, partition_range_is_inclusive)}, _pr->end());
range = &*_range_override;
}
return _ms.make_reader(
_schema,
no_reader_permit(),
*range,
*slice,
_pc,
_trace_state,
streamed_mutation::forwarding::no,
_fwd_mr);
}
flat_mutation_reader evictable_reader::resume_or_create_reader() {
if (!_reader_created) {
auto reader = _ms.make_reader(_schema, no_reader_permit(), *_pr, _ps, _pc, _trace_state, streamed_mutation::forwarding::no, _fwd_mr);
_reader_created = true;
return reader;
}
if (_reader) {
return std::move(*_reader);
}
if (auto reader_opt = try_resume()) {
return std::move(*reader_opt);
}
return recreate_reader();
}
bool evictable_reader::should_drop_fragment(const mutation_fragment& mf) {
if (_drop_partition_start && mf.is_partition_start()) {
_drop_partition_start = false;
return true;
}
if (_drop_static_row && mf.is_static_row()) {
_drop_static_row = false;
return true;
}
return false;
}
future<> evictable_reader::do_fill_buffer(flat_mutation_reader& reader, db::timeout_clock::time_point timeout) {
if (!_drop_partition_start && !_drop_static_row) {
return reader.fill_buffer(timeout);
}
return repeat([this, &reader, timeout] {
return reader.fill_buffer(timeout).then([this, &reader] {
while (!reader.is_buffer_empty() && should_drop_fragment(reader.peek_buffer())) {
reader.pop_mutation_fragment();
}
return stop_iteration(reader.is_buffer_full() || reader.is_end_of_stream());
});
});
}
future<> evictable_reader::fill_buffer(flat_mutation_reader& reader, db::timeout_clock::time_point timeout) {
return do_fill_buffer(reader, timeout).then([this, &reader, timeout] {
if (reader.is_buffer_empty()) {
return make_ready_future<>();
}
reader.move_buffer_content_to(*this);
auto stop = [this, &reader] {
// The only problematic fragment kind is the range tombstone.
// All other fragment kinds are safe to end the buffer on, and
// are guaranteed to represent progress vs. the last buffer fill.
if (!buffer().back().is_range_tombstone()) {
return true;
}
if (reader.is_buffer_empty()) {
return reader.is_end_of_stream();
}
const auto& next_pos = reader.peek_buffer().position();
// To ensure safe progress we have to ensure the following:
//
// _next_position_in_partition < buffer.back().position() < next_pos
//
// * The first condition is to ensure we made progress since the
// last buffer fill. Otherwise we might get into an endless loop if
// the reader is recreated after each `fill_buffer()` call.
// * The second condition is to ensure we have seen all fragments
// with the same position. Otherwise we might jump over those
// remaining fragments with the same position as the last
// fragment's in the buffer when the reader is recreated.
return _tri_cmp(_next_position_in_partition, buffer().back().position()) < 0 && _tri_cmp(buffer().back().position(), next_pos) < 0;
};
// Read additional fragments until it is safe to stop, if needed.
// We have to ensure we stop at a fragment such that if the reader is
// evicted and recreated later, we won't be skipping any fragments.
// Practically, range tombstones are the only ones that are
// problematic to end the buffer on. This is due to the fact range
// tombstones can have the same position that multiple following range
// tombstones, or a single following clustering row in the stream has.
// When a range tombstone is the last in the buffer, we have to continue
// to read until we are sure we've read all fragments sharing the same
// position, so that we can safely continue reading from after said
// position.
return do_until(stop, [this, &reader, timeout] {
if (reader.is_buffer_empty()) {
return do_fill_buffer(reader, timeout);
}
push_mutation_fragment(reader.pop_mutation_fragment());
return make_ready_future<>();
});
}).then([this, &reader] {
update_next_position(reader);
});
}
evictable_reader::evictable_reader(
auto_pause ap,
mutation_source ms,
schema_ptr schema,
reader_concurrency_semaphore& semaphore,
const dht::partition_range& pr,
const query::partition_slice& ps,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
mutation_reader::forwarding fwd_mr)
: impl(std::move(schema))
, _auto_pause(ap)
, _ms(std::move(ms))
, _semaphore(semaphore)
, _pr(&pr)
, _ps(ps)
, _pc(pc)
, _trace_state(std::move(trace_state))
, _fwd_mr(fwd_mr)
, _tri_cmp(*_schema) {
}
evictable_reader::~evictable_reader() {
try_resume();
}
future<> evictable_reader::fill_buffer(db::timeout_clock::time_point timeout) {
const auto pending_next_partition = std::exchange(_pending_next_partition, false);
if (pending_next_partition) {
_next_position_in_partition = position_in_partition::for_partition_start();
}
if (is_end_of_stream()) {
return make_ready_future<>();
}
return do_with(resume_or_create_reader(),
[this, pending_next_partition, timeout] (flat_mutation_reader& reader) mutable {
if (pending_next_partition) {
reader.next_partition();
}
return fill_buffer(reader, timeout).then([this, &reader] {
_end_of_stream = reader.is_end_of_stream() && reader.is_buffer_empty();
maybe_pause(std::move(reader));
});
});
}
void evictable_reader::next_partition() {
clear_buffer_to_next_partition();
if (is_buffer_empty()) {
_pending_next_partition = true;
_next_position_in_partition = position_in_partition::for_partition_start();
}
}
future<> evictable_reader::fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) {
_pr = &pr;
_last_pkey.reset();
_next_position_in_partition = position_in_partition::for_partition_start();
clear_buffer();
_end_of_stream = false;
if (_reader) {
return _reader->fast_forward_to(pr, timeout);
}
if (!_reader_created || !_irh) {
return make_ready_future<>();
}
if (auto reader_opt = try_resume()) {
auto f = reader_opt->fast_forward_to(pr, timeout);
return f.then([this, reader = std::move(*reader_opt)] () mutable {
maybe_pause(std::move(reader));
});
}
return make_ready_future<>();
}
evictable_reader_handle::evictable_reader_handle(evictable_reader& r) : _r(&r)
{ }
void evictable_reader_handle::evictable_reader_handle::pause() {
_r->pause();
}
flat_mutation_reader make_auto_paused_evictable_reader(
mutation_source ms,
schema_ptr schema,
reader_concurrency_semaphore& semaphore,
const dht::partition_range& pr,
const query::partition_slice& ps,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
mutation_reader::forwarding fwd_mr) {
return make_flat_mutation_reader<evictable_reader>(evictable_reader::auto_pause::yes, std::move(ms), std::move(schema), semaphore, pr, ps,
pc, std::move(trace_state), fwd_mr);
}
std::pair<flat_mutation_reader, evictable_reader_handle> make_manually_paused_evictable_reader(
mutation_source ms,
schema_ptr schema,
reader_concurrency_semaphore& semaphore,
const dht::partition_range& pr,
const query::partition_slice& ps,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
mutation_reader::forwarding fwd_mr) {
auto reader = std::make_unique<evictable_reader>(evictable_reader::auto_pause::no, std::move(ms), std::move(schema), semaphore, pr, ps,
pc, std::move(trace_state), fwd_mr);
auto handle = evictable_reader_handle(*reader.get());
return std::pair(flat_mutation_reader(std::move(reader)), handle);
}
namespace {
// A special-purpose shard reader.
//
// Shard reader manages a reader located on a remote shard. It transparently
@@ -992,66 +1421,6 @@ namespace {
// wrapped into a flat_mutation_reader, as it needs to be managed by a shared
// pointer.
class shard_reader : public enable_lw_shared_from_this<shard_reader>, public flat_mutation_reader::impl {
struct fill_buffer_result {
foreign_ptr<std::unique_ptr<const circular_buffer<mutation_fragment>>> buffer;
bool end_of_stream = false;
fill_buffer_result() = default;
fill_buffer_result(circular_buffer<mutation_fragment> buffer, bool end_of_stream)
: buffer(make_foreign(std::make_unique<const circular_buffer<mutation_fragment>>(std::move(buffer))))
, end_of_stream(end_of_stream) {
}
};
// Encapsulates all data and logic that is local to the remote shard the
// reader lives on.
class remote_reader {
schema_ptr _schema;
reader_lifecycle_policy& _lifecycle_policy;
const dht::partition_range* _pr;
const query::partition_slice& _ps;
const io_priority_class& _pc;
tracing::global_trace_state_ptr _trace_state;
const mutation_reader::forwarding _fwd_mr;
reader_concurrency_semaphore::inactive_read_handle _irh;
bool _reader_created = false;
bool _drop_partition_start = false;
bool _drop_static_row = false;
position_in_partition::tri_compare _tri_cmp;
std::optional<dht::decorated_key> _last_pkey;
position_in_partition _next_position_in_partition = position_in_partition::for_partition_start();
// These are used when the reader has to be recreated (after having been
// evicted while paused) and the range and/or slice it is recreated with
// differs from the original ones.
std::optional<dht::partition_range> _range_override;
std::optional<query::partition_slice> _slice_override;
private:
void update_next_position(flat_mutation_reader& reader, circular_buffer<mutation_fragment>& buffer);
void adjust_partition_slice();
flat_mutation_reader recreate_reader();
flat_mutation_reader resume_or_create_reader();
bool should_drop_fragment(const mutation_fragment& mf);
future<> do_fill_buffer(flat_mutation_reader& reader, db::timeout_clock::time_point timeout);
future<> fill_buffer(flat_mutation_reader& reader, circular_buffer<mutation_fragment>& buffer, db::timeout_clock::time_point timeout);
public:
remote_reader(
schema_ptr schema,
reader_lifecycle_policy& lifecycle_policy,
const dht::partition_range& pr,
const query::partition_slice& ps,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
mutation_reader::forwarding fwd_mr);
future<fill_buffer_result> fill_buffer(const dht::partition_range& pr, bool pending_next_partition, db::timeout_clock::time_point timeout);
future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout);
reader_concurrency_semaphore::inactive_read_handle inactive_read_handle() && {
return std::move(_irh);
}
};
private:
shared_ptr<reader_lifecycle_policy> _lifecycle_policy;
const unsigned _shard;
@@ -1063,7 +1432,7 @@ private:
bool _pending_next_partition = false;
bool _stopped = false;
std::optional<future<>> _read_ahead;
foreign_ptr<std::unique_ptr<remote_reader>> _reader;
foreign_ptr<std::unique_ptr<evictable_reader>> _reader;
private:
future<> do_fill_buffer(db::timeout_clock::time_point timeout);
@@ -1125,275 +1494,50 @@ void shard_reader::stop() noexcept {
_lifecycle_policy->destroy_reader(_shard, f.then([this] {
return smp::submit_to(_shard, [this] {
return make_foreign(std::make_unique<reader_concurrency_semaphore::inactive_read_handle>(std::move(*_reader).inactive_read_handle()));
}).then([this] (foreign_ptr<std::unique_ptr<reader_concurrency_semaphore::inactive_read_handle>> irh) {
return reader_lifecycle_policy::stopped_reader{std::move(irh), detach_buffer(), _pending_next_partition};
auto ret = std::tuple(
make_foreign(std::make_unique<reader_concurrency_semaphore::inactive_read_handle>(std::move(*_reader).inactive_read_handle())),
make_foreign(std::make_unique<circular_buffer<mutation_fragment>>(_reader->detach_buffer())));
_reader.reset();
return ret;
}).then([this] (std::tuple<foreign_ptr<std::unique_ptr<reader_concurrency_semaphore::inactive_read_handle>>,
foreign_ptr<std::unique_ptr<circular_buffer<mutation_fragment>>>> remains) {
auto&& [irh, remote_buffer] = remains;
auto buffer = detach_buffer();
for (const auto& mf : *remote_buffer) {
buffer.emplace_back(*_schema, mf); // we are copying from the remote shard.
}
return reader_lifecycle_policy::stopped_reader{std::move(irh), std::move(buffer), _pending_next_partition};
});
}).finally([zis = shared_from_this()] {}));
}
void shard_reader::remote_reader::update_next_position(flat_mutation_reader& reader, circular_buffer<mutation_fragment>& buffer) {
if (buffer.empty()) {
return;
}
auto rbegin = std::reverse_iterator(buffer.end());
auto rend = std::reverse_iterator(buffer.begin());
if (auto pk_it = std::find_if(rbegin, rend, std::mem_fn(&mutation_fragment::is_partition_start)); pk_it != rend) {
_last_pkey = pk_it->as_partition_start().key();
}
const auto last_pos = buffer.back().position();
switch (last_pos.region()) {
case partition_region::partition_start:
_next_position_in_partition = position_in_partition::for_static_row();
break;
case partition_region::static_row:
_next_position_in_partition = position_in_partition::before_all_clustered_rows();
break;
case partition_region::clustered:
if (reader.is_buffer_empty()) {
_next_position_in_partition = position_in_partition::after_key(last_pos);
} else {
const auto& next_frag = reader.peek_buffer();
if (next_frag.is_end_of_partition()) {
buffer.emplace_back(reader.pop_mutation_fragment());
_next_position_in_partition = position_in_partition::for_partition_start();
} else {
_next_position_in_partition = position_in_partition(next_frag.position());
}
}
break;
case partition_region::partition_end:
_next_position_in_partition = position_in_partition::for_partition_start();
break;
}
}
void shard_reader::remote_reader::adjust_partition_slice() {
if (!_slice_override) {
_slice_override = _ps;
}
auto ranges = _slice_override->default_row_ranges();
query::trim_clustering_row_ranges_to(*_schema, ranges, _next_position_in_partition);
_slice_override->clear_ranges();
_slice_override->set_range(*_schema, _last_pkey->key(), std::move(ranges));
}
flat_mutation_reader shard_reader::remote_reader::recreate_reader() {
const dht::partition_range* range = _pr;
const query::partition_slice* slice = &_ps;
if (_last_pkey) {
bool partition_range_is_inclusive = true;
switch (_next_position_in_partition.region()) {
case partition_region::partition_start:
partition_range_is_inclusive = false;
break;
case partition_region::static_row:
_drop_partition_start = true;
break;
case partition_region::clustered:
_drop_partition_start = true;
_drop_static_row = true;
adjust_partition_slice();
slice = &*_slice_override;
break;
case partition_region::partition_end:
partition_range_is_inclusive = false;
break;
}
// The original range contained a single partition and we've read it
// all. We'd have to create a reader with an empty range that would
// immediately be at EOS. This is not possible so just create an empty
// reader instead.
// This should be extremely rare (who'd create a multishard reader to
// read a single partition) but still, let's make sure we handle it
// correctly.
if (_pr->is_singular() && !partition_range_is_inclusive) {
return make_empty_flat_reader(_schema);
}
_range_override = dht::partition_range({dht::partition_range::bound(*_last_pkey, partition_range_is_inclusive)}, _pr->end());
range = &*_range_override;
}
return _lifecycle_policy.create_reader(
_schema,
*range,
*slice,
_pc,
_trace_state,
_fwd_mr);
}
flat_mutation_reader shard_reader::remote_reader::resume_or_create_reader() {
if (!_reader_created) {
auto reader = _lifecycle_policy.create_reader(_schema, *_pr, _ps, _pc, _trace_state, _fwd_mr);
_reader_created = true;
return reader;
}
if (auto reader_opt = _lifecycle_policy.try_resume(std::move(_irh))) {
return std::move(*reader_opt);
}
return recreate_reader();
}
bool shard_reader::remote_reader::should_drop_fragment(const mutation_fragment& mf) {
if (_drop_partition_start && mf.is_partition_start()) {
_drop_partition_start = false;
return true;
}
if (_drop_static_row && mf.is_static_row()) {
_drop_static_row = false;
return true;
}
return false;
}
future<> shard_reader::remote_reader::do_fill_buffer(flat_mutation_reader& reader, db::timeout_clock::time_point timeout) {
if (!_drop_partition_start && !_drop_static_row) {
return reader.fill_buffer(timeout);
}
return repeat([this, &reader, timeout] {
return reader.fill_buffer(timeout).then([this, &reader] {
while (!reader.is_buffer_empty() && should_drop_fragment(reader.peek_buffer())) {
reader.pop_mutation_fragment();
}
return stop_iteration(reader.is_buffer_full() || reader.is_end_of_stream());
});
});
}
future<> shard_reader::remote_reader::fill_buffer(flat_mutation_reader& reader, circular_buffer<mutation_fragment>& buffer,
db::timeout_clock::time_point timeout) {
return do_fill_buffer(reader, timeout).then([this, &reader, &buffer, timeout] {
if (reader.is_buffer_empty()) {
return make_ready_future<>();
}
buffer = reader.detach_buffer();
auto stop = [this, &reader, &buffer] {
// The only problematic fragment kind is the range tombstone.
// All other fragment kinds are safe to end the buffer on, and
// are guaranteed to represent progress vs. the last buffer fill.
if (!buffer.back().is_range_tombstone()) {
return true;
}
if (reader.is_buffer_empty()) {
return reader.is_end_of_stream();
}
const auto& next_pos = reader.peek_buffer().position();
// To ensure safe progress we have to ensure the following:
//
// _next_position_in_partition < buffer.back().position() < next_pos
//
// * The first condition is to ensure we made progress since the
// last buffer fill. Otherwise we might get into an endless loop if
// the reader is recreated after each `fill_buffer()` call.
// * The second condition is to ensure we have seen all fragments
// with the same position. Otherwise we might jump over those
// remaining fragments with the same position as the last
// fragment's in the buffer when the reader is recreated.
return _tri_cmp(_next_position_in_partition, buffer.back().position()) < 0 && _tri_cmp(buffer.back().position(), next_pos) < 0;
};
// Read additional fragments until it is safe to stop, if needed.
// We have to ensure we stop at a fragment such that if the reader is
// evicted and recreated later, we won't be skipping any fragments.
// Practically, range tombstones are the only ones that are
// problematic to end the buffer on. This is due to the fact range
// tombstones can have the same position that multiple following range
// tombstones, or a single following clustering row in the stream has.
// When a range tombstone is the last in the buffer, we have to continue
// to read until we are sure we've read all fragments sharing the same
// position, so that we can safely continue reading from after said
// position.
return do_until(stop, [this, &reader, &buffer, timeout] {
if (reader.is_buffer_empty()) {
return do_fill_buffer(reader, timeout);
}
buffer.emplace_back(reader.pop_mutation_fragment());
return make_ready_future<>();
});
}).then([this, &reader, &buffer] {
update_next_position(reader, buffer);
});
}
shard_reader::remote_reader::remote_reader(
schema_ptr schema,
reader_lifecycle_policy& lifecycle_policy,
const dht::partition_range& pr,
const query::partition_slice& ps,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
mutation_reader::forwarding fwd_mr)
: _schema(std::move(schema))
, _lifecycle_policy(lifecycle_policy)
, _pr(&pr)
, _ps(ps)
, _pc(pc)
, _trace_state(std::move(trace_state))
, _fwd_mr(fwd_mr)
, _tri_cmp(*_schema) {
}
future<shard_reader::fill_buffer_result> shard_reader::remote_reader::fill_buffer(const dht::partition_range& pr, bool pending_next_partition,
db::timeout_clock::time_point timeout) {
// We could have missed a `fast_forward_to()` if the reader wasn't created yet.
_pr = &pr;
if (pending_next_partition) {
_next_position_in_partition = position_in_partition::for_partition_start();
}
return do_with(resume_or_create_reader(), circular_buffer<mutation_fragment>{},
[this, pending_next_partition, timeout] (flat_mutation_reader& reader, circular_buffer<mutation_fragment>& buffer) mutable {
if (pending_next_partition) {
reader.next_partition();
}
return fill_buffer(reader, buffer, timeout).then([this, &reader, &buffer] {
const auto eos = reader.is_end_of_stream() && reader.is_buffer_empty();
_irh = _lifecycle_policy.pause(std::move(reader));
return fill_buffer_result(std::move(buffer), eos);
});
});
}
future<> shard_reader::remote_reader::fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) {
_pr = &pr;
_last_pkey.reset();
_next_position_in_partition = position_in_partition::for_partition_start();
if (!_reader_created || !_irh) {
return make_ready_future<>();
}
if (auto reader_opt = _lifecycle_policy.try_resume(std::move(_irh))) {
auto f = reader_opt->fast_forward_to(pr, timeout);
return f.then([this, reader = std::move(*reader_opt)] () mutable {
_irh = _lifecycle_policy.pause(std::move(reader));
});
}
return make_ready_future<>();
}
future<> shard_reader::do_fill_buffer(db::timeout_clock::time_point timeout) {
auto fill_buf_fut = make_ready_future<fill_buffer_result>();
const auto pending_next_partition = std::exchange(_pending_next_partition, false);
struct reader_and_buffer_fill_result {
foreign_ptr<std::unique_ptr<remote_reader>> reader;
foreign_ptr<std::unique_ptr<evictable_reader>> reader;
fill_buffer_result result;
};
if (!_reader) {
fill_buf_fut = smp::submit_to(_shard, [this, gs = global_schema_ptr(_schema), pending_next_partition, timeout] {
auto rreader = make_foreign(std::make_unique<remote_reader>(gs.get(), *_lifecycle_policy, *_pr, _ps, _pc, _trace_state, _fwd_mr));
auto f = rreader->fill_buffer(*_pr, pending_next_partition, timeout);
return f.then([rreader = std::move(rreader)] (fill_buffer_result res) mutable {
fill_buf_fut = smp::submit_to(_shard, [this, gs = global_schema_ptr(_schema), timeout] {
auto ms = mutation_source([lifecycle_policy = _lifecycle_policy.get()] (
schema_ptr s,
reader_permit,
const dht::partition_range& pr,
const query::partition_slice& ps,
const io_priority_class& pc,
tracing::trace_state_ptr ts,
streamed_mutation::forwarding,
mutation_reader::forwarding fwd_mr) {
return lifecycle_policy->create_reader(std::move(s), pr, ps, pc, std::move(ts), fwd_mr);
});
auto rreader = make_foreign(std::make_unique<evictable_reader>(evictable_reader::auto_pause::yes, std::move(ms),
gs.get(), _lifecycle_policy->semaphore(), *_pr, _ps, _pc, _trace_state, _fwd_mr));
auto f = rreader->fill_buffer(timeout);
return f.then([rreader = std::move(rreader)] () mutable {
auto res = fill_buffer_result(rreader->detach_buffer(), rreader->is_end_of_stream());
return make_ready_future<reader_and_buffer_fill_result>(reader_and_buffer_fill_result{std::move(rreader), std::move(res)});
});
}).then([this, timeout] (reader_and_buffer_fill_result res) {
@@ -1402,7 +1546,12 @@ future<> shard_reader::do_fill_buffer(db::timeout_clock::time_point timeout) {
});
} else {
fill_buf_fut = smp::submit_to(_shard, [this, pending_next_partition, timeout] () mutable {
return _reader->fill_buffer(*_pr, pending_next_partition, timeout);
if (pending_next_partition) {
_reader->next_partition();
}
return _reader->fill_buffer(timeout).then([this] {
return fill_buffer_result(_reader->detach_buffer(), _reader->is_end_of_stream());
});
});
}
@@ -1648,27 +1797,9 @@ future<> multishard_combining_reader::fast_forward_to(position_range pr, db::tim
return make_exception_future<>(std::bad_function_call());
}
namespace {
class inactive_shard_read : public reader_concurrency_semaphore::inactive_read {
flat_mutation_reader_opt _reader;
public:
inactive_shard_read(flat_mutation_reader reader)
: _reader(std::move(reader)) {
}
flat_mutation_reader reader() && {
return std::move(*_reader);
}
virtual void evict() override {
_reader = {};
}
};
}
reader_concurrency_semaphore::inactive_read_handle
reader_lifecycle_policy::pause(reader_concurrency_semaphore& sem, flat_mutation_reader reader) {
return sem.register_inactive_read(std::make_unique<inactive_shard_read>(std::move(reader)));
return sem.register_inactive_read(std::make_unique<inactive_evictable_reader>(std::move(reader)));
}
flat_mutation_reader_opt
@@ -1677,7 +1808,7 @@ reader_lifecycle_policy::try_resume(reader_concurrency_semaphore& sem, reader_co
if (!ir_ptr) {
return {};
}
auto& ir = static_cast<inactive_shard_read&>(*ir_ptr);
auto& ir = static_cast<inactive_evictable_reader&>(*ir_ptr);
return std::move(ir).reader();
}

View File

@@ -372,6 +372,64 @@ flat_mutation_reader make_foreign_reader(schema_ptr schema,
foreign_ptr<std::unique_ptr<flat_mutation_reader>> reader,
streamed_mutation::forwarding fwd_sm = streamed_mutation::forwarding::no);
/// Make an auto-paused evictable reader.
///
/// The reader is paused after each use, that is after each call to any of its
/// members that cause actual reading to be done (`fill_buffer()` and
/// `fast_forward_to()`). When paused, the reader is made evictable, that it is
/// it is registered with reader concurrency semaphore as an inactive read.
/// The reader is resumed automatically on the next use. If it was evicted, it
/// will be recreated at the position it left off reading. This is all
/// transparent to its user.
/// Parameters passed by reference have to be kept alive while the reader is
/// alive.
flat_mutation_reader make_auto_paused_evictable_reader(
mutation_source ms,
schema_ptr schema,
reader_concurrency_semaphore& semaphore,
const dht::partition_range& pr,
const query::partition_slice& ps,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
mutation_reader::forwarding fwd_mr);
class evictable_reader;
class evictable_reader_handle {
friend std::pair<flat_mutation_reader, evictable_reader_handle> make_manually_paused_evictable_reader(mutation_source, schema_ptr, reader_concurrency_semaphore&,
const dht::partition_range&, const query::partition_slice&, const io_priority_class&, tracing::trace_state_ptr, mutation_reader::forwarding);
private:
evictable_reader* _r;
private:
explicit evictable_reader_handle(evictable_reader& r);
public:
void pause();
};
/// Make a manually-paused evictable reader.
///
/// The reader can be paused via the evictable reader handle when desired. The
/// intended usage is subsequent reads done in bursts, after which the reader is
/// not used for some time. When paused, the reader is made evictable, that is,
/// it is registered with reader concurrency semaphore as an inactive read.
/// The reader is resumed automatically on the next use. If it was evicted, it
/// will be recreated at the position it left off reading. This is all
/// transparent to its user.
/// Parameters passed by reference have to be kept alive while the reader is
/// alive.
std::pair<flat_mutation_reader, evictable_reader_handle> make_manually_paused_evictable_reader(
mutation_source ms,
schema_ptr schema,
reader_concurrency_semaphore& semaphore,
const dht::partition_range& pr,
const query::partition_slice& ps,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
mutation_reader::forwarding fwd_mr);
/// Reader lifecycle policy for the mulitshard combining reader.
///
/// This policy is expected to make sure any additional resource the readers

View File

@@ -373,6 +373,7 @@ private:
std::optional<utils::phased_barrier::operation> _local_read_op;
// Local reader or multishard reader to read the range
flat_mutation_reader _reader;
std::optional<evictable_reader_handle> _reader_handle;
// Current partition read from disk
lw_shared_ptr<const decorated_key_with_hash> _current_dk;
@@ -392,32 +393,49 @@ public:
, _sharder(remote_partitioner, range, remote_shard)
, _seed(seed)
, _local_read_op(local_reader ? std::optional(cf.read_in_progress()) : std::nullopt)
, _reader(make_reader(db, cf, local_reader)) {
}
private:
flat_mutation_reader
make_reader(seastar::sharded<database>& db,
column_family& cf,
is_local_reader local_reader) {
, _reader(nullptr) {
if (local_reader) {
return cf.make_streaming_reader(_schema, _range);
auto ms = mutation_source([&cf] (
schema_ptr s,
reader_permit,
const dht::partition_range& pr,
const query::partition_slice& ps,
const io_priority_class& pc,
tracing::trace_state_ptr,
streamed_mutation::forwarding,
mutation_reader::forwarding fwd_mr) {
return cf.make_streaming_reader(std::move(s), pr, ps, fwd_mr);
});
std::tie(_reader, _reader_handle) = make_manually_paused_evictable_reader(
std::move(ms),
_schema,
cf.streaming_read_concurrency_semaphore(),
_range,
_schema->full_slice(),
service::get_local_streaming_read_priority(),
{},
mutation_reader::forwarding::no);
} else {
_reader = make_multishard_streaming_reader(db, _schema, [this] {
auto shard_range = _sharder.next();
if (shard_range) {
return std::optional<dht::partition_range>(dht::to_partition_range(*shard_range));
}
return std::optional<dht::partition_range>();
});
}
return make_multishard_streaming_reader(db, _schema, [this] {
auto shard_range = _sharder.next();
if (shard_range) {
return std::optional<dht::partition_range>(dht::to_partition_range(*shard_range));
}
return std::optional<dht::partition_range>();
});
}
public:
future<mutation_fragment_opt>
read_mutation_fragment() {
return _reader(db::no_timeout);
}
void on_end_of_stream() {
_reader = make_empty_flat_reader(_schema);
_reader_handle.reset();
}
lw_shared_ptr<const decorated_key_with_hash>& get_current_dk() {
return _current_dk;
}
@@ -436,6 +454,11 @@ public:
}
}
void pause() {
if (_reader_handle) {
_reader_handle->pause();
}
}
};
class repair_writer {
@@ -1019,11 +1042,7 @@ private:
return repair_hash(h.finalize_uint64());
}
stop_iteration handle_mutation_fragment(mutation_fragment_opt mfopt, size_t& cur_size, size_t& new_rows_size, std::list<repair_row>& cur_rows) {
if (!mfopt) {
return stop_iteration::yes;
}
mutation_fragment& mf = *mfopt;
stop_iteration handle_mutation_fragment(mutation_fragment& mf, size_t& cur_size, size_t& new_rows_size, std::list<repair_row>& cur_rows) {
if (mf.is_partition_start()) {
auto& start = mf.as_partition_start();
_repair_reader.set_current_dk(start.key());
@@ -1058,9 +1077,18 @@ private:
}
_gate.check();
return _repair_reader.read_mutation_fragment().then([this, &cur_size, &new_rows_size, &cur_rows] (mutation_fragment_opt mfopt) mutable {
return handle_mutation_fragment(std::move(mfopt), cur_size, new_rows_size, cur_rows);
if (!mfopt) {
_repair_reader.on_end_of_stream();
return stop_iteration::yes;
}
return handle_mutation_fragment(*mfopt, cur_size, new_rows_size, cur_rows);
});
}).then([&cur_rows, &new_rows_size] () mutable {
}).then_wrapped([this, &cur_rows, &new_rows_size] (future<> fut) mutable {
if (fut.failed()) {
_repair_reader.on_end_of_stream();
return make_exception_future<std::list<repair_row>, size_t>(fut.get_exception());
}
_repair_reader.pause();
return make_ready_future<std::list<repair_row>, size_t>(std::move(cur_rows), new_rows_size);
});
});

View File

@@ -43,6 +43,7 @@
#include "test/lib/make_random_string.hh"
#include "test/lib/dummy_partitioner.hh"
#include "test/lib/reader_lifecycle_policy.hh"
#include "test/lib/random_utils.hh"
#include "dht/sharder.hh"
#include "mutation_reader.hh"
@@ -2737,3 +2738,112 @@ SEASTAR_THREAD_TEST_CASE(test_compacting_reader_next_partition) {
}
reader_assertions.produces_end_of_stream();
}
SEASTAR_THREAD_TEST_CASE(test_auto_paused_evictable_reader_is_mutation_source) {
auto make_populate = [] (schema_ptr s, const std::vector<mutation>& mutations, gc_clock::time_point query_time) {
auto mt = make_lw_shared<memtable>(s);
for (auto& mut : mutations) {
mt->apply(mut);
}
auto sem = make_lw_shared<reader_concurrency_semaphore>(reader_concurrency_semaphore::no_limits());
return mutation_source([=] (
schema_ptr s,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd_sm,
mutation_reader::forwarding fwd_mr) mutable {
auto mr = make_auto_paused_evictable_reader(mt->as_data_source(), std::move(s), *sem, range, slice, pc, std::move(trace_state), fwd_mr);
if (fwd_sm == streamed_mutation::forwarding::yes) {
return make_forwardable(std::move(mr));
}
return mr;
});
};
run_mutation_source_tests(make_populate);
}
SEASTAR_THREAD_TEST_CASE(test_manual_paused_evictable_reader_is_mutation_source) {
class maybe_pausing_reader : public flat_mutation_reader::impl {
flat_mutation_reader _reader;
std::optional<evictable_reader_handle> _handle;
private:
void maybe_pause() {
if (!tests::random::get_int(0, 4)) {
_handle->pause();
}
}
public:
maybe_pausing_reader(
memtable& mt,
reader_concurrency_semaphore& semaphore,
const dht::partition_range& pr,
const query::partition_slice& ps,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
mutation_reader::forwarding fwd_mr)
: impl(mt.schema()), _reader(nullptr) {
std::tie(_reader, _handle) = make_manually_paused_evictable_reader(mt.as_data_source(), mt.schema(), semaphore, pr, ps, pc,
std::move(trace_state), fwd_mr);
}
virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override {
return _reader.fill_buffer(timeout).then([this] {
_end_of_stream = _reader.is_end_of_stream();
_reader.move_buffer_content_to(*this);
}).then([this] {
maybe_pause();
});
}
virtual void next_partition() override {
clear_buffer_to_next_partition();
if (!is_buffer_empty()) {
return;
}
_end_of_stream = false;
_reader.next_partition();
}
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override {
clear_buffer();
_end_of_stream = false;
return _reader.fast_forward_to(pr, timeout).then([this] {
maybe_pause();
});
}
virtual future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) override {
throw_with_backtrace<std::bad_function_call>();
}
virtual size_t buffer_size() const override {
return flat_mutation_reader::impl::buffer_size() + _reader.buffer_size();
}
};
auto make_populate = [this] (schema_ptr s, const std::vector<mutation>& mutations, gc_clock::time_point query_time) {
auto mt = make_lw_shared<memtable>(s);
for (auto& mut : mutations) {
mt->apply(mut);
}
auto sem = make_lw_shared<reader_concurrency_semaphore>(reader_concurrency_semaphore::no_limits());
return mutation_source([=] (
schema_ptr s,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd_sm,
mutation_reader::forwarding fwd_mr) mutable {
auto mr = make_flat_mutation_reader<maybe_pausing_reader>(*mt, *sem, range, slice, pc, std::move(trace_state), fwd_mr);
if (fwd_sm == streamed_mutation::forwarding::yes) {
return make_forwardable(std::move(mr));
}
return mr;
});
};
run_mutation_source_tests(make_populate);
}