diff --git a/mutation_compactor.hh b/mutation_compactor.hh index 5b7ad7a7d6..8f63ad576f 100644 --- a/mutation_compactor.hh +++ b/mutation_compactor.hh @@ -35,6 +35,11 @@ enum class emit_only_live_rows { yes, }; +enum class compact_for_sstables { + no, + yes, +}; + /* template concept bool CompactedMutationsConsumer() { @@ -55,7 +60,7 @@ concept bool CompactedMutationsConsumer() { // emit_only_live::yes will cause compact_for_query to emit only live // static and clustering rows. It doesn't affect the way range tombstones are // emitted. -template +template class compact_mutation { const schema& _schema; gc_clock::time_point _query_time; @@ -63,9 +68,9 @@ class compact_mutation { std::function _get_max_purgeable; api::timestamp_type _max_purgeable = api::max_timestamp; const query::partition_slice& _slice; - uint32_t _row_limit; - uint32_t _partition_limit; - uint32_t _partition_row_limit; + uint32_t _row_limit{}; + uint32_t _partition_limit{}; + uint32_t _partition_row_limit{}; Consumer _consumer; tombstone _partition_tombstone; @@ -82,6 +87,9 @@ private: static constexpr bool only_live() { return OnlyLive == emit_only_live_rows::yes; } + static constexpr bool sstable_compaction() { + return SSTableCompaction == compact_for_sstables::yes; + } void partition_is_not_empty() { if (_empty_partition) { @@ -95,7 +103,7 @@ private: } bool can_purge_tombstone(const tombstone& t) { - return t.timestamp < _max_purgeable && t.deletion_time < _gc_before; + return (!sstable_compaction() || t.timestamp < _max_purgeable) && t.deletion_time < _gc_before; }; public: compact_mutation(const schema& s, gc_clock::time_point query_time, const query::partition_slice& slice, uint32_t limit, @@ -108,7 +116,22 @@ public: , _partition_limit(partition_limit) , _partition_row_limit(_slice.options.contains(query::partition_slice::option::distinct) ? 1 : slice.partition_row_limit()) , _consumer(std::move(consumer)) - { } + { + static_assert(!sstable_compaction(), "This constructor cannot be used for sstable compaction."); + } + + compact_mutation(const schema& s, gc_clock::time_point compaction_time, Consumer consumer, + std::function get_max_purgeable) + : _schema(s) + , _query_time(compaction_time) + , _gc_before(_query_time - s.gc_grace_seconds()) + , _get_max_purgeable(std::move(get_max_purgeable)) + , _slice(query::full_slice) + , _consumer(std::move(consumer)) + { + static_assert(sstable_compaction(), "This constructor can only be used for sstable compaction."); + static_assert(!only_live(), "SSTable compaction cannot be run with emit_only_live_rows::yes."); + } void consume_new_partition(const dht::decorated_key& dk) { auto& pk = dk.key(); @@ -120,7 +143,7 @@ public: _current_tombstone = { }; _partition_tombstone = { }; _current_partition_limit = std::min(_row_limit, _partition_row_limit); - if (_get_max_purgeable) { + if (sstable_compaction()) { _max_purgeable = _get_max_purgeable(dk); } } @@ -161,7 +184,7 @@ public: } } else if (!only_live()) { if (is_live) { - if (_rows_in_current_partition == _current_partition_limit) { + if (!sstable_compaction() && _rows_in_current_partition == _current_partition_limit) { return stop_iteration::yes; } _rows_in_current_partition++; @@ -204,7 +227,9 @@ public: _row_limit -= _rows_in_current_partition; _partition_limit -= 1; _consumer.consume_end_of_partition(); - return _row_limit && _partition_limit ? stop_iteration::no : stop_iteration::yes; + if (!sstable_compaction()) { + return _row_limit && _partition_limit ? stop_iteration::no : stop_iteration::yes; + } } return stop_iteration::no; } @@ -213,3 +238,13 @@ public: return _consumer.consume_end_of_stream(); } }; + +template +struct compact_for_query : compact_mutation { + using compact_mutation::compact_mutation; +}; + +template +struct compact_for_compaction : compact_mutation { + using compact_mutation::compact_mutation; +}; \ No newline at end of file diff --git a/mutation_partition.cc b/mutation_partition.cc index 8f48813bd7..564e7f3dcd 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -1799,7 +1799,7 @@ future data_query(schema_ptr s, const mutation_source& source auto is_reversed = slice.options.contains(query::partition_slice::option::reversed); auto qrb = query_result_builder(*s, builder); - auto cfq = compact_mutation(*s, query_time, slice, row_limit, partition_limit, std::move(qrb)); + auto cfq = compact_for_query(*s, query_time, slice, row_limit, partition_limit, std::move(qrb)); auto reader = source(s, range, query::clustering_key_filtering_context::create(s, slice), service::get_local_sstable_query_read_priority()); return consume_flattened(std::move(reader), std::move(cfq), is_reversed); @@ -1875,7 +1875,7 @@ mutation_query(schema_ptr s, auto is_reversed = slice.options.contains(query::partition_slice::option::reversed); auto rrb = reconcilable_result_builder(*s, slice); - auto cfq = compact_mutation(*s, query_time, slice, row_limit, partition_limit, std::move(rrb)); + auto cfq = compact_for_query(*s, query_time, slice, row_limit, partition_limit, std::move(rrb)); auto reader = source(s, range, query::clustering_key_filtering_context::create(s, slice), service::get_local_sstable_query_read_priority()); return consume_flattened(std::move(reader), std::move(cfq), is_reversed);