From bedc82e52c94157400e8754f8052ba0b6e7ca69d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 30 Jun 2022 08:57:20 +0300 Subject: [PATCH] tree: use emit_only_live_rows::no emit_only_live_rows is a convenience so downstream consumers of the mutation compactors don't have to check the `bool is_live` already passed to them. This convenience however causes a template parameter and additional logic for the compactor. As the most prominent of these consumers (the query result builder) will soon have to switch to emit_only_live_rows::no for other reasons anyway (it will want to count tombstones), we take the opportunity to switch everybody to ::no. This can be done with very little additional complexity to these consumer -- basically an additional if or two. This prepares the ground for removing this template parameter and the associate logic from the compactor. --- db/view/view.cc | 8 +++++--- multishard_mutation_query.cc | 4 +--- mutation_partition.cc | 24 ++++++++++++++++++------ querier.hh | 2 +- test/perf/perf_row_cache_reads.cc | 3 +-- 5 files changed, 26 insertions(+), 15 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index 3e25b51aea..5ae6b3c81b 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -380,7 +380,6 @@ static query::partition_slice make_partition_slice(const schema& s) { class data_query_result_builder { public: using result_type = query::result; - static constexpr emit_only_live_rows only_live = emit_only_live_rows::yes; private: query::result::builder _res_builder; @@ -1958,8 +1957,11 @@ public: return stop_iteration::no; } - stop_iteration consume(clustering_row&& cr, row_tombstone, bool) { + stop_iteration consume(clustering_row&& cr, row_tombstone, bool is_live) { inject_failure("view_builder_consume_clustering_row"); + if (!is_live) { + return stop_iteration::no; + } if (_views_to_build.empty() || _builder._as.abort_requested()) { return stop_iteration::yes; } @@ -2034,7 +2036,7 @@ public: // Called in the context of a seastar::thread. void view_builder::execute(build_step& step, exponential_backoff_retry r) { gc_clock::time_point now = gc_clock::now(); - auto consumer = compact_for_query_v2( + auto consumer = compact_for_query_v2( *step.reader.schema(), now, step.pslice, diff --git a/multishard_mutation_query.cc b/multishard_mutation_query.cc index bdd626c0e0..065767e1dc 100644 --- a/multishard_mutation_query.cc +++ b/multishard_mutation_query.cc @@ -610,7 +610,7 @@ future<> read_context::save_readers(flat_mutation_reader_v2::tracked_buffer unco namespace { template -using compact_for_result_state = compact_for_query_state_v2; +using compact_for_result_state = compact_for_query_state_v2; template requires std::is_nothrow_move_constructible_v @@ -803,7 +803,6 @@ namespace { class mutation_query_result_builder { public: using result_type = reconcilable_result; - static constexpr emit_only_live_rows only_live = emit_only_live_rows::no; private: reconcilable_result_builder _builder; @@ -824,7 +823,6 @@ public: class data_query_result_builder { public: using result_type = query::result; - static constexpr emit_only_live_rows only_live = emit_only_live_rows::yes; private: const compact_for_result_state& _compaction_state; diff --git a/mutation_partition.cc b/mutation_partition.cc index 22fa9d83d2..ba8724be0e 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -2072,11 +2072,17 @@ void query_result_builder::consume_new_partition(const dht::decorated_key& dk) { void query_result_builder::consume(tombstone t) { _mutation_consumer->consume(t); } -stop_iteration query_result_builder::consume(static_row&& sr, tombstone t, bool) { +stop_iteration query_result_builder::consume(static_row&& sr, tombstone t, bool is_live) { + if (!is_live) { + return _stop; + } _stop = _mutation_consumer->consume(std::move(sr), t); return _stop; } -stop_iteration query_result_builder::consume(clustering_row&& cr, row_tombstone t, bool) { +stop_iteration query_result_builder::consume(clustering_row&& cr, row_tombstone t, bool is_live) { + if (!is_live) { + return _stop; + } _stop = _mutation_consumer->consume(std::move(cr), t); return _stop; } @@ -2200,7 +2206,7 @@ to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::pa query::result_options opts) { // This result was already built with a limit, don't apply another one. query::result::builder builder(slice, opts, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }); - auto consumer = compact_for_query_v2(*s, gc_clock::time_point::min(), slice, max_rows, + auto consumer = compact_for_query_v2(*s, gc_clock::time_point::min(), slice, max_rows, max_partitions, query_result_builder(*s, builder)); auto compaction_state = consumer.get_state(); const auto reverse = slice.options.contains(query::partition_slice::option::reversed) ? consume_in_reverse::legacy_half_reverse : consume_in_reverse::no; @@ -2232,7 +2238,7 @@ to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::pa query::result query_mutation(mutation&& m, const query::partition_slice& slice, uint64_t row_limit, gc_clock::time_point now, query::result_options opts) { query::result::builder builder(slice, opts, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }); - auto consumer = compact_for_query_v2(*m.schema(), now, slice, row_limit, + auto consumer = compact_for_query_v2(*m.schema(), now, slice, row_limit, query::max_partitions, query_result_builder(*m.schema(), builder)); auto compaction_state = consumer.get_state(); const auto reverse = slice.options.contains(query::partition_slice::option::reversed) ? consume_in_reverse::legacy_half_reverse : consume_in_reverse::no; @@ -2249,11 +2255,17 @@ public: _mutation = mutation(_schema.shared_from_this(), dk); } void consume(tombstone) { } - stop_iteration consume(static_row&& sr, tombstone, bool) { + stop_iteration consume(static_row&& sr, tombstone, bool is_live) { + if (!is_live) { + return stop_iteration::no; + } _mutation->partition().static_row().maybe_create() = std::move(sr.cells()); return stop_iteration::no; } - stop_iteration consume(clustering_row&& cr, row_tombstone, bool) { + stop_iteration consume(clustering_row&& cr, row_tombstone, bool is_live) { + if (!is_live) { + return stop_iteration::no; + } _mutation->partition().insert_row(_schema, cr.key(), std::move(cr).as_deletable_row()); return stop_iteration::no; } diff --git a/querier.hh b/querier.hh index 7dec6b08ef..edbdb2d408 100644 --- a/querier.hh +++ b/querier.hh @@ -183,7 +183,7 @@ public: } }; -using data_querier = querier; +using data_querier = querier; using mutation_querier = querier; /// Local state of a multishard query. diff --git a/test/perf/perf_row_cache_reads.cc b/test/perf/perf_row_cache_reads.cc index f5c4f2fe15..644ad8050a 100644 --- a/test/perf/perf_row_cache_reads.cc +++ b/test/perf/perf_row_cache_reads.cc @@ -191,8 +191,7 @@ void test_scan_with_range_delete_over_rows() { auto d = duration_in_seconds([&] { auto slice = partition_slice_builder(*s).build(); - auto q = query::querier(cache_ms, s, semaphore.make_permit(), pr, slice, - default_priority_class(), nullptr); + auto q = query::querier(cache_ms, s, semaphore.make_permit(), pr, slice, default_priority_class(), nullptr); auto close_q = deferred_close(q); q.consume_page(noop_compacted_fragments_consumer(), std::numeric_limits::max(),