topology: add node state
Add a simple node state model with: `joining`, `normal`, `leaving`, and `left` states to help managing nodes during replace with the the same ip address. Later on, this could also help prevent nodes that were decommissioned, removed, or replaced from rejoining the cluster. Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
@@ -102,8 +102,8 @@ public:
|
||||
return _bootstrap_tokens;
|
||||
}
|
||||
|
||||
void update_topology(inet_address ep, endpoint_dc_rack dr) {
|
||||
_topology.add_or_update_endpoint(ep, std::move(dr));
|
||||
void update_topology(inet_address ep, endpoint_dc_rack dr, std::optional<node::state> opt_st) {
|
||||
_topology.add_or_update_endpoint(ep, std::move(dr), std::move(opt_st));
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -852,7 +852,7 @@ void token_metadata_impl::calculate_pending_ranges_for_bootstrap(
|
||||
for (auto& x : tmp) {
|
||||
auto& endpoint = x.first;
|
||||
auto& tokens = x.second;
|
||||
all_left_metadata->update_topology(endpoint, get_dc_rack(endpoint));
|
||||
all_left_metadata->update_topology(endpoint, get_dc_rack(endpoint), node::state::joining);
|
||||
all_left_metadata->update_normal_tokens(tokens, endpoint).get();
|
||||
auto address_ranges = strategy.get_ranges(endpoint, *all_left_metadata).get0();
|
||||
for (const dht::token_range& x : address_ranges) {
|
||||
@@ -1025,8 +1025,8 @@ token_metadata::get_bootstrap_tokens() const {
|
||||
}
|
||||
|
||||
void
|
||||
token_metadata::update_topology(inet_address ep, endpoint_dc_rack dr) {
|
||||
_impl->update_topology(ep, std::move(dr));
|
||||
token_metadata::update_topology(inet_address ep, endpoint_dc_rack dr, std::optional<node::state> opt_st) {
|
||||
_impl->update_topology(ep, std::move(dr), std::move(opt_st));
|
||||
}
|
||||
|
||||
boost::iterator_range<token_metadata::tokens_iterator>
|
||||
|
||||
@@ -121,7 +121,7 @@ public:
|
||||
/**
|
||||
* Update or add endpoint given its inet_address and endpoint_dc_rack.
|
||||
*/
|
||||
void update_topology(inet_address ep, endpoint_dc_rack dr);
|
||||
void update_topology(inet_address ep, endpoint_dc_rack dr, std::optional<node::state> opt_st = std::nullopt);
|
||||
/**
|
||||
* Creates an iterable range of the sorted tokens starting at the token next
|
||||
* after the given one.
|
||||
|
||||
@@ -25,21 +25,33 @@ thread_local const endpoint_dc_rack endpoint_dc_rack::default_location = {
|
||||
.rack = locator::production_snitch_base::default_rack,
|
||||
};
|
||||
|
||||
node::node(const locator::topology* topology, locator::host_id id, inet_address endpoint, endpoint_dc_rack dc_rack, this_node is_this_node, node::idx_type idx)
|
||||
node::node(const locator::topology* topology, locator::host_id id, inet_address endpoint, endpoint_dc_rack dc_rack, state state, this_node is_this_node, node::idx_type idx)
|
||||
: _topology(topology)
|
||||
, _host_id(id)
|
||||
, _endpoint(endpoint)
|
||||
, _dc_rack(std::move(dc_rack))
|
||||
, _state(state)
|
||||
, _is_this_node(is_this_node)
|
||||
, _idx(idx)
|
||||
{}
|
||||
|
||||
node_holder node::make(const locator::topology* topology, locator::host_id id, inet_address endpoint, endpoint_dc_rack dc_rack, node::this_node is_this_node, node::idx_type idx) {
|
||||
return std::make_unique<node>(topology, std::move(id), std::move(endpoint), std::move(dc_rack), is_this_node, idx);
|
||||
node_holder node::make(const locator::topology* topology, locator::host_id id, inet_address endpoint, endpoint_dc_rack dc_rack, state state, node::this_node is_this_node, node::idx_type idx) {
|
||||
return std::make_unique<node>(topology, std::move(id), std::move(endpoint), std::move(dc_rack), std::move(state), is_this_node, idx);
|
||||
}
|
||||
|
||||
node_holder node::clone() const {
|
||||
return make(nullptr, host_id(), endpoint(), dc_rack(), is_this_node());
|
||||
return make(nullptr, host_id(), endpoint(), dc_rack(), get_state(), is_this_node());
|
||||
}
|
||||
|
||||
std::string node::to_string(node::state s) {
|
||||
switch (s) {
|
||||
case state::none: return "none";
|
||||
case state::joining: return "joining";
|
||||
case state::normal: return "normal";
|
||||
case state::leaving: return "leaving";
|
||||
case state::left: return "left";
|
||||
}
|
||||
__builtin_unreachable();
|
||||
}
|
||||
|
||||
future<> topology::clear_gently() noexcept {
|
||||
@@ -66,7 +78,7 @@ topology::topology(config cfg)
|
||||
tlogger.trace("topology[{}]: constructing using config: host_id={} endpoint={} dc={} rack={}", fmt::ptr(this),
|
||||
cfg.this_host_id, cfg.this_endpoint, cfg.local_dc_rack.dc, cfg.local_dc_rack.rack);
|
||||
if (cfg.this_host_id || cfg.this_endpoint != inet_address{}) {
|
||||
add_node(node::make(this, cfg.this_host_id, cfg.this_endpoint, cfg.local_dc_rack, node::this_node::yes));
|
||||
add_node(node::make(this, cfg.this_host_id, cfg.this_endpoint, cfg.local_dc_rack, node::state::joining, node::this_node::yes));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -109,11 +121,11 @@ std::string topology::debug_format(const node* node) {
|
||||
if (!node) {
|
||||
return format("node={}", fmt::ptr(node));
|
||||
}
|
||||
return format("node={} idx={} host_id={} endpoint={} dc={} rack={} this_node={}", fmt::ptr(node),
|
||||
node->idx(), node->host_id(), node->endpoint(), node->dc_rack().dc, node->dc_rack().rack, bool(node->is_this_node()));
|
||||
return format("node={} idx={} host_id={} endpoint={} dc={} rack={} state={} this_node={}", fmt::ptr(node),
|
||||
node->idx(), node->host_id(), node->endpoint(), node->dc_rack().dc, node->dc_rack().rack, node::to_string(node->get_state()), bool(node->is_this_node()));
|
||||
}
|
||||
|
||||
const node* topology::add_node(host_id id, const inet_address& ep, const endpoint_dc_rack& dr) {
|
||||
const node* topology::add_node(host_id id, const inet_address& ep, const endpoint_dc_rack& dr, node::state state) {
|
||||
if (dr.dc.empty() || dr.rack.empty()) {
|
||||
on_internal_error(tlogger, "Node must have valid dc and rack");
|
||||
}
|
||||
@@ -122,7 +134,7 @@ const node* topology::add_node(host_id id, const inet_address& ep, const endpoin
|
||||
on_internal_error(tlogger, format("topology[{}]: local node already set: host_id={} endpoint={} dc={} rack={}: currently mapped to {}", fmt::ptr(this),
|
||||
id, ep, dr.dc, dr.rack, debug_format(this_node())));
|
||||
}
|
||||
return add_node(node::make(this, id, ep, dr));
|
||||
return add_node(node::make(this, id, ep, dr, state));
|
||||
}
|
||||
|
||||
const node* topology::add_node(node_holder nptr) {
|
||||
@@ -158,13 +170,15 @@ const node* topology::add_node(node_holder nptr) {
|
||||
return node;
|
||||
}
|
||||
|
||||
const node* topology::update_node(node* node, std::optional<host_id> opt_id, std::optional<inet_address> opt_ep, std::optional<endpoint_dc_rack> opt_dr) {
|
||||
tlogger.debug("topology[{}]: update_node: {}: to: host_id={} endpoint={} dc={} rack={}, at {}", fmt::ptr(this), debug_format(node),
|
||||
const node* topology::update_node(node* node, std::optional<host_id> opt_id, std::optional<inet_address> opt_ep, std::optional<endpoint_dc_rack> opt_dr, std::optional<node::state> opt_st) {
|
||||
tlogger.debug("topology[{}]: update_node: {}: to: host_id={} endpoint={} dc={} rack={} state={}, at {}", fmt::ptr(this), debug_format(node),
|
||||
opt_id ? format("{}", *opt_id) : "unchanged",
|
||||
opt_ep ? format("{}", *opt_ep) : "unchanged",
|
||||
opt_dr ? format("{}", opt_dr->dc) : "unchanged",
|
||||
opt_dr ? format("{}", opt_dr->rack) : "unchanged",
|
||||
opt_st ? format("{}", *opt_st) : "unchanged",
|
||||
current_backtrace());
|
||||
|
||||
bool changed = false;
|
||||
if (opt_id) {
|
||||
if (*opt_id != node->host_id()) {
|
||||
@@ -205,6 +219,9 @@ const node* topology::update_node(node* node, std::optional<host_id> opt_id, std
|
||||
opt_dr.reset();
|
||||
}
|
||||
}
|
||||
if (opt_st) {
|
||||
changed = node->get_state() != *opt_st;
|
||||
}
|
||||
|
||||
if (!changed) {
|
||||
return node;
|
||||
@@ -223,6 +240,9 @@ const node* topology::update_node(node* node, std::optional<host_id> opt_id, std
|
||||
if (opt_dr) {
|
||||
mutable_node->_dc_rack = std::move(*opt_dr);
|
||||
}
|
||||
if (opt_st) {
|
||||
mutable_node->set_state(*opt_st);
|
||||
}
|
||||
} catch (...) {
|
||||
std::terminate();
|
||||
}
|
||||
@@ -267,14 +287,21 @@ void topology::index_node(const node* node) {
|
||||
}
|
||||
}
|
||||
if (node->endpoint() != inet_address{}) {
|
||||
auto [eit, inserted_endpoint] = _nodes_by_endpoint.emplace(node->endpoint(), node);
|
||||
if (!inserted_endpoint) {
|
||||
auto eit = _nodes_by_endpoint.find(node->endpoint());
|
||||
if (eit != _nodes_by_endpoint.end()) {
|
||||
if (eit->second->get_state() == node::state::leaving || eit->second->get_state() == node::state::left) {
|
||||
_nodes_by_endpoint.erase(node->endpoint());
|
||||
} else if (node->get_state() != node::state::leaving && node->get_state() != node::state::left) {
|
||||
if (node->host_id()) {
|
||||
_nodes_by_host_id.erase(node->host_id());
|
||||
}
|
||||
on_internal_error(tlogger, format("topology[{}]: {}: node endpoint already mapped to {}", fmt::ptr(this), debug_format(node), debug_format(eit->second)));
|
||||
}
|
||||
}
|
||||
if (node->get_state() != node::state::left) {
|
||||
_nodes_by_endpoint.try_emplace(node->endpoint(), node);
|
||||
}
|
||||
}
|
||||
|
||||
const auto& dc = node->dc_rack().dc;
|
||||
const auto& rack = node->dc_rack().rack;
|
||||
@@ -381,18 +408,18 @@ const node* topology::find_node(node::idx_type idx) const noexcept {
|
||||
return _nodes.at(idx).get();
|
||||
}
|
||||
|
||||
const node* topology::add_or_update_endpoint(inet_address ep, std::optional<host_id> opt_id, std::optional<endpoint_dc_rack> opt_dr)
|
||||
const node* topology::add_or_update_endpoint(inet_address ep, std::optional<host_id> opt_id, std::optional<endpoint_dc_rack> opt_dr, std::optional<node::state> opt_st)
|
||||
{
|
||||
tlogger.trace("topology[{}]: add_or_update_endpoint: ep={} host_id={} dc={} rack={}, at {}", fmt::ptr(this),
|
||||
ep, opt_id.value_or(host_id::create_null_id()), opt_dr.value_or(endpoint_dc_rack{}).dc, opt_dr.value_or(endpoint_dc_rack{}).rack,
|
||||
tlogger.trace("topology[{}]: add_or_update_endpoint: ep={} host_id={} dc={} rack={} state={}, at {}", fmt::ptr(this),
|
||||
ep, opt_id.value_or(host_id::create_null_id()), opt_dr.value_or(endpoint_dc_rack{}).dc, opt_dr.value_or(endpoint_dc_rack{}).rack, opt_st.value_or(node::state::none),
|
||||
current_backtrace());
|
||||
auto n = find_node(ep);
|
||||
if (n) {
|
||||
return update_node(make_mutable(n), opt_id, std::nullopt, std::move(opt_dr));
|
||||
return update_node(make_mutable(n), opt_id, std::nullopt, std::move(opt_dr), std::move(opt_st));
|
||||
} else if (opt_id && (n = find_node(*opt_id))) {
|
||||
return update_node(make_mutable(n), std::nullopt, ep, std::move(opt_dr));
|
||||
return update_node(make_mutable(n), std::nullopt, ep, std::move(opt_dr), std::move(opt_st));
|
||||
} else {
|
||||
return add_node(opt_id.value_or(host_id::create_null_id()), ep, opt_dr.value_or(endpoint_dc_rack::default_location));
|
||||
return add_node(opt_id.value_or(host_id::create_null_id()), ep, opt_dr.value_or(endpoint_dc_rack::default_location), opt_st.value_or(node::state::normal));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -477,4 +504,9 @@ std::ostream& operator<<(std::ostream& out, const locator::node& node) {
|
||||
return out;
|
||||
}
|
||||
|
||||
std::ostream& operator<<(std::ostream& out, const locator::node::state& state) {
|
||||
fmt::print(out, "{}", state);
|
||||
return out;
|
||||
}
|
||||
|
||||
} // namespace std
|
||||
|
||||
@@ -36,18 +36,27 @@ public:
|
||||
using this_node = bool_class<struct this_node_tag>;
|
||||
using idx_type = int;
|
||||
|
||||
enum class state {
|
||||
none = 0,
|
||||
joining, // while bootstrapping, replacing
|
||||
normal,
|
||||
leaving, // while decommissioned, removed, replaced
|
||||
left // after decommissioned, removed, replaced
|
||||
};
|
||||
|
||||
private:
|
||||
const topology* _topology;
|
||||
host_id _host_id;
|
||||
inet_address _endpoint;
|
||||
endpoint_dc_rack _dc_rack;
|
||||
state _state;
|
||||
|
||||
// Is this node the `localhost` instance
|
||||
this_node _is_this_node;
|
||||
idx_type _idx = -1;
|
||||
|
||||
public:
|
||||
node(const locator::topology* topology, locator::host_id id, inet_address endpoint, endpoint_dc_rack dc_rack, this_node is_this_node = this_node::no, idx_type idx = -1);
|
||||
node(const locator::topology* topology, locator::host_id id, inet_address endpoint, endpoint_dc_rack dc_rack, state state, this_node is_this_node = this_node::no, idx_type idx = -1);
|
||||
|
||||
node(const node&) = delete;
|
||||
node(node&&) = delete;
|
||||
@@ -74,12 +83,17 @@ public:
|
||||
// idx < 0 means "unassigned"
|
||||
idx_type idx() const noexcept { return _idx; }
|
||||
|
||||
state get_state() const noexcept { return _state; }
|
||||
|
||||
static std::string to_string(state);
|
||||
|
||||
private:
|
||||
static node_holder make(const locator::topology* topology, locator::host_id id, inet_address endpoint, endpoint_dc_rack dc_rack, node::this_node is_this_node = this_node::no, idx_type idx = -1);
|
||||
static node_holder make(const locator::topology* topology, locator::host_id id, inet_address endpoint, endpoint_dc_rack dc_rack, state state, node::this_node is_this_node = this_node::no, idx_type idx = -1);
|
||||
node_holder clone() const;
|
||||
|
||||
void set_topology(const locator::topology* topology) noexcept { _topology = topology; }
|
||||
void set_idx(idx_type idx) noexcept { _idx = idx; }
|
||||
void set_state(state state) noexcept { _state = state; }
|
||||
|
||||
friend class topology;
|
||||
};
|
||||
@@ -106,12 +120,12 @@ public:
|
||||
}
|
||||
|
||||
// Adds a node with given host_id, endpoint, and DC/rack.
|
||||
const node* add_node(host_id id, const inet_address& ep, const endpoint_dc_rack& dr);
|
||||
const node* add_node(host_id id, const inet_address& ep, const endpoint_dc_rack& dr, node::state state);
|
||||
|
||||
// Optionally updates node's current host_id, endpoint, or DC/rack.
|
||||
// Note: the host_id may be updated from null to non-null after a new node gets a new, random host_id,
|
||||
// or a peer node host_id may be updated when the node is replaced with another node using the same ip address.
|
||||
const node* update_node(node* node, std::optional<host_id> opt_id, std::optional<inet_address> opt_ep, std::optional<endpoint_dc_rack> opt_dr);
|
||||
const node* update_node(node* node, std::optional<host_id> opt_id, std::optional<inet_address> opt_ep, std::optional<endpoint_dc_rack> opt_dr, std::optional<node::state> opt_st);
|
||||
|
||||
// Removes a node using its host_id
|
||||
// Returns true iff the node was found and removed.
|
||||
@@ -138,14 +152,14 @@ public:
|
||||
*
|
||||
* Adds or updates a node with given endpoint
|
||||
*/
|
||||
const node* add_or_update_endpoint(inet_address ep, std::optional<host_id> opt_id, std::optional<endpoint_dc_rack> opt_dr);
|
||||
const node* add_or_update_endpoint(inet_address ep, std::optional<host_id> opt_id, std::optional<endpoint_dc_rack> opt_dr, std::optional<node::state> opt_st);
|
||||
|
||||
// Legacy entry point from token_metadata::update_topology
|
||||
const node* add_or_update_endpoint(inet_address ep, endpoint_dc_rack dr) {
|
||||
return add_or_update_endpoint(ep, std::nullopt, std::move(dr));
|
||||
const node* add_or_update_endpoint(inet_address ep, endpoint_dc_rack dr, std::optional<node::state> opt_st) {
|
||||
return add_or_update_endpoint(ep, std::nullopt, std::move(dr), std::move(opt_st));
|
||||
}
|
||||
const node* add_or_update_endpoint(inet_address ep, host_id id) {
|
||||
return add_or_update_endpoint(ep, id, std::nullopt);
|
||||
return add_or_update_endpoint(ep, id, std::nullopt, std::nullopt);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -298,7 +312,8 @@ public:
|
||||
|
||||
namespace std {
|
||||
|
||||
std::ostream& operator<<(std::ostream& out, const locator::node* node);
|
||||
std::ostream& operator<<(std::ostream& out, const locator::node& node);
|
||||
std::ostream& operator<<(std::ostream& out, const locator::node::state& state);
|
||||
|
||||
} // namespace std
|
||||
|
||||
@@ -309,3 +324,11 @@ struct fmt::formatter<locator::node> : fmt::formatter<std::string_view> {
|
||||
return fmt::format_to(ctx.out(), "{}/{}", node.host_id(), node.endpoint());
|
||||
}
|
||||
};
|
||||
|
||||
template <>
|
||||
struct fmt::formatter<locator::node::state> : fmt::formatter<std::string_view> {
|
||||
template <typename FormatContext>
|
||||
auto format(const locator::node::state& state, FormatContext& ctx) const {
|
||||
return fmt::format_to(ctx.out(), "{}", locator::node::to_string(state));
|
||||
}
|
||||
};
|
||||
|
||||
@@ -1400,7 +1400,7 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr
|
||||
auto range_addresses = strat.get_range_addresses(metadata_clone).get0();
|
||||
|
||||
//Pending ranges
|
||||
metadata_clone.update_topology(myip, _sys_ks.local().local_dc_rack());
|
||||
metadata_clone.update_topology(myip, _sys_ks.local().local_dc_rack(), locator::node::state::joining);
|
||||
metadata_clone.update_normal_tokens(tokens, myip).get();
|
||||
auto pending_range_addresses = strat.get_range_addresses(metadata_clone).get0();
|
||||
metadata_clone.clear_gently().get();
|
||||
@@ -1857,7 +1857,7 @@ future<> repair_service::replace_with_repair(locator::token_metadata_ptr tmptr,
|
||||
// update a cloned version of tmptr
|
||||
// no need to set the original version
|
||||
auto cloned_tmptr = make_token_metadata_ptr(std::move(cloned_tm));
|
||||
cloned_tmptr->update_topology(utils::fb_utilities::get_broadcast_address(), _sys_ks.local().local_dc_rack());
|
||||
cloned_tmptr->update_topology(utils::fb_utilities::get_broadcast_address(), _sys_ks.local().local_dc_rack(), locator::node::state::joining);
|
||||
co_await cloned_tmptr->update_normal_tokens(replacing_tokens, utils::fb_utilities::get_broadcast_address());
|
||||
co_return co_await do_rebuild_replace_with_repair(std::move(cloned_tmptr), std::move(op), std::move(source_dc), reason, std::move(ignore_nodes));
|
||||
}
|
||||
|
||||
@@ -983,7 +983,7 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi
|
||||
slogger.info("Replacing a node with {} IP address, my address={}, node being replaced={}",
|
||||
get_broadcast_address() == *replace_address ? "the same" : "a different",
|
||||
get_broadcast_address(), *replace_address);
|
||||
tmptr->update_topology(*replace_address, std::move(ri->dc_rack));
|
||||
tmptr->update_topology(*replace_address, std::move(ri->dc_rack), locator::node::state::leaving);
|
||||
co_await tmptr->update_normal_tokens(bootstrap_tokens, *replace_address);
|
||||
replaced_host_id = ri->host_id;
|
||||
}
|
||||
@@ -1022,7 +1022,7 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi
|
||||
// This node must know about its chosen tokens before other nodes do
|
||||
// since they may start sending writes to this node after it gossips status = NORMAL.
|
||||
// Therefore we update _token_metadata now, before gossip starts.
|
||||
tmptr->update_topology(get_broadcast_address(), _sys_ks.local().local_dc_rack());
|
||||
tmptr->update_topology(get_broadcast_address(), _sys_ks.local().local_dc_rack(), locator::node::state::normal);
|
||||
co_await tmptr->update_normal_tokens(my_tokens, get_broadcast_address());
|
||||
|
||||
cdc_gen_id = co_await _sys_ks.local().get_cdc_generation_id();
|
||||
@@ -1257,7 +1257,7 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi
|
||||
// This node must know about its chosen tokens before other nodes do
|
||||
// since they may start sending writes to this node after it gossips status = NORMAL.
|
||||
// Therefore, in case we haven't updated _token_metadata with our tokens yet, do it now.
|
||||
tmptr->update_topology(get_broadcast_address(), _sys_ks.local().local_dc_rack());
|
||||
tmptr->update_topology(get_broadcast_address(), _sys_ks.local().local_dc_rack(), locator::node::state::normal);
|
||||
return tmptr->update_normal_tokens(bootstrap_tokens, get_broadcast_address());
|
||||
});
|
||||
|
||||
@@ -1429,7 +1429,7 @@ future<> storage_service::bootstrap(cdc::generation_service& cdc_gen_service, st
|
||||
slogger.debug("bootstrap: update pending ranges: endpoint={} bootstrap_tokens={}", get_broadcast_address(), bootstrap_tokens);
|
||||
mutate_token_metadata([this, &bootstrap_tokens] (mutable_token_metadata_ptr tmptr) {
|
||||
auto endpoint = get_broadcast_address();
|
||||
tmptr->update_topology(endpoint, _sys_ks.local().local_dc_rack());
|
||||
tmptr->update_topology(endpoint, _sys_ks.local().local_dc_rack(), locator::node::state::joining);
|
||||
tmptr->add_bootstrap_tokens(bootstrap_tokens, endpoint);
|
||||
return update_pending_ranges(std::move(tmptr), format("bootstrapping node {}", endpoint));
|
||||
}).get();
|
||||
@@ -1558,7 +1558,7 @@ future<> storage_service::handle_state_bootstrap(inet_address endpoint) {
|
||||
tmptr->remove_endpoint(endpoint);
|
||||
}
|
||||
|
||||
tmptr->update_topology(endpoint, get_dc_rack_for(endpoint));
|
||||
tmptr->update_topology(endpoint, get_dc_rack_for(endpoint), locator::node::state::joining);
|
||||
tmptr->add_bootstrap_tokens(tokens, endpoint);
|
||||
if (_gossiper.uses_host_id(endpoint)) {
|
||||
tmptr->update_host_id(_gossiper.get_host_id(endpoint), endpoint);
|
||||
@@ -1700,7 +1700,7 @@ future<> storage_service::handle_state_normal(inet_address endpoint) {
|
||||
do_notify_joined = true;
|
||||
}
|
||||
|
||||
tmptr->update_topology(endpoint, get_dc_rack_for(endpoint));
|
||||
tmptr->update_topology(endpoint, get_dc_rack_for(endpoint), locator::node::state::normal);
|
||||
co_await tmptr->update_normal_tokens(owned_tokens, endpoint);
|
||||
}
|
||||
|
||||
@@ -1752,7 +1752,7 @@ future<> storage_service::handle_state_leaving(inet_address endpoint) {
|
||||
// FIXME: this code should probably resolve token collisions too, like handle_state_normal
|
||||
slogger.info("Node {} state jump to leaving", endpoint);
|
||||
|
||||
tmptr->update_topology(endpoint, get_dc_rack_for(endpoint));
|
||||
tmptr->update_topology(endpoint, get_dc_rack_for(endpoint), locator::node::state::leaving);
|
||||
co_await tmptr->update_normal_tokens(tokens, endpoint);
|
||||
} else {
|
||||
auto tokens_ = tmptr->get_tokens(endpoint);
|
||||
@@ -2175,7 +2175,7 @@ future<> storage_service::join_cluster(cdc::generation_service& cdc_gen_service,
|
||||
// entry has been mistakenly added, delete it
|
||||
_sys_ks.local().remove_endpoint(ep).get();
|
||||
} else {
|
||||
tmptr->update_topology(ep, get_dc_rack(ep));
|
||||
tmptr->update_topology(ep, get_dc_rack(ep), locator::node::state::normal);
|
||||
tmptr->update_normal_tokens(tokens, ep).get();
|
||||
if (loaded_host_ids.contains(ep)) {
|
||||
tmptr->update_host_id(loaded_host_ids.at(ep), ep);
|
||||
@@ -3589,7 +3589,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
auto existing_node = x.first;
|
||||
auto replacing_node = x.second;
|
||||
slogger.info("replace[{}]: Added replacing_node={} to replace existing_node={}, coordinator={}", req.ops_uuid, replacing_node, existing_node, coordinator);
|
||||
tmptr->update_topology(replacing_node, get_dc_rack_for(replacing_node));
|
||||
tmptr->update_topology(replacing_node, get_dc_rack_for(replacing_node), locator::node::state::joining);
|
||||
tmptr->add_replacing_endpoint(existing_node, replacing_node);
|
||||
}
|
||||
return make_ready_future<>();
|
||||
@@ -3641,7 +3641,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
auto& endpoint = x.first;
|
||||
auto tokens = std::unordered_set<dht::token>(x.second.begin(), x.second.end());
|
||||
slogger.info("bootstrap[{}]: Added node={} as bootstrap, coordinator={}", req.ops_uuid, endpoint, coordinator);
|
||||
tmptr->update_topology(endpoint, get_dc_rack_for(endpoint));
|
||||
tmptr->update_topology(endpoint, get_dc_rack_for(endpoint), locator::node::state::joining);
|
||||
tmptr->add_bootstrap_tokens(tokens, endpoint);
|
||||
}
|
||||
return update_pending_ranges(tmptr, format("bootstrap {}", req.bootstrap_nodes));
|
||||
|
||||
@@ -224,7 +224,7 @@ void simple_test() {
|
||||
for (const auto& [ring_point, endpoint, id] : ring_points) {
|
||||
std::unordered_set<token> tokens;
|
||||
tokens.insert({dht::token::kind::key, d2t(ring_point / ring_points.size())});
|
||||
topo.add_node(id, endpoint, make_endpoint_dc_rack(endpoint));
|
||||
topo.add_node(id, endpoint, make_endpoint_dc_rack(endpoint), locator::node::state::normal);
|
||||
co_await tm.update_normal_tokens(std::move(tokens), endpoint);
|
||||
}
|
||||
}).get();
|
||||
@@ -328,7 +328,7 @@ void heavy_origin_test() {
|
||||
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
|
||||
auto& topo = tm.get_topology();
|
||||
for (const auto& [ring_point, endpoint, id] : ring_points) {
|
||||
topo.add_node(id, endpoint, make_endpoint_dc_rack(endpoint));
|
||||
topo.add_node(id, endpoint, make_endpoint_dc_rack(endpoint), locator::node::state::normal);
|
||||
co_await tm.update_normal_tokens(std::move(tokens[endpoint]), endpoint);
|
||||
}
|
||||
}).get();
|
||||
@@ -559,7 +559,7 @@ void generate_topology(topology& topo, const std::unordered_map<sstring, size_t>
|
||||
const sstring& dc = dcs[udist(0, dcs.size() - 1)(e1)];
|
||||
auto rc = racks_per_dc.at(dc);
|
||||
auto r = udist(0, rc)(e1);
|
||||
topo.add_node(host_id::create_random_id(), node, {dc, to_sstring(r)});
|
||||
topo.add_node(host_id::create_random_id(), node, {dc, to_sstring(r)}, locator::node::state::normal);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user