From 983af595e9b2fab9a72ed8451109a07c1f14ba8a Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Mon, 6 Mar 2017 20:02:52 +0100 Subject: [PATCH] database: Read existing base mutations When generating updates for a materialized view we need to read the existing base row, to be able to determine the primary key of the view row the new base update will supplant, in case the view includes a base non-primary key column in its own primary key. That old view row will be tombstoned or updated, if it exists, depending on the difference between the new base row and the existing one, if any. Signed-off-by: Duarte Nunes --- database.cc | 63 ++++++++++++++++++++++++++++++++++--------------- database.hh | 6 ++--- db/view/view.cc | 18 +++++++++----- db/view/view.hh | 2 +- 4 files changed, 60 insertions(+), 29 deletions(-) diff --git a/database.cc b/database.cc index fd42533a3f..ecc6787fc4 100644 --- a/database.cc +++ b/database.cc @@ -3939,7 +3939,7 @@ std::vector column_family::affected_views(const schema_ptr& base, cons /** * Given some updates on the base table and the existing values for the rows affected by that update, generates the - * mutations to be applied to the base table's views. + * mutations to be applied to the base table's views, and sends them to the paired view replicas. * * @param base the base schema at a particular version. * @param views the affected views which need to be updated. @@ -3950,15 +3950,17 @@ std::vector column_family::affected_views(const schema_ptr& base, cons * but has simply some updated values. * @return a future resolving to the mutations to apply to the views, which can be empty. */ -future> column_family::generate_view_updates(const schema_ptr& base, +future<> column_family::generate_and_propagate_view_updates(const schema_ptr& base, std::vector&& views, - streamed_mutation updates, - streamed_mutation existings) const { - // FIXME: Use the view_ptr which corresponds to the version of base. The current code - // just uses the most recent view_ptr, which is not correct. There should be a mapping between a base schema - // version and the corresponding view schema version. Note that the it might not be here that this FIXME - // is resolved. - return db::view::generate_view_updates(base, std::move(views), std::move(updates), std::move(existings)); + mutation&& m, + streamed_mutation_opt existings) const { + auto base_token = m.token(); + return db::view::generate_view_updates(base, + std::move(views), + streamed_mutation_from_mutation(std::move(m)), + std::move(existings)).then([base_token = std::move(base_token)] (auto&& updates) { + db::view::mutate_MV(std::move(base_token), std::move(updates)); + }); } /** @@ -3968,18 +3970,41 @@ future> column_family::generate_view_updates(const schema_ future<> column_family::push_view_replica_updates(const schema_ptr& s, const frozen_mutation& fm) const { //FIXME: Avoid unfreezing here. auto m = fm.unfreeze(s); - m.upgrade(schema()); - auto views = affected_views(schema(), m); + auto& base = schema(); + m.upgrade(base); + auto views = affected_views(base, m); if (views.empty()) { return make_ready_future<>(); } - //FIXME: Read existing mutations - auto existing = streamed_mutation_from_mutation(mutation(m.decorated_key(), schema())); - auto base_token = m.token(); - return generate_view_updates(schema(), - std::move(views), - streamed_mutation_from_mutation(std::move(m)), - std::move(existing)).then([base_token = std::move(base_token)] (auto&& updates) { - db::view::mutate_MV(std::move(base_token), std::move(updates)); + auto cr_ranges = db::view::calculate_affected_clustering_ranges(*base, m.decorated_key(), m.partition(), views); + if (cr_ranges.empty()) { + return generate_and_propagate_view_updates(base, std::move(views), std::move(m), { }); + } + // We read the whole set of regular columns in case the update now causes a base row to pass + // a view's filters, and a view happens to include columns that have no value in this update. + // Also, one of those columns can determine the lifetime of the base row, if it has a TTL. + auto columns = boost::copy_range>( + base->regular_columns() | boost::adaptors::transformed(std::mem_fn(&column_definition::id))); + query::partition_slice::option_set opts; + opts.set(query::partition_slice::option::send_partition_key); + opts.set(query::partition_slice::option::send_clustering_key); + opts.set(query::partition_slice::option::send_timestamp); + opts.set(query::partition_slice::option::send_ttl); + auto slice = query::partition_slice( + std::move(cr_ranges), { }, std::move(columns), std::move(opts), { }, cql_serialization_format::internal(), query::max_rows); + return do_with( + dht::partition_range::make_singular(m.decorated_key()), + std::move(slice), + std::move(m), + [base, views = std::move(views), this] (auto& pk, auto& slice, auto& m) mutable { + auto reader = this->as_mutation_source()( + base, + pk, + slice, + service::get_local_sstable_query_read_priority()); + auto f = reader(); + return f.then([&m, reader = std::move(reader), base = std::move(base), views = std::move(views), this] (auto&& existing) mutable { + return this->generate_and_propagate_view_updates(base, std::move(views), std::move(m), std::move(existing)); + }); }); } diff --git a/database.hh b/database.hh index a1bdaa2199..e9ac960afe 100644 --- a/database.hh +++ b/database.hh @@ -842,10 +842,10 @@ public: future<> push_view_replica_updates(const schema_ptr& s, const frozen_mutation& fm) const; private: std::vector affected_views(const schema_ptr& base, const mutation& update) const; - future> generate_view_updates(const schema_ptr& base, + future<> generate_and_propagate_view_updates(const schema_ptr& base, std::vector&& views, - streamed_mutation updates, - streamed_mutation existings) const; + mutation&& m, + streamed_mutation_opt existings) const; // One does not need to wait on this future if all we are interested in, is // initiating the write. The writes initiated here will eventually diff --git a/db/view/view.cc b/db/view/view.cc index 209cf7e69a..7b3e74031f 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -471,7 +471,7 @@ class view_update_builder { schema_ptr _schema; // The base schema std::vector _view_updates; streamed_mutation _updates; - streamed_mutation _existings; + streamed_mutation_opt _existings; range_tombstone_accumulator _update_tombstone_tracker; range_tombstone_accumulator _existing_tombstone_tracker; mutation_fragment_opt _update; @@ -482,7 +482,7 @@ public: view_update_builder(schema_ptr s, std::vector&& views_to_update, streamed_mutation&& updates, - streamed_mutation&& existings) + streamed_mutation_opt&& existings) : _schema(std::move(s)) , _view_updates(std::move(views_to_update)) , _updates(std::move(updates)) @@ -491,7 +491,9 @@ public: , _existing_tombstone_tracker(*_schema, false) , _now(gc_clock::now()) { _update_tombstone_tracker.set_partition_tombstone(_updates.partition_tombstone()); - _existing_tombstone_tracker.set_partition_tombstone(_existings.partition_tombstone()); + if (_existings) { + _existing_tombstone_tracker.set_partition_tombstone(_existings->partition_tombstone()); + } } future> build(); @@ -501,7 +503,8 @@ private: future on_results(); future advance_all() { - return when_all(_updates(), _existings()).then([this] (auto&& fragments) mutable { + auto existings_f = _existings ? (*_existings)() : make_ready_future>(); + return when_all(_updates(), std::move(existings_f)).then([this] (auto&& fragments) mutable { _update = std::move(std::get(std::get<0>(fragments).get())); _existing = std::move(std::get(std::get<1>(fragments).get())); return stop_iteration::no; @@ -516,7 +519,10 @@ private: } future advance_existings() { - return _existings().then([this] (auto&& existing) mutable { + if (!_existings) { + return make_ready_future(stop_iteration::no); + } + return (*_existings)().then([this] (auto&& existing) mutable { _existing = std::move(existing); return stop_iteration::no; }); @@ -645,7 +651,7 @@ future> generate_view_updates( const schema_ptr& base, std::vector&& views_to_update, streamed_mutation&& updates, - streamed_mutation&& existings) { + streamed_mutation_opt&& existings) { auto vs = boost::copy_range>(views_to_update | boost::adaptors::transformed([&] (auto&& v) { return view_updates(std::move(v), base); })); diff --git a/db/view/view.hh b/db/view/view.hh index a4ea7649f1..f9b2f629f4 100644 --- a/db/view/view.hh +++ b/db/view/view.hh @@ -83,7 +83,7 @@ future> generate_view_updates( const schema_ptr& base, std::vector&& views_to_update, streamed_mutation&& updates, - streamed_mutation&& existings); + streamed_mutation_opt&& existings); query::clustering_row_ranges calculate_affected_clustering_ranges( const schema& base,