db/view: Generate regular tombstone for base deletions

Instead of shadowable tombstones, which only apply to updates.

Signed-off-by: Duarte Nunes <duarte@scylladb.com>
This commit is contained in:
Duarte Nunes
2017-05-11 18:02:19 +02:00
parent 1fd8b8e723
commit 38be85a21d

View File

@@ -223,12 +223,12 @@ private:
row_marker compute_row_marker(const clustering_row& base_row) const;
deletable_row& get_view_row(const partition_key& base_key, const clustering_row& update);
void create_entry(const partition_key& base_key, const clustering_row& update, gc_clock::time_point now);
void delete_old_entry(const partition_key& base_key, const clustering_row& existing, gc_clock::time_point now);
void do_delete_old_entry(const partition_key& base_key, const clustering_row& existing, gc_clock::time_point now);
void delete_old_entry(const partition_key& base_key, const clustering_row& existing, const row_tombstone& t, gc_clock::time_point now);
void do_delete_old_entry(const partition_key& base_key, const clustering_row& existing, const row_tombstone& t, gc_clock::time_point now);
void update_entry(const partition_key& base_key, const clustering_row& update, const clustering_row& existing, gc_clock::time_point now);
void replace_entry(const partition_key& base_key, const clustering_row& update, const clustering_row& existing, gc_clock::time_point now) {
create_entry(base_key, update, now);
delete_old_entry(base_key, existing, now);
delete_old_entry(base_key, existing, row_tombstone(), now);
}
};
@@ -350,15 +350,19 @@ void view_updates::create_entry(const partition_key& base_key, const clustering_
* Deletes the view entry corresponding to the provided base row.
* This method checks that the base row does match the view filter before bothering.
*/
void view_updates::delete_old_entry(const partition_key& base_key, const clustering_row& existing, gc_clock::time_point now) {
void view_updates::delete_old_entry(const partition_key& base_key, const clustering_row& existing, const row_tombstone& t, gc_clock::time_point now) {
// Before deleting an old entry, make sure it was matching the view filter
// (otherwise there is nothing to delete)
if (matches_view_filter(*_base, _view_info, base_key, existing, now)) {
do_delete_old_entry(base_key, existing, now);
do_delete_old_entry(base_key, existing, t, now);
}
}
void view_updates::do_delete_old_entry(const partition_key& base_key, const clustering_row& existing, gc_clock::time_point now) {
void view_updates::do_delete_old_entry(const partition_key& base_key, const clustering_row& existing, const row_tombstone& t, gc_clock::time_point now) {
if (t) {
get_view_row(base_key, existing).apply(t);
return;
}
// We delete the old row using a shadowable row tombstone, making sure that
// the tombstone deletes everything in the row (or it might still show up).
// FIXME: If the entry is "resurrected" by a later update, we would need to
@@ -399,7 +403,7 @@ void view_updates::update_entry(const partition_key& base_key, const clustering_
return;
}
if (!matches_view_filter(*_base, _view_info, base_key, update, now)) {
do_delete_old_entry(base_key, existing, now);
do_delete_old_entry(base_key, existing, row_tombstone(), now);
return;
}
@@ -432,7 +436,7 @@ void view_updates::generate_update(
// The view key is necessarily the same pre and post update.
if (existing && !existing->empty()) {
if (update.empty()) {
delete_old_entry(base_key, *existing, now);
delete_old_entry(base_key, *existing, update.tomb(), now);
} else {
update_entry(base_key, update, *existing, now);
}
@@ -455,7 +459,7 @@ void view_updates::generate_update(
replace_entry(base_key, update, *existing, now);
}
} else {
delete_old_entry(base_key, *existing, now);
delete_old_entry(base_key, *existing, update.tomb(), now);
}
return;
}