locator: topology: node::state: make fine grained
Currently the node::state is coarse grained so one cannot distinguish between e.g. a leaving node due to decommission (where the node is used for reading) vs. due to remove node (where the node is not used for reading). Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
@@ -48,9 +48,12 @@ node_holder node::clone() const {
|
||||
std::string node::to_string(node::state s) {
|
||||
switch (s) {
|
||||
case state::none: return "none";
|
||||
case state::joining: return "joining";
|
||||
case state::bootstrapping: return "bootstrapping";
|
||||
case state::replacing: return "replacing";
|
||||
case state::normal: return "normal";
|
||||
case state::leaving: return "leaving";
|
||||
case state::being_decommissioned: return "being_decommissioned";
|
||||
case state::being_removed: return "being_removed";
|
||||
case state::being_replaced: return "being_replaced";
|
||||
case state::left: return "left";
|
||||
}
|
||||
__builtin_unreachable();
|
||||
@@ -311,9 +314,9 @@ void topology::index_node(const node* node) {
|
||||
if (node->endpoint() != inet_address{}) {
|
||||
auto eit = _nodes_by_endpoint.find(node->endpoint());
|
||||
if (eit != _nodes_by_endpoint.end()) {
|
||||
if (eit->second->get_state() == node::state::leaving || eit->second->get_state() == node::state::left) {
|
||||
if (eit->second->is_leaving() || eit->second->left()) {
|
||||
_nodes_by_endpoint.erase(node->endpoint());
|
||||
} else if (node->get_state() != node::state::leaving && node->get_state() != node::state::left) {
|
||||
} else if (!node->is_leaving() && !node->left()) {
|
||||
if (node->host_id()) {
|
||||
_nodes_by_host_id.erase(node->host_id());
|
||||
}
|
||||
|
||||
@@ -47,9 +47,12 @@ public:
|
||||
|
||||
enum class state {
|
||||
none = 0,
|
||||
joining, // while bootstrapping, replacing
|
||||
bootstrapping, // (joining)
|
||||
replacing, // (joining)
|
||||
normal,
|
||||
leaving, // while decommissioned, removed, replaced
|
||||
being_decommissioned, // (leaving)
|
||||
being_removed, // (leaving)
|
||||
being_replaced, // (leaving)
|
||||
left // after decommissioned, removed, replaced
|
||||
};
|
||||
|
||||
@@ -102,6 +105,35 @@ public:
|
||||
|
||||
state get_state() const noexcept { return _state; }
|
||||
|
||||
bool is_joining() const noexcept {
|
||||
switch (_state) {
|
||||
case state::bootstrapping:
|
||||
case state::replacing:
|
||||
return true;
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
bool is_normal() const noexcept {
|
||||
return _state == state::normal;
|
||||
}
|
||||
|
||||
bool is_leaving() const noexcept {
|
||||
switch (_state) {
|
||||
case state::being_decommissioned:
|
||||
case state::being_removed:
|
||||
case state::being_replaced:
|
||||
return true;
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
bool left() const noexcept {
|
||||
return _state == state::left;
|
||||
}
|
||||
|
||||
shard_id get_shard_count() const noexcept { return _shard_count; }
|
||||
|
||||
static std::string to_string(state);
|
||||
|
||||
@@ -1425,7 +1425,7 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr
|
||||
auto range_addresses = strat.get_range_addresses(metadata_clone).get0();
|
||||
|
||||
//Pending ranges
|
||||
metadata_clone.update_topology(myip, _sys_ks.local().local_dc_rack(), locator::node::state::joining);
|
||||
metadata_clone.update_topology(myip, _sys_ks.local().local_dc_rack(), locator::node::state::bootstrapping);
|
||||
metadata_clone.update_normal_tokens(tokens, myip).get();
|
||||
auto pending_range_addresses = strat.get_range_addresses(metadata_clone).get0();
|
||||
metadata_clone.clear_gently().get();
|
||||
@@ -1882,7 +1882,7 @@ future<> repair_service::replace_with_repair(locator::token_metadata_ptr tmptr,
|
||||
// update a cloned version of tmptr
|
||||
// no need to set the original version
|
||||
auto cloned_tmptr = make_token_metadata_ptr(std::move(cloned_tm));
|
||||
cloned_tmptr->update_topology(utils::fb_utilities::get_broadcast_address(), _sys_ks.local().local_dc_rack(), locator::node::state::joining);
|
||||
cloned_tmptr->update_topology(utils::fb_utilities::get_broadcast_address(), _sys_ks.local().local_dc_rack(), locator::node::state::replacing);
|
||||
co_await cloned_tmptr->update_normal_tokens(replacing_tokens, utils::fb_utilities::get_broadcast_address());
|
||||
co_return co_await do_rebuild_replace_with_repair(std::move(cloned_tmptr), std::move(op), std::move(source_dc), reason, std::move(ignore_nodes));
|
||||
}
|
||||
|
||||
@@ -2086,7 +2086,7 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi
|
||||
slogger.info("Replacing a node with {} IP address, my address={}, node being replaced={}",
|
||||
get_broadcast_address() == *replace_address ? "the same" : "a different",
|
||||
get_broadcast_address(), *replace_address);
|
||||
tmptr->update_topology(*replace_address, std::move(ri->dc_rack), locator::node::state::leaving);
|
||||
tmptr->update_topology(*replace_address, std::move(ri->dc_rack), locator::node::state::being_replaced);
|
||||
co_await tmptr->update_normal_tokens(bootstrap_tokens, *replace_address);
|
||||
replaced_host_id = ri->host_id;
|
||||
}
|
||||
@@ -2556,7 +2556,7 @@ future<> storage_service::bootstrap(cdc::generation_service& cdc_gen_service, st
|
||||
slogger.debug("bootstrap: update pending ranges: endpoint={} bootstrap_tokens={}", get_broadcast_address(), bootstrap_tokens);
|
||||
mutate_token_metadata([this, &bootstrap_tokens] (mutable_token_metadata_ptr tmptr) {
|
||||
auto endpoint = get_broadcast_address();
|
||||
tmptr->update_topology(endpoint, _sys_ks.local().local_dc_rack(), locator::node::state::joining);
|
||||
tmptr->update_topology(endpoint, _sys_ks.local().local_dc_rack(), locator::node::state::bootstrapping);
|
||||
tmptr->add_bootstrap_tokens(bootstrap_tokens, endpoint);
|
||||
return update_topology_change_info(std::move(tmptr), ::format("bootstrapping node {}", endpoint));
|
||||
}).get();
|
||||
@@ -2674,7 +2674,7 @@ future<> storage_service::handle_state_bootstrap(inet_address endpoint) {
|
||||
tmptr->remove_endpoint(endpoint);
|
||||
}
|
||||
|
||||
tmptr->update_topology(endpoint, get_dc_rack_for(endpoint), locator::node::state::joining);
|
||||
tmptr->update_topology(endpoint, get_dc_rack_for(endpoint), locator::node::state::bootstrapping);
|
||||
tmptr->add_bootstrap_tokens(tokens, endpoint);
|
||||
if (_gossiper.uses_host_id(endpoint)) {
|
||||
tmptr->update_host_id(_gossiper.get_host_id(endpoint), endpoint);
|
||||
@@ -2869,7 +2869,7 @@ future<> storage_service::handle_state_leaving(inet_address endpoint) {
|
||||
// FIXME: this code should probably resolve token collisions too, like handle_state_normal
|
||||
slogger.info("Node {} state jump to leaving", endpoint);
|
||||
|
||||
tmptr->update_topology(endpoint, get_dc_rack_for(endpoint), locator::node::state::leaving);
|
||||
tmptr->update_topology(endpoint, get_dc_rack_for(endpoint), locator::node::state::being_decommissioned);
|
||||
co_await tmptr->update_normal_tokens(tokens, endpoint);
|
||||
} else {
|
||||
auto tokens_ = tmptr->get_tokens(endpoint);
|
||||
@@ -4673,7 +4673,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
auto existing_node = x.first;
|
||||
auto replacing_node = x.second;
|
||||
slogger.info("replace[{}]: Added replacing_node={} to replace existing_node={}, coordinator={}", req.ops_uuid, replacing_node, existing_node, coordinator);
|
||||
tmptr->update_topology(replacing_node, get_dc_rack_for(replacing_node), locator::node::state::joining);
|
||||
tmptr->update_topology(replacing_node, get_dc_rack_for(replacing_node), locator::node::state::replacing);
|
||||
tmptr->add_replacing_endpoint(existing_node, replacing_node);
|
||||
}
|
||||
return make_ready_future<>();
|
||||
@@ -4725,7 +4725,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
auto& endpoint = x.first;
|
||||
auto tokens = std::unordered_set<dht::token>(x.second.begin(), x.second.end());
|
||||
slogger.info("bootstrap[{}]: Added node={} as bootstrap, coordinator={}", req.ops_uuid, endpoint, coordinator);
|
||||
tmptr->update_topology(endpoint, get_dc_rack_for(endpoint), locator::node::state::joining);
|
||||
tmptr->update_topology(endpoint, get_dc_rack_for(endpoint), locator::node::state::bootstrapping);
|
||||
tmptr->add_bootstrap_tokens(tokens, endpoint);
|
||||
}
|
||||
return update_topology_change_info(tmptr, ::format("bootstrap {}", req.bootstrap_nodes));
|
||||
|
||||
@@ -144,15 +144,15 @@ SEASTAR_THREAD_TEST_CASE(test_update_node) {
|
||||
BOOST_REQUIRE(topo.get_location(id1) == dc_rack2);
|
||||
BOOST_REQUIRE(topo.get_location(ep1) == dc_rack2);
|
||||
|
||||
BOOST_REQUIRE_NE(node->get_state(), locator::node::state::leaving);
|
||||
node = topo.update_node(mutable_node, std::nullopt, std::nullopt, std::nullopt, locator::node::state::leaving);
|
||||
BOOST_REQUIRE_NE(node->get_state(), locator::node::state::being_decommissioned);
|
||||
node = topo.update_node(mutable_node, std::nullopt, std::nullopt, std::nullopt, locator::node::state::being_decommissioned);
|
||||
mutable_node = const_cast<locator::node*>(node);
|
||||
BOOST_REQUIRE_EQUAL(node->get_state(), locator::node::state::leaving);
|
||||
BOOST_REQUIRE_EQUAL(node->get_state(), locator::node::state::being_decommissioned);
|
||||
|
||||
auto dc_rack3 = endpoint_dc_rack{"DC3", "RACK3"};
|
||||
// Note: engage state option, but keep node::state value the same
|
||||
// to reproduce #13502
|
||||
node = topo.update_node(mutable_node, std::nullopt, ep3, dc_rack3, locator::node::state::leaving);
|
||||
node = topo.update_node(mutable_node, std::nullopt, ep3, dc_rack3, locator::node::state::being_decommissioned);
|
||||
mutable_node = const_cast<locator::node*>(node);
|
||||
BOOST_REQUIRE_EQUAL(topo.find_node(id1), node);
|
||||
BOOST_REQUIRE_EQUAL(topo.find_node(ep1), nullptr);
|
||||
@@ -161,7 +161,7 @@ SEASTAR_THREAD_TEST_CASE(test_update_node) {
|
||||
BOOST_REQUIRE(topo.get_location(id1) == dc_rack3);
|
||||
BOOST_REQUIRE(topo.get_location(ep2) == endpoint_dc_rack::default_location);
|
||||
BOOST_REQUIRE(topo.get_location(ep3) == dc_rack3);
|
||||
BOOST_REQUIRE_EQUAL(node->get_state(), locator::node::state::leaving);
|
||||
BOOST_REQUIRE_EQUAL(node->get_state(), locator::node::state::being_decommissioned);
|
||||
|
||||
// In state::left the ndoe will remain indexed only by its host_id
|
||||
node = topo.update_node(mutable_node, std::nullopt, std::nullopt, std::nullopt, locator::node::state::left);
|
||||
|
||||
@@ -954,7 +954,7 @@ SEASTAR_THREAD_TEST_CASE(test_topology_tracks_local_node) {
|
||||
|
||||
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
|
||||
co_await tm.clear_gently();
|
||||
tm.get_topology().add_or_update_endpoint(ip1, host1, ip1_dc_rack_v2, node::state::leaving);
|
||||
tm.get_topology().add_or_update_endpoint(ip1, host1, ip1_dc_rack_v2, node::state::being_decommissioned);
|
||||
}).get();
|
||||
|
||||
n1 = stm.get()->get_topology().find_node(host1);
|
||||
|
||||
Reference in New Issue
Block a user