diff --git a/db/view/view.cc b/db/view/view.cc index 33bc94f7b1..6fbad4bd94 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -174,6 +174,8 @@ private: row_marker compute_row_marker(const clustering_row& base_row) const; deletable_row& get_view_row(const partition_key& base_key, const clustering_row& update); void create_entry(const partition_key& base_key, const clustering_row& update, gc_clock::time_point now); + void delete_old_entry(const partition_key& base_key, const clustering_row& existing, gc_clock::time_point now); + void do_delete_old_entry(const partition_key& base_key, const clustering_row& existing, gc_clock::time_point now); }; row_marker view_updates::compute_row_marker(const clustering_row& base_row) const { @@ -291,6 +293,44 @@ void view_updates::create_entry(const partition_key& base_key, const clustering_ add_cells_to_view(*_base, *_view->schema(), update.cells(), r.cells()); } +/** + * Deletes the view entry corresponding to the provided base row. + * This method checks that the base row does match the view filter before bothering. + */ +void view_updates::delete_old_entry(const partition_key& base_key, const clustering_row& existing, gc_clock::time_point now) { + // Before deleting an old entry, make sure it was matching the view filter + // (otherwise there is nothing to delete) + if (_view->matches_view_filter(*_base, base_key, existing, now)) { + do_delete_old_entry(base_key, existing, now); + } +} + +void view_updates::do_delete_old_entry(const partition_key& base_key, const clustering_row& existing, gc_clock::time_point now) { + // We delete the old row using a shadowable row tombstone, making sure that + // the tombstone deletes everything in the row (or it might still show up). + // FIXME: If the entry is "resurrected" by a later update, we would need to + // ensure that the timestamp for the entry then is bigger than the tombstone + // we're just inserting, which is not currently guaranteed. See CASSANDRA-11500 + // for details. + auto& view_schema = *_view->schema(); + auto ts = existing.marker().timestamp(); + auto set_max_ts = [&ts] (atomic_cell_view&& cell) { + ts = std::max(ts, cell.timestamp()); + }; + existing.cells().for_each_cell([&, this] (column_id id, const atomic_cell_or_collection& cell) { + auto* def = view_column(*_base, view_schema, id); + if (!def) { + return; + } + if (def->is_atomic()) { + set_max_ts(cell.as_atomic_cell()); + } else { + static_pointer_cast(def->type)->for_each_cell(cell.as_collection_mutation(), set_max_ts); + } + }); + get_view_row(base_key, existing).apply(tombstone(ts, now)); +} + } // namespace view } // namespace db