Merge 'schema_tables: limit concurrency' from Benny Halevy
To prevent stalls due to large number of tables. Fixes scylladb/scylladb#11574 Closes #11689 * github.com:scylladb/scylladb: schema_tables: merge_tables_and_views reindent schema_tables: limit paralellism
This commit is contained in:
@@ -45,6 +45,7 @@
|
||||
#include <seastar/rpc/rpc_types.hh>
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include <seastar/coroutine/parallel_for_each.hh>
|
||||
#include <seastar/core/loop.hh>
|
||||
|
||||
#include <boost/algorithm/string/predicate.hpp>
|
||||
#include <boost/range/algorithm/copy.hpp>
|
||||
@@ -1087,6 +1088,13 @@ future<> store_column_mapping(distributed<service::storage_proxy>& proxy, schema
|
||||
co_await proxy.local().mutate_locally(std::move(muts), tracing::trace_state_ptr());
|
||||
}
|
||||
|
||||
// Limit concurrency of user tables to prevent stalls.
|
||||
// See https://github.com/scylladb/scylladb/issues/11574
|
||||
// Note: we aim at providing enough concurrency to utilize
|
||||
// the cpu while operations are blocked on disk I/O
|
||||
// and or filesystem calls, e.g. fsync.
|
||||
constexpr size_t max_concurrent = 8;
|
||||
|
||||
static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std::vector<mutation> mutations, bool do_flush)
|
||||
{
|
||||
slogger.trace("do_merge_schema: {}", mutations);
|
||||
@@ -1118,7 +1126,7 @@ static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std:
|
||||
|
||||
if (do_flush) {
|
||||
auto& db = proxy.local().get_db();
|
||||
co_await coroutine::parallel_for_each(column_families, [&db] (const table_id& id) -> future<> {
|
||||
co_await max_concurrent_for_each(column_families, max_concurrent, [&db] (const table_id& id) -> future<> {
|
||||
return replica::database::flush_table_on_all_shards(db, id);
|
||||
});
|
||||
}
|
||||
@@ -1327,11 +1335,11 @@ static future<> merge_tables_and_views(distributed<service::storage_proxy>& prox
|
||||
// was already dropped (see https://github.com/scylladb/scylla/issues/5614)
|
||||
auto& db = proxy.local().get_db();
|
||||
auto ts = db_clock::now();
|
||||
co_await coroutine::parallel_for_each(views_diff.dropped, [&db, ts] (schema_diff::dropped_schema& dt) {
|
||||
co_await max_concurrent_for_each(views_diff.dropped, max_concurrent, [&db, ts] (schema_diff::dropped_schema& dt) {
|
||||
auto& s = *dt.schema.get();
|
||||
return replica::database::drop_table_on_all_shards(db, s.ks_name(), s.cf_name());
|
||||
});
|
||||
co_await coroutine::parallel_for_each(tables_diff.dropped, [&db, ts] (schema_diff::dropped_schema& dt) -> future<> {
|
||||
co_await max_concurrent_for_each(tables_diff.dropped, max_concurrent, [&db, ts] (schema_diff::dropped_schema& dt) -> future<> {
|
||||
auto& s = *dt.schema.get();
|
||||
return replica::database::drop_table_on_all_shards(db, s.ks_name(), s.cf_name());
|
||||
});
|
||||
@@ -1339,25 +1347,26 @@ static future<> merge_tables_and_views(distributed<service::storage_proxy>& prox
|
||||
co_await proxy.local().get_db().invoke_on_all([&] (replica::database& db) -> future<> {
|
||||
// In order to avoid possible races we first create the tables and only then the views.
|
||||
// That way if a view seeks information about its base table it's guarantied to find it.
|
||||
co_await coroutine::parallel_for_each(tables_diff.created, [&] (global_schema_ptr& gs) -> future<> {
|
||||
co_await max_concurrent_for_each(tables_diff.created, max_concurrent, [&] (global_schema_ptr& gs) -> future<> {
|
||||
co_await db.add_column_family_and_make_directory(gs);
|
||||
});
|
||||
co_await coroutine::parallel_for_each(views_diff.created, [&] (global_schema_ptr& gs) -> future<> {
|
||||
co_await max_concurrent_for_each(views_diff.created, max_concurrent, [&] (global_schema_ptr& gs) -> future<> {
|
||||
co_await db.add_column_family_and_make_directory(gs);
|
||||
});
|
||||
for (auto&& gs : boost::range::join(tables_diff.created, views_diff.created)) {
|
||||
db.find_column_family(gs).mark_ready_for_writes();
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
std::vector<bool> columns_changed;
|
||||
columns_changed.reserve(tables_diff.altered.size() + views_diff.altered.size());
|
||||
for (auto&& altered : boost::range::join(tables_diff.altered, views_diff.altered)) {
|
||||
columns_changed.push_back(db.update_column_family(altered.new_schema));
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
|
||||
auto it = columns_changed.begin();
|
||||
auto notify = [&] (auto& r, auto&& f) -> future<> {
|
||||
auto notifications = r | boost::adaptors::transformed(f);
|
||||
co_await when_all(notifications.begin(), notifications.end());
|
||||
co_await max_concurrent_for_each(r, max_concurrent, std::move(f));
|
||||
};
|
||||
// View drops are notified first, because a table can only be dropped if its views are already deleted
|
||||
co_await notify(views_diff.dropped, [&] (auto&& dt) { return db.get_notifier().drop_view(view_ptr(dt.schema)); });
|
||||
@@ -1380,20 +1389,18 @@ static future<> merge_tables_and_views(distributed<service::storage_proxy>& prox
|
||||
//
|
||||
// Drop column mapping entries for dropped tables since these will not be TTLed automatically
|
||||
// and will stay there forever if we don't clean them up manually
|
||||
co_await when_all_succeed(
|
||||
parallel_for_each(tables_diff.created, [&proxy] (global_schema_ptr& gs) -> future<> {
|
||||
co_await store_column_mapping(proxy, gs.get(), false);
|
||||
}),
|
||||
parallel_for_each(tables_diff.altered, [&proxy] (schema_diff::altered_schema& altered) -> future<> {
|
||||
co_await when_all_succeed(
|
||||
store_column_mapping(proxy, altered.old_schema.get(), true),
|
||||
store_column_mapping(proxy, altered.new_schema.get(), false));
|
||||
}),
|
||||
parallel_for_each(tables_diff.dropped, [&proxy] (schema_diff::dropped_schema& dropped) -> future<> {
|
||||
schema_ptr s = dropped.schema.get();
|
||||
co_await drop_column_mapping(s->id(), s->version());
|
||||
})
|
||||
);
|
||||
co_await max_concurrent_for_each(tables_diff.created, max_concurrent, [&proxy] (global_schema_ptr& gs) -> future<> {
|
||||
co_await store_column_mapping(proxy, gs.get(), false);
|
||||
});
|
||||
co_await max_concurrent_for_each(tables_diff.altered, max_concurrent, [&proxy] (schema_diff::altered_schema& altered) -> future<> {
|
||||
co_await when_all_succeed(
|
||||
store_column_mapping(proxy, altered.old_schema.get(), true),
|
||||
store_column_mapping(proxy, altered.new_schema.get(), false));
|
||||
});
|
||||
co_await max_concurrent_for_each(tables_diff.dropped, max_concurrent, [&proxy] (schema_diff::dropped_schema& dropped) -> future<> {
|
||||
schema_ptr s = dropped.schema.get();
|
||||
co_await drop_column_mapping(s->id(), s->version());
|
||||
});
|
||||
}
|
||||
|
||||
static std::vector<const query::result_set_row*> collect_rows(const std::set<sstring>& keys, const schema_result& result) {
|
||||
@@ -2691,7 +2698,7 @@ future<schema_ptr> create_table_from_name(distributed<service::storage_proxy>& p
|
||||
future<std::map<sstring, schema_ptr>> create_tables_from_tables_partition(distributed<service::storage_proxy>& proxy, const schema_result::mapped_type& result)
|
||||
{
|
||||
auto tables = std::map<sstring, schema_ptr>();
|
||||
co_await coroutine::parallel_for_each(result->rows().begin(), result->rows().end(), [&] (const query::result_set_row& row) -> future<> {
|
||||
co_await max_concurrent_for_each(result->rows().begin(), result->rows().end(), max_concurrent, [&] (const query::result_set_row& row) -> future<> {
|
||||
schema_ptr cfm = co_await create_table_from_table_row(proxy, row);
|
||||
tables.emplace(cfm->cf_name(), std::move(cfm));
|
||||
});
|
||||
@@ -3190,7 +3197,7 @@ static future<view_ptr> create_view_from_table_row(distributed<service::storage_
|
||||
future<std::vector<view_ptr>> create_views_from_schema_partition(distributed<service::storage_proxy>& proxy, const schema_result::mapped_type& result)
|
||||
{
|
||||
std::vector<view_ptr> views;
|
||||
co_await coroutine::parallel_for_each(result->rows().begin(), result->rows().end(), [&] (auto&& row) -> future<> {
|
||||
co_await max_concurrent_for_each(result->rows().begin(), result->rows().end(), max_concurrent, [&] (auto&& row) -> future<> {
|
||||
auto v = co_await create_view_from_table_row(proxy, row);
|
||||
views.push_back(std::move(v));
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user