Merge 'Remove global proxy usage from view_info::select_statement()' from Pavel Emelyanov
The method needs proxy to get data_dictionary::database from to pass down to select_statement::prepare(). And a legacy bit that can come with data_dictionary::database as well. Fortunately, all the call traces that end up at select_statement() start inside table:: methods that have view_update_generator, or at view_builder::consumer that has reference to view_builder. Both services can share the database reference. However, the call traces in question pass through several code layers, so the PR adds data_dictionary::database to those layers one by one. Closes #13591 * github.com:scylladb/scylladb: view_info: Drop calls to get_local_storage_proxy() view_info: Add data_dictionary argument to select_statement() view_info: Add data_dictionary argument to partition_slice() method view_filter_checking_visitor: Construct with data_dictionary view: Carry data_dictionary arg through standalone helpers view_updates: Carry data_dictionary argument throug methods view_update_builder: Construct with data dictionary table: Push view_update_generator arg to affected_views() view: Add database getters to v._update_generator and v._builder
This commit is contained in:
@@ -79,13 +79,13 @@ view_info::view_info(const schema& schema, const raw_view_info& raw_view_info)
|
|||||||
, _has_computed_column_depending_on_base_non_primary_key(false)
|
, _has_computed_column_depending_on_base_non_primary_key(false)
|
||||||
{ }
|
{ }
|
||||||
|
|
||||||
cql3::statements::select_statement& view_info::select_statement() const {
|
cql3::statements::select_statement& view_info::select_statement(data_dictionary::database db) const {
|
||||||
if (!_select_statement) {
|
if (!_select_statement) {
|
||||||
std::unique_ptr<cql3::statements::raw::select_statement> raw;
|
std::unique_ptr<cql3::statements::raw::select_statement> raw;
|
||||||
// FIXME(sarna): legacy code, should be removed after "computed_columns" feature is guaranteed
|
// FIXME(sarna): legacy code, should be removed after "computed_columns" feature is guaranteed
|
||||||
// to be available on every node. Then, we won't need to check if this view is backing a secondary index.
|
// to be available on every node. Then, we won't need to check if this view is backing a secondary index.
|
||||||
const column_definition* legacy_token_column = nullptr;
|
const column_definition* legacy_token_column = nullptr;
|
||||||
if (service::get_local_storage_proxy().local_db().find_column_family(base_id()).get_index_manager().is_global_index(_schema)) {
|
if (db.find_column_family(base_id()).get_index_manager().is_global_index(_schema)) {
|
||||||
if (!_schema.clustering_key_columns().empty()) {
|
if (!_schema.clustering_key_columns().empty()) {
|
||||||
legacy_token_column = &_schema.clustering_key_columns().front();
|
legacy_token_column = &_schema.clustering_key_columns().front();
|
||||||
}
|
}
|
||||||
@@ -103,15 +103,15 @@ cql3::statements::select_statement& view_info::select_statement() const {
|
|||||||
raw->prepare_keyspace(_schema.ks_name());
|
raw->prepare_keyspace(_schema.ks_name());
|
||||||
raw->set_bound_variables({});
|
raw->set_bound_variables({});
|
||||||
cql3::cql_stats ignored;
|
cql3::cql_stats ignored;
|
||||||
auto prepared = raw->prepare(service::get_local_storage_proxy().data_dictionary(), ignored, true);
|
auto prepared = raw->prepare(db, ignored, true);
|
||||||
_select_statement = static_pointer_cast<cql3::statements::select_statement>(prepared->statement);
|
_select_statement = static_pointer_cast<cql3::statements::select_statement>(prepared->statement);
|
||||||
}
|
}
|
||||||
return *_select_statement;
|
return *_select_statement;
|
||||||
}
|
}
|
||||||
|
|
||||||
const query::partition_slice& view_info::partition_slice() const {
|
const query::partition_slice& view_info::partition_slice(data_dictionary::database db) const {
|
||||||
if (!_partition_slice) {
|
if (!_partition_slice) {
|
||||||
_partition_slice = select_statement().make_partition_slice(cql3::query_options({ }));
|
_partition_slice = select_statement(db).make_partition_slice(cql3::query_options({ }));
|
||||||
}
|
}
|
||||||
return *_partition_slice;
|
return *_partition_slice;
|
||||||
}
|
}
|
||||||
@@ -273,8 +273,8 @@ void stats::register_stats() {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
bool partition_key_matches(const schema& base, const view_info& view, const dht::decorated_key& key) {
|
bool partition_key_matches(data_dictionary::database db, const schema& base, const view_info& view, const dht::decorated_key& key) {
|
||||||
const cql3::expr::expression& pk_restrictions = view.select_statement().get_restrictions()->get_partition_key_restrictions();
|
const cql3::expr::expression& pk_restrictions = view.select_statement(db).get_restrictions()->get_partition_key_restrictions();
|
||||||
std::vector<bytes> exploded_pk = key.key().explode();
|
std::vector<bytes> exploded_pk = key.key().explode();
|
||||||
std::vector<bytes> exploded_ck;
|
std::vector<bytes> exploded_ck;
|
||||||
std::vector<const column_definition*> pk_columns;
|
std::vector<const column_definition*> pk_columns;
|
||||||
@@ -298,8 +298,8 @@ bool partition_key_matches(const schema& base, const view_info& view, const dht:
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
bool clustering_prefix_matches(const schema& base, const view_info& view, const partition_key& key, const clustering_key_prefix& ck) {
|
bool clustering_prefix_matches(data_dictionary::database db, const schema& base, const view_info& view, const partition_key& key, const clustering_key_prefix& ck) {
|
||||||
const cql3::expr::expression& r = view.select_statement().get_restrictions()->get_clustering_columns_restrictions();
|
const cql3::expr::expression& r = view.select_statement(db).get_restrictions()->get_clustering_columns_restrictions();
|
||||||
std::vector<bytes> exploded_pk = key.explode();
|
std::vector<bytes> exploded_pk = key.explode();
|
||||||
std::vector<bytes> exploded_ck = ck.explode();
|
std::vector<bytes> exploded_ck = ck.explode();
|
||||||
std::vector<const column_definition*> ck_columns;
|
std::vector<const column_definition*> ck_columns;
|
||||||
@@ -322,21 +322,21 @@ bool clustering_prefix_matches(const schema& base, const view_info& view, const
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
bool may_be_affected_by(const schema& base, const view_info& view, const dht::decorated_key& key, const rows_entry& update) {
|
bool may_be_affected_by(data_dictionary::database db, const schema& base, const view_info& view, const dht::decorated_key& key, const rows_entry& update) {
|
||||||
// We can guarantee that the view won't be affected if:
|
// We can guarantee that the view won't be affected if:
|
||||||
// - the primary key is excluded by the view filter (note that this isn't true of the filter on regular columns:
|
// - the primary key is excluded by the view filter (note that this isn't true of the filter on regular columns:
|
||||||
// even if an update don't match a view condition on a regular column, that update can still invalidate a
|
// even if an update don't match a view condition on a regular column, that update can still invalidate a
|
||||||
// pre-existing entry) - note that the upper layers should already have checked the partition key;
|
// pre-existing entry) - note that the upper layers should already have checked the partition key;
|
||||||
return clustering_prefix_matches(base, view, key.key(), update.key());
|
return clustering_prefix_matches(db, base, view, key.key(), update.key());
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool update_requires_read_before_write(const schema& base,
|
static bool update_requires_read_before_write(data_dictionary::database db, const schema& base,
|
||||||
const std::vector<view_and_base>& views,
|
const std::vector<view_and_base>& views,
|
||||||
const dht::decorated_key& key,
|
const dht::decorated_key& key,
|
||||||
const rows_entry& update) {
|
const rows_entry& update) {
|
||||||
for (auto&& v : views) {
|
for (auto&& v : views) {
|
||||||
view_info& vf = *v.view->view_info();
|
view_info& vf = *v.view->view_info();
|
||||||
if (may_be_affected_by(base, vf, key, update)) {
|
if (may_be_affected_by(db, base, vf, key, update)) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -346,6 +346,7 @@ static bool update_requires_read_before_write(const schema& base,
|
|||||||
// Checks if the result matches the provided view filter.
|
// Checks if the result matches the provided view filter.
|
||||||
// It's currently assumed that the result consists of just a single row.
|
// It's currently assumed that the result consists of just a single row.
|
||||||
class view_filter_checking_visitor {
|
class view_filter_checking_visitor {
|
||||||
|
data_dictionary::database _db;
|
||||||
const schema& _base;
|
const schema& _base;
|
||||||
const view_info& _view;
|
const view_info& _view;
|
||||||
::shared_ptr<cql3::selection::selection> _selection;
|
::shared_ptr<cql3::selection::selection> _selection;
|
||||||
@@ -353,8 +354,9 @@ class view_filter_checking_visitor {
|
|||||||
|
|
||||||
bool _matches_view_filter = true;
|
bool _matches_view_filter = true;
|
||||||
public:
|
public:
|
||||||
view_filter_checking_visitor(const schema& base, const view_info& view)
|
view_filter_checking_visitor(data_dictionary::database db, const schema& base, const view_info& view)
|
||||||
: _base(base)
|
: _db(std::move(db))
|
||||||
|
, _base(base)
|
||||||
, _view(view)
|
, _view(view)
|
||||||
, _selection(cql3::selection::selection::for_columns(_base.shared_from_this(),
|
, _selection(cql3::selection::selection::for_columns(_base.shared_from_this(),
|
||||||
boost::copy_range<std::vector<const column_definition*>>(
|
boost::copy_range<std::vector<const column_definition*>>(
|
||||||
@@ -382,7 +384,7 @@ public:
|
|||||||
bool check_if_matches(const clustering_key& key, const query::result_row_view& static_row, const query::result_row_view& row) const {
|
bool check_if_matches(const clustering_key& key, const query::result_row_view& static_row, const query::result_row_view& row) const {
|
||||||
std::vector<bytes> ck = key.explode();
|
std::vector<bytes> ck = key.explode();
|
||||||
return boost::algorithm::all_of(
|
return boost::algorithm::all_of(
|
||||||
_view.select_statement().get_restrictions()->get_non_pk_restriction() | boost::adaptors::map_values,
|
_view.select_statement(_db).get_restrictions()->get_non_pk_restriction() | boost::adaptors::map_values,
|
||||||
[&] (auto&& r) {
|
[&] (auto&& r) {
|
||||||
// FIXME: move outside all_of(). However, crashes.
|
// FIXME: move outside all_of(). However, crashes.
|
||||||
auto static_and_regular_columns = cql3::expr::get_non_pk_values(*_selection, static_row, &row);
|
auto static_and_regular_columns = cql3::expr::get_non_pk_values(*_selection, static_row, &row);
|
||||||
@@ -447,7 +449,7 @@ public:
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
bool matches_view_filter(const schema& base, const view_info& view, const partition_key& key, const clustering_or_static_row& update, gc_clock::time_point now) {
|
bool matches_view_filter(data_dictionary::database db, const schema& base, const view_info& view, const partition_key& key, const clustering_or_static_row& update, gc_clock::time_point now) {
|
||||||
// TODO: Filtering is only supported in materialized views which don't support
|
// TODO: Filtering is only supported in materialized views which don't support
|
||||||
// static rows yet. Skip the whole function if it is a static row update.
|
// static rows yet. Skip the whole function if it is a static row update.
|
||||||
if (update.is_static_row()) {
|
if (update.is_static_row()) {
|
||||||
@@ -461,10 +463,10 @@ bool matches_view_filter(const schema& base, const view_info& view, const partit
|
|||||||
builder.consume(clustering_row(base, update.as_clustering_row(base)), row_tombstone{}, update.is_live(base, tombstone(), now));
|
builder.consume(clustering_row(base, update.as_clustering_row(base)), row_tombstone{}, update.is_live(base, tombstone(), now));
|
||||||
builder.consume_end_of_partition();
|
builder.consume_end_of_partition();
|
||||||
auto result = builder.consume_end_of_stream();
|
auto result = builder.consume_end_of_stream();
|
||||||
view_filter_checking_visitor visitor(base, view);
|
view_filter_checking_visitor visitor(db, base, view);
|
||||||
query::result_view::consume(result, slice, visitor);
|
query::result_view::consume(result, slice, visitor);
|
||||||
|
|
||||||
return clustering_prefix_matches(base, view, key, *update.key())
|
return clustering_prefix_matches(db, base, view, key, *update.key())
|
||||||
&& visitor.matches_view_filter();
|
&& visitor.matches_view_filter();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -876,8 +878,8 @@ static void add_cells_to_view(const schema& base, const schema& view, column_kin
|
|||||||
* Creates a view entry corresponding to the provided base row.
|
* Creates a view entry corresponding to the provided base row.
|
||||||
* This method checks that the base row does match the view filter before applying anything.
|
* This method checks that the base row does match the view filter before applying anything.
|
||||||
*/
|
*/
|
||||||
void view_updates::create_entry(const partition_key& base_key, const clustering_or_static_row& update, gc_clock::time_point now) {
|
void view_updates::create_entry(data_dictionary::database db, const partition_key& base_key, const clustering_or_static_row& update, gc_clock::time_point now) {
|
||||||
if (!matches_view_filter(*_base, _view_info, base_key, update, now)) {
|
if (!matches_view_filter(db, *_base, _view_info, base_key, update, now)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -900,10 +902,10 @@ void view_updates::create_entry(const partition_key& base_key, const clustering_
|
|||||||
* Deletes the view entry corresponding to the provided base row.
|
* Deletes the view entry corresponding to the provided base row.
|
||||||
* This method checks that the base row does match the view filter before bothering.
|
* This method checks that the base row does match the view filter before bothering.
|
||||||
*/
|
*/
|
||||||
void view_updates::delete_old_entry(const partition_key& base_key, const clustering_or_static_row& existing, const clustering_or_static_row& update, gc_clock::time_point now) {
|
void view_updates::delete_old_entry(data_dictionary::database db, const partition_key& base_key, const clustering_or_static_row& existing, const clustering_or_static_row& update, gc_clock::time_point now) {
|
||||||
// Before deleting an old entry, make sure it was matching the view filter
|
// Before deleting an old entry, make sure it was matching the view filter
|
||||||
// (otherwise there is nothing to delete)
|
// (otherwise there is nothing to delete)
|
||||||
if (matches_view_filter(*_base, _view_info, base_key, existing, now)) {
|
if (matches_view_filter(db, *_base, _view_info, base_key, existing, now)) {
|
||||||
do_delete_old_entry(base_key, existing, update, now);
|
do_delete_old_entry(base_key, existing, update, now);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1020,14 +1022,14 @@ bool view_updates::can_skip_view_updates(const clustering_or_static_row& update,
|
|||||||
* This method checks that the base row (before and after) matches the view filter before
|
* This method checks that the base row (before and after) matches the view filter before
|
||||||
* applying anything.
|
* applying anything.
|
||||||
*/
|
*/
|
||||||
void view_updates::update_entry(const partition_key& base_key, const clustering_or_static_row& update, const clustering_or_static_row& existing, gc_clock::time_point now) {
|
void view_updates::update_entry(data_dictionary::database db, const partition_key& base_key, const clustering_or_static_row& update, const clustering_or_static_row& existing, gc_clock::time_point now) {
|
||||||
// While we know update and existing correspond to the same view entry,
|
// While we know update and existing correspond to the same view entry,
|
||||||
// they may not match the view filter.
|
// they may not match the view filter.
|
||||||
if (!matches_view_filter(*_base, _view_info, base_key, existing, now)) {
|
if (!matches_view_filter(db, *_base, _view_info, base_key, existing, now)) {
|
||||||
create_entry(base_key, update, now);
|
create_entry(db, base_key, update, now);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (!matches_view_filter(*_base, _view_info, base_key, update, now)) {
|
if (!matches_view_filter(db, *_base, _view_info, base_key, update, now)) {
|
||||||
do_delete_old_entry(base_key, existing, update, now);
|
do_delete_old_entry(base_key, existing, update, now);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -1076,6 +1078,7 @@ void view_updates::update_entry_for_computed_column(
|
|||||||
}
|
}
|
||||||
|
|
||||||
void view_updates::generate_update(
|
void view_updates::generate_update(
|
||||||
|
data_dictionary::database db,
|
||||||
const partition_key& base_key,
|
const partition_key& base_key,
|
||||||
const clustering_or_static_row& update,
|
const clustering_or_static_row& update,
|
||||||
const std::optional<clustering_or_static_row>& existing,
|
const std::optional<clustering_or_static_row>& existing,
|
||||||
@@ -1105,12 +1108,12 @@ void view_updates::generate_update(
|
|||||||
// The view key is necessarily the same pre and post update.
|
// The view key is necessarily the same pre and post update.
|
||||||
if (existing && existing->is_live(*_base)) {
|
if (existing && existing->is_live(*_base)) {
|
||||||
if (update.is_live(*_base)) {
|
if (update.is_live(*_base)) {
|
||||||
update_entry(base_key, update, *existing, now);
|
update_entry(db, base_key, update, *existing, now);
|
||||||
} else {
|
} else {
|
||||||
delete_old_entry(base_key, *existing, update, now);
|
delete_old_entry(db, base_key, *existing, update, now);
|
||||||
}
|
}
|
||||||
} else if (update.is_live(*_base)) {
|
} else if (update.is_live(*_base)) {
|
||||||
create_entry(base_key, update, now);
|
create_entry(db, base_key, update, now);
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -1168,21 +1171,21 @@ void view_updates::generate_update(
|
|||||||
if (has_old_row) {
|
if (has_old_row) {
|
||||||
if (has_new_row) {
|
if (has_new_row) {
|
||||||
if (same_row) {
|
if (same_row) {
|
||||||
update_entry(base_key, update, *existing, now);
|
update_entry(db, base_key, update, *existing, now);
|
||||||
} else {
|
} else {
|
||||||
// This code doesn't work if the old and new view row have the
|
// This code doesn't work if the old and new view row have the
|
||||||
// same key, because if they do we get both data and tombstone
|
// same key, because if they do we get both data and tombstone
|
||||||
// for the same timestamp (now) and the tombstone wins. This
|
// for the same timestamp (now) and the tombstone wins. This
|
||||||
// is why we need the "same_row" case above - it's not just a
|
// is why we need the "same_row" case above - it's not just a
|
||||||
// performance optimization.
|
// performance optimization.
|
||||||
delete_old_entry(base_key, *existing, update, now);
|
delete_old_entry(db, base_key, *existing, update, now);
|
||||||
create_entry(base_key, update, now);
|
create_entry(db, base_key, update, now);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
delete_old_entry(base_key, *existing, update, now);
|
delete_old_entry(db, base_key, *existing, update, now);
|
||||||
}
|
}
|
||||||
} else if (has_new_row) {
|
} else if (has_new_row) {
|
||||||
create_entry(base_key, update, now);
|
create_entry(db, base_key, update, now);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1277,7 +1280,7 @@ void view_update_builder::generate_update(clustering_row&& update, std::optional
|
|||||||
? std::make_optional<clustering_or_static_row>(std::move(*existing))
|
? std::make_optional<clustering_or_static_row>(std::move(*existing))
|
||||||
: std::optional<clustering_or_static_row>();
|
: std::optional<clustering_or_static_row>();
|
||||||
for (auto&& v : _view_updates) {
|
for (auto&& v : _view_updates) {
|
||||||
v.generate_update(_key, update_row, existing_row, _now);
|
v.generate_update(_db, _key, update_row, existing_row, _now);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1304,7 +1307,7 @@ void view_update_builder::generate_update(static_row&& update, const tombstone&
|
|||||||
? std::make_optional<clustering_or_static_row>(std::move(*existing))
|
? std::make_optional<clustering_or_static_row>(std::move(*existing))
|
||||||
: std::optional<clustering_or_static_row>();
|
: std::optional<clustering_or_static_row>();
|
||||||
for (auto&& v : _view_updates) {
|
for (auto&& v : _view_updates) {
|
||||||
v.generate_update(_key, update_row, existing_row, _now);
|
v.generate_update(_db, _key, update_row, existing_row, _now);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1439,6 +1442,7 @@ future<stop_iteration> view_update_builder::on_results() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
view_update_builder make_view_update_builder(
|
view_update_builder make_view_update_builder(
|
||||||
|
data_dictionary::database db,
|
||||||
const replica::table& base_table,
|
const replica::table& base_table,
|
||||||
const schema_ptr& base,
|
const schema_ptr& base,
|
||||||
std::vector<view_and_base>&& views_to_update,
|
std::vector<view_and_base>&& views_to_update,
|
||||||
@@ -1454,10 +1458,11 @@ view_update_builder make_view_update_builder(
|
|||||||
bool is_index = base_table.get_index_manager().is_index(v.view);
|
bool is_index = base_table.get_index_manager().is_index(v.view);
|
||||||
return view_updates(std::move(v), is_index);
|
return view_updates(std::move(v), is_index);
|
||||||
}));
|
}));
|
||||||
return view_update_builder(base_table, base, std::move(vs), std::move(updates), std::move(existings), now);
|
return view_update_builder(std::move(db), base_table, base, std::move(vs), std::move(updates), std::move(existings), now);
|
||||||
}
|
}
|
||||||
|
|
||||||
future<query::clustering_row_ranges> calculate_affected_clustering_ranges(const schema& base,
|
future<query::clustering_row_ranges> calculate_affected_clustering_ranges(data_dictionary::database db,
|
||||||
|
const schema& base,
|
||||||
const dht::decorated_key& key,
|
const dht::decorated_key& key,
|
||||||
const mutation_partition& mp,
|
const mutation_partition& mp,
|
||||||
const std::vector<view_and_base>& views) {
|
const std::vector<view_and_base>& views) {
|
||||||
@@ -1467,11 +1472,11 @@ future<query::clustering_row_ranges> calculate_affected_clustering_ranges(const
|
|||||||
if (mp.partition_tombstone() || !mp.row_tombstones().empty()) {
|
if (mp.partition_tombstone() || !mp.row_tombstones().empty()) {
|
||||||
for (auto&& v : views) {
|
for (auto&& v : views) {
|
||||||
// FIXME: #2371
|
// FIXME: #2371
|
||||||
if (v.view->view_info()->select_statement().get_restrictions()->has_unrestricted_clustering_columns()) {
|
if (v.view->view_info()->select_statement(db).get_restrictions()->has_unrestricted_clustering_columns()) {
|
||||||
view_row_ranges.push_back(nonwrapping_range<clustering_key_prefix_view>::make_open_ended_both_sides());
|
view_row_ranges.push_back(nonwrapping_range<clustering_key_prefix_view>::make_open_ended_both_sides());
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
for (auto&& r : v.view->view_info()->partition_slice().default_row_ranges()) {
|
for (auto&& r : v.view->view_info()->partition_slice(db).default_row_ranges()) {
|
||||||
view_row_ranges.push_back(r.transform(std::mem_fn(&clustering_key_prefix::view)));
|
view_row_ranges.push_back(r.transform(std::mem_fn(&clustering_key_prefix::view)));
|
||||||
co_await coroutine::maybe_yield();
|
co_await coroutine::maybe_yield();
|
||||||
}
|
}
|
||||||
@@ -1496,7 +1501,7 @@ future<query::clustering_row_ranges> calculate_affected_clustering_ranges(const
|
|||||||
}
|
}
|
||||||
|
|
||||||
for (auto&& row : mp.clustered_rows()) {
|
for (auto&& row : mp.clustered_rows()) {
|
||||||
if (update_requires_read_before_write(base, views, key, row)) {
|
if (update_requires_read_before_write(db, base, views, key, row)) {
|
||||||
row_ranges.emplace_back(row.key());
|
row_ranges.emplace_back(row.key());
|
||||||
}
|
}
|
||||||
co_await coroutine::maybe_yield();
|
co_await coroutine::maybe_yield();
|
||||||
@@ -2304,7 +2309,7 @@ public:
|
|||||||
inject_failure("view_builder_load_views");
|
inject_failure("view_builder_load_views");
|
||||||
for (auto&& vs : _step.build_status) {
|
for (auto&& vs : _step.build_status) {
|
||||||
if (_step.current_token() >= vs.next_token) {
|
if (_step.current_token() >= vs.next_token) {
|
||||||
if (partition_key_matches(*_step.reader.schema(), *vs.view->view_info(), _step.current_key)) {
|
if (partition_key_matches(_builder.get_db().as_data_dictionary(), *_step.reader.schema(), *vs.view->view_info(), _step.current_key)) {
|
||||||
_views_to_build.push_back(vs.view);
|
_views_to_build.push_back(vs.view);
|
||||||
}
|
}
|
||||||
if (vs.next_token || _step.current_token() != vs.first_token) {
|
if (vs.next_token || _step.current_token() != vs.first_token) {
|
||||||
|
|||||||
@@ -14,6 +14,7 @@
|
|||||||
#include "schema/schema_fwd.hh"
|
#include "schema/schema_fwd.hh"
|
||||||
#include "readers/flat_mutation_reader_v2.hh"
|
#include "readers/flat_mutation_reader_v2.hh"
|
||||||
#include "mutation/frozen_mutation.hh"
|
#include "mutation/frozen_mutation.hh"
|
||||||
|
#include "data_dictionary/data_dictionary.hh"
|
||||||
|
|
||||||
class frozen_mutation_and_schema;
|
class frozen_mutation_and_schema;
|
||||||
|
|
||||||
@@ -124,7 +125,7 @@ public:
|
|||||||
* @return false if we can guarantee that inserting an update for specified key
|
* @return false if we can guarantee that inserting an update for specified key
|
||||||
* won't affect the view in any way, true otherwise.
|
* won't affect the view in any way, true otherwise.
|
||||||
*/
|
*/
|
||||||
bool partition_key_matches(const schema& base, const view_info& view, const dht::decorated_key& key);
|
bool partition_key_matches(data_dictionary::database, const schema& base, const view_info& view, const dht::decorated_key& key);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Whether the view might be affected by the provided update.
|
* Whether the view might be affected by the provided update.
|
||||||
@@ -139,7 +140,7 @@ bool partition_key_matches(const schema& base, const view_info& view, const dht:
|
|||||||
* @return false if we can guarantee that inserting update for key
|
* @return false if we can guarantee that inserting update for key
|
||||||
* won't affect the view in any way, true otherwise.
|
* won't affect the view in any way, true otherwise.
|
||||||
*/
|
*/
|
||||||
bool may_be_affected_by(const schema& base, const view_info& view, const dht::decorated_key& key, const rows_entry& update);
|
bool may_be_affected_by(data_dictionary::database, const schema& base, const view_info& view, const dht::decorated_key& key, const rows_entry& update);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Whether a given base row matches the view filter (and thus if the view should have a corresponding entry).
|
* Whether a given base row matches the view filter (and thus if the view should have a corresponding entry).
|
||||||
@@ -158,9 +159,9 @@ bool may_be_affected_by(const schema& base, const view_info& view, const dht::de
|
|||||||
* @param now the current time in seconds (to decide what is live and what isn't).
|
* @param now the current time in seconds (to decide what is live and what isn't).
|
||||||
* @return whether the base row matches the view filter.
|
* @return whether the base row matches the view filter.
|
||||||
*/
|
*/
|
||||||
bool matches_view_filter(const schema& base, const view_info& view, const partition_key& key, const clustering_or_static_row& update, gc_clock::time_point now);
|
bool matches_view_filter(data_dictionary::database, const schema& base, const view_info& view, const partition_key& key, const clustering_or_static_row& update, gc_clock::time_point now);
|
||||||
|
|
||||||
bool clustering_prefix_matches(const schema& base, const partition_key& key, const clustering_key_prefix& ck);
|
bool clustering_prefix_matches(data_dictionary::database, const schema& base, const partition_key& key, const clustering_key_prefix& ck);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* When a base-table update modifies a value in a materialized view's key
|
* When a base-table update modifies a value in a materialized view's key
|
||||||
@@ -223,7 +224,7 @@ public:
|
|||||||
|
|
||||||
future<> move_to(utils::chunked_vector<frozen_mutation_and_schema>& mutations);
|
future<> move_to(utils::chunked_vector<frozen_mutation_and_schema>& mutations);
|
||||||
|
|
||||||
void generate_update(const partition_key& base_key, const clustering_or_static_row& update, const std::optional<clustering_or_static_row>& existing, gc_clock::time_point now);
|
void generate_update(data_dictionary::database db, const partition_key& base_key, const clustering_or_static_row& update, const std::optional<clustering_or_static_row>& existing, gc_clock::time_point now);
|
||||||
|
|
||||||
size_t op_count() const;
|
size_t op_count() const;
|
||||||
|
|
||||||
@@ -236,14 +237,15 @@ private:
|
|||||||
};
|
};
|
||||||
std::vector<view_row_entry> get_view_rows(const partition_key& base_key, const clustering_or_static_row& update, const std::optional<clustering_or_static_row>& existing);
|
std::vector<view_row_entry> get_view_rows(const partition_key& base_key, const clustering_or_static_row& update, const std::optional<clustering_or_static_row>& existing);
|
||||||
bool can_skip_view_updates(const clustering_or_static_row& update, const clustering_or_static_row& existing) const;
|
bool can_skip_view_updates(const clustering_or_static_row& update, const clustering_or_static_row& existing) const;
|
||||||
void create_entry(const partition_key& base_key, const clustering_or_static_row& update, gc_clock::time_point now);
|
void create_entry(data_dictionary::database db, const partition_key& base_key, const clustering_or_static_row& update, gc_clock::time_point now);
|
||||||
void delete_old_entry(const partition_key& base_key, const clustering_or_static_row& existing, const clustering_or_static_row& update, gc_clock::time_point now);
|
void delete_old_entry(data_dictionary::database db, const partition_key& base_key, const clustering_or_static_row& existing, const clustering_or_static_row& update, gc_clock::time_point now);
|
||||||
void do_delete_old_entry(const partition_key& base_key, const clustering_or_static_row& existing, const clustering_or_static_row& update, gc_clock::time_point now);
|
void do_delete_old_entry(const partition_key& base_key, const clustering_or_static_row& existing, const clustering_or_static_row& update, gc_clock::time_point now);
|
||||||
void update_entry(const partition_key& base_key, const clustering_or_static_row& update, const clustering_or_static_row& existing, gc_clock::time_point now);
|
void update_entry(data_dictionary::database db, const partition_key& base_key, const clustering_or_static_row& update, const clustering_or_static_row& existing, gc_clock::time_point now);
|
||||||
void update_entry_for_computed_column(const partition_key& base_key, const clustering_or_static_row& update, const std::optional<clustering_or_static_row>& existing, gc_clock::time_point now);
|
void update_entry_for_computed_column(const partition_key& base_key, const clustering_or_static_row& update, const std::optional<clustering_or_static_row>& existing, gc_clock::time_point now);
|
||||||
};
|
};
|
||||||
|
|
||||||
class view_update_builder {
|
class view_update_builder {
|
||||||
|
data_dictionary::database _db;
|
||||||
const replica::table& _base;
|
const replica::table& _base;
|
||||||
schema_ptr _schema; // The base schema
|
schema_ptr _schema; // The base schema
|
||||||
std::vector<view_updates> _view_updates;
|
std::vector<view_updates> _view_updates;
|
||||||
@@ -259,12 +261,13 @@ class view_update_builder {
|
|||||||
partition_key _key = partition_key::make_empty();
|
partition_key _key = partition_key::make_empty();
|
||||||
public:
|
public:
|
||||||
|
|
||||||
view_update_builder(const replica::table& base, schema_ptr s,
|
view_update_builder(data_dictionary::database db, const replica::table& base, schema_ptr s,
|
||||||
std::vector<view_updates>&& views_to_update,
|
std::vector<view_updates>&& views_to_update,
|
||||||
flat_mutation_reader_v2&& updates,
|
flat_mutation_reader_v2&& updates,
|
||||||
flat_mutation_reader_v2_opt&& existings,
|
flat_mutation_reader_v2_opt&& existings,
|
||||||
gc_clock::time_point now)
|
gc_clock::time_point now)
|
||||||
: _base(base)
|
: _db(std::move(db))
|
||||||
|
, _base(base)
|
||||||
, _schema(std::move(s))
|
, _schema(std::move(s))
|
||||||
, _view_updates(std::move(views_to_update))
|
, _view_updates(std::move(views_to_update))
|
||||||
, _updates(std::move(updates))
|
, _updates(std::move(updates))
|
||||||
@@ -298,6 +301,7 @@ private:
|
|||||||
};
|
};
|
||||||
|
|
||||||
view_update_builder make_view_update_builder(
|
view_update_builder make_view_update_builder(
|
||||||
|
data_dictionary::database db,
|
||||||
const replica::table& base_table,
|
const replica::table& base_table,
|
||||||
const schema_ptr& base_schema,
|
const schema_ptr& base_schema,
|
||||||
std::vector<view_and_base>&& views_to_update,
|
std::vector<view_and_base>&& views_to_update,
|
||||||
@@ -306,6 +310,7 @@ view_update_builder make_view_update_builder(
|
|||||||
gc_clock::time_point now);
|
gc_clock::time_point now);
|
||||||
|
|
||||||
future<query::clustering_row_ranges> calculate_affected_clustering_ranges(
|
future<query::clustering_row_ranges> calculate_affected_clustering_ranges(
|
||||||
|
data_dictionary::database db,
|
||||||
const schema& base,
|
const schema& base,
|
||||||
const dht::decorated_key& key,
|
const dht::decorated_key& key,
|
||||||
const mutation_partition& mp,
|
const mutation_partition& mp,
|
||||||
|
|||||||
@@ -189,6 +189,8 @@ public:
|
|||||||
static constexpr size_t batch_size = 128;
|
static constexpr size_t batch_size = 128;
|
||||||
static constexpr size_t batch_memory_max = 1024*1024;
|
static constexpr size_t batch_memory_max = 1024*1024;
|
||||||
|
|
||||||
|
replica::database& get_db() noexcept { return _db; }
|
||||||
|
|
||||||
public:
|
public:
|
||||||
view_builder(replica::database&, db::system_keyspace&, db::system_distributed_keyspace&, service::migration_notifier&, view_update_generator& vug);
|
view_builder(replica::database&, db::system_keyspace&, db::system_distributed_keyspace&, service::migration_notifier&, view_update_generator& vug);
|
||||||
view_builder(view_builder&&) = delete;
|
view_builder(view_builder&&) = delete;
|
||||||
|
|||||||
@@ -72,6 +72,8 @@ public:
|
|||||||
future<> stop();
|
future<> stop();
|
||||||
future<> register_staging_sstable(sstables::shared_sstable sst, lw_shared_ptr<replica::table> table);
|
future<> register_staging_sstable(sstables::shared_sstable sst, lw_shared_ptr<replica::table> table);
|
||||||
|
|
||||||
|
replica::database& get_db() noexcept { return _db; }
|
||||||
|
|
||||||
future<> mutate_MV(
|
future<> mutate_MV(
|
||||||
dht::token base_token,
|
dht::token base_token,
|
||||||
utils::chunked_vector<frozen_mutation_and_schema> view_updates,
|
utils::chunked_vector<frozen_mutation_and_schema> view_updates,
|
||||||
|
|||||||
@@ -1064,7 +1064,7 @@ public:
|
|||||||
private:
|
private:
|
||||||
future<row_locker::lock_holder> do_push_view_replica_updates(shared_ptr<db::view::view_update_generator> gen, schema_ptr s, mutation m, db::timeout_clock::time_point timeout, mutation_source source,
|
future<row_locker::lock_holder> do_push_view_replica_updates(shared_ptr<db::view::view_update_generator> gen, schema_ptr s, mutation m, db::timeout_clock::time_point timeout, mutation_source source,
|
||||||
tracing::trace_state_ptr tr_state, reader_concurrency_semaphore& sem, const io_priority_class& io_priority, query::partition_slice::option_set custom_opts) const;
|
tracing::trace_state_ptr tr_state, reader_concurrency_semaphore& sem, const io_priority_class& io_priority, query::partition_slice::option_set custom_opts) const;
|
||||||
std::vector<view_ptr> affected_views(const schema_ptr& base, const mutation& update) const;
|
std::vector<view_ptr> affected_views(shared_ptr<db::view::view_update_generator> gen, const schema_ptr& base, const mutation& update) const;
|
||||||
future<> generate_and_propagate_view_updates(shared_ptr<db::view::view_update_generator> gen, const schema_ptr& base,
|
future<> generate_and_propagate_view_updates(shared_ptr<db::view::view_update_generator> gen, const schema_ptr& base,
|
||||||
reader_permit permit,
|
reader_permit permit,
|
||||||
std::vector<db::view::view_and_base>&& views,
|
std::vector<db::view::view_and_base>&& views,
|
||||||
|
|||||||
@@ -1976,10 +1976,10 @@ const std::vector<view_ptr>& table::views() const {
|
|||||||
return _views;
|
return _views;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::vector<view_ptr> table::affected_views(const schema_ptr& base, const mutation& update) const {
|
std::vector<view_ptr> table::affected_views(shared_ptr<db::view::view_update_generator> gen, const schema_ptr& base, const mutation& update) const {
|
||||||
//FIXME: Avoid allocating a vector here; consider returning the boost iterator.
|
//FIXME: Avoid allocating a vector here; consider returning the boost iterator.
|
||||||
return boost::copy_range<std::vector<view_ptr>>(_views | boost::adaptors::filtered([&] (auto&& view) {
|
return boost::copy_range<std::vector<view_ptr>>(_views | boost::adaptors::filtered([&] (auto&& view) {
|
||||||
return db::view::partition_key_matches(*base, *view->view_info(), update.decorated_key());
|
return db::view::partition_key_matches(gen->get_db().as_data_dictionary(), *base, *view->view_info(), update.decorated_key());
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2014,6 +2014,7 @@ future<> table::generate_and_propagate_view_updates(shared_ptr<db::view::view_up
|
|||||||
auto base_token = m.token();
|
auto base_token = m.token();
|
||||||
auto m_schema = m.schema();
|
auto m_schema = m.schema();
|
||||||
db::view::view_update_builder builder = db::view::make_view_update_builder(
|
db::view::view_update_builder builder = db::view::make_view_update_builder(
|
||||||
|
gen->get_db().as_data_dictionary(),
|
||||||
*this,
|
*this,
|
||||||
base,
|
base,
|
||||||
std::move(views),
|
std::move(views),
|
||||||
@@ -2151,6 +2152,7 @@ future<> table::populate_views(
|
|||||||
gc_clock::time_point now) {
|
gc_clock::time_point now) {
|
||||||
auto schema = reader.schema();
|
auto schema = reader.schema();
|
||||||
db::view::view_update_builder builder = db::view::make_view_update_builder(
|
db::view::view_update_builder builder = db::view::make_view_update_builder(
|
||||||
|
gen->get_db().as_data_dictionary(),
|
||||||
*this,
|
*this,
|
||||||
schema,
|
schema,
|
||||||
std::move(views),
|
std::move(views),
|
||||||
@@ -2634,11 +2636,11 @@ future<row_locker::lock_holder> table::do_push_view_replica_updates(shared_ptr<d
|
|||||||
utils::get_local_injector().inject("table_push_view_replica_updates_stale_time_point", [&now] {
|
utils::get_local_injector().inject("table_push_view_replica_updates_stale_time_point", [&now] {
|
||||||
now -= 10s;
|
now -= 10s;
|
||||||
});
|
});
|
||||||
auto views = db::view::with_base_info_snapshot(affected_views(base, m));
|
auto views = db::view::with_base_info_snapshot(affected_views(gen, base, m));
|
||||||
if (views.empty()) {
|
if (views.empty()) {
|
||||||
co_return row_locker::lock_holder();
|
co_return row_locker::lock_holder();
|
||||||
}
|
}
|
||||||
auto cr_ranges = co_await db::view::calculate_affected_clustering_ranges(*base, m.decorated_key(), m.partition(), views);
|
auto cr_ranges = co_await db::view::calculate_affected_clustering_ranges(gen->get_db().as_data_dictionary(), *base, m.decorated_key(), m.partition(), views);
|
||||||
const bool need_regular = !cr_ranges.empty();
|
const bool need_regular = !cr_ranges.empty();
|
||||||
const bool need_static = db::view::needs_static_row(m.partition(), views);
|
const bool need_static = db::view::needs_static_row(m.partition(), views);
|
||||||
if (!need_regular && !need_static) {
|
if (!need_regular && !need_static) {
|
||||||
|
|||||||
@@ -48,8 +48,8 @@ public:
|
|||||||
return _raw.where_clause();
|
return _raw.where_clause();
|
||||||
}
|
}
|
||||||
|
|
||||||
cql3::statements::select_statement& select_statement() const;
|
cql3::statements::select_statement& select_statement(data_dictionary::database) const;
|
||||||
const query::partition_slice& partition_slice() const;
|
const query::partition_slice& partition_slice(data_dictionary::database) const;
|
||||||
const column_definition* view_column(const schema& base, column_kind kind, column_id base_id) const;
|
const column_definition* view_column(const schema& base, column_kind kind, column_id base_id) const;
|
||||||
const column_definition* view_column(const column_definition& base_def) const;
|
const column_definition* view_column(const column_definition& base_def) const;
|
||||||
bool has_base_non_pk_columns_in_view_pk() const;
|
bool has_base_non_pk_columns_in_view_pk() const;
|
||||||
|
|||||||
Reference in New Issue
Block a user