diff --git a/db/view/view.cc b/db/view/view.cc index 65059421ec..035825c285 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -2606,7 +2606,7 @@ view_updating_consumer::view_updating_consumer(view_update_generator& gen, schem : view_updating_consumer(std::move(schema), std::move(permit), as, staging_reader_handle, [table = table.shared_from_this(), excluded_sstables = std::move(excluded_sstables), gen = gen.shared_from_this()] (mutation m) mutable { auto s = m.schema(); - return table->stream_view_replica_updates(std::move(s), std::move(m), db::no_timeout, excluded_sstables); + return table->stream_view_replica_updates(gen, std::move(s), std::move(m), db::no_timeout, excluded_sstables); }) { } diff --git a/replica/database.cc b/replica/database.cc index a47666e2a5..b9633dd420 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -1940,7 +1940,11 @@ future<> database::do_apply(schema_ptr s, const frozen_mutation& m, tracing::tra row_locker::lock_holder lock; if (!cf.views().empty()) { - auto lock_f = co_await coroutine::as_future(cf.push_view_replica_updates(s, m, timeout, std::move(tr_state), get_reader_concurrency_semaphore())); + if (!_view_update_generator) { + co_await coroutine::return_exception(std::runtime_error("view update generator not plugged to push updates")); + } + + auto lock_f = co_await coroutine::as_future(cf.push_view_replica_updates(_view_update_generator, s, m, timeout, std::move(tr_state), get_reader_concurrency_semaphore())); if (lock_f.failed()) { auto ex = lock_f.get_exception(); if (is_timeout_exception(ex)) { diff --git a/replica/database.hh b/replica/database.hh index ade747578e..a10b178924 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1026,12 +1026,12 @@ public: void remove_view(view_ptr v); void clear_views(); const std::vector& views() const; - future push_view_replica_updates(const schema_ptr& s, const frozen_mutation& fm, db::timeout_clock::time_point timeout, + future push_view_replica_updates(shared_ptr gen, const schema_ptr& s, const frozen_mutation& fm, db::timeout_clock::time_point timeout, tracing::trace_state_ptr tr_state, reader_concurrency_semaphore& sem) const; - future push_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout, + future push_view_replica_updates(shared_ptr gen, const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout, tracing::trace_state_ptr tr_state, reader_concurrency_semaphore& sem) const; future - stream_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout, + stream_view_replica_updates(shared_ptr gen, const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout, std::vector& excluded_sstables) const; void add_coordinator_read_latency(utils::estimated_histogram::duration latency); @@ -1067,10 +1067,10 @@ public: size_t estimate_read_memory_cost() const; private: - future do_push_view_replica_updates(schema_ptr s, mutation m, db::timeout_clock::time_point timeout, mutation_source source, + future do_push_view_replica_updates(shared_ptr gen, schema_ptr s, mutation m, db::timeout_clock::time_point timeout, mutation_source source, tracing::trace_state_ptr tr_state, reader_concurrency_semaphore& sem, const io_priority_class& io_priority, query::partition_slice::option_set custom_opts) const; std::vector affected_views(const schema_ptr& base, const mutation& update) const; - future<> generate_and_propagate_view_updates(const schema_ptr& base, + future<> generate_and_propagate_view_updates(shared_ptr gen, const schema_ptr& base, reader_permit permit, std::vector&& views, mutation&& m, diff --git a/replica/table.cc b/replica/table.cc index 79e004c95c..b7cffd1268 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -1967,7 +1967,7 @@ static size_t memory_usage_of(const utils::chunked_vector table::generate_and_propagate_view_updates(const schema_ptr& base, +future<> table::generate_and_propagate_view_updates(shared_ptr gen, const schema_ptr& base, reader_permit permit, std::vector&& views, mutation&& m, @@ -2572,14 +2572,14 @@ future<> table::move_sstables_from_staging(std::vector * Given an update for the base table, calculates the set of potentially affected views, * generates the relevant updates, and sends them to the paired view replicas. */ -future table::push_view_replica_updates(const schema_ptr& s, const frozen_mutation& fm, +future table::push_view_replica_updates(shared_ptr gen, const schema_ptr& s, const frozen_mutation& fm, db::timeout_clock::time_point timeout, tracing::trace_state_ptr tr_state, reader_concurrency_semaphore& sem) const { //FIXME: Avoid unfreezing here. auto m = fm.unfreeze(s); - return push_view_replica_updates(s, std::move(m), timeout, std::move(tr_state), sem); + return push_view_replica_updates(std::move(gen), s, std::move(m), timeout, std::move(tr_state), sem); } -future table::do_push_view_replica_updates(schema_ptr s, mutation m, db::timeout_clock::time_point timeout, mutation_source source, +future table::do_push_view_replica_updates(shared_ptr gen, schema_ptr s, mutation m, db::timeout_clock::time_point timeout, mutation_source source, tracing::trace_state_ptr tr_state, reader_concurrency_semaphore& sem, const io_priority_class& io_priority, query::partition_slice::option_set custom_opts) const { if (!_config.view_update_concurrency_semaphore->current()) { // We don't have resources to generate view updates for this write. If we reached this point, we failed to @@ -2605,7 +2605,7 @@ future table::do_push_view_replica_updates(schema_ptr s const bool need_static = db::view::needs_static_row(m.partition(), views); if (!need_regular && !need_static) { tracing::trace(tr_state, "View updates do not require read-before-write"); - co_await generate_and_propagate_view_updates(base, sem.make_tracking_only_permit(s.get(), "push-view-updates-1", timeout, tr_state), std::move(views), std::move(m), { }, tr_state, now); + co_await generate_and_propagate_view_updates(gen, base, sem.make_tracking_only_permit(s.get(), "push-view-updates-1", timeout, tr_state), std::move(views), std::move(m), { }, tr_state, now); // In this case we are not doing a read-before-write, just a // write, so no lock is needed. co_return row_locker::lock_holder(); @@ -2640,7 +2640,7 @@ future table::do_push_view_replica_updates(schema_ptr s auto pk = dht::partition_range::make_singular(m.decorated_key()); auto permit = sem.make_tracking_only_permit(base.get(), "push-view-updates-2", timeout, tr_state); auto reader = source.make_reader_v2(base, permit, pk, slice, io_priority, tr_state, streamed_mutation::forwarding::no, mutation_reader::forwarding::no); - co_await this->generate_and_propagate_view_updates(base, std::move(permit), std::move(views), std::move(m), std::move(reader), tr_state, now); + co_await this->generate_and_propagate_view_updates(gen, base, std::move(permit), std::move(views), std::move(m), std::move(reader), tr_state, now); tracing::trace(tr_state, "View updates for {}.{} were generated and propagated", base->ks_name(), base->cf_name()); // return the local partition/row lock we have taken so it // remains locked until the caller is done modifying this @@ -2649,16 +2649,17 @@ future table::do_push_view_replica_updates(schema_ptr s } -future table::push_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout, +future table::push_view_replica_updates(shared_ptr gen, const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout, tracing::trace_state_ptr tr_state, reader_concurrency_semaphore& sem) const { - return do_push_view_replica_updates(s, std::move(m), timeout, as_mutation_source(), + return do_push_view_replica_updates(std::move(gen), s, std::move(m), timeout, as_mutation_source(), std::move(tr_state), sem, service::get_local_sstable_query_read_priority(), {}); } future -table::stream_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout, +table::stream_view_replica_updates(shared_ptr gen, const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout, std::vector& excluded_sstables) const { return do_push_view_replica_updates( + std::move(gen), s, std::move(m), timeout,