diff --git a/service/storage_service.cc b/service/storage_service.cc index 9b63e3ec39..785ec72c1f 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -78,7 +78,6 @@ #include #include "utils/stall_free.hh" #include "utils/error_injection.hh" -#include "utils/fb_utilities.hh" #include "locator/util.hh" #include "idl/storage_service.dist.hh" #include "service/storage_proxy.hh" @@ -407,7 +406,7 @@ future<> storage_service::topology_state_load() { id, ip, rs.state, rs.datacenter, rs.rack, _topology_state_machine._topology.tstate, rs.ring.value().tokens, rs.shard_count); // Save tokens, not needed for raft topology management, but needed by legacy // Also ip -> id mapping is needed for address map recreation on reboot - if (!utils::fb_utilities::is_me(ip)) { + if (!is_me(ip)) { // Some state that is used to fill in 'peeers' table is still propagated over gossiper. // Populate the table with the state from the gossiper here since storage_service::on_change() // (which is called each time gossiper state changes) may have skipped it because the tokens @@ -465,7 +464,7 @@ future<> storage_service::topology_state_load() { switch (rs.state) { case node_state::bootstrapping: if (rs.ring.has_value()) { - if (!utils::fb_utilities::is_me(ip)) { + if (!is_me(ip)) { // Save ip -> id mapping in peers table because we need it on restart, but do not save tokens until owned co_await _sys_ks.local().update_tokens(ip, {}); co_await _sys_ks.local().update_peer_info(ip, "host_id", id.uuid()); @@ -538,7 +537,7 @@ future<> storage_service::topology_state_load() { // endpoints. We cannot rely on seeds alone, since it is not guaranteed that seeds // will be up to date and reachable at the time of restart. for (const auto& e: get_token_metadata_ptr()->get_all_endpoints()) { - if (!utils::fb_utilities::is_me(e) && !_gossiper.get_endpoint_state_ptr(e)) { + if (!is_me(e) && !_gossiper.get_endpoint_state_ptr(e)) { co_await _gossiper.add_saved_endpoint(e); } } @@ -1074,7 +1073,7 @@ class topology_coordinator { } slogger.trace("raft topology: send {} command with term {} and index {} to {}/{}", cmd.cmd, _term, cmd_index, id, *ip); - auto result = utils::fb_utilities::is_me(*ip) ? + auto result = _db.get_token_metadata().get_topology().is_me(*ip) ? co_await _raft_topology_cmd_handler(_term, cmd_index, cmd) : co_await ser::storage_service_rpc_verbs::send_raft_topology_cmd( &_messaging, netw::msg_addr{*ip}, id, _term, cmd_index, cmd); @@ -3077,7 +3076,7 @@ future<> storage_service::join_token_ring(shardedget_topology().my_cql_address(); // Ensure we know our own actual Schema UUID in preparation for updates co_await db::schema_tables::recalculate_schema_version(_sys_ks, proxy, _feature_service); @@ -5651,7 +5650,7 @@ future<> storage_service::rebuild(sstring source_dc) { } auto ks_erms = ss._db.local().get_non_local_strategy_keyspaces_erms(); for (const auto& [keyspace_name, erm] : ks_erms) { - co_await streamer->add_ranges(keyspace_name, erm, ss.get_ranges_for_endpoint(erm, utils::fb_utilities::get_broadcast_address()), ss._gossiper, false); + co_await streamer->add_ranges(keyspace_name, erm, ss.get_ranges_for_endpoint(erm, ss.get_broadcast_address()), ss._gossiper, false); } try { co_await streamer->stream_async(); @@ -6075,9 +6074,9 @@ future<> storage_service::load_tablet_metadata() { future<> storage_service::snitch_reconfigured() { assert(this_shard_id() == 0); auto& snitch = _snitch.local(); - co_await mutate_token_metadata([&snitch] (mutable_token_metadata_ptr tmptr) -> future<> { + co_await mutate_token_metadata([&] (mutable_token_metadata_ptr tmptr) -> future<> { // re-read local rack and DC info - tmptr->update_topology(utils::fb_utilities::get_broadcast_address(), snitch->get_location()); + tmptr->update_topology(get_broadcast_address(), snitch->get_location()); return make_ready_future<>(); }); @@ -6330,7 +6329,7 @@ future storage_service::raft_topology_cmd_handler(raft } auto ks_erms = _db.local().get_non_local_strategy_keyspaces_erms(); for (const auto& [keyspace_name, erm] : ks_erms) { - co_await streamer->add_ranges(keyspace_name, erm, get_ranges_for_endpoint(erm, utils::fb_utilities::get_broadcast_address()), _gossiper, false); + co_await streamer->add_ranges(keyspace_name, erm, get_ranges_for_endpoint(erm, get_broadcast_address()), _gossiper, false); } try { co_await streamer->stream_async(); diff --git a/service/storage_service.hh b/service/storage_service.hh index 1e06804ef7..359f6f018d 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -25,7 +25,6 @@ #include "gms/application_state.hh" #include #include -#include "utils/fb_utilities.hh" #include "replica/database_fwd.hh" #include "streaming/stream_reason.hh" #include @@ -264,9 +263,13 @@ public: } private: - inet_address get_broadcast_address() const { - return utils::fb_utilities::get_broadcast_address(); + inet_address get_broadcast_address() const noexcept { + return get_token_metadata_ptr()->get_topology().my_address(); } + bool is_me(inet_address addr) const noexcept { + return get_token_metadata_ptr()->get_topology().is_me(addr); + } + /* This abstraction maintains the token/endpoint metadata information */ shared_token_metadata& _shared_token_metadata; locator::effective_replication_map_factory& _erm_factory;