Merge 'Secondary indexes on static columns' from Piotr Dulikowski

This pull request introduces support for global secondary indexes based on static columns.

Local secondary indexes based on static columns are not planned to be supported and are explicitly forbidden. Because there is only one static row per partition and local indexes require the full partition key when querying, such indexes wouldn't be very useful and would only waste resources.

The index table for a secondary index on a static column, unlike those of other secondary indexes, does not contain clustering keys from the base table. A static column's value determines a set of full partitions, so the clustering keys would be unnecessary.

The already existing logic for querying using secondary indexes works after introducing minimal modifications. The view update generation path now works on a common representation of static and clustering rows, and the new representation made it possible to keep most of the logic intact.

New cql-pytests are added. All but one of the existing tests for secondary indexes on static columns - ported from Cassandra - now work and have their `xfail` marks lifted; the remaining test requires support for collection indexing, so it will start working only after #2962 is fixed.

Materialized views with static rows as a key are __not__ implemented in this PR.

Fixes: #2963

Closes #11166

* github.com:scylladb/scylladb:
  test_materialized_view: verify that static columns are not allowed
  test_secondary_index: add (currently failing) test for static index paging
  test_secondary_index: add more tests for secondary indexes on static columns
  cassandra_tests: enable existing tests for static columns
  create_index_statement: lift restriction on secondary indexes on static rows
  db/view: fetch and process static rows when building indexes
  gms/feature_service: introduce SECONDARY_INDEXES_ON_STATIC_COLUMNS cluster feature
  create_index_statement: disallow creation of local indexes with static columns
  select_statement: prepare paging for indexes on static columns
  select_statement: do not attempt to fetch clustering columns from secondary index's table
  secondary_index_manager: don't add clustering key columns to index table of static column index
  replica/table: adjust the view read-before-write to return static rows when needed
  db/view: process static rows in view_update_builder::on_results
  db/view: adjust existing view update generation path to use clustering_or_static_row
  column_computation: adjust to use clustering_or_static_row
  db/view: add clustering_or_static_row
  deletable_row: add column_kind parameter to is_live
  view_info: adjust view_column to accept column_kind
  db/view: base_dependent_view_info: split non-pk columns into regular and static
This commit is contained in:
Nadav Har'El
2022-12-08 09:54:05 +02:00
19 changed files with 584 additions and 104 deletions

View File

@@ -12,11 +12,11 @@
class schema;
class partition_key;
class clustering_row;
struct atomic_cell_view;
struct tombstone;
namespace db::view {
struct clustering_or_static_row;
struct view_key_and_action;
}
@@ -118,7 +118,7 @@ class collection_column_computation final : public column_computation {
using collection_kv = std::pair<bytes_view, atomic_cell_view>;
void operate_on_collection_entries(
std::invocable<collection_kv*, collection_kv*, tombstone> auto&& old_and_new_row_func, const schema& schema,
const partition_key& key, const clustering_row& update, const std::optional<clustering_row>& existing) const;
const partition_key& key, const db::view::clustering_or_static_row& update, const std::optional<db::view::clustering_or_static_row>& existing) const;
public:
static collection_column_computation for_keys(const bytes& collection_name) {
@@ -141,5 +141,6 @@ public:
return true;
}
std::vector<db::view::view_key_and_action> compute_values_with_action(const schema& schema, const partition_key& key, const clustering_row& row, const std::optional<clustering_row>& existing) const;
std::vector<db::view::view_key_and_action> compute_values_with_action(const schema& schema, const partition_key& key,
const db::view::clustering_or_static_row& row, const std::optional<db::view::clustering_or_static_row>& existing) const;
};

View File

@@ -114,9 +114,9 @@ std::vector<::shared_ptr<index_target>> create_index_statement::validate_while_e
format("No column definition found for column {}", target->column_name()));
}
//NOTICE(sarna): Should be lifted after resolving issue #2963
if (cd->is_static()) {
throw exceptions::invalid_request_exception("Indexing static columns is not implemented yet.");
if (!db.features().secondary_indexes_on_static_columns && cd->is_static()) {
throw exceptions::invalid_request_exception("Cluster does not support secondary indexes on static columns yet,"
" upgrade the whole cluster first in order to be able to create them");
}
if (cd->type->references_duration()) {
@@ -209,6 +209,12 @@ void create_index_statement::validate_for_local_index(const schema& schema) cons
for (unsigned i = 1; i < _raw_targets.size(); ++i) {
if (std::holds_alternative<index_target::raw::multiple_columns>(_raw_targets[i]->value)) {
throw exceptions::invalid_request_exception(format("Multi-column index targets are currently only supported for partition key"));
} else if (auto* raw_ident = std::get_if<index_target::raw::single_column>(&_raw_targets[i]->value)) {
auto ident = (*raw_ident)->prepare_column_identifier(schema);
auto it = schema.columns_by_name().find(ident->name());
if (it != schema.columns_by_name().end() && it->second->is_static()) {
throw exceptions::invalid_request_exception("Local indexes containing static columns are not supported.");
}
}
}
}

View File

@@ -567,6 +567,11 @@ indexed_table_select_statement::do_execute_base_query(
base_query_state(const base_query_state&) = delete;
};
const column_definition* target_cdef = _schema->get_column_definition(to_bytes(_index.target_column()));
if (!target_cdef) {
throw exceptions::invalid_request_exception("Indexed column not found in schema");
}
const bool is_paged = bool(paging_state);
base_query_state query_state{cmd->get_row_limit() * queried_ranges_count, std::move(ranges_to_vnodes)};
{
@@ -586,7 +591,7 @@ indexed_table_select_statement::do_execute_base_query(
auto base_pk = generate_base_key_from_index_pk<partition_key>(old_paging_state->get_partition_key(),
old_paging_state->get_clustering_key(), *_schema, *_view_schema);
auto row_ranges = command->slice.default_row_ranges();
if (old_paging_state->get_clustering_key() && _schema->clustering_key_size() > 0) {
if (old_paging_state->get_clustering_key() && _schema->clustering_key_size() > 0 && !target_cdef->is_static()) {
auto base_ck = generate_base_key_from_index_pk<clustering_key>(old_paging_state->get_partition_key(),
old_paging_state->get_clustering_key(), *_schema, *_view_schema);
@@ -1013,7 +1018,7 @@ lw_shared_ptr<const service::pager::paging_state> indexed_table_select_statement
exploded_index_ck.push_back(bytes_view(token_bytes));
append_base_key_to_index_ck<partition_key>(exploded_index_ck, last_base_pk, *cdef);
}
if (last_base_ck) {
if (last_base_ck && !cdef->is_static()) {
append_base_key_to_index_ck<clustering_key>(exploded_index_ck, *last_base_ck, *cdef);
}
@@ -1068,6 +1073,10 @@ indexed_table_select_statement::do_execute(query_processor& qp,
// Obviously, if there are no clustering columns, then we can work at
// the granularity of whole partitions.
whole_partitions = true;
} else if (_schema->get_column_definition(to_bytes(_index.target_column()))->is_static()) {
// Index table for a static index does not have the original table's
// clustering key columns, so we must not fetch them.
whole_partitions = true;
} else {
if (_index.depends_on(*(_schema->clustering_key_columns().begin()))) {
// Searching on the *first* clustering column means in each of

View File

@@ -116,11 +116,11 @@ const query::partition_slice& view_info::partition_slice() const {
return *_partition_slice;
}
const column_definition* view_info::view_column(const schema& base, column_id base_id) const {
const column_definition* view_info::view_column(const schema& base, column_kind kind, column_id base_id) const {
// FIXME: Map base column_ids to view_column_ids, which can be something like
// a boost::small_vector where the position is the base column_id, and the
// value is either empty or the view's column_id.
return view_column(base.regular_column_at(base_id));
return view_column(base.column_at(kind, base_id));
}
const column_definition* view_info::view_column(const column_definition& base_def) const {
@@ -135,10 +135,13 @@ void view_info::set_base_info(db::view::base_info_ptr base_info) {
}
// A constructor for a base info that can facilitate reads and writes from the materialized view.
db::view::base_dependent_view_info::base_dependent_view_info(schema_ptr base_schema, std::vector<column_id>&& base_non_pk_columns_in_view_pk)
db::view::base_dependent_view_info::base_dependent_view_info(schema_ptr base_schema,
std::vector<column_id>&& base_regular_columns_in_view_pk,
std::vector<column_id>&& base_static_columns_in_view_pk)
: _base_schema{std::move(base_schema)}
, _base_non_pk_columns_in_view_pk{std::move(base_non_pk_columns_in_view_pk)}
, has_base_non_pk_columns_in_view_pk{!_base_non_pk_columns_in_view_pk.empty()}
, _base_regular_columns_in_view_pk{std::move(base_regular_columns_in_view_pk)}
, _base_static_columns_in_view_pk{std::move(base_static_columns_in_view_pk)}
, has_base_non_pk_columns_in_view_pk{!_base_regular_columns_in_view_pk.empty() || !_base_static_columns_in_view_pk.empty()}
, use_only_for_reads{false} {
}
@@ -151,13 +154,22 @@ db::view::base_dependent_view_info::base_dependent_view_info(bool has_base_non_p
, use_only_for_reads{true} {
}
const std::vector<column_id>& db::view::base_dependent_view_info::base_non_pk_columns_in_view_pk() const {
const std::vector<column_id>& db::view::base_dependent_view_info::base_regular_columns_in_view_pk() const {
if (use_only_for_reads) {
on_internal_error(vlogger,
format("base_non_pk_columns_in_view_pk(): operation unsupported when initialized only for view reads. "
format("base_regular_columns_in_view_pk(): operation unsupported when initialized only for view reads. "
"Missing column in the base table: {}", to_sstring_view(_column_missing_in_base.value_or(bytes()))));
}
return _base_non_pk_columns_in_view_pk;
return _base_regular_columns_in_view_pk;
}
// Returns the ids of the base table's static columns that appear in the
// view's primary key. Reports an internal error if this object was
// initialized only for view reads (use_only_for_reads is set).
const std::vector<column_id>& db::view::base_dependent_view_info::base_static_columns_in_view_pk() const {
    if (use_only_for_reads) {
        // Keep the fallback value alive in a named local so the string view
        // built from it stays valid while the message is formatted.
        const bytes missing_column = _column_missing_in_base.value_or(bytes());
        on_internal_error(vlogger,
                format("base_static_columns_in_view_pk(): operation unsupported when initialized only for view reads. "
                       "Missing column in the base table: {}", to_sstring_view(missing_column)));
    }
    return _base_static_columns_in_view_pk;
}
const schema_ptr& db::view::base_dependent_view_info::base_schema() const {
@@ -170,7 +182,8 @@ const schema_ptr& db::view::base_dependent_view_info::base_schema() const {
}
db::view::base_info_ptr view_info::make_base_dependent_view_info(const schema& base) const {
std::vector<column_id> base_non_pk_columns_in_view_pk;
std::vector<column_id> base_regular_columns_in_view_pk;
std::vector<column_id> base_static_columns_in_view_pk;
for (auto&& view_col : boost::range::join(_schema.partition_key_columns(), _schema.clustering_key_columns())) {
if (view_col.is_computed()) {
@@ -182,8 +195,10 @@ db::view::base_info_ptr view_info::make_base_dependent_view_info(const schema& b
}
const bytes& view_col_name = view_col.name();
auto* base_col = base.get_column_definition(view_col_name);
if (base_col && !base_col->is_primary_key()) {
base_non_pk_columns_in_view_pk.push_back(base_col->id);
if (base_col && base_col->is_regular()) {
base_regular_columns_in_view_pk.push_back(base_col->id);
} else if (base_col && base_col->is_static()) {
base_static_columns_in_view_pk.push_back(base_col->id);
} else if (!base_col) {
vlogger.error("Column {} in view {}.{} was not found in the base table {}.{}",
to_sstring_view(view_col_name), _schema.ks_name(), _schema.cf_name(), base.ks_name(), base.cf_name());
@@ -202,7 +217,7 @@ db::view::base_info_ptr view_info::make_base_dependent_view_info(const schema& b
}
}
return make_lw_shared<db::view::base_dependent_view_info>(base.shared_from_this(), std::move(base_non_pk_columns_in_view_pk));
return make_lw_shared<db::view::base_dependent_view_info>(base.shared_from_this(), std::move(base_regular_columns_in_view_pk), std::move(base_static_columns_in_view_pk));
}
bool view_info::has_base_non_pk_columns_in_view_pk() const {
@@ -218,6 +233,20 @@ bool view_info::has_base_non_pk_columns_in_view_pk() const {
return _base_info->has_base_non_pk_columns_in_view_pk;
}
// Builds a clustering_row from this object's key, tombstone, marker and a
// copy of its cells interpreted as regular columns of schema `s`.
// Reports an internal error if this object does not hold a clustering row.
clustering_row db::view::clustering_or_static_row::as_clustering_row(const schema& s) const {
    if (const bool holds_clustering_row = is_clustering_row(); !holds_clustering_row) {
        on_internal_error(vlogger, "Tried to interpret a static row as a clustering row");
    }
    auto regular_cells = row(s, column_kind::regular_column, cells());
    return clustering_row(*_key, tomb(), marker(), std::move(regular_cells));
}
// Builds a static_row from this object's cells using schema `s`.
// Reports an internal error if this object does not hold a static row.
static_row db::view::clustering_or_static_row::as_static_row(const schema& s) const {
    const bool holds_static_row = is_static_row();
    if (!holds_static_row) {
        on_internal_error(vlogger, "Tried to interpret a clustering row as a static row");
    }
    return static_row(s, cells());
}
namespace db {
namespace view {
@@ -383,9 +412,11 @@ static query::partition_slice make_partition_slice(const schema& s) {
opts.set(query::partition_slice::option::send_clustering_key);
opts.set(query::partition_slice::option::send_timestamp);
opts.set(query::partition_slice::option::send_ttl);
opts.set(query::partition_slice::option::always_return_static_content);
return query::partition_slice(
{query::full_clustering_range},
{ },
boost::copy_range<query::column_id_vector>(s.static_columns()
| boost::adaptors::transformed(std::mem_fn(&column_definition::id))),
boost::copy_range<query::column_id_vector>(s.regular_columns()
| boost::adaptors::transformed(std::mem_fn(&column_definition::id))),
std::move(opts));
@@ -416,18 +447,24 @@ public:
}
};
bool matches_view_filter(const schema& base, const view_info& view, const partition_key& key, const clustering_row& update, gc_clock::time_point now) {
bool matches_view_filter(const schema& base, const view_info& view, const partition_key& key, const clustering_or_static_row& update, gc_clock::time_point now) {
// TODO: Filtering is only supported in materialized views which don't support
// static rows yet. Skip the whole function if it is a static row update.
if (update.is_static_row()) {
return true;
}
auto slice = make_partition_slice(base);
data_query_result_builder builder(base, slice);
builder.consume_new_partition(dht::decorate_key(base, key));
builder.consume(clustering_row(base, update), row_tombstone{}, update.is_live(base, tombstone{}, now));
builder.consume(clustering_row(base, update.as_clustering_row(base)), row_tombstone{}, update.is_live(base, tombstone(), now));
builder.consume_end_of_partition();
auto result = builder.consume_end_of_stream();
view_filter_checking_visitor visitor(base, view);
query::result_view::consume(result, slice, visitor);
return clustering_prefix_matches(base, view, key, update.key())
return clustering_prefix_matches(base, view, key, *update.key())
&& visitor.matches_view_filter();
}
@@ -454,7 +491,7 @@ size_t view_updates::op_count() const {
return _op_count++;;
}
row_marker view_updates::compute_row_marker(const clustering_row& base_row) const {
row_marker view_updates::compute_row_marker(const clustering_or_static_row& base_row) const {
/*
* We need to compute both the timestamp and expiration.
*
@@ -482,9 +519,11 @@ row_marker view_updates::compute_row_marker(const clustering_row& base_row) cons
// they share liveness information. It's true especially in the only case currently allowed by CQL,
// which assumes there's up to one non-pk column in the view key. It's also true in alternator,
// which does not carry TTL information.
const auto& col_ids = _base_info->base_non_pk_columns_in_view_pk();
const auto& col_ids = base_row.is_clustering_row()
? _base_info->base_regular_columns_in_view_pk()
: _base_info->base_static_columns_in_view_pk();
if (!col_ids.empty()) {
auto& def = _base->regular_column_at(col_ids[0]);
auto& def = _base->column_at(base_row.column_kind(), col_ids[0]);
// Note: multi-cell columns can't be part of the primary key.
auto cell = base_row.cells().cell_at(col_ids[0]).as_atomic_cell(def);
return cell.is_live_and_has_ttl() ? row_marker(cell.timestamp(), cell.ttl(), cell.expiry()) : row_marker(cell.timestamp());
@@ -493,7 +532,6 @@ row_marker view_updates::compute_row_marker(const clustering_row& base_row) cons
return base_row.marker();
}
namespace {
// The following struct is identical to view_key_with_action, except the key
// is stored as a managed_bytes_view instead of bytes.
@@ -531,11 +569,11 @@ private:
const view_ptr& _view;
const partition_key& _base_key;
const clustering_row& _update;
const std::optional<clustering_row>& _existing;
const clustering_or_static_row& _update;
const std::optional<clustering_or_static_row>& _existing;
public:
value_getter(const schema& base, const view_ptr& view, const partition_key& base_key, const clustering_row& update, const std::optional<clustering_row>& existing)
value_getter(const schema& base, const view_ptr& view, const partition_key& base_key, const clustering_or_static_row& update, const std::optional<clustering_or_static_row>& existing)
: _base(base)
, _view(view)
, _base_key(base_key)
@@ -555,8 +593,15 @@ public:
case column_kind::partition_key:
return {_base_key.get_component(_base, base_col->position())};
case column_kind::clustering_key:
return {_update.key().get_component(_base, base_col->position())};
if (_update.is_static_row()) {
on_internal_error(vlogger, "Tried to get view row value for a static row update in a view with partition key having clustering columns from original table");
}
return {_update.key()->get_component(_base, base_col->position())};
default:
if (base_col->kind != _update.column_kind()) {
on_internal_error(vlogger, format("Tried to get a {} column from a {} row update, which is impossible",
to_sstring(base_col->kind), _update.is_clustering_row() ? "clustering" : "static"));
}
auto& c = _update.cells().cell_at(base_col->id);
auto value_view = base_col->is_atomic() ? c.as_atomic_cell(cdef).value() : c.as_collection_mutation().data;
return {managed_bytes_view{value_view}};
@@ -601,7 +646,7 @@ private:
std::vector<view_updates::view_row_entry>
view_updates::get_view_rows(const partition_key& base_key, const clustering_row& update, const std::optional<clustering_row>& existing) {
view_updates::get_view_rows(const partition_key& base_key, const clustering_or_static_row& update, const std::optional<clustering_or_static_row>& existing) {
value_getter getter(*_base, _view, base_key, update, existing);
auto get_value = boost::adaptors::transformed(std::ref(getter));
@@ -675,11 +720,11 @@ view_updates::get_view_rows(const partition_key& base_key, const clustering_row&
return ret;
}
static const column_definition* view_column(const schema& base, const schema& view, column_id base_id) {
static const column_definition* view_column(const schema& base, const schema& view, column_kind kind, column_id base_id) {
// FIXME: Map base column_ids to view_column_ids, which can be something like
// a boost::small_vector where the position is the base column_id, and the
// value is either empty or the view's column_id.
return view.get_column_definition(base.regular_column_at(base_id).name());
return view.get_column_definition(base.column_at(kind, base_id).name());
}
// Utility function for taking an existing cell, and creating a copy with an
@@ -816,9 +861,9 @@ void create_virtual_column(schema_builder& builder, const bytes& name, const dat
}
}
static void add_cells_to_view(const schema& base, const schema& view, row base_cells, row& view_cells) {
static void add_cells_to_view(const schema& base, const schema& view, column_kind kind, row base_cells, row& view_cells) {
base_cells.for_each_cell([&] (column_id id, atomic_cell_or_collection& c) {
auto* view_col = view_column(base, view, id);
auto* view_col = view_column(base, view, kind, id);
if (view_col && !view_col->is_primary_key()) {
maybe_make_virtual(c, view_col);
view_cells.append_cell(view_col->id, std::move(c));
@@ -830,13 +875,14 @@ static void add_cells_to_view(const schema& base, const schema& view, row base_c
* Creates a view entry corresponding to the provided base row.
* This method checks that the base row does match the view filter before applying anything.
*/
void view_updates::create_entry(const partition_key& base_key, const clustering_row& update, gc_clock::time_point now) {
void view_updates::create_entry(const partition_key& base_key, const clustering_or_static_row& update, gc_clock::time_point now) {
if (!matches_view_filter(*_base, _view_info, base_key, update, now)) {
return;
}
auto view_rows = get_view_rows(base_key, update, std::nullopt);
auto update_marker = compute_row_marker(update);
const auto kind = update.column_kind();
for (const auto& [r, action]: view_rows) {
if (auto rm = std::get_if<row_marker>(&action)) {
r->apply(*rm);
@@ -844,7 +890,7 @@ void view_updates::create_entry(const partition_key& base_key, const clustering_
r->apply(update_marker);
}
r->apply(update.tomb());
add_cells_to_view(*_base, *_view, row(*_base, column_kind::regular_column, update.cells()), r->cells());
add_cells_to_view(*_base, *_view, kind, row(*_base, kind, update.cells()), r->cells());
}
_op_count += view_rows.size();
}
@@ -853,7 +899,7 @@ void view_updates::create_entry(const partition_key& base_key, const clustering_
* Deletes the view entry corresponding to the provided base row.
* This method checks that the base row does match the view filter before bothering.
*/
void view_updates::delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now) {
void view_updates::delete_old_entry(const partition_key& base_key, const clustering_or_static_row& existing, const clustering_or_static_row& update, gc_clock::time_point now) {
// Before deleting an old entry, make sure it was matching the view filter
// (otherwise there is nothing to delete)
if (matches_view_filter(*_base, _view_info, base_key, existing, now)) {
@@ -861,10 +907,13 @@ void view_updates::delete_old_entry(const partition_key& base_key, const cluster
}
}
void view_updates::do_delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now) {
void view_updates::do_delete_old_entry(const partition_key& base_key, const clustering_or_static_row& existing, const clustering_or_static_row& update, gc_clock::time_point now) {
auto view_rows = get_view_rows(base_key, existing, std::nullopt);
const auto kind = existing.column_kind();
for (const auto& [r, action] : view_rows) {
const auto& col_ids = _base_info->base_non_pk_columns_in_view_pk();
const auto& col_ids = existing.is_clustering_row()
? _base_info->base_regular_columns_in_view_pk()
: _base_info->base_static_columns_in_view_pk();
if (_view_info.has_computed_column_depending_on_base_non_primary_key()) {
if (auto ts_tag = std::get_if<view_key_and_action::shadowable_tombstone_tag>(&action)) {
r->apply(ts_tag->into_shadowable_tombstone(now));
@@ -873,7 +922,7 @@ void view_updates::do_delete_old_entry(const partition_key& base_key, const clus
// We delete the old row using a shadowable row tombstone, making sure that
// the tombstone deletes everything in the row (or it might still show up).
// Note: multi-cell columns can't be part of the primary key.
auto& def = _base->regular_column_at(col_ids[0]);
auto& def = _base->column_at(kind, col_ids[0]);
auto cell = existing.cells().cell_at(col_ids[0]).as_atomic_cell(def);
if (cell.is_live()) {
r->apply(shadowable_tombstone(cell.timestamp(), now));
@@ -883,8 +932,8 @@ void view_updates::do_delete_old_entry(const partition_key& base_key, const clus
// means view row is the same - so it needs to be deleted as well
// using the same deletion timestamps for the individual cells.
r->apply(update.marker());
auto diff = update.cells().difference(*_base, column_kind::regular_column, existing.cells());
add_cells_to_view(*_base, *_view, std::move(diff), r->cells());
auto diff = update.cells().difference(*_base, kind, existing.cells());
add_cells_to_view(*_base, *_view, kind, std::move(diff), r->cells());
}
r->apply(update.tomb());
}
@@ -911,7 +960,7 @@ static bool atomic_cells_liveness_equal(atomic_cell_view left, atomic_cell_view
return true;
}
bool view_updates::can_skip_view_updates(const clustering_row& update, const clustering_row& existing) const {
bool view_updates::can_skip_view_updates(const clustering_or_static_row& update, const clustering_or_static_row& existing) const {
const row& existing_row = existing.cells();
const row& updated_row = update.cells();
@@ -970,7 +1019,7 @@ bool view_updates::can_skip_view_updates(const clustering_row& update, const clu
* This method checks that the base row (before and after) matches the view filter before
* applying anything.
*/
void view_updates::update_entry(const partition_key& base_key, const clustering_row& update, const clustering_row& existing, gc_clock::time_point now) {
void view_updates::update_entry(const partition_key& base_key, const clustering_or_static_row& update, const clustering_or_static_row& existing, gc_clock::time_point now) {
// While we know update and existing correspond to the same view entry,
// they may not match the view filter.
if (!matches_view_filter(*_base, _view_info, base_key, existing, now)) {
@@ -988,6 +1037,7 @@ void view_updates::update_entry(const partition_key& base_key, const clustering_
auto view_rows = get_view_rows(base_key, update, std::nullopt);
auto update_marker = compute_row_marker(update);
const auto kind = update.column_kind();
for (const auto& [r, action] : view_rows) {
if (auto rm = std::get_if<row_marker>(&action)) {
r->apply(*rm);
@@ -996,16 +1046,16 @@ void view_updates::update_entry(const partition_key& base_key, const clustering_
}
r->apply(update.tomb());
auto diff = update.cells().difference(*_base, column_kind::regular_column, existing.cells());
add_cells_to_view(*_base, *_view, std::move(diff), r->cells());
auto diff = update.cells().difference(*_base, kind, existing.cells());
add_cells_to_view(*_base, *_view, kind, std::move(diff), r->cells());
}
_op_count += view_rows.size();
}
void view_updates::update_entry_for_computed_column(
const partition_key& base_key,
const clustering_row& update,
const std::optional<clustering_row>& existing,
const clustering_or_static_row& update,
const std::optional<clustering_or_static_row>& existing,
gc_clock::time_point now) {
auto view_rows = get_view_rows(base_key, update, existing);
for (const auto& [r, action] : view_rows) {
@@ -1026,9 +1076,10 @@ void view_updates::update_entry_for_computed_column(
void view_updates::generate_update(
const partition_key& base_key,
const clustering_row& update,
const std::optional<clustering_row>& existing,
const clustering_or_static_row& update,
const std::optional<clustering_or_static_row>& existing,
gc_clock::time_point now) {
// Note that the base PK columns in update and existing are the same, since we're intrinsically dealing
// with the same base row. So we have to check 3 things:
// 1) that the clustering key doesn't have a null, which can happen for compact tables. If that's the case,
@@ -1036,15 +1087,20 @@ void view_updates::generate_update(
// 2) if there is a column not part of the base PK in the view PK, whether it is changed by the update.
// 3) whether the update actually matches the view SELECT filter
if (!update.key().is_full(*_base)) {
return;
if (update.is_clustering_row()) {
if (!update.key()->is_full(*_base)) {
return;
}
}
const auto& col_ids = _base_info->base_non_pk_columns_in_view_pk();
if (_view_info.has_computed_column_depending_on_base_non_primary_key()) {
return update_entry_for_computed_column(base_key, update, existing, now);
}
if (col_ids.empty()) {
if (!_base_info->has_base_non_pk_columns_in_view_pk) {
if (update.is_static_row()) {
// TODO: support static rows in views with pk only including columns from base pk
return;
}
// The view key is necessarily the same pre and post update.
if (existing && existing->is_live(*_base)) {
if (update.is_live(*_base)) {
@@ -1058,6 +1114,20 @@ void view_updates::generate_update(
return;
}
const auto& col_ids = update.is_clustering_row()
? _base_info->base_regular_columns_in_view_pk()
: _base_info->base_static_columns_in_view_pk();
// The view has a non-primary-key column from the base table as its primary key.
// That means it's either a regular or static column. If we are currently
// processing an update which does not correspond to the column's kind,
// just stop here.
if (col_ids.empty()) {
return;
}
const auto kind = update.column_kind();
// If one of the key columns is missing, set has_new_row = false
// meaning that after the update there will be no view row.
// If one of the key columns is missing in the existing value,
@@ -1068,7 +1138,7 @@ void view_updates::generate_update(
bool same_row = true;
for (auto col_id : col_ids) {
auto* after = update.cells().find_cell(col_id);
auto& cdef = _base->regular_column_at(col_id);
auto& cdef = _base->column_at(kind, col_id);
if (existing) {
auto* before = existing->cells().find_cell(col_id);
// Note that this cell is necessarily atomic, because col_ids are
@@ -1197,8 +1267,39 @@ void view_update_builder::generate_update(clustering_row&& update, std::optional
update.marker().compact_and_expire(update.tomb().tomb(), _now, always_gc, gc_before);
update.cells().compact_and_expire(*_schema, column_kind::regular_column, update.tomb(), _now, always_gc, gc_before, update.marker());
const auto update_row = clustering_or_static_row(std::move(update));
const auto existing_row = existing
? std::make_optional<clustering_or_static_row>(std::move(*existing))
: std::optional<clustering_or_static_row>();
for (auto&& v : _view_updates) {
v.generate_update(_key, update, existing, _now);
v.generate_update(_key, update_row, existing_row, _now);
}
}
// Generates view updates for a static-row change of the partition `_key`.
//
// `update` is the new static row and `update_tomb` the tombstone used when
// compacting it; `existing` is the previously stored static row (may be
// disengaged) and `existing_tomb` the tombstone used when compacting it.
// Throws std::logic_error when there is neither an update tombstone nor any
// content in `update`.
void view_update_builder::generate_update(static_row&& update, const tombstone& update_tomb,
std::optional<static_row>&& existing, const tombstone& existing_tomb) {
if (!update_tomb && update.empty()) {
throw std::logic_error("A materialized view update cannot be empty");
}
// Compute the GC cut-off for this partition from the base table's tombstone GC state.
auto dk = dht::decorate_key(*_schema, _key);
const auto& gc_state = _base.get_compaction_manager().get_tombstone_gc_state();
auto gc_before = gc_state.get_gc_before_for_key(_schema, dk, _now);
// We allow existing to be disengaged, which we treat the same as an empty row.
if (existing) {
// Compact the existing cells as static columns against existing_tomb, then
// fold a copy of the result into the update.
// NOTE(review): presumably this makes `update` describe the complete static
// row as it will look after the write - confirm against the clustering-row overload.
existing->cells().compact_and_expire(*_schema, column_kind::static_column, row_tombstone(existing_tomb), _now, always_gc, gc_before);
update.apply(*_schema, static_row(*_schema, *existing));
}
// Compact the (possibly merged) update cells against update_tomb.
update.cells().compact_and_expire(*_schema, column_kind::static_column, row_tombstone(update_tomb), _now, always_gc, gc_before);
// Wrap both rows in the common clustering_or_static_row representation that
// view_updates::generate_update accepts.
const auto update_row = clustering_or_static_row(std::move(update));
const auto existing_row = existing
? std::make_optional<clustering_or_static_row>(std::move(*existing))
: std::optional<clustering_or_static_row>();
// Hand the pair to every view update generator.
for (auto&& v : _view_updates) {
v.generate_update(_key, update_row, existing_row, _now);
}
}
@@ -1223,12 +1324,21 @@ future<stop_iteration> view_update_builder::on_results() {
? std::optional<clustering_row>(std::in_place, update.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row())
: std::nullopt;
generate_update(std::move(update), std::move(existing));
} else if (_update->is_static_row()) {
auto update = std::move(*_update).as_static_row();
auto tombstone = _existing_partition_tombstone;
auto existing = tombstone
? std::optional<static_row>(std::in_place)
: std::nullopt;
generate_update(std::move(update), _update_partition_tombstone, std::move(existing), _existing_partition_tombstone);
}
return stop_updates ? stop() : advance_updates();
}
if (cmp > 0) {
// We have something existing but no update (which will happen either because it's a range tombstone marker in
// existing, or because we've fetched the existing row due to some partition/range deletion in the updates)
// existing, or because we've fetched the existing row due to some partition/range deletion in the updates).
// Due to how the read command for existing rows is constructed, it is also possible that a static
// row is included, even though we didn't modify it.
if (_existing->is_range_tombstone_change()) {
_existing_current_tombstone = _existing->as_range_tombstone_change().tombstone();
} else if (_existing->is_clustering_row()) {
@@ -1242,6 +1352,21 @@ future<stop_iteration> view_update_builder::on_results() {
auto update = clustering_row(existing.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row());
generate_update(std::move(update), { std::move(existing) });
}
} else if (_existing->is_static_row()) {
auto existing = std::move(*_existing).as_static_row();
auto tombstone = _update_partition_tombstone;
// The static row might be unintentionally included when fetching existing clustering rows,
// even if the static row was not updated. We can detect it. A static row can be affected either by:
//
// 1. A static row in the update mutation
// 2. A partition tombstone in the update mutation
//
// If neither of those is present, this means that the static row is included accidentally.
// If we are here, this means that (1) is not present. The `if` that follows checks for (2).
if (tombstone) {
auto update = static_row();
generate_update(std::move(update), _update_partition_tombstone, { std::move(existing) }, _existing_partition_tombstone);
}
}
return stop_updates ? stop () : advance_existings();
}
@@ -1259,6 +1384,12 @@ future<stop_iteration> view_update_builder::on_results() {
cr.apply(std::max(_existing_partition_tombstone, _existing_current_tombstone));
});
generate_update(std::move(*_update).as_clustering_row(), { std::move(*_existing).as_clustering_row() });
} else if (_update->is_static_row()) {
if (!_existing->is_static_row()) {
on_internal_error(vlogger, format("Static row update mutation part {} shouldn't compare equal with an existing, non-static row mutation part {}",
mutation_fragment_v2::printer(*_schema, *_update), mutation_fragment_v2::printer(*_schema, *_existing)));
}
generate_update(std::move(*_update).as_static_row(), _update_partition_tombstone, { std::move(*_existing).as_static_row() }, _existing_partition_tombstone);
}
return stop_updates ? stop() : advance_all();
}
@@ -1270,6 +1401,10 @@ future<stop_iteration> view_update_builder::on_results() {
auto existing = clustering_row(*_schema, _existing->as_clustering_row());
auto update = clustering_row(existing.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row());
generate_update(std::move(update), { std::move(existing) });
} else if (_existing->is_static_row()) {
auto existing = static_row(*_schema, _existing->as_static_row());
auto update = static_row();
generate_update(std::move(update), _update_partition_tombstone, { std::move(existing) }, _existing_partition_tombstone);
}
return stop_updates ? stop() : advance_existings();
}
@@ -1285,6 +1420,12 @@ future<stop_iteration> view_update_builder::on_results() {
? std::optional<clustering_row>(std::in_place, _update->as_clustering_row().key(), row_tombstone(std::move(existing_tombstone)), row_marker(), ::row())
: std::nullopt;
generate_update(std::move(*_update).as_clustering_row(), std::move(existing));
} else if (_update->is_static_row()) {
auto existing_tombstone = _existing_partition_tombstone;
auto existing = existing_tombstone
? std::optional<static_row>(std::in_place)
: std::nullopt;
generate_update(std::move(*_update).as_static_row(), _update_partition_tombstone, std::move(existing), _existing_partition_tombstone);
}
return stop_updates ? stop() : advance_updates();
}
@@ -1371,6 +1512,12 @@ future<query::clustering_row_ranges> calculate_affected_clustering_ranges(const
}
// Tells whether the static row has to be taken into account for the given
// partition of an update mutation: true when `mp` carries a partition
// tombstone or a non-empty static row. `views` is not inspected yet.
bool needs_static_row(const mutation_partition& mp, const std::vector<view_and_base>& views) {
    // TODO: We could also check whether any of the views need static rows
    // and return false if none of them do
    if (mp.partition_tombstone()) {
        return true;
    }
    return !mp.static_row().empty();
}
// Calculate the node ("natural endpoint") to which this node should send
// a view update.
//
@@ -2185,8 +2332,13 @@ public:
return stop_iteration::no;
}
stop_iteration consume(static_row&&, tombstone, bool) {
// Consumes a static row: the iteration stops when no views are left to build
// or an abort was requested; otherwise the row is buffered via add_fragment()
// and consumption continues.
stop_iteration consume(static_row&& sr, tombstone, bool) {
    inject_failure("view_builder_consume_static_row");
    if (_views_to_build.empty()) {
        return stop_iteration::yes;
    }
    if (_builder._as.abort_requested()) {
        return stop_iteration::yes;
    }
    add_fragment(std::move(sr));
    return stop_iteration::no;
}
@@ -2199,8 +2351,13 @@ public:
return stop_iteration::yes;
}
_fragments_memory_usage += cr.memory_usage(*_step.reader.schema());
_fragments.emplace_back(*_step.reader.schema(), _builder._permit, std::move(cr));
add_fragment(std::move(cr));
return stop_iteration::no;
}
void add_fragment(auto&& fragment) {
_fragments_memory_usage += fragment.memory_usage(*_step.reader.schema());
_fragments.emplace_back(*_step.reader.schema(), _builder._permit, std::move(fragment));
if (_fragments_memory_usage > batch_memory_max) {
// Although we have not yet completed the batch of base rows that
// compact_for_query<> planned for us (view_builder::batchsize),
@@ -2208,7 +2365,6 @@ public:
// so let's flush these rows now.
flush_fragments();
}
return stop_iteration::no;
}
stop_iteration consume(range_tombstone_change&&) {

View File

@@ -41,13 +41,15 @@ private:
schema_ptr _base_schema;
// Id of a regular base table column included in the view's PK, if any.
// Scylla views only allow one such column, alternator can have up to two.
std::vector<column_id> _base_non_pk_columns_in_view_pk;
std::vector<column_id> _base_regular_columns_in_view_pk;
std::vector<column_id> _base_static_columns_in_view_pk;
// For tracing purposes, if the view is out of sync with its base table
// and there exists a column which is not in base, its name is stored
// and added to debug messages.
std::optional<bytes> _column_missing_in_base = {};
public:
const std::vector<column_id>& base_non_pk_columns_in_view_pk() const;
const std::vector<column_id>& base_regular_columns_in_view_pk() const;
const std::vector<column_id>& base_static_columns_in_view_pk() const;
const schema_ptr& base_schema() const;
// Indicates if the view has pk columns which are not part of the base
@@ -62,7 +64,9 @@ public:
const bool use_only_for_reads;
// A constructor for a base info that can facilitate reads and writes from the materialized view.
base_dependent_view_info(schema_ptr base_schema, std::vector<column_id>&& base_non_pk_columns_in_view_pk);
base_dependent_view_info(schema_ptr base_schema,
std::vector<column_id>&& base_regular_columns_in_view_pk,
std::vector<column_id>&& base_static_columns_in_view_pk);
// A constructor for a base info that can facilitate only reads from the materialized view.
base_dependent_view_info(bool has_base_non_pk_columns_in_view_pk, std::optional<bytes>&& column_missing_in_base);
};
@@ -76,6 +80,46 @@ struct view_and_base {
base_info_ptr base;
};
// An immutable representation of a single base-table row that is either a
// clustering row (the key is engaged) or a static row (the key is disengaged).
struct clustering_or_static_row {
private:
    std::optional<clustering_key_prefix> _key;
    deletable_row _row;

public:
    // Takes over the key and the row contents of a clustering row.
    explicit clustering_or_static_row(clustering_row&& cr)
        : _key(std::move(cr.key()))
        , _row(std::move(cr).as_deletable_row())
    {}

    // Takes over the cells of a static row; the key stays disengaged and the
    // row gets a default-constructed tombstone and marker.
    explicit clustering_or_static_row(static_row&& sr)
        : _key(std::nullopt)
        , _row(row_tombstone(), row_marker(), std::move(sr.cells()))
    {}

    bool is_static_row() const { return !_key; }
    bool is_clustering_row() const { return _key.has_value(); }

    // Disengaged for a static row.
    const std::optional<clustering_key_prefix>& key() const { return _key; }

    row_tombstone tomb() const { return _row.deleted_at(); }
    const row_marker& marker() const { return _row.marker(); }
    const row& cells() const { return _row.cells(); }

    bool empty() const { return _row.empty(); }

    // Liveness check delegated to the underlying row, using the column kind
    // that matches the kind of row held here.
    bool is_live(const schema& s, tombstone base_tombstone = tombstone(), gc_clock::time_point now = gc_clock::time_point::min()) const {
        return _row.is_live(s, column_kind(), base_tombstone, now);
    }

    // Kind of the non-key columns stored in cells(): regular columns for a
    // clustering row, static columns for a static row.
    column_kind column_kind() const {
        if (_key.has_value()) {
            return column_kind::regular_column;
        }
        return column_kind::static_column;
    }

    clustering_row as_clustering_row(const schema& s) const;
    static_row as_static_row(const schema& s) const;
};
/**
* Whether the view filter considers the specified partition key.
*
@@ -119,7 +163,7 @@ bool may_be_affected_by(const schema& base, const view_info& view, const dht::de
* @param now the current time in seconds (to decide what is live and what isn't).
* @return whether the base row matches the view filter.
*/
bool matches_view_filter(const schema& base, const view_info& view, const partition_key& key, const clustering_row& update, gc_clock::time_point now);
bool matches_view_filter(const schema& base, const view_info& view, const partition_key& key, const clustering_or_static_row& update, gc_clock::time_point now);
bool clustering_prefix_matches(const schema& base, const partition_key& key, const clustering_key_prefix& ck);
@@ -181,24 +225,24 @@ public:
future<> move_to(utils::chunked_vector<frozen_mutation_and_schema>& mutations);
void generate_update(const partition_key& base_key, const clustering_row& update, const std::optional<clustering_row>& existing, gc_clock::time_point now);
void generate_update(const partition_key& base_key, const clustering_or_static_row& update, const std::optional<clustering_or_static_row>& existing, gc_clock::time_point now);
size_t op_count() const;
private:
mutation_partition& partition_for(partition_key&& key);
row_marker compute_row_marker(const clustering_row& base_row) const;
row_marker compute_row_marker(const clustering_or_static_row& base_row) const;
struct view_row_entry {
deletable_row* _row;
view_key_and_action::action _action;
};
std::vector<view_row_entry> get_view_rows(const partition_key& base_key, const clustering_row& update, const std::optional<clustering_row>& existing);
bool can_skip_view_updates(const clustering_row& update, const clustering_row& existing) const;
void create_entry(const partition_key& base_key, const clustering_row& update, gc_clock::time_point now);
void delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now);
void do_delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now);
void update_entry(const partition_key& base_key, const clustering_row& update, const clustering_row& existing, gc_clock::time_point now);
void update_entry_for_computed_column(const partition_key& base_key, const clustering_row& update, const std::optional<clustering_row>& existing, gc_clock::time_point now);
std::vector<view_row_entry> get_view_rows(const partition_key& base_key, const clustering_or_static_row& update, const std::optional<clustering_or_static_row>& existing);
bool can_skip_view_updates(const clustering_or_static_row& update, const clustering_or_static_row& existing) const;
void create_entry(const partition_key& base_key, const clustering_or_static_row& update, gc_clock::time_point now);
void delete_old_entry(const partition_key& base_key, const clustering_or_static_row& existing, const clustering_or_static_row& update, gc_clock::time_point now);
void do_delete_old_entry(const partition_key& base_key, const clustering_or_static_row& existing, const clustering_or_static_row& update, gc_clock::time_point now);
void update_entry(const partition_key& base_key, const clustering_or_static_row& update, const clustering_or_static_row& existing, gc_clock::time_point now);
void update_entry_for_computed_column(const partition_key& base_key, const clustering_or_static_row& update, const std::optional<clustering_or_static_row>& existing, gc_clock::time_point now);
};
class view_update_builder {
@@ -237,6 +281,7 @@ public:
private:
void generate_update(clustering_row&& update, std::optional<clustering_row>&& existing);
void generate_update(static_row&& update, const tombstone& update_tomb, std::optional<static_row>&& existing, const tombstone& existing_tomb);
future<stop_iteration> on_results();
future<stop_iteration> advance_all();
@@ -260,6 +305,8 @@ future<query::clustering_row_ranges> calculate_affected_clustering_ranges(
const mutation_partition& mp,
const std::vector<view_and_base>& views);
bool needs_static_row(const mutation_partition& mp, const std::vector<view_and_base>& views);
struct wait_for_all_updates_tag {};
using wait_for_all_updates = bool_class<wait_for_all_updates_tag>;
future<> mutate_MV(

View File

@@ -112,6 +112,7 @@ public:
gms::feature aggregate_storage_options { *this, "AGGREGATE_STORAGE_OPTIONS"sv };
gms::feature collection_indexing { *this, "COLLECTION_INDEXING"sv };
gms::feature large_collection_detection { *this, "LARGE_COLLECTION_DETECTION"sv };
gms::feature secondary_indexes_on_static_columns { *this, "SECONDARY_INDEXES_ON_STATIC_COLUMNS"sv };
public:

View File

@@ -249,11 +249,13 @@ view_ptr secondary_index_manager::create_view_for_index(const index_metadata& im
}
}
for (auto& col : schema->clustering_key_columns()) {
if (col == *index_target) {
continue;
if (!index_target->is_static()) {
for (auto& col : schema->clustering_key_columns()) {
if (col == *index_target) {
continue;
}
builder.with_column(col.name(), col.type, column_kind::clustering_key);
}
builder.with_column(col.name(), col.type, column_kind::clustering_key);
}
// This column needs to be after the base clustering key.

View File

@@ -63,7 +63,7 @@ public:
bool empty() const { return _row.empty(); }
bool is_live(const schema& s, tombstone base_tombstone = tombstone(), gc_clock::time_point now = gc_clock::time_point::min()) const {
return _row.is_live(s, std::move(base_tombstone), std::move(now));
return _row.is_live(s, column_kind::regular_column, std::move(base_tombstone), std::move(now));
}
void apply(const schema& s, clustering_row&& cr) {

View File

@@ -1505,13 +1505,13 @@ bool mutation_partition::empty() const
}
bool
deletable_row::is_live(const schema& s, tombstone base_tombstone, gc_clock::time_point query_time) const {
deletable_row::is_live(const schema& s, column_kind kind, tombstone base_tombstone, gc_clock::time_point query_time) const {
// _created_at corresponds to the row marker cell, present for rows
// created with the 'insert' statement. If row marker is live, we know the
// row is live. Otherwise, a row is considered live if it has any cell
// which is live.
base_tombstone.apply(_deleted_at.tomb());
return _marker.is_live(base_tombstone, query_time) || _cells.is_live(s, column_kind::regular_column, base_tombstone, query_time);
return _marker.is_live(base_tombstone, query_time) || _cells.is_live(s, kind, base_tombstone, query_time);
}
bool
@@ -1532,7 +1532,7 @@ mutation_partition::live_row_count(const schema& s, gc_clock::time_point query_t
for (const rows_entry& e : non_dummy_rows()) {
tombstone base_tombstone = range_tombstone_for_row(s, e.key());
if (e.row().is_live(s, base_tombstone, query_time)) {
if (e.row().is_live(s, column_kind::regular_column, base_tombstone, query_time)) {
++count;
}
}

View File

@@ -866,7 +866,7 @@ public:
const row& cells() const { return _cells; }
row& cells() { return _cells; }
bool equal(column_kind, const schema& s, const deletable_row& other, const schema& other_schema) const;
bool is_live(const schema& s, tombstone base_tombstone = tombstone(), gc_clock::time_point query_time = gc_clock::time_point::min()) const;
bool is_live(const schema& s, column_kind kind, tombstone base_tombstone = tombstone(), gc_clock::time_point query_time = gc_clock::time_point::min()) const;
bool empty() const { return !_deleted_at && _marker.is_missing() && !_cells.size(); }
deletable_row difference(const schema&, column_kind, const deletable_row& other) const;

View File

@@ -2425,26 +2425,36 @@ future<row_locker::lock_holder> table::do_push_view_replica_updates(schema_ptr s
co_return row_locker::lock_holder();
}
auto cr_ranges = co_await db::view::calculate_affected_clustering_ranges(*base, m.decorated_key(), m.partition(), views);
if (cr_ranges.empty()) {
const bool need_regular = !cr_ranges.empty();
const bool need_static = db::view::needs_static_row(m.partition(), views);
if (!need_regular && !need_static) {
tracing::trace(tr_state, "View updates do not require read-before-write");
co_await generate_and_propagate_view_updates(base, sem.make_tracking_only_permit(s.get(), "push-view-updates-1", timeout), std::move(views), std::move(m), { }, std::move(tr_state), now);
// In this case we are not doing a read-before-write, just a
// write, so no lock is needed.
co_return row_locker::lock_holder();
}
// We read the whole set of regular columns in case the update now causes a base row to pass
// We read whole sets of regular and/or static columns in case the update now causes a base row to pass
// a view's filters, and a view happens to include columns that have no value in this update.
// Also, one of those columns can determine the lifetime of the base row, if it has a TTL.
auto columns = boost::copy_range<query::column_id_vector>(
base->regular_columns() | boost::adaptors::transformed(std::mem_fn(&column_definition::id)));
query::column_id_vector static_columns;
query::column_id_vector regular_columns;
if (need_regular) {
boost::copy(base->regular_columns() | boost::adaptors::transformed(std::mem_fn(&column_definition::id)), std::back_inserter(regular_columns));
}
if (need_static) {
boost::copy(base->static_columns() | boost::adaptors::transformed(std::mem_fn(&column_definition::id)), std::back_inserter(static_columns));
}
query::partition_slice::option_set opts;
opts.set(query::partition_slice::option::send_partition_key);
opts.set(query::partition_slice::option::send_clustering_key);
opts.set_if<query::partition_slice::option::send_clustering_key>(need_regular);
opts.set_if<query::partition_slice::option::distinct>(need_static && !need_regular);
opts.set_if<query::partition_slice::option::always_return_static_content>(need_static);
opts.set(query::partition_slice::option::send_timestamp);
opts.set(query::partition_slice::option::send_ttl);
opts.add(custom_opts);
auto slice = query::partition_slice(
std::move(cr_ranges), { }, std::move(columns), std::move(opts), { }, cql_serialization_format::internal(), query::max_rows);
std::move(cr_ranges), std::move(static_columns), std::move(regular_columns), std::move(opts), { }, cql_serialization_format::internal(), query::max_rows);
// Take the shard-local lock on the base-table row or partition as needed.
// We'll return this lock to the caller, which will release it after
// writing the base-table update.

View File

@@ -1770,7 +1770,7 @@ column_computation_ptr collection_column_computation::for_target_type(std::strin
void collection_column_computation::operate_on_collection_entries(
std::invocable<collection_kv*, collection_kv*, tombstone> auto&& old_and_new_row_func, const schema& schema,
const partition_key& key, const clustering_row& update, const std::optional<clustering_row>& existing) const {
const partition_key& key, const db::view::clustering_or_static_row& update, const std::optional<db::view::clustering_or_static_row>& existing) const {
const column_definition* cdef = schema.get_column_definition(_collection_name);
@@ -1840,7 +1840,8 @@ bytes collection_column_computation::compute_value(const schema&, const partitio
throw std::runtime_error(fmt::format("{}: not supported", __PRETTY_FUNCTION__));
}
std::vector<db::view::view_key_and_action> collection_column_computation::compute_values_with_action(const schema& schema, const partition_key& key, const clustering_row& update, const std::optional<clustering_row>& existing) const {
std::vector<db::view::view_key_and_action> collection_column_computation::compute_values_with_action(const schema& schema, const partition_key& key,
const db::view::clustering_or_static_row& update, const std::optional<db::view::clustering_or_static_row>& existing) const {
using collection_kv = std::pair<bytes_view, atomic_cell_view>;
auto serialize_cell = [_kind = _kind](const collection_kv& kv) -> bytes {
using kind = collection_column_computation::kind;

View File

@@ -1115,7 +1115,7 @@ static void validate_result(size_t i, const mutation& result_mut, const expected
const auto exp_dead_end = expected_part.dead_rows.cend();
for (; res_it != res_end && (exp_live_it != exp_live_end || exp_dead_it != exp_dead_end); ++res_it) {
const bool is_live = res_it->row().is_live(schema);
const bool is_live = res_it->row().is_live(schema, column_kind::regular_column);
// Check that we have remaining expected rows of the respective liveness.
if (is_live) {

View File

@@ -3180,7 +3180,7 @@ SEASTAR_TEST_CASE(compact_deleted_cell) {
auto rows = m->partition().clustered_rows();
BOOST_REQUIRE(rows.calculate_size() == 1);
auto& row = rows.begin()->row();
BOOST_REQUIRE(row.is_live(*s));
BOOST_REQUIRE(row.is_live(*s, column_kind::regular_column));
auto& cells = row.cells();
BOOST_REQUIRE(cells.size() == 1);
});

View File

@@ -8,7 +8,6 @@
from cassandra_tests.porting import *
@pytest.mark.xfail(reason="issues #2963")
def testSimpleStaticColumn(cql, test_keyspace):
with create_table(cql, test_keyspace, "(id int, name text, age int static, PRIMARY KEY (id, name))") as table:
cql.execute(f"CREATE INDEX static_age on {table}(age)")
@@ -39,7 +38,6 @@ def testSimpleStaticColumn(cql, test_keyspace):
assert_empty(execute(cql, table, "SELECT id, name, age FROM %s WHERE age=?", age2))
@pytest.mark.xfail(reason="issues #2963")
def testIndexOnCompoundRowKey(cql, test_keyspace):
with create_table(cql, test_keyspace, "(interval text, seq int, id int, severity int static, PRIMARY KEY ((interval, seq), id) ) WITH CLUSTERING ORDER BY (id DESC)") as table:
execute(cql, table, "CREATE INDEX ON %s (severity)")
@@ -56,7 +54,6 @@ def testIndexOnCompoundRowKey(cql, test_keyspace):
assert_rows(execute(cql, table, "select * from %s where severity = 10 and interval = 't' and seq = 1"),
["t", 1, 4, 10], ["t", 1, 3, 10])
@pytest.mark.xfail(reason="issues #2963")
def testIndexOnCollections(cql, test_keyspace):
with create_table(cql, test_keyspace, "(k int, v int, l list<int> static, s set<text> static, m map<text, int> static, PRIMARY KEY (k, v))") as table:
execute(cql, table, "CREATE INDEX ON %s (l)")
@@ -108,7 +105,6 @@ def testIndexOnCollections(cql, test_keyspace):
execute(cql, table, "DELETE m['a'] FROM %s WHERE k = 0")
assert_empty(execute(cql, table, "SELECT k, v FROM %s WHERE m CONTAINS KEY 'a'"))
@pytest.mark.xfail(reason="issues #2963")
def testIndexOnFrozenCollections(cql, test_keyspace):
with create_table(cql, test_keyspace, "(k int, v int, l frozen<list<int>> static, s frozen<set<text>> static, m frozen<map<text, int>> static, PRIMARY KEY (k, v))") as table:
execute(cql, table, "CREATE INDEX ON %s (FULL(l))")
@@ -154,7 +150,6 @@ def testIndexOnFrozenCollections(cql, test_keyspace):
assert_empty(execute(cql, table, "SELECT k, v FROM %s WHERE m = {'a': 1, 'b': 2}"))
assert_rows(execute(cql, table, "SELECT k, v FROM %s WHERE m = {'a': 2, 'b': 3}"), [0, 0], [0, 1])
@pytest.mark.xfail(reason="issues #2963")
def testStaticIndexAndNonStaticIndex(cql, test_keyspace):
with create_table(cql, test_keyspace, "(id int, company text, age int static, salary int, PRIMARY KEY(id, company))") as table:
execute(cql, table, "CREATE INDEX on %s(age)")
@@ -170,7 +165,6 @@ def testStaticIndexAndNonStaticIndex(cql, test_keyspace):
assert_rows(execute(cql, table, "SELECT id, company, age, salary FROM %s WHERE age = 20 AND salary = 2000 ALLOW FILTERING"),
[1, company2, 20, 2000])
@pytest.mark.xfail(reason="issues #2963")
def testIndexOnUDT(cql, test_keyspace):
with create_type(cql, test_keyspace, "(street text, city text)") as type_name:
with create_table(cql, test_keyspace, f"(id int, company text, home frozen<{type_name}> static, price int, PRIMARY KEY(id, company))") as table:

View File

@@ -856,7 +856,6 @@ def testAllowFilteringOnPartitionKeyWithIndexForContains(cql, test_keyspace):
[1, 1, {4, 5, 6}])
assert_empty(execute(cql, table, "SELECT * FROM %s WHERE k2 < ? AND v CONTAINS ? ALLOW FILTERING", 0, 7))
@pytest.mark.xfail(reason="issues #2963")
def testIndexOnStaticColumnWithPartitionWithoutRows(cql, test_keyspace):
with create_table(cql, test_keyspace, "(pk int, c int, s int static, v int, PRIMARY KEY (pk, c))") as table:
execute(cql, table, "CREATE INDEX ON %s (s)")

View File

@@ -214,6 +214,27 @@ def test_filter_with_unused_static_column(cql, test_keyspace, scylla_only):
expected = [(42,43,44)] if '4' in where else [(42,43,44),(1,2,3)]
assert list(cql.execute(f"SELECT * FROM {mv}")) == expected
# Creating a materialized view that selects a static column must be rejected,
# whether or not the static column is part of the view's primary key.
# Neither Cassandra nor Scylla support this at the moment.
def test_static_columns_are_disallowed(cql, test_keyspace):
    base_schema = 'p int, c int, v int, s int static, primary key (p,c)'

    # Cleanup helper: the view should never exist, but drop it just in case
    # the CREATE unexpectedly succeeded.
    def drop_view(mv):
        cql.execute(f"DROP MATERIALIZED VIEW IF EXISTS {test_keyspace}.{mv}")

    with new_test_table(cql, test_keyspace, base_schema) as table:
        # Case 1: 's' not in primary key
        mv = unique_name()
        try:
            with pytest.raises(InvalidRequest, match="[Ss]tatic column"):
                cql.execute(f"CREATE MATERIALIZED VIEW {test_keyspace}.{mv} AS SELECT p, s FROM {table} WHERE s IS NOT NULL PRIMARY KEY (p)")
        finally:
            drop_view(mv)

        # Case 2: 's' in primary key
        mv = unique_name()
        try:
            with pytest.raises(InvalidRequest, match="[Ss]tatic column"):
                cql.execute(f"CREATE MATERIALIZED VIEW {test_keyspace}.{mv} AS SELECT p, s FROM {table} WHERE s IS NOT NULL PRIMARY KEY (s, p)")
        finally:
            drop_view(mv)
# IS_NOT operator can only be used in the context of materialized view creation and it must be of the form IS NOT NULL.
# Trying to do something like IS NOT 42 should fail.
# The error is a SyntaxException because Scylla and Cassandra check this during parsing.

View File

@@ -1232,6 +1232,32 @@ def test_index_paging_match_partition(cql, test_keyspace):
# Finally check that altogether, we read the right rows.
assert sorted(all_rows) == [(i,) for i in range(10)]
# Currently, paging of queries that use secondary indexes on static columns
# is unable to page through partitions of the base table and must return them
# in whole. Related to #7432.
@pytest.mark.xfail(reason="issue #7432")
def test_index_paging_static_column(cql, test_keyspace):
    schema = 'p int, c int, s int static, PRIMARY KEY(p, c)'
    with new_test_table(cql, test_keyspace, schema) as table:
        cql.execute(f'CREATE INDEX ON {table}(s)')
        insert_stmt = cql.prepare(f"INSERT INTO {table}(p,c,s) VALUES (?,?,?)")
        # 5 partitions x 5 clustering rows, all sharing the same static value.
        for p in range(5):
            for c in range(5):
                cql.execute(insert_stmt, [p, c, 42])
        expected_rows = [(p,c) for p in range(5) for c in range(5)]
        for page_size in [1, 2, 3, 4, 100]:
            select_stmt = SimpleStatement(f"SELECT p, c FROM {table} WHERE s = 42", fetch_size=page_size)
            collected = []
            page = cql.execute(select_stmt)
            # Keep fetching for as long as full pages are returned.
            while len(page.current_rows) == page_size:
                collected.extend(page.current_rows)
                page = cql.execute(select_stmt, paging_state=page.paging_state)
            # After pages of page_size, the last page should be partial
            assert len(page.current_rows) < page_size
            collected.extend(page.current_rows)
            # Finally check that altogether, we read the right rows.
            assert sorted(collected) == expected_rows
# If, in contrast with test_index_paging_match_partition above which indexed
# a partition key column, we index a clustering key column, paging does work
# as expected and stops at the right page size. However, as was noted in
@@ -1269,3 +1295,210 @@ def test_index_paging_group_by(cql, test_keyspace, use_group_by):
all_rows.extend(results.current_rows)
# Finally check that altogether, we read the right rows.
assert sorted(all_rows) == [(i,) for i in range(10)]
# Exercises insert, update and partition delete against a table with an index
# on a static column, checking the indexed lookups after every step.
def test_static_column_index(cql, test_keyspace):
    schema = 'pk int, c int, s int STATIC, v int, PRIMARY KEY(pk, c)'
    with new_test_table(cql, test_keyspace, schema) as table:
        cql.execute(f'CREATE INDEX ON {table}(s)')

        # Partition keys currently found through the index, per static value.
        def pks_with_s_0():
            return sorted(cql.execute(f'SELECT pk FROM {table} WHERE s = 0'))

        def pks_with_s_1():
            return sorted(cql.execute(f'SELECT pk FROM {table} WHERE s = 1'))

        # Insert
        cql.execute(f'INSERT INTO {table} (pk, s) VALUES (0, 0)')
        cql.execute(f'INSERT INTO {table} (pk, s) VALUES (1, 0)')
        cql.execute(f'INSERT INTO {table} (pk, s) VALUES (2, 1)')
        assert [(0,),(1,)] == pks_with_s_0()
        assert [(2,)] == pks_with_s_1()

        # Update
        cql.execute(f'UPDATE {table} SET s = 1 WHERE pk = 1')
        assert [(0,)] == pks_with_s_0()
        assert [(1,),(2,)] == pks_with_s_1()

        # Partition delete
        cql.execute(f'DELETE FROM {table} WHERE pk = 2')
        assert [(0,)] == pks_with_s_0()
        assert [(1,)] == pks_with_s_1()
# Tests that building static indexes from a non-empty state works:
# the index is created only after the base table already holds data.
def test_static_column_index_build(cql, test_keyspace):
    schema = 'pk int, c int, s int STATIC, v int, PRIMARY KEY(pk, c)'
    with new_test_table(cql, test_keyspace, schema) as table:
        cql.execute(f'INSERT INTO {table} (pk, s) VALUES (0, 0)')
        cql.execute(f'INSERT INTO {table} (pk, s) VALUES (1, 0)')
        cql.execute(f'INSERT INTO {table} (pk, s) VALUES (2, 0)')

        cql.execute(f'CREATE INDEX ON {table}(s)')

        # Indexes are created in the background, so we should wait here.
        # I don't know how to get information about secondary index build
        # status on C*, so we'll just wait until 30 seconds elapse or
        # the index appears to be properly built.
        start_time = time.time()
        rows = None
        while time.time() < start_time + 30:
            rows = sorted(cql.execute(f'SELECT pk FROM {table} WHERE s = 0'))
            if len(rows) == 3:
                break
            # Pause briefly between polls instead of hammering the server
            # with back-to-back queries while the index is still building.
            time.sleep(0.1)
        assert [(0,),(1,),(2,)] == rows
# Checks that deleting clustering rows (one by one or by range) leaves the
# static column index entry in place; only a partition delete removes it.
def test_static_column_index_unaffected_by_clustering_row_ops(cql, test_keyspace):
    schema = 'pk int, c int, s int STATIC, v int, PRIMARY KEY(pk, c)'
    with new_test_table(cql, test_keyspace, schema) as table:
        cql.execute(f'CREATE INDEX ON {table}(s)')

        # Result of the indexed lookup on the static value used by this test.
        def indexed_pks():
            return sorted(cql.execute(f'SELECT pk FROM {table} WHERE s = 42'))

        cql.execute(f'INSERT INTO {table} (pk, c, s, v) VALUES (0, 0, 42, 0)')
        cql.execute(f'INSERT INTO {table} (pk, c, v) VALUES (0, 1, 10)')
        cql.execute(f'INSERT INTO {table} (pk, c, v) VALUES (0, 2, 20)')
        cql.execute(f'INSERT INTO {table} (pk, c, v) VALUES (0, 3, 30)')
        cql.execute(f'INSERT INTO {table} (pk, c, v) VALUES (0, 4, 40)')

        # We are not using SELECT DISTINCT because it is not implemented yet
        # for queries that restrict a non-pk column. Therefore, `pk` appears
        # multiple times in the result.
        assert [(0,)]*5 == indexed_pks()

        # Row delete
        cql.execute(f'DELETE FROM {table} WHERE pk = 0 AND c = 4')
        assert [(0,)]*4 == indexed_pks()

        # Range delete
        cql.execute(f'DELETE FROM {table} WHERE pk = 0 AND c >= 1 AND c < 3')
        assert [(0,)]*2 == indexed_pks()

        # Range delete, but this time get rid of all rows (static row should stay)
        cql.execute(f'DELETE FROM {table} WHERE pk = 0 AND c >= 0 AND c <= 4')
        assert [(0,)]*1 == indexed_pks()

        # Finally, perform a partition delete and get rid of the row
        cql.execute(f'DELETE FROM {table} WHERE pk = 0')
        assert [] == indexed_pks()
# Checks that when a partition's static column value changes, index-accelerated
# queries return all of that partition's clustering rows under the new value
# and none of them under the old one.
def test_static_column_index_all_clustering_rows_moved_by_static_column_update(cql, test_keyspace):
    schema = 'pk int, c int, s int STATIC, v int, PRIMARY KEY(pk, c)'
    with new_test_table(cql, test_keyspace, schema) as table:
        # Expected (pk, c, v) rows, grouped by partition.
        rows_for_pk = [
            [(0, 0, 0), (0, 1, 10), (0, 2, 20)],
            [(1, 0, 0), (1, 1, 10), (1, 2, 20)],
        ]
        every_row = rows_for_pk[0] + rows_for_pk[1]

        def rows_with_s_0():
            return sorted(cql.execute(f'SELECT pk, c, v FROM {table} WHERE s = 0'))

        def rows_with_s_1():
            return sorted(cql.execute(f'SELECT pk, c, v FROM {table} WHERE s = 1'))

        cql.execute(f'CREATE INDEX ON {table}(s)')
        for pk in range(2):
            cql.execute(f'INSERT INTO {table} (pk, c, s, v) VALUES ({pk}, 0, 0, 0)')
            cql.execute(f'INSERT INTO {table} (pk, c, v) VALUES ({pk}, 1, 10)')
            cql.execute(f'INSERT INTO {table} (pk, c, v) VALUES ({pk}, 2, 20)')

        # Initially both partitions have s = 0.
        assert every_row == rows_with_s_0()
        assert [] == rows_with_s_1()

        # Move partition 1 to s = 1.
        cql.execute(f"UPDATE {table} SET s = 1 WHERE pk = 1")
        assert rows_for_pk[0] == rows_with_s_0()
        assert rows_for_pk[1] == rows_with_s_1()

        # Move partition 0 as well.
        cql.execute(f"UPDATE {table} SET s = 1 WHERE pk = 0")
        assert [] == rows_with_s_0()
        assert every_row == rows_with_s_1()
# Exercises a table that has an index on a static column and an index on a
# regular column at the same time, verifying after every write that updates
# to one index do not disturb the results served by the other.
def test_static_and_regular_index_operations(cql, test_keyspace):
    with new_test_table(cql, test_keyspace, 'pk int, c int, s int STATIC, v int, PRIMARY KEY(pk, c)') as table:
        def pks(query):
            # Run a SELECT and return its rows in sorted order.
            return sorted(cql.execute(query))

        cql.execute(f'CREATE INDEX ON {table}(s)')
        cql.execute(f'CREATE INDEX ON {table}(v)')

        # Four single-row partitions covering every (s, v) pair in {0, 1}^2.
        cql.execute(f'INSERT INTO {table} (pk, s, c, v) VALUES (0, 0, 0, 0)')
        cql.execute(f'INSERT INTO {table} (pk, s, c, v) VALUES (1, 0, 0, 1)')
        cql.execute(f'INSERT INTO {table} (pk, s, c, v) VALUES (2, 1, 0, 0)')
        cql.execute(f'INSERT INTO {table} (pk, s, c, v) VALUES (3, 1, 0, 1)')
        assert pks(f'SELECT pk FROM {table} WHERE s = 0') == [(0,), (1,)]
        assert pks(f'SELECT pk FROM {table} WHERE s = 1') == [(2,), (3,)]
        assert pks(f'SELECT pk FROM {table} WHERE v = 0') == [(0,), (2,)]
        assert pks(f'SELECT pk FROM {table} WHERE v = 1') == [(1,), (3,)]

        # Changing only the static column: results by `v` stay the same.
        cql.execute(f'UPDATE {table} SET s = 1 WHERE pk = 1')
        assert pks(f'SELECT pk FROM {table} WHERE s = 0') == [(0,)]
        assert pks(f'SELECT pk FROM {table} WHERE s = 1') == [(1,), (2,), (3,)]
        assert pks(f'SELECT pk FROM {table} WHERE v = 0') == [(0,), (2,)]
        assert pks(f'SELECT pk FROM {table} WHERE v = 1') == [(1,), (3,)]

        # Changing only the regular column: results by `s` stay the same.
        cql.execute(f'UPDATE {table} SET v = 0 WHERE pk = 1 AND c = 0')
        assert pks(f'SELECT pk FROM {table} WHERE s = 0') == [(0,)]
        assert pks(f'SELECT pk FROM {table} WHERE s = 1') == [(1,), (2,), (3,)]
        assert pks(f'SELECT pk FROM {table} WHERE v = 0') == [(0,), (1,), (2,)]
        assert pks(f'SELECT pk FROM {table} WHERE v = 1') == [(3,)]

        # There are separate codepaths for processing static column
        # addition/removal in case the static row isn't the last element of
        # the mutation. The batches below are meant to reach those codepaths.
        cql.execute(f'INSERT INTO {table} (pk, c, v) VALUES (4, 0, 4)')
        assert pks(f'SELECT pk FROM {table} WHERE s = 2') == []
        assert pks(f'SELECT pk FROM {table} WHERE v = 4') == [(4,)]

        # A static column value is set on a partition which had no static row
        # yet, in the same batch as a regular column update.
        # (The continuation lines are part of the string literal, so they are
        # deliberately left without leading whitespace.)
        cql.execute(f'BEGIN BATCH \
UPDATE {table} SET s = 2 WHERE pk = 4; \
UPDATE {table} SET v = 5 WHERE pk = 4 AND c = 0; \
APPLY BATCH')
        assert pks(f'SELECT pk FROM {table} WHERE s = 2') == [(4,)]
        assert pks(f'SELECT pk FROM {table} WHERE v = 4') == []
        assert pks(f'SELECT pk FROM {table} WHERE v = 5') == [(4,)]

        # The static column is removed from the partition. The batch deletes
        # the partition at the current write timestamp of `v` and writes a new
        # `v` one tick later, so that timestamp has to be read back first.
        writetime_rows = list(cql.execute(f'SELECT writetime(v) FROM {table} WHERE pk = 4 AND c = 0'))
        timestamp = writetime_rows[0][0]
        cql.execute(f'BEGIN BATCH \
DELETE FROM {table} USING TIMESTAMP {timestamp} WHERE pk = 4; \
UPDATE {table} USING TIMESTAMP {timestamp+1} SET v = 6 WHERE pk = 4 AND c = 0; \
APPLY BATCH')
        assert pks(f'SELECT pk FROM {table} WHERE s = 2') == []
        assert pks(f'SELECT pk FROM {table} WHERE v = 5') == []
        assert pks(f'SELECT pk FROM {table} WHERE v = 6') == [(4,)]
# With two indexed static columns, an update touching only one of them must
# change the results served by that column's index and leave the results of
# the other index untouched.
def test_multiple_static_column_indexes(cql, test_keyspace):
    with new_test_table(cql, test_keyspace, 'pk int, c int, s1 int STATIC, s2 int STATIC, PRIMARY KEY(pk, c)') as table:
        def pks(query):
            # Run a SELECT and return its rows in sorted order.
            return sorted(cql.execute(query))

        cql.execute(f'CREATE INDEX ON {table}(s1)')
        cql.execute(f'CREATE INDEX ON {table}(s2)')

        # Four partitions covering every (s1, s2) pair in {0, 1}^2.
        cql.execute(f'INSERT INTO {table} (pk, s1, s2) VALUES (0, 0, 0)')
        cql.execute(f'INSERT INTO {table} (pk, s1, s2) VALUES (1, 0, 1)')
        cql.execute(f'INSERT INTO {table} (pk, s1, s2) VALUES (2, 1, 0)')
        cql.execute(f'INSERT INTO {table} (pk, s1, s2) VALUES (3, 1, 1)')
        assert pks(f'SELECT pk FROM {table} WHERE s1 = 0') == [(0,), (1,)]
        assert pks(f'SELECT pk FROM {table} WHERE s1 = 1') == [(2,), (3,)]
        assert pks(f'SELECT pk FROM {table} WHERE s2 = 0') == [(0,), (2,)]
        assert pks(f'SELECT pk FROM {table} WHERE s2 = 1') == [(1,), (3,)]

        # Only s1 changes for pk = 1; the s2 results must be as before.
        cql.execute(f'UPDATE {table} SET s1 = 1 WHERE pk = 1')
        assert pks(f'SELECT pk FROM {table} WHERE s1 = 0') == [(0,)]
        assert pks(f'SELECT pk FROM {table} WHERE s1 = 1') == [(1,), (2,), (3,)]
        assert pks(f'SELECT pk FROM {table} WHERE s2 = 0') == [(0,), (2,)]
        assert pks(f'SELECT pk FROM {table} WHERE s2 = 1') == [(1,), (3,)]
# Creating a local index that includes a static column must be rejected.
# Such an index would not be useful: a partition can hold only one value of
# a static column.
def test_disallow_local_indexes_on_static_columns(scylla_only, cql, test_keyspace):
    expected_error = "Local indexes containing static columns are not supported"
    with new_test_table(cql, test_keyspace, 'pk int, c int, s int static, PRIMARY KEY(pk, c)') as table, \
            pytest.raises(InvalidRequest, match=expected_error):
        cql.execute(f'CREATE INDEX ON {table}((pk), s)')

View File

@@ -50,7 +50,7 @@ public:
cql3::statements::select_statement& select_statement() const;
const query::partition_slice& partition_slice() const;
const column_definition* view_column(const schema& base, column_id base_id) const;
const column_definition* view_column(const schema& base, column_kind kind, column_id base_id) const;
const column_definition* view_column(const column_definition& base_def) const;
bool has_base_non_pk_columns_in_view_pk() const;
bool has_computed_column_depending_on_base_non_primary_key() const {