From eddb7406b4c9148773d1e97021edd09115929a33 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Fri, 26 May 2023 13:17:56 +0200 Subject: [PATCH] service: storage_proxy: rename `view_update_handlers_list` The list will be used for non-view-update write handlers as well, so generalize the name. Also generalize some variable names used in the implementation. This commit only renames things + some comments were added, there are no logical changes. --- service/storage_proxy.cc | 41 ++++++++++++++++++++++------------------ service/storage_proxy.hh | 7 ++++--- 2 files changed, 27 insertions(+), 21 deletions(-) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index d8a8ff47d6..c50280b291 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -1508,13 +1508,18 @@ public: } }; -class storage_proxy::view_update_handlers_list : public bi::list, bi::constant_time_size> { +// This list contains `abstract_write_response_handler`s which were constructed as `cancellable`. +// When a `cancellable` handler is constructed, it adds itself to the list (see `register_cancellable`). +// We use the list to cancel handlers - as if the write timed out - on certain events, such as when +// we shutdown a node so that shutdown is not blocked. +// We don't add normal data path writes to the list, only background work such as hints and view updates. +class storage_proxy::cancellable_write_handlers_list : public bi::list, bi::constant_time_size> { // _live_iterators holds all iterators that point into the bi:list in the base class of this object. // If we remove a abstract_write_response_handler from the list, and an iterator happens to point // into it, we advance the iterator so it doesn't point at a removed object. See #4912. std::vector _live_iterators; public: - view_update_handlers_list() { + cancellable_write_handlers_list() { _live_iterators.reserve(10); // We only expect 1. } void register_live_iterator(iterator* itp) noexcept { // We don't tolerate failure, so abort instead @@ -1523,37 +1528,37 @@ public: void unregister_live_iterator(iterator* itp) { _live_iterators.erase(boost::remove(_live_iterators, itp), _live_iterators.end()); } - void update_live_iterators(abstract_write_response_handler* vuwrh) { - // vuwrh is being removed from the b::list, so if any live iterator points at it, + void update_live_iterators(abstract_write_response_handler* handler) { + // handler is being removed from the b::list, so if any live iterator points at it, // move it to the next object (this requires that the list is traversed in the forward // direction). for (auto& itp : _live_iterators) { - if (&**itp == vuwrh) { + if (&**itp == handler) { ++*itp; } } } class iterator_guard { - view_update_handlers_list& _vuhl; + cancellable_write_handlers_list& _handlers; iterator* _itp; public: - iterator_guard(view_update_handlers_list& vuhl, iterator& it) : _vuhl(vuhl), _itp(&it) { - _vuhl.register_live_iterator(_itp); + iterator_guard(cancellable_write_handlers_list& handlers, iterator& it) : _handlers(handlers), _itp(&it) { + _handlers.register_live_iterator(_itp); } ~iterator_guard() { - _vuhl.unregister_live_iterator(_itp); + _handlers.unregister_live_iterator(_itp); } }; }; void abstract_write_response_handler::register_cancellable() { - _proxy->_view_update_handlers_list->push_back(*this); + _proxy->_cancellable_write_handlers_list->push_back(*this); } void abstract_write_response_handler::update_cancellable_live_iterators() { if (is_linked()) { - _proxy->_view_update_handlers_list->update_live_iterators(this); + _proxy->_cancellable_write_handlers_list->update_live_iterators(this); } } @@ -2721,7 +2726,7 @@ storage_proxy::storage_proxy(distributed& db, gms::gossiper& , _background_write_throttle_threahsold(cfg.available_memory / 10) , _mutate_stage{"storage_proxy_mutate", &storage_proxy::do_mutate} , _max_view_update_backlog(max_view_update_backlog) - , _view_update_handlers_list(std::make_unique()) { + , _cancellable_write_handlers_list(std::make_unique()) { namespace sm = seastar::metrics; _metrics.add_group(storage_proxy_stats::COORDINATOR_STATS_CATEGORY, { sm::make_queue_length("current_throttled_writes", [this] { return _throttled_writes.size(); }, @@ -6236,24 +6241,24 @@ void storage_proxy::on_leave_cluster(const gms::inet_address& endpoint) { void storage_proxy::on_up(const gms::inet_address& endpoint) {}; -void storage_proxy::retire_view_response_handlers(noncopyable_function filter_fun) { +void storage_proxy::cancel_write_handlers(noncopyable_function filter_fun) { assert(thread::running_in_thread()); - auto it = _view_update_handlers_list->begin(); - while (it != _view_update_handlers_list->end()) { + auto it = _cancellable_write_handlers_list->begin(); + while (it != _cancellable_write_handlers_list->end()) { auto guard = it->shared_from_this(); if (filter_fun(*it) && _response_handlers.contains(it->id())) { it->timeout_cb(); } ++it; if (need_preempt()) { - view_update_handlers_list::iterator_guard ig{*_view_update_handlers_list, it}; + cancellable_write_handlers_list::iterator_guard ig{*_cancellable_write_handlers_list, it}; seastar::thread::yield(); } } } void storage_proxy::on_down(const gms::inet_address& endpoint) { - return retire_view_response_handlers([endpoint] (const abstract_write_response_handler& handler) { + return cancel_write_handlers([endpoint] (const abstract_write_response_handler& handler) { const auto& targets = handler.get_targets(); return boost::find(targets, endpoint) != targets.end(); }); @@ -6263,7 +6268,7 @@ future<> storage_proxy::drain_on_shutdown() { //NOTE: the thread is spawned here because there are delicate lifetime issues to consider // and writing them down with plain futures is error-prone. return async([this] { - retire_view_response_handlers([] (const abstract_write_response_handler&) { return true; }); + cancel_write_handlers([] (const abstract_write_response_handler&) { return true; }); _hints_resource_manager.stop().get(); }); } diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 6befd5aede..0c14834661 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -273,8 +273,8 @@ private: std::unordered_map _view_update_backlogs; //NOTICE(sarna): This opaque pointer is here just to avoid moving write handler class definitions from .cc to .hh. It's slow path. - class view_update_handlers_list; - std::unique_ptr _view_update_handlers_list; + class cancellable_write_handlers_list; + std::unique_ptr _cancellable_write_handlers_list; /* This is a pointer to the shard-local part of the sharded cdc_service: * storage_proxy needs access to cdc_service to augument mutations. @@ -427,7 +427,8 @@ private: template future<> mutate_counters(Range&& mutations, db::consistency_level cl, tracing::trace_state_ptr tr_state, service_permit permit, clock_type::time_point timeout); - void retire_view_response_handlers(noncopyable_function filter_fun); + // Retires (times out) write response handlers which were constructed as `cancellable` and pass the given filter. + void cancel_write_handlers(noncopyable_function filter_fun); /** * Returns whether for a range query doing a query against merged is likely