db: system_keyspace: don't take sharded<> references

Take `query_processor` and `database` references directly, not through
`sharded<...>&`. This is now possible because we moved `query_processor`
and `database` construction early, so by the time `system_keyspace` is
started, the services it depends on were also already started.

Calls to `_qp.local()` and `_db.local()` inside `system_keyspace` member
functions can now be replaced with direct uses of `_qp` and `_db`.
Runtime assertions for dependant services being initialized are gone.
This commit is contained in:
Kamil Braun
2022-07-22 12:45:09 +02:00
parent b7627085cb
commit 53cf646103
2 changed files with 23 additions and 25 deletions

View File

@@ -1382,7 +1382,7 @@ schema_ptr system_keyspace::legacy::aggregates() {
}
future<> system_keyspace::setup_version(sharded<netw::messaging_service>& ms) {
auto& cfg = _db.local().get_config();
auto& cfg = _db.get_config();
sstring req = fmt::format("INSERT INTO system.{} (key, release_version, cql_version, thrift_version, native_protocol_version, data_center, rack, partitioner, rpc_address, broadcast_address, listen_address) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
, db::system_keyspace::LOCAL);
@@ -1458,12 +1458,12 @@ future<> system_keyspace::setup(sharded<locator::snitch_ptr>& snitch, sharded<ne
assert(this_shard_id() == 0);
co_await setup_version(ms);
co_await update_schema_version(_db.local().get_version());
co_await update_schema_version(_db.get_version());
co_await build_bootstrap_info();
co_await check_health();
co_await db::schema_tables::save_system_keyspace_schema(_qp.local());
co_await db::schema_tables::save_system_keyspace_schema(_qp);
// #2514 - make sure "system" is written to system_schema.keyspaces.
co_await db::schema_tables::save_system_schema(_qp.local(), NAME);
co_await db::schema_tables::save_system_schema(_qp, NAME);
co_await cache_truncation_record();
if (snitch.local()->prefer_local()) {
@@ -1487,7 +1487,7 @@ struct truncation_record {
namespace db {
future<truncation_record> system_keyspace::get_truncation_record(table_id cf_id) {
if (_db.local().get_config().ignore_truncation_record.is_set()) {
if (_db.get_config().ignore_truncation_record.is_set()) {
truncation_record r{truncation_record::current_magic};
return make_ready_future<truncation_record>(std::move(r));
}
@@ -1510,7 +1510,7 @@ future<truncation_record> system_keyspace::get_truncation_record(table_id cf_id)
// Read system.truncate table and cache last truncation time in `table` object for each table on every shard
future<> system_keyspace::cache_truncation_record() {
if (_db.local().get_config().ignore_truncation_record.is_set()) {
if (_db.get_config().ignore_truncation_record.is_set()) {
return make_ready_future<>();
}
sstring req = format("SELECT DISTINCT table_uuid, truncated_at from system.{}", TRUNCATED);
@@ -1519,7 +1519,7 @@ future<> system_keyspace::cache_truncation_record() {
auto table_uuid = table_id(row.get_as<utils::UUID>("table_uuid"));
auto ts = row.get_as<db_clock::time_point>("truncated_at");
return _db.invoke_on_all([table_uuid, ts] (replica::database& db) mutable {
return _db.container().invoke_on_all([table_uuid, ts] (replica::database& db) mutable {
try {
replica::table& cf = db.find_column_family(table_uuid);
cf.cache_truncation_record(ts);
@@ -1769,10 +1769,10 @@ future<> system_keyspace::check_health() {
if (msg->empty() || !msg->one().has("cluster_name")) {
// this is a brand new node
sstring ins_req = format("INSERT INTO system.{} (key, cluster_name) VALUES (?, ?)", LOCAL);
auto cluster_name = _db.local().get_config().cluster_name();
auto cluster_name = _db.get_config().cluster_name();
return execute_cql(ins_req, sstring(LOCAL), cluster_name).discard_result();
} else {
auto cluster_name = _db.local().get_config().cluster_name();
auto cluster_name = _db.get_config().cluster_name();
auto saved_cluster_name = msg->one().get_as<sstring>("cluster_name");
if (cluster_name != saved_cluster_name) {
@@ -3078,7 +3078,7 @@ future<> system_keyspace::update_compaction_history(utils::UUID uuid, sstring ks
future<> system_keyspace::get_compaction_history(compaction_history_consumer consumer) {
sstring req = format("SELECT * from system.{}", COMPACTION_HISTORY);
co_await _qp.local().query_internal(req, [&consumer] (const cql3::untyped_result_set::row& row) mutable -> future<stop_iteration> {
co_await _qp.query_internal(req, [&consumer] (const cql3::untyped_result_set::row& row) mutable -> future<stop_iteration> {
compaction_history_entry entry;
entry.id = row.get_as<utils::UUID>("id");
entry.ks = row.get_as<sstring>("keyspace_name");
@@ -3101,7 +3101,7 @@ future<> system_keyspace::update_repair_history(repair_history_entry entry) {
future<> system_keyspace::get_repair_history(::table_id table_id, repair_history_consumer f) {
sstring req = format("SELECT * from system.{} WHERE table_uuid = {}", REPAIR_HISTORY, table_id);
co_await _qp.local().query_internal(req, [&f] (const cql3::untyped_result_set::row& row) mutable -> future<stop_iteration> {
co_await _qp.query_internal(req, [&f] (const cql3::untyped_result_set::row& row) mutable -> future<stop_iteration> {
repair_history_entry ent;
ent.id = tasks::task_id(row.get_as<utils::UUID>("repair_uuid"));
ent.table_uuid = ::table_id(row.get_as<utils::UUID>("table_uuid"));
@@ -3117,7 +3117,7 @@ future<> system_keyspace::get_repair_history(::table_id table_id, repair_history
future<int> system_keyspace::increment_and_get_generation() {
auto req = format("SELECT gossip_generation FROM system.{} WHERE key='{}'", LOCAL, LOCAL);
auto rs = co_await _qp.local().execute_internal(req, cql3::query_processor::cache_internal::yes);
auto rs = co_await _qp.execute_internal(req, cql3::query_processor::cache_internal::yes);
gms::generation_type generation;
if (rs->empty() || !rs->one().has("gossip_generation")) {
// seconds-since-epoch isn't a foolproof new generation
@@ -3137,7 +3137,7 @@ future<int> system_keyspace::increment_and_get_generation() {
}
}
req = format("INSERT INTO system.{} (key, gossip_generation) VALUES ('{}', ?)", LOCAL, LOCAL);
co_await _qp.local().execute_internal(req, {generation.value()}, cql3::query_processor::cache_internal::yes);
co_await _qp.execute_internal(req, {generation.value()}, cql3::query_processor::cache_internal::yes);
co_await force_blocking_flush(LOCAL);
co_return generation;
}
@@ -3672,7 +3672,7 @@ future<cdc::topology_description>
system_keyspace::read_cdc_generation(utils::UUID id) {
std::vector<cdc::token_range_description> entries;
auto num_ranges = 0;
co_await _qp.local().query_internal(
co_await _qp.query_internal(
format("SELECT range_end, streams, ignore_msb, num_ranges FROM {}.{} WHERE id = ?",
NAME, CDC_GENERATIONS_V3),
db::consistency_level::ONE,
@@ -3740,7 +3740,7 @@ future<> system_keyspace::sstables_registry_list(sstring location, sstable_regis
static const auto req = format("SELECT uuid, status, generation, version, format FROM system.{} WHERE location = ?", SSTABLES_REGISTRY);
slogger.trace("Listing {} entries from {}", location, SSTABLES_REGISTRY);
co_await _qp.local().query_internal(req, db::consistency_level::ONE, { location }, 1000, [ consumer = std::move(consumer) ] (const cql3::untyped_result_set::row& row) -> future<stop_iteration> {
co_await _qp.query_internal(req, db::consistency_level::ONE, { location }, 1000, [ consumer = std::move(consumer) ] (const cql3::untyped_result_set::row& row) -> future<stop_iteration> {
auto uuid = row.get_as<utils::UUID>("uuid");
auto status = row.get_as<sstring>("status");
auto gen = sstables::generation_type(row.get_as<utils::UUID>("generation"));
@@ -3756,7 +3756,7 @@ sstring system_keyspace_name() {
return system_keyspace::NAME;
}
system_keyspace::system_keyspace(sharded<cql3::query_processor>& qp, sharded<replica::database>& db) noexcept
system_keyspace::system_keyspace(cql3::query_processor& qp, replica::database& db) noexcept
: _qp(qp)
, _db(db)
, _cache(std::make_unique<local_cache>())
@@ -3767,13 +3767,11 @@ system_keyspace::~system_keyspace() {
}
future<> system_keyspace::start(const locator::snitch_ptr& snitch) {
assert(_qp.local_is_initialized() && _db.local_is_initialized());
if (this_shard_id() == 0) {
qctx = std::make_unique<query_context>(_qp);
qctx = std::make_unique<query_context>(_qp.container());
}
_db.local().plug_system_keyspace(*this);
_db.plug_system_keyspace(*this);
// FIXME
// This should be coupled with setup_version()'s part committing these values into
@@ -3786,7 +3784,7 @@ future<> system_keyspace::start(const locator::snitch_ptr& snitch) {
}
future<> system_keyspace::shutdown() {
_db.local().unplug_system_keyspace();
_db.unplug_system_keyspace();
co_return;
}
@@ -3795,7 +3793,7 @@ future<> system_keyspace::stop() {
}
future<::shared_ptr<cql3::untyped_result_set>> system_keyspace::execute_cql(const sstring& query_string, const std::initializer_list<data_value>& values) {
return _qp.local().execute_internal(query_string, values, cql3::query_processor::cache_internal::yes);
return _qp.execute_internal(query_string, values, cql3::query_processor::cache_internal::yes);
}
} // namespace db

View File

@@ -101,8 +101,8 @@ struct compaction_history_entry {
};
class system_keyspace : public seastar::peering_sharded_service<system_keyspace>, public seastar::async_sharded_service<system_keyspace> {
sharded<cql3::query_processor>& _qp;
sharded<replica::database>& _db;
cql3::query_processor& _qp;
replica::database& _db;
std::unique_ptr<local_cache> _cache;
static schema_ptr raft_snapshot_config();
@@ -488,7 +488,7 @@ public:
future<bool> get_must_synchronize_topology();
future<> set_must_synchronize_topology(bool);
system_keyspace(sharded<cql3::query_processor>& qp, sharded<replica::database>& db) noexcept;
system_keyspace(cql3::query_processor& qp, replica::database& db) noexcept;
~system_keyspace();
future<> start(const locator::snitch_ptr&);
future<> stop();