diff --git a/db/schema_tables.cc b/db/schema_tables.cc index df3212a28c..d406e1d772 100644 --- a/db/schema_tables.cc +++ b/db/schema_tables.cc @@ -45,6 +45,7 @@ #include #include #include +#include #include #include @@ -1087,6 +1088,13 @@ future<> store_column_mapping(distributed& proxy, schema co_await proxy.local().mutate_locally(std::move(muts), tracing::trace_state_ptr()); } +// Limit concurrency of user tables to prevent stalls. +// See https://github.com/scylladb/scylladb/issues/11574 +// Note: we aim at providing enough concurrency to utilize +// the cpu while operations are blocked on disk I/O +// and or filesystem calls, e.g. fsync. +constexpr size_t max_concurrent = 8; + static future<> do_merge_schema(distributed& proxy, std::vector mutations, bool do_flush) { slogger.trace("do_merge_schema: {}", mutations); @@ -1118,7 +1126,7 @@ static future<> do_merge_schema(distributed& proxy, std: if (do_flush) { auto& db = proxy.local().get_db(); - co_await coroutine::parallel_for_each(column_families, [&db] (const table_id& id) -> future<> { + co_await max_concurrent_for_each(column_families, max_concurrent, [&db] (const table_id& id) -> future<> { return replica::database::flush_table_on_all_shards(db, id); }); } @@ -1327,11 +1335,11 @@ static future<> merge_tables_and_views(distributed& prox // was already dropped (see https://github.com/scylladb/scylla/issues/5614) auto& db = proxy.local().get_db(); auto ts = db_clock::now(); - co_await coroutine::parallel_for_each(views_diff.dropped, [&db, ts] (schema_diff::dropped_schema& dt) { + co_await max_concurrent_for_each(views_diff.dropped, max_concurrent, [&db, ts] (schema_diff::dropped_schema& dt) { auto& s = *dt.schema.get(); return replica::database::drop_table_on_all_shards(db, s.ks_name(), s.cf_name()); }); - co_await coroutine::parallel_for_each(tables_diff.dropped, [&db, ts] (schema_diff::dropped_schema& dt) -> future<> { + co_await max_concurrent_for_each(tables_diff.dropped, max_concurrent, [&db, ts] (schema_diff::dropped_schema& dt) -> future<> { auto& s = *dt.schema.get(); return replica::database::drop_table_on_all_shards(db, s.ks_name(), s.cf_name()); }); @@ -1339,25 +1347,26 @@ static future<> merge_tables_and_views(distributed& prox co_await proxy.local().get_db().invoke_on_all([&] (replica::database& db) -> future<> { // In order to avoid possible races we first create the tables and only then the views. // That way if a view seeks information about its base table it's guarantied to find it. - co_await coroutine::parallel_for_each(tables_diff.created, [&] (global_schema_ptr& gs) -> future<> { + co_await max_concurrent_for_each(tables_diff.created, max_concurrent, [&] (global_schema_ptr& gs) -> future<> { co_await db.add_column_family_and_make_directory(gs); }); - co_await coroutine::parallel_for_each(views_diff.created, [&] (global_schema_ptr& gs) -> future<> { + co_await max_concurrent_for_each(views_diff.created, max_concurrent, [&] (global_schema_ptr& gs) -> future<> { co_await db.add_column_family_and_make_directory(gs); }); for (auto&& gs : boost::range::join(tables_diff.created, views_diff.created)) { db.find_column_family(gs).mark_ready_for_writes(); + co_await coroutine::maybe_yield(); } std::vector columns_changed; columns_changed.reserve(tables_diff.altered.size() + views_diff.altered.size()); for (auto&& altered : boost::range::join(tables_diff.altered, views_diff.altered)) { columns_changed.push_back(db.update_column_family(altered.new_schema)); + co_await coroutine::maybe_yield(); } auto it = columns_changed.begin(); auto notify = [&] (auto& r, auto&& f) -> future<> { - auto notifications = r | boost::adaptors::transformed(f); - co_await when_all(notifications.begin(), notifications.end()); + co_await max_concurrent_for_each(r, max_concurrent, std::move(f)); }; // View drops are notified first, because a table can only be dropped if its views are already deleted co_await notify(views_diff.dropped, [&] (auto&& dt) { return db.get_notifier().drop_view(view_ptr(dt.schema)); }); @@ -1380,20 +1389,18 @@ static future<> merge_tables_and_views(distributed& prox // // Drop column mapping entries for dropped tables since these will not be TTLed automatically // and will stay there forever if we don't clean them up manually - co_await when_all_succeed( - parallel_for_each(tables_diff.created, [&proxy] (global_schema_ptr& gs) -> future<> { - co_await store_column_mapping(proxy, gs.get(), false); - }), - parallel_for_each(tables_diff.altered, [&proxy] (schema_diff::altered_schema& altered) -> future<> { - co_await when_all_succeed( - store_column_mapping(proxy, altered.old_schema.get(), true), - store_column_mapping(proxy, altered.new_schema.get(), false)); - }), - parallel_for_each(tables_diff.dropped, [&proxy] (schema_diff::dropped_schema& dropped) -> future<> { - schema_ptr s = dropped.schema.get(); - co_await drop_column_mapping(s->id(), s->version()); - }) - ); + co_await max_concurrent_for_each(tables_diff.created, max_concurrent, [&proxy] (global_schema_ptr& gs) -> future<> { + co_await store_column_mapping(proxy, gs.get(), false); + }); + co_await max_concurrent_for_each(tables_diff.altered, max_concurrent, [&proxy] (schema_diff::altered_schema& altered) -> future<> { + co_await when_all_succeed( + store_column_mapping(proxy, altered.old_schema.get(), true), + store_column_mapping(proxy, altered.new_schema.get(), false)); + }); + co_await max_concurrent_for_each(tables_diff.dropped, max_concurrent, [&proxy] (schema_diff::dropped_schema& dropped) -> future<> { + schema_ptr s = dropped.schema.get(); + co_await drop_column_mapping(s->id(), s->version()); + }); } static std::vector collect_rows(const std::set& keys, const schema_result& result) { @@ -2691,7 +2698,7 @@ future create_table_from_name(distributed& p future> create_tables_from_tables_partition(distributed& proxy, const schema_result::mapped_type& result) { auto tables = std::map(); - co_await coroutine::parallel_for_each(result->rows().begin(), result->rows().end(), [&] (const query::result_set_row& row) -> future<> { + co_await max_concurrent_for_each(result->rows().begin(), result->rows().end(), max_concurrent, [&] (const query::result_set_row& row) -> future<> { schema_ptr cfm = co_await create_table_from_table_row(proxy, row); tables.emplace(cfm->cf_name(), std::move(cfm)); }); @@ -3190,7 +3197,7 @@ static future create_view_from_table_row(distributed> create_views_from_schema_partition(distributed& proxy, const schema_result::mapped_type& result) { std::vector views; - co_await coroutine::parallel_for_each(result->rows().begin(), result->rows().end(), [&] (auto&& row) -> future<> { + co_await max_concurrent_for_each(result->rows().begin(), result->rows().end(), max_concurrent, [&] (auto&& row) -> future<> { auto v = co_await create_view_from_table_row(proxy, row); views.push_back(std::move(v)); });