view_updates: Create view entry

This patch introduces the view_updates::create_entry function, which
creates a view row mutation given a new base table row.

Signed-off-by: Duarte Nunes <duarte@scylladb.com>
This commit is contained in:
Duarte Nunes
2017-01-15 19:45:53 +01:00
parent b8b8a8099c
commit e0f642180f

View File

@@ -172,6 +172,8 @@ private:
return _updates.emplace(std::move(key), mutation_partition(_view->schema())).first->second;
}
row_marker compute_row_marker(const clustering_row& base_row) const;
deletable_row& get_view_row(const partition_key& base_key, const clustering_row& update);
void create_entry(const partition_key& base_key, const clustering_row& update, gc_clock::time_point now);
};
row_marker view_updates::compute_row_marker(const clustering_row& base_row) const {
@@ -236,6 +238,59 @@ row_marker view_updates::compute_row_marker(const clustering_row& base_row) cons
return row_marker(marker.timestamp(), ttl, expiry);
}
deletable_row& view_updates::get_view_row(const partition_key& base_key, const clustering_row& update) {
auto get_value = boost::adaptors::transformed([&, this] (const column_definition& cdef) {
auto* base_col = _base->get_column_definition(cdef.name());
assert(base_col);
switch (base_col->kind) {
case column_kind::partition_key:
return base_key.get_component(*_base, base_col->position());
case column_kind::clustering_key:
return update.key().get_component(*_base, base_col->position());
default:
auto& c = update.cells().cell_at(base_col->id);
if (base_col->is_atomic()) {
return c.as_atomic_cell().value();
}
return c.as_collection_mutation().data;
}
});
auto& view_schema = *_view->schema();
auto& partition = partition_for(partition_key::from_range(view_schema.partition_key_columns() | get_value));
auto ckey = clustering_key::from_range(view_schema.clustering_key_columns() | get_value);
return partition.clustered_row(view_schema, std::move(ckey));
}
static const column_definition* view_column(const schema& base, const schema& view, column_id base_id) {
// FIXME: Map base column_ids to view_column_ids, which can be something like
// a boost::small_vector where the position is the base column_id, and the
// value is either empty or the view's column_id.
return view.get_column_definition(base.regular_column_at(base_id).name());
}
static void add_cells_to_view(const schema& base, const schema& view, const row& base_cells, row& view_cells) {
base_cells.for_each_cell([&] (column_id id, const atomic_cell_or_collection& c) {
auto* view_col = view_column(base, view, id);
if (view_col && !view_col->is_primary_key()) {
view_cells.append_cell(view_col->id, c);
}
});
}
/**
* Creates a view entry corresponding to the provided base row.
* This method checks that the base row does match the view filter before applying anything.
*/
void view_updates::create_entry(const partition_key& base_key, const clustering_row& update, gc_clock::time_point now) {
if (!_view->matches_view_filter(*_base, base_key, update, now)) {
return;
}
deletable_row& r = get_view_row(base_key, update);
r.apply(compute_row_marker(update));
r.apply(update.tomb());
add_cells_to_view(*_base, *_view->schema(), update.cells(), r.cells());
}
} // namespace view
} // namespace db