Merge 'mv: delete a partition in a single operation when applicable' from Michael Litvak
Currently when a partition is deleted from the base table, we generate a row tombstone update for each one of the view rows in the partition. When the partition key in the view is the same as the base, maybe in a different order, this can be done more efficiently - The whole corresponding view partition can be deleted with one partition tombstone update. With this commit, when generating view updates, if the update mutation has a partition tombstone then for the views which have the same partition key we will generate a partition tombstone update, and skip the individual row tombstone updates. Fixes scylladb/scylladb#8199 Closes scylladb/scylladb#19338 * github.com:scylladb/scylladb: mv: skip reading rows when generating partition tombstone update mv: delete a partition in a single operation when applicable cql-pytest: move ScyllaMetrics to util file to allow reuse
This commit is contained in:
101
db/view/view.cc
101
db/view/view.cc
@@ -188,6 +188,13 @@ db::view::base_info_ptr view_info::make_base_dependent_view_info(const schema& b
|
||||
std::vector<column_id> base_regular_columns_in_view_pk;
|
||||
std::vector<column_id> base_static_columns_in_view_pk;
|
||||
|
||||
_is_partition_key_permutation_of_base_partition_key =
|
||||
boost::algorithm::all_of(_schema.partition_key_columns(), [&base] (const column_definition& view_col) {
|
||||
const column_definition* base_col = base.get_column_definition(view_col.name());
|
||||
return base_col && base_col->is_partition_key();
|
||||
})
|
||||
&& _schema.partition_key_size() == base.partition_key_size();
|
||||
|
||||
for (auto&& view_col : boost::range::join(_schema.partition_key_columns(), _schema.clustering_key_columns())) {
|
||||
if (view_col.is_computed()) {
|
||||
// we are not going to find it in the base table...
|
||||
@@ -678,7 +685,7 @@ private:
|
||||
|
||||
|
||||
std::vector<view_updates::view_row_entry>
|
||||
view_updates::get_view_rows(const partition_key& base_key, const clustering_or_static_row& update, const std::optional<clustering_or_static_row>& existing) {
|
||||
view_updates::get_view_rows(const partition_key& base_key, const clustering_or_static_row& update, const std::optional<clustering_or_static_row>& existing, row_tombstone row_delete_tomb) {
|
||||
value_getter getter(*_base, base_key, update, existing);
|
||||
auto get_value = boost::adaptors::transformed(std::ref(getter));
|
||||
|
||||
@@ -701,6 +708,14 @@ view_updates::get_view_rows(const partition_key& base_key, const clustering_or_s
|
||||
clustering_key ckey = clustering_key::from_range(boost::adaptors::transform(ck, view_managed_key_view_and_action::get_key_view));
|
||||
auto action = (action_column < pk.size() ? pk[action_column] : ck[action_column - pk.size()])._action;
|
||||
mutation_partition& partition = partition_for(std::move(pkey));
|
||||
|
||||
// Skip adding the row if we already wrote a partition tombstone for this partition, and the update
|
||||
// is deleting the row with an equal row tombstone. This means the entire partition is deleted
|
||||
// so we don't need to generate updates for individual rows.
|
||||
if (partition.partition_tombstone() && partition.partition_tombstone() == row_delete_tomb.tomb()) {
|
||||
return;
|
||||
}
|
||||
|
||||
ret.push_back({&partition.clustered_row(*_view, std::move(ckey)), action});
|
||||
};
|
||||
|
||||
@@ -912,7 +927,7 @@ void view_updates::create_entry(data_dictionary::database db, const partition_ke
|
||||
return;
|
||||
}
|
||||
|
||||
auto view_rows = get_view_rows(base_key, update, std::nullopt);
|
||||
auto view_rows = get_view_rows(base_key, update, std::nullopt, {});
|
||||
auto update_marker = compute_row_marker(update);
|
||||
const auto kind = update.column_kind();
|
||||
for (const auto& [r, action]: view_rows) {
|
||||
@@ -940,7 +955,7 @@ void view_updates::delete_old_entry(data_dictionary::database db, const partitio
|
||||
}
|
||||
|
||||
void view_updates::do_delete_old_entry(const partition_key& base_key, const clustering_or_static_row& existing, const clustering_or_static_row& update, gc_clock::time_point now) {
|
||||
auto view_rows = get_view_rows(base_key, existing, std::nullopt);
|
||||
auto view_rows = get_view_rows(base_key, existing, std::nullopt, update.tomb());
|
||||
const auto kind = existing.column_kind();
|
||||
for (const auto& [r, action] : view_rows) {
|
||||
const auto& col_ids = existing.is_clustering_row()
|
||||
@@ -1081,7 +1096,7 @@ void view_updates::update_entry(data_dictionary::database db, const partition_ke
|
||||
return;
|
||||
}
|
||||
|
||||
auto view_rows = get_view_rows(base_key, update, std::nullopt);
|
||||
auto view_rows = get_view_rows(base_key, update, std::nullopt, {});
|
||||
auto update_marker = compute_row_marker(update);
|
||||
const auto kind = update.column_kind();
|
||||
for (const auto& [r, action] : view_rows) {
|
||||
@@ -1103,7 +1118,7 @@ void view_updates::update_entry_for_computed_column(
|
||||
const clustering_or_static_row& update,
|
||||
const std::optional<clustering_or_static_row>& existing,
|
||||
gc_clock::time_point now) {
|
||||
auto view_rows = get_view_rows(base_key, update, existing);
|
||||
auto view_rows = get_view_rows(base_key, update, existing, {});
|
||||
for (const auto& [r, action] : view_rows) {
|
||||
struct visitor {
|
||||
deletable_row* row;
|
||||
@@ -1232,6 +1247,63 @@ void view_updates::generate_update(
|
||||
}
|
||||
}
|
||||
|
||||
bool view_updates::is_partition_key_permutation_of_base_partition_key() const {
|
||||
return _view_info.is_partition_key_permutation_of_base_partition_key();
|
||||
}
|
||||
|
||||
std::optional<partition_key> view_updates::construct_view_partition_key_from_base(const partition_key& base_pk)
|
||||
{
|
||||
// We check that the view partition key is a permutation of the
|
||||
// base partition key. If so, we can construct the corresponding
|
||||
// view partition key from the base key and apply an optimized
|
||||
// partition level update. Otherwise, we return std::nullopt.
|
||||
|
||||
if (!is_partition_key_permutation_of_base_partition_key()) {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
auto base_exploded_pk = base_pk.explode();
|
||||
std::vector<bytes> view_exploded_pk(_view->partition_key_size());
|
||||
|
||||
// Construct the view partition key by finding each component
|
||||
// in the base partition key.
|
||||
for (const column_definition& view_cdef : _view->partition_key_columns()) {
|
||||
const column_definition* base_cdef = _base->get_column_definition(view_cdef.name());
|
||||
if (base_cdef && base_cdef->is_partition_key()) {
|
||||
view_exploded_pk[view_cdef.id] = base_exploded_pk[base_cdef->id];
|
||||
} else {
|
||||
// This shouldn't happen because we already checked that all
|
||||
// the view partition key columns appear in the base partition key.
|
||||
on_internal_error(vlogger, format("Unexpected failure to construct view partition update for view {}.{} of {}.{}, ",
|
||||
_view->ks_name(), _view->cf_name(), _base->ks_name(), _base->cf_name()));
|
||||
}
|
||||
}
|
||||
|
||||
partition_key view_pk = partition_key::from_exploded(view_exploded_pk);
|
||||
return view_pk;
|
||||
}
|
||||
|
||||
bool view_updates::generate_partition_tombstone_update(
|
||||
data_dictionary::database db,
|
||||
const partition_key& base_key,
|
||||
tombstone partition_tomb) {
|
||||
|
||||
// Try to construct the view partition key from the base partition key.
|
||||
// This will succeed if the view partition key columns are a permutation
|
||||
// of the base partition key columns. If it fails, we skip the optimization.
|
||||
auto view_key_opt = construct_view_partition_key_from_base(base_key);
|
||||
if (!view_key_opt) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Apply the partition tombstone on the view partition
|
||||
mutation_partition& mp = partition_for(std::move(*view_key_opt));
|
||||
mp.apply(partition_tomb);
|
||||
|
||||
_op_count++;
|
||||
return true;
|
||||
}
|
||||
|
||||
future<> view_update_builder::close() noexcept {
|
||||
return when_all_succeed(_updates.close(), _existings->close()).discard_result();
|
||||
}
|
||||
@@ -1274,10 +1346,21 @@ future<std::optional<utils::chunked_vector<frozen_mutation_and_schema>>> view_up
|
||||
}
|
||||
bool do_advance_updates = false;
|
||||
bool do_advance_existings = false;
|
||||
bool is_partition_tombstone_applied_on_all_views = false;
|
||||
if (_update && _update->is_partition_start()) {
|
||||
_key = std::move(std::move(_update)->as_partition_start().key().key());
|
||||
_update_partition_tombstone = _update->as_partition_start().partition_tombstone();
|
||||
do_advance_updates = true;
|
||||
|
||||
if (_update_partition_tombstone) {
|
||||
// For views that have the same partition key as base, generate an update of partition tombstone to delete
|
||||
// the entire partition in one operation, instead of generating an update for each row.
|
||||
is_partition_tombstone_applied_on_all_views = true;
|
||||
for (auto&& v : _view_updates) {
|
||||
bool is_applied = v.is_partition_key_permutation_of_base_partition_key() && v.generate_partition_tombstone_update(_db, _key, _update_partition_tombstone);
|
||||
is_partition_tombstone_applied_on_all_views &= is_applied;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (_existing && _existing->is_partition_start()) {
|
||||
_existing_partition_tombstone = _existing->as_partition_start().partition_tombstone();
|
||||
@@ -1289,7 +1372,13 @@ future<std::optional<utils::chunked_vector<frozen_mutation_and_schema>>> view_up
|
||||
co_await advance_existings();
|
||||
}
|
||||
|
||||
while (co_await on_results() == stop_iteration::no) {};
|
||||
// If the partition tombstone update is applied to all the views and there are no other updates, we can skip going over
|
||||
// all the rows trying to generate row updates, because the partition tombstones already cover everything.
|
||||
if (is_partition_tombstone_applied_on_all_views && _update->is_end_of_partition()) {
|
||||
_skip_row_updates = true;
|
||||
}
|
||||
|
||||
while (!_skip_row_updates && co_await on_results() == stop_iteration::no) {};
|
||||
|
||||
utils::chunked_vector<frozen_mutation_and_schema> mutations;
|
||||
for (auto& update : _view_updates) {
|
||||
|
||||
@@ -223,9 +223,14 @@ public:
|
||||
future<> move_to(utils::chunked_vector<frozen_mutation_and_schema>& mutations);
|
||||
|
||||
void generate_update(data_dictionary::database db, const partition_key& base_key, const clustering_or_static_row& update, const std::optional<clustering_or_static_row>& existing, gc_clock::time_point now);
|
||||
bool generate_partition_tombstone_update(data_dictionary::database db, const partition_key& base_key, tombstone partition_tomb);
|
||||
|
||||
size_t op_count() const;
|
||||
|
||||
bool is_partition_key_permutation_of_base_partition_key() const;
|
||||
|
||||
std::optional<partition_key> construct_view_partition_key_from_base(const partition_key& base_pk);
|
||||
|
||||
private:
|
||||
mutation_partition& partition_for(partition_key&& key);
|
||||
row_marker compute_row_marker(const clustering_or_static_row& base_row) const;
|
||||
@@ -233,7 +238,7 @@ private:
|
||||
deletable_row* _row;
|
||||
view_key_and_action::action _action;
|
||||
};
|
||||
std::vector<view_row_entry> get_view_rows(const partition_key& base_key, const clustering_or_static_row& update, const std::optional<clustering_or_static_row>& existing);
|
||||
std::vector<view_row_entry> get_view_rows(const partition_key& base_key, const clustering_or_static_row& update, const std::optional<clustering_or_static_row>& existing, row_tombstone update_tomb);
|
||||
bool can_skip_view_updates(const clustering_or_static_row& update, const clustering_or_static_row& existing) const;
|
||||
void create_entry(data_dictionary::database db, const partition_key& base_key, const clustering_or_static_row& update, gc_clock::time_point now);
|
||||
void delete_old_entry(data_dictionary::database db, const partition_key& base_key, const clustering_or_static_row& existing, const clustering_or_static_row& update, gc_clock::time_point now);
|
||||
@@ -257,6 +262,7 @@ class view_update_builder {
|
||||
mutation_fragment_v2_opt _existing;
|
||||
gc_clock::time_point _now;
|
||||
partition_key _key = partition_key::make_empty();
|
||||
bool _skip_row_updates = false;
|
||||
public:
|
||||
|
||||
view_update_builder(data_dictionary::database db, const replica::table& base, schema_ptr s,
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
import time
|
||||
import pytest
|
||||
|
||||
from util import new_test_table, unique_name, new_materialized_view
|
||||
from util import new_test_table, unique_name, new_materialized_view, ScyllaMetrics
|
||||
from cassandra.protocol import InvalidRequest, SyntaxException
|
||||
|
||||
import nodetool
|
||||
@@ -992,3 +992,200 @@ def test_base_clustering_prefix_deletion(cql, test_keyspace):
|
||||
# After the deletion, all data should be gone from both base and view
|
||||
assert [] == list(cql.execute(f"SELECT c2 FROM {table}"))
|
||||
assert [] == list(cql.execute(f"SELECT c2 FROM {mv}"))
|
||||
|
||||
# Test deleting an entire base partition, where there is a view with the same or
|
||||
# different partition key. When the view has the same partition key as base,
|
||||
# the partition deletion can be done on the base in one update of partition
|
||||
# tombstone. In other cases, a tombstone is generated for each row that
|
||||
# corresponds to the deleted base partition.
|
||||
def test_base_partition_deletion_with_same_view_partition_key(cql, test_keyspace):
|
||||
with new_test_table(cql, test_keyspace, 'p int, c int, v int, primary key (p,c)') as table:
|
||||
# Insert into two base partitions. We will delete one of them
|
||||
v = 42
|
||||
insert = cql.prepare(f'INSERT INTO {table} (p,c,v) VALUES (?,?,{v})')
|
||||
# Create multiple views with different primary keys.
|
||||
# The first view has the same partition key as the base, so the entire partition will be deleted on the view as well.
|
||||
# The other views have different partition key than the base, so they will have individual rows removed.
|
||||
with new_materialized_view(cql, table, '*', 'p,c', 'p is not null and c is not null') as mv1, \
|
||||
new_materialized_view(cql, table, '*', 'c,p', 'p is not null and c is not null') as mv2, \
|
||||
new_materialized_view(cql, table, '*', '(p,c),v', 'p is not null and c is not null and v is not null') as mv3:
|
||||
N = 10
|
||||
for i in range(N):
|
||||
cql.execute(insert, [1, i])
|
||||
cql.execute(insert, [2, i])
|
||||
|
||||
# Before the deletion, all N rows should exist in the base and the view
|
||||
allN = list(range(N))
|
||||
assert allN == [x.c for x in cql.execute(f"SELECT c FROM {table} WHERE p=1")]
|
||||
assert allN == sorted([x.c for x in cql.execute(f"SELECT c FROM {mv1} WHERE p=1")])
|
||||
assert allN == [x.c for x in cql.execute(f"SELECT c FROM {table} WHERE p=2")]
|
||||
assert allN == sorted([x.c for x in cql.execute(f"SELECT c FROM {mv1} WHERE p=2")])
|
||||
for i in range(N):
|
||||
assert [1,2] == sorted([x.p for x in cql.execute(f"SELECT p FROM {mv2} WHERE c={i}")])
|
||||
|
||||
cql.execute(f"DELETE FROM {table} WHERE p=1")
|
||||
|
||||
# After the deletion, all data should be gone from both base and view
|
||||
assert [] == list(cql.execute(f"SELECT c FROM {table} WHERE p=1"))
|
||||
assert [] == list(cql.execute(f"SELECT c FROM {mv1} WHERE p=1"))
|
||||
assert [] == list(cql.execute(f"SELECT c FROM {mv2} WHERE p=1 ALLOW FILTERING"))
|
||||
assert [] == list(cql.execute(f"SELECT c FROM {mv3} WHERE p=1 ALLOW FILTERING"))
|
||||
|
||||
assert allN == [x.c for x in cql.execute(f"SELECT c FROM {table} WHERE p=2")]
|
||||
assert allN == sorted([x.c for x in cql.execute(f"SELECT c FROM {mv1} WHERE p=2")])
|
||||
|
||||
for i in range(N):
|
||||
assert [2] == sorted([x.p for x in cql.execute(f"SELECT p FROM {mv2} WHERE c={i}")])
|
||||
|
||||
for i in range(N):
|
||||
assert [v] == sorted([x.v for x in cql.execute(f"SELECT v FROM {mv3} WHERE p=2 AND c={i}")])
|
||||
|
||||
# The partition key of the view is strictly contained in the base partition key,
|
||||
# so multiple base partitions are combined into one view partition.
|
||||
# Test deleting a base partition and verify it is deleted correctly in the view.
|
||||
def test_base_partition_deletion_with_smaller_view_partition_key(cql, test_keyspace):
|
||||
with new_test_table(cql, test_keyspace, 'p1 int, p2 int, c int, primary key ((p1,p2),c)') as table:
|
||||
insert = cql.prepare(f'INSERT INTO {table} (p1,p2,c) VALUES (?,?,?)')
|
||||
with new_materialized_view(cql, table, '*', 'p1,p2,c', 'p1 is not null and p2 is not null and c is not null') as mv:
|
||||
# Insert into two separate base partitions.
|
||||
# In the view, the rows have the same partition key.
|
||||
cql.execute(insert, [0, 0, 10])
|
||||
cql.execute(insert, [0, 1, 20])
|
||||
|
||||
# Delete one of the partitions
|
||||
cql.execute(f"DELETE FROM {table} WHERE p1=0 AND p2=0")
|
||||
|
||||
assert [] == list(cql.execute(f"SELECT c FROM {table} WHERE p1=0 AND p2=0"))
|
||||
assert [20] == [x.c for x in cql.execute(f"SELECT c FROM {table} WHERE p1=0 AND p2=1")]
|
||||
|
||||
assert [(1,20)] == [(x.p2, x.c) for x in cql.execute(f"SELECT p2, c FROM {mv} WHERE p1=0")]
|
||||
|
||||
# Test deleting a large partition when there is a view with the same partition
|
||||
# key, and verify that view updates metrics is increased by exactly 1. Deleting
|
||||
# a partition in this case is expected to generate one view update for deleting
|
||||
# the corresponding view partition by a partition tombstone.
|
||||
# Reproduces #8199
|
||||
@pytest.mark.parametrize("permuted", [False, True])
|
||||
def test_base_partition_deletion_with_metrics(cql, test_keyspace, scylla_only, permuted):
|
||||
with new_test_table(cql, test_keyspace, 'p1 int, p2 int, c int, primary key ((p1,p2),c)') as table:
|
||||
# Insert into one base partition. We will delete the entire partition
|
||||
insert = cql.prepare(f'INSERT INTO {table} (p1,p2,c) VALUES (?,?,?)')
|
||||
# The view partition key is a permutation of the base partition key.
|
||||
with new_materialized_view(cql, table, '*', '(p2,p1),c' if permuted else '(p1,p2),c', 'p1 is not null and p2 is not null and c is not null') as mv:
|
||||
# the metric total_view_updates_pushed_local is incremented by 1 for each 100 row view
|
||||
# updates, because it is collected in batches according to max_rows_for_view_updates.
|
||||
# To verify the behavior, we want the metric to increase by at least 2 without the optimization,
|
||||
# so 101 is the minimum value that works. With the optimization, we expect to have exactly 1 update
|
||||
# for any N.
|
||||
N = 101
|
||||
|
||||
# all operations are on this single partition
|
||||
p1, p2 = 1, 10
|
||||
where_clause_table = f"WHERE p1={p1} AND p2={p2}"
|
||||
where_clause_mv = f"WHERE p2={p2} AND p1={p1}" if permuted else where_clause_table
|
||||
|
||||
for i in range(N):
|
||||
cql.execute(insert, [p1, p2, i])
|
||||
|
||||
# Before the deletion, all N rows should exist in the base and the view
|
||||
allN = list(range(N))
|
||||
assert allN == [x.c for x in cql.execute(f"SELECT c FROM {table} {where_clause_table}")]
|
||||
assert allN == sorted([x.c for x in cql.execute(f"SELECT c FROM {mv} {where_clause_mv}")])
|
||||
|
||||
metrics_before = ScyllaMetrics.query(cql)
|
||||
updates_before = metrics_before.get('scylla_database_total_view_updates_pushed_local')
|
||||
|
||||
cql.execute(f"DELETE FROM {table} {where_clause_table}")
|
||||
|
||||
# After the deletion, all data should be gone from both base and view
|
||||
assert [] == list(cql.execute(f"SELECT c FROM {table} {where_clause_table}"))
|
||||
assert [] == list(cql.execute(f"SELECT c FROM {mv} {where_clause_mv}"))
|
||||
|
||||
metrics_after = ScyllaMetrics.query(cql)
|
||||
updates_after = metrics_after.get('scylla_database_total_view_updates_pushed_local')
|
||||
|
||||
print(f"scylla_database_total_view_updates_pushed_local: {updates_before} -> {updates_after}")
|
||||
assert updates_after == updates_before + 1
|
||||
|
||||
# Perform a batch operation, deleting a partition and also inserting a row
|
||||
# to that partition with a newer timestamp, and verify that the insertion
|
||||
# is not lost in the MV update.
|
||||
def test_base_partition_deletion_in_batch_with_insert(cql, test_keyspace):
|
||||
with new_test_table(cql, test_keyspace, 'p int, c int, primary key (p,c)') as table:
|
||||
insert = cql.prepare(f'INSERT INTO {table} (p,c) VALUES (?,?) USING TIMESTAMP 99')
|
||||
with new_materialized_view(cql, table, '*', 'p,c', 'p is not null and c is not null') as mv:
|
||||
cql.execute(insert, [0, 1])
|
||||
cql.execute(insert, [0, 2])
|
||||
cql.execute(insert, [0, 3])
|
||||
|
||||
# This should delete all the existing partition rows, and the new
|
||||
# row insertion survives and remains the only row after the operation
|
||||
# since it has the most recent timestamp.
|
||||
cmd = 'BEGIN UNLOGGED BATCH '
|
||||
cmd += f'INSERT INTO {table} (p,c) VALUES (0,4) USING TIMESTAMP 98; '
|
||||
cmd += f'DELETE FROM {table} USING TIMESTAMP 100 WHERE p=0; '
|
||||
cmd += f'INSERT INTO {table} (p,c) VALUES (0,5) USING TIMESTAMP 101; '
|
||||
cmd += 'APPLY BATCH;'
|
||||
cql.execute(cmd)
|
||||
|
||||
# Verify it is correct both in the table and the view
|
||||
assert [5] == [x.c for x in cql.execute(f"SELECT c FROM {table} WHERE p=0")]
|
||||
assert [5] == [x.c for x in cql.execute(f"SELECT c FROM {mv} WHERE p=0")]
|
||||
|
||||
# Similar to the test above, perform a deletion of a base partition in a batch with
|
||||
# deletion of individual rows. Verify the partition is deleted correctly and that
|
||||
# a single update is generated for the view for deleting the whole partition, and no
|
||||
# view updates for each row.
|
||||
def test_base_partition_deletion_in_batch_with_delete_row_with_metrics(cql, test_keyspace, scylla_only):
|
||||
with new_test_table(cql, test_keyspace, 'p int, c int, v int, primary key ((p,c),v)') as table:
|
||||
insert = cql.prepare(f'INSERT INTO {table} (p,c,v) VALUES (?,?,?)')
|
||||
# The view partition key is the same as the base partition key.
|
||||
with new_materialized_view(cql, table, '*', '(p,c),v', 'p is not null and c is not null and v is not null') as mv:
|
||||
N = 101 # See comment above
|
||||
for i in range(N):
|
||||
cql.execute(insert, [1, 10, i])
|
||||
|
||||
# Before the deletion, all N rows should exist in the base and the view
|
||||
allN = list(range(N))
|
||||
assert allN == [x.v for x in cql.execute(f"SELECT v FROM {table} WHERE p=1 AND c=10")]
|
||||
assert allN == sorted([x.v for x in cql.execute(f"SELECT v FROM {mv} WHERE p=1 AND c=10")])
|
||||
|
||||
metrics_before = ScyllaMetrics.query(cql)
|
||||
updates_before = metrics_before.get('scylla_database_total_view_updates_pushed_local')
|
||||
|
||||
# The batch deletes the entire partition and also, redundantly, deleting individual rows in the partition.
|
||||
# We expect the view update to contain only a single update for deleting the partition.
|
||||
cmd = 'BEGIN UNLOGGED BATCH '
|
||||
for i in range(100,500):
|
||||
cmd += f'DELETE FROM {table} WHERE p=1 AND c=10 AND v={i}; '
|
||||
cmd += f'DELETE FROM {table} WHERE p=1 AND c=10; '
|
||||
cmd += 'APPLY BATCH;'
|
||||
cql.execute(cmd)
|
||||
|
||||
# Verify the partition is deleted
|
||||
assert [] == list(cql.execute(f"SELECT v FROM {table} WHERE p=1 AND c=10"))
|
||||
assert [] == list(cql.execute(f"SELECT v FROM {mv} WHERE p=1 AND c=10"))
|
||||
|
||||
# Verify there is a single view update
|
||||
metrics_after = ScyllaMetrics.query(cql)
|
||||
updates_after = metrics_after.get('scylla_database_total_view_updates_pushed_local')
|
||||
|
||||
print(f"scylla_database_total_view_updates_pushed_local: {updates_before} -> {updates_after}")
|
||||
assert updates_after == updates_before + 1
|
||||
|
||||
# Delete a base partition using a timestamp lower than some of the rows
|
||||
# in the partition. Verify it doesn't result new delete in the base
|
||||
# and the view partition.
|
||||
def test_base_partition_deletion_with_low_timestamp(cql, test_keyspace):
|
||||
with new_test_table(cql, test_keyspace, 'p int, c int, primary key (p,c)') as table:
|
||||
with new_materialized_view(cql, table, '*', 'p,c', 'p is not null and c is not null') as mv:
|
||||
cql.execute(f'INSERT INTO {table} (p,c) VALUES (0,1) USING TIMESTAMP 99')
|
||||
cql.execute(f'INSERT INTO {table} (p,c) VALUES (0,2) USING TIMESTAMP 101')
|
||||
|
||||
# Delete the partition with a timestamp which is older than some of
|
||||
# the rows and newer than other rows.
|
||||
cql.execute(f'DELETE FROM {table} USING TIMESTAMP 100 WHERE p=0')
|
||||
|
||||
# Verify we get only the row with the newer timestamp
|
||||
assert [2] == [x.c for x in cql.execute(f"SELECT c FROM {table} WHERE p=0")]
|
||||
assert [2] == [x.c for x in cql.execute(f"SELECT c FROM {mv} WHERE p=0")]
|
||||
|
||||
@@ -9,8 +9,7 @@
|
||||
import pytest
|
||||
from cassandra.cluster import NoHostAvailable
|
||||
from cassandra.protocol import InvalidRequest
|
||||
from util import unique_name, new_cql
|
||||
import requests
|
||||
from util import unique_name, new_cql, ScyllaMetrics
|
||||
from contextlib import contextmanager
|
||||
|
||||
|
||||
@@ -34,40 +33,6 @@ def disable_compression():
|
||||
finally:
|
||||
cassandra.connection.locally_supported_compressions = saved
|
||||
|
||||
|
||||
class ScyllaMetrics:
|
||||
def __init__(self, lines):
|
||||
self._lines = lines
|
||||
@staticmethod
|
||||
def query(cql):
|
||||
url = f'http://{cql.cluster.contact_points[0]}:9180/metrics'
|
||||
return ScyllaMetrics(requests.get(url).text.split('\n'))
|
||||
def get(self, name, labels = None, shard='total'):
|
||||
result = None
|
||||
for l in self._lines:
|
||||
if not l.startswith(name):
|
||||
continue
|
||||
labels_start = l.find('{')
|
||||
labels_finish = l.find('}')
|
||||
if labels_start == -1 or labels_finish == -1:
|
||||
raise ValueError(f'invalid metric format [{l}]')
|
||||
def match_kv(kv):
|
||||
key, val = kv.split('=')
|
||||
val = val.strip('"')
|
||||
return shard == 'total' or val == shard if key == 'shard' \
|
||||
else labels is None or labels.get(key, None) == val
|
||||
match = all(match_kv(kv) for kv in l[labels_start + 1:labels_finish].split(','))
|
||||
if match:
|
||||
value = float(l[labels_finish + 2:])
|
||||
if result is None:
|
||||
result = value
|
||||
else:
|
||||
result += value
|
||||
if shard != 'total':
|
||||
break
|
||||
return result
|
||||
|
||||
|
||||
# When a too large request comes, it should be rejected in full.
|
||||
# That means that first of all a client receives an error after sending
|
||||
# such a request, but also that following correct requests can be successfully
|
||||
|
||||
@@ -11,6 +11,7 @@ import random
|
||||
import time
|
||||
import socket
|
||||
import os
|
||||
import requests
|
||||
import collections
|
||||
import ssl
|
||||
from contextlib import contextmanager
|
||||
@@ -336,3 +337,35 @@ class config_value_context:
|
||||
|
||||
def __exit__(self, exc_type, exc_value, exc_traceback):
|
||||
self._cql.execute(self._update, (self._original_value, self._key))
|
||||
|
||||
class ScyllaMetrics:
|
||||
def __init__(self, lines):
|
||||
self._lines = lines
|
||||
@staticmethod
|
||||
def query(cql):
|
||||
url = f'http://{cql.cluster.contact_points[0]}:9180/metrics'
|
||||
return ScyllaMetrics(requests.get(url).text.split('\n'))
|
||||
def get(self, name, labels = None, shard='total'):
|
||||
result = None
|
||||
for l in self._lines:
|
||||
if not l.startswith(name):
|
||||
continue
|
||||
labels_start = l.find('{')
|
||||
labels_finish = l.find('}')
|
||||
if labels_start == -1 or labels_finish == -1:
|
||||
raise ValueError(f'invalid metric format [{l}]')
|
||||
def match_kv(kv):
|
||||
key, val = kv.split('=')
|
||||
val = val.strip('"')
|
||||
return shard == 'total' or val == shard if key == 'shard' \
|
||||
else labels is None or labels.get(key, None) == val
|
||||
match = all(match_kv(kv) for kv in l[labels_start + 1:labels_finish].split(','))
|
||||
if match:
|
||||
value = float(l[labels_finish + 2:])
|
||||
if result is None:
|
||||
result = value
|
||||
else:
|
||||
result += value
|
||||
if shard != 'total':
|
||||
break
|
||||
return result
|
||||
|
||||
@@ -24,6 +24,10 @@ class view_info final {
|
||||
mutable std::optional<query::partition_slice> _partition_slice;
|
||||
db::view::base_info_ptr _base_info;
|
||||
mutable bool _has_computed_column_depending_on_base_non_primary_key;
|
||||
|
||||
// True if the partition key columns of the view are the same as the
|
||||
// partition key columns of the base, maybe in a different order.
|
||||
mutable bool _is_partition_key_permutation_of_base_partition_key;
|
||||
public:
|
||||
view_info(const schema& schema, const raw_view_info& raw_view_info);
|
||||
|
||||
@@ -56,6 +60,10 @@ public:
|
||||
return _has_computed_column_depending_on_base_non_primary_key;
|
||||
}
|
||||
|
||||
bool is_partition_key_permutation_of_base_partition_key() const {
|
||||
return _is_partition_key_permutation_of_base_partition_key;
|
||||
}
|
||||
|
||||
/// Returns a pointer to the base_dependent_view_info which matches the current
|
||||
/// schema of the base table.
|
||||
///
|
||||
|
||||
Reference in New Issue
Block a user