diff --git a/db/view/view.cc b/db/view/view.cc index 4819faba01..dac81cb44d 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -163,6 +163,7 @@ public: }); } + void generate_update(const partition_key& base_key, const clustering_row& update, const stdx::optional& existing, gc_clock::time_point now); private: mutation_partition& partition_for(partition_key&& key) { auto it = _updates.find(key); @@ -364,6 +365,63 @@ void view_updates::update_entry(const partition_key& base_key, const clustering_ add_cells_to_view(*_base, *_view->schema(), diff, r.cells()); } +void view_updates::generate_update( + const partition_key& base_key, + const clustering_row& update, + const stdx::optional& existing, + gc_clock::time_point now) { + // Note that the base PK columns in update and existing are the same, since we're intrinsically dealing + // with the same base row. So we have to check 3 things: + // 1) that the clustering key doesn't have a null, which can happen for compact tables. If that's the case, + // there is no corresponding entries. + // 2) if there is a column not part of the base PK in the view PK, whether it is changed by the update. + // 3) whether the update actually matches the view SELECT filter + + if (!update.key().is_full(*_base)) { + return; + } + + auto* col = _view->base_non_pk_column_in_view_pk(); + if (!col) { + // The view key is necessarily the same pre and post update. + if (existing && !existing->empty()) { + if (update.empty()) { + delete_old_entry(base_key, *existing, now); + } else { + update_entry(base_key, update, *existing, now); + } + } else if (!update.empty()) { + create_entry(base_key, update, now); + } + return; + } + + auto col_id = col->id; + auto* after = update.cells().find_cell(col_id); + if (existing) { + auto* before = existing->cells().find_cell(col_id); + if (before) { + if (after) { + // Note: multi-cell columns can't be part of the primary key. + auto cmp = compare_atomic_cell_for_merge(before->as_atomic_cell(), after->as_atomic_cell()); + if (cmp == 0) { + replace_entry(base_key, update, *existing, now); + } else { + update_entry(base_key, update, *existing, now); + } + } else { + delete_old_entry(base_key, *existing, now); + } + return; + } + } + + // No existing row or the cell wasn't live + if (after) { + create_entry(base_key, update, now); + } +} + } // namespace view } // namespace db