diff --git a/inet_address_vectors.hh b/inet_address_vectors.hh index 6eefe99461..bae10f6a1a 100644 --- a/inet_address_vectors.hh +++ b/inet_address_vectors.hh @@ -19,3 +19,8 @@ using inet_address_vector_topology_change = utils::small_vector; using host_id_vector_topology_change = utils::small_vector; + +template +concept HostIdVector = + std::same_as, host_id_vector_replica_set> || + std::same_as, host_id_vector_topology_change>; diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index af5af4b76f..8de265d42a 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -132,6 +132,14 @@ void validate_read_replicas(const locator::effective_replication_map& erm, const } } +void remove_non_local_host_ids(HostIdVector auto& host_ids, const locator::effective_replication_map& erm) { + const auto my_id = erm.get_topology().my_host_id(); + auto it = std::ranges::remove_if(host_ids, [&my_id](locator::host_id& id) { + return id != my_id; + }).begin(); + host_ids.erase(it, host_ids.end()); +} + } // namespace namespace storage_proxy_stats { @@ -3386,7 +3394,7 @@ storage_proxy::mutate_counter_on_leader_and_replicate(const schema_ptr& s, froze result storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::token& token, std::unique_ptr mh, - db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit, is_cancellable cancellable, coordinator_mutate_options) { + db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit, is_cancellable cancellable, coordinator_mutate_options options) { replica::table& table = _db.local().find_column_family(s->id()); auto erm = table.get_effective_replication_map(); host_id_vector_replica_set natural_endpoints = erm->get_natural_replicas(token); @@ -3413,6 +3421,11 @@ storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::tok }).begin(); pending_endpoints.erase(itend, pending_endpoints.end()); + if (options.node_local_only) { + remove_non_local_host_ids(natural_endpoints, *erm); + remove_non_local_host_ids(pending_endpoints, *erm); + } + auto all = natural_endpoints; std::ranges::copy(pending_endpoints, std::back_inserter(all)); diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 8cd759b64e..38cd517afb 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -157,6 +157,10 @@ struct storage_proxy_coordinator_query_result { } }; +struct storage_proxy_coordinator_mutate_options { + node_local_only node_local_only = node_local_only::no; +}; + class cas_request; class storage_proxy : public seastar::async_sharded_service, public peering_sharded_service, public service::endpoint_lifecycle_subscriber { @@ -211,9 +215,7 @@ public: using coordinator_query_options = storage_proxy_coordinator_query_options; using coordinator_query_result = storage_proxy_coordinator_query_result; - - struct coordinator_mutate_options { - }; + using coordinator_mutate_options = storage_proxy_coordinator_mutate_options; // Holds a list of endpoints participating in CAS request, for a given // consistency level, token, and state of joining/leaving nodes.