diff --git a/idl/storage_proxy.idl.hh b/idl/storage_proxy.idl.hh index 3bbd353cd7..7ac0f7bd6d 100644 --- a/idl/storage_proxy.idl.hh +++ b/idl/storage_proxy.idl.hh @@ -26,7 +26,7 @@ verb [[with_client_info, with_timeout, one_way]] mutation (frozen_mutation fm [[ref]], inet_address_vector_replica_set forward [[ref]], gms::inet_address reply_to, unsigned shard, uint64_t response_id, std::optional trace_info [[ref]] [[version 1.3.0]], db::per_partition_rate_limit::info rate_limit_info [[version 5.1.0]], service::fencing_token fence [[version 5.4.0]]); verb [[with_client_info, one_way]] mutation_done (unsigned shard, uint64_t response_id, db::view::update_backlog backlog [[version 3.1.0]]); verb [[with_client_info, one_way]] mutation_failed (unsigned shard, uint64_t response_id, size_t num_failed, db::view::update_backlog backlog [[version 3.1.0]], replica::exception_variant exception [[version 5.1.0]]); -verb [[with_client_info, with_timeout]] counter_mutation (std::vector fms, db::consistency_level cl, std::optional trace_info [[ref]]); +verb [[with_client_info, with_timeout]] counter_mutation (std::vector fms, db::consistency_level cl, std::optional trace_info [[ref]], service::fencing_token fence [[version 5.4.0]]) -> replica::exception_variant [[version 5.4.0]]; verb [[with_client_info, with_timeout, one_way]] hint_mutation (frozen_mutation fm [[ref]], inet_address_vector_replica_set forward [[ref]], gms::inet_address reply_to, unsigned shard, uint64_t response_id, std::optional trace_info [[ref]] [[version 1.3.0]] /* this verb was mistakenly introduced with optional trace_info */, service::fencing_token fence [[version 5.4.0]]); verb [[with_client_info, with_timeout]] read_data (query::read_command cmd [[ref]], ::compat::wrapping_partition_range pr, query::digest_algorithm digest [[version 3.0.0]], db::per_partition_rate_limit::info rate_limit_info [[version 5.1.0]], service::fencing_token fence [[version 5.4.0]]) -> query::result [[lw_shared_ptr]], cache_temperature [[version 2.0.0]], replica::exception_variant [[version 5.1.0]]; verb [[with_client_info, with_timeout]] read_mutation_data (query::read_command cmd [[ref]], ::compat::wrapping_partition_range pr, service::fencing_token fence [[version 5.4.0]]) -> reconcilable_result [[lw_shared_ptr]], cache_temperature [[version 2.0.0]], replica::exception_variant [[version 5.1.0]]; diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index e675e8d81f..ba7eeb8e65 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -128,19 +128,23 @@ seastar::metrics::label_instance current_scheduling_group_label() { } -template -static future encode_replica_exception_for_rpc(gms::feature_service& features, std::exception_ptr eptr) { +template +static future encode_replica_exception_for_rpc(gms::feature_service& features, std::exception_ptr eptr) { if (features.typed_errors_in_read_rpc) { if (auto ex = replica::try_encode_replica_exception(eptr); ex) { - ResultTuple encoded_ex = utils::make_default_rpc_tuple(); - std::get(encoded_ex) = std::move(ex); - return make_ready_future(std::move(encoded_ex)); + if constexpr (std::is_same_v) { + return make_ready_future(std::move(ex)); + } else { + ResultType encoded_ex = utils::make_default_rpc_tuple(); + std::get(encoded_ex) = std::move(ex); + return make_ready_future(std::move(encoded_ex)); + } } } - return make_exception_future(std::move(eptr)); + return make_exception_future(std::move(eptr)); } -template +template static future add_replica_exception_to_query_result(gms::feature_service& features, future&& f) { if (!f.failed()) { return make_ready_future(utils::tuple_insert(f.get(), replica::exception_variant{})); @@ -247,11 +251,14 @@ public: future<> send_counter_mutation( netw::msg_addr addr, storage_proxy::clock_type::time_point timeout, tracing::trace_state_ptr tr_state, - std::vector fms, db::consistency_level cl) { + std::vector fms, db::consistency_level cl, fencing_token fence) { tracing::trace(tr_state, "Enqueuing counter update to {}", addr); - return ser::storage_proxy_rpc_verbs::send_counter_mutation( - &_ms, std::move(addr), timeout, - std::move(fms), cl, tracing::make_trace_info(tr_state)); + auto&& opt_exception = co_await ser::storage_proxy_rpc_verbs::send_counter_mutation( + &_ms, std::move(addr), timeout, + std::move(fms), cl, tracing::make_trace_info(tr_state), fence); + if (opt_exception.has_value() && *opt_exception) { + co_await coroutine::return_exception_ptr((*opt_exception).into_exception_ptr()); + } } future<> send_mutation_done( @@ -399,9 +406,10 @@ private: co_return co_await _mm.get_schema_for_write(std::move(v), std::move(from), _ms, &aoe.abort_source()); } - future<> handle_counter_mutation( + future handle_counter_mutation( const rpc::client_info& cinfo, rpc::opt_time_point t, - std::vector fms, db::consistency_level cl, std::optional trace_info) { + std::vector fms, db::consistency_level cl, std::optional trace_info, + rpc::optional fence_opt) { auto src_addr = netw::messaging_service::get_source(cinfo); tracing::trace_state_ptr trace_state_ptr; @@ -411,6 +419,12 @@ private: tracing::trace(trace_state_ptr, "Message received from /{}", src_addr.addr); } + const auto fence = fence_opt.value_or(fencing_token{}); + if (auto stale = _sp.apply_fence(fence, src_addr.addr)) { + co_return co_await encode_replica_exception_for_rpc(_sp.features(), + make_exception_ptr(std::move(*stale))); + } + std::vector mutations; auto timeout = *t; co_await coroutine::parallel_for_each(std::move(fms), [&] (frozen_mutation& fm) { @@ -423,6 +437,11 @@ private: }); auto& sp = _sp; co_await sp.mutate_counters_on_leader(std::move(mutations), cl, timeout, std::move(trace_state_ptr), /* FIXME: rpc should also pass a permit down to callbacks */ empty_service_permit()); + if (auto stale = _sp.apply_fence(fence, src_addr.addr)) { + co_return co_await encode_replica_exception_for_rpc(_sp.features(), + make_exception_ptr(std::move(*stale))); + } + co_return replica::exception_variant{}; } future handle_write( @@ -3287,6 +3306,7 @@ future<> storage_proxy::mutate_counters(Range&& mutations, db::consistency_level // so we need a container for them. std::set<> will result in the fewest allocations if there is just one. std::set erms; + const auto fence = fencing_token{_shared_token_metadata.get()->get_version()}; for (auto& m : mutations) { auto& table = _db.local().find_column_family(m.schema()->id()); auto erm = table.get_effective_replication_map(); @@ -3298,14 +3318,14 @@ future<> storage_proxy::mutate_counters(Range&& mutations, db::consistency_level // Forward mutations to the leaders chosen for them auto my_address = utils::fb_utilities::get_broadcast_address(); - co_await coroutine::parallel_for_each(leaders, [this, cl, timeout, tr_state = std::move(tr_state), permit = std::move(permit), my_address] (auto& endpoint_and_mutations) -> future<> { + co_await coroutine::parallel_for_each(leaders, [this, cl, timeout, tr_state = std::move(tr_state), permit = std::move(permit), my_address, fence] (auto& endpoint_and_mutations) -> future<> { auto first_schema = endpoint_and_mutations.second[0].s; try { auto endpoint = endpoint_and_mutations.first; if (endpoint == my_address) { - co_await this->mutate_counters_on_leader(std::move(endpoint_and_mutations.second), cl, timeout, tr_state, permit); + co_await apply_fence(this->mutate_counters_on_leader(std::move(endpoint_and_mutations.second), cl, timeout, tr_state, permit), fence, my_address); } else { auto& mutations = endpoint_and_mutations.second; auto fms = boost::copy_range>(mutations | boost::adaptors::transformed([] (auto& m) { @@ -3318,7 +3338,7 @@ future<> storage_proxy::mutate_counters(Range&& mutations, db::consistency_level co_await remote().send_counter_mutation( netw::messaging_service::msg_addr{ endpoint_and_mutations.first, 0 }, timeout, tr_state, - std::move(fms), cl); + std::move(fms), cl, fence); } } catch (...) { // The leader receives a vector of mutations and processes them together, @@ -3343,6 +3363,8 @@ future<> storage_proxy::mutate_counters(Range&& mutations, db::consistency_level throw mutation_write_timeout_exception(s->ks_name(), s->cf_name(), cl, 0, db::block_for(*erm, cl), db::write_type::COUNTER); } catch (rpc::closed_error&) { throw mutation_write_failure_exception(s->ks_name(), s->cf_name(), cl, 0, 1, db::block_for(*erm, cl), db::write_type::COUNTER); + } catch (replica::stale_topology_exception& e) { + throw mutation_write_failure_exception(e.what(), cl, 0, 1, db::block_for(*erm, cl), db::write_type::COUNTER); } } }