diff --git a/database.cc b/database.cc index fd42533a3f..ecc6787fc4 100644 --- a/database.cc +++ b/database.cc @@ -3939,7 +3939,7 @@ std::vector column_family::affected_views(const schema_ptr& base, cons /** * Given some updates on the base table and the existing values for the rows affected by that update, generates the - * mutations to be applied to the base table's views. + * mutations to be applied to the base table's views, and sends them to the paired view replicas. * * @param base the base schema at a particular version. * @param views the affected views which need to be updated. @@ -3950,15 +3950,17 @@ std::vector column_family::affected_views(const schema_ptr& base, cons * but has simply some updated values. * @return a future resolving to the mutations to apply to the views, which can be empty. */ -future> column_family::generate_view_updates(const schema_ptr& base, +future<> column_family::generate_and_propagate_view_updates(const schema_ptr& base, std::vector&& views, - streamed_mutation updates, - streamed_mutation existings) const { - // FIXME: Use the view_ptr which corresponds to the version of base. The current code - // just uses the most recent view_ptr, which is not correct. There should be a mapping between a base schema - // version and the corresponding view schema version. Note that the it might not be here that this FIXME - // is resolved. - return db::view::generate_view_updates(base, std::move(views), std::move(updates), std::move(existings)); + mutation&& m, + streamed_mutation_opt existings) const { + auto base_token = m.token(); + return db::view::generate_view_updates(base, + std::move(views), + streamed_mutation_from_mutation(std::move(m)), + std::move(existings)).then([base_token = std::move(base_token)] (auto&& updates) { + db::view::mutate_MV(std::move(base_token), std::move(updates)); + }); } /** @@ -3968,18 +3970,41 @@ future> column_family::generate_view_updates(const schema_ future<> column_family::push_view_replica_updates(const schema_ptr& s, const frozen_mutation& fm) const { //FIXME: Avoid unfreezing here. auto m = fm.unfreeze(s); - m.upgrade(schema()); - auto views = affected_views(schema(), m); + auto& base = schema(); + m.upgrade(base); + auto views = affected_views(base, m); if (views.empty()) { return make_ready_future<>(); } - //FIXME: Read existing mutations - auto existing = streamed_mutation_from_mutation(mutation(m.decorated_key(), schema())); - auto base_token = m.token(); - return generate_view_updates(schema(), - std::move(views), - streamed_mutation_from_mutation(std::move(m)), - std::move(existing)).then([base_token = std::move(base_token)] (auto&& updates) { - db::view::mutate_MV(std::move(base_token), std::move(updates)); + auto cr_ranges = db::view::calculate_affected_clustering_ranges(*base, m.decorated_key(), m.partition(), views); + if (cr_ranges.empty()) { + return generate_and_propagate_view_updates(base, std::move(views), std::move(m), { }); + } + // We read the whole set of regular columns in case the update now causes a base row to pass + // a view's filters, and a view happens to include columns that have no value in this update. + // Also, one of those columns can determine the lifetime of the base row, if it has a TTL. + auto columns = boost::copy_range>( + base->regular_columns() | boost::adaptors::transformed(std::mem_fn(&column_definition::id))); + query::partition_slice::option_set opts; + opts.set(query::partition_slice::option::send_partition_key); + opts.set(query::partition_slice::option::send_clustering_key); + opts.set(query::partition_slice::option::send_timestamp); + opts.set(query::partition_slice::option::send_ttl); + auto slice = query::partition_slice( + std::move(cr_ranges), { }, std::move(columns), std::move(opts), { }, cql_serialization_format::internal(), query::max_rows); + return do_with( + dht::partition_range::make_singular(m.decorated_key()), + std::move(slice), + std::move(m), + [base, views = std::move(views), this] (auto& pk, auto& slice, auto& m) mutable { + auto reader = this->as_mutation_source()( + base, + pk, + slice, + service::get_local_sstable_query_read_priority()); + auto f = reader(); + return f.then([&m, reader = std::move(reader), base = std::move(base), views = std::move(views), this] (auto&& existing) mutable { + return this->generate_and_propagate_view_updates(base, std::move(views), std::move(m), std::move(existing)); + }); }); } diff --git a/database.hh b/database.hh index a1bdaa2199..e9ac960afe 100644 --- a/database.hh +++ b/database.hh @@ -842,10 +842,10 @@ public: future<> push_view_replica_updates(const schema_ptr& s, const frozen_mutation& fm) const; private: std::vector affected_views(const schema_ptr& base, const mutation& update) const; - future> generate_view_updates(const schema_ptr& base, + future<> generate_and_propagate_view_updates(const schema_ptr& base, std::vector&& views, - streamed_mutation updates, - streamed_mutation existings) const; + mutation&& m, + streamed_mutation_opt existings) const; // One does not need to wait on this future if all we are interested in, is // initiating the write. The writes initiated here will eventually diff --git a/db/view/view.cc b/db/view/view.cc index 209cf7e69a..7b3e74031f 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -471,7 +471,7 @@ class view_update_builder { schema_ptr _schema; // The base schema std::vector _view_updates; streamed_mutation _updates; - streamed_mutation _existings; + streamed_mutation_opt _existings; range_tombstone_accumulator _update_tombstone_tracker; range_tombstone_accumulator _existing_tombstone_tracker; mutation_fragment_opt _update; @@ -482,7 +482,7 @@ public: view_update_builder(schema_ptr s, std::vector&& views_to_update, streamed_mutation&& updates, - streamed_mutation&& existings) + streamed_mutation_opt&& existings) : _schema(std::move(s)) , _view_updates(std::move(views_to_update)) , _updates(std::move(updates)) @@ -491,7 +491,9 @@ public: , _existing_tombstone_tracker(*_schema, false) , _now(gc_clock::now()) { _update_tombstone_tracker.set_partition_tombstone(_updates.partition_tombstone()); - _existing_tombstone_tracker.set_partition_tombstone(_existings.partition_tombstone()); + if (_existings) { + _existing_tombstone_tracker.set_partition_tombstone(_existings->partition_tombstone()); + } } future> build(); @@ -501,7 +503,8 @@ private: future on_results(); future advance_all() { - return when_all(_updates(), _existings()).then([this] (auto&& fragments) mutable { + auto existings_f = _existings ? (*_existings)() : make_ready_future>(); + return when_all(_updates(), std::move(existings_f)).then([this] (auto&& fragments) mutable { _update = std::move(std::get(std::get<0>(fragments).get())); _existing = std::move(std::get(std::get<1>(fragments).get())); return stop_iteration::no; @@ -516,7 +519,10 @@ private: } future advance_existings() { - return _existings().then([this] (auto&& existing) mutable { + if (!_existings) { + return make_ready_future(stop_iteration::no); + } + return (*_existings)().then([this] (auto&& existing) mutable { _existing = std::move(existing); return stop_iteration::no; }); @@ -645,7 +651,7 @@ future> generate_view_updates( const schema_ptr& base, std::vector&& views_to_update, streamed_mutation&& updates, - streamed_mutation&& existings) { + streamed_mutation_opt&& existings) { auto vs = boost::copy_range>(views_to_update | boost::adaptors::transformed([&] (auto&& v) { return view_updates(std::move(v), base); })); diff --git a/db/view/view.hh b/db/view/view.hh index a4ea7649f1..f9b2f629f4 100644 --- a/db/view/view.hh +++ b/db/view/view.hh @@ -83,7 +83,7 @@ future> generate_view_updates( const schema_ptr& base, std::vector&& views_to_update, streamed_mutation&& updates, - streamed_mutation&& existings); + streamed_mutation_opt&& existings); query::clustering_row_ranges calculate_affected_clustering_ranges( const schema& base,