diff --git a/db/view/view.cc b/db/view/view.cc index 6fbad4bd94..bb2de389d1 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -176,6 +176,7 @@ private: void create_entry(const partition_key& base_key, const clustering_row& update, gc_clock::time_point now); void delete_old_entry(const partition_key& base_key, const clustering_row& existing, gc_clock::time_point now); void do_delete_old_entry(const partition_key& base_key, const clustering_row& existing, gc_clock::time_point now); + void update_entry(const partition_key& base_key, const clustering_row& update, const clustering_row& existing, gc_clock::time_point now); }; row_marker view_updates::compute_row_marker(const clustering_row& base_row) const { @@ -331,6 +332,34 @@ void view_updates::do_delete_old_entry(const partition_key& base_key, const clus get_view_row(base_key, existing).apply(tombstone(ts, now)); } +/** + * Creates the updates to apply to the existing view entry given the base table row before + * and after the update, assuming that the update hasn't changed to which view entry the + * row corresponds (that is, we know the columns composing the view PK haven't changed). + * + * This method checks that the base row (before and after) matches the view filter before + * applying anything. + */ +void view_updates::update_entry(const partition_key& base_key, const clustering_row& update, const clustering_row& existing, gc_clock::time_point now) { + // While we know update and existing correspond to the same view entry, + // they may not match the view filter. + if (!_view->matches_view_filter(*_base, base_key, existing, now)) { + create_entry(base_key, update, now); + return; + } + if (!_view->matches_view_filter(*_base, base_key, update, now)) { + do_delete_old_entry(base_key, existing, now); + return; + } + + deletable_row& r = get_view_row(base_key, update); + r.apply(compute_row_marker(update)); + r.apply(update.tomb()); + + auto diff = update.cells().difference(*_base, column_kind::regular_column, existing.cells()); + add_cells_to_view(*_base, *_view->schema(), diff, r.cells()); +} + } // namespace view } // namespace db