Merge 'Don't cache dc:rack on system keyspace local cache' from Pavel Emelyanov

The local node's dc:rack pair is cached on system keyspace on start. However, most of the other code doesn't need it, as it gets dc:rack from topology or directly from snitch. There are a few places left that still mess with the sysks cache, but they are easy to patch. So after this patch all the core code uses two sources of dc:rack -- topology / snitch -- instead of three.

Closes #15280

* github.com:scylladb/scylladb:
  system_keyspace: Don't require snitch argument on start
  system_keyspace: Don't cache local dc:rack pair
  system_keyspace: Save local info with explicit location
  storage_service: Get endpoint location from snitch, not system keyspace
  snitch: Introduce and use get_location() method
  repair: Local location variables instead of system keyspace's one
  repair: Use full endpoint location instead of datacenter part
This commit is contained in:
Botond Dénes
2023-09-11 10:26:26 +03:00
7 changed files with 39 additions and 50 deletions

View File

@@ -1378,7 +1378,7 @@ future<system_keyspace::local_info> system_keyspace::load_local_info() {
co_return ret;
}
future<> system_keyspace::save_local_info(local_info sysinfo) {
future<> system_keyspace::save_local_info(local_info sysinfo, locator::endpoint_dc_rack location) {
auto& cfg = _db.get_config();
sstring req = fmt::format("INSERT INTO system.{} (key, host_id, cluster_name, release_version, cql_version, thrift_version, native_protocol_version, data_center, rack, partitioner, rpc_address, broadcast_address, listen_address) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
, db::system_keyspace::LOCAL);
@@ -1390,8 +1390,8 @@ future<> system_keyspace::save_local_info(local_info sysinfo) {
cql3::query_processor::CQL_VERSION,
::cassandra::thrift_version,
to_sstring(unsigned(cql_serialization_format::latest().protocol_version())),
local_dc_rack().dc,
local_dc_rack().rack,
location.dc,
location.rack,
sstring(cfg.partitioner()),
utils::fb_utilities::get_broadcast_rpc_address().addr(),
utils::fb_utilities::get_broadcast_address().addr(),
@@ -1410,7 +1410,6 @@ future<> system_keyspace::save_local_supported_features(const std::set<std::stri
// is different than the one that wrote, may see a corrupted value. invoke_on_all will be used to guarantee that all
// updates are propagated correctly.
struct local_cache {
// cached dc:rack pair; this is the value system_keyspace::local_dc_rack() returns
locator::endpoint_dc_rack _local_dc_rack_info;
// cached bootstrap state
system_keyspace::bootstrap_state _state;
};
@@ -1966,10 +1965,6 @@ future<> system_keyspace::initialize_virtual_tables(
install_virtual_readers(*this, db);
}
// Returns (by value) the dc:rack pair stored in this instance's local cache.
locator::endpoint_dc_rack system_keyspace::local_dc_rack() const {
return _cache->_local_dc_rack_info;
}
future<foreign_ptr<lw_shared_ptr<reconcilable_result>>>
system_keyspace::query_mutations(distributed<replica::database>& db, const sstring& ks_name, const sstring& cf_name) {
schema_ptr schema = db.local().find_schema(ks_name, cf_name);
@@ -2805,19 +2800,12 @@ sstring system_keyspace_name() {
}
system_keyspace::system_keyspace(
cql3::query_processor& qp, replica::database& db, const locator::snitch_ptr& snitch) noexcept
cql3::query_processor& qp, replica::database& db) noexcept
: _qp(qp)
, _db(db)
, _cache(std::make_unique<local_cache>())
{
_db.plug_system_keyspace(*this);
// FIXME
// This should be coupled with setup_version()'s part committing these values into
// the system.local table. However, cql_test_env needs cached local_dc_rack strings,
// but it doesn't call system_keyspace::setup() and thus ::setup_version() either
_cache->_local_dc_rack_info.dc = snitch->get_datacenter();
_cache->_local_dc_rack_info.rack = snitch->get_rack();
}
system_keyspace::~system_keyspace() {

View File

@@ -62,7 +62,6 @@ namespace gms {
namespace locator {
class effective_replication_map_factory;
class endpoint_dc_rack;
class snitch_ptr;
} // namespace locator
namespace gms {
@@ -330,7 +329,6 @@ public:
* Return a map of IP addresses containing a map of dc and rack info
*/
future<std::unordered_map<gms::inet_address, locator::endpoint_dc_rack>> load_dc_rack_info();
locator::endpoint_dc_rack local_dc_rack() const;
enum class bootstrap_state {
NEEDS_BOOTSTRAP,
@@ -410,7 +408,7 @@ public:
};
future<local_info> load_local_info();
future<> save_local_info(local_info);
future<> save_local_info(local_info, locator::endpoint_dc_rack);
private:
future<truncation_record> get_truncation_record(table_id cf_id);
@@ -513,7 +511,7 @@ private:
public:
system_keyspace(cql3::query_processor& qp, replica::database& db, const locator::snitch_ptr&) noexcept;
system_keyspace(cql3::query_processor& qp, replica::database& db) noexcept;
~system_keyspace();
future<> shutdown();

View File

@@ -15,6 +15,7 @@
#include <boost/signals2.hpp>
#include <boost/signals2/dummy_mutex.hpp>
#include "locator/types.hh"
#include "gms/inet_address.hh"
#include "inet_address_vectors.hh"
#include "gms/versioned_value.hh"
@@ -70,6 +71,13 @@ public:
*/
virtual sstring get_datacenter() const = 0;
/**
 * returns the results of get_datacenter() and get_rack() combined into
 * a single locator::endpoint_dc_rack value
 */
locator::endpoint_dc_rack get_location() const {
return locator::endpoint_dc_rack{
.dc = get_datacenter(),
.rack = get_rack(),
};
}
/**
* returns whatever info snitch wants to gossip
*/

View File

@@ -852,7 +852,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
supervisor::notify("starting tokens manager");
locator::token_metadata::config tm_cfg;
tm_cfg.topo_cfg.this_endpoint = utils::fb_utilities::get_broadcast_address();
tm_cfg.topo_cfg.local_dc_rack = { snitch.local()->get_datacenter(), snitch.local()->get_rack() };
tm_cfg.topo_cfg.local_dc_rack = snitch.local()->get_location();
if (snitch.local()->get_name() == "org.apache.cassandra.locator.SimpleSnitch") {
//
// Simple snitch wants sort_by_proximity() not to reorder nodes anyhow
@@ -1089,7 +1089,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
static sharded<cdc::generation_service> cdc_generation_service;
supervisor::notify("starting system keyspace");
sys_ks.start(std::ref(qp), std::ref(db), std::ref(snitch)).get();
sys_ks.start(std::ref(qp), std::ref(db)).get();
// TODO: stop()?
// Initialization of a keyspace is done by shard 0 only. For system
@@ -1117,7 +1117,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
cfg->host_id = linfo.host_id;
linfo.listen_address = listen_address;
sys_ks.local().save_local_info(std::move(linfo)).get();
sys_ks.local().save_local_info(std::move(linfo), snitch.local()->get_location()).get();
shared_token_metadata::mutate_on_all_shards(token_metadata, [hostid = cfg->host_id, endpoint = utils::fb_utilities::get_broadcast_address()] (locator::token_metadata& tm) {
// Makes local host id available in topology cfg as soon as possible.

View File

@@ -1436,7 +1436,7 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr
auto& db = get_db().local();
auto ks_erms = db.get_non_local_strategy_keyspaces_erms();
auto& topology = tmptr->get_topology();
auto local_dc = topology.get_datacenter();
auto myloc = topology.get_location();
auto myip = utils::fb_utilities::get_broadcast_address();
auto reason = streaming::stream_reason::bootstrap;
// Calculate number of ranges to sync data
@@ -1446,7 +1446,7 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr
continue;
}
auto& strat = erm->get_replication_strategy();
dht::token_range_vector desired_ranges = strat.get_pending_address_ranges(tmptr, tokens, myip, _sys_ks.local().local_dc_rack()).get0();
dht::token_range_vector desired_ranges = strat.get_pending_address_ranges(tmptr, tokens, myip, myloc).get0();
seastar::thread::maybe_yield();
auto nr_tables = get_nr_tables(db, keyspace_name);
nr_ranges_total += desired_ranges.size() * nr_tables;
@@ -1462,7 +1462,7 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr
continue;
}
auto& strat = erm->get_replication_strategy();
dht::token_range_vector desired_ranges = strat.get_pending_address_ranges(tmptr, tokens, myip, _sys_ks.local().local_dc_rack()).get0();
dht::token_range_vector desired_ranges = strat.get_pending_address_ranges(tmptr, tokens, myip, myloc).get0();
bool find_node_in_local_dc_only = strat.get_type() == locator::replication_strategy_type::network_topology;
bool everywhere_topology = strat.get_type() == locator::replication_strategy_type::everywhere_topology;
auto replication_factor = erm->get_replication_factor();
@@ -1472,7 +1472,7 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr
auto range_addresses = strat.get_range_addresses(metadata_clone).get0();
//Pending ranges
metadata_clone.update_topology(myip, _sys_ks.local().local_dc_rack(), locator::node::state::bootstrapping);
metadata_clone.update_topology(myip, myloc, locator::node::state::bootstrapping);
metadata_clone.update_normal_tokens(tokens, myip).get();
auto pending_range_addresses = strat.get_range_addresses(metadata_clone).get0();
metadata_clone.clear_gently().get();
@@ -1530,14 +1530,14 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr
throw std::runtime_error(format("bootstrap_with_repair: keyspace={}, range={}, failed to cast to network_topology_strategy",
keyspace_name, desired_range));
}
rf_in_local_dc = nts->get_replication_factor(local_dc);
rf_in_local_dc = nts->get_replication_factor(myloc.dc);
}
return rf_in_local_dc;
};
auto get_old_endpoints_in_local_dc = [&] () {
return boost::copy_range<std::vector<gms::inet_address>>(old_endpoints |
boost::adaptors::filtered([&] (const gms::inet_address& node) {
return topology.get_datacenter(node) == local_dc;
return topology.get_datacenter(node) == myloc.dc;
})
);
};
@@ -1924,14 +1924,14 @@ future<> repair_service::replace_with_repair(locator::token_metadata_ptr tmptr,
auto cloned_tm = co_await tmptr->clone_async();
auto op = sstring("replace_with_repair");
auto& topology = tmptr->get_topology();
auto source_dc = topology.get_datacenter();
auto myloc = topology.get_location();
auto reason = streaming::stream_reason::replace;
// update a cloned version of tmptr
// no need to set the original version
auto cloned_tmptr = make_token_metadata_ptr(std::move(cloned_tm));
cloned_tmptr->update_topology(utils::fb_utilities::get_broadcast_address(), _sys_ks.local().local_dc_rack(), locator::node::state::replacing);
cloned_tmptr->update_topology(utils::fb_utilities::get_broadcast_address(), myloc, locator::node::state::replacing);
co_await cloned_tmptr->update_normal_tokens(replacing_tokens, utils::fb_utilities::get_broadcast_address());
co_return co_await do_rebuild_replace_with_repair(std::move(cloned_tmptr), std::move(op), std::move(source_dc), reason, std::move(ignore_nodes));
co_return co_await do_rebuild_replace_with_repair(std::move(cloned_tmptr), std::move(op), myloc.dc, reason, std::move(ignore_nodes));
}
node_ops_cmd_category categorize_node_ops_cmd(node_ops_cmd cmd) noexcept {

View File

@@ -2510,7 +2510,7 @@ future<> storage_service::join_token_ring(sharded<db::system_distributed_keyspac
// This node must know about its chosen tokens before other nodes do
// since they may start sending writes to this node after it gossips status = NORMAL.
// Therefore we update _token_metadata now, before gossip starts.
tmptr->update_topology(get_broadcast_address(), _sys_ks.local().local_dc_rack(), locator::node::state::normal);
tmptr->update_topology(get_broadcast_address(), _snitch.local()->get_location(), locator::node::state::normal);
co_await tmptr->update_normal_tokens(my_tokens, get_broadcast_address());
cdc_gen_id = co_await _sys_ks.local().get_cdc_generation_id();
@@ -2801,7 +2801,7 @@ future<> storage_service::join_token_ring(sharded<db::system_distributed_keyspac
// This node must know about its chosen tokens before other nodes do
// since they may start sending writes to this node after it gossips status = NORMAL.
// Therefore, in case we haven't updated _token_metadata with our tokens yet, do it now.
tmptr->update_topology(get_broadcast_address(), _sys_ks.local().local_dc_rack(), locator::node::state::normal);
tmptr->update_topology(get_broadcast_address(), _snitch.local()->get_location(), locator::node::state::normal);
return tmptr->update_normal_tokens(bootstrap_tokens, get_broadcast_address());
});
@@ -2940,7 +2940,7 @@ future<> storage_service::bootstrap(std::unordered_set<token>& bootstrap_tokens,
slogger.debug("bootstrap: update pending ranges: endpoint={} bootstrap_tokens={}", get_broadcast_address(), bootstrap_tokens);
mutate_token_metadata([this, &bootstrap_tokens] (mutable_token_metadata_ptr tmptr) {
auto endpoint = get_broadcast_address();
tmptr->update_topology(endpoint, _sys_ks.local().local_dc_rack(), locator::node::state::bootstrapping);
tmptr->update_topology(endpoint, _snitch.local()->get_location(), locator::node::state::bootstrapping);
tmptr->add_bootstrap_tokens(bootstrap_tokens, endpoint);
return update_topology_change_info(std::move(tmptr), ::format("bootstrapping node {}", endpoint));
}).get();
@@ -2963,7 +2963,7 @@ future<> storage_service::bootstrap(std::unordered_set<token>& bootstrap_tokens,
slogger.info("sleeping {} ms for pending range setup", get_ring_delay().count());
_gossiper.wait_for_range_setup().get();
dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_broadcast_address(), _sys_ks.local().local_dc_rack(), bootstrap_tokens, get_token_metadata_ptr());
dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_broadcast_address(), _snitch.local()->get_location(), bootstrap_tokens, get_token_metadata_ptr());
slogger.info("Starting to bootstrap...");
bs.bootstrap(streaming::stream_reason::bootstrap, _gossiper).get();
} else {
@@ -4627,7 +4627,7 @@ void storage_service::run_replace_ops(std::unordered_set<token>& bootstrap_token
_repair.local().replace_with_repair(get_token_metadata_ptr(), bootstrap_tokens, ctl.ignore_nodes).get();
} else {
slogger.info("replace[{}]: Using streaming based node ops to sync data", uuid);
dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_broadcast_address(), _sys_ks.local().local_dc_rack(), bootstrap_tokens, get_token_metadata_ptr());
dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_broadcast_address(), _snitch.local()->get_location(), bootstrap_tokens, get_token_metadata_ptr());
bs.bootstrap(streaming::stream_reason::replace, _gossiper, replace_address).get();
}
on_streaming_finished();
@@ -5294,7 +5294,7 @@ future<> storage_service::rebuild(sstring source_dc) {
co_await ss._repair.local().rebuild_with_repair(tmptr, std::move(source_dc));
} else {
auto streamer = make_lw_shared<dht::range_streamer>(ss._db, ss._stream_manager, tmptr, ss._abort_source,
ss.get_broadcast_address(), ss._sys_ks.local().local_dc_rack(), "Rebuild", streaming::stream_reason::rebuild);
ss.get_broadcast_address(), ss._snitch.local()->get_location(), "Rebuild", streaming::stream_reason::rebuild);
streamer->add_source_filter(std::make_unique<dht::range_streamer::failure_detector_source_filter>(ss._gossiper.get_unreachable_members()));
if (source_dc != "") {
streamer->add_source_filter(std::make_unique<dht::range_streamer::single_datacenter_filter>(source_dc));
@@ -5470,7 +5470,7 @@ future<> storage_service::removenode_with_stream(gms::inet_address leaving_node,
as.request_abort();
}
});
auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, tmptr, as, get_broadcast_address(), _sys_ks.local().local_dc_rack(), "Removenode", streaming::stream_reason::removenode);
auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, tmptr, as, get_broadcast_address(), _snitch.local()->get_location(), "Removenode", streaming::stream_reason::removenode);
removenode_add_ranges(streamer, leaving_node).get();
try {
streamer->stream_async().get();
@@ -5521,7 +5521,7 @@ future<> storage_service::leave_ring() {
future<>
storage_service::stream_ranges(std::unordered_map<sstring, std::unordered_multimap<dht::token_range, inet_address>> ranges_to_stream_by_keyspace) {
auto streamer = dht::range_streamer(_db, _stream_manager, get_token_metadata_ptr(), _abort_source, get_broadcast_address(), _sys_ks.local().local_dc_rack(), "Unbootstrap", streaming::stream_reason::decommission);
auto streamer = dht::range_streamer(_db, _stream_manager, get_token_metadata_ptr(), _abort_source, get_broadcast_address(), _snitch.local()->get_location(), "Unbootstrap", streaming::stream_reason::decommission);
for (auto& entry : ranges_to_stream_by_keyspace) {
const auto& keyspace = entry.first;
auto& ranges_with_endpoints = entry.second;
@@ -5727,12 +5727,7 @@ future<> storage_service::snitch_reconfigured() {
auto& snitch = _snitch.local();
co_await mutate_token_metadata([&snitch] (mutable_token_metadata_ptr tmptr) -> future<> {
// re-read local rack and DC info
auto endpoint = utils::fb_utilities::get_broadcast_address();
auto dr = locator::endpoint_dc_rack {
.dc = snitch->get_datacenter(),
.rack = snitch->get_rack(),
};
tmptr->update_topology(endpoint, std::move(dr));
tmptr->update_topology(utils::fb_utilities::get_broadcast_address(), snitch->get_location());
return make_ready_future<>();
});
@@ -5974,7 +5969,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(shar
co_await _repair.local().rebuild_with_repair(tmptr, std::move(source_dc));
} else {
auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, tmptr, _abort_source,
get_broadcast_address(), _sys_ks.local().local_dc_rack(), "Rebuild", streaming::stream_reason::rebuild);
get_broadcast_address(), _snitch.local()->get_location(), "Rebuild", streaming::stream_reason::rebuild);
streamer->add_source_filter(std::make_unique<dht::range_streamer::failure_detector_source_filter>(_gossiper.get_unreachable_members()));
if (source_dc != "") {
streamer->add_source_filter(std::make_unique<dht::range_streamer::single_datacenter_filter>(source_dc));
@@ -6099,7 +6094,7 @@ future<> storage_service::stream_tablet(locator::global_tablet_id tablet) {
auto& table = _db.local().find_column_family(tablet.table);
std::vector<sstring> tables = {table.schema()->cf_name()};
auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, tm, _abort_source,
get_broadcast_address(), _sys_ks.local().local_dc_rack(),
get_broadcast_address(), _snitch.local()->get_location(),
"Tablet migration", streaming::stream_reason::tablet_migration, std::move(tables));
streamer->add_source_filter(std::make_unique<dht::range_streamer::failure_detector_source_filter>(
_gossiper.get_unreachable_members()));

View File

@@ -613,7 +613,7 @@ private:
auto stop_sl_controller = defer([this] { _sl_controller.stop().get(); });
_sl_controller.invoke_on_all(&qos::service_level_controller::start).get();
_sys_ks.start(std::ref(_qp), std::ref(_db), std::ref(_snitch)).get();
_sys_ks.start(std::ref(_qp), std::ref(_db)).get();
auto stop_sys_kd = defer([this] { _sys_ks.stop().get(); });
for (const auto p: all_system_table_load_phases) {
replica::distributed_loader::init_system_keyspace(_sys_ks, _erm_factory, _db, *cfg, p).get();
@@ -625,7 +625,7 @@ private:
linfo.host_id = locator::host_id::create_random_id();
}
cfg->host_id = linfo.host_id;
_sys_ks.local().save_local_info(std::move(linfo)).get();
_sys_ks.local().save_local_info(std::move(linfo), _snitch.local()->get_location()).get();
}
locator::shared_token_metadata::mutate_on_all_shards(_token_metadata, [hostid = cfg->host_id] (locator::token_metadata& tm) {
tm.get_topology().set_host_id_cfg(hostid);