topology: get rid of pending state

Now, with a44ca06906,
is_normal_token_owner that replaced is_member
does not rely anymore on the pending status
of endpoints in topology.

With that we can get rid of this state and just keep
all endpoints we know about in the topology.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2022-11-22 12:22:15 +02:00
parent f2753eba30
commit 68141d0aac
9 changed files with 30 additions and 53 deletions

View File

@@ -25,7 +25,7 @@ void set_endpoint_snitch(http_context& ctx, routes& r, sharded<locator::snitch_p
httpd::endpoint_snitch_info_json::get_datacenter.set(r, [&ctx](const_req req) {
auto& topology = ctx.shared_token_metadata.local().get()->get_topology();
auto ep = host_or_broadcast(req);
if (!topology.has_endpoint(ep, locator::topology::pending::yes)) {
if (!topology.has_endpoint(ep)) {
// Cannot return error here, nodetool status can race, request
// info about just-left node and not handle it nicely
return sstring(locator::production_snitch_base::default_dc);
@@ -36,7 +36,7 @@ void set_endpoint_snitch(http_context& ctx, routes& r, sharded<locator::snitch_p
httpd::endpoint_snitch_info_json::get_rack.set(r, [&ctx](const_req req) {
auto& topology = ctx.shared_token_metadata.local().get()->get_topology();
auto ep = host_or_broadcast(req);
if (!topology.has_endpoint(ep, locator::topology::pending::yes)) {
if (!topology.has_endpoint(ep)) {
// Cannot return error here, nodetool status can race, request
// info about just-left node and not handle it nicely
return sstring(locator::production_snitch_base::default_rack);

View File

@@ -33,7 +33,7 @@ bool host_filter::can_hint_for(const locator::topology& topo, gms::inet_address
case enabled_kind::enabled_for_all:
return true;
case enabled_kind::enabled_selectively:
return topo.has_endpoint(ep, locator::topology::pending::yes) && _dcs.contains(topo.get_datacenter(ep));
return topo.has_endpoint(ep) && _dcs.contains(topo.get_datacenter(ep));
case enabled_kind::disabled_for_all:
return false;
}

View File

@@ -101,8 +101,8 @@ public:
return _bootstrap_tokens;
}
void update_topology(inet_address ep, endpoint_dc_rack dr, topology::pending pend) {
_topology.update_endpoint(ep, std::move(dr), pend);
void update_topology(inet_address ep, endpoint_dc_rack dr) {
_topology.update_endpoint(ep, std::move(dr));
}
/**
@@ -420,7 +420,7 @@ future<> token_metadata_impl::update_normal_tokens(std::unordered_set<token> tok
co_return;
}
if (!_topology.has_endpoint(endpoint, topology::pending::no)) {
if (!_topology.has_endpoint(endpoint)) {
on_internal_error(tlogger, format("token_metadata_impl: {} must be a member of topology to update normal tokens", endpoint));
}
@@ -743,7 +743,7 @@ void token_metadata_impl::set_pending_ranges(const sstring& keyspace_name,
map[x.first].emplace(x.second);
auto ins = endpoints.emplace(x.second);
if (ins.second) { // insertion took place, i.e. -- new endpoint
if (!_topology.has_endpoint(x.second, topology::pending::yes)) {
if (!_topology.has_endpoint(x.second)) {
on_internal_error(tlogger, format("token_metadata_impl: {} must be member or pending to set pending tokens", x.second));
}
}
@@ -1031,8 +1031,8 @@ token_metadata::get_bootstrap_tokens() const {
}
void
token_metadata::update_topology(inet_address ep, endpoint_dc_rack dr, topology::pending pend) {
_impl->update_topology(ep, std::move(dr), pend);
token_metadata::update_topology(inet_address ep, endpoint_dc_rack dr) {
_impl->update_topology(ep, std::move(dr));
}
boost::iterator_range<token_metadata::tokens_iterator>

View File

@@ -116,7 +116,7 @@ public:
const std::unordered_map<token, inet_address>& get_token_to_endpoint() const;
const std::unordered_set<inet_address>& get_leaving_endpoints() const;
const std::unordered_map<token, inet_address>& get_bootstrap_tokens() const;
void update_topology(inet_address ep, endpoint_dc_rack dr, topology::pending pend = topology::pending::no);
void update_topology(inet_address ep, endpoint_dc_rack dr);
/**
* Creates an iterable range of the sorted tokens starting at the token next
* after the given one.

View File

@@ -24,7 +24,6 @@ future<> topology::clear_gently() noexcept {
co_await utils::clear_gently(_dc_endpoints);
co_await utils::clear_gently(_dc_racks);
co_await utils::clear_gently(_current_locations);
co_await utils::clear_gently(_pending_locations);
_datacenters.clear();
co_return;
}
@@ -32,7 +31,7 @@ future<> topology::clear_gently() noexcept {
topology::topology(config cfg)
: _sort_by_proximity(!cfg.disable_proximity_sorting)
{
_pending_locations[utils::fb_utilities::get_broadcast_address()] = std::move(cfg.local_dc_rack);
update_endpoint(utils::fb_utilities::get_broadcast_address(), cfg.local_dc_rack);
}
future<topology> topology::clone_gently() const {
@@ -55,29 +54,13 @@ future<topology> topology::clone_gently() const {
ret._current_locations.emplace(p);
}
co_await coroutine::maybe_yield();
ret._pending_locations.reserve(_pending_locations.size());
for (const auto& p : _pending_locations) {
ret._pending_locations.emplace(p);
}
co_await coroutine::maybe_yield();
ret._datacenters = _datacenters;
ret._sort_by_proximity = _sort_by_proximity;
co_return ret;
}
void topology::remove_pending_location(const inet_address& ep) {
if (ep != utils::fb_utilities::get_broadcast_address()) {
_pending_locations.erase(ep);
}
}
void topology::update_endpoint(const inet_address& ep, endpoint_dc_rack dr, pending pend)
void topology::update_endpoint(const inet_address& ep, endpoint_dc_rack dr)
{
if (pend) {
_pending_locations[ep] = std::move(dr);
return;
}
auto current = _current_locations.find(ep);
if (current != _current_locations.end()) {
@@ -92,7 +75,6 @@ void topology::update_endpoint(const inet_address& ep, endpoint_dc_rack dr, pend
_dc_racks[dr.dc][dr.rack].insert(ep);
_datacenters.insert(dr.dc);
_current_locations[ep] = std::move(dr);
remove_pending_location(ep);
}
void topology::remove_endpoint(inet_address ep)
@@ -100,7 +82,6 @@ void topology::remove_endpoint(inet_address ep)
auto cur_dc_rack = _current_locations.find(ep);
if (cur_dc_rack == _current_locations.end()) {
remove_pending_location(ep);
return;
}
@@ -126,12 +107,17 @@ void topology::remove_endpoint(inet_address ep)
}
}
_current_locations.erase(cur_dc_rack);
// Keep the local endpoint around
// Just unlist it from _dc_endpoints and _dc_racks
// This is needed after it is decommissioned
if (ep != utils::fb_utilities::get_broadcast_address()) {
_current_locations.erase(cur_dc_rack);
}
}
bool topology::has_endpoint(inet_address ep, pending with_pending) const
bool topology::has_endpoint(inet_address ep) const
{
return _current_locations.contains(ep) || (with_pending && _pending_locations.contains(ep));
return _current_locations.contains(ep);
}
const endpoint_dc_rack& topology::get_location(const inet_address& ep) const {
@@ -139,10 +125,6 @@ const endpoint_dc_rack& topology::get_location(const inet_address& ep) const {
return _current_locations.at(ep);
}
if (_pending_locations.contains(ep)) {
return _pending_locations.at(ep);
}
// FIXME -- this shouldn't happen. After topology is stable and is
// correctly populated with endpoints, this should be replaced with
// on_internal_error()

View File

@@ -38,12 +38,10 @@ public:
future<topology> clone_gently() const;
future<> clear_gently() noexcept;
using pending = bool_class<struct pending_tag>;
/**
* Stores current DC/rack assignment for ep
*/
void update_endpoint(const inet_address& ep, endpoint_dc_rack dr, pending pend);
void update_endpoint(const inet_address& ep, endpoint_dc_rack dr);
/**
* Removes current DC/rack assignment for ep
@@ -52,9 +50,8 @@ public:
/**
* Returns true iff contains given endpoint.
* Excludes pending endpoints if `with_pending == pending::no`.
*/
bool has_endpoint(inet_address, pending with_pending) const;
bool has_endpoint(inet_address) const;
const std::unordered_map<sstring,
std::unordered_set<inet_address>>&
@@ -105,7 +102,6 @@ private:
* Comparator.compare would
*/
int compare_endpoints(const inet_address& address, const inet_address& a1, const inet_address& a2) const;
void remove_pending_location(const inet_address& ep);
/** multi-map: DC -> endpoints in that DC */
std::unordered_map<sstring,
@@ -120,7 +116,6 @@ private:
/** reverse-lookup map: endpoint -> current known dc/rack assignment */
std::unordered_map<inet_address, endpoint_dc_rack> _current_locations;
std::unordered_map<inet_address, endpoint_dc_rack> _pending_locations;
bool _sort_by_proximity = true;

View File

@@ -258,7 +258,7 @@ bool messaging_service::topology_known_for(inet_address addr) const {
// The token metadata pointer is nullptr before
// the service is start_listen()-ed and after it's being shutdown()-ed.
return _token_metadata
&& _token_metadata->get()->get_topology().has_endpoint(addr, locator::topology::pending::yes);
&& _token_metadata->get()->get_topology().has_endpoint(addr);
}
// Precondition: `topology_known_for(addr)`.

View File

@@ -700,7 +700,7 @@ future<> storage_service::bootstrap(cdc::generation_service& cdc_gen_service, st
slogger.debug("bootstrap: update pending ranges: endpoint={} bootstrap_tokens={}", get_broadcast_address(), bootstrap_tokens);
mutate_token_metadata([this, &bootstrap_tokens] (mutable_token_metadata_ptr tmptr) {
auto endpoint = get_broadcast_address();
tmptr->update_topology(endpoint, _sys_ks.local().local_dc_rack(), locator::topology::pending::yes);
tmptr->update_topology(endpoint, _sys_ks.local().local_dc_rack());
tmptr->add_bootstrap_tokens(bootstrap_tokens, endpoint);
return update_pending_ranges(std::move(tmptr), format("bootstrapping node {}", endpoint));
}).get();
@@ -854,7 +854,7 @@ future<> storage_service::handle_state_bootstrap(inet_address endpoint) {
tmptr->remove_endpoint(endpoint);
}
tmptr->update_topology(endpoint, get_dc_rack_for(endpoint), locator::topology::pending::yes);
tmptr->update_topology(endpoint, get_dc_rack_for(endpoint));
tmptr->add_bootstrap_tokens(tokens, endpoint);
if (_gossiper.uses_host_id(endpoint)) {
tmptr->update_host_id(_gossiper.get_host_id(endpoint), endpoint);
@@ -1189,7 +1189,7 @@ future<> storage_service::on_alive(gms::inet_address endpoint, gms::endpoint_sta
co_await handle_state_replacing_update_pending_ranges(tmptr, endpoint);
}
if (!is_normal_token_owner) {
tmptr->update_topology(endpoint, get_dc_rack_for(endpoint), locator::topology::pending::yes);
tmptr->update_topology(endpoint, get_dc_rack_for(endpoint));
}
co_await replicate_to_all_cores(std::move(tmptr));
}
@@ -2659,7 +2659,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
auto existing_node = x.first;
auto replacing_node = x.second;
slogger.info("replace[{}]: Added replacing_node={} to replace existing_node={}, coordinator={}", req.ops_uuid, replacing_node, existing_node, coordinator);
tmptr->update_topology(replacing_node, get_dc_rack_for(replacing_node), locator::topology::pending::yes);
tmptr->update_topology(replacing_node, get_dc_rack_for(replacing_node));
tmptr->add_replacing_endpoint(existing_node, replacing_node);
}
return make_ready_future<>();
@@ -2714,7 +2714,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
auto& endpoint = x.first;
auto tokens = std::unordered_set<dht::token>(x.second.begin(), x.second.end());
slogger.info("bootstrap[{}]: Added node={} as bootstrap, coordinator={}", req.ops_uuid, endpoint, coordinator);
tmptr->update_topology(endpoint, get_dc_rack_for(endpoint), locator::topology::pending::yes);
tmptr->update_topology(endpoint, get_dc_rack_for(endpoint));
tmptr->add_bootstrap_tokens(tokens, endpoint);
}
return update_pending_ranges(tmptr, format("bootstrap {}", req.bootstrap_nodes));

View File

@@ -179,7 +179,7 @@ std::unique_ptr<locator::topology> generate_topology(const std::vector<ring_poin
for (const auto& p : pts) {
auto rack = std::to_string(uint8_t(p.host.bytes()[2]));
auto dc = std::to_string(uint8_t(p.host.bytes()[1]));
topo->update_endpoint(p.host, { dc, rack }, locator::topology::pending::no);
topo->update_endpoint(p.host, { dc, rack });
}
return topo;
@@ -563,7 +563,7 @@ std::unique_ptr<locator::topology> generate_topology(const std::unordered_map<ss
const sstring& dc = dcs[udist(0, dcs.size() - 1)(e1)];
auto rc = racks_per_dc.at(dc);
auto r = udist(0, rc)(e1);
topo->update_endpoint(node, { dc, to_sstring(r) }, locator::topology::pending::no);
topo->update_endpoint(node, { dc, to_sstring(r) });
}
return topo;