From 7cabdc54a6e73144450829aa51de5f35c0a17e86 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Tue, 28 Mar 2023 12:12:56 +0300 Subject: [PATCH] view: Make mutate_MV() method of view_update_generator Nowadays its a static helper, but internally it depends on storage proxy, so it grabs its global instance. Making it a method of view update generator makes it possible to use the proxy dependency from the generator. Signed-off-by: Pavel Emelyanov --- db/view/view.cc | 2 +- db/view/view.hh | 17 ----------------- db/view/view_update_generator.hh | 30 ++++++++++++++++++++++++++++++ replica/table.cc | 6 +++--- 4 files changed, 34 insertions(+), 21 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index a0ac8255ca..df90d29e4c 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1614,7 +1614,7 @@ static bool should_update_synchronously(const schema& s) { // to a modification of a single base partition, and apply them to the // appropriate paired replicas. This is done asynchronously - we do not wait // for the writes to complete. -future<> mutate_MV( +future<> view_update_generator::mutate_MV( dht::token base_token, utils::chunked_vector view_updates, db::view::stats& stats, diff --git a/db/view/view.hh b/db/view/view.hh index a657109015..70ccd28bf1 100644 --- a/db/view/view.hh +++ b/db/view/view.hh @@ -21,11 +21,6 @@ namespace replica { struct cf_stats; } -namespace service { -struct allow_hints_tag; -using allow_hints = bool_class; -} - namespace db { namespace view { @@ -315,18 +310,6 @@ future calculate_affected_clustering_ranges( bool needs_static_row(const mutation_partition& mp, const std::vector& views); -struct wait_for_all_updates_tag {}; -using wait_for_all_updates = bool_class; -future<> mutate_MV( - dht::token base_token, - utils::chunked_vector view_updates, - db::view::stats& stats, - replica::cf_stats& cf_stats, - tracing::trace_state_ptr tr_state, - db::timeout_semaphore_units pending_view_updates, - service::allow_hints allow_hints, - wait_for_all_updates wait_for_all); - /** * create_virtual_column() adds a "virtual column" to a schema builder. * The definition of a "virtual column" is based on the given definition diff --git a/db/view/view_update_generator.hh b/db/view/view_update_generator.hh index 2f84af9a68..6b9c0cf5e7 100644 --- a/db/view/view_update_generator.hh +++ b/db/view/view_update_generator.hh @@ -9,7 +9,10 @@ #pragma once #include "sstables/shared_sstable.hh" +#include "db/timeout_clock.hh" +#include "utils/chunked_vector.hh" +#include #include #include #include @@ -17,17 +20,34 @@ using namespace seastar; +struct frozen_mutation_and_schema; + +namespace dht { +class token; +} + +namespace tracing { +class trace_state_ptr; +} + namespace replica { class database; class table; +struct cf_stats; } namespace service { class storage_proxy; +struct allow_hints_tag; +using allow_hints = bool_class; } namespace db::view { +class stats; +struct wait_for_all_updates_tag {}; +using wait_for_all_updates = bool_class; + class view_update_generator : public async_sharded_service { public: static constexpr size_t registration_queue_size = 5; @@ -52,6 +72,16 @@ public: future<> stop(); future<> register_staging_sstable(sstables::shared_sstable sst, lw_shared_ptr table); + future<> mutate_MV( + dht::token base_token, + utils::chunked_vector view_updates, + db::view::stats& stats, + replica::cf_stats& cf_stats, + tracing::trace_state_ptr tr_state, + db::timeout_semaphore_units pending_view_updates, + service::allow_hints allow_hints, + wait_for_all_updates wait_for_all); + ssize_t available_register_units() const { return _registration_sem.available_units(); } private: bool should_throttle() const; diff --git a/replica/table.cc b/replica/table.cc index 67498f2dfc..1568de6b60 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -34,7 +34,7 @@ #include "db/system_keyspace.hh" #include "db/query_context.hh" #include "query-result-writer.hh" -#include "db/view/view.hh" +#include "db/view/view_update_generator.hh" #include #include #include "utils/error_injection.hh" @@ -1999,7 +1999,7 @@ future<> table::generate_and_propagate_view_updates(shared_ptrsize()); auto units = seastar::consume_units(*_config.view_update_concurrency_semaphore, memory_usage_of(*updates)); try { - co_await db::view::mutate_MV(base_token, std::move(*updates), _view_stats, *_config.cf_stats, tr_state, + co_await gen->mutate_MV(base_token, std::move(*updates), _view_stats, *_config.cf_stats, tr_state, std::move(units), service::allow_hints::yes, db::view::wait_for_all_updates::no); } catch (...) { // Ignore exceptions: any individual failure to propagate a view update will be reported @@ -2132,7 +2132,7 @@ future<> table::populate_views( size_t units_to_wait_for = std::min(_config.view_update_concurrency_semaphore_limit, update_size); auto units = co_await seastar::get_units(*_config.view_update_concurrency_semaphore, units_to_wait_for); units.adopt(seastar::consume_units(*_config.view_update_concurrency_semaphore, update_size - units_to_wait_for)); - co_await db::view::mutate_MV(base_token, std::move(*updates), _view_stats, *_config.cf_stats, + co_await gen->mutate_MV(base_token, std::move(*updates), _view_stats, *_config.cf_stats, tracing::trace_state_ptr(), std::move(units), service::allow_hints::no, db::view::wait_for_all_updates::yes); } catch (...) { if (!err) {