tree: use emit_only_live_rows::no

emit_only_live_rows is a convenience so downstream consumers of the
mutation compactors don't have to check the `bool is_live` already
passed to them. This convenience however causes a template parameter and
additional logic for the compactor. As the most prominent of these
consumers (the query result builder) will soon have to switch to
emit_only_live_rows::no for other reasons anyway (it will want to count
tombstones), we take the opportunity to switch everybody to ::no. This
can be done with very little additional complexity to these consumer --
basically an additional if or two.
This prepares the ground for removing this template parameter and the
associate logic from the compactor.
This commit is contained in:
Botond Dénes
2022-06-30 08:57:20 +03:00
parent 742dc10185
commit bedc82e52c
5 changed files with 26 additions and 15 deletions

View File

@@ -380,7 +380,6 @@ static query::partition_slice make_partition_slice(const schema& s) {
class data_query_result_builder {
public:
using result_type = query::result;
static constexpr emit_only_live_rows only_live = emit_only_live_rows::yes;
private:
query::result::builder _res_builder;
@@ -1958,8 +1957,11 @@ public:
return stop_iteration::no;
}
stop_iteration consume(clustering_row&& cr, row_tombstone, bool) {
stop_iteration consume(clustering_row&& cr, row_tombstone, bool is_live) {
inject_failure("view_builder_consume_clustering_row");
if (!is_live) {
return stop_iteration::no;
}
if (_views_to_build.empty() || _builder._as.abort_requested()) {
return stop_iteration::yes;
}
@@ -2034,7 +2036,7 @@ public:
// Called in the context of a seastar::thread.
void view_builder::execute(build_step& step, exponential_backoff_retry r) {
gc_clock::time_point now = gc_clock::now();
auto consumer = compact_for_query_v2<emit_only_live_rows::yes, view_builder::consumer>(
auto consumer = compact_for_query_v2<emit_only_live_rows::no, view_builder::consumer>(
*step.reader.schema(),
now,
step.pslice,

View File

@@ -610,7 +610,7 @@ future<> read_context::save_readers(flat_mutation_reader_v2::tracked_buffer unco
namespace {
template <typename ResultType>
using compact_for_result_state = compact_for_query_state_v2<ResultType::only_live>;
using compact_for_result_state = compact_for_query_state_v2<emit_only_live_rows::no>;
template <typename ResultBuilder>
requires std::is_nothrow_move_constructible_v<typename ResultBuilder::result_type>
@@ -803,7 +803,6 @@ namespace {
class mutation_query_result_builder {
public:
using result_type = reconcilable_result;
static constexpr emit_only_live_rows only_live = emit_only_live_rows::no;
private:
reconcilable_result_builder _builder;
@@ -824,7 +823,6 @@ public:
class data_query_result_builder {
public:
using result_type = query::result;
static constexpr emit_only_live_rows only_live = emit_only_live_rows::yes;
private:
const compact_for_result_state<data_query_result_builder>& _compaction_state;

View File

@@ -2072,11 +2072,17 @@ void query_result_builder::consume_new_partition(const dht::decorated_key& dk) {
void query_result_builder::consume(tombstone t) {
_mutation_consumer->consume(t);
}
stop_iteration query_result_builder::consume(static_row&& sr, tombstone t, bool) {
stop_iteration query_result_builder::consume(static_row&& sr, tombstone t, bool is_live) {
if (!is_live) {
return _stop;
}
_stop = _mutation_consumer->consume(std::move(sr), t);
return _stop;
}
stop_iteration query_result_builder::consume(clustering_row&& cr, row_tombstone t, bool) {
stop_iteration query_result_builder::consume(clustering_row&& cr, row_tombstone t, bool is_live) {
if (!is_live) {
return _stop;
}
_stop = _mutation_consumer->consume(std::move(cr), t);
return _stop;
}
@@ -2200,7 +2206,7 @@ to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::pa
query::result_options opts) {
// This result was already built with a limit, don't apply another one.
query::result::builder builder(slice, opts, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size });
auto consumer = compact_for_query_v2<emit_only_live_rows::yes, query_result_builder>(*s, gc_clock::time_point::min(), slice, max_rows,
auto consumer = compact_for_query_v2<emit_only_live_rows::no, query_result_builder>(*s, gc_clock::time_point::min(), slice, max_rows,
max_partitions, query_result_builder(*s, builder));
auto compaction_state = consumer.get_state();
const auto reverse = slice.options.contains(query::partition_slice::option::reversed) ? consume_in_reverse::legacy_half_reverse : consume_in_reverse::no;
@@ -2232,7 +2238,7 @@ to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::pa
query::result
query_mutation(mutation&& m, const query::partition_slice& slice, uint64_t row_limit, gc_clock::time_point now, query::result_options opts) {
query::result::builder builder(slice, opts, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size });
auto consumer = compact_for_query_v2<emit_only_live_rows::yes, query_result_builder>(*m.schema(), now, slice, row_limit,
auto consumer = compact_for_query_v2<emit_only_live_rows::no, query_result_builder>(*m.schema(), now, slice, row_limit,
query::max_partitions, query_result_builder(*m.schema(), builder));
auto compaction_state = consumer.get_state();
const auto reverse = slice.options.contains(query::partition_slice::option::reversed) ? consume_in_reverse::legacy_half_reverse : consume_in_reverse::no;
@@ -2249,11 +2255,17 @@ public:
_mutation = mutation(_schema.shared_from_this(), dk);
}
void consume(tombstone) { }
stop_iteration consume(static_row&& sr, tombstone, bool) {
stop_iteration consume(static_row&& sr, tombstone, bool is_live) {
if (!is_live) {
return stop_iteration::no;
}
_mutation->partition().static_row().maybe_create() = std::move(sr.cells());
return stop_iteration::no;
}
stop_iteration consume(clustering_row&& cr, row_tombstone, bool) {
stop_iteration consume(clustering_row&& cr, row_tombstone, bool is_live) {
if (!is_live) {
return stop_iteration::no;
}
_mutation->partition().insert_row(_schema, cr.key(), std::move(cr).as_deletable_row());
return stop_iteration::no;
}

View File

@@ -183,7 +183,7 @@ public:
}
};
using data_querier = querier<emit_only_live_rows::yes>;
using data_querier = querier<emit_only_live_rows::no>;
using mutation_querier = querier<emit_only_live_rows::no>;
/// Local state of a multishard query.

View File

@@ -191,8 +191,7 @@ void test_scan_with_range_delete_over_rows() {
auto d = duration_in_seconds([&] {
auto slice = partition_slice_builder(*s).build();
auto q = query::querier<emit_only_live_rows::yes>(cache_ms, s, semaphore.make_permit(), pr, slice,
default_priority_class(), nullptr);
auto q = query::querier<emit_only_live_rows::no>(cache_ms, s, semaphore.make_permit(), pr, slice, default_priority_class(), nullptr);
auto close_q = deferred_close(q);
q.consume_page(noop_compacted_fragments_consumer(),
std::numeric_limits<uint32_t>::max(),