From 8d5283f0364a42e6c21d515f4cb52c3bf23c66e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Wed, 25 Sep 2024 03:53:09 -0400 Subject: [PATCH] readers/multishard: propagate fill_buffer_hint to shard_reader:fill_reader_buffer() The hint will tell the shard reader exactly how much data to produce, to avoid multiple cross-shard round-trips and possible evict-recreate cycles. The hint is neither used yet or calculated yet, this is coming in the next patches. --- readers/multishard.cc | 32 ++++++++++++++++++++------------ 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/readers/multishard.cc b/readers/multishard.cc index 48b8721bc4..dbdc495db6 100644 --- a/readers/multishard.cc +++ b/readers/multishard.cc @@ -686,6 +686,11 @@ std::pair make_manually_paused_evic namespace { +struct buffer_fill_hint { + size_t size; + dht::token stop_token; +}; + // A special-purpose shard reader. // // Shard reader manages a reader located on a remote shard. It transparently @@ -707,8 +712,8 @@ private: foreign_ptr> _reader; private: - future fill_reader_buffer(evictable_reader_v2& reader); - future<> do_fill_buffer(); + future fill_reader_buffer(evictable_reader_v2& reader, std::optional hint); + future<> do_fill_buffer(std::optional hint); public: shard_reader_v2( @@ -738,7 +743,10 @@ public: const mutation_fragment_v2& peek_buffer() const { return buffer().front(); } - virtual future<> fill_buffer() override; + future<> fill_buffer(std::optional hint); + virtual future<> fill_buffer() override { + return fill_buffer(std::nullopt); + } virtual future<> next_partition() override; virtual future<> fast_forward_to(const dht::partition_range& pr) override; virtual future<> fast_forward_to(position_range) override; @@ -792,7 +800,7 @@ future<> shard_reader_v2::close() noexcept { } } -future shard_reader_v2::fill_reader_buffer(evictable_reader_v2& reader) { +future shard_reader_v2::fill_reader_buffer(evictable_reader_v2& reader, std::optional hint) { reader_permit::need_cpu_guard ncpu_guard{reader.permit()}; co_await reader.fill_buffer(); @@ -800,7 +808,7 @@ future shard_reader_v2::fill_reader_buffer(evictab co_return remote_fill_buffer_result_v2(reader.detach_buffer(), reader.is_end_of_stream()); } -future<> shard_reader_v2::do_fill_buffer() { +future<> shard_reader_v2::do_fill_buffer(std::optional hint) { struct reader_and_buffer_fill_result { foreign_ptr> reader; remote_fill_buffer_result_v2 result; @@ -808,7 +816,7 @@ future<> shard_reader_v2::do_fill_buffer() { auto res = co_await std::invoke([&] () -> future { if (!_reader) { - reader_and_buffer_fill_result res = co_await smp::submit_to(_shard, coroutine::lambda([this, gs = global_schema_ptr(_schema)] () -> future { + reader_and_buffer_fill_result res = co_await smp::submit_to(_shard, coroutine::lambda([this, gs = global_schema_ptr(_schema), hint] () -> future { auto ms = mutation_source([lifecycle_policy = _lifecycle_policy.get()] ( schema_ptr s, reader_permit permit, @@ -851,7 +859,7 @@ future<> shard_reader_v2::do_fill_buffer() { try { tracing::trace(_trace_state, "Creating shard reader on shard: {}", this_shard_id()); - auto res = co_await fill_reader_buffer(*rreader); + auto res = co_await fill_reader_buffer(*rreader, hint); co_return reader_and_buffer_fill_result{std::move(rreader), std::move(res)}; } catch (...) { ex = std::current_exception(); @@ -862,8 +870,8 @@ future<> shard_reader_v2::do_fill_buffer() { _reader = std::move(res.reader); co_return std::move(res.result); } else { - co_return co_await smp::submit_to(_shard, coroutine::lambda([this] () -> future { - return fill_reader_buffer(*_reader); + co_return co_await smp::submit_to(_shard, coroutine::lambda([this, hint] () -> future { + return fill_reader_buffer(*_reader, hint); })); } }); @@ -876,7 +884,7 @@ future<> shard_reader_v2::do_fill_buffer() { _end_of_stream = res.end_of_stream; } -future<> shard_reader_v2::fill_buffer() { +future<> shard_reader_v2::fill_buffer(std::optional hint) { // FIXME: want to move this to the inner scopes but it makes clang miscompile the code. reader_permit::awaits_guard guard(_permit); if (_read_ahead) { @@ -886,7 +894,7 @@ future<> shard_reader_v2::fill_buffer() { if (!is_buffer_empty()) { co_return; } - co_await do_fill_buffer(); + co_await do_fill_buffer(hint); } future<> shard_reader_v2::next_partition() { @@ -942,7 +950,7 @@ void shard_reader_v2::read_ahead() { return; } - _read_ahead.emplace(do_fill_buffer()); + _read_ahead.emplace(do_fill_buffer(std::nullopt)); } } // anonymous namespace