sstables, code: Wrap directory semaphore with concurrency

Currently this is a sharded<semaphore> started/stopped in main and
referenced by database in order to be fed into sstables code. This
semaphore always comes with the "concurrency" parameter that limits the
parallel_for_each parallelizm.

This patch wraps both together into directory_semaphore class. This
makes its usage simpler and will allow extending it in the future.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2022-11-28 19:01:39 +03:00
parent 02b66bb31a
commit be8512d7cc
9 changed files with 33 additions and 22 deletions

View File

@@ -513,7 +513,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
sharded<netw::messaging_service> messaging;
sharded<cql3::query_processor> qp;
sharded<db::batchlog_manager> bm;
sharded<semaphore> sst_dir_semaphore;
sharded<sstables::directory_semaphore> sst_dir_semaphore;
sharded<service::raft_group_registry> raft_gr;
sharded<service::memory_limiter> service_memory_limiter;
sharded<repair_service> repair;

View File

@@ -307,7 +307,7 @@ public:
};
database::database(const db::config& cfg, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, const locator::shared_token_metadata& stm,
compaction_manager& cm, sharded<semaphore>& sst_dir_sem, utils::cross_shard_barrier barrier)
compaction_manager& cm, sharded<sstables::directory_semaphore>& sst_dir_sem, utils::cross_shard_barrier barrier)
: _stats(make_lw_shared<db_stats>())
, _user_types(std::make_shared<db_user_types_storage>(*this))
, _cl_stats(std::make_unique<cell_locker_stats>())

View File

@@ -91,6 +91,7 @@ class compaction_completion_desc;
class sstables_manager;
class compaction_data;
class sstable_set;
class directory_semaphore;
}
@@ -1345,7 +1346,7 @@ private:
std::vector<std::any> _listeners;
const locator::shared_token_metadata& _shared_token_metadata;
sharded<semaphore>& _sst_dir_semaphore;
sharded<sstables::directory_semaphore>& _sst_dir_semaphore;
std::unique_ptr<wasm::engine> _wasm_engine;
utils::cross_shard_barrier _stop_barrier;
@@ -1429,7 +1430,7 @@ public:
future<> parse_system_tables(distributed<service::storage_proxy>&, sharded<db::system_keyspace>&);
database(const db::config&, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, const locator::shared_token_metadata& stm,
compaction_manager& cm, sharded<semaphore>& sst_dir_sem, utils::cross_shard_barrier barrier = utils::cross_shard_barrier(utils::cross_shard_barrier::solo{}) /* for single-shard usage */);
compaction_manager& cm, sharded<sstables::directory_semaphore>& sst_dir_sem, utils::cross_shard_barrier barrier = utils::cross_shard_barrier(utils::cross_shard_barrier::solo{}) /* for single-shard usage */);
database(database&&) = delete;
~database();
@@ -1702,7 +1703,7 @@ public:
bool is_internal_query() const;
sharded<semaphore>& get_sharded_sst_dir_semaphore() {
sharded<sstables::directory_semaphore>& get_sharded_sst_dir_semaphore() {
return _sst_dir_semaphore;
}

View File

@@ -312,7 +312,7 @@ distributed_loader::process_upload_dir(distributed<replica::database>& db, distr
sharded<sstables::sstable_directory> directory;
auto upload = fs::path(global_table->dir()) / sstables::upload_dir;
directory.start(upload, service::get_local_streaming_priority(),
db.local().get_config().initial_sstable_loading_concurrency(), std::ref(db.local().get_sharded_sst_dir_semaphore()),
std::ref(db.local().get_sharded_sst_dir_semaphore()),
[&global_table] (fs::path dir, sstables::generation_type gen, sstables::sstable_version_types v, sstables::sstable_format_types f) {
return global_table->make_sstable(dir.native(), gen, v, f, &error_handler_gen_for_upload_dir);
@@ -380,7 +380,7 @@ distributed_loader::get_sstables_from_upload_dir(distributed<replica::database>&
auto upload = fs::path(global_table->dir()) / sstables::upload_dir;
directory.start(upload, service::get_local_streaming_priority(),
db.local().get_config().initial_sstable_loading_concurrency(), std::ref(db.local().get_sharded_sst_dir_semaphore()),
std::ref(db.local().get_sharded_sst_dir_semaphore()),
[&global_table] (fs::path dir, sstables::generation_type gen, sstables::sstable_version_types v, sstables::sstable_format_types f) {
return global_table->make_sstable(dir.native(), gen, v, f, &error_handler_gen_for_upload_dir);
@@ -549,7 +549,7 @@ future<> table_population_metadata::start_subdir(sstring subdir) {
auto& global_table = _global_table;
auto& db = _db;
co_await directory.start(fs::path(sstdir), default_priority_class(),
db.local().get_config().initial_sstable_loading_concurrency(), std::ref(db.local().get_sharded_sst_dir_semaphore()),
std::ref(db.local().get_sharded_sst_dir_semaphore()),
[&global_table] (fs::path dir, sstables::generation_type gen, sstables::sstable_version_types v, sstables::sstable_format_types f) {
return global_table->make_sstable(dir.native(), gen, v, f);
});

View File

@@ -1497,9 +1497,9 @@ future<table::snapshot_file_set> table::take_snapshot(database& db, sstring json
auto table_names = std::make_unique<std::unordered_set<sstring>>();
co_await io_check([&jsondir] { return recursive_touch_directory(jsondir); });
co_await max_concurrent_for_each(tables, db.get_config().initial_sstable_loading_concurrency(), [&db, &jsondir, &table_names] (sstables::shared_sstable sstable) {
co_await max_concurrent_for_each(tables, db.get_sharded_sst_dir_semaphore().local()._concurrency, [&db, &jsondir, &table_names] (sstables::shared_sstable sstable) {
table_names->insert(sstable->component_basename(sstables::component_type::Data));
return with_semaphore(db.get_sharded_sst_dir_semaphore().local(), 1, [&jsondir, sstable] {
return with_semaphore(db.get_sharded_sst_dir_semaphore().local()._sem, 1, [&jsondir, sstable] {
return io_check([sstable, &dir = jsondir] {
return sstable->create_links(dir);
});

View File

@@ -33,12 +33,10 @@ bool manifest_json_filter(const fs::path&, const directory_entry& entry) {
sstable_directory::sstable_directory(fs::path sstable_dir,
::io_priority_class io_prio,
unsigned load_parallelism,
semaphore& load_semaphore,
directory_semaphore& load_semaphore,
sstable_object_from_existing_fn sstable_from_existing)
: _sstable_dir(std::move(sstable_dir))
, _io_priority(std::move(io_prio))
, _load_parallelism(load_parallelism)
, _load_semaphore(load_semaphore)
, _sstable_object_from_existing_sstable(std::move(sstable_from_existing))
, _unshared_remote_sstables(smp::count)
@@ -439,8 +437,8 @@ template <typename Container, typename Func>
future<>
sstable_directory::parallel_for_each_restricted(Container&& C, Func&& func) {
return do_with(std::move(C), std::move(func), [this] (Container& c, Func& func) mutable {
return max_concurrent_for_each(c, _load_parallelism, [this, &func] (auto& el) mutable {
return with_semaphore(_load_semaphore, 1, [this, &func, el = std::move(el)] () mutable {
return max_concurrent_for_each(c, _load_semaphore._concurrency, [this, &func] (auto& el) mutable {
return with_semaphore(_load_semaphore._sem, 1, [this, &func, el = std::move(el)] () mutable {
return func(el);
});
});

View File

@@ -30,6 +30,20 @@ namespace sstables {
bool manifest_json_filter(const std::filesystem::path&, const directory_entry& entry);
class directory_semaphore {
unsigned _concurrency;
semaphore _sem;
public:
directory_semaphore(unsigned concurrency)
: _concurrency(concurrency)
, _sem(concurrency)
{
}
friend class sstable_directory;
friend class ::replica::table; // FIXME table snapshots should switch to sstable_directory
};
// Handles a directory containing SSTables. It could be an auxiliary directory (like upload),
// or the main directory.
class sstable_directory {
@@ -79,8 +93,7 @@ private:
// We may have hundreds of thousands of files to load. To protect against OOMs we will limit
// how many of them we process at the same time.
const size_t _load_parallelism;
semaphore& _load_semaphore;
directory_semaphore& _load_semaphore;
// How to create an SSTable object from an existing SSTable file (respecting generation, etc)
sstable_object_from_existing_fn _sstable_object_from_existing_sstable;
@@ -122,8 +135,7 @@ private:
public:
sstable_directory(std::filesystem::path sstable_dir,
::io_priority_class io_prio,
unsigned load_parallelism,
semaphore& load_semaphore,
directory_semaphore& load_semaphore,
sstable_object_from_existing_fn sstable_from_existing);
std::vector<sstables::shared_sstable>& get_unsorted_sstables() {

View File

@@ -135,7 +135,7 @@ static void with_sstable_directory(
sstable_directory::sstable_object_from_existing_fn sstable_from_existing,
noncopyable_function<void (sharded<sstable_directory>&)> func) {
sharded<semaphore> sstdir_sem;
sharded<sstables::directory_semaphore> sstdir_sem;
sstdir_sem.start(load_parallelism).get();
auto stop_sstdir_sem = defer([&sstdir_sem] {
sstdir_sem.stop().get();
@@ -153,7 +153,7 @@ static void with_sstable_directory(
return sstable_from_existing(std::move(dir), gen, v, f);
};
sstdir.start(std::move(path), default_priority_class(), load_parallelism, std::ref(sstdir_sem), std::move(wrapped_sfe)).get();
sstdir.start(std::move(path), default_priority_class(), std::ref(sstdir_sem), std::move(wrapped_sfe)).get();
func(sstdir);
}

View File

@@ -652,7 +652,7 @@ public:
stream_manager.start(std::ref(*cfg), std::ref(db), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(ms), std::ref(mm), std::ref(gossiper)).get();
auto stop_streaming = defer([&stream_manager] { stream_manager.stop().get(); });
sharded<semaphore> sst_dir_semaphore;
sharded<sstables::directory_semaphore> sst_dir_semaphore;
sst_dir_semaphore.start(cfg->initial_sstable_loading_concurrency()).get();
auto stop_sst_dir_sem = defer([&sst_dir_semaphore] {
sst_dir_semaphore.stop().get();