From cc262d814b073a3cb819a6575a7fe5877b46349b Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Tue, 28 Mar 2023 12:16:34 +0300 Subject: [PATCH] view: Drop global storage_proxy usage from mutate_MV() Now the mutate_MV is the method of v.u.generator which has reference to the sharded. Few helper static wrappers are patched to get the needed proxy or database reference from the mutate_MV call. Signed-off-by: Pavel Emelyanov --- db/view/view.cc | 17 ++++++++--------- db/view/view_update_generator.cc | 1 - 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index df90d29e4c..63d7c70c8b 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1543,9 +1543,8 @@ bool needs_static_row(const mutation_partition& mp, const std::vector -get_view_natural_endpoint(const sstring& keyspace_name, +get_view_natural_endpoint(replica::database& db, const sstring& keyspace_name, const dht::token& base_token, const dht::token& view_token) { - auto &db = service::get_local_storage_proxy().local_db(); auto& ks = db.find_keyspace(keyspace_name); auto erm = ks.get_effective_replication_map(); auto& topology = erm->get_token_metadata_ptr()->get_topology(); @@ -1586,13 +1585,13 @@ get_view_natural_endpoint(const sstring& keyspace_name, return view_endpoints[base_it - base_endpoints.begin()]; } -static future<> apply_to_remote_endpoints(gms::inet_address target, inet_address_vector_topology_change&& pending_endpoints, +static future<> apply_to_remote_endpoints(service::storage_proxy& proxy, gms::inet_address target, inet_address_vector_topology_change&& pending_endpoints, frozen_mutation_and_schema&& mut, const dht::token& base_token, const dht::token& view_token, service::allow_hints allow_hints, tracing::trace_state_ptr tr_state) { tracing::trace(tr_state, "Sending view update for {}.{} to {}, with pending endpoints = {}; base token = {}; view token = {}", mut.s->ks_name(), mut.s->cf_name(), target, pending_endpoints, base_token, view_token); - return service::get_local_storage_proxy().send_to_endpoint( + return proxy.send_to_endpoint( std::move(mut), target, std::move(pending_endpoints), @@ -1626,11 +1625,11 @@ future<> view_update_generator::mutate_MV( { static constexpr size_t max_concurrent_updates = 128; co_await max_concurrent_for_each(view_updates, max_concurrent_updates, - [base_token, &stats, &cf_stats, tr_state, &pending_view_updates, allow_hints, wait_for_all] (frozen_mutation_and_schema mut) mutable -> future<> { + [this, base_token, &stats, &cf_stats, tr_state, &pending_view_updates, allow_hints, wait_for_all] (frozen_mutation_and_schema mut) mutable -> future<> { auto view_token = dht::get_token(*mut.s, mut.fm.key()); auto& keyspace_name = mut.s->ks_name(); - auto target_endpoint = get_view_natural_endpoint(keyspace_name, base_token, view_token); - auto remote_endpoints = service::get_local_storage_proxy().get_token_metadata_ptr()->pending_endpoints_for(view_token, keyspace_name); + auto target_endpoint = get_view_natural_endpoint(_proxy.local().local_db(), keyspace_name, base_token, view_token); + auto remote_endpoints = _proxy.local().get_token_metadata_ptr()->pending_endpoints_for(view_token, keyspace_name); auto sem_units = pending_view_updates.split(mut.fm.representation().size()); const bool update_synchronously = should_update_synchronously(*mut.s); @@ -1677,7 +1676,7 @@ future<> view_update_generator::mutate_MV( auto mut_ptr = remote_endpoints.empty() ? std::make_unique(std::move(mut.fm)) : std::make_unique(mut.fm); tracing::trace(tr_state, "Locally applying view update for {}.{}; base token = {}; view token = {}", mut.s->ks_name(), mut.s->cf_name(), base_token, view_token); - local_view_update = service::get_local_storage_proxy().mutate_locally(mut.s, *mut_ptr, tr_state, db::commitlog::force_sync::no).then_wrapped( + local_view_update = _proxy.local().mutate_locally(mut.s, *mut_ptr, tr_state, db::commitlog::force_sync::no).then_wrapped( [s = mut.s, &stats, &cf_stats, tr_state, base_token, view_token, my_address, mut_ptr = std::move(mut_ptr), units = sem_units.split(sem_units.count())] (future<>&& f) { --stats.writes; @@ -1714,7 +1713,7 @@ future<> view_update_generator::mutate_MV( stats.view_updates_pushed_remote += updates_pushed_remote; cf_stats.total_view_updates_pushed_remote += updates_pushed_remote; schema_ptr s = mut.s; - future<> view_update = apply_to_remote_endpoints(*target_endpoint, std::move(remote_endpoints), std::move(mut), base_token, view_token, allow_hints, tr_state).then_wrapped( + future<> view_update = apply_to_remote_endpoints(_proxy.local(), *target_endpoint, std::move(remote_endpoints), std::move(mut), base_token, view_token, allow_hints, tr_state).then_wrapped( [s = std::move(s), &stats, &cf_stats, tr_state, base_token, view_token, target_endpoint, updates_pushed_remote, units = sem_units.split(sem_units.count()), apply_update_synchronously] (future<>&& f) mutable { if (f.failed()) { diff --git a/db/view/view_update_generator.cc b/db/view/view_update_generator.cc index c773ba13a8..75db180d1a 100644 --- a/db/view/view_update_generator.cc +++ b/db/view/view_update_generator.cc @@ -96,7 +96,6 @@ view_update_generator::view_update_generator(replica::database& db, sharded()) { setup_metrics(); discover_staging_sstables(); - (void)_proxy; _db.plug_view_update_generator(*this); }