Merge 'More gossiper cleanups' from Gleb Natapov
The PR contains more code cleanups, mostly in gossiper. Dropping more gossiper state leaving only NORMAL and SHUTDOWN. All other states are checked against topology state. Those two are left because SHUTDOWN state is propagated through gossiper only and when the node is not in SHUTDOWN it should be in some other state. No need to backport. Cleanups. Closes scylladb/scylladb#29129 * https://github.com/scylladb/scylladb: storage_service: cleanup unused code storage_service: simplify get_peer_info_for_update gossiper: send shutdown notifications in parallel gms: remove unused code virtual_tables: no need to call gossiper if we already know that the node is in shutdown gossiper: print node state from raft topology in the logs gossiper: use is_shutdown instead of code it manually gossiper: mark endpoint_state(inet_address ip) constructor as explicit gossiper: remove unused code gossiper: drop last use of LEFT state and drop the state gossiper: drop unused STATUS_BOOTSTRAPPING state gossiper: rename is_dead_state to is_left since this is all that the function checks now. gossiper: use raft topology state instead of gossiper one when checking node's state storage_service: drop check_for_endpoint_collision function storage_service: drop is_first_node function gossiper: remove unused REMOVED_TOKEN state gossiper: remove unused advertise_token_removed function
This commit is contained in:
@@ -99,7 +99,7 @@ public:
|
||||
|
||||
set_cell(cr, "up", gossiper.is_alive(hostid));
|
||||
if (gossiper.is_shutdown(endpoint)) {
|
||||
set_cell(cr, "status", gossiper.get_gossip_status(endpoint));
|
||||
set_cell(cr, "status", "shutdown");
|
||||
} else {
|
||||
set_cell(cr, "status", boost::to_upper_copy<std::string>(fmt::format("{}", ss.get_node_state(hostid))));
|
||||
}
|
||||
|
||||
@@ -41,7 +41,7 @@ public:
|
||||
_ip == other._ip;
|
||||
}
|
||||
|
||||
endpoint_state(inet_address ip) noexcept
|
||||
explicit endpoint_state(inet_address ip) noexcept
|
||||
: _heart_beat_state()
|
||||
, _update_timestamp(clk::now())
|
||||
, _ip(ip)
|
||||
|
||||
197
gms/gossiper.cc
197
gms/gossiper.cc
@@ -59,7 +59,6 @@ using clk = gossiper::clk;
|
||||
static logging::logger logger("gossip");
|
||||
|
||||
constexpr std::chrono::milliseconds gossiper::INTERVAL;
|
||||
constexpr std::chrono::hours gossiper::A_VERY_LONG_TIME;
|
||||
constexpr generation_type::value_type gossiper::MAX_GENERATION_DIFFERENCE;
|
||||
|
||||
const sstring& gossiper::get_cluster_name() const noexcept {
|
||||
@@ -648,7 +647,7 @@ future<> gossiper::do_apply_state_locally(locator::host_id node, endpoint_state
|
||||
}
|
||||
// Re-rake after apply_new_states
|
||||
es = get_endpoint_state_ptr(node);
|
||||
if (!is_alive(es->get_host_id()) && !is_dead_state(*es) && !shadow_round) { // unless of course, it was dead
|
||||
if (!is_alive(es->get_host_id()) && !is_left(*es) && !shadow_round) { // unless of course, it was dead
|
||||
mark_alive(es);
|
||||
}
|
||||
} else {
|
||||
@@ -767,7 +766,7 @@ future<> gossiper::remove_endpoint(locator::host_id endpoint, permit_id pid) {
|
||||
|
||||
if (was_alive) {
|
||||
try {
|
||||
logger.info("InetAddress {}/{} is now DOWN, status = {}", state->get_host_id(), ip, get_gossip_status(*state));
|
||||
logger.info("InetAddress {}/{} is now DOWN, status = {}", host_id, ip, get_node_status(host_id));
|
||||
co_await do_on_dead_notifications(ip, std::move(state), pid);
|
||||
} catch (...) {
|
||||
logger.warn("Fail to call on_dead callback: {}", std::current_exception());
|
||||
@@ -1174,10 +1173,10 @@ future<> gossiper::unregister_(shared_ptr<i_endpoint_state_change_subscriber> su
|
||||
|
||||
std::set<locator::host_id> gossiper::get_live_members() const {
|
||||
std::set<locator::host_id> live_members(_live_endpoints.begin(), _live_endpoints.end());
|
||||
auto myip = get_broadcast_address();
|
||||
auto myid = my_host_id();
|
||||
logger.debug("live_members before={}", live_members);
|
||||
if (!is_shutdown(myip)) {
|
||||
live_members.insert(my_host_id());
|
||||
if (!is_shutdown(myid)) {
|
||||
live_members.insert(myid);
|
||||
}
|
||||
logger.debug("live_members after={}", live_members);
|
||||
return live_members;
|
||||
@@ -1248,7 +1247,6 @@ future<> gossiper::evict_from_membership(locator::host_id hid, permit_id pid) {
|
||||
}
|
||||
g._endpoint_state_map.erase(hid);
|
||||
});
|
||||
_expire_time_endpoint_map.erase(hid);
|
||||
logger.debug("evicting {} from gossip", hid);
|
||||
}
|
||||
|
||||
@@ -1321,21 +1319,6 @@ future<> gossiper::replicate(endpoint_state es, permit_id pid) {
|
||||
}
|
||||
}
|
||||
|
||||
future<> gossiper::advertise_token_removed(locator::host_id host_id, permit_id pid) {
|
||||
auto permit = co_await lock_endpoint(host_id, pid);
|
||||
pid = permit.id();
|
||||
auto eps = get_endpoint_state(host_id);
|
||||
eps.update_timestamp(); // make sure we don't evict it too soon
|
||||
eps.get_heart_beat_state().force_newer_generation_unsafe();
|
||||
auto expire_time = compute_expire_time();
|
||||
eps.add_application_state(application_state::STATUS, versioned_value::removed_nonlocal(host_id, expire_time.time_since_epoch().count()));
|
||||
logger.info("Completing removal of {}", host_id);
|
||||
add_expire_time_for_endpoint(host_id, expire_time);
|
||||
co_await replicate(std::move(eps), pid);
|
||||
// ensure at least one gossip round occurs before returning
|
||||
co_await sleep_abortable(INTERVAL * 2, _abort_source);
|
||||
}
|
||||
|
||||
future<> gossiper::assassinate_endpoint(sstring address) {
|
||||
throw std::runtime_error("Assassinating endpoint is not supported in topology over raft mode");
|
||||
}
|
||||
@@ -1368,13 +1351,10 @@ future<> gossiper::do_gossip_to_unreachable_member(gossip_digest_syn message) {
|
||||
std::uniform_real_distribution<double> dist(0, 1);
|
||||
double rand_dbl = dist(_random_engine);
|
||||
if (rand_dbl < prob) {
|
||||
std::set<locator::host_id> addrs;
|
||||
for (auto&& x : _unreachable_endpoints) {
|
||||
// Ignore the node which is decommissioned
|
||||
if (get_gossip_status(_address_map.get(x.first)) != sstring(versioned_value::STATUS_LEFT)) {
|
||||
addrs.insert(x.first);
|
||||
}
|
||||
}
|
||||
auto addrs = _unreachable_endpoints | std::ranges::views::keys | std::views::filter([this] (auto ep) {
|
||||
// Ignore the node which is no longer part of the cluster
|
||||
return !_topo_sm._topology.left_nodes.contains(raft::server_id(ep.uuid()));
|
||||
}) | std::ranges::to<std::set>();
|
||||
logger.trace("do_gossip_to_unreachable_member: live_endpoint nr={} unreachable_endpoints nr={}",
|
||||
live_endpoint_count, unreachable_endpoint_count);
|
||||
return send_gossip(message, addrs);
|
||||
@@ -1383,17 +1363,6 @@ future<> gossiper::do_gossip_to_unreachable_member(gossip_digest_syn message) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
clk::time_point gossiper::get_expire_time_for_endpoint(locator::host_id id) const noexcept {
|
||||
/* default expire_time is A_VERY_LONG_TIME */
|
||||
auto it = _expire_time_endpoint_map.find(id);
|
||||
if (it == _expire_time_endpoint_map.end()) {
|
||||
return compute_expire_time();
|
||||
} else {
|
||||
auto stored_time = it->second;
|
||||
return stored_time;
|
||||
}
|
||||
}
|
||||
|
||||
endpoint_state_ptr gossiper::get_endpoint_state_ptr(locator::host_id ep) const noexcept {
|
||||
auto it = _endpoint_state_map.find(ep);
|
||||
if (it == _endpoint_state_map.end()) {
|
||||
@@ -1420,7 +1389,7 @@ endpoint_state& gossiper::my_endpoint_state() {
|
||||
auto ep = get_broadcast_address();
|
||||
auto it = _endpoint_state_map.find(id);
|
||||
if (it == _endpoint_state_map.end()) {
|
||||
it = _endpoint_state_map.emplace(id, make_endpoint_state_ptr({ep})).first;
|
||||
it = _endpoint_state_map.emplace(id, make_endpoint_state_ptr(endpoint_state{ep})).first;
|
||||
}
|
||||
return const_cast<endpoint_state&>(*it->second);
|
||||
}
|
||||
@@ -1634,9 +1603,8 @@ future<> gossiper::real_mark_alive(locator::host_id host_id) {
|
||||
}
|
||||
|
||||
// Do not mark a node with status shutdown as UP.
|
||||
auto status = sstring(get_gossip_status(*es));
|
||||
if (status == sstring(versioned_value::SHUTDOWN)) {
|
||||
logger.warn("Skip marking node {} with status = {} as UP", host_id, status);
|
||||
if (is_shutdown(*es)) {
|
||||
logger.warn("Skip marking node {} with status = shutdown as UP", host_id);
|
||||
co_return;
|
||||
}
|
||||
|
||||
@@ -1649,7 +1617,6 @@ future<> gossiper::real_mark_alive(locator::host_id host_id) {
|
||||
auto [it_, inserted] = data.live.insert(addr);
|
||||
was_live = !inserted;
|
||||
});
|
||||
_expire_time_endpoint_map.erase(host_id);
|
||||
if (was_live) {
|
||||
co_return;
|
||||
}
|
||||
@@ -1662,7 +1629,7 @@ future<> gossiper::real_mark_alive(locator::host_id host_id) {
|
||||
|
||||
auto addr = es->get_ip();
|
||||
|
||||
logger.info("InetAddress {}/{} is now UP, status = {}", host_id, addr, status);
|
||||
logger.info("InetAddress {}/{} is now UP, status = {}", host_id, addr, get_node_status(host_id));
|
||||
|
||||
co_await _subscribers.for_each([addr, host_id, es, pid = permit.id()] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) -> future<> {
|
||||
co_await subscriber->on_alive(addr, host_id, es, pid);
|
||||
@@ -1678,7 +1645,7 @@ future<> gossiper::mark_dead(locator::host_id addr, endpoint_state_ptr state, pe
|
||||
data.live.erase(addr);
|
||||
data.unreachable[addr] = now();
|
||||
});
|
||||
logger.info("InetAddress {} is now DOWN, status = {}", addr, get_gossip_status(*state));
|
||||
logger.info("InetAddress {} is now DOWN, status = {}", addr, get_node_status(addr));
|
||||
co_await do_on_dead_notifications(state->get_ip(), std::move(state), pid);
|
||||
}
|
||||
|
||||
@@ -1688,14 +1655,14 @@ future<> gossiper::handle_major_state_change(endpoint_state eps, permit_id pid,
|
||||
|
||||
endpoint_state_ptr eps_old = get_endpoint_state_ptr(ep);
|
||||
|
||||
if (!is_dead_state(eps) && !shadow_round) {
|
||||
if (!is_left(eps) && !shadow_round) {
|
||||
if (_endpoint_state_map.contains(ep)) {
|
||||
logger.info("Node {} has restarted, now UP, status = {}", ep, get_gossip_status(eps));
|
||||
logger.info("Node {} has restarted, now UP, status = {}", ep, get_node_status(ep));
|
||||
} else {
|
||||
logger.debug("Node {} is now part of the cluster, status = {}", ep, get_gossip_status(eps));
|
||||
logger.debug("Node {} is now part of the cluster, status = {}", ep, get_node_status(ep));
|
||||
}
|
||||
}
|
||||
logger.trace("Adding endpoint state for {}, status = {}", ep, get_gossip_status(eps));
|
||||
logger.trace("Adding endpoint state for {}, status = {}", ep, get_node_status(ep));
|
||||
co_await replicate(eps, pid);
|
||||
|
||||
if (shadow_round) {
|
||||
@@ -1713,10 +1680,10 @@ future<> gossiper::handle_major_state_change(endpoint_state eps, permit_id pid,
|
||||
if (!ep_state) {
|
||||
throw std::out_of_range(format("ep={}", ep));
|
||||
}
|
||||
if (!is_dead_state(*ep_state)) {
|
||||
if (!is_left(*ep_state)) {
|
||||
mark_alive(ep_state);
|
||||
} else {
|
||||
logger.debug("Not marking {} alive due to dead state {}", ep, get_gossip_status(eps));
|
||||
logger.debug("Not marking {} alive due to dead state {}", ep, get_node_status(ep));
|
||||
co_await mark_dead(ep, ep_state, pid);
|
||||
}
|
||||
|
||||
@@ -1730,8 +1697,8 @@ future<> gossiper::handle_major_state_change(endpoint_state eps, permit_id pid,
|
||||
}
|
||||
}
|
||||
|
||||
bool gossiper::is_dead_state(const endpoint_state& eps) const {
|
||||
return std::ranges::any_of(DEAD_STATES, [state = get_gossip_status(eps)](const auto& deadstate) { return state == deadstate; });
|
||||
bool gossiper::is_left(const endpoint_state& eps) const {
|
||||
return _topo_sm._topology.left_nodes.contains(raft::server_id(eps.get_host_id().uuid()));
|
||||
}
|
||||
|
||||
bool gossiper::is_shutdown(const locator::host_id& endpoint) const {
|
||||
@@ -1746,10 +1713,6 @@ bool gossiper::is_normal(const locator::host_id& endpoint) const {
|
||||
return get_gossip_status(endpoint) == versioned_value::STATUS_NORMAL;
|
||||
}
|
||||
|
||||
bool gossiper::is_silent_shutdown_state(const endpoint_state& ep_state) const{
|
||||
return std::ranges::any_of(SILENT_SHUTDOWN_STATES, [state = get_gossip_status(ep_state)](const auto& deadstate) { return state == deadstate; });
|
||||
}
|
||||
|
||||
future<> gossiper::apply_new_states(endpoint_state local_state, const endpoint_state& remote_state, permit_id pid, bool shadow_round) {
|
||||
// don't SCYLLA_ASSERT here, since if the node restarts the version will go back to zero
|
||||
//int oldVersion = local_state.get_heart_beat_state().get_heart_beat_version();
|
||||
@@ -2173,16 +2136,14 @@ future<> gossiper::do_stop_gossiping() {
|
||||
logger.info("gossip is already stopped");
|
||||
co_return;
|
||||
}
|
||||
|
||||
auto my_ep_state = get_this_endpoint_state_ptr();
|
||||
if (my_ep_state) {
|
||||
logger.info("My status = {}", get_gossip_status(*my_ep_state));
|
||||
}
|
||||
if (my_ep_state && !is_silent_shutdown_state(*my_ep_state)) {
|
||||
if (my_ep_state && _topo_sm._topology.normal_nodes.contains(raft::server_id(my_host_id().uuid()))) {
|
||||
auto local_generation = my_ep_state->get_heart_beat_state().get_generation();
|
||||
logger.info("Announcing shutdown");
|
||||
co_await add_local_application_state(application_state::STATUS, versioned_value::shutdown(true));
|
||||
auto live_endpoints = _live_endpoints;
|
||||
for (locator::host_id id : live_endpoints) {
|
||||
co_await coroutine::parallel_for_each(live_endpoints, [this, &local_generation] (locator::host_id id) -> future<> {
|
||||
logger.info("Sending a GossipShutdown to {} with generation {}", id, local_generation);
|
||||
try {
|
||||
co_await ser::gossip_rpc_verbs::send_gossip_shutdown(&_messaging, id, get_broadcast_address(), local_generation.value());
|
||||
@@ -2190,7 +2151,7 @@ future<> gossiper::do_stop_gossiping() {
|
||||
} catch (...) {
|
||||
logger.warn("Fail to send GossipShutdown to {}: {}", id, std::current_exception());
|
||||
}
|
||||
}
|
||||
});
|
||||
co_await sleep(std::chrono::milliseconds(_gcfg.shutdown_announce_ms));
|
||||
} else {
|
||||
logger.warn("No local state or state is in silent shutdown, not announcing shutdown");
|
||||
@@ -2241,19 +2202,6 @@ bool gossiper::is_enabled() const {
|
||||
return _enabled && !_abort_source.abort_requested();
|
||||
}
|
||||
|
||||
void gossiper::add_expire_time_for_endpoint(locator::host_id endpoint, clk::time_point expire_time) {
|
||||
auto now_ = now();
|
||||
auto diff = std::chrono::duration_cast<std::chrono::seconds>(expire_time - now_).count();
|
||||
logger.info("Node {} will be removed from gossip at [{:%Y-%m-%d %T %z}]: (expire = {}, now = {}, diff = {} seconds)",
|
||||
endpoint, fmt::gmtime(clk::to_time_t(expire_time)), expire_time.time_since_epoch().count(),
|
||||
now_.time_since_epoch().count(), diff);
|
||||
_expire_time_endpoint_map[endpoint] = expire_time;
|
||||
}
|
||||
|
||||
clk::time_point gossiper::compute_expire_time() {
|
||||
return now() + A_VERY_LONG_TIME;
|
||||
}
|
||||
|
||||
bool gossiper::is_alive(locator::host_id id) const {
|
||||
if (id == my_host_id()) {
|
||||
return true;
|
||||
@@ -2373,91 +2321,22 @@ std::string_view gossiper::get_gossip_status(const locator::host_id& endpoint) c
|
||||
return do_get_gossip_status(get_application_state_ptr(endpoint, application_state::STATUS));
|
||||
}
|
||||
|
||||
bool gossiper::is_safe_for_bootstrap(inet_address endpoint) const {
|
||||
// We allow to bootstrap a new node in only two cases:
|
||||
// 1) The node is a completely new node and no state in gossip at all
|
||||
// 2) The node has state in gossip and it is already removed from the
|
||||
// cluster either by nodetool decommission or nodetool removenode
|
||||
bool allowed = true;
|
||||
auto host_id = try_get_host_id(endpoint);
|
||||
if (!host_id) {
|
||||
logger.debug("is_safe_for_bootstrap: node={}, status=no state in gossip, allowed_to_bootstrap={}", endpoint, allowed);
|
||||
return allowed;
|
||||
std::string gossiper::get_node_status(const locator::host_id& endpoint) const noexcept {
|
||||
if (this_shard_id() != 0) {
|
||||
on_internal_error(logger, "get_node_status should only be called on shard 0");
|
||||
}
|
||||
auto eps = get_endpoint_state_ptr(*host_id);
|
||||
if (!eps) {
|
||||
logger.debug("is_safe_for_bootstrap: node={}, status=no state in gossip, allowed_to_bootstrap={}", endpoint, allowed);
|
||||
return allowed;
|
||||
if (is_shutdown(endpoint)) {
|
||||
return "shutdown";
|
||||
}
|
||||
auto status = get_gossip_status(*eps);
|
||||
std::unordered_set<std::string_view> allowed_statuses{
|
||||
versioned_value::STATUS_LEFT,
|
||||
versioned_value::REMOVED_TOKEN,
|
||||
};
|
||||
allowed = allowed_statuses.contains(status);
|
||||
logger.debug("is_safe_for_bootstrap: node={}, status={}, allowed_to_bootstrap={}", endpoint, status, allowed);
|
||||
return allowed;
|
||||
}
|
||||
|
||||
std::set<sstring> gossiper::get_supported_features(locator::host_id endpoint) const {
|
||||
auto app_state = get_application_state_ptr(endpoint, application_state::SUPPORTED_FEATURES);
|
||||
if (!app_state) {
|
||||
return {};
|
||||
}
|
||||
return feature_service::to_feature_set(app_state->value());
|
||||
}
|
||||
|
||||
std::set<sstring> gossiper::get_supported_features(const std::unordered_map<locator::host_id, sstring>& loaded_peer_features, ignore_features_of_local_node ignore_local_node) const {
|
||||
std::unordered_map<locator::host_id, std::set<sstring>> features_map;
|
||||
std::set<sstring> common_features;
|
||||
|
||||
for (auto& x : loaded_peer_features) {
|
||||
auto features = feature_service::to_feature_set(x.second);
|
||||
if (features.empty()) {
|
||||
logger.warn("Loaded empty features for peer node {}", x.first);
|
||||
} else {
|
||||
features_map.emplace(x.first, std::move(features));
|
||||
auto n = _topo_sm._topology.find(raft::server_id{endpoint.uuid()});
|
||||
if (!n) {
|
||||
if (_topo_sm._topology.left_nodes.contains(raft::server_id{endpoint.uuid()})) {
|
||||
return "left";
|
||||
}
|
||||
return "unknown";
|
||||
} else {
|
||||
return fmt::format("{}", n->second.state);
|
||||
}
|
||||
|
||||
for (auto& x : _endpoint_state_map) {
|
||||
auto host_id = x.second->get_host_id();
|
||||
auto features = get_supported_features(host_id);
|
||||
if (ignore_local_node && host_id == my_host_id()) {
|
||||
logger.debug("Ignore SUPPORTED_FEATURES of local node: features={}", features);
|
||||
continue;
|
||||
}
|
||||
if (features.empty()) {
|
||||
auto it = loaded_peer_features.find(host_id);
|
||||
if (it != loaded_peer_features.end()) {
|
||||
logger.info("Node {} does not contain SUPPORTED_FEATURES in gossip, using features saved in system table, features={}", host_id, feature_service::to_feature_set(it->second));
|
||||
} else {
|
||||
logger.warn("Node {} does not contain SUPPORTED_FEATURES in gossip or system table", host_id);
|
||||
}
|
||||
} else {
|
||||
// Replace the features with live info
|
||||
features_map[host_id] = std::move(features);
|
||||
}
|
||||
}
|
||||
|
||||
if (ignore_local_node) {
|
||||
features_map.erase(my_host_id());
|
||||
}
|
||||
|
||||
if (!features_map.empty()) {
|
||||
common_features = features_map.begin()->second;
|
||||
}
|
||||
|
||||
for (auto& x : features_map) {
|
||||
auto& features = x.second;
|
||||
std::set<sstring> result;
|
||||
std::set_intersection(features.begin(), features.end(),
|
||||
common_features.begin(), common_features.end(),
|
||||
std::inserter(result, result.end()));
|
||||
common_features = std::move(result);
|
||||
}
|
||||
common_features.erase("");
|
||||
return common_features;
|
||||
}
|
||||
|
||||
void gossiper::check_snitch_name_matches(sstring local_snitch_name) const {
|
||||
|
||||
@@ -91,7 +91,6 @@ struct loaded_endpoint_state {
|
||||
class gossiper : public seastar::async_sharded_service<gossiper>, public seastar::peering_sharded_service<gossiper> {
|
||||
public:
|
||||
using clk = seastar::lowres_system_clock;
|
||||
using ignore_features_of_local_node = bool_class<class ignore_features_of_local_node_tag>;
|
||||
using generation_for_nodes = std::unordered_map<locator::host_id, generation_type>;
|
||||
private:
|
||||
using messaging_verb = netw::messaging_verb;
|
||||
@@ -198,18 +197,7 @@ private:
|
||||
endpoint_locks_map _endpoint_locks;
|
||||
|
||||
public:
|
||||
static constexpr std::array DEAD_STATES{
|
||||
versioned_value::REMOVED_TOKEN,
|
||||
versioned_value::STATUS_LEFT,
|
||||
};
|
||||
static constexpr std::array SILENT_SHUTDOWN_STATES{
|
||||
versioned_value::REMOVED_TOKEN,
|
||||
versioned_value::STATUS_LEFT,
|
||||
versioned_value::STATUS_BOOTSTRAPPING,
|
||||
versioned_value::STATUS_UNKNOWN,
|
||||
};
|
||||
static constexpr std::chrono::milliseconds INTERVAL{1000};
|
||||
static constexpr std::chrono::hours A_VERY_LONG_TIME{24 * 3};
|
||||
|
||||
// Maximum difference between remote generation value and generation
|
||||
// value this node would get if this node were restarted that we are
|
||||
@@ -241,7 +229,6 @@ private:
|
||||
/* initial seeds for joining the cluster */
|
||||
std::set<inet_address> _seeds;
|
||||
|
||||
std::map<locator::host_id, clk::time_point> _expire_time_endpoint_map;
|
||||
|
||||
bool _in_shadow_round = false;
|
||||
|
||||
@@ -341,13 +328,6 @@ private:
|
||||
utils::chunked_vector<gossip_digest> make_random_gossip_digest() const;
|
||||
|
||||
public:
|
||||
/**
|
||||
* Handles switching the endpoint's state from REMOVING_TOKEN to REMOVED_TOKEN
|
||||
*
|
||||
* @param endpoint
|
||||
* @param host_id
|
||||
*/
|
||||
future<> advertise_token_removed(locator::host_id host_id, permit_id);
|
||||
|
||||
/**
|
||||
* Do not call this method unless you know what you are doing.
|
||||
@@ -363,7 +343,6 @@ public:
|
||||
future<generation_type> get_current_generation_number(locator::host_id endpoint) const;
|
||||
future<version_type> get_current_heart_beat_version(locator::host_id endpoint) const;
|
||||
|
||||
bool is_safe_for_bootstrap(inet_address endpoint) const;
|
||||
private:
|
||||
/**
|
||||
* Returns true if the chosen target was also a seed. False otherwise
|
||||
@@ -383,7 +362,6 @@ private:
|
||||
future<> do_gossip_to_unreachable_member(gossip_digest_syn message);
|
||||
|
||||
public:
|
||||
clk::time_point get_expire_time_for_endpoint(locator::host_id endpoint) const noexcept;
|
||||
|
||||
// Gets a shared pointer to the endpoint_state, if exists.
|
||||
// Otherwise, returns a null ptr.
|
||||
@@ -467,7 +445,7 @@ private:
|
||||
public:
|
||||
bool is_alive(locator::host_id id) const;
|
||||
|
||||
bool is_dead_state(const endpoint_state& eps) const;
|
||||
bool is_left(const endpoint_state& eps) const;
|
||||
// Wait for nodes to be alive on all shards
|
||||
future<> wait_alive(std::vector<gms::inet_address> nodes, std::chrono::milliseconds timeout);
|
||||
future<> wait_alive(std::vector<locator::host_id> nodes, std::chrono::milliseconds timeout);
|
||||
@@ -588,17 +566,12 @@ public:
|
||||
public:
|
||||
bool is_enabled() const;
|
||||
|
||||
public:
|
||||
void add_expire_time_for_endpoint(locator::host_id endpoint, clk::time_point expire_time);
|
||||
|
||||
static clk::time_point compute_expire_time();
|
||||
public:
|
||||
bool is_seed(const inet_address& endpoint) const;
|
||||
bool is_shutdown(const locator::host_id& endpoint) const;
|
||||
bool is_shutdown(const endpoint_state& eps) const;
|
||||
bool is_normal(const locator::host_id& endpoint) const;
|
||||
bool is_cql_ready(const locator::host_id& endpoint) const;
|
||||
bool is_silent_shutdown_state(const endpoint_state& ep_state) const;
|
||||
void force_newer_generation();
|
||||
public:
|
||||
std::string_view get_gossip_status(const endpoint_state& ep_state) const noexcept;
|
||||
@@ -615,12 +588,8 @@ private:
|
||||
gossip_address_map& _address_map;
|
||||
gossip_config _gcfg;
|
||||
condition_variable _failure_detector_loop_cv;
|
||||
// Get features supported by a particular node
|
||||
std::set<sstring> get_supported_features(locator::host_id endpoint) const;
|
||||
locator::token_metadata_ptr get_token_metadata_ptr() const noexcept;
|
||||
public:
|
||||
// Get features supported by all the nodes this node knows about
|
||||
std::set<sstring> get_supported_features(const std::unordered_map<locator::host_id, sstring>& loaded_peer_features, ignore_features_of_local_node ignore_local_node) const;
|
||||
std::string get_node_status(const locator::host_id& endpoint) const noexcept;
|
||||
private:
|
||||
seastar::metrics::metric_groups _metrics;
|
||||
public:
|
||||
|
||||
@@ -10,10 +10,6 @@
|
||||
#include "gms/versioned_value.hh"
|
||||
#include "message/messaging_service.hh"
|
||||
|
||||
#include <boost/algorithm/string/split.hpp>
|
||||
#include <boost/algorithm/string/classification.hpp>
|
||||
#include <charconv>
|
||||
|
||||
namespace gms {
|
||||
|
||||
static_assert(std::is_nothrow_default_constructible_v<versioned_value>);
|
||||
@@ -23,11 +19,6 @@ versioned_value versioned_value::network_version() {
|
||||
return versioned_value(format("{}", netw::messaging_service::current_version));
|
||||
}
|
||||
|
||||
sstring versioned_value::make_full_token_string(const std::unordered_set<dht::token>& tokens) {
|
||||
return fmt::to_string(fmt::join(tokens | std::views::transform([] (const dht::token& t) {
|
||||
return t.to_sstring(); }), ";"));
|
||||
}
|
||||
|
||||
sstring versioned_value::make_token_string(const std::unordered_set<dht::token>& tokens) {
|
||||
if (tokens.empty()) {
|
||||
return "";
|
||||
@@ -35,16 +26,4 @@ sstring versioned_value::make_token_string(const std::unordered_set<dht::token>&
|
||||
return tokens.begin()->to_sstring();
|
||||
}
|
||||
|
||||
std::unordered_set<dht::token> versioned_value::tokens_from_string(const sstring& s) {
|
||||
if (s.size() == 0) {
|
||||
return {}; // boost::split produces one element for empty string
|
||||
}
|
||||
std::vector<sstring> tokens;
|
||||
boost::split(tokens, s, boost::is_any_of(";"));
|
||||
std::unordered_set<dht::token> ret;
|
||||
for (auto str : tokens) {
|
||||
ret.emplace(dht::token::from_sstring(str));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
} // namespace gms
|
||||
@@ -18,7 +18,6 @@
|
||||
#include "schema/schema_fwd.hh"
|
||||
#include "service/state_id.hh"
|
||||
#include "version.hh"
|
||||
#include "cdc/generation_id.hh"
|
||||
#include <set>
|
||||
#include <unordered_set>
|
||||
|
||||
@@ -45,11 +44,7 @@ public:
|
||||
|
||||
// values for ApplicationState.STATUS
|
||||
static constexpr std::string_view STATUS_UNKNOWN{"UNKNOWN"};
|
||||
static constexpr std::string_view STATUS_BOOTSTRAPPING{"BOOT"};
|
||||
static constexpr std::string_view STATUS_NORMAL{"NORMAL"};
|
||||
static constexpr std::string_view STATUS_LEFT{"LEFT"};
|
||||
|
||||
static constexpr std::string_view REMOVED_TOKEN{"removed"};
|
||||
|
||||
static constexpr std::string_view SHUTDOWN{"shutdown"};
|
||||
|
||||
@@ -80,26 +75,18 @@ public:
|
||||
: _version(-1) {
|
||||
}
|
||||
|
||||
static sstring version_string(const std::initializer_list<sstring>& args) {
|
||||
return fmt::to_string(fmt::join(args, versioned_value::DELIMITER));
|
||||
}
|
||||
|
||||
static sstring make_full_token_string(const std::unordered_set<dht::token>& tokens);
|
||||
static sstring make_token_string(const std::unordered_set<dht::token>& tokens);
|
||||
static sstring make_cdc_generation_id_string(std::optional<cdc::generation_id>);
|
||||
|
||||
// Reverse of `make_full_token_string`.
|
||||
static std::unordered_set<dht::token> tokens_from_string(const sstring&);
|
||||
|
||||
static versioned_value clone_with_higher_version(const versioned_value& value) noexcept {
|
||||
return versioned_value(value.value());
|
||||
}
|
||||
|
||||
static versioned_value bootstrapping(const std::unordered_set<dht::token>& tokens) {
|
||||
return versioned_value(version_string({sstring(versioned_value::STATUS_BOOTSTRAPPING),
|
||||
make_token_string(tokens)}));
|
||||
private:
|
||||
static sstring version_string(const std::initializer_list<sstring>& args) {
|
||||
return fmt::to_string(fmt::join(args, versioned_value::DELIMITER));
|
||||
}
|
||||
|
||||
static sstring make_token_string(const std::unordered_set<dht::token>& tokens);
|
||||
|
||||
public:
|
||||
static versioned_value normal(const std::unordered_set<dht::token>& tokens) {
|
||||
return versioned_value(version_string({sstring(versioned_value::STATUS_NORMAL),
|
||||
make_token_string(tokens)}));
|
||||
@@ -113,24 +100,10 @@ public:
|
||||
return versioned_value(new_version.to_sstring());
|
||||
}
|
||||
|
||||
static versioned_value left(const std::unordered_set<dht::token>& tokens, int64_t expire_time) {
|
||||
return versioned_value(version_string({sstring(versioned_value::STATUS_LEFT),
|
||||
make_token_string(tokens),
|
||||
std::to_string(expire_time)}));
|
||||
}
|
||||
|
||||
static versioned_value host_id(const locator::host_id& host_id) {
|
||||
return versioned_value(host_id.to_sstring());
|
||||
}
|
||||
|
||||
static versioned_value tokens(const std::unordered_set<dht::token>& tokens) {
|
||||
return versioned_value(make_full_token_string(tokens));
|
||||
}
|
||||
|
||||
static versioned_value removed_nonlocal(const locator::host_id& host_id, int64_t expire_time) {
|
||||
return versioned_value(sstring(REMOVED_TOKEN) + sstring(DELIMITER) + host_id.to_sstring() + sstring(DELIMITER) + to_sstring(expire_time));
|
||||
}
|
||||
|
||||
static versioned_value shutdown(bool value) {
|
||||
return versioned_value(sstring(SHUTDOWN) + sstring(DELIMITER) + (value ? "true" : "false"));
|
||||
}
|
||||
@@ -169,10 +142,6 @@ public:
|
||||
return versioned_value(private_ip);
|
||||
}
|
||||
|
||||
static versioned_value severity(double value) {
|
||||
return versioned_value(to_sstring(value));
|
||||
}
|
||||
|
||||
static versioned_value supported_features(const std::set<std::string_view>& features) {
|
||||
return versioned_value(fmt::to_string(fmt::join(features, ",")));
|
||||
}
|
||||
|
||||
@@ -20,7 +20,6 @@
|
||||
#include "raft/raft.hh"
|
||||
#include "auth/cache.hh"
|
||||
#include <ranges>
|
||||
#include <seastar/core/shard_id.hh>
|
||||
#include <seastar/core/sleep.hh>
|
||||
#include "service/qos/raft_service_level_distributed_data_accessor.hh"
|
||||
#include "service/qos/service_level_controller.hh"
|
||||
@@ -119,11 +118,9 @@
|
||||
#include "service/task_manager_module.hh"
|
||||
#include "service/topology_mutation.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "service/qos/service_level_controller.hh"
|
||||
#include <csignal>
|
||||
#include "utils/labels.hh"
|
||||
#include "view_info.hh"
|
||||
#include "raft/raft.hh"
|
||||
#include "debug.hh"
|
||||
|
||||
#include <boost/algorithm/string/split.hpp>
|
||||
@@ -352,30 +349,6 @@ bool storage_service::is_replacing() {
|
||||
return !cfg.replace_address().empty();
|
||||
}
|
||||
|
||||
bool storage_service::is_first_node() {
|
||||
if (is_replacing()) {
|
||||
return false;
|
||||
}
|
||||
auto seeds = _gossiper.get_seeds();
|
||||
if (seeds.empty()) {
|
||||
return false;
|
||||
}
|
||||
// Node with the smallest IP address is chosen as the very first node
|
||||
// in the cluster. The first node is the only node that does not
|
||||
// bootstrap in the cluster. All other nodes will bootstrap.
|
||||
std::vector<gms::inet_address> sorted_seeds(seeds.begin(), seeds.end());
|
||||
std::sort(sorted_seeds.begin(), sorted_seeds.end());
|
||||
if (sorted_seeds.front() == get_broadcast_address()) {
|
||||
slogger.info("I am the first node in the cluster. Skip bootstrap. Node={}", get_broadcast_address());
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool storage_service::should_bootstrap() {
|
||||
return !_sys_ks.local().bootstrap_complete() && !is_first_node();
|
||||
}
|
||||
|
||||
/* Broadcasts the chosen tokens through gossip,
|
||||
* together with a CDC generation timestamp and STATUS=NORMAL.
|
||||
*
|
||||
@@ -1575,9 +1548,7 @@ future<> storage_service::join_topology(sharded<service::storage_proxy>& proxy,
|
||||
raft_replace_info = raft_group0::replace_info {
|
||||
.raft_id = raft::server_id{ri->host_id.uuid()},
|
||||
};
|
||||
} else if (should_bootstrap()) {
|
||||
co_await check_for_endpoint_collision(initial_contact_nodes);
|
||||
} else {
|
||||
} else if (_sys_ks.local().bootstrap_complete()) {
|
||||
slogger.info("Performing gossip shadow round, initial_contact_nodes={}", initial_contact_nodes);
|
||||
co_await _gossiper.do_shadow_round(initial_contact_nodes, gms::gossiper::mandatory::no);
|
||||
_gossiper.check_snitch_name_matches(_snitch.local()->get_name());
|
||||
@@ -1833,7 +1804,7 @@ future<> storage_service::on_change(gms::inet_address endpoint, locator::host_id
|
||||
slogger.debug("endpoint={} on_change: states={}, permit_id={}", endpoint, states, pid);
|
||||
|
||||
auto ep_state = _gossiper.get_endpoint_state_ptr(host_id);
|
||||
if (!ep_state || _gossiper.is_dead_state(*ep_state)) {
|
||||
if (!ep_state || _gossiper.is_left(*ep_state)) {
|
||||
slogger.debug("Ignoring state change for dead or unknown endpoint: {}", endpoint);
|
||||
co_return;
|
||||
}
|
||||
@@ -1918,12 +1889,8 @@ std::optional<db::system_keyspace::peer_info> storage_service::get_peer_info_for
|
||||
|
||||
auto set_field = [&]<typename T> (std::optional<T>& field,
|
||||
const gms::versioned_value& value,
|
||||
std::string_view name,
|
||||
bool managed_by_raft)
|
||||
std::string_view name)
|
||||
{
|
||||
if (managed_by_raft) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
field = T(value.value());
|
||||
} catch (...) {
|
||||
@@ -1932,31 +1899,17 @@ std::optional<db::system_keyspace::peer_info> storage_service::get_peer_info_for
|
||||
}
|
||||
};
|
||||
|
||||
// Fields managed by raft are skipped here
|
||||
for (const auto& [state, value] : app_state_map) {
|
||||
switch (state) {
|
||||
case application_state::DC:
|
||||
set_field(get_peer_info().data_center, value, "data_center", true);
|
||||
break;
|
||||
case application_state::INTERNAL_IP:
|
||||
set_field(get_peer_info().preferred_ip, value, "preferred_ip", false);
|
||||
break;
|
||||
case application_state::RACK:
|
||||
set_field(get_peer_info().rack, value, "rack", true);
|
||||
break;
|
||||
case application_state::RELEASE_VERSION:
|
||||
set_field(get_peer_info().release_version, value, "release_version", true);
|
||||
set_field(get_peer_info().preferred_ip, value, "preferred_ip");
|
||||
break;
|
||||
case application_state::RPC_ADDRESS:
|
||||
set_field(get_peer_info().rpc_address, value, "rpc_address", false);
|
||||
set_field(get_peer_info().rpc_address, value, "rpc_address");
|
||||
break;
|
||||
case application_state::SCHEMA:
|
||||
set_field(get_peer_info().schema_version, value, "schema_version", false);
|
||||
break;
|
||||
case application_state::TOKENS:
|
||||
// tokens are updated separately
|
||||
break;
|
||||
case application_state::SUPPORTED_FEATURES:
|
||||
set_field(get_peer_info().supported_features, value, "supported_features", true);
|
||||
set_field(get_peer_info().schema_version, value, "schema_version");
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
@@ -2425,27 +2378,6 @@ future<> storage_service::wait_for_group0_stop() {
|
||||
}
|
||||
}
|
||||
|
||||
future<> storage_service::check_for_endpoint_collision(std::unordered_set<gms::inet_address> initial_contact_nodes) {
|
||||
slogger.debug("Starting shadow gossip round to check for endpoint collision");
|
||||
|
||||
return seastar::async([this, initial_contact_nodes] {
|
||||
bool found_bootstrapping_node = false;
|
||||
auto local_features = _feature_service.supported_feature_set();
|
||||
do {
|
||||
slogger.info("Performing gossip shadow round");
|
||||
_gossiper.do_shadow_round(initial_contact_nodes, gms::gossiper::mandatory::yes).get();
|
||||
_gossiper.check_snitch_name_matches(_snitch.local()->get_name());
|
||||
auto addr = get_broadcast_address();
|
||||
if (!_gossiper.is_safe_for_bootstrap(addr)) {
|
||||
throw std::runtime_error(::format("A node with address {} already exists, cancelling join. "
|
||||
"Use replace_address if you want to replace this node.", addr));
|
||||
}
|
||||
} while (found_bootstrapping_node);
|
||||
slogger.info("Checking bootstrapping/leaving/moving nodes: ok (check_for_endpoint_collision)");
|
||||
_gossiper.reset_endpoint_state_map().get();
|
||||
});
|
||||
}
|
||||
|
||||
future<> storage_service::remove_endpoint(inet_address endpoint, gms::permit_id pid) {
|
||||
auto host_id_opt = _gossiper.try_get_host_id(endpoint);
|
||||
if (host_id_opt) {
|
||||
@@ -2498,17 +2430,6 @@ storage_service::prepare_replacement_info(std::unordered_set<gms::inet_address>
|
||||
replace_host_id = _gossiper.get_host_id(replace_address);
|
||||
}
|
||||
|
||||
auto state = _gossiper.get_endpoint_state_ptr(replace_host_id);
|
||||
if (!state) {
|
||||
throw std::runtime_error(::format("Cannot replace_address {} because it doesn't exist in gossip", replace_address));
|
||||
}
|
||||
|
||||
// Reject to replace a node that has left the ring
|
||||
auto status = _gossiper.get_gossip_status(replace_host_id);
|
||||
if (status == gms::versioned_value::STATUS_LEFT || status == gms::versioned_value::REMOVED_TOKEN) {
|
||||
throw std::runtime_error(::format("Cannot replace_address {} because it has left the ring, status={}", replace_address, status));
|
||||
}
|
||||
|
||||
auto dc_rack = get_dc_rack_for(replace_host_id).value_or(locator::endpoint_dc_rack::default_location);
|
||||
|
||||
auto ri = replacement_info {
|
||||
@@ -2802,12 +2723,7 @@ future<> storage_service::raft_decommission() {
|
||||
rtlogger.info("decommission: waiting for completion (request ID: {})", request_id);
|
||||
auto error = co_await wait_for_topology_request_completion(request_id);
|
||||
|
||||
if (error.empty()) {
|
||||
// Need to set it otherwise gossiper will try to send shutdown on exit
|
||||
rtlogger.info("decommission: successfully removed from topology (request ID: {}), updating gossip status", request_id);
|
||||
co_await _gossiper.add_local_application_state(std::pair(gms::application_state::STATUS, gms::versioned_value::left({}, _gossiper.now().time_since_epoch().count())));
|
||||
rtlogger.info("Decommission succeeded. Request ID: {}", request_id);
|
||||
} else {
|
||||
if (!error.empty()) {
|
||||
auto err = fmt::format("Decommission failed. See earlier errors ({}). Request ID: {}", error, request_id);
|
||||
rtlogger.error("{}", err);
|
||||
throw std::runtime_error(err);
|
||||
|
||||
@@ -35,7 +35,6 @@
|
||||
#include <seastar/core/gate.hh>
|
||||
#include "replica/database_fwd.hh"
|
||||
#include "streaming/stream_reason.hh"
|
||||
#include <seastar/core/sharded.hh>
|
||||
#include "service/migration_listener.hh"
|
||||
#include <seastar/core/metrics_registration.hh>
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
@@ -56,7 +55,6 @@
|
||||
class node_ops_cmd_request;
|
||||
class node_ops_cmd_response;
|
||||
class node_ops_info;
|
||||
enum class node_ops_cmd : uint32_t;
|
||||
class repair_service;
|
||||
class protocol_server;
|
||||
|
||||
@@ -177,7 +175,6 @@ class storage_service : public service::migration_listener, public gms::i_endpoi
|
||||
private:
|
||||
using token = dht::token;
|
||||
using token_range_endpoints = dht::token_range_endpoints;
|
||||
using endpoint_details = dht::endpoint_details;
|
||||
using boot_strapper = dht::boot_strapper;
|
||||
using token_metadata = locator::token_metadata;
|
||||
using shared_token_metadata = locator::shared_token_metadata;
|
||||
@@ -214,7 +211,6 @@ private:
|
||||
|
||||
sstring _operation_in_progress;
|
||||
seastar::metrics::metric_groups _metrics;
|
||||
using client_shutdown_hook = noncopyable_function<void()>;
|
||||
std::vector<protocol_server*> _protocol_servers;
|
||||
std::vector<std::any> _listeners;
|
||||
named_gate _async_gate;
|
||||
@@ -474,8 +470,6 @@ private:
|
||||
|
||||
public:
|
||||
|
||||
future<> check_for_endpoint_collision(std::unordered_set<gms::inet_address> initial_contact_nodes);
|
||||
|
||||
future<> join_cluster(sharded<service::storage_proxy>& proxy,
|
||||
start_hint_manager start_hm, gms::generation_type new_generation);
|
||||
|
||||
@@ -492,9 +486,7 @@ public:
|
||||
future<> wait_for_group0_stop();
|
||||
|
||||
private:
|
||||
bool should_bootstrap();
|
||||
bool is_replacing();
|
||||
bool is_first_node();
|
||||
future<> start_sys_dist_ks() const;
|
||||
future<> join_topology(sharded<service::storage_proxy>& proxy,
|
||||
std::unordered_set<gms::inet_address> initial_contact_nodes,
|
||||
@@ -735,9 +727,6 @@ public:
|
||||
future<> removenode(locator::host_id host_id, locator::host_id_or_endpoint_list ignore_nodes);
|
||||
future<> mark_excluded(const std::vector<locator::host_id>&);
|
||||
future<node_ops_cmd_response> node_ops_cmd_handler(gms::inet_address coordinator, std::optional<locator::host_id> coordinator_host_id, node_ops_cmd_request req);
|
||||
void node_ops_cmd_check(gms::inet_address coordinator, const node_ops_cmd_request& req);
|
||||
future<> node_ops_cmd_heartbeat_updater(node_ops_cmd cmd, node_ops_id uuid, std::list<gms::inet_address> nodes, lw_shared_ptr<bool> heartbeat_updater_done);
|
||||
void on_node_ops_registered(node_ops_id);
|
||||
|
||||
future<mode> get_operation_mode();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user