view: Drop global storage_proxy usage from mutate_MV()

Now the mutate_MV is the method of v.u.generator which has reference to
the sharded<storage_proxy>. Few helper static wrappers are patched to
get the needed proxy or database reference from the mutate_MV call.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2023-03-28 12:16:34 +03:00
parent 7cabdc54a6
commit cc262d814b
2 changed files with 8 additions and 10 deletions

View File

@@ -1543,9 +1543,8 @@ bool needs_static_row(const mutation_partition& mp, const std::vector<view_and_b
// If the assumption that the given base token belongs to this replica
// does not hold, we return an empty optional.
static std::optional<gms::inet_address>
get_view_natural_endpoint(const sstring& keyspace_name,
get_view_natural_endpoint(replica::database& db, const sstring& keyspace_name,
const dht::token& base_token, const dht::token& view_token) {
auto &db = service::get_local_storage_proxy().local_db();
auto& ks = db.find_keyspace(keyspace_name);
auto erm = ks.get_effective_replication_map();
auto& topology = erm->get_token_metadata_ptr()->get_topology();
@@ -1586,13 +1585,13 @@ get_view_natural_endpoint(const sstring& keyspace_name,
return view_endpoints[base_it - base_endpoints.begin()];
}
static future<> apply_to_remote_endpoints(gms::inet_address target, inet_address_vector_topology_change&& pending_endpoints,
static future<> apply_to_remote_endpoints(service::storage_proxy& proxy, gms::inet_address target, inet_address_vector_topology_change&& pending_endpoints,
frozen_mutation_and_schema&& mut, const dht::token& base_token, const dht::token& view_token,
service::allow_hints allow_hints, tracing::trace_state_ptr tr_state) {
tracing::trace(tr_state, "Sending view update for {}.{} to {}, with pending endpoints = {}; base token = {}; view token = {}",
mut.s->ks_name(), mut.s->cf_name(), target, pending_endpoints, base_token, view_token);
return service::get_local_storage_proxy().send_to_endpoint(
return proxy.send_to_endpoint(
std::move(mut),
target,
std::move(pending_endpoints),
@@ -1626,11 +1625,11 @@ future<> view_update_generator::mutate_MV(
{
static constexpr size_t max_concurrent_updates = 128;
co_await max_concurrent_for_each(view_updates, max_concurrent_updates,
[base_token, &stats, &cf_stats, tr_state, &pending_view_updates, allow_hints, wait_for_all] (frozen_mutation_and_schema mut) mutable -> future<> {
[this, base_token, &stats, &cf_stats, tr_state, &pending_view_updates, allow_hints, wait_for_all] (frozen_mutation_and_schema mut) mutable -> future<> {
auto view_token = dht::get_token(*mut.s, mut.fm.key());
auto& keyspace_name = mut.s->ks_name();
auto target_endpoint = get_view_natural_endpoint(keyspace_name, base_token, view_token);
auto remote_endpoints = service::get_local_storage_proxy().get_token_metadata_ptr()->pending_endpoints_for(view_token, keyspace_name);
auto target_endpoint = get_view_natural_endpoint(_proxy.local().local_db(), keyspace_name, base_token, view_token);
auto remote_endpoints = _proxy.local().get_token_metadata_ptr()->pending_endpoints_for(view_token, keyspace_name);
auto sem_units = pending_view_updates.split(mut.fm.representation().size());
const bool update_synchronously = should_update_synchronously(*mut.s);
@@ -1677,7 +1676,7 @@ future<> view_update_generator::mutate_MV(
auto mut_ptr = remote_endpoints.empty() ? std::make_unique<frozen_mutation>(std::move(mut.fm)) : std::make_unique<frozen_mutation>(mut.fm);
tracing::trace(tr_state, "Locally applying view update for {}.{}; base token = {}; view token = {}",
mut.s->ks_name(), mut.s->cf_name(), base_token, view_token);
local_view_update = service::get_local_storage_proxy().mutate_locally(mut.s, *mut_ptr, tr_state, db::commitlog::force_sync::no).then_wrapped(
local_view_update = _proxy.local().mutate_locally(mut.s, *mut_ptr, tr_state, db::commitlog::force_sync::no).then_wrapped(
[s = mut.s, &stats, &cf_stats, tr_state, base_token, view_token, my_address, mut_ptr = std::move(mut_ptr),
units = sem_units.split(sem_units.count())] (future<>&& f) {
--stats.writes;
@@ -1714,7 +1713,7 @@ future<> view_update_generator::mutate_MV(
stats.view_updates_pushed_remote += updates_pushed_remote;
cf_stats.total_view_updates_pushed_remote += updates_pushed_remote;
schema_ptr s = mut.s;
future<> view_update = apply_to_remote_endpoints(*target_endpoint, std::move(remote_endpoints), std::move(mut), base_token, view_token, allow_hints, tr_state).then_wrapped(
future<> view_update = apply_to_remote_endpoints(_proxy.local(), *target_endpoint, std::move(remote_endpoints), std::move(mut), base_token, view_token, allow_hints, tr_state).then_wrapped(
[s = std::move(s), &stats, &cf_stats, tr_state, base_token, view_token, target_endpoint, updates_pushed_remote,
units = sem_units.split(sem_units.count()), apply_update_synchronously] (future<>&& f) mutable {
if (f.failed()) {

View File

@@ -96,7 +96,6 @@ view_update_generator::view_update_generator(replica::database& db, sharded<serv
, _progress_tracker(std::make_unique<progress_tracker>()) {
setup_metrics();
discover_staging_sstables();
(void)_proxy;
_db.plug_view_update_generator(*this);
}