locator/topology: add key_kind parameter

For the host_id-based token_metadata we want host_id
to be the main node key, meaning it should be used
in add_or_update_endpoint to find the node to update.
For the inet_address-based token_metadata version
we want to retain the old behaviour during the transition period.

In this commit we introduce the key_kind parameter and use
key_kind::inet_address in all current topology usages.
Later we'll use key_kind::host_id for the new token_metadata.

In the last commits of the series, when the new token_metadata
version is used everywhere, we will remove the key_kind enum.
This commit is contained in:
Piotr Dulikowski
2023-10-04 15:50:49 +02:00
committed by Petr Gusev
parent 2f137776c3
commit 5227b71363
4 changed files with 84 additions and 22 deletions

View File

@@ -94,9 +94,9 @@ private:
struct shallow_copy {};
public:
token_metadata_impl(shallow_copy, const token_metadata_impl& o) noexcept
: _topology(topology::config{})
: _topology(topology::config{}, topology::key_kind::inet_address)
{}
token_metadata_impl(token_metadata::config cfg) noexcept : _topology(std::move(cfg.topo_cfg)) {};
token_metadata_impl(token_metadata::config cfg) noexcept : _topology(std::move(cfg.topo_cfg), topology::key_kind::inet_address) {};
token_metadata_impl(const token_metadata_impl&) = delete; // it's too huge for direct copy, use clone_async()
token_metadata_impl(token_metadata_impl&&) noexcept = default;
const std::vector<token>& sorted_tokens() const;

View File

@@ -70,10 +70,11 @@ future<> topology::clear_gently() noexcept {
co_await utils::clear_gently(_nodes);
}
topology::topology(config cfg)
topology::topology(config cfg, key_kind k)
: _shard(this_shard_id())
, _cfg(cfg)
, _sort_by_proximity(!cfg.disable_proximity_sorting)
, _key_kind(k)
{
tlogger.trace("topology[{}]: constructing using config: endpoint={} dc={} rack={}", fmt::ptr(this),
cfg.this_endpoint, cfg.local_dc_rack.dc, cfg.local_dc_rack.rack);
@@ -92,6 +93,7 @@ topology::topology(topology&& o) noexcept
, _dc_racks(std::move(o._dc_racks))
, _sort_by_proximity(o._sort_by_proximity)
, _datacenters(std::move(o._datacenters))
, _key_kind(o._key_kind)
{
assert(_shard == this_shard_id());
tlogger.trace("topology[{}]: move from [{}]", fmt::ptr(this), fmt::ptr(&o));
@@ -112,7 +114,7 @@ topology& topology::operator=(topology&& o) noexcept {
}
future<topology> topology::clone_gently() const {
topology ret(_cfg);
topology ret(_cfg, _key_kind);
tlogger.debug("topology[{}]: clone_gently to {} from shard {}", fmt::ptr(this), fmt::ptr(&ret), _shard);
for (const auto& nptr : _nodes) {
if (nptr) {
@@ -437,24 +439,45 @@ const node* topology::find_node(node::idx_type idx) const noexcept {
return _nodes.at(idx).get();
}
const node* topology::add_or_update_endpoint(inet_address ep, std::optional<host_id> opt_id, std::optional<endpoint_dc_rack> opt_dr, std::optional<node::state> opt_st, std::optional<shard_id> shard_count)
const node* topology::add_or_update_endpoint(std::optional<inet_address> opt_ep, std::optional<host_id> opt_id, std::optional<endpoint_dc_rack> opt_dr, std::optional<node::state> opt_st, std::optional<shard_id> shard_count)
{
if (tlogger.is_enabled(log_level::trace)) {
tlogger.trace("topology[{}]: add_or_update_endpoint: ep={} host_id={} dc={} rack={} state={} shards={}, at {}", fmt::ptr(this),
ep, opt_id.value_or(host_id::create_null_id()), opt_dr.value_or(endpoint_dc_rack{}).dc, opt_dr.value_or(endpoint_dc_rack{}).rack, opt_st.value_or(node::state::none), shard_count,
opt_ep, opt_id.value_or(host_id::create_null_id()), opt_dr.value_or(endpoint_dc_rack{}).dc, opt_dr.value_or(endpoint_dc_rack{}).rack, opt_st.value_or(node::state::none), shard_count,
current_backtrace());
}
auto n = find_node(ep);
if (n) {
return update_node(make_mutable(n), opt_id, std::nullopt, std::move(opt_dr), std::move(opt_st), std::move(shard_count));
} else if (opt_id && (n = find_node(*opt_id))) {
return update_node(make_mutable(n), std::nullopt, ep, std::move(opt_dr), std::move(opt_st), std::move(shard_count));
} else {
return add_node(opt_id.value_or(host_id::create_null_id()), ep,
opt_dr.value_or(endpoint_dc_rack::default_location),
opt_st.value_or(node::state::normal),
shard_count.value_or(0));
// Locate an existing node using the identifier selected by _key_kind as the
// primary key; the other identifier is consulted only as a fallback.
// The identifier that matched is not forwarded to update_node (std::nullopt
// is passed in its place); the other one is forwarded as given.
const node* n;
switch (_key_kind) {
case topology::key_kind::host_id:
// The primary key must be supplied; a missing host_id is reported
// through on_internal_error.
if (!opt_id) {
on_internal_error(tlogger, format("topology: host_id is not set, ep={}", opt_ep));
}
n = find_node(*opt_id);
if (n) {
return update_node(make_mutable(n), std::nullopt, opt_ep, std::move(opt_dr), std::move(opt_st), std::move(shard_count));
} else if (opt_ep && (n = find_node(*opt_ep))) {
return update_node(make_mutable(n), opt_id, std::nullopt, std::move(opt_dr), std::move(opt_st), std::move(shard_count));
}
break;
case topology::key_kind::inet_address:
// The primary key must be supplied; a missing endpoint is reported
// through on_internal_error.
if (!opt_ep) {
on_internal_error(tlogger, format("topology: endpoint is not set, id={}", opt_id));
}
n = find_node(*opt_ep);
if (n) {
return update_node(make_mutable(n), opt_id, std::nullopt, std::move(opt_dr), std::move(opt_st), std::move(shard_count));
} else if (opt_id && (n = find_node(*opt_id))) {
return update_node(make_mutable(n), std::nullopt, opt_ep, std::move(opt_dr), std::move(opt_st), std::move(shard_count));
}
break;
}
// Neither lookup matched an existing node: add a new one, substituting a
// default for every argument that was not supplied.
return add_node(opt_id.value_or(host_id::create_null_id()),
opt_ep.value_or(inet_address{}),
opt_dr.value_or(endpoint_dc_rack::default_location),
opt_st.value_or(node::state::normal),
shard_count.value_or(0));
}
bool topology::remove_endpoint(inet_address ep)

View File

@@ -159,6 +159,11 @@ private:
class topology {
public:
// Selects which identifier add_or_update_endpoint() treats as the primary
// key when looking for an existing node to update.
enum class key_kind {
// Look up by endpoint address first, then fall back to host_id.
inet_address,
// Look up by host_id first, then fall back to endpoint address.
host_id,
};
struct config {
inet_address this_endpoint;
inet_address this_cql_address; // corresponds to broadcast_rpc_address
@@ -168,7 +173,7 @@ public:
bool operator==(const config&) const = default;
};
topology(config cfg);
topology(config cfg, key_kind k);
topology(topology&&) noexcept;
topology& operator=(topology&&) noexcept;
@@ -234,7 +239,7 @@ public:
*
* Adds or updates a node with given endpoint
*/
const node* add_or_update_endpoint(inet_address ep, std::optional<host_id> opt_id,
const node* add_or_update_endpoint(std::optional<inet_address> ep, std::optional<host_id> opt_id,
std::optional<endpoint_dc_rack> opt_dr,
std::optional<node::state> opt_st,
std::optional<shard_id> shard_count = std::nullopt);
@@ -409,6 +414,8 @@ private:
// pre-calculated
std::unordered_set<sstring> _datacenters;
key_kind _key_kind;
void calculate_datacenters();
const std::unordered_map<inet_address, const node*>& get_nodes_by_endpoint() const noexcept {

View File

@@ -36,7 +36,7 @@ SEASTAR_THREAD_TEST_CASE(test_add_node) {
.local_dc_rack = endpoint_dc_rack::default_location,
};
auto topo = topology(cfg);
auto topo = topology(cfg, topology::key_kind::inet_address);
set_abort_on_internal_error(false);
auto reset_on_internal_abort = seastar::defer([] {
@@ -73,7 +73,7 @@ SEASTAR_THREAD_TEST_CASE(test_moving) {
.local_dc_rack = endpoint_dc_rack::default_location,
};
auto topo = topology(cfg);
auto topo = topology(cfg, topology::key_kind::inet_address);
topo.add_node(id1, ep1, endpoint_dc_rack::default_location, node::state::normal);
@@ -102,7 +102,7 @@ SEASTAR_THREAD_TEST_CASE(test_update_node) {
.local_dc_rack = endpoint_dc_rack::default_location,
};
auto topo = topology(cfg);
auto topo = topology(cfg, topology::key_kind::inet_address);
set_abort_on_internal_error(false);
auto reset_on_internal_abort = seastar::defer([] {
@@ -171,6 +171,38 @@ SEASTAR_THREAD_TEST_CASE(test_update_node) {
BOOST_REQUIRE_EQUAL(node->get_state(), locator::node::state::left);
}
// Checks that a topology constructed with key_kind::host_id resolves
// add_or_update_endpoint through host_id before considering the IP address.
SEASTAR_THREAD_TEST_CASE(test_add_or_update_by_host_id) {
    auto id1 = host_id::create_random_id();
    auto id2 = host_id::create_random_id();
    auto ep1 = gms::inet_address("127.0.0.1");

    // In this test we check that add_or_update_endpoint searches by host_id first.
    // We create two nodes, one matches by id, another - by ip,
    // and assert that add_or_update_endpoint updates the first.
    // We need to make the second node 'being_decommissioned', so that
    // it gets removed from ip index and we don't get the non-unique IP error.
    auto topo = topology({}, topology::key_kind::host_id);

    topo.add_node(id1, gms::inet_address{}, endpoint_dc_rack::default_location, node::state::normal);
    topo.add_node(id2, ep1, endpoint_dc_rack::default_location, node::state::being_decommissioned);

    topo.add_or_update_endpoint(ep1, id1, std::nullopt, node::state::bootstrapping);

    // The node matched by host_id must have received the new state and endpoint.
    auto* n = topo.find_node(id1);
    // Fail the test cleanly instead of dereferencing a null pointer below.
    BOOST_REQUIRE(n != nullptr);
    BOOST_REQUIRE_EQUAL(n->get_state(), node::state::bootstrapping);
    BOOST_REQUIRE_EQUAL(n->host_id(), id1);
    BOOST_REQUIRE_EQUAL(n->endpoint(), ep1);

    // Looking up by IP must now resolve to the same, updated node.
    auto* n2 = topo.find_node(ep1);
    BOOST_REQUIRE_EQUAL(n, n2);

    // The node that only matched by IP must be left untouched.
    auto* n3 = topo.find_node(id2);
    BOOST_REQUIRE(n3 != nullptr);
    BOOST_REQUIRE_EQUAL(n3->get_state(), node::state::being_decommissioned);
    BOOST_REQUIRE_EQUAL(n3->host_id(), id2);
    BOOST_REQUIRE_EQUAL(n3->endpoint(), ep1);
}
SEASTAR_THREAD_TEST_CASE(test_remove_endpoint) {
using dc_endpoints_t = std::unordered_map<sstring, std::unordered_set<inet_address>>;
using dc_racks_t = std::unordered_map<sstring, std::unordered_map<sstring, std::unordered_set<inet_address>>>;
@@ -194,7 +226,7 @@ SEASTAR_THREAD_TEST_CASE(test_remove_endpoint) {
.local_dc_rack = dc_rack1
};
auto topo = topology(cfg);
auto topo = topology(cfg, topology::key_kind::inet_address);
topo.add_node(id1, ep1, dc_rack1, node::state::normal);
topo.add_node(id2, ep2, dc_rack2, node::state::normal);