database: Read existing base mutations

When generating updates for a materialized view we need to read the
existing base row, to be able to determine the primary key of the view
row the new base update will supplant, in case the view includes a
base non-primary key column in its own primary key. That old view row
will be tombstoned or updated, if it exists, depending on the difference
between the new base row and the existing one, if any.

Signed-off-by: Duarte Nunes <duarte@scylladb.com>
This commit is contained in:
Duarte Nunes
2017-03-06 20:02:52 +01:00
parent 8a77bfe35b
commit 983af595e9
4 changed files with 60 additions and 29 deletions

View File

@@ -3939,7 +3939,7 @@ std::vector<view_ptr> column_family::affected_views(const schema_ptr& base, cons
/**
* Given some updates on the base table and the existing values for the rows affected by that update, generates the
* mutations to be applied to the base table's views.
* mutations to be applied to the base table's views, and sends them to the paired view replicas.
*
* @param base the base schema at a particular version.
* @param views the affected views which need to be updated.
@@ -3950,15 +3950,17 @@ std::vector<view_ptr> column_family::affected_views(const schema_ptr& base, cons
* but has simply some updated values.
* @return a future resolving to the mutations to apply to the views, which can be empty.
*/
future<std::vector<mutation>> column_family::generate_view_updates(const schema_ptr& base,
future<> column_family::generate_and_propagate_view_updates(const schema_ptr& base,
std::vector<view_ptr>&& views,
streamed_mutation updates,
streamed_mutation existings) const {
// FIXME: Use the view_ptr which corresponds to the version of base. The current code
// just uses the most recent view_ptr, which is not correct. There should be a mapping between a base schema
// version and the corresponding view schema version. Note that the it might not be here that this FIXME
// is resolved.
return db::view::generate_view_updates(base, std::move(views), std::move(updates), std::move(existings));
mutation&& m,
streamed_mutation_opt existings) const {
auto base_token = m.token();
return db::view::generate_view_updates(base,
std::move(views),
streamed_mutation_from_mutation(std::move(m)),
std::move(existings)).then([base_token = std::move(base_token)] (auto&& updates) {
db::view::mutate_MV(std::move(base_token), std::move(updates));
});
}
/**
@@ -3968,18 +3970,41 @@ future<std::vector<mutation>> column_family::generate_view_updates(const schema_
future<> column_family::push_view_replica_updates(const schema_ptr& s, const frozen_mutation& fm) const {
//FIXME: Avoid unfreezing here.
auto m = fm.unfreeze(s);
m.upgrade(schema());
auto views = affected_views(schema(), m);
auto& base = schema();
m.upgrade(base);
auto views = affected_views(base, m);
if (views.empty()) {
return make_ready_future<>();
}
//FIXME: Read existing mutations
auto existing = streamed_mutation_from_mutation(mutation(m.decorated_key(), schema()));
auto base_token = m.token();
return generate_view_updates(schema(),
std::move(views),
streamed_mutation_from_mutation(std::move(m)),
std::move(existing)).then([base_token = std::move(base_token)] (auto&& updates) {
db::view::mutate_MV(std::move(base_token), std::move(updates));
auto cr_ranges = db::view::calculate_affected_clustering_ranges(*base, m.decorated_key(), m.partition(), views);
if (cr_ranges.empty()) {
return generate_and_propagate_view_updates(base, std::move(views), std::move(m), { });
}
// We read the whole set of regular columns in case the update now causes a base row to pass
// a view's filters, and a view happens to include columns that have no value in this update.
// Also, one of those columns can determine the lifetime of the base row, if it has a TTL.
auto columns = boost::copy_range<std::vector<column_id>>(
base->regular_columns() | boost::adaptors::transformed(std::mem_fn(&column_definition::id)));
query::partition_slice::option_set opts;
opts.set(query::partition_slice::option::send_partition_key);
opts.set(query::partition_slice::option::send_clustering_key);
opts.set(query::partition_slice::option::send_timestamp);
opts.set(query::partition_slice::option::send_ttl);
auto slice = query::partition_slice(
std::move(cr_ranges), { }, std::move(columns), std::move(opts), { }, cql_serialization_format::internal(), query::max_rows);
return do_with(
dht::partition_range::make_singular(m.decorated_key()),
std::move(slice),
std::move(m),
[base, views = std::move(views), this] (auto& pk, auto& slice, auto& m) mutable {
auto reader = this->as_mutation_source()(
base,
pk,
slice,
service::get_local_sstable_query_read_priority());
auto f = reader();
return f.then([&m, reader = std::move(reader), base = std::move(base), views = std::move(views), this] (auto&& existing) mutable {
return this->generate_and_propagate_view_updates(base, std::move(views), std::move(m), std::move(existing));
});
});
}

View File

@@ -842,10 +842,10 @@ public:
future<> push_view_replica_updates(const schema_ptr& s, const frozen_mutation& fm) const;
private:
std::vector<view_ptr> affected_views(const schema_ptr& base, const mutation& update) const;
future<std::vector<mutation>> generate_view_updates(const schema_ptr& base,
future<> generate_and_propagate_view_updates(const schema_ptr& base,
std::vector<view_ptr>&& views,
streamed_mutation updates,
streamed_mutation existings) const;
mutation&& m,
streamed_mutation_opt existings) const;
// One does not need to wait on this future if all we are interested in, is
// initiating the write. The writes initiated here will eventually

View File

@@ -471,7 +471,7 @@ class view_update_builder {
schema_ptr _schema; // The base schema
std::vector<view_updates> _view_updates;
streamed_mutation _updates;
streamed_mutation _existings;
streamed_mutation_opt _existings;
range_tombstone_accumulator _update_tombstone_tracker;
range_tombstone_accumulator _existing_tombstone_tracker;
mutation_fragment_opt _update;
@@ -482,7 +482,7 @@ public:
view_update_builder(schema_ptr s,
std::vector<view_updates>&& views_to_update,
streamed_mutation&& updates,
streamed_mutation&& existings)
streamed_mutation_opt&& existings)
: _schema(std::move(s))
, _view_updates(std::move(views_to_update))
, _updates(std::move(updates))
@@ -491,7 +491,9 @@ public:
, _existing_tombstone_tracker(*_schema, false)
, _now(gc_clock::now()) {
_update_tombstone_tracker.set_partition_tombstone(_updates.partition_tombstone());
_existing_tombstone_tracker.set_partition_tombstone(_existings.partition_tombstone());
if (_existings) {
_existing_tombstone_tracker.set_partition_tombstone(_existings->partition_tombstone());
}
}
future<std::vector<mutation>> build();
@@ -501,7 +503,8 @@ private:
future<stop_iteration> on_results();
future<stop_iteration> advance_all() {
return when_all(_updates(), _existings()).then([this] (auto&& fragments) mutable {
auto existings_f = _existings ? (*_existings)() : make_ready_future<optimized_optional<mutation_fragment>>();
return when_all(_updates(), std::move(existings_f)).then([this] (auto&& fragments) mutable {
_update = std::move(std::get<mutation_fragment_opt>(std::get<0>(fragments).get()));
_existing = std::move(std::get<mutation_fragment_opt>(std::get<1>(fragments).get()));
return stop_iteration::no;
@@ -516,7 +519,10 @@ private:
}
future<stop_iteration> advance_existings() {
return _existings().then([this] (auto&& existing) mutable {
if (!_existings) {
return make_ready_future<stop_iteration>(stop_iteration::no);
}
return (*_existings)().then([this] (auto&& existing) mutable {
_existing = std::move(existing);
return stop_iteration::no;
});
@@ -645,7 +651,7 @@ future<std::vector<mutation>> generate_view_updates(
const schema_ptr& base,
std::vector<view_ptr>&& views_to_update,
streamed_mutation&& updates,
streamed_mutation&& existings) {
streamed_mutation_opt&& existings) {
auto vs = boost::copy_range<std::vector<view_updates>>(views_to_update | boost::adaptors::transformed([&] (auto&& v) {
return view_updates(std::move(v), base);
}));

View File

@@ -83,7 +83,7 @@ future<std::vector<mutation>> generate_view_updates(
const schema_ptr& base,
std::vector<view_ptr>&& views_to_update,
streamed_mutation&& updates,
streamed_mutation&& existings);
streamed_mutation_opt&& existings);
query::clustering_row_ranges calculate_affected_clustering_ranges(
const schema& base,