diff --git a/db/view/view.cc b/db/view/view.cc index 929c374b69..e7ada73b92 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -8,6 +8,7 @@ * SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0) */ +#include #include #include #include @@ -46,6 +47,7 @@ #include "mutation_partition.hh" #include "service/migration_manager.hh" #include "service/storage_proxy.hh" +#include "utils/small_vector.hh" #include "view_info.hh" #include "view_update_checks.hh" #include "types/user.hh" @@ -59,6 +61,7 @@ #include "readers/evictable.hh" #include "delete_ghost_rows_visitor.hh" #include "locator/host_id.hh" +#include "cartesian_product.hh" using namespace std::chrono_literals; @@ -471,7 +474,6 @@ row_marker view_updates::compute_row_marker(const clustering_row& base_row) cons * will all unselected columns. */ - auto marker = base_row.marker(); // WARNING: The code assumes that if multiple regular base columns are present in the view key, // they share liveness information. It's true especially in the only case currently allowed by CQL, // which assumes there's up to one non-pk column in the view key. It's also true in alternator, @@ -484,41 +486,187 @@ row_marker view_updates::compute_row_marker(const clustering_row& base_row) cons return cell.is_live_and_has_ttl() ? row_marker(cell.timestamp(), cell.ttl(), cell.expiry()) : row_marker(cell.timestamp()); } - return marker; + return base_row.marker(); } -deletable_row& view_updates::get_view_row(const partition_key& base_key, const clustering_row& update) { - std::vector linearized_values; - auto get_value = boost::adaptors::transformed([&, this] (const column_definition& cdef) -> managed_bytes_view { - auto* base_col = _base->get_column_definition(cdef.name()); + +namespace { +struct bytes_view_with_action { + managed_bytes_view _bytes_view; + bytes_with_action::action _action; + bytes_view_with_action(managed_bytes_view view, bytes_with_action::action action) + : _bytes_view(view) + , _action(action) + {} + bytes_view_with_action(managed_bytes_view view) + : _bytes_view(view) + {} + bytes_view_with_action(bytes_with_action&& bwa, std::deque& linearized_values) + : bytes_view_with_action(managed_bytes_view(linearized_values.emplace_back(std::move(bwa._view))), bwa._action) + {} + static managed_bytes_view get_view(const bytes_view_with_action& bvwa) { + return bvwa._bytes_view; + } +}; + +// value_getter is used to extract values for specific columns during view update. +struct value_getter { + // linearized_values hold bytes for values of computed columns, for which we later store references in the form of managed_bytes_view. + // deque doesn't invalidate references at emplace_back. + std::deque linearized_values; + + // Index of column being currently processed. + size_t column_position = 0; + // Discovered index of collection computed column. + std::optional collection_column_position; +private: + // Schemas of base table and view. + const schema& _base; + const view_ptr& _view; + + const partition_key& _base_key; + const clustering_row& _update; + const std::optional& _existing; + +public: + value_getter(const schema& base, const view_ptr& view, const partition_key& base_key, const clustering_row& update, const std::optional& existing) + : _base(base) + , _view(view) + , _base_key(base_key) + , _update(update) + , _existing(existing) + {} + + using vector_type = utils::small_vector; + vector_type operator()(const column_definition& cdef) { + column_position++; + + auto* base_col = _base.get_column_definition(cdef.name()); if (!base_col) { - bytes_opt computed_value; - if (!cdef.is_computed()) { - //FIXME(sarna): this legacy code is here for backward compatibility and should be removed - // once "computed_columns feature" is supported by every node - if (!service::get_local_storage_proxy().local_db().find_column_family(_base->id()).get_index_manager().is_index(*_view)) { - throw std::logic_error(format("Column {} doesn't exist in base and this view is not backing a secondary index", cdef.name_as_text())); - } - computed_value = legacy_token_column_computation().compute_value(*_base, base_key); - } else { - computed_value = cdef.get_computation().compute_value(*_base, base_key); - } - return managed_bytes_view(linearized_values.emplace_back(*computed_value)); + return handle_computed_column(cdef); } switch (base_col->kind) { case column_kind::partition_key: - return base_key.get_component(*_base, base_col->position()); + return {_base_key.get_component(_base, base_col->position())}; case column_kind::clustering_key: - return update.key().get_component(*_base, base_col->position()); + return {_update.key().get_component(_base, base_col->position())}; default: - auto& c = update.cells().cell_at(base_col->id); + auto& c = _update.cells().cell_at(base_col->id); auto value_view = base_col->is_atomic() ? c.as_atomic_cell(cdef).value() : c.as_collection_mutation().data; - return value_view; + return {managed_bytes_view{value_view}}; } - }); - auto& partition = partition_for(partition_key::from_range(_view->partition_key_columns() | get_value)); - auto ckey = clustering_key::from_range(_view->clustering_key_columns() | get_value); - return partition.clustered_row(*_view, std::move(ckey)); + } + +private: + vector_type handle_computed_column(const column_definition& cdef) { + bytes computed_value; + if (!cdef.is_computed()) { + //FIXME(sarna): this legacy code is here for backward compatibility and should be removed + // once "computed_columns feature" is supported by every node + if (!service::get_local_storage_proxy().local_db().find_column_family(_base.id()).get_index_manager().is_index(*_view)) { + throw std::logic_error(format("Column {} doesn't exist in base and this view is not backing a secondary index", cdef.name_as_text())); + } + computed_value = legacy_token_column_computation().compute_value(_base, _base_key); + } else { + auto& computation = cdef.get_computation(); + if (auto* collection_computation = dynamic_cast(&computation)) { + return handle_collection_column_computation(collection_computation); + } + computed_value = computation.compute_value(_base, _base_key); + } + + return {managed_bytes_view(linearized_values.emplace_back(std::move(computed_value)))}; + } + + vector_type handle_collection_column_computation(const collection_column_computation* collection_computation) { + vector_type ret; + if (collection_column_position.has_value()) { + on_internal_error(vlogger, format("Multiple columns in view (either pk or ck) are collection computed columns. Current is {}, the previous one found was {}", column_position - 1, *collection_column_position)); + } + collection_column_position = column_position - 1; + + for (auto& bwa : collection_computation->compute_values_with_action(_base, _base_key, _update, _existing)) { + ret.push_back({std::move(bwa), linearized_values}); + } + return ret; + } +}; +} + + +std::vector +view_updates::get_view_rows(const partition_key& base_key, const clustering_row& update, const std::optional& existing) { + value_getter getter(*_base, _view, base_key, update, existing); + auto get_value = boost::adaptors::transformed(std::ref(getter)); + + + std::vector pk_elems, ck_elems; + boost::copy(_view->partition_key_columns() | get_value, std::back_inserter(pk_elems)); + // If no collection column was found, each of the actions will contain no_action, + // in particular, it does not harm to use column 0. + const bool had_multiple_values_in_pk = bool(getter.collection_column_position); + const size_t action_column = getter.collection_column_position.value_or(0); + // Allow for at most one collection computed column in pk and in ck. + getter.collection_column_position.reset(); + boost::copy(_view->clustering_key_columns() | get_value, std::back_inserter(ck_elems)); + const bool had_multiple_values_in_ck = bool(getter.collection_column_position); + + + std::vector ret; + auto compute_row = [&](Range&& pk, Range&& ck) { + partition_key pkey = partition_key::from_range(boost::adaptors::transform(pk, bytes_view_with_action::get_view)); + clustering_key ckey = clustering_key::from_range(boost::adaptors::transform(ck, bytes_view_with_action::get_view)); + auto action = (action_column < pk.size() ? pk[action_column] : ck[action_column - pk.size()])._action; + mutation_partition& partition = partition_for(std::move(pkey)); + ret.push_back({&partition.clustered_row(*_view, std::move(ckey)), action}); + }; + + if (had_multiple_values_in_pk) { + // cartesian_product expects std::vector>, while we have std::vector. + std::vector> pk_elems_, ck_elems_; + auto std_vector_from_small_vector = boost::adaptors::transformed([](const auto& vector) { + return std::vector{vector.begin(), vector.end()}; + }); + boost::copy(pk_elems | std_vector_from_small_vector, std::back_inserter(pk_elems_)); + boost::copy(ck_elems | std_vector_from_small_vector, std::back_inserter(ck_elems_)); + + auto cartesian_product_pk = cartesian_product(pk_elems_), + cartesian_product_ck = cartesian_product(ck_elems_); + auto ck_it = cartesian_product_ck.begin(); + + if (had_multiple_values_in_ck) { + // The computed collection column in clustering key was associated with the computed collection column from the partition key. + // This is a case for indexes over collection values. + + auto throw_length_error = [&] { + size_t pk_size = cartesian_product_size(pk_elems_), + ck_size = cartesian_product_size(ck_elems_); + on_internal_error(vlogger, format("Computed sizes of possible partition keys and clustering keys don't match: {} != {}", pk_size, ck_size)); + }; + for (std::vector& pk : cartesian_product_pk) { + if (ck_it == cartesian_product_ck.end()) { + throw_length_error(); + } + compute_row(pk, *ck_it); + ++ck_it; + } + if (ck_it != cartesian_product_ck.end()) { + throw_length_error(); + } + } else { + for (std::vector& pk : cartesian_product_pk) { + for (std::vector& ck : cartesian_product_ck) { + compute_row(pk, ck); + } + } + } + } else { + // Here it's the old regular index over regular values. Each vector has just one element. + auto get_front = boost::adaptors::transformed([](const auto& v) { return v.front(); }); + compute_row(pk_elems | get_front, ck_elems | get_front); + } + + return ret; } static const column_definition* view_column(const schema& base, const schema& view, column_id base_id) { @@ -680,12 +828,19 @@ void view_updates::create_entry(const partition_key& base_key, const clustering_ if (!matches_view_filter(*_base, _view_info, base_key, update, now)) { return; } - deletable_row& r = get_view_row(base_key, update); - auto marker = compute_row_marker(update); - r.apply(marker); - r.apply(update.tomb()); - add_cells_to_view(*_base, *_view, row(*_base, column_kind::regular_column, update.cells()), r.cells()); - _op_count++; + + auto view_rows = get_view_rows(base_key, update, std::nullopt); + auto update_marker = compute_row_marker(update); + for (const auto& [r, action]: view_rows) { + if (auto rm = std::get_if(&action)) { + r->apply(*rm); + } else { + r->apply(update_marker); + } + r->apply(update.tomb()); + add_cells_to_view(*_base, *_view, row(*_base, column_kind::regular_column, update.cells()), r->cells()); + } + _op_count += view_rows.size(); } /** @@ -701,27 +856,33 @@ void view_updates::delete_old_entry(const partition_key& base_key, const cluster } void view_updates::do_delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now) { - auto& r = get_view_row(base_key, existing); - const auto& col_ids = _base_info->base_non_pk_columns_in_view_pk(); - if (!col_ids.empty()) { - // We delete the old row using a shadowable row tombstone, making sure that - // the tombstone deletes everything in the row (or it might still show up). - // Note: multi-cell columns can't be part of the primary key. - auto& def = _base->regular_column_at(col_ids[0]); - auto cell = existing.cells().cell_at(col_ids[0]).as_atomic_cell(def); - if (cell.is_live()) { - r.apply(shadowable_tombstone(cell.timestamp(), now)); + auto view_rows = get_view_rows(base_key, existing, std::nullopt); + for (const auto& [r, action] : view_rows) { + const auto& col_ids = _base_info->base_non_pk_columns_in_view_pk(); + if (_view_info.has_computed_column_depending_on_base_non_primary_key()) { + if (auto ts_tag = std::get_if(&action)) { + r->apply(ts_tag->into_shadowable_tombstone(now)); + } + } else if (!col_ids.empty()) { + // We delete the old row using a shadowable row tombstone, making sure that + // the tombstone deletes everything in the row (or it might still show up). + // Note: multi-cell columns can't be part of the primary key. + auto& def = _base->regular_column_at(col_ids[0]); + auto cell = existing.cells().cell_at(col_ids[0]).as_atomic_cell(def); + if (cell.is_live()) { + r->apply(shadowable_tombstone(cell.timestamp(), now)); + } + } else { + // "update" caused the base row to have been deleted, and !col_id + // means view row is the same - so it needs to be deleted as well + // using the same deletion timestamps for the individual cells. + r->apply(update.marker()); + auto diff = update.cells().difference(*_base, column_kind::regular_column, existing.cells()); + add_cells_to_view(*_base, *_view, std::move(diff), r->cells()); } - } else { - // "update" caused the base row to have been deleted, and !col_id - // means view row is the same - so it needs to be deleted as well - // using the same deletion timestamps for the individual cells. - r.apply(update.marker()); - auto diff = update.cells().difference(*_base, column_kind::regular_column, existing.cells()); - add_cells_to_view(*_base, *_view, std::move(diff), r.cells()); + r->apply(update.tomb()); } - r.apply(update.tomb()); - _op_count++; + _op_count += view_rows.size(); } /* @@ -819,14 +980,42 @@ void view_updates::update_entry(const partition_key& base_key, const clustering_ return; } - deletable_row& r = get_view_row(base_key, update); - auto marker = compute_row_marker(update); - r.apply(marker); - r.apply(update.tomb()); + auto view_rows = get_view_rows(base_key, update, std::nullopt); + auto update_marker = compute_row_marker(update); + for (const auto& [r, action] : view_rows) { + if (auto rm = std::get_if(&action)) { + r->apply(*rm); + } else { + r->apply(update_marker); + } + r->apply(update.tomb()); - auto diff = update.cells().difference(*_base, column_kind::regular_column, existing.cells()); - add_cells_to_view(*_base, *_view, std::move(diff), r.cells()); - _op_count++; + auto diff = update.cells().difference(*_base, column_kind::regular_column, existing.cells()); + add_cells_to_view(*_base, *_view, std::move(diff), r->cells()); + } + _op_count += view_rows.size(); +} + +void view_updates::update_entry_for_computed_column( + const partition_key& base_key, + const clustering_row& update, + const std::optional& existing, + gc_clock::time_point now) { + auto view_rows = get_view_rows(base_key, update, existing); + for (const auto& [r, action] : view_rows) { + struct visitor { + deletable_row* row; + gc_clock::time_point now; + void operator()(bytes_with_action::no_action) {} + void operator()(bytes_with_action::shadowable_tombstone_tag t) { + row->apply(t.into_shadowable_tombstone(now)); + } + void operator()(row_marker rm) { + row->apply(rm); + } + }; + std::visit(visitor{r, now}, action); + } } void view_updates::generate_update( @@ -846,6 +1035,9 @@ void view_updates::generate_update( } const auto& col_ids = _base_info->base_non_pk_columns_in_view_pk(); + if (_view_info.has_computed_column_depending_on_base_non_primary_key()) { + return update_entry_for_computed_column(base_key, update, existing, now); + } if (col_ids.empty()) { // The view key is necessarily the same pre and post update. if (existing && existing->is_live(*_base)) { diff --git a/db/view/view.hh b/db/view/view.hh index 3fb623ec25..7f0ecef308 100644 --- a/db/view/view.hh +++ b/db/view/view.hh @@ -148,6 +148,7 @@ struct bytes_with_action { : _view(std::move(view)) , _action(action) {} + }; class view_updates final { @@ -175,16 +176,21 @@ public: private: mutation_partition& partition_for(partition_key&& key); row_marker compute_row_marker(const clustering_row& base_row) const; - deletable_row& get_view_row(const partition_key& base_key, const clustering_row& update); + struct view_row_entry { + deletable_row* _row; + bytes_with_action::action _action; + }; + std::vector get_view_rows(const partition_key& base_key, const clustering_row& update, const std::optional& existing); bool can_skip_view_updates(const clustering_row& update, const clustering_row& existing) const; void create_entry(const partition_key& base_key, const clustering_row& update, gc_clock::time_point now); void delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now); void do_delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now); void update_entry(const partition_key& base_key, const clustering_row& update, const clustering_row& existing, gc_clock::time_point now); void replace_entry(const partition_key& base_key, const clustering_row& update, const clustering_row& existing, gc_clock::time_point now) { - create_entry(base_key, update, now); delete_old_entry(base_key, existing, update, now); + create_entry(base_key, update, now); } + void update_entry_for_computed_column(const partition_key& base_key, const clustering_row& update, const std::optional& existing, gc_clock::time_point now); }; class view_update_builder {