storage_service: use locator::topology rather than fb_utilities

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2023-11-30 13:57:41 +02:00
parent a529097d96
commit b3bede8141
2 changed files with 15 additions and 13 deletions

View File

@@ -78,7 +78,6 @@
#include <seastar/coroutine/exception.hh>
#include "utils/stall_free.hh"
#include "utils/error_injection.hh"
#include "utils/fb_utilities.hh"
#include "locator/util.hh"
#include "idl/storage_service.dist.hh"
#include "service/storage_proxy.hh"
@@ -407,7 +406,7 @@ future<> storage_service::topology_state_load() {
id, ip, rs.state, rs.datacenter, rs.rack, _topology_state_machine._topology.tstate, rs.ring.value().tokens, rs.shard_count);
// Save tokens, not needed for raft topology management, but needed by legacy
// Also ip -> id mapping is needed for address map recreation on reboot
if (!utils::fb_utilities::is_me(ip)) {
if (!is_me(ip)) {
// Some state that is used to fill in 'peeers' table is still propagated over gossiper.
// Populate the table with the state from the gossiper here since storage_service::on_change()
// (which is called each time gossiper state changes) may have skipped it because the tokens
@@ -465,7 +464,7 @@ future<> storage_service::topology_state_load() {
switch (rs.state) {
case node_state::bootstrapping:
if (rs.ring.has_value()) {
if (!utils::fb_utilities::is_me(ip)) {
if (!is_me(ip)) {
// Save ip -> id mapping in peers table because we need it on restart, but do not save tokens until owned
co_await _sys_ks.local().update_tokens(ip, {});
co_await _sys_ks.local().update_peer_info(ip, "host_id", id.uuid());
@@ -538,7 +537,7 @@ future<> storage_service::topology_state_load() {
// endpoints. We cannot rely on seeds alone, since it is not guaranteed that seeds
// will be up to date and reachable at the time of restart.
for (const auto& e: get_token_metadata_ptr()->get_all_endpoints()) {
if (!utils::fb_utilities::is_me(e) && !_gossiper.get_endpoint_state_ptr(e)) {
if (!is_me(e) && !_gossiper.get_endpoint_state_ptr(e)) {
co_await _gossiper.add_saved_endpoint(e);
}
}
@@ -1074,7 +1073,7 @@ class topology_coordinator {
}
slogger.trace("raft topology: send {} command with term {} and index {} to {}/{}",
cmd.cmd, _term, cmd_index, id, *ip);
auto result = utils::fb_utilities::is_me(*ip) ?
auto result = _db.get_token_metadata().get_topology().is_me(*ip) ?
co_await _raft_topology_cmd_handler(_term, cmd_index, cmd) :
co_await ser::storage_service_rpc_verbs::send_raft_topology_cmd(
&_messaging, netw::msg_addr{*ip}, id, _term, cmd_index, cmd);
@@ -3077,7 +3076,7 @@ future<> storage_service::join_token_ring(sharded<db::system_distributed_keyspac
co_await replicate_to_all_cores(std::move(tmptr));
tmlock.reset();
auto broadcast_rpc_address = utils::fb_utilities::get_broadcast_rpc_address();
auto broadcast_rpc_address = get_token_metadata_ptr()->get_topology().my_cql_address();
// Ensure we know our own actual Schema UUID in preparation for updates
co_await db::schema_tables::recalculate_schema_version(_sys_ks, proxy, _feature_service);
@@ -5651,7 +5650,7 @@ future<> storage_service::rebuild(sstring source_dc) {
}
auto ks_erms = ss._db.local().get_non_local_strategy_keyspaces_erms();
for (const auto& [keyspace_name, erm] : ks_erms) {
co_await streamer->add_ranges(keyspace_name, erm, ss.get_ranges_for_endpoint(erm, utils::fb_utilities::get_broadcast_address()), ss._gossiper, false);
co_await streamer->add_ranges(keyspace_name, erm, ss.get_ranges_for_endpoint(erm, ss.get_broadcast_address()), ss._gossiper, false);
}
try {
co_await streamer->stream_async();
@@ -6075,9 +6074,9 @@ future<> storage_service::load_tablet_metadata() {
future<> storage_service::snitch_reconfigured() {
assert(this_shard_id() == 0);
auto& snitch = _snitch.local();
co_await mutate_token_metadata([&snitch] (mutable_token_metadata_ptr tmptr) -> future<> {
co_await mutate_token_metadata([&] (mutable_token_metadata_ptr tmptr) -> future<> {
// re-read local rack and DC info
tmptr->update_topology(utils::fb_utilities::get_broadcast_address(), snitch->get_location());
tmptr->update_topology(get_broadcast_address(), snitch->get_location());
return make_ready_future<>();
});
@@ -6330,7 +6329,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
}
auto ks_erms = _db.local().get_non_local_strategy_keyspaces_erms();
for (const auto& [keyspace_name, erm] : ks_erms) {
co_await streamer->add_ranges(keyspace_name, erm, get_ranges_for_endpoint(erm, utils::fb_utilities::get_broadcast_address()), _gossiper, false);
co_await streamer->add_ranges(keyspace_name, erm, get_ranges_for_endpoint(erm, get_broadcast_address()), _gossiper, false);
}
try {
co_await streamer->stream_async();

View File

@@ -25,7 +25,6 @@
#include "gms/application_state.hh"
#include <seastar/core/semaphore.hh>
#include <seastar/core/gate.hh>
#include "utils/fb_utilities.hh"
#include "replica/database_fwd.hh"
#include "streaming/stream_reason.hh"
#include <seastar/core/distributed.hh>
@@ -264,9 +263,13 @@ public:
}
private:
inet_address get_broadcast_address() const {
return utils::fb_utilities::get_broadcast_address();
inet_address get_broadcast_address() const noexcept {
return get_token_metadata_ptr()->get_topology().my_address();
}
bool is_me(inet_address addr) const noexcept {
return get_token_metadata_ptr()->get_topology().is_me(addr);
}
/* This abstraction maintains the token/endpoint metadata information */
shared_token_metadata& _shared_token_metadata;
locator::effective_replication_map_factory& _erm_factory;