Merge 'storage_service: refresh_sync_nodes: restrict to normal token owners' from Benny Halevy

It is possible that topology will contain nodes that are no longer normal token owners, so they don't need to be sync'ed with.

Fixes scylladb/scylladb#14793

Closes #14798

* github.com:scylladb/scylladb:
  storage_service: refresh_sync_nodes: restrict to reachable token owners
  storage_service: refresh_sync_nodes: fix log message
  locator: topology: node::state: make fine grained
This commit is contained in:
Botond Dénes
2023-07-31 14:52:19 +03:00
6 changed files with 72 additions and 25 deletions

View File

@@ -47,11 +47,14 @@ node_holder node::clone() const {
std::string node::to_string(node::state s) {
switch (s) {
case state::none: return "none";
case state::joining: return "joining";
case state::normal: return "normal";
case state::leaving: return "leaving";
case state::left: return "left";
case state::none: return "none";
case state::bootstrapping: return "bootstrapping";
case state::replacing: return "replacing";
case state::normal: return "normal";
case state::being_decommissioned: return "being_decommissioned";
case state::being_removed: return "being_removed";
case state::being_replaced: return "being_replaced";
case state::left: return "left";
}
__builtin_unreachable();
}
@@ -311,9 +314,9 @@ void topology::index_node(const node* node) {
if (node->endpoint() != inet_address{}) {
auto eit = _nodes_by_endpoint.find(node->endpoint());
if (eit != _nodes_by_endpoint.end()) {
if (eit->second->get_state() == node::state::leaving || eit->second->get_state() == node::state::left) {
if (eit->second->is_leaving() || eit->second->left()) {
_nodes_by_endpoint.erase(node->endpoint());
} else if (node->get_state() != node::state::leaving && node->get_state() != node::state::left) {
} else if (!node->is_leaving() && !node->left()) {
if (node->host_id()) {
_nodes_by_host_id.erase(node->host_id());
}

View File

@@ -47,9 +47,12 @@ public:
enum class state {
none = 0,
joining, // while bootstrapping, replacing
bootstrapping, // (joining)
replacing, // (joining)
normal,
leaving, // while decommissioned, removed, replaced
being_decommissioned, // (leaving)
being_removed, // (leaving)
being_replaced, // (leaving)
left // after decommissioned, removed, replaced
};
@@ -102,6 +105,35 @@ public:
state get_state() const noexcept { return _state; }
bool is_joining() const noexcept {
switch (_state) {
case state::bootstrapping:
case state::replacing:
return true;
default:
return false;
}
}
bool is_normal() const noexcept {
return _state == state::normal;
}
bool is_leaving() const noexcept {
switch (_state) {
case state::being_decommissioned:
case state::being_removed:
case state::being_replaced:
return true;
default:
return false;
}
}
bool left() const noexcept {
return _state == state::left;
}
shard_id get_shard_count() const noexcept { return _shard_count; }
static std::string to_string(state);

View File

@@ -1425,7 +1425,7 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr
auto range_addresses = strat.get_range_addresses(metadata_clone).get0();
//Pending ranges
metadata_clone.update_topology(myip, _sys_ks.local().local_dc_rack(), locator::node::state::joining);
metadata_clone.update_topology(myip, _sys_ks.local().local_dc_rack(), locator::node::state::bootstrapping);
metadata_clone.update_normal_tokens(tokens, myip).get();
auto pending_range_addresses = strat.get_range_addresses(metadata_clone).get0();
metadata_clone.clear_gently().get();
@@ -1882,7 +1882,7 @@ future<> repair_service::replace_with_repair(locator::token_metadata_ptr tmptr,
// update a cloned version of tmptr
// no need to set the original version
auto cloned_tmptr = make_token_metadata_ptr(std::move(cloned_tm));
cloned_tmptr->update_topology(utils::fb_utilities::get_broadcast_address(), _sys_ks.local().local_dc_rack(), locator::node::state::joining);
cloned_tmptr->update_topology(utils::fb_utilities::get_broadcast_address(), _sys_ks.local().local_dc_rack(), locator::node::state::replacing);
co_await cloned_tmptr->update_normal_tokens(replacing_tokens, utils::fb_utilities::get_broadcast_address());
co_return co_await do_rebuild_replace_with_repair(std::move(cloned_tmptr), std::move(op), std::move(source_dc), reason, std::move(ignore_nodes));
}

View File

@@ -2086,7 +2086,7 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi
slogger.info("Replacing a node with {} IP address, my address={}, node being replaced={}",
get_broadcast_address() == *replace_address ? "the same" : "a different",
get_broadcast_address(), *replace_address);
tmptr->update_topology(*replace_address, std::move(ri->dc_rack), locator::node::state::leaving);
tmptr->update_topology(*replace_address, std::move(ri->dc_rack), locator::node::state::being_replaced);
co_await tmptr->update_normal_tokens(bootstrap_tokens, *replace_address);
replaced_host_id = ri->host_id;
}
@@ -2556,7 +2556,7 @@ future<> storage_service::bootstrap(cdc::generation_service& cdc_gen_service, st
slogger.debug("bootstrap: update pending ranges: endpoint={} bootstrap_tokens={}", get_broadcast_address(), bootstrap_tokens);
mutate_token_metadata([this, &bootstrap_tokens] (mutable_token_metadata_ptr tmptr) {
auto endpoint = get_broadcast_address();
tmptr->update_topology(endpoint, _sys_ks.local().local_dc_rack(), locator::node::state::joining);
tmptr->update_topology(endpoint, _sys_ks.local().local_dc_rack(), locator::node::state::bootstrapping);
tmptr->add_bootstrap_tokens(bootstrap_tokens, endpoint);
return update_topology_change_info(std::move(tmptr), ::format("bootstrapping node {}", endpoint));
}).get();
@@ -2674,7 +2674,7 @@ future<> storage_service::handle_state_bootstrap(inet_address endpoint) {
tmptr->remove_endpoint(endpoint);
}
tmptr->update_topology(endpoint, get_dc_rack_for(endpoint), locator::node::state::joining);
tmptr->update_topology(endpoint, get_dc_rack_for(endpoint), locator::node::state::bootstrapping);
tmptr->add_bootstrap_tokens(tokens, endpoint);
if (_gossiper.uses_host_id(endpoint)) {
tmptr->update_host_id(_gossiper.get_host_id(endpoint), endpoint);
@@ -2869,7 +2869,7 @@ future<> storage_service::handle_state_leaving(inet_address endpoint) {
// FIXME: this code should probably resolve token collisions too, like handle_state_normal
slogger.info("Node {} state jump to leaving", endpoint);
tmptr->update_topology(endpoint, get_dc_rack_for(endpoint), locator::node::state::leaving);
tmptr->update_topology(endpoint, get_dc_rack_for(endpoint), locator::node::state::being_decommissioned);
co_await tmptr->update_normal_tokens(tokens, endpoint);
} else {
auto tokens_ = tmptr->get_tokens(endpoint);
@@ -3770,11 +3770,23 @@ public:
// sync data with all normal token owners
sync_nodes.clear();
const auto& topo = tmptr->get_topology();
auto can_sync_with_node = [] (const locator::node& node) {
// Sync with reachable token owners.
// Note that although nodes in `being_replaced` and `being_removed`
// are still token owners, they are known to be dead and can't be sync'ed with.
switch (node.get_state()) {
case locator::node::state::normal:
case locator::node::state::being_decommissioned:
return true;
default:
return false;
}
};
topo.for_each_node([&] (const locator::node* np) {
seastar::thread::maybe_yield();
// FIXME: use node* rather than endpoint
auto node = np->endpoint();
if (!ignore_nodes.contains(node) && sync_to_node(node)) {
if (!ignore_nodes.contains(node) && can_sync_with_node(*np) && sync_to_node(node)) {
sync_nodes.insert(node);
}
});
@@ -3790,7 +3802,7 @@ public:
throw std::runtime_error(msg);
}
slogger.info("{}[{}]: sync_nodes={}, ignore_nodes={}", desc, uuid(), desc, host_id, endpoint, sync_nodes, ignore_nodes);
slogger.info("{}[{}]: sync_nodes={}, ignore_nodes={}", desc, uuid(), sync_nodes, ignore_nodes);
}
future<> stop() noexcept {
@@ -4673,7 +4685,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
auto existing_node = x.first;
auto replacing_node = x.second;
slogger.info("replace[{}]: Added replacing_node={} to replace existing_node={}, coordinator={}", req.ops_uuid, replacing_node, existing_node, coordinator);
tmptr->update_topology(replacing_node, get_dc_rack_for(replacing_node), locator::node::state::joining);
tmptr->update_topology(replacing_node, get_dc_rack_for(replacing_node), locator::node::state::replacing);
tmptr->add_replacing_endpoint(existing_node, replacing_node);
}
return make_ready_future<>();
@@ -4725,7 +4737,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
auto& endpoint = x.first;
auto tokens = std::unordered_set<dht::token>(x.second.begin(), x.second.end());
slogger.info("bootstrap[{}]: Added node={} as bootstrap, coordinator={}", req.ops_uuid, endpoint, coordinator);
tmptr->update_topology(endpoint, get_dc_rack_for(endpoint), locator::node::state::joining);
tmptr->update_topology(endpoint, get_dc_rack_for(endpoint), locator::node::state::bootstrapping);
tmptr->add_bootstrap_tokens(tokens, endpoint);
}
return update_topology_change_info(tmptr, ::format("bootstrap {}", req.bootstrap_nodes));

View File

@@ -144,15 +144,15 @@ SEASTAR_THREAD_TEST_CASE(test_update_node) {
BOOST_REQUIRE(topo.get_location(id1) == dc_rack2);
BOOST_REQUIRE(topo.get_location(ep1) == dc_rack2);
BOOST_REQUIRE_NE(node->get_state(), locator::node::state::leaving);
node = topo.update_node(mutable_node, std::nullopt, std::nullopt, std::nullopt, locator::node::state::leaving);
BOOST_REQUIRE_NE(node->get_state(), locator::node::state::being_decommissioned);
node = topo.update_node(mutable_node, std::nullopt, std::nullopt, std::nullopt, locator::node::state::being_decommissioned);
mutable_node = const_cast<locator::node*>(node);
BOOST_REQUIRE_EQUAL(node->get_state(), locator::node::state::leaving);
BOOST_REQUIRE_EQUAL(node->get_state(), locator::node::state::being_decommissioned);
auto dc_rack3 = endpoint_dc_rack{"DC3", "RACK3"};
// Note: engage state option, but keep node::state value the same
// to reproduce #13502
node = topo.update_node(mutable_node, std::nullopt, ep3, dc_rack3, locator::node::state::leaving);
node = topo.update_node(mutable_node, std::nullopt, ep3, dc_rack3, locator::node::state::being_decommissioned);
mutable_node = const_cast<locator::node*>(node);
BOOST_REQUIRE_EQUAL(topo.find_node(id1), node);
BOOST_REQUIRE_EQUAL(topo.find_node(ep1), nullptr);
@@ -161,7 +161,7 @@ SEASTAR_THREAD_TEST_CASE(test_update_node) {
BOOST_REQUIRE(topo.get_location(id1) == dc_rack3);
BOOST_REQUIRE(topo.get_location(ep2) == endpoint_dc_rack::default_location);
BOOST_REQUIRE(topo.get_location(ep3) == dc_rack3);
BOOST_REQUIRE_EQUAL(node->get_state(), locator::node::state::leaving);
BOOST_REQUIRE_EQUAL(node->get_state(), locator::node::state::being_decommissioned);
// In state::left the ndoe will remain indexed only by its host_id
node = topo.update_node(mutable_node, std::nullopt, std::nullopt, std::nullopt, locator::node::state::left);

View File

@@ -954,7 +954,7 @@ SEASTAR_THREAD_TEST_CASE(test_topology_tracks_local_node) {
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
co_await tm.clear_gently();
tm.get_topology().add_or_update_endpoint(ip1, host1, ip1_dc_rack_v2, node::state::leaving);
tm.get_topology().add_or_update_endpoint(ip1, host1, ip1_dc_rack_v2, node::state::being_decommissioned);
}).get();
n1 = stm.get()->get_topology().find_node(host1);