diff --git a/querier.cc b/querier.cc index 45569b5dcd..df7d4faf37 100644 --- a/querier.cc +++ b/querier.cc @@ -216,13 +216,13 @@ querier_cache::querier_cache(std::chrono::seconds entry_ttl) } struct querier_utils { - static flat_mutation_reader get_reader(querier_base& q) noexcept { - return std::move(std::get(q._reader)); + static flat_mutation_reader_v2 get_reader(querier_base& q) noexcept { + return std::move(std::get(q._reader)); } static reader_concurrency_semaphore::inactive_read_handle get_inactive_read_handle(querier_base& q) noexcept { return std::move(std::get(q._reader)); } - static void set_reader(querier_base& q, flat_mutation_reader r) noexcept { + static void set_reader(querier_base& q, flat_mutation_reader_v2 r) noexcept { q._reader = std::move(r); } static void set_inactive_read_handle(querier_base& q, reader_concurrency_semaphore::inactive_read_handle h) noexcept { @@ -255,7 +255,7 @@ void querier_cache::insert_querier( auto& sem = q.permit().semaphore(); - auto irh = sem.register_inactive_read(upgrade_to_v2(querier_utils::get_reader(q))); + auto irh = sem.register_inactive_read(querier_utils::get_reader(q)); if (!irh) { ++stats.resource_based_evictions; return; @@ -340,7 +340,7 @@ std::optional querier_cache::lookup_querier( throw std::runtime_error("lookup_querier(): found querier that is evicted"); } reader_opt->set_timeout(timeout); - querier_utils::set_reader(q, downgrade_to_v1(std::move(*reader_opt))); + querier_utils::set_reader(q, std::move(*reader_opt)); --stats.population; const auto can_be_used = can_be_used_for_page(q, s, ranges.front(), slice); @@ -393,7 +393,7 @@ std::optional querier_cache::lookup_shard_mutation_queri future<> querier_base::close() noexcept { struct variant_closer { querier_base& q; - future<> operator()(flat_mutation_reader& reader) { + future<> operator()(flat_mutation_reader_v2& reader) { return reader.close(); } future<> operator()(reader_concurrency_semaphore::inactive_read_handle& irh) { diff --git a/querier.hh b/querier.hh index c5c11b8630..d21ff252b8 100644 --- a/querier.hh +++ b/querier.hh @@ -101,6 +101,39 @@ auto consume_page(flat_mutation_reader& reader, }); } +/// Consume a page worth of data from the reader. +/// +/// Uses `compaction_state` for compacting the fragments and `consumer` for +/// building the results. +/// Returns a future containing a tuple with the last consumed clustering key, +/// or std::nullopt if the last row wasn't a clustering row, and whatever the +/// consumer's `consume_end_of_stream()` method returns. +template +requires CompactedFragmentsConsumer +auto consume_page(flat_mutation_reader_v2& reader, + lw_shared_ptr> compaction_state, + const query::partition_slice& slice, + Consumer&& consumer, + uint64_t row_limit, + uint32_t partition_limit, + gc_clock::time_point query_time) { + return reader.peek().then([=, &reader, consumer = std::move(consumer), &slice] ( + mutation_fragment_v2* next_fragment) mutable { + const auto next_fragment_region = next_fragment ? next_fragment->position().region() : partition_region::partition_end; + compaction_state->start_new_page(row_limit, partition_limit, query_time, next_fragment_region, consumer); + + auto last_ckey = make_lw_shared>(); + auto reader_consumer = compact_for_query>( + compaction_state, + clustering_position_tracker(std::move(consumer), last_ckey)); + + return reader.consume(std::move(reader_consumer)).then([last_ckey] (auto&&... results) mutable { + static_assert(sizeof...(results) <= 1); + return make_ready_future, std::decay_t...>>(std::tuple(std::move(*last_ckey), std::move(results)...)); + }); + }); +} + struct position_view { const dht::decorated_key* partition_key; const clustering_key_prefix* clustering_key; @@ -114,12 +147,12 @@ protected: reader_permit _permit; lw_shared_ptr _range; std::unique_ptr _slice; - std::variant _reader; + std::variant _reader; dht::partition_ranges_view _query_ranges; public: querier_base(reader_permit permit, lw_shared_ptr range, - std::unique_ptr slice, flat_mutation_reader reader, dht::partition_ranges_view query_ranges) + std::unique_ptr slice, flat_mutation_reader_v2 reader, dht::partition_ranges_view query_ranges) : _schema(reader.schema()) , _permit(std::move(permit)) , _range(std::move(range)) @@ -134,7 +167,7 @@ public: , _permit(std::move(permit)) , _range(make_lw_shared(std::move(range))) , _slice(std::make_unique(std::move(slice))) - , _reader(ms.make_reader(_schema, _permit, *_range, *_slice, pc, std::move(trace_ptr), streamed_mutation::forwarding::no, mutation_reader::forwarding::no)) + , _reader(ms.make_reader_v2(_schema, _permit, *_range, *_slice, pc, std::move(trace_ptr), streamed_mutation::forwarding::no, mutation_reader::forwarding::no)) , _query_ranges(*_range) { } @@ -217,7 +250,7 @@ public: uint32_t partition_limit, gc_clock::time_point query_time, tracing::trace_state_ptr trace_ptr = {}) { - return ::query::consume_page(std::get(_reader), _compaction_state, *_slice, std::move(consumer), row_limit, + return ::query::consume_page(std::get(_reader), _compaction_state, *_slice, std::move(consumer), row_limit, partition_limit, query_time).then([this, trace_ptr = std::move(trace_ptr)] (auto&& results) { _last_ckey = std::get>(std::move(results)); const auto& cstats = _compaction_state->stats(); @@ -274,7 +307,7 @@ private: reader_permit permit, dht::decorated_key nominal_pkey, std::optional nominal_ckey) - : querier_base(permit, std::move(reader_range), std::move(reader_slice), std::move(reader), *query_ranges) + : querier_base(permit, std::move(reader_range), std::move(reader_slice), upgrade_to_v2(std::move(reader)), *query_ranges) , _query_ranges(std::move(query_ranges)) , _nominal_pkey(std::move(nominal_pkey)) , _nominal_ckey(std::move(nominal_ckey)) { @@ -307,7 +340,7 @@ public: } flat_mutation_reader reader() && { - return std::move(std::get(_reader)); + return downgrade_to_v1(std::move(std::get(_reader))); } };