migration_manager: add wait_for_schema_agreement() function

Several subsystems re-implement the same logic for waiting for schema
agreement. Provide the function in the migration_manager and use it
instead.
This commit is contained in:
Gleb Natapov
2023-05-24 17:10:26 +03:00
parent 5aea6938ae
commit a429018a8a
9 changed files with 21 additions and 35 deletions

View File

@@ -828,17 +828,6 @@ future<executor::request_return_type> executor::list_tags_of_resource(client_sta
return make_ready_future<executor::request_return_type>(make_jsonable(std::move(ret))); return make_ready_future<executor::request_return_type>(make_jsonable(std::move(ret)));
} }
static future<> wait_for_schema_agreement(service::migration_manager& mm, db::timeout_clock::time_point deadline) {
return do_until([&mm, deadline] {
if (db::timeout_clock::now() > deadline) {
throw std::runtime_error("Unable to reach schema agreement");
}
return mm.have_schema_agreement();
}, [] {
return seastar::sleep(500ms);
});
}
static void verify_billing_mode(const rjson::value& request) { static void verify_billing_mode(const rjson::value& request) {
// Alternator does not yet support billing or throughput limitations, but // Alternator does not yet support billing or throughput limitations, but
// let's verify that BillingMode is at least legal. // let's verify that BillingMode is at least legal.
@@ -1084,7 +1073,7 @@ static future<executor::request_return_type> create_table_on_shard0(tracing::tra
} }
co_await mm.announce(std::move(schema_mutations), std::move(group0_guard)); co_await mm.announce(std::move(schema_mutations), std::move(group0_guard));
co_await wait_for_schema_agreement(mm, db::timeout_clock::now() + 10s); co_await mm.wait_for_schema_agreement(sp.local_db(), db::timeout_clock::now() + 10s, nullptr);
rjson::value status = rjson::empty_object(); rjson::value status = rjson::empty_object();
executor::supplement_table_info(request, *schema, sp); executor::supplement_table_info(request, *schema, sp);
rjson::add(status, "TableDescription", std::move(request)); rjson::add(status, "TableDescription", std::move(request));
@@ -1151,7 +1140,7 @@ future<executor::request_return_type> executor::update_table(client_state& clien
co_await mm.announce(std::move(m), std::move(group0_guard)); co_await mm.announce(std::move(m), std::move(group0_guard));
co_await wait_for_schema_agreement(mm, db::timeout_clock::now() + 10s); co_await mm.wait_for_schema_agreement(p.local().local_db(), db::timeout_clock::now() + 10s, nullptr);
rjson::value status = rjson::empty_object(); rjson::value status = rjson::empty_object();
supplement_table_info(request, *schema, p.local()); supplement_table_info(request, *schema, p.local());

View File

@@ -84,20 +84,6 @@ future<> create_metadata_table_if_missing(
return futurize_invoke(create_metadata_table_if_missing_impl, table_name, qp, cql, mm); return futurize_invoke(create_metadata_table_if_missing_impl, table_name, qp, cql, mm);
} }
future<> wait_for_schema_agreement(::service::migration_manager& mm, const replica::database& db, seastar::abort_source& as) {
static const auto pause = [] { return sleep(std::chrono::milliseconds(500)); };
return do_until([&db, &as] {
as.check();
return db.get_version() != replica::database::empty_version;
}, pause).then([&mm, &as] {
return do_until([&mm, &as] {
as.check();
return mm.have_schema_agreement();
}, pause);
});
}
::service::query_state& internal_distributed_query_state() noexcept { ::service::query_state& internal_distributed_query_state() noexcept {
#ifdef DEBUG #ifdef DEBUG
// Give the much slower debug tests more headroom for completing auth queries. // Give the much slower debug tests more headroom for completing auth queries.

View File

@@ -67,8 +67,6 @@ future<> create_metadata_table_if_missing(
std::string_view cql, std::string_view cql,
::service::migration_manager&) noexcept; ::service::migration_manager&) noexcept;
future<> wait_for_schema_agreement(::service::migration_manager&, const replica::database&, seastar::abort_source&);
/// ///
/// Time-outs for internal, non-local CQL queries. /// Time-outs for internal, non-local CQL queries.
/// ///

View File

@@ -129,7 +129,7 @@ future<> default_authorizer::start() {
_migration_manager).then([this] { _migration_manager).then([this] {
_finished = do_after_system_ready(_as, [this] { _finished = do_after_system_ready(_as, [this] {
return async([this] { return async([this] {
wait_for_schema_agreement(_migration_manager, _qp.db().real_database(), _as).get0(); _migration_manager.wait_for_schema_agreement(_qp.db().real_database(), db::timeout_clock::time_point::max(), &_as).get0();
if (legacy_metadata_exists()) { if (legacy_metadata_exists()) {
if (!any_granted().get0()) { if (!any_granted().get0()) {

View File

@@ -132,7 +132,7 @@ future<> password_authenticator::start() {
_stopped = do_after_system_ready(_as, [this] { _stopped = do_after_system_ready(_as, [this] {
return async([this] { return async([this] {
wait_for_schema_agreement(_migration_manager, _qp.db().real_database(), _as).get0(); _migration_manager.wait_for_schema_agreement(_qp.db().real_database(), db::timeout_clock::time_point::max(), &_as).get0();
if (any_nondefault_role_row_satisfies(_qp, &has_salted_hash).get0()) { if (any_nondefault_role_row_satisfies(_qp, &has_salted_hash).get0()) {
if (legacy_metadata_exists()) { if (legacy_metadata_exists()) {

View File

@@ -28,6 +28,7 @@
#include "log.hh" #include "log.hh"
#include "utils/class_registrator.hh" #include "utils/class_registrator.hh"
#include "replica/database.hh" #include "replica/database.hh"
#include "service/migration_manager.hh"
namespace auth { namespace auth {
@@ -232,7 +233,7 @@ future<> standard_role_manager::start() {
return this->create_metadata_tables_if_missing().then([this] { return this->create_metadata_tables_if_missing().then([this] {
_stopped = auth::do_after_system_ready(_as, [this] { _stopped = auth::do_after_system_ready(_as, [this] {
return seastar::async([this] { return seastar::async([this] {
wait_for_schema_agreement(_migration_manager, _qp.db().real_database(), _as).get0(); _migration_manager.wait_for_schema_agreement(_qp.db().real_database(), db::timeout_clock::time_point::max(), &_as).get0();
if (any_nondefault_role_row_satisfies(_qp, &has_can_login).get0()) { if (any_nondefault_role_row_satisfies(_qp, &has_can_login).get0()) {
if (this->legacy_metadata_exists()) { if (this->legacy_metadata_exists()) {

View File

@@ -1794,9 +1794,8 @@ future<> view_builder::start(service::migration_manager& mm) {
// or `on_update_view` events. // or `on_update_view` events.
auto units = get_units(_sem, 1).get0(); auto units = get_units(_sem, 1).get0();
// Wait for schema agreement even if we're a seed node. // Wait for schema agreement even if we're a seed node.
while (!mm.have_schema_agreement()) { mm.wait_for_schema_agreement(_db, db::timeout_clock::time_point::max(), &_as).get();
seastar::sleep_abortable(500ms, _as).get();
}
auto built = _sys_ks.load_built_views().get0(); auto built = _sys_ks.load_built_views().get0();
auto in_progress = _sys_ks.load_view_build_progress().get0(); auto in_progress = _sys_ks.load_view_build_progress().get0();
setup_shard_build_step(vbi, std::move(built), std::move(in_progress)); setup_shard_build_step(vbi, std::move(built), std::move(in_progress));

View File

@@ -239,6 +239,18 @@ bool migration_manager::have_schema_agreement() {
return match; return match;
} }
future<> migration_manager::wait_for_schema_agreement(const replica::database& db, db::timeout_clock::time_point deadline, seastar::abort_source* as) {
while (db.get_version() == replica::database::empty_version || !have_schema_agreement()) {
if (as) {
as->check();
}
if (db::timeout_clock::now() > deadline) {
throw std::runtime_error("Unable to reach schema agreement");
}
co_await (as ? sleep_abortable(std::chrono::milliseconds(500), *as) : sleep(std::chrono::milliseconds(500)));
}
}
/** /**
* If versions differ this node sends request with local migration list to the endpoint * If versions differ this node sends request with local migration list to the endpoint
* and expecting to receive a list of migrations to apply locally. * and expecting to receive a list of migrations to apply locally.

View File

@@ -172,6 +172,7 @@ public:
* Known peers in the cluster have the same schema version as us. * Known peers in the cluster have the same schema version as us.
*/ */
bool have_schema_agreement(); bool have_schema_agreement();
future<> wait_for_schema_agreement(const replica::database& db, db::timeout_clock::time_point deadline, seastar::abort_source* as);
void init_messaging_service(); void init_messaging_service();