Merge 'Refactor topology out of token_metadata' from Benny Halevy

This series moves the topology code from locator/token_metadata.{cc,hh} out to locator/topology.{cc,hh}
and introduces a shared header file, locator/types.hh, which contains shared, low-level definitions, in anticipation of https://github.com/scylladb/scylladb/pull/11987

While at it, the token_metadata clone functions are turned into coroutines
and the topology copy constructor is deleted.  The copy functionality is moved into an async `clone_gently` function that allows yielding while copying the topology.

Closes #12001

* github.com:scylladb/scylladb:
  locator: refactor topology out of token_metadata
  locator: add types.hh
  topology: delete copy constructor
  token_metadata: coroutinize clone functions
This commit is contained in:
Pavel Emelyanov
2022-11-17 13:55:34 +03:00
6 changed files with 396 additions and 304 deletions

View File

@@ -945,6 +945,7 @@ scylla_core = (['message/messaging_service.cc',
'locator/ec2_snitch.cc',
'locator/ec2_multi_region_snitch.cc',
'locator/gce_snitch.cc',
'locator/topology.cc',
'service/client_state.cc',
'service/storage_service.cc',
'service/misc_services.cc',

View File

@@ -341,44 +341,37 @@ token_metadata_impl::ring_range(const token& start) const {
}
future<token_metadata_impl> token_metadata_impl::clone_async() const noexcept {
return clone_only_token_map().then([this] (token_metadata_impl ret) {
return do_with(std::move(ret), [this] (token_metadata_impl& ret) {
ret._bootstrap_tokens.reserve(_bootstrap_tokens.size());
return do_for_each(_bootstrap_tokens, [&ret] (const auto& p) {
ret._bootstrap_tokens.emplace(p);
}).then([this, &ret] {
ret._leaving_endpoints = _leaving_endpoints;
ret._replacing_endpoints = _replacing_endpoints;
}).then([this, &ret] {
return do_for_each(_pending_ranges_interval_map,
[this, &ret] (const auto& p) {
ret._pending_ranges_interval_map.emplace(p);
});
}).then([this, &ret] {
ret._ring_version = _ring_version;
return make_ready_future<token_metadata_impl>(std::move(ret));
});
});
});
auto ret = co_await clone_only_token_map();
ret._bootstrap_tokens.reserve(_bootstrap_tokens.size());
for (const auto& p : _bootstrap_tokens) {
ret._bootstrap_tokens.emplace(p);
co_await coroutine::maybe_yield();
}
ret._leaving_endpoints = _leaving_endpoints;
ret._replacing_endpoints = _replacing_endpoints;
for (const auto& p : _pending_ranges_interval_map) {
ret._pending_ranges_interval_map.emplace(p);
co_await coroutine::maybe_yield();
}
ret._ring_version = _ring_version;
co_return ret;
}
future<token_metadata_impl> token_metadata_impl::clone_only_token_map(bool clone_sorted_tokens) const noexcept {
return do_with(token_metadata_impl(shallow_copy{}, *this), [this, clone_sorted_tokens] (token_metadata_impl& ret) {
ret._token_to_endpoint_map.reserve(_token_to_endpoint_map.size());
return do_for_each(_token_to_endpoint_map, [&ret] (const auto& p) {
ret._token_to_endpoint_map.emplace(p);
}).then([this, &ret] {
ret._normal_token_owners = _normal_token_owners;
ret._endpoint_to_host_id_map = _endpoint_to_host_id_map;
}).then([this, &ret] {
ret._topology = _topology;
}).then([this, &ret, clone_sorted_tokens] {
if (clone_sorted_tokens) {
ret._sorted_tokens = _sorted_tokens;
}
return make_ready_future<token_metadata_impl>(std::move(ret));
});
});
auto ret = token_metadata_impl(shallow_copy{}, *this);
ret._token_to_endpoint_map.reserve(_token_to_endpoint_map.size());
for (const auto& p : _token_to_endpoint_map) {
ret._token_to_endpoint_map.emplace(p);
co_await coroutine::maybe_yield();
}
ret._normal_token_owners = _normal_token_owners;
ret._endpoint_to_host_id_map = _endpoint_to_host_id_map;
ret._topology = co_await _topology.clone_gently();
if (clone_sorted_tokens) {
ret._sorted_tokens = _sorted_tokens;
co_await coroutine::maybe_yield();
}
co_return ret;
}
future<> token_metadata_impl::clear_gently() noexcept {
@@ -1251,176 +1244,6 @@ token_metadata::invalidate_cached_rings() {
_impl->invalidate_cached_rings();
}
/////////////////// class topology /////////////////////////////////////////////
inline future<> topology::clear_gently() noexcept {
co_await utils::clear_gently(_dc_endpoints);
co_await utils::clear_gently(_dc_racks);
co_await utils::clear_gently(_current_locations);
co_await utils::clear_gently(_pending_locations);
co_return;
}
topology::topology(config cfg)
: _sort_by_proximity(!cfg.disable_proximity_sorting)
{
_pending_locations[utils::fb_utilities::get_broadcast_address()] = std::move(cfg.local_dc_rack);
}
topology::topology(const topology& other)
: _dc_endpoints(other._dc_endpoints)
, _dc_racks(other._dc_racks)
, _current_locations(other._current_locations)
, _pending_locations(other._pending_locations)
, _sort_by_proximity(other._sort_by_proximity)
{
}
void topology::remove_pending_location(const inet_address& ep) {
if (ep != utils::fb_utilities::get_broadcast_address()) {
_pending_locations.erase(ep);
}
}
void topology::update_endpoint(const inet_address& ep, endpoint_dc_rack dr, pending pend)
{
if (pend) {
_pending_locations[ep] = std::move(dr);
return;
}
auto current = _current_locations.find(ep);
if (current != _current_locations.end()) {
if (current->second.dc == dr.dc && current->second.rack == dr.rack) {
return;
}
remove_endpoint(ep);
}
_dc_endpoints[dr.dc].insert(ep);
_dc_racks[dr.dc][dr.rack].insert(ep);
_current_locations[ep] = std::move(dr);
remove_pending_location(ep);
}
void topology::remove_endpoint(inet_address ep)
{
auto cur_dc_rack = _current_locations.find(ep);
if (cur_dc_rack == _current_locations.end()) {
remove_pending_location(ep);
return;
}
_dc_endpoints[cur_dc_rack->second.dc].erase(ep);
auto& racks = _dc_racks[cur_dc_rack->second.dc];
auto& eps = racks[cur_dc_rack->second.rack];
eps.erase(ep);
if (eps.empty()) {
racks.erase(cur_dc_rack->second.rack);
}
_current_locations.erase(cur_dc_rack);
}
bool topology::has_endpoint(inet_address ep, pending with_pending) const
{
return _current_locations.contains(ep) || (with_pending && _pending_locations.contains(ep));
}
const endpoint_dc_rack& topology::get_location(const inet_address& ep) const {
if (_current_locations.contains(ep)) {
return _current_locations.at(ep);
}
if (_pending_locations.contains(ep)) {
return _pending_locations.at(ep);
}
on_internal_error(tlogger, format("Node {} is not in topology", ep));
}
// FIXME -- both methods below should rather return data from the
// get_location() result, but to make it work two things are to be fixed:
// - topology should be aware of internal-ip conversions
// - topology should be pre-populated with data loaded from system ks
sstring topology::get_rack() const {
return get_rack(utils::fb_utilities::get_broadcast_address());
}
sstring topology::get_rack(inet_address ep) const {
return get_location(ep).rack;
}
sstring topology::get_datacenter() const {
return get_datacenter(utils::fb_utilities::get_broadcast_address());
}
sstring topology::get_datacenter(inet_address ep) const {
return get_location(ep).dc;
}
void topology::sort_by_proximity(inet_address address, inet_address_vector_replica_set& addresses) const {
if (_sort_by_proximity) {
std::sort(addresses.begin(), addresses.end(), [this, &address](inet_address& a1, inet_address& a2) {
return compare_endpoints(address, a1, a2) < 0;
});
}
}
int topology::compare_endpoints(const inet_address& address, const inet_address& a1, const inet_address& a2) const {
//
// if one of the Nodes IS the Node we are comparing to and the other one
// IS NOT - then return the appropriate result.
//
if (address == a1 && address != a2) {
return -1;
}
if (address == a2 && address != a1) {
return 1;
}
// ...otherwise perform the similar check in regard to Data Center
sstring address_datacenter = get_datacenter(address);
sstring a1_datacenter = get_datacenter(a1);
sstring a2_datacenter = get_datacenter(a2);
if (address_datacenter == a1_datacenter &&
address_datacenter != a2_datacenter) {
return -1;
} else if (address_datacenter == a2_datacenter &&
address_datacenter != a1_datacenter) {
return 1;
} else if (address_datacenter == a2_datacenter &&
address_datacenter == a1_datacenter) {
//
// ...otherwise (in case Nodes belong to the same Data Center) check
// the racks they belong to.
//
sstring address_rack = get_rack(address);
sstring a1_rack = get_rack(a1);
sstring a2_rack = get_rack(a2);
if (address_rack == a1_rack && address_rack != a2_rack) {
return -1;
}
if (address_rack == a2_rack && address_rack != a1_rack) {
return 1;
}
}
//
// We don't differentiate between Nodes if all Nodes belong to different
// Data Centers, thus make them equal.
//
return 0;
}
/////////////////// class topology end /////////////////////////////////////////
void shared_token_metadata::set(mutable_token_metadata_ptr tmptr) noexcept {
if (_shared->get_ring_version() >= tmptr->get_ring_version()) {
on_internal_error(tlogger, format("shared_token_metadata: must not set non-increasing version: {} -> {}", _shared->get_ring_version(), tmptr->get_ring_version()));

View File

@@ -16,7 +16,6 @@
#include "gms/inet_address.hh"
#include "dht/i_partitioner.hh"
#include "inet_address_vectors.hh"
#include "locator/host_id.hh"
#include <optional>
#include <memory>
#include <boost/range/iterator_range.hpp>
@@ -25,6 +24,9 @@
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/semaphore.hh>
#include "locator/types.hh"
#include "locator/topology.hh"
// forward declaration since replica/database.hh includes this file
namespace replica {
class keyspace;
@@ -34,105 +36,8 @@ namespace locator {
class abstract_replication_strategy;
using inet_address = gms::inet_address;
using token = dht::token;
// Endpoint Data Center and Rack names
struct endpoint_dc_rack {
sstring dc;
sstring rack;
};
class topology {
public:
struct config {
endpoint_dc_rack local_dc_rack;
bool disable_proximity_sorting = false;
};
topology(config cfg);
topology(const topology& other);
future<> clear_gently() noexcept;
using pending = bool_class<struct pending_tag>;
/**
* Stores current DC/rack assignment for ep
*/
void update_endpoint(const inet_address& ep, endpoint_dc_rack dr, pending pend);
/**
* Removes current DC/rack assignment for ep
*/
void remove_endpoint(inet_address ep);
/**
* Returns true iff contains given endpoint
*/
bool has_endpoint(inet_address, pending with_pending = pending::no) const;
const std::unordered_map<sstring,
std::unordered_set<inet_address>>&
get_datacenter_endpoints() const {
return _dc_endpoints;
}
const std::unordered_map<sstring,
std::unordered_map<sstring,
std::unordered_set<inet_address>>>&
get_datacenter_racks() const {
return _dc_racks;
}
const endpoint_dc_rack& get_location(const inet_address& ep) const;
sstring get_rack() const;
sstring get_rack(inet_address ep) const;
sstring get_datacenter() const;
sstring get_datacenter(inet_address ep) const;
auto get_local_dc_filter() const noexcept {
return [ this, local_dc = get_datacenter() ] (inet_address ep) {
return get_datacenter(ep) == local_dc;
};
};
template <std::ranges::range Range>
inline size_t count_local_endpoints(const Range& endpoints) const {
return std::count_if(endpoints.begin(), endpoints.end(), get_local_dc_filter());
}
/**
* This method will sort the <tt>List</tt> by proximity to the given
* address.
*/
void sort_by_proximity(inet_address address, inet_address_vector_replica_set& addresses) const;
private:
/**
* compares two endpoints in relation to the target endpoint, returning as
* Comparator.compare would
*/
int compare_endpoints(const inet_address& address, const inet_address& a1, const inet_address& a2) const;
void remove_pending_location(const inet_address& ep);
/** multi-map: DC -> endpoints in that DC */
std::unordered_map<sstring,
std::unordered_set<inet_address>>
_dc_endpoints;
/** map: DC -> (multi-map: rack -> endpoints in that rack) */
std::unordered_map<sstring,
std::unordered_map<sstring,
std::unordered_set<inet_address>>>
_dc_racks;
/** reverse-lookup map: endpoint -> current known dc/rack assignment */
std::unordered_map<inet_address, endpoint_dc_rack> _current_locations;
std::unordered_map<inet_address, endpoint_dc_rack> _pending_locations;
bool _sort_by_proximity = true;
};
class token_metadata;
struct host_id_or_endpoint {
@@ -160,7 +65,6 @@ struct host_id_or_endpoint {
void resolve(const token_metadata& tm);
};
using dc_rack_fn = seastar::noncopyable_function<endpoint_dc_rack(inet_address)>;
class token_metadata_impl;
class token_metadata final {

209
locator/topology.cc Normal file
View File

@@ -0,0 +1,209 @@
/*
* Copyright (C) 2015-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/core/on_internal_error.hh>
#include "log.hh"
#include "locator/topology.hh"
#include "utils/stall_free.hh"
#include "utils/fb_utilities.hh"
namespace locator {
static logging::logger tlogger("topology");
// Empties the four location containers one after another through
// utils::clear_gently(), awaiting each call before starting the next.
// The returned future resolves once the last container has been processed.
future<> topology::clear_gently() noexcept {
    co_await utils::clear_gently(_dc_endpoints);
    co_await utils::clear_gently(_dc_racks);
    co_await utils::clear_gently(_current_locations);
    co_await utils::clear_gently(_pending_locations);
}
// Builds a topology from `cfg`.
//
// Proximity sorting is enabled unless cfg.disable_proximity_sorting is set.
// The local node (identified by the broadcast address) is registered as a
// *pending* location with the DC/rack supplied in cfg.local_dc_rack.
topology::topology(config cfg)
        : _sort_by_proximity(!cfg.disable_proximity_sorting)
{
    const auto& local_ep = utils::fb_utilities::get_broadcast_address();
    _pending_locations[local_ep] = std::move(cfg.local_dc_rack);
}
// Produces a deep copy of this topology without monopolizing the reactor.
//
// The copy is built container by container; after every inserted element the
// coroutine offers to yield (coroutine::maybe_yield), so copying a large
// topology is split into small steps instead of one long uninterrupted loop
// per container.
//
// The function iterates over this object's containers across suspension
// points, so the source topology must not be modified or destroyed until the
// returned future resolves.
//
// Returns: a future holding the newly built topology.
future<topology> topology::clone_gently() const {
    topology ret;

    // DC -> endpoints
    ret._dc_endpoints.reserve(_dc_endpoints.size());
    for (const auto& p : _dc_endpoints) {
        ret._dc_endpoints.emplace(p);
        co_await coroutine::maybe_yield();
    }

    // DC -> rack -> endpoints
    ret._dc_racks.reserve(_dc_racks.size());
    for (const auto& [dc, rack_endpoints] : _dc_racks) {
        // Look the per-DC map up once instead of once per rack.
        auto& ret_racks = ret._dc_racks[dc];
        ret_racks.reserve(rack_endpoints.size());
        for (const auto& p : rack_endpoints) {
            ret_racks.emplace(p);
            co_await coroutine::maybe_yield();
        }
    }

    // endpoint -> current dc/rack
    ret._current_locations.reserve(_current_locations.size());
    for (const auto& p : _current_locations) {
        ret._current_locations.emplace(p);
        co_await coroutine::maybe_yield();
    }

    // endpoint -> pending dc/rack
    ret._pending_locations.reserve(_pending_locations.size());
    for (const auto& p : _pending_locations) {
        ret._pending_locations.emplace(p);
        co_await coroutine::maybe_yield();
    }

    ret._sort_by_proximity = _sort_by_proximity;
    co_return ret;
}
// Drops the pending location recorded for `ep`, if any.
// The entry keyed by the local broadcast address is never removed.
void topology::remove_pending_location(const inet_address& ep) {
    if (ep == utils::fb_utilities::get_broadcast_address()) {
        return;
    }
    _pending_locations.erase(ep);
}
void topology::update_endpoint(const inet_address& ep, endpoint_dc_rack dr, pending pend)
{
if (pend) {
_pending_locations[ep] = std::move(dr);
return;
}
auto current = _current_locations.find(ep);
if (current != _current_locations.end()) {
if (current->second.dc == dr.dc && current->second.rack == dr.rack) {
return;
}
remove_endpoint(ep);
}
_dc_endpoints[dr.dc].insert(ep);
_dc_racks[dr.dc][dr.rack].insert(ep);
_current_locations[ep] = std::move(dr);
remove_pending_location(ep);
}
// Forgets endpoint `ep`.
//
// If `ep` has no current location, only its pending location is dropped
// (subject to the rules of remove_pending_location()). Otherwise it is
// removed from the per-DC and per-rack sets and from the current-location
// map; a rack that becomes empty is erased from its DC's rack map.
void topology::remove_endpoint(inet_address ep)
{
    auto it = _current_locations.find(ep);
    if (it == _current_locations.end()) {
        remove_pending_location(ep);
        return;
    }

    const endpoint_dc_rack& loc = it->second;
    _dc_endpoints[loc.dc].erase(ep);

    auto& racks_in_dc = _dc_racks[loc.dc];
    auto& rack_members = racks_in_dc[loc.rack];
    rack_members.erase(ep);
    if (rack_members.empty()) {
        racks_in_dc.erase(loc.rack);
    }

    // Erase last: `loc` refers into this map entry.
    _current_locations.erase(it);
}
// Returns true if `ep` has a current location, or, when `with_pending` is
// set, if it has a pending location.
bool topology::has_endpoint(inet_address ep, pending with_pending) const
{
    if (_current_locations.contains(ep)) {
        return true;
    }
    return with_pending && _pending_locations.contains(ep);
}
// Returns the DC/rack recorded for `ep`.
//
// The current-location map is consulted first, then the pending-location map.
// If the endpoint is in neither, on_internal_error() is invoked with a
// "Node ... is not in topology" message.
//
// Each map is probed with a single find() rather than contains() + at(),
// which would hash and look up the key twice.
const endpoint_dc_rack& topology::get_location(const inet_address& ep) const {
    if (auto it = _current_locations.find(ep); it != _current_locations.end()) {
        return it->second;
    }
    if (auto it = _pending_locations.find(ep); it != _pending_locations.end()) {
        return it->second;
    }
    on_internal_error(tlogger, format("Node {} is not in topology", ep));
}
// FIXME -- both methods below should rather return data from the
// get_location() result, but to make it work two things are to be fixed:
// - topology should be aware of internal-ip conversions
// - topology should be pre-populated with data loaded from system ks
// Rack of the local node, i.e. of the endpoint at the broadcast address.
sstring topology::get_rack() const {
    const auto& local_ep = utils::fb_utilities::get_broadcast_address();
    return get_rack(local_ep);
}
// Rack of endpoint `ep`, taken from get_location(ep).
sstring topology::get_rack(inet_address ep) const {
    const endpoint_dc_rack& loc = get_location(ep);
    return loc.rack;
}
// Datacenter of the local node, i.e. of the endpoint at the broadcast address.
sstring topology::get_datacenter() const {
    const auto& local_ep = utils::fb_utilities::get_broadcast_address();
    return get_datacenter(local_ep);
}
// Datacenter of endpoint `ep`, taken from get_location(ep).
sstring topology::get_datacenter(inet_address ep) const {
    const endpoint_dc_rack& loc = get_location(ep);
    return loc.dc;
}
// Sorts `addresses` in place by proximity to `address`, using
// compare_endpoints() as the ordering. Does nothing when proximity sorting
// was disabled in the topology's configuration.
//
// The comparator takes its arguments by const reference: a comparison
// function passed to std::sort must not modify the compared elements and must
// be callable on const objects.
void topology::sort_by_proximity(inet_address address, inet_address_vector_replica_set& addresses) const {
    if (!_sort_by_proximity) {
        return;
    }
    std::sort(addresses.begin(), addresses.end(), [this, &address](const inet_address& a1, const inet_address& a2) {
        return compare_endpoints(address, a1, a2) < 0;
    });
}
int topology::compare_endpoints(const inet_address& address, const inet_address& a1, const inet_address& a2) const {
//
// if one of the Nodes IS the Node we are comparing to and the other one
// IS NOT - then return the appropriate result.
//
if (address == a1 && address != a2) {
return -1;
}
if (address == a2 && address != a1) {
return 1;
}
// ...otherwise perform the similar check in regard to Data Center
sstring address_datacenter = get_datacenter(address);
sstring a1_datacenter = get_datacenter(a1);
sstring a2_datacenter = get_datacenter(a2);
if (address_datacenter == a1_datacenter &&
address_datacenter != a2_datacenter) {
return -1;
} else if (address_datacenter == a2_datacenter &&
address_datacenter != a1_datacenter) {
return 1;
} else if (address_datacenter == a2_datacenter &&
address_datacenter == a1_datacenter) {
//
// ...otherwise (in case Nodes belong to the same Data Center) check
// the racks they belong to.
//
sstring address_rack = get_rack(address);
sstring a1_rack = get_rack(a1);
sstring a2_rack = get_rack(a2);
if (address_rack == a1_rack && address_rack != a2_rack) {
return -1;
}
if (address_rack == a2_rack && address_rack != a1_rack) {
return 1;
}
}
//
// We don't differentiate between Nodes if all Nodes belong to different
// Data Centers, thus make them equal.
//
return 0;
}
} // namespace locator

123
locator/topology.hh Normal file
View File

@@ -0,0 +1,123 @@
/*
*
* Modified by ScyllaDB
* Copyright (C) 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0)
*/
#pragma once
#include <unordered_set>
#include <unordered_map>
#include <seastar/core/future.hh>
#include <seastar/core/sstring.hh>
#include <seastar/util/bool_class.hh>
#include "locator/types.hh"
#include "inet_address_vectors.hh"
using namespace seastar;
namespace locator {
class topology {
public:
struct config {
endpoint_dc_rack local_dc_rack;
bool disable_proximity_sorting = false;
};
topology(config cfg);
topology(topology&&) = default;
topology& operator=(topology&&) = default;
future<topology> clone_gently() const;
future<> clear_gently() noexcept;
using pending = bool_class<struct pending_tag>;
/**
* Stores current DC/rack assignment for ep
*/
void update_endpoint(const inet_address& ep, endpoint_dc_rack dr, pending pend);
/**
* Removes current DC/rack assignment for ep
*/
void remove_endpoint(inet_address ep);
/**
* Returns true iff contains given endpoint
*/
bool has_endpoint(inet_address, pending with_pending = pending::no) const;
const std::unordered_map<sstring,
std::unordered_set<inet_address>>&
get_datacenter_endpoints() const {
return _dc_endpoints;
}
const std::unordered_map<sstring,
std::unordered_map<sstring,
std::unordered_set<inet_address>>>&
get_datacenter_racks() const {
return _dc_racks;
}
const endpoint_dc_rack& get_location(const inet_address& ep) const;
sstring get_rack() const;
sstring get_rack(inet_address ep) const;
sstring get_datacenter() const;
sstring get_datacenter(inet_address ep) const;
auto get_local_dc_filter() const noexcept {
return [ this, local_dc = get_datacenter() ] (inet_address ep) {
return get_datacenter(ep) == local_dc;
};
};
template <std::ranges::range Range>
inline size_t count_local_endpoints(const Range& endpoints) const {
return std::count_if(endpoints.begin(), endpoints.end(), get_local_dc_filter());
}
/**
* This method will sort the <tt>List</tt> by proximity to the given
* address.
*/
void sort_by_proximity(inet_address address, inet_address_vector_replica_set& addresses) const;
private:
// default constructor for cloning purposes
topology() = default;
/**
* compares two endpoints in relation to the target endpoint, returning as
* Comparator.compare would
*/
int compare_endpoints(const inet_address& address, const inet_address& a1, const inet_address& a2) const;
void remove_pending_location(const inet_address& ep);
/** multi-map: DC -> endpoints in that DC */
std::unordered_map<sstring,
std::unordered_set<inet_address>>
_dc_endpoints;
/** map: DC -> (multi-map: rack -> endpoints in that rack) */
std::unordered_map<sstring,
std::unordered_map<sstring,
std::unordered_set<inet_address>>>
_dc_racks;
/** reverse-lookup map: endpoint -> current known dc/rack assignment */
std::unordered_map<inet_address, endpoint_dc_rack> _current_locations;
std::unordered_map<inet_address, endpoint_dc_rack> _pending_locations;
bool _sort_by_proximity = true;
};
} // namespace locator

32
locator/types.hh Normal file
View File

@@ -0,0 +1,32 @@
/*
*
* Modified by ScyllaDB
* Copyright (C) 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0)
*/
#pragma once
#include <seastar/core/sstring.hh>
#include "gms/inet_address.hh"
#include "locator/host_id.hh"
using namespace seastar;
namespace locator {
using inet_address = gms::inet_address;
// Endpoint Data Center and Rack names
struct endpoint_dc_rack {
    // Data center name
    sstring dc;
    // Rack name
    sstring rack;
};
using dc_rack_fn = seastar::noncopyable_function<endpoint_dc_rack(inet_address)>;
} // namespace locator