db/view/view.cc: compute view_updates for views over collections
For collection indexes, logic of computing values for each of the column needed to change, since a single particular column might produce more than one value as a result. The liveness info from individual cells of the collection impacts the liveness info of resulting rows. Therefore it is needed to rewrite the control flow - instead of functions getting a row from get_view_row and later computing row markers and applying it, they compute these values by themselves. Signed-off-by: Nadav Har'El <nyh@scylladb.com>
This commit is contained in:
committed by
Nadav Har'El
parent
112086767c
commit
32289d681f
308
db/view/view.cc
308
db/view/view.cc
@@ -8,6 +8,7 @@
|
||||
* SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0)
|
||||
*/
|
||||
|
||||
#include <boost/range/adaptor/transformed.hpp>
|
||||
#include <deque>
|
||||
#include <functional>
|
||||
#include <optional>
|
||||
@@ -46,6 +47,7 @@
|
||||
#include "mutation_partition.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "utils/small_vector.hh"
|
||||
#include "view_info.hh"
|
||||
#include "view_update_checks.hh"
|
||||
#include "types/user.hh"
|
||||
@@ -59,6 +61,7 @@
|
||||
#include "readers/evictable.hh"
|
||||
#include "delete_ghost_rows_visitor.hh"
|
||||
#include "locator/host_id.hh"
|
||||
#include "cartesian_product.hh"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
@@ -471,7 +474,6 @@ row_marker view_updates::compute_row_marker(const clustering_row& base_row) cons
|
||||
* will all unselected columns.
|
||||
*/
|
||||
|
||||
auto marker = base_row.marker();
|
||||
// WARNING: The code assumes that if multiple regular base columns are present in the view key,
|
||||
// they share liveness information. It's true especially in the only case currently allowed by CQL,
|
||||
// which assumes there's up to one non-pk column in the view key. It's also true in alternator,
|
||||
@@ -484,41 +486,187 @@ row_marker view_updates::compute_row_marker(const clustering_row& base_row) cons
|
||||
return cell.is_live_and_has_ttl() ? row_marker(cell.timestamp(), cell.ttl(), cell.expiry()) : row_marker(cell.timestamp());
|
||||
}
|
||||
|
||||
return marker;
|
||||
return base_row.marker();
|
||||
}
|
||||
|
||||
deletable_row& view_updates::get_view_row(const partition_key& base_key, const clustering_row& update) {
|
||||
std::vector<bytes> linearized_values;
|
||||
auto get_value = boost::adaptors::transformed([&, this] (const column_definition& cdef) -> managed_bytes_view {
|
||||
auto* base_col = _base->get_column_definition(cdef.name());
|
||||
|
||||
namespace {
|
||||
struct bytes_view_with_action {
|
||||
managed_bytes_view _bytes_view;
|
||||
bytes_with_action::action _action;
|
||||
bytes_view_with_action(managed_bytes_view view, bytes_with_action::action action)
|
||||
: _bytes_view(view)
|
||||
, _action(action)
|
||||
{}
|
||||
bytes_view_with_action(managed_bytes_view view)
|
||||
: _bytes_view(view)
|
||||
{}
|
||||
bytes_view_with_action(bytes_with_action&& bwa, std::deque<bytes>& linearized_values)
|
||||
: bytes_view_with_action(managed_bytes_view(linearized_values.emplace_back(std::move(bwa._view))), bwa._action)
|
||||
{}
|
||||
static managed_bytes_view get_view(const bytes_view_with_action& bvwa) {
|
||||
return bvwa._bytes_view;
|
||||
}
|
||||
};
|
||||
|
||||
// value_getter is used to extract values for specific columns during view update.
|
||||
struct value_getter {
|
||||
// linearized_values hold bytes for values of computed columns, for which we later store references in the form of managed_bytes_view.
|
||||
// deque doesn't invalidate references at emplace_back.
|
||||
std::deque<bytes> linearized_values;
|
||||
|
||||
// Index of column being currently processed.
|
||||
size_t column_position = 0;
|
||||
// Discovered index of collection computed column.
|
||||
std::optional<size_t> collection_column_position;
|
||||
private:
|
||||
// Schemas of base table and view.
|
||||
const schema& _base;
|
||||
const view_ptr& _view;
|
||||
|
||||
const partition_key& _base_key;
|
||||
const clustering_row& _update;
|
||||
const std::optional<clustering_row>& _existing;
|
||||
|
||||
public:
|
||||
value_getter(const schema& base, const view_ptr& view, const partition_key& base_key, const clustering_row& update, const std::optional<clustering_row>& existing)
|
||||
: _base(base)
|
||||
, _view(view)
|
||||
, _base_key(base_key)
|
||||
, _update(update)
|
||||
, _existing(existing)
|
||||
{}
|
||||
|
||||
using vector_type = utils::small_vector<bytes_view_with_action, 1>;
|
||||
vector_type operator()(const column_definition& cdef) {
|
||||
column_position++;
|
||||
|
||||
auto* base_col = _base.get_column_definition(cdef.name());
|
||||
if (!base_col) {
|
||||
bytes_opt computed_value;
|
||||
if (!cdef.is_computed()) {
|
||||
//FIXME(sarna): this legacy code is here for backward compatibility and should be removed
|
||||
// once "computed_columns feature" is supported by every node
|
||||
if (!service::get_local_storage_proxy().local_db().find_column_family(_base->id()).get_index_manager().is_index(*_view)) {
|
||||
throw std::logic_error(format("Column {} doesn't exist in base and this view is not backing a secondary index", cdef.name_as_text()));
|
||||
}
|
||||
computed_value = legacy_token_column_computation().compute_value(*_base, base_key);
|
||||
} else {
|
||||
computed_value = cdef.get_computation().compute_value(*_base, base_key);
|
||||
}
|
||||
return managed_bytes_view(linearized_values.emplace_back(*computed_value));
|
||||
return handle_computed_column(cdef);
|
||||
}
|
||||
switch (base_col->kind) {
|
||||
case column_kind::partition_key:
|
||||
return base_key.get_component(*_base, base_col->position());
|
||||
return {_base_key.get_component(_base, base_col->position())};
|
||||
case column_kind::clustering_key:
|
||||
return update.key().get_component(*_base, base_col->position());
|
||||
return {_update.key().get_component(_base, base_col->position())};
|
||||
default:
|
||||
auto& c = update.cells().cell_at(base_col->id);
|
||||
auto& c = _update.cells().cell_at(base_col->id);
|
||||
auto value_view = base_col->is_atomic() ? c.as_atomic_cell(cdef).value() : c.as_collection_mutation().data;
|
||||
return value_view;
|
||||
return {managed_bytes_view{value_view}};
|
||||
}
|
||||
});
|
||||
auto& partition = partition_for(partition_key::from_range(_view->partition_key_columns() | get_value));
|
||||
auto ckey = clustering_key::from_range(_view->clustering_key_columns() | get_value);
|
||||
return partition.clustered_row(*_view, std::move(ckey));
|
||||
}
|
||||
|
||||
private:
|
||||
vector_type handle_computed_column(const column_definition& cdef) {
|
||||
bytes computed_value;
|
||||
if (!cdef.is_computed()) {
|
||||
//FIXME(sarna): this legacy code is here for backward compatibility and should be removed
|
||||
// once "computed_columns feature" is supported by every node
|
||||
if (!service::get_local_storage_proxy().local_db().find_column_family(_base.id()).get_index_manager().is_index(*_view)) {
|
||||
throw std::logic_error(format("Column {} doesn't exist in base and this view is not backing a secondary index", cdef.name_as_text()));
|
||||
}
|
||||
computed_value = legacy_token_column_computation().compute_value(_base, _base_key);
|
||||
} else {
|
||||
auto& computation = cdef.get_computation();
|
||||
if (auto* collection_computation = dynamic_cast<const collection_column_computation*>(&computation)) {
|
||||
return handle_collection_column_computation(collection_computation);
|
||||
}
|
||||
computed_value = computation.compute_value(_base, _base_key);
|
||||
}
|
||||
|
||||
return {managed_bytes_view(linearized_values.emplace_back(std::move(computed_value)))};
|
||||
}
|
||||
|
||||
vector_type handle_collection_column_computation(const collection_column_computation* collection_computation) {
|
||||
vector_type ret;
|
||||
if (collection_column_position.has_value()) {
|
||||
on_internal_error(vlogger, format("Multiple columns in view (either pk or ck) are collection computed columns. Current is {}, the previous one found was {}", column_position - 1, *collection_column_position));
|
||||
}
|
||||
collection_column_position = column_position - 1;
|
||||
|
||||
for (auto& bwa : collection_computation->compute_values_with_action(_base, _base_key, _update, _existing)) {
|
||||
ret.push_back({std::move(bwa), linearized_values});
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
std::vector<view_updates::view_row_entry>
|
||||
view_updates::get_view_rows(const partition_key& base_key, const clustering_row& update, const std::optional<clustering_row>& existing) {
|
||||
value_getter getter(*_base, _view, base_key, update, existing);
|
||||
auto get_value = boost::adaptors::transformed(std::ref(getter));
|
||||
|
||||
|
||||
std::vector<value_getter::vector_type> pk_elems, ck_elems;
|
||||
boost::copy(_view->partition_key_columns() | get_value, std::back_inserter(pk_elems));
|
||||
// If no collection column was found, each of the actions will contain no_action,
|
||||
// in particular, it does not harm to use column 0.
|
||||
const bool had_multiple_values_in_pk = bool(getter.collection_column_position);
|
||||
const size_t action_column = getter.collection_column_position.value_or(0);
|
||||
// Allow for at most one collection computed column in pk and in ck.
|
||||
getter.collection_column_position.reset();
|
||||
boost::copy(_view->clustering_key_columns() | get_value, std::back_inserter(ck_elems));
|
||||
const bool had_multiple_values_in_ck = bool(getter.collection_column_position);
|
||||
|
||||
|
||||
std::vector<view_updates::view_row_entry> ret;
|
||||
auto compute_row = [&]<typename Range>(Range&& pk, Range&& ck) {
|
||||
partition_key pkey = partition_key::from_range(boost::adaptors::transform(pk, bytes_view_with_action::get_view));
|
||||
clustering_key ckey = clustering_key::from_range(boost::adaptors::transform(ck, bytes_view_with_action::get_view));
|
||||
auto action = (action_column < pk.size() ? pk[action_column] : ck[action_column - pk.size()])._action;
|
||||
mutation_partition& partition = partition_for(std::move(pkey));
|
||||
ret.push_back({&partition.clustered_row(*_view, std::move(ckey)), action});
|
||||
};
|
||||
|
||||
if (had_multiple_values_in_pk) {
|
||||
// cartesian_product expects std::vector<std::vector<>>, while we have std::vector<small_vector>.
|
||||
std::vector<std::vector<bytes_view_with_action>> pk_elems_, ck_elems_;
|
||||
auto std_vector_from_small_vector = boost::adaptors::transformed([](const auto& vector) {
|
||||
return std::vector<bytes_view_with_action>{vector.begin(), vector.end()};
|
||||
});
|
||||
boost::copy(pk_elems | std_vector_from_small_vector, std::back_inserter(pk_elems_));
|
||||
boost::copy(ck_elems | std_vector_from_small_vector, std::back_inserter(ck_elems_));
|
||||
|
||||
auto cartesian_product_pk = cartesian_product(pk_elems_),
|
||||
cartesian_product_ck = cartesian_product(ck_elems_);
|
||||
auto ck_it = cartesian_product_ck.begin();
|
||||
|
||||
if (had_multiple_values_in_ck) {
|
||||
// The computed collection column in clustering key was associated with the computed collection column from the partition key.
|
||||
// This is a case for indexes over collection values.
|
||||
|
||||
auto throw_length_error = [&] {
|
||||
size_t pk_size = cartesian_product_size(pk_elems_),
|
||||
ck_size = cartesian_product_size(ck_elems_);
|
||||
on_internal_error(vlogger, format("Computed sizes of possible partition keys and clustering keys don't match: {} != {}", pk_size, ck_size));
|
||||
};
|
||||
for (std::vector<bytes_view_with_action>& pk : cartesian_product_pk) {
|
||||
if (ck_it == cartesian_product_ck.end()) {
|
||||
throw_length_error();
|
||||
}
|
||||
compute_row(pk, *ck_it);
|
||||
++ck_it;
|
||||
}
|
||||
if (ck_it != cartesian_product_ck.end()) {
|
||||
throw_length_error();
|
||||
}
|
||||
} else {
|
||||
for (std::vector<bytes_view_with_action>& pk : cartesian_product_pk) {
|
||||
for (std::vector<bytes_view_with_action>& ck : cartesian_product_ck) {
|
||||
compute_row(pk, ck);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Here it's the old regular index over regular values. Each vector has just one element.
|
||||
auto get_front = boost::adaptors::transformed([](const auto& v) { return v.front(); });
|
||||
compute_row(pk_elems | get_front, ck_elems | get_front);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static const column_definition* view_column(const schema& base, const schema& view, column_id base_id) {
|
||||
@@ -680,12 +828,19 @@ void view_updates::create_entry(const partition_key& base_key, const clustering_
|
||||
if (!matches_view_filter(*_base, _view_info, base_key, update, now)) {
|
||||
return;
|
||||
}
|
||||
deletable_row& r = get_view_row(base_key, update);
|
||||
auto marker = compute_row_marker(update);
|
||||
r.apply(marker);
|
||||
r.apply(update.tomb());
|
||||
add_cells_to_view(*_base, *_view, row(*_base, column_kind::regular_column, update.cells()), r.cells());
|
||||
_op_count++;
|
||||
|
||||
auto view_rows = get_view_rows(base_key, update, std::nullopt);
|
||||
auto update_marker = compute_row_marker(update);
|
||||
for (const auto& [r, action]: view_rows) {
|
||||
if (auto rm = std::get_if<row_marker>(&action)) {
|
||||
r->apply(*rm);
|
||||
} else {
|
||||
r->apply(update_marker);
|
||||
}
|
||||
r->apply(update.tomb());
|
||||
add_cells_to_view(*_base, *_view, row(*_base, column_kind::regular_column, update.cells()), r->cells());
|
||||
}
|
||||
_op_count += view_rows.size();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -701,27 +856,33 @@ void view_updates::delete_old_entry(const partition_key& base_key, const cluster
|
||||
}
|
||||
|
||||
void view_updates::do_delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now) {
|
||||
auto& r = get_view_row(base_key, existing);
|
||||
const auto& col_ids = _base_info->base_non_pk_columns_in_view_pk();
|
||||
if (!col_ids.empty()) {
|
||||
// We delete the old row using a shadowable row tombstone, making sure that
|
||||
// the tombstone deletes everything in the row (or it might still show up).
|
||||
// Note: multi-cell columns can't be part of the primary key.
|
||||
auto& def = _base->regular_column_at(col_ids[0]);
|
||||
auto cell = existing.cells().cell_at(col_ids[0]).as_atomic_cell(def);
|
||||
if (cell.is_live()) {
|
||||
r.apply(shadowable_tombstone(cell.timestamp(), now));
|
||||
auto view_rows = get_view_rows(base_key, existing, std::nullopt);
|
||||
for (const auto& [r, action] : view_rows) {
|
||||
const auto& col_ids = _base_info->base_non_pk_columns_in_view_pk();
|
||||
if (_view_info.has_computed_column_depending_on_base_non_primary_key()) {
|
||||
if (auto ts_tag = std::get_if<bytes_with_action::shadowable_tombstone_tag>(&action)) {
|
||||
r->apply(ts_tag->into_shadowable_tombstone(now));
|
||||
}
|
||||
} else if (!col_ids.empty()) {
|
||||
// We delete the old row using a shadowable row tombstone, making sure that
|
||||
// the tombstone deletes everything in the row (or it might still show up).
|
||||
// Note: multi-cell columns can't be part of the primary key.
|
||||
auto& def = _base->regular_column_at(col_ids[0]);
|
||||
auto cell = existing.cells().cell_at(col_ids[0]).as_atomic_cell(def);
|
||||
if (cell.is_live()) {
|
||||
r->apply(shadowable_tombstone(cell.timestamp(), now));
|
||||
}
|
||||
} else {
|
||||
// "update" caused the base row to have been deleted, and !col_id
|
||||
// means view row is the same - so it needs to be deleted as well
|
||||
// using the same deletion timestamps for the individual cells.
|
||||
r->apply(update.marker());
|
||||
auto diff = update.cells().difference(*_base, column_kind::regular_column, existing.cells());
|
||||
add_cells_to_view(*_base, *_view, std::move(diff), r->cells());
|
||||
}
|
||||
} else {
|
||||
// "update" caused the base row to have been deleted, and !col_id
|
||||
// means view row is the same - so it needs to be deleted as well
|
||||
// using the same deletion timestamps for the individual cells.
|
||||
r.apply(update.marker());
|
||||
auto diff = update.cells().difference(*_base, column_kind::regular_column, existing.cells());
|
||||
add_cells_to_view(*_base, *_view, std::move(diff), r.cells());
|
||||
r->apply(update.tomb());
|
||||
}
|
||||
r.apply(update.tomb());
|
||||
_op_count++;
|
||||
_op_count += view_rows.size();
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -819,14 +980,42 @@ void view_updates::update_entry(const partition_key& base_key, const clustering_
|
||||
return;
|
||||
}
|
||||
|
||||
deletable_row& r = get_view_row(base_key, update);
|
||||
auto marker = compute_row_marker(update);
|
||||
r.apply(marker);
|
||||
r.apply(update.tomb());
|
||||
auto view_rows = get_view_rows(base_key, update, std::nullopt);
|
||||
auto update_marker = compute_row_marker(update);
|
||||
for (const auto& [r, action] : view_rows) {
|
||||
if (auto rm = std::get_if<row_marker>(&action)) {
|
||||
r->apply(*rm);
|
||||
} else {
|
||||
r->apply(update_marker);
|
||||
}
|
||||
r->apply(update.tomb());
|
||||
|
||||
auto diff = update.cells().difference(*_base, column_kind::regular_column, existing.cells());
|
||||
add_cells_to_view(*_base, *_view, std::move(diff), r.cells());
|
||||
_op_count++;
|
||||
auto diff = update.cells().difference(*_base, column_kind::regular_column, existing.cells());
|
||||
add_cells_to_view(*_base, *_view, std::move(diff), r->cells());
|
||||
}
|
||||
_op_count += view_rows.size();
|
||||
}
|
||||
|
||||
void view_updates::update_entry_for_computed_column(
|
||||
const partition_key& base_key,
|
||||
const clustering_row& update,
|
||||
const std::optional<clustering_row>& existing,
|
||||
gc_clock::time_point now) {
|
||||
auto view_rows = get_view_rows(base_key, update, existing);
|
||||
for (const auto& [r, action] : view_rows) {
|
||||
struct visitor {
|
||||
deletable_row* row;
|
||||
gc_clock::time_point now;
|
||||
void operator()(bytes_with_action::no_action) {}
|
||||
void operator()(bytes_with_action::shadowable_tombstone_tag t) {
|
||||
row->apply(t.into_shadowable_tombstone(now));
|
||||
}
|
||||
void operator()(row_marker rm) {
|
||||
row->apply(rm);
|
||||
}
|
||||
};
|
||||
std::visit(visitor{r, now}, action);
|
||||
}
|
||||
}
|
||||
|
||||
void view_updates::generate_update(
|
||||
@@ -846,6 +1035,9 @@ void view_updates::generate_update(
|
||||
}
|
||||
|
||||
const auto& col_ids = _base_info->base_non_pk_columns_in_view_pk();
|
||||
if (_view_info.has_computed_column_depending_on_base_non_primary_key()) {
|
||||
return update_entry_for_computed_column(base_key, update, existing, now);
|
||||
}
|
||||
if (col_ids.empty()) {
|
||||
// The view key is necessarily the same pre and post update.
|
||||
if (existing && existing->is_live(*_base)) {
|
||||
|
||||
@@ -148,6 +148,7 @@ struct bytes_with_action {
|
||||
: _view(std::move(view))
|
||||
, _action(action)
|
||||
{}
|
||||
|
||||
};
|
||||
|
||||
class view_updates final {
|
||||
@@ -175,16 +176,21 @@ public:
|
||||
private:
|
||||
mutation_partition& partition_for(partition_key&& key);
|
||||
row_marker compute_row_marker(const clustering_row& base_row) const;
|
||||
deletable_row& get_view_row(const partition_key& base_key, const clustering_row& update);
|
||||
struct view_row_entry {
|
||||
deletable_row* _row;
|
||||
bytes_with_action::action _action;
|
||||
};
|
||||
std::vector<view_row_entry> get_view_rows(const partition_key& base_key, const clustering_row& update, const std::optional<clustering_row>& existing);
|
||||
bool can_skip_view_updates(const clustering_row& update, const clustering_row& existing) const;
|
||||
void create_entry(const partition_key& base_key, const clustering_row& update, gc_clock::time_point now);
|
||||
void delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now);
|
||||
void do_delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now);
|
||||
void update_entry(const partition_key& base_key, const clustering_row& update, const clustering_row& existing, gc_clock::time_point now);
|
||||
void replace_entry(const partition_key& base_key, const clustering_row& update, const clustering_row& existing, gc_clock::time_point now) {
|
||||
create_entry(base_key, update, now);
|
||||
delete_old_entry(base_key, existing, update, now);
|
||||
create_entry(base_key, update, now);
|
||||
}
|
||||
void update_entry_for_computed_column(const partition_key& base_key, const clustering_row& update, const std::optional<clustering_row>& existing, gc_clock::time_point now);
|
||||
};
|
||||
|
||||
class view_update_builder {
|
||||
|
||||
Reference in New Issue
Block a user