diff --git a/db/view/view.cc b/db/view/view.cc index 2fad19d42f..d381297f50 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1274,6 +1274,33 @@ void view_update_builder::generate_update(clustering_row&& update, std::optional } } +void view_update_builder::generate_update(static_row&& update, const tombstone& update_tomb, + std::optional&& existing, const tombstone& existing_tomb) { + if (!update_tomb && update.empty()) { + throw std::logic_error("A materialized view update cannot be empty"); + } + + auto dk = dht::decorate_key(*_schema, _key); + const auto& gc_state = _base.get_compaction_manager().get_tombstone_gc_state(); + auto gc_before = gc_state.get_gc_before_for_key(_schema, dk, _now); + + // We allow existing to be disengaged, which we treat the same as an empty row. + if (existing) { + existing->cells().compact_and_expire(*_schema, column_kind::static_column, row_tombstone(existing_tomb), _now, always_gc, gc_before); + update.apply(*_schema, static_row(*_schema, *existing)); + } + + update.cells().compact_and_expire(*_schema, column_kind::static_column, row_tombstone(update_tomb), _now, always_gc, gc_before); + + const auto update_row = clustering_or_static_row(std::move(update)); + const auto existing_row = existing + ? std::make_optional(std::move(*existing)) + : std::optional(); + for (auto&& v : _view_updates) { + v.generate_update(_key, update_row, existing_row, _now); + } +} + future view_update_builder::on_results() { constexpr size_t max_rows_for_view_updates = 100; size_t rows_for_view_updates = std::accumulate(_view_updates.begin(), _view_updates.end(), 0, [] (size_t acc, const view_updates& vu) { @@ -1295,12 +1322,21 @@ future view_update_builder::on_results() { ? std::optional(std::in_place, update.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row()) : std::nullopt; generate_update(std::move(update), std::move(existing)); + } else if (_update->is_static_row()) { + auto update = std::move(*_update).as_static_row(); + auto tombstone = _existing_partition_tombstone; + auto existing = tombstone + ? std::optional(std::in_place) + : std::nullopt; + generate_update(std::move(update), _update_partition_tombstone, std::move(existing), _existing_partition_tombstone); } return stop_updates ? stop() : advance_updates(); } if (cmp > 0) { // We have something existing but no update (which will happen either because it's a range tombstone marker in - // existing, or because we've fetched the existing row due to some partition/range deletion in the updates) + // existing, or because we've fetched the existing row due to some partition/range deletion in the updates). + // Due to how the read command for existing rows is constructed, it is also possible that there is a static + // row is included, even though we didn't modify it. if (_existing->is_range_tombstone_change()) { _existing_current_tombstone = _existing->as_range_tombstone_change().tombstone(); } else if (_existing->is_clustering_row()) { @@ -1314,6 +1350,21 @@ future view_update_builder::on_results() { auto update = clustering_row(existing.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row()); generate_update(std::move(update), { std::move(existing) }); } + } else if (_existing->is_static_row()) { + auto existing = std::move(*_existing).as_static_row(); + auto tombstone = _update_partition_tombstone; + // The static row might be unintentionally included when fetching existing clustering rows, + // even if the static row was not updated. We can detect it. A static row can be affected either by: + // + // 1. A static row in the update mutation + // 2. A partition tombstone in the update mutation + // + // If neither of those is present, this means that the static row is included accidentally. + // If we are here, this means that (1) is not present. The `if` that follows checks for (2). + if (tombstone) { + auto update = static_row(); + generate_update(std::move(update), _update_partition_tombstone, { std::move(existing) }, _existing_partition_tombstone); + } } return stop_updates ? stop () : advance_existings(); } @@ -1331,6 +1382,12 @@ future view_update_builder::on_results() { cr.apply(std::max(_existing_partition_tombstone, _existing_current_tombstone)); }); generate_update(std::move(*_update).as_clustering_row(), { std::move(*_existing).as_clustering_row() }); + } else if (_update->is_static_row()) { + if (!_existing->is_static_row()) { + on_internal_error(vlogger, format("Static row update mutation part {} shouldn't compare equal with an existing, non-static row mutation part {}", + mutation_fragment_v2::printer(*_schema, *_update), mutation_fragment_v2::printer(*_schema, *_existing))); + } + generate_update(std::move(*_update).as_static_row(), _update_partition_tombstone, { std::move(*_existing).as_static_row() }, _existing_partition_tombstone); } return stop_updates ? stop() : advance_all(); } @@ -1342,6 +1399,10 @@ future view_update_builder::on_results() { auto existing = clustering_row(*_schema, _existing->as_clustering_row()); auto update = clustering_row(existing.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row()); generate_update(std::move(update), { std::move(existing) }); + } else if (_existing->is_static_row()) { + auto existing = static_row(*_schema, _existing->as_static_row()); + auto update = static_row(); + generate_update(std::move(update), _update_partition_tombstone, { std::move(existing) }, _existing_partition_tombstone); } return stop_updates ? stop() : advance_existings(); } @@ -1357,6 +1418,12 @@ future view_update_builder::on_results() { ? std::optional(std::in_place, _update->as_clustering_row().key(), row_tombstone(std::move(existing_tombstone)), row_marker(), ::row()) : std::nullopt; generate_update(std::move(*_update).as_clustering_row(), std::move(existing)); + } else if (_update->is_static_row()) { + auto existing_tombstone = _existing_partition_tombstone; + auto existing = existing_tombstone + ? std::optional(std::in_place) + : std::nullopt; + generate_update(std::move(*_update).as_static_row(), _update_partition_tombstone, std::move(existing), _existing_partition_tombstone); } return stop_updates ? stop() : advance_updates(); } diff --git a/db/view/view.hh b/db/view/view.hh index e99db19576..68a9269f72 100644 --- a/db/view/view.hh +++ b/db/view/view.hh @@ -281,6 +281,7 @@ public: private: void generate_update(clustering_row&& update, std::optional&& existing); + void generate_update(static_row&& update, const tombstone& update_tomb, std::optional&& existing, const tombstone& existing_tomb); future on_results(); future advance_all();