From 0740019e4dad09c85d93b455a3b7a594bd29d705 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 17 Mar 2022 10:07:23 +0200 Subject: [PATCH] db/view: migrate view_update_builder to v2 To avoid noise, the interface is left as v1 and inbound readers are converted in the constructor. --- db/view/view.cc | 44 ++++++++++++++++++++------------------------ db/view/view.hh | 22 ++++++++++++---------- 2 files changed, 32 insertions(+), 34 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index 1f1ca40ea5..665e9ad99a 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -875,7 +875,7 @@ future<> view_update_builder::close() noexcept { } future view_update_builder::advance_all() { - auto existings_f = _existings ? (*_existings)() : make_ready_future>(); + auto existings_f = _existings ? (*_existings)() : make_ready_future(); return when_all(_updates(), std::move(existings_f)).then([this] (auto&& fragments) mutable { _update = std::move(std::get<0>(fragments).get0()); _existing = std::move(std::get<1>(fragments).get0()); @@ -910,11 +910,11 @@ future> view_update_builder::b bool do_advance_existings = false; if (_update && _update->is_partition_start()) { _key = std::move(std::move(_update)->as_partition_start().key().key()); - _update_tombstone_tracker.set_partition_tombstone(_update->as_partition_start().partition_tombstone()); + _update_partition_tombstone = _update->as_partition_start().partition_tombstone(); do_advance_updates = true; } if (_existing && _existing->is_partition_start()) { - _existing_tombstone_tracker.set_partition_tombstone(_existing->as_partition_start().partition_tombstone()); + _existing_partition_tombstone = _existing->as_partition_start().partition_tombstone(); do_advance_existings = true; } if (do_advance_updates) { @@ -960,10 +960,6 @@ void view_update_builder::generate_update(clustering_row&& update, std::optional } } -static void apply_tracked_tombstones(range_tombstone_accumulator& tracker, clustering_row& row) { - row.apply(tracker.tombstone_for_row(row.key())); -} - future view_update_builder::on_results() { constexpr size_t max_rows_for_view_updates = 100; size_t rows_for_view_updates = std::accumulate(_view_updates.begin(), _view_updates.end(), 0, [] (size_t acc, const view_updates& vu) { @@ -975,12 +971,12 @@ future view_update_builder::on_results() { auto cmp = position_in_partition::tri_compare(*_schema)(_update->position(), _existing->position()); if (cmp < 0) { // We have an update where there was nothing before - if (_update->is_range_tombstone()) { - _update_tombstone_tracker.apply(std::move(_update->as_range_tombstone())); + if (_update->is_range_tombstone_change()) { + _update_current_tombstone = _update->as_range_tombstone_change().tombstone(); } else if (_update->is_clustering_row()) { auto update = std::move(*_update).as_clustering_row(); - apply_tracked_tombstones(_update_tombstone_tracker, update); - auto tombstone = _existing_tombstone_tracker.current_tombstone(); + update.apply(std::max(_update_partition_tombstone, _update_current_tombstone)); + auto tombstone = std::max(_existing_partition_tombstone, _existing_current_tombstone); auto existing = tombstone ? std::optional(std::in_place, update.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row()) : std::nullopt; @@ -991,12 +987,12 @@ future view_update_builder::on_results() { if (cmp > 0) { // We have something existing but no update (which will happen either because it's a range tombstone marker in // existing, or because we've fetched the existing row due to some partition/range deletion in the updates) - if (_existing->is_range_tombstone()) { - _existing_tombstone_tracker.apply(std::move(_existing->as_range_tombstone())); + if (_existing->is_range_tombstone_change()) { + _existing_current_tombstone = _existing->as_range_tombstone_change().tombstone(); } else if (_existing->is_clustering_row()) { auto existing = std::move(*_existing).as_clustering_row(); - apply_tracked_tombstones(_existing_tombstone_tracker, existing); - auto tombstone = _update_tombstone_tracker.current_tombstone(); + existing.apply(std::max(_existing_partition_tombstone, _existing_current_tombstone)); + auto tombstone = std::max(_update_partition_tombstone, _update_current_tombstone); // The way we build the read command used for existing rows, we should always have a non-empty // tombstone, since we wouldn't have read the existing row otherwise. We don't assert that in case the // read method ever changes. @@ -1008,24 +1004,24 @@ future view_update_builder::on_results() { return stop_updates ? stop () : advance_existings(); } // We're updating a row that had pre-existing data - if (_update->is_range_tombstone()) { - assert(_existing->is_range_tombstone()); - _existing_tombstone_tracker.apply(std::move(*_existing).as_range_tombstone()); - _update_tombstone_tracker.apply(std::move(*_update).as_range_tombstone()); + if (_update->is_range_tombstone_change()) { + assert(_existing->is_range_tombstone_change()); + _existing_current_tombstone = std::move(*_existing).as_range_tombstone_change().tombstone(); + _update_current_tombstone = std::move(*_update).as_range_tombstone_change().tombstone(); } else if (_update->is_clustering_row()) { assert(_existing->is_clustering_row()); _update->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable { - apply_tracked_tombstones(_update_tombstone_tracker, cr); + cr.apply(std::max(_update_partition_tombstone, _update_current_tombstone)); }); _existing->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable { - apply_tracked_tombstones(_existing_tombstone_tracker, cr); + cr.apply(std::max(_existing_partition_tombstone, _existing_current_tombstone)); }); generate_update(std::move(*_update).as_clustering_row(), { std::move(*_existing).as_clustering_row() }); } return stop_updates ? stop() : advance_all(); } - auto tombstone = _update_tombstone_tracker.current_tombstone(); + auto tombstone = std::max(_update_partition_tombstone, _update_current_tombstone); if (tombstone && _existing && !_existing->is_end_of_partition()) { // We don't care if it's a range tombstone, as we're only looking for existing entries that get deleted if (_existing->is_clustering_row()) { @@ -1040,9 +1036,9 @@ future view_update_builder::on_results() { if (_update && !_update->is_end_of_partition()) { if (_update->is_clustering_row()) { _update->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable { - apply_tracked_tombstones(_update_tombstone_tracker, cr); + cr.apply(std::max(_update_partition_tombstone, _update_current_tombstone)); }); - auto existing_tombstone = _existing_tombstone_tracker.current_tombstone(); + auto existing_tombstone = std::max(_existing_partition_tombstone, _existing_current_tombstone); auto existing = existing_tombstone ? std::optional(std::in_place, _update->as_clustering_row().key(), row_tombstone(std::move(existing_tombstone)), row_marker(), ::row()) : std::nullopt; diff --git a/db/view/view.hh b/db/view/view.hh index c321a7dead..74e78eb0aa 100644 --- a/db/view/view.hh +++ b/db/view/view.hh @@ -13,6 +13,8 @@ #include "query-request.hh" #include "schema_fwd.hh" #include "readers/flat_mutation_reader.hh" +#include "readers/flat_mutation_reader_v2.hh" +#include "readers/conversion.hh" #include "frozen_mutation.hh" class frozen_mutation_and_schema; @@ -163,12 +165,14 @@ private: class view_update_builder { schema_ptr _schema; // The base schema std::vector _view_updates; - flat_mutation_reader _updates; - flat_mutation_reader_opt _existings; - range_tombstone_accumulator _update_tombstone_tracker; - range_tombstone_accumulator _existing_tombstone_tracker; - mutation_fragment_opt _update; - mutation_fragment_opt _existing; + flat_mutation_reader_v2 _updates; + flat_mutation_reader_v2_opt _existings; + tombstone _update_partition_tombstone; + tombstone _update_current_tombstone; + tombstone _existing_partition_tombstone; + tombstone _existing_current_tombstone; + mutation_fragment_v2_opt _update; + mutation_fragment_v2_opt _existing; gc_clock::time_point _now; partition_key _key = partition_key::make_empty(); public: @@ -180,10 +184,8 @@ public: gc_clock::time_point now) : _schema(std::move(s)) , _view_updates(std::move(views_to_update)) - , _updates(std::move(updates)) - , _existings(std::move(existings)) - , _update_tombstone_tracker(*_schema) - , _existing_tombstone_tracker(*_schema) + , _updates(upgrade_to_v2(std::move(updates))) + , _existings(existings ? upgrade_to_v2(std::move(*existings)) : flat_mutation_reader_v2_opt{}) , _now(now) { } view_update_builder(view_update_builder&& other) noexcept = default;