topology: Add pending locations collection

Nowadays the topology object only keeps info about nodes that are normal
members of the ring. Nodes that are joining or bootstrapping or leaving
are out of it. However, one of the goals of this patchset is to make
topology object provide dc/rack info for _all_ nodes, even those in
transitive state.

The introduced _pending_locations is about to hold the dc/rack info for
transitive endpoints. When a node becomes member of the ring it is moved
from pending (if it's there) to current locations, when it leaves the
ring it's moved back to pending.

For now the new collection is just added and the add/remove/get API is
extended to maintain it, but it's not really populated. It will come in
the next patch

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2022-09-29 11:13:36 +03:00
parent fa613285e7
commit da75552e1f
3 changed files with 35 additions and 12 deletions

View File

@@ -101,8 +101,8 @@ public:
return _bootstrap_tokens;
}
void update_topology(inet_address ep, endpoint_dc_rack dr) {
_topology.update_endpoint(ep, std::move(dr));
void update_topology(inet_address ep, endpoint_dc_rack dr, topology::pending pend) {
_topology.update_endpoint(ep, std::move(dr), pend);
}
/**
@@ -1031,8 +1031,8 @@ token_metadata::get_bootstrap_tokens() const {
}
void
token_metadata::update_topology(inet_address ep, endpoint_dc_rack dr) {
_impl->update_topology(ep, std::move(dr));
token_metadata::update_topology(inet_address ep, endpoint_dc_rack dr, topology::pending pend) {
_impl->update_topology(ep, std::move(dr), pend);
}
boost::iterator_range<token_metadata::tokens_iterator>
@@ -1243,6 +1243,7 @@ inline future<> topology::clear_gently() noexcept {
co_await utils::clear_gently(_dc_endpoints);
co_await utils::clear_gently(_dc_racks);
co_await utils::clear_gently(_current_locations);
co_await utils::clear_gently(_pending_locations);
co_return;
}
@@ -1254,11 +1255,23 @@ topology::topology(const topology& other)
: _dc_endpoints(other._dc_endpoints)
, _dc_racks(other._dc_racks)
, _current_locations(other._current_locations)
, _pending_locations(other._pending_locations)
{
}
void topology::update_endpoint(const inet_address& ep, endpoint_dc_rack dr)
void topology::remove_pending_location(const inet_address& ep) {
if (ep != utils::fb_utilities::get_broadcast_address()) {
_pending_locations.erase(ep);
}
}
void topology::update_endpoint(const inet_address& ep, endpoint_dc_rack dr, pending pend)
{
if (pend) {
_pending_locations[ep] = std::move(dr);
return;
}
auto current = _current_locations.find(ep);
if (current != _current_locations.end()) {
@@ -1271,6 +1284,7 @@ void topology::update_endpoint(const inet_address& ep, endpoint_dc_rack dr)
_dc_endpoints[dr.dc].insert(ep);
_dc_racks[dr.dc][dr.rack].insert(ep);
_current_locations[ep] = std::move(dr);
remove_pending_location(ep);
}
void topology::remove_endpoint(inet_address ep)
@@ -1278,6 +1292,7 @@ void topology::remove_endpoint(inet_address ep)
auto cur_dc_rack = _current_locations.find(ep);
if (cur_dc_rack == _current_locations.end()) {
remove_pending_location(ep);
return;
}
@@ -1293,9 +1308,9 @@ void topology::remove_endpoint(inet_address ep)
_current_locations.erase(cur_dc_rack);
}
bool topology::has_endpoint(inet_address ep) const
bool topology::has_endpoint(inet_address ep, pending with_pending) const
{
return _current_locations.contains(ep);
return _current_locations.contains(ep) || (with_pending && _pending_locations.contains(ep));
}
const endpoint_dc_rack& topology::get_location(const inet_address& ep) const {
@@ -1303,6 +1318,10 @@ const endpoint_dc_rack& topology::get_location(const inet_address& ep) const {
return _current_locations.at(ep);
}
if (_pending_locations.contains(ep)) {
return _pending_locations.at(ep);
}
on_internal_error(tlogger, format("Node {} is not in topology", ep));
}

View File

@@ -52,10 +52,12 @@ public:
future<> clear_gently() noexcept;
using pending = bool_class<struct pending_tag>;
/**
* Stores current DC/rack assignment for ep
*/
void update_endpoint(const inet_address& ep, endpoint_dc_rack dr);
void update_endpoint(const inet_address& ep, endpoint_dc_rack dr, pending pend);
/**
* Removes current DC/rack assignment for ep
@@ -65,7 +67,7 @@ public:
/**
* Returns true iff contains given endpoint
*/
bool has_endpoint(inet_address) const;
bool has_endpoint(inet_address, pending with_pending = pending::no) const;
const std::unordered_map<sstring,
std::unordered_set<inet_address>>&
@@ -113,6 +115,7 @@ private:
* Comparator.compare would
*/
int compare_endpoints(const inet_address& address, const inet_address& a1, const inet_address& a2) const;
void remove_pending_location(const inet_address& ep);
/** multi-map: DC -> endpoints in that DC */
std::unordered_map<sstring,
@@ -127,6 +130,7 @@ private:
/** reverse-lookup map: endpoint -> current known dc/rack assignment */
std::unordered_map<inet_address, endpoint_dc_rack> _current_locations;
std::unordered_map<inet_address, endpoint_dc_rack> _pending_locations;
bool _sort_by_proximity = true;
};
@@ -182,7 +186,7 @@ public:
const std::unordered_map<token, inet_address>& get_token_to_endpoint() const;
const std::unordered_set<inet_address>& get_leaving_endpoints() const;
const std::unordered_map<token, inet_address>& get_bootstrap_tokens() const;
void update_topology(inet_address ep, endpoint_dc_rack dr);
void update_topology(inet_address ep, endpoint_dc_rack dr, topology::pending pend = topology::pending::no);
/**
* Creates an iterable range of the sorted tokens starting at the token next
* after the given one.

View File

@@ -179,7 +179,7 @@ std::unique_ptr<locator::topology> generate_topology(const std::vector<ring_poin
for (const auto& p : pts) {
auto rack = std::to_string(uint8_t(p.host.bytes()[2]));
auto dc = std::to_string(uint8_t(p.host.bytes()[1]));
topo->update_endpoint(p.host, { dc, rack });
topo->update_endpoint(p.host, { dc, rack }, locator::topology::pending::no);
}
return topo;
@@ -565,7 +565,7 @@ std::unique_ptr<locator::topology> generate_topology(const std::unordered_map<ss
const sstring& dc = dcs[udist(0, dcs.size() - 1)(e1)];
auto rc = racks_per_dc.at(dc);
auto r = udist(0, rc)(e1);
topo->update_endpoint(node, { dc, to_sstring(r) });
topo->update_endpoint(node, { dc, to_sstring(r) }, locator::topology::pending::no);
}
return topo;