gossiper: modify endpoint state only via replicate

And restrict the accessor methods to return const pointers
or refrences.

With that, the endpoint_state_ptr:s held in the _endpoint_state_map
point to immutable endpoint_state objects - with one exception:
the endpoint_state update_timestamp may be updated in place,
but the endpoint_state_map is immutable.

replicate() replaces the endpoint_state_ptr in the map
with a new one to maintain immutability.

A later change will also make this exception safe so
replicate will guarantee strong exception safety so that all shards
are updated or none of them.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2023-07-21 11:49:18 +03:00
parent d00e49a1bb
commit 1d04242a90
2 changed files with 47 additions and 46 deletions

View File

@@ -568,9 +568,9 @@ future<> gossiper::do_apply_state_locally(gms::inet_address node, const endpoint
// If state does not exist just add it. If it does then add it if the remote generation is greater.
// If there is a generation tie, attempt to break it by heartbeat version.
auto permit = co_await this->lock_endpoint(node, null_permit_id);
auto* es = this->get_mutable_endpoint_state_ptr(node);
auto es = this->get_endpoint_state_ptr(node);
if (es) {
endpoint_state& local_state = *es;
endpoint_state local_state = *es;
auto local_generation = local_state.get_heart_beat_state().get_generation();
auto remote_generation = remote_state.get_heart_beat_state().get_generation();
logger.trace("{} local generation {}, remote generation {}", node, local_generation, remote_generation);
@@ -585,7 +585,7 @@ future<> gossiper::do_apply_state_locally(gms::inet_address node, const endpoint
co_await this->handle_major_state_change(node, remote_state, permit.id());
} else {
logger.debug("Applying remote_state for node {} (remote generation > local generation)", node);
_endpoint_state_map[node] = make_endpoint_state_ptr(remote_state);
co_await replicate(node, remote_state, permit.id());
}
} else if (remote_generation == local_generation) {
if (listener_notification) {
@@ -602,6 +602,7 @@ future<> gossiper::do_apply_state_locally(gms::inet_address node, const endpoint
this->mark_alive(node);
}
} else {
bool update = false;
for (const auto& item : remote_state.get_application_state_map()) {
const auto& remote_key = item.first;
const auto& remote_value = item.second;
@@ -610,10 +611,16 @@ future<> gossiper::do_apply_state_locally(gms::inet_address node, const endpoint
logger.debug("Applying remote_state for node {} (remote generation = local generation), key={}, value={}",
node, remote_key, remote_value);
local_state.add_application_state(remote_key, remote_value);
update = true;
} else {
logger.debug("Ignoring remote_state for node {} (remote generation = local generation), key={}, value={}", node, remote_key, remote_value);
logger.trace("Ignoring remote_state for node {} (remote generation = local generation), key={}, value={}", node, remote_key, remote_value);
}
}
if (update) {
co_await replicate(node, std::move(local_state), permit.id());
} else {
logger.debug("Ignoring remote_state for node {} (remote generation = local generation)", node);
}
}
} else {
logger.debug("Ignoring remote generation {} < {}", remote_generation, local_generation);
@@ -623,7 +630,7 @@ future<> gossiper::do_apply_state_locally(gms::inet_address node, const endpoint
co_await this->handle_major_state_change(node, remote_state, permit.id());
} else {
logger.debug("Applying remote_state for node {} (new node)", node);
_endpoint_state_map[node] = make_endpoint_state_ptr(remote_state);
co_await replicate(node, remote_state, permit.id());
}
}
}
@@ -1251,12 +1258,12 @@ void gossiper::make_random_gossip_digest(utils::chunked_vector<gossip_digest>& g
#endif
}
future<> gossiper::replicate(inet_address ep, const endpoint_state& es, permit_id pid) {
future<> gossiper::replicate(inet_address ep, endpoint_state es, permit_id pid) {
verify_permit(ep, pid);
return container().invoke_on_all([ep, es, orig = this_shard_id(), self = shared_from_this()] (gossiper& g) {
if (this_shard_id() != orig) {
g.get_or_create_endpoint_state(ep).add_application_state(es);
}
// FIXME: make exception safe to ensure that the state
// will end up consistent on all shards
return container().invoke_on_all([ep, es = std::move(es)] (gossiper& g) {
g._endpoint_state_map[ep] = make_endpoint_state_ptr(es);
});
}
@@ -1270,8 +1277,7 @@ future<> gossiper::advertise_token_removed(inet_address endpoint, locator::host_
eps.add_application_state(application_state::STATUS, versioned_value::removed_nonlocal(host_id, expire_time.time_since_epoch().count()));
logger.info("Completing removal of {}", endpoint);
add_expire_time_for_endpoint(endpoint, expire_time);
_endpoint_state_map[endpoint] = make_endpoint_state_ptr(std::move(eps));
co_await replicate(endpoint, eps, pid);
co_await replicate(endpoint, std::move(eps), pid);
// ensure at least one gossip round occurs before returning
co_await sleep_abortable(INTERVAL * 2, _abort_source);
}
@@ -1286,7 +1292,7 @@ future<> gossiper::assassinate_endpoint(sstring address) {
return seastar::async([&gossiper, g = gossiper.shared_from_this(), address] {
inet_address endpoint(address);
auto permit = gossiper.lock_endpoint(endpoint, null_permit_id).get0();
auto es = gossiper.get_mutable_endpoint_state_ptr(endpoint);
auto es = gossiper.get_endpoint_state_ptr(endpoint);
auto now = gossiper.now();
generation_type gen(std::chrono::duration_cast<std::chrono::seconds>((now + std::chrono::seconds(60)).time_since_epoch()).count());
version_type ver(9999);
@@ -1307,7 +1313,7 @@ future<> gossiper::assassinate_endpoint(sstring address) {
// make sure it did not change
sleep_abortable(ring_delay, gossiper._abort_source).get();
es = gossiper.get_mutable_endpoint_state_ptr(endpoint);
es = gossiper.get_endpoint_state_ptr(endpoint);
if (!es) {
logger.warn("Endpoint {} disappeared while trying to assassinate, continuing anyway", endpoint);
} else {
@@ -1401,16 +1407,11 @@ endpoint_state_ptr gossiper::get_endpoint_state_ptr(inet_address ep) const noexc
}
}
endpoint_state* gossiper::get_mutable_endpoint_state_ptr(inet_address ep) noexcept {
auto it = _endpoint_state_map.find(ep);
if (it == _endpoint_state_map.end()) {
return nullptr;
} else {
return const_cast<endpoint_state*>(it->second.get());
}
void gossiper::update_timestamp(const endpoint_state_ptr& eps) noexcept {
const_cast<endpoint_state&>(*eps).update_timestamp();
}
endpoint_state& gossiper::get_endpoint_state(inet_address ep) {
const endpoint_state& gossiper::get_endpoint_state(inet_address ep) const {
auto it = _endpoint_state_map.find(ep);
if (it == _endpoint_state_map.end()) {
throw std::out_of_range(format("ep={}", ep));
@@ -1562,7 +1563,7 @@ void gossiper::update_timestamp_for_nodes(const std::map<inet_address, endpoint_
for (const auto& x : map) {
const gms::inet_address& endpoint = x.first;
const endpoint_state& remote_endpoint_state = x.second;
auto* local_endpoint_state = get_mutable_endpoint_state_ptr(endpoint);
auto local_endpoint_state = get_endpoint_state_ptr(endpoint);
if (local_endpoint_state) {
bool update = false;
auto local_generation = local_endpoint_state->get_heart_beat_state().get_generation();
@@ -1578,7 +1579,7 @@ void gossiper::update_timestamp_for_nodes(const std::map<inet_address, endpoint_
}
if (update) {
logger.trace("Updated timestamp for node {}", endpoint);
local_endpoint_state->update_timestamp();
update_timestamp(local_endpoint_state);
}
}
}
@@ -1620,7 +1621,7 @@ future<> gossiper::real_mark_alive(inet_address addr) {
// After sending echo message, the Node might not be in the
// _endpoint_state_map anymore, use the reference of local_state
// might cause user-after-free
auto* es = get_mutable_endpoint_state_ptr(addr);
auto es = get_endpoint_state_ptr(addr);
if (!es) {
logger.info("Node {} is not in endpoint_state_map anymore", addr);
co_return;
@@ -1635,11 +1636,8 @@ future<> gossiper::real_mark_alive(inet_address addr) {
logger.debug("Mark Node {} alive after EchoMessage", addr);
auto& local_state = *es;
local_state.update_timestamp(); // prevents do_status_check from racing us and evicting if it was down > A_VERY_LONG_TIME
// Make a copy for endpoint_state because the code below can yield
endpoint_state state = local_state;
// prevents do_status_check from racing us and evicting if it was down > A_VERY_LONG_TIME
update_timestamp(es);
logger.debug("removing expire time for endpoint : {}", addr);
_unreachable_endpoints.erase(addr);
@@ -1668,8 +1666,8 @@ future<> gossiper::real_mark_alive(inet_address addr) {
logger.info("InetAddress {} is now UP, status = {}", addr, status);
}
co_await _subscribers.for_each([addr, state = std::move(state), pid = permit.id()] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) -> future<> {
co_await subscriber->on_alive(addr, state, pid);
co_await _subscribers.for_each([addr, es, pid = permit.id()] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) -> future<> {
co_await subscriber->on_alive(addr, *es, pid);
logger.trace("Notified {}", fmt::ptr(subscriber.get()));
});
}
@@ -1701,7 +1699,6 @@ future<> gossiper::handle_major_state_change(inet_address ep, const endpoint_sta
}
}
logger.trace("Adding endpoint state for {}, status = {}", ep, get_gossip_status(eps));
_endpoint_state_map[ep] = make_endpoint_state_ptr(eps);
co_await replicate(ep, eps, pid);
if (is_in_shadow_round()) {
@@ -1968,7 +1965,7 @@ future<> gossiper::start_gossiping(gms::generation_type generation_nbr, std::map
generation_nbr = gms::generation_type(_force_gossip_generation());
logger.warn("Use the generation number provided by user: generation = {}", generation_nbr);
}
endpoint_state& local_state = my_endpoint_state();
endpoint_state local_state = my_endpoint_state();
local_state.set_heart_beat_state_and_update_timestamp(heart_beat_state(generation_nbr));
for (auto& entry : preload_local_states) {
local_state.add_application_state(entry.first, entry.second);
@@ -2138,10 +2135,10 @@ future<> gossiper::add_saved_endpoint(inet_address ep) {
if (host_id) {
ep_state.add_application_state(gms::application_state::HOST_ID, versioned_value::host_id(host_id.value()));
}
_endpoint_state_map[ep] = make_endpoint_state_ptr(ep_state);
auto generation = ep_state.get_heart_beat_state().get_generation();
co_await replicate(ep, ep_state, permit.id());
_unreachable_endpoints[ep] = now();
logger.trace("Adding saved endpoint {} {}", ep, ep_state.get_heart_beat_state().get_generation());
logger.trace("Adding saved endpoint {} {}", ep, generation);
}
future<> gossiper::add_local_application_state(application_state state, versioned_value value) {
@@ -2196,11 +2193,12 @@ future<> gossiper::add_local_application_state(std::list<std::pair<application_s
gossiper.do_before_change_notifications(ep_addr, *ep_state_before, state, value).get();
}
auto es = gossiper.get_mutable_endpoint_state_ptr(ep_addr);
auto es = gossiper.get_endpoint_state_ptr(ep_addr);
if (!es) {
return;
}
auto local_state = *es;
for (auto& p : states) {
auto& state = p.first;
auto& value = p.second;
@@ -2209,14 +2207,14 @@ future<> gossiper::add_local_application_state(std::list<std::pair<application_s
// if another value with a newer version was received in the meantime:
value = versioned_value::clone_with_higher_version(value);
// Add to local application state
es->add_application_state(state, value);
local_state.add_application_state(state, value);
}
// It is OK to replicate the new endpoint_state
// after all application states were modified as a batch.
// We guarantee that the on_change notifications
// will be called in the order given by `states` anyhow.
gossiper.replicate(ep_addr, *es, permit.id()).get();
gossiper.replicate(ep_addr, std::move(local_state), permit.id()).get();
for (auto& p : states) {
auto& state = p.first;
@@ -2427,9 +2425,9 @@ sstring gossiper::get_application_state_value(inet_address endpoint, application
*/
future<> gossiper::mark_as_shutdown(const inet_address& endpoint, permit_id pid) {
verify_permit(endpoint, pid);
auto es = get_mutable_endpoint_state_ptr(endpoint);
auto es = get_endpoint_state_ptr(endpoint);
if (es) {
auto& ep_state = *es;
auto ep_state = *es;
ep_state.add_application_state(application_state::STATUS, versioned_value::shutdown(true));
ep_state.get_heart_beat_state().force_highest_possible_version_unsafe();
co_await replicate(endpoint, ep_state, pid);

View File

@@ -274,7 +274,7 @@ private:
// Replicates given endpoint_state to all other shards.
// The state state doesn't have to be kept alive around until completes.
// Must be called under lock_endpoint.
future<> replicate(inet_address, const endpoint_state&, permit_id);
future<> replicate(inet_address, endpoint_state, permit_id);
public:
explicit gossiper(abort_source& as, const locator::shared_token_metadata& stm, netw::messaging_service& ms, const db::config& cfg, gossip_config gcfg);
@@ -416,6 +416,8 @@ public:
// Gets a shared pointer to the endpoint_state, if exists.
// Otherwise, returns a null ptr.
// The endpoint_state is immutable (except for its update_timestamp), guaranteed not to change while
// the endpoint_state_ptr is held.
endpoint_state_ptr get_endpoint_state_ptr(inet_address ep) const noexcept;
const versioned_value* get_application_state_ptr(inet_address endpoint, application_state appstate) const noexcept;
@@ -465,16 +467,17 @@ public:
*/
sstring get_rpc_address(const inet_address& endpoint) const;
private:
// FIXME: for now, allow modifying the endpoint_state in place
// until all updates are applied only using replicate
// FIXME: for now, allow modifying the endpoint_state's heartbeat_state in place
// Gets or creates endpoint_state for this node
endpoint_state& get_or_create_endpoint_state(inet_address ep);
endpoint_state& my_endpoint_state() {
return get_or_create_endpoint_state(get_broadcast_address());
}
endpoint_state* get_mutable_endpoint_state_ptr(inet_address ep) noexcept;
endpoint_state& get_endpoint_state(inet_address ep);
// Use with care, as the endpoint_state_ptr in the endpoint_state_map is considered
// immutable, with one exception - the update_timestamp.
void update_timestamp(const endpoint_state_ptr& eps) noexcept;
const endpoint_state& get_endpoint_state(inet_address ep) const;
void update_timestamp_for_nodes(const std::map<inet_address, endpoint_state>& map);