From c7ef9a12ee4fed9ac3cd7573d5dc3ba60a244b38 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Fri, 26 May 2023 13:12:34 +0200 Subject: [PATCH] service: storage_proxy: make it possible to cancel all write handler types The `view_update_write_response_handler` class, which is a subclass of `abstract_write_response_handler`, was created for a single purpose: to make it possible to cancel a handler for a view update write, which means we stop waiting for a response to the write, timing out the handler immediately. This was done to solve issue with node shutdown hanging because it was waiting for a view update to finish; view updates were configured with 5 minute timeout. See #3966, #4028. Now we're having a similar problem with hint updates causing shutdown to hang in tests (#8079). `view_update_write_response_handler` implements cancelling by adding itself to an intrusive list which we then iterate over to timeout each handler when we shutdown or when gossiper notifies `storage_proxy` that a node is down. To make it possible to reuse this algorithm for other handlers, move the functionality into `abstract_write_response_handler`. We inherit from `bi::list_base_hook` so it introduces small memory overhead to each write handler (2 pointers) which was only present for view update handlers before. But those handlers are already quite large, the overhead is small compared to their size. Not all handlers are added to the cancelling list, this is controlled by the `cancellable` parameter passed to the constructor. For now we're only cancelling view handlers as before. In following commits we'll also cancel hint handlers. --- service/storage_proxy.cc | 59 +++++++++++++++++++--------------------- service/storage_proxy.hh | 4 --- 2 files changed, 28 insertions(+), 35 deletions(-) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 117afdbb13..d8a8ff47d6 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -1165,7 +1165,9 @@ public: }; }; -class abstract_write_response_handler : public seastar::enable_shared_from_this { +using is_cancellable = bool_class; + +class abstract_write_response_handler : public seastar::enable_shared_from_this, public bi::list_base_hook> { protected: using error = storage_proxy::error; storage_proxy::response_id_type _id; @@ -1211,7 +1213,7 @@ public: db::consistency_level cl, db::write_type type, std::unique_ptr mh, inet_address_vector_replica_set targets, tracing::trace_state_ptr trace_state, storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info, size_t pending_endpoints = 0, - inet_address_vector_topology_change dead_endpoints = {}) + inet_address_vector_topology_change dead_endpoints = {}, is_cancellable cancellable = is_cancellable::no) : _id(p->get_next_response_id()), _proxy(std::move(p)) , _effective_replication_map_ptr(std::move(erm)) , _trace_state(trace_state), _cl(cl), _type(type), _mutation_holder(std::move(mh)), _targets(std::move(targets)), @@ -1222,6 +1224,10 @@ public: // or we may fail the consistency level guarantees (see #833, #8058) _total_block_for = db::block_for(*_effective_replication_map_ptr, _cl) + pending_endpoints; ++_stats.writes; + + if (cancellable) { + register_cancellable(); + } } virtual ~abstract_write_response_handler() { --_stats.writes; @@ -1249,6 +1255,8 @@ public: _cdc_operation_result_tracker->on_mutation_failed(); } } + + update_cancellable_live_iterators(); } bool is_counter() const { return _type == db::write_type::COUNTER; @@ -1456,6 +1464,11 @@ public: return _stats; } friend storage_proxy; + +private: + void register_cancellable(); + // Called on destruction + void update_cancellable_live_iterators(); }; class datacenter_write_response_handler : public abstract_write_response_handler { @@ -1488,33 +1501,16 @@ public: db::consistency_level cl, db::write_type type, std::unique_ptr mh, inet_address_vector_replica_set targets, const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state, - storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info) : + storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info, is_cancellable cancellable) : abstract_write_response_handler(std::move(p), std::move(ermp), cl, type, std::move(mh), - std::move(targets), std::move(tr_state), stats, std::move(permit), rate_limit_info, pending_endpoints.size(), std::move(dead_endpoints)) { + std::move(targets), std::move(tr_state), stats, std::move(permit), rate_limit_info, pending_endpoints.size(), std::move(dead_endpoints), cancellable) { _total_endpoints = _targets.size(); } }; -class view_update_write_response_handler : public write_response_handler, public bi::list_base_hook> { -public: - view_update_write_response_handler(shared_ptr p, - locator::effective_replication_map_ptr ermp, - db::consistency_level cl, - std::unique_ptr mh, inet_address_vector_replica_set targets, - const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state, - storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info): - write_response_handler(p, std::move(ermp), cl, db::write_type::VIEW, std::move(mh), - std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info) { - register_in_intrusive_list(*p); - } - ~view_update_write_response_handler(); -private: - void register_in_intrusive_list(storage_proxy& p); -}; - -class storage_proxy::view_update_handlers_list : public bi::list, bi::constant_time_size> { +class storage_proxy::view_update_handlers_list : public bi::list, bi::constant_time_size> { // _live_iterators holds all iterators that point into the bi:list in the base class of this object. - // If we remove a view_update_write_response_handler from the list, and an iterator happens to point + // If we remove a abstract_write_response_handler from the list, and an iterator happens to point // into it, we advance the iterator so it doesn't point at a removed object. See #4912. std::vector _live_iterators; public: @@ -1527,7 +1523,7 @@ public: void unregister_live_iterator(iterator* itp) { _live_iterators.erase(boost::remove(_live_iterators, itp), _live_iterators.end()); } - void update_live_iterators(view_update_write_response_handler* vuwrh) { + void update_live_iterators(abstract_write_response_handler* vuwrh) { // vuwrh is being removed from the b::list, so if any live iterator points at it, // move it to the next object (this requires that the list is traversed in the forward // direction). @@ -1550,13 +1546,15 @@ public: }; }; -void view_update_write_response_handler::register_in_intrusive_list(storage_proxy& p) { - p.get_view_update_handlers_list().push_back(*this); +void abstract_write_response_handler::register_cancellable() { + _proxy->_view_update_handlers_list->push_back(*this); } -view_update_write_response_handler::~view_update_write_response_handler() { - _proxy->_view_update_handlers_list->update_live_iterators(this); +void abstract_write_response_handler::update_cancellable_live_iterators() { + if (is_linked()) { + _proxy->_view_update_handlers_list->update_live_iterators(this); + } } class datacenter_sync_write_response_handler : public abstract_write_response_handler { @@ -2313,10 +2311,9 @@ result storage_proxy::create_write_response_han h = ::make_shared(shared_from_this(), std::move(ermp), cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info); } else if (cl == db::consistency_level::EACH_QUORUM && rs.get_type() == locator::replication_strategy_type::network_topology){ h = ::make_shared(shared_from_this(), std::move(ermp), cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info); - } else if (type == db::write_type::VIEW) { - h = ::make_shared(shared_from_this(), std::move(ermp), cl, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info); } else { - h = ::make_shared(shared_from_this(), std::move(ermp), cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info); + is_cancellable cancellable{type == db::write_type::VIEW}; + h = ::make_shared(shared_from_this(), std::move(ermp), cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info, cancellable); } return bo::success(register_response_handler(std::move(h))); } diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 9867b1751b..6befd5aede 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -470,10 +470,6 @@ public: return _cdc; } - view_update_handlers_list& get_view_update_handlers_list() { - return *_view_update_handlers_list; - } - response_id_type get_next_response_id() { auto next = _next_response_id++; if (next == 0) { // 0 is reserved for unique_response_handler