diff --git a/db/view/view.cc b/db/view/view.cc index be4f7ab9c5..1f1ca40ea5 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -364,7 +364,7 @@ public: void consume(tombstone t) { _builder.consume(t); } stop_iteration consume(static_row&& sr, tombstone t, bool is_alive) { return _builder.consume(std::move(sr), t, is_alive); } stop_iteration consume(clustering_row&& cr, row_tombstone t, bool is_alive) { return _builder.consume(std::move(cr), t, is_alive); } - stop_iteration consume(range_tombstone&& rt) { return _builder.consume(std::move(rt)); } + stop_iteration consume(range_tombstone_change&& rtc) { return _builder.consume(std::move(rtc)); } stop_iteration consume_end_of_partition() { return _builder.consume_end_of_partition(); } result_type consume_end_of_stream() { _builder.consume_end_of_stream(); @@ -1950,7 +1950,7 @@ public: return stop_iteration::no; } - stop_iteration consume(range_tombstone&&) { + stop_iteration consume(range_tombstone_change&&) { inject_failure("view_builder_consume_range_tombstone"); return stop_iteration::no; } @@ -2008,7 +2008,7 @@ public: // Called in the context of a seastar::thread. void view_builder::execute(build_step& step, exponential_backoff_retry r) { gc_clock::time_point now = gc_clock::now(); - auto consumer = compact_for_query( + auto consumer = compact_for_query_v2( *step.reader.schema(), now, step.pslice, diff --git a/multishard_mutation_query.cc b/multishard_mutation_query.cc index 9632cda9ca..6e0417e59d 100644 --- a/multishard_mutation_query.cc +++ b/multishard_mutation_query.cc @@ -610,7 +610,7 @@ future<> read_context::save_readers(flat_mutation_reader_v2::tracked_buffer unco namespace { template -using compact_for_result_state = compact_for_query_state; +using compact_for_result_state = compact_for_query_state_v2; template requires std::is_nothrow_move_constructible_v @@ -821,7 +821,7 @@ public: void consume(tombstone t) { _builder.consume(t); } stop_iteration consume(static_row&& sr, tombstone t, bool is_alive) { return _builder.consume(std::move(sr), t, is_alive); } stop_iteration consume(clustering_row&& cr, row_tombstone t, bool is_alive) { return _builder.consume(std::move(cr), t, is_alive); } - stop_iteration consume(range_tombstone&& rt) { return _builder.consume(std::move(rt)); } + stop_iteration consume(range_tombstone_change&& rtc) { return _builder.consume(std::move(rtc)); } stop_iteration consume_end_of_partition() { return _builder.consume_end_of_partition(); } result_type consume_end_of_stream() { return _builder.consume_end_of_stream(); } }; @@ -844,7 +844,7 @@ public: void consume(tombstone t) { _builder.consume(t); } stop_iteration consume(static_row&& sr, tombstone t, bool is_alive) { return _builder.consume(std::move(sr), t, is_alive); } stop_iteration consume(clustering_row&& cr, row_tombstone t, bool is_alive) { return _builder.consume(std::move(cr), t, is_alive); } - stop_iteration consume(range_tombstone&& rt) { return _builder.consume(std::move(rt)); } + stop_iteration consume(range_tombstone_change&& rtc) { return _builder.consume(std::move(rtc)); } stop_iteration consume_end_of_partition() { return _builder.consume_end_of_partition(); } result_type consume_end_of_stream() { _builder.consume_end_of_stream(); diff --git a/mutation_compactor.hh b/mutation_compactor.hh index 14cb3ae0cc..01b99a966d 100644 --- a/mutation_compactor.hh +++ b/mutation_compactor.hh @@ -30,23 +30,6 @@ enum class compact_for_sstables { yes, }; -enum class compactor_output_format { - v1, - v2 -}; - -template -concept CompactedFragmentsConsumer = requires(T obj, tombstone t, const dht::decorated_key& dk, static_row sr, - clustering_row cr, range_tombstone rt, tombstone current_tombstone, row_tombstone current_row_tombstone, bool is_alive) { - obj.consume_new_partition(dk); - obj.consume(t); - { obj.consume(std::move(sr), current_tombstone, is_alive) } -> std::same_as; - { obj.consume(std::move(cr), current_row_tombstone, is_alive) } -> std::same_as; - { obj.consume(std::move(rt)) } -> std::same_as; - { obj.consume_end_of_partition() } -> std::same_as; - obj.consume_end_of_stream(); -}; - template concept CompactedFragmentsConsumerV2 = requires(T obj, tombstone t, const dht::decorated_key& dk, static_row sr, clustering_row cr, range_tombstone_change rtc, tombstone current_tombstone, row_tombstone current_row_tombstone, bool is_alive) { @@ -59,12 +42,6 @@ concept CompactedFragmentsConsumerV2 = requires(T obj, tombstone t, const dht::d obj.consume_end_of_stream(); }; -// TODO: I want to make this choose the right concept for OutputFormat but -// probably not worth the effort for the (hopefully) brief time for which we -// have to support both. -template -concept CompactedFragmentsConsumerWithVersion = CompactedFragmentsConsumer || CompactedFragmentsConsumerV2; - struct detached_compaction_state { ::partition_start partition_start; std::optional<::static_row> static_row; @@ -77,7 +54,6 @@ public: void consume(tombstone t) {} stop_iteration consume(static_row&& sr, tombstone, bool) { return stop_iteration::no; } stop_iteration consume(clustering_row&& cr, row_tombstone, bool) { return stop_iteration::no; } - stop_iteration consume(range_tombstone&& rt) { return stop_iteration::no; } stop_iteration consume(range_tombstone_change&& rtc) { return stop_iteration::no; } stop_iteration consume_end_of_partition() { return stop_iteration::no; } void consume_end_of_stream() {} @@ -159,7 +135,7 @@ struct compaction_stats { // emit_only_live::yes will cause compact_for_query to emit only live // static and clustering rows. It doesn't affect the way range tombstones are // emitted. -template +template class compact_mutation_state { const schema& _schema; gc_clock::time_point _query_time; @@ -173,7 +149,6 @@ class compact_mutation_state { uint64_t _partition_row_limit{}; tombstone _partition_tombstone; - range_tombstone_assembler _rt_assembler; bool _static_row_live{}; uint64_t _rows_in_current_partition; @@ -200,21 +175,7 @@ class compact_mutation_state { compaction_stats _stats; private: template - requires CompactedFragmentsConsumerWithVersion && CompactedFragmentsConsumerWithVersion - stop_iteration do_consume(range_tombstone&& rt, Consumer& consumer, GCConsumer& gc_consumer) { - if (rt.tomb <= _partition_tombstone) { - return stop_iteration::no; - } - if (can_purge_tombstone(rt.tomb)) { - partition_is_not_empty_for_gc_consumer(gc_consumer); - return gc_consumer.consume(std::move(rt)); - } else { - partition_is_not_empty(consumer); - return consumer.consume(std::move(rt)); - } - } - template - requires CompactedFragmentsConsumerWithVersion && CompactedFragmentsConsumerWithVersion + requires CompactedFragmentsConsumerV2 && CompactedFragmentsConsumerV2 stop_iteration do_consume(range_tombstone_change&& rtc, Consumer& consumer, GCConsumer& gc_consumer) { stop_iteration gc_consumer_stop = stop_iteration::no; stop_iteration consumer_stop = stop_iteration::no; @@ -240,19 +201,6 @@ private: } return gc_consumer_stop || consumer_stop; } - template - tombstone tombstone_for_row(const clustering_key& ckey, Consumer& consumer, GCConsumer& gc_consumer) { - if constexpr (OutputFormat == compactor_output_format::v2) { - return std::max(_partition_tombstone, _effective_tombstone); - } else { - if (_rt_assembler.needs_flush()) { - if (auto rt_opt = _rt_assembler.flush(_schema, position_in_partition::after_key(ckey))) { - do_consume(std::move(*rt_opt), consumer, gc_consumer); - } - } - return std::max(_partition_tombstone, _rt_assembler.get_current_tombstone()); - } - } static constexpr bool only_live() { return OnlyLive == emit_only_live_rows::yes; } @@ -368,7 +316,6 @@ public: _rows_in_current_partition = 0; _static_row_live = false; _partition_tombstone = {}; - _rt_assembler.reset(); _current_partition_limit = std::min(_row_limit, _partition_row_limit); _max_purgeable = api::missing_timestamp; _gc_before = std::nullopt; @@ -380,7 +327,7 @@ public: } template - requires CompactedFragmentsConsumerWithVersion && CompactedFragmentsConsumerWithVersion + requires CompactedFragmentsConsumerV2 && CompactedFragmentsConsumerV2 void consume(tombstone t, Consumer& consumer, GCConsumer& gc_consumer) { _partition_tombstone = t; if (!only_live()) { @@ -393,13 +340,13 @@ public: } template - requires CompactedFragmentsConsumerWithVersion + requires CompactedFragmentsConsumerV2 void force_partition_not_empty(Consumer& consumer) { partition_is_not_empty(consumer); } template - requires CompactedFragmentsConsumerWithVersion && CompactedFragmentsConsumerWithVersion + requires CompactedFragmentsConsumerV2 && CompactedFragmentsConsumerV2 stop_iteration consume(static_row&& sr, Consumer& consumer, GCConsumer& gc_consumer) { _last_static_row = static_row(_schema, sr); auto current_tombstone = _partition_tombstone; @@ -430,12 +377,12 @@ public: } template - requires CompactedFragmentsConsumerWithVersion && CompactedFragmentsConsumerWithVersion + requires CompactedFragmentsConsumerV2 && CompactedFragmentsConsumerV2 stop_iteration consume(clustering_row&& cr, Consumer& consumer, GCConsumer& gc_consumer) { if (!sstable_compaction()) { _last_clustering_pos = cr.position(); } - auto current_tombstone = tombstone_for_row(cr.key(), consumer, gc_consumer); + auto current_tombstone = std::max(_partition_tombstone, _effective_tombstone); auto t = cr.tomb(); t.apply(current_tombstone); @@ -497,39 +444,25 @@ public: } template - requires CompactedFragmentsConsumerWithVersion && CompactedFragmentsConsumerWithVersion + requires CompactedFragmentsConsumerV2 && CompactedFragmentsConsumerV2 stop_iteration consume(range_tombstone_change&& rtc, Consumer& consumer, GCConsumer& gc_consumer) { if (!sstable_compaction()) { _last_clustering_pos = rtc.position(); } ++_stats.range_tombstones; - if constexpr (OutputFormat == compactor_output_format::v1) { - _effective_tombstone = rtc.tombstone(); - if (auto rt_opt = _rt_assembler.consume(_schema, std::move(rtc))) { - return do_consume(std::move(*rt_opt), consumer, gc_consumer); - } - } else { - do_consume(std::move(rtc), consumer, gc_consumer); - } + do_consume(std::move(rtc), consumer, gc_consumer); return stop_iteration::no; } template - requires CompactedFragmentsConsumerWithVersion && CompactedFragmentsConsumerWithVersion + requires CompactedFragmentsConsumerV2 && CompactedFragmentsConsumerV2 stop_iteration consume_end_of_partition(Consumer& consumer, GCConsumer& gc_consumer) { if (_effective_tombstone) { auto rtc = range_tombstone_change(position_in_partition::after_key(_last_clustering_pos), tombstone{}); - if constexpr (OutputFormat == compactor_output_format::v1) { - if (auto rt_opt = _rt_assembler.consume(_schema, std::move(rtc))) { - do_consume(std::move(*rt_opt), consumer, gc_consumer); - } - _rt_assembler.on_end_of_stream(); - } else { - // do_consume() overwrites _effective_tombstone with {}, so save and restore it. - auto prev_tombstone = _effective_tombstone; - do_consume(std::move(rtc), consumer, gc_consumer); - _effective_tombstone = prev_tombstone; - } + // do_consume() overwrites _effective_tombstone with {}, so save and restore it. + auto prev_tombstone = _effective_tombstone; + do_consume(std::move(rtc), consumer, gc_consumer); + _effective_tombstone = prev_tombstone; } if (!_empty_partition_in_gc_consumer) { gc_consumer.consume_end_of_partition(); @@ -554,7 +487,7 @@ public: } template - requires CompactedFragmentsConsumerWithVersion && CompactedFragmentsConsumerWithVersion + requires CompactedFragmentsConsumerV2 && CompactedFragmentsConsumerV2 auto consume_end_of_stream(Consumer& consumer, GCConsumer& gc_consumer) { if (_dk) { _last_dk = *_dk; @@ -578,7 +511,7 @@ public: /// partition-header and static row if there are clustering rows or range /// tombstones left in the partition. template - requires CompactedFragmentsConsumerWithVersion + requires CompactedFragmentsConsumerV2 void start_new_page(uint64_t row_limit, uint32_t partition_limit, gc_clock::time_point query_time, @@ -601,11 +534,7 @@ public: } if (_effective_tombstone) { auto rtc = range_tombstone_change(position_in_partition_view::after_key(_last_clustering_pos), _effective_tombstone); - if constexpr (OutputFormat == compactor_output_format::v2) { - do_consume(std::move(rtc), consumer, nc); - } else if (auto rt_opt = _rt_assembler.consume(_schema, std::move(rtc))) { - do_consume(std::move(*rt_opt), consumer, nc); - } + do_consume(std::move(rtc), consumer, nc); } } @@ -632,70 +561,10 @@ public: const compaction_stats& stats() const { return _stats; } }; -template -requires CompactedFragmentsConsumer && CompactedFragmentsConsumer -class compact_mutation { - lw_shared_ptr> _state; - Consumer _consumer; - // Garbage Collected Consumer - GCConsumer _gc_consumer; - -public: - compact_mutation(const schema& s, gc_clock::time_point query_time, const query::partition_slice& slice, uint64_t limit, - uint32_t partition_limit, Consumer consumer, GCConsumer gc_consumer = GCConsumer()) - : _state(make_lw_shared>(s, query_time, slice, limit, partition_limit)) - , _consumer(std::move(consumer)) - , _gc_consumer(std::move(gc_consumer)) { - } - - compact_mutation(const schema& s, gc_clock::time_point compaction_time, - std::function get_max_purgeable, - Consumer consumer, GCConsumer gc_consumer = GCConsumer()) - : _state(make_lw_shared>(s, compaction_time, get_max_purgeable)) - , _consumer(std::move(consumer)) - , _gc_consumer(std::move(gc_consumer)) { - } - - compact_mutation(lw_shared_ptr> state, Consumer consumer, - GCConsumer gc_consumer = GCConsumer()) - : _state(std::move(state)) - , _consumer(std::move(consumer)) - , _gc_consumer(std::move(gc_consumer)) { - } - - void consume_new_partition(const dht::decorated_key& dk) { - _state->consume_new_partition(dk); - } - - void consume(tombstone t) { - _state->consume(std::move(t), _consumer, _gc_consumer); - } - - stop_iteration consume(static_row&& sr) { - return _state->consume(std::move(sr), _consumer, _gc_consumer); - } - - stop_iteration consume(clustering_row&& cr) { - return _state->consume(std::move(cr), _consumer, _gc_consumer); - } - - stop_iteration consume(range_tombstone_change&& rtc) { - return _state->consume(std::move(rtc), _consumer, _gc_consumer); - } - - stop_iteration consume_end_of_partition() { - return _state->consume_end_of_partition(_consumer, _gc_consumer); - } - - auto consume_end_of_stream() { - return _state->consume_end_of_stream(_consumer, _gc_consumer); - } -}; - template requires CompactedFragmentsConsumerV2 && CompactedFragmentsConsumerV2 class compact_mutation_v2 { - lw_shared_ptr> _state; + lw_shared_ptr> _state; Consumer _consumer; // Garbage Collected Consumer GCConsumer _gc_consumer; @@ -703,7 +572,7 @@ class compact_mutation_v2 { public: compact_mutation_v2(const schema& s, gc_clock::time_point query_time, const query::partition_slice& slice, uint64_t limit, uint32_t partition_limit, Consumer consumer, GCConsumer gc_consumer = GCConsumer()) - : _state(make_lw_shared>(s, query_time, slice, limit, partition_limit)) + : _state(make_lw_shared>(s, query_time, slice, limit, partition_limit)) , _consumer(std::move(consumer)) , _gc_consumer(std::move(gc_consumer)) { } @@ -711,12 +580,12 @@ public: compact_mutation_v2(const schema& s, gc_clock::time_point compaction_time, std::function get_max_purgeable, Consumer consumer, GCConsumer gc_consumer = GCConsumer()) - : _state(make_lw_shared>(s, compaction_time, get_max_purgeable)) + : _state(make_lw_shared>(s, compaction_time, get_max_purgeable)) , _consumer(std::move(consumer)) , _gc_consumer(std::move(gc_consumer)) { } - compact_mutation_v2(lw_shared_ptr> state, Consumer consumer, + compact_mutation_v2(lw_shared_ptr> state, Consumer consumer, GCConsumer gc_consumer = GCConsumer()) : _state(std::move(state)) , _consumer(std::move(consumer)) @@ -752,15 +621,6 @@ public: } }; -template -requires CompactedFragmentsConsumer -struct compact_for_query : compact_mutation { - using compact_mutation::compact_mutation; -}; - -template -using compact_for_query_state = compact_mutation_state; - template requires CompactedFragmentsConsumerV2 struct compact_for_query_v2 : compact_mutation_v2 { @@ -768,7 +628,7 @@ struct compact_for_query_v2 : compact_mutation_v2 -using compact_for_query_state_v2 = compact_mutation_state; +using compact_for_query_state_v2 = compact_mutation_state; template requires CompactedFragmentsConsumerV2 && CompactedFragmentsConsumerV2 diff --git a/mutation_partition.cc b/mutation_partition.cc index 29dc38aca6..d821ec54f2 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -1949,8 +1949,7 @@ stop_iteration query_result_builder::consume(clustering_row&& cr, row_tombstone _stop = _mutation_consumer->consume(std::move(cr), t); return _stop; } -stop_iteration query_result_builder::consume(range_tombstone&& rt) { - _stop = _mutation_consumer->consume(std::move(rt)); +stop_iteration query_result_builder::consume(range_tombstone_change&& rtc) { return _stop; } @@ -1988,6 +1987,7 @@ stop_iteration query::result_memory_accounter::check_local_limit() const { } void reconcilable_result_builder::consume_new_partition(const dht::decorated_key& dk) { + _rt_assembler.reset(); _return_static_content_on_partition_with_no_rows = _slice.options.contains(query::partition_slice::option::always_return_static_content) || !has_ck_selector(_slice.row_ranges(_schema, dk.key())); @@ -2007,6 +2007,11 @@ stop_iteration reconcilable_result_builder::consume(static_row&& sr, tombstone, } stop_iteration reconcilable_result_builder::consume(clustering_row&& cr, row_tombstone, bool is_alive) { + if (_rt_assembler.needs_flush()) { + if (auto rt_opt = _rt_assembler.flush(_schema, position_in_partition::after_key(cr.key()))) { + consume(std::move(*rt_opt)); + } + } _live_rows += is_alive; auto stop = _memory_accounter.update_and_check(cr.memory_usage(_schema)); if (is_alive) { @@ -2029,7 +2034,15 @@ stop_iteration reconcilable_result_builder::consume(range_tombstone&& rt) { return _mutation_consumer->consume(std::move(rt)); } +stop_iteration reconcilable_result_builder::consume(range_tombstone_change&& rtc) { + if (auto rt_opt = _rt_assembler.consume(_schema, std::move(rtc))) { + return consume(std::move(*rt_opt)); + } + return stop_iteration::no; +} + stop_iteration reconcilable_result_builder::consume_end_of_partition() { + _rt_assembler.on_end_of_stream(); if (_live_rows == 0 && _static_row_is_alive && _return_static_content_on_partition_with_no_rows) { ++_live_rows; // Normally we count only live clustering rows, to guarantee that @@ -2056,7 +2069,7 @@ to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::pa query::result_options opts) { // This result was already built with a limit, don't apply another one. query::result::builder builder(slice, opts, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }); - auto consumer = compact_for_query(*s, gc_clock::time_point::min(), slice, max_rows, + auto consumer = compact_for_query_v2(*s, gc_clock::time_point::min(), slice, max_rows, max_partitions, query_result_builder(*s, builder)); const auto reverse = slice.options.contains(query::partition_slice::option::reversed) ? consume_in_reverse::legacy_half_reverse : consume_in_reverse::no; @@ -2075,7 +2088,7 @@ to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::pa query::result query_mutation(mutation&& m, const query::partition_slice& slice, uint64_t row_limit, gc_clock::time_point now, query::result_options opts) { query::result::builder builder(slice, opts, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }); - auto consumer = compact_for_query(*m.schema(), now, slice, row_limit, + auto consumer = compact_for_query_v2(*m.schema(), now, slice, row_limit, query::max_partitions, query_result_builder(*m.schema(), builder)); const auto reverse = slice.options.contains(query::partition_slice::option::reversed) ? consume_in_reverse::legacy_half_reverse : consume_in_reverse::no; std::move(m).consume(consumer, reverse); @@ -2099,7 +2112,7 @@ public: _mutation->partition().insert_row(_schema, cr.key(), std::move(cr).as_deletable_row()); return stop_iteration::no; } - stop_iteration consume(range_tombstone&& rt) { + stop_iteration consume(range_tombstone_change&& rtc) { return stop_iteration::no; } stop_iteration consume_end_of_partition() { @@ -2298,7 +2311,7 @@ future counter_write_query(schema_ptr s, const mutation_source& so // do_with() doesn't support immovable objects auto r_a_r = std::make_unique(s, source, std::move(permit), dk, slice, std::move(trace_ptr)); auto cwqrb = counter_write_query_result_builder(*s); - auto cfq = compact_for_query( + auto cfq = compact_for_query_v2( *s, gc_clock::now(), slice, query::max_rows, query::max_partitions, std::move(cwqrb)); auto f = r_a_r->reader.consume(std::move(cfq)); return f.finally([r_a_r = std::move(r_a_r)] { diff --git a/mutation_query.hh b/mutation_query.hh index ccec702d53..79e5d8feb5 100644 --- a/mutation_query.hh +++ b/mutation_query.hh @@ -14,6 +14,7 @@ #include "db/timeout_clock.hh" #include "mutation.hh" #include "utils/chunked_vector.hh" +#include "range_tombstone_assembler.hh" class reconcilable_result; class frozen_reconcilable_result; @@ -132,10 +133,15 @@ class reconcilable_result_builder { query::result_memory_accounter _memory_accounter; stop_iteration _stop; std::optional _mutation_consumer; + range_tombstone_assembler _rt_assembler; uint64_t _live_rows{}; // make this the last member so it is destroyed first. #7240 utils::chunked_vector _result; + +private: + stop_iteration consume(range_tombstone&& rt); + public: // Expects table schema (non-reversed) and half-reversed (legacy) slice when building results for reverse query. reconcilable_result_builder(const schema& s, const query::partition_slice& slice, @@ -148,7 +154,7 @@ public: void consume(tombstone t); stop_iteration consume(static_row&& sr, tombstone, bool is_alive); stop_iteration consume(clustering_row&& cr, row_tombstone, bool is_alive); - stop_iteration consume(range_tombstone&& rt); + stop_iteration consume(range_tombstone_change&& rtc); stop_iteration consume_end_of_partition(); reconcilable_result consume_end_of_stream(); }; diff --git a/mutation_reader.cc b/mutation_reader.cc index 07734f1a14..3374994e23 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -2307,11 +2307,11 @@ std::pair make_queue_reader_v2( namespace { class compacting_reader : public flat_mutation_reader_v2::impl { - friend class compact_mutation_state; + friend class compact_mutation_state; private: flat_mutation_reader_v2 _reader; - compact_mutation_state _compactor; + compact_mutation_state _compactor; noop_compacted_fragments_consumer _gc_consumer; // Uncompacted stream diff --git a/querier.hh b/querier.hh index 55256c173d..ef924d4a35 100644 --- a/querier.hh +++ b/querier.hh @@ -44,8 +44,8 @@ public: *_last_ckey = cr.key(); return _consumer.consume(std::move(cr), std::move(t), is_live); } - stop_iteration consume(range_tombstone&& rt) { - return _consumer.consume(std::move(rt)); + stop_iteration consume(range_tombstone_change&& rtc) { + return _consumer.consume(std::move(rtc)); } stop_iteration consume_end_of_partition() { return _consumer.consume_end_of_partition(); @@ -63,42 +63,9 @@ public: /// or std::nullopt if the last row wasn't a clustering row, and whatever the /// consumer's `consume_end_of_stream()` method returns. template -requires CompactedFragmentsConsumer -auto consume_page(flat_mutation_reader& reader, - lw_shared_ptr> compaction_state, - const query::partition_slice& slice, - Consumer&& consumer, - uint64_t row_limit, - uint32_t partition_limit, - gc_clock::time_point query_time) { - return reader.peek().then([=, &reader, consumer = std::move(consumer), &slice] ( - mutation_fragment* next_fragment) mutable { - const auto next_fragment_region = next_fragment ? next_fragment->position().region() : partition_region::partition_end; - compaction_state->start_new_page(row_limit, partition_limit, query_time, next_fragment_region, consumer); - - auto last_ckey = make_lw_shared>(); - auto reader_consumer = compact_for_query>( - compaction_state, - clustering_position_tracker(std::move(consumer), last_ckey)); - - return reader.consume(std::move(reader_consumer)).then([last_ckey] (auto&&... results) mutable { - static_assert(sizeof...(results) <= 1); - return make_ready_future, std::decay_t...>>(std::tuple(std::move(*last_ckey), std::move(results)...)); - }); - }); -} - -/// Consume a page worth of data from the reader. -/// -/// Uses `compaction_state` for compacting the fragments and `consumer` for -/// building the results. -/// Returns a future containing a tuple with the last consumed clustering key, -/// or std::nullopt if the last row wasn't a clustering row, and whatever the -/// consumer's `consume_end_of_stream()` method returns. -template -requires CompactedFragmentsConsumer +requires CompactedFragmentsConsumerV2 auto consume_page(flat_mutation_reader_v2& reader, - lw_shared_ptr> compaction_state, + lw_shared_ptr> compaction_state, const query::partition_slice& slice, Consumer&& consumer, uint64_t row_limit, @@ -110,7 +77,7 @@ auto consume_page(flat_mutation_reader_v2& reader, compaction_state->start_new_page(row_limit, partition_limit, query_time, next_fragment_region, consumer); auto last_ckey = make_lw_shared>(); - auto reader_consumer = compact_for_query>( + auto reader_consumer = compact_for_query_v2>( compaction_state, clustering_position_tracker(std::move(consumer), last_ckey)); @@ -211,7 +178,7 @@ public: /// instead. template class querier : public querier_base { - lw_shared_ptr> _compaction_state; + lw_shared_ptr> _compaction_state; std::optional _last_ckey; public: @@ -223,7 +190,7 @@ public: const io_priority_class& pc, tracing::trace_state_ptr trace_ptr) : querier_base(schema, permit, std::move(range), std::move(slice), ms, pc, std::move(trace_ptr)) - , _compaction_state(make_lw_shared>(*schema, gc_clock::time_point{}, *_slice, 0, 0)) { + , _compaction_state(make_lw_shared>(*schema, gc_clock::time_point{}, *_slice, 0, 0)) { } bool are_limits_reached() const { @@ -231,7 +198,7 @@ public: } template - requires CompactedFragmentsConsumer + requires CompactedFragmentsConsumerV2 auto consume_page(Consumer&& consumer, uint64_t row_limit, uint32_t partition_limit, diff --git a/query-result-writer.hh b/query-result-writer.hh index 4b35f05607..9c901abdcc 100644 --- a/query-result-writer.hh +++ b/query-result-writer.hh @@ -185,7 +185,7 @@ public: class row; class static_row; class clustering_row; -class range_tombstone; +class range_tombstone_change; // Adds mutation to query::result. class mutation_querier { @@ -207,7 +207,7 @@ public: stop_iteration consume(static_row&& sr, tombstone current_tombstone); // Requires that cr.has_any_live_data() stop_iteration consume(clustering_row&& cr, row_tombstone current_tombstone); - stop_iteration consume(range_tombstone&&) { return stop_iteration::no; } + stop_iteration consume(range_tombstone_change&&) { return stop_iteration::no; } uint64_t consume_end_of_stream(); }; @@ -223,7 +223,7 @@ public: void consume(tombstone t); stop_iteration consume(static_row&& sr, tombstone t, bool); stop_iteration consume(clustering_row&& cr, row_tombstone t, bool); - stop_iteration consume(range_tombstone&& rt); + stop_iteration consume(range_tombstone_change&& rtc); stop_iteration consume_end_of_partition(); void consume_end_of_stream(); }; diff --git a/test/boost/mutation_test.cc b/test/boost/mutation_test.cc index 26b68e3685..5ce0b50964 100644 --- a/test/boost/mutation_test.cc +++ b/test/boost/mutation_test.cc @@ -2689,37 +2689,6 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) { ref_mut.partition().compact_for_query(*s, pk, query_time, {query::clustering_range::make_open_ended_both_sides()}, true, false, max_rows); } - struct consumer { - reader_permit permit; - mutation& mut; - const uint64_t row_limit; - uint64_t rows = 0; - - void consume_new_partition(const dht::decorated_key& dk) { - BOOST_REQUIRE(mut.decorated_key().equal(*mut.schema(), dk)); - } - void consume(const tombstone& t) { - BOOST_REQUIRE_EQUAL(t, mut.partition().partition_tombstone()); - } - stop_iteration consume(static_row&& sr, tombstone, bool) { - mut.apply(mutation_fragment(*mut.schema(), permit, std::move(sr))); - return stop_iteration(++rows >= row_limit); - } - stop_iteration consume(clustering_row&& cr, row_tombstone t, bool is_alive) { - mut.apply(mutation_fragment(*mut.schema(), permit, std::move(cr))); - return stop_iteration(++rows >= row_limit); - } - stop_iteration consume(range_tombstone&& rt) { - mut.apply(mutation_fragment(*mut.schema(), permit, std::move(rt))); - return stop_iteration(++rows >= row_limit); - } - stop_iteration consume_end_of_partition() { - return stop_iteration::yes; - } - void consume_end_of_stream() { - } - }; - struct consumer_v2 { reader_permit permit; mutation& mut; @@ -2762,18 +2731,6 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) { } }; - testlog.info("non-paged"); - { - mutation res_mut(s, pk); - auto c = compact_for_query(*s, query_time, s->full_slice(), max_rows, max_partitions, consumer{permit, res_mut, max_rows}); - auto reader = make_flat_mutation_reader_from_fragments(s, permit, make_frags()); - auto close_reader = deferred_close(reader); - - reader.consume(std::move(c)).get(); - - BOOST_REQUIRE_EQUAL(res_mut, ref_mut); - } - testlog.info("non-paged v2"); { mutation res_mut(s, pk); @@ -2786,26 +2743,10 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) { BOOST_REQUIRE_EQUAL(res_mut, ref_mut); } - testlog.info("limited pages"); - { - mutation res_mut(s, pk); - auto compaction_state = make_lw_shared>(*s, query_time, s->full_slice(), 1, max_partitions); - auto reader = make_flat_mutation_reader_from_fragments(s, permit, make_frags()); - auto close_reader = deferred_close(reader); - - while (!reader.is_buffer_empty() || !reader.is_end_of_stream()) { - auto c = consumer{permit, res_mut, max_rows}; - compaction_state->start_new_page(1, max_partitions, query_time, reader.peek().get()->position().region(), c); - reader.consume(compact_for_query(compaction_state, std::move(c))).get(); - } - - BOOST_REQUIRE_EQUAL(res_mut, ref_mut); - } - testlog.info("limited pages v2"); { mutation res_mut(s, pk); - auto compaction_state = make_lw_shared>(*s, query_time, s->full_slice(), 1, max_partitions); + auto compaction_state = make_lw_shared>(*s, query_time, s->full_slice(), 1, max_partitions); auto reader = make_flat_mutation_reader_from_fragments(s, permit, make_frags()); auto close_reader = deferred_close(reader); @@ -2818,29 +2759,13 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) { BOOST_REQUIRE_EQUAL(res_mut, ref_mut); } - testlog.info("short pages"); + testlog.info("short pages v2"); { mutation res_mut(s, pk); auto compaction_state = make_lw_shared>(*s, query_time, s->full_slice(), max_rows, max_partitions); auto reader = make_flat_mutation_reader_from_fragments(s, permit, make_frags()); auto close_reader = deferred_close(reader); - while (!reader.is_buffer_empty() || !reader.is_end_of_stream()) { - auto c = consumer{permit, res_mut, 2}; - compaction_state->start_new_page(max_rows, max_partitions, query_time, reader.peek().get()->position().region(), c); - reader.consume(compact_for_query(compaction_state, std::move(c))).get(); - } - - BOOST_REQUIRE_EQUAL(res_mut, ref_mut); - } - - testlog.info("short pages v2"); - { - mutation res_mut(s, pk); - auto compaction_state = make_lw_shared>(*s, query_time, s->full_slice(), max_rows, max_partitions); - auto reader = make_flat_mutation_reader_from_fragments(s, permit, make_frags()); - auto close_reader = deferred_close(reader); - while (!reader.is_buffer_empty() || !reader.is_end_of_stream()) { auto c = consumer_v2{permit, res_mut, 2}; compaction_state->start_new_page(max_rows, max_partitions, query_time, reader.peek().get()->position().region(), c); @@ -2850,27 +2775,6 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) { BOOST_REQUIRE_EQUAL(res_mut, ref_mut); } - testlog.info("limited pages - detach state"); - { - mutation res_mut(s, pk); - auto reader = make_flat_mutation_reader_from_fragments(s, permit, make_frags()); - auto close_reader = deferred_close(reader); - - std::optional detached_state; - - while (!reader.is_buffer_empty() || !reader.is_end_of_stream()) { - if (detached_state) { - restore_state(reader, std::move(*detached_state)); - } - auto compaction_state = make_lw_shared>(*s, query_time, s->full_slice(), 1, max_partitions); - auto c = consumer{permit, res_mut, max_rows}; - reader.consume(compact_for_query(compaction_state, std::move(c))).get(); - detached_state = std::move(*compaction_state).detach_state(); - } - - BOOST_REQUIRE_EQUAL(res_mut, ref_mut); - } - testlog.info("limited pages - detach state v2"); { mutation res_mut(s, pk); @@ -2883,7 +2787,7 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) { if (detached_state) { restore_state(reader, std::move(*detached_state)); } - auto compaction_state = make_lw_shared>(*s, query_time, s->full_slice(), 1, max_partitions); + auto compaction_state = make_lw_shared>(*s, query_time, s->full_slice(), 1, max_partitions); auto c = consumer_v2{permit, res_mut, max_rows}; reader.consume(compact_for_query_v2(compaction_state, std::move(c))).get(); detached_state = std::move(*compaction_state).detach_state(); @@ -2892,27 +2796,6 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) { BOOST_REQUIRE_EQUAL(res_mut, ref_mut); } - testlog.info("short pages - detach state"); - { - mutation res_mut(s, pk); - auto reader = make_flat_mutation_reader_from_fragments(s, permit, make_frags()); - auto close_reader = deferred_close(reader); - - std::optional detached_state; - - while (!reader.is_buffer_empty() || !reader.is_end_of_stream()) { - if (detached_state) { - restore_state(reader, std::move(*detached_state)); - } - auto compaction_state = make_lw_shared>(*s, query_time, s->full_slice(), max_rows, max_partitions); - auto c = consumer{permit, res_mut, 2}; - reader.consume(compact_for_query(compaction_state, std::move(c))).get(); - detached_state = std::move(*compaction_state).detach_state(); - } - - BOOST_REQUIRE_EQUAL(res_mut, ref_mut); - } - testlog.info("short pages - detach state v2"); { mutation res_mut(s, pk); @@ -2925,7 +2808,7 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) { if (detached_state) { restore_state(reader, std::move(*detached_state)); } - auto compaction_state = make_lw_shared>(*s, query_time, s->full_slice(), max_rows, max_partitions); + auto compaction_state = make_lw_shared>(*s, query_time, s->full_slice(), max_rows, max_partitions); auto c = consumer_v2{permit, res_mut, 2}; reader.consume(compact_for_query_v2(compaction_state, std::move(c))).get(); detached_state = std::move(*compaction_state).detach_state(); diff --git a/test/boost/querier_cache_test.cc b/test/boost/querier_cache_test.cc index 56cde9102e..f4ba5f257d 100644 --- a/test/boost/querier_cache_test.cc +++ b/test/boost/querier_cache_test.cc @@ -47,7 +47,7 @@ public: _ck = cr.key(); return stop_iteration::no; } - stop_iteration consume(range_tombstone&& rt) { + stop_iteration consume(range_tombstone_change&& rtc) { return stop_iteration::no; } stop_iteration consume_end_of_partition() {