gossiper: populate gossip_address_map
Add a non-expiring entry to the address map for each host in the gossiper state, and switch that entry to expiring when the host's gossiper state is deleted.
This commit is contained in:
@@ -43,6 +43,7 @@
|
||||
#include <utility>
|
||||
#include "gms/generation-number.hh"
|
||||
#include "locator/token_metadata.hh"
|
||||
#include "seastar/core/shard_id.hh"
|
||||
#include "seastar/rpc/rpc_types.hh"
|
||||
#include "utils/assert.hh"
|
||||
#include "utils/exceptions.hh"
|
||||
@@ -125,6 +126,10 @@ gossiper::gossiper(abort_source& as, const locator::shared_token_metadata& stm,
|
||||
return _unreachable_endpoints.size();
|
||||
}, sm::description("How many unreachable nodes the current node sees")),
|
||||
});
|
||||
|
||||
// Add myself to the map on start
|
||||
_address_map.add_or_update_entry(_gcfg.host_id, get_broadcast_address());
|
||||
_address_map.set_nonexpiring(_gcfg.host_id);
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -1265,7 +1270,16 @@ future<> gossiper::evict_from_membership(inet_address endpoint, permit_id pid) {
|
||||
data.unreachable.erase(endpoint);
|
||||
data.live.erase(endpoint);
|
||||
});
|
||||
|
||||
co_await container().invoke_on_all([endpoint] (auto& g) {
|
||||
if (this_shard_id() == 0) {
|
||||
auto hid = g.get_endpoint_state_ptr(endpoint)->get_host_id();
|
||||
if (g._address_map.find(hid) == endpoint) {
|
||||
// During IP address change we may have a situation where we remove old gossiper state
|
||||
// but there is a new address for the same host id, so no need to make it expiring
|
||||
g._address_map.set_expiring(g.get_endpoint_state_ptr(endpoint)->get_host_id());
|
||||
}
|
||||
}
|
||||
g._endpoint_state_map.erase(endpoint);
|
||||
});
|
||||
_expire_time_endpoint_map.erase(endpoint);
|
||||
@@ -1307,6 +1321,7 @@ void gossiper::make_random_gossip_digest(utils::chunked_vector<gossip_digest>& g
|
||||
|
||||
future<> gossiper::replicate(inet_address ep, endpoint_state es, permit_id pid) {
|
||||
verify_permit(ep, pid);
|
||||
|
||||
// First pass: replicate the new endpoint_state on all shards.
|
||||
// Use foreign_ptr<std::unique_ptr> to ensure destroy on remote shards on exception
|
||||
std::vector<foreign_ptr<endpoint_state_ptr>> ep_states;
|
||||
@@ -1329,6 +1344,11 @@ future<> gossiper::replicate(inet_address ep, endpoint_state es, permit_id pid)
|
||||
try {
|
||||
co_return co_await container().invoke_on_all([&] (gossiper& g) {
|
||||
auto eps = ep_states[this_shard_id()].release();
|
||||
if (this_shard_id() == 0) {
|
||||
auto hid = eps->get_host_id();
|
||||
g._address_map.add_or_update_entry(hid, ep, eps->get_heart_beat_state().get_generation());
|
||||
g._address_map.set_nonexpiring(hid);
|
||||
}
|
||||
g._endpoint_state_map[ep] = std::move(eps);
|
||||
});
|
||||
} catch (...) {
|
||||
@@ -1505,6 +1525,11 @@ future<> gossiper::reset_endpoint_state_map() {
|
||||
auto lock = co_await lock_endpoint_update_semaphore();
|
||||
auto version = _live_endpoints_version + 1;
|
||||
co_await container().invoke_on_all([version] (gossiper& g) {
|
||||
if (this_shard_id() == 0) {
|
||||
for (auto&& [_, es_ptr] : g._endpoint_state_map) {
|
||||
g._address_map.set_expiring(es_ptr->get_host_id());
|
||||
}
|
||||
}
|
||||
g._unreachable_endpoints.clear();
|
||||
g._live_endpoints.clear();
|
||||
g._live_endpoints_version = version;
|
||||
|
||||
@@ -60,6 +60,7 @@ struct ack_msg_pending {
|
||||
struct gossip_config {
|
||||
seastar::scheduling_group gossip_scheduling_group = seastar::scheduling_group();
|
||||
sstring cluster_name;
|
||||
locator::host_id host_id;
|
||||
utils::UUID group0_id;
|
||||
std::set<inet_address> seeds;
|
||||
sstring partitioner;
|
||||
|
||||
1
main.cc
1
main.cc
@@ -1530,6 +1530,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
gcfg.shutdown_announce_ms = cfg->shutdown_announce_in_ms();
|
||||
gcfg.skip_wait_for_gossip_to_settle = cfg->skip_wait_for_gossip_to_settle();
|
||||
gcfg.group0_id = group0_id;
|
||||
gcfg.host_id = host_id;
|
||||
gcfg.failure_detector_timeout_ms = cfg->failure_detector_timeout_in_ms;
|
||||
gcfg.force_gossip_generation = cfg->force_gossip_generation;
|
||||
return gcfg;
|
||||
|
||||
Reference in New Issue
Block a user