From d5557ef0e27aac925ed2846bb1163e0ae1965811 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Tue, 28 Mar 2023 11:13:52 +0300 Subject: [PATCH] view: Plug view update generator to database The database is low-level service and currently view update generator implicitly depend on it via storage proxy. However, database does need to push view updates with the help of mutate_MV helper, thus adding the dependency loop. This patch exploits the fact that view updates start being pushed late enough, by that time all other service, including proxy and view update generator, seem to be up and running. This allows a "weak dependency" from database to view update generator, like there's one from database to system keyspace already. So in this patch the v.u.g. puts the shared-from-this pointer onto the database at the time it starts. On stop it removes this pointer after database is drained and (hopefully) all view updates are pushed. Signed-off-by: Pavel Emelyanov --- db/view/view_update_generator.cc | 2 ++ db/view/view_update_generator.hh | 2 +- replica/database.cc | 9 +++++++++ replica/database.hh | 8 ++++++++ 4 files changed, 20 insertions(+), 1 deletion(-) diff --git a/db/view/view_update_generator.cc b/db/view/view_update_generator.cc index 766aaff082..5629954277 100644 --- a/db/view/view_update_generator.cc +++ b/db/view/view_update_generator.cc @@ -97,6 +97,7 @@ view_update_generator::view_update_generator(replica::database& db, sharded view_update_generator::start() { } future<> view_update_generator::stop() { + _db.unplug_view_update_generator(); _as.request_abort(); _pending_sstables.signal(); return std::move(_started).then([this] { diff --git a/db/view/view_update_generator.hh b/db/view/view_update_generator.hh index 00a49276f4..2f84af9a68 100644 --- a/db/view/view_update_generator.hh +++ b/db/view/view_update_generator.hh @@ -28,7 +28,7 @@ class storage_proxy; namespace db::view { -class view_update_generator { +class view_update_generator : public async_sharded_service { public: static constexpr size_t registration_queue_size = 5; diff --git a/replica/database.cc b/replica/database.cc index 91561c8cfa..a47666e2a5 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -47,6 +47,7 @@ #include "timeout_config.hh" #include "service/storage_proxy.hh" #include "db/operation_type.hh" +#include "db/view/view_update_generator.hh" #include "utils/human_readable.hh" #include "utils/fb_utilities.hh" @@ -2719,6 +2720,14 @@ void database::unplug_system_keyspace() noexcept { _large_data_handler->unplug_system_keyspace(); } +void database::plug_view_update_generator(db::view::view_update_generator& generator) noexcept { + _view_update_generator = generator.shared_from_this(); +} + +void database::unplug_view_update_generator() noexcept { + _view_update_generator = nullptr; +} + } // namespace replica template diff --git a/replica/database.hh b/replica/database.hh index a9ea993e8a..ade747578e 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -123,6 +123,10 @@ class table_selector; future<> system_keyspace_make(db::system_keyspace& sys_ks, distributed& db, distributed& ss, sharded& g, sharded& raft_gr, db::config& cfg, system_table_load_phase phase); +namespace view { +class view_update_generator; +} + } class mutation_reordered_with_truncate_exception : public std::exception {}; @@ -1323,6 +1327,7 @@ private: db::timeout_semaphore _view_update_concurrency_sem{max_memory_pending_view_updates()}; cache_tracker _row_cache_tracker; + seastar::shared_ptr _view_update_generator; inheriting_concrete_execution_stage< future<>, @@ -1400,6 +1405,9 @@ public: void plug_system_keyspace(db::system_keyspace& sys_ks) noexcept; void unplug_system_keyspace() noexcept; + void plug_view_update_generator(db::view::view_update_generator& generator) noexcept; + void unplug_view_update_generator() noexcept; + private: future<> flush_non_system_column_families(); future<> flush_system_column_families();