diff --git a/db/view/view.cc b/db/view/view.cc index 7ad15e47bc..9a6f771a9b 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -2595,6 +2595,10 @@ void view_updating_consumer::do_flush_buffer() { } void view_updating_consumer::flush_builder() { + _buffer.emplace_back(_mut_builder->flush()); +} + +void view_updating_consumer::end_builder() { _mut_builder->consume_end_of_partition(); if (auto mut_opt = _mut_builder->consume_end_of_stream()) { _buffer.emplace_back(std::move(*mut_opt)); @@ -2605,9 +2609,7 @@ void view_updating_consumer::flush_builder() { void view_updating_consumer::maybe_flush_buffer_mid_partition() { if (_buffer_size >= buffer_size_hard_limit) { flush_builder(); - auto dk = _buffer.back().decorated_key(); do_flush_buffer(); - consume_new_partition(dk); } } diff --git a/db/view/view_updating_consumer.hh b/db/view/view_updating_consumer.hh index 3c53cf5959..00b7373358 100644 --- a/db/view/view_updating_consumer.hh +++ b/db/view/view_updating_consumer.hh @@ -51,6 +51,7 @@ private: private: void do_flush_buffer(); void flush_builder(); + void end_builder(); void maybe_flush_buffer_mid_partition(); public: @@ -115,7 +116,7 @@ public: if (_as->abort_requested()) { return stop_iteration::yes; } - flush_builder(); + end_builder(); if (_buffer_size >= buffer_size_soft_limit) { do_flush_buffer(); } diff --git a/mutation/mutation_rebuilder.hh b/mutation/mutation_rebuilder.hh index 59c5c76f1f..c3b3db6135 100644 --- a/mutation/mutation_rebuilder.hh +++ b/mutation/mutation_rebuilder.hh @@ -55,6 +55,17 @@ public: return stop_iteration::yes; } + // Might only be called between consume_new_partition() + // and consume_end_of_partition(). + // + // Returns (and forgets) the partition contents consumed so far. + // Can be used to split the processing of a large mutation into + // multiple smaller `mutation` objects (which add up to the full mutation). + mutation flush() { + assert(_m); + return std::exchange(*_m, mutation(_s, _m->decorated_key())); + } + mutation_opt consume_end_of_stream() { return std::move(_m); } @@ -67,6 +78,7 @@ class mutation_rebuilder_v2 { schema_ptr _s; mutation_rebuilder _builder; range_tombstone_assembler _rt_assembler; + position_in_partition _pos = position_in_partition::before_all_clustered_rows(); public: mutation_rebuilder_v2(schema_ptr s) : _s(std::move(s)), _builder(_s) { } public: @@ -91,6 +103,7 @@ public: } stop_iteration consume(range_tombstone_change&& rt) { + _pos = rt.position(); if (auto rt_opt = _rt_assembler.consume(*_s, std::move(rt))) { _builder.consume(std::move(*rt_opt)); } @@ -103,6 +116,7 @@ public: } stop_iteration consume(clustering_row&& cr) { + _pos = position_in_partition::after_key(*_s, cr.position()); _builder.consume(std::move(cr)); return stop_iteration::no; } @@ -116,4 +130,22 @@ public: _rt_assembler.on_end_of_stream(); return _builder.consume_end_of_stream(); } + + // Might only be called between consume_new_partition() + // and consume_end_of_partition(). + // + // Returns (and forgets) the partition contents consumed so far. + // Can be used to split the processing of a large mutation into + // multiple smaller `mutation` objects (which add up to the full mutation). + // + // The active range tombstone (if present) is flushed with end bound + // just after the last seen clustered position, but the range tombstone + // remains active, and the next mutation will see it restarted at the + // position it was flushed at. + mutation flush() { + if (auto rt_opt = _rt_assembler.flush(*_s, _pos)) { + _builder.consume(std::move(*rt_opt)); + } + return _builder.flush(); + } }; diff --git a/test/boost/mutation_test.cc b/test/boost/mutation_test.cc index 3837dbc642..46077dc03c 100644 --- a/test/boost/mutation_test.cc +++ b/test/boost/mutation_test.cc @@ -3027,6 +3027,75 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_consume_position_monotonicity) { } } +// Tests mutation_rebuilder_v2::flush(). +SEASTAR_THREAD_TEST_CASE(test_mutation_rebuilder_v2_flush) { + simple_schema ss; + schema_ptr s = ss.schema(); + auto pk = ss.make_pkey(); + tests::reader_concurrency_semaphore_wrapper semaphore; + auto p = semaphore.make_permit(); + + // Main idea of the test: we prepare a stream with all "interesting" + // situations (with respect to positions), for example: + // - RTC right before and after a key + // - Overlapping RTCs + // - Keys without a RTC in between, but with an active RTC from before + // - Keys without a RTC in between, but without an active RTC from before + // etc. + // + // Then we pass this stream through mutation_rebuilder_v2 with two flushes + // in between (on all possible positions), and check that the result is + // the same as without flushes. + auto frags = std::vector(); + frags.emplace_back(*s, p, partition_start(pk, {})); + frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::before_all_clustered_rows(), ss.new_tombstone())); + frags.emplace_back(*s, p, clustering_row(ss.make_ckey(0))); + frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::before_key(ss.make_ckey(1)), ss.new_tombstone())); + frags.emplace_back(*s, p, clustering_row(ss.make_ckey(1))); + frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::after_key(*s, ss.make_ckey(1)), ss.new_tombstone())); + frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::after_key(*s, ss.make_ckey(1)), ss.new_tombstone())); + frags.emplace_back(*s, p, clustering_row(ss.make_ckey(2))); + frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::before_key(ss.make_ckey(3)), tombstone{})); + frags.emplace_back(*s, p, clustering_row(ss.make_ckey(3))); + frags.emplace_back(*s, p, clustering_row(ss.make_ckey(4))); + frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::after_key(*s, ss.make_ckey(4)), ss.new_tombstone())); + frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::before_key(ss.make_ckey(5)), ss.new_tombstone())); + frags.emplace_back(*s, p, clustering_row(ss.make_ckey(5))); + frags.emplace_back(*s, p, clustering_row(ss.make_ckey(6))); + frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::after_all_clustered_rows(), tombstone{})); + frags.emplace_back(*s, p, partition_end()); + + mutation_rebuilder_v2 rebuilder_without_flush(s); + for (int i = 0; i < frags.size(); ++i) { + rebuilder_without_flush.consume(mutation_fragment_v2(*s, p, frags[i])); + } + auto m_expected = std::move(*rebuilder_without_flush.consume_end_of_stream()); + + // We do two flushes (we test all possible combinations of their positions, + // including no flush). + // This is to test that the first flush doesn't break the rebuilder in + // a way that prevents another flush. + for (int first_flush = 0; first_flush < frags.size(); ++first_flush) { + for (int second_flush = first_flush; second_flush < frags.size(); ++second_flush) { + mutation_rebuilder_v2 rebuilder(s); + auto m1 = mutation(s, pk); // Contents of flush 1. + auto m2 = mutation(s, pk); // Contents of flush 2. + auto m3 = mutation(s, pk); // Contents of final flush. + for (int i = 0; i < frags.size(); ++i) { + rebuilder.consume(mutation_fragment_v2(*s, p, frags[i])); + if (i == first_flush) { + m1 = rebuilder.flush(); + } + if (i == second_flush) { + m2 = rebuilder.flush(); + } + } + m3 = std::move(*rebuilder.consume_end_of_stream()); + assert_that(m1 + m2 + m3).is_equal_to(m_expected); + } + } +} + SEASTAR_TEST_CASE(mutation_with_dummy_clustering_row_is_consumed_monotonically) { return seastar::async([] { tests::reader_concurrency_semaphore_wrapper semaphore;