From 0ef35ceed440932cb3e17f87dce856b910a96944 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Fri, 26 May 2023 14:41:04 +0200 Subject: [PATCH] service: storage_proxy: make hint write handlers cancellable Whether a write handler should be cancellable is now controlled by a parameter passed to `create_write_response_handler`. We plumb it down from `send_to_endpoint` which is called by hints manager. This will cause hint write handlers to immediately timeout when we shutdown or when a destination node is marked as dead. Fixes #8079 --- db/view/view.cc | 3 ++- service/storage_proxy.cc | 47 ++++++++++++++++++++++------------------ service/storage_proxy.hh | 13 ++++++----- 3 files changed, 36 insertions(+), 27 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index 672447e60d..c8e259e76b 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1605,7 +1605,8 @@ static future<> apply_to_remote_endpoints(service::storage_proxy& proxy, gms::in std::move(pending_endpoints), db::write_type::VIEW, std::move(tr_state), - allow_hints); + allow_hints, + service::is_cancellable::yes); } static bool should_update_synchronously(const schema& s) { diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index c50280b291..dee6ee65cc 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -1165,8 +1165,6 @@ public: }; }; -using is_cancellable = bool_class; - class abstract_write_response_handler : public seastar::enable_shared_from_this, public bi::list_base_hook> { protected: using error = storage_proxy::error; @@ -2307,7 +2305,7 @@ future> storage_proxy::response_wait(storage_proxy::response_id_type id result storage_proxy::create_write_response_handler(locator::effective_replication_map_ptr ermp, db::consistency_level cl, db::write_type type, std::unique_ptr m, inet_address_vector_replica_set targets, const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state, - storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info) + storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info, is_cancellable cancellable) { shared_ptr h; auto& rs = ermp->get_replication_strategy(); @@ -2317,7 +2315,6 @@ result storage_proxy::create_write_response_han } else if (cl == db::consistency_level::EACH_QUORUM && rs.get_type() == locator::replication_strategy_type::network_topology){ h = ::make_shared(shared_from_this(), std::move(ermp), cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info); } else { - is_cancellable cancellable{type == db::write_type::VIEW}; h = ::make_shared(shared_from_this(), std::move(ermp), cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info, cancellable); } return bo::success(register_response_handler(std::move(h))); @@ -2843,7 +2840,7 @@ storage_proxy::mutate_counter_on_leader_and_replicate(const schema_ptr& s, froze result storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::token& token, std::unique_ptr mh, - db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit) { + db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit, is_cancellable cancellable) { replica::table& table = _db.local().find_column_family(s->id()); auto erm = table.get_effective_replication_map(); inet_address_vector_replica_set natural_endpoints = erm->get_natural_endpoints_without_node_being_replaced(token); @@ -2907,7 +2904,7 @@ storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::tok db::assure_sufficient_live_nodes(cl, *erm, live_endpoints, pending_endpoints); return create_write_response_handler(std::move(erm), cl, type, std::move(mh), std::move(live_endpoints), pending_endpoints, - std::move(dead_endpoints), std::move(tr_state), get_stats(), std::move(permit), rate_limit_info); + std::move(dead_endpoints), std::move(tr_state), get_stats(), std::move(permit), rate_limit_info, cancellable); } /** @@ -2920,13 +2917,13 @@ storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::tok result storage_proxy::create_write_response_handler(const mutation& m, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit) { return create_write_response_handler_helper(m.schema(), m.token(), std::make_unique(m), cl, type, tr_state, - std::move(permit), allow_limit); + std::move(permit), allow_limit, is_cancellable::no); } result storage_proxy::create_write_response_handler(const hint_wrapper& h, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit) { return create_write_response_handler_helper(h.mut.schema(), h.mut.token(), std::make_unique(h.mut), cl, type, tr_state, - std::move(permit), allow_limit); + std::move(permit), allow_limit, is_cancellable::yes); } result @@ -2941,7 +2938,7 @@ storage_proxy::create_write_response_handler(const read_repair_mutation& mut, db tracing::trace(tr_state, "Creating write handler for read repair token: {} endpoint: {}", mh->token(), endpoints); // No rate limiting for read repair - return create_write_response_handler(std::move(mut.ermp), cl, type, std::move(mh), std::move(endpoints), inet_address_vector_topology_change(), inet_address_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit), std::monostate()); + return create_write_response_handler(std::move(mut.ermp), cl, type, std::move(mh), std::move(endpoints), inet_address_vector_topology_change(), inet_address_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit), std::monostate(), is_cancellable::no); } result @@ -2950,7 +2947,7 @@ storage_proxy::create_write_response_handler(const std::tuple(std::move(commit), s, std::move(h)), cl, - db::write_type::CAS, tr_state, std::move(permit), allow_limit); + db::write_type::CAS, tr_state, std::move(permit), allow_limit, is_cancellable::no); } result @@ -2967,7 +2964,7 @@ storage_proxy::create_write_response_handler(const std::tuple(std::move(commit), s, nullptr), std::move(endpoints), - inet_address_vector_topology_change(), inet_address_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit), std::monostate()); + inet_address_vector_topology_change(), inet_address_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit), std::monostate(), is_cancellable::no); } void storage_proxy::register_cdc_operation_result_tracker(const storage_proxy::unique_response_handler_vector& ids, lw_shared_ptr tracker) { @@ -3495,7 +3492,7 @@ storage_proxy::mutate_atomically_result(std::vector mutations, db::con return _p.mutate_prepare<>(std::array{std::move(m)}, cl, db::write_type::BATCH_LOG, _permit, [this] (const mutation& m, db::consistency_level cl, db::write_type type, service_permit permit) { auto& table = _p._db.local().find_column_family(m.schema()->id()); auto ermp = table.get_effective_replication_map(); - return _p.create_write_response_handler(std::move(ermp), cl, type, std::make_unique(m), _batchlog_endpoints, {}, {}, _trace_state, _stats, std::move(permit), std::monostate()); + return _p.create_write_response_handler(std::move(ermp), cl, type, std::make_unique(m), _batchlog_endpoints, {}, {}, _trace_state, _stats, std::move(permit), std::monostate(), is_cancellable::no); }).then(utils::result_wrap([this, cl] (unique_response_handler_vector ids) { _p.register_cdc_operation_result_tracker(ids, _cdc_tracker); return _p.mutate_begin(std::move(ids), cl, _trace_state, _timeout); @@ -3600,7 +3597,8 @@ future<> storage_proxy::send_to_endpoint( db::write_type type, tracing::trace_state_ptr tr_state, write_stats& stats, - allow_hints allow_hints) { + allow_hints allow_hints, + is_cancellable cancellable) { utils::latency_counter lc; lc.start(); @@ -3612,7 +3610,7 @@ future<> storage_proxy::send_to_endpoint( timeout = clock_type::now() + 5min; } return mutate_prepare(std::array{std::move(m)}, cl, type, /* does view building should hold a real permit */ empty_service_permit(), - [this, tr_state, target = std::array{target}, pending_endpoints = std::move(pending_endpoints), &stats] ( + [this, tr_state, target = std::array{target}, pending_endpoints = std::move(pending_endpoints), &stats, cancellable] ( std::unique_ptr& m, db::consistency_level cl, db::write_type type, service_permit permit) mutable { @@ -3639,7 +3637,8 @@ future<> storage_proxy::send_to_endpoint( tr_state, stats, std::move(permit), - std::monostate()); // TODO: Pass the correct enforcement type + std::monostate(), // TODO: Pass the correct enforcement type + cancellable); }).then(utils::result_wrap([this, cl, tr_state = std::move(tr_state), timeout = std::move(timeout)] (unique_response_handler_vector ids) mutable { return mutate_begin(std::move(ids), cl, std::move(tr_state), std::move(timeout)); })).then_wrapped([p = shared_from_this(), lc, &stats] (future> f) { @@ -3653,7 +3652,8 @@ future<> storage_proxy::send_to_endpoint( inet_address_vector_topology_change pending_endpoints, db::write_type type, tracing::trace_state_ptr tr_state, - allow_hints allow_hints) { + allow_hints allow_hints, + is_cancellable cancellable) { return send_to_endpoint( std::make_unique(std::move(fm_a_s)), std::move(target), @@ -3661,7 +3661,8 @@ future<> storage_proxy::send_to_endpoint( type, std::move(tr_state), get_stats(), - allow_hints); + allow_hints, + cancellable); } future<> storage_proxy::send_to_endpoint( @@ -3671,7 +3672,8 @@ future<> storage_proxy::send_to_endpoint( db::write_type type, tracing::trace_state_ptr tr_state, write_stats& stats, - allow_hints allow_hints) { + allow_hints allow_hints, + is_cancellable cancellable) { return send_to_endpoint( std::make_unique(std::move(fm_a_s)), std::move(target), @@ -3679,7 +3681,8 @@ future<> storage_proxy::send_to_endpoint( type, std::move(tr_state), stats, - allow_hints); + allow_hints, + cancellable); } future<> storage_proxy::send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target) { @@ -3691,7 +3694,8 @@ future<> storage_proxy::send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s, db::write_type::SIMPLE, tracing::trace_state_ptr(), get_stats(), - allow_hints::no); + allow_hints::no, + is_cancellable::yes); } return send_to_endpoint( @@ -3701,7 +3705,8 @@ future<> storage_proxy::send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s, db::write_type::SIMPLE, tracing::trace_state_ptr(), get_stats(), - allow_hints::no); + allow_hints::no, + is_cancellable::yes); } future<> storage_proxy::send_hint_to_all_replicas(frozen_mutation_and_schema fm_a_s) { diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 0c14834661..448c2c30d7 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -99,6 +99,8 @@ struct view_update_backlog_timestamped { struct allow_hints_tag {}; using allow_hints = bool_class; +using is_cancellable = bool_class; + struct storage_proxy_coordinator_query_result { foreign_ptr> query_result; replicas_per_token_range last_replicas; @@ -307,9 +309,9 @@ private: ::shared_ptr& get_write_response_handler(storage_proxy::response_id_type id); result create_write_response_handler_helper(schema_ptr s, const dht::token& token, std::unique_ptr mh, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, - service_permit permit, db::allow_per_partition_rate_limit allow_limit); + service_permit permit, db::allow_per_partition_rate_limit allow_limit, is_cancellable); result create_write_response_handler(locator::effective_replication_map_ptr ermp, db::consistency_level cl, db::write_type type, std::unique_ptr m, inet_address_vector_replica_set targets, - const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info); + const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info, is_cancellable); result create_write_response_handler(const mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit); result create_write_response_handler(const hint_wrapper&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit); result create_write_response_handler(const read_repair_mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit); @@ -416,7 +418,8 @@ private: db::write_type type, tracing::trace_state_ptr tr_state, write_stats& stats, - allow_hints allow_hints = allow_hints::yes); + allow_hints, + is_cancellable); db::view::update_backlog get_view_update_backlog() const; @@ -565,9 +568,9 @@ public: // hinted handoff support, and just one target. See also // send_to_live_endpoints() - another take on the same original function. future<> send_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target, inet_address_vector_topology_change pending_endpoints, db::write_type type, - tracing::trace_state_ptr tr_state, write_stats& stats, allow_hints allow_hints = allow_hints::yes); + tracing::trace_state_ptr tr_state, write_stats& stats, allow_hints, is_cancellable); future<> send_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target, inet_address_vector_topology_change pending_endpoints, db::write_type type, - tracing::trace_state_ptr tr_state, allow_hints allow_hints = allow_hints::yes); + tracing::trace_state_ptr tr_state, allow_hints, is_cancellable); // Send a mutation to a specific remote target as a hint. // Unlike regular mutations during write operations, hints are sent on the streaming connection