gossiper: Store subscribers in an atomic_vector

The new guarantees are a bit better IMHO:

Once a subscriber is removed, it is never notified. This was not true
in the old code since it would iterate over a copy that would still
have that subscriber.

Signed-off-by: Rafael Ávila de Espíndola <espindola@scylladb.com>
This commit is contained in:
Rafael Ávila de Espíndola
2020-01-17 10:18:05 -08:00
parent c62a33965d
commit 845116dfaf
5 changed files with 23 additions and 48 deletions

View File

@@ -882,11 +882,11 @@ bool gossiper::is_seed(const gms::inet_address& endpoint) const {
}
void gossiper::register_(shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
_subscribers.push_back(subscriber);
_subscribers.add(subscriber);
}
void gossiper::unregister_(shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
_subscribers.remove(subscriber);
future<> gossiper::unregister_(shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
return _subscribers.remove(subscriber);
}
std::set<inet_address> gossiper::get_live_members() {

View File

@@ -42,6 +42,7 @@
#include <seastar/core/distributed.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/print.hh>
#include "utils/atomic_vector.hh"
#include "utils/UUID.hh"
#include "utils/fb_utilities.hh"
#include "gms/i_failure_detection_event_listener.hh"
@@ -187,38 +188,8 @@ private:
/**
* subscribers for interest in EndpointState change
*
* @class subscribers_list - allows modifications of the list at the same
* time as it's being iterated using for_each() method.
*/
class subscribers_list {
std::list<shared_ptr<i_endpoint_state_change_subscriber>> _l;
public:
auto push_back(shared_ptr<i_endpoint_state_change_subscriber> s) {
return _l.push_back(s);
}
/**
* Remove the element pointing to the same object as the given one.
* @param s shared_ptr pointing to the same object as one of the elements
* in the list.
*/
void remove(shared_ptr<i_endpoint_state_change_subscriber> s) {
_l.remove(s);
}
/**
* Make a copy of the current list and iterate over a copy.
*
* @param Func - function to apply on each list element
*/
template <typename Func>
void for_each(Func&& f) {
auto list_copy(_l);
std::for_each(list_copy.begin(), list_copy.end(), std::forward<Func>(f));
}
} _subscribers;
atomic_vector<shared_ptr<i_endpoint_state_change_subscriber>> _subscribers;
/* live member set */
utils::chunked_vector<inet_address> _live_endpoints;
@@ -278,7 +249,7 @@ public:
*
* @param subscriber module which implements the IEndpointStateChangeSubscriber
*/
void unregister_(shared_ptr<i_endpoint_state_change_subscriber> subscriber);
future<> unregister_(shared_ptr<i_endpoint_state_change_subscriber> subscriber);
std::set<inet_address> get_live_members();

View File

@@ -323,17 +323,19 @@ future<> gossiping_property_file_snitch::reload_gossiper_state() {
return make_ready_future<>();
}
future<> ret = make_ready_future<>();
if (_reconnectable_helper) {
gms::get_local_gossiper().unregister_(_reconnectable_helper);
ret = gms::get_local_gossiper().unregister_(_reconnectable_helper);
}
if (!_prefer_local) {
return make_ready_future<>();
return ret;
}
_reconnectable_helper = make_shared<reconnectable_snitch_helper>(_my_dc);
gms::get_local_gossiper().register_(_reconnectable_helper);
return make_ready_future<>();
return ret.then([this] {
_reconnectable_helper = make_shared<reconnectable_snitch_helper>(_my_dc);
gms::get_local_gossiper().register_(_reconnectable_helper);
});
}
using registry_2_params = class_registrator<i_endpoint_snitch,

View File

@@ -2579,9 +2579,9 @@ repair_service::repair_service(distributed<gms::gossiper>& gossiper, size_t max_
}
future<> repair_service::stop() {
_gossiper.local().unregister_(_gossip_helper);
_stopped = true;
return make_ready_future<>();
return _gossiper.local().unregister_(_gossip_helper).then([this] {
_stopped = true;
});
}
repair_service::~repair_service() {

View File

@@ -130,8 +130,9 @@ void load_broadcaster::start_broadcasting() {
future<> load_broadcaster::stop_broadcasting() {
_timer.cancel();
_gossiper.unregister_(shared_from_this());
return std::move(_done).then([this] {
return _gossiper.unregister_(shared_from_this()).then([this] {
return std::move(_done);
}).then([this] {
_stopped = true;
});
}
@@ -263,9 +264,10 @@ future<> view_update_backlog_broker::start() {
}
future<> view_update_backlog_broker::stop() {
_gossiper.unregister_(shared_from_this());
_as.request_abort();
return std::move(_started);
return _gossiper.unregister_(shared_from_this()).then([this] {
_as.request_abort();
return std::move(_started);
});
}
void view_update_backlog_broker::on_change(gms::inet_address endpoint, gms::application_state state, const gms::versioned_value& value) {