Merge 'Do not update endpoint state via gossiper::add_saved_endpoint once it was updated via gossip' from Benny Halevy
Currently, `add_saved_endpoint` is called from two paths: One, is when loading states from system.peers in the join path (join_cluster, join_token_ring), when `_raft_topology_change_enabled` is false, and the other is from `storage_service::topology_state_load` when raft topology changes are enabled. In the later path, from `topology_state_load`, `add_saved_endpoint` is called only if the endpoint_state does not exist yet. However, this is checked without acquiring the endpoint_lock and so it races with the gossiper, and once `add_saved_endpoint` acquires the lock, the endpoint state may already be populated. Since `add_saved_endpoint` applies local information about the endpoint state (e.g. tokens, dc, rack), it uses the local heart_beat_version, with generation=0 to update the endpoint states, and that is incompatible with changes applies via gossip that will carry the endpoint's generation and version, determining the state's update order. This change makes sure that the endpoint state is never update in `add_saved_endpoint` if it has non-zero generation. An internal error exception is thrown if non-zero generation is found, and in the only call site that might reach that state, in `storage_service::topology_state_load`, the caller acquires the endpoint_lock for checking for the existence of the endpoint_state, calling `add_saved_endpoint` under the lock only if the endpoint_state does not exist. Fixes #16429 Closes scylladb/scylladb#16432 * github.com:scylladb/scylladb: gossiper: add_saved_endpoint: keep heart_beat_state if ep_state is found storage_service: topology_state_load: lock endpoint for add_saved_endpoint raft_group_registry: move on_alive error injection to gossiper
This commit is contained in:
@@ -44,6 +44,7 @@
|
||||
#include "gms/generation-number.hh"
|
||||
#include "locator/token_metadata.hh"
|
||||
#include "utils/exceptions.hh"
|
||||
#include "utils/error_injection.hh"
|
||||
|
||||
namespace gms {
|
||||
|
||||
@@ -1631,6 +1632,24 @@ void gossiper::mark_alive(inet_address addr) {
|
||||
}
|
||||
|
||||
future<> gossiper::real_mark_alive(inet_address addr) {
|
||||
co_await utils::get_local_injector().inject_with_handler("gossiper::real_mark_alive", [this, endpoint = addr] (auto& handler) -> future<> {
|
||||
auto app_state_ptr = get_application_state_ptr(endpoint, application_state::HOST_ID);
|
||||
if (!app_state_ptr) {
|
||||
co_return;
|
||||
}
|
||||
|
||||
locator::host_id id(utils::UUID(app_state_ptr->value()));
|
||||
auto second_node_ip = handler.get("second_node_ip");
|
||||
assert(second_node_ip);
|
||||
|
||||
logger.info("real_mark_alive {}/{} second_node_ip={}", id, endpoint, *second_node_ip);
|
||||
if (endpoint == gms::inet_address(sstring{*second_node_ip})) {
|
||||
logger.info("Sleeping before real_mark_alive for {}/{}", id, endpoint);
|
||||
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{1});
|
||||
logger.info("Finished sleeping before real_mark_alive for {}/{}", id, endpoint);
|
||||
}
|
||||
});
|
||||
|
||||
auto permit = co_await lock_endpoint(addr, null_permit_id);
|
||||
|
||||
// After sending echo message, the Node might not be in the
|
||||
@@ -2081,22 +2100,30 @@ void gossiper::build_seeds_list() {
|
||||
}
|
||||
}
|
||||
|
||||
future<> gossiper::add_saved_endpoint(inet_address ep) {
|
||||
future<> gossiper::add_saved_endpoint(inet_address ep, permit_id pid) {
|
||||
if (ep == get_broadcast_address()) {
|
||||
logger.debug("Attempt to add self as saved endpoint");
|
||||
co_return;
|
||||
}
|
||||
|
||||
auto permit = co_await lock_endpoint(ep, null_permit_id);
|
||||
auto permit = co_await lock_endpoint(ep, pid);
|
||||
|
||||
//preserve any previously known, in-memory data about the endpoint (such as DC, RACK, and so on)
|
||||
auto ep_state = endpoint_state();
|
||||
auto es = get_endpoint_state_ptr(ep);
|
||||
if (es) {
|
||||
if (es->get_heart_beat_state().get_generation()) {
|
||||
auto msg = fmt::format("Attempted to add saved endpoint {} after endpoint_state was already established with gossip: {}, at {}", ep, es->get_heart_beat_state(), current_backtrace());
|
||||
on_internal_error(logger, msg);
|
||||
}
|
||||
ep_state = *es;
|
||||
logger.debug("not replacing a previous ep_state for {}, but reusing it: {}", ep, ep_state);
|
||||
ep_state.set_heart_beat_state_and_update_timestamp(heart_beat_state());
|
||||
ep_state.update_timestamp();
|
||||
}
|
||||
// It's okay to use the local version generator for the loaded application state values
|
||||
// As long as the endpoint_state has zero generation.
|
||||
// It will get updated as a whole by handle_major_state_change
|
||||
// via do_apply_state_locally when (remote_generation > local_generation)
|
||||
const auto tmptr = get_token_metadata_ptr();
|
||||
auto host_id = tmptr->get_host_id_if_known(ep);
|
||||
if (host_id) {
|
||||
|
||||
@@ -607,7 +607,7 @@ public:
|
||||
/**
|
||||
* Add an endpoint we knew about previously, but whose state is unknown
|
||||
*/
|
||||
future<> add_saved_endpoint(inet_address ep);
|
||||
future<> add_saved_endpoint(inet_address ep, permit_id);
|
||||
|
||||
future<> add_local_application_state(application_state state, versioned_value value);
|
||||
|
||||
|
||||
@@ -14,7 +14,6 @@
|
||||
#include "gms/i_endpoint_state_change_subscriber.hh"
|
||||
#include "serializer_impl.hh"
|
||||
#include "idl/raft.dist.hh"
|
||||
#include "utils/error_injection.hh"
|
||||
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include <seastar/core/when_all.hh>
|
||||
@@ -109,24 +108,7 @@ public:
|
||||
|
||||
virtual future<>
|
||||
on_alive(gms::inet_address endpoint, gms::endpoint_state_ptr ep_state, gms::permit_id) override {
|
||||
co_await utils::get_local_injector().inject_with_handler("raft_group_registry::on_alive", [endpoint, ep_state] (auto& handler) -> future<> {
|
||||
auto app_state_ptr = ep_state->get_application_state_ptr(gms::application_state::HOST_ID);
|
||||
if (!app_state_ptr) {
|
||||
co_return;
|
||||
}
|
||||
|
||||
raft::server_id id(utils::UUID(app_state_ptr->value()));
|
||||
rslog.info("gossiper_state_change_subscriber_proxy::on_alive() {} {}", endpoint, id);
|
||||
auto second_node_ip = handler.get("second_node_ip");
|
||||
assert(second_node_ip);
|
||||
|
||||
if (endpoint == gms::inet_address(sstring{*second_node_ip})) {
|
||||
rslog.info("Sleeping before handling on_alive");
|
||||
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{1});
|
||||
rslog.info("Finished Sleeping before handling on_alive");
|
||||
}
|
||||
});
|
||||
co_await on_endpoint_change(endpoint, ep_state);
|
||||
return on_endpoint_change(endpoint, ep_state);
|
||||
}
|
||||
|
||||
virtual future<>
|
||||
|
||||
@@ -559,9 +559,16 @@ future<> storage_service::topology_state_load() {
|
||||
// will be up to date and reachable at the time of restart.
|
||||
const auto tmptr = get_token_metadata_ptr();
|
||||
for (const auto& e: tmptr->get_all_endpoints()) {
|
||||
if (is_me(e)) {
|
||||
continue;
|
||||
}
|
||||
const auto ep = tmptr->get_endpoint_for_host_id(e);
|
||||
if (!is_me(e) && !_gossiper.get_endpoint_state_ptr(ep)) {
|
||||
co_await _gossiper.add_saved_endpoint(ep);
|
||||
auto permit = co_await _gossiper.lock_endpoint(ep, gms::null_permit_id);
|
||||
// Add the endpoint if it doesn't exist yet in gossip
|
||||
// since it is not loaded in join_cluster in the
|
||||
// _raft_topology_change_enabled case.
|
||||
if (!_gossiper.get_endpoint_state_ptr(ep)) {
|
||||
co_await _gossiper.add_saved_endpoint(ep, permit.id());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3077,7 +3084,9 @@ future<> storage_service::join_token_ring(sharded<db::system_distributed_keyspac
|
||||
}
|
||||
co_await _gossiper.reset_endpoint_state_map();
|
||||
for (auto ep : loaded_endpoints) {
|
||||
co_await _gossiper.add_saved_endpoint(ep);
|
||||
// gossiping hasn't started yet
|
||||
// so no need to lock the endpoint
|
||||
co_await _gossiper.add_saved_endpoint(ep, gms::null_permit_id);
|
||||
}
|
||||
}
|
||||
auto features = _feature_service.supported_feature_set();
|
||||
@@ -4289,7 +4298,9 @@ future<> storage_service::join_cluster(sharded<db::system_distributed_keyspace>&
|
||||
co_await tmptr->update_normal_tokens(tokens, hostIdIt->second);
|
||||
tmptr->update_host_id(hostIdIt->second, ep);
|
||||
loaded_endpoints.insert(ep);
|
||||
co_await _gossiper.add_saved_endpoint(ep);
|
||||
// gossiping hasn't started yet
|
||||
// so no need to lock the endpoint
|
||||
co_await _gossiper.add_saved_endpoint(ep, gms::null_permit_id);
|
||||
}
|
||||
}
|
||||
co_await replicate_to_all_cores(std::move(tmptr));
|
||||
|
||||
@@ -20,8 +20,8 @@ logger = logging.getLogger(__name__)
|
||||
async def test_old_ip_notification_repro(manager: ManagerClient) -> None:
|
||||
"""
|
||||
Regression test for #14257.
|
||||
It starts two nodes. It introduces a sleep in raft_group_registry::on_alive
|
||||
(in raft_group_registry.cc) when receiving a gossip notification about
|
||||
It starts two nodes. It introduces a sleep in gossiper::real_mark_alive
|
||||
when receiving a gossip notification about
|
||||
HOST_ID update from the second node. Then it restarts the second node with
|
||||
a different IP. Due to the sleep, the old notification from the old IP arrives
|
||||
after the second node has restarted. If the bug is present, this notification
|
||||
@@ -30,7 +30,7 @@ async def test_old_ip_notification_repro(manager: ManagerClient) -> None:
|
||||
"""
|
||||
s1 = await manager.server_add()
|
||||
s2 = await manager.server_add(start=False)
|
||||
async with inject_error(manager.api, s1.ip_addr, 'raft_group_registry::on_alive',
|
||||
async with inject_error(manager.api, s1.ip_addr, 'gossiper::real_mark_alive',
|
||||
parameters={ "second_node_ip": s2.ip_addr }) as handler:
|
||||
# This injection delays the gossip notification from the initial IP of s2.
|
||||
logger.info(f"Starting {s2}")
|
||||
|
||||
Reference in New Issue
Block a user