locator: Store node shard count in topology
Will be needed by tablet allocator.
This commit is contained in:
@@ -117,8 +117,8 @@ public:
|
||||
return _bootstrap_tokens;
|
||||
}
|
||||
|
||||
void update_topology(inet_address ep, endpoint_dc_rack dr, std::optional<node::state> opt_st) {
|
||||
_topology.add_or_update_endpoint(ep, std::move(dr), std::move(opt_st));
|
||||
void update_topology(inet_address ep, endpoint_dc_rack dr, std::optional<node::state> opt_st, std::optional<shard_id> shard_count = std::nullopt) {
|
||||
_topology.add_or_update_endpoint(ep, std::nullopt, std::move(dr), std::move(opt_st), std::move(shard_count));
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -920,8 +920,8 @@ token_metadata::get_bootstrap_tokens() const {
|
||||
}
|
||||
|
||||
void
|
||||
token_metadata::update_topology(inet_address ep, endpoint_dc_rack dr, std::optional<node::state> opt_st) {
|
||||
_impl->update_topology(ep, std::move(dr), std::move(opt_st));
|
||||
token_metadata::update_topology(inet_address ep, endpoint_dc_rack dr, std::optional<node::state> opt_st, std::optional<shard_id> shard_count) {
|
||||
_impl->update_topology(ep, std::move(dr), std::move(opt_st), std::move(shard_count));
|
||||
}
|
||||
|
||||
boost::iterator_range<token_metadata::tokens_iterator>
|
||||
|
||||
@@ -132,7 +132,8 @@ public:
|
||||
/**
|
||||
* Update or add endpoint given its inet_address and endpoint_dc_rack.
|
||||
*/
|
||||
void update_topology(inet_address ep, endpoint_dc_rack dr, std::optional<node::state> opt_st = std::nullopt);
|
||||
void update_topology(inet_address ep, endpoint_dc_rack dr, std::optional<node::state> opt_st = std::nullopt,
|
||||
std::optional<shard_id> shard_count = std::nullopt);
|
||||
/**
|
||||
* Creates an iterable range of the sorted tokens starting at the token t
|
||||
* such that t >= start.
|
||||
|
||||
@@ -25,22 +25,23 @@ thread_local const endpoint_dc_rack endpoint_dc_rack::default_location = {
|
||||
.rack = locator::production_snitch_base::default_rack,
|
||||
};
|
||||
|
||||
node::node(const locator::topology* topology, locator::host_id id, inet_address endpoint, endpoint_dc_rack dc_rack, state state, this_node is_this_node, node::idx_type idx)
|
||||
node::node(const locator::topology* topology, locator::host_id id, inet_address endpoint, endpoint_dc_rack dc_rack, state state, shard_id shard_count, this_node is_this_node, node::idx_type idx)
|
||||
: _topology(topology)
|
||||
, _host_id(id)
|
||||
, _endpoint(endpoint)
|
||||
, _dc_rack(std::move(dc_rack))
|
||||
, _state(state)
|
||||
, _shard_count(std::move(shard_count))
|
||||
, _is_this_node(is_this_node)
|
||||
, _idx(idx)
|
||||
{}
|
||||
|
||||
node_holder node::make(const locator::topology* topology, locator::host_id id, inet_address endpoint, endpoint_dc_rack dc_rack, state state, node::this_node is_this_node, node::idx_type idx) {
|
||||
return std::make_unique<node>(topology, std::move(id), std::move(endpoint), std::move(dc_rack), std::move(state), is_this_node, idx);
|
||||
node_holder node::make(const locator::topology* topology, locator::host_id id, inet_address endpoint, endpoint_dc_rack dc_rack, state state, shard_id shard_count, node::this_node is_this_node, node::idx_type idx) {
|
||||
return std::make_unique<node>(topology, std::move(id), std::move(endpoint), std::move(dc_rack), std::move(state), shard_count, is_this_node, idx);
|
||||
}
|
||||
|
||||
node_holder node::clone() const {
|
||||
return make(nullptr, host_id(), endpoint(), dc_rack(), get_state(), is_this_node());
|
||||
return make(nullptr, host_id(), endpoint(), dc_rack(), get_state(), get_shard_count(), is_this_node());
|
||||
}
|
||||
|
||||
std::string node::to_string(node::state s) {
|
||||
@@ -124,15 +125,15 @@ std::string topology::debug_format(const node* node) {
|
||||
if (!node) {
|
||||
return format("node={}", fmt::ptr(node));
|
||||
}
|
||||
return format("node={} idx={} host_id={} endpoint={} dc={} rack={} state={} this_node={}", fmt::ptr(node),
|
||||
node->idx(), node->host_id(), node->endpoint(), node->dc_rack().dc, node->dc_rack().rack, node::to_string(node->get_state()), bool(node->is_this_node()));
|
||||
return format("node={} idx={} host_id={} endpoint={} dc={} rack={} state={} shards={} this_node={}", fmt::ptr(node),
|
||||
node->idx(), node->host_id(), node->endpoint(), node->dc_rack().dc, node->dc_rack().rack, node::to_string(node->get_state()), node->get_shard_count(), bool(node->is_this_node()));
|
||||
}
|
||||
|
||||
const node* topology::add_node(host_id id, const inet_address& ep, const endpoint_dc_rack& dr, node::state state) {
|
||||
const node* topology::add_node(host_id id, const inet_address& ep, const endpoint_dc_rack& dr, node::state state, shard_id shard_count) {
|
||||
if (dr.dc.empty() || dr.rack.empty()) {
|
||||
on_internal_error(tlogger, "Node must have valid dc and rack");
|
||||
}
|
||||
return add_node(node::make(this, id, ep, dr, state));
|
||||
return add_node(node::make(this, id, ep, dr, state, shard_count));
|
||||
}
|
||||
|
||||
bool topology::is_configured_this_node(const node& n) const {
|
||||
@@ -186,7 +187,7 @@ const node* topology::add_node(node_holder nptr) {
|
||||
return node;
|
||||
}
|
||||
|
||||
const node* topology::update_node(node* node, std::optional<host_id> opt_id, std::optional<inet_address> opt_ep, std::optional<endpoint_dc_rack> opt_dr, std::optional<node::state> opt_st) {
|
||||
const node* topology::update_node(node* node, std::optional<host_id> opt_id, std::optional<inet_address> opt_ep, std::optional<endpoint_dc_rack> opt_dr, std::optional<node::state> opt_st, std::optional<shard_id> opt_shard_count) {
|
||||
if (tlogger.is_enabled(log_level::debug)) {
|
||||
tlogger.debug("topology[{}]: update_node: {}: to: host_id={} endpoint={} dc={} rack={} state={}, at {}", fmt::ptr(this), debug_format(node),
|
||||
opt_id ? format("{}", *opt_id) : "unchanged",
|
||||
@@ -194,6 +195,7 @@ const node* topology::update_node(node* node, std::optional<host_id> opt_id, std
|
||||
opt_dr ? format("{}", opt_dr->dc) : "unchanged",
|
||||
opt_dr ? format("{}", opt_dr->rack) : "unchanged",
|
||||
opt_st ? format("{}", *opt_st) : "unchanged",
|
||||
opt_shard_count ? format("{}", *opt_shard_count) : "unchanged",
|
||||
current_backtrace());
|
||||
}
|
||||
|
||||
@@ -240,6 +242,9 @@ const node* topology::update_node(node* node, std::optional<host_id> opt_id, std
|
||||
if (opt_st) {
|
||||
changed |= node->get_state() != *opt_st;
|
||||
}
|
||||
if (opt_shard_count) {
|
||||
changed |= node->get_shard_count() != *opt_shard_count;
|
||||
}
|
||||
|
||||
if (!changed) {
|
||||
return node;
|
||||
@@ -261,6 +266,9 @@ const node* topology::update_node(node* node, std::optional<host_id> opt_id, std
|
||||
if (opt_st) {
|
||||
mutable_node->set_state(*opt_st);
|
||||
}
|
||||
if (opt_shard_count) {
|
||||
mutable_node->set_shard_count(*opt_shard_count);
|
||||
}
|
||||
} catch (...) {
|
||||
std::terminate();
|
||||
}
|
||||
@@ -424,20 +432,23 @@ const node* topology::find_node(node::idx_type idx) const noexcept {
|
||||
return _nodes.at(idx).get();
|
||||
}
|
||||
|
||||
const node* topology::add_or_update_endpoint(inet_address ep, std::optional<host_id> opt_id, std::optional<endpoint_dc_rack> opt_dr, std::optional<node::state> opt_st)
|
||||
const node* topology::add_or_update_endpoint(inet_address ep, std::optional<host_id> opt_id, std::optional<endpoint_dc_rack> opt_dr, std::optional<node::state> opt_st, std::optional<shard_id> shard_count)
|
||||
{
|
||||
if (tlogger.is_enabled(log_level::trace)) {
|
||||
tlogger.trace("topology[{}]: add_or_update_endpoint: ep={} host_id={} dc={} rack={} state={}, at {}", fmt::ptr(this),
|
||||
ep, opt_id.value_or(host_id::create_null_id()), opt_dr.value_or(endpoint_dc_rack{}).dc, opt_dr.value_or(endpoint_dc_rack{}).rack, opt_st.value_or(node::state::none),
|
||||
tlogger.trace("topology[{}]: add_or_update_endpoint: ep={} host_id={} dc={} rack={} state={} shards={}, at {}", fmt::ptr(this),
|
||||
ep, opt_id.value_or(host_id::create_null_id()), opt_dr.value_or(endpoint_dc_rack{}).dc, opt_dr.value_or(endpoint_dc_rack{}).rack, opt_st.value_or(node::state::none), shard_count,
|
||||
current_backtrace());
|
||||
}
|
||||
auto n = find_node(ep);
|
||||
if (n) {
|
||||
return update_node(make_mutable(n), opt_id, std::nullopt, std::move(opt_dr), std::move(opt_st));
|
||||
return update_node(make_mutable(n), opt_id, std::nullopt, std::move(opt_dr), std::move(opt_st), std::move(shard_count));
|
||||
} else if (opt_id && (n = find_node(*opt_id))) {
|
||||
return update_node(make_mutable(n), std::nullopt, ep, std::move(opt_dr), std::move(opt_st));
|
||||
return update_node(make_mutable(n), std::nullopt, ep, std::move(opt_dr), std::move(opt_st), std::move(shard_count));
|
||||
} else {
|
||||
return add_node(opt_id.value_or(host_id::create_null_id()), ep, opt_dr.value_or(endpoint_dc_rack::default_location), opt_st.value_or(node::state::normal));
|
||||
return add_node(opt_id.value_or(host_id::create_null_id()), ep,
|
||||
opt_dr.value_or(endpoint_dc_rack::default_location),
|
||||
opt_st.value_or(node::state::normal),
|
||||
shard_count.value_or(0));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include <seastar/core/smp.hh>
|
||||
#include <seastar/util/bool_class.hh>
|
||||
|
||||
#include "locator/types.hh"
|
||||
@@ -37,6 +38,8 @@ namespace locator {
|
||||
class node;
|
||||
using node_holder = std::unique_ptr<node>;
|
||||
|
||||
using shard_id = seastar::shard_id;
|
||||
|
||||
class node {
|
||||
public:
|
||||
using this_node = bool_class<struct this_node_tag>;
|
||||
@@ -56,13 +59,21 @@ private:
|
||||
inet_address _endpoint;
|
||||
endpoint_dc_rack _dc_rack;
|
||||
state _state;
|
||||
shard_id _shard_count = 0;
|
||||
|
||||
// Is this node the `localhost` instance
|
||||
this_node _is_this_node;
|
||||
idx_type _idx = -1;
|
||||
|
||||
public:
|
||||
node(const locator::topology* topology, locator::host_id id, inet_address endpoint, endpoint_dc_rack dc_rack, state state, this_node is_this_node = this_node::no, idx_type idx = -1);
|
||||
node(const locator::topology* topology,
|
||||
locator::host_id id,
|
||||
inet_address endpoint,
|
||||
endpoint_dc_rack dc_rack,
|
||||
state state,
|
||||
shard_id shard_count = 0,
|
||||
this_node is_this_node = this_node::no,
|
||||
idx_type idx = -1);
|
||||
|
||||
node(const node&) = delete;
|
||||
node(node&&) = delete;
|
||||
@@ -91,15 +102,25 @@ public:
|
||||
|
||||
state get_state() const noexcept { return _state; }
|
||||
|
||||
shard_id get_shard_count() const noexcept { return _shard_count; }
|
||||
|
||||
static std::string to_string(state);
|
||||
|
||||
private:
|
||||
static node_holder make(const locator::topology* topology, locator::host_id id, inet_address endpoint, endpoint_dc_rack dc_rack, state state, node::this_node is_this_node = this_node::no, idx_type idx = -1);
|
||||
static node_holder make(const locator::topology* topology,
|
||||
locator::host_id id,
|
||||
inet_address endpoint,
|
||||
endpoint_dc_rack dc_rack,
|
||||
state state,
|
||||
shard_id shard_count = 0,
|
||||
node::this_node is_this_node = this_node::no,
|
||||
idx_type idx = -1);
|
||||
node_holder clone() const;
|
||||
|
||||
void set_topology(const locator::topology* topology) noexcept { _topology = topology; }
|
||||
void set_idx(idx_type idx) noexcept { _idx = idx; }
|
||||
void set_state(state state) noexcept { _state = state; }
|
||||
void set_shard_count(shard_id shard_count) noexcept { _shard_count = shard_count; }
|
||||
|
||||
friend class topology;
|
||||
};
|
||||
@@ -130,12 +151,18 @@ public:
|
||||
}
|
||||
|
||||
// Adds a node with given host_id, endpoint, and DC/rack.
|
||||
const node* add_node(host_id id, const inet_address& ep, const endpoint_dc_rack& dr, node::state state);
|
||||
const node* add_node(host_id id, const inet_address& ep, const endpoint_dc_rack& dr, node::state state,
|
||||
shard_id shard_count = 0);
|
||||
|
||||
// Optionally updates node's current host_id, endpoint, or DC/rack.
|
||||
// Note: the host_id may be updated from null to non-null after a new node gets a new, random host_id,
|
||||
// or a peer node host_id may be updated when the node is replaced with another node using the same ip address.
|
||||
const node* update_node(node* node, std::optional<host_id> opt_id, std::optional<inet_address> opt_ep, std::optional<endpoint_dc_rack> opt_dr, std::optional<node::state> opt_st);
|
||||
const node* update_node(node* node,
|
||||
std::optional<host_id> opt_id,
|
||||
std::optional<inet_address> opt_ep,
|
||||
std::optional<endpoint_dc_rack> opt_dr,
|
||||
std::optional<node::state> opt_st,
|
||||
std::optional<shard_id> opt_shard_count = std::nullopt);
|
||||
|
||||
// Removes a node using its host_id
|
||||
// Returns true iff the node was found and removed.
|
||||
@@ -162,14 +189,17 @@ public:
|
||||
*
|
||||
* Adds or updates a node with given endpoint
|
||||
*/
|
||||
const node* add_or_update_endpoint(inet_address ep, std::optional<host_id> opt_id, std::optional<endpoint_dc_rack> opt_dr, std::optional<node::state> opt_st);
|
||||
const node* add_or_update_endpoint(inet_address ep, std::optional<host_id> opt_id,
|
||||
std::optional<endpoint_dc_rack> opt_dr,
|
||||
std::optional<node::state> opt_st,
|
||||
std::optional<shard_id> shard_count = std::nullopt);
|
||||
|
||||
// Legacy entry point from token_metadata::update_topology
|
||||
const node* add_or_update_endpoint(inet_address ep, endpoint_dc_rack dr, std::optional<node::state> opt_st) {
|
||||
return add_or_update_endpoint(ep, std::nullopt, std::move(dr), std::move(opt_st));
|
||||
return add_or_update_endpoint(ep, std::nullopt, std::move(dr), std::move(opt_st), std::nullopt);
|
||||
}
|
||||
const node* add_or_update_endpoint(inet_address ep, host_id id) {
|
||||
return add_or_update_endpoint(ep, id, std::nullopt, std::nullopt);
|
||||
return add_or_update_endpoint(ep, id, std::nullopt, std::nullopt, std::nullopt);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -348,15 +348,15 @@ future<> storage_service::topology_state_load(cdc::generation_service& cdc_gen_s
|
||||
tmptr->set_version(_topology_state_machine._topology.version);
|
||||
|
||||
auto update_topology = [&] (inet_address ip, const replica_state& rs) {
|
||||
tmptr->update_topology(ip, locator::endpoint_dc_rack{rs.datacenter, rs.rack});
|
||||
tmptr->update_topology(ip, locator::endpoint_dc_rack{rs.datacenter, rs.rack}, std::nullopt, rs.shard_count);
|
||||
};
|
||||
|
||||
auto add_normal_node = [&] (raft::server_id id, const replica_state& rs) -> future<> {
|
||||
locator::host_id host_id{id.uuid()};
|
||||
auto ip = co_await id2ip(id);
|
||||
|
||||
slogger.trace("raft topology: loading topology: raft id={} ip={} node state={} dc={} rack={} tokens state={} tokens={}",
|
||||
id, ip, rs.state, rs.datacenter, rs.rack, _topology_state_machine._topology.tstate, rs.ring.value().tokens);
|
||||
slogger.trace("raft topology: loading topology: raft id={} ip={} node state={} dc={} rack={} tokens state={} tokens={} shards={}",
|
||||
id, ip, rs.state, rs.datacenter, rs.rack, _topology_state_machine._topology.tstate, rs.ring.value().tokens, rs.shard_count);
|
||||
// Save tokens, not needed for raft topology management, but needed by legacy
|
||||
// Also ip -> id mapping is needed for address map recreation on reboot
|
||||
if (!utils::fb_utilities::is_me(ip)) {
|
||||
|
||||
Reference in New Issue
Block a user