diff --git a/alternator/executor.cc b/alternator/executor.cc index 2c6293fa13..d37fcddcf3 100644 --- a/alternator/executor.cc +++ b/alternator/executor.cc @@ -828,17 +828,6 @@ future executor::list_tags_of_resource(client_sta return make_ready_future(make_jsonable(std::move(ret))); } -static future<> wait_for_schema_agreement(service::migration_manager& mm, db::timeout_clock::time_point deadline) { - return do_until([&mm, deadline] { - if (db::timeout_clock::now() > deadline) { - throw std::runtime_error("Unable to reach schema agreement"); - } - return mm.have_schema_agreement(); - }, [] { - return seastar::sleep(500ms); - }); -} - static void verify_billing_mode(const rjson::value& request) { // Alternator does not yet support billing or throughput limitations, but // let's verify that BillingMode is at least legal. @@ -1084,7 +1073,7 @@ static future create_table_on_shard0(tracing::tra } co_await mm.announce(std::move(schema_mutations), std::move(group0_guard)); - co_await wait_for_schema_agreement(mm, db::timeout_clock::now() + 10s); + co_await mm.wait_for_schema_agreement(sp.local_db(), db::timeout_clock::now() + 10s, nullptr); rjson::value status = rjson::empty_object(); executor::supplement_table_info(request, *schema, sp); rjson::add(status, "TableDescription", std::move(request)); @@ -1151,7 +1140,7 @@ future executor::update_table(client_state& clien co_await mm.announce(std::move(m), std::move(group0_guard)); - co_await wait_for_schema_agreement(mm, db::timeout_clock::now() + 10s); + co_await mm.wait_for_schema_agreement(p.local().local_db(), db::timeout_clock::now() + 10s, nullptr); rjson::value status = rjson::empty_object(); supplement_table_info(request, *schema, p.local()); diff --git a/auth/common.cc b/auth/common.cc index 00784aa503..7158b99b1f 100644 --- a/auth/common.cc +++ b/auth/common.cc @@ -84,20 +84,6 @@ future<> create_metadata_table_if_missing( return futurize_invoke(create_metadata_table_if_missing_impl, table_name, qp, cql, mm); } -future<> wait_for_schema_agreement(::service::migration_manager& mm, const replica::database& db, seastar::abort_source& as) { - static const auto pause = [] { return sleep(std::chrono::milliseconds(500)); }; - - return do_until([&db, &as] { - as.check(); - return db.get_version() != replica::database::empty_version; - }, pause).then([&mm, &as] { - return do_until([&mm, &as] { - as.check(); - return mm.have_schema_agreement(); - }, pause); - }); -} - ::service::query_state& internal_distributed_query_state() noexcept { #ifdef DEBUG // Give the much slower debug tests more headroom for completing auth queries. diff --git a/auth/common.hh b/auth/common.hh index 071112cc08..bdaa98d930 100644 --- a/auth/common.hh +++ b/auth/common.hh @@ -67,8 +67,6 @@ future<> create_metadata_table_if_missing( std::string_view cql, ::service::migration_manager&) noexcept; -future<> wait_for_schema_agreement(::service::migration_manager&, const replica::database&, seastar::abort_source&); - /// /// Time-outs for internal, non-local CQL queries. /// diff --git a/auth/default_authorizer.cc b/auth/default_authorizer.cc index 5216e14b68..6c71b1772b 100644 --- a/auth/default_authorizer.cc +++ b/auth/default_authorizer.cc @@ -129,7 +129,7 @@ future<> default_authorizer::start() { _migration_manager).then([this] { _finished = do_after_system_ready(_as, [this] { return async([this] { - wait_for_schema_agreement(_migration_manager, _qp.db().real_database(), _as).get0(); + _migration_manager.wait_for_schema_agreement(_qp.db().real_database(), db::timeout_clock::time_point::max(), &_as).get0(); if (legacy_metadata_exists()) { if (!any_granted().get0()) { diff --git a/auth/password_authenticator.cc b/auth/password_authenticator.cc index b5ab045d2f..d6b66b5198 100644 --- a/auth/password_authenticator.cc +++ b/auth/password_authenticator.cc @@ -132,7 +132,7 @@ future<> password_authenticator::start() { _stopped = do_after_system_ready(_as, [this] { return async([this] { - wait_for_schema_agreement(_migration_manager, _qp.db().real_database(), _as).get0(); + _migration_manager.wait_for_schema_agreement(_qp.db().real_database(), db::timeout_clock::time_point::max(), &_as).get0(); if (any_nondefault_role_row_satisfies(_qp, &has_salted_hash).get0()) { if (legacy_metadata_exists()) { diff --git a/auth/standard_role_manager.cc b/auth/standard_role_manager.cc index 3706be66e3..eaf4bf6e5f 100644 --- a/auth/standard_role_manager.cc +++ b/auth/standard_role_manager.cc @@ -28,6 +28,7 @@ #include "log.hh" #include "utils/class_registrator.hh" #include "replica/database.hh" +#include "service/migration_manager.hh" namespace auth { @@ -232,7 +233,7 @@ future<> standard_role_manager::start() { return this->create_metadata_tables_if_missing().then([this] { _stopped = auth::do_after_system_ready(_as, [this] { return seastar::async([this] { - wait_for_schema_agreement(_migration_manager, _qp.db().real_database(), _as).get0(); + _migration_manager.wait_for_schema_agreement(_qp.db().real_database(), db::timeout_clock::time_point::max(), &_as).get0(); if (any_nondefault_role_row_satisfies(_qp, &has_can_login).get0()) { if (this->legacy_metadata_exists()) { diff --git a/db/view/view.cc b/db/view/view.cc index 20588eed9e..d3e715e18d 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1794,9 +1794,8 @@ future<> view_builder::start(service::migration_manager& mm) { // or `on_update_view` events. auto units = get_units(_sem, 1).get0(); // Wait for schema agreement even if we're a seed node. - while (!mm.have_schema_agreement()) { - seastar::sleep_abortable(500ms, _as).get(); - } + mm.wait_for_schema_agreement(_db, db::timeout_clock::time_point::max(), &_as).get(); + auto built = _sys_ks.load_built_views().get0(); auto in_progress = _sys_ks.load_view_build_progress().get0(); setup_shard_build_step(vbi, std::move(built), std::move(in_progress)); diff --git a/service/migration_manager.cc b/service/migration_manager.cc index 149455b42f..3ebcc33a15 100644 --- a/service/migration_manager.cc +++ b/service/migration_manager.cc @@ -239,6 +239,18 @@ bool migration_manager::have_schema_agreement() { return match; } +future<> migration_manager::wait_for_schema_agreement(const replica::database& db, db::timeout_clock::time_point deadline, seastar::abort_source* as) { + while (db.get_version() == replica::database::empty_version || !have_schema_agreement()) { + if (as) { + as->check(); + } + if (db::timeout_clock::now() > deadline) { + throw std::runtime_error("Unable to reach schema agreement"); + } + co_await (as ? sleep_abortable(std::chrono::milliseconds(500), *as) : sleep(std::chrono::milliseconds(500))); + } +} + /** * If versions differ this node sends request with local migration list to the endpoint * and expecting to receive a list of migrations to apply locally. diff --git a/service/migration_manager.hh b/service/migration_manager.hh index b0798b50c9..5778873e54 100644 --- a/service/migration_manager.hh +++ b/service/migration_manager.hh @@ -172,6 +172,7 @@ public: * Known peers in the cluster have the same schema version as us. */ bool have_schema_agreement(); + future<> wait_for_schema_agreement(const replica::database& db, db::timeout_clock::time_point deadline, seastar::abort_source* as); void init_messaging_service();