locator: refactor topology out of token_metadata

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2022-11-13 10:19:57 +02:00
parent 297a4de4e4
commit d0bd305d16
5 changed files with 334 additions and 287 deletions

View File

@@ -945,6 +945,7 @@ scylla_core = (['message/messaging_service.cc',
'locator/ec2_snitch.cc',
'locator/ec2_multi_region_snitch.cc',
'locator/gce_snitch.cc',
'locator/topology.cc',
'service/client_state.cc',
'service/storage_service.cc',
'service/misc_services.cc',

View File

@@ -1244,196 +1244,6 @@ token_metadata::invalidate_cached_rings() {
_impl->invalidate_cached_rings();
}
/////////////////// class topology /////////////////////////////////////////////
inline future<> topology::clear_gently() noexcept {
co_await utils::clear_gently(_dc_endpoints);
co_await utils::clear_gently(_dc_racks);
co_await utils::clear_gently(_current_locations);
co_await utils::clear_gently(_pending_locations);
co_return;
}
topology::topology(config cfg)
: _sort_by_proximity(!cfg.disable_proximity_sorting)
{
_pending_locations[utils::fb_utilities::get_broadcast_address()] = std::move(cfg.local_dc_rack);
}
future<topology> topology::clone_gently() const {
topology ret;
ret._dc_endpoints.reserve(_dc_endpoints.size());
for (const auto& p : _dc_endpoints) {
ret._dc_endpoints.emplace(p);
}
co_await coroutine::maybe_yield();
ret._dc_racks.reserve(_dc_racks.size());
for (const auto& [dc, rack_endpoints] : _dc_racks) {
ret._dc_racks[dc].reserve(rack_endpoints.size());
for (const auto& p : rack_endpoints) {
ret._dc_racks[dc].emplace(p);
}
}
co_await coroutine::maybe_yield();
ret._current_locations.reserve(_current_locations.size());
for (const auto& p : _current_locations) {
ret._current_locations.emplace(p);
}
co_await coroutine::maybe_yield();
ret._pending_locations.reserve(_pending_locations.size());
for (const auto& p : _pending_locations) {
ret._pending_locations.emplace(p);
}
co_await coroutine::maybe_yield();
ret._sort_by_proximity = _sort_by_proximity;
co_return ret;
}
void topology::remove_pending_location(const inet_address& ep) {
if (ep != utils::fb_utilities::get_broadcast_address()) {
_pending_locations.erase(ep);
}
}
void topology::update_endpoint(const inet_address& ep, endpoint_dc_rack dr, pending pend)
{
if (pend) {
_pending_locations[ep] = std::move(dr);
return;
}
auto current = _current_locations.find(ep);
if (current != _current_locations.end()) {
if (current->second.dc == dr.dc && current->second.rack == dr.rack) {
return;
}
remove_endpoint(ep);
}
_dc_endpoints[dr.dc].insert(ep);
_dc_racks[dr.dc][dr.rack].insert(ep);
_current_locations[ep] = std::move(dr);
remove_pending_location(ep);
}
void topology::remove_endpoint(inet_address ep)
{
auto cur_dc_rack = _current_locations.find(ep);
if (cur_dc_rack == _current_locations.end()) {
remove_pending_location(ep);
return;
}
_dc_endpoints[cur_dc_rack->second.dc].erase(ep);
auto& racks = _dc_racks[cur_dc_rack->second.dc];
auto& eps = racks[cur_dc_rack->second.rack];
eps.erase(ep);
if (eps.empty()) {
racks.erase(cur_dc_rack->second.rack);
}
_current_locations.erase(cur_dc_rack);
}
bool topology::has_endpoint(inet_address ep, pending with_pending) const
{
return _current_locations.contains(ep) || (with_pending && _pending_locations.contains(ep));
}
const endpoint_dc_rack& topology::get_location(const inet_address& ep) const {
if (_current_locations.contains(ep)) {
return _current_locations.at(ep);
}
if (_pending_locations.contains(ep)) {
return _pending_locations.at(ep);
}
on_internal_error(tlogger, format("Node {} is not in topology", ep));
}
// FIXME -- both methods below should rather return data from the
// get_location() result, but to make it work two things are to be fixed:
// - topology should be aware of internal-ip conversions
// - topology should be pre-populated with data loaded from system ks
sstring topology::get_rack() const {
return get_rack(utils::fb_utilities::get_broadcast_address());
}
sstring topology::get_rack(inet_address ep) const {
return get_location(ep).rack;
}
sstring topology::get_datacenter() const {
return get_datacenter(utils::fb_utilities::get_broadcast_address());
}
sstring topology::get_datacenter(inet_address ep) const {
return get_location(ep).dc;
}
void topology::sort_by_proximity(inet_address address, inet_address_vector_replica_set& addresses) const {
if (_sort_by_proximity) {
std::sort(addresses.begin(), addresses.end(), [this, &address](inet_address& a1, inet_address& a2) {
return compare_endpoints(address, a1, a2) < 0;
});
}
}
int topology::compare_endpoints(const inet_address& address, const inet_address& a1, const inet_address& a2) const {
//
// if one of the Nodes IS the Node we are comparing to and the other one
// IS NOT - then return the appropriate result.
//
if (address == a1 && address != a2) {
return -1;
}
if (address == a2 && address != a1) {
return 1;
}
// ...otherwise perform the similar check in regard to Data Center
sstring address_datacenter = get_datacenter(address);
sstring a1_datacenter = get_datacenter(a1);
sstring a2_datacenter = get_datacenter(a2);
if (address_datacenter == a1_datacenter &&
address_datacenter != a2_datacenter) {
return -1;
} else if (address_datacenter == a2_datacenter &&
address_datacenter != a1_datacenter) {
return 1;
} else if (address_datacenter == a2_datacenter &&
address_datacenter == a1_datacenter) {
//
// ...otherwise (in case Nodes belong to the same Data Center) check
// the racks they belong to.
//
sstring address_rack = get_rack(address);
sstring a1_rack = get_rack(a1);
sstring a2_rack = get_rack(a2);
if (address_rack == a1_rack && address_rack != a2_rack) {
return -1;
}
if (address_rack == a2_rack && address_rack != a1_rack) {
return 1;
}
}
//
// We don't differentiate between Nodes if all Nodes belong to different
// Data Centers, thus make them equal.
//
return 0;
}
/////////////////// class topology end /////////////////////////////////////////
void shared_token_metadata::set(mutable_token_metadata_ptr tmptr) noexcept {
if (_shared->get_ring_version() >= tmptr->get_ring_version()) {
on_internal_error(tlogger, format("shared_token_metadata: must not set non-increasing version: {} -> {}", _shared->get_ring_version(), tmptr->get_ring_version()));

View File

@@ -25,6 +25,7 @@
#include <seastar/core/semaphore.hh>
#include "locator/types.hh"
#include "locator/topology.hh"
// forward declaration since replica/database.hh includes this file
namespace replica {
@@ -37,102 +38,6 @@ class abstract_replication_strategy;
using token = dht::token;
class topology {
public:
struct config {
endpoint_dc_rack local_dc_rack;
bool disable_proximity_sorting = false;
};
topology(config cfg);
topology(topology&&) = default;
topology& operator=(topology&&) = default;
future<topology> clone_gently() const;
future<> clear_gently() noexcept;
using pending = bool_class<struct pending_tag>;
/**
* Stores current DC/rack assignment for ep
*/
void update_endpoint(const inet_address& ep, endpoint_dc_rack dr, pending pend);
/**
* Removes current DC/rack assignment for ep
*/
void remove_endpoint(inet_address ep);
/**
* Returns true iff contains given endpoint
*/
bool has_endpoint(inet_address, pending with_pending = pending::no) const;
const std::unordered_map<sstring,
std::unordered_set<inet_address>>&
get_datacenter_endpoints() const {
return _dc_endpoints;
}
const std::unordered_map<sstring,
std::unordered_map<sstring,
std::unordered_set<inet_address>>>&
get_datacenter_racks() const {
return _dc_racks;
}
const endpoint_dc_rack& get_location(const inet_address& ep) const;
sstring get_rack() const;
sstring get_rack(inet_address ep) const;
sstring get_datacenter() const;
sstring get_datacenter(inet_address ep) const;
auto get_local_dc_filter() const noexcept {
return [ this, local_dc = get_datacenter() ] (inet_address ep) {
return get_datacenter(ep) == local_dc;
};
};
template <std::ranges::range Range>
inline size_t count_local_endpoints(const Range& endpoints) const {
return std::count_if(endpoints.begin(), endpoints.end(), get_local_dc_filter());
}
/**
* This method will sort the <tt>List</tt> by proximity to the given
* address.
*/
void sort_by_proximity(inet_address address, inet_address_vector_replica_set& addresses) const;
private:
// default constructor for cloning purposes
topology() = default;
/**
* compares two endpoints in relation to the target endpoint, returning as
* Comparator.compare would
*/
int compare_endpoints(const inet_address& address, const inet_address& a1, const inet_address& a2) const;
void remove_pending_location(const inet_address& ep);
/** multi-map: DC -> endpoints in that DC */
std::unordered_map<sstring,
std::unordered_set<inet_address>>
_dc_endpoints;
/** map: DC -> (multi-map: rack -> endpoints in that rack) */
std::unordered_map<sstring,
std::unordered_map<sstring,
std::unordered_set<inet_address>>>
_dc_racks;
/** reverse-lookup map: endpoint -> current known dc/rack assignment */
std::unordered_map<inet_address, endpoint_dc_rack> _current_locations;
std::unordered_map<inet_address, endpoint_dc_rack> _pending_locations;
bool _sort_by_proximity = true;
};
class token_metadata;
struct host_id_or_endpoint {
@@ -160,7 +65,6 @@ struct host_id_or_endpoint {
void resolve(const token_metadata& tm);
};
using dc_rack_fn = seastar::noncopyable_function<endpoint_dc_rack(inet_address)>;
class token_metadata_impl;
class token_metadata final {

209
locator/topology.cc Normal file
View File

@@ -0,0 +1,209 @@
/*
* Copyright (C) 2015-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/core/on_internal_error.hh>
#include "log.hh"
#include "locator/topology.hh"
#include "utils/stall_free.hh"
#include "utils/fb_utilities.hh"
namespace locator {
static logging::logger tlogger("topology");
future<> topology::clear_gently() noexcept {
co_await utils::clear_gently(_dc_endpoints);
co_await utils::clear_gently(_dc_racks);
co_await utils::clear_gently(_current_locations);
co_await utils::clear_gently(_pending_locations);
co_return;
}
topology::topology(config cfg)
: _sort_by_proximity(!cfg.disable_proximity_sorting)
{
_pending_locations[utils::fb_utilities::get_broadcast_address()] = std::move(cfg.local_dc_rack);
}
future<topology> topology::clone_gently() const {
topology ret;
ret._dc_endpoints.reserve(_dc_endpoints.size());
for (const auto& p : _dc_endpoints) {
ret._dc_endpoints.emplace(p);
}
co_await coroutine::maybe_yield();
ret._dc_racks.reserve(_dc_racks.size());
for (const auto& [dc, rack_endpoints] : _dc_racks) {
ret._dc_racks[dc].reserve(rack_endpoints.size());
for (const auto& p : rack_endpoints) {
ret._dc_racks[dc].emplace(p);
}
}
co_await coroutine::maybe_yield();
ret._current_locations.reserve(_current_locations.size());
for (const auto& p : _current_locations) {
ret._current_locations.emplace(p);
}
co_await coroutine::maybe_yield();
ret._pending_locations.reserve(_pending_locations.size());
for (const auto& p : _pending_locations) {
ret._pending_locations.emplace(p);
}
co_await coroutine::maybe_yield();
ret._sort_by_proximity = _sort_by_proximity;
co_return ret;
}
void topology::remove_pending_location(const inet_address& ep) {
if (ep != utils::fb_utilities::get_broadcast_address()) {
_pending_locations.erase(ep);
}
}
void topology::update_endpoint(const inet_address& ep, endpoint_dc_rack dr, pending pend)
{
if (pend) {
_pending_locations[ep] = std::move(dr);
return;
}
auto current = _current_locations.find(ep);
if (current != _current_locations.end()) {
if (current->second.dc == dr.dc && current->second.rack == dr.rack) {
return;
}
remove_endpoint(ep);
}
_dc_endpoints[dr.dc].insert(ep);
_dc_racks[dr.dc][dr.rack].insert(ep);
_current_locations[ep] = std::move(dr);
remove_pending_location(ep);
}
void topology::remove_endpoint(inet_address ep)
{
auto cur_dc_rack = _current_locations.find(ep);
if (cur_dc_rack == _current_locations.end()) {
remove_pending_location(ep);
return;
}
_dc_endpoints[cur_dc_rack->second.dc].erase(ep);
auto& racks = _dc_racks[cur_dc_rack->second.dc];
auto& eps = racks[cur_dc_rack->second.rack];
eps.erase(ep);
if (eps.empty()) {
racks.erase(cur_dc_rack->second.rack);
}
_current_locations.erase(cur_dc_rack);
}
bool topology::has_endpoint(inet_address ep, pending with_pending) const
{
return _current_locations.contains(ep) || (with_pending && _pending_locations.contains(ep));
}
const endpoint_dc_rack& topology::get_location(const inet_address& ep) const {
if (_current_locations.contains(ep)) {
return _current_locations.at(ep);
}
if (_pending_locations.contains(ep)) {
return _pending_locations.at(ep);
}
on_internal_error(tlogger, format("Node {} is not in topology", ep));
}
// FIXME -- both methods below should rather return data from the
// get_location() result, but to make it work two things are to be fixed:
// - topology should be aware of internal-ip conversions
// - topology should be pre-populated with data loaded from system ks
sstring topology::get_rack() const {
return get_rack(utils::fb_utilities::get_broadcast_address());
}
sstring topology::get_rack(inet_address ep) const {
return get_location(ep).rack;
}
sstring topology::get_datacenter() const {
return get_datacenter(utils::fb_utilities::get_broadcast_address());
}
sstring topology::get_datacenter(inet_address ep) const {
return get_location(ep).dc;
}
void topology::sort_by_proximity(inet_address address, inet_address_vector_replica_set& addresses) const {
if (_sort_by_proximity) {
std::sort(addresses.begin(), addresses.end(), [this, &address](inet_address& a1, inet_address& a2) {
return compare_endpoints(address, a1, a2) < 0;
});
}
}
int topology::compare_endpoints(const inet_address& address, const inet_address& a1, const inet_address& a2) const {
//
// if one of the Nodes IS the Node we are comparing to and the other one
// IS NOT - then return the appropriate result.
//
if (address == a1 && address != a2) {
return -1;
}
if (address == a2 && address != a1) {
return 1;
}
// ...otherwise perform the similar check in regard to Data Center
sstring address_datacenter = get_datacenter(address);
sstring a1_datacenter = get_datacenter(a1);
sstring a2_datacenter = get_datacenter(a2);
if (address_datacenter == a1_datacenter &&
address_datacenter != a2_datacenter) {
return -1;
} else if (address_datacenter == a2_datacenter &&
address_datacenter != a1_datacenter) {
return 1;
} else if (address_datacenter == a2_datacenter &&
address_datacenter == a1_datacenter) {
//
// ...otherwise (in case Nodes belong to the same Data Center) check
// the racks they belong to.
//
sstring address_rack = get_rack(address);
sstring a1_rack = get_rack(a1);
sstring a2_rack = get_rack(a2);
if (address_rack == a1_rack && address_rack != a2_rack) {
return -1;
}
if (address_rack == a2_rack && address_rack != a1_rack) {
return 1;
}
}
//
// We don't differentiate between Nodes if all Nodes belong to different
// Data Centers, thus make them equal.
//
return 0;
}
} // namespace locator

123
locator/topology.hh Normal file
View File

@@ -0,0 +1,123 @@
/*
*
* Modified by ScyllaDB
* Copyright (C) 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0)
*/
#pragma once
#include <unordered_set>
#include <unordered_map>
#include <seastar/core/future.hh>
#include <seastar/core/sstring.hh>
#include <seastar/util/bool_class.hh>
#include "locator/types.hh"
#include "inet_address_vectors.hh"
using namespace seastar;
namespace locator {
class topology {
public:
struct config {
endpoint_dc_rack local_dc_rack;
bool disable_proximity_sorting = false;
};
topology(config cfg);
topology(topology&&) = default;
topology& operator=(topology&&) = default;
future<topology> clone_gently() const;
future<> clear_gently() noexcept;
using pending = bool_class<struct pending_tag>;
/**
* Stores current DC/rack assignment for ep
*/
void update_endpoint(const inet_address& ep, endpoint_dc_rack dr, pending pend);
/**
* Removes current DC/rack assignment for ep
*/
void remove_endpoint(inet_address ep);
/**
* Returns true iff contains given endpoint
*/
bool has_endpoint(inet_address, pending with_pending = pending::no) const;
const std::unordered_map<sstring,
std::unordered_set<inet_address>>&
get_datacenter_endpoints() const {
return _dc_endpoints;
}
const std::unordered_map<sstring,
std::unordered_map<sstring,
std::unordered_set<inet_address>>>&
get_datacenter_racks() const {
return _dc_racks;
}
const endpoint_dc_rack& get_location(const inet_address& ep) const;
sstring get_rack() const;
sstring get_rack(inet_address ep) const;
sstring get_datacenter() const;
sstring get_datacenter(inet_address ep) const;
auto get_local_dc_filter() const noexcept {
return [ this, local_dc = get_datacenter() ] (inet_address ep) {
return get_datacenter(ep) == local_dc;
};
};
template <std::ranges::range Range>
inline size_t count_local_endpoints(const Range& endpoints) const {
return std::count_if(endpoints.begin(), endpoints.end(), get_local_dc_filter());
}
/**
* This method will sort the <tt>List</tt> by proximity to the given
* address.
*/
void sort_by_proximity(inet_address address, inet_address_vector_replica_set& addresses) const;
private:
// default constructor for cloning purposes
topology() = default;
/**
* compares two endpoints in relation to the target endpoint, returning as
* Comparator.compare would
*/
int compare_endpoints(const inet_address& address, const inet_address& a1, const inet_address& a2) const;
void remove_pending_location(const inet_address& ep);
/** multi-map: DC -> endpoints in that DC */
std::unordered_map<sstring,
std::unordered_set<inet_address>>
_dc_endpoints;
/** map: DC -> (multi-map: rack -> endpoints in that rack) */
std::unordered_map<sstring,
std::unordered_map<sstring,
std::unordered_set<inet_address>>>
_dc_racks;
/** reverse-lookup map: endpoint -> current known dc/rack assignment */
std::unordered_map<inet_address, endpoint_dc_rack> _current_locations;
std::unordered_map<inet_address, endpoint_dc_rack> _pending_locations;
bool _sort_by_proximity = true;
};
} // namespace locator