mutation_compactor: honour stop_iteration from consumers

Signed-off-by: Paweł Dziepak <pdziepak@scylladb.com>
This commit is contained in:
Paweł Dziepak
2016-11-17 16:52:15 +00:00
parent 5d7185fd39
commit 34f9eb4cbd
2 changed files with 29 additions and 25 deletions

View File

@@ -44,14 +44,14 @@ enum class compact_for_sstables {
template<typename T>
concept bool CompactedMutationsConsumer() {
return requires(T obj, tombstone t, const dht::decorated_key& dk, static_row sr,
clustering_row cr, range_tombstone_begin rt, tombstone current_tombstone, bool is_alive)
clustering_row cr, range_tombstone rt, tombstone current_tombstone, bool is_alive)
{
obj.consume_new_partition(dk);
obj.consume(t);
obj.consume(std::move(sr), current_tombstone, is_alive);
obj.consume(std::move(cr), current_tombstone, is_alive);
obj.consume(std::move(rt));
obj.consume_end_of_partition();
{ obj.consume(std::move(sr), current_tombstone, is_alive) } ->stop_iteration;
{ obj.consume(std::move(cr), current_tombstone, is_alive) } ->stop_iteration;
{ obj.consume(std::move(rt)) } ->stop_iteration;
{ obj.consume_end_of_partition() } ->stop_iteration;
obj.consume_end_of_stream();
};
}
@@ -177,7 +177,7 @@ public:
_static_row_live = is_live;
if (is_live || (!only_live() && !sr.empty())) {
partition_is_not_empty();
_consumer.consume(std::move(sr), current_tombstone, is_live);
return _consumer.consume(std::move(sr), current_tombstone, is_live);
}
return stop_iteration::no;
}
@@ -193,10 +193,11 @@ public:
is_live |= cr.cells().compact_and_expire(_schema, column_kind::regular_column, t, _query_time, _can_gc, _gc_before);
if (only_live() && is_live) {
partition_is_not_empty();
_consumer.consume(std::move(cr), t, true);
auto stop = _consumer.consume(std::move(cr), t, true);
if (++_rows_in_current_partition == _current_partition_limit) {
return stop_iteration::yes;
}
return stop;
} else if (!only_live()) {
if (is_live) {
if (!sstable_compaction() && _rows_in_current_partition == _current_partition_limit) {
@@ -206,7 +207,7 @@ public:
}
if (!cr.empty()) {
partition_is_not_empty();
_consumer.consume(std::move(cr), t, is_live);
return _consumer.consume(std::move(cr), t, is_live);
}
}
return stop_iteration::no;
@@ -217,7 +218,7 @@ public:
// FIXME: drop tombstone if it is fully covered by other range tombstones
if (!can_purge_tombstone(rt.tomb) && rt.tomb > _range_tombstones.get_partition_tombstone()) {
partition_is_not_empty();
_consumer.consume(std::move(rt));
return _consumer.consume(std::move(rt));
}
return stop_iteration::no;
}
@@ -232,9 +233,10 @@ public:
_row_limit -= _rows_in_current_partition;
_partition_limit -= _rows_in_current_partition > 0;
_consumer.consume_end_of_partition();
auto stop = _consumer.consume_end_of_partition();
if (!sstable_compaction()) {
return _row_limit && _partition_limit ? stop_iteration::no : stop_iteration::yes;
return _row_limit && _partition_limit && stop != stop_iteration::yes
? stop_iteration::no : stop_iteration::yes;
}
}
return stop_iteration::no;

View File

@@ -1791,20 +1791,21 @@ public:
void consume(tombstone t) {
_mutation_consumer->consume(t);
}
void consume(static_row&& sr, tombstone t, bool) {
_mutation_consumer->consume(std::move(sr), t);
stop_iteration consume(static_row&& sr, tombstone t, bool) {
return _mutation_consumer->consume(std::move(sr), t);
}
void consume(clustering_row&& cr, tombstone t, bool) {
_mutation_consumer->consume(std::move(cr), t);
stop_iteration consume(clustering_row&& cr, tombstone t, bool) {
return _mutation_consumer->consume(std::move(cr), t);
}
void consume(range_tombstone&& rt) {
_mutation_consumer->consume(std::move(rt));
stop_iteration consume(range_tombstone&& rt) {
return _mutation_consumer->consume(std::move(rt));
}
void consume_end_of_partition() {
stop_iteration consume_end_of_partition() {
auto live_rows_in_partition = _mutation_consumer->consume_end_of_stream();
_live_rows += live_rows_in_partition;
_partitions += live_rows_in_partition > 0;
return stop_iteration::no;
}
data_query_result consume_end_of_stream() {
@@ -1856,24 +1857,25 @@ public:
void consume(tombstone t) {
_mutation_consumer->consume(t);
}
void consume(static_row&& sr, tombstone, bool is_alive) {
stop_iteration consume(static_row&& sr, tombstone, bool is_alive) {
_static_row_is_alive = is_alive;
_mutation_consumer->consume(std::move(sr));
return _mutation_consumer->consume(std::move(sr));
}
void consume(clustering_row&& cr, tombstone, bool is_alive) {
stop_iteration consume(clustering_row&& cr, tombstone, bool is_alive) {
_live_rows += is_alive;
_mutation_consumer->consume(std::move(cr));
return _mutation_consumer->consume(std::move(cr));
}
void consume(range_tombstone&& rt) {
_mutation_consumer->consume(std::move(rt));
stop_iteration consume(range_tombstone&& rt) {
return _mutation_consumer->consume(std::move(rt));
}
void consume_end_of_partition() {
stop_iteration consume_end_of_partition() {
if (_live_rows == 0 && _static_row_is_alive && !_has_ck_selector) {
++_live_rows;
}
_total_live_rows += _live_rows;
_result.emplace_back(partition { _live_rows, _mutation_consumer->consume_end_of_stream() });
return stop_iteration::no;
}
reconcilable_result consume_end_of_stream() {