From 4f4fc7fe22a56e42778fe71291385587edfde10b Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Sun, 13 Nov 2022 12:28:27 +0200 Subject: [PATCH 1/4] token_metadata: coroutinize clone functions Signed-off-by: Benny Halevy --- locator/token_metadata.cc | 65 ++++++++++++++++++--------------------- 1 file changed, 30 insertions(+), 35 deletions(-) diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index 887c42dc40..6994c8b4f5 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -341,44 +341,39 @@ token_metadata_impl::ring_range(const token& start) const { } future token_metadata_impl::clone_async() const noexcept { - return clone_only_token_map().then([this] (token_metadata_impl ret) { - return do_with(std::move(ret), [this] (token_metadata_impl& ret) { - ret._bootstrap_tokens.reserve(_bootstrap_tokens.size()); - return do_for_each(_bootstrap_tokens, [&ret] (const auto& p) { - ret._bootstrap_tokens.emplace(p); - }).then([this, &ret] { - ret._leaving_endpoints = _leaving_endpoints; - ret._replacing_endpoints = _replacing_endpoints; - }).then([this, &ret] { - return do_for_each(_pending_ranges_interval_map, - [this, &ret] (const auto& p) { - ret._pending_ranges_interval_map.emplace(p); - }); - }).then([this, &ret] { - ret._ring_version = _ring_version; - return make_ready_future(std::move(ret)); - }); - }); - }); + auto ret = co_await clone_only_token_map(); + ret._bootstrap_tokens.reserve(_bootstrap_tokens.size()); + for (const auto& p : _bootstrap_tokens) { + ret._bootstrap_tokens.emplace(p); + co_await coroutine::maybe_yield(); + } + ret._leaving_endpoints = _leaving_endpoints; + ret._replacing_endpoints = _replacing_endpoints; + for (const auto& p : _pending_ranges_interval_map) { + ret._pending_ranges_interval_map.emplace(p); + co_await coroutine::maybe_yield(); + } + ret._ring_version = _ring_version; + co_return ret; } future token_metadata_impl::clone_only_token_map(bool clone_sorted_tokens) const noexcept { - return do_with(token_metadata_impl(shallow_copy{}, *this), [this, clone_sorted_tokens] (token_metadata_impl& ret) { - ret._token_to_endpoint_map.reserve(_token_to_endpoint_map.size()); - return do_for_each(_token_to_endpoint_map, [&ret] (const auto& p) { - ret._token_to_endpoint_map.emplace(p); - }).then([this, &ret] { - ret._normal_token_owners = _normal_token_owners; - ret._endpoint_to_host_id_map = _endpoint_to_host_id_map; - }).then([this, &ret] { - ret._topology = _topology; - }).then([this, &ret, clone_sorted_tokens] { - if (clone_sorted_tokens) { - ret._sorted_tokens = _sorted_tokens; - } - return make_ready_future(std::move(ret)); - }); - }); + auto ret = token_metadata_impl(shallow_copy{}, *this); + ret._token_to_endpoint_map.reserve(_token_to_endpoint_map.size()); + for (const auto& p : _token_to_endpoint_map) { + ret._token_to_endpoint_map.emplace(p); + co_await coroutine::maybe_yield(); + } + ret._normal_token_owners = _normal_token_owners; + ret._endpoint_to_host_id_map = _endpoint_to_host_id_map; + co_await coroutine::maybe_yield(); + ret._topology = _topology; + co_await coroutine::maybe_yield(); + if (clone_sorted_tokens) { + ret._sorted_tokens = _sorted_tokens; + co_await coroutine::maybe_yield(); + } + co_return ret; } future<> token_metadata_impl::clear_gently() noexcept { From 0c94ffcc858da31806ce13c7c30830cea8a0866e Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Sun, 13 Nov 2022 13:08:14 +0200 Subject: [PATCH 2/4] topology: delete copy constructor Topology is copied only from token_metadata_impl::clone_only_token_map which copies the token_metadata_impl with yielding to prevent reactor stalls. This should apply to topology as well, so add a clone_gently function for cloning the topology from token_metadata_impl::clone_only_token_map. Signed-off-by: Benny Halevy --- locator/token_metadata.cc | 38 ++++++++++++++++++++++++++++---------- locator/token_metadata.hh | 8 +++++++- 2 files changed, 35 insertions(+), 11 deletions(-) diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index 6994c8b4f5..e1f748f612 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -366,9 +366,7 @@ future token_metadata_impl::clone_only_token_map(bool clone } ret._normal_token_owners = _normal_token_owners; ret._endpoint_to_host_id_map = _endpoint_to_host_id_map; - co_await coroutine::maybe_yield(); - ret._topology = _topology; - co_await coroutine::maybe_yield(); + ret._topology = co_await _topology.clone_gently(); if (clone_sorted_tokens) { ret._sorted_tokens = _sorted_tokens; co_await coroutine::maybe_yield(); @@ -1261,13 +1259,33 @@ topology::topology(config cfg) _pending_locations[utils::fb_utilities::get_broadcast_address()] = std::move(cfg.local_dc_rack); } -topology::topology(const topology& other) - : _dc_endpoints(other._dc_endpoints) - , _dc_racks(other._dc_racks) - , _current_locations(other._current_locations) - , _pending_locations(other._pending_locations) - , _sort_by_proximity(other._sort_by_proximity) -{ +future topology::clone_gently() const { + topology ret; + ret._dc_endpoints.reserve(_dc_endpoints.size()); + for (const auto& p : _dc_endpoints) { + ret._dc_endpoints.emplace(p); + } + co_await coroutine::maybe_yield(); + ret._dc_racks.reserve(_dc_racks.size()); + for (const auto& [dc, rack_endpoints] : _dc_racks) { + ret._dc_racks[dc].reserve(rack_endpoints.size()); + for (const auto& p : rack_endpoints) { + ret._dc_racks[dc].emplace(p); + } + } + co_await coroutine::maybe_yield(); + ret._current_locations.reserve(_current_locations.size()); + for (const auto& p : _current_locations) { + ret._current_locations.emplace(p); + } + co_await coroutine::maybe_yield(); + ret._pending_locations.reserve(_pending_locations.size()); + for (const auto& p : _pending_locations) { + ret._pending_locations.emplace(p); + } + co_await coroutine::maybe_yield(); + ret._sort_by_proximity = _sort_by_proximity; + co_return ret; } void topology::remove_pending_location(const inet_address& ep) { diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index a3cfc43064..122b1459ac 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -50,8 +50,11 @@ public: bool disable_proximity_sorting = false; }; topology(config cfg); - topology(const topology& other); + topology(topology&&) = default; + topology& operator=(topology&&) = default; + + future clone_gently() const; future<> clear_gently() noexcept; using pending = bool_class; @@ -108,6 +111,9 @@ public: void sort_by_proximity(inet_address address, inet_address_vector_replica_set& addresses) const; private: + // default constructor for cloning purposes + topology() = default; + /** * compares two endpoints in relation to the target endpoint, returning as * Comparator.compare would From 297a4de4e4c75d8e34f4c2d66a66a9663de4d3b6 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Sun, 13 Nov 2022 10:23:31 +0200 Subject: [PATCH 3/4] locator: add types.hh To export low-level types that are used by oher modules for the locator interfaces. Signed-off-by: Benny Halevy --- locator/token_metadata.hh | 10 ++-------- locator/types.hh | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 34 insertions(+), 8 deletions(-) create mode 100644 locator/types.hh diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index 122b1459ac..8589d22373 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -16,7 +16,6 @@ #include "gms/inet_address.hh" #include "dht/i_partitioner.hh" #include "inet_address_vectors.hh" -#include "locator/host_id.hh" #include #include #include @@ -25,6 +24,8 @@ #include #include +#include "locator/types.hh" + // forward declaration since replica/database.hh includes this file namespace replica { class keyspace; @@ -34,15 +35,8 @@ namespace locator { class abstract_replication_strategy; -using inet_address = gms::inet_address; using token = dht::token; -// Endpoint Data Center and Rack names -struct endpoint_dc_rack { - sstring dc; - sstring rack; -}; - class topology { public: struct config { diff --git a/locator/types.hh b/locator/types.hh new file mode 100644 index 0000000000..b618ea270e --- /dev/null +++ b/locator/types.hh @@ -0,0 +1,32 @@ +/* + * + * Modified by ScyllaDB + * Copyright (C) 2022-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0) + */ + +#pragma once + +#include + +#include "gms/inet_address.hh" +#include "locator/host_id.hh" + +using namespace seastar; + +namespace locator { + +using inet_address = gms::inet_address; + +// Endpoint Data Center and Rack names +struct endpoint_dc_rack { + sstring dc; + sstring rack; +}; + +using dc_rack_fn = seastar::noncopyable_function; + +} // namespace locator From d0bd305d162dccc13c474404a68f58686e4425a7 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Sun, 13 Nov 2022 10:19:57 +0200 Subject: [PATCH 4/4] locator: refactor topology out of token_metadata Signed-off-by: Benny Halevy --- configure.py | 1 + locator/token_metadata.cc | 190 ---------------------------------- locator/token_metadata.hh | 98 +----------------- locator/topology.cc | 209 ++++++++++++++++++++++++++++++++++++++ locator/topology.hh | 123 ++++++++++++++++++++++ 5 files changed, 334 insertions(+), 287 deletions(-) create mode 100644 locator/topology.cc create mode 100644 locator/topology.hh diff --git a/configure.py b/configure.py index ec10c9c7ac..9d61635315 100755 --- a/configure.py +++ b/configure.py @@ -945,6 +945,7 @@ scylla_core = (['message/messaging_service.cc', 'locator/ec2_snitch.cc', 'locator/ec2_multi_region_snitch.cc', 'locator/gce_snitch.cc', + 'locator/topology.cc', 'service/client_state.cc', 'service/storage_service.cc', 'service/misc_services.cc', diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index e1f748f612..c9bb51f5b7 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -1244,196 +1244,6 @@ token_metadata::invalidate_cached_rings() { _impl->invalidate_cached_rings(); } -/////////////////// class topology ///////////////////////////////////////////// -inline future<> topology::clear_gently() noexcept { - co_await utils::clear_gently(_dc_endpoints); - co_await utils::clear_gently(_dc_racks); - co_await utils::clear_gently(_current_locations); - co_await utils::clear_gently(_pending_locations); - co_return; -} - -topology::topology(config cfg) - : _sort_by_proximity(!cfg.disable_proximity_sorting) -{ - _pending_locations[utils::fb_utilities::get_broadcast_address()] = std::move(cfg.local_dc_rack); -} - -future topology::clone_gently() const { - topology ret; - ret._dc_endpoints.reserve(_dc_endpoints.size()); - for (const auto& p : _dc_endpoints) { - ret._dc_endpoints.emplace(p); - } - co_await coroutine::maybe_yield(); - ret._dc_racks.reserve(_dc_racks.size()); - for (const auto& [dc, rack_endpoints] : _dc_racks) { - ret._dc_racks[dc].reserve(rack_endpoints.size()); - for (const auto& p : rack_endpoints) { - ret._dc_racks[dc].emplace(p); - } - } - co_await coroutine::maybe_yield(); - ret._current_locations.reserve(_current_locations.size()); - for (const auto& p : _current_locations) { - ret._current_locations.emplace(p); - } - co_await coroutine::maybe_yield(); - ret._pending_locations.reserve(_pending_locations.size()); - for (const auto& p : _pending_locations) { - ret._pending_locations.emplace(p); - } - co_await coroutine::maybe_yield(); - ret._sort_by_proximity = _sort_by_proximity; - co_return ret; -} - -void topology::remove_pending_location(const inet_address& ep) { - if (ep != utils::fb_utilities::get_broadcast_address()) { - _pending_locations.erase(ep); - } -} - -void topology::update_endpoint(const inet_address& ep, endpoint_dc_rack dr, pending pend) -{ - if (pend) { - _pending_locations[ep] = std::move(dr); - return; - } - - auto current = _current_locations.find(ep); - - if (current != _current_locations.end()) { - if (current->second.dc == dr.dc && current->second.rack == dr.rack) { - return; - } - remove_endpoint(ep); - } - - _dc_endpoints[dr.dc].insert(ep); - _dc_racks[dr.dc][dr.rack].insert(ep); - _current_locations[ep] = std::move(dr); - remove_pending_location(ep); -} - -void topology::remove_endpoint(inet_address ep) -{ - auto cur_dc_rack = _current_locations.find(ep); - - if (cur_dc_rack == _current_locations.end()) { - remove_pending_location(ep); - return; - } - - _dc_endpoints[cur_dc_rack->second.dc].erase(ep); - - auto& racks = _dc_racks[cur_dc_rack->second.dc]; - auto& eps = racks[cur_dc_rack->second.rack]; - eps.erase(ep); - if (eps.empty()) { - racks.erase(cur_dc_rack->second.rack); - } - - _current_locations.erase(cur_dc_rack); -} - -bool topology::has_endpoint(inet_address ep, pending with_pending) const -{ - return _current_locations.contains(ep) || (with_pending && _pending_locations.contains(ep)); -} - -const endpoint_dc_rack& topology::get_location(const inet_address& ep) const { - if (_current_locations.contains(ep)) { - return _current_locations.at(ep); - } - - if (_pending_locations.contains(ep)) { - return _pending_locations.at(ep); - } - - on_internal_error(tlogger, format("Node {} is not in topology", ep)); -} - -// FIXME -- both methods below should rather return data from the -// get_location() result, but to make it work two things are to be fixed: -// - topology should be aware of internal-ip conversions -// - topology should be pre-populated with data loaded from system ks - -sstring topology::get_rack() const { - return get_rack(utils::fb_utilities::get_broadcast_address()); -} - -sstring topology::get_rack(inet_address ep) const { - return get_location(ep).rack; -} - -sstring topology::get_datacenter() const { - return get_datacenter(utils::fb_utilities::get_broadcast_address()); -} - -sstring topology::get_datacenter(inet_address ep) const { - return get_location(ep).dc; -} - -void topology::sort_by_proximity(inet_address address, inet_address_vector_replica_set& addresses) const { - if (_sort_by_proximity) { - std::sort(addresses.begin(), addresses.end(), [this, &address](inet_address& a1, inet_address& a2) { - return compare_endpoints(address, a1, a2) < 0; - }); - } -} - -int topology::compare_endpoints(const inet_address& address, const inet_address& a1, const inet_address& a2) const { - // - // if one of the Nodes IS the Node we are comparing to and the other one - // IS NOT - then return the appropriate result. - // - if (address == a1 && address != a2) { - return -1; - } - - if (address == a2 && address != a1) { - return 1; - } - - // ...otherwise perform the similar check in regard to Data Center - sstring address_datacenter = get_datacenter(address); - sstring a1_datacenter = get_datacenter(a1); - sstring a2_datacenter = get_datacenter(a2); - - if (address_datacenter == a1_datacenter && - address_datacenter != a2_datacenter) { - return -1; - } else if (address_datacenter == a2_datacenter && - address_datacenter != a1_datacenter) { - return 1; - } else if (address_datacenter == a2_datacenter && - address_datacenter == a1_datacenter) { - // - // ...otherwise (in case Nodes belong to the same Data Center) check - // the racks they belong to. - // - sstring address_rack = get_rack(address); - sstring a1_rack = get_rack(a1); - sstring a2_rack = get_rack(a2); - - if (address_rack == a1_rack && address_rack != a2_rack) { - return -1; - } - - if (address_rack == a2_rack && address_rack != a1_rack) { - return 1; - } - } - // - // We don't differentiate between Nodes if all Nodes belong to different - // Data Centers, thus make them equal. - // - return 0; -} - -/////////////////// class topology end ///////////////////////////////////////// - void shared_token_metadata::set(mutable_token_metadata_ptr tmptr) noexcept { if (_shared->get_ring_version() >= tmptr->get_ring_version()) { on_internal_error(tlogger, format("shared_token_metadata: must not set non-increasing version: {} -> {}", _shared->get_ring_version(), tmptr->get_ring_version())); diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index 8589d22373..fd25321a01 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -25,6 +25,7 @@ #include #include "locator/types.hh" +#include "locator/topology.hh" // forward declaration since replica/database.hh includes this file namespace replica { @@ -37,102 +38,6 @@ class abstract_replication_strategy; using token = dht::token; -class topology { -public: - struct config { - endpoint_dc_rack local_dc_rack; - bool disable_proximity_sorting = false; - }; - topology(config cfg); - topology(topology&&) = default; - - topology& operator=(topology&&) = default; - - future clone_gently() const; - future<> clear_gently() noexcept; - - using pending = bool_class; - - /** - * Stores current DC/rack assignment for ep - */ - void update_endpoint(const inet_address& ep, endpoint_dc_rack dr, pending pend); - - /** - * Removes current DC/rack assignment for ep - */ - void remove_endpoint(inet_address ep); - - /** - * Returns true iff contains given endpoint - */ - bool has_endpoint(inet_address, pending with_pending = pending::no) const; - - const std::unordered_map>& - get_datacenter_endpoints() const { - return _dc_endpoints; - } - - const std::unordered_map>>& - get_datacenter_racks() const { - return _dc_racks; - } - - const endpoint_dc_rack& get_location(const inet_address& ep) const; - sstring get_rack() const; - sstring get_rack(inet_address ep) const; - sstring get_datacenter() const; - sstring get_datacenter(inet_address ep) const; - - auto get_local_dc_filter() const noexcept { - return [ this, local_dc = get_datacenter() ] (inet_address ep) { - return get_datacenter(ep) == local_dc; - }; - }; - - template - inline size_t count_local_endpoints(const Range& endpoints) const { - return std::count_if(endpoints.begin(), endpoints.end(), get_local_dc_filter()); - } - - /** - * This method will sort the List by proximity to the given - * address. - */ - void sort_by_proximity(inet_address address, inet_address_vector_replica_set& addresses) const; - -private: - // default constructor for cloning purposes - topology() = default; - - /** - * compares two endpoints in relation to the target endpoint, returning as - * Comparator.compare would - */ - int compare_endpoints(const inet_address& address, const inet_address& a1, const inet_address& a2) const; - void remove_pending_location(const inet_address& ep); - - /** multi-map: DC -> endpoints in that DC */ - std::unordered_map> - _dc_endpoints; - - /** map: DC -> (multi-map: rack -> endpoints in that rack) */ - std::unordered_map>> - _dc_racks; - - /** reverse-lookup map: endpoint -> current known dc/rack assignment */ - std::unordered_map _current_locations; - std::unordered_map _pending_locations; - - bool _sort_by_proximity = true; -}; - class token_metadata; struct host_id_or_endpoint { @@ -160,7 +65,6 @@ struct host_id_or_endpoint { void resolve(const token_metadata& tm); }; -using dc_rack_fn = seastar::noncopyable_function; class token_metadata_impl; class token_metadata final { diff --git a/locator/topology.cc b/locator/topology.cc new file mode 100644 index 0000000000..2d8d766e0f --- /dev/null +++ b/locator/topology.cc @@ -0,0 +1,209 @@ +/* + * Copyright (C) 2015-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#include +#include +#include + +#include "log.hh" +#include "locator/topology.hh" +#include "utils/stall_free.hh" +#include "utils/fb_utilities.hh" + +namespace locator { + +static logging::logger tlogger("topology"); + +future<> topology::clear_gently() noexcept { + co_await utils::clear_gently(_dc_endpoints); + co_await utils::clear_gently(_dc_racks); + co_await utils::clear_gently(_current_locations); + co_await utils::clear_gently(_pending_locations); + co_return; +} + +topology::topology(config cfg) + : _sort_by_proximity(!cfg.disable_proximity_sorting) +{ + _pending_locations[utils::fb_utilities::get_broadcast_address()] = std::move(cfg.local_dc_rack); +} + +future topology::clone_gently() const { + topology ret; + ret._dc_endpoints.reserve(_dc_endpoints.size()); + for (const auto& p : _dc_endpoints) { + ret._dc_endpoints.emplace(p); + } + co_await coroutine::maybe_yield(); + ret._dc_racks.reserve(_dc_racks.size()); + for (const auto& [dc, rack_endpoints] : _dc_racks) { + ret._dc_racks[dc].reserve(rack_endpoints.size()); + for (const auto& p : rack_endpoints) { + ret._dc_racks[dc].emplace(p); + } + } + co_await coroutine::maybe_yield(); + ret._current_locations.reserve(_current_locations.size()); + for (const auto& p : _current_locations) { + ret._current_locations.emplace(p); + } + co_await coroutine::maybe_yield(); + ret._pending_locations.reserve(_pending_locations.size()); + for (const auto& p : _pending_locations) { + ret._pending_locations.emplace(p); + } + co_await coroutine::maybe_yield(); + ret._sort_by_proximity = _sort_by_proximity; + co_return ret; +} + +void topology::remove_pending_location(const inet_address& ep) { + if (ep != utils::fb_utilities::get_broadcast_address()) { + _pending_locations.erase(ep); + } +} + +void topology::update_endpoint(const inet_address& ep, endpoint_dc_rack dr, pending pend) +{ + if (pend) { + _pending_locations[ep] = std::move(dr); + return; + } + + auto current = _current_locations.find(ep); + + if (current != _current_locations.end()) { + if (current->second.dc == dr.dc && current->second.rack == dr.rack) { + return; + } + remove_endpoint(ep); + } + + _dc_endpoints[dr.dc].insert(ep); + _dc_racks[dr.dc][dr.rack].insert(ep); + _current_locations[ep] = std::move(dr); + remove_pending_location(ep); +} + +void topology::remove_endpoint(inet_address ep) +{ + auto cur_dc_rack = _current_locations.find(ep); + + if (cur_dc_rack == _current_locations.end()) { + remove_pending_location(ep); + return; + } + + _dc_endpoints[cur_dc_rack->second.dc].erase(ep); + + auto& racks = _dc_racks[cur_dc_rack->second.dc]; + auto& eps = racks[cur_dc_rack->second.rack]; + eps.erase(ep); + if (eps.empty()) { + racks.erase(cur_dc_rack->second.rack); + } + + _current_locations.erase(cur_dc_rack); +} + +bool topology::has_endpoint(inet_address ep, pending with_pending) const +{ + return _current_locations.contains(ep) || (with_pending && _pending_locations.contains(ep)); +} + +const endpoint_dc_rack& topology::get_location(const inet_address& ep) const { + if (_current_locations.contains(ep)) { + return _current_locations.at(ep); + } + + if (_pending_locations.contains(ep)) { + return _pending_locations.at(ep); + } + + on_internal_error(tlogger, format("Node {} is not in topology", ep)); +} + +// FIXME -- both methods below should rather return data from the +// get_location() result, but to make it work two things are to be fixed: +// - topology should be aware of internal-ip conversions +// - topology should be pre-populated with data loaded from system ks + +sstring topology::get_rack() const { + return get_rack(utils::fb_utilities::get_broadcast_address()); +} + +sstring topology::get_rack(inet_address ep) const { + return get_location(ep).rack; +} + +sstring topology::get_datacenter() const { + return get_datacenter(utils::fb_utilities::get_broadcast_address()); +} + +sstring topology::get_datacenter(inet_address ep) const { + return get_location(ep).dc; +} + +void topology::sort_by_proximity(inet_address address, inet_address_vector_replica_set& addresses) const { + if (_sort_by_proximity) { + std::sort(addresses.begin(), addresses.end(), [this, &address](inet_address& a1, inet_address& a2) { + return compare_endpoints(address, a1, a2) < 0; + }); + } +} + +int topology::compare_endpoints(const inet_address& address, const inet_address& a1, const inet_address& a2) const { + // + // if one of the Nodes IS the Node we are comparing to and the other one + // IS NOT - then return the appropriate result. + // + if (address == a1 && address != a2) { + return -1; + } + + if (address == a2 && address != a1) { + return 1; + } + + // ...otherwise perform the similar check in regard to Data Center + sstring address_datacenter = get_datacenter(address); + sstring a1_datacenter = get_datacenter(a1); + sstring a2_datacenter = get_datacenter(a2); + + if (address_datacenter == a1_datacenter && + address_datacenter != a2_datacenter) { + return -1; + } else if (address_datacenter == a2_datacenter && + address_datacenter != a1_datacenter) { + return 1; + } else if (address_datacenter == a2_datacenter && + address_datacenter == a1_datacenter) { + // + // ...otherwise (in case Nodes belong to the same Data Center) check + // the racks they belong to. + // + sstring address_rack = get_rack(address); + sstring a1_rack = get_rack(a1); + sstring a2_rack = get_rack(a2); + + if (address_rack == a1_rack && address_rack != a2_rack) { + return -1; + } + + if (address_rack == a2_rack && address_rack != a1_rack) { + return 1; + } + } + // + // We don't differentiate between Nodes if all Nodes belong to different + // Data Centers, thus make them equal. + // + return 0; +} + +} // namespace locator diff --git a/locator/topology.hh b/locator/topology.hh new file mode 100644 index 0000000000..d04c3a6a83 --- /dev/null +++ b/locator/topology.hh @@ -0,0 +1,123 @@ +/* + * + * Modified by ScyllaDB + * Copyright (C) 2022-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0) + */ + +#pragma once + +#include +#include + +#include +#include +#include + +#include "locator/types.hh" +#include "inet_address_vectors.hh" + +using namespace seastar; + +namespace locator { + +class topology { +public: + struct config { + endpoint_dc_rack local_dc_rack; + bool disable_proximity_sorting = false; + }; + topology(config cfg); + topology(topology&&) = default; + + topology& operator=(topology&&) = default; + + future clone_gently() const; + future<> clear_gently() noexcept; + + using pending = bool_class; + + /** + * Stores current DC/rack assignment for ep + */ + void update_endpoint(const inet_address& ep, endpoint_dc_rack dr, pending pend); + + /** + * Removes current DC/rack assignment for ep + */ + void remove_endpoint(inet_address ep); + + /** + * Returns true iff contains given endpoint + */ + bool has_endpoint(inet_address, pending with_pending = pending::no) const; + + const std::unordered_map>& + get_datacenter_endpoints() const { + return _dc_endpoints; + } + + const std::unordered_map>>& + get_datacenter_racks() const { + return _dc_racks; + } + + const endpoint_dc_rack& get_location(const inet_address& ep) const; + sstring get_rack() const; + sstring get_rack(inet_address ep) const; + sstring get_datacenter() const; + sstring get_datacenter(inet_address ep) const; + + auto get_local_dc_filter() const noexcept { + return [ this, local_dc = get_datacenter() ] (inet_address ep) { + return get_datacenter(ep) == local_dc; + }; + }; + + template + inline size_t count_local_endpoints(const Range& endpoints) const { + return std::count_if(endpoints.begin(), endpoints.end(), get_local_dc_filter()); + } + + /** + * This method will sort the List by proximity to the given + * address. + */ + void sort_by_proximity(inet_address address, inet_address_vector_replica_set& addresses) const; + +private: + // default constructor for cloning purposes + topology() = default; + + /** + * compares two endpoints in relation to the target endpoint, returning as + * Comparator.compare would + */ + int compare_endpoints(const inet_address& address, const inet_address& a1, const inet_address& a2) const; + void remove_pending_location(const inet_address& ep); + + /** multi-map: DC -> endpoints in that DC */ + std::unordered_map> + _dc_endpoints; + + /** map: DC -> (multi-map: rack -> endpoints in that rack) */ + std::unordered_map>> + _dc_racks; + + /** reverse-lookup map: endpoint -> current known dc/rack assignment */ + std::unordered_map _current_locations; + std::unordered_map _pending_locations; + + bool _sort_by_proximity = true; +}; + +} // namespace locator