Merge 'Gossiper endpoint locking' from Benny Halevy
This series cleans up and hardens the endpoint locking design and implementation in the gossiper and endpoint-state subscribers. We make sure that all notifications (except for `before_change`, which apparently can be dropped) are called under lock_endpoint, as well as all calls to gossiper::replicate, to serialize endpoint_state changes across all shards. An endpoint lock gets a unique permit_id that is passed to the notifications and passed back by them if the notification functions call the gossiper back for the same endpoint on paths that modify the endpoint_state and may acquire the same endpoint lock - to prevent a deadlock. Fixes scylladb/scylladb#14838 Refs scylladb/scylladb#14471 Closes #14845 * github.com:scylladb/scylladb: gossiper: replicate: ensure non-null permit gossiper: add_saved_endpoint: lock_endpoint gossiper: mark_as_shutdown: lock_endpoint gossiper: real_mark_alive: lock_endpoint gossiper: advertise_token_removed: lock_endpoint gossiper: do_status_check: lock_endpoint gossiper: remove_endpoint: lock_endpoint if needed gossiper: force_remove_endpoint: lock_endpoint if needed storage_service: lock_endpoint when removing node gossiper: use permit_id to serialize state changes while preventing deadlocks gossiper: lock_endpoint: add debug messages utils: UUID: make default tagged_uuid ctor constexpr gossiper: lock_endpoint must be called on shard 0 gossiper: replicate: simplify interface gossiper: mark_as_shutdown: make private gossiper: convict: make private gossiper: mark_as_shutdown: do not call convict
This commit is contained in:
@@ -8,6 +8,7 @@
|
||||
|
||||
#include "gossiper.hh"
|
||||
#include "api/api-doc/gossiper.json.hh"
|
||||
#include "gms/endpoint_state.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
|
||||
namespace api {
|
||||
@@ -59,7 +60,7 @@ void set_gossiper(http_context& ctx, routes& r, gms::gossiper& g) {
|
||||
|
||||
httpd::gossiper_json::force_remove_endpoint.set(r, [&g](std::unique_ptr<http::request> req) {
|
||||
gms::inet_address ep(req->param["addr"]);
|
||||
return g.force_remove_endpoint(ep).then([] {
|
||||
return g.force_remove_endpoint(ep, gms::null_permit_id).then([] {
|
||||
return make_ready_future<json::json_return_type>(json_void());
|
||||
});
|
||||
});
|
||||
|
||||
@@ -794,7 +794,7 @@ future<> generation_service::after_join(std::optional<cdc::generation_id>&& star
|
||||
_cdc_streams_rewrite_complete = maybe_rewrite_streams_descriptions();
|
||||
}
|
||||
|
||||
future<> generation_service::on_join(gms::inet_address ep, gms::endpoint_state ep_state) {
|
||||
future<> generation_service::on_join(gms::inet_address ep, gms::endpoint_state ep_state, gms::permit_id pid) {
|
||||
assert_shard_zero(__PRETTY_FUNCTION__);
|
||||
|
||||
auto val = ep_state.get_application_state_ptr(gms::application_state::CDC_GENERATION_ID);
|
||||
@@ -802,10 +802,10 @@ future<> generation_service::on_join(gms::inet_address ep, gms::endpoint_state e
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
return on_change(ep, gms::application_state::CDC_GENERATION_ID, *val);
|
||||
return on_change(ep, gms::application_state::CDC_GENERATION_ID, *val, pid);
|
||||
}
|
||||
|
||||
future<> generation_service::on_change(gms::inet_address ep, gms::application_state app_state, const gms::versioned_value& v) {
|
||||
future<> generation_service::on_change(gms::inet_address ep, gms::application_state app_state, const gms::versioned_value& v, gms::permit_id) {
|
||||
assert_shard_zero(__PRETTY_FUNCTION__);
|
||||
|
||||
if (app_state != gms::application_state::CDC_GENERATION_ID) {
|
||||
|
||||
@@ -104,13 +104,13 @@ public:
|
||||
}
|
||||
|
||||
virtual future<> before_change(gms::inet_address, gms::endpoint_state, gms::application_state, const gms::versioned_value&) override { return make_ready_future(); }
|
||||
virtual future<> on_alive(gms::inet_address, gms::endpoint_state) override { return make_ready_future(); }
|
||||
virtual future<> on_dead(gms::inet_address, gms::endpoint_state) override { return make_ready_future(); }
|
||||
virtual future<> on_remove(gms::inet_address) override { return make_ready_future(); }
|
||||
virtual future<> on_restart(gms::inet_address, gms::endpoint_state) override { return make_ready_future(); }
|
||||
virtual future<> on_alive(gms::inet_address, gms::endpoint_state, gms::permit_id) override { return make_ready_future(); }
|
||||
virtual future<> on_dead(gms::inet_address, gms::endpoint_state, gms::permit_id) override { return make_ready_future(); }
|
||||
virtual future<> on_remove(gms::inet_address, gms::permit_id) override { return make_ready_future(); }
|
||||
virtual future<> on_restart(gms::inet_address, gms::endpoint_state, gms::permit_id) override { return make_ready_future(); }
|
||||
|
||||
virtual future<> on_join(gms::inet_address, gms::endpoint_state) override;
|
||||
virtual future<> on_change(gms::inet_address, gms::application_state, const gms::versioned_value&) override;
|
||||
virtual future<> on_join(gms::inet_address, gms::endpoint_state, gms::permit_id) override;
|
||||
virtual future<> on_change(gms::inet_address, gms::application_state, const gms::versioned_value&, gms::permit_id) override;
|
||||
|
||||
future<> check_and_repair_cdc_streams();
|
||||
|
||||
|
||||
@@ -168,4 +168,15 @@ public:
|
||||
friend std::ostream& operator<<(std::ostream& os, const endpoint_state& x);
|
||||
};
|
||||
|
||||
// The endpoint state is protected with an endpoint lock
|
||||
// acquired in the gossiper using gossiper::lock_endpoint.
|
||||
//
|
||||
// permit_id identifies the held endpoint lock
|
||||
// and it is used by gossiper::lock_endpoint to prevent a deadlock
|
||||
// when a notification function is called under the endpoint lock
|
||||
// and calls back into the gossiper on a path that wants to acquire
|
||||
// the endpoint_lock for the same endpoint.
|
||||
using permit_id = utils::tagged_uuid<struct permit_id_tag>;
|
||||
constexpr permit_id null_permit_id = permit_id::create_null_id();
|
||||
|
||||
} // gms
|
||||
|
||||
@@ -224,20 +224,20 @@ public:
|
||||
, _sys_ks(s)
|
||||
{
|
||||
}
|
||||
future<> on_join(inet_address ep, endpoint_state state) override {
|
||||
future<> on_join(inet_address ep, endpoint_state state, gms::permit_id) override {
|
||||
return enable_features();
|
||||
}
|
||||
future<> on_change(inet_address ep, application_state state, const versioned_value&) override {
|
||||
future<> on_change(inet_address ep, application_state state, const versioned_value&, gms::permit_id) override {
|
||||
if (state == application_state::SUPPORTED_FEATURES) {
|
||||
return enable_features();
|
||||
}
|
||||
return make_ready_future();
|
||||
}
|
||||
future<> before_change(inet_address, endpoint_state, application_state, const versioned_value&) override { return make_ready_future(); }
|
||||
future<> on_alive(inet_address, endpoint_state) override { return make_ready_future(); }
|
||||
future<> on_dead(inet_address, endpoint_state) override { return make_ready_future(); }
|
||||
future<> on_remove(inet_address) override { return make_ready_future(); }
|
||||
future<> on_restart(inet_address, endpoint_state) override { return make_ready_future(); }
|
||||
future<> on_alive(inet_address, endpoint_state, gms::permit_id) override { return make_ready_future(); }
|
||||
future<> on_dead(inet_address, endpoint_state, gms::permit_id) override { return make_ready_future(); }
|
||||
future<> on_remove(inet_address, gms::permit_id) override { return make_ready_future(); }
|
||||
future<> on_restart(inet_address, endpoint_state, gms::permit_id) override { return make_ready_future(); }
|
||||
|
||||
future<> enable_features();
|
||||
};
|
||||
|
||||
250
gms/gossiper.cc
250
gms/gossiper.cc
@@ -38,6 +38,7 @@
|
||||
#include <boost/range/algorithm/partition.hpp>
|
||||
#include <boost/algorithm/string/split.hpp>
|
||||
#include <boost/algorithm/string/classification.hpp>
|
||||
#include <utility>
|
||||
#include "gms/generation-number.hh"
|
||||
#include "locator/token_metadata.hh"
|
||||
#include "utils/exceptions.hh"
|
||||
@@ -427,7 +428,7 @@ future<> gossiper::handle_shutdown_msg(inet_address from, std::optional<int64_t>
|
||||
co_return;
|
||||
}
|
||||
|
||||
auto permit = co_await this->lock_endpoint(from);
|
||||
auto permit = co_await this->lock_endpoint(from, null_permit_id);
|
||||
if (generation_number_opt) {
|
||||
debug_validate_gossip_generation(*generation_number_opt);
|
||||
auto es = this->get_endpoint_state_for_endpoint_ptr(from);
|
||||
@@ -446,7 +447,7 @@ future<> gossiper::handle_shutdown_msg(inet_address from, std::optional<int64_t>
|
||||
co_return;
|
||||
}
|
||||
}
|
||||
co_await this->mark_as_shutdown(from);
|
||||
co_await this->mark_as_shutdown(from, permit.id());
|
||||
}
|
||||
|
||||
future<gossip_get_endpoint_states_response>
|
||||
@@ -549,7 +550,7 @@ future<> gossiper::send_gossip(gossip_digest_syn message, std::set<inet_address>
|
||||
future<> gossiper::do_apply_state_locally(gms::inet_address node, const endpoint_state& remote_state, bool listener_notification) {
|
||||
// If state does not exist just add it. If it does then add it if the remote generation is greater.
|
||||
// If there is a generation tie, attempt to break it by heartbeat version.
|
||||
auto permit = co_await this->lock_endpoint(node);
|
||||
auto permit = co_await this->lock_endpoint(node, null_permit_id);
|
||||
auto es = this->get_endpoint_state_for_endpoint_ptr(node);
|
||||
if (es) {
|
||||
endpoint_state& local_state = *es;
|
||||
@@ -564,7 +565,7 @@ future<> gossiper::do_apply_state_locally(gms::inet_address node, const endpoint
|
||||
if (listener_notification) {
|
||||
logger.trace("Updating heartbeat state generation to {} from {} for {}", remote_generation, local_generation, node);
|
||||
// major state change will handle the update by inserting the remote state directly
|
||||
co_await this->handle_major_state_change(node, remote_state);
|
||||
co_await this->handle_major_state_change(node, remote_state, permit.id());
|
||||
} else {
|
||||
logger.debug("Applying remote_state for node {} (remote generation > local generation)", node);
|
||||
_endpoint_state_map[node] = remote_state;
|
||||
@@ -576,7 +577,7 @@ future<> gossiper::do_apply_state_locally(gms::inet_address node, const endpoint
|
||||
auto remote_max_version = this->get_max_endpoint_state_version(remote_state);
|
||||
if (remote_max_version > local_max_version) {
|
||||
// apply states, but do not notify since there is no major change
|
||||
co_await this->apply_new_states(node, local_state, remote_state);
|
||||
co_await this->apply_new_states(node, local_state, remote_state, permit.id());
|
||||
} else {
|
||||
logger.trace("Ignoring remote version {} <= {} for {}", remote_max_version, local_max_version, node);
|
||||
}
|
||||
@@ -602,7 +603,7 @@ future<> gossiper::do_apply_state_locally(gms::inet_address node, const endpoint
|
||||
}
|
||||
} else {
|
||||
if (listener_notification) {
|
||||
co_await this->handle_major_state_change(node, remote_state);
|
||||
co_await this->handle_major_state_change(node, remote_state, permit.id());
|
||||
} else {
|
||||
logger.debug("Applying remote_state for node {} (new node)", node);
|
||||
_endpoint_state_map[node] = remote_state;
|
||||
@@ -643,14 +644,16 @@ future<> gossiper::apply_state_locally(std::map<inet_address, endpoint_state> ma
|
||||
std::chrono::steady_clock::now() - start).count());
|
||||
}
|
||||
|
||||
future<> gossiper::force_remove_endpoint(inet_address endpoint) {
|
||||
future<> gossiper::force_remove_endpoint(inet_address endpoint, permit_id pid) {
|
||||
if (endpoint == get_broadcast_address()) {
|
||||
return make_exception_future<>(std::runtime_error(format("Can not force remove node {} itself", endpoint)));
|
||||
}
|
||||
return container().invoke_on(0, [endpoint] (auto& gossiper) mutable -> future<> {
|
||||
return container().invoke_on(0, [endpoint, pid] (auto& gossiper) mutable -> future<> {
|
||||
auto permit = co_await gossiper.lock_endpoint(endpoint, pid);
|
||||
pid = permit.id();
|
||||
try {
|
||||
co_await gossiper.remove_endpoint(endpoint);
|
||||
co_await gossiper.evict_from_membership(endpoint);
|
||||
co_await gossiper.remove_endpoint(endpoint, pid);
|
||||
co_await gossiper.evict_from_membership(endpoint, pid);
|
||||
logger.info("Finished to force remove node {}", endpoint);
|
||||
} catch (...) {
|
||||
logger.warn("Failed to force remove node {}: {}", endpoint, std::current_exception());
|
||||
@@ -658,11 +661,14 @@ future<> gossiper::force_remove_endpoint(inet_address endpoint) {
|
||||
});
|
||||
}
|
||||
|
||||
future<> gossiper::remove_endpoint(inet_address endpoint) {
|
||||
future<> gossiper::remove_endpoint(inet_address endpoint, permit_id pid) {
|
||||
auto permit = co_await lock_endpoint(endpoint, pid);
|
||||
pid = permit.id();
|
||||
|
||||
// do subscribers first so anything in the subscriber that depends on gossiper state won't get confused
|
||||
try {
|
||||
co_await _subscribers.for_each([endpoint] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
|
||||
return subscriber->on_remove(endpoint);
|
||||
co_await _subscribers.for_each([endpoint, pid] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
|
||||
return subscriber->on_remove(endpoint, pid);
|
||||
});
|
||||
} catch (...) {
|
||||
logger.warn("Fail to call on_remove callback: {}", std::current_exception());
|
||||
@@ -692,6 +698,10 @@ future<> gossiper::do_status_check() {
|
||||
if (endpoint == get_broadcast_address()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
auto permit = co_await lock_endpoint(endpoint, null_permit_id);
|
||||
const auto& pid = permit.id();
|
||||
|
||||
auto eps = get_endpoint_state_for_endpoint_ptr(endpoint);
|
||||
if (!eps) {
|
||||
continue;
|
||||
@@ -699,7 +709,6 @@ future<> gossiper::do_status_check() {
|
||||
auto& ep_state = *eps;
|
||||
bool is_alive = ep_state.is_alive();
|
||||
auto update_timestamp = ep_state.get_update_timestamp();
|
||||
// ep_state cannot be used after yielding
|
||||
|
||||
// check if this is a fat client. fat clients are removed automatically from
|
||||
// gossip after FatClientTimeout. Do not remove dead states here.
|
||||
@@ -707,8 +716,8 @@ future<> gossiper::do_status_check() {
|
||||
&& !_just_removed_endpoints.contains(endpoint)
|
||||
&& ((now - update_timestamp) > fat_client_timeout)) {
|
||||
logger.info("FatClient {} has been silent for {}ms, removing from gossip", endpoint, fat_client_timeout.count());
|
||||
co_await remove_endpoint(endpoint); // will put it in _just_removed_endpoints to respect quarantine delay
|
||||
co_await evict_from_membership(endpoint); // can get rid of the state immediately
|
||||
co_await remove_endpoint(endpoint, pid); // will put it in _just_removed_endpoints to respect quarantine delay
|
||||
co_await evict_from_membership(endpoint, pid); // can get rid of the state immediately
|
||||
}
|
||||
|
||||
// check for dead state removal
|
||||
@@ -716,7 +725,7 @@ future<> gossiper::do_status_check() {
|
||||
if (!is_alive && (now > expire_time)
|
||||
&& (!get_token_metadata_ptr()->is_normal_token_owner(endpoint))) {
|
||||
logger.debug("time is expiring for endpoint : {} ({})", endpoint, expire_time.time_since_epoch().count());
|
||||
co_await evict_from_membership(endpoint);
|
||||
co_await evict_from_membership(endpoint, pid);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -731,12 +740,69 @@ future<> gossiper::do_status_check() {
|
||||
}
|
||||
}
|
||||
|
||||
future<gossiper::endpoint_permit> gossiper::lock_endpoint(inet_address ep) {
|
||||
return _endpoint_locks.get_or_load(ep, [] (const inet_address& ep) { return semaphore(1); }).then([] (auto eptr) {
|
||||
return get_units(*eptr, 1).then([eptr] (auto units) mutable {
|
||||
return endpoint_permit{std::move(eptr), std::move(units)};
|
||||
});
|
||||
});
|
||||
gossiper::endpoint_permit::endpoint_permit(endpoint_locks_map::entry_ptr&& ptr, inet_address addr, std::string caller) noexcept
|
||||
: _ptr(std::move(ptr))
|
||||
, _permit_id(_ptr->pid)
|
||||
, _addr(std::move(addr))
|
||||
, _caller(std::move(caller))
|
||||
{
|
||||
++_ptr->holders;
|
||||
logger.debug("{}: lock_endpoint {}: acquired: permit_id={} holders={}", _caller, _addr, _permit_id, _ptr->holders);
|
||||
}
|
||||
|
||||
gossiper::endpoint_permit::endpoint_permit(endpoint_permit&& o) noexcept
|
||||
: _ptr(std::exchange(o._ptr, nullptr))
|
||||
, _permit_id(std::exchange(o._permit_id, null_permit_id))
|
||||
, _addr(std::exchange(o._addr, inet_address{}))
|
||||
, _caller(std::move(o._caller))
|
||||
{}
|
||||
|
||||
gossiper::endpoint_permit::~endpoint_permit() {
|
||||
release();
|
||||
}
|
||||
|
||||
bool gossiper::endpoint_permit::release() noexcept {
|
||||
if (auto ptr = std::exchange(_ptr, nullptr)) {
|
||||
assert(ptr->pid == _permit_id);
|
||||
logger.debug("{}: lock_endpoint {}: released: permit_id={} holders={}", _caller, _addr, _permit_id, ptr->holders);
|
||||
if (!--ptr->holders) {
|
||||
logger.debug("{}: lock_endpoint {}: released: permit_id={}", _caller, _addr, _permit_id);
|
||||
ptr->units.return_all();
|
||||
ptr->pid = null_permit_id;
|
||||
_permit_id = null_permit_id;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
gossiper::endpoint_lock_entry::endpoint_lock_entry() noexcept
|
||||
: sem(1)
|
||||
, pid(permit_id::create_null_id())
|
||||
{}
|
||||
|
||||
future<gossiper::endpoint_permit> gossiper::lock_endpoint(inet_address ep, permit_id pid, seastar::compat::source_location l) {
|
||||
assert(this_shard_id() == 0);
|
||||
std::string caller = l.function_name();
|
||||
auto eptr = co_await _endpoint_locks.get_or_load(ep, [] (const inet_address& ep) { return endpoint_lock_entry(); });
|
||||
if (pid) {
|
||||
if (eptr->pid == pid) {
|
||||
// Already locked with the same permit
|
||||
co_return endpoint_permit(std::move(eptr), std::move(ep), std::move(caller));
|
||||
} else {
|
||||
// the endpoint lock must not have been released for the permit_id
|
||||
// maybe we're passed a permit_id that was acquired for a different endpoint
|
||||
on_internal_error_noexcept(logger, fmt::format("{}: lock_endpoint {}: permit_id={}: endpoint_lock_entry has mismatching permit_id={}", caller, ep, pid, eptr->pid));
|
||||
}
|
||||
}
|
||||
pid = permit_id::create_random_id();
|
||||
logger.debug("{}: lock_endpoint {}: waiting: permit_id={}", caller, ep, pid);
|
||||
eptr->units = co_await get_units(eptr->sem, 1);
|
||||
eptr->pid = pid;
|
||||
if (eptr->holders) {
|
||||
on_internal_error_noexcept(logger, fmt::format("{}: lock_endpoint {}: newly held endpoint_lock_entry has {} holders", caller, ep, eptr->holders));
|
||||
}
|
||||
co_return endpoint_permit(std::move(eptr), std::move(ep), std::move(caller));
|
||||
}
|
||||
|
||||
future<> gossiper::update_live_endpoints_version() {
|
||||
@@ -1068,9 +1134,9 @@ future<> gossiper::convict(inet_address endpoint) {
|
||||
co_return;
|
||||
}
|
||||
if (is_shutdown(endpoint)) {
|
||||
co_await mark_as_shutdown(endpoint);
|
||||
co_await mark_as_shutdown(endpoint, null_permit_id);
|
||||
} else {
|
||||
co_await mark_dead(endpoint, *state);
|
||||
co_await mark_dead(endpoint, *state, null_permit_id);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1091,8 +1157,8 @@ version_type gossiper::get_max_endpoint_state_version(endpoint_state state) cons
|
||||
return max_version;
|
||||
}
|
||||
|
||||
future<> gossiper::evict_from_membership(inet_address endpoint) {
|
||||
auto permit = co_await lock_endpoint(endpoint);
|
||||
future<> gossiper::evict_from_membership(inet_address endpoint, permit_id pid) {
|
||||
auto permit = co_await lock_endpoint(endpoint, pid);
|
||||
_unreachable_endpoints.erase(endpoint);
|
||||
co_await container().invoke_on_all([endpoint] (auto& g) {
|
||||
g._endpoint_state_map.erase(endpoint);
|
||||
@@ -1142,7 +1208,10 @@ void gossiper::make_random_gossip_digest(utils::chunked_vector<gossip_digest>& g
|
||||
#endif
|
||||
}
|
||||
|
||||
future<> gossiper::replicate(inet_address ep, const endpoint_state& es) {
|
||||
future<> gossiper::replicate(inet_address ep, const endpoint_state& es, permit_id pid) {
|
||||
if (!pid) {
|
||||
on_internal_error_noexcept(logger, fmt::format("replicate {} called with null permit", ep));
|
||||
}
|
||||
return container().invoke_on_all([ep, es, orig = this_shard_id(), self = shared_from_this()] (gossiper& g) {
|
||||
if (this_shard_id() != orig) {
|
||||
g._endpoint_state_map[ep].add_application_state(es);
|
||||
@@ -1150,25 +1219,9 @@ future<> gossiper::replicate(inet_address ep, const endpoint_state& es) {
|
||||
});
|
||||
}
|
||||
|
||||
future<> gossiper::replicate(inet_address ep, const std::map<application_state, versioned_value>& src, const utils::chunked_vector<application_state>& changed) {
|
||||
return container().invoke_on_all([ep, &src, &changed, orig = this_shard_id(), self = shared_from_this()] (gossiper& g) {
|
||||
if (this_shard_id() != orig) {
|
||||
for (auto&& key : changed) {
|
||||
g._endpoint_state_map[ep].add_application_state(key, src.at(key));
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
future<> gossiper::replicate(inet_address ep, application_state key, const versioned_value& value) {
|
||||
return container().invoke_on_all([ep, key, &value, orig = this_shard_id(), self = shared_from_this()] (gossiper& g) {
|
||||
if (this_shard_id() != orig) {
|
||||
g._endpoint_state_map[ep].add_application_state(key, value);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
future<> gossiper::advertise_token_removed(inet_address endpoint, locator::host_id host_id) {
|
||||
future<> gossiper::advertise_token_removed(inet_address endpoint, locator::host_id host_id, permit_id pid) {
|
||||
auto permit = co_await lock_endpoint(endpoint, pid);
|
||||
pid = permit.id();
|
||||
auto& eps = get_endpoint_state(endpoint);
|
||||
eps.update_timestamp(); // make sure we don't evict it too soon
|
||||
eps.get_heart_beat_state().force_newer_generation_unsafe();
|
||||
@@ -1177,7 +1230,7 @@ future<> gossiper::advertise_token_removed(inet_address endpoint, locator::host_
|
||||
logger.info("Completing removal of {}", endpoint);
|
||||
add_expire_time_for_endpoint(endpoint, expire_time);
|
||||
_endpoint_state_map[endpoint] = eps;
|
||||
co_await replicate(endpoint, eps);
|
||||
co_await replicate(endpoint, eps, pid);
|
||||
// ensure at least one gossip round occurs before returning
|
||||
co_await sleep_abortable(INTERVAL * 2, _abort_source);
|
||||
}
|
||||
@@ -1191,7 +1244,7 @@ future<> gossiper::assassinate_endpoint(sstring address) {
|
||||
return container().invoke_on(0, [address] (auto&& gossiper) {
|
||||
return seastar::async([&gossiper, g = gossiper.shared_from_this(), address] {
|
||||
inet_address endpoint(address);
|
||||
auto permit = gossiper.lock_endpoint(endpoint).get0();
|
||||
auto permit = gossiper.lock_endpoint(endpoint, null_permit_id).get0();
|
||||
auto es = gossiper.get_endpoint_state_for_endpoint_ptr(endpoint);
|
||||
auto now = gossiper.now();
|
||||
generation_type gen(std::chrono::duration_cast<std::chrono::seconds>((now + std::chrono::seconds(60)).time_since_epoch()).count());
|
||||
@@ -1232,7 +1285,7 @@ future<> gossiper::assassinate_endpoint(sstring address) {
|
||||
std::unordered_set<dht::token> tokens_set(tokens.begin(), tokens.end());
|
||||
auto expire_time = gossiper.compute_expire_time();
|
||||
ep_state.add_application_state(application_state::STATUS, versioned_value::left(tokens_set, expire_time.time_since_epoch().count()));
|
||||
gossiper.handle_major_state_change(endpoint, ep_state).get();
|
||||
gossiper.handle_major_state_change(endpoint, ep_state, permit.id()).get();
|
||||
sleep_abortable(INTERVAL * 4, gossiper._abort_source).get();
|
||||
logger.warn("Finished assassinating {}", endpoint);
|
||||
});
|
||||
@@ -1494,33 +1547,34 @@ void gossiper::mark_alive(inet_address addr, endpoint_state& local_state) {
|
||||
logger.debug("Sending a EchoMessage to {}, with generation_number={}", id, generation);
|
||||
(void)_messaging.send_gossip_echo(id, generation.value(), std::chrono::milliseconds(15000)).then([this, addr] {
|
||||
logger.trace("Got EchoMessage Reply");
|
||||
// After sending the echo message, the node might not be in the
|
||||
// _endpoint_state_map anymore; using the reference to local_state
|
||||
// might cause a use-after-free
|
||||
auto es = get_endpoint_state_for_endpoint_ptr(addr);
|
||||
if (!es) {
|
||||
logger.info("Node {} is not in endpoint_state_map anymore", addr);
|
||||
} else {
|
||||
endpoint_state& state = *es;
|
||||
logger.debug("Mark Node {} alive after EchoMessage", addr);
|
||||
return real_mark_alive(addr, state);
|
||||
}
|
||||
return make_ready_future();
|
||||
return real_mark_alive(addr);
|
||||
}).handle_exception([addr, gh = std::move(gh), unmark_pending = std::move(unmark_pending)] (auto ep) {
|
||||
logger.warn("Fail to send EchoMessage to {}: {}", addr, ep);
|
||||
});
|
||||
}
|
||||
|
||||
future<> gossiper::real_mark_alive(inet_address addr, endpoint_state& local_state) {
|
||||
logger.trace("marking as alive {}", addr);
|
||||
future<> gossiper::real_mark_alive(inet_address addr) {
|
||||
auto permit = co_await lock_endpoint(addr, null_permit_id);
|
||||
|
||||
// After sending the echo message, the node might not be in the
|
||||
// _endpoint_state_map anymore; using the reference to local_state
|
||||
// might cause a use-after-free
|
||||
auto* es = get_endpoint_state_for_endpoint_ptr(addr);
|
||||
if (!es) {
|
||||
logger.info("Node {} is not in endpoint_state_map anymore", addr);
|
||||
co_return;
|
||||
}
|
||||
|
||||
// Do not mark a node with status shutdown as UP.
|
||||
auto status = sstring(get_gossip_status(local_state));
|
||||
auto status = sstring(get_gossip_status(*es));
|
||||
if (status == sstring(versioned_value::SHUTDOWN)) {
|
||||
logger.warn("Skip marking node {} with status = {} as UP", addr, status);
|
||||
co_return;
|
||||
}
|
||||
|
||||
logger.debug("Mark Node {} alive after EchoMessage", addr);
|
||||
|
||||
auto& local_state = *es;
|
||||
local_state.mark_alive();
|
||||
local_state.update_timestamp(); // prevents do_status_check from racing us and evicting if it was down > A_VERY_LONG_TIME
|
||||
|
||||
@@ -1548,13 +1602,13 @@ future<> gossiper::real_mark_alive(inet_address addr, endpoint_state& local_stat
|
||||
logger.info("InetAddress {} is now UP, status = {}", addr, status);
|
||||
}
|
||||
|
||||
co_await _subscribers.for_each([addr, state] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) -> future<> {
|
||||
co_await subscriber->on_alive(addr, state);
|
||||
co_await _subscribers.for_each([addr, state = std::move(state), pid = permit.id()] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) -> future<> {
|
||||
co_await subscriber->on_alive(addr, state, pid);
|
||||
logger.trace("Notified {}", fmt::ptr(subscriber.get()));
|
||||
});
|
||||
}
|
||||
|
||||
future<> gossiper::mark_dead(inet_address addr, endpoint_state& local_state) {
|
||||
future<> gossiper::mark_dead(inet_address addr, endpoint_state& local_state, permit_id pid) {
|
||||
logger.trace("marking as down {}", addr);
|
||||
local_state.mark_dead();
|
||||
endpoint_state state = local_state;
|
||||
@@ -1562,13 +1616,13 @@ future<> gossiper::mark_dead(inet_address addr, endpoint_state& local_state) {
|
||||
co_await update_live_endpoints_version();
|
||||
_unreachable_endpoints[addr] = now();
|
||||
logger.info("InetAddress {} is now DOWN, status = {}", addr, get_gossip_status(state));
|
||||
co_await _subscribers.for_each([addr, state] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) -> future<> {
|
||||
co_await subscriber->on_dead(addr, state);
|
||||
co_await _subscribers.for_each([addr, state, pid] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) -> future<> {
|
||||
co_await subscriber->on_dead(addr, state, pid);
|
||||
logger.trace("Notified {}", fmt::ptr(subscriber.get()));
|
||||
});
|
||||
}
|
||||
|
||||
future<> gossiper::handle_major_state_change(inet_address ep, const endpoint_state& eps) {
|
||||
future<> gossiper::handle_major_state_change(inet_address ep, const endpoint_state& eps, permit_id pid) {
|
||||
std::optional<endpoint_state> eps_old;
|
||||
if (auto* p = get_endpoint_state_for_endpoint_ptr(ep); p) {
|
||||
eps_old = *p;
|
||||
@@ -1583,7 +1637,7 @@ future<> gossiper::handle_major_state_change(inet_address ep, const endpoint_sta
|
||||
}
|
||||
logger.trace("Adding endpoint state for {}, status = {}", ep, get_gossip_status(eps));
|
||||
_endpoint_state_map[ep] = eps;
|
||||
co_await replicate(ep, eps);
|
||||
co_await replicate(ep, eps, pid);
|
||||
|
||||
if (is_in_shadow_round()) {
|
||||
// In shadow round, we only interested in the peer's endpoint_state,
|
||||
@@ -1597,8 +1651,8 @@ future<> gossiper::handle_major_state_change(inet_address ep, const endpoint_sta
|
||||
|
||||
if (eps_old) {
|
||||
// the node restarted: it is up to the subscriber to take whatever action is necessary
|
||||
co_await _subscribers.for_each([ep, eps_old] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
|
||||
return subscriber->on_restart(ep, *eps_old);
|
||||
co_await _subscribers.for_each([ep, eps_old, pid] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
|
||||
return subscriber->on_restart(ep, *eps_old, pid);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1607,18 +1661,18 @@ future<> gossiper::handle_major_state_change(inet_address ep, const endpoint_sta
|
||||
mark_alive(ep, ep_state);
|
||||
} else {
|
||||
logger.debug("Not marking {} alive due to dead state {}", ep, get_gossip_status(eps));
|
||||
co_await mark_dead(ep, ep_state);
|
||||
co_await mark_dead(ep, ep_state, pid);
|
||||
}
|
||||
|
||||
auto* eps_new = get_endpoint_state_for_endpoint_ptr(ep);
|
||||
if (eps_new) {
|
||||
co_await _subscribers.for_each([ep, eps_new] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
|
||||
return subscriber->on_join(ep, *eps_new);
|
||||
co_await _subscribers.for_each([ep, eps_new, pid] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
|
||||
return subscriber->on_join(ep, *eps_new, pid);
|
||||
});
|
||||
}
|
||||
// check this at the end so nodes will learn about the endpoint
|
||||
if (is_shutdown(ep)) {
|
||||
co_await mark_as_shutdown(ep);
|
||||
co_await mark_as_shutdown(ep, pid);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1660,7 +1714,7 @@ bool gossiper::is_silent_shutdown_state(const endpoint_state& ep_state) const{
|
||||
return false;
|
||||
}
|
||||
|
||||
future<> gossiper::apply_new_states(inet_address addr, endpoint_state& local_state, const endpoint_state& remote_state) {
|
||||
future<> gossiper::apply_new_states(inet_address addr, endpoint_state& local_state, const endpoint_state& remote_state, permit_id pid) {
|
||||
// don't assert here, since if the node restarts the version will go back to zero
|
||||
//int oldVersion = local_state.get_heart_beat_state().get_heart_beat_version();
|
||||
|
||||
@@ -1702,7 +1756,7 @@ future<> gossiper::apply_new_states(inet_address addr, endpoint_state& local_sta
|
||||
// Exceptions during replication will cause abort because node's state
|
||||
// would be inconsistent across shards. Changes listeners depend on state
|
||||
// being replicated to all shards.
|
||||
co_await replicate(addr, remote_map, changed);
|
||||
co_await replicate(addr, local_state, pid);
|
||||
|
||||
// Exceptions thrown from listeners will result in abort because that could leave the node in a bad
|
||||
// state indefinitely. Unless the value changes again, we wouldn't retry notifications.
|
||||
@@ -1710,7 +1764,7 @@ future<> gossiper::apply_new_states(inet_address addr, endpoint_state& local_sta
|
||||
// Listeners should decide which failures are non-fatal and swallow them.
|
||||
try {
|
||||
for (auto&& key: changed) {
|
||||
co_await do_on_change_notifications(addr, key, remote_map.at(key));
|
||||
co_await do_on_change_notifications(addr, key, remote_map.at(key), pid);
|
||||
}
|
||||
} catch (...) {
|
||||
on_fatal_internal_error(logger, format("Gossip change listener failed: {}", std::current_exception()));
|
||||
@@ -1725,9 +1779,9 @@ future<> gossiper::do_before_change_notifications(inet_address addr, const endpo
|
||||
});
|
||||
}
|
||||
|
||||
future<> gossiper::do_on_change_notifications(inet_address addr, const application_state& state, const versioned_value& value) {
|
||||
co_await _subscribers.for_each([addr, state, value] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
|
||||
return subscriber->on_change(addr, state, value);
|
||||
future<> gossiper::do_on_change_notifications(inet_address addr, const application_state& state, const versioned_value& value, permit_id pid) {
|
||||
co_await _subscribers.for_each([addr, state, value, pid] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
|
||||
return subscriber->on_change(addr, state, value, pid);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1818,6 +1872,7 @@ void gossiper::examine_gossiper(utils::chunked_vector<gossip_digest>& g_digest_l
|
||||
}
|
||||
|
||||
future<> gossiper::start_gossiping(gms::generation_type generation_nbr, std::map<application_state, versioned_value> preload_local_states, gms::advertise_myself advertise) {
|
||||
auto permit = co_await lock_endpoint(get_broadcast_address(), null_permit_id);
|
||||
co_await container().invoke_on_all([advertise] (gossiper& g) {
|
||||
if (!advertise) {
|
||||
g._advertise_myself = false;
|
||||
@@ -1838,7 +1893,7 @@ future<> gossiper::start_gossiping(gms::generation_type generation_nbr, std::map
|
||||
|
||||
auto generation = local_state.get_heart_beat_state().get_generation();
|
||||
|
||||
co_await replicate(get_broadcast_address(), local_state);
|
||||
co_await replicate(get_broadcast_address(), local_state, permit.id());
|
||||
|
||||
logger.trace("gossip started with generation {}", generation);
|
||||
_enabled = true;
|
||||
@@ -1980,6 +2035,8 @@ future<> gossiper::add_saved_endpoint(inet_address ep) {
|
||||
co_return;
|
||||
}
|
||||
|
||||
auto permit = co_await lock_endpoint(ep, null_permit_id);
|
||||
|
||||
//preserve any previously known, in-memory data about the endpoint (such as DC, RACK, and so on)
|
||||
auto ep_state = endpoint_state();
|
||||
auto es = get_endpoint_state_for_endpoint_ptr(ep);
|
||||
@@ -2000,7 +2057,7 @@ future<> gossiper::add_saved_endpoint(inet_address ep) {
|
||||
}
|
||||
ep_state.mark_dead();
|
||||
_endpoint_state_map[ep] = ep_state;
|
||||
co_await replicate(ep, ep_state);
|
||||
co_await replicate(ep, ep_state, permit.id());
|
||||
_unreachable_endpoints[ep] = now();
|
||||
logger.trace("Adding saved endpoint {} {}", ep, ep_state.get_heart_beat_state().get_generation());
|
||||
}
|
||||
@@ -2041,7 +2098,7 @@ future<> gossiper::add_local_application_state(std::list<std::pair<application_s
|
||||
auto& gossiper = *g;
|
||||
inet_address ep_addr = gossiper.get_broadcast_address();
|
||||
// for symmetry with other apply, use endpoint lock for our own address.
|
||||
auto permit = gossiper.lock_endpoint(ep_addr).get0();
|
||||
auto permit = gossiper.lock_endpoint(ep_addr, null_permit_id).get0();
|
||||
auto es = gossiper.get_endpoint_state_for_endpoint_ptr(ep_addr);
|
||||
if (!es) {
|
||||
auto err = format("endpoint_state_map does not contain endpoint = {}, application_states = {}",
|
||||
@@ -2074,6 +2131,13 @@ future<> gossiper::add_local_application_state(std::list<std::pair<application_s
|
||||
// Add to local application state
|
||||
es->add_application_state(state, value);
|
||||
}
|
||||
|
||||
// It is OK to replicate the new endpoint_state
|
||||
// after all application states were modified as a batch.
|
||||
// We guarantee that the on_change notifications
|
||||
// will be called in the order given by `states` anyhow.
|
||||
gossiper.replicate(ep_addr, *es, permit.id()).get();
|
||||
|
||||
for (auto& p : states) {
|
||||
auto& state = p.first;
|
||||
auto& value = p.second;
|
||||
@@ -2081,8 +2145,7 @@ future<> gossiper::add_local_application_state(std::list<std::pair<application_s
|
||||
// now we might defer again, so this could be reordered. But we've
|
||||
// ensured the whole set of values are monotonically versioned and
|
||||
// applied to endpoint state.
|
||||
gossiper.replicate(ep_addr, state, value).get();
|
||||
gossiper.do_on_change_notifications(ep_addr, state, value).get();
|
||||
gossiper.do_on_change_notifications(ep_addr, state, value, permit.id()).get();
|
||||
}
|
||||
}).handle_exception([] (auto ep) {
|
||||
logger.warn("Fail to apply application_state: {}", ep);
|
||||
@@ -2281,15 +2344,16 @@ sstring gossiper::get_application_state_value(inet_address endpoint, application
|
||||
* This method is used to mark a node as shutdown; that is it gracefully exited on its own and told us about it
|
||||
* @param endpoint endpoint that has shut itself down
|
||||
*/
|
||||
future<> gossiper::mark_as_shutdown(const inet_address& endpoint) {
|
||||
future<> gossiper::mark_as_shutdown(const inet_address& endpoint, permit_id pid) {
|
||||
auto permit = co_await lock_endpoint(endpoint, pid);
|
||||
pid = permit.id();
|
||||
auto es = get_endpoint_state_for_endpoint_ptr(endpoint);
|
||||
if (es) {
|
||||
auto& ep_state = *es;
|
||||
ep_state.add_application_state(application_state::STATUS, versioned_value::shutdown(true));
|
||||
ep_state.get_heart_beat_state().force_highest_possible_version_unsafe();
|
||||
co_await replicate(endpoint, ep_state);
|
||||
co_await mark_dead(endpoint, ep_state);
|
||||
co_await convict(endpoint);
|
||||
co_await replicate(endpoint, ep_state, pid);
|
||||
co_await mark_dead(endpoint, ep_state, pid);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
#include <seastar/core/gate.hh>
|
||||
#include <seastar/core/print.hh>
|
||||
#include <seastar/rpc/rpc_types.hh>
|
||||
#include <seastar/util/source_location-compat.hh>
|
||||
#include "utils/atomic_vector.hh"
|
||||
#include "utils/UUID.hh"
|
||||
#include "utils/fb_utilities.hh"
|
||||
@@ -147,12 +148,29 @@ public:
|
||||
public:
|
||||
static clk::time_point inline now() noexcept { return clk::now(); }
|
||||
public:
|
||||
using endpoint_locks_map = utils::loading_shared_values<inet_address, semaphore>;
|
||||
struct endpoint_permit {
|
||||
endpoint_locks_map::entry_ptr _ptr;
|
||||
semaphore_units<> _units;
|
||||
struct endpoint_lock_entry {
|
||||
semaphore sem;
|
||||
permit_id pid;
|
||||
semaphore_units<> units;
|
||||
size_t holders = 0;
|
||||
|
||||
endpoint_lock_entry() noexcept;
|
||||
};
|
||||
future<endpoint_permit> lock_endpoint(inet_address);
|
||||
using endpoint_locks_map = utils::loading_shared_values<inet_address, endpoint_lock_entry>;
|
||||
class endpoint_permit {
|
||||
endpoint_locks_map::entry_ptr _ptr;
|
||||
permit_id _permit_id;
|
||||
inet_address _addr;
|
||||
std::string _caller;
|
||||
public:
|
||||
endpoint_permit(endpoint_locks_map::entry_ptr&& ptr, inet_address addr, std::string caller) noexcept;
|
||||
endpoint_permit(endpoint_permit&&) noexcept;
|
||||
~endpoint_permit();
|
||||
bool release() noexcept;
|
||||
const permit_id& id() const noexcept { return _permit_id; }
|
||||
};
|
||||
// Must be called on shard 0
|
||||
future<endpoint_permit> lock_endpoint(inet_address, permit_id pid, seastar::compat::source_location l = seastar::compat::source_location::current());
|
||||
|
||||
private:
|
||||
/* map where key is the endpoint and value is the state associated with the endpoint */
|
||||
@@ -229,13 +247,7 @@ private:
|
||||
void run();
|
||||
// Replicates given endpoint_state to all other shards.
|
||||
// The state doesn't have to be kept alive until replication completes.
|
||||
future<> replicate(inet_address, const endpoint_state&);
|
||||
// Replicates "states" from "src" to all other shards.
|
||||
// "src" and "states" must be kept alive until completes and must not change.
|
||||
future<> replicate(inet_address, const std::map<application_state, versioned_value>& src, const utils::chunked_vector<application_state>& states);
|
||||
// Replicates given value to all other shards.
|
||||
// The value must be kept alive until completes and not change.
|
||||
future<> replicate(inet_address, application_state key, const versioned_value& value);
|
||||
future<> replicate(inet_address, const endpoint_state&, permit_id);
|
||||
public:
|
||||
explicit gossiper(abort_source& as, const locator::shared_token_metadata& stm, netw::messaging_service& ms, const db::config& cfg, gossip_config gcfg);
|
||||
|
||||
@@ -269,11 +281,6 @@ public:
|
||||
|
||||
int64_t get_endpoint_downtime(inet_address ep) const noexcept;
|
||||
|
||||
/**
|
||||
* @param endpoint end point that is convicted.
|
||||
*/
|
||||
future<> convict(inet_address endpoint);
|
||||
|
||||
/**
|
||||
* Return either: the greatest heartbeat or application state
|
||||
*
|
||||
@@ -284,18 +291,23 @@ public:
|
||||
|
||||
|
||||
private:
|
||||
/**
|
||||
* @param endpoint end point that is convicted.
|
||||
*/
|
||||
future<> convict(inet_address endpoint);
|
||||
|
||||
/**
|
||||
* Removes the endpoint from gossip completely
|
||||
*
|
||||
* @param endpoint endpoint to be removed from the current membership.
|
||||
*/
|
||||
future<> evict_from_membership(inet_address endpoint);
|
||||
future<> evict_from_membership(inet_address endpoint, permit_id);
|
||||
public:
|
||||
/**
|
||||
* Removes the endpoint from Gossip but retains endpoint state
|
||||
*/
|
||||
future<> remove_endpoint(inet_address endpoint);
|
||||
future<> force_remove_endpoint(inet_address endpoint);
|
||||
future<> remove_endpoint(inet_address endpoint, permit_id);
|
||||
future<> force_remove_endpoint(inet_address endpoint, permit_id);
|
||||
private:
|
||||
/**
|
||||
* Quarantines the endpoint for QUARANTINE_DELAY
|
||||
@@ -329,7 +341,7 @@ public:
|
||||
* @param endpoint
|
||||
* @param host_id
|
||||
*/
|
||||
future<> advertise_token_removed(inet_address endpoint, locator::host_id host_id);
|
||||
future<> advertise_token_removed(inet_address endpoint, locator::host_id host_id, permit_id);
|
||||
|
||||
future<> unsafe_assassinate_endpoint(sstring address);
|
||||
|
||||
@@ -410,9 +422,11 @@ private:
|
||||
|
||||
void mark_alive(inet_address addr, endpoint_state& local_state);
|
||||
|
||||
future<> real_mark_alive(inet_address addr, endpoint_state& local_state);
|
||||
future<> real_mark_alive(inet_address addr);
|
||||
|
||||
future<> mark_dead(inet_address addr, endpoint_state& local_state);
|
||||
future<> mark_dead(inet_address addr, endpoint_state& local_state, permit_id);
|
||||
|
||||
future<> mark_as_shutdown(const inet_address& endpoint, permit_id);
|
||||
|
||||
/**
|
||||
* This method is called whenever there is a "big" change in ep state (a generation change for a known node).
|
||||
@@ -420,7 +434,7 @@ private:
|
||||
* @param ep endpoint
|
||||
* @param ep_state EndpointState for the endpoint
|
||||
*/
|
||||
future<> handle_major_state_change(inet_address ep, const endpoint_state& eps);
|
||||
future<> handle_major_state_change(inet_address ep, const endpoint_state& eps, permit_id);
|
||||
|
||||
public:
|
||||
bool is_alive(inet_address ep) const;
|
||||
@@ -440,13 +454,13 @@ private:
|
||||
future<> do_apply_state_locally(gms::inet_address node, const endpoint_state& remote_state, bool listener_notification);
|
||||
future<> apply_state_locally_without_listener_notification(std::unordered_map<inet_address, endpoint_state> map);
|
||||
|
||||
future<> apply_new_states(inet_address addr, endpoint_state& local_state, const endpoint_state& remote_state);
|
||||
future<> apply_new_states(inet_address addr, endpoint_state& local_state, const endpoint_state& remote_state, permit_id);
|
||||
|
||||
// notify that a local application state is going to change (doesn't get triggered for remote changes)
|
||||
future<> do_before_change_notifications(inet_address addr, const endpoint_state& ep_state, const application_state& ap_state, const versioned_value& new_value);
|
||||
|
||||
// notify that an application state has changed
|
||||
future<> do_on_change_notifications(inet_address addr, const application_state& state, const versioned_value& value);
|
||||
future<> do_on_change_notifications(inet_address addr, const application_state& state, const versioned_value& value, permit_id);
|
||||
/* Request all the state for the endpoint in the g_digest */
|
||||
|
||||
void request_all(gossip_digest& g_digest, utils::chunked_vector<gossip_digest>& delta_gossip_digest_list, generation_type remote_generation);
|
||||
@@ -564,7 +578,6 @@ public:
|
||||
bool is_normal_ring_member(const inet_address& endpoint) const;
|
||||
bool is_cql_ready(const inet_address& endpoint) const;
|
||||
bool is_silent_shutdown_state(const endpoint_state& ep_state) const;
|
||||
future<> mark_as_shutdown(const inet_address& endpoint);
|
||||
void force_newer_generation();
|
||||
public:
|
||||
std::string_view get_gossip_status(const endpoint_state& ep_state) const noexcept;
|
||||
|
||||
@@ -18,12 +18,19 @@
|
||||
namespace gms {
|
||||
|
||||
/**
|
||||
* This is called by an instance of the IEndpointStateChangePublisher to notify
|
||||
* This is called by the gossiper to notify
|
||||
* interested parties about changes in the state associated with any endpoint.
|
||||
* For instance if node A figures there is a change in state for an endpoint B
|
||||
* it notifies all interested parties of this change. It is up to the registered
|
||||
* instance to decide what it does with this change. Not all modules may be interested
|
||||
* in all state changes.
|
||||
*
|
||||
* All notifications that accept a permit_id are guaranteed to be called
|
||||
* under the respective endpoint lock. The permit_id must be provided
|
||||
* by the subscriber if it calls back gossiper functions that modify the same endpoint's
|
||||
* state, and therefore may acquire the same endpoint_lock - to prevent deadlock on the nested
|
||||
* call path. Calls from other code paths or for other endpoints should pass a
|
||||
* null_permit_id to indicate that no endpoint lock is held for them.
|
||||
*/
|
||||
class i_endpoint_state_change_subscriber {
|
||||
public:
|
||||
@@ -35,17 +42,17 @@ public:
|
||||
* @param endpoint endpoint for which the state change occurred.
|
||||
* @param epState state that actually changed for the above endpoint.
|
||||
*/
|
||||
virtual future<> on_join(inet_address endpoint, endpoint_state ep_state) = 0;
|
||||
virtual future<> on_join(inet_address endpoint, endpoint_state ep_state, permit_id) = 0;
|
||||
|
||||
virtual future<> before_change(inet_address endpoint, endpoint_state current_state, application_state new_statekey, const versioned_value& newvalue) = 0;
|
||||
|
||||
virtual future<> on_change(inet_address endpoint, application_state state, const versioned_value& value) = 0;
|
||||
virtual future<> on_change(inet_address endpoint, application_state state, const versioned_value& value, permit_id) = 0;
|
||||
|
||||
virtual future<> on_alive(inet_address endpoint, endpoint_state state) = 0;
|
||||
virtual future<> on_alive(inet_address endpoint, endpoint_state state, permit_id) = 0;
|
||||
|
||||
virtual future<> on_dead(inet_address endpoint, endpoint_state state) = 0;
|
||||
virtual future<> on_dead(inet_address endpoint, endpoint_state state, permit_id) = 0;
|
||||
|
||||
virtual future<> on_remove(inet_address endpoint) = 0;
|
||||
virtual future<> on_remove(inet_address endpoint, permit_id) = 0;
|
||||
|
||||
/**
|
||||
* Called whenever a node is restarted.
|
||||
@@ -53,7 +60,7 @@ public:
|
||||
* previously marked down. It will have only if {@code state.isAlive() == false}
|
||||
* as {@code state} is from before the restarted node is marked up.
|
||||
*/
|
||||
virtual future<> on_restart(inet_address endpoint, endpoint_state state) = 0;
|
||||
virtual future<> on_restart(inet_address endpoint, endpoint_state state, permit_id) = 0;
|
||||
};
|
||||
|
||||
} // namespace gms
|
||||
|
||||
@@ -2962,7 +2962,8 @@ public:
|
||||
}
|
||||
virtual future<> on_join(
|
||||
gms::inet_address endpoint,
|
||||
gms::endpoint_state ep_state) override {
|
||||
gms::endpoint_state ep_state,
|
||||
gms::permit_id) override {
|
||||
return make_ready_future();
|
||||
}
|
||||
virtual future<> before_change(
|
||||
@@ -2975,26 +2976,31 @@ public:
|
||||
virtual future<> on_change(
|
||||
gms::inet_address endpoint,
|
||||
gms::application_state state,
|
||||
const gms::versioned_value& value) override {
|
||||
const gms::versioned_value& value,
|
||||
gms::permit_id) override {
|
||||
return make_ready_future();
|
||||
}
|
||||
virtual future<> on_alive(
|
||||
gms::inet_address endpoint,
|
||||
gms::endpoint_state state) override {
|
||||
gms::endpoint_state state,
|
||||
gms::permit_id) override {
|
||||
return make_ready_future();
|
||||
}
|
||||
virtual future<> on_dead(
|
||||
gms::inet_address endpoint,
|
||||
gms::endpoint_state state) override {
|
||||
gms::endpoint_state state,
|
||||
gms::permit_id) override {
|
||||
return remove_row_level_repair(endpoint);
|
||||
}
|
||||
virtual future<> on_remove(
|
||||
gms::inet_address endpoint) override {
|
||||
gms::inet_address endpoint,
|
||||
gms::permit_id) override {
|
||||
return remove_row_level_repair(endpoint);
|
||||
}
|
||||
virtual future<> on_restart(
|
||||
gms::inet_address endpoint,
|
||||
gms::endpoint_state ep_state) override {
|
||||
gms::endpoint_state ep_state,
|
||||
gms::permit_id) override {
|
||||
return remove_row_level_repair(endpoint);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -35,30 +35,30 @@ public:
|
||||
assert(_stopped);
|
||||
}
|
||||
|
||||
virtual future<> on_change(gms::inet_address endpoint, gms::application_state state, const gms::versioned_value& value) override {
|
||||
virtual future<> on_change(gms::inet_address endpoint, gms::application_state state, const gms::versioned_value& value, gms::permit_id) override {
|
||||
if (state == gms::application_state::LOAD) {
|
||||
_load_info[endpoint] = std::stod(value.value());
|
||||
}
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
virtual future<> on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) override {
|
||||
virtual future<> on_join(gms::inet_address endpoint, gms::endpoint_state ep_state, gms::permit_id pid) override {
|
||||
auto* local_value = ep_state.get_application_state_ptr(gms::application_state::LOAD);
|
||||
if (local_value) {
|
||||
return on_change(endpoint, gms::application_state::LOAD, *local_value);
|
||||
return on_change(endpoint, gms::application_state::LOAD, *local_value, pid);
|
||||
}
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
virtual future<> before_change(gms::inet_address endpoint, gms::endpoint_state current_state, gms::application_state new_state_key, const gms::versioned_value& newValue) override { return make_ready_future(); }
|
||||
|
||||
future<> on_alive(gms::inet_address endpoint, gms::endpoint_state) override { return make_ready_future(); }
|
||||
future<> on_alive(gms::inet_address endpoint, gms::endpoint_state, gms::permit_id) override { return make_ready_future(); }
|
||||
|
||||
future<> on_dead(gms::inet_address endpoint, gms::endpoint_state) override { return make_ready_future(); }
|
||||
future<> on_dead(gms::inet_address endpoint, gms::endpoint_state, gms::permit_id) override { return make_ready_future(); }
|
||||
|
||||
future<> on_restart(gms::inet_address endpoint, gms::endpoint_state) override { return make_ready_future(); }
|
||||
future<> on_restart(gms::inet_address endpoint, gms::endpoint_state, gms::permit_id) override { return make_ready_future(); }
|
||||
|
||||
virtual future<> on_remove(gms::inet_address endpoint) override {
|
||||
virtual future<> on_remove(gms::inet_address endpoint, gms::permit_id) override {
|
||||
_load_info.erase(endpoint);
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
@@ -1199,12 +1199,12 @@ future<column_mapping> get_column_mapping(db::system_keyspace& sys_ks, table_id
|
||||
return db::schema_tables::get_column_mapping(sys_ks, table_id, v);
|
||||
}
|
||||
|
||||
future<> migration_manager::on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) {
|
||||
future<> migration_manager::on_join(gms::inet_address endpoint, gms::endpoint_state ep_state, gms::permit_id) {
|
||||
schedule_schema_pull(endpoint, ep_state);
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
future<> migration_manager::on_change(gms::inet_address endpoint, gms::application_state state, const gms::versioned_value& value) {
|
||||
future<> migration_manager::on_change(gms::inet_address endpoint, gms::application_state state, const gms::versioned_value& value, gms::permit_id) {
|
||||
if (state == gms::application_state::SCHEMA) {
|
||||
auto* ep_state = _gossiper.get_endpoint_state_for_endpoint_ptr(endpoint);
|
||||
if (!ep_state || _gossiper.is_dead_state(*ep_state)) {
|
||||
@@ -1218,7 +1218,7 @@ future<> migration_manager::on_change(gms::inet_address endpoint, gms::applicati
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
future<> migration_manager::on_alive(gms::inet_address endpoint, gms::endpoint_state state) {
|
||||
future<> migration_manager::on_alive(gms::inet_address endpoint, gms::endpoint_state state, gms::permit_id) {
|
||||
schedule_schema_pull(endpoint, state);
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
@@ -191,12 +191,12 @@ public:
|
||||
future<schema_ptr> get_schema_for_write(table_schema_version, netw::msg_addr from, netw::messaging_service& ms, abort_source* as = nullptr);
|
||||
|
||||
private:
|
||||
virtual future<> on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) override;
|
||||
virtual future<> on_change(gms::inet_address endpoint, gms::application_state state, const gms::versioned_value& value) override;
|
||||
virtual future<> on_alive(gms::inet_address endpoint, gms::endpoint_state state) override;
|
||||
virtual future<> on_dead(gms::inet_address endpoint, gms::endpoint_state state) override { return make_ready_future(); }
|
||||
virtual future<> on_remove(gms::inet_address endpoint) override { return make_ready_future(); }
|
||||
virtual future<> on_restart(gms::inet_address endpoint, gms::endpoint_state state) override { return make_ready_future(); }
|
||||
virtual future<> on_join(gms::inet_address endpoint, gms::endpoint_state ep_state, gms::permit_id) override;
|
||||
virtual future<> on_change(gms::inet_address endpoint, gms::application_state state, const gms::versioned_value& value, gms::permit_id) override;
|
||||
virtual future<> on_alive(gms::inet_address endpoint, gms::endpoint_state state, gms::permit_id) override;
|
||||
virtual future<> on_dead(gms::inet_address endpoint, gms::endpoint_state state, gms::permit_id) override { return make_ready_future(); }
|
||||
virtual future<> on_remove(gms::inet_address endpoint, gms::permit_id) override { return make_ready_future(); }
|
||||
virtual future<> on_restart(gms::inet_address endpoint, gms::endpoint_state state, gms::permit_id) override { return make_ready_future(); }
|
||||
virtual future<> before_change(gms::inet_address endpoint, gms::endpoint_state current_state, gms::application_state new_statekey, const gms::versioned_value& newvalue) override { return make_ready_future(); }
|
||||
|
||||
public:
|
||||
|
||||
@@ -266,7 +266,7 @@ future<> view_update_backlog_broker::stop() {
|
||||
});
|
||||
}
|
||||
|
||||
future<> view_update_backlog_broker::on_change(gms::inet_address endpoint, gms::application_state state, const gms::versioned_value& value) {
|
||||
future<> view_update_backlog_broker::on_change(gms::inet_address endpoint, gms::application_state state, const gms::versioned_value& value, gms::permit_id) {
|
||||
if (state == gms::application_state::VIEW_BACKLOG) {
|
||||
size_t current;
|
||||
size_t max;
|
||||
@@ -296,7 +296,7 @@ future<> view_update_backlog_broker::on_change(gms::inet_address endpoint, gms::
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
future<> view_update_backlog_broker::on_remove(gms::inet_address endpoint) {
|
||||
future<> view_update_backlog_broker::on_remove(gms::inet_address endpoint, gms::permit_id) {
|
||||
_sp.local()._view_update_backlogs.erase(endpoint);
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
@@ -97,7 +97,7 @@ public:
|
||||
{}
|
||||
|
||||
virtual future<>
|
||||
on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) override {
|
||||
on_join(gms::inet_address endpoint, gms::endpoint_state ep_state, gms::permit_id) override {
|
||||
return on_endpoint_change(endpoint, ep_state);
|
||||
}
|
||||
|
||||
@@ -111,13 +111,13 @@ public:
|
||||
|
||||
virtual future<>
|
||||
on_change(gms::inet_address endpoint,
|
||||
gms::application_state state, const gms::versioned_value& value) override {
|
||||
gms::application_state state, const gms::versioned_value& value, gms::permit_id) override {
|
||||
// Raft server ID never changes - do nothing
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
virtual future<>
|
||||
on_alive(gms::inet_address endpoint, gms::endpoint_state ep_state) override {
|
||||
on_alive(gms::inet_address endpoint, gms::endpoint_state ep_state, gms::permit_id) override {
|
||||
co_await utils::get_local_injector().inject_with_handler("raft_group_registry::on_alive", [endpoint, &ep_state] (auto& handler) -> future<> {
|
||||
auto app_state_ptr = ep_state.get_application_state_ptr(gms::application_state::HOST_ID);
|
||||
if (!app_state_ptr) {
|
||||
@@ -139,12 +139,12 @@ public:
|
||||
}
|
||||
|
||||
virtual future<>
|
||||
on_dead(gms::inet_address endpoint, gms::endpoint_state state) override {
|
||||
on_dead(gms::inet_address endpoint, gms::endpoint_state state, gms::permit_id) override {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
virtual future<>
|
||||
on_remove(gms::inet_address endpoint) override {
|
||||
on_remove(gms::inet_address endpoint, gms::permit_id) override {
|
||||
// The mapping is removed when the server is removed from
|
||||
// Raft configuration, not when it's dead or alive, or
|
||||
// removed
|
||||
@@ -152,7 +152,7 @@ public:
|
||||
}
|
||||
|
||||
virtual future<>
|
||||
on_restart(gms::inet_address endpoint, gms::endpoint_state ep_state) override {
|
||||
on_restart(gms::inet_address endpoint, gms::endpoint_state ep_state, gms::permit_id) override {
|
||||
return on_endpoint_change(endpoint, ep_state);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
#include <seastar/core/distributed.hh>
|
||||
#include <seastar/util/defer.hh>
|
||||
#include <seastar/coroutine/as_future.hh>
|
||||
#include "gms/endpoint_state.hh"
|
||||
#include "locator/snitch_base.hh"
|
||||
#include "locator/production_snitch_base.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
@@ -25,6 +26,7 @@
|
||||
#include <seastar/core/smp.hh>
|
||||
#include "mutation/canonical_mutation.hh"
|
||||
#include "seastar/core/on_internal_error.hh"
|
||||
#include "seastar/core/scollectd.hh"
|
||||
#include "service/raft/group0_state_machine.hh"
|
||||
#include "service/raft/raft_group0_client.hh"
|
||||
#include "utils/UUID.hh"
|
||||
@@ -338,7 +340,7 @@ future<> storage_service::topology_state_load(cdc::generation_service& cdc_gen_s
|
||||
for (const auto& id: _topology_state_machine._topology.left_nodes) {
|
||||
auto ip = co_await id2ip(id);
|
||||
if (_gossiper.get_live_members().contains(ip) || _gossiper.get_unreachable_members().contains(ip)) {
|
||||
co_await remove_endpoint(ip);
|
||||
co_await remove_endpoint(ip, gms::null_permit_id);
|
||||
}
|
||||
|
||||
// FIXME: when removing a node from the cluster through `removenode`, we should ban it early,
|
||||
@@ -2805,7 +2807,7 @@ storage_service::get_range_to_address_map(locator::vnode_effective_replication_m
|
||||
co_return co_await construct_range_to_endpoint_map(erm, co_await get_all_ranges(sorted_tokens));
|
||||
}
|
||||
|
||||
future<> storage_service::handle_state_replacing_update_pending_ranges(mutable_token_metadata_ptr tmptr, inet_address replacing_node) {
|
||||
future<> storage_service::handle_state_replacing_update_pending_ranges(mutable_token_metadata_ptr tmptr, inet_address replacing_node, gms::permit_id) {
|
||||
try {
|
||||
slogger.info("handle_state_replacing: Waiting for replacing node {} to be alive on all shards", replacing_node);
|
||||
co_await _gossiper.wait_alive({replacing_node}, std::chrono::milliseconds(5 * 1000));
|
||||
@@ -2818,8 +2820,8 @@ future<> storage_service::handle_state_replacing_update_pending_ranges(mutable_t
|
||||
co_await update_topology_change_info(tmptr, ::format("handle_state_replacing {}", replacing_node));
|
||||
}
|
||||
|
||||
future<> storage_service::handle_state_bootstrap(inet_address endpoint) {
|
||||
slogger.debug("endpoint={} handle_state_bootstrap", endpoint);
|
||||
future<> storage_service::handle_state_bootstrap(inet_address endpoint, gms::permit_id pid) {
|
||||
slogger.debug("endpoint={} handle_state_bootstrap: permit_id={}", endpoint, pid);
|
||||
// explicitly check for TOKENS, because a bootstrapping node might be bootstrapping in legacy mode; that is, not using vnodes and no token specified
|
||||
auto tokens = get_tokens_for(endpoint);
|
||||
|
||||
@@ -2851,8 +2853,8 @@ future<> storage_service::handle_state_bootstrap(inet_address endpoint) {
|
||||
co_await replicate_to_all_cores(std::move(tmptr));
|
||||
}
|
||||
|
||||
future<> storage_service::handle_state_normal(inet_address endpoint) {
|
||||
slogger.debug("endpoint={} handle_state_normal", endpoint);
|
||||
future<> storage_service::handle_state_normal(inet_address endpoint, gms::permit_id pid) {
|
||||
slogger.debug("endpoint={} handle_state_normal: permit_id={}", endpoint, pid);
|
||||
|
||||
if (_raft_topology_change_enabled) {
|
||||
slogger.debug("ignore handle_state_normal since topology change are using raft");
|
||||
@@ -2994,7 +2996,7 @@ future<> storage_service::handle_state_normal(inet_address endpoint) {
|
||||
tmlock.reset();
|
||||
|
||||
for (auto ep : endpoints_to_remove) {
|
||||
co_await remove_endpoint(ep);
|
||||
co_await remove_endpoint(ep, ep == endpoint ? pid : gms::null_permit_id);
|
||||
}
|
||||
slogger.debug("handle_state_normal: endpoint={} is_normal_token_owner={} endpoint_to_remove={} owned_tokens={}", endpoint, is_normal_token_owner, endpoints_to_remove.contains(endpoint), owned_tokens);
|
||||
if (!owned_tokens.empty() && !endpoints_to_remove.count(endpoint)) {
|
||||
@@ -3021,8 +3023,8 @@ future<> storage_service::handle_state_normal(inet_address endpoint) {
|
||||
_normal_state_handled_on_boot.insert(endpoint);
|
||||
}
|
||||
|
||||
future<> storage_service::handle_state_leaving(inet_address endpoint) {
|
||||
slogger.debug("endpoint={} handle_state_leaving", endpoint);
|
||||
future<> storage_service::handle_state_leaving(inet_address endpoint, gms::permit_id pid) {
|
||||
slogger.debug("endpoint={} handle_state_leaving: permit_id={}", endpoint, pid);
|
||||
|
||||
auto tokens = get_tokens_for(endpoint);
|
||||
|
||||
@@ -3058,7 +3060,7 @@ future<> storage_service::handle_state_leaving(inet_address endpoint) {
|
||||
co_await replicate_to_all_cores(std::move(tmptr));
|
||||
}
|
||||
|
||||
future<> storage_service::handle_state_left(inet_address endpoint, std::vector<sstring> pieces) {
|
||||
future<> storage_service::handle_state_left(inet_address endpoint, std::vector<sstring> pieces, gms::permit_id pid) {
|
||||
|
||||
if (_raft_topology_change_enabled) {
|
||||
slogger.debug("ignore handle_state_left since topology change are using raft");
|
||||
@@ -3083,15 +3085,15 @@ future<> storage_service::handle_state_left(inet_address endpoint, std::vector<s
|
||||
slogger.warn("handle_state_left: Get tokens from token_metadata, node={}, tokens={}", endpoint, tokens_from_tm);
|
||||
tokens = std::unordered_set<dht::token>(tokens_from_tm.begin(), tokens_from_tm.end());
|
||||
}
|
||||
co_await excise(tokens, endpoint, extract_expire_time(pieces));
|
||||
co_await excise(tokens, endpoint, extract_expire_time(pieces), pid);
|
||||
}
|
||||
|
||||
void storage_service::handle_state_moving(inet_address endpoint, std::vector<sstring> pieces) {
|
||||
void storage_service::handle_state_moving(inet_address endpoint, std::vector<sstring> pieces, gms::permit_id) {
|
||||
throw std::runtime_error(::format("Move operation is not supported anymore, endpoint={}", endpoint));
|
||||
}
|
||||
|
||||
future<> storage_service::handle_state_removed(inet_address endpoint, std::vector<sstring> pieces) {
|
||||
slogger.debug("endpoint={} handle_state_removed", endpoint);
|
||||
future<> storage_service::handle_state_removed(inet_address endpoint, std::vector<sstring> pieces, gms::permit_id pid) {
|
||||
slogger.debug("endpoint={} handle_state_removed: permit_id={}", endpoint, pid);
|
||||
|
||||
if (endpoint == get_broadcast_address()) {
|
||||
slogger.info("Received removenode gossip about myself. Is this node rejoining after an explicit removenode?");
|
||||
@@ -3107,22 +3109,22 @@ future<> storage_service::handle_state_removed(inet_address endpoint, std::vecto
|
||||
auto state = pieces[0];
|
||||
auto remove_tokens = get_token_metadata().get_tokens(endpoint);
|
||||
std::unordered_set<token> tmp(remove_tokens.begin(), remove_tokens.end());
|
||||
co_await excise(std::move(tmp), endpoint, extract_expire_time(pieces));
|
||||
co_await excise(std::move(tmp), endpoint, extract_expire_time(pieces), pid);
|
||||
} else { // now that the gossiper has told us about this nonexistent member, notify the gossiper to remove it
|
||||
add_expire_time_if_found(endpoint, extract_expire_time(pieces));
|
||||
co_await remove_endpoint(endpoint);
|
||||
co_await remove_endpoint(endpoint, pid);
|
||||
}
|
||||
}
|
||||
|
||||
future<> storage_service::on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) {
|
||||
slogger.debug("endpoint={} on_join", endpoint);
|
||||
future<> storage_service::on_join(gms::inet_address endpoint, gms::endpoint_state ep_state, gms::permit_id pid) {
|
||||
slogger.debug("endpoint={} on_join: permit_id={}", endpoint, pid);
|
||||
for (const auto& e : ep_state.get_application_state_map()) {
|
||||
co_await on_change(endpoint, e.first, e.second);
|
||||
co_await on_change(endpoint, e.first, e.second, pid);
|
||||
}
|
||||
}
|
||||
|
||||
future<> storage_service::on_alive(gms::inet_address endpoint, gms::endpoint_state state) {
|
||||
slogger.debug("endpoint={} on_alive", endpoint);
|
||||
future<> storage_service::on_alive(gms::inet_address endpoint, gms::endpoint_state state, gms::permit_id pid) {
|
||||
slogger.debug("endpoint={} on_alive: permit_id={}", endpoint, pid);
|
||||
bool is_normal_token_owner = get_token_metadata().is_normal_token_owner(endpoint);
|
||||
if (is_normal_token_owner) {
|
||||
co_await notify_up(endpoint);
|
||||
@@ -3139,8 +3141,8 @@ future<> storage_service::before_change(gms::inet_address endpoint, gms::endpoin
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
future<> storage_service::on_change(inet_address endpoint, application_state state, const versioned_value& value) {
|
||||
slogger.debug("endpoint={} on_change: app_state={}, versioned_value={}", endpoint, state, value);
|
||||
future<> storage_service::on_change(inet_address endpoint, application_state state, const versioned_value& value, gms::permit_id pid) {
|
||||
slogger.debug("endpoint={} on_change: app_state={}, versioned_value={}, permit_id={}", endpoint, state, value, pid);
|
||||
if (state == application_state::STATUS) {
|
||||
std::vector<sstring> pieces;
|
||||
boost::split(pieces, value.value(), boost::is_any_of(sstring(versioned_value::DELIMITER_STR)));
|
||||
@@ -3150,18 +3152,18 @@ future<> storage_service::on_change(inet_address endpoint, application_state sta
|
||||
}
|
||||
const sstring& move_name = pieces[0];
|
||||
if (move_name == sstring(versioned_value::STATUS_BOOTSTRAPPING)) {
|
||||
co_await handle_state_bootstrap(endpoint);
|
||||
co_await handle_state_bootstrap(endpoint, pid);
|
||||
} else if (move_name == sstring(versioned_value::STATUS_NORMAL) ||
|
||||
move_name == sstring(versioned_value::SHUTDOWN)) {
|
||||
co_await handle_state_normal(endpoint);
|
||||
co_await handle_state_normal(endpoint, pid);
|
||||
} else if (move_name == sstring(versioned_value::REMOVED_TOKEN)) {
|
||||
co_await handle_state_removed(endpoint, std::move(pieces));
|
||||
co_await handle_state_removed(endpoint, std::move(pieces), pid);
|
||||
} else if (move_name == sstring(versioned_value::STATUS_LEAVING)) {
|
||||
co_await handle_state_leaving(endpoint);
|
||||
co_await handle_state_leaving(endpoint, pid);
|
||||
} else if (move_name == sstring(versioned_value::STATUS_LEFT)) {
|
||||
co_await handle_state_left(endpoint, std::move(pieces));
|
||||
co_await handle_state_left(endpoint, std::move(pieces), pid);
|
||||
} else if (move_name == sstring(versioned_value::STATUS_MOVING)) {
|
||||
handle_state_moving(endpoint, std::move(pieces));
|
||||
handle_state_moving(endpoint, std::move(pieces), pid);
|
||||
} else if (move_name == sstring(versioned_value::HIBERNATE)) {
|
||||
slogger.warn("endpoint={} went into HIBERNATE state, this is no longer supported. Use a new version to perform the replace operation.", endpoint);
|
||||
} else {
|
||||
@@ -3201,8 +3203,8 @@ future<> storage_service::maybe_reconnect_to_preferred_ip(inet_address ep, inet_
|
||||
}
|
||||
|
||||
|
||||
future<> storage_service::on_remove(gms::inet_address endpoint) {
|
||||
slogger.debug("endpoint={} on_remove", endpoint);
|
||||
future<> storage_service::on_remove(gms::inet_address endpoint, gms::permit_id pid) {
|
||||
slogger.debug("endpoint={} on_remove: permit_id={}", endpoint, pid);
|
||||
auto tmlock = co_await get_token_metadata_lock();
|
||||
auto tmptr = co_await get_mutable_token_metadata_ptr();
|
||||
tmptr->remove_endpoint(endpoint);
|
||||
@@ -3210,16 +3212,16 @@ future<> storage_service::on_remove(gms::inet_address endpoint) {
|
||||
co_await replicate_to_all_cores(std::move(tmptr));
|
||||
}
|
||||
|
||||
future<> storage_service::on_dead(gms::inet_address endpoint, gms::endpoint_state state) {
|
||||
slogger.debug("endpoint={} on_dead", endpoint);
|
||||
future<> storage_service::on_dead(gms::inet_address endpoint, gms::endpoint_state state, gms::permit_id pid) {
|
||||
slogger.debug("endpoint={} on_dead: permit_id={}", endpoint, pid);
|
||||
return notify_down(endpoint);
|
||||
}
|
||||
|
||||
future<> storage_service::on_restart(gms::inet_address endpoint, gms::endpoint_state state) {
|
||||
slogger.debug("endpoint={} on_restart", endpoint);
|
||||
future<> storage_service::on_restart(gms::inet_address endpoint, gms::endpoint_state state, gms::permit_id pid) {
|
||||
slogger.debug("endpoint={} on_restart: permit_id={}", endpoint, pid);
|
||||
// If we have restarted before the node was even marked down, we need to reset the connection pool
|
||||
if (state.is_alive()) {
|
||||
return on_dead(endpoint, state);
|
||||
return on_dead(endpoint, state, pid);
|
||||
}
|
||||
return make_ready_future();
|
||||
}
|
||||
@@ -3605,8 +3607,8 @@ future<> storage_service::check_for_endpoint_collision(std::unordered_set<gms::i
|
||||
});
|
||||
}
|
||||
|
||||
future<> storage_service::remove_endpoint(inet_address endpoint) {
|
||||
co_await _gossiper.remove_endpoint(endpoint);
|
||||
future<> storage_service::remove_endpoint(inet_address endpoint, gms::permit_id pid) {
|
||||
co_await _gossiper.remove_endpoint(endpoint, pid);
|
||||
try {
|
||||
co_await _sys_ks.local().remove_endpoint(endpoint);
|
||||
} catch (...) {
|
||||
@@ -4578,9 +4580,11 @@ future<> storage_service::removenode(locator::host_id host_id, std::list<locator
|
||||
|
||||
// Step 7: Announce the node has left
|
||||
slogger.info("removenode[{}]: Advertising that the node left the ring", uuid);
|
||||
ss._gossiper.advertise_token_removed(endpoint, host_id).get();
|
||||
auto permit = ss._gossiper.lock_endpoint(endpoint, gms::null_permit_id).get();
|
||||
const auto& pid = permit.id();
|
||||
ss._gossiper.advertise_token_removed(endpoint, host_id, pid).get();
|
||||
std::unordered_set<token> tmp(tokens.begin(), tokens.end());
|
||||
ss.excise(std::move(tmp), endpoint).get();
|
||||
ss.excise(std::move(tmp), endpoint, pid).get();
|
||||
removed_from_token_ring = true;
|
||||
slogger.info("removenode[{}]: Finished removing the node from the ring", uuid);
|
||||
} catch (...) {
|
||||
@@ -5255,10 +5259,10 @@ future<> storage_service::removenode_with_stream(gms::inet_address leaving_node,
|
||||
});
|
||||
}
|
||||
|
||||
future<> storage_service::excise(std::unordered_set<token> tokens, inet_address endpoint) {
|
||||
future<> storage_service::excise(std::unordered_set<token> tokens, inet_address endpoint, gms::permit_id pid) {
|
||||
slogger.info("Removing tokens {} for {}", tokens, endpoint);
|
||||
// FIXME: HintedHandOffManager.instance.deleteHintsForEndpoint(endpoint);
|
||||
co_await remove_endpoint(endpoint);
|
||||
co_await remove_endpoint(endpoint, pid);
|
||||
auto tmlock = std::make_optional(co_await get_token_metadata_lock());
|
||||
auto tmptr = co_await get_mutable_token_metadata_ptr();
|
||||
tmptr->remove_endpoint(endpoint);
|
||||
@@ -5271,9 +5275,9 @@ future<> storage_service::excise(std::unordered_set<token> tokens, inet_address
|
||||
co_await notify_left(endpoint);
|
||||
}
|
||||
|
||||
future<> storage_service::excise(std::unordered_set<token> tokens, inet_address endpoint, int64_t expire_time) {
|
||||
future<> storage_service::excise(std::unordered_set<token> tokens, inet_address endpoint, int64_t expire_time, gms::permit_id pid) {
|
||||
add_expire_time_if_found(endpoint, expire_time);
|
||||
return excise(tokens, endpoint);
|
||||
return excise(tokens, endpoint, pid);
|
||||
}
|
||||
|
||||
future<> storage_service::leave_ring() {
|
||||
@@ -5977,9 +5981,11 @@ future<> storage_service::force_remove_completion() {
|
||||
slogger.warn("No host_id is found for endpoint {}", endpoint);
|
||||
continue;
|
||||
}
|
||||
co_await ss._gossiper.advertise_token_removed(endpoint, host_id);
|
||||
auto permit = co_await ss._gossiper.lock_endpoint(endpoint, gms::null_permit_id);
|
||||
const auto& pid = permit.id();
|
||||
co_await ss._gossiper.advertise_token_removed(endpoint, host_id, pid);
|
||||
std::unordered_set<token> tokens_set(tokens.begin(), tokens.end());
|
||||
co_await ss.excise(tokens_set, endpoint);
|
||||
co_await ss.excise(tokens_set, endpoint, pid);
|
||||
|
||||
slogger.info("force_remove_completion: removing endpoint {} from group 0", endpoint);
|
||||
assert(ss._group0);
|
||||
|
||||
@@ -409,7 +409,7 @@ public:
|
||||
locator::vnode_effective_replication_map_ptr erm,
|
||||
const dht::token_range_vector& ranges) const;
|
||||
public:
|
||||
virtual future<> on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) override;
|
||||
virtual future<> on_join(gms::inet_address endpoint, gms::endpoint_state ep_state, gms::permit_id) override;
|
||||
virtual future<> before_change(gms::inet_address endpoint, gms::endpoint_state current_state, gms::application_state new_state_key, const gms::versioned_value& new_value) override;
|
||||
/*
|
||||
* Handle the reception of a new particular ApplicationState for a particular endpoint. Note that the value of the
|
||||
@@ -443,11 +443,11 @@ public:
|
||||
* Note: Any time a node state changes from STATUS_NORMAL, it will not be visible to new nodes. So it follows that
|
||||
* you should never bootstrap a new node during a removenode, decommission or move.
|
||||
*/
|
||||
virtual future<> on_change(inet_address endpoint, application_state state, const versioned_value& value) override;
|
||||
virtual future<> on_alive(gms::inet_address endpoint, gms::endpoint_state state) override;
|
||||
virtual future<> on_dead(gms::inet_address endpoint, gms::endpoint_state state) override;
|
||||
virtual future<> on_remove(gms::inet_address endpoint) override;
|
||||
virtual future<> on_restart(gms::inet_address endpoint, gms::endpoint_state state) override;
|
||||
virtual future<> on_change(inet_address endpoint, application_state state, const versioned_value& value, gms::permit_id) override;
|
||||
virtual future<> on_alive(gms::inet_address endpoint, gms::endpoint_state state, gms::permit_id) override;
|
||||
virtual future<> on_dead(gms::inet_address endpoint, gms::endpoint_state state, gms::permit_id) override;
|
||||
virtual future<> on_remove(gms::inet_address endpoint, gms::permit_id) override;
|
||||
virtual future<> on_restart(gms::inet_address endpoint, gms::endpoint_state state, gms::permit_id) override;
|
||||
|
||||
public:
|
||||
// For migration_listener
|
||||
@@ -491,7 +491,7 @@ private:
|
||||
*
|
||||
* @param endpoint bootstrapping node
|
||||
*/
|
||||
future<> handle_state_bootstrap(inet_address endpoint);
|
||||
future<> handle_state_bootstrap(inet_address endpoint, gms::permit_id);
|
||||
|
||||
/**
|
||||
* Handle node move to normal state. That is, node is entering token ring and participating
|
||||
@@ -499,14 +499,14 @@ private:
|
||||
*
|
||||
* @param endpoint node
|
||||
*/
|
||||
future<> handle_state_normal(inet_address endpoint);
|
||||
future<> handle_state_normal(inet_address endpoint, gms::permit_id);
|
||||
|
||||
/**
|
||||
* Handle node preparing to leave the ring
|
||||
*
|
||||
* @param endpoint node
|
||||
*/
|
||||
future<> handle_state_leaving(inet_address endpoint);
|
||||
future<> handle_state_leaving(inet_address endpoint, gms::permit_id);
|
||||
|
||||
/**
|
||||
* Handle node leaving the ring. This will happen when a node is decommissioned
|
||||
@@ -514,7 +514,7 @@ private:
|
||||
* @param endpoint If reason for leaving is decommission, endpoint is the leaving node.
|
||||
* @param pieces STATE_LEFT,token
|
||||
*/
|
||||
future<> handle_state_left(inet_address endpoint, std::vector<sstring> pieces);
|
||||
future<> handle_state_left(inet_address endpoint, std::vector<sstring> pieces, gms::permit_id);
|
||||
|
||||
/**
|
||||
* Handle node moving inside the ring.
|
||||
@@ -522,7 +522,7 @@ private:
|
||||
* @param endpoint moving endpoint address
|
||||
* @param pieces STATE_MOVING, token
|
||||
*/
|
||||
void handle_state_moving(inet_address endpoint, std::vector<sstring> pieces);
|
||||
void handle_state_moving(inet_address endpoint, std::vector<sstring> pieces, gms::permit_id);
|
||||
|
||||
/**
|
||||
* Handle notification that a node being actively removed from the ring via 'removenode'
|
||||
@@ -530,17 +530,17 @@ private:
|
||||
* @param endpoint node
|
||||
* @param pieces is REMOVED_TOKEN (node is gone)
|
||||
*/
|
||||
future<> handle_state_removed(inet_address endpoint, std::vector<sstring> pieces);
|
||||
future<> handle_state_removed(inet_address endpoint, std::vector<sstring> pieces, gms::permit_id);
|
||||
|
||||
future<>
|
||||
handle_state_replacing_update_pending_ranges(mutable_token_metadata_ptr tmptr, inet_address replacing_node);
|
||||
handle_state_replacing_update_pending_ranges(mutable_token_metadata_ptr tmptr, inet_address replacing_node, gms::permit_id);
|
||||
|
||||
private:
|
||||
future<> excise(std::unordered_set<token> tokens, inet_address endpoint);
|
||||
future<> excise(std::unordered_set<token> tokens, inet_address endpoint, long expire_time);
|
||||
future<> excise(std::unordered_set<token> tokens, inet_address endpoint, gms::permit_id);
|
||||
future<> excise(std::unordered_set<token> tokens, inet_address endpoint, long expire_time, gms::permit_id);
|
||||
|
||||
/** unlike excise we just need this endpoint gone without going through any notifications **/
|
||||
future<> remove_endpoint(inet_address endpoint);
|
||||
future<> remove_endpoint(inet_address endpoint, gms::permit_id pid);
|
||||
|
||||
void add_expire_time_if_found(inet_address endpoint, int64_t expire_time);
|
||||
|
||||
|
||||
@@ -39,15 +39,15 @@ public:
|
||||
|
||||
seastar::future<> stop();
|
||||
|
||||
virtual future<> on_change(gms::inet_address, gms::application_state, const gms::versioned_value&) override;
|
||||
virtual future<> on_change(gms::inet_address, gms::application_state, const gms::versioned_value&, gms::permit_id) override;
|
||||
|
||||
virtual future<> on_remove(gms::inet_address) override;
|
||||
virtual future<> on_remove(gms::inet_address, gms::permit_id) override;
|
||||
|
||||
virtual future<> on_join(gms::inet_address, gms::endpoint_state) override { return make_ready_future(); }
|
||||
virtual future<> on_join(gms::inet_address, gms::endpoint_state, gms::permit_id) override { return make_ready_future(); }
|
||||
virtual future<> before_change(gms::inet_address, gms::endpoint_state, gms::application_state, const gms::versioned_value&) override { return make_ready_future(); }
|
||||
virtual future<> on_alive(gms::inet_address, gms::endpoint_state) override { return make_ready_future(); }
|
||||
virtual future<> on_dead(gms::inet_address, gms::endpoint_state) override { return make_ready_future(); }
|
||||
virtual future<> on_restart(gms::inet_address, gms::endpoint_state) override { return make_ready_future(); }
|
||||
virtual future<> on_alive(gms::inet_address, gms::endpoint_state, gms::permit_id) override { return make_ready_future(); }
|
||||
virtual future<> on_dead(gms::inet_address, gms::endpoint_state, gms::permit_id) override { return make_ready_future(); }
|
||||
virtual future<> on_restart(gms::inet_address, gms::endpoint_state, gms::permit_id) override { return make_ready_future(); }
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -328,7 +328,7 @@ void stream_manager::fail_all_sessions() {
|
||||
}
|
||||
}
|
||||
|
||||
future<> stream_manager::on_remove(inet_address endpoint) {
|
||||
future<> stream_manager::on_remove(inet_address endpoint, gms::permit_id) {
|
||||
if (has_peer(endpoint)) {
|
||||
sslog.info("stream_manager: Close all stream_session with peer = {} in on_remove", endpoint);
|
||||
//FIXME: discarded future.
|
||||
@@ -341,7 +341,7 @@ future<> stream_manager::on_remove(inet_address endpoint) {
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
future<> stream_manager::on_restart(inet_address endpoint, endpoint_state ep_state) {
|
||||
future<> stream_manager::on_restart(inet_address endpoint, endpoint_state ep_state, gms::permit_id) {
|
||||
if (has_peer(endpoint)) {
|
||||
sslog.info("stream_manager: Close all stream_session with peer = {} in on_restart", endpoint);
|
||||
//FIXME: discarded future.
|
||||
@@ -354,7 +354,7 @@ future<> stream_manager::on_restart(inet_address endpoint, endpoint_state ep_sta
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
future<> stream_manager::on_dead(inet_address endpoint, endpoint_state ep_state) {
|
||||
future<> stream_manager::on_dead(inet_address endpoint, endpoint_state ep_state, gms::permit_id) {
|
||||
if (has_peer(endpoint)) {
|
||||
sslog.info("stream_manager: Close all stream_session with peer = {} in on_dead", endpoint);
|
||||
//FIXME: discarded future.
|
||||
|
||||
@@ -165,13 +165,13 @@ public:
|
||||
shared_ptr<stream_session> get_session(streaming::plan_id plan_id, gms::inet_address from, const char* verb, std::optional<table_id> cf_id = {});
|
||||
|
||||
public:
|
||||
virtual future<> on_join(inet_address endpoint, endpoint_state ep_state) override { return make_ready_future(); }
|
||||
virtual future<> on_join(inet_address endpoint, endpoint_state ep_state, gms::permit_id) override { return make_ready_future(); }
|
||||
virtual future<> before_change(inet_address endpoint, endpoint_state current_state, application_state new_state_key, const versioned_value& new_value) override { return make_ready_future(); }
|
||||
virtual future<> on_change(inet_address endpoint, application_state state, const versioned_value& value) override { return make_ready_future(); }
|
||||
virtual future<> on_alive(inet_address endpoint, endpoint_state state) override { return make_ready_future(); }
|
||||
virtual future<> on_dead(inet_address endpoint, endpoint_state state) override;
|
||||
virtual future<> on_remove(inet_address endpoint) override;
|
||||
virtual future<> on_restart(inet_address endpoint, endpoint_state ep_state) override;
|
||||
virtual future<> on_change(inet_address endpoint, application_state state, const versioned_value& value, gms::permit_id) override { return make_ready_future(); }
|
||||
virtual future<> on_alive(inet_address endpoint, endpoint_state state, gms::permit_id) override { return make_ready_future(); }
|
||||
virtual future<> on_dead(inet_address endpoint, endpoint_state state, gms::permit_id) override;
|
||||
virtual future<> on_remove(inet_address endpoint, gms::permit_id) override;
|
||||
virtual future<> on_restart(inet_address endpoint, endpoint_state ep_state, gms::permit_id) override;
|
||||
|
||||
private:
|
||||
void fail_all_sessions();
|
||||
|
||||
@@ -117,7 +117,7 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
inline UUID null_uuid() noexcept {
|
||||
inline constexpr UUID null_uuid() noexcept {
|
||||
return UUID();
|
||||
}
|
||||
|
||||
@@ -210,9 +210,9 @@ struct tagged_uuid {
|
||||
return bool(id);
|
||||
}
|
||||
static tagged_uuid create_random_id() noexcept { return tagged_uuid{utils::make_random_uuid()}; }
|
||||
static tagged_uuid create_null_id() noexcept { return tagged_uuid{utils::null_uuid()}; }
|
||||
static constexpr tagged_uuid create_null_id() noexcept { return tagged_uuid{}; }
|
||||
explicit tagged_uuid(const utils::UUID& uuid) noexcept : id(uuid) {}
|
||||
tagged_uuid() = default;
|
||||
constexpr tagged_uuid() = default;
|
||||
|
||||
const utils::UUID& uuid() const noexcept {
|
||||
return id;
|
||||
|
||||
Reference in New Issue
Block a user