service: storage_proxy: rename view_update_handlers_list
The list will be used for non-view-update write handlers as well, so generalize the name. Also generalize some variable names used in the implementation. This commit only renames things + some comments were added, there are no logical changes.
This commit is contained in:
@@ -1508,13 +1508,18 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
class storage_proxy::view_update_handlers_list : public bi::list<abstract_write_response_handler, bi::base_hook<abstract_write_response_handler>, bi::constant_time_size<false>> {
|
||||
// This list contains `abstract_write_response_handler`s which were constructed as `cancellable`.
|
||||
// When a `cancellable` handler is constructed, it adds itself to the list (see `register_cancellable`).
|
||||
// We use the list to cancel handlers - as if the write timed out - on certain events, such as when
|
||||
// we shutdown a node so that shutdown is not blocked.
|
||||
// We don't add normal data path writes to the list, only background work such as hints and view updates.
|
||||
class storage_proxy::cancellable_write_handlers_list : public bi::list<abstract_write_response_handler, bi::base_hook<abstract_write_response_handler>, bi::constant_time_size<false>> {
|
||||
// _live_iterators holds all iterators that point into the bi:list in the base class of this object.
|
||||
// If we remove a abstract_write_response_handler from the list, and an iterator happens to point
|
||||
// into it, we advance the iterator so it doesn't point at a removed object. See #4912.
|
||||
std::vector<iterator*> _live_iterators;
|
||||
public:
|
||||
view_update_handlers_list() {
|
||||
cancellable_write_handlers_list() {
|
||||
_live_iterators.reserve(10); // We only expect 1.
|
||||
}
|
||||
void register_live_iterator(iterator* itp) noexcept { // We don't tolerate failure, so abort instead
|
||||
@@ -1523,37 +1528,37 @@ public:
|
||||
void unregister_live_iterator(iterator* itp) {
|
||||
_live_iterators.erase(boost::remove(_live_iterators, itp), _live_iterators.end());
|
||||
}
|
||||
void update_live_iterators(abstract_write_response_handler* vuwrh) {
|
||||
// vuwrh is being removed from the b::list, so if any live iterator points at it,
|
||||
void update_live_iterators(abstract_write_response_handler* handler) {
|
||||
// handler is being removed from the b::list, so if any live iterator points at it,
|
||||
// move it to the next object (this requires that the list is traversed in the forward
|
||||
// direction).
|
||||
for (auto& itp : _live_iterators) {
|
||||
if (&**itp == vuwrh) {
|
||||
if (&**itp == handler) {
|
||||
++*itp;
|
||||
}
|
||||
}
|
||||
}
|
||||
class iterator_guard {
|
||||
view_update_handlers_list& _vuhl;
|
||||
cancellable_write_handlers_list& _handlers;
|
||||
iterator* _itp;
|
||||
public:
|
||||
iterator_guard(view_update_handlers_list& vuhl, iterator& it) : _vuhl(vuhl), _itp(&it) {
|
||||
_vuhl.register_live_iterator(_itp);
|
||||
iterator_guard(cancellable_write_handlers_list& handlers, iterator& it) : _handlers(handlers), _itp(&it) {
|
||||
_handlers.register_live_iterator(_itp);
|
||||
}
|
||||
~iterator_guard() {
|
||||
_vuhl.unregister_live_iterator(_itp);
|
||||
_handlers.unregister_live_iterator(_itp);
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
void abstract_write_response_handler::register_cancellable() {
|
||||
_proxy->_view_update_handlers_list->push_back(*this);
|
||||
_proxy->_cancellable_write_handlers_list->push_back(*this);
|
||||
}
|
||||
|
||||
|
||||
void abstract_write_response_handler::update_cancellable_live_iterators() {
|
||||
if (is_linked()) {
|
||||
_proxy->_view_update_handlers_list->update_live_iterators(this);
|
||||
_proxy->_cancellable_write_handlers_list->update_live_iterators(this);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2721,7 +2726,7 @@ storage_proxy::storage_proxy(distributed<replica::database>& db, gms::gossiper&
|
||||
, _background_write_throttle_threahsold(cfg.available_memory / 10)
|
||||
, _mutate_stage{"storage_proxy_mutate", &storage_proxy::do_mutate}
|
||||
, _max_view_update_backlog(max_view_update_backlog)
|
||||
, _view_update_handlers_list(std::make_unique<view_update_handlers_list>()) {
|
||||
, _cancellable_write_handlers_list(std::make_unique<cancellable_write_handlers_list>()) {
|
||||
namespace sm = seastar::metrics;
|
||||
_metrics.add_group(storage_proxy_stats::COORDINATOR_STATS_CATEGORY, {
|
||||
sm::make_queue_length("current_throttled_writes", [this] { return _throttled_writes.size(); },
|
||||
@@ -6236,24 +6241,24 @@ void storage_proxy::on_leave_cluster(const gms::inet_address& endpoint) {
|
||||
|
||||
void storage_proxy::on_up(const gms::inet_address& endpoint) {};
|
||||
|
||||
void storage_proxy::retire_view_response_handlers(noncopyable_function<bool(const abstract_write_response_handler&)> filter_fun) {
|
||||
void storage_proxy::cancel_write_handlers(noncopyable_function<bool(const abstract_write_response_handler&)> filter_fun) {
|
||||
assert(thread::running_in_thread());
|
||||
auto it = _view_update_handlers_list->begin();
|
||||
while (it != _view_update_handlers_list->end()) {
|
||||
auto it = _cancellable_write_handlers_list->begin();
|
||||
while (it != _cancellable_write_handlers_list->end()) {
|
||||
auto guard = it->shared_from_this();
|
||||
if (filter_fun(*it) && _response_handlers.contains(it->id())) {
|
||||
it->timeout_cb();
|
||||
}
|
||||
++it;
|
||||
if (need_preempt()) {
|
||||
view_update_handlers_list::iterator_guard ig{*_view_update_handlers_list, it};
|
||||
cancellable_write_handlers_list::iterator_guard ig{*_cancellable_write_handlers_list, it};
|
||||
seastar::thread::yield();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void storage_proxy::on_down(const gms::inet_address& endpoint) {
|
||||
return retire_view_response_handlers([endpoint] (const abstract_write_response_handler& handler) {
|
||||
return cancel_write_handlers([endpoint] (const abstract_write_response_handler& handler) {
|
||||
const auto& targets = handler.get_targets();
|
||||
return boost::find(targets, endpoint) != targets.end();
|
||||
});
|
||||
@@ -6263,7 +6268,7 @@ future<> storage_proxy::drain_on_shutdown() {
|
||||
//NOTE: the thread is spawned here because there are delicate lifetime issues to consider
|
||||
// and writing them down with plain futures is error-prone.
|
||||
return async([this] {
|
||||
retire_view_response_handlers([] (const abstract_write_response_handler&) { return true; });
|
||||
cancel_write_handlers([] (const abstract_write_response_handler&) { return true; });
|
||||
_hints_resource_manager.stop().get();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -273,8 +273,8 @@ private:
|
||||
std::unordered_map<gms::inet_address, view_update_backlog_timestamped> _view_update_backlogs;
|
||||
|
||||
//NOTICE(sarna): This opaque pointer is here just to avoid moving write handler class definitions from .cc to .hh. It's slow path.
|
||||
class view_update_handlers_list;
|
||||
std::unique_ptr<view_update_handlers_list> _view_update_handlers_list;
|
||||
class cancellable_write_handlers_list;
|
||||
std::unique_ptr<cancellable_write_handlers_list> _cancellable_write_handlers_list;
|
||||
|
||||
/* This is a pointer to the shard-local part of the sharded cdc_service:
|
||||
* storage_proxy needs access to cdc_service to augument mutations.
|
||||
@@ -427,7 +427,8 @@ private:
|
||||
template<typename Range>
|
||||
future<> mutate_counters(Range&& mutations, db::consistency_level cl, tracing::trace_state_ptr tr_state, service_permit permit, clock_type::time_point timeout);
|
||||
|
||||
void retire_view_response_handlers(noncopyable_function<bool(const abstract_write_response_handler&)> filter_fun);
|
||||
// Retires (times out) write response handlers which were constructed as `cancellable` and pass the given filter.
|
||||
void cancel_write_handlers(noncopyable_function<bool(const abstract_write_response_handler&)> filter_fun);
|
||||
|
||||
/**
|
||||
* Returns whether for a range query doing a query against merged is likely
|
||||
|
||||
Reference in New Issue
Block a user