db/view: adjust existing view update generation path to use clustering_or_static_row
The view update path is modified to use `clustering_or_static_row` instead of just `clustering_row`.
This commit is contained in:
123
db/view/view.cc
123
db/view/view.cc
@@ -445,18 +445,24 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
bool matches_view_filter(const schema& base, const view_info& view, const partition_key& key, const clustering_row& update, gc_clock::time_point now) {
|
||||
bool matches_view_filter(const schema& base, const view_info& view, const partition_key& key, const clustering_or_static_row& update, gc_clock::time_point now) {
|
||||
// TODO: Filtering is only supported in materialized views which don't support
|
||||
// static rows yet. Skip the whole function if it is a static row update.
|
||||
if (update.is_static_row()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
auto slice = make_partition_slice(base);
|
||||
|
||||
data_query_result_builder builder(base, slice);
|
||||
builder.consume_new_partition(dht::decorate_key(base, key));
|
||||
builder.consume(clustering_row(base, update), row_tombstone{}, update.is_live(base, tombstone{}, now));
|
||||
builder.consume(clustering_row(base, update.as_clustering_row(base)), row_tombstone{}, update.is_live(base, tombstone(), now));
|
||||
builder.consume_end_of_partition();
|
||||
auto result = builder.consume_end_of_stream();
|
||||
view_filter_checking_visitor visitor(base, view);
|
||||
query::result_view::consume(result, slice, visitor);
|
||||
|
||||
return clustering_prefix_matches(base, view, key, update.key())
|
||||
return clustering_prefix_matches(base, view, key, *update.key())
|
||||
&& visitor.matches_view_filter();
|
||||
}
|
||||
|
||||
@@ -483,7 +489,7 @@ size_t view_updates::op_count() const {
|
||||
return _op_count++;;
|
||||
}
|
||||
|
||||
row_marker view_updates::compute_row_marker(const clustering_row& base_row) const {
|
||||
row_marker view_updates::compute_row_marker(const clustering_or_static_row& base_row) const {
|
||||
/*
|
||||
* We need to compute both the timestamp and expiration.
|
||||
*
|
||||
@@ -511,9 +517,11 @@ row_marker view_updates::compute_row_marker(const clustering_row& base_row) cons
|
||||
// they share liveness information. It's true especially in the only case currently allowed by CQL,
|
||||
// which assumes there's up to one non-pk column in the view key. It's also true in alternator,
|
||||
// which does not carry TTL information.
|
||||
const auto& col_ids = _base_info->base_regular_columns_in_view_pk();
|
||||
const auto& col_ids = base_row.is_clustering_row()
|
||||
? _base_info->base_regular_columns_in_view_pk()
|
||||
: _base_info->base_static_columns_in_view_pk();
|
||||
if (!col_ids.empty()) {
|
||||
auto& def = _base->regular_column_at(col_ids[0]);
|
||||
auto& def = _base->column_at(base_row.column_kind(), col_ids[0]);
|
||||
// Note: multi-cell columns can't be part of the primary key.
|
||||
auto cell = base_row.cells().cell_at(col_ids[0]).as_atomic_cell(def);
|
||||
return cell.is_live_and_has_ttl() ? row_marker(cell.timestamp(), cell.ttl(), cell.expiry()) : row_marker(cell.timestamp());
|
||||
@@ -522,7 +530,6 @@ row_marker view_updates::compute_row_marker(const clustering_row& base_row) cons
|
||||
return base_row.marker();
|
||||
}
|
||||
|
||||
|
||||
namespace {
|
||||
// The following struct is identical to view_key_with_action, except the key
|
||||
// is stored as a managed_bytes_view instead of bytes.
|
||||
@@ -560,11 +567,11 @@ private:
|
||||
const view_ptr& _view;
|
||||
|
||||
const partition_key& _base_key;
|
||||
const clustering_row& _update;
|
||||
const std::optional<clustering_row>& _existing;
|
||||
const clustering_or_static_row& _update;
|
||||
const std::optional<clustering_or_static_row>& _existing;
|
||||
|
||||
public:
|
||||
value_getter(const schema& base, const view_ptr& view, const partition_key& base_key, const clustering_row& update, const std::optional<clustering_row>& existing)
|
||||
value_getter(const schema& base, const view_ptr& view, const partition_key& base_key, const clustering_or_static_row& update, const std::optional<clustering_or_static_row>& existing)
|
||||
: _base(base)
|
||||
, _view(view)
|
||||
, _base_key(base_key)
|
||||
@@ -584,8 +591,15 @@ public:
|
||||
case column_kind::partition_key:
|
||||
return {_base_key.get_component(_base, base_col->position())};
|
||||
case column_kind::clustering_key:
|
||||
return {_update.key().get_component(_base, base_col->position())};
|
||||
if (_update.is_static_row()) {
|
||||
on_internal_error(vlogger, "Tried to get view row value for a static row update in a view with partition key having clustering columns from original table");
|
||||
}
|
||||
return {_update.key()->get_component(_base, base_col->position())};
|
||||
default:
|
||||
if (base_col->kind != _update.column_kind()) {
|
||||
on_internal_error(vlogger, format("Tried to get a {} column from a {} row update, which is impossible",
|
||||
to_sstring(base_col->kind), _update.is_clustering_row() ? "clustering" : "static"));
|
||||
}
|
||||
auto& c = _update.cells().cell_at(base_col->id);
|
||||
auto value_view = base_col->is_atomic() ? c.as_atomic_cell(cdef).value() : c.as_collection_mutation().data;
|
||||
return {managed_bytes_view{value_view}};
|
||||
@@ -620,13 +634,7 @@ private:
|
||||
}
|
||||
collection_column_position = column_position - 1;
|
||||
|
||||
// TODO: Introduced just for the sake of clear commit history, will be removed in following commits
|
||||
const auto update = clustering_or_static_row(clustering_row(_base, _update));
|
||||
const auto existing = _existing
|
||||
? std::make_optional<clustering_or_static_row>(clustering_row(_base, *_existing))
|
||||
: std::nullopt;
|
||||
|
||||
for (auto& bwa : collection_computation->compute_values_with_action(_base, _base_key, update, existing)) {
|
||||
for (auto& bwa : collection_computation->compute_values_with_action(_base, _base_key, _update, _existing)) {
|
||||
ret.push_back({std::move(bwa), linearized_values});
|
||||
}
|
||||
return ret;
|
||||
@@ -636,7 +644,7 @@ private:
|
||||
|
||||
|
||||
std::vector<view_updates::view_row_entry>
|
||||
view_updates::get_view_rows(const partition_key& base_key, const clustering_row& update, const std::optional<clustering_row>& existing) {
|
||||
view_updates::get_view_rows(const partition_key& base_key, const clustering_or_static_row& update, const std::optional<clustering_or_static_row>& existing) {
|
||||
value_getter getter(*_base, _view, base_key, update, existing);
|
||||
auto get_value = boost::adaptors::transformed(std::ref(getter));
|
||||
|
||||
@@ -851,9 +859,9 @@ void create_virtual_column(schema_builder& builder, const bytes& name, const dat
|
||||
}
|
||||
}
|
||||
|
||||
static void add_cells_to_view(const schema& base, const schema& view, row base_cells, row& view_cells) {
|
||||
static void add_cells_to_view(const schema& base, const schema& view, column_kind kind, row base_cells, row& view_cells) {
|
||||
base_cells.for_each_cell([&] (column_id id, atomic_cell_or_collection& c) {
|
||||
auto* view_col = view_column(base, view, column_kind::regular_column, id);
|
||||
auto* view_col = view_column(base, view, kind, id);
|
||||
if (view_col && !view_col->is_primary_key()) {
|
||||
maybe_make_virtual(c, view_col);
|
||||
view_cells.append_cell(view_col->id, std::move(c));
|
||||
@@ -865,13 +873,14 @@ static void add_cells_to_view(const schema& base, const schema& view, row base_c
|
||||
* Creates a view entry corresponding to the provided base row.
|
||||
* This method checks that the base row does match the view filter before applying anything.
|
||||
*/
|
||||
void view_updates::create_entry(const partition_key& base_key, const clustering_row& update, gc_clock::time_point now) {
|
||||
void view_updates::create_entry(const partition_key& base_key, const clustering_or_static_row& update, gc_clock::time_point now) {
|
||||
if (!matches_view_filter(*_base, _view_info, base_key, update, now)) {
|
||||
return;
|
||||
}
|
||||
|
||||
auto view_rows = get_view_rows(base_key, update, std::nullopt);
|
||||
auto update_marker = compute_row_marker(update);
|
||||
const auto kind = update.column_kind();
|
||||
for (const auto& [r, action]: view_rows) {
|
||||
if (auto rm = std::get_if<row_marker>(&action)) {
|
||||
r->apply(*rm);
|
||||
@@ -879,7 +888,7 @@ void view_updates::create_entry(const partition_key& base_key, const clustering_
|
||||
r->apply(update_marker);
|
||||
}
|
||||
r->apply(update.tomb());
|
||||
add_cells_to_view(*_base, *_view, row(*_base, column_kind::regular_column, update.cells()), r->cells());
|
||||
add_cells_to_view(*_base, *_view, kind, row(*_base, kind, update.cells()), r->cells());
|
||||
}
|
||||
_op_count += view_rows.size();
|
||||
}
|
||||
@@ -888,7 +897,7 @@ void view_updates::create_entry(const partition_key& base_key, const clustering_
|
||||
* Deletes the view entry corresponding to the provided base row.
|
||||
* This method checks that the base row does match the view filter before bothering.
|
||||
*/
|
||||
void view_updates::delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now) {
|
||||
void view_updates::delete_old_entry(const partition_key& base_key, const clustering_or_static_row& existing, const clustering_or_static_row& update, gc_clock::time_point now) {
|
||||
// Before deleting an old entry, make sure it was matching the view filter
|
||||
// (otherwise there is nothing to delete)
|
||||
if (matches_view_filter(*_base, _view_info, base_key, existing, now)) {
|
||||
@@ -896,10 +905,13 @@ void view_updates::delete_old_entry(const partition_key& base_key, const cluster
|
||||
}
|
||||
}
|
||||
|
||||
void view_updates::do_delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now) {
|
||||
void view_updates::do_delete_old_entry(const partition_key& base_key, const clustering_or_static_row& existing, const clustering_or_static_row& update, gc_clock::time_point now) {
|
||||
auto view_rows = get_view_rows(base_key, existing, std::nullopt);
|
||||
const auto kind = existing.column_kind();
|
||||
for (const auto& [r, action] : view_rows) {
|
||||
const auto& col_ids = _base_info->base_regular_columns_in_view_pk();
|
||||
const auto& col_ids = existing.is_clustering_row()
|
||||
? _base_info->base_regular_columns_in_view_pk()
|
||||
: _base_info->base_static_columns_in_view_pk();
|
||||
if (_view_info.has_computed_column_depending_on_base_non_primary_key()) {
|
||||
if (auto ts_tag = std::get_if<view_key_and_action::shadowable_tombstone_tag>(&action)) {
|
||||
r->apply(ts_tag->into_shadowable_tombstone(now));
|
||||
@@ -908,7 +920,7 @@ void view_updates::do_delete_old_entry(const partition_key& base_key, const clus
|
||||
// We delete the old row using a shadowable row tombstone, making sure that
|
||||
// the tombstone deletes everything in the row (or it might still show up).
|
||||
// Note: multi-cell columns can't be part of the primary key.
|
||||
auto& def = _base->regular_column_at(col_ids[0]);
|
||||
auto& def = _base->column_at(kind, col_ids[0]);
|
||||
auto cell = existing.cells().cell_at(col_ids[0]).as_atomic_cell(def);
|
||||
if (cell.is_live()) {
|
||||
r->apply(shadowable_tombstone(cell.timestamp(), now));
|
||||
@@ -918,8 +930,8 @@ void view_updates::do_delete_old_entry(const partition_key& base_key, const clus
|
||||
// means view row is the same - so it needs to be deleted as well
|
||||
// using the same deletion timestamps for the individual cells.
|
||||
r->apply(update.marker());
|
||||
auto diff = update.cells().difference(*_base, column_kind::regular_column, existing.cells());
|
||||
add_cells_to_view(*_base, *_view, std::move(diff), r->cells());
|
||||
auto diff = update.cells().difference(*_base, kind, existing.cells());
|
||||
add_cells_to_view(*_base, *_view, kind, std::move(diff), r->cells());
|
||||
}
|
||||
r->apply(update.tomb());
|
||||
}
|
||||
@@ -946,7 +958,7 @@ static bool atomic_cells_liveness_equal(atomic_cell_view left, atomic_cell_view
|
||||
return true;
|
||||
}
|
||||
|
||||
bool view_updates::can_skip_view_updates(const clustering_row& update, const clustering_row& existing) const {
|
||||
bool view_updates::can_skip_view_updates(const clustering_or_static_row& update, const clustering_or_static_row& existing) const {
|
||||
const row& existing_row = existing.cells();
|
||||
const row& updated_row = update.cells();
|
||||
|
||||
@@ -1005,7 +1017,7 @@ bool view_updates::can_skip_view_updates(const clustering_row& update, const clu
|
||||
* This method checks that the base row (before and after) matches the view filter before
|
||||
* applying anything.
|
||||
*/
|
||||
void view_updates::update_entry(const partition_key& base_key, const clustering_row& update, const clustering_row& existing, gc_clock::time_point now) {
|
||||
void view_updates::update_entry(const partition_key& base_key, const clustering_or_static_row& update, const clustering_or_static_row& existing, gc_clock::time_point now) {
|
||||
// While we know update and existing correspond to the same view entry,
|
||||
// they may not match the view filter.
|
||||
if (!matches_view_filter(*_base, _view_info, base_key, existing, now)) {
|
||||
@@ -1023,6 +1035,7 @@ void view_updates::update_entry(const partition_key& base_key, const clustering_
|
||||
|
||||
auto view_rows = get_view_rows(base_key, update, std::nullopt);
|
||||
auto update_marker = compute_row_marker(update);
|
||||
const auto kind = update.column_kind();
|
||||
for (const auto& [r, action] : view_rows) {
|
||||
if (auto rm = std::get_if<row_marker>(&action)) {
|
||||
r->apply(*rm);
|
||||
@@ -1031,16 +1044,16 @@ void view_updates::update_entry(const partition_key& base_key, const clustering_
|
||||
}
|
||||
r->apply(update.tomb());
|
||||
|
||||
auto diff = update.cells().difference(*_base, column_kind::regular_column, existing.cells());
|
||||
add_cells_to_view(*_base, *_view, std::move(diff), r->cells());
|
||||
auto diff = update.cells().difference(*_base, kind, existing.cells());
|
||||
add_cells_to_view(*_base, *_view, kind, std::move(diff), r->cells());
|
||||
}
|
||||
_op_count += view_rows.size();
|
||||
}
|
||||
|
||||
void view_updates::update_entry_for_computed_column(
|
||||
const partition_key& base_key,
|
||||
const clustering_row& update,
|
||||
const std::optional<clustering_row>& existing,
|
||||
const clustering_or_static_row& update,
|
||||
const std::optional<clustering_or_static_row>& existing,
|
||||
gc_clock::time_point now) {
|
||||
auto view_rows = get_view_rows(base_key, update, existing);
|
||||
for (const auto& [r, action] : view_rows) {
|
||||
@@ -1061,9 +1074,10 @@ void view_updates::update_entry_for_computed_column(
|
||||
|
||||
void view_updates::generate_update(
|
||||
const partition_key& base_key,
|
||||
const clustering_row& update,
|
||||
const std::optional<clustering_row>& existing,
|
||||
const clustering_or_static_row& update,
|
||||
const std::optional<clustering_or_static_row>& existing,
|
||||
gc_clock::time_point now) {
|
||||
|
||||
// Note that the base PK columns in update and existing are the same, since we're intrinsically dealing
|
||||
// with the same base row. So we have to check 3 things:
|
||||
// 1) that the clustering key doesn't have a null, which can happen for compact tables. If that's the case,
|
||||
@@ -1071,15 +1085,20 @@ void view_updates::generate_update(
|
||||
// 2) if there is a column not part of the base PK in the view PK, whether it is changed by the update.
|
||||
// 3) whether the update actually matches the view SELECT filter
|
||||
|
||||
if (!update.key().is_full(*_base)) {
|
||||
return;
|
||||
if (update.is_clustering_row()) {
|
||||
if (!update.key()->is_full(*_base)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
const auto& col_ids = _base_info->base_regular_columns_in_view_pk();
|
||||
if (_view_info.has_computed_column_depending_on_base_non_primary_key()) {
|
||||
return update_entry_for_computed_column(base_key, update, existing, now);
|
||||
}
|
||||
if (col_ids.empty()) {
|
||||
if (!_base_info->has_base_non_pk_columns_in_view_pk) {
|
||||
if (update.is_static_row()) {
|
||||
// TODO: support static rows in views with pk only including columns from base pk
|
||||
return;
|
||||
}
|
||||
// The view key is necessarily the same pre and post update.
|
||||
if (existing && existing->is_live(*_base)) {
|
||||
if (update.is_live(*_base)) {
|
||||
@@ -1093,6 +1112,20 @@ void view_updates::generate_update(
|
||||
return;
|
||||
}
|
||||
|
||||
const auto& col_ids = update.is_clustering_row()
|
||||
? _base_info->base_regular_columns_in_view_pk()
|
||||
: _base_info->base_static_columns_in_view_pk();
|
||||
|
||||
// The view has a non-primary-key column from the base table as its primary key.
|
||||
// That means it's either a regular or static column. If we are currently
|
||||
// processing an update which does not correspond to the column's kind,
|
||||
// just stop here.
|
||||
if (col_ids.empty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
const auto kind = update.column_kind();
|
||||
|
||||
// If one of the key columns is missing, set has_new_row = false
|
||||
// meaning that after the update there will be no view row.
|
||||
// If one of the key columns is missing in the existing value,
|
||||
@@ -1103,7 +1136,7 @@ void view_updates::generate_update(
|
||||
bool same_row = true;
|
||||
for (auto col_id : col_ids) {
|
||||
auto* after = update.cells().find_cell(col_id);
|
||||
auto& cdef = _base->regular_column_at(col_id);
|
||||
auto& cdef = _base->column_at(kind, col_id);
|
||||
if (existing) {
|
||||
auto* before = existing->cells().find_cell(col_id);
|
||||
// Note that this cell is necessarily atomic, because col_ids are
|
||||
@@ -1232,8 +1265,12 @@ void view_update_builder::generate_update(clustering_row&& update, std::optional
|
||||
update.marker().compact_and_expire(update.tomb().tomb(), _now, always_gc, gc_before);
|
||||
update.cells().compact_and_expire(*_schema, column_kind::regular_column, update.tomb(), _now, always_gc, gc_before, update.marker());
|
||||
|
||||
const auto update_row = clustering_or_static_row(std::move(update));
|
||||
const auto existing_row = existing
|
||||
? std::make_optional<clustering_or_static_row>(std::move(*existing))
|
||||
: std::optional<clustering_or_static_row>();
|
||||
for (auto&& v : _view_updates) {
|
||||
v.generate_update(_key, update, existing, _now);
|
||||
v.generate_update(_key, update_row, existing_row, _now);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -163,7 +163,7 @@ bool may_be_affected_by(const schema& base, const view_info& view, const dht::de
|
||||
* @param now the current time in seconds (to decide what is live and what isn't).
|
||||
* @return whether the base row matches the view filter.
|
||||
*/
|
||||
bool matches_view_filter(const schema& base, const view_info& view, const partition_key& key, const clustering_row& update, gc_clock::time_point now);
|
||||
bool matches_view_filter(const schema& base, const view_info& view, const partition_key& key, const clustering_or_static_row& update, gc_clock::time_point now);
|
||||
|
||||
bool clustering_prefix_matches(const schema& base, const partition_key& key, const clustering_key_prefix& ck);
|
||||
|
||||
@@ -225,24 +225,24 @@ public:
|
||||
|
||||
future<> move_to(utils::chunked_vector<frozen_mutation_and_schema>& mutations);
|
||||
|
||||
void generate_update(const partition_key& base_key, const clustering_row& update, const std::optional<clustering_row>& existing, gc_clock::time_point now);
|
||||
void generate_update(const partition_key& base_key, const clustering_or_static_row& update, const std::optional<clustering_or_static_row>& existing, gc_clock::time_point now);
|
||||
|
||||
size_t op_count() const;
|
||||
|
||||
private:
|
||||
mutation_partition& partition_for(partition_key&& key);
|
||||
row_marker compute_row_marker(const clustering_row& base_row) const;
|
||||
row_marker compute_row_marker(const clustering_or_static_row& base_row) const;
|
||||
struct view_row_entry {
|
||||
deletable_row* _row;
|
||||
view_key_and_action::action _action;
|
||||
};
|
||||
std::vector<view_row_entry> get_view_rows(const partition_key& base_key, const clustering_row& update, const std::optional<clustering_row>& existing);
|
||||
bool can_skip_view_updates(const clustering_row& update, const clustering_row& existing) const;
|
||||
void create_entry(const partition_key& base_key, const clustering_row& update, gc_clock::time_point now);
|
||||
void delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now);
|
||||
void do_delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now);
|
||||
void update_entry(const partition_key& base_key, const clustering_row& update, const clustering_row& existing, gc_clock::time_point now);
|
||||
void update_entry_for_computed_column(const partition_key& base_key, const clustering_row& update, const std::optional<clustering_row>& existing, gc_clock::time_point now);
|
||||
std::vector<view_row_entry> get_view_rows(const partition_key& base_key, const clustering_or_static_row& update, const std::optional<clustering_or_static_row>& existing);
|
||||
bool can_skip_view_updates(const clustering_or_static_row& update, const clustering_or_static_row& existing) const;
|
||||
void create_entry(const partition_key& base_key, const clustering_or_static_row& update, gc_clock::time_point now);
|
||||
void delete_old_entry(const partition_key& base_key, const clustering_or_static_row& existing, const clustering_or_static_row& update, gc_clock::time_point now);
|
||||
void do_delete_old_entry(const partition_key& base_key, const clustering_or_static_row& existing, const clustering_or_static_row& update, gc_clock::time_point now);
|
||||
void update_entry(const partition_key& base_key, const clustering_or_static_row& update, const clustering_or_static_row& existing, gc_clock::time_point now);
|
||||
void update_entry_for_computed_column(const partition_key& base_key, const clustering_or_static_row& update, const std::optional<clustering_or_static_row>& existing, gc_clock::time_point now);
|
||||
};
|
||||
|
||||
class view_update_builder {
|
||||
|
||||
Reference in New Issue
Block a user