locator: add class node

And keep per node information (idx, host_id, endpoint, dc_rack, is_pending)
in node objects, indexed by topology on several indices like:
idx, host_id, endpoint, current/pending, per dc, per dc/rack.

The node index is a shorthand identifier for the node.

node* and index are valid while the respective topology instance is valid.
To be used, the caller must hold on to the topology / token_metadata object
(e.g. via a token_metadata_ptr or effective_replication_map)

Refs #6403

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>

topology: add node idx

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2022-11-13 20:18:39 +02:00
parent 006e02410f
commit f3d5df5448
9 changed files with 654 additions and 101 deletions

View File

@@ -32,8 +32,10 @@ bool host_filter::can_hint_for(const locator::topology& topo, gms::inet_address
switch (_enabled_kind) {
case enabled_kind::enabled_for_all:
return true;
case enabled_kind::enabled_selectively:
return topo.has_endpoint(ep) && _dcs.contains(topo.get_datacenter(ep));
case enabled_kind::enabled_selectively: {
auto node = topo.find_node(ep);
return node && _dcs.contains(node->dc_rack().dc);
}
case enabled_kind::disabled_for_all:
return false;
}

View File

@@ -521,6 +521,7 @@ void token_metadata_impl::debug_show() const {
}
void token_metadata_impl::update_host_id(const host_id& host_id, inet_address endpoint) {
_topology.add_or_update_endpoint(endpoint, host_id);
_endpoint_to_host_id_map[endpoint] = host_id;
}
@@ -1261,7 +1262,7 @@ future<> shared_token_metadata::mutate_on_all_shards(sharded<shared_token_metada
auto lk = co_await stm.local().get_lock();
std::vector<mutable_token_metadata_ptr> pending_token_metadata_ptr;
pending_token_metadata_ptr.reserve(smp::count);
pending_token_metadata_ptr.resize(smp::count);
auto tmptr = make_token_metadata_ptr(co_await stm.local().get()->clone_async());
auto& tm = *tmptr;
// bump the token_metadata ring_version

View File

@@ -117,6 +117,10 @@ public:
const std::unordered_map<token, inet_address>& get_token_to_endpoint() const;
const std::unordered_set<inet_address>& get_leaving_endpoints() const;
const std::unordered_map<token, inet_address>& get_bootstrap_tokens() const;
/**
* Update or add endpoint given its inet_address and endpoint_dc_rack.
*/
void update_topology(inet_address ep, endpoint_dc_rack dr);
/**
* Creates an iterable range of the sorted tokens starting at the token next

View File

@@ -25,45 +25,420 @@ thread_local const endpoint_dc_rack endpoint_dc_rack::default_location = {
.rack = locator::production_snitch_base::default_rack,
};
node::node(const locator::topology* topology, locator::host_id id, inet_address endpoint, endpoint_dc_rack dc_rack, this_node is_this_node, node::idx_type idx)
: _topology(topology)
, _host_id(id)
, _endpoint(endpoint)
, _dc_rack(std::move(dc_rack))
, _is_this_node(is_this_node)
, _idx(idx)
{}
node_holder node::make(const locator::topology* topology, locator::host_id id, inet_address endpoint, endpoint_dc_rack dc_rack, node::this_node is_this_node, node::idx_type idx) {
return std::make_unique<node>(topology, std::move(id), std::move(endpoint), std::move(dc_rack), is_this_node, idx);
}
node_holder node::clone() const {
return make(nullptr, host_id(), endpoint(), dc_rack(), is_this_node());
}
future<> topology::clear_gently() noexcept {
co_await utils::clear_gently(_dc_endpoints);
co_await utils::clear_gently(_dc_racks);
co_await utils::clear_gently(_current_locations);
_datacenters.clear();
co_return;
_dc_rack_nodes.clear();
_dc_nodes.clear();
_nodes_by_endpoint.clear();
_nodes_by_host_id.clear();
co_await utils::clear_gently(_nodes);
}
topology::topology() noexcept
: _shard(this_shard_id())
{
tlogger.trace("topology[{}]: default-constructed", fmt::ptr(this));
}
topology::topology(config cfg)
: _sort_by_proximity(!cfg.disable_proximity_sorting)
: _shard(this_shard_id())
, _sort_by_proximity(!cfg.disable_proximity_sorting)
{
add_or_update_endpoint(utils::fb_utilities::get_broadcast_address(), cfg.local_dc_rack);
tlogger.trace("topology[{}]: constructing using config: host_id={} endpoint={} dc={} rack={}", fmt::ptr(this),
cfg.this_host_id, cfg.this_endpoint, cfg.local_dc_rack.dc, cfg.local_dc_rack.rack);
if (cfg.this_host_id || cfg.this_endpoint != inet_address{}) {
add_node(node::make(this, cfg.this_host_id, cfg.this_endpoint, cfg.local_dc_rack, node::this_node::yes));
}
}
topology::topology(topology&& o) noexcept
: _shard(o._shard)
, _nodes(std::move(o._nodes))
, _nodes_by_host_id(std::move(o._nodes_by_host_id))
, _nodes_by_endpoint(std::move(o._nodes_by_endpoint))
, _dc_nodes(std::move(o._dc_nodes))
, _dc_rack_nodes(std::move(o._dc_rack_nodes))
, _dc_endpoints(std::move(o._dc_endpoints))
, _dc_racks(std::move(o._dc_racks))
, _sort_by_proximity(o._sort_by_proximity)
, _datacenters(std::move(o._datacenters))
{
assert(_shard == this_shard_id());
tlogger.trace("topology[{}]: move from [{}]", fmt::ptr(this), fmt::ptr(&o));
for (auto& n : _nodes) {
if (n) {
n->set_topology(this);
}
}
}
future<topology> topology::clone_gently() const {
topology ret;
ret._dc_endpoints.reserve(_dc_endpoints.size());
for (const auto& p : _dc_endpoints) {
ret._dc_endpoints.emplace(p);
}
co_await coroutine::maybe_yield();
ret._dc_racks.reserve(_dc_racks.size());
for (const auto& [dc, rack_endpoints] : _dc_racks) {
ret._dc_racks[dc].reserve(rack_endpoints.size());
for (const auto& p : rack_endpoints) {
ret._dc_racks[dc].emplace(p);
tlogger.debug("topology[{}]: clone_gently to {} from shard {}", fmt::ptr(this), fmt::ptr(&ret), _shard);
for (const auto& nptr : _nodes) {
if (nptr) {
ret.add_node(nptr->clone());
}
co_await coroutine::maybe_yield();
}
co_await coroutine::maybe_yield();
ret._current_locations.reserve(_current_locations.size());
for (const auto& p : _current_locations) {
ret._current_locations.emplace(p);
}
co_await coroutine::maybe_yield();
ret._datacenters = _datacenters;
ret._sort_by_proximity = _sort_by_proximity;
co_return ret;
}
std::string topology::debug_format(const node* node) {
if (!node) {
return format("node={}", fmt::ptr(node));
}
return format("node={} idx={} host_id={} endpoint={} dc={} rack={} this_node={}", fmt::ptr(node),
node->idx(), node->host_id(), node->endpoint(), node->dc_rack().dc, node->dc_rack().rack, bool(node->is_this_node()));
}
const node* topology::add_node(host_id id, const inet_address& ep, const endpoint_dc_rack& dr) {
if (dr.dc.empty() || dr.rack.empty()) {
on_internal_error(tlogger, "Node must have valid dc and rack");
}
// OK to add a different node with the same ip address.
if (utils::fb_utilities::is_me(ep) && this_node() && this_node()->host_id() == id) {
on_internal_error(tlogger, format("topology[{}]: local node already set: host_id={} endpoint={} dc={} rack={}: currently mapped to {}", fmt::ptr(this),
id, ep, dr.dc, dr.rack, debug_format(this_node())));
}
return add_node(node::make(this, id, ep, dr));
}
const node* topology::add_node(node_holder nptr) {
const node* node = nptr.get();
if (nptr->topology() != this) {
if (nptr->topology()) {
on_fatal_internal_error(tlogger, format("topology[{}]: {} belongs to different topology={}", fmt::ptr(this), debug_format(node), fmt::ptr(node->topology())));
}
nptr->set_topology(this);
}
if (node->is_this_node() && this_node()) {
on_internal_error(tlogger, format("topology[{}]: {}: local node already mapped to {}", fmt::ptr(this), debug_format(node), debug_format(this_node())));
}
if (node->idx() > 0) {
on_internal_error(tlogger, format("topology[{}]: {}: has assigned idx", fmt::ptr(this), debug_format(node)));
}
// Note that _nodes contains also the this_node()
nptr->set_idx(_nodes.size());
_nodes.emplace_back(std::move(nptr));
tlogger.debug("topology[{}]: add_node: {}, at {}", fmt::ptr(this), debug_format(node), current_backtrace());
try {
index_node(node);
} catch (...) {
pop_node(make_mutable(node));
throw;
}
return node;
}
const node* topology::update_node(node* node, std::optional<host_id> opt_id, std::optional<inet_address> opt_ep, std::optional<endpoint_dc_rack> opt_dr) {
tlogger.debug("topology[{}]: update_node: {}: to: host_id={} endpoint={} dc={} rack={}, at {}", fmt::ptr(this), debug_format(node),
opt_id ? format("{}", *opt_id) : "unchanged",
opt_ep ? format("{}", *opt_ep) : "unchanged",
opt_dr ? format("{}", opt_dr->dc) : "unchanged",
opt_dr ? format("{}", opt_dr->rack) : "unchanged",
current_backtrace());
bool changed = false;
if (opt_id) {
if (*opt_id != node->host_id()) {
if (!*opt_id) {
on_internal_error(tlogger, format("Updating node host_id to null is disallowed: {}: new host_id={}", debug_format(node), *opt_id));
}
if (node->is_this_node() && node->host_id()) {
on_internal_error(tlogger, format("This node host_id is alredy set: {}: new host_id={}", debug_format(node), *opt_id));
}
if (_nodes_by_host_id.contains(*opt_id)) {
on_internal_error(tlogger, format("Cannot update node host_id: {}: new host_id already exists: {}", debug_format(node), debug_format(_nodes_by_host_id[*opt_id])));
}
changed = true;
} else {
opt_id.reset();
}
}
if (opt_ep) {
if (*opt_ep != node->endpoint()) {
if (*opt_ep == inet_address{}) {
on_internal_error(tlogger, format("Updating node endpoint to null is disallowed: {}: new endpoint={}", debug_format(node), *opt_ep));
}
changed = true;
} else {
opt_ep.reset();
}
}
if (opt_dr) {
if (opt_dr->dc.empty() || opt_dr->dc == production_snitch_base::default_dc) {
opt_dr->dc = node->dc_rack().dc;
}
if (opt_dr->rack.empty() || opt_dr->rack == production_snitch_base::default_rack) {
opt_dr->rack = node->dc_rack().rack;
}
if (*opt_dr != node->dc_rack()) {
changed = true;
} else {
opt_dr.reset();
}
}
if (!changed) {
return node;
}
unindex_node(node);
// The following block must not throw
try {
auto mutable_node = make_mutable(node);
if (opt_id) {
mutable_node->_host_id = *opt_id;
}
if (opt_ep) {
mutable_node->_endpoint = *opt_ep;
}
if (opt_dr) {
mutable_node->_dc_rack = std::move(*opt_dr);
}
} catch (...) {
std::terminate();
}
index_node(node);
return node;
}
bool topology::remove_node(host_id id) {
auto node = find_node(id);
tlogger.debug("topology[{}]: remove_node: host_id={}: {}", fmt::ptr(this), id, debug_format(node));
if (node) {
remove_node(node);
return true;
}
return false;
}
void topology::remove_node(const node* node) {
// never delete this node
if (node->is_this_node()) {
unindex_node(node);
} else {
pop_node(node);
}
}
void topology::index_node(const node* node) {
tlogger.trace("topology[{}]: index_node: {}, at {}", fmt::ptr(this), debug_format(node), current_backtrace());
if (node->idx() < 0) {
on_internal_error(tlogger, format("topology[{}]: {}: must already have a valid idx", fmt::ptr(this), debug_format(node)));
}
// FIXME: for now we allow adding nodes with null host_id, for the following cases:
// 1. This node might be added with no host_id on pristine nodes.
// 2. Other nodes may be introduced via gossip with their endpoint only first
// and their host_id is updated later on.
if (node->host_id()) {
auto [nit, inserted_host_id] = _nodes_by_host_id.emplace(node->host_id(), node);
if (!inserted_host_id) {
on_internal_error(tlogger, format("topology[{}]: {}: node already exists", fmt::ptr(this), debug_format(node)));
}
}
if (node->endpoint() != inet_address{}) {
auto [eit, inserted_endpoint] = _nodes_by_endpoint.emplace(node->endpoint(), node);
if (!inserted_endpoint) {
if (node->host_id()) {
_nodes_by_host_id.erase(node->host_id());
}
on_internal_error(tlogger, format("topology[{}]: {}: node endpoint already mapped to {}", fmt::ptr(this), debug_format(node), debug_format(eit->second)));
}
}
const auto& dc = node->dc_rack().dc;
const auto& rack = node->dc_rack().rack;
const auto& endpoint = node->endpoint();
_dc_nodes[dc].emplace(node);
_dc_rack_nodes[dc][rack].emplace(node);
_dc_endpoints[dc].insert(endpoint);
_dc_racks[dc][rack].insert(endpoint);
_datacenters.insert(dc);
}
void topology::unindex_node(const node* node) {
tlogger.trace("topology[{}]: unindex_node: {}, at {}", fmt::ptr(this), debug_format(node), current_backtrace());
const auto& dc = node->dc_rack().dc;
const auto& rack = node->dc_rack().rack;
if (auto dit = _dc_endpoints.find(dc); dit != _dc_endpoints.end()) {
const auto& ep = node->endpoint();
auto& eps = dit->second;
eps.erase(ep);
if (eps.empty()) {
_dc_racks.erase(dc);
_dc_endpoints.erase(dit);
} else {
auto& racks = _dc_racks[dc];
if (auto rit = racks.find(rack); rit != racks.end()) {
eps = rit->second;
eps.erase(ep);
if (eps.empty()) {
racks.erase(rit);
}
}
}
}
if (auto dit = _dc_nodes.find(dc); dit != _dc_nodes.end()) {
auto& nodes = dit->second;
nodes.erase(node);
if (nodes.empty()) {
_dc_rack_nodes.erase(dc);
_datacenters.erase(dc);
_dc_nodes.erase(dit);
} else {
auto& racks = _dc_rack_nodes[dc];
if (auto rit = racks.find(rack); rit != racks.end()) {
nodes = rit->second;
nodes.erase(node);
if (nodes.empty()) {
racks.erase(rit);
}
}
}
}
_nodes_by_host_id.erase(node->host_id());
_nodes_by_endpoint.erase(node->endpoint());
}
node_holder topology::pop_node(const node* node) {
tlogger.trace("topology[{}]: pop_node: {}, at {}", fmt::ptr(this), debug_format(node), current_backtrace());
unindex_node(node);
// this node?
if (node->idx() == 0) {
on_internal_error(tlogger, format("topology[{}]: {}: cannot pop this_node", fmt::ptr(this), debug_format(node)));
}
auto nh = std::exchange(_nodes[node->idx()], {});
// shrink _nodes if the last node is popped
// like when failing to index a newly added node
if (node->idx() == _nodes.size() - 1) {
_nodes.resize(node->idx());
}
return nh;
}
// Finds a node by its host_id
// Returns nullptr if not found
const node* topology::find_node(host_id id) const noexcept {
auto it = _nodes_by_host_id.find(id);
if (it != _nodes_by_host_id.end()) {
return it->second;
}
return nullptr;
}
// Finds a node by its endpoint
// Returns nullptr if not found
const node* topology::find_node(const inet_address& ep) const noexcept {
auto it = _nodes_by_endpoint.find(ep);
if (it != _nodes_by_endpoint.end()) {
return it->second;
}
return nullptr;
}
// Finds a node by its index
// Returns nullptr if not found
const node* topology::find_node(node::idx_type idx) const noexcept {
if (idx >= _nodes.size()) {
return nullptr;
}
return _nodes.at(idx).get();
}
const node* topology::add_or_update_endpoint(inet_address ep, std::optional<host_id> opt_id, std::optional<endpoint_dc_rack> opt_dr)
{
tlogger.trace("topology[{}]: add_or_update_endpoint: ep={} host_id={} dc={} rack={}, at {}", fmt::ptr(this),
ep, opt_id.value_or(host_id::create_null_id()), opt_dr.value_or(endpoint_dc_rack{}).dc, opt_dr.value_or(endpoint_dc_rack{}).rack,
current_backtrace());
auto n = find_node(ep);
if (n) {
return update_node(make_mutable(n), opt_id, std::nullopt, std::move(opt_dr));
} else if (opt_id && (n = find_node(*opt_id))) {
return update_node(make_mutable(n), std::nullopt, ep, std::move(opt_dr));
} else {
return add_node(opt_id.value_or(host_id::create_null_id()), ep, opt_dr.value_or(endpoint_dc_rack::default_location));
}
}
bool topology::remove_endpoint(inet_address ep)
{
auto node = find_node(ep);
tlogger.debug("topology[{}]: remove_endpoint: endpoint={}: {}", fmt::ptr(this), ep, debug_format(node));
if (node) {
remove_node(node);
return true;
}
return false;
}
bool topology::has_node(host_id id) const noexcept {
auto node = find_node(id);
tlogger.trace("topology[{}]: has_node: host_id={}: {}", fmt::ptr(this), id, debug_format(node));
return bool(node);
}
bool topology::has_node(inet_address ep) const noexcept {
auto node = find_node(ep);
tlogger.trace("topology[{}]: has_node: endpoint={}: node={}", fmt::ptr(this), ep, debug_format(node));
return bool(node);
}
bool topology::has_endpoint(inet_address ep) const
{
return has_node(ep);
}
const endpoint_dc_rack& topology::get_location(const inet_address& ep) const {
if (ep == utils::fb_utilities::get_broadcast_address()) {
return get_location();
}
if (auto node = find_node(ep)) {
return node->dc_rack();
}
// FIXME -- this shouldn't happen. After topology is stable and is
// correctly populated with endpoints, this should be replaced with
// on_internal_error()
tlogger.warn("Requested location for node {} not in topology. backtrace {}", ep, current_backtrace());
return endpoint_dc_rack::default_location;
}
#ifdef FIXME_TO_BE_REMOVED
void topology::add_or_update_endpoint(const inet_address& ep, endpoint_dc_rack dr)
{
auto current = _current_locations.find(ep);
@@ -136,6 +511,7 @@ const endpoint_dc_rack& topology::get_location(const inet_address& ep) const {
tlogger.warn("Requested location for node {} not in topology. backtrace {}", ep, current_backtrace());
return endpoint_dc_rack::default_location;
}
#endif
void topology::sort_by_proximity(inet_address address, inet_address_vector_replica_set& addresses) const {
if (_sort_by_proximity) {
@@ -168,3 +544,12 @@ std::weak_ordering topology::compare_endpoints(const inet_address& address, cons
}
} // namespace locator
namespace std {
std::ostream& operator<<(std::ostream& out, const locator::node& node) {
fmt::print(out, "{}", node);
return out;
}
} // namespace std

View File

@@ -13,6 +13,7 @@
#include <unordered_set>
#include <unordered_map>
#include <compare>
#include <iostream>
#include <seastar/core/future.hh>
#include <seastar/core/sstring.hh>
@@ -20,35 +21,138 @@
#include "locator/types.hh"
#include "inet_address_vectors.hh"
#include "utils/fb_utilities.hh"
using namespace seastar;
namespace locator {
class topology;
class node;
using node_holder = std::unique_ptr<node>;
class node {
public:
using this_node = bool_class<struct this_node_tag>;
using idx_type = int;
private:
const topology* _topology;
host_id _host_id;
inet_address _endpoint;
endpoint_dc_rack _dc_rack;
// Is this node the `localhost` instance
this_node _is_this_node;
idx_type _idx = -1;
public:
node(const locator::topology* topology, locator::host_id id, inet_address endpoint, endpoint_dc_rack dc_rack, this_node is_this_node = this_node::no, idx_type idx = -1);
node(const node&) = delete;
node(node&&) = delete;
const topology* topology() const noexcept {
return _topology;
}
const host_id& host_id() const noexcept {
return _host_id;
}
const inet_address& endpoint() const noexcept {
return _endpoint;
}
const endpoint_dc_rack& dc_rack() const noexcept {
return _dc_rack;
}
// Is this "localhost"?
this_node is_this_node() const noexcept { return _is_this_node; }
// idx < 0 means "unassigned"
idx_type idx() const noexcept { return _idx; }
private:
static node_holder make(const locator::topology* topology, locator::host_id id, inet_address endpoint, endpoint_dc_rack dc_rack, node::this_node is_this_node = this_node::no, idx_type idx = -1);
node_holder clone() const;
void set_topology(const locator::topology* topology) noexcept { _topology = topology; }
void set_idx(idx_type idx) noexcept { _idx = idx; }
friend class topology;
};
class topology {
public:
struct config {
host_id this_host_id;
inet_address this_endpoint;
endpoint_dc_rack local_dc_rack;
bool disable_proximity_sorting = false;
};
topology(config cfg);
topology(topology&&) = default;
topology(topology&&) noexcept;
topology& operator=(topology&&) = default;
future<topology> clone_gently() const;
future<> clear_gently() noexcept;
public:
const node* this_node() const noexcept {
return _nodes.size() ? _nodes.front().get() : nullptr;
}
// Adds a node with given host_id, endpoint, and DC/rack.
const node* add_node(host_id id, const inet_address& ep, const endpoint_dc_rack& dr);
// Optionally updates node's current host_id, endpoint, or DC/rack.
// Note: the host_id may be updated from null to non-null after a new node gets a new, random host_id,
// or a peer node host_id may be updated when the node is replaced with another node using the same ip address.
const node* update_node(node* node, std::optional<host_id> opt_id, std::optional<inet_address> opt_ep, std::optional<endpoint_dc_rack> opt_dr);
// Removes a node using its host_id
// Returns true iff the node was found and removed.
bool remove_node(host_id id);
// Looks up a node by its host_id.
// Returns a pointer to the node if found, or nullptr otherwise.
const node* find_node(host_id id) const noexcept;
// Looks up a node by its inet_address.
// Returns a pointer to the node if found, or nullptr otherwise.
const node* find_node(const inet_address& ep) const noexcept;
// Finds a node by its index
// Returns a pointer to the node if found, or nullptr otherwise.
const node* find_node(node::idx_type idx) const noexcept;
// Returns true if a node with given host_id is found
bool has_node(host_id id) const noexcept;
bool has_node(inet_address id) const noexcept;
/**
* Stores current DC/rack assignment for ep
*
* Adds or updates a node with given endpoint
*/
void add_or_update_endpoint(const inet_address& ep, endpoint_dc_rack dr);
const node* add_or_update_endpoint(inet_address ep, std::optional<host_id> opt_id, std::optional<endpoint_dc_rack> opt_dr);
// Legacy entry point from token_metadata::update_topology
const node* add_or_update_endpoint(inet_address ep, endpoint_dc_rack dr) {
return add_or_update_endpoint(ep, std::nullopt, std::move(dr));
}
const node* add_or_update_endpoint(inet_address ep, host_id id) {
return add_or_update_endpoint(ep, id, std::nullopt);
}
/**
* Removes current DC/rack assignment for ep
* Returns true if the node was found and removed.
*/
void remove_endpoint(inet_address ep);
bool remove_endpoint(inet_address ep);
/**
* Returns true iff contains given endpoint.
@@ -72,27 +176,45 @@ public:
return _datacenters;
}
// Get dc/rack location of the local node
// Get dc/rack location of this node
const endpoint_dc_rack& get_location() const noexcept {
return get_location(utils::fb_utilities::get_broadcast_address());
return this_node()->dc_rack();
}
// Get dc/rack location of a node identified by host_id
// The specified node must exist.
const endpoint_dc_rack& get_location(host_id id) const {
return find_node(id)->dc_rack();
}
// Get dc/rack location of a node identified by endpoint
// The specified node must exist.
const endpoint_dc_rack& get_location(const inet_address& ep) const;
// Get datacenter of the local node
// Get datacenter of this node
const sstring& get_datacenter() const noexcept {
return get_location().dc;
}
// Get datacenter of a node identified by host_id
// The specified node must exist.
const sstring& get_datacenter(host_id id) const {
return get_location(id).dc;
}
// Get datacenter of a node identified by endpoint
// The specified node must exist.
const sstring& get_datacenter(inet_address ep) const {
return get_location(ep).dc;
}
// Get rack of the local node
// Get rack of this node
const sstring& get_rack() const noexcept {
return get_location().rack;
}
// Get rack of a node identified by host_id
// The specified node must exist.
const sstring& get_rack(host_id id) const {
return get_location(id).rack;
}
// Get rack of a node identified by endpoint
// The specified node must exist.
const sstring& get_rack(inet_address ep) const {
return get_location(ep).rack;
}
@@ -116,7 +238,20 @@ public:
private:
// default constructor for cloning purposes
topology() = default;
topology() noexcept;
const node* add_node(node_holder node);
void remove_node(const node* node);
static std::string debug_format(const node*);
void index_node(const node* node);
void unindex_node(const node* node);
node_holder pop_node(const node* node);
static node* make_mutable(const node* nptr) {
return const_cast<node*>(nptr);
}
/**
* compares two endpoints in relation to the target endpoint, returning as
@@ -129,6 +264,14 @@ private:
*/
std::weak_ordering compare_endpoints(const inet_address& address, const inet_address& a1, const inet_address& a2) const;
unsigned _shard;
std::vector<node_holder> _nodes;
std::unordered_map<host_id, const node*> _nodes_by_host_id;
std::unordered_map<inet_address, const node*> _nodes_by_endpoint;
std::unordered_map<sstring, std::unordered_set<const node*>> _dc_nodes;
std::unordered_map<sstring, std::unordered_map<sstring, std::unordered_set<const node*>>> _dc_rack_nodes;
/** multi-map: DC -> endpoints in that DC */
std::unordered_map<sstring,
std::unordered_set<inet_address>>
@@ -140,9 +283,6 @@ private:
std::unordered_set<inet_address>>>
_dc_racks;
/** reverse-lookup map: endpoint -> current known dc/rack assignment */
std::unordered_map<inet_address, endpoint_dc_rack> _current_locations;
bool _sort_by_proximity = true;
// pre-calculated
@@ -155,3 +295,17 @@ public:
};
} // namespace locator
namespace std {
std::ostream& operator<<(std::ostream& out, const locator::node* node);
} // namespace std
template <>
struct fmt::formatter<locator::node> : fmt::formatter<std::string_view> {
template <typename FormatContext>
auto format(const locator::node& node, FormatContext& ctx) const {
return fmt::format_to(ctx.out(), "{}/{}", node.host_id(), node.endpoint());
}
};

View File

@@ -753,6 +753,10 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
supervisor::notify("starting tokens manager");
locator::token_metadata::config tm_cfg;
// The local node's host_id is updated after loading from system.local
// or making a random one for a new node
tm_cfg.topo_cfg.this_host_id = host_id::create_null_id();
tm_cfg.topo_cfg.this_endpoint = utils::fb_utilities::get_broadcast_address();
tm_cfg.topo_cfg.local_dc_rack = { snitch.local()->get_datacenter(), snitch.local()->get_rack() };
if (snitch.local()->get_name() == "org.apache.cassandra.locator.SimpleSnitch") {
//
@@ -1149,6 +1153,10 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
return sys_ks.start(snitch.local());
}).get();
cfg->host_id = sys_ks.local().load_local_host_id().get0();
shared_token_metadata::mutate_on_all_shards(token_metadata, [hostid = cfg->host_id, endpoint = utils::fb_utilities::get_broadcast_address()] (locator::token_metadata& tm) {
tm.get_topology().add_or_update_endpoint(endpoint, hostid);
return make_ready_future<>();
}).get();
supervisor::notify("initializing batchlog manager");
db::batchlog_manager_config bm_cfg;

View File

@@ -3747,7 +3747,8 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo
auto local_dc = topology.get_datacenter();
for(auto dest: handler.get_targets()) {
sstring dc = topology.get_datacenter(dest);
auto node = topology.find_node(dest);
const auto& dc = node->dc_rack().dc;
// read repair writes do not go through coordinator since mutations are per destination
if (handler.read_repair_write() || dc == local_dc) {
local.emplace_back("", inet_address_vector_replica_set({dest}));

View File

@@ -8,6 +8,9 @@
#include <boost/test/unit_test.hpp>
#include <boost/range/adaptor/map.hpp>
#include "gms/inet_address.hh"
#include "locator/types.hh"
#include "utils/UUID_gen.hh"
#include "utils/fb_utilities.hh"
#include "utils/sequenced_set.hh"
#include "locator/network_topology_strategy.hh"
@@ -36,6 +39,7 @@ using namespace locator;
struct ring_point {
double point;
inet_address host;
host_id id = host_id::create_random_id();
};
void print_natural_endpoints(double point, const inet_address_vector_replica_set v) {
@@ -142,9 +146,9 @@ auto d2t = [](double d) -> int64_t {
void full_ring_check(const std::vector<ring_point>& ring_points,
const std::map<sstring, sstring>& options,
abstract_replication_strategy::ptr_type ars_ptr,
locator::token_metadata_ptr tmptr,
const locator::topology& topo) {
locator::token_metadata_ptr tmptr) {
auto& tm = *tmptr;
const auto& topo = tm.get_topology();
strategy_sanity_check(ars_ptr, tm, options);
auto erm = calculate_effective_replication_map(ars_ptr, tmptr).get0();
@@ -173,18 +177,12 @@ void full_ring_check(const std::vector<ring_point>& ring_points,
}
}
std::unique_ptr<locator::topology> generate_topology(const std::vector<ring_point>& pts) {
auto topo = std::make_unique<locator::topology>(locator::topology::config{});
locator::endpoint_dc_rack make_endpoint_dc_rack(gms::inet_address endpoint) {
// This resembles rack_inferring_snitch dc/rack generation which is
// still in use by this test via token_metadata internals
for (const auto& p : pts) {
auto rack = std::to_string(uint8_t(p.host.bytes()[2]));
auto dc = std::to_string(uint8_t(p.host.bytes()[1]));
topo->add_or_update_endpoint(p.host, { dc, rack });
}
return topo;
auto dc = std::to_string(uint8_t(endpoint.bytes()[1]));
auto rack = std::to_string(uint8_t(endpoint.bytes()[2]));
return locator::endpoint_dc_rack{dc, rack};
}
// Run in a seastar thread.
@@ -200,7 +198,11 @@ void simple_test() {
auto stop_snitch = defer([&snitch] { snitch.stop().get(); });
snitch.invoke_on_all(&snitch_ptr::start).get();
locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, locator::token_metadata::config{});
locator::token_metadata::config tm_cfg;
tm_cfg.topo_cfg.this_host_id = host_id::create_random_id();
tm_cfg.topo_cfg.this_endpoint = utils::fb_utilities::get_broadcast_address();
tm_cfg.topo_cfg.local_dc_rack = { snitch.local()->get_datacenter(), snitch.local()->get_rack() };
locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg);
std::vector<ring_point> ring_points = {
{ 1.0, inet_address("192.100.10.1") },
@@ -216,18 +218,14 @@ void simple_test() {
{ 11.0, inet_address("192.102.40.2") }
};
auto topo = generate_topology(ring_points);
std::unordered_map<inet_address, std::unordered_set<token>> endpoint_tokens;
for (const auto& [ring_point, endpoint] : ring_points) {
endpoint_tokens[endpoint].insert({dht::token::kind::key, d2t(ring_point / ring_points.size())});
}
// Initialize the token_metadata
stm.mutate_token_metadata([&endpoint_tokens, &topo] (token_metadata& tm) -> future<> {
for (auto&& i : endpoint_tokens) {
tm.update_topology(i.first, topo->get_location(i.first));
co_await tm.update_normal_tokens(std::move(i.second), i.first);
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
auto& topo = tm.get_topology();
for (const auto& [ring_point, endpoint, id] : ring_points) {
std::unordered_set<token> tokens;
tokens.insert({dht::token::kind::key, d2t(ring_point / ring_points.size())});
topo.add_node(id, endpoint, make_endpoint_dc_rack(endpoint));
co_await tm.update_normal_tokens(std::move(tokens), endpoint);
}
}).get();
@@ -242,8 +240,7 @@ void simple_test() {
auto ars_ptr = abstract_replication_strategy::create_replication_strategy(
"NetworkTopologyStrategy", options323);
full_ring_check(ring_points, options323, ars_ptr, stm.get(), *topo);
full_ring_check(ring_points, options323, ars_ptr, stm.get());
///////////////
// Create the replication strategy
@@ -256,7 +253,7 @@ void simple_test() {
ars_ptr = abstract_replication_strategy::create_replication_strategy(
"NetworkTopologyStrategy", options320);
full_ring_check(ring_points, options320, ars_ptr, stm.get(), *topo);
full_ring_check(ring_points, options320, ars_ptr, stm.get());
//
// Check cache invalidation: invalidate the cache and run a full ring
@@ -268,7 +265,7 @@ void simple_test() {
tm.invalidate_cached_rings();
return make_ready_future<>();
}).get();
full_ring_check(ring_points, options320, ars_ptr, stm.get(), *topo);
full_ring_check(ring_points, options320, ars_ptr, stm.get());
}
// Run in a seastar thread.
@@ -328,19 +325,18 @@ void heavy_origin_test() {
}
}
auto topo = generate_topology(ring_points);
stm.mutate_token_metadata([&tokens, &topo] (token_metadata& tm) -> future<> {
for (auto&& i : tokens) {
tm.update_topology(i.first, topo->get_location(i.first));
co_await tm.update_normal_tokens(std::move(i.second), i.first);
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
auto& topo = tm.get_topology();
for (const auto& [ring_point, endpoint, id] : ring_points) {
topo.add_node(id, endpoint, make_endpoint_dc_rack(endpoint));
co_await tm.update_normal_tokens(std::move(tokens[endpoint]), endpoint);
}
}).get();
auto ars_ptr = abstract_replication_strategy::create_replication_strategy(
"NetworkTopologyStrategy", config_options);
full_ring_check(ring_points, config_options, ars_ptr, stm.get(), *topo);
full_ring_check(ring_points, config_options, ars_ptr, stm.get());
}
@@ -396,7 +392,7 @@ static bool has_sufficient_replicas(
static locator::endpoint_set calculate_natural_endpoints(
const token& search_token, const token_metadata& tm,
locator::topology& topo,
const locator::topology& topo,
const std::unordered_map<sstring, size_t>& datacenters) {
//
// We want to preserve insertion order so that the first added endpoint
@@ -434,7 +430,7 @@ static locator::endpoint_set calculate_natural_endpoints(
// the members of a DC
//
const std::unordered_map<sstring,
std::unordered_set<inet_address>>&
std::unordered_set<inet_address>>
all_endpoints = tp.get_datacenter_endpoints();
//
// all racks in a DC so we can check when we have exhausted all racks in a
@@ -442,7 +438,7 @@ static locator::endpoint_set calculate_natural_endpoints(
//
const std::unordered_map<sstring,
std::unordered_map<sstring,
std::unordered_set<inet_address>>>&
std::unordered_set<inet_address>>>
racks = tp.get_datacenter_racks();
// not aware of any cluster members
@@ -506,7 +502,7 @@ static locator::endpoint_set calculate_natural_endpoints(
}
// Called in a seastar thread.
static void test_equivalence(const shared_token_metadata& stm, std::unique_ptr<locator::topology> topo, const std::unordered_map<sstring, size_t>& datacenters) {
static void test_equivalence(const shared_token_metadata& stm, const locator::topology& topo, const std::unordered_map<sstring, size_t>& datacenters) {
class my_network_topology_strategy : public network_topology_strategy {
public:
using network_topology_strategy::network_topology_strategy;
@@ -524,7 +520,7 @@ static void test_equivalence(const shared_token_metadata& stm, std::unique_ptr<l
const token_metadata& tm = *stm.get();
for (size_t i = 0; i < 1000; ++i) {
auto token = dht::token::get_random_token();
auto expected = calculate_natural_endpoints(token, tm, *topo, datacenters);
auto expected = calculate_natural_endpoints(token, tm, topo, datacenters);
auto actual = nts.calculate_natural_endpoints(token, tm).get0();
// Because the old algorithm does not put the nodes in the correct order in the case where more replicas
@@ -539,7 +535,7 @@ static void test_equivalence(const shared_token_metadata& stm, std::unique_ptr<l
}
std::unique_ptr<locator::topology> generate_topology(const std::unordered_map<sstring, size_t> datacenters, const std::vector<inet_address>& nodes) {
void generate_topology(topology& topo, const std::unordered_map<sstring, size_t> datacenters, const std::vector<inet_address>& nodes) {
auto& e1 = seastar::testing::local_random_engine;
std::unordered_map<sstring, size_t> racks_per_dc;
@@ -559,16 +555,12 @@ std::unique_ptr<locator::topology> generate_topology(const std::unordered_map<ss
out = std::fill_n(out, rf, std::cref(dc));
}
auto topo = std::make_unique<locator::topology>(locator::topology::config{});
for (auto& node : nodes) {
const sstring& dc = dcs[udist(0, dcs.size() - 1)(e1)];
auto rc = racks_per_dc.at(dc);
auto r = udist(0, rc)(e1);
topo->add_or_update_endpoint(node, { dc, to_sstring(r) });
topo.add_node(host_id::create_random_id(), node, {dc, to_sstring(r)});
}
return topo;
}
SEASTAR_THREAD_TEST_CASE(testCalculateEndpoints) {
@@ -595,7 +587,6 @@ SEASTAR_THREAD_TEST_CASE(testCalculateEndpoints) {
for (size_t run = 0; run < RUNS; ++run) {
semaphore sem(1);
shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{});
auto topo = generate_topology(datacenters, nodes);
std::unordered_set<dht::token> random_tokens;
while (random_tokens.size() < nodes.size() * VNODES) {
@@ -610,13 +601,13 @@ SEASTAR_THREAD_TEST_CASE(testCalculateEndpoints) {
}
}
stm.mutate_token_metadata([&endpoint_tokens, &topo] (token_metadata& tm) -> future<> {
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
generate_topology(tm.get_topology(), datacenters, nodes);
for (auto&& i : endpoint_tokens) {
tm.update_topology(i.first, topo->get_location(i.first));
co_await tm.update_normal_tokens(std::move(i.second), i.first);
}
}).get();
test_equivalence(stm, std::move(topo), datacenters);
test_equivalence(stm, stm.get()->get_topology(), datacenters);
}
}
@@ -703,21 +694,25 @@ SEASTAR_THREAD_TEST_CASE(test_topology_compare_endpoints) {
semaphore sem(1);
shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{});
auto topo = generate_topology(datacenters, nodes);
stm.mutate_token_metadata([&] (token_metadata& tm) {
auto& topo = tm.get_topology();
generate_topology(topo, datacenters, nodes);
const auto& address = nodes[tests::random::get_int<size_t>(0, NODES-1)];
const auto& a1 = nodes[tests::random::get_int<size_t>(0, NODES-1)];
const auto& a2 = nodes[tests::random::get_int<size_t>(0, NODES-1)];
const auto& address = nodes[tests::random::get_int<size_t>(0, NODES-1)];
const auto& a1 = nodes[tests::random::get_int<size_t>(0, NODES-1)];
const auto& a2 = nodes[tests::random::get_int<size_t>(0, NODES-1)];
topo->test_compare_endpoints(address, address, address);
topo->test_compare_endpoints(address, address, a1);
topo->test_compare_endpoints(address, a1, address);
topo->test_compare_endpoints(address, a1, a1);
topo->test_compare_endpoints(address, a1, a2);
topo->test_compare_endpoints(address, a2, a1);
topo.test_compare_endpoints(address, address, address);
topo.test_compare_endpoints(address, address, a1);
topo.test_compare_endpoints(address, a1, address);
topo.test_compare_endpoints(address, a1, a1);
topo.test_compare_endpoints(address, a1, a2);
topo.test_compare_endpoints(address, a2, a1);
topo->test_compare_endpoints(bogus_address, bogus_address, bogus_address);
topo->test_compare_endpoints(address, bogus_address, bogus_address);
topo->test_compare_endpoints(address, a1, bogus_address);
topo->test_compare_endpoints(address, bogus_address, a2);
topo.test_compare_endpoints(bogus_address, bogus_address, bogus_address);
topo.test_compare_endpoints(address, bogus_address, bogus_address);
topo.test_compare_endpoints(address, a1, bogus_address);
topo.test_compare_endpoints(address, bogus_address, a2);
return make_ready_future<>();
}).get();
}

View File

@@ -66,6 +66,7 @@
#include "service/raft/raft_group0_client.hh"
#include "service/raft/raft_group0.hh"
#include "init.hh"
#include "utils/fb_utilities.hh"
#include <sys/time.h>
#include <sys/resource.h>
@@ -560,6 +561,8 @@ public:
sharded<locator::shared_token_metadata> token_metadata;
locator::token_metadata::config tm_cfg;
tm_cfg.topo_cfg.this_host_id = cfg->host_id;
tm_cfg.topo_cfg.this_endpoint = utils::fb_utilities::get_broadcast_address();
tm_cfg.topo_cfg.local_dc_rack = { snitch.local()->get_datacenter(), snitch.local()->get_rack() };
token_metadata.start([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg).get();
auto stop_token_metadata = defer([&token_metadata] { token_metadata.stop().get(); });