readers/multishard: propagate fill_buffer_hint to shard_reader:fill_reader_buffer()
The hint will tell the shard reader exactly how much data to produce, to avoid multiple cross-shard round-trips and possible evict-recreate cycles. The hint is neither used yet or calculated yet, this is coming in the next patches.
This commit is contained in:
@@ -686,6 +686,11 @@ std::pair<mutation_reader, evictable_reader_handle_v2> make_manually_paused_evic
|
||||
|
||||
namespace {
|
||||
|
||||
struct buffer_fill_hint {
|
||||
size_t size;
|
||||
dht::token stop_token;
|
||||
};
|
||||
|
||||
// A special-purpose shard reader.
|
||||
//
|
||||
// Shard reader manages a reader located on a remote shard. It transparently
|
||||
@@ -707,8 +712,8 @@ private:
|
||||
foreign_ptr<std::unique_ptr<evictable_reader_v2>> _reader;
|
||||
|
||||
private:
|
||||
future<remote_fill_buffer_result_v2> fill_reader_buffer(evictable_reader_v2& reader);
|
||||
future<> do_fill_buffer();
|
||||
future<remote_fill_buffer_result_v2> fill_reader_buffer(evictable_reader_v2& reader, std::optional<buffer_fill_hint> hint);
|
||||
future<> do_fill_buffer(std::optional<buffer_fill_hint> hint);
|
||||
|
||||
public:
|
||||
shard_reader_v2(
|
||||
@@ -738,7 +743,10 @@ public:
|
||||
const mutation_fragment_v2& peek_buffer() const {
|
||||
return buffer().front();
|
||||
}
|
||||
virtual future<> fill_buffer() override;
|
||||
future<> fill_buffer(std::optional<buffer_fill_hint> hint);
|
||||
virtual future<> fill_buffer() override {
|
||||
return fill_buffer(std::nullopt);
|
||||
}
|
||||
virtual future<> next_partition() override;
|
||||
virtual future<> fast_forward_to(const dht::partition_range& pr) override;
|
||||
virtual future<> fast_forward_to(position_range) override;
|
||||
@@ -792,7 +800,7 @@ future<> shard_reader_v2::close() noexcept {
|
||||
}
|
||||
}
|
||||
|
||||
future<remote_fill_buffer_result_v2> shard_reader_v2::fill_reader_buffer(evictable_reader_v2& reader) {
|
||||
future<remote_fill_buffer_result_v2> shard_reader_v2::fill_reader_buffer(evictable_reader_v2& reader, std::optional<buffer_fill_hint> hint) {
|
||||
reader_permit::need_cpu_guard ncpu_guard{reader.permit()};
|
||||
|
||||
co_await reader.fill_buffer();
|
||||
@@ -800,7 +808,7 @@ future<remote_fill_buffer_result_v2> shard_reader_v2::fill_reader_buffer(evictab
|
||||
co_return remote_fill_buffer_result_v2(reader.detach_buffer(), reader.is_end_of_stream());
|
||||
}
|
||||
|
||||
future<> shard_reader_v2::do_fill_buffer() {
|
||||
future<> shard_reader_v2::do_fill_buffer(std::optional<buffer_fill_hint> hint) {
|
||||
struct reader_and_buffer_fill_result {
|
||||
foreign_ptr<std::unique_ptr<evictable_reader_v2>> reader;
|
||||
remote_fill_buffer_result_v2 result;
|
||||
@@ -808,7 +816,7 @@ future<> shard_reader_v2::do_fill_buffer() {
|
||||
|
||||
auto res = co_await std::invoke([&] () -> future<remote_fill_buffer_result_v2> {
|
||||
if (!_reader) {
|
||||
reader_and_buffer_fill_result res = co_await smp::submit_to(_shard, coroutine::lambda([this, gs = global_schema_ptr(_schema)] () -> future<reader_and_buffer_fill_result> {
|
||||
reader_and_buffer_fill_result res = co_await smp::submit_to(_shard, coroutine::lambda([this, gs = global_schema_ptr(_schema), hint] () -> future<reader_and_buffer_fill_result> {
|
||||
auto ms = mutation_source([lifecycle_policy = _lifecycle_policy.get()] (
|
||||
schema_ptr s,
|
||||
reader_permit permit,
|
||||
@@ -851,7 +859,7 @@ future<> shard_reader_v2::do_fill_buffer() {
|
||||
|
||||
try {
|
||||
tracing::trace(_trace_state, "Creating shard reader on shard: {}", this_shard_id());
|
||||
auto res = co_await fill_reader_buffer(*rreader);
|
||||
auto res = co_await fill_reader_buffer(*rreader, hint);
|
||||
co_return reader_and_buffer_fill_result{std::move(rreader), std::move(res)};
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
@@ -862,8 +870,8 @@ future<> shard_reader_v2::do_fill_buffer() {
|
||||
_reader = std::move(res.reader);
|
||||
co_return std::move(res.result);
|
||||
} else {
|
||||
co_return co_await smp::submit_to(_shard, coroutine::lambda([this] () -> future<remote_fill_buffer_result_v2> {
|
||||
return fill_reader_buffer(*_reader);
|
||||
co_return co_await smp::submit_to(_shard, coroutine::lambda([this, hint] () -> future<remote_fill_buffer_result_v2> {
|
||||
return fill_reader_buffer(*_reader, hint);
|
||||
}));
|
||||
}
|
||||
});
|
||||
@@ -876,7 +884,7 @@ future<> shard_reader_v2::do_fill_buffer() {
|
||||
_end_of_stream = res.end_of_stream;
|
||||
}
|
||||
|
||||
future<> shard_reader_v2::fill_buffer() {
|
||||
future<> shard_reader_v2::fill_buffer(std::optional<buffer_fill_hint> hint) {
|
||||
// FIXME: want to move this to the inner scopes but it makes clang miscompile the code.
|
||||
reader_permit::awaits_guard guard(_permit);
|
||||
if (_read_ahead) {
|
||||
@@ -886,7 +894,7 @@ future<> shard_reader_v2::fill_buffer() {
|
||||
if (!is_buffer_empty()) {
|
||||
co_return;
|
||||
}
|
||||
co_await do_fill_buffer();
|
||||
co_await do_fill_buffer(hint);
|
||||
}
|
||||
|
||||
future<> shard_reader_v2::next_partition() {
|
||||
@@ -942,7 +950,7 @@ void shard_reader_v2::read_ahead() {
|
||||
return;
|
||||
}
|
||||
|
||||
_read_ahead.emplace(do_fill_buffer());
|
||||
_read_ahead.emplace(do_fill_buffer(std::nullopt));
|
||||
}
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
Reference in New Issue
Block a user