view: fix range tombstone handling on flushes in view_updating_consumer
View update routines accept `mutation` objects, but what comes out of staging sstable readers is a stream of mutation_fragment_v2 objects. To build view updates after a repair/streaming, we have to convert the fragment stream into `mutation`s. This is done by piping the stream to mutation_rebuilder_v2. To keep memory usage limited, the stream for a single partition might have to be split into multiple partial `mutation` objects. view_updating_consumer does that, but improperly: when the split/flush happens inside an active range tombstone, the range tombstone isn't closed properly. This is illegal and triggers an internal error. This patch fixes the problem by closing the active range tombstone at the flush (and reopening it at the same position in the next `mutation` object). The tombstone is closed just after the last seen clustered position. This particular choice is not necessary for correctness -- for example, we could delay all processing of the range tombstone until we see its end bound -- but it seems like the most natural semantics. Fixes #14503
This commit is contained in:
@@ -2595,6 +2595,10 @@ void view_updating_consumer::do_flush_buffer() {
|
||||
}
|
||||
|
||||
// Takes whatever the mutation builder has accumulated so far (via its
// flush()) and appends that partial mutation to the end of _buffer.
void view_updating_consumer::flush_builder() {
    auto partial = _mut_builder->flush();
    _buffer.emplace_back(std::move(partial));
}
|
||||
|
||||
void view_updating_consumer::end_builder() {
|
||||
_mut_builder->consume_end_of_partition();
|
||||
if (auto mut_opt = _mut_builder->consume_end_of_stream()) {
|
||||
_buffer.emplace_back(std::move(*mut_opt));
|
||||
@@ -2605,9 +2609,7 @@ void view_updating_consumer::flush_builder() {
|
||||
// Flushes the buffer in the middle of a partition, but only once
// _buffer_size has reached buffer_size_hard_limit; otherwise does nothing.
//
// NOTE(review): this text comes from a diff rendered without +/- markers.
// The hunk header (-2605,9 +2609,7) says two of the lines below were removed
// by the commit -- presumably the decorated-key capture and the
// consume_new_partition(dk) call, which are no longer needed once the builder
// can be flushed mid-partition. Confirm against the real file before relying
// on the exact statement list.
void view_updating_consumer::maybe_flush_buffer_mid_partition() {
|
||||
if (_buffer_size >= buffer_size_hard_limit) {
|
||||
// Move the partially built mutation from the builder into _buffer.
flush_builder();
|
||||
// Reads the partition key of the mutation that was just appended.
auto dk = _buffer.back().decorated_key();
|
||||
do_flush_buffer();
|
||||
// Starts a partition again with the same key after the buffer flush.
consume_new_partition(dk);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -51,6 +51,7 @@ private:
|
||||
private:
|
||||
void do_flush_buffer();
|
||||
void flush_builder();
|
||||
void end_builder();
|
||||
void maybe_flush_buffer_mid_partition();
|
||||
|
||||
public:
|
||||
@@ -115,7 +116,7 @@ public:
|
||||
if (_as->abort_requested()) {
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
flush_builder();
|
||||
end_builder();
|
||||
if (_buffer_size >= buffer_size_soft_limit) {
|
||||
do_flush_buffer();
|
||||
}
|
||||
|
||||
@@ -55,6 +55,17 @@ public:
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
|
||||
// Might only be called between consume_new_partition()
|
||||
// and consume_end_of_partition().
|
||||
//
|
||||
// Returns (and forgets) the partition contents consumed so far.
|
||||
// Can be used to split the processing of a large mutation into
|
||||
// multiple smaller `mutation` objects (which add up to the full mutation).
|
||||
mutation flush() {
|
||||
assert(_m);
|
||||
return std::exchange(*_m, mutation(_s, _m->decorated_key()));
|
||||
}
|
||||
|
||||
// Ends the stream by moving the accumulated mutation out of the builder
// and returning it to the caller.
mutation_opt consume_end_of_stream() {
    mutation_opt finished = std::move(_m);
    return finished;
}
|
||||
@@ -67,6 +78,7 @@ class mutation_rebuilder_v2 {
|
||||
schema_ptr _s;
|
||||
mutation_rebuilder _builder;
|
||||
range_tombstone_assembler _rt_assembler;
|
||||
position_in_partition _pos = position_in_partition::before_all_clustered_rows();
|
||||
public:
|
||||
// Takes ownership of the schema pointer and constructs the underlying
// builder from it. _builder is initialised from the member _s (declared
// before it), not from the parameter `s`, which has already been moved from.
mutation_rebuilder_v2(schema_ptr s) : _s(std::move(s)), _builder(_s) { }
|
||||
public:
|
||||
@@ -91,6 +103,7 @@ public:
|
||||
}
|
||||
|
||||
stop_iteration consume(range_tombstone_change&& rt) {
|
||||
_pos = rt.position();
|
||||
if (auto rt_opt = _rt_assembler.consume(*_s, std::move(rt))) {
|
||||
_builder.consume(std::move(*rt_opt));
|
||||
}
|
||||
@@ -103,6 +116,7 @@ public:
|
||||
}
|
||||
|
||||
// Consumes a clustering row: remembers the position right after the row's
// key in _pos (the position flush() hands to the range tombstone
// assembler), then forwards the row to the underlying builder.
// Always asks the producer to continue.
stop_iteration consume(clustering_row&& cr) {
    // The position must be read before `cr` is moved into the builder.
    auto pos_after_row = position_in_partition::after_key(*_s, cr.position());
    _pos = std::move(pos_after_row);
    _builder.consume(std::move(cr));
    return stop_iteration::no;
}
|
||||
@@ -116,4 +130,22 @@ public:
|
||||
_rt_assembler.on_end_of_stream();
|
||||
return _builder.consume_end_of_stream();
|
||||
}
|
||||
|
||||
// Might only be called between consume_new_partition()
|
||||
// and consume_end_of_partition().
|
||||
//
|
||||
// Returns (and forgets) the partition contents consumed so far.
|
||||
// Can be used to split the processing of a large mutation into
|
||||
// multiple smaller `mutation` objects (which add up to the full mutation).
|
||||
//
|
||||
// The active range tombstone (if present) is flushed with end bound
|
||||
// just after the last seen clustered position, but the range tombstone
|
||||
// remains active, and the next mutation will see it restarted at the
|
||||
// position it was flushed at.
|
||||
mutation flush() {
|
||||
if (auto rt_opt = _rt_assembler.flush(*_s, _pos)) {
|
||||
_builder.consume(std::move(*rt_opt));
|
||||
}
|
||||
return _builder.flush();
|
||||
}
|
||||
};
|
||||
|
||||
@@ -3027,6 +3027,75 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_consume_position_monotonicity) {
|
||||
}
|
||||
}
|
||||
|
||||
// Tests mutation_rebuilder_v2::flush().
|
||||
SEASTAR_THREAD_TEST_CASE(test_mutation_rebuilder_v2_flush) {
|
||||
simple_schema ss;
|
||||
schema_ptr s = ss.schema();
|
||||
auto pk = ss.make_pkey();
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
auto p = semaphore.make_permit();
|
||||
|
||||
// Main idea of the test: we prepare a stream with all "interesting"
|
||||
// situations (with respect to positions), for example:
|
||||
// - RTC right before and after a key
|
||||
// - Overlapping RTCs
|
||||
// - Keys without a RTC in between, but with an active RTC from before
|
||||
// - Keys without a RTC in between, but without an active RTC from before
|
||||
// etc.
|
||||
//
|
||||
// Then we pass this stream through mutation_rebuilder_v2 with two flushes
|
||||
// in between (on all possible positions), and check that the result is
|
||||
// the same as without flushes.
|
||||
auto frags = std::vector<mutation_fragment_v2>();
|
||||
frags.emplace_back(*s, p, partition_start(pk, {}));
|
||||
frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::before_all_clustered_rows(), ss.new_tombstone()));
|
||||
frags.emplace_back(*s, p, clustering_row(ss.make_ckey(0)));
|
||||
frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::before_key(ss.make_ckey(1)), ss.new_tombstone()));
|
||||
frags.emplace_back(*s, p, clustering_row(ss.make_ckey(1)));
|
||||
frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::after_key(*s, ss.make_ckey(1)), ss.new_tombstone()));
|
||||
frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::after_key(*s, ss.make_ckey(1)), ss.new_tombstone()));
|
||||
frags.emplace_back(*s, p, clustering_row(ss.make_ckey(2)));
|
||||
frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::before_key(ss.make_ckey(3)), tombstone{}));
|
||||
frags.emplace_back(*s, p, clustering_row(ss.make_ckey(3)));
|
||||
frags.emplace_back(*s, p, clustering_row(ss.make_ckey(4)));
|
||||
frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::after_key(*s, ss.make_ckey(4)), ss.new_tombstone()));
|
||||
frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::before_key(ss.make_ckey(5)), ss.new_tombstone()));
|
||||
frags.emplace_back(*s, p, clustering_row(ss.make_ckey(5)));
|
||||
frags.emplace_back(*s, p, clustering_row(ss.make_ckey(6)));
|
||||
frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::after_all_clustered_rows(), tombstone{}));
|
||||
frags.emplace_back(*s, p, partition_end());
|
||||
|
||||
mutation_rebuilder_v2 rebuilder_without_flush(s);
|
||||
for (int i = 0; i < frags.size(); ++i) {
|
||||
rebuilder_without_flush.consume(mutation_fragment_v2(*s, p, frags[i]));
|
||||
}
|
||||
auto m_expected = std::move(*rebuilder_without_flush.consume_end_of_stream());
|
||||
|
||||
// We do two flushes (we test all possible combinations of their positions,
|
||||
// including no flush).
|
||||
// This is to test that the first flush doesn't break the rebuilder in
|
||||
// a way that prevents another flush.
|
||||
for (int first_flush = 0; first_flush < frags.size(); ++first_flush) {
|
||||
for (int second_flush = first_flush; second_flush < frags.size(); ++second_flush) {
|
||||
mutation_rebuilder_v2 rebuilder(s);
|
||||
auto m1 = mutation(s, pk); // Contents of flush 1.
|
||||
auto m2 = mutation(s, pk); // Contents of flush 2.
|
||||
auto m3 = mutation(s, pk); // Contents of final flush.
|
||||
for (int i = 0; i < frags.size(); ++i) {
|
||||
rebuilder.consume(mutation_fragment_v2(*s, p, frags[i]));
|
||||
if (i == first_flush) {
|
||||
m1 = rebuilder.flush();
|
||||
}
|
||||
if (i == second_flush) {
|
||||
m2 = rebuilder.flush();
|
||||
}
|
||||
}
|
||||
m3 = std::move(*rebuilder.consume_end_of_stream());
|
||||
assert_that(m1 + m2 + m3).is_equal_to(m_expected);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(mutation_with_dummy_clustering_row_is_consumed_monotonically) {
|
||||
return seastar::async([] {
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
|
||||
Reference in New Issue
Block a user