mutation_compactor: add validator
The mutation compactor is used on most read-paths we have, so adding a validator to it gives us a good coverage, in particular it gives us full coverage of queries and compaction. The validator validates mutation token (and mutation fragment kind) monotonicity as that is quite cheap, while it is enough to catch the most common problems. As we already have a validator on the compaction path (in the sstable writer), the validator is disabled when the mutation compactor is instantiated for compaction. We should probably make this configurable at some point. The addition of this validator should prevent the worst of the fragment reordering bugs to affect reads.
This commit is contained in:
@@ -10,6 +10,7 @@
|
||||
|
||||
#include "compaction/compaction_garbage_collector.hh"
|
||||
#include "mutation_fragment.hh"
|
||||
#include "mutation_fragment_stream_validator.hh"
|
||||
#include "range_tombstone_assembler.hh"
|
||||
#include "tombstone_gc.hh"
|
||||
#include "full_position.hh"
|
||||
@@ -168,12 +169,15 @@ class compact_mutation_state {
|
||||
|
||||
compaction_stats _stats;
|
||||
|
||||
mutation_fragment_stream_validating_filter _validator;
|
||||
|
||||
// Remember if we requested to stop mid-partition.
|
||||
stop_iteration _stop = stop_iteration::no;
|
||||
private:
|
||||
template <typename Consumer, typename GCConsumer>
|
||||
requires CompactedFragmentsConsumerV2<Consumer> && CompactedFragmentsConsumerV2<GCConsumer>
|
||||
stop_iteration do_consume(range_tombstone_change&& rtc, Consumer& consumer, GCConsumer& gc_consumer) {
|
||||
_validator(mutation_fragment_v2::kind::range_tombstone_change, rtc.position(), rtc.tombstone());
|
||||
stop_iteration gc_consumer_stop = stop_iteration::no;
|
||||
stop_iteration consumer_stop = stop_iteration::no;
|
||||
if (rtc.tombstone() <= _partition_tombstone) {
|
||||
@@ -276,6 +280,7 @@ public:
|
||||
, _tombstone_gc_state(nullptr)
|
||||
, _last_dk({dht::token(), partition_key::make_empty()})
|
||||
, _last_pos(position_in_partition::for_partition_end())
|
||||
, _validator("mutation_compactor for read", _schema, mutation_fragment_stream_validation_level::token)
|
||||
{
|
||||
static_assert(!sstable_compaction(), "This constructor cannot be used for sstable compaction.");
|
||||
}
|
||||
@@ -292,11 +297,15 @@ public:
|
||||
, _last_dk({dht::token(), partition_key::make_empty()})
|
||||
, _last_pos(position_in_partition::for_partition_end())
|
||||
, _collector(std::make_unique<mutation_compactor_garbage_collector>(_schema))
|
||||
// We already have a validator for compaction in the sstable writer, no need to validate twice
|
||||
, _validator("mutation_compactor for compaction", _schema, mutation_fragment_stream_validation_level::none)
|
||||
{
|
||||
static_assert(sstable_compaction(), "This constructor can only be used for sstable compaction.");
|
||||
}
|
||||
|
||||
void consume_new_partition(const dht::decorated_key& dk) {
|
||||
_validator(mutation_fragment_v2::kind::partition_start, position_in_partition_view::for_partition_start(), {});
|
||||
_validator(dk);
|
||||
_stop = stop_iteration::no;
|
||||
auto& pk = dk.key();
|
||||
_dk = &dk;
|
||||
@@ -338,6 +347,7 @@ public:
|
||||
template <typename Consumer, typename GCConsumer>
|
||||
requires CompactedFragmentsConsumerV2<Consumer> && CompactedFragmentsConsumerV2<GCConsumer>
|
||||
stop_iteration consume(static_row&& sr, Consumer& consumer, GCConsumer& gc_consumer) {
|
||||
_validator(mutation_fragment_v2::kind::static_row, sr.position(), {});
|
||||
_last_static_row = static_row(_schema, sr);
|
||||
_last_pos = position_in_partition(position_in_partition::static_row_tag_t());
|
||||
auto current_tombstone = _partition_tombstone;
|
||||
@@ -370,6 +380,7 @@ public:
|
||||
template <typename Consumer, typename GCConsumer>
|
||||
requires CompactedFragmentsConsumerV2<Consumer> && CompactedFragmentsConsumerV2<GCConsumer>
|
||||
stop_iteration consume(clustering_row&& cr, Consumer& consumer, GCConsumer& gc_consumer) {
|
||||
_validator(mutation_fragment_v2::kind::clustering_row, cr.position(), {});
|
||||
if (!sstable_compaction()) {
|
||||
_last_pos = cr.position();
|
||||
}
|
||||
@@ -441,6 +452,7 @@ public:
|
||||
do_consume(std::move(rtc), consumer, gc_consumer);
|
||||
_effective_tombstone = prev_tombstone;
|
||||
}
|
||||
_validator.on_end_of_partition();
|
||||
if (!_empty_partition_in_gc_consumer) {
|
||||
gc_consumer.consume_end_of_partition();
|
||||
}
|
||||
@@ -466,6 +478,7 @@ public:
|
||||
template <typename Consumer, typename GCConsumer>
|
||||
requires CompactedFragmentsConsumerV2<Consumer> && CompactedFragmentsConsumerV2<GCConsumer>
|
||||
auto consume_end_of_stream(Consumer& consumer, GCConsumer& gc_consumer) {
|
||||
_validator.on_end_of_stream();
|
||||
if (_dk) {
|
||||
_last_dk = *_dk;
|
||||
_dk = &_last_dk;
|
||||
@@ -518,6 +531,9 @@ public:
|
||||
|
||||
noop_compacted_fragments_consumer nc;
|
||||
|
||||
if (next_fragment_region != partition_region::partition_start) {
|
||||
_validator.reset(mutation_fragment_v2::kind::partition_start, position_in_partition_view::for_partition_start(), {});
|
||||
}
|
||||
if (next_fragment_region == partition_region::clustered && _last_static_row) {
|
||||
// Stopping here would cause an infinite loop so ignore return value.
|
||||
consume(*std::exchange(_last_static_row, {}), consumer, nc);
|
||||
|
||||
Reference in New Issue
Block a user