diff --git a/idl/storage_proxy.idl.hh b/idl/storage_proxy.idl.hh index 7559bfc22a..2e32c5bcc1 100644 --- a/idl/storage_proxy.idl.hh +++ b/idl/storage_proxy.idl.hh @@ -35,7 +35,7 @@ verb [[with_client_info, with_timeout]] read_mutation_data (query::read_command verb [[with_client_info, with_timeout]] read_digest (query::read_command cmd [[ref]], ::compat::wrapping_partition_range pr, query::digest_algorithm digest [[version 3.0.0]], db::per_partition_rate_limit::info rate_limit_info [[version 5.1.0]], service::fencing_token fence [[version 5.4.0]]) -> query::result_digest, api::timestamp_type [[version 1.2.0]], cache_temperature [[version 2.0.0]], replica::exception_variant [[version 5.1.0]], std::optional [[version 5.2.0]]; verb [[with_timeout]] truncate (sstring, sstring); verb [[]] truncate_with_tablets (sstring ks_name, sstring cf_name, service::frozen_topology_guard frozen_guard); -verb [[with_client_info, with_timeout]] paxos_prepare (query::read_command cmd [[ref]], partition_key key [[ref]], utils::UUID ballot, bool only_digest, query::digest_algorithm da, std::optional trace_info [[ref]]) -> service::paxos::prepare_response [[unique_ptr]]; -verb [[with_client_info, with_timeout]] paxos_accept (service::paxos::proposal proposal [[ref]], std::optional trace_info [[ref]]) -> bool; -verb [[with_client_info, with_timeout, one_way]] paxos_learn (service::paxos::proposal decision [[ref]], inet_address_vector_replica_set forward [[ref]], gms::inet_address reply_to, unsigned shard, uint64_t response_id, std::optional trace_info [[ref]], host_id_vector_replica_set forward_id [[ref, version 6.3.0]], locator::host_id reply_to_id [[version 6.3.0]]); -verb [[with_client_info, with_timeout, one_way]] paxos_prune (table_schema_version schema_id, partition_key key [[ref]], utils::UUID ballot, std::optional trace_info [[ref]]); +verb [[with_client_info, with_timeout]] paxos_prepare (query::read_command cmd [[ref]], partition_key key [[ref]], utils::UUID ballot, bool only_digest, query::digest_algorithm da, std::optional trace_info [[ref]], service::fencing_token fence [[version 2025.4]]) -> service::paxos::prepare_response [[unique_ptr]]; +verb [[with_client_info, with_timeout]] paxos_accept (service::paxos::proposal proposal [[ref]], std::optional trace_info [[ref]], service::fencing_token fence [[version 2025.4]]) -> bool; +verb [[with_client_info, with_timeout, one_way]] paxos_learn (service::paxos::proposal decision [[ref]], inet_address_vector_replica_set forward [[ref]], gms::inet_address reply_to, unsigned shard, uint64_t response_id, std::optional trace_info [[ref]], host_id_vector_replica_set forward_id [[ref, version 6.3.0]], locator::host_id reply_to_id [[version 6.3.0]], service::fencing_token fence [[version 2025.4]]); +verb [[with_client_info, with_timeout, one_way]] paxos_prune (table_schema_version schema_id, partition_key key [[ref]], utils::UUID ballot, std::optional trace_info [[ref]], service::fencing_token fence [[version 2025.4]]); diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index a0f8c2a282..61c96b1186 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -438,31 +438,31 @@ public: future send_paxos_prepare( locator::host_id addr, storage_proxy::clock_type::time_point timeout, tracing::trace_state_ptr tr_state, - const query::read_command& cmd, const partition_key& key, utils::UUID ballot, bool only_digest, query::digest_algorithm da) { + const query::read_command& cmd, const partition_key& key, utils::UUID ballot, bool only_digest, query::digest_algorithm da, fencing_token fence) { tracing::trace(tr_state, "prepare_ballot: sending prepare {} to {}", ballot, addr); return ser::storage_proxy_rpc_verbs::send_paxos_prepare( - &_ms, addr, timeout, cmd, key, ballot, only_digest, da, tracing::make_trace_info(tr_state)); + &_ms, addr, timeout, cmd, key, ballot, only_digest, da, tracing::make_trace_info(tr_state), fence); } future send_paxos_accept( locator::host_id addr, storage_proxy::clock_type::time_point timeout, tracing::trace_state_ptr tr_state, - const service::paxos::proposal& proposal) { + const service::paxos::proposal& proposal, fencing_token fence) { tracing::trace(tr_state, "accept_proposal: send accept {} to {}", proposal, addr); - return ser::storage_proxy_rpc_verbs::send_paxos_accept(&_ms, std::move(addr), timeout, proposal, tracing::make_trace_info(tr_state)); + return ser::storage_proxy_rpc_verbs::send_paxos_accept(&_ms, std::move(addr), timeout, proposal, tracing::make_trace_info(tr_state), fence); } future<> send_paxos_learn( locator::host_id addr, storage_proxy::clock_type::time_point timeout, const std::optional& trace_info, const service::paxos::proposal& decision, const host_id_vector_replica_set& forward, - gms::inet_address reply_to_ip, locator::host_id reply_to, unsigned shard, uint64_t response_id) { + gms::inet_address reply_to_ip, locator::host_id reply_to, unsigned shard, uint64_t response_id, fencing_token fence) { return ser::storage_proxy_rpc_verbs::send_paxos_learn( - &_ms, addr, timeout, decision, {}, reply_to_ip, shard, response_id, trace_info, forward, reply_to); + &_ms, addr, timeout, decision, {}, reply_to_ip, shard, response_id, trace_info, forward, reply_to, fence); } future<> send_paxos_prune( locator::host_id addr, storage_proxy::clock_type::time_point timeout, tracing::trace_state_ptr tr_state, - table_schema_version schema_id, const partition_key& key, utils::UUID ballot) { - return ser::storage_proxy_rpc_verbs::send_paxos_prune(&_ms, addr, timeout, schema_id, key, ballot, tracing::make_trace_info(tr_state)); + table_schema_version schema_id, const partition_key& key, utils::UUID ballot, fencing_token fence) { + return ser::storage_proxy_rpc_verbs::send_paxos_prune(&_ms, addr, timeout, schema_id, key, ballot, tracing::make_trace_info(tr_state), fence); } future<> truncate_with_tablets(sstring ks_name, sstring cf_name, std::chrono::milliseconds timeout_in_ms) { @@ -726,22 +726,23 @@ private: const rpc::client_info& cinfo, rpc::opt_time_point t, paxos::proposal decision, inet_address_vector_replica_set forward, gms::inet_address reply_to, unsigned shard, storage_proxy::response_id_type response_id, std::optional trace_info, - rpc::optional forward_id, rpc::optional reply_to_id) { + rpc::optional forward_id, rpc::optional reply_to_id, + rpc::optional fence) { tracing::trace_state_ptr trace_state_ptr; auto src_addr = cinfo.retrieve_auxiliary("host_id"); auto schema_version = decision.update.schema_version(); return handle_write(src_addr, t, schema_version, std::move(decision), std::move(forward), reply_to, std::move(forward_id),reply_to_id, shard, response_id, trace_info, - fencing_token{}, - /* apply_fn */ [this] (shared_ptr& p, tracing::trace_state_ptr tr_state, schema_ptr s, - const paxos::proposal& decision, clock_type::time_point timeout, fencing_token) { - return paxos::paxos_state::learn(*p, paxos_store(), std::move(s), decision, timeout, tr_state); + fence.value_or(fencing_token{}), + /* apply_fn */ [this, src_addr] (shared_ptr& p, tracing::trace_state_ptr tr_state, schema_ptr s, + const paxos::proposal& decision, clock_type::time_point timeout, fencing_token fence) { + return p->apply_fence_on_ready(paxos::paxos_state::learn(*p, paxos_store(), std::move(s), decision, timeout, tr_state), fence, src_addr); }, /* forward_fn */ [this] (shared_ptr&, locator::host_id addr, clock_type::time_point timeout, const paxos::proposal& m, gms::inet_address ip, locator::host_id reply_to, unsigned shard, response_id_type response_id, - const std::optional& trace_info, fencing_token) { - return send_paxos_learn(addr, timeout, trace_info, m, {}, ip, reply_to, shard, response_id); + const std::optional& trace_info, fencing_token fence) { + return send_paxos_learn(addr, timeout, trace_info, m, {}, ip, reply_to, shard, response_id, fence); }); } @@ -968,7 +969,8 @@ private: handle_paxos_prepare( const rpc::client_info& cinfo, rpc::opt_time_point timeout, query::read_command cmd, partition_key key, utils::UUID ballot, - bool only_digest, query::digest_algorithm da, std::optional trace_info) { + bool only_digest, query::digest_algorithm da, std::optional trace_info, + rpc::optional fence_opt) { auto src_addr = cinfo.retrieve_auxiliary("host_id"); auto src_shard = cinfo.retrieve_auxiliary("src_cpu_id"); @@ -978,6 +980,9 @@ private: tracing::begin(tr_state); tracing::trace(tr_state, "paxos_prepare: message received from /{} ballot {}", src_addr, ballot); } + + co_await _sp.apply_fence(fence_opt, src_addr); + if (!cmd.max_result_size) { cmd.max_result_size.emplace(cinfo.retrieve_auxiliary("max_result_size")); } @@ -987,7 +992,7 @@ private: unsigned shard = schema->table().shard_for_reads(token); bool local = shard == this_shard_id(); _sp.get_stats().replica_cross_shard_ops += !local; - co_return co_await _sp.container().invoke_on(shard, _sp._write_smp_service_group, [gs = global_schema_ptr(schema), gt = tracing::global_trace_state_ptr(std::move(tr_state)), + auto result = co_await _sp.container().invoke_on(shard, _sp._write_smp_service_group, [gs = global_schema_ptr(schema), gt = tracing::global_trace_state_ptr(std::move(tr_state)), cmd = make_lw_shared(std::move(cmd)), key = std::move(key), ballot, only_digest, da, timeout, src_addr, &paxos_store = _paxos_store] (storage_proxy& sp) -> future>> { tracing::trace_state_ptr tr_state = gt; @@ -995,11 +1000,16 @@ private: tracing::trace(tr_state, "paxos_prepare: handling is done, sending a response to /{}", src_addr); co_return make_foreign(std::make_unique(std::move(r))); }); + + co_await _sp.apply_fence(fence_opt, src_addr); + + co_return std::move(result); } future handle_paxos_accept( const rpc::client_info& cinfo, rpc::opt_time_point timeout, - paxos::proposal proposal, std::optional trace_info) { + paxos::proposal proposal, std::optional trace_info, + rpc::optional fence_opt) { auto src_addr = cinfo.retrieve_auxiliary("host_id"); auto src_shard = cinfo.retrieve_auxiliary("src_cpu_id"); @@ -1009,6 +1019,9 @@ private: tracing::begin(tr_state); tracing::trace(tr_state, "paxos_accept: message received from /{} ballot {}", src_addr, proposal); } + + co_await _sp.apply_fence(fence_opt, src_addr); + auto handling_done = defer([tr_state, src_addr] { if (tr_state) { tracing::trace(tr_state, "paxos_accept: handling is done, sending a response to /{}", src_addr); @@ -1019,15 +1032,20 @@ private: unsigned shard = schema->table().shard_for_reads(token); bool local = shard == this_shard_id(); _sp.get_stats().replica_cross_shard_ops += !local; - co_return co_await _sp.container().invoke_on(shard, _sp._write_smp_service_group, coroutine::lambda([gs = global_schema_ptr(schema), gt = tracing::global_trace_state_ptr(tr_state), + auto result = co_await _sp.container().invoke_on(shard, _sp._write_smp_service_group, coroutine::lambda([gs = global_schema_ptr(schema), gt = tracing::global_trace_state_ptr(tr_state), proposal = std::move(proposal), timeout, token, this] (storage_proxy& sp) { return paxos::paxos_state::accept(sp, paxos_store(), gt, gs, token, proposal, *timeout); })); + + co_await _sp.apply_fence(fence_opt, src_addr); + + co_return std::move(result); } future handle_paxos_prune( const rpc::client_info& cinfo, rpc::opt_time_point timeout, - table_schema_version schema_id, partition_key key, utils::UUID ballot, std::optional trace_info) { + table_schema_version schema_id, partition_key key, utils::UUID ballot, std::optional trace_info, + rpc::optional fence_opt) { static thread_local uint16_t pruning = 0; static constexpr uint16_t pruning_limit = 1000; // since PRUNE verb is one way replica side has its own queue limit auto src_addr = cinfo.retrieve_auxiliary("host_id"); @@ -1040,6 +1058,8 @@ private: tracing::trace(tr_state, "paxos_prune: message received from /{} ballot {}", src_addr, ballot); } + co_await _sp.apply_fence(fence_opt, src_addr); + if (pruning >= pruning_limit) { _sp.get_stats().cas_replica_dropped_prune++; tracing::trace(tr_state, "paxos_prune: do not prune due to overload", src_addr); @@ -1492,6 +1512,10 @@ public: const locator::effective_replication_map_ptr& get_effective_replication_map() const noexcept { return _token_guard.get_erm(); } + + fencing_token get_fence() const { + return storage_proxy::get_fence(*get_effective_replication_map()); + } }; thread_local uint64_t paxos_response_handler::next_id = 0; @@ -1511,18 +1535,18 @@ public: } virtual future<> apply_locally(storage_proxy& sp, storage_proxy::clock_type::time_point timeout, tracing::trace_state_ptr tr_state, db::per_partition_rate_limit::info rate_limit_info, - const locator::effective_replication_map& erm, fencing_token) override { + const locator::effective_replication_map& erm, fencing_token fence) override { tracing::trace(tr_state, "Executing a learn locally"); // TODO: Enforce per partition rate limiting in paxos - return paxos::paxos_state::learn(sp, sp.remote().paxos_store(), _schema, *_proposal, timeout, tr_state); + return sp.apply_fence_on_ready(paxos::paxos_state::learn(sp, sp.remote().paxos_store(), _schema, *_proposal, timeout, tr_state), fence, sp.my_host_id(erm)); } virtual future<> apply_remotely(storage_proxy& sp, locator::host_id ep, const host_id_vector_replica_set& forward, storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout, - tracing::trace_state_ptr tr_state, db::per_partition_rate_limit::info rate_limit_info, fencing_token) override { + tracing::trace_state_ptr tr_state, db::per_partition_rate_limit::info rate_limit_info, fencing_token fence) override { tracing::trace(tr_state, "Sending a learn to /{}", ep); // TODO: Enforce per partition rate limiting in paxos return sp.remote().send_paxos_learn(ep, timeout, tracing::make_trace_info(tr_state), - *_proposal, forward, sp.my_address(), sp.get_token_metadata_ptr()->get_my_id(), this_shard_id(), response_id); + *_proposal, forward, sp.my_address(), sp.get_token_metadata_ptr()->get_my_id(), this_shard_id(), response_id, fence); } virtual bool is_shared() override { return true; @@ -2219,8 +2243,9 @@ future paxos_response_handler::prepare_ballot(utils::UUI if (topo.is_me(peer)) { tracing::trace(tr_state, "prepare_ballot: prepare {} locally", ballot); response = co_await paxos::paxos_state::prepare(*_proxy, _proxy->remote().paxos_store(), tr_state, _schema, *_cmd, _key.key(), ballot, only_digest, da, _timeout); + co_await _proxy->apply_fence(get_fence(), peer); } else { - response = co_await _proxy->remote().send_paxos_prepare(peer, _timeout, tr_state, *_cmd, _key.key(), ballot, only_digest, da); + response = co_await _proxy->remote().send_paxos_prepare(peer, _timeout, tr_state, *_cmd, _key.key(), ballot, only_digest, da, get_fence()); } } catch (...) { if (request_tracker.p) { @@ -2383,8 +2408,9 @@ future paxos_response_handler::accept_proposal(lw_shared_ptrremote().paxos_store(), tr_state, _schema, proposal->update.decorated_key(*_schema).token(), *proposal, _timeout); + co_await _proxy->apply_fence(get_fence(), peer); } else { - accepted = co_await _proxy->remote().send_paxos_accept(peer, _timeout, tr_state, *proposal); + accepted = co_await _proxy->remote().send_paxos_accept(peer, _timeout, tr_state, *proposal, get_fence()); } } catch(...) { if (request_tracker.p) { @@ -2551,10 +2577,10 @@ void paxos_response_handler::prune(utils::UUID ballot) { (void)parallel_for_each(_live_endpoints, [this, ballot, erm, my_address] (locator::host_id peer) mutable { if (peer == my_address) { tracing::trace(tr_state, "prune: prune {} locally", ballot); - return paxos::paxos_state::prune(_proxy->remote().paxos_store(), _schema, _key.key(), ballot, _timeout, tr_state); + return _proxy->apply_fence_on_ready(paxos::paxos_state::prune(_proxy->remote().paxos_store(), _schema, _key.key(), ballot, _timeout, tr_state), get_fence(), my_address); } else { tracing::trace(tr_state, "prune: send prune of {} to {}", ballot, peer); - return _proxy->remote().send_paxos_prune(peer, _timeout, tr_state, _schema->version(), _key.key(), ballot); + return _proxy->remote().send_paxos_prune(peer, _timeout, tr_state, _schema->version(), _key.key(), ballot, get_fence()); } }).then_wrapped([this, h = shared_from_this()] (future<> f) { h->_proxy->get_stats().cas_now_pruning--;