From 2652dffd89031605561990c29ae8719eaa395aca Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Tue, 28 Mar 2023 11:21:13 +0300 Subject: [PATCH] view: Capture v.u.generator on view_updating_consumer lambda The consumer is in fact pushing the updates and _that_'s the component that would really need the view_update_generator at hand. The consumer is created from the generator itself so no troubles getting the pointer. Signed-off-by: Pavel Emelyanov --- db/view/view.cc | 5 +++-- db/view/view_update_generator.cc | 2 +- db/view/view_updating_consumer.hh | 4 +++- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index 01515248e0..4cfbf43571 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -35,6 +35,7 @@ #include "db/view/view.hh" #include "db/view/view_builder.hh" #include "db/view/view_updating_consumer.hh" +#include "db/view/view_update_generator.hh" #include "db/system_keyspace_view_types.hh" #include "db/system_keyspace.hh" #include "db/system_distributed_keyspace.hh" @@ -2597,10 +2598,10 @@ void view_updating_consumer::maybe_flush_buffer_mid_partition() { } } -view_updating_consumer::view_updating_consumer(schema_ptr schema, reader_permit permit, replica::table& table, std::vector excluded_sstables, const seastar::abort_source& as, +view_updating_consumer::view_updating_consumer(view_update_generator& gen, schema_ptr schema, reader_permit permit, replica::table& table, std::vector excluded_sstables, const seastar::abort_source& as, evictable_reader_handle_v2& staging_reader_handle) : view_updating_consumer(std::move(schema), std::move(permit), as, staging_reader_handle, - [table = table.shared_from_this(), excluded_sstables = std::move(excluded_sstables)] (mutation m) mutable { + [table = table.shared_from_this(), excluded_sstables = std::move(excluded_sstables), gen = gen.shared_from_this()] (mutation m) mutable { auto s = m.schema(); return table->stream_view_replica_updates(std::move(s), std::move(m), db::no_timeout, excluded_sstables); }) diff --git a/db/view/view_update_generator.cc b/db/view/view_update_generator.cc index 5629954277..c773ba13a8 100644 --- a/db/view/view_update_generator.cc +++ b/db/view/view_update_generator.cc @@ -165,7 +165,7 @@ future<> view_update_generator::start() { ::mutation_reader::forwarding::no); inject_failure("view_update_generator_consume_staging_sstable"); - auto result = staging_sstable_reader.consume_in_thread(view_updating_consumer(s, std::move(permit), *t, sstables, _as, staging_sstable_reader_handle), + auto result = staging_sstable_reader.consume_in_thread(view_updating_consumer(*this, s, std::move(permit), *t, sstables, _as, staging_sstable_reader_handle), dht::incremental_owned_ranges_checker::make_partition_filter(_db.get_keyspace_local_ranges(s->ks_name()))); staging_sstable_reader.close().get(); if (result == stop_iteration::yes) { diff --git a/db/view/view_updating_consumer.hh b/db/view/view_updating_consumer.hh index 886c2a52b5..3c53cf5959 100644 --- a/db/view/view_updating_consumer.hh +++ b/db/view/view_updating_consumer.hh @@ -24,6 +24,8 @@ class evictable_reader_handle_v2; namespace db::view { +class view_update_generator; + /* * A consumer that pushes materialized view updates for each consumed mutation. * It is expected to be run in seastar::async threaded context through consume_in_thread() @@ -62,7 +64,7 @@ public: , _view_update_pusher(std::move(view_update_pusher)) { } - view_updating_consumer(schema_ptr schema, reader_permit permit, replica::table& table, std::vector excluded_sstables, const seastar::abort_source& as, + view_updating_consumer(view_update_generator& gen, schema_ptr schema, reader_permit permit, replica::table& table, std::vector excluded_sstables, const seastar::abort_source& as, evictable_reader_handle_v2& staging_reader_handle); view_updating_consumer(view_updating_consumer&&) = default;