From ac2e2f8883b5d88e679fd3399211cd85bf6cba25 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 1 Dec 2022 22:04:51 +0200 Subject: [PATCH] view: coroutinize maybe_mark_view_as_built Somewhat simplifies complicated logic. --- db/view/view.cc | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index 4d4ea7f888..5ab57e7fe7 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -26,6 +26,7 @@ #include #include #include +#include #include "replica/database.hh" #include "clustering_bounds_comparator.hh" @@ -2305,38 +2306,41 @@ void view_builder::execute(build_step& step, exponential_backoff_retry r) { future<> view_builder::maybe_mark_view_as_built(view_ptr view, dht::token next_token) { _built_views.emplace(view->id()); vlogger.debug("Shard finished building view {}.{}", view->ks_name(), view->cf_name()); - return container().map_reduce0( + bool built = co_await container().map_reduce0( [view_id = view->id()] (view_builder& builder) { return builder._built_views.contains(view_id); }, true, [] (bool result, bool shard_complete) { return result && shard_complete; - }).then([this, view, next_token = std::move(next_token)] (bool built) { + }); + { if (built) { inject_failure("view_builder_mark_view_as_built"); - return container().invoke_on_all([view_id = view->id()] (view_builder& builder) { + co_await container().invoke_on_all(coroutine::lambda([view_id = view->id()] (view_builder& builder) -> future<> { if (builder._built_views.erase(view_id) == 0 || this_shard_id() != 0) { - return make_ready_future<>(); + co_return; } auto view = builder._db.find_schema(view_id); vlogger.info("Finished building view {}.{}", view->ks_name(), view->cf_name()); - return seastar::when_all_succeed( - system_keyspace::mark_view_as_built(view->ks_name(), view->cf_name()), - builder._sys_dist_ks.finish_view_build(view->ks_name(), view->cf_name())).then_unpack([view] { + co_await coroutine::all( + [&] { return system_keyspace::mark_view_as_built(view->ks_name(), view->cf_name()); }, + [&] { return builder._sys_dist_ks.finish_view_build(view->ks_name(), view->cf_name()); } + ); + { // The view is built, so shard 0 can remove the entry in the build progress system table on // behalf of all shards. It is guaranteed to have a higher timestamp than the per-shard entries. - return system_keyspace::remove_view_build_progress_across_all_shards(view->ks_name(), view->cf_name()); - }).then([&builder, view] { + co_await system_keyspace::remove_view_build_progress_across_all_shards(view->ks_name(), view->cf_name()); + auto it = builder._build_notifiers.find(std::pair(view->ks_name(), view->cf_name())); if (it != builder._build_notifiers.end()) { it->second.set_value(); } - }); - }); + } + })); } - return system_keyspace::update_view_build_progress(view->ks_name(), view->cf_name(), next_token); - }); + co_await system_keyspace::update_view_build_progress(view->ks_name(), view->cf_name(), next_token); + } } future<> view_builder::wait_until_built(const sstring& ks_name, const sstring& view_name) {