db/view: process static rows in view_update_builder::on_results
The `view_update_builder::on_results()` function is changed to react to static rows when comparing read-before-write results with the base table mutation.
This commit is contained in:
@@ -1274,6 +1274,33 @@ void view_update_builder::generate_update(clustering_row&& update, std::optional
|
||||
}
|
||||
}
|
||||
|
||||
void view_update_builder::generate_update(static_row&& update, const tombstone& update_tomb,
|
||||
std::optional<static_row>&& existing, const tombstone& existing_tomb) {
|
||||
if (!update_tomb && update.empty()) {
|
||||
throw std::logic_error("A materialized view update cannot be empty");
|
||||
}
|
||||
|
||||
auto dk = dht::decorate_key(*_schema, _key);
|
||||
const auto& gc_state = _base.get_compaction_manager().get_tombstone_gc_state();
|
||||
auto gc_before = gc_state.get_gc_before_for_key(_schema, dk, _now);
|
||||
|
||||
// We allow existing to be disengaged, which we treat the same as an empty row.
|
||||
if (existing) {
|
||||
existing->cells().compact_and_expire(*_schema, column_kind::static_column, row_tombstone(existing_tomb), _now, always_gc, gc_before);
|
||||
update.apply(*_schema, static_row(*_schema, *existing));
|
||||
}
|
||||
|
||||
update.cells().compact_and_expire(*_schema, column_kind::static_column, row_tombstone(update_tomb), _now, always_gc, gc_before);
|
||||
|
||||
const auto update_row = clustering_or_static_row(std::move(update));
|
||||
const auto existing_row = existing
|
||||
? std::make_optional<clustering_or_static_row>(std::move(*existing))
|
||||
: std::optional<clustering_or_static_row>();
|
||||
for (auto&& v : _view_updates) {
|
||||
v.generate_update(_key, update_row, existing_row, _now);
|
||||
}
|
||||
}
|
||||
|
||||
future<stop_iteration> view_update_builder::on_results() {
|
||||
constexpr size_t max_rows_for_view_updates = 100;
|
||||
size_t rows_for_view_updates = std::accumulate(_view_updates.begin(), _view_updates.end(), 0, [] (size_t acc, const view_updates& vu) {
|
||||
@@ -1295,12 +1322,21 @@ future<stop_iteration> view_update_builder::on_results() {
|
||||
? std::optional<clustering_row>(std::in_place, update.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row())
|
||||
: std::nullopt;
|
||||
generate_update(std::move(update), std::move(existing));
|
||||
} else if (_update->is_static_row()) {
|
||||
auto update = std::move(*_update).as_static_row();
|
||||
auto tombstone = _existing_partition_tombstone;
|
||||
auto existing = tombstone
|
||||
? std::optional<static_row>(std::in_place)
|
||||
: std::nullopt;
|
||||
generate_update(std::move(update), _update_partition_tombstone, std::move(existing), _existing_partition_tombstone);
|
||||
}
|
||||
return stop_updates ? stop() : advance_updates();
|
||||
}
|
||||
if (cmp > 0) {
|
||||
// We have something existing but no update (which will happen either because it's a range tombstone marker in
|
||||
// existing, or because we've fetched the existing row due to some partition/range deletion in the updates)
|
||||
// existing, or because we've fetched the existing row due to some partition/range deletion in the updates).
|
||||
// Due to how the read command for existing rows is constructed, it is also possible that there is a static
|
||||
// row is included, even though we didn't modify it.
|
||||
if (_existing->is_range_tombstone_change()) {
|
||||
_existing_current_tombstone = _existing->as_range_tombstone_change().tombstone();
|
||||
} else if (_existing->is_clustering_row()) {
|
||||
@@ -1314,6 +1350,21 @@ future<stop_iteration> view_update_builder::on_results() {
|
||||
auto update = clustering_row(existing.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row());
|
||||
generate_update(std::move(update), { std::move(existing) });
|
||||
}
|
||||
} else if (_existing->is_static_row()) {
|
||||
auto existing = std::move(*_existing).as_static_row();
|
||||
auto tombstone = _update_partition_tombstone;
|
||||
// The static row might be unintentionally included when fetching existing clustering rows,
|
||||
// even if the static row was not updated. We can detect it. A static row can be affected either by:
|
||||
//
|
||||
// 1. A static row in the update mutation
|
||||
// 2. A partition tombstone in the update mutation
|
||||
//
|
||||
// If neither of those is present, this means that the static row is included accidentally.
|
||||
// If we are here, this means that (1) is not present. The `if` that follows checks for (2).
|
||||
if (tombstone) {
|
||||
auto update = static_row();
|
||||
generate_update(std::move(update), _update_partition_tombstone, { std::move(existing) }, _existing_partition_tombstone);
|
||||
}
|
||||
}
|
||||
return stop_updates ? stop () : advance_existings();
|
||||
}
|
||||
@@ -1331,6 +1382,12 @@ future<stop_iteration> view_update_builder::on_results() {
|
||||
cr.apply(std::max(_existing_partition_tombstone, _existing_current_tombstone));
|
||||
});
|
||||
generate_update(std::move(*_update).as_clustering_row(), { std::move(*_existing).as_clustering_row() });
|
||||
} else if (_update->is_static_row()) {
|
||||
if (!_existing->is_static_row()) {
|
||||
on_internal_error(vlogger, format("Static row update mutation part {} shouldn't compare equal with an existing, non-static row mutation part {}",
|
||||
mutation_fragment_v2::printer(*_schema, *_update), mutation_fragment_v2::printer(*_schema, *_existing)));
|
||||
}
|
||||
generate_update(std::move(*_update).as_static_row(), _update_partition_tombstone, { std::move(*_existing).as_static_row() }, _existing_partition_tombstone);
|
||||
}
|
||||
return stop_updates ? stop() : advance_all();
|
||||
}
|
||||
@@ -1342,6 +1399,10 @@ future<stop_iteration> view_update_builder::on_results() {
|
||||
auto existing = clustering_row(*_schema, _existing->as_clustering_row());
|
||||
auto update = clustering_row(existing.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row());
|
||||
generate_update(std::move(update), { std::move(existing) });
|
||||
} else if (_existing->is_static_row()) {
|
||||
auto existing = static_row(*_schema, _existing->as_static_row());
|
||||
auto update = static_row();
|
||||
generate_update(std::move(update), _update_partition_tombstone, { std::move(existing) }, _existing_partition_tombstone);
|
||||
}
|
||||
return stop_updates ? stop() : advance_existings();
|
||||
}
|
||||
@@ -1357,6 +1418,12 @@ future<stop_iteration> view_update_builder::on_results() {
|
||||
? std::optional<clustering_row>(std::in_place, _update->as_clustering_row().key(), row_tombstone(std::move(existing_tombstone)), row_marker(), ::row())
|
||||
: std::nullopt;
|
||||
generate_update(std::move(*_update).as_clustering_row(), std::move(existing));
|
||||
} else if (_update->is_static_row()) {
|
||||
auto existing_tombstone = _existing_partition_tombstone;
|
||||
auto existing = existing_tombstone
|
||||
? std::optional<static_row>(std::in_place)
|
||||
: std::nullopt;
|
||||
generate_update(std::move(*_update).as_static_row(), _update_partition_tombstone, std::move(existing), _existing_partition_tombstone);
|
||||
}
|
||||
return stop_updates ? stop() : advance_updates();
|
||||
}
|
||||
|
||||
@@ -281,6 +281,7 @@ public:
|
||||
|
||||
private:
|
||||
void generate_update(clustering_row&& update, std::optional<clustering_row>&& existing);
|
||||
void generate_update(static_row&& update, const tombstone& update_tomb, std::optional<static_row>&& existing, const tombstone& existing_tomb);
|
||||
future<stop_iteration> on_results();
|
||||
|
||||
future<stop_iteration> advance_all();
|
||||
|
||||
Reference in New Issue
Block a user