From 4d2ce5c3043d0501badca46788b640ce5b8e7322 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 30 Jun 2022 06:59:35 +0300 Subject: [PATCH] mutation_compactor: remove emit_only_live_rows template parameter Now that we use emit_only_live_rows::no everywhere we can remove this template parameters. Only the template parameter is removed, the internal logic around it is left in place (will be removed in a next patch), by hard-wiring `only_live()`. --- db/view/view.cc | 2 +- multishard_mutation_query.cc | 2 +- mutation_compactor.hh | 37 ++++++++++++++---------------------- mutation_partition.cc | 6 +++--- querier.hh | 8 ++++---- readers/mutation_readers.cc | 4 ++-- test/boost/mutation_test.cc | 18 +++++++++--------- 7 files changed, 34 insertions(+), 43 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index 5ae6b3c81b..0a5968347c 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -2036,7 +2036,7 @@ public: // Called in the context of a seastar::thread. void view_builder::execute(build_step& step, exponential_backoff_retry r) { gc_clock::time_point now = gc_clock::now(); - auto consumer = compact_for_query_v2( + auto consumer = compact_for_query_v2( *step.reader.schema(), now, step.pslice, diff --git a/multishard_mutation_query.cc b/multishard_mutation_query.cc index 065767e1dc..ea12667654 100644 --- a/multishard_mutation_query.cc +++ b/multishard_mutation_query.cc @@ -610,7 +610,7 @@ future<> read_context::save_readers(flat_mutation_reader_v2::tracked_buffer unco namespace { template -using compact_for_result_state = compact_for_query_state_v2; +using compact_for_result_state = compact_for_query_state_v2; template requires std::is_nothrow_move_constructible_v diff --git a/mutation_compactor.hh b/mutation_compactor.hh index 33c73ec91f..19c244694b 100644 --- a/mutation_compactor.hh +++ b/mutation_compactor.hh @@ -21,11 +21,6 @@ static inline bool has_ck_selector(const query::clustering_row_ranges& ranges) { }); } -enum class emit_only_live_rows { - no, - yes, -}; - enum class compact_for_sstables { no, yes, @@ -133,10 +128,7 @@ struct compaction_stats { uint64_t range_tombstones = 0; }; -// emit_only_live::yes will cause compact_for_query to emit only live -// static and clustering rows. It doesn't affect the way range tombstones are -// emitted. -template +template class compact_mutation_state { const schema& _schema; gc_clock::time_point _query_time; @@ -203,7 +195,7 @@ private: return gc_consumer_stop || consumer_stop; } static constexpr bool only_live() { - return OnlyLive == emit_only_live_rows::yes; + return false; } static constexpr bool sstable_compaction() { return SSTableCompaction == compact_for_sstables::yes; @@ -571,10 +563,10 @@ public: const compaction_stats& stats() const { return _stats; } }; -template +template requires CompactedFragmentsConsumerV2 && CompactedFragmentsConsumerV2 class compact_mutation_v2 { - lw_shared_ptr> _state; + lw_shared_ptr> _state; Consumer _consumer; // Garbage Collected Consumer GCConsumer _gc_consumer; @@ -582,7 +574,7 @@ class compact_mutation_v2 { public: compact_mutation_v2(const schema& s, gc_clock::time_point query_time, const query::partition_slice& slice, uint64_t limit, uint32_t partition_limit, Consumer consumer, GCConsumer gc_consumer = GCConsumer()) - : _state(make_lw_shared>(s, query_time, slice, limit, partition_limit)) + : _state(make_lw_shared>(s, query_time, slice, limit, partition_limit)) , _consumer(std::move(consumer)) , _gc_consumer(std::move(gc_consumer)) { } @@ -590,12 +582,12 @@ public: compact_mutation_v2(const schema& s, gc_clock::time_point compaction_time, std::function get_max_purgeable, Consumer consumer, GCConsumer gc_consumer = GCConsumer()) - : _state(make_lw_shared>(s, compaction_time, get_max_purgeable)) + : _state(make_lw_shared>(s, compaction_time, get_max_purgeable)) , _consumer(std::move(consumer)) , _gc_consumer(std::move(gc_consumer)) { } - compact_mutation_v2(lw_shared_ptr> state, Consumer consumer, + compact_mutation_v2(lw_shared_ptr> state, Consumer consumer, GCConsumer gc_consumer = GCConsumer()) : _state(std::move(state)) , _consumer(std::move(consumer)) @@ -630,22 +622,21 @@ public: return _state->consume_end_of_stream(_consumer, _gc_consumer); } - lw_shared_ptr> get_state() { + lw_shared_ptr> get_state() { return _state; } }; -template +template requires CompactedFragmentsConsumerV2 -struct compact_for_query_v2 : compact_mutation_v2 { - using compact_mutation_v2::compact_mutation_v2; +struct compact_for_query_v2 : compact_mutation_v2 { + using compact_mutation_v2::compact_mutation_v2; }; -template -using compact_for_query_state_v2 = compact_mutation_state; +using compact_for_query_state_v2 = compact_mutation_state; template requires CompactedFragmentsConsumerV2 && CompactedFragmentsConsumerV2 -struct compact_for_compaction_v2 : compact_mutation_v2 { - using compact_mutation_v2::compact_mutation_v2; +struct compact_for_compaction_v2 : compact_mutation_v2 { + using compact_mutation_v2::compact_mutation_v2; }; diff --git a/mutation_partition.cc b/mutation_partition.cc index ba8724be0e..de0e747cca 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -2206,7 +2206,7 @@ to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::pa query::result_options opts) { // This result was already built with a limit, don't apply another one. query::result::builder builder(slice, opts, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }); - auto consumer = compact_for_query_v2(*s, gc_clock::time_point::min(), slice, max_rows, + auto consumer = compact_for_query_v2(*s, gc_clock::time_point::min(), slice, max_rows, max_partitions, query_result_builder(*s, builder)); auto compaction_state = consumer.get_state(); const auto reverse = slice.options.contains(query::partition_slice::option::reversed) ? consume_in_reverse::legacy_half_reverse : consume_in_reverse::no; @@ -2238,7 +2238,7 @@ to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::pa query::result query_mutation(mutation&& m, const query::partition_slice& slice, uint64_t row_limit, gc_clock::time_point now, query::result_options opts) { query::result::builder builder(slice, opts, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }); - auto consumer = compact_for_query_v2(*m.schema(), now, slice, row_limit, + auto consumer = compact_for_query_v2(*m.schema(), now, slice, row_limit, query::max_partitions, query_result_builder(*m.schema(), builder)); auto compaction_state = consumer.get_state(); const auto reverse = slice.options.contains(query::partition_slice::option::reversed) ? consume_in_reverse::legacy_half_reverse : consume_in_reverse::no; @@ -2468,7 +2468,7 @@ future counter_write_query(schema_ptr s, const mutation_source& so // do_with() doesn't support immovable objects auto r_a_r = std::make_unique(s, source, std::move(permit), dk, slice, std::move(trace_ptr)); auto cwqrb = counter_write_query_result_builder(*s); - auto cfq = compact_for_query_v2( + auto cfq = compact_for_query_v2( *s, gc_clock::now(), slice, query::max_rows, query::max_partitions, std::move(cwqrb)); auto f = r_a_r->reader.consume(std::move(cfq)); return f.finally([r_a_r = std::move(r_a_r)] { diff --git a/querier.hh b/querier.hh index 395d4f3fd2..700e3497b1 100644 --- a/querier.hh +++ b/querier.hh @@ -31,7 +31,7 @@ namespace query { template requires CompactedFragmentsConsumerV2 auto consume_page(flat_mutation_reader_v2& reader, - lw_shared_ptr> compaction_state, + lw_shared_ptr compaction_state, const query::partition_slice& slice, Consumer&& consumer, uint64_t row_limit, @@ -42,7 +42,7 @@ auto consume_page(flat_mutation_reader_v2& reader, const auto next_fragment_region = next_fragment ? next_fragment->position().region() : partition_region::partition_end; compaction_state->start_new_page(row_limit, partition_limit, query_time, next_fragment_region, consumer); - auto reader_consumer = compact_for_query_v2(compaction_state, std::move(consumer)); + auto reader_consumer = compact_for_query_v2(compaction_state, std::move(consumer)); return reader.consume(std::move(reader_consumer)); }); @@ -132,7 +132,7 @@ public: /// page. It should be dropped instead and a new one should be created /// instead. class querier : public querier_base { - lw_shared_ptr> _compaction_state; + lw_shared_ptr _compaction_state; public: querier(const mutation_source& ms, @@ -143,7 +143,7 @@ public: const io_priority_class& pc, tracing::trace_state_ptr trace_ptr) : querier_base(schema, permit, std::move(range), std::move(slice), ms, pc, std::move(trace_ptr)) - , _compaction_state(make_lw_shared>(*schema, gc_clock::time_point{}, *_slice, 0, 0)) { + , _compaction_state(make_lw_shared(*schema, gc_clock::time_point{}, *_slice, 0, 0)) { } bool are_limits_reached() const { diff --git a/readers/mutation_readers.cc b/readers/mutation_readers.cc index 668f2fe27d..21896c2456 100644 --- a/readers/mutation_readers.cc +++ b/readers/mutation_readers.cc @@ -1380,11 +1380,11 @@ std::pair make_queue_reader_v2( namespace { class compacting_reader : public flat_mutation_reader_v2::impl { - friend class compact_mutation_state; + friend class compact_mutation_state; private: flat_mutation_reader_v2 _reader; - compact_mutation_state _compactor; + compact_mutation_state _compactor; noop_compacted_fragments_consumer _gc_consumer; // Uncompacted stream diff --git a/test/boost/mutation_test.cc b/test/boost/mutation_test.cc index f69ef47161..03f72506eb 100644 --- a/test/boost/mutation_test.cc +++ b/test/boost/mutation_test.cc @@ -2734,7 +2734,7 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) { testlog.info("non-paged v2"); { mutation res_mut(s, pk); - auto c = compact_for_query_v2(*s, query_time, s->full_slice(), max_rows, max_partitions, consumer_v2{permit, res_mut, max_rows}); + auto c = compact_for_query_v2(*s, query_time, s->full_slice(), max_rows, max_partitions, consumer_v2{permit, res_mut, max_rows}); auto reader = make_flat_mutation_reader_from_fragments(s, permit, make_frags()); auto close_reader = deferred_close(reader); @@ -2746,14 +2746,14 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) { testlog.info("limited pages v2"); { mutation res_mut(s, pk); - auto compaction_state = make_lw_shared>(*s, query_time, s->full_slice(), 1, max_partitions); + auto compaction_state = make_lw_shared>(*s, query_time, s->full_slice(), 1, max_partitions); auto reader = make_flat_mutation_reader_from_fragments(s, permit, make_frags()); auto close_reader = deferred_close(reader); while (!reader.is_buffer_empty() || !reader.is_end_of_stream()) { auto c = consumer_v2{permit, res_mut, max_rows}; compaction_state->start_new_page(1, max_partitions, query_time, reader.peek().get()->position().region(), c); - reader.consume(compact_for_query_v2(compaction_state, std::move(c))).get(); + reader.consume(compact_for_query_v2(compaction_state, std::move(c))).get(); } BOOST_REQUIRE_EQUAL(res_mut, ref_mut); @@ -2762,14 +2762,14 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) { testlog.info("short pages v2"); { mutation res_mut(s, pk); - auto compaction_state = make_lw_shared>(*s, query_time, s->full_slice(), max_rows, max_partitions); + auto compaction_state = make_lw_shared>(*s, query_time, s->full_slice(), max_rows, max_partitions); auto reader = make_flat_mutation_reader_from_fragments(s, permit, make_frags()); auto close_reader = deferred_close(reader); while (!reader.is_buffer_empty() || !reader.is_end_of_stream()) { auto c = consumer_v2{permit, res_mut, 2}; compaction_state->start_new_page(max_rows, max_partitions, query_time, reader.peek().get()->position().region(), c); - reader.consume(compact_for_query_v2(compaction_state, std::move(c))).get(); + reader.consume(compact_for_query_v2(compaction_state, std::move(c))).get(); } BOOST_REQUIRE_EQUAL(res_mut, ref_mut); @@ -2787,9 +2787,9 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) { if (detached_state) { restore_state(reader, std::move(*detached_state)); } - auto compaction_state = make_lw_shared>(*s, query_time, s->full_slice(), 1, max_partitions); + auto compaction_state = make_lw_shared>(*s, query_time, s->full_slice(), 1, max_partitions); auto c = consumer_v2{permit, res_mut, max_rows}; - reader.consume(compact_for_query_v2(compaction_state, std::move(c))).get(); + reader.consume(compact_for_query_v2(compaction_state, std::move(c))).get(); detached_state = std::move(*compaction_state).detach_state(); } @@ -2808,9 +2808,9 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) { if (detached_state) { restore_state(reader, std::move(*detached_state)); } - auto compaction_state = make_lw_shared>(*s, query_time, s->full_slice(), max_rows, max_partitions); + auto compaction_state = make_lw_shared>(*s, query_time, s->full_slice(), max_rows, max_partitions); auto c = consumer_v2{permit, res_mut, 2}; - reader.consume(compact_for_query_v2(compaction_state, std::move(c))).get(); + reader.consume(compact_for_query_v2(compaction_state, std::move(c))).get(); detached_state = std::move(*compaction_state).detach_state(); }