From d61f934c50fe488ed10429684ba4d61aaf7c9600 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Tue, 1 Mar 2022 13:41:55 +0200 Subject: [PATCH 01/10] query_result_builder: make consume(range_tombstone) noop The downstream consumer (mutation_querier) already ignores range tombstones, so no point forwarding them to it. This makes adding v2 support easier too as range tombstone changes can be similarly dropped. --- mutation_partition.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/mutation_partition.cc b/mutation_partition.cc index 29dc38aca6..703b91ecf8 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -1950,7 +1950,6 @@ stop_iteration query_result_builder::consume(clustering_row&& cr, row_tombstone return _stop; } stop_iteration query_result_builder::consume(range_tombstone&& rt) { - _stop = _mutation_consumer->consume(std::move(rt)); return _stop; } From 728c14549f9286762f7a78322d13993e6b031d76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Tue, 1 Mar 2022 13:43:40 +0200 Subject: [PATCH 02/10] query_result_writer: add v2 support Add a consume() overload which takes a range tombstone change and drops it just like the existing range tombstone overload does: query results don't care about range tombstones. 
--- mutation_partition.cc | 3 +++ query-result-writer.hh | 3 +++ 2 files changed, 6 insertions(+) diff --git a/mutation_partition.cc b/mutation_partition.cc index 703b91ecf8..99866962c6 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -1952,6 +1952,9 @@ stop_iteration query_result_builder::consume(clustering_row&& cr, row_tombstone stop_iteration query_result_builder::consume(range_tombstone&& rt) { return _stop; } +stop_iteration query_result_builder::consume(range_tombstone_change&& rtc) { + return _stop; +} stop_iteration query_result_builder::consume_end_of_partition() { auto live_rows_in_partition = _mutation_consumer->consume_end_of_stream(); diff --git a/query-result-writer.hh b/query-result-writer.hh index 4b35f05607..c5ed1e7d98 100644 --- a/query-result-writer.hh +++ b/query-result-writer.hh @@ -186,6 +186,7 @@ class row; class static_row; class clustering_row; class range_tombstone; +class range_tombstone_change; // Adds mutation to query::result. class mutation_querier { @@ -208,6 +209,7 @@ public: // Requires that cr.has_any_live_data() stop_iteration consume(clustering_row&& cr, row_tombstone current_tombstone); stop_iteration consume(range_tombstone&&) { return stop_iteration::no; } + stop_iteration consume(range_tombstone_change&&) { return stop_iteration::no; } uint64_t consume_end_of_stream(); }; @@ -224,6 +226,7 @@ public: stop_iteration consume(static_row&& sr, tombstone t, bool); stop_iteration consume(clustering_row&& cr, row_tombstone t, bool); stop_iteration consume(range_tombstone&& rt); + stop_iteration consume(range_tombstone_change&& rtc); stop_iteration consume_end_of_partition(); void consume_end_of_stream(); }; From 4629f7d7b586779fc25429f6ed007e41f090351a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Tue, 1 Mar 2022 13:44:15 +0200 Subject: [PATCH 03/10] reconcilable_result_builder: add v2 support Add a `consume()` overload for range tombstone changes and convert them internally to range 
tombstones, as the underlying reconcilable result is still v1. --- mutation_partition.cc | 14 ++++++++++++++ mutation_query.hh | 3 +++ 2 files changed, 17 insertions(+) diff --git a/mutation_partition.cc b/mutation_partition.cc index 99866962c6..9b999e1217 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -1990,6 +1990,7 @@ stop_iteration query::result_memory_accounter::check_local_limit() const { } void reconcilable_result_builder::consume_new_partition(const dht::decorated_key& dk) { + _rt_assembler.reset(); _return_static_content_on_partition_with_no_rows = _slice.options.contains(query::partition_slice::option::always_return_static_content) || !has_ck_selector(_slice.row_ranges(_schema, dk.key())); @@ -2009,6 +2010,11 @@ stop_iteration reconcilable_result_builder::consume(static_row&& sr, tombstone, } stop_iteration reconcilable_result_builder::consume(clustering_row&& cr, row_tombstone, bool is_alive) { + if (_rt_assembler.needs_flush()) { + if (auto rt_opt = _rt_assembler.flush(_schema, position_in_partition::after_key(cr.key()))) { + consume(std::move(*rt_opt)); + } + } _live_rows += is_alive; auto stop = _memory_accounter.update_and_check(cr.memory_usage(_schema)); if (is_alive) { @@ -2031,7 +2037,15 @@ stop_iteration reconcilable_result_builder::consume(range_tombstone&& rt) { return _mutation_consumer->consume(std::move(rt)); } +stop_iteration reconcilable_result_builder::consume(range_tombstone_change&& rtc) { + if (auto rt_opt = _rt_assembler.consume(_schema, std::move(rtc))) { + return consume(std::move(*rt_opt)); + } + return stop_iteration::no; +} + stop_iteration reconcilable_result_builder::consume_end_of_partition() { + _rt_assembler.on_end_of_stream(); if (_live_rows == 0 && _static_row_is_alive && _return_static_content_on_partition_with_no_rows) { ++_live_rows; // Normally we count only live clustering rows, to guarantee that diff --git a/mutation_query.hh b/mutation_query.hh index ccec702d53..9bd600c452 100644 --- 
a/mutation_query.hh +++ b/mutation_query.hh @@ -14,6 +14,7 @@ #include "db/timeout_clock.hh" #include "mutation.hh" #include "utils/chunked_vector.hh" +#include "range_tombstone_assembler.hh" class reconcilable_result; class frozen_reconcilable_result; @@ -132,6 +133,7 @@ class reconcilable_result_builder { query::result_memory_accounter _memory_accounter; stop_iteration _stop; std::optional _mutation_consumer; + range_tombstone_assembler _rt_assembler; uint64_t _live_rows{}; // make this the last member so it is destroyed first. #7240 @@ -149,6 +151,7 @@ public: stop_iteration consume(static_row&& sr, tombstone, bool is_alive); stop_iteration consume(clustering_row&& cr, row_tombstone, bool is_alive); stop_iteration consume(range_tombstone&& rt); + stop_iteration consume(range_tombstone_change&& rtc); stop_iteration consume_end_of_partition(); reconcilable_result consume_end_of_stream(); }; From 0b5217052d290e9867b46f4015d2084ec6a2ccf3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Tue, 1 Mar 2022 13:45:32 +0200 Subject: [PATCH 04/10] querier: switch to v2 compactor output The change is mostly mechanical: update all compactor instances to the _v2 variant and update all call-sites, of which there are not that many. As a consequence of this patch, queries -- both single-partition and range-scans -- now do the v2->v1 conversion in the consumers, instead of in the compactor. 
--- multishard_mutation_query.cc | 6 ++-- querier.hh | 49 ++++++-------------------------- test/boost/querier_cache_test.cc | 2 +- 3 files changed, 12 insertions(+), 45 deletions(-) diff --git a/multishard_mutation_query.cc b/multishard_mutation_query.cc index 9632cda9ca..6e0417e59d 100644 --- a/multishard_mutation_query.cc +++ b/multishard_mutation_query.cc @@ -610,7 +610,7 @@ future<> read_context::save_readers(flat_mutation_reader_v2::tracked_buffer unco namespace { template -using compact_for_result_state = compact_for_query_state; +using compact_for_result_state = compact_for_query_state_v2; template requires std::is_nothrow_move_constructible_v @@ -821,7 +821,7 @@ public: void consume(tombstone t) { _builder.consume(t); } stop_iteration consume(static_row&& sr, tombstone t, bool is_alive) { return _builder.consume(std::move(sr), t, is_alive); } stop_iteration consume(clustering_row&& cr, row_tombstone t, bool is_alive) { return _builder.consume(std::move(cr), t, is_alive); } - stop_iteration consume(range_tombstone&& rt) { return _builder.consume(std::move(rt)); } + stop_iteration consume(range_tombstone_change&& rtc) { return _builder.consume(std::move(rtc)); } stop_iteration consume_end_of_partition() { return _builder.consume_end_of_partition(); } result_type consume_end_of_stream() { return _builder.consume_end_of_stream(); } }; @@ -844,7 +844,7 @@ public: void consume(tombstone t) { _builder.consume(t); } stop_iteration consume(static_row&& sr, tombstone t, bool is_alive) { return _builder.consume(std::move(sr), t, is_alive); } stop_iteration consume(clustering_row&& cr, row_tombstone t, bool is_alive) { return _builder.consume(std::move(cr), t, is_alive); } - stop_iteration consume(range_tombstone&& rt) { return _builder.consume(std::move(rt)); } + stop_iteration consume(range_tombstone_change&& rtc) { return _builder.consume(std::move(rtc)); } stop_iteration consume_end_of_partition() { return _builder.consume_end_of_partition(); } result_type 
consume_end_of_stream() { _builder.consume_end_of_stream(); diff --git a/querier.hh b/querier.hh index 55256c173d..ef924d4a35 100644 --- a/querier.hh +++ b/querier.hh @@ -44,8 +44,8 @@ public: *_last_ckey = cr.key(); return _consumer.consume(std::move(cr), std::move(t), is_live); } - stop_iteration consume(range_tombstone&& rt) { - return _consumer.consume(std::move(rt)); + stop_iteration consume(range_tombstone_change&& rtc) { + return _consumer.consume(std::move(rtc)); } stop_iteration consume_end_of_partition() { return _consumer.consume_end_of_partition(); @@ -63,42 +63,9 @@ public: /// or std::nullopt if the last row wasn't a clustering row, and whatever the /// consumer's `consume_end_of_stream()` method returns. template -requires CompactedFragmentsConsumer -auto consume_page(flat_mutation_reader& reader, - lw_shared_ptr> compaction_state, - const query::partition_slice& slice, - Consumer&& consumer, - uint64_t row_limit, - uint32_t partition_limit, - gc_clock::time_point query_time) { - return reader.peek().then([=, &reader, consumer = std::move(consumer), &slice] ( - mutation_fragment* next_fragment) mutable { - const auto next_fragment_region = next_fragment ? next_fragment->position().region() : partition_region::partition_end; - compaction_state->start_new_page(row_limit, partition_limit, query_time, next_fragment_region, consumer); - - auto last_ckey = make_lw_shared>(); - auto reader_consumer = compact_for_query>( - compaction_state, - clustering_position_tracker(std::move(consumer), last_ckey)); - - return reader.consume(std::move(reader_consumer)).then([last_ckey] (auto&&... results) mutable { - static_assert(sizeof...(results) <= 1); - return make_ready_future, std::decay_t...>>(std::tuple(std::move(*last_ckey), std::move(results)...)); - }); - }); -} - -/// Consume a page worth of data from the reader. -/// -/// Uses `compaction_state` for compacting the fragments and `consumer` for -/// building the results. 
-/// Returns a future containing a tuple with the last consumed clustering key, -/// or std::nullopt if the last row wasn't a clustering row, and whatever the -/// consumer's `consume_end_of_stream()` method returns. -template -requires CompactedFragmentsConsumer +requires CompactedFragmentsConsumerV2 auto consume_page(flat_mutation_reader_v2& reader, - lw_shared_ptr> compaction_state, + lw_shared_ptr> compaction_state, const query::partition_slice& slice, Consumer&& consumer, uint64_t row_limit, @@ -110,7 +77,7 @@ auto consume_page(flat_mutation_reader_v2& reader, compaction_state->start_new_page(row_limit, partition_limit, query_time, next_fragment_region, consumer); auto last_ckey = make_lw_shared>(); - auto reader_consumer = compact_for_query>( + auto reader_consumer = compact_for_query_v2>( compaction_state, clustering_position_tracker(std::move(consumer), last_ckey)); @@ -211,7 +178,7 @@ public: /// instead. template class querier : public querier_base { - lw_shared_ptr> _compaction_state; + lw_shared_ptr> _compaction_state; std::optional _last_ckey; public: @@ -223,7 +190,7 @@ public: const io_priority_class& pc, tracing::trace_state_ptr trace_ptr) : querier_base(schema, permit, std::move(range), std::move(slice), ms, pc, std::move(trace_ptr)) - , _compaction_state(make_lw_shared>(*schema, gc_clock::time_point{}, *_slice, 0, 0)) { + , _compaction_state(make_lw_shared>(*schema, gc_clock::time_point{}, *_slice, 0, 0)) { } bool are_limits_reached() const { @@ -231,7 +198,7 @@ public: } template - requires CompactedFragmentsConsumer + requires CompactedFragmentsConsumerV2 auto consume_page(Consumer&& consumer, uint64_t row_limit, uint32_t partition_limit, diff --git a/test/boost/querier_cache_test.cc b/test/boost/querier_cache_test.cc index 3dce71cac1..f82cffd676 100644 --- a/test/boost/querier_cache_test.cc +++ b/test/boost/querier_cache_test.cc @@ -45,7 +45,7 @@ public: _ck = cr.key(); return stop_iteration::no; } - stop_iteration consume(range_tombstone&& rt) 
{ + stop_iteration consume(range_tombstone_change&& rtc) { return stop_iteration::no; } stop_iteration consume_end_of_partition() { From eacdfb2cb78423a4aed5c74a5bf4de1aa27d89e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Tue, 1 Mar 2022 13:12:37 +0200 Subject: [PATCH 05/10] test/boost/mutation_test: remove v1 specific test code From test_compactor_range_tombstone_spanning_many_pages, preparing for the retirement of the v1 output of the compactor. --- test/boost/mutation_test.cc | 117 ------------------------------------ 1 file changed, 117 deletions(-) diff --git a/test/boost/mutation_test.cc b/test/boost/mutation_test.cc index c6b59d205f..5119508304 100644 --- a/test/boost/mutation_test.cc +++ b/test/boost/mutation_test.cc @@ -2687,37 +2687,6 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) { ref_mut.partition().compact_for_query(*s, pk, query_time, {query::clustering_range::make_open_ended_both_sides()}, true, false, max_rows); } - struct consumer { - reader_permit permit; - mutation& mut; - const uint64_t row_limit; - uint64_t rows = 0; - - void consume_new_partition(const dht::decorated_key& dk) { - BOOST_REQUIRE(mut.decorated_key().equal(*mut.schema(), dk)); - } - void consume(const tombstone& t) { - BOOST_REQUIRE_EQUAL(t, mut.partition().partition_tombstone()); - } - stop_iteration consume(static_row&& sr, tombstone, bool) { - mut.apply(mutation_fragment(*mut.schema(), permit, std::move(sr))); - return stop_iteration(++rows >= row_limit); - } - stop_iteration consume(clustering_row&& cr, row_tombstone t, bool is_alive) { - mut.apply(mutation_fragment(*mut.schema(), permit, std::move(cr))); - return stop_iteration(++rows >= row_limit); - } - stop_iteration consume(range_tombstone&& rt) { - mut.apply(mutation_fragment(*mut.schema(), permit, std::move(rt))); - return stop_iteration(++rows >= row_limit); - } - stop_iteration consume_end_of_partition() { - return stop_iteration::yes; - } - void 
consume_end_of_stream() { - } - }; - struct consumer_v2 { reader_permit permit; mutation& mut; @@ -2760,18 +2729,6 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) { } }; - testlog.info("non-paged"); - { - mutation res_mut(s, pk); - auto c = compact_for_query(*s, query_time, s->full_slice(), max_rows, max_partitions, consumer{permit, res_mut, max_rows}); - auto reader = make_flat_mutation_reader_from_fragments(s, permit, make_frags()); - auto close_reader = deferred_close(reader); - - reader.consume(std::move(c)).get(); - - BOOST_REQUIRE_EQUAL(res_mut, ref_mut); - } - testlog.info("non-paged v2"); { mutation res_mut(s, pk); @@ -2784,22 +2741,6 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) { BOOST_REQUIRE_EQUAL(res_mut, ref_mut); } - testlog.info("limited pages"); - { - mutation res_mut(s, pk); - auto compaction_state = make_lw_shared>(*s, query_time, s->full_slice(), 1, max_partitions); - auto reader = make_flat_mutation_reader_from_fragments(s, permit, make_frags()); - auto close_reader = deferred_close(reader); - - while (!reader.is_buffer_empty() || !reader.is_end_of_stream()) { - auto c = consumer{permit, res_mut, max_rows}; - compaction_state->start_new_page(1, max_partitions, query_time, reader.peek().get()->position().region(), c); - reader.consume(compact_for_query(compaction_state, std::move(c))).get(); - } - - BOOST_REQUIRE_EQUAL(res_mut, ref_mut); - } - testlog.info("limited pages v2"); { mutation res_mut(s, pk); @@ -2816,22 +2757,6 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) { BOOST_REQUIRE_EQUAL(res_mut, ref_mut); } - testlog.info("short pages"); - { - mutation res_mut(s, pk); - auto compaction_state = make_lw_shared>(*s, query_time, s->full_slice(), max_rows, max_partitions); - auto reader = make_flat_mutation_reader_from_fragments(s, permit, make_frags()); - auto close_reader = deferred_close(reader); - - while (!reader.is_buffer_empty() || 
!reader.is_end_of_stream()) { - auto c = consumer{permit, res_mut, 2}; - compaction_state->start_new_page(max_rows, max_partitions, query_time, reader.peek().get()->position().region(), c); - reader.consume(compact_for_query(compaction_state, std::move(c))).get(); - } - - BOOST_REQUIRE_EQUAL(res_mut, ref_mut); - } - testlog.info("short pages v2"); { mutation res_mut(s, pk); @@ -2848,27 +2773,6 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) { BOOST_REQUIRE_EQUAL(res_mut, ref_mut); } - testlog.info("limited pages - detach state"); - { - mutation res_mut(s, pk); - auto reader = make_flat_mutation_reader_from_fragments(s, permit, make_frags()); - auto close_reader = deferred_close(reader); - - std::optional detached_state; - - while (!reader.is_buffer_empty() || !reader.is_end_of_stream()) { - if (detached_state) { - restore_state(reader, std::move(*detached_state)); - } - auto compaction_state = make_lw_shared>(*s, query_time, s->full_slice(), 1, max_partitions); - auto c = consumer{permit, res_mut, max_rows}; - reader.consume(compact_for_query(compaction_state, std::move(c))).get(); - detached_state = std::move(*compaction_state).detach_state(); - } - - BOOST_REQUIRE_EQUAL(res_mut, ref_mut); - } - testlog.info("limited pages - detach state v2"); { mutation res_mut(s, pk); @@ -2890,27 +2794,6 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) { BOOST_REQUIRE_EQUAL(res_mut, ref_mut); } - testlog.info("short pages - detach state"); - { - mutation res_mut(s, pk); - auto reader = make_flat_mutation_reader_from_fragments(s, permit, make_frags()); - auto close_reader = deferred_close(reader); - - std::optional detached_state; - - while (!reader.is_buffer_empty() || !reader.is_end_of_stream()) { - if (detached_state) { - restore_state(reader, std::move(*detached_state)); - } - auto compaction_state = make_lw_shared>(*s, query_time, s->full_slice(), max_rows, max_partitions); - auto c = consumer{permit, res_mut, 
2}; - reader.consume(compact_for_query(compaction_state, std::move(c))).get(); - detached_state = std::move(*compaction_state).detach_state(); - } - - BOOST_REQUIRE_EQUAL(res_mut, ref_mut); - } - testlog.info("short pages - detach state v2"); { mutation res_mut(s, pk); From 87ac2e9ab056b5d97ac8ad558df9e78be3d423fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Tue, 1 Mar 2022 13:13:42 +0200 Subject: [PATCH 06/10] tree: migrate to the v2 consumer APIs --- db/view/view.cc | 6 +++--- mutation_partition.cc | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index d5b74d03d6..adf02864c9 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -363,7 +363,7 @@ public: void consume(tombstone t) { _builder.consume(t); } stop_iteration consume(static_row&& sr, tombstone t, bool is_alive) { return _builder.consume(std::move(sr), t, is_alive); } stop_iteration consume(clustering_row&& cr, row_tombstone t, bool is_alive) { return _builder.consume(std::move(cr), t, is_alive); } - stop_iteration consume(range_tombstone&& rt) { return _builder.consume(std::move(rt)); } + stop_iteration consume(range_tombstone_change&& rtc) { return _builder.consume(std::move(rtc)); } stop_iteration consume_end_of_partition() { return _builder.consume_end_of_partition(); } result_type consume_end_of_stream() { _builder.consume_end_of_stream(); @@ -1949,7 +1949,7 @@ public: return stop_iteration::no; } - stop_iteration consume(range_tombstone&&) { + stop_iteration consume(range_tombstone_change&&) { inject_failure("view_builder_consume_range_tombstone"); return stop_iteration::no; } @@ -2007,7 +2007,7 @@ public: // Called in the context of a seastar::thread. 
void view_builder::execute(build_step& step, exponential_backoff_retry r) { gc_clock::time_point now = gc_clock::now(); - auto consumer = compact_for_query( + auto consumer = compact_for_query_v2( *step.reader.schema(), now, step.pslice, diff --git a/mutation_partition.cc b/mutation_partition.cc index 9b999e1217..3668134447 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -2072,7 +2072,7 @@ to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::pa query::result_options opts) { // This result was already built with a limit, don't apply another one. query::result::builder builder(slice, opts, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }); - auto consumer = compact_for_query(*s, gc_clock::time_point::min(), slice, max_rows, + auto consumer = compact_for_query_v2(*s, gc_clock::time_point::min(), slice, max_rows, max_partitions, query_result_builder(*s, builder)); const auto reverse = slice.options.contains(query::partition_slice::option::reversed) ? consume_in_reverse::legacy_half_reverse : consume_in_reverse::no; @@ -2091,7 +2091,7 @@ to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::pa query::result query_mutation(mutation&& m, const query::partition_slice& slice, uint64_t row_limit, gc_clock::time_point now, query::result_options opts) { query::result::builder builder(slice, opts, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }); - auto consumer = compact_for_query(*m.schema(), now, slice, row_limit, + auto consumer = compact_for_query_v2(*m.schema(), now, slice, row_limit, query::max_partitions, query_result_builder(*m.schema(), builder)); const auto reverse = slice.options.contains(query::partition_slice::option::reversed) ? 
consume_in_reverse::legacy_half_reverse : consume_in_reverse::no; std::move(m).consume(consumer, reverse); @@ -2115,7 +2115,7 @@ public: _mutation->partition().insert_row(_schema, cr.key(), std::move(cr).as_deletable_row()); return stop_iteration::no; } - stop_iteration consume(range_tombstone&& rt) { + stop_iteration consume(range_tombstone_change&& rtc) { return stop_iteration::no; } stop_iteration consume_end_of_partition() { @@ -2314,7 +2314,7 @@ future counter_write_query(schema_ptr s, const mutation_source& so // do_with() doesn't support immovable objects auto r_a_r = std::make_unique(s, source, std::move(permit), dk, slice, std::move(trace_ptr)); auto cwqrb = counter_write_query_result_builder(*s); - auto cfq = compact_for_query( + auto cfq = compact_for_query_v2( *s, gc_clock::now(), slice, query::max_rows, query::max_partitions, std::move(cwqrb)); auto f = r_a_r->reader.consume(std::move(cfq)); return f.finally([r_a_r = std::move(r_a_r)] { From 924ff6a5031c4fb2886b740114133939767271e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Tue, 1 Mar 2022 13:28:53 +0200 Subject: [PATCH 07/10] mutation_compactor: drop v1 support altogether from the API Fully mechanical change. Drop all v1 types, template types. Internal code is left unchanged, will be made v2 only in the next patch. 
--- mutation_compactor.hh | 121 +++++------------------------------- mutation_reader.cc | 4 +- test/boost/mutation_test.cc | 8 +-- 3 files changed, 23 insertions(+), 110 deletions(-) diff --git a/mutation_compactor.hh b/mutation_compactor.hh index 14cb3ae0cc..0b4a17a629 100644 --- a/mutation_compactor.hh +++ b/mutation_compactor.hh @@ -35,18 +35,6 @@ enum class compactor_output_format { v2 }; -template -concept CompactedFragmentsConsumer = requires(T obj, tombstone t, const dht::decorated_key& dk, static_row sr, - clustering_row cr, range_tombstone rt, tombstone current_tombstone, row_tombstone current_row_tombstone, bool is_alive) { - obj.consume_new_partition(dk); - obj.consume(t); - { obj.consume(std::move(sr), current_tombstone, is_alive) } -> std::same_as; - { obj.consume(std::move(cr), current_row_tombstone, is_alive) } -> std::same_as; - { obj.consume(std::move(rt)) } -> std::same_as; - { obj.consume_end_of_partition() } -> std::same_as; - obj.consume_end_of_stream(); -}; - template concept CompactedFragmentsConsumerV2 = requires(T obj, tombstone t, const dht::decorated_key& dk, static_row sr, clustering_row cr, range_tombstone_change rtc, tombstone current_tombstone, row_tombstone current_row_tombstone, bool is_alive) { @@ -59,12 +47,6 @@ concept CompactedFragmentsConsumerV2 = requires(T obj, tombstone t, const dht::d obj.consume_end_of_stream(); }; -// TODO: I want to make this choose the right concept for OutputFormat but -// probably not worth the effort for the (hopefully) brief time for which we -// have to support both. 
-template -concept CompactedFragmentsConsumerWithVersion = CompactedFragmentsConsumer || CompactedFragmentsConsumerV2; - struct detached_compaction_state { ::partition_start partition_start; std::optional<::static_row> static_row; @@ -77,7 +59,6 @@ public: void consume(tombstone t) {} stop_iteration consume(static_row&& sr, tombstone, bool) { return stop_iteration::no; } stop_iteration consume(clustering_row&& cr, row_tombstone, bool) { return stop_iteration::no; } - stop_iteration consume(range_tombstone&& rt) { return stop_iteration::no; } stop_iteration consume(range_tombstone_change&& rtc) { return stop_iteration::no; } stop_iteration consume_end_of_partition() { return stop_iteration::no; } void consume_end_of_stream() {} @@ -159,8 +140,9 @@ struct compaction_stats { // emit_only_live::yes will cause compact_for_query to emit only live // static and clustering rows. It doesn't affect the way range tombstones are // emitted. -template +template class compact_mutation_state { + constexpr static compactor_output_format OutputFormat = compactor_output_format::v2; const schema& _schema; gc_clock::time_point _query_time; std::function _get_max_purgeable; @@ -200,7 +182,7 @@ class compact_mutation_state { compaction_stats _stats; private: template - requires CompactedFragmentsConsumerWithVersion && CompactedFragmentsConsumerWithVersion + requires CompactedFragmentsConsumerV2 && CompactedFragmentsConsumerV2 stop_iteration do_consume(range_tombstone&& rt, Consumer& consumer, GCConsumer& gc_consumer) { if (rt.tomb <= _partition_tombstone) { return stop_iteration::no; @@ -214,7 +196,7 @@ private: } } template - requires CompactedFragmentsConsumerWithVersion && CompactedFragmentsConsumerWithVersion + requires CompactedFragmentsConsumerV2 && CompactedFragmentsConsumerV2 stop_iteration do_consume(range_tombstone_change&& rtc, Consumer& consumer, GCConsumer& gc_consumer) { stop_iteration gc_consumer_stop = stop_iteration::no; stop_iteration consumer_stop = 
stop_iteration::no; @@ -380,7 +362,7 @@ public: } template - requires CompactedFragmentsConsumerWithVersion && CompactedFragmentsConsumerWithVersion + requires CompactedFragmentsConsumerV2 && CompactedFragmentsConsumerV2 void consume(tombstone t, Consumer& consumer, GCConsumer& gc_consumer) { _partition_tombstone = t; if (!only_live()) { @@ -393,13 +375,13 @@ public: } template - requires CompactedFragmentsConsumerWithVersion + requires CompactedFragmentsConsumerV2 void force_partition_not_empty(Consumer& consumer) { partition_is_not_empty(consumer); } template - requires CompactedFragmentsConsumerWithVersion && CompactedFragmentsConsumerWithVersion + requires CompactedFragmentsConsumerV2 && CompactedFragmentsConsumerV2 stop_iteration consume(static_row&& sr, Consumer& consumer, GCConsumer& gc_consumer) { _last_static_row = static_row(_schema, sr); auto current_tombstone = _partition_tombstone; @@ -430,7 +412,7 @@ public: } template - requires CompactedFragmentsConsumerWithVersion && CompactedFragmentsConsumerWithVersion + requires CompactedFragmentsConsumerV2 && CompactedFragmentsConsumerV2 stop_iteration consume(clustering_row&& cr, Consumer& consumer, GCConsumer& gc_consumer) { if (!sstable_compaction()) { _last_clustering_pos = cr.position(); @@ -497,7 +479,7 @@ public: } template - requires CompactedFragmentsConsumerWithVersion && CompactedFragmentsConsumerWithVersion + requires CompactedFragmentsConsumerV2 && CompactedFragmentsConsumerV2 stop_iteration consume(range_tombstone_change&& rtc, Consumer& consumer, GCConsumer& gc_consumer) { if (!sstable_compaction()) { _last_clustering_pos = rtc.position(); @@ -515,7 +497,7 @@ public: } template - requires CompactedFragmentsConsumerWithVersion && CompactedFragmentsConsumerWithVersion + requires CompactedFragmentsConsumerV2 && CompactedFragmentsConsumerV2 stop_iteration consume_end_of_partition(Consumer& consumer, GCConsumer& gc_consumer) { if (_effective_tombstone) { auto rtc = 
range_tombstone_change(position_in_partition::after_key(_last_clustering_pos), tombstone{}); @@ -554,7 +536,7 @@ public: } template - requires CompactedFragmentsConsumerWithVersion && CompactedFragmentsConsumerWithVersion + requires CompactedFragmentsConsumerV2 && CompactedFragmentsConsumerV2 auto consume_end_of_stream(Consumer& consumer, GCConsumer& gc_consumer) { if (_dk) { _last_dk = *_dk; @@ -578,7 +560,7 @@ public: /// partition-header and static row if there are clustering rows or range /// tombstones left in the partition. template - requires CompactedFragmentsConsumerWithVersion + requires CompactedFragmentsConsumerV2 void start_new_page(uint64_t row_limit, uint32_t partition_limit, gc_clock::time_point query_time, @@ -632,70 +614,10 @@ public: const compaction_stats& stats() const { return _stats; } }; -template -requires CompactedFragmentsConsumer && CompactedFragmentsConsumer -class compact_mutation { - lw_shared_ptr> _state; - Consumer _consumer; - // Garbage Collected Consumer - GCConsumer _gc_consumer; - -public: - compact_mutation(const schema& s, gc_clock::time_point query_time, const query::partition_slice& slice, uint64_t limit, - uint32_t partition_limit, Consumer consumer, GCConsumer gc_consumer = GCConsumer()) - : _state(make_lw_shared>(s, query_time, slice, limit, partition_limit)) - , _consumer(std::move(consumer)) - , _gc_consumer(std::move(gc_consumer)) { - } - - compact_mutation(const schema& s, gc_clock::time_point compaction_time, - std::function get_max_purgeable, - Consumer consumer, GCConsumer gc_consumer = GCConsumer()) - : _state(make_lw_shared>(s, compaction_time, get_max_purgeable)) - , _consumer(std::move(consumer)) - , _gc_consumer(std::move(gc_consumer)) { - } - - compact_mutation(lw_shared_ptr> state, Consumer consumer, - GCConsumer gc_consumer = GCConsumer()) - : _state(std::move(state)) - , _consumer(std::move(consumer)) - , _gc_consumer(std::move(gc_consumer)) { - } - - void consume_new_partition(const dht::decorated_key& 
dk) { - _state->consume_new_partition(dk); - } - - void consume(tombstone t) { - _state->consume(std::move(t), _consumer, _gc_consumer); - } - - stop_iteration consume(static_row&& sr) { - return _state->consume(std::move(sr), _consumer, _gc_consumer); - } - - stop_iteration consume(clustering_row&& cr) { - return _state->consume(std::move(cr), _consumer, _gc_consumer); - } - - stop_iteration consume(range_tombstone_change&& rtc) { - return _state->consume(std::move(rtc), _consumer, _gc_consumer); - } - - stop_iteration consume_end_of_partition() { - return _state->consume_end_of_partition(_consumer, _gc_consumer); - } - - auto consume_end_of_stream() { - return _state->consume_end_of_stream(_consumer, _gc_consumer); - } -}; - template requires CompactedFragmentsConsumerV2 && CompactedFragmentsConsumerV2 class compact_mutation_v2 { - lw_shared_ptr> _state; + lw_shared_ptr> _state; Consumer _consumer; // Garbage Collected Consumer GCConsumer _gc_consumer; @@ -703,7 +625,7 @@ class compact_mutation_v2 { public: compact_mutation_v2(const schema& s, gc_clock::time_point query_time, const query::partition_slice& slice, uint64_t limit, uint32_t partition_limit, Consumer consumer, GCConsumer gc_consumer = GCConsumer()) - : _state(make_lw_shared>(s, query_time, slice, limit, partition_limit)) + : _state(make_lw_shared>(s, query_time, slice, limit, partition_limit)) , _consumer(std::move(consumer)) , _gc_consumer(std::move(gc_consumer)) { } @@ -711,12 +633,12 @@ public: compact_mutation_v2(const schema& s, gc_clock::time_point compaction_time, std::function get_max_purgeable, Consumer consumer, GCConsumer gc_consumer = GCConsumer()) - : _state(make_lw_shared>(s, compaction_time, get_max_purgeable)) + : _state(make_lw_shared>(s, compaction_time, get_max_purgeable)) , _consumer(std::move(consumer)) , _gc_consumer(std::move(gc_consumer)) { } - compact_mutation_v2(lw_shared_ptr> state, Consumer consumer, + compact_mutation_v2(lw_shared_ptr> state, Consumer consumer, GCConsumer 
gc_consumer = GCConsumer()) : _state(std::move(state)) , _consumer(std::move(consumer)) @@ -752,15 +674,6 @@ public: } }; -template -requires CompactedFragmentsConsumer -struct compact_for_query : compact_mutation { - using compact_mutation::compact_mutation; -}; - -template -using compact_for_query_state = compact_mutation_state; - template requires CompactedFragmentsConsumerV2 struct compact_for_query_v2 : compact_mutation_v2 { @@ -768,7 +681,7 @@ struct compact_for_query_v2 : compact_mutation_v2 -using compact_for_query_state_v2 = compact_mutation_state; +using compact_for_query_state_v2 = compact_mutation_state; template requires CompactedFragmentsConsumerV2 && CompactedFragmentsConsumerV2 diff --git a/mutation_reader.cc b/mutation_reader.cc index 8ead3f4cc6..9c59c98f5d 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -2305,11 +2305,11 @@ std::pair make_queue_reader_v2( namespace { class compacting_reader : public flat_mutation_reader_v2::impl { - friend class compact_mutation_state; + friend class compact_mutation_state; private: flat_mutation_reader_v2 _reader; - compact_mutation_state _compactor; + compact_mutation_state _compactor; noop_compacted_fragments_consumer _gc_consumer; // Uncompacted stream diff --git a/test/boost/mutation_test.cc b/test/boost/mutation_test.cc index 5119508304..c50f593adc 100644 --- a/test/boost/mutation_test.cc +++ b/test/boost/mutation_test.cc @@ -2744,7 +2744,7 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) { testlog.info("limited pages v2"); { mutation res_mut(s, pk); - auto compaction_state = make_lw_shared>(*s, query_time, s->full_slice(), 1, max_partitions); + auto compaction_state = make_lw_shared>(*s, query_time, s->full_slice(), 1, max_partitions); auto reader = make_flat_mutation_reader_from_fragments(s, permit, make_frags()); auto close_reader = deferred_close(reader); @@ -2760,7 +2760,7 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) { 
testlog.info("short pages v2"); { mutation res_mut(s, pk); - auto compaction_state = make_lw_shared>(*s, query_time, s->full_slice(), max_rows, max_partitions); + auto compaction_state = make_lw_shared>(*s, query_time, s->full_slice(), max_rows, max_partitions); auto reader = make_flat_mutation_reader_from_fragments(s, permit, make_frags()); auto close_reader = deferred_close(reader); @@ -2785,7 +2785,7 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) { if (detached_state) { restore_state(reader, std::move(*detached_state)); } - auto compaction_state = make_lw_shared>(*s, query_time, s->full_slice(), 1, max_partitions); + auto compaction_state = make_lw_shared>(*s, query_time, s->full_slice(), 1, max_partitions); auto c = consumer_v2{permit, res_mut, max_rows}; reader.consume(compact_for_query_v2(compaction_state, std::move(c))).get(); detached_state = std::move(*compaction_state).detach_state(); @@ -2806,7 +2806,7 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) { if (detached_state) { restore_state(reader, std::move(*detached_state)); } - auto compaction_state = make_lw_shared>(*s, query_time, s->full_slice(), max_rows, max_partitions); + auto compaction_state = make_lw_shared>(*s, query_time, s->full_slice(), max_rows, max_partitions); auto c = consumer_v2{permit, res_mut, 2}; reader.consume(compact_for_query_v2(compaction_state, std::move(c))).get(); detached_state = std::move(*compaction_state).detach_state(); From 279682056db5114ac9ffe2cab8baddffc3df8594 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Tue, 1 Mar 2022 14:52:59 +0200 Subject: [PATCH 08/10] mutation_compactor: drop v1 related code-paths --- mutation_compactor.hh | 67 +++++-------------------------------------- 1 file changed, 7 insertions(+), 60 deletions(-) diff --git a/mutation_compactor.hh b/mutation_compactor.hh index 0b4a17a629..01b99a966d 100644 --- a/mutation_compactor.hh +++ b/mutation_compactor.hh @@ 
-30,11 +30,6 @@ enum class compact_for_sstables { yes, }; -enum class compactor_output_format { - v1, - v2 -}; - template concept CompactedFragmentsConsumerV2 = requires(T obj, tombstone t, const dht::decorated_key& dk, static_row sr, clustering_row cr, range_tombstone_change rtc, tombstone current_tombstone, row_tombstone current_row_tombstone, bool is_alive) { @@ -142,7 +137,6 @@ struct compaction_stats { // emitted. template class compact_mutation_state { - constexpr static compactor_output_format OutputFormat = compactor_output_format::v2; const schema& _schema; gc_clock::time_point _query_time; std::function _get_max_purgeable; @@ -155,7 +149,6 @@ class compact_mutation_state { uint64_t _partition_row_limit{}; tombstone _partition_tombstone; - range_tombstone_assembler _rt_assembler; bool _static_row_live{}; uint64_t _rows_in_current_partition; @@ -181,20 +174,6 @@ class compact_mutation_state { compaction_stats _stats; private: - template - requires CompactedFragmentsConsumerV2 && CompactedFragmentsConsumerV2 - stop_iteration do_consume(range_tombstone&& rt, Consumer& consumer, GCConsumer& gc_consumer) { - if (rt.tomb <= _partition_tombstone) { - return stop_iteration::no; - } - if (can_purge_tombstone(rt.tomb)) { - partition_is_not_empty_for_gc_consumer(gc_consumer); - return gc_consumer.consume(std::move(rt)); - } else { - partition_is_not_empty(consumer); - return consumer.consume(std::move(rt)); - } - } template requires CompactedFragmentsConsumerV2 && CompactedFragmentsConsumerV2 stop_iteration do_consume(range_tombstone_change&& rtc, Consumer& consumer, GCConsumer& gc_consumer) { @@ -222,19 +201,6 @@ private: } return gc_consumer_stop || consumer_stop; } - template - tombstone tombstone_for_row(const clustering_key& ckey, Consumer& consumer, GCConsumer& gc_consumer) { - if constexpr (OutputFormat == compactor_output_format::v2) { - return std::max(_partition_tombstone, _effective_tombstone); - } else { - if (_rt_assembler.needs_flush()) { - if (auto 
rt_opt = _rt_assembler.flush(_schema, position_in_partition::after_key(ckey))) { - do_consume(std::move(*rt_opt), consumer, gc_consumer); - } - } - return std::max(_partition_tombstone, _rt_assembler.get_current_tombstone()); - } - } static constexpr bool only_live() { return OnlyLive == emit_only_live_rows::yes; } @@ -350,7 +316,6 @@ public: _rows_in_current_partition = 0; _static_row_live = false; _partition_tombstone = {}; - _rt_assembler.reset(); _current_partition_limit = std::min(_row_limit, _partition_row_limit); _max_purgeable = api::missing_timestamp; _gc_before = std::nullopt; @@ -417,7 +382,7 @@ public: if (!sstable_compaction()) { _last_clustering_pos = cr.position(); } - auto current_tombstone = tombstone_for_row(cr.key(), consumer, gc_consumer); + auto current_tombstone = std::max(_partition_tombstone, _effective_tombstone); auto t = cr.tomb(); t.apply(current_tombstone); @@ -485,14 +450,7 @@ public: _last_clustering_pos = rtc.position(); } ++_stats.range_tombstones; - if constexpr (OutputFormat == compactor_output_format::v1) { - _effective_tombstone = rtc.tombstone(); - if (auto rt_opt = _rt_assembler.consume(_schema, std::move(rtc))) { - return do_consume(std::move(*rt_opt), consumer, gc_consumer); - } - } else { - do_consume(std::move(rtc), consumer, gc_consumer); - } + do_consume(std::move(rtc), consumer, gc_consumer); return stop_iteration::no; } @@ -501,17 +459,10 @@ public: stop_iteration consume_end_of_partition(Consumer& consumer, GCConsumer& gc_consumer) { if (_effective_tombstone) { auto rtc = range_tombstone_change(position_in_partition::after_key(_last_clustering_pos), tombstone{}); - if constexpr (OutputFormat == compactor_output_format::v1) { - if (auto rt_opt = _rt_assembler.consume(_schema, std::move(rtc))) { - do_consume(std::move(*rt_opt), consumer, gc_consumer); - } - _rt_assembler.on_end_of_stream(); - } else { - // do_consume() overwrites _effective_tombstone with {}, so save and restore it. 
- auto prev_tombstone = _effective_tombstone; - do_consume(std::move(rtc), consumer, gc_consumer); - _effective_tombstone = prev_tombstone; - } + // do_consume() overwrites _effective_tombstone with {}, so save and restore it. + auto prev_tombstone = _effective_tombstone; + do_consume(std::move(rtc), consumer, gc_consumer); + _effective_tombstone = prev_tombstone; } if (!_empty_partition_in_gc_consumer) { gc_consumer.consume_end_of_partition(); @@ -583,11 +534,7 @@ public: } if (_effective_tombstone) { auto rtc = range_tombstone_change(position_in_partition_view::after_key(_last_clustering_pos), _effective_tombstone); - if constexpr (OutputFormat == compactor_output_format::v2) { - do_consume(std::move(rtc), consumer, nc); - } else if (auto rt_opt = _rt_assembler.consume(_schema, std::move(rtc))) { - do_consume(std::move(*rt_opt), consumer, nc); - } + do_consume(std::move(rtc), consumer, nc); } } From 21584262be4236a291d9a42f583ff1b5d203513f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Tue, 1 Mar 2022 16:40:19 +0200 Subject: [PATCH 09/10] query_result_builder: remove v1 support Amounts to dropping (the noop) range tombstone consume() overload. 
--- mutation_partition.cc | 3 --- query-result-writer.hh | 3 --- 2 files changed, 6 deletions(-) diff --git a/mutation_partition.cc b/mutation_partition.cc index 3668134447..d821ec54f2 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -1949,9 +1949,6 @@ stop_iteration query_result_builder::consume(clustering_row&& cr, row_tombstone _stop = _mutation_consumer->consume(std::move(cr), t); return _stop; } -stop_iteration query_result_builder::consume(range_tombstone&& rt) { - return _stop; -} stop_iteration query_result_builder::consume(range_tombstone_change&& rtc) { return _stop; } diff --git a/query-result-writer.hh b/query-result-writer.hh index c5ed1e7d98..9c901abdcc 100644 --- a/query-result-writer.hh +++ b/query-result-writer.hh @@ -185,7 +185,6 @@ public: class row; class static_row; class clustering_row; -class range_tombstone; class range_tombstone_change; // Adds mutation to query::result. @@ -208,7 +207,6 @@ public: stop_iteration consume(static_row&& sr, tombstone current_tombstone); // Requires that cr.has_any_live_data() stop_iteration consume(clustering_row&& cr, row_tombstone current_tombstone); - stop_iteration consume(range_tombstone&&) { return stop_iteration::no; } stop_iteration consume(range_tombstone_change&&) { return stop_iteration::no; } uint64_t consume_end_of_stream(); }; @@ -225,7 +223,6 @@ public: void consume(tombstone t); stop_iteration consume(static_row&& sr, tombstone t, bool); stop_iteration consume(clustering_row&& cr, row_tombstone t, bool); - stop_iteration consume(range_tombstone&& rt); stop_iteration consume(range_tombstone_change&& rtc); stop_iteration consume_end_of_partition(); void consume_end_of_stream(); From 0632114a9bb5d731f8b68e013745fb5834dfe6a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Tue, 1 Mar 2022 17:03:12 +0200 Subject: [PATCH 10/10] reconcilable_result_builder: remove v1 support Amounts to making the range tombstone consume() overload private. 
It is still used internally to consume the downgraded (from v2) range tombstones. --- mutation_query.hh | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/mutation_query.hh b/mutation_query.hh index 9bd600c452..79e5d8feb5 100644 --- a/mutation_query.hh +++ b/mutation_query.hh @@ -138,6 +138,10 @@ class reconcilable_result_builder { uint64_t _live_rows{}; // make this the last member so it is destroyed first. #7240 utils::chunked_vector _result; + +private: + stop_iteration consume(range_tombstone&& rt); + public: // Expects table schema (non-reversed) and half-reversed (legacy) slice when building results for reverse query. reconcilable_result_builder(const schema& s, const query::partition_slice& slice, @@ -150,7 +154,6 @@ public: void consume(tombstone t); stop_iteration consume(static_row&& sr, tombstone, bool is_alive); stop_iteration consume(clustering_row&& cr, row_tombstone, bool is_alive); - stop_iteration consume(range_tombstone&& rt); stop_iteration consume(range_tombstone_change&& rtc); stop_iteration consume_end_of_partition(); reconcilable_result consume_end_of_stream();