system_keyspace: De-bloat .setup() from messing with system.local
On boot several manipulations with system.local are performed. 1. The host_id value is selected from it with key = local If not found, system_keyspace generates a new host_id, inserts the new value into the table and returns back 2. The cluster_name is selected from it with key = local Then it's system_keyspace that either checks that the name matches the one from db::config, or inserts the db::config value into the table 3. The row with key = local is updated with various info like versions, listen, rpc and bcast addresses, dc, rack, etc. Unconditionally All three steps are scattered over main, p.1 is called directly, p.2 and p.3 are executed via system_keyspace::setup() that happens rather late. Also there's some touch of this table from the cql_test_env startup code. The proposal is to collect this setup into one place and execute it early -- as soon as the system.local table is populated. This frees the system_keyspace code from the logic of selecting host id and cluster name leaving it to main and keeps it with only select/insert work. refs: #2795 Signed-off-by: Pavel Emelyanov <xemul@scylladb.com> Closes #15082
This commit is contained in:
committed by
Avi Kivity
parent
1552044615
commit
6bc30f1944
@@ -1357,12 +1357,31 @@ schema_ptr system_keyspace::legacy::aggregates() {
|
||||
return schema;
|
||||
}
|
||||
|
||||
future<> system_keyspace::setup_version(sharded<netw::messaging_service>& ms) {
|
||||
future<system_keyspace::local_info> system_keyspace::load_local_info() {
|
||||
auto msg = co_await execute_cql(format("SELECT host_id, cluster_name FROM system.{} WHERE key=?", LOCAL), sstring(LOCAL));
|
||||
|
||||
local_info ret;
|
||||
if (!msg->empty()) {
|
||||
auto& row = msg->one();
|
||||
if (row.has("host_id")) {
|
||||
ret.host_id = locator::host_id(row.get_as<utils::UUID>("host_id"));
|
||||
}
|
||||
if (row.has("cluster_name")) {
|
||||
ret.cluster_name = row.get_as<sstring>("cluster_name");
|
||||
}
|
||||
}
|
||||
|
||||
co_return ret;
|
||||
}
|
||||
|
||||
future<> system_keyspace::save_local_info(local_info sysinfo) {
|
||||
auto& cfg = _db.get_config();
|
||||
sstring req = fmt::format("INSERT INTO system.{} (key, release_version, cql_version, thrift_version, native_protocol_version, data_center, rack, partitioner, rpc_address, broadcast_address, listen_address) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
|
||||
sstring req = fmt::format("INSERT INTO system.{} (key, host_id, cluster_name, release_version, cql_version, thrift_version, native_protocol_version, data_center, rack, partitioner, rpc_address, broadcast_address, listen_address) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
|
||||
, db::system_keyspace::LOCAL);
|
||||
|
||||
return execute_cql(req, sstring(db::system_keyspace::LOCAL),
|
||||
sysinfo.host_id.uuid(),
|
||||
sysinfo.cluster_name,
|
||||
version::release(),
|
||||
cql3::query_processor::CQL_VERSION,
|
||||
::cassandra::thrift_version,
|
||||
@@ -1372,7 +1391,7 @@ future<> system_keyspace::setup_version(sharded<netw::messaging_service>& ms) {
|
||||
sstring(cfg.partitioner()),
|
||||
utils::fb_utilities::get_broadcast_rpc_address().addr(),
|
||||
utils::fb_utilities::get_broadcast_address().addr(),
|
||||
ms.local().listen_address().addr()
|
||||
sysinfo.listen_address.addr()
|
||||
).discard_result();
|
||||
}
|
||||
|
||||
@@ -1433,9 +1452,7 @@ future<> system_keyspace::build_bootstrap_info() {
|
||||
future<> system_keyspace::setup(sharded<netw::messaging_service>& ms) {
|
||||
assert(this_shard_id() == 0);
|
||||
|
||||
co_await setup_version(ms);
|
||||
co_await build_bootstrap_info();
|
||||
co_await check_health();
|
||||
co_await db::schema_tables::save_system_keyspace_schema(_qp);
|
||||
// #2514 - make sure "system" is written to system_schema.keyspaces.
|
||||
co_await db::schema_tables::save_system_schema(_qp, NAME);
|
||||
@@ -1724,34 +1741,6 @@ future<> system_keyspace::force_blocking_flush(sstring cfname) {
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* One of three things will happen if you try to read the system keyspace:
|
||||
* 1. files are present and you can read them: great
|
||||
* 2. no files are there: great (new node is assumed)
|
||||
* 3. files are present but you can't read them: bad
|
||||
*/
|
||||
future<> system_keyspace::check_health() {
|
||||
using namespace cql_transport::messages;
|
||||
sstring req = format("SELECT cluster_name FROM system.{} WHERE key=?", LOCAL);
|
||||
return execute_cql(req, sstring(LOCAL)).then([this] (::shared_ptr<cql3::untyped_result_set> msg) {
|
||||
if (msg->empty() || !msg->one().has("cluster_name")) {
|
||||
// this is a brand new node
|
||||
sstring ins_req = format("INSERT INTO system.{} (key, cluster_name) VALUES (?, ?)", LOCAL);
|
||||
auto cluster_name = _db.get_config().cluster_name();
|
||||
return execute_cql(ins_req, sstring(LOCAL), cluster_name).discard_result();
|
||||
} else {
|
||||
auto cluster_name = _db.get_config().cluster_name();
|
||||
auto saved_cluster_name = msg->one().get_as<sstring>("cluster_name");
|
||||
|
||||
if (cluster_name != saved_cluster_name) {
|
||||
throw exceptions::configuration_exception("Saved cluster name " + saved_cluster_name + " != configured name " + cluster_name);
|
||||
}
|
||||
|
||||
return make_ready_future<>();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
future<std::unordered_set<dht::token>> system_keyspace::get_saved_tokens() {
|
||||
sstring req = format("SELECT tokens FROM system.{} WHERE key = ?", LOCAL);
|
||||
return execute_cql(req, sstring(LOCAL)).then([] (auto msg) {
|
||||
@@ -1953,28 +1942,6 @@ future<> system_keyspace::initialize_virtual_tables(
|
||||
install_virtual_readers(*this, db);
|
||||
}
|
||||
|
||||
future<locator::host_id> system_keyspace::load_local_host_id() {
|
||||
sstring req = format("SELECT host_id FROM system.{} WHERE key=?", LOCAL);
|
||||
auto msg = co_await execute_cql(req, sstring(LOCAL));
|
||||
if (msg->empty() || !msg->one().has("host_id")) {
|
||||
co_return co_await set_local_random_host_id();
|
||||
} else {
|
||||
auto host_id = locator::host_id(msg->one().get_as<utils::UUID>("host_id"));
|
||||
slogger.info("Loaded local host id: {}", host_id);
|
||||
co_return host_id;
|
||||
}
|
||||
}
|
||||
|
||||
future<locator::host_id> system_keyspace::set_local_random_host_id() {
|
||||
auto host_id = locator::host_id::create_random_id();
|
||||
slogger.info("Setting local host id to {}", host_id);
|
||||
|
||||
sstring req = format("INSERT INTO system.{} (key, host_id) VALUES (?, ?)", LOCAL);
|
||||
co_await execute_cql(req, sstring(LOCAL), host_id.uuid());
|
||||
co_await force_blocking_flush(LOCAL);
|
||||
co_return host_id;
|
||||
}
|
||||
|
||||
locator::endpoint_dc_rack system_keyspace::local_dc_rack() const {
|
||||
return _cache->_local_dc_rack_info;
|
||||
}
|
||||
|
||||
@@ -126,8 +126,6 @@ class system_keyspace : public seastar::peering_sharded_service<system_keyspace>
|
||||
static schema_ptr large_rows();
|
||||
static schema_ptr large_cells();
|
||||
static schema_ptr scylla_local();
|
||||
future<> setup_version(sharded<netw::messaging_service>& ms);
|
||||
future<> check_health();
|
||||
future<> force_blocking_flush(sstring cfname);
|
||||
future<> build_bootstrap_info();
|
||||
future<> cache_truncation_record();
|
||||
@@ -405,16 +403,15 @@ public:
|
||||
bool was_decommissioned() const;
|
||||
future<> set_bootstrap_state(bootstrap_state state);
|
||||
|
||||
/**
|
||||
* Read the host ID from the system keyspace, creating (and storing) one if
|
||||
* none exists.
|
||||
*/
|
||||
future<locator::host_id> load_local_host_id();
|
||||
struct local_info {
|
||||
locator::host_id host_id;
|
||||
sstring cluster_name;
|
||||
gms::inet_address listen_address;
|
||||
};
|
||||
|
||||
future<local_info> load_local_info();
|
||||
future<> save_local_info(local_info);
|
||||
private:
|
||||
/**
|
||||
* Sets the local host ID explicitly. Used only internally when intializing the host_id
|
||||
*/
|
||||
future<locator::host_id> set_local_random_host_id();
|
||||
future<truncation_record> get_truncation_record(table_id cf_id);
|
||||
|
||||
public:
|
||||
|
||||
20
main.cc
20
main.cc
@@ -1101,7 +1101,23 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
// described here: https://github.com/scylladb/scylla/issues/1014
|
||||
supervisor::notify("loading system sstables");
|
||||
replica::distributed_loader::init_system_keyspace(sys_ks, erm_factory, db, *cfg, system_table_load_phase::phase1).get();
|
||||
cfg->host_id = sys_ks.local().load_local_host_id().get0();
|
||||
|
||||
auto listen_address = utils::resolve(cfg->listen_address, family).get0();
|
||||
|
||||
auto linfo = sys_ks.local().load_local_info().get0();
|
||||
if (linfo.cluster_name.empty()) {
|
||||
linfo.cluster_name = cfg->cluster_name();
|
||||
} else if (linfo.cluster_name != cfg->cluster_name()) {
|
||||
throw exceptions::configuration_exception("Saved cluster name " + linfo.cluster_name + " != configured name " + cfg->cluster_name());
|
||||
}
|
||||
if (!linfo.host_id) {
|
||||
linfo.host_id = locator::host_id::create_random_id();
|
||||
startlog.info("Setting local host id to {}", linfo.host_id);
|
||||
}
|
||||
|
||||
cfg->host_id = linfo.host_id;
|
||||
linfo.listen_address = listen_address;
|
||||
sys_ks.local().save_local_info(std::move(linfo)).get();
|
||||
|
||||
shared_token_metadata::mutate_on_all_shards(token_metadata, [hostid = cfg->host_id, endpoint = utils::fb_utilities::get_broadcast_address()] (locator::token_metadata& tm) {
|
||||
// Makes local host id available in topology cfg as soon as possible.
|
||||
@@ -1115,7 +1131,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
netw::messaging_service::config mscfg;
|
||||
|
||||
mscfg.id = cfg->host_id;
|
||||
mscfg.ip = utils::resolve(cfg->listen_address, family).get0();
|
||||
mscfg.ip = listen_address;
|
||||
mscfg.port = cfg->storage_port();
|
||||
mscfg.ssl_port = cfg->ssl_storage_port();
|
||||
mscfg.listen_on_broadcast_address = cfg->listen_on_broadcast_address();
|
||||
|
||||
@@ -667,7 +667,12 @@ private:
|
||||
}
|
||||
|
||||
if (!cfg->host_id) {
|
||||
cfg->host_id = _sys_ks.local().load_local_host_id().get0();
|
||||
auto linfo = _sys_ks.local().load_local_info().get0();
|
||||
if (!linfo.host_id) {
|
||||
linfo.host_id = locator::host_id::create_random_id();
|
||||
}
|
||||
cfg->host_id = linfo.host_id;
|
||||
_sys_ks.local().save_local_info(std::move(linfo)).get();
|
||||
}
|
||||
locator::shared_token_metadata::mutate_on_all_shards(_token_metadata, [hostid = cfg->host_id] (locator::token_metadata& tm) {
|
||||
tm.get_topology().set_host_id_cfg(hostid);
|
||||
|
||||
Reference in New Issue
Block a user