mutation_compactor: remove emit_only_live_rows template parameter
Now that we use emit_only_live_rows::no everywhere we can remove this template parameters. Only the template parameter is removed, the internal logic around it is left in place (will be removed in a next patch), by hard-wiring `only_live()`.
This commit is contained in:
@@ -2036,7 +2036,7 @@ public:
|
||||
// Called in the context of a seastar::thread.
|
||||
void view_builder::execute(build_step& step, exponential_backoff_retry r) {
|
||||
gc_clock::time_point now = gc_clock::now();
|
||||
auto consumer = compact_for_query_v2<emit_only_live_rows::no, view_builder::consumer>(
|
||||
auto consumer = compact_for_query_v2<view_builder::consumer>(
|
||||
*step.reader.schema(),
|
||||
now,
|
||||
step.pslice,
|
||||
|
||||
@@ -610,7 +610,7 @@ future<> read_context::save_readers(flat_mutation_reader_v2::tracked_buffer unco
|
||||
namespace {
|
||||
|
||||
template <typename ResultType>
|
||||
using compact_for_result_state = compact_for_query_state_v2<emit_only_live_rows::no>;
|
||||
using compact_for_result_state = compact_for_query_state_v2;
|
||||
|
||||
template <typename ResultBuilder>
|
||||
requires std::is_nothrow_move_constructible_v<typename ResultBuilder::result_type>
|
||||
|
||||
@@ -21,11 +21,6 @@ static inline bool has_ck_selector(const query::clustering_row_ranges& ranges) {
|
||||
});
|
||||
}
|
||||
|
||||
enum class emit_only_live_rows {
|
||||
no,
|
||||
yes,
|
||||
};
|
||||
|
||||
enum class compact_for_sstables {
|
||||
no,
|
||||
yes,
|
||||
@@ -133,10 +128,7 @@ struct compaction_stats {
|
||||
uint64_t range_tombstones = 0;
|
||||
};
|
||||
|
||||
// emit_only_live::yes will cause compact_for_query to emit only live
|
||||
// static and clustering rows. It doesn't affect the way range tombstones are
|
||||
// emitted.
|
||||
template<emit_only_live_rows OnlyLive, compact_for_sstables SSTableCompaction>
|
||||
template<compact_for_sstables SSTableCompaction>
|
||||
class compact_mutation_state {
|
||||
const schema& _schema;
|
||||
gc_clock::time_point _query_time;
|
||||
@@ -203,7 +195,7 @@ private:
|
||||
return gc_consumer_stop || consumer_stop;
|
||||
}
|
||||
static constexpr bool only_live() {
|
||||
return OnlyLive == emit_only_live_rows::yes;
|
||||
return false;
|
||||
}
|
||||
static constexpr bool sstable_compaction() {
|
||||
return SSTableCompaction == compact_for_sstables::yes;
|
||||
@@ -571,10 +563,10 @@ public:
|
||||
const compaction_stats& stats() const { return _stats; }
|
||||
};
|
||||
|
||||
template<emit_only_live_rows OnlyLive, compact_for_sstables SSTableCompaction, typename Consumer, typename GCConsumer>
|
||||
template<compact_for_sstables SSTableCompaction, typename Consumer, typename GCConsumer>
|
||||
requires CompactedFragmentsConsumerV2<Consumer> && CompactedFragmentsConsumerV2<GCConsumer>
|
||||
class compact_mutation_v2 {
|
||||
lw_shared_ptr<compact_mutation_state<OnlyLive, SSTableCompaction>> _state;
|
||||
lw_shared_ptr<compact_mutation_state<SSTableCompaction>> _state;
|
||||
Consumer _consumer;
|
||||
// Garbage Collected Consumer
|
||||
GCConsumer _gc_consumer;
|
||||
@@ -582,7 +574,7 @@ class compact_mutation_v2 {
|
||||
public:
|
||||
compact_mutation_v2(const schema& s, gc_clock::time_point query_time, const query::partition_slice& slice, uint64_t limit,
|
||||
uint32_t partition_limit, Consumer consumer, GCConsumer gc_consumer = GCConsumer())
|
||||
: _state(make_lw_shared<compact_mutation_state<OnlyLive, SSTableCompaction>>(s, query_time, slice, limit, partition_limit))
|
||||
: _state(make_lw_shared<compact_mutation_state<SSTableCompaction>>(s, query_time, slice, limit, partition_limit))
|
||||
, _consumer(std::move(consumer))
|
||||
, _gc_consumer(std::move(gc_consumer)) {
|
||||
}
|
||||
@@ -590,12 +582,12 @@ public:
|
||||
compact_mutation_v2(const schema& s, gc_clock::time_point compaction_time,
|
||||
std::function<api::timestamp_type(const dht::decorated_key&)> get_max_purgeable,
|
||||
Consumer consumer, GCConsumer gc_consumer = GCConsumer())
|
||||
: _state(make_lw_shared<compact_mutation_state<OnlyLive, SSTableCompaction>>(s, compaction_time, get_max_purgeable))
|
||||
: _state(make_lw_shared<compact_mutation_state<SSTableCompaction>>(s, compaction_time, get_max_purgeable))
|
||||
, _consumer(std::move(consumer))
|
||||
, _gc_consumer(std::move(gc_consumer)) {
|
||||
}
|
||||
|
||||
compact_mutation_v2(lw_shared_ptr<compact_mutation_state<OnlyLive, SSTableCompaction>> state, Consumer consumer,
|
||||
compact_mutation_v2(lw_shared_ptr<compact_mutation_state<SSTableCompaction>> state, Consumer consumer,
|
||||
GCConsumer gc_consumer = GCConsumer())
|
||||
: _state(std::move(state))
|
||||
, _consumer(std::move(consumer))
|
||||
@@ -630,22 +622,21 @@ public:
|
||||
return _state->consume_end_of_stream(_consumer, _gc_consumer);
|
||||
}
|
||||
|
||||
lw_shared_ptr<compact_mutation_state<OnlyLive, SSTableCompaction>> get_state() {
|
||||
lw_shared_ptr<compact_mutation_state<SSTableCompaction>> get_state() {
|
||||
return _state;
|
||||
}
|
||||
};
|
||||
|
||||
template<emit_only_live_rows only_live, typename Consumer>
|
||||
template<typename Consumer>
|
||||
requires CompactedFragmentsConsumerV2<Consumer>
|
||||
struct compact_for_query_v2 : compact_mutation_v2<only_live, compact_for_sstables::no, Consumer, noop_compacted_fragments_consumer> {
|
||||
using compact_mutation_v2<only_live, compact_for_sstables::no, Consumer, noop_compacted_fragments_consumer>::compact_mutation_v2;
|
||||
struct compact_for_query_v2 : compact_mutation_v2<compact_for_sstables::no, Consumer, noop_compacted_fragments_consumer> {
|
||||
using compact_mutation_v2<compact_for_sstables::no, Consumer, noop_compacted_fragments_consumer>::compact_mutation_v2;
|
||||
};
|
||||
|
||||
template<emit_only_live_rows OnlyLive>
|
||||
using compact_for_query_state_v2 = compact_mutation_state<OnlyLive, compact_for_sstables::no>;
|
||||
using compact_for_query_state_v2 = compact_mutation_state<compact_for_sstables::no>;
|
||||
|
||||
template<typename Consumer, typename GCConsumer = noop_compacted_fragments_consumer>
|
||||
requires CompactedFragmentsConsumerV2<Consumer> && CompactedFragmentsConsumerV2<GCConsumer>
|
||||
struct compact_for_compaction_v2 : compact_mutation_v2<emit_only_live_rows::no, compact_for_sstables::yes, Consumer, GCConsumer> {
|
||||
using compact_mutation_v2<emit_only_live_rows::no, compact_for_sstables::yes, Consumer, GCConsumer>::compact_mutation_v2;
|
||||
struct compact_for_compaction_v2 : compact_mutation_v2<compact_for_sstables::yes, Consumer, GCConsumer> {
|
||||
using compact_mutation_v2<compact_for_sstables::yes, Consumer, GCConsumer>::compact_mutation_v2;
|
||||
};
|
||||
|
||||
@@ -2206,7 +2206,7 @@ to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::pa
|
||||
query::result_options opts) {
|
||||
// This result was already built with a limit, don't apply another one.
|
||||
query::result::builder builder(slice, opts, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size });
|
||||
auto consumer = compact_for_query_v2<emit_only_live_rows::no, query_result_builder>(*s, gc_clock::time_point::min(), slice, max_rows,
|
||||
auto consumer = compact_for_query_v2<query_result_builder>(*s, gc_clock::time_point::min(), slice, max_rows,
|
||||
max_partitions, query_result_builder(*s, builder));
|
||||
auto compaction_state = consumer.get_state();
|
||||
const auto reverse = slice.options.contains(query::partition_slice::option::reversed) ? consume_in_reverse::legacy_half_reverse : consume_in_reverse::no;
|
||||
@@ -2238,7 +2238,7 @@ to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::pa
|
||||
query::result
|
||||
query_mutation(mutation&& m, const query::partition_slice& slice, uint64_t row_limit, gc_clock::time_point now, query::result_options opts) {
|
||||
query::result::builder builder(slice, opts, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size });
|
||||
auto consumer = compact_for_query_v2<emit_only_live_rows::no, query_result_builder>(*m.schema(), now, slice, row_limit,
|
||||
auto consumer = compact_for_query_v2<query_result_builder>(*m.schema(), now, slice, row_limit,
|
||||
query::max_partitions, query_result_builder(*m.schema(), builder));
|
||||
auto compaction_state = consumer.get_state();
|
||||
const auto reverse = slice.options.contains(query::partition_slice::option::reversed) ? consume_in_reverse::legacy_half_reverse : consume_in_reverse::no;
|
||||
@@ -2468,7 +2468,7 @@ future<mutation_opt> counter_write_query(schema_ptr s, const mutation_source& so
|
||||
// do_with() doesn't support immovable objects
|
||||
auto r_a_r = std::make_unique<range_and_reader>(s, source, std::move(permit), dk, slice, std::move(trace_ptr));
|
||||
auto cwqrb = counter_write_query_result_builder(*s);
|
||||
auto cfq = compact_for_query_v2<emit_only_live_rows::yes, counter_write_query_result_builder>(
|
||||
auto cfq = compact_for_query_v2<counter_write_query_result_builder>(
|
||||
*s, gc_clock::now(), slice, query::max_rows, query::max_partitions, std::move(cwqrb));
|
||||
auto f = r_a_r->reader.consume(std::move(cfq));
|
||||
return f.finally([r_a_r = std::move(r_a_r)] {
|
||||
|
||||
@@ -31,7 +31,7 @@ namespace query {
|
||||
template <typename Consumer>
|
||||
requires CompactedFragmentsConsumerV2<Consumer>
|
||||
auto consume_page(flat_mutation_reader_v2& reader,
|
||||
lw_shared_ptr<compact_for_query_state_v2<emit_only_live_rows::no>> compaction_state,
|
||||
lw_shared_ptr<compact_for_query_state_v2> compaction_state,
|
||||
const query::partition_slice& slice,
|
||||
Consumer&& consumer,
|
||||
uint64_t row_limit,
|
||||
@@ -42,7 +42,7 @@ auto consume_page(flat_mutation_reader_v2& reader,
|
||||
const auto next_fragment_region = next_fragment ? next_fragment->position().region() : partition_region::partition_end;
|
||||
compaction_state->start_new_page(row_limit, partition_limit, query_time, next_fragment_region, consumer);
|
||||
|
||||
auto reader_consumer = compact_for_query_v2<emit_only_live_rows::no, Consumer>(compaction_state, std::move(consumer));
|
||||
auto reader_consumer = compact_for_query_v2<Consumer>(compaction_state, std::move(consumer));
|
||||
|
||||
return reader.consume(std::move(reader_consumer));
|
||||
});
|
||||
@@ -132,7 +132,7 @@ public:
|
||||
/// page. It should be dropped instead and a new one should be created
|
||||
/// instead.
|
||||
class querier : public querier_base {
|
||||
lw_shared_ptr<compact_for_query_state_v2<emit_only_live_rows::no>> _compaction_state;
|
||||
lw_shared_ptr<compact_for_query_state_v2> _compaction_state;
|
||||
|
||||
public:
|
||||
querier(const mutation_source& ms,
|
||||
@@ -143,7 +143,7 @@ public:
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_ptr)
|
||||
: querier_base(schema, permit, std::move(range), std::move(slice), ms, pc, std::move(trace_ptr))
|
||||
, _compaction_state(make_lw_shared<compact_for_query_state_v2<emit_only_live_rows::no>>(*schema, gc_clock::time_point{}, *_slice, 0, 0)) {
|
||||
, _compaction_state(make_lw_shared<compact_for_query_state_v2>(*schema, gc_clock::time_point{}, *_slice, 0, 0)) {
|
||||
}
|
||||
|
||||
bool are_limits_reached() const {
|
||||
|
||||
@@ -1380,11 +1380,11 @@ std::pair<flat_mutation_reader_v2, queue_reader_handle_v2> make_queue_reader_v2(
|
||||
namespace {
|
||||
|
||||
class compacting_reader : public flat_mutation_reader_v2::impl {
|
||||
friend class compact_mutation_state<emit_only_live_rows::no, compact_for_sstables::yes>;
|
||||
friend class compact_mutation_state<compact_for_sstables::yes>;
|
||||
|
||||
private:
|
||||
flat_mutation_reader_v2 _reader;
|
||||
compact_mutation_state<emit_only_live_rows::no, compact_for_sstables::yes> _compactor;
|
||||
compact_mutation_state<compact_for_sstables::yes> _compactor;
|
||||
noop_compacted_fragments_consumer _gc_consumer;
|
||||
|
||||
// Uncompacted stream
|
||||
|
||||
@@ -2734,7 +2734,7 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) {
|
||||
testlog.info("non-paged v2");
|
||||
{
|
||||
mutation res_mut(s, pk);
|
||||
auto c = compact_for_query_v2<emit_only_live_rows::no, consumer_v2>(*s, query_time, s->full_slice(), max_rows, max_partitions, consumer_v2{permit, res_mut, max_rows});
|
||||
auto c = compact_for_query_v2<consumer_v2>(*s, query_time, s->full_slice(), max_rows, max_partitions, consumer_v2{permit, res_mut, max_rows});
|
||||
auto reader = make_flat_mutation_reader_from_fragments(s, permit, make_frags());
|
||||
auto close_reader = deferred_close(reader);
|
||||
|
||||
@@ -2746,14 +2746,14 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) {
|
||||
testlog.info("limited pages v2");
|
||||
{
|
||||
mutation res_mut(s, pk);
|
||||
auto compaction_state = make_lw_shared<compact_mutation_state<emit_only_live_rows::no, compact_for_sstables::no>>(*s, query_time, s->full_slice(), 1, max_partitions);
|
||||
auto compaction_state = make_lw_shared<compact_mutation_state<compact_for_sstables::no>>(*s, query_time, s->full_slice(), 1, max_partitions);
|
||||
auto reader = make_flat_mutation_reader_from_fragments(s, permit, make_frags());
|
||||
auto close_reader = deferred_close(reader);
|
||||
|
||||
while (!reader.is_buffer_empty() || !reader.is_end_of_stream()) {
|
||||
auto c = consumer_v2{permit, res_mut, max_rows};
|
||||
compaction_state->start_new_page(1, max_partitions, query_time, reader.peek().get()->position().region(), c);
|
||||
reader.consume(compact_for_query_v2<emit_only_live_rows::no, consumer_v2>(compaction_state, std::move(c))).get();
|
||||
reader.consume(compact_for_query_v2<consumer_v2>(compaction_state, std::move(c))).get();
|
||||
}
|
||||
|
||||
BOOST_REQUIRE_EQUAL(res_mut, ref_mut);
|
||||
@@ -2762,14 +2762,14 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) {
|
||||
testlog.info("short pages v2");
|
||||
{
|
||||
mutation res_mut(s, pk);
|
||||
auto compaction_state = make_lw_shared<compact_mutation_state<emit_only_live_rows::no, compact_for_sstables::no>>(*s, query_time, s->full_slice(), max_rows, max_partitions);
|
||||
auto compaction_state = make_lw_shared<compact_mutation_state<compact_for_sstables::no>>(*s, query_time, s->full_slice(), max_rows, max_partitions);
|
||||
auto reader = make_flat_mutation_reader_from_fragments(s, permit, make_frags());
|
||||
auto close_reader = deferred_close(reader);
|
||||
|
||||
while (!reader.is_buffer_empty() || !reader.is_end_of_stream()) {
|
||||
auto c = consumer_v2{permit, res_mut, 2};
|
||||
compaction_state->start_new_page(max_rows, max_partitions, query_time, reader.peek().get()->position().region(), c);
|
||||
reader.consume(compact_for_query_v2<emit_only_live_rows::no, consumer_v2>(compaction_state, std::move(c))).get();
|
||||
reader.consume(compact_for_query_v2<consumer_v2>(compaction_state, std::move(c))).get();
|
||||
}
|
||||
|
||||
BOOST_REQUIRE_EQUAL(res_mut, ref_mut);
|
||||
@@ -2787,9 +2787,9 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) {
|
||||
if (detached_state) {
|
||||
restore_state(reader, std::move(*detached_state));
|
||||
}
|
||||
auto compaction_state = make_lw_shared<compact_mutation_state<emit_only_live_rows::no, compact_for_sstables::no>>(*s, query_time, s->full_slice(), 1, max_partitions);
|
||||
auto compaction_state = make_lw_shared<compact_mutation_state<compact_for_sstables::no>>(*s, query_time, s->full_slice(), 1, max_partitions);
|
||||
auto c = consumer_v2{permit, res_mut, max_rows};
|
||||
reader.consume(compact_for_query_v2<emit_only_live_rows::no, consumer_v2>(compaction_state, std::move(c))).get();
|
||||
reader.consume(compact_for_query_v2<consumer_v2>(compaction_state, std::move(c))).get();
|
||||
detached_state = std::move(*compaction_state).detach_state();
|
||||
}
|
||||
|
||||
@@ -2808,9 +2808,9 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) {
|
||||
if (detached_state) {
|
||||
restore_state(reader, std::move(*detached_state));
|
||||
}
|
||||
auto compaction_state = make_lw_shared<compact_mutation_state<emit_only_live_rows::no, compact_for_sstables::no>>(*s, query_time, s->full_slice(), max_rows, max_partitions);
|
||||
auto compaction_state = make_lw_shared<compact_mutation_state<compact_for_sstables::no>>(*s, query_time, s->full_slice(), max_rows, max_partitions);
|
||||
auto c = consumer_v2{permit, res_mut, 2};
|
||||
reader.consume(compact_for_query_v2<emit_only_live_rows::no, consumer_v2>(compaction_state, std::move(c))).get();
|
||||
reader.consume(compact_for_query_v2<consumer_v2>(compaction_state, std::move(c))).get();
|
||||
detached_state = std::move(*compaction_state).detach_state();
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user