Merge 'Keep sstables garbage collection in sstable_directory' from Pavel Emelyanov

Currently temporary directories with incomplete sstables and pending deletion log are processed by distributed loader on start. That's not nice, because for s3 backed sstables this code makes no sense (and is currently a no-op because of incomplete implementation). This garbage collecting should be kept in sstable_directory where it can off-load this work onto lister component that is storage-aware.

Once g.c. code moved, it allows to clean the class sstable list of static helpers a bit.

refs: #13024
refs: #13020
refs: #12707

Closes #13767

* github.com:scylladb/scylladb:
  sstable: Toss tempdir extension usage
  sstable: Drop pending_delete_dir_basename()
  sstable: Drop is_pending_delete_dir() helper
  sstable_directory: Make garbage_collect() non-static
  sstable_directory: Move deletion log exists check
  distributed_loader: Move garbage collecting into sstable_directory
  distributed_loader: Collect garbace collecting in one call
  sstable: Coroutinize remove_temp_dir()
  sstable: Coroutinize touch_temp_dir()
  sstable: Use storage::temp_dir instead of hand-crafted path
This commit is contained in:
Botond Dénes
2023-05-19 08:50:13 +03:00
7 changed files with 87 additions and 93 deletions

View File

@@ -90,6 +90,12 @@ distributed_loader::process_sstable_dir(sharded<sstables::sstable_directory>& di
return utils::directories::verify_owner_and_mode(d.sstable_dir());
});
if (flags.garbage_collect) {
co_await dir.invoke_on(0, [] (sstables::sstable_directory& d) {
return d.garbage_collect();
});
}
co_await dir.invoke_on_all([&dir, flags] (sstables::sstable_directory& d) -> future<> {
// Supposed to be called with the node either down or on behalf of maintenance tasks
// like nodetool refresh
@@ -530,48 +536,6 @@ distributed_loader::get_sstables_from_upload_dir(distributed<replica::database>&
});
}
future<> distributed_loader::cleanup_column_family_temp_sst_dirs(sstring sstdir) {
std::vector<future<>> futures;
co_await lister::scan_dir(sstdir, lister::dir_entry_types::of<directory_entry_type::directory>(), [&] (fs::path sstdir, directory_entry de) {
// push futures that remove files/directories into an array of futures,
// so that the supplied callback will not block scan_dir() from
// reading the next entry in the directory.
fs::path dirpath = sstdir / de.name;
if (sstables::sstable::is_temp_dir(dirpath)) {
dblog.info("Found temporary sstable directory: {}, removing", dirpath);
futures.push_back(io_check([dirpath = std::move(dirpath)] () { return lister::rmdir(dirpath); }));
}
return make_ready_future<>();
});
co_await when_all_succeed(futures.begin(), futures.end()).discard_result();
}
future<> distributed_loader::handle_sstables_pending_delete(sstring pending_delete_dir) {
std::vector<future<>> futures;
co_await lister::scan_dir(pending_delete_dir, lister::dir_entry_types::of<directory_entry_type::regular>(), [&futures] (fs::path dir, directory_entry de) {
// push nested futures that remove files/directories into an array of futures,
// so that the supplied callback will not block scan_dir() from
// reading the next entry in the directory.
fs::path file_path = dir / de.name;
if (file_path.extension() == ".tmp") {
dblog.info("Found temporary pending_delete log file: {}, deleting", file_path);
futures.push_back(remove_file(file_path.string()));
} else if (file_path.extension() == ".log") {
dblog.info("Found pending_delete log file: {}, replaying", file_path);
auto f = sstables::sstable_directory::replay_pending_delete_log(std::move(file_path));
futures.push_back(std::move(f));
} else {
dblog.debug("Found unknown file in pending_delete directory: {}, ignoring", file_path);
}
return make_ready_future<>();
});
co_await when_all_succeed(futures.begin(), futures.end()).discard_result();
}
class table_populator {
distributed<replica::database>& _db;
sstring _ks;
@@ -637,14 +601,6 @@ future<> table_populator::start_subdir(sstring subdir) {
co_return;
}
// First pass, cleanup temporary sstable directories and sstables pending delete.
co_await distributed_loader::cleanup_column_family_temp_sst_dirs(sstdir);
auto pending_delete_dir = sstdir + "/" + sstables::sstable::pending_delete_dir_basename();
auto exists = co_await file_exists(pending_delete_dir);
if (exists) {
co_await distributed_loader::handle_sstables_pending_delete(pending_delete_dir);
}
auto dptr = make_lw_shared<sharded<sstables::sstable_directory>>();
auto& directory = *dptr;
auto& global_table = _global_table;
@@ -666,6 +622,7 @@ future<> table_populator::start_subdir(sstring subdir) {
.throw_on_missing_toc = true,
.enable_dangerous_direct_import_of_cassandra_counters = db.local().get_config().enable_dangerous_direct_import_of_cassandra_counters(),
.allow_loading_materialized_view = true,
.garbage_collect = true,
};
co_await distributed_loader::process_sstable_dir(directory, flags);

View File

@@ -76,8 +76,6 @@ class distributed_loader {
sharded<replica::database>& db, sharded<db::view::view_update_generator>& view_update_generator,
bool needs_view_update, sstring ks, sstring cf);
static future<> populate_keyspace(distributed<replica::database>& db, sstring datadir, sstring ks_name);
static future<> cleanup_column_family_temp_sst_dirs(sstring sstdir);
static future<> handle_sstables_pending_delete(sstring pending_deletes_dir);
public:
static future<> init_system_keyspace(sharded<db::system_keyspace>& sys_ks, distributed<replica::database>& db, distributed<service::storage_service>& ss, sharded<gms::gossiper>& g, sharded<service::raft_group_registry>& raft_gr, db::config& cfg, system_table_load_phase phase);

View File

@@ -498,7 +498,7 @@ future<> sstable_directory::delete_with_pending_deletion_log(std::vector<shared_
}
}
sstring pending_delete_dir = first->_storage->prefix() + "/" + sstable::pending_delete_dir_basename();
sstring pending_delete_dir = first->_storage->prefix() + "/" + sstables::pending_delete_dir;
sstring pending_delete_log = format("{}/sstables-{}-{}.log", pending_delete_dir, gen_tracker.min(), gen_tracker.max());
sstring tmp_pending_delete_log = pending_delete_log + ".tmp";
sstlog.trace("Writing {}", tmp_pending_delete_log);
@@ -553,7 +553,6 @@ future<> sstable_directory::delete_with_pending_deletion_log(std::vector<shared_
future<> sstable_directory::replay_pending_delete_log(fs::path pending_delete_log) {
sstlog.debug("Reading pending_deletes log file {}", pending_delete_log);
fs::path pending_delete_dir = pending_delete_log.parent_path();
assert(sstable::is_pending_delete_dir(pending_delete_dir));
try {
sstring sstdir = pending_delete_dir.parent_path().native();
auto text = co_await seastar::util::read_entire_file_contiguous(pending_delete_log);
@@ -572,6 +571,60 @@ future<> sstable_directory::replay_pending_delete_log(fs::path pending_delete_lo
}
}
future<> sstable_directory::garbage_collect() {
// First pass, cleanup temporary sstable directories and sstables pending delete.
co_await cleanup_column_family_temp_sst_dirs();
co_await handle_sstables_pending_delete();
}
future<> sstable_directory::cleanup_column_family_temp_sst_dirs() {
std::vector<future<>> futures;
co_await lister::scan_dir(_sstable_dir, lister::dir_entry_types::of<directory_entry_type::directory>(), [&] (fs::path sstdir, directory_entry de) {
// push futures that remove files/directories into an array of futures,
// so that the supplied callback will not block scan_dir() from
// reading the next entry in the directory.
fs::path dirpath = sstdir / de.name;
if (dirpath.extension().string() == tempdir_extension) {
sstlog.info("Found temporary sstable directory: {}, removing", dirpath);
futures.push_back(io_check([dirpath = std::move(dirpath)] () { return lister::rmdir(dirpath); }));
}
return make_ready_future<>();
});
co_await when_all_succeed(futures.begin(), futures.end()).discard_result();
}
future<> sstable_directory::handle_sstables_pending_delete() {
auto pending_delete_dir = _sstable_dir / sstables::pending_delete_dir;
auto exists = co_await file_exists(pending_delete_dir.native());
if (!exists) {
co_return;
}
std::vector<future<>> futures;
co_await lister::scan_dir(pending_delete_dir, lister::dir_entry_types::of<directory_entry_type::regular>(), [this, &futures] (fs::path dir, directory_entry de) {
// push nested futures that remove files/directories into an array of futures,
// so that the supplied callback will not block scan_dir() from
// reading the next entry in the directory.
fs::path file_path = dir / de.name;
if (file_path.extension() == ".tmp") {
sstlog.info("Found temporary pending_delete log file: {}, deleting", file_path);
futures.push_back(remove_file(file_path.string()));
} else if (file_path.extension() == ".log") {
sstlog.info("Found pending_delete log file: {}, replaying", file_path);
auto f = replay_pending_delete_log(std::move(file_path));
futures.push_back(std::move(f));
} else {
sstlog.debug("Found unknown file in pending_delete directory: {}, ignoring", file_path);
}
return make_ready_future<>();
});
co_await when_all_succeed(futures.begin(), futures.end()).discard_result();
}
future<std::optional<sstables::generation_type>>
highest_generation_seen(sharded<sstables::sstable_directory>& directory) {
auto highest = co_await directory.map_reduce0(std::mem_fn(&sstables::sstable_directory::highest_generation_seen), sstables::generation_type(0), [] (std::optional<sstables::generation_type> a, std::optional<sstables::generation_type> b) {

View File

@@ -65,6 +65,7 @@ public:
bool enable_dangerous_direct_import_of_cassandra_counters = false;
bool allow_loading_materialized_view = false;
bool sort_sstables_according_to_owner = true;
bool garbage_collect = false;
sstables::sstable_open_config sstable_open_config;
};
@@ -176,6 +177,11 @@ private:
future<std::vector<shard_id>> get_shards_for_this_sstable(const sstables::entry_descriptor& desc, process_flags flags) const;
// Retrieves sstables::foreign_sstable_open_info for a particular SSTable.
future<foreign_sstable_open_info> get_open_info_for_this_sstable(const sstables::entry_descriptor& desc) const;
future<> cleanup_column_family_temp_sst_dirs();
future<> handle_sstables_pending_delete();
future<> replay_pending_delete_log(std::filesystem::path log_file);
public:
sstable_directory(sstables_manager& manager,
schema_ptr schema,
@@ -260,9 +266,9 @@ public:
//
// This function only solves the second problem for now.
static future<> delete_with_pending_deletion_log(std::vector<shared_sstable> ssts);
static future<> replay_pending_delete_log(std::filesystem::path log_file);
static bool compare_sstable_storage_prefix(const sstring& a, const sstring& b) noexcept;
future<> garbage_collect();
};
future<std::optional<sstables::generation_type>> highest_generation_seen(sharded<sstables::sstable_directory>& directory);

View File

@@ -122,6 +122,7 @@ constexpr const char* upload_dir = "upload";
constexpr const char* snapshots_dir = "snapshots";
constexpr const char* quarantine_dir = "quarantine";
constexpr const char* pending_delete_dir = "pending_delete";
constexpr const char* tempdir_extension = ".sstable";
constexpr auto table_subdirectories = std::to_array({
staging_dir,
@@ -377,24 +378,6 @@ public:
return filename(component_type::Index);
}
static sstring sst_dir_basename(generation_type gen) {
return fmt::format("{}.sstable", gen);
}
static bool is_temp_dir(const fs::path& dirpath)
{
return dirpath.extension().string() == ".sstable";
}
static sstring pending_delete_dir_basename() {
return pending_delete_dir;
}
static bool is_pending_delete_dir(const fs::path& dirpath)
{
return dirpath.filename().string() == pending_delete_dir_basename().c_str();
}
bool requires_view_building() const;
bool is_quarantined() const noexcept;

View File

@@ -100,7 +100,7 @@ static future<file> open_sstable_component_file_non_checked(std::string_view nam
future<file> filesystem_storage::open_component(const sstable& sst, component_type type, open_flags flags, file_open_options options, bool check_integrity) {
auto create_flags = open_flags::create | open_flags::exclusive;
auto readonly = (flags & create_flags) != create_flags;
auto tgt_dir = !readonly && temp_dir ? dir + "/" + sstable::sst_dir_basename(sst._generation) : dir;
auto tgt_dir = !readonly && temp_dir ? *temp_dir : dir;
auto name = sst.filename(tgt_dir, type);
auto f = open_sstable_component_file_non_checked(name, flags, options, check_integrity);
@@ -162,30 +162,27 @@ future<> filesystem_storage::seal(const sstable& sst) {
future<> filesystem_storage::touch_temp_dir(const sstable& sst) {
if (temp_dir) {
return make_ready_future<>();
co_return;
}
auto tmp = dir + "/" + sstable::sst_dir_basename(sst._generation);
auto tmp = fmt::format("{}/{}{}", dir, sst._generation, tempdir_extension);
sstlog.debug("Touching temp_dir={}", tmp);
auto fut = sst.sstable_touch_directory_io_check(tmp);
return fut.then([this, tmp = std::move(tmp)] () mutable {
temp_dir = std::move(tmp);
});
co_await sst.sstable_touch_directory_io_check(tmp);
temp_dir = std::move(tmp);
}
future<> filesystem_storage::remove_temp_dir() {
if (!temp_dir) {
return make_ready_future<>();
co_return;
}
sstlog.debug("Removing temp_dir={}", temp_dir);
return remove_file(*temp_dir).then_wrapped([this] (future<> f) {
if (!f.failed()) {
temp_dir.reset();
return make_ready_future<>();
}
auto ep = f.get_exception();
sstlog.error("Could not remove temporary directory: {}", ep);
return make_exception_future<>(ep);
});
try {
co_await remove_file(*temp_dir);
} catch (...) {
sstlog.error("Could not remove temporary directory: {}", std::current_exception());
throw;
}
temp_dir.reset();
}
static bool is_same_file(const seastar::stat_data& sd1, const seastar::stat_data& sd2) noexcept {

View File

@@ -322,10 +322,10 @@ SEASTAR_THREAD_TEST_CASE(test_distributed_loader_with_incomplete_sstables) {
require_exist(file_name, true);
};
auto temp_sst_dir_2 = sst_dir + "/" + sst::sst_dir_basename(generation_from_value(2));
auto temp_sst_dir_2 = fmt::format("{}/{}{}", sst_dir, generation_from_value(2), tempdir_extension);
touch_dir(temp_sst_dir_2);
auto temp_sst_dir_3 = sst_dir + "/" + sst::sst_dir_basename(generation_from_value(3));
auto temp_sst_dir_3 = fmt::format("{}/{}{}", sst_dir, generation_from_value(3), tempdir_extension);
touch_dir(temp_sst_dir_3);
auto temp_file_name = sst::filename(temp_sst_dir_3, ks, cf, sstables::get_highest_sstable_version(), generation_from_value(3), sst::format_types::big, component_type::TemporaryTOC);
@@ -358,7 +358,7 @@ SEASTAR_THREAD_TEST_CASE(test_distributed_loader_with_pending_delete) {
sstring ks = "system";
sstring cf = "peers-37f71aca7dc2383ba70672528af04d4f";
sstring sst_dir = (data_dir.path() / std::string_view(ks) / std::string_view(cf)).string();
sstring pending_delete_dir = sst_dir + "/" + sst::pending_delete_dir_basename();
sstring pending_delete_dir = sst_dir + "/" + sstables::pending_delete_dir;
auto require_exist = [] (const sstring& name, bool should_exist) {
auto exists = file_exists(name).get0();