diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 0b55fc870e..89c3b9f36b 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -1378,7 +1378,7 @@ future system_keyspace::load_local_info() { co_return ret; } -future<> system_keyspace::save_local_info(local_info sysinfo) { +future<> system_keyspace::save_local_info(local_info sysinfo, locator::endpoint_dc_rack location) { auto& cfg = _db.get_config(); sstring req = fmt::format("INSERT INTO system.{} (key, host_id, cluster_name, release_version, cql_version, thrift_version, native_protocol_version, data_center, rack, partitioner, rpc_address, broadcast_address, listen_address) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" , db::system_keyspace::LOCAL); @@ -1390,8 +1390,8 @@ future<> system_keyspace::save_local_info(local_info sysinfo) { cql3::query_processor::CQL_VERSION, ::cassandra::thrift_version, to_sstring(unsigned(cql_serialization_format::latest().protocol_version())), - local_dc_rack().dc, - local_dc_rack().rack, + location.dc, + location.rack, sstring(cfg.partitioner()), utils::fb_utilities::get_broadcast_rpc_address().addr(), utils::fb_utilities::get_broadcast_address().addr(), @@ -1410,7 +1410,6 @@ future<> system_keyspace::save_local_supported_features(const std::set system_keyspace::initialize_virtual_tables( install_virtual_readers(*this, db); } -locator::endpoint_dc_rack system_keyspace::local_dc_rack() const { - return _cache->_local_dc_rack_info; -} - future>> system_keyspace::query_mutations(distributed& db, const sstring& ks_name, const sstring& cf_name) { schema_ptr schema = db.local().find_schema(ks_name, cf_name); @@ -2805,19 +2800,12 @@ sstring system_keyspace_name() { } system_keyspace::system_keyspace( - cql3::query_processor& qp, replica::database& db, const locator::snitch_ptr& snitch) noexcept + cql3::query_processor& qp, replica::database& db) noexcept : _qp(qp) , _db(db) , _cache(std::make_unique()) { _db.plug_system_keyspace(*this); - - // FIXME - // This should be coupled with setup_version()'s part committing these values into - // the system.local table. However, cql_test_env needs cached local_dc_rack strings, - // but it doesn't call system_keyspace::setup() and thus ::setup_version() either - _cache->_local_dc_rack_info.dc = snitch->get_datacenter(); - _cache->_local_dc_rack_info.rack = snitch->get_rack(); } system_keyspace::~system_keyspace() { diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index 6fb1123f99..3289842a9f 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -62,7 +62,6 @@ namespace gms { namespace locator { class effective_replication_map_factory; class endpoint_dc_rack; - class snitch_ptr; } // namespace locator namespace gms { @@ -330,7 +329,6 @@ public: * Return a map of IP addresses containing a map of dc and rack info */ future> load_dc_rack_info(); - locator::endpoint_dc_rack local_dc_rack() const; enum class bootstrap_state { NEEDS_BOOTSTRAP, @@ -410,7 +408,7 @@ public: }; future load_local_info(); - future<> save_local_info(local_info); + future<> save_local_info(local_info, locator::endpoint_dc_rack); private: future get_truncation_record(table_id cf_id); @@ -513,7 +511,7 @@ private: public: - system_keyspace(cql3::query_processor& qp, replica::database& db, const locator::snitch_ptr&) noexcept; + system_keyspace(cql3::query_processor& qp, replica::database& db) noexcept; ~system_keyspace(); future<> shutdown(); diff --git a/locator/snitch_base.hh b/locator/snitch_base.hh index dfbe36d57b..4147be65ef 100644 --- a/locator/snitch_base.hh +++ b/locator/snitch_base.hh @@ -15,6 +15,7 @@ #include #include +#include "locator/types.hh" #include "gms/inet_address.hh" #include "inet_address_vectors.hh" #include "gms/versioned_value.hh" @@ -70,6 +71,13 @@ public: */ virtual sstring get_datacenter() const = 0; + locator::endpoint_dc_rack get_location() const { + return locator::endpoint_dc_rack{ + .dc = get_datacenter(), + .rack = get_rack(), + }; + } + /** * returns whatever info snitch wants to gossip */ diff --git a/main.cc b/main.cc index a2ebde4caf..8d16d45b6c 100644 --- a/main.cc +++ b/main.cc @@ -852,7 +852,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl supervisor::notify("starting tokens manager"); locator::token_metadata::config tm_cfg; tm_cfg.topo_cfg.this_endpoint = utils::fb_utilities::get_broadcast_address(); - tm_cfg.topo_cfg.local_dc_rack = { snitch.local()->get_datacenter(), snitch.local()->get_rack() }; + tm_cfg.topo_cfg.local_dc_rack = snitch.local()->get_location(); if (snitch.local()->get_name() == "org.apache.cassandra.locator.SimpleSnitch") { // // Simple snitch wants sort_by_proximity() not to reorder nodes anyhow @@ -1089,7 +1089,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl static sharded cdc_generation_service; supervisor::notify("starting system keyspace"); - sys_ks.start(std::ref(qp), std::ref(db), std::ref(snitch)).get(); + sys_ks.start(std::ref(qp), std::ref(db)).get(); // TODO: stop()? // Initialization of a keyspace is done by shard 0 only. For system @@ -1117,7 +1117,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl cfg->host_id = linfo.host_id; linfo.listen_address = listen_address; - sys_ks.local().save_local_info(std::move(linfo)).get(); + sys_ks.local().save_local_info(std::move(linfo), snitch.local()->get_location()).get(); shared_token_metadata::mutate_on_all_shards(token_metadata, [hostid = cfg->host_id, endpoint = utils::fb_utilities::get_broadcast_address()] (locator::token_metadata& tm) { // Makes local host id available in topology cfg as soon as possible. diff --git a/repair/repair.cc b/repair/repair.cc index 02a3b8ba20..0e3b622189 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -1436,7 +1436,7 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr auto& db = get_db().local(); auto ks_erms = db.get_non_local_strategy_keyspaces_erms(); auto& topology = tmptr->get_topology(); - auto local_dc = topology.get_datacenter(); + auto myloc = topology.get_location(); auto myip = utils::fb_utilities::get_broadcast_address(); auto reason = streaming::stream_reason::bootstrap; // Calculate number of ranges to sync data @@ -1446,7 +1446,7 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr continue; } auto& strat = erm->get_replication_strategy(); - dht::token_range_vector desired_ranges = strat.get_pending_address_ranges(tmptr, tokens, myip, _sys_ks.local().local_dc_rack()).get0(); + dht::token_range_vector desired_ranges = strat.get_pending_address_ranges(tmptr, tokens, myip, myloc).get0(); seastar::thread::maybe_yield(); auto nr_tables = get_nr_tables(db, keyspace_name); nr_ranges_total += desired_ranges.size() * nr_tables; @@ -1462,7 +1462,7 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr continue; } auto& strat = erm->get_replication_strategy(); - dht::token_range_vector desired_ranges = strat.get_pending_address_ranges(tmptr, tokens, myip, _sys_ks.local().local_dc_rack()).get0(); + dht::token_range_vector desired_ranges = strat.get_pending_address_ranges(tmptr, tokens, myip, myloc).get0(); bool find_node_in_local_dc_only = strat.get_type() == locator::replication_strategy_type::network_topology; bool everywhere_topology = strat.get_type() == locator::replication_strategy_type::everywhere_topology; auto replication_factor = erm->get_replication_factor(); @@ -1472,7 +1472,7 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr auto range_addresses = strat.get_range_addresses(metadata_clone).get0(); //Pending ranges - metadata_clone.update_topology(myip, _sys_ks.local().local_dc_rack(), locator::node::state::bootstrapping); + metadata_clone.update_topology(myip, myloc, locator::node::state::bootstrapping); metadata_clone.update_normal_tokens(tokens, myip).get(); auto pending_range_addresses = strat.get_range_addresses(metadata_clone).get0(); metadata_clone.clear_gently().get(); @@ -1530,14 +1530,14 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr throw std::runtime_error(format("bootstrap_with_repair: keyspace={}, range={}, failed to cast to network_topology_strategy", keyspace_name, desired_range)); } - rf_in_local_dc = nts->get_replication_factor(local_dc); + rf_in_local_dc = nts->get_replication_factor(myloc.dc); } return rf_in_local_dc; }; auto get_old_endpoints_in_local_dc = [&] () { return boost::copy_range>(old_endpoints | boost::adaptors::filtered([&] (const gms::inet_address& node) { - return topology.get_datacenter(node) == local_dc; + return topology.get_datacenter(node) == myloc.dc; }) ); }; @@ -1924,14 +1924,14 @@ future<> repair_service::replace_with_repair(locator::token_metadata_ptr tmptr, auto cloned_tm = co_await tmptr->clone_async(); auto op = sstring("replace_with_repair"); auto& topology = tmptr->get_topology(); - auto source_dc = topology.get_datacenter(); + auto myloc = topology.get_location(); auto reason = streaming::stream_reason::replace; // update a cloned version of tmptr // no need to set the original version auto cloned_tmptr = make_token_metadata_ptr(std::move(cloned_tm)); - cloned_tmptr->update_topology(utils::fb_utilities::get_broadcast_address(), _sys_ks.local().local_dc_rack(), locator::node::state::replacing); + cloned_tmptr->update_topology(utils::fb_utilities::get_broadcast_address(), myloc, locator::node::state::replacing); co_await cloned_tmptr->update_normal_tokens(replacing_tokens, utils::fb_utilities::get_broadcast_address()); - co_return co_await do_rebuild_replace_with_repair(std::move(cloned_tmptr), std::move(op), std::move(source_dc), reason, std::move(ignore_nodes)); + co_return co_await do_rebuild_replace_with_repair(std::move(cloned_tmptr), std::move(op), myloc.dc, reason, std::move(ignore_nodes)); } node_ops_cmd_category categorize_node_ops_cmd(node_ops_cmd cmd) noexcept { diff --git a/service/storage_service.cc b/service/storage_service.cc index 6b3ae6becc..4f4f83d10f 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2510,7 +2510,7 @@ future<> storage_service::join_token_ring(shardedupdate_topology(get_broadcast_address(), _sys_ks.local().local_dc_rack(), locator::node::state::normal); + tmptr->update_topology(get_broadcast_address(), _snitch.local()->get_location(), locator::node::state::normal); co_await tmptr->update_normal_tokens(my_tokens, get_broadcast_address()); cdc_gen_id = co_await _sys_ks.local().get_cdc_generation_id(); @@ -2801,7 +2801,7 @@ future<> storage_service::join_token_ring(shardedupdate_topology(get_broadcast_address(), _sys_ks.local().local_dc_rack(), locator::node::state::normal); + tmptr->update_topology(get_broadcast_address(), _snitch.local()->get_location(), locator::node::state::normal); return tmptr->update_normal_tokens(bootstrap_tokens, get_broadcast_address()); }); @@ -2940,7 +2940,7 @@ future<> storage_service::bootstrap(std::unordered_set& bootstrap_tokens, slogger.debug("bootstrap: update pending ranges: endpoint={} bootstrap_tokens={}", get_broadcast_address(), bootstrap_tokens); mutate_token_metadata([this, &bootstrap_tokens] (mutable_token_metadata_ptr tmptr) { auto endpoint = get_broadcast_address(); - tmptr->update_topology(endpoint, _sys_ks.local().local_dc_rack(), locator::node::state::bootstrapping); + tmptr->update_topology(endpoint, _snitch.local()->get_location(), locator::node::state::bootstrapping); tmptr->add_bootstrap_tokens(bootstrap_tokens, endpoint); return update_topology_change_info(std::move(tmptr), ::format("bootstrapping node {}", endpoint)); }).get(); @@ -2963,7 +2963,7 @@ future<> storage_service::bootstrap(std::unordered_set& bootstrap_tokens, slogger.info("sleeping {} ms for pending range setup", get_ring_delay().count()); _gossiper.wait_for_range_setup().get(); - dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_broadcast_address(), _sys_ks.local().local_dc_rack(), bootstrap_tokens, get_token_metadata_ptr()); + dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_broadcast_address(), _snitch.local()->get_location(), bootstrap_tokens, get_token_metadata_ptr()); slogger.info("Starting to bootstrap..."); bs.bootstrap(streaming::stream_reason::bootstrap, _gossiper).get(); } else { @@ -4627,7 +4627,7 @@ void storage_service::run_replace_ops(std::unordered_set& bootstrap_token _repair.local().replace_with_repair(get_token_metadata_ptr(), bootstrap_tokens, ctl.ignore_nodes).get(); } else { slogger.info("replace[{}]: Using streaming based node ops to sync data", uuid); - dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_broadcast_address(), _sys_ks.local().local_dc_rack(), bootstrap_tokens, get_token_metadata_ptr()); + dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_broadcast_address(), _snitch.local()->get_location(), bootstrap_tokens, get_token_metadata_ptr()); bs.bootstrap(streaming::stream_reason::replace, _gossiper, replace_address).get(); } on_streaming_finished(); @@ -5294,7 +5294,7 @@ future<> storage_service::rebuild(sstring source_dc) { co_await ss._repair.local().rebuild_with_repair(tmptr, std::move(source_dc)); } else { auto streamer = make_lw_shared(ss._db, ss._stream_manager, tmptr, ss._abort_source, - ss.get_broadcast_address(), ss._sys_ks.local().local_dc_rack(), "Rebuild", streaming::stream_reason::rebuild); + ss.get_broadcast_address(), ss._snitch.local()->get_location(), "Rebuild", streaming::stream_reason::rebuild); streamer->add_source_filter(std::make_unique(ss._gossiper.get_unreachable_members())); if (source_dc != "") { streamer->add_source_filter(std::make_unique(source_dc)); @@ -5470,7 +5470,7 @@ future<> storage_service::removenode_with_stream(gms::inet_address leaving_node, as.request_abort(); } }); - auto streamer = make_lw_shared(_db, _stream_manager, tmptr, as, get_broadcast_address(), _sys_ks.local().local_dc_rack(), "Removenode", streaming::stream_reason::removenode); + auto streamer = make_lw_shared(_db, _stream_manager, tmptr, as, get_broadcast_address(), _snitch.local()->get_location(), "Removenode", streaming::stream_reason::removenode); removenode_add_ranges(streamer, leaving_node).get(); try { streamer->stream_async().get(); @@ -5521,7 +5521,7 @@ future<> storage_service::leave_ring() { future<> storage_service::stream_ranges(std::unordered_map> ranges_to_stream_by_keyspace) { - auto streamer = dht::range_streamer(_db, _stream_manager, get_token_metadata_ptr(), _abort_source, get_broadcast_address(), _sys_ks.local().local_dc_rack(), "Unbootstrap", streaming::stream_reason::decommission); + auto streamer = dht::range_streamer(_db, _stream_manager, get_token_metadata_ptr(), _abort_source, get_broadcast_address(), _snitch.local()->get_location(), "Unbootstrap", streaming::stream_reason::decommission); for (auto& entry : ranges_to_stream_by_keyspace) { const auto& keyspace = entry.first; auto& ranges_with_endpoints = entry.second; @@ -5727,12 +5727,7 @@ future<> storage_service::snitch_reconfigured() { auto& snitch = _snitch.local(); co_await mutate_token_metadata([&snitch] (mutable_token_metadata_ptr tmptr) -> future<> { // re-read local rack and DC info - auto endpoint = utils::fb_utilities::get_broadcast_address(); - auto dr = locator::endpoint_dc_rack { - .dc = snitch->get_datacenter(), - .rack = snitch->get_rack(), - }; - tmptr->update_topology(endpoint, std::move(dr)); + tmptr->update_topology(utils::fb_utilities::get_broadcast_address(), snitch->get_location()); return make_ready_future<>(); }); @@ -5974,7 +5969,7 @@ future storage_service::raft_topology_cmd_handler(shar co_await _repair.local().rebuild_with_repair(tmptr, std::move(source_dc)); } else { auto streamer = make_lw_shared(_db, _stream_manager, tmptr, _abort_source, - get_broadcast_address(), _sys_ks.local().local_dc_rack(), "Rebuild", streaming::stream_reason::rebuild); + get_broadcast_address(), _snitch.local()->get_location(), "Rebuild", streaming::stream_reason::rebuild); streamer->add_source_filter(std::make_unique(_gossiper.get_unreachable_members())); if (source_dc != "") { streamer->add_source_filter(std::make_unique(source_dc)); @@ -6099,7 +6094,7 @@ future<> storage_service::stream_tablet(locator::global_tablet_id tablet) { auto& table = _db.local().find_column_family(tablet.table); std::vector tables = {table.schema()->cf_name()}; auto streamer = make_lw_shared(_db, _stream_manager, tm, _abort_source, - get_broadcast_address(), _sys_ks.local().local_dc_rack(), + get_broadcast_address(), _snitch.local()->get_location(), "Tablet migration", streaming::stream_reason::tablet_migration, std::move(tables)); streamer->add_source_filter(std::make_unique( _gossiper.get_unreachable_members())); diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index e2059cfe8d..ff105e7fb3 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -613,7 +613,7 @@ private: auto stop_sl_controller = defer([this] { _sl_controller.stop().get(); }); _sl_controller.invoke_on_all(&qos::service_level_controller::start).get(); - _sys_ks.start(std::ref(_qp), std::ref(_db), std::ref(_snitch)).get(); + _sys_ks.start(std::ref(_qp), std::ref(_db)).get(); auto stop_sys_kd = defer([this] { _sys_ks.stop().get(); }); for (const auto p: all_system_table_load_phases) { replica::distributed_loader::init_system_keyspace(_sys_ks, _erm_factory, _db, *cfg, p).get(); @@ -625,7 +625,7 @@ private: linfo.host_id = locator::host_id::create_random_id(); } cfg->host_id = linfo.host_id; - _sys_ks.local().save_local_info(std::move(linfo)).get(); + _sys_ks.local().save_local_info(std::move(linfo), _snitch.local()->get_location()).get(); } locator::shared_token_metadata::mutate_on_all_shards(_token_metadata, [hostid = cfg->host_id] (locator::token_metadata& tm) { tm.get_topology().set_host_id_cfg(hostid);