From 53cf646103d1ecef9cbaaefe86b8463eaedcdd97 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Fri, 22 Jul 2022 12:45:09 +0200 Subject: [PATCH] db: system_keyspace: don't take `sharded<>` references Take `query_processor` and `database` references directly, not through `sharded<...>&`. This is now possible because we moved `query_processor` and `database` construction early, so by the time `system_keyspace` is started, the services it depends on were also already started. Calls to `_qp.local()` and `_db.local()` inside `system_keyspace` member functions can now be replaced with direct uses of `_qp` and `_db`. Runtime assertions for dependant services being initialized are gone. --- db/system_keyspace.cc | 42 ++++++++++++++++++++---------------------- db/system_keyspace.hh | 6 +++--- 2 files changed, 23 insertions(+), 25 deletions(-) diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index cb6089922e..724a7cdf41 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -1382,7 +1382,7 @@ schema_ptr system_keyspace::legacy::aggregates() { } future<> system_keyspace::setup_version(sharded& ms) { - auto& cfg = _db.local().get_config(); + auto& cfg = _db.get_config(); sstring req = fmt::format("INSERT INTO system.{} (key, release_version, cql_version, thrift_version, native_protocol_version, data_center, rack, partitioner, rpc_address, broadcast_address, listen_address) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" , db::system_keyspace::LOCAL); @@ -1458,12 +1458,12 @@ future<> system_keyspace::setup(sharded& snitch, shardedprefer_local()) { @@ -1487,7 +1487,7 @@ struct truncation_record { namespace db { future system_keyspace::get_truncation_record(table_id cf_id) { - if (_db.local().get_config().ignore_truncation_record.is_set()) { + if (_db.get_config().ignore_truncation_record.is_set()) { truncation_record r{truncation_record::current_magic}; return make_ready_future(std::move(r)); } @@ -1510,7 +1510,7 @@ future system_keyspace::get_truncation_record(table_id cf_id) // Read system.truncate table and cache last truncation time in `table` object for each table on every shard future<> system_keyspace::cache_truncation_record() { - if (_db.local().get_config().ignore_truncation_record.is_set()) { + if (_db.get_config().ignore_truncation_record.is_set()) { return make_ready_future<>(); } sstring req = format("SELECT DISTINCT table_uuid, truncated_at from system.{}", TRUNCATED); @@ -1519,7 +1519,7 @@ future<> system_keyspace::cache_truncation_record() { auto table_uuid = table_id(row.get_as("table_uuid")); auto ts = row.get_as("truncated_at"); - return _db.invoke_on_all([table_uuid, ts] (replica::database& db) mutable { + return _db.container().invoke_on_all([table_uuid, ts] (replica::database& db) mutable { try { replica::table& cf = db.find_column_family(table_uuid); cf.cache_truncation_record(ts); @@ -1769,10 +1769,10 @@ future<> system_keyspace::check_health() { if (msg->empty() || !msg->one().has("cluster_name")) { // this is a brand new node sstring ins_req = format("INSERT INTO system.{} (key, cluster_name) VALUES (?, ?)", LOCAL); - auto cluster_name = _db.local().get_config().cluster_name(); + auto cluster_name = _db.get_config().cluster_name(); return execute_cql(ins_req, sstring(LOCAL), cluster_name).discard_result(); } else { - auto cluster_name = _db.local().get_config().cluster_name(); + auto cluster_name = _db.get_config().cluster_name(); auto saved_cluster_name = msg->one().get_as("cluster_name"); if (cluster_name != saved_cluster_name) { @@ -3078,7 +3078,7 @@ future<> system_keyspace::update_compaction_history(utils::UUID uuid, sstring ks future<> system_keyspace::get_compaction_history(compaction_history_consumer consumer) { sstring req = format("SELECT * from system.{}", COMPACTION_HISTORY); - co_await _qp.local().query_internal(req, [&consumer] (const cql3::untyped_result_set::row& row) mutable -> future { + co_await _qp.query_internal(req, [&consumer] (const cql3::untyped_result_set::row& row) mutable -> future { compaction_history_entry entry; entry.id = row.get_as("id"); entry.ks = row.get_as("keyspace_name"); @@ -3101,7 +3101,7 @@ future<> system_keyspace::update_repair_history(repair_history_entry entry) { future<> system_keyspace::get_repair_history(::table_id table_id, repair_history_consumer f) { sstring req = format("SELECT * from system.{} WHERE table_uuid = {}", REPAIR_HISTORY, table_id); - co_await _qp.local().query_internal(req, [&f] (const cql3::untyped_result_set::row& row) mutable -> future { + co_await _qp.query_internal(req, [&f] (const cql3::untyped_result_set::row& row) mutable -> future { repair_history_entry ent; ent.id = tasks::task_id(row.get_as("repair_uuid")); ent.table_uuid = ::table_id(row.get_as("table_uuid")); @@ -3117,7 +3117,7 @@ future<> system_keyspace::get_repair_history(::table_id table_id, repair_history future system_keyspace::increment_and_get_generation() { auto req = format("SELECT gossip_generation FROM system.{} WHERE key='{}'", LOCAL, LOCAL); - auto rs = co_await _qp.local().execute_internal(req, cql3::query_processor::cache_internal::yes); + auto rs = co_await _qp.execute_internal(req, cql3::query_processor::cache_internal::yes); gms::generation_type generation; if (rs->empty() || !rs->one().has("gossip_generation")) { // seconds-since-epoch isn't a foolproof new generation @@ -3137,7 +3137,7 @@ future system_keyspace::increment_and_get_generation() { } } req = format("INSERT INTO system.{} (key, gossip_generation) VALUES ('{}', ?)", LOCAL, LOCAL); - co_await _qp.local().execute_internal(req, {generation.value()}, cql3::query_processor::cache_internal::yes); + co_await _qp.execute_internal(req, {generation.value()}, cql3::query_processor::cache_internal::yes); co_await force_blocking_flush(LOCAL); co_return generation; } @@ -3672,7 +3672,7 @@ future system_keyspace::read_cdc_generation(utils::UUID id) { std::vector entries; auto num_ranges = 0; - co_await _qp.local().query_internal( + co_await _qp.query_internal( format("SELECT range_end, streams, ignore_msb, num_ranges FROM {}.{} WHERE id = ?", NAME, CDC_GENERATIONS_V3), db::consistency_level::ONE, @@ -3740,7 +3740,7 @@ future<> system_keyspace::sstables_registry_list(sstring location, sstable_regis static const auto req = format("SELECT uuid, status, generation, version, format FROM system.{} WHERE location = ?", SSTABLES_REGISTRY); slogger.trace("Listing {} entries from {}", location, SSTABLES_REGISTRY); - co_await _qp.local().query_internal(req, db::consistency_level::ONE, { location }, 1000, [ consumer = std::move(consumer) ] (const cql3::untyped_result_set::row& row) -> future { + co_await _qp.query_internal(req, db::consistency_level::ONE, { location }, 1000, [ consumer = std::move(consumer) ] (const cql3::untyped_result_set::row& row) -> future { auto uuid = row.get_as("uuid"); auto status = row.get_as("status"); auto gen = sstables::generation_type(row.get_as("generation")); @@ -3756,7 +3756,7 @@ sstring system_keyspace_name() { return system_keyspace::NAME; } -system_keyspace::system_keyspace(sharded& qp, sharded& db) noexcept +system_keyspace::system_keyspace(cql3::query_processor& qp, replica::database& db) noexcept : _qp(qp) , _db(db) , _cache(std::make_unique()) @@ -3767,13 +3767,11 @@ system_keyspace::~system_keyspace() { } future<> system_keyspace::start(const locator::snitch_ptr& snitch) { - assert(_qp.local_is_initialized() && _db.local_is_initialized()); - if (this_shard_id() == 0) { - qctx = std::make_unique(_qp); + qctx = std::make_unique(_qp.container()); } - _db.local().plug_system_keyspace(*this); + _db.plug_system_keyspace(*this); // FIXME // This should be coupled with setup_version()'s part committing these values into @@ -3786,7 +3784,7 @@ future<> system_keyspace::start(const locator::snitch_ptr& snitch) { } future<> system_keyspace::shutdown() { - _db.local().unplug_system_keyspace(); + _db.unplug_system_keyspace(); co_return; } @@ -3795,7 +3793,7 @@ future<> system_keyspace::stop() { } future<::shared_ptr> system_keyspace::execute_cql(const sstring& query_string, const std::initializer_list& values) { - return _qp.local().execute_internal(query_string, values, cql3::query_processor::cache_internal::yes); + return _qp.execute_internal(query_string, values, cql3::query_processor::cache_internal::yes); } } // namespace db diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index 7a349e968c..ff955d2680 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -101,8 +101,8 @@ struct compaction_history_entry { }; class system_keyspace : public seastar::peering_sharded_service, public seastar::async_sharded_service { - sharded& _qp; - sharded& _db; + cql3::query_processor& _qp; + replica::database& _db; std::unique_ptr _cache; static schema_ptr raft_snapshot_config(); @@ -488,7 +488,7 @@ public: future get_must_synchronize_topology(); future<> set_must_synchronize_topology(bool); - system_keyspace(sharded& qp, sharded& db) noexcept; + system_keyspace(cql3::query_processor& qp, replica::database& db) noexcept; ~system_keyspace(); future<> start(const locator::snitch_ptr&); future<> stop();