materialized views: Replace db::view::view class
The write path uses a base schema at a particular version, and we want it to use the materialized views at the corresponding version. To achieve this, we need to map the state currently in db::view::view to a particular schema version, which this patch does by introducing the view_info class to hold the state previously in db::view::view, and by having a view schema directly point to it. The changes in the patch are thus: 1) Introduce view_info to hold the extra view state; 2) Point to the view_info from the schema; 3) Make the functions in the now stateless db::view::view non-member; 4) Remove the db::view::view class. All changes are structural and don't affect current behavior. Signed-off-by: Duarte Nunes <duarte@scylladb.com>
This commit is contained in:
128
db/view/view.cc
128
db/view/view.cc
@@ -49,22 +49,25 @@
|
||||
#include "gms/inet_address.hh"
|
||||
#include "locator/network_topology_strategy.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "view_info.hh"
|
||||
|
||||
static logging::logger logger("view");
|
||||
|
||||
namespace db {
|
||||
view_info::view_info(const schema& schema, const raw_view_info& raw_view_info)
|
||||
: _schema(schema)
|
||||
, _raw(raw_view_info)
|
||||
, _base_non_pk_column_in_view_pk(nullptr)
|
||||
{ }
|
||||
|
||||
namespace view {
|
||||
|
||||
cql3::statements::select_statement& view::select_statement() const {
|
||||
cql3::statements::select_statement& view_info::select_statement() const {
|
||||
if (!_select_statement) {
|
||||
std::vector<sstring_view> included;
|
||||
if (!_schema->view_info()->include_all_columns()) {
|
||||
included.reserve(_schema->all_columns_in_select_order().size());
|
||||
boost::transform(_schema->all_columns_in_select_order(), std::back_inserter(included), std::mem_fn(&column_definition::name_as_text));
|
||||
if (!include_all_columns()) {
|
||||
included.reserve(_schema.all_columns_in_select_order().size());
|
||||
boost::transform(_schema.all_columns_in_select_order(), std::back_inserter(included), std::mem_fn(&column_definition::name_as_text));
|
||||
}
|
||||
auto raw = cql3::util::build_select_statement(_schema->view_info()->base_name(), _schema->view_info()->where_clause(), std::move(included));
|
||||
raw->prepare_keyspace(_schema->ks_name());
|
||||
auto raw = cql3::util::build_select_statement(base_name(), where_clause(), std::move(included));
|
||||
raw->prepare_keyspace(_schema.ks_name());
|
||||
raw->set_bound_variables({});
|
||||
cql3::cql_stats ignored;
|
||||
auto prepared = raw->prepare(service::get_local_storage_proxy().get_db().local(), ignored, true);
|
||||
@@ -73,90 +76,105 @@ cql3::statements::select_statement& view::select_statement() const {
|
||||
return *_select_statement;
|
||||
}
|
||||
|
||||
const query::partition_slice& view::partition_slice() const {
|
||||
const query::partition_slice& view_info::partition_slice() const {
|
||||
if (!_partition_slice) {
|
||||
_partition_slice = select_statement().make_partition_slice(cql3::query_options({ }));
|
||||
}
|
||||
return *_partition_slice;
|
||||
}
|
||||
|
||||
const dht::partition_range_vector& view::partition_ranges() const {
|
||||
const dht::partition_range_vector& view_info::partition_ranges() const {
|
||||
if (!_partition_ranges) {
|
||||
_partition_ranges = select_statement().get_restrictions()->get_partition_key_ranges(cql3::query_options({ }));
|
||||
}
|
||||
return *_partition_ranges;
|
||||
}
|
||||
|
||||
bool view::partition_key_matches(const ::schema& base, const dht::decorated_key& key) const {
|
||||
const column_definition* view_info::view_column(const schema& base, column_id base_id) const {
|
||||
// FIXME: Map base column_ids to view_column_ids, which can be something like
|
||||
// a boost::small_vector where the position is the base column_id, and the
|
||||
// value is either empty or the view's column_id.
|
||||
return _schema.get_column_definition(base.regular_column_at(base_id).name());
|
||||
}
|
||||
|
||||
const column_definition* view_info::base_non_pk_column_in_view_pk(const schema& base) const {
|
||||
if (!_base_non_pk_column_in_view_pk) {
|
||||
for (auto&& base_col : base.regular_columns()) {
|
||||
auto* view_col = _schema.get_column_definition(base_col.name());
|
||||
if (view_col && view_col->is_primary_key()) {
|
||||
_base_non_pk_column_in_view_pk = view_col;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return _base_non_pk_column_in_view_pk;
|
||||
}
|
||||
|
||||
namespace db {
|
||||
|
||||
namespace view {
|
||||
|
||||
bool partition_key_matches(const schema& base, const view_info& view, const dht::decorated_key& key) {
|
||||
dht::ring_position rp(key);
|
||||
auto& ranges = partition_ranges();
|
||||
auto& ranges = view.partition_ranges();
|
||||
return std::any_of(ranges.begin(), ranges.end(), [&] (auto&& range) {
|
||||
return range.contains(rp, dht::ring_position_comparator(base));
|
||||
});
|
||||
}
|
||||
|
||||
bool view::clustering_prefix_matches(const ::schema& base, const partition_key& key, const clustering_key_prefix& ck) const {
|
||||
bool clustering_prefix_matches(const schema& base, const view_info& view, const partition_key& key, const clustering_key_prefix& ck) {
|
||||
bound_view::compare less(base);
|
||||
auto& ranges = partition_slice().row_ranges(base, key);
|
||||
auto& ranges = view.partition_slice().row_ranges(base, key);
|
||||
return std::any_of(ranges.begin(), ranges.end(), [&] (auto&& range) {
|
||||
auto bounds = bound_view::from_range(range);
|
||||
return !less(ck, bounds.first) && !less(bounds.second, ck);
|
||||
});
|
||||
}
|
||||
|
||||
bool view::may_be_affected_by(const ::schema& base, const dht::decorated_key& key, const rows_entry& update) const {
|
||||
bool may_be_affected_by(const schema& base, const view_info view, const dht::decorated_key& key, const rows_entry& update) {
|
||||
// We can guarantee that the view won't be affected if:
|
||||
// - the primary key is excluded by the view filter (note that this isn't true of the filter on regular columns:
|
||||
// even if an update don't match a view condition on a regular column, that update can still invalidate a
|
||||
// pre-existing entry);
|
||||
// - the update doesn't modify any of the columns impacting the view (where "impacting" the view means that column
|
||||
// is neither included in the view, nor used by the view filter).
|
||||
if (!partition_key_matches(base, key) && !clustering_prefix_matches(base, key.key(), update.key())) {
|
||||
if (!partition_key_matches(base, view, key) && !clustering_prefix_matches(base, view, key.key(), update.key())) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// We want to check if the update modifies any of the columns that are part of the view (in which case the view is
|
||||
// affected). But iff the view includes all the base table columns, or the update has either a row deletion or a
|
||||
// row marker, we know the view is affected right away.
|
||||
if (_schema->view_info()->include_all_columns() || update.row().deleted_at() || update.row().marker().is_live()) {
|
||||
if (view.include_all_columns() || update.row().deleted_at() || update.row().marker().is_live()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
bool affected = false;
|
||||
update.row().cells().for_each_cell_until([&] (column_id id, const atomic_cell_or_collection& cell) {
|
||||
affected = _schema->get_column_definition(base.column_at(column_kind::regular_column, id).name());
|
||||
affected = view.view_column(base, id);
|
||||
return stop_iteration(affected);
|
||||
});
|
||||
return affected;
|
||||
}
|
||||
|
||||
bool view::matches_view_filter(const ::schema& base, const partition_key& key, const clustering_row& update, gc_clock::time_point now) const {
|
||||
return clustering_prefix_matches(base, key, update.key()) &&
|
||||
boost::algorithm::all_of(
|
||||
select_statement().get_restrictions()->get_non_pk_restriction() | boost::adaptors::map_values,
|
||||
[&] (auto&& r) {
|
||||
return r->is_satisfied_by(base, key, update.key(), update.cells(), cql3::query_options({ }), now);
|
||||
});
|
||||
}
|
||||
|
||||
void view::set_base_non_pk_column_in_view_pk(const ::schema& base) {
|
||||
for (auto&& base_col : base.regular_columns()) {
|
||||
auto view_col = _schema->get_column_definition(base_col.name());
|
||||
if (view_col && view_col->is_primary_key()) {
|
||||
_base_non_pk_column_in_view_pk = view_col;
|
||||
return;
|
||||
}
|
||||
}
|
||||
_base_non_pk_column_in_view_pk = nullptr;
|
||||
bool matches_view_filter(const schema& base, const view_info& view, const partition_key& key, const clustering_row& update, gc_clock::time_point now) {
|
||||
return clustering_prefix_matches(base, view, key, update.key())
|
||||
&& boost::algorithm::all_of(
|
||||
view.select_statement().get_restrictions()->get_non_pk_restriction() | boost::adaptors::map_values,
|
||||
[&] (auto&& r) {
|
||||
return r->is_satisfied_by(base, key, update.key(), update.cells(), cql3::query_options({ }), now);
|
||||
});
|
||||
}
|
||||
|
||||
class view_updates final {
|
||||
lw_shared_ptr<const db::view::view> _view;
|
||||
view_ptr _view;
|
||||
const view_info& _view_info;
|
||||
schema_ptr _base;
|
||||
std::unordered_map<partition_key, mutation_partition, partition_key::hashing, partition_key::equality> _updates;
|
||||
public:
|
||||
explicit view_updates(lw_shared_ptr<const db::view::view> view, schema_ptr base)
|
||||
explicit view_updates(view_ptr view, schema_ptr base)
|
||||
: _view(std::move(view))
|
||||
, _view_info(*_view->view_info())
|
||||
, _base(std::move(base))
|
||||
, _updates(8, partition_key::hashing(*_base), partition_key::equality(*_base)) {
|
||||
}
|
||||
@@ -164,7 +182,7 @@ public:
|
||||
void move_to(std::vector<mutation>& mutations) && {
|
||||
auto& partitioner = dht::global_partitioner();
|
||||
std::transform(_updates.begin(), _updates.end(), std::back_inserter(mutations), [&, this] (auto&& m) {
|
||||
return mutation(_view->schema(), partitioner.decorate_key(*_base, std::move(m.first)), std::move(m.second));
|
||||
return mutation(_view, partitioner.decorate_key(*_base, std::move(m.first)), std::move(m.second));
|
||||
});
|
||||
}
|
||||
|
||||
@@ -175,7 +193,7 @@ private:
|
||||
if (it != _updates.end()) {
|
||||
return it->second;
|
||||
}
|
||||
return _updates.emplace(std::move(key), mutation_partition(_view->schema())).first->second;
|
||||
return _updates.emplace(std::move(key), mutation_partition(_view)).first->second;
|
||||
}
|
||||
row_marker compute_row_marker(const clustering_row& base_row) const;
|
||||
deletable_row& get_view_row(const partition_key& base_key, const clustering_row& update);
|
||||
@@ -216,7 +234,7 @@ row_marker view_updates::compute_row_marker(const clustering_row& base_row) cons
|
||||
*/
|
||||
|
||||
auto marker = base_row.marker();
|
||||
auto* col = _view->base_non_pk_column_in_view_pk();
|
||||
auto* col = _view_info.base_non_pk_column_in_view_pk(*_base);
|
||||
if (col) {
|
||||
// Note: multi-cell columns can't be part of the primary key.
|
||||
auto cell = base_row.cells().cell_at(col->id).as_atomic_cell();
|
||||
@@ -268,10 +286,9 @@ deletable_row& view_updates::get_view_row(const partition_key& base_key, const c
|
||||
return c.as_collection_mutation().data;
|
||||
}
|
||||
});
|
||||
auto& view_schema = *_view->schema();
|
||||
auto& partition = partition_for(partition_key::from_range(view_schema.partition_key_columns() | get_value));
|
||||
auto ckey = clustering_key::from_range(view_schema.clustering_key_columns() | get_value);
|
||||
return partition.clustered_row(view_schema, std::move(ckey));
|
||||
auto& partition = partition_for(partition_key::from_range(_view->partition_key_columns() | get_value));
|
||||
auto ckey = clustering_key::from_range(_view->clustering_key_columns() | get_value);
|
||||
return partition.clustered_row(*_view, std::move(ckey));
|
||||
}
|
||||
|
||||
static const column_definition* view_column(const schema& base, const schema& view, column_id base_id) {
|
||||
@@ -295,13 +312,13 @@ static void add_cells_to_view(const schema& base, const schema& view, const row&
|
||||
* This method checks that the base row does match the view filter before applying anything.
|
||||
*/
|
||||
void view_updates::create_entry(const partition_key& base_key, const clustering_row& update, gc_clock::time_point now) {
|
||||
if (!_view->matches_view_filter(*_base, base_key, update, now)) {
|
||||
if (!matches_view_filter(*_base, _view_info, base_key, update, now)) {
|
||||
return;
|
||||
}
|
||||
deletable_row& r = get_view_row(base_key, update);
|
||||
r.apply(compute_row_marker(update));
|
||||
r.apply(update.tomb());
|
||||
add_cells_to_view(*_base, *_view->schema(), update.cells(), r.cells());
|
||||
add_cells_to_view(*_base, *_view, update.cells(), r.cells());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -311,7 +328,7 @@ void view_updates::create_entry(const partition_key& base_key, const clustering_
|
||||
void view_updates::delete_old_entry(const partition_key& base_key, const clustering_row& existing, gc_clock::time_point now) {
|
||||
// Before deleting an old entry, make sure it was matching the view filter
|
||||
// (otherwise there is nothing to delete)
|
||||
if (_view->matches_view_filter(*_base, base_key, existing, now)) {
|
||||
if (matches_view_filter(*_base, _view_info, base_key, existing, now)) {
|
||||
do_delete_old_entry(base_key, existing, now);
|
||||
}
|
||||
}
|
||||
@@ -323,13 +340,12 @@ void view_updates::do_delete_old_entry(const partition_key& base_key, const clus
|
||||
// ensure that the timestamp for the entry then is bigger than the tombstone
|
||||
// we're just inserting, which is not currently guaranteed. See CASSANDRA-11500
|
||||
// for details.
|
||||
auto& view_schema = *_view->schema();
|
||||
auto ts = existing.marker().timestamp();
|
||||
auto set_max_ts = [&ts] (atomic_cell_view&& cell) {
|
||||
ts = std::max(ts, cell.timestamp());
|
||||
};
|
||||
existing.cells().for_each_cell([&, this] (column_id id, const atomic_cell_or_collection& cell) {
|
||||
auto* def = view_column(*_base, view_schema, id);
|
||||
auto* def = _view_info.view_column(*_base, id);
|
||||
if (!def) {
|
||||
return;
|
||||
}
|
||||
@@ -353,11 +369,11 @@ void view_updates::do_delete_old_entry(const partition_key& base_key, const clus
|
||||
void view_updates::update_entry(const partition_key& base_key, const clustering_row& update, const clustering_row& existing, gc_clock::time_point now) {
|
||||
// While we know update and existing correspond to the same view entry,
|
||||
// they may not match the view filter.
|
||||
if (!_view->matches_view_filter(*_base, base_key, existing, now)) {
|
||||
if (!matches_view_filter(*_base, _view_info, base_key, existing, now)) {
|
||||
create_entry(base_key, update, now);
|
||||
return;
|
||||
}
|
||||
if (!_view->matches_view_filter(*_base, base_key, update, now)) {
|
||||
if (!matches_view_filter(*_base, _view_info, base_key, update, now)) {
|
||||
do_delete_old_entry(base_key, existing, now);
|
||||
return;
|
||||
}
|
||||
@@ -367,7 +383,7 @@ void view_updates::update_entry(const partition_key& base_key, const clustering_
|
||||
r.apply(update.tomb());
|
||||
|
||||
auto diff = update.cells().difference(*_base, column_kind::regular_column, existing.cells());
|
||||
add_cells_to_view(*_base, *_view->schema(), diff, r.cells());
|
||||
add_cells_to_view(*_base, *_view, diff, r.cells());
|
||||
}
|
||||
|
||||
void view_updates::generate_update(
|
||||
@@ -386,7 +402,7 @@ void view_updates::generate_update(
|
||||
return;
|
||||
}
|
||||
|
||||
auto* col = _view->base_non_pk_column_in_view_pk();
|
||||
auto* col = _view_info.base_non_pk_column_in_view_pk(*_base);
|
||||
if (!col) {
|
||||
// The view key is necessarily the same pre and post update.
|
||||
if (existing && !existing->empty()) {
|
||||
@@ -603,7 +619,7 @@ future<stop_iteration> view_update_builder::on_results() {
|
||||
|
||||
future<std::vector<mutation>> generate_view_updates(
|
||||
const schema_ptr& base,
|
||||
std::vector<lw_shared_ptr<view>>&& views_to_update,
|
||||
std::vector<view_ptr>&& views_to_update,
|
||||
streamed_mutation&& updates,
|
||||
streamed_mutation&& existings) {
|
||||
auto vs = boost::copy_range<std::vector<view_updates>>(views_to_update | boost::adaptors::transformed([&] (auto&& v) {
|
||||
|
||||
124
db/view/view.hh
124
db/view/view.hh
@@ -28,96 +28,60 @@
|
||||
#include "streamed_mutation.hh"
|
||||
#include "stdx.hh"
|
||||
|
||||
namespace cql3 {
|
||||
namespace statements {
|
||||
class select_statement;
|
||||
}
|
||||
}
|
||||
|
||||
namespace db {
|
||||
|
||||
namespace view {
|
||||
|
||||
class view final {
|
||||
view_ptr _schema;
|
||||
mutable shared_ptr<cql3::statements::select_statement> _select_statement;
|
||||
mutable stdx::optional<query::partition_slice> _partition_slice;
|
||||
mutable stdx::optional<dht::partition_range_vector> _partition_ranges;
|
||||
const column_definition* _base_non_pk_column_in_view_pk;
|
||||
public:
|
||||
explicit view(view_ptr schema, const ::schema& base)
|
||||
: _schema(std::move(schema)) {
|
||||
set_base_non_pk_column_in_view_pk(base);
|
||||
}
|
||||
/**
|
||||
* Whether the view filter considers the specified partition key.
|
||||
*
|
||||
* @param base the base table schema.
|
||||
* @param view_info the view info.
|
||||
* @param key the partition key that is updated.
|
||||
* @return false if we can guarantee that inserting an update for specified key
|
||||
* won't affect the view in any way, true otherwise.
|
||||
*/
|
||||
bool partition_key_matches(const schema& base, const view_info& view, const dht::decorated_key& key);
|
||||
|
||||
view_ptr schema() const {
|
||||
return _schema;
|
||||
}
|
||||
/**
|
||||
* Whether the view might be affected by the provided update.
|
||||
*
|
||||
* Note that having this method return true is not an absolute guarantee that the view will be
|
||||
* updated, just that it most likely will, but a false return guarantees it won't be affected.
|
||||
*
|
||||
* @param base the base table schema.
|
||||
* @param view_info the view info.
|
||||
* @param key the partition key that is updated.
|
||||
* @param update the base table update being applied.
|
||||
* @return false if we can guarantee that inserting update for key
|
||||
* won't affect the view in any way, true otherwise.
|
||||
*/
|
||||
bool may_be_affected_by(const schema& base, const view_info& view, const dht::decorated_key& key, const rows_entry& update);
|
||||
|
||||
void update(view_ptr new_schema, const ::schema& base) {
|
||||
_schema = new_schema;
|
||||
_select_statement = nullptr;
|
||||
_partition_slice = { };
|
||||
_partition_ranges = { };
|
||||
set_base_non_pk_column_in_view_pk(base);
|
||||
}
|
||||
/**
|
||||
* Whether a given base row matches the view filter (and thus if the view should have a corresponding entry).
|
||||
*
|
||||
* Note that this differs from may_be_affected_by in that the provide row must be the current
|
||||
* state of the base row, not just some updates to it. This function also has no false positive: a base
|
||||
* row either does or doesn't match the view filter.
|
||||
*
|
||||
* Also note that this function doesn't check the partition key, as it assumes the upper layers
|
||||
* have already filtered out the views that are not affected.
|
||||
*
|
||||
* @param base the base table schema.
|
||||
* @param view_info the view info.
|
||||
* @param key the partition key that is updated.
|
||||
* @param update the current state of a particular base row.
|
||||
* @param now the current time in seconds (to decide what is live and what isn't).
|
||||
* @return whether the base row matches the view filter.
|
||||
*/
|
||||
bool matches_view_filter(const schema& base, const view_info& view, const partition_key& key, const clustering_row& update, gc_clock::time_point now);
|
||||
|
||||
const column_definition* base_non_pk_column_in_view_pk() const {
|
||||
return _base_non_pk_column_in_view_pk;
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether the view filter considers the specified partition key.
|
||||
*
|
||||
* @param base the base table schema.
|
||||
* @param key the partition key that is updated.
|
||||
* @return false if we can guarantee that inserting an update for specified key
|
||||
* won't affect the view in any way, true otherwise.
|
||||
*/
|
||||
bool partition_key_matches(const ::schema& base, const dht::decorated_key& key) const;
|
||||
|
||||
/**
|
||||
* Whether the view might be affected by the provided update.
|
||||
*
|
||||
* Note that having this method return true is not an absolute guarantee that the view will be
|
||||
* updated, just that it most likely will, but a false return guarantees it won't be affected.
|
||||
*
|
||||
* @param base the base table schema.
|
||||
* @param key the partition key that is updated.
|
||||
* @param update the base table update being applied.
|
||||
* @return false if we can guarantee that inserting update for key
|
||||
* won't affect the view in any way, true otherwise.
|
||||
*/
|
||||
bool may_be_affected_by(const ::schema& base, const dht::decorated_key& key, const rows_entry& update) const;
|
||||
|
||||
/**
|
||||
* Whether a given base row matches the view filter (and thus if the view should have a corresponding entry).
|
||||
*
|
||||
* Note that this differs from may_be_affected_by in that the provide row must be the current
|
||||
* state of the base row, not just some updates to it. This function also has no false positive: a base
|
||||
* row either does or doesn't match the view filter.
|
||||
*
|
||||
* Also note that this function doesn't check the partition key, as it assumes the upper layers
|
||||
* have already filtered out the views that are not affected.
|
||||
*
|
||||
* @param base the base table schema.
|
||||
* @param key the partition key that is updated.
|
||||
* @param update the current state of a particular base row.
|
||||
* @param now the current time in seconds (to decide what is live and what isn't).
|
||||
* @return whether the base row matches the view filter.
|
||||
*/
|
||||
bool matches_view_filter(const ::schema& base, const partition_key& key, const clustering_row& update, gc_clock::time_point now) const;
|
||||
private:
|
||||
cql3::statements::select_statement& select_statement() const;
|
||||
const query::partition_slice& partition_slice() const;
|
||||
const dht::partition_range_vector& partition_ranges() const;
|
||||
bool clustering_prefix_matches(const ::schema& base, const partition_key& key, const clustering_key_prefix& ck) const;
|
||||
void set_base_non_pk_column_in_view_pk(const ::schema& base);
|
||||
};
|
||||
bool clustering_prefix_matches(const schema& base, const partition_key& key, const clustering_key_prefix& ck);
|
||||
|
||||
future<std::vector<mutation>> generate_view_updates(
|
||||
const schema_ptr& base,
|
||||
std::vector<lw_shared_ptr<view>>&& views_to_update,
|
||||
std::vector<view_ptr>&& views_to_update,
|
||||
streamed_mutation&& updates,
|
||||
streamed_mutation&& existings);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user