gossiper: send failure detection ping to a host id instead of ip
This way, the wrong host will not answer it.
This commit is contained in:
@@ -944,15 +944,17 @@ future<> gossiper::failure_detector_loop_for_node(gms::inet_address node, genera
|
||||
auto diff = gossiper::clk::duration(0);
|
||||
auto echo_interval = std::chrono::seconds(2);
|
||||
auto max_duration = echo_interval + std::chrono::milliseconds(_gcfg.failure_detector_timeout_ms());
|
||||
auto host_id = get_host_id(node);
|
||||
|
||||
while (is_enabled()) {
|
||||
bool failed = false;
|
||||
try {
|
||||
logger.debug("failure_detector_loop: Send echo to node {}, status = started", node);
|
||||
co_await ser::gossip_rpc_verbs::send_gossip_echo(&_messaging, netw::msg_addr(node), netw::messaging_service::clock_type::now() + max_duration, gossip_generation.value(), false);
|
||||
logger.debug("failure_detector_loop: Send echo to node {}, status = ok", node);
|
||||
logger.debug("failure_detector_loop: Send echo to node {}/{}, status = started", host_id, node);
|
||||
co_await ser::gossip_rpc_verbs::send_gossip_echo(&_messaging, host_id, netw::messaging_service::clock_type::now() + max_duration, gossip_generation.value(), false);
|
||||
logger.debug("failure_detector_loop: Send echo to node {}/{}, status = ok", host_id, node);
|
||||
} catch (...) {
|
||||
failed = true;
|
||||
logger.warn("failure_detector_loop: Send echo to node {}, status = failed: {}", node, std::current_exception());
|
||||
logger.warn("failure_detector_loop: Send echo to node {}/{}, status = failed: {}", host_id, node, std::current_exception());
|
||||
}
|
||||
auto now = gossiper::clk::now();
|
||||
diff = now - last;
|
||||
@@ -960,7 +962,7 @@ future<> gossiper::failure_detector_loop_for_node(gms::inet_address node, genera
|
||||
last = now;
|
||||
}
|
||||
if (diff > max_duration) {
|
||||
logger.info("failure_detector_loop: Mark node {} as DOWN", node);
|
||||
logger.info("failure_detector_loop: Mark node {}/{} as DOWN", host_id, node);
|
||||
co_await container().invoke_on(0, [node] (gms::gossiper& g) {
|
||||
return g.convict(node);
|
||||
});
|
||||
@@ -972,8 +974,8 @@ future<> gossiper::failure_detector_loop_for_node(gms::inet_address node, genera
|
||||
// to different shards. We return from the per node loop here. The
|
||||
// failure_detector_loop main loop will restart the per node loop.
|
||||
if (_live_endpoints_version != live_endpoints_version) {
|
||||
logger.debug("failure_detector_loop: Finished loop for node {}, live_endpoints={}, current_live_endpoints_version={}, live_endpoints_version={}",
|
||||
node, _live_endpoints, _live_endpoints_version, live_endpoints_version);
|
||||
logger.debug("failure_detector_loop: Finished loop for node {}/{}, live_endpoints={}, current_live_endpoints_version={}, live_endpoints_version={}",
|
||||
host_id, node, _live_endpoints, _live_endpoints_version, live_endpoints_version);
|
||||
co_return;
|
||||
} else {
|
||||
co_await sleep_abortable(echo_interval, _abort_source);
|
||||
@@ -1705,16 +1707,16 @@ void gossiper::mark_alive(inet_address addr) {
|
||||
_pending_mark_alive_endpoints.erase(addr);
|
||||
});
|
||||
|
||||
msg_addr id = get_msg_addr(addr);
|
||||
auto id = get_host_id(addr);
|
||||
auto generation = my_endpoint_state().get_heart_beat_state().get_generation();
|
||||
// Enter the _background_msg gate so stop() would wait on it
|
||||
auto gh = _background_msg.hold();
|
||||
logger.debug("Sending a EchoMessage to {}, with generation_number={}", id, generation);
|
||||
(void) ser::gossip_rpc_verbs::send_gossip_echo(&_messaging, id, netw::messaging_service::clock_type::now() + std::chrono::seconds(15), generation.value(), false).then([this, addr] {
|
||||
logger.debug("Sending a EchoMessage to {}/{}, with generation_number={}", id, addr, generation);
|
||||
(void) ser::gossip_rpc_verbs::send_gossip_echo(&_messaging, get_host_id(addr), netw::messaging_service::clock_type::now() + std::chrono::seconds(15), generation.value(), false).then([this, addr] {
|
||||
logger.trace("Got EchoMessage Reply");
|
||||
return real_mark_alive(addr);
|
||||
}).handle_exception([addr, gh = std::move(gh), unmark_pending = std::move(unmark_pending)] (auto ep) {
|
||||
logger.warn("Fail to send EchoMessage to {}: {}", addr, ep);
|
||||
}).handle_exception([addr, gh = std::move(gh), unmark_pending = std::move(unmark_pending), id] (auto ep) {
|
||||
logger.warn("Fail to send EchoMessage to {}/{}: {}", id, addr, ep);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user