From c17df1759e6ab14d27e55fa81bc778c8dd973ae1 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Sat, 14 Jan 2023 19:52:56 +0200 Subject: [PATCH] topology: add node state Add a simple node state model with: `joining`, `normal`, `leaving`, and `left` states to help managing nodes during replace with the the same ip address. Later on, this could also help prevent nodes that were decommissioned, removed, or replaced from rejoining the cluster. Signed-off-by: Benny Halevy --- locator/token_metadata.cc | 10 +-- locator/token_metadata.hh | 2 +- locator/topology.cc | 76 ++++++++++++++------ locator/topology.hh | 41 ++++++++--- repair/repair.cc | 4 +- service/storage_service.cc | 20 +++--- test/boost/network_topology_strategy_test.cc | 6 +- 7 files changed, 107 insertions(+), 52 deletions(-) diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index faf73074a0..7ce338083d 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -102,8 +102,8 @@ public: return _bootstrap_tokens; } - void update_topology(inet_address ep, endpoint_dc_rack dr) { - _topology.add_or_update_endpoint(ep, std::move(dr)); + void update_topology(inet_address ep, endpoint_dc_rack dr, std::optional opt_st) { + _topology.add_or_update_endpoint(ep, std::move(dr), std::move(opt_st)); } /** @@ -852,7 +852,7 @@ void token_metadata_impl::calculate_pending_ranges_for_bootstrap( for (auto& x : tmp) { auto& endpoint = x.first; auto& tokens = x.second; - all_left_metadata->update_topology(endpoint, get_dc_rack(endpoint)); + all_left_metadata->update_topology(endpoint, get_dc_rack(endpoint), node::state::joining); all_left_metadata->update_normal_tokens(tokens, endpoint).get(); auto address_ranges = strategy.get_ranges(endpoint, *all_left_metadata).get0(); for (const dht::token_range& x : address_ranges) { @@ -1025,8 +1025,8 @@ token_metadata::get_bootstrap_tokens() const { } void -token_metadata::update_topology(inet_address ep, endpoint_dc_rack dr) { - _impl->update_topology(ep, std::move(dr)); +token_metadata::update_topology(inet_address ep, endpoint_dc_rack dr, std::optional opt_st) { + _impl->update_topology(ep, std::move(dr), std::move(opt_st)); } boost::iterator_range diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index 24f967edd3..25e5863e48 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -121,7 +121,7 @@ public: /** * Update or add endpoint given its inet_address and endpoint_dc_rack. */ - void update_topology(inet_address ep, endpoint_dc_rack dr); + void update_topology(inet_address ep, endpoint_dc_rack dr, std::optional opt_st = std::nullopt); /** * Creates an iterable range of the sorted tokens starting at the token next * after the given one. diff --git a/locator/topology.cc b/locator/topology.cc index 7b52ca3f55..490fe5cb17 100644 --- a/locator/topology.cc +++ b/locator/topology.cc @@ -25,21 +25,33 @@ thread_local const endpoint_dc_rack endpoint_dc_rack::default_location = { .rack = locator::production_snitch_base::default_rack, }; -node::node(const locator::topology* topology, locator::host_id id, inet_address endpoint, endpoint_dc_rack dc_rack, this_node is_this_node, node::idx_type idx) +node::node(const locator::topology* topology, locator::host_id id, inet_address endpoint, endpoint_dc_rack dc_rack, state state, this_node is_this_node, node::idx_type idx) : _topology(topology) , _host_id(id) , _endpoint(endpoint) , _dc_rack(std::move(dc_rack)) + , _state(state) , _is_this_node(is_this_node) , _idx(idx) {} -node_holder node::make(const locator::topology* topology, locator::host_id id, inet_address endpoint, endpoint_dc_rack dc_rack, node::this_node is_this_node, node::idx_type idx) { - return std::make_unique(topology, std::move(id), std::move(endpoint), std::move(dc_rack), is_this_node, idx); +node_holder node::make(const locator::topology* topology, locator::host_id id, inet_address endpoint, endpoint_dc_rack dc_rack, state state, node::this_node is_this_node, node::idx_type idx) { + return std::make_unique(topology, std::move(id), std::move(endpoint), std::move(dc_rack), std::move(state), is_this_node, idx); } node_holder node::clone() const { - return make(nullptr, host_id(), endpoint(), dc_rack(), is_this_node()); + return make(nullptr, host_id(), endpoint(), dc_rack(), get_state(), is_this_node()); +} + +std::string node::to_string(node::state s) { + switch (s) { + case state::none: return "none"; + case state::joining: return "joining"; + case state::normal: return "normal"; + case state::leaving: return "leaving"; + case state::left: return "left"; + } + __builtin_unreachable(); } future<> topology::clear_gently() noexcept { @@ -66,7 +78,7 @@ topology::topology(config cfg) tlogger.trace("topology[{}]: constructing using config: host_id={} endpoint={} dc={} rack={}", fmt::ptr(this), cfg.this_host_id, cfg.this_endpoint, cfg.local_dc_rack.dc, cfg.local_dc_rack.rack); if (cfg.this_host_id || cfg.this_endpoint != inet_address{}) { - add_node(node::make(this, cfg.this_host_id, cfg.this_endpoint, cfg.local_dc_rack, node::this_node::yes)); + add_node(node::make(this, cfg.this_host_id, cfg.this_endpoint, cfg.local_dc_rack, node::state::joining, node::this_node::yes)); } } @@ -109,11 +121,11 @@ std::string topology::debug_format(const node* node) { if (!node) { return format("node={}", fmt::ptr(node)); } - return format("node={} idx={} host_id={} endpoint={} dc={} rack={} this_node={}", fmt::ptr(node), - node->idx(), node->host_id(), node->endpoint(), node->dc_rack().dc, node->dc_rack().rack, bool(node->is_this_node())); + return format("node={} idx={} host_id={} endpoint={} dc={} rack={} state={} this_node={}", fmt::ptr(node), + node->idx(), node->host_id(), node->endpoint(), node->dc_rack().dc, node->dc_rack().rack, node::to_string(node->get_state()), bool(node->is_this_node())); } -const node* topology::add_node(host_id id, const inet_address& ep, const endpoint_dc_rack& dr) { +const node* topology::add_node(host_id id, const inet_address& ep, const endpoint_dc_rack& dr, node::state state) { if (dr.dc.empty() || dr.rack.empty()) { on_internal_error(tlogger, "Node must have valid dc and rack"); } @@ -122,7 +134,7 @@ const node* topology::add_node(host_id id, const inet_address& ep, const endpoin on_internal_error(tlogger, format("topology[{}]: local node already set: host_id={} endpoint={} dc={} rack={}: currently mapped to {}", fmt::ptr(this), id, ep, dr.dc, dr.rack, debug_format(this_node()))); } - return add_node(node::make(this, id, ep, dr)); + return add_node(node::make(this, id, ep, dr, state)); } const node* topology::add_node(node_holder nptr) { @@ -158,13 +170,15 @@ const node* topology::add_node(node_holder nptr) { return node; } -const node* topology::update_node(node* node, std::optional opt_id, std::optional opt_ep, std::optional opt_dr) { - tlogger.debug("topology[{}]: update_node: {}: to: host_id={} endpoint={} dc={} rack={}, at {}", fmt::ptr(this), debug_format(node), +const node* topology::update_node(node* node, std::optional opt_id, std::optional opt_ep, std::optional opt_dr, std::optional opt_st) { + tlogger.debug("topology[{}]: update_node: {}: to: host_id={} endpoint={} dc={} rack={} state={}, at {}", fmt::ptr(this), debug_format(node), opt_id ? format("{}", *opt_id) : "unchanged", opt_ep ? format("{}", *opt_ep) : "unchanged", opt_dr ? format("{}", opt_dr->dc) : "unchanged", opt_dr ? format("{}", opt_dr->rack) : "unchanged", + opt_st ? format("{}", *opt_st) : "unchanged", current_backtrace()); + bool changed = false; if (opt_id) { if (*opt_id != node->host_id()) { @@ -205,6 +219,9 @@ const node* topology::update_node(node* node, std::optional opt_id, std opt_dr.reset(); } } + if (opt_st) { + changed = node->get_state() != *opt_st; + } if (!changed) { return node; @@ -223,6 +240,9 @@ const node* topology::update_node(node* node, std::optional opt_id, std if (opt_dr) { mutable_node->_dc_rack = std::move(*opt_dr); } + if (opt_st) { + mutable_node->set_state(*opt_st); + } } catch (...) { std::terminate(); } @@ -267,12 +287,19 @@ void topology::index_node(const node* node) { } } if (node->endpoint() != inet_address{}) { - auto [eit, inserted_endpoint] = _nodes_by_endpoint.emplace(node->endpoint(), node); - if (!inserted_endpoint) { - if (node->host_id()) { - _nodes_by_host_id.erase(node->host_id()); + auto eit = _nodes_by_endpoint.find(node->endpoint()); + if (eit != _nodes_by_endpoint.end()) { + if (eit->second->get_state() == node::state::leaving || eit->second->get_state() == node::state::left) { + _nodes_by_endpoint.erase(node->endpoint()); + } else if (node->get_state() != node::state::leaving && node->get_state() != node::state::left) { + if (node->host_id()) { + _nodes_by_host_id.erase(node->host_id()); + } + on_internal_error(tlogger, format("topology[{}]: {}: node endpoint already mapped to {}", fmt::ptr(this), debug_format(node), debug_format(eit->second))); } - on_internal_error(tlogger, format("topology[{}]: {}: node endpoint already mapped to {}", fmt::ptr(this), debug_format(node), debug_format(eit->second))); + } + if (node->get_state() != node::state::left) { + _nodes_by_endpoint.try_emplace(node->endpoint(), node); } } @@ -381,18 +408,18 @@ const node* topology::find_node(node::idx_type idx) const noexcept { return _nodes.at(idx).get(); } -const node* topology::add_or_update_endpoint(inet_address ep, std::optional opt_id, std::optional opt_dr) +const node* topology::add_or_update_endpoint(inet_address ep, std::optional opt_id, std::optional opt_dr, std::optional opt_st) { - tlogger.trace("topology[{}]: add_or_update_endpoint: ep={} host_id={} dc={} rack={}, at {}", fmt::ptr(this), - ep, opt_id.value_or(host_id::create_null_id()), opt_dr.value_or(endpoint_dc_rack{}).dc, opt_dr.value_or(endpoint_dc_rack{}).rack, + tlogger.trace("topology[{}]: add_or_update_endpoint: ep={} host_id={} dc={} rack={} state={}, at {}", fmt::ptr(this), + ep, opt_id.value_or(host_id::create_null_id()), opt_dr.value_or(endpoint_dc_rack{}).dc, opt_dr.value_or(endpoint_dc_rack{}).rack, opt_st.value_or(node::state::none), current_backtrace()); auto n = find_node(ep); if (n) { - return update_node(make_mutable(n), opt_id, std::nullopt, std::move(opt_dr)); + return update_node(make_mutable(n), opt_id, std::nullopt, std::move(opt_dr), std::move(opt_st)); } else if (opt_id && (n = find_node(*opt_id))) { - return update_node(make_mutable(n), std::nullopt, ep, std::move(opt_dr)); + return update_node(make_mutable(n), std::nullopt, ep, std::move(opt_dr), std::move(opt_st)); } else { - return add_node(opt_id.value_or(host_id::create_null_id()), ep, opt_dr.value_or(endpoint_dc_rack::default_location)); + return add_node(opt_id.value_or(host_id::create_null_id()), ep, opt_dr.value_or(endpoint_dc_rack::default_location), opt_st.value_or(node::state::normal)); } } @@ -477,4 +504,9 @@ std::ostream& operator<<(std::ostream& out, const locator::node& node) { return out; } +std::ostream& operator<<(std::ostream& out, const locator::node::state& state) { + fmt::print(out, "{}", state); + return out; +} + } // namespace std diff --git a/locator/topology.hh b/locator/topology.hh index 7416c2a436..d67701e325 100644 --- a/locator/topology.hh +++ b/locator/topology.hh @@ -36,18 +36,27 @@ public: using this_node = bool_class; using idx_type = int; + enum class state { + none = 0, + joining, // while bootstrapping, replacing + normal, + leaving, // while decommissioned, removed, replaced + left // after decommissioned, removed, replaced + }; + private: const topology* _topology; host_id _host_id; inet_address _endpoint; endpoint_dc_rack _dc_rack; + state _state; // Is this node the `localhost` instance this_node _is_this_node; idx_type _idx = -1; public: - node(const locator::topology* topology, locator::host_id id, inet_address endpoint, endpoint_dc_rack dc_rack, this_node is_this_node = this_node::no, idx_type idx = -1); + node(const locator::topology* topology, locator::host_id id, inet_address endpoint, endpoint_dc_rack dc_rack, state state, this_node is_this_node = this_node::no, idx_type idx = -1); node(const node&) = delete; node(node&&) = delete; @@ -74,12 +83,17 @@ public: // idx < 0 means "unassigned" idx_type idx() const noexcept { return _idx; } + state get_state() const noexcept { return _state; } + + static std::string to_string(state); + private: - static node_holder make(const locator::topology* topology, locator::host_id id, inet_address endpoint, endpoint_dc_rack dc_rack, node::this_node is_this_node = this_node::no, idx_type idx = -1); + static node_holder make(const locator::topology* topology, locator::host_id id, inet_address endpoint, endpoint_dc_rack dc_rack, state state, node::this_node is_this_node = this_node::no, idx_type idx = -1); node_holder clone() const; void set_topology(const locator::topology* topology) noexcept { _topology = topology; } void set_idx(idx_type idx) noexcept { _idx = idx; } + void set_state(state state) noexcept { _state = state; } friend class topology; }; @@ -106,12 +120,12 @@ public: } // Adds a node with given host_id, endpoint, and DC/rack. - const node* add_node(host_id id, const inet_address& ep, const endpoint_dc_rack& dr); + const node* add_node(host_id id, const inet_address& ep, const endpoint_dc_rack& dr, node::state state); // Optionally updates node's current host_id, endpoint, or DC/rack. // Note: the host_id may be updated from null to non-null after a new node gets a new, random host_id, // or a peer node host_id may be updated when the node is replaced with another node using the same ip address. - const node* update_node(node* node, std::optional opt_id, std::optional opt_ep, std::optional opt_dr); + const node* update_node(node* node, std::optional opt_id, std::optional opt_ep, std::optional opt_dr, std::optional opt_st); // Removes a node using its host_id // Returns true iff the node was found and removed. @@ -138,14 +152,14 @@ public: * * Adds or updates a node with given endpoint */ - const node* add_or_update_endpoint(inet_address ep, std::optional opt_id, std::optional opt_dr); + const node* add_or_update_endpoint(inet_address ep, std::optional opt_id, std::optional opt_dr, std::optional opt_st); // Legacy entry point from token_metadata::update_topology - const node* add_or_update_endpoint(inet_address ep, endpoint_dc_rack dr) { - return add_or_update_endpoint(ep, std::nullopt, std::move(dr)); + const node* add_or_update_endpoint(inet_address ep, endpoint_dc_rack dr, std::optional opt_st) { + return add_or_update_endpoint(ep, std::nullopt, std::move(dr), std::move(opt_st)); } const node* add_or_update_endpoint(inet_address ep, host_id id) { - return add_or_update_endpoint(ep, id, std::nullopt); + return add_or_update_endpoint(ep, id, std::nullopt, std::nullopt); } /** @@ -298,7 +312,8 @@ public: namespace std { -std::ostream& operator<<(std::ostream& out, const locator::node* node); +std::ostream& operator<<(std::ostream& out, const locator::node& node); +std::ostream& operator<<(std::ostream& out, const locator::node::state& state); } // namespace std @@ -309,3 +324,11 @@ struct fmt::formatter : fmt::formatter { return fmt::format_to(ctx.out(), "{}/{}", node.host_id(), node.endpoint()); } }; + +template <> +struct fmt::formatter : fmt::formatter { + template + auto format(const locator::node::state& state, FormatContext& ctx) const { + return fmt::format_to(ctx.out(), "{}", locator::node::to_string(state)); + } +}; diff --git a/repair/repair.cc b/repair/repair.cc index ba2f366a0e..e322760cf6 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -1400,7 +1400,7 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr auto range_addresses = strat.get_range_addresses(metadata_clone).get0(); //Pending ranges - metadata_clone.update_topology(myip, _sys_ks.local().local_dc_rack()); + metadata_clone.update_topology(myip, _sys_ks.local().local_dc_rack(), locator::node::state::joining); metadata_clone.update_normal_tokens(tokens, myip).get(); auto pending_range_addresses = strat.get_range_addresses(metadata_clone).get0(); metadata_clone.clear_gently().get(); @@ -1857,7 +1857,7 @@ future<> repair_service::replace_with_repair(locator::token_metadata_ptr tmptr, // update a cloned version of tmptr // no need to set the original version auto cloned_tmptr = make_token_metadata_ptr(std::move(cloned_tm)); - cloned_tmptr->update_topology(utils::fb_utilities::get_broadcast_address(), _sys_ks.local().local_dc_rack()); + cloned_tmptr->update_topology(utils::fb_utilities::get_broadcast_address(), _sys_ks.local().local_dc_rack(), locator::node::state::joining); co_await cloned_tmptr->update_normal_tokens(replacing_tokens, utils::fb_utilities::get_broadcast_address()); co_return co_await do_rebuild_replace_with_repair(std::move(cloned_tmptr), std::move(op), std::move(source_dc), reason, std::move(ignore_nodes)); } diff --git a/service/storage_service.cc b/service/storage_service.cc index 63fb96417e..c4a65eb9d9 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -983,7 +983,7 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi slogger.info("Replacing a node with {} IP address, my address={}, node being replaced={}", get_broadcast_address() == *replace_address ? "the same" : "a different", get_broadcast_address(), *replace_address); - tmptr->update_topology(*replace_address, std::move(ri->dc_rack)); + tmptr->update_topology(*replace_address, std::move(ri->dc_rack), locator::node::state::leaving); co_await tmptr->update_normal_tokens(bootstrap_tokens, *replace_address); replaced_host_id = ri->host_id; } @@ -1022,7 +1022,7 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi // This node must know about its chosen tokens before other nodes do // since they may start sending writes to this node after it gossips status = NORMAL. // Therefore we update _token_metadata now, before gossip starts. - tmptr->update_topology(get_broadcast_address(), _sys_ks.local().local_dc_rack()); + tmptr->update_topology(get_broadcast_address(), _sys_ks.local().local_dc_rack(), locator::node::state::normal); co_await tmptr->update_normal_tokens(my_tokens, get_broadcast_address()); cdc_gen_id = co_await _sys_ks.local().get_cdc_generation_id(); @@ -1257,7 +1257,7 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi // This node must know about its chosen tokens before other nodes do // since they may start sending writes to this node after it gossips status = NORMAL. // Therefore, in case we haven't updated _token_metadata with our tokens yet, do it now. - tmptr->update_topology(get_broadcast_address(), _sys_ks.local().local_dc_rack()); + tmptr->update_topology(get_broadcast_address(), _sys_ks.local().local_dc_rack(), locator::node::state::normal); return tmptr->update_normal_tokens(bootstrap_tokens, get_broadcast_address()); }); @@ -1429,7 +1429,7 @@ future<> storage_service::bootstrap(cdc::generation_service& cdc_gen_service, st slogger.debug("bootstrap: update pending ranges: endpoint={} bootstrap_tokens={}", get_broadcast_address(), bootstrap_tokens); mutate_token_metadata([this, &bootstrap_tokens] (mutable_token_metadata_ptr tmptr) { auto endpoint = get_broadcast_address(); - tmptr->update_topology(endpoint, _sys_ks.local().local_dc_rack()); + tmptr->update_topology(endpoint, _sys_ks.local().local_dc_rack(), locator::node::state::joining); tmptr->add_bootstrap_tokens(bootstrap_tokens, endpoint); return update_pending_ranges(std::move(tmptr), format("bootstrapping node {}", endpoint)); }).get(); @@ -1558,7 +1558,7 @@ future<> storage_service::handle_state_bootstrap(inet_address endpoint) { tmptr->remove_endpoint(endpoint); } - tmptr->update_topology(endpoint, get_dc_rack_for(endpoint)); + tmptr->update_topology(endpoint, get_dc_rack_for(endpoint), locator::node::state::joining); tmptr->add_bootstrap_tokens(tokens, endpoint); if (_gossiper.uses_host_id(endpoint)) { tmptr->update_host_id(_gossiper.get_host_id(endpoint), endpoint); @@ -1700,7 +1700,7 @@ future<> storage_service::handle_state_normal(inet_address endpoint) { do_notify_joined = true; } - tmptr->update_topology(endpoint, get_dc_rack_for(endpoint)); + tmptr->update_topology(endpoint, get_dc_rack_for(endpoint), locator::node::state::normal); co_await tmptr->update_normal_tokens(owned_tokens, endpoint); } @@ -1752,7 +1752,7 @@ future<> storage_service::handle_state_leaving(inet_address endpoint) { // FIXME: this code should probably resolve token collisions too, like handle_state_normal slogger.info("Node {} state jump to leaving", endpoint); - tmptr->update_topology(endpoint, get_dc_rack_for(endpoint)); + tmptr->update_topology(endpoint, get_dc_rack_for(endpoint), locator::node::state::leaving); co_await tmptr->update_normal_tokens(tokens, endpoint); } else { auto tokens_ = tmptr->get_tokens(endpoint); @@ -2175,7 +2175,7 @@ future<> storage_service::join_cluster(cdc::generation_service& cdc_gen_service, // entry has been mistakenly added, delete it _sys_ks.local().remove_endpoint(ep).get(); } else { - tmptr->update_topology(ep, get_dc_rack(ep)); + tmptr->update_topology(ep, get_dc_rack(ep), locator::node::state::normal); tmptr->update_normal_tokens(tokens, ep).get(); if (loaded_host_ids.contains(ep)) { tmptr->update_host_id(loaded_host_ids.at(ep), ep); @@ -3589,7 +3589,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad auto existing_node = x.first; auto replacing_node = x.second; slogger.info("replace[{}]: Added replacing_node={} to replace existing_node={}, coordinator={}", req.ops_uuid, replacing_node, existing_node, coordinator); - tmptr->update_topology(replacing_node, get_dc_rack_for(replacing_node)); + tmptr->update_topology(replacing_node, get_dc_rack_for(replacing_node), locator::node::state::joining); tmptr->add_replacing_endpoint(existing_node, replacing_node); } return make_ready_future<>(); @@ -3641,7 +3641,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad auto& endpoint = x.first; auto tokens = std::unordered_set(x.second.begin(), x.second.end()); slogger.info("bootstrap[{}]: Added node={} as bootstrap, coordinator={}", req.ops_uuid, endpoint, coordinator); - tmptr->update_topology(endpoint, get_dc_rack_for(endpoint)); + tmptr->update_topology(endpoint, get_dc_rack_for(endpoint), locator::node::state::joining); tmptr->add_bootstrap_tokens(tokens, endpoint); } return update_pending_ranges(tmptr, format("bootstrap {}", req.bootstrap_nodes)); diff --git a/test/boost/network_topology_strategy_test.cc b/test/boost/network_topology_strategy_test.cc index d55d780cff..b84b2112a2 100644 --- a/test/boost/network_topology_strategy_test.cc +++ b/test/boost/network_topology_strategy_test.cc @@ -224,7 +224,7 @@ void simple_test() { for (const auto& [ring_point, endpoint, id] : ring_points) { std::unordered_set tokens; tokens.insert({dht::token::kind::key, d2t(ring_point / ring_points.size())}); - topo.add_node(id, endpoint, make_endpoint_dc_rack(endpoint)); + topo.add_node(id, endpoint, make_endpoint_dc_rack(endpoint), locator::node::state::normal); co_await tm.update_normal_tokens(std::move(tokens), endpoint); } }).get(); @@ -328,7 +328,7 @@ void heavy_origin_test() { stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { auto& topo = tm.get_topology(); for (const auto& [ring_point, endpoint, id] : ring_points) { - topo.add_node(id, endpoint, make_endpoint_dc_rack(endpoint)); + topo.add_node(id, endpoint, make_endpoint_dc_rack(endpoint), locator::node::state::normal); co_await tm.update_normal_tokens(std::move(tokens[endpoint]), endpoint); } }).get(); @@ -559,7 +559,7 @@ void generate_topology(topology& topo, const std::unordered_map const sstring& dc = dcs[udist(0, dcs.size() - 1)(e1)]; auto rc = racks_per_dc.at(dc); auto r = udist(0, rc)(e1); - topo.add_node(host_id::create_random_id(), node, {dc, to_sstring(r)}); + topo.add_node(host_id::create_random_id(), node, {dc, to_sstring(r)}, locator::node::state::normal); } }