introduce multishard_combining_reader_v2

All changes are mechanical.

Signed-off-by: Michael Livshin <michael.livshin@scylladb.com>
This commit is contained in:
Michael Livshin
2022-01-05 21:18:13 +02:00
parent 4bc0deb7e9
commit 7f0e228cbb
2 changed files with 259 additions and 0 deletions

View File

@@ -2714,6 +2714,226 @@ flat_mutation_reader make_multishard_combining_reader_for_tests(
std::move(trace_state), fwd_mr);
}
// See make_multishard_combining_reader() for description.
class multishard_combining_reader_v2 : public flat_mutation_reader_v2::impl {
struct shard_and_token {
shard_id shard;
dht::token token;
bool operator<(const shard_and_token& o) const {
// Reversed, as we want a min-heap.
return token > o.token;
}
};
const dht::sharder& _sharder;
std::vector<std::unique_ptr<shard_reader_v2>> _shard_readers;
// Contains the position of each shard with token granularity, organized
// into a min-heap. Used to select the shard with the smallest token each
// time a shard reader produces a new partition.
std::vector<shard_and_token> _shard_selection_min_heap;
unsigned _current_shard;
bool _crossed_shards;
unsigned _concurrency = 1;
void on_partition_range_change(const dht::partition_range& pr);
bool maybe_move_to_next_shard(const dht::token* const t = nullptr);
future<> handle_empty_reader_buffer();
public:
multishard_combining_reader_v2(
const dht::sharder& sharder,
shared_ptr<reader_lifecycle_policy_v2> lifecycle_policy,
schema_ptr s,
reader_permit permit,
const dht::partition_range& pr,
const query::partition_slice& ps,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
mutation_reader::forwarding fwd_mr);
// this is captured.
multishard_combining_reader_v2(const multishard_combining_reader_v2&) = delete;
multishard_combining_reader_v2& operator=(const multishard_combining_reader_v2&) = delete;
multishard_combining_reader_v2(multishard_combining_reader_v2&&) = delete;
multishard_combining_reader_v2& operator=(multishard_combining_reader_v2&&) = delete;
virtual future<> fill_buffer() override;
virtual future<> next_partition() override;
virtual future<> fast_forward_to(const dht::partition_range& pr) override;
virtual future<> fast_forward_to(position_range pr) override;
virtual future<> close() noexcept override;
};
void multishard_combining_reader_v2::on_partition_range_change(const dht::partition_range& pr) {
_shard_selection_min_heap.clear();
_shard_selection_min_heap.reserve(_sharder.shard_count());
auto token = pr.start() ? pr.start()->value().token() : dht::minimum_token();
_current_shard = _sharder.shard_of(token);
auto sharder = dht::ring_position_range_sharder(_sharder, pr);
auto next = sharder.next(*_schema);
// The first value of `next` is thrown away, as it is the ring range of the current shard.
// We only want to do a full round, until we get back to the shard we started from (`_current_shard`).
// We stop earlier if the sharder has no ranges for the remaining shards.
for (next = sharder.next(*_schema); next && next->shard != _current_shard; next = sharder.next(*_schema)) {
_shard_selection_min_heap.push_back(shard_and_token{next->shard, next->ring_range.start()->value().token()});
boost::push_heap(_shard_selection_min_heap);
}
}
bool multishard_combining_reader_v2::maybe_move_to_next_shard(const dht::token* const t) {
if (_shard_selection_min_heap.empty() || (t && *t < _shard_selection_min_heap.front().token)) {
return false;
}
boost::pop_heap(_shard_selection_min_heap);
const auto next_shard = _shard_selection_min_heap.back().shard;
_shard_selection_min_heap.pop_back();
if (t) {
_shard_selection_min_heap.push_back(shard_and_token{_current_shard, *t});
boost::push_heap(_shard_selection_min_heap);
}
_crossed_shards = true;
_current_shard = next_shard;
return true;
}
future<> multishard_combining_reader_v2::handle_empty_reader_buffer() {
auto& reader = *_shard_readers[_current_shard];
if (reader.is_end_of_stream()) {
if (_shard_selection_min_heap.empty()) {
_end_of_stream = true;
} else {
maybe_move_to_next_shard();
}
return make_ready_future<>();
} else if (reader.is_read_ahead_in_progress()) {
return reader.fill_buffer();
} else {
// If we crossed shards and the next reader has an empty buffer we
// double concurrency so the next time we cross shards we will have
// more chances of hitting the reader's buffer.
if (_crossed_shards) {
_concurrency = std::min(_concurrency * 2, _sharder.shard_count());
// Read ahead shouldn't change the min selection heap so we work on a local copy.
auto shard_selection_min_heap_copy = _shard_selection_min_heap;
// If concurrency > 1 we kick-off concurrency-1 read-aheads in the
// background. They will be brought to the foreground when we move
// to their respective shard.
for (unsigned i = 1; i < _concurrency && !shard_selection_min_heap_copy.empty(); ++i) {
boost::pop_heap(shard_selection_min_heap_copy);
const auto next_shard = shard_selection_min_heap_copy.back().shard;
shard_selection_min_heap_copy.pop_back();
_shard_readers[next_shard]->read_ahead();
}
}
return reader.fill_buffer();
}
}
multishard_combining_reader_v2::multishard_combining_reader_v2(
const dht::sharder& sharder,
shared_ptr<reader_lifecycle_policy_v2> lifecycle_policy,
schema_ptr s,
reader_permit permit,
const dht::partition_range& pr,
const query::partition_slice& ps,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
mutation_reader::forwarding fwd_mr)
: impl(std::move(s), std::move(permit)), _sharder(sharder) {
on_partition_range_change(pr);
_shard_readers.reserve(_sharder.shard_count());
for (unsigned i = 0; i < _sharder.shard_count(); ++i) {
_shard_readers.emplace_back(std::make_unique<shard_reader_v2>(_schema, _permit, lifecycle_policy, i, pr, ps, pc, trace_state, fwd_mr));
}
}
future<> multishard_combining_reader_v2::fill_buffer() {
_crossed_shards = false;
return do_until([this] { return is_buffer_full() || is_end_of_stream(); }, [this] {
auto& reader = *_shard_readers[_current_shard];
if (reader.is_buffer_empty()) {
return handle_empty_reader_buffer();
}
while (!reader.is_buffer_empty() && !is_buffer_full()) {
if (const auto& mf = reader.peek_buffer(); mf.is_partition_start() && maybe_move_to_next_shard(&mf.as_partition_start().key().token())) {
return make_ready_future<>();
}
push_mutation_fragment(reader.pop_mutation_fragment());
}
return make_ready_future<>();
});
}
future<> multishard_combining_reader_v2::next_partition() {
clear_buffer_to_next_partition();
if (is_buffer_empty()) {
return _shard_readers[_current_shard]->next_partition();
}
return make_ready_future<>();
}
future<> multishard_combining_reader_v2::fast_forward_to(const dht::partition_range& pr) {
clear_buffer();
_end_of_stream = false;
on_partition_range_change(pr);
return parallel_for_each(_shard_readers, [&pr] (std::unique_ptr<shard_reader_v2>& sr) {
return sr->fast_forward_to(pr);
});
}
future<> multishard_combining_reader_v2::fast_forward_to(position_range pr) {
return make_exception_future<>(make_backtraced_exception_ptr<std::bad_function_call>());
}
future<> multishard_combining_reader_v2::close() noexcept {
return parallel_for_each(_shard_readers, [] (std::unique_ptr<shard_reader_v2>& sr) {
return sr->close();
});
}
flat_mutation_reader_v2 make_multishard_combining_reader_v2(
shared_ptr<reader_lifecycle_policy_v2> lifecycle_policy,
schema_ptr schema,
reader_permit permit,
const dht::partition_range& pr,
const query::partition_slice& ps,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
mutation_reader::forwarding fwd_mr) {
const dht::sharder& sharder = schema->get_sharder();
return make_flat_mutation_reader_v2<multishard_combining_reader_v2>(sharder, std::move(lifecycle_policy), std::move(schema), std::move(permit), pr, ps, pc,
std::move(trace_state), fwd_mr);
}
flat_mutation_reader_v2 make_multishard_combining_reader_v2_for_tests(
const dht::sharder& sharder,
shared_ptr<reader_lifecycle_policy_v2> lifecycle_policy,
schema_ptr schema,
reader_permit permit,
const dht::partition_range& pr,
const query::partition_slice& ps,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
mutation_reader::forwarding fwd_mr) {
return make_flat_mutation_reader_v2<multishard_combining_reader_v2>(sharder, std::move(lifecycle_policy), std::move(schema), std::move(permit), pr, ps, pc,
std::move(trace_state), fwd_mr);
}
class queue_reader final : public flat_mutation_reader::impl {
friend class queue_reader_handle;

View File

@@ -716,6 +716,45 @@ flat_mutation_reader make_multishard_combining_reader_for_tests(
tracing::trace_state_ptr trace_state = nullptr,
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::no);
/// Make a multishard_combining_reader.
///
/// multishard_combining_reader takes care of reading a range from all shards
/// that own a subrange in the range. Shard reader are created on-demand, when
/// the shard is visited for the first time.
///
/// The read starts with a concurrency of one, that is the reader reads from a
/// single shard at a time. The concurrency is exponentially increased (to a
/// maximum of the number of shards) when a reader's buffer is empty after
/// moving the next shard. This condition is important as we only wan't to
/// increase concurrency for sparse tables that have little data and the reader
/// has to move between shards often. When concurrency is > 1, the reader
/// issues background read-aheads to the next shards so that by the time it
/// needs to move to them they have the data ready.
/// For dense tables (where we rarely cross shards) we rely on the
/// foreign_reader to issue sufficient read-aheads on its own to avoid blocking.
///
/// The readers' life-cycles are managed through the supplied lifecycle policy.
flat_mutation_reader_v2 make_multishard_combining_reader_v2(
shared_ptr<reader_lifecycle_policy_v2> lifecycle_policy,
schema_ptr schema,
reader_permit permit,
const dht::partition_range& pr,
const query::partition_slice& ps,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state = nullptr,
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::no);
flat_mutation_reader_v2 make_multishard_combining_reader_v2_for_tests(
const dht::sharder& sharder,
shared_ptr<reader_lifecycle_policy_v2> lifecycle_policy,
schema_ptr schema,
reader_permit permit,
const dht::partition_range& pr,
const query::partition_slice& ps,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state = nullptr,
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::no);
class queue_reader;
/// Calls to different methods cannot overlap!