diff --git a/mutation_compactor.hh b/mutation_compactor.hh index 924c7fff0b..88fd1512e3 100644 --- a/mutation_compactor.hh +++ b/mutation_compactor.hh @@ -44,14 +44,14 @@ enum class compact_for_sstables { template concept bool CompactedMutationsConsumer() { return requires(T obj, tombstone t, const dht::decorated_key& dk, static_row sr, - clustering_row cr, range_tombstone_begin rt, tombstone current_tombstone, bool is_alive) + clustering_row cr, range_tombstone rt, tombstone current_tombstone, bool is_alive) { obj.consume_new_partition(dk); obj.consume(t); - obj.consume(std::move(sr), current_tombstone, is_alive); - obj.consume(std::move(cr), current_tombstone, is_alive); - obj.consume(std::move(rt)); - obj.consume_end_of_partition(); + { obj.consume(std::move(sr), current_tombstone, is_alive) } ->stop_iteration; + { obj.consume(std::move(cr), current_tombstone, is_alive) } ->stop_iteration; + { obj.consume(std::move(rt)) } ->stop_iteration; + { obj.consume_end_of_partition() } ->stop_iteration; obj.consume_end_of_stream(); }; } @@ -177,7 +177,7 @@ public: _static_row_live = is_live; if (is_live || (!only_live() && !sr.empty())) { partition_is_not_empty(); - _consumer.consume(std::move(sr), current_tombstone, is_live); + return _consumer.consume(std::move(sr), current_tombstone, is_live); } return stop_iteration::no; } @@ -193,10 +193,11 @@ public: is_live |= cr.cells().compact_and_expire(_schema, column_kind::regular_column, t, _query_time, _can_gc, _gc_before); if (only_live() && is_live) { partition_is_not_empty(); - _consumer.consume(std::move(cr), t, true); + auto stop = _consumer.consume(std::move(cr), t, true); if (++_rows_in_current_partition == _current_partition_limit) { return stop_iteration::yes; } + return stop; } else if (!only_live()) { if (is_live) { if (!sstable_compaction() && _rows_in_current_partition == _current_partition_limit) { @@ -206,7 +207,7 @@ public: } if (!cr.empty()) { partition_is_not_empty(); - _consumer.consume(std::move(cr), t, is_live); + return _consumer.consume(std::move(cr), t, is_live); } } return stop_iteration::no; @@ -217,7 +218,7 @@ public: // FIXME: drop tombstone if it is fully covered by other range tombstones if (!can_purge_tombstone(rt.tomb) && rt.tomb > _range_tombstones.get_partition_tombstone()) { partition_is_not_empty(); - _consumer.consume(std::move(rt)); + return _consumer.consume(std::move(rt)); } return stop_iteration::no; } @@ -232,9 +233,10 @@ public: _row_limit -= _rows_in_current_partition; _partition_limit -= _rows_in_current_partition > 0; - _consumer.consume_end_of_partition(); + auto stop = _consumer.consume_end_of_partition(); if (!sstable_compaction()) { - return _row_limit && _partition_limit ? stop_iteration::no : stop_iteration::yes; + return _row_limit && _partition_limit && stop != stop_iteration::yes + ? stop_iteration::no : stop_iteration::yes; } } return stop_iteration::no; diff --git a/mutation_partition.cc b/mutation_partition.cc index fb3a77693e..b827fb6026 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -1791,20 +1791,21 @@ public: void consume(tombstone t) { _mutation_consumer->consume(t); } - void consume(static_row&& sr, tombstone t, bool) { - _mutation_consumer->consume(std::move(sr), t); + stop_iteration consume(static_row&& sr, tombstone t, bool) { + return _mutation_consumer->consume(std::move(sr), t); } - void consume(clustering_row&& cr, tombstone t, bool) { - _mutation_consumer->consume(std::move(cr), t); + stop_iteration consume(clustering_row&& cr, tombstone t, bool) { + return _mutation_consumer->consume(std::move(cr), t); } - void consume(range_tombstone&& rt) { - _mutation_consumer->consume(std::move(rt)); + stop_iteration consume(range_tombstone&& rt) { + return _mutation_consumer->consume(std::move(rt)); } - void consume_end_of_partition() { + stop_iteration consume_end_of_partition() { auto live_rows_in_partition = _mutation_consumer->consume_end_of_stream(); _live_rows += live_rows_in_partition; _partitions += live_rows_in_partition > 0; + return stop_iteration::no; } data_query_result consume_end_of_stream() { @@ -1856,24 +1857,25 @@ public: void consume(tombstone t) { _mutation_consumer->consume(t); } - void consume(static_row&& sr, tombstone, bool is_alive) { + stop_iteration consume(static_row&& sr, tombstone, bool is_alive) { _static_row_is_alive = is_alive; - _mutation_consumer->consume(std::move(sr)); + return _mutation_consumer->consume(std::move(sr)); } - void consume(clustering_row&& cr, tombstone, bool is_alive) { + stop_iteration consume(clustering_row&& cr, tombstone, bool is_alive) { _live_rows += is_alive; - _mutation_consumer->consume(std::move(cr)); + return _mutation_consumer->consume(std::move(cr)); } - void consume(range_tombstone&& rt) { - _mutation_consumer->consume(std::move(rt)); + stop_iteration consume(range_tombstone&& rt) { + return _mutation_consumer->consume(std::move(rt)); } - void consume_end_of_partition() { + stop_iteration consume_end_of_partition() { if (_live_rows == 0 && _static_row_is_alive && !_has_ck_selector) { ++_live_rows; } _total_live_rows += _live_rows; _result.emplace_back(partition { _live_rows, _mutation_consumer->consume_end_of_stream() }); + return stop_iteration::no; } reconcilable_result consume_end_of_stream() {