diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index decec7dce2..78a3b15d65 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -117,8 +117,8 @@ public: return _bootstrap_tokens; } - void update_topology(inet_address ep, endpoint_dc_rack dr, std::optional opt_st) { - _topology.add_or_update_endpoint(ep, std::move(dr), std::move(opt_st)); + void update_topology(inet_address ep, endpoint_dc_rack dr, std::optional opt_st, std::optional shard_count = std::nullopt) { + _topology.add_or_update_endpoint(ep, std::nullopt, std::move(dr), std::move(opt_st), std::move(shard_count)); } /** @@ -920,8 +920,8 @@ token_metadata::get_bootstrap_tokens() const { } void -token_metadata::update_topology(inet_address ep, endpoint_dc_rack dr, std::optional opt_st) { - _impl->update_topology(ep, std::move(dr), std::move(opt_st)); +token_metadata::update_topology(inet_address ep, endpoint_dc_rack dr, std::optional opt_st, std::optional shard_count) { + _impl->update_topology(ep, std::move(dr), std::move(opt_st), std::move(shard_count)); } boost::iterator_range diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index 54794b9bee..0f463a58a2 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -132,7 +132,8 @@ public: /** * Update or add endpoint given its inet_address and endpoint_dc_rack. */ - void update_topology(inet_address ep, endpoint_dc_rack dr, std::optional opt_st = std::nullopt); + void update_topology(inet_address ep, endpoint_dc_rack dr, std::optional opt_st = std::nullopt, + std::optional shard_count = std::nullopt); /** * Creates an iterable range of the sorted tokens starting at the token t * such that t >= start. diff --git a/locator/topology.cc b/locator/topology.cc index 6d5bee057e..7cee6e6e4d 100644 --- a/locator/topology.cc +++ b/locator/topology.cc @@ -25,22 +25,23 @@ thread_local const endpoint_dc_rack endpoint_dc_rack::default_location = { .rack = locator::production_snitch_base::default_rack, }; -node::node(const locator::topology* topology, locator::host_id id, inet_address endpoint, endpoint_dc_rack dc_rack, state state, this_node is_this_node, node::idx_type idx) +node::node(const locator::topology* topology, locator::host_id id, inet_address endpoint, endpoint_dc_rack dc_rack, state state, shard_id shard_count, this_node is_this_node, node::idx_type idx) : _topology(topology) , _host_id(id) , _endpoint(endpoint) , _dc_rack(std::move(dc_rack)) , _state(state) + , _shard_count(std::move(shard_count)) , _is_this_node(is_this_node) , _idx(idx) {} -node_holder node::make(const locator::topology* topology, locator::host_id id, inet_address endpoint, endpoint_dc_rack dc_rack, state state, node::this_node is_this_node, node::idx_type idx) { - return std::make_unique(topology, std::move(id), std::move(endpoint), std::move(dc_rack), std::move(state), is_this_node, idx); +node_holder node::make(const locator::topology* topology, locator::host_id id, inet_address endpoint, endpoint_dc_rack dc_rack, state state, shard_id shard_count, node::this_node is_this_node, node::idx_type idx) { + return std::make_unique(topology, std::move(id), std::move(endpoint), std::move(dc_rack), std::move(state), shard_count, is_this_node, idx); } node_holder node::clone() const { - return make(nullptr, host_id(), endpoint(), dc_rack(), get_state(), is_this_node()); + return make(nullptr, host_id(), endpoint(), dc_rack(), get_state(), get_shard_count(), is_this_node()); } std::string node::to_string(node::state s) { @@ -124,15 +125,15 @@ std::string topology::debug_format(const node* node) { if (!node) { return format("node={}", fmt::ptr(node)); } - return format("node={} idx={} host_id={} endpoint={} dc={} rack={} state={} this_node={}", fmt::ptr(node), - node->idx(), node->host_id(), node->endpoint(), node->dc_rack().dc, node->dc_rack().rack, node::to_string(node->get_state()), bool(node->is_this_node())); + return format("node={} idx={} host_id={} endpoint={} dc={} rack={} state={} shards={} this_node={}", fmt::ptr(node), + node->idx(), node->host_id(), node->endpoint(), node->dc_rack().dc, node->dc_rack().rack, node::to_string(node->get_state()), node->get_shard_count(), bool(node->is_this_node())); } -const node* topology::add_node(host_id id, const inet_address& ep, const endpoint_dc_rack& dr, node::state state) { +const node* topology::add_node(host_id id, const inet_address& ep, const endpoint_dc_rack& dr, node::state state, shard_id shard_count) { if (dr.dc.empty() || dr.rack.empty()) { on_internal_error(tlogger, "Node must have valid dc and rack"); } - return add_node(node::make(this, id, ep, dr, state)); + return add_node(node::make(this, id, ep, dr, state, shard_count)); } bool topology::is_configured_this_node(const node& n) const { @@ -186,7 +187,7 @@ const node* topology::add_node(node_holder nptr) { return node; } -const node* topology::update_node(node* node, std::optional opt_id, std::optional opt_ep, std::optional opt_dr, std::optional opt_st) { +const node* topology::update_node(node* node, std::optional opt_id, std::optional opt_ep, std::optional opt_dr, std::optional opt_st, std::optional opt_shard_count) { if (tlogger.is_enabled(log_level::debug)) { tlogger.debug("topology[{}]: update_node: {}: to: host_id={} endpoint={} dc={} rack={} state={}, at {}", fmt::ptr(this), debug_format(node), opt_id ? format("{}", *opt_id) : "unchanged", @@ -194,6 +195,7 @@ const node* topology::update_node(node* node, std::optional opt_id, std opt_dr ? format("{}", opt_dr->dc) : "unchanged", opt_dr ? format("{}", opt_dr->rack) : "unchanged", opt_st ? format("{}", *opt_st) : "unchanged", + opt_shard_count ? format("{}", *opt_shard_count) : "unchanged", current_backtrace()); } @@ -240,6 +242,9 @@ const node* topology::update_node(node* node, std::optional opt_id, std if (opt_st) { changed |= node->get_state() != *opt_st; } + if (opt_shard_count) { + changed |= node->get_shard_count() != *opt_shard_count; + } if (!changed) { return node; @@ -261,6 +266,9 @@ const node* topology::update_node(node* node, std::optional opt_id, std if (opt_st) { mutable_node->set_state(*opt_st); } + if (opt_shard_count) { + mutable_node->set_shard_count(*opt_shard_count); + } } catch (...) { std::terminate(); } @@ -424,20 +432,23 @@ const node* topology::find_node(node::idx_type idx) const noexcept { return _nodes.at(idx).get(); } -const node* topology::add_or_update_endpoint(inet_address ep, std::optional opt_id, std::optional opt_dr, std::optional opt_st) +const node* topology::add_or_update_endpoint(inet_address ep, std::optional opt_id, std::optional opt_dr, std::optional opt_st, std::optional shard_count) { if (tlogger.is_enabled(log_level::trace)) { - tlogger.trace("topology[{}]: add_or_update_endpoint: ep={} host_id={} dc={} rack={} state={}, at {}", fmt::ptr(this), - ep, opt_id.value_or(host_id::create_null_id()), opt_dr.value_or(endpoint_dc_rack{}).dc, opt_dr.value_or(endpoint_dc_rack{}).rack, opt_st.value_or(node::state::none), + tlogger.trace("topology[{}]: add_or_update_endpoint: ep={} host_id={} dc={} rack={} state={} shards={}, at {}", fmt::ptr(this), + ep, opt_id.value_or(host_id::create_null_id()), opt_dr.value_or(endpoint_dc_rack{}).dc, opt_dr.value_or(endpoint_dc_rack{}).rack, opt_st.value_or(node::state::none), shard_count, current_backtrace()); } auto n = find_node(ep); if (n) { - return update_node(make_mutable(n), opt_id, std::nullopt, std::move(opt_dr), std::move(opt_st)); + return update_node(make_mutable(n), opt_id, std::nullopt, std::move(opt_dr), std::move(opt_st), std::move(shard_count)); } else if (opt_id && (n = find_node(*opt_id))) { - return update_node(make_mutable(n), std::nullopt, ep, std::move(opt_dr), std::move(opt_st)); + return update_node(make_mutable(n), std::nullopt, ep, std::move(opt_dr), std::move(opt_st), std::move(shard_count)); } else { - return add_node(opt_id.value_or(host_id::create_null_id()), ep, opt_dr.value_or(endpoint_dc_rack::default_location), opt_st.value_or(node::state::normal)); + return add_node(opt_id.value_or(host_id::create_null_id()), ep, + opt_dr.value_or(endpoint_dc_rack::default_location), + opt_st.value_or(node::state::normal), + shard_count.value_or(0)); } } diff --git a/locator/topology.hh b/locator/topology.hh index 2701828fbe..cbe2cf36d6 100644 --- a/locator/topology.hh +++ b/locator/topology.hh @@ -17,6 +17,7 @@ #include #include +#include #include #include "locator/types.hh" @@ -37,6 +38,8 @@ namespace locator { class node; using node_holder = std::unique_ptr; +using shard_id = seastar::shard_id; + class node { public: using this_node = bool_class; @@ -56,13 +59,21 @@ private: inet_address _endpoint; endpoint_dc_rack _dc_rack; state _state; + shard_id _shard_count = 0; // Is this node the `localhost` instance this_node _is_this_node; idx_type _idx = -1; public: - node(const locator::topology* topology, locator::host_id id, inet_address endpoint, endpoint_dc_rack dc_rack, state state, this_node is_this_node = this_node::no, idx_type idx = -1); + node(const locator::topology* topology, + locator::host_id id, + inet_address endpoint, + endpoint_dc_rack dc_rack, + state state, + shard_id shard_count = 0, + this_node is_this_node = this_node::no, + idx_type idx = -1); node(const node&) = delete; node(node&&) = delete; @@ -91,15 +102,25 @@ public: state get_state() const noexcept { return _state; } + shard_id get_shard_count() const noexcept { return _shard_count; } + static std::string to_string(state); private: - static node_holder make(const locator::topology* topology, locator::host_id id, inet_address endpoint, endpoint_dc_rack dc_rack, state state, node::this_node is_this_node = this_node::no, idx_type idx = -1); + static node_holder make(const locator::topology* topology, + locator::host_id id, + inet_address endpoint, + endpoint_dc_rack dc_rack, + state state, + shard_id shard_count = 0, + node::this_node is_this_node = this_node::no, + idx_type idx = -1); node_holder clone() const; void set_topology(const locator::topology* topology) noexcept { _topology = topology; } void set_idx(idx_type idx) noexcept { _idx = idx; } void set_state(state state) noexcept { _state = state; } + void set_shard_count(shard_id shard_count) noexcept { _shard_count = shard_count; } friend class topology; }; @@ -130,12 +151,18 @@ public: } // Adds a node with given host_id, endpoint, and DC/rack. - const node* add_node(host_id id, const inet_address& ep, const endpoint_dc_rack& dr, node::state state); + const node* add_node(host_id id, const inet_address& ep, const endpoint_dc_rack& dr, node::state state, + shard_id shard_count = 0); // Optionally updates node's current host_id, endpoint, or DC/rack. // Note: the host_id may be updated from null to non-null after a new node gets a new, random host_id, // or a peer node host_id may be updated when the node is replaced with another node using the same ip address. - const node* update_node(node* node, std::optional opt_id, std::optional opt_ep, std::optional opt_dr, std::optional opt_st); + const node* update_node(node* node, + std::optional opt_id, + std::optional opt_ep, + std::optional opt_dr, + std::optional opt_st, + std::optional opt_shard_count = std::nullopt); // Removes a node using its host_id // Returns true iff the node was found and removed. @@ -162,14 +189,17 @@ public: * * Adds or updates a node with given endpoint */ - const node* add_or_update_endpoint(inet_address ep, std::optional opt_id, std::optional opt_dr, std::optional opt_st); + const node* add_or_update_endpoint(inet_address ep, std::optional opt_id, + std::optional opt_dr, + std::optional opt_st, + std::optional shard_count = std::nullopt); // Legacy entry point from token_metadata::update_topology const node* add_or_update_endpoint(inet_address ep, endpoint_dc_rack dr, std::optional opt_st) { - return add_or_update_endpoint(ep, std::nullopt, std::move(dr), std::move(opt_st)); + return add_or_update_endpoint(ep, std::nullopt, std::move(dr), std::move(opt_st), std::nullopt); } const node* add_or_update_endpoint(inet_address ep, host_id id) { - return add_or_update_endpoint(ep, id, std::nullopt, std::nullopt); + return add_or_update_endpoint(ep, id, std::nullopt, std::nullopt, std::nullopt); } /** diff --git a/service/storage_service.cc b/service/storage_service.cc index b7e5d3e1ab..ca8cccad46 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -348,15 +348,15 @@ future<> storage_service::topology_state_load(cdc::generation_service& cdc_gen_s tmptr->set_version(_topology_state_machine._topology.version); auto update_topology = [&] (inet_address ip, const replica_state& rs) { - tmptr->update_topology(ip, locator::endpoint_dc_rack{rs.datacenter, rs.rack}); + tmptr->update_topology(ip, locator::endpoint_dc_rack{rs.datacenter, rs.rack}, std::nullopt, rs.shard_count); }; auto add_normal_node = [&] (raft::server_id id, const replica_state& rs) -> future<> { locator::host_id host_id{id.uuid()}; auto ip = co_await id2ip(id); - slogger.trace("raft topology: loading topology: raft id={} ip={} node state={} dc={} rack={} tokens state={} tokens={}", - id, ip, rs.state, rs.datacenter, rs.rack, _topology_state_machine._topology.tstate, rs.ring.value().tokens); + slogger.trace("raft topology: loading topology: raft id={} ip={} node state={} dc={} rack={} tokens state={} tokens={} shards={}", + id, ip, rs.state, rs.datacenter, rs.rack, _topology_state_machine._topology.tstate, rs.ring.value().tokens, rs.shard_count); // Save tokens, not needed for raft topology management, but needed by legacy // Also ip -> id mapping is needed for address map recreation on reboot if (!utils::fb_utilities::is_me(ip)) {