diff --git a/db/view/view.cc b/db/view/view.cc index f4245e3786..38ee54d4eb 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1689,7 +1689,7 @@ future<> view_update_generator::mutate_MV( auto mut_ptr = remote_endpoints.empty() ? std::make_unique(std::move(mut.fm)) : std::make_unique(mut.fm); tracing::trace(tr_state, "Locally applying view update for {}.{}; base token = {}; view token = {}", mut.s->ks_name(), mut.s->cf_name(), base_token, view_token); - local_view_update = _proxy.local().mutate_locally(mut.s, *mut_ptr, tr_state, db::commitlog::force_sync::no).then_wrapped( + local_view_update = _proxy.local().mutate_mv_locally(mut.s, *mut_ptr, tr_state, db::commitlog::force_sync::no).then_wrapped( [s = mut.s, &stats, &cf_stats, tr_state, base_token, view_token, my_address, mut_ptr = std::move(mut_ptr), units = sem_units.split(sem_units.count())] (future<>&& f) { --stats.writes; diff --git a/main.cc b/main.cc index 093115ef68..3bdbad9a87 100644 --- a/main.cc +++ b/main.cc @@ -1049,6 +1049,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl storage_proxy_smp_service_group_config.max_nonlocal_requests = 5000; spcfg.read_smp_service_group = create_smp_service_group(storage_proxy_smp_service_group_config).get0(); spcfg.write_smp_service_group = create_smp_service_group(storage_proxy_smp_service_group_config).get0(); + spcfg.write_mv_smp_service_group = create_smp_service_group(storage_proxy_smp_service_group_config).get0(); spcfg.hints_write_smp_service_group = create_smp_service_group(storage_proxy_smp_service_group_config).get0(); spcfg.write_ack_smp_service_group = create_smp_service_group(storage_proxy_smp_service_group_config).get0(); static db::view::node_update_backlog node_backlog(smp::count, 10ms); diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index eea837f2a9..c3da446095 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -2825,6 +2825,7 @@ storage_proxy::storage_proxy(distributed& db, storage_proxy:: , _erm_factory(erm_factory) , _read_smp_service_group(cfg.read_smp_service_group) , _write_smp_service_group(cfg.write_smp_service_group) + , _write_mv_smp_service_group(cfg.write_mv_smp_service_group) , _hints_write_smp_service_group(cfg.hints_write_smp_service_group) , _write_ack_smp_service_group(cfg.write_ack_smp_service_group) , _next_response_id(std::chrono::system_clock::now().time_since_epoch()/1ms) diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 6efdcc2faa..e631c81abb 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -165,6 +165,7 @@ public: size_t available_memory; smp_service_group read_smp_service_group = default_smp_service_group(); smp_service_group write_smp_service_group = default_smp_service_group(); + smp_service_group write_mv_smp_service_group = default_smp_service_group(); smp_service_group hints_write_smp_service_group = default_smp_service_group(); // Write acknowledgments might not be received on the correct shard, and // they need a separate smp_service_group to prevent an ABBA deadlock @@ -233,6 +234,7 @@ private: locator::effective_replication_map_factory& _erm_factory; smp_service_group _read_smp_service_group; smp_service_group _write_smp_service_group; + smp_service_group _write_mv_smp_service_group; smp_service_group _hints_write_smp_service_group; smp_service_group _write_ack_smp_service_group; response_id_type _next_response_id; @@ -529,6 +531,11 @@ public: future<> mutate_locally(const schema_ptr& s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, clock_type::time_point timeout = clock_type::time_point::max(), db::per_partition_rate_limit::info rate_limit_info = std::monostate()) { return mutate_locally(s, m, tr_state, sync, timeout, _write_smp_service_group, rate_limit_info); } + // Applies materialized view mutation on this node. + // Resolves with timed_out_error when timeout is reached. + future<> mutate_mv_locally(const schema_ptr& s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, clock_type::time_point timeout = clock_type::time_point::max(), db::per_partition_rate_limit::info rate_limit_info = std::monostate()) { + return mutate_locally(s, m, tr_state, sync, timeout, _write_mv_smp_service_group, rate_limit_info); + } // Applies mutations on this node. // Resolves with timed_out_error when timeout is reached. future<> mutate_locally(std::vector mutation, tracing::trace_state_ptr tr_state, clock_type::time_point timeout = clock_type::time_point::max(), db::per_partition_rate_limit::info rate_limit_info = std::monostate());