view_updates: Generate updates

This patch adds the view_updates::generate_update() function to
generate view updates given a base row update and the corresponding,
pre-existing row. This function will decide which of the previously
introduced functions to call based on whether there is a pre-existing
row and whether there exists a regular base column that's part of the
view's PK.

Signed-off-by: Duarte Nunes <duarte@scylladb.com>
This commit is contained in:
Duarte Nunes
2017-01-15 20:36:38 +01:00
parent 861d2dfb61
commit 3991a58f08

View File

@@ -163,6 +163,7 @@ public:
});
}
void generate_update(const partition_key& base_key, const clustering_row& update, const stdx::optional<clustering_row>& existing, gc_clock::time_point now);
private:
mutation_partition& partition_for(partition_key&& key) {
auto it = _updates.find(key);
@@ -364,6 +365,63 @@ void view_updates::update_entry(const partition_key& base_key, const clustering_
add_cells_to_view(*_base, *_view->schema(), diff, r.cells());
}
void view_updates::generate_update(
const partition_key& base_key,
const clustering_row& update,
const stdx::optional<clustering_row>& existing,
gc_clock::time_point now) {
// Note that the base PK columns in update and existing are the same, since we're intrinsically dealing
// with the same base row. So we have to check 3 things:
// 1) that the clustering key doesn't have a null, which can happen for compact tables. If that's the case,
// there is no corresponding entries.
// 2) if there is a column not part of the base PK in the view PK, whether it is changed by the update.
// 3) whether the update actually matches the view SELECT filter
if (!update.key().is_full(*_base)) {
return;
}
auto* col = _view->base_non_pk_column_in_view_pk();
if (!col) {
// The view key is necessarily the same pre and post update.
if (existing && !existing->empty()) {
if (update.empty()) {
delete_old_entry(base_key, *existing, now);
} else {
update_entry(base_key, update, *existing, now);
}
} else if (!update.empty()) {
create_entry(base_key, update, now);
}
return;
}
auto col_id = col->id;
auto* after = update.cells().find_cell(col_id);
if (existing) {
auto* before = existing->cells().find_cell(col_id);
if (before) {
if (after) {
// Note: multi-cell columns can't be part of the primary key.
auto cmp = compare_atomic_cell_for_merge(before->as_atomic_cell(), after->as_atomic_cell());
if (cmp == 0) {
replace_entry(base_key, update, *existing, now);
} else {
update_entry(base_key, update, *existing, now);
}
} else {
delete_old_entry(base_key, *existing, now);
}
return;
}
}
// No existing row or the cell wasn't live
if (after) {
create_entry(base_key, update, now);
}
}
} // namespace view
} // namespace db