From c7ef9a12ee4fed9ac3cd7573d5dc3ba60a244b38 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Fri, 26 May 2023 13:12:34 +0200 Subject: [PATCH 1/6] service: storage_proxy: make it possible to cancel all write handler types The `view_update_write_response_handler` class, which is a subclass of `abstract_write_response_handler`, was created for a single purpose: to make it possible to cancel a handler for a view update write, which means we stop waiting for a response to the write, timing out the handler immediately. This was done to solve issue with node shutdown hanging because it was waiting for a view update to finish; view updates were configured with 5 minute timeout. See #3966, #4028. Now we're having a similar problem with hint updates causing shutdown to hang in tests (#8079). `view_update_write_response_handler` implements cancelling by adding itself to an intrusive list which we then iterate over to timeout each handler when we shutdown or when gossiper notifies `storage_proxy` that a node is down. To make it possible to reuse this algorithm for other handlers, move the functionality into `abstract_write_response_handler`. We inherit from `bi::list_base_hook` so it introduces small memory overhead to each write handler (2 pointers) which was only present for view update handlers before. But those handlers are already quite large, the overhead is small compared to their size. Not all handlers are added to the cancelling list, this is controlled by the `cancellable` parameter passed to the constructor. For now we're only cancelling view handlers as before. In following commits we'll also cancel hint handlers. --- service/storage_proxy.cc | 59 +++++++++++++++++++--------------------- service/storage_proxy.hh | 4 --- 2 files changed, 28 insertions(+), 35 deletions(-) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 117afdbb13..d8a8ff47d6 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -1165,7 +1165,9 @@ public: }; }; -class abstract_write_response_handler : public seastar::enable_shared_from_this { +using is_cancellable = bool_class; + +class abstract_write_response_handler : public seastar::enable_shared_from_this, public bi::list_base_hook> { protected: using error = storage_proxy::error; storage_proxy::response_id_type _id; @@ -1211,7 +1213,7 @@ public: db::consistency_level cl, db::write_type type, std::unique_ptr mh, inet_address_vector_replica_set targets, tracing::trace_state_ptr trace_state, storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info, size_t pending_endpoints = 0, - inet_address_vector_topology_change dead_endpoints = {}) + inet_address_vector_topology_change dead_endpoints = {}, is_cancellable cancellable = is_cancellable::no) : _id(p->get_next_response_id()), _proxy(std::move(p)) , _effective_replication_map_ptr(std::move(erm)) , _trace_state(trace_state), _cl(cl), _type(type), _mutation_holder(std::move(mh)), _targets(std::move(targets)), @@ -1222,6 +1224,10 @@ public: // or we may fail the consistency level guarantees (see #833, #8058) _total_block_for = db::block_for(*_effective_replication_map_ptr, _cl) + pending_endpoints; ++_stats.writes; + + if (cancellable) { + register_cancellable(); + } } virtual ~abstract_write_response_handler() { --_stats.writes; @@ -1249,6 +1255,8 @@ public: _cdc_operation_result_tracker->on_mutation_failed(); } } + + update_cancellable_live_iterators(); } bool is_counter() const { return _type == db::write_type::COUNTER; @@ -1456,6 +1464,11 @@ public: return _stats; } friend storage_proxy; + +private: + void register_cancellable(); + // Called on destruction + void update_cancellable_live_iterators(); }; class datacenter_write_response_handler : public abstract_write_response_handler { @@ -1488,33 +1501,16 @@ public: db::consistency_level cl, db::write_type type, std::unique_ptr mh, inet_address_vector_replica_set targets, const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state, - storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info) : + storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info, is_cancellable cancellable) : abstract_write_response_handler(std::move(p), std::move(ermp), cl, type, std::move(mh), - std::move(targets), std::move(tr_state), stats, std::move(permit), rate_limit_info, pending_endpoints.size(), std::move(dead_endpoints)) { + std::move(targets), std::move(tr_state), stats, std::move(permit), rate_limit_info, pending_endpoints.size(), std::move(dead_endpoints), cancellable) { _total_endpoints = _targets.size(); } }; -class view_update_write_response_handler : public write_response_handler, public bi::list_base_hook> { -public: - view_update_write_response_handler(shared_ptr p, - locator::effective_replication_map_ptr ermp, - db::consistency_level cl, - std::unique_ptr mh, inet_address_vector_replica_set targets, - const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state, - storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info): - write_response_handler(p, std::move(ermp), cl, db::write_type::VIEW, std::move(mh), - std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info) { - register_in_intrusive_list(*p); - } - ~view_update_write_response_handler(); -private: - void register_in_intrusive_list(storage_proxy& p); -}; - -class storage_proxy::view_update_handlers_list : public bi::list, bi::constant_time_size> { +class storage_proxy::view_update_handlers_list : public bi::list, bi::constant_time_size> { // _live_iterators holds all iterators that point into the bi:list in the base class of this object. - // If we remove a view_update_write_response_handler from the list, and an iterator happens to point + // If we remove a abstract_write_response_handler from the list, and an iterator happens to point // into it, we advance the iterator so it doesn't point at a removed object. See #4912. std::vector _live_iterators; public: @@ -1527,7 +1523,7 @@ public: void unregister_live_iterator(iterator* itp) { _live_iterators.erase(boost::remove(_live_iterators, itp), _live_iterators.end()); } - void update_live_iterators(view_update_write_response_handler* vuwrh) { + void update_live_iterators(abstract_write_response_handler* vuwrh) { // vuwrh is being removed from the b::list, so if any live iterator points at it, // move it to the next object (this requires that the list is traversed in the forward // direction). @@ -1550,13 +1546,15 @@ public: }; }; -void view_update_write_response_handler::register_in_intrusive_list(storage_proxy& p) { - p.get_view_update_handlers_list().push_back(*this); +void abstract_write_response_handler::register_cancellable() { + _proxy->_view_update_handlers_list->push_back(*this); } -view_update_write_response_handler::~view_update_write_response_handler() { - _proxy->_view_update_handlers_list->update_live_iterators(this); +void abstract_write_response_handler::update_cancellable_live_iterators() { + if (is_linked()) { + _proxy->_view_update_handlers_list->update_live_iterators(this); + } } class datacenter_sync_write_response_handler : public abstract_write_response_handler { @@ -2313,10 +2311,9 @@ result storage_proxy::create_write_response_han h = ::make_shared(shared_from_this(), std::move(ermp), cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info); } else if (cl == db::consistency_level::EACH_QUORUM && rs.get_type() == locator::replication_strategy_type::network_topology){ h = ::make_shared(shared_from_this(), std::move(ermp), cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info); - } else if (type == db::write_type::VIEW) { - h = ::make_shared(shared_from_this(), std::move(ermp), cl, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info); } else { - h = ::make_shared(shared_from_this(), std::move(ermp), cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info); + is_cancellable cancellable{type == db::write_type::VIEW}; + h = ::make_shared(shared_from_this(), std::move(ermp), cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info, cancellable); } return bo::success(register_response_handler(std::move(h))); } diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 9867b1751b..6befd5aede 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -470,10 +470,6 @@ public: return _cdc; } - view_update_handlers_list& get_view_update_handlers_list() { - return *_view_update_handlers_list; - } - response_id_type get_next_response_id() { auto next = _next_response_id++; if (next == 0) { // 0 is reserved for unique_response_handler From eddb7406b4c9148773d1e97021edd09115929a33 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Fri, 26 May 2023 13:17:56 +0200 Subject: [PATCH 2/6] service: storage_proxy: rename `view_update_handlers_list` The list will be used for non-view-update write handlers as well, so generalize the name. Also generalize some variable names used in the implementation. This commit only renames things + some comments were added, there are no logical changes. --- service/storage_proxy.cc | 41 ++++++++++++++++++++++------------------ service/storage_proxy.hh | 7 ++++--- 2 files changed, 27 insertions(+), 21 deletions(-) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index d8a8ff47d6..c50280b291 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -1508,13 +1508,18 @@ public: } }; -class storage_proxy::view_update_handlers_list : public bi::list, bi::constant_time_size> { +// This list contains `abstract_write_response_handler`s which were constructed as `cancellable`. +// When a `cancellable` handler is constructed, it adds itself to the list (see `register_cancellable`). +// We use the list to cancel handlers - as if the write timed out - on certain events, such as when +// we shutdown a node so that shutdown is not blocked. +// We don't add normal data path writes to the list, only background work such as hints and view updates. +class storage_proxy::cancellable_write_handlers_list : public bi::list, bi::constant_time_size> { // _live_iterators holds all iterators that point into the bi:list in the base class of this object. // If we remove a abstract_write_response_handler from the list, and an iterator happens to point // into it, we advance the iterator so it doesn't point at a removed object. See #4912. std::vector _live_iterators; public: - view_update_handlers_list() { + cancellable_write_handlers_list() { _live_iterators.reserve(10); // We only expect 1. } void register_live_iterator(iterator* itp) noexcept { // We don't tolerate failure, so abort instead @@ -1523,37 +1528,37 @@ public: void unregister_live_iterator(iterator* itp) { _live_iterators.erase(boost::remove(_live_iterators, itp), _live_iterators.end()); } - void update_live_iterators(abstract_write_response_handler* vuwrh) { - // vuwrh is being removed from the b::list, so if any live iterator points at it, + void update_live_iterators(abstract_write_response_handler* handler) { + // handler is being removed from the b::list, so if any live iterator points at it, // move it to the next object (this requires that the list is traversed in the forward // direction). for (auto& itp : _live_iterators) { - if (&**itp == vuwrh) { + if (&**itp == handler) { ++*itp; } } } class iterator_guard { - view_update_handlers_list& _vuhl; + cancellable_write_handlers_list& _handlers; iterator* _itp; public: - iterator_guard(view_update_handlers_list& vuhl, iterator& it) : _vuhl(vuhl), _itp(&it) { - _vuhl.register_live_iterator(_itp); + iterator_guard(cancellable_write_handlers_list& handlers, iterator& it) : _handlers(handlers), _itp(&it) { + _handlers.register_live_iterator(_itp); } ~iterator_guard() { - _vuhl.unregister_live_iterator(_itp); + _handlers.unregister_live_iterator(_itp); } }; }; void abstract_write_response_handler::register_cancellable() { - _proxy->_view_update_handlers_list->push_back(*this); + _proxy->_cancellable_write_handlers_list->push_back(*this); } void abstract_write_response_handler::update_cancellable_live_iterators() { if (is_linked()) { - _proxy->_view_update_handlers_list->update_live_iterators(this); + _proxy->_cancellable_write_handlers_list->update_live_iterators(this); } } @@ -2721,7 +2726,7 @@ storage_proxy::storage_proxy(distributed& db, gms::gossiper& , _background_write_throttle_threahsold(cfg.available_memory / 10) , _mutate_stage{"storage_proxy_mutate", &storage_proxy::do_mutate} , _max_view_update_backlog(max_view_update_backlog) - , _view_update_handlers_list(std::make_unique()) { + , _cancellable_write_handlers_list(std::make_unique()) { namespace sm = seastar::metrics; _metrics.add_group(storage_proxy_stats::COORDINATOR_STATS_CATEGORY, { sm::make_queue_length("current_throttled_writes", [this] { return _throttled_writes.size(); }, @@ -6236,24 +6241,24 @@ void storage_proxy::on_leave_cluster(const gms::inet_address& endpoint) { void storage_proxy::on_up(const gms::inet_address& endpoint) {}; -void storage_proxy::retire_view_response_handlers(noncopyable_function filter_fun) { +void storage_proxy::cancel_write_handlers(noncopyable_function filter_fun) { assert(thread::running_in_thread()); - auto it = _view_update_handlers_list->begin(); - while (it != _view_update_handlers_list->end()) { + auto it = _cancellable_write_handlers_list->begin(); + while (it != _cancellable_write_handlers_list->end()) { auto guard = it->shared_from_this(); if (filter_fun(*it) && _response_handlers.contains(it->id())) { it->timeout_cb(); } ++it; if (need_preempt()) { - view_update_handlers_list::iterator_guard ig{*_view_update_handlers_list, it}; + cancellable_write_handlers_list::iterator_guard ig{*_cancellable_write_handlers_list, it}; seastar::thread::yield(); } } } void storage_proxy::on_down(const gms::inet_address& endpoint) { - return retire_view_response_handlers([endpoint] (const abstract_write_response_handler& handler) { + return cancel_write_handlers([endpoint] (const abstract_write_response_handler& handler) { const auto& targets = handler.get_targets(); return boost::find(targets, endpoint) != targets.end(); }); @@ -6263,7 +6268,7 @@ future<> storage_proxy::drain_on_shutdown() { //NOTE: the thread is spawned here because there are delicate lifetime issues to consider // and writing them down with plain futures is error-prone. return async([this] { - retire_view_response_handlers([] (const abstract_write_response_handler&) { return true; }); + cancel_write_handlers([] (const abstract_write_response_handler&) { return true; }); _hints_resource_manager.stop().get(); }); } diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 6befd5aede..0c14834661 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -273,8 +273,8 @@ private: std::unordered_map _view_update_backlogs; //NOTICE(sarna): This opaque pointer is here just to avoid moving write handler class definitions from .cc to .hh. It's slow path. - class view_update_handlers_list; - std::unique_ptr _view_update_handlers_list; + class cancellable_write_handlers_list; + std::unique_ptr _cancellable_write_handlers_list; /* This is a pointer to the shard-local part of the sharded cdc_service: * storage_proxy needs access to cdc_service to augument mutations. @@ -427,7 +427,8 @@ private: template future<> mutate_counters(Range&& mutations, db::consistency_level cl, tracing::trace_state_ptr tr_state, service_permit permit, clock_type::time_point timeout); - void retire_view_response_handlers(noncopyable_function filter_fun); + // Retires (times out) write response handlers which were constructed as `cancellable` and pass the given filter. + void cancel_write_handlers(noncopyable_function filter_fun); /** * Returns whether for a range query doing a query against merged is likely From 0ef35ceed440932cb3e17f87dce856b910a96944 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Fri, 26 May 2023 14:41:04 +0200 Subject: [PATCH 3/6] service: storage_proxy: make hint write handlers cancellable Whether a write handler should be cancellable is now controlled by a parameter passed to `create_write_response_handler`. We plumb it down from `send_to_endpoint` which is called by hints manager. This will cause hint write handlers to immediately timeout when we shutdown or when a destination node is marked as dead. Fixes #8079 --- db/view/view.cc | 3 ++- service/storage_proxy.cc | 47 ++++++++++++++++++++++------------------ service/storage_proxy.hh | 13 ++++++----- 3 files changed, 36 insertions(+), 27 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index 672447e60d..c8e259e76b 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1605,7 +1605,8 @@ static future<> apply_to_remote_endpoints(service::storage_proxy& proxy, gms::in std::move(pending_endpoints), db::write_type::VIEW, std::move(tr_state), - allow_hints); + allow_hints, + service::is_cancellable::yes); } static bool should_update_synchronously(const schema& s) { diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index c50280b291..dee6ee65cc 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -1165,8 +1165,6 @@ public: }; }; -using is_cancellable = bool_class; - class abstract_write_response_handler : public seastar::enable_shared_from_this, public bi::list_base_hook> { protected: using error = storage_proxy::error; @@ -2307,7 +2305,7 @@ future> storage_proxy::response_wait(storage_proxy::response_id_type id result storage_proxy::create_write_response_handler(locator::effective_replication_map_ptr ermp, db::consistency_level cl, db::write_type type, std::unique_ptr m, inet_address_vector_replica_set targets, const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state, - storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info) + storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info, is_cancellable cancellable) { shared_ptr h; auto& rs = ermp->get_replication_strategy(); @@ -2317,7 +2315,6 @@ result storage_proxy::create_write_response_han } else if (cl == db::consistency_level::EACH_QUORUM && rs.get_type() == locator::replication_strategy_type::network_topology){ h = ::make_shared(shared_from_this(), std::move(ermp), cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info); } else { - is_cancellable cancellable{type == db::write_type::VIEW}; h = ::make_shared(shared_from_this(), std::move(ermp), cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit), rate_limit_info, cancellable); } return bo::success(register_response_handler(std::move(h))); @@ -2843,7 +2840,7 @@ storage_proxy::mutate_counter_on_leader_and_replicate(const schema_ptr& s, froze result storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::token& token, std::unique_ptr mh, - db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit) { + db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit, is_cancellable cancellable) { replica::table& table = _db.local().find_column_family(s->id()); auto erm = table.get_effective_replication_map(); inet_address_vector_replica_set natural_endpoints = erm->get_natural_endpoints_without_node_being_replaced(token); @@ -2907,7 +2904,7 @@ storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::tok db::assure_sufficient_live_nodes(cl, *erm, live_endpoints, pending_endpoints); return create_write_response_handler(std::move(erm), cl, type, std::move(mh), std::move(live_endpoints), pending_endpoints, - std::move(dead_endpoints), std::move(tr_state), get_stats(), std::move(permit), rate_limit_info); + std::move(dead_endpoints), std::move(tr_state), get_stats(), std::move(permit), rate_limit_info, cancellable); } /** @@ -2920,13 +2917,13 @@ storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::tok result storage_proxy::create_write_response_handler(const mutation& m, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit) { return create_write_response_handler_helper(m.schema(), m.token(), std::make_unique(m), cl, type, tr_state, - std::move(permit), allow_limit); + std::move(permit), allow_limit, is_cancellable::no); } result storage_proxy::create_write_response_handler(const hint_wrapper& h, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit) { return create_write_response_handler_helper(h.mut.schema(), h.mut.token(), std::make_unique(h.mut), cl, type, tr_state, - std::move(permit), allow_limit); + std::move(permit), allow_limit, is_cancellable::yes); } result @@ -2941,7 +2938,7 @@ storage_proxy::create_write_response_handler(const read_repair_mutation& mut, db tracing::trace(tr_state, "Creating write handler for read repair token: {} endpoint: {}", mh->token(), endpoints); // No rate limiting for read repair - return create_write_response_handler(std::move(mut.ermp), cl, type, std::move(mh), std::move(endpoints), inet_address_vector_topology_change(), inet_address_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit), std::monostate()); + return create_write_response_handler(std::move(mut.ermp), cl, type, std::move(mh), std::move(endpoints), inet_address_vector_topology_change(), inet_address_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit), std::monostate(), is_cancellable::no); } result @@ -2950,7 +2947,7 @@ storage_proxy::create_write_response_handler(const std::tuple(std::move(commit), s, std::move(h)), cl, - db::write_type::CAS, tr_state, std::move(permit), allow_limit); + db::write_type::CAS, tr_state, std::move(permit), allow_limit, is_cancellable::no); } result @@ -2967,7 +2964,7 @@ storage_proxy::create_write_response_handler(const std::tuple(std::move(commit), s, nullptr), std::move(endpoints), - inet_address_vector_topology_change(), inet_address_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit), std::monostate()); + inet_address_vector_topology_change(), inet_address_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit), std::monostate(), is_cancellable::no); } void storage_proxy::register_cdc_operation_result_tracker(const storage_proxy::unique_response_handler_vector& ids, lw_shared_ptr tracker) { @@ -3495,7 +3492,7 @@ storage_proxy::mutate_atomically_result(std::vector mutations, db::con return _p.mutate_prepare<>(std::array{std::move(m)}, cl, db::write_type::BATCH_LOG, _permit, [this] (const mutation& m, db::consistency_level cl, db::write_type type, service_permit permit) { auto& table = _p._db.local().find_column_family(m.schema()->id()); auto ermp = table.get_effective_replication_map(); - return _p.create_write_response_handler(std::move(ermp), cl, type, std::make_unique(m), _batchlog_endpoints, {}, {}, _trace_state, _stats, std::move(permit), std::monostate()); + return _p.create_write_response_handler(std::move(ermp), cl, type, std::make_unique(m), _batchlog_endpoints, {}, {}, _trace_state, _stats, std::move(permit), std::monostate(), is_cancellable::no); }).then(utils::result_wrap([this, cl] (unique_response_handler_vector ids) { _p.register_cdc_operation_result_tracker(ids, _cdc_tracker); return _p.mutate_begin(std::move(ids), cl, _trace_state, _timeout); @@ -3600,7 +3597,8 @@ future<> storage_proxy::send_to_endpoint( db::write_type type, tracing::trace_state_ptr tr_state, write_stats& stats, - allow_hints allow_hints) { + allow_hints allow_hints, + is_cancellable cancellable) { utils::latency_counter lc; lc.start(); @@ -3612,7 +3610,7 @@ future<> storage_proxy::send_to_endpoint( timeout = clock_type::now() + 5min; } return mutate_prepare(std::array{std::move(m)}, cl, type, /* does view building should hold a real permit */ empty_service_permit(), - [this, tr_state, target = std::array{target}, pending_endpoints = std::move(pending_endpoints), &stats] ( + [this, tr_state, target = std::array{target}, pending_endpoints = std::move(pending_endpoints), &stats, cancellable] ( std::unique_ptr& m, db::consistency_level cl, db::write_type type, service_permit permit) mutable { @@ -3639,7 +3637,8 @@ future<> storage_proxy::send_to_endpoint( tr_state, stats, std::move(permit), - std::monostate()); // TODO: Pass the correct enforcement type + std::monostate(), // TODO: Pass the correct enforcement type + cancellable); }).then(utils::result_wrap([this, cl, tr_state = std::move(tr_state), timeout = std::move(timeout)] (unique_response_handler_vector ids) mutable { return mutate_begin(std::move(ids), cl, std::move(tr_state), std::move(timeout)); })).then_wrapped([p = shared_from_this(), lc, &stats] (future> f) { @@ -3653,7 +3652,8 @@ future<> storage_proxy::send_to_endpoint( inet_address_vector_topology_change pending_endpoints, db::write_type type, tracing::trace_state_ptr tr_state, - allow_hints allow_hints) { + allow_hints allow_hints, + is_cancellable cancellable) { return send_to_endpoint( std::make_unique(std::move(fm_a_s)), std::move(target), @@ -3661,7 +3661,8 @@ future<> storage_proxy::send_to_endpoint( type, std::move(tr_state), get_stats(), - allow_hints); + allow_hints, + cancellable); } future<> storage_proxy::send_to_endpoint( @@ -3671,7 +3672,8 @@ future<> storage_proxy::send_to_endpoint( db::write_type type, tracing::trace_state_ptr tr_state, write_stats& stats, - allow_hints allow_hints) { + allow_hints allow_hints, + is_cancellable cancellable) { return send_to_endpoint( std::make_unique(std::move(fm_a_s)), std::move(target), @@ -3679,7 +3681,8 @@ future<> storage_proxy::send_to_endpoint( type, std::move(tr_state), stats, - allow_hints); + allow_hints, + cancellable); } future<> storage_proxy::send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target) { @@ -3691,7 +3694,8 @@ future<> storage_proxy::send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s, db::write_type::SIMPLE, tracing::trace_state_ptr(), get_stats(), - allow_hints::no); + allow_hints::no, + is_cancellable::yes); } return send_to_endpoint( @@ -3701,7 +3705,8 @@ future<> storage_proxy::send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s, db::write_type::SIMPLE, tracing::trace_state_ptr(), get_stats(), - allow_hints::no); + allow_hints::no, + is_cancellable::yes); } future<> storage_proxy::send_hint_to_all_replicas(frozen_mutation_and_schema fm_a_s) { diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 0c14834661..448c2c30d7 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -99,6 +99,8 @@ struct view_update_backlog_timestamped { struct allow_hints_tag {}; using allow_hints = bool_class; +using is_cancellable = bool_class; + struct storage_proxy_coordinator_query_result { foreign_ptr> query_result; replicas_per_token_range last_replicas; @@ -307,9 +309,9 @@ private: ::shared_ptr& get_write_response_handler(storage_proxy::response_id_type id); result create_write_response_handler_helper(schema_ptr s, const dht::token& token, std::unique_ptr mh, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, - service_permit permit, db::allow_per_partition_rate_limit allow_limit); + service_permit permit, db::allow_per_partition_rate_limit allow_limit, is_cancellable); result create_write_response_handler(locator::effective_replication_map_ptr ermp, db::consistency_level cl, db::write_type type, std::unique_ptr m, inet_address_vector_replica_set targets, - const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info); + const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info, is_cancellable); result create_write_response_handler(const mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit); result create_write_response_handler(const hint_wrapper&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit); result create_write_response_handler(const read_repair_mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit); @@ -416,7 +418,8 @@ private: db::write_type type, tracing::trace_state_ptr tr_state, write_stats& stats, - allow_hints allow_hints = allow_hints::yes); + allow_hints, + is_cancellable); db::view::update_backlog get_view_update_backlog() const; @@ -565,9 +568,9 @@ public: // hinted handoff support, and just one target. See also // send_to_live_endpoints() - another take on the same original function. future<> send_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target, inet_address_vector_topology_change pending_endpoints, db::write_type type, - tracing::trace_state_ptr tr_state, write_stats& stats, allow_hints allow_hints = allow_hints::yes); + tracing::trace_state_ptr tr_state, write_stats& stats, allow_hints, is_cancellable); future<> send_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target, inet_address_vector_topology_change pending_endpoints, db::write_type type, - tracing::trace_state_ptr tr_state, allow_hints allow_hints = allow_hints::yes); + tracing::trace_state_ptr tr_state, allow_hints, is_cancellable); // Send a mutation to a specific remote target as a hint. // Unlike regular mutations during write operations, hints are sent on the streaming connection From ce13395ce49ed2c2b2c72d266e275de7c3b06bc9 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Wed, 24 May 2023 15:11:50 +0200 Subject: [PATCH 4/6] test: pylib: scylla_cluster: add explicit timeout for graceful server stop If server shutdown hangs, the `manager.server_stop_gracefully` call would eventually (after 5 minutes) timeout with a cryptic `TimeoutError`; it's a generic timeout for performing requests by the tests to `ScyllaClusterManager`. It was non-obvious how to find what actually caused the timeout - you'd have to browse multiple logs. Introduce an explicit timeout in `ScyllaServer.stop_gracefully`. Set it to 1 minute. Whether this is a good value may be arguable, but shutdown taking longer than that probably indicates problems. The important thing is that this timeout is shorter than the generic request timeout. If this times out we get a nice error in the test: ``` E test.pylib.rest_client.HTTPError: HTTP error 500, uri: http+unix://api/cluster/server/1/stop_gracefully, params: None, json: None, body: E Stopping server ScyllaServer(1, 127.162.40.1, 826d5884-4696-4a22-80a7-cc872aa43102) gracefully took longer than 60s ``` --- test/pylib/scylla_cluster.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/test/pylib/scylla_cluster.py b/test/pylib/scylla_cluster.py index 251bc5c00e..86e7c53250 100644 --- a/test/pylib/scylla_cluster.py +++ b/test/pylib/scylla_cluster.py @@ -502,9 +502,15 @@ class ScyllaServer: except ProcessLookupError: pass else: - # FIXME: add timeout, fail the test and mark cluster as dirty - # if we timeout. - await self.cmd.wait() + STOP_TIMEOUT_SECONDS = 60 + wait_task = self.cmd.wait() + try: + await asyncio.wait_for(wait_task, timeout=STOP_TIMEOUT_SECONDS) + except asyncio.TimeoutError: + self.cmd.kill() + await self.cmd.wait() + raise RuntimeError( + f"Stopping server {self} gracefully took longer than {STOP_TIMEOUT_SECONDS}s") finally: if self.cmd: self.logger.info("gracefully stopped %s", self) From 7e56388721afa7b7416ce0c736df85867e6bbb5f Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Thu, 11 May 2023 13:34:34 +0200 Subject: [PATCH 5/6] test: pylib: ScyllaCluster: generalize config type for `server_add` Generalize from `dict[str, str]` to `dict[str, Any]`. --- test/pylib/manager_client.py | 2 +- test/pylib/scylla_cluster.py | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/test/pylib/manager_client.py b/test/pylib/manager_client.py index 088d8045c3..a9ed711313 100644 --- a/test/pylib/manager_client.py +++ b/test/pylib/manager_client.py @@ -161,7 +161,7 @@ class ManagerClient(): await self.server_sees_others(server_id, wait_others, interval = wait_interval) self._driver_update() - async def server_add(self, replace_cfg: Optional[ReplaceConfig] = None, cmdline: Optional[List[str]] = None, config: Optional[dict[str, str]] = None, start: bool = True) -> ServerInfo: + async def server_add(self, replace_cfg: Optional[ReplaceConfig] = None, cmdline: Optional[List[str]] = None, config: Optional[dict[str, Any]] = None, start: bool = True) -> ServerInfo: """Add a new server""" try: data: dict[str, Any] = {'start': start} diff --git a/test/pylib/scylla_cluster.py b/test/pylib/scylla_cluster.py index 86e7c53250..91e8cd5185 100644 --- a/test/pylib/scylla_cluster.py +++ b/test/pylib/scylla_cluster.py @@ -18,7 +18,7 @@ import shutil import tempfile import time import traceback -from typing import Optional, Dict, List, Set, Tuple, Callable, AsyncIterator, NamedTuple, Union +from typing import Any, Optional, Dict, List, Set, Tuple, Callable, AsyncIterator, NamedTuple, Union import uuid from enum import Enum from io import BufferedWriter @@ -203,7 +203,7 @@ class ScyllaServer: logger: Union[logging.Logger, logging.LoggerAdapter], cluster_name: str, ip_addr: str, seeds: List[str], cmdline_options: List[str], - config_options: Dict[str, str]) -> None: + config_options: Dict[str, Any]) -> None: # pylint: disable=too-many-arguments self.server_id = ServerNum(ScyllaServer.newid()) self.exe = pathlib.Path(exe).resolve() @@ -565,7 +565,7 @@ class ScyllaCluster: cluster_name: str ip_addr: IPAddress seeds: List[str] - config_from_test: dict[str, str] + config_from_test: dict[str, Any] cmdline_from_test: List[str] def __init__(self, logger: Union[logging.Logger, logging.LoggerAdapter], @@ -652,11 +652,11 @@ class ScyllaCluster: def _seeds(self) -> List[str]: return [server.ip_addr for server in self.running.values()] - async def add_server(self, replace_cfg: Optional[ReplaceConfig] = None, cmdline: Optional[List[str]] = None, config: Optional[dict[str, str]] = None, start: bool = True) -> ServerInfo: + async def add_server(self, replace_cfg: Optional[ReplaceConfig] = None, cmdline: Optional[List[str]] = None, config: Optional[dict[str, Any]] = None, start: bool = True) -> ServerInfo: """Add a new server to the cluster""" self.is_dirty = True - extra_config: dict[str, str] = config.copy() if config else {} + extra_config: dict[str, Any] = config.copy() if config else {} if replace_cfg: replaced_id = replace_cfg.replaced_id assert replaced_id in self.servers, \ From beabb615668223120c1701179a52f6fc042a6592 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Wed, 24 May 2023 15:17:21 +0200 Subject: [PATCH 6/6] test: reproducer for hints manager shutdown hang --- db/hints/manager.cc | 8 ++- db/hints/manager.hh | 3 +- test/topology_custom/suite.yaml | 4 ++ test/topology_custom/test_shutdown_hang.py | 63 ++++++++++++++++++++++ 4 files changed, 75 insertions(+), 3 deletions(-) create mode 100644 test/topology_custom/test_shutdown_hang.py diff --git a/db/hints/manager.cc b/db/hints/manager.cc index 56504ba8da..003b048c3e 100644 --- a/db/hints/manager.cc +++ b/db/hints/manager.cc @@ -44,7 +44,7 @@ static logging::logger manager_logger("hints_manager"); const std::string manager::FILENAME_PREFIX("HintsLog" + commitlog::descriptor::SEPARATOR); const std::chrono::seconds manager::hint_file_write_timeout = std::chrono::seconds(2); -const std::chrono::seconds manager::hints_flush_period = std::chrono::seconds(10); +std::chrono::seconds manager::hints_flush_period = std::chrono::seconds(10); manager::manager(sstring hints_directory, host_filter filter, int64_t max_hint_window_ms, resource_manager& res_manager, distributed& db) : _hints_dir(fs::path(hints_directory) / format("{:d}", this_shard_id())) @@ -52,7 +52,11 @@ manager::manager(sstring hints_directory, host_filter filter, int64_t max_hint_w , _max_hint_window_us(max_hint_window_ms * 1000) , _local_db(db.local()) , _resource_manager(res_manager) -{} +{ + if (utils::get_local_injector().enter("decrease_hints_flush_period")) { + hints_flush_period = std::chrono::seconds{1}; + } +} manager::~manager() { assert(_ep_managers.empty()); diff --git a/db/hints/manager.hh b/db/hints/manager.hh index b7b4fc8e54..58c066b6d0 100644 --- a/db/hints/manager.hh +++ b/db/hints/manager.hh @@ -501,7 +501,8 @@ private: public: static const std::string FILENAME_PREFIX; - static const std::chrono::seconds hints_flush_period; + // Non-const - can be modified with an error injection. + static std::chrono::seconds hints_flush_period; static const std::chrono::seconds hint_file_write_timeout; private: diff --git a/test/topology_custom/suite.yaml b/test/topology_custom/suite.yaml index 85e1cd3e71..60fd05ad68 100644 --- a/test/topology_custom/suite.yaml +++ b/test/topology_custom/suite.yaml @@ -5,3 +5,7 @@ cluster: extra_scylla_config_options: authenticator: AllowAllAuthenticator authorizer: AllowAllAuthorizer +skip_in_release: + - test_shutdown_hang +skip_in_debug: + - test_shutdown_hang diff --git a/test/topology_custom/test_shutdown_hang.py b/test/topology_custom/test_shutdown_hang.py new file mode 100644 index 0000000000..ddb560c776 --- /dev/null +++ b/test/topology_custom/test_shutdown_hang.py @@ -0,0 +1,63 @@ +import asyncio +import logging +import time +import pytest + +from cassandra.query import SimpleStatement # type: ignore +from cassandra.cluster import ConsistencyLevel # type: ignore +from cassandra.protocol import WriteTimeout # type: ignore + +from test.pylib.manager_client import ManagerClient +from test.topology.util import wait_for_token_ring_and_group0_consistency + + +logger = logging.getLogger(__name__) + + +@pytest.mark.asyncio +async def test_hints_manager_shutdown_hang(manager: ManagerClient) -> None: + """Reproducer for #8079""" + s1 = await manager.server_add(config={ + 'error_injections_at_startup': ['decrease_hints_flush_period'] + }) + s2 = await manager.server_add() + await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30) + + cql = manager.get_cql() + + logger.info("Create keyspace and table") + await cql.run_async("create keyspace ks with replication = {'class': 'SimpleStrategy', 'replication_factor': 2}") + await cql.run_async("create table ks.t (pk int primary key)") + + logger.info(f"Stop {s2}") + await manager.server_stop(s2.server_id) + + logger.info("Write data with small timeout") + # We're using a small timeout for the insert so it's not unexpected that it would fail on slow + # CI machines. To avoid flakiness we disable the test in debug mode (as well as release since + # it requires an error injection - so it will run only in dev mode) and we retry the write 10 times. + passed = False + for _ in range(10): + try: + await cql.run_async(SimpleStatement("insert into ks.t (pk) values (0) using timeout 500ms", + consistency_level=ConsistencyLevel.ONE)) + except WriteTimeout: + logger.info("write timeout, retrying") + else: + passed = True + break + + if not passed: + pytest.fail("Write timed out on each attempt") + + # The write succeeded but a background task was left to finish the write to the other node + # (which is dead but the first node didn't mark it as dead yet). + # The background task will timeout shortly because of 'using timeout' in the statement. + # This will cause a hint to get created. + # The hints manager starts sending the hint soon after (hint flushing happens every + # ~1 second with the error injection). + logger.info("Sleep") + await asyncio.sleep(2) + + logger.info(f"Stop {s1} gracefully") + await manager.server_stop_gracefully(s1.server_id)