storage_proxy: handle node_local_only in mutate

We add the remove_non_local_host_ids() helper, which
will be used in the next commit to support the read
path. HostIdVector concept is introduced to be able
to handle both host_id_vector_replica_set and
host_id_vector_topology_change uniformly.

The storage_proxy_coordinator_mutate_options class
is declared outside of storage_proxy to avoid C++
compiler complaints about default field initializers.
In particular, some storage_proxy methods use this
class for optional parameters with default values,
which is not allowed when the class is defined inside
storage_proxy.
This commit is contained in:
Petr Gusev
2025-07-09 13:53:14 +02:00
parent 7eb198f2cc
commit 2d747d97b8
3 changed files with 24 additions and 4 deletions

View File

@@ -19,3 +19,8 @@ using inet_address_vector_topology_change = utils::small_vector<gms::inet_addres
using host_id_vector_replica_set = utils::small_vector<locator::host_id, 3>;
using host_id_vector_topology_change = utils::small_vector<locator::host_id, 1>;
template <typename T>
concept HostIdVector =
std::same_as<std::remove_cvref_t<T>, host_id_vector_replica_set> ||
std::same_as<std::remove_cvref_t<T>, host_id_vector_topology_change>;

View File

@@ -132,6 +132,14 @@ void validate_read_replicas(const locator::effective_replication_map& erm, const
}
}
void remove_non_local_host_ids(HostIdVector auto& host_ids, const locator::effective_replication_map& erm) {
const auto my_id = erm.get_topology().my_host_id();
auto it = std::ranges::remove_if(host_ids, [&my_id](locator::host_id& id) {
return id != my_id;
}).begin();
host_ids.erase(it, host_ids.end());
}
} // namespace
namespace storage_proxy_stats {
@@ -3386,7 +3394,7 @@ storage_proxy::mutate_counter_on_leader_and_replicate(const schema_ptr& s, froze
result<storage_proxy::response_id_type>
storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::token& token, std::unique_ptr<mutation_holder> mh,
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit, is_cancellable cancellable, coordinator_mutate_options) {
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit, is_cancellable cancellable, coordinator_mutate_options options) {
replica::table& table = _db.local().find_column_family(s->id());
auto erm = table.get_effective_replication_map();
host_id_vector_replica_set natural_endpoints = erm->get_natural_replicas(token);
@@ -3413,6 +3421,11 @@ storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::tok
}).begin();
pending_endpoints.erase(itend, pending_endpoints.end());
if (options.node_local_only) {
remove_non_local_host_ids(natural_endpoints, *erm);
remove_non_local_host_ids(pending_endpoints, *erm);
}
auto all = natural_endpoints;
std::ranges::copy(pending_endpoints, std::back_inserter(all));

View File

@@ -157,6 +157,10 @@ struct storage_proxy_coordinator_query_result {
}
};
struct storage_proxy_coordinator_mutate_options {
node_local_only node_local_only = node_local_only::no;
};
class cas_request;
class storage_proxy : public seastar::async_sharded_service<storage_proxy>, public peering_sharded_service<storage_proxy>, public service::endpoint_lifecycle_subscriber {
@@ -211,9 +215,7 @@ public:
using coordinator_query_options = storage_proxy_coordinator_query_options;
using coordinator_query_result = storage_proxy_coordinator_query_result;
struct coordinator_mutate_options {
};
using coordinator_mutate_options = storage_proxy_coordinator_mutate_options;
// Holds a list of endpoints participating in CAS request, for a given
// consistency level, token, and state of joining/leaving nodes.