storage_proxy: add fencing to Paxos verbs

This commit adds fencing support to all Paxos verbs:
* Pass an optional (for backward compatibility) fencing_token as a
parameter to the prepare, accept, learn, and prune verbs.
* Call apply_fence twice — before and after accessing local data. This
ensures that if the coordinator is fenced out mid-request, the replica
does not return success, which would otherwise incorrectly contribute
to achieving the target CL. Without this, a user might observe
successful writes that become unreadable after the topology
operation completes.
* For prune, call apply_fence only once because it does not return a
response to the LWT coordinator.

Fixes scylladb/scylladb#22332
This commit is contained in:
Petr Gusev
2025-08-01 15:51:40 +02:00
parent ab750af711
commit 6d7af84fed
2 changed files with 58 additions and 32 deletions

View File

@@ -35,7 +35,7 @@ verb [[with_client_info, with_timeout]] read_mutation_data (query::read_command
verb [[with_client_info, with_timeout]] read_digest (query::read_command cmd [[ref]], ::compat::wrapping_partition_range pr, query::digest_algorithm digest [[version 3.0.0]], db::per_partition_rate_limit::info rate_limit_info [[version 5.1.0]], service::fencing_token fence [[version 5.4.0]]) -> query::result_digest, api::timestamp_type [[version 1.2.0]], cache_temperature [[version 2.0.0]], replica::exception_variant [[version 5.1.0]], std::optional<full_position> [[version 5.2.0]];
verb [[with_timeout]] truncate (sstring, sstring);
verb [[]] truncate_with_tablets (sstring ks_name, sstring cf_name, service::frozen_topology_guard frozen_guard);
verb [[with_client_info, with_timeout]] paxos_prepare (query::read_command cmd [[ref]], partition_key key [[ref]], utils::UUID ballot, bool only_digest, query::digest_algorithm da, std::optional<tracing::trace_info> trace_info [[ref]]) -> service::paxos::prepare_response [[unique_ptr]];
verb [[with_client_info, with_timeout]] paxos_accept (service::paxos::proposal proposal [[ref]], std::optional<tracing::trace_info> trace_info [[ref]]) -> bool;
verb [[with_client_info, with_timeout, one_way]] paxos_learn (service::paxos::proposal decision [[ref]], inet_address_vector_replica_set forward [[ref]], gms::inet_address reply_to, unsigned shard, uint64_t response_id, std::optional<tracing::trace_info> trace_info [[ref]], host_id_vector_replica_set forward_id [[ref, version 6.3.0]], locator::host_id reply_to_id [[version 6.3.0]]);
verb [[with_client_info, with_timeout, one_way]] paxos_prune (table_schema_version schema_id, partition_key key [[ref]], utils::UUID ballot, std::optional<tracing::trace_info> trace_info [[ref]]);
verb [[with_client_info, with_timeout]] paxos_prepare (query::read_command cmd [[ref]], partition_key key [[ref]], utils::UUID ballot, bool only_digest, query::digest_algorithm da, std::optional<tracing::trace_info> trace_info [[ref]], service::fencing_token fence [[version 2025.4]]) -> service::paxos::prepare_response [[unique_ptr]];
verb [[with_client_info, with_timeout]] paxos_accept (service::paxos::proposal proposal [[ref]], std::optional<tracing::trace_info> trace_info [[ref]], service::fencing_token fence [[version 2025.4]]) -> bool;
verb [[with_client_info, with_timeout, one_way]] paxos_learn (service::paxos::proposal decision [[ref]], inet_address_vector_replica_set forward [[ref]], gms::inet_address reply_to, unsigned shard, uint64_t response_id, std::optional<tracing::trace_info> trace_info [[ref]], host_id_vector_replica_set forward_id [[ref, version 6.3.0]], locator::host_id reply_to_id [[version 6.3.0]], service::fencing_token fence [[version 2025.4]]);
verb [[with_client_info, with_timeout, one_way]] paxos_prune (table_schema_version schema_id, partition_key key [[ref]], utils::UUID ballot, std::optional<tracing::trace_info> trace_info [[ref]], service::fencing_token fence [[version 2025.4]]);

View File

@@ -438,31 +438,31 @@ public:
future<service::paxos::prepare_response> send_paxos_prepare(
locator::host_id addr, storage_proxy::clock_type::time_point timeout, tracing::trace_state_ptr tr_state,
const query::read_command& cmd, const partition_key& key, utils::UUID ballot, bool only_digest, query::digest_algorithm da) {
const query::read_command& cmd, const partition_key& key, utils::UUID ballot, bool only_digest, query::digest_algorithm da, fencing_token fence) {
tracing::trace(tr_state, "prepare_ballot: sending prepare {} to {}", ballot, addr);
return ser::storage_proxy_rpc_verbs::send_paxos_prepare(
&_ms, addr, timeout, cmd, key, ballot, only_digest, da, tracing::make_trace_info(tr_state));
&_ms, addr, timeout, cmd, key, ballot, only_digest, da, tracing::make_trace_info(tr_state), fence);
}
future<bool> send_paxos_accept(
locator::host_id addr, storage_proxy::clock_type::time_point timeout, tracing::trace_state_ptr tr_state,
const service::paxos::proposal& proposal) {
const service::paxos::proposal& proposal, fencing_token fence) {
tracing::trace(tr_state, "accept_proposal: send accept {} to {}", proposal, addr);
return ser::storage_proxy_rpc_verbs::send_paxos_accept(&_ms, std::move(addr), timeout, proposal, tracing::make_trace_info(tr_state));
return ser::storage_proxy_rpc_verbs::send_paxos_accept(&_ms, std::move(addr), timeout, proposal, tracing::make_trace_info(tr_state), fence);
}
future<> send_paxos_learn(
locator::host_id addr, storage_proxy::clock_type::time_point timeout, const std::optional<tracing::trace_info>& trace_info,
const service::paxos::proposal& decision, const host_id_vector_replica_set& forward,
gms::inet_address reply_to_ip, locator::host_id reply_to, unsigned shard, uint64_t response_id) {
gms::inet_address reply_to_ip, locator::host_id reply_to, unsigned shard, uint64_t response_id, fencing_token fence) {
return ser::storage_proxy_rpc_verbs::send_paxos_learn(
&_ms, addr, timeout, decision, {}, reply_to_ip, shard, response_id, trace_info, forward, reply_to);
&_ms, addr, timeout, decision, {}, reply_to_ip, shard, response_id, trace_info, forward, reply_to, fence);
}
future<> send_paxos_prune(
locator::host_id addr, storage_proxy::clock_type::time_point timeout, tracing::trace_state_ptr tr_state,
table_schema_version schema_id, const partition_key& key, utils::UUID ballot) {
return ser::storage_proxy_rpc_verbs::send_paxos_prune(&_ms, addr, timeout, schema_id, key, ballot, tracing::make_trace_info(tr_state));
table_schema_version schema_id, const partition_key& key, utils::UUID ballot, fencing_token fence) {
return ser::storage_proxy_rpc_verbs::send_paxos_prune(&_ms, addr, timeout, schema_id, key, ballot, tracing::make_trace_info(tr_state), fence);
}
future<> truncate_with_tablets(sstring ks_name, sstring cf_name, std::chrono::milliseconds timeout_in_ms) {
@@ -726,22 +726,23 @@ private:
const rpc::client_info& cinfo, rpc::opt_time_point t,
paxos::proposal decision, inet_address_vector_replica_set forward, gms::inet_address reply_to, unsigned shard,
storage_proxy::response_id_type response_id, std::optional<tracing::trace_info> trace_info,
rpc::optional<host_id_vector_replica_set> forward_id, rpc::optional<locator::host_id> reply_to_id) {
rpc::optional<host_id_vector_replica_set> forward_id, rpc::optional<locator::host_id> reply_to_id,
rpc::optional<fencing_token> fence) {
tracing::trace_state_ptr trace_state_ptr;
auto src_addr = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
auto schema_version = decision.update.schema_version();
return handle_write(src_addr, t, schema_version, std::move(decision), std::move(forward), reply_to, std::move(forward_id),reply_to_id, shard,
response_id, trace_info,
fencing_token{},
/* apply_fn */ [this] (shared_ptr<storage_proxy>& p, tracing::trace_state_ptr tr_state, schema_ptr s,
const paxos::proposal& decision, clock_type::time_point timeout, fencing_token) {
return paxos::paxos_state::learn(*p, paxos_store(), std::move(s), decision, timeout, tr_state);
fence.value_or(fencing_token{}),
/* apply_fn */ [this, src_addr] (shared_ptr<storage_proxy>& p, tracing::trace_state_ptr tr_state, schema_ptr s,
const paxos::proposal& decision, clock_type::time_point timeout, fencing_token fence) {
return p->apply_fence_on_ready(paxos::paxos_state::learn(*p, paxos_store(), std::move(s), decision, timeout, tr_state), fence, src_addr);
},
/* forward_fn */ [this] (shared_ptr<storage_proxy>&, locator::host_id addr, clock_type::time_point timeout, const paxos::proposal& m,
gms::inet_address ip, locator::host_id reply_to, unsigned shard, response_id_type response_id,
const std::optional<tracing::trace_info>& trace_info, fencing_token) {
return send_paxos_learn(addr, timeout, trace_info, m, {}, ip, reply_to, shard, response_id);
const std::optional<tracing::trace_info>& trace_info, fencing_token fence) {
return send_paxos_learn(addr, timeout, trace_info, m, {}, ip, reply_to, shard, response_id, fence);
});
}
@@ -968,7 +969,8 @@ private:
handle_paxos_prepare(
const rpc::client_info& cinfo, rpc::opt_time_point timeout,
query::read_command cmd, partition_key key, utils::UUID ballot,
bool only_digest, query::digest_algorithm da, std::optional<tracing::trace_info> trace_info) {
bool only_digest, query::digest_algorithm da, std::optional<tracing::trace_info> trace_info,
rpc::optional<fencing_token> fence_opt) {
auto src_addr = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
auto src_shard = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
@@ -978,6 +980,9 @@ private:
tracing::begin(tr_state);
tracing::trace(tr_state, "paxos_prepare: message received from /{} ballot {}", src_addr, ballot);
}
co_await _sp.apply_fence(fence_opt, src_addr);
if (!cmd.max_result_size) {
cmd.max_result_size.emplace(cinfo.retrieve_auxiliary<uint64_t>("max_result_size"));
}
@@ -987,7 +992,7 @@ private:
unsigned shard = schema->table().shard_for_reads(token);
bool local = shard == this_shard_id();
_sp.get_stats().replica_cross_shard_ops += !local;
co_return co_await _sp.container().invoke_on(shard, _sp._write_smp_service_group, [gs = global_schema_ptr(schema), gt = tracing::global_trace_state_ptr(std::move(tr_state)),
auto result = co_await _sp.container().invoke_on(shard, _sp._write_smp_service_group, [gs = global_schema_ptr(schema), gt = tracing::global_trace_state_ptr(std::move(tr_state)),
cmd = make_lw_shared<query::read_command>(std::move(cmd)), key = std::move(key),
ballot, only_digest, da, timeout, src_addr, &paxos_store = _paxos_store] (storage_proxy& sp) -> future<foreign_ptr<std::unique_ptr<service::paxos::prepare_response>>> {
tracing::trace_state_ptr tr_state = gt;
@@ -995,11 +1000,16 @@ private:
tracing::trace(tr_state, "paxos_prepare: handling is done, sending a response to /{}", src_addr);
co_return make_foreign(std::make_unique<paxos::prepare_response>(std::move(r)));
});
co_await _sp.apply_fence(fence_opt, src_addr);
co_return std::move(result);
}
future<bool> handle_paxos_accept(
const rpc::client_info& cinfo, rpc::opt_time_point timeout,
paxos::proposal proposal, std::optional<tracing::trace_info> trace_info) {
paxos::proposal proposal, std::optional<tracing::trace_info> trace_info,
rpc::optional<fencing_token> fence_opt) {
auto src_addr = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
auto src_shard = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
@@ -1009,6 +1019,9 @@ private:
tracing::begin(tr_state);
tracing::trace(tr_state, "paxos_accept: message received from /{} ballot {}", src_addr, proposal);
}
co_await _sp.apply_fence(fence_opt, src_addr);
auto handling_done = defer([tr_state, src_addr] {
if (tr_state) {
tracing::trace(tr_state, "paxos_accept: handling is done, sending a response to /{}", src_addr);
@@ -1019,15 +1032,20 @@ private:
unsigned shard = schema->table().shard_for_reads(token);
bool local = shard == this_shard_id();
_sp.get_stats().replica_cross_shard_ops += !local;
co_return co_await _sp.container().invoke_on(shard, _sp._write_smp_service_group, coroutine::lambda([gs = global_schema_ptr(schema), gt = tracing::global_trace_state_ptr(tr_state),
auto result = co_await _sp.container().invoke_on(shard, _sp._write_smp_service_group, coroutine::lambda([gs = global_schema_ptr(schema), gt = tracing::global_trace_state_ptr(tr_state),
proposal = std::move(proposal), timeout, token, this] (storage_proxy& sp) {
return paxos::paxos_state::accept(sp, paxos_store(), gt, gs, token, proposal, *timeout);
}));
co_await _sp.apply_fence(fence_opt, src_addr);
co_return std::move(result);
}
future<rpc::no_wait_type> handle_paxos_prune(
const rpc::client_info& cinfo, rpc::opt_time_point timeout,
table_schema_version schema_id, partition_key key, utils::UUID ballot, std::optional<tracing::trace_info> trace_info) {
table_schema_version schema_id, partition_key key, utils::UUID ballot, std::optional<tracing::trace_info> trace_info,
rpc::optional<fencing_token> fence_opt) {
static thread_local uint16_t pruning = 0;
static constexpr uint16_t pruning_limit = 1000; // since PRUNE verb is one way replica side has its own queue limit
auto src_addr = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
@@ -1040,6 +1058,8 @@ private:
tracing::trace(tr_state, "paxos_prune: message received from /{} ballot {}", src_addr, ballot);
}
co_await _sp.apply_fence(fence_opt, src_addr);
if (pruning >= pruning_limit) {
_sp.get_stats().cas_replica_dropped_prune++;
tracing::trace(tr_state, "paxos_prune: do not prune due to overload", src_addr);
@@ -1492,6 +1512,10 @@ public:
const locator::effective_replication_map_ptr& get_effective_replication_map() const noexcept {
return _token_guard.get_erm();
}
fencing_token get_fence() const {
return storage_proxy::get_fence(*get_effective_replication_map());
}
};
thread_local uint64_t paxos_response_handler::next_id = 0;
@@ -1511,18 +1535,18 @@ public:
}
virtual future<> apply_locally(storage_proxy& sp, storage_proxy::clock_type::time_point timeout,
tracing::trace_state_ptr tr_state, db::per_partition_rate_limit::info rate_limit_info,
const locator::effective_replication_map& erm, fencing_token) override {
const locator::effective_replication_map& erm, fencing_token fence) override {
tracing::trace(tr_state, "Executing a learn locally");
// TODO: Enforce per partition rate limiting in paxos
return paxos::paxos_state::learn(sp, sp.remote().paxos_store(), _schema, *_proposal, timeout, tr_state);
return sp.apply_fence_on_ready(paxos::paxos_state::learn(sp, sp.remote().paxos_store(), _schema, *_proposal, timeout, tr_state), fence, sp.my_host_id(erm));
}
virtual future<> apply_remotely(storage_proxy& sp, locator::host_id ep, const host_id_vector_replica_set& forward,
storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout,
tracing::trace_state_ptr tr_state, db::per_partition_rate_limit::info rate_limit_info, fencing_token) override {
tracing::trace_state_ptr tr_state, db::per_partition_rate_limit::info rate_limit_info, fencing_token fence) override {
tracing::trace(tr_state, "Sending a learn to /{}", ep);
// TODO: Enforce per partition rate limiting in paxos
return sp.remote().send_paxos_learn(ep, timeout, tracing::make_trace_info(tr_state),
*_proposal, forward, sp.my_address(), sp.get_token_metadata_ptr()->get_my_id(), this_shard_id(), response_id);
*_proposal, forward, sp.my_address(), sp.get_token_metadata_ptr()->get_my_id(), this_shard_id(), response_id, fence);
}
virtual bool is_shared() override {
return true;
@@ -2219,8 +2243,9 @@ future<paxos::prepare_summary> paxos_response_handler::prepare_ballot(utils::UUI
if (topo.is_me(peer)) {
tracing::trace(tr_state, "prepare_ballot: prepare {} locally", ballot);
response = co_await paxos::paxos_state::prepare(*_proxy, _proxy->remote().paxos_store(), tr_state, _schema, *_cmd, _key.key(), ballot, only_digest, da, _timeout);
co_await _proxy->apply_fence(get_fence(), peer);
} else {
response = co_await _proxy->remote().send_paxos_prepare(peer, _timeout, tr_state, *_cmd, _key.key(), ballot, only_digest, da);
response = co_await _proxy->remote().send_paxos_prepare(peer, _timeout, tr_state, *_cmd, _key.key(), ballot, only_digest, da, get_fence());
}
} catch (...) {
if (request_tracker.p) {
@@ -2383,8 +2408,9 @@ future<bool> paxos_response_handler::accept_proposal(lw_shared_ptr<paxos::propos
if (topo.is_me(peer)) {
tracing::trace(tr_state, "accept_proposal: accept {} locally", *proposal);
accepted = co_await paxos::paxos_state::accept(*_proxy, _proxy->remote().paxos_store(), tr_state, _schema, proposal->update.decorated_key(*_schema).token(), *proposal, _timeout);
co_await _proxy->apply_fence(get_fence(), peer);
} else {
accepted = co_await _proxy->remote().send_paxos_accept(peer, _timeout, tr_state, *proposal);
accepted = co_await _proxy->remote().send_paxos_accept(peer, _timeout, tr_state, *proposal, get_fence());
}
} catch(...) {
if (request_tracker.p) {
@@ -2551,10 +2577,10 @@ void paxos_response_handler::prune(utils::UUID ballot) {
(void)parallel_for_each(_live_endpoints, [this, ballot, erm, my_address] (locator::host_id peer) mutable {
if (peer == my_address) {
tracing::trace(tr_state, "prune: prune {} locally", ballot);
return paxos::paxos_state::prune(_proxy->remote().paxos_store(), _schema, _key.key(), ballot, _timeout, tr_state);
return _proxy->apply_fence_on_ready(paxos::paxos_state::prune(_proxy->remote().paxos_store(), _schema, _key.key(), ballot, _timeout, tr_state), get_fence(), my_address);
} else {
tracing::trace(tr_state, "prune: send prune of {} to {}", ballot, peer);
return _proxy->remote().send_paxos_prune(peer, _timeout, tr_state, _schema->version(), _key.key(), ballot);
return _proxy->remote().send_paxos_prune(peer, _timeout, tr_state, _schema->version(), _key.key(), ballot, get_fence());
}
}).then_wrapped([this, h = shared_from_this()] (future<> f) {
h->_proxy->get_stats().cas_now_pruning--;