diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index 11ba9fe590..d9d23f78ba 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -90,6 +90,12 @@ distributed_loader::process_sstable_dir(sharded& di return utils::directories::verify_owner_and_mode(d.sstable_dir()); }); + if (flags.garbage_collect) { + co_await dir.invoke_on(0, [] (sstables::sstable_directory& d) { + return d.garbage_collect(); + }); + } + co_await dir.invoke_on_all([&dir, flags] (sstables::sstable_directory& d) -> future<> { // Supposed to be called with the node either down or on behalf of maintenance tasks // like nodetool refresh @@ -530,48 +536,6 @@ distributed_loader::get_sstables_from_upload_dir(distributed& }); } -future<> distributed_loader::cleanup_column_family_temp_sst_dirs(sstring sstdir) { - std::vector> futures; - - co_await lister::scan_dir(sstdir, lister::dir_entry_types::of(), [&] (fs::path sstdir, directory_entry de) { - // push futures that remove files/directories into an array of futures, - // so that the supplied callback will not block scan_dir() from - // reading the next entry in the directory. - fs::path dirpath = sstdir / de.name; - if (sstables::sstable::is_temp_dir(dirpath)) { - dblog.info("Found temporary sstable directory: {}, removing", dirpath); - futures.push_back(io_check([dirpath = std::move(dirpath)] () { return lister::rmdir(dirpath); })); - } - return make_ready_future<>(); - }); - - co_await when_all_succeed(futures.begin(), futures.end()).discard_result(); -} - -future<> distributed_loader::handle_sstables_pending_delete(sstring pending_delete_dir) { - std::vector> futures; - - co_await lister::scan_dir(pending_delete_dir, lister::dir_entry_types::of(), [&futures] (fs::path dir, directory_entry de) { - // push nested futures that remove files/directories into an array of futures, - // so that the supplied callback will not block scan_dir() from - // reading the next entry in the directory. - fs::path file_path = dir / de.name; - if (file_path.extension() == ".tmp") { - dblog.info("Found temporary pending_delete log file: {}, deleting", file_path); - futures.push_back(remove_file(file_path.string())); - } else if (file_path.extension() == ".log") { - dblog.info("Found pending_delete log file: {}, replaying", file_path); - auto f = sstables::sstable_directory::replay_pending_delete_log(std::move(file_path)); - futures.push_back(std::move(f)); - } else { - dblog.debug("Found unknown file in pending_delete directory: {}, ignoring", file_path); - } - return make_ready_future<>(); - }); - - co_await when_all_succeed(futures.begin(), futures.end()).discard_result(); -} - class table_populator { distributed& _db; sstring _ks; @@ -637,14 +601,6 @@ future<> table_populator::start_subdir(sstring subdir) { co_return; } - // First pass, cleanup temporary sstable directories and sstables pending delete. - co_await distributed_loader::cleanup_column_family_temp_sst_dirs(sstdir); - auto pending_delete_dir = sstdir + "/" + sstables::sstable::pending_delete_dir_basename(); - auto exists = co_await file_exists(pending_delete_dir); - if (exists) { - co_await distributed_loader::handle_sstables_pending_delete(pending_delete_dir); - } - auto dptr = make_lw_shared>(); auto& directory = *dptr; auto& global_table = _global_table; @@ -666,6 +622,7 @@ future<> table_populator::start_subdir(sstring subdir) { .throw_on_missing_toc = true, .enable_dangerous_direct_import_of_cassandra_counters = db.local().get_config().enable_dangerous_direct_import_of_cassandra_counters(), .allow_loading_materialized_view = true, + .garbage_collect = true, }; co_await distributed_loader::process_sstable_dir(directory, flags); diff --git a/replica/distributed_loader.hh b/replica/distributed_loader.hh index 063f7ad177..7ef265589c 100644 --- a/replica/distributed_loader.hh +++ b/replica/distributed_loader.hh @@ -76,8 +76,6 @@ class distributed_loader { sharded& db, sharded& view_update_generator, bool needs_view_update, sstring ks, sstring cf); static future<> populate_keyspace(distributed& db, sstring datadir, sstring ks_name); - static future<> cleanup_column_family_temp_sst_dirs(sstring sstdir); - static future<> handle_sstables_pending_delete(sstring pending_deletes_dir); public: static future<> init_system_keyspace(sharded& sys_ks, distributed& db, distributed& ss, sharded& g, sharded& raft_gr, db::config& cfg, system_table_load_phase phase); diff --git a/sstables/sstable_directory.cc b/sstables/sstable_directory.cc index 7e108c7046..66581a6557 100644 --- a/sstables/sstable_directory.cc +++ b/sstables/sstable_directory.cc @@ -498,7 +498,7 @@ future<> sstable_directory::delete_with_pending_deletion_log(std::vector_storage->prefix() + "/" + sstable::pending_delete_dir_basename(); + sstring pending_delete_dir = first->_storage->prefix() + "/" + sstables::pending_delete_dir; sstring pending_delete_log = format("{}/sstables-{}-{}.log", pending_delete_dir, gen_tracker.min(), gen_tracker.max()); sstring tmp_pending_delete_log = pending_delete_log + ".tmp"; sstlog.trace("Writing {}", tmp_pending_delete_log); @@ -553,7 +553,6 @@ future<> sstable_directory::delete_with_pending_deletion_log(std::vector sstable_directory::replay_pending_delete_log(fs::path pending_delete_log) { sstlog.debug("Reading pending_deletes log file {}", pending_delete_log); fs::path pending_delete_dir = pending_delete_log.parent_path(); - assert(sstable::is_pending_delete_dir(pending_delete_dir)); try { sstring sstdir = pending_delete_dir.parent_path().native(); auto text = co_await seastar::util::read_entire_file_contiguous(pending_delete_log); @@ -572,6 +571,60 @@ future<> sstable_directory::replay_pending_delete_log(fs::path pending_delete_lo } } +future<> sstable_directory::garbage_collect() { + // First pass, cleanup temporary sstable directories and sstables pending delete. + co_await cleanup_column_family_temp_sst_dirs(); + co_await handle_sstables_pending_delete(); +} + +future<> sstable_directory::cleanup_column_family_temp_sst_dirs() { + std::vector> futures; + + co_await lister::scan_dir(_sstable_dir, lister::dir_entry_types::of(), [&] (fs::path sstdir, directory_entry de) { + // push futures that remove files/directories into an array of futures, + // so that the supplied callback will not block scan_dir() from + // reading the next entry in the directory. + fs::path dirpath = sstdir / de.name; + if (dirpath.extension().string() == tempdir_extension) { + sstlog.info("Found temporary sstable directory: {}, removing", dirpath); + futures.push_back(io_check([dirpath = std::move(dirpath)] () { return lister::rmdir(dirpath); })); + } + return make_ready_future<>(); + }); + + co_await when_all_succeed(futures.begin(), futures.end()).discard_result(); +} + +future<> sstable_directory::handle_sstables_pending_delete() { + auto pending_delete_dir = _sstable_dir / sstables::pending_delete_dir; + auto exists = co_await file_exists(pending_delete_dir.native()); + if (!exists) { + co_return; + } + + std::vector> futures; + + co_await lister::scan_dir(pending_delete_dir, lister::dir_entry_types::of(), [this, &futures] (fs::path dir, directory_entry de) { + // push nested futures that remove files/directories into an array of futures, + // so that the supplied callback will not block scan_dir() from + // reading the next entry in the directory. + fs::path file_path = dir / de.name; + if (file_path.extension() == ".tmp") { + sstlog.info("Found temporary pending_delete log file: {}, deleting", file_path); + futures.push_back(remove_file(file_path.string())); + } else if (file_path.extension() == ".log") { + sstlog.info("Found pending_delete log file: {}, replaying", file_path); + auto f = replay_pending_delete_log(std::move(file_path)); + futures.push_back(std::move(f)); + } else { + sstlog.debug("Found unknown file in pending_delete directory: {}, ignoring", file_path); + } + return make_ready_future<>(); + }); + + co_await when_all_succeed(futures.begin(), futures.end()).discard_result(); +} + future> highest_generation_seen(sharded& directory) { auto highest = co_await directory.map_reduce0(std::mem_fn(&sstables::sstable_directory::highest_generation_seen), sstables::generation_type(0), [] (std::optional a, std::optional b) { diff --git a/sstables/sstable_directory.hh b/sstables/sstable_directory.hh index 1bfbc3a857..d5d3aeeff4 100644 --- a/sstables/sstable_directory.hh +++ b/sstables/sstable_directory.hh @@ -65,6 +65,7 @@ public: bool enable_dangerous_direct_import_of_cassandra_counters = false; bool allow_loading_materialized_view = false; bool sort_sstables_according_to_owner = true; + bool garbage_collect = false; sstables::sstable_open_config sstable_open_config; }; @@ -176,6 +177,11 @@ private: future> get_shards_for_this_sstable(const sstables::entry_descriptor& desc, process_flags flags) const; // Retrieves sstables::foreign_sstable_open_info for a particular SSTable. future get_open_info_for_this_sstable(const sstables::entry_descriptor& desc) const; + + future<> cleanup_column_family_temp_sst_dirs(); + future<> handle_sstables_pending_delete(); + future<> replay_pending_delete_log(std::filesystem::path log_file); + public: sstable_directory(sstables_manager& manager, schema_ptr schema, @@ -260,9 +266,9 @@ public: // // This function only solves the second problem for now. static future<> delete_with_pending_deletion_log(std::vector ssts); - static future<> replay_pending_delete_log(std::filesystem::path log_file); static bool compare_sstable_storage_prefix(const sstring& a, const sstring& b) noexcept; + future<> garbage_collect(); }; future> highest_generation_seen(sharded& directory); diff --git a/sstables/sstables.hh b/sstables/sstables.hh index 22b9d3e71b..522d404325 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -122,6 +122,7 @@ constexpr const char* upload_dir = "upload"; constexpr const char* snapshots_dir = "snapshots"; constexpr const char* quarantine_dir = "quarantine"; constexpr const char* pending_delete_dir = "pending_delete"; +constexpr const char* tempdir_extension = ".sstable"; constexpr auto table_subdirectories = std::to_array({ staging_dir, @@ -377,24 +378,6 @@ public: return filename(component_type::Index); } - static sstring sst_dir_basename(generation_type gen) { - return fmt::format("{}.sstable", gen); - } - - static bool is_temp_dir(const fs::path& dirpath) - { - return dirpath.extension().string() == ".sstable"; - } - - static sstring pending_delete_dir_basename() { - return pending_delete_dir; - } - - static bool is_pending_delete_dir(const fs::path& dirpath) - { - return dirpath.filename().string() == pending_delete_dir_basename().c_str(); - } - bool requires_view_building() const; bool is_quarantined() const noexcept; diff --git a/sstables/storage.cc b/sstables/storage.cc index 85e9450d60..88655de7a8 100644 --- a/sstables/storage.cc +++ b/sstables/storage.cc @@ -100,7 +100,7 @@ static future open_sstable_component_file_non_checked(std::string_view nam future filesystem_storage::open_component(const sstable& sst, component_type type, open_flags flags, file_open_options options, bool check_integrity) { auto create_flags = open_flags::create | open_flags::exclusive; auto readonly = (flags & create_flags) != create_flags; - auto tgt_dir = !readonly && temp_dir ? dir + "/" + sstable::sst_dir_basename(sst._generation) : dir; + auto tgt_dir = !readonly && temp_dir ? *temp_dir : dir; auto name = sst.filename(tgt_dir, type); auto f = open_sstable_component_file_non_checked(name, flags, options, check_integrity); @@ -162,30 +162,27 @@ future<> filesystem_storage::seal(const sstable& sst) { future<> filesystem_storage::touch_temp_dir(const sstable& sst) { if (temp_dir) { - return make_ready_future<>(); + co_return; } - auto tmp = dir + "/" + sstable::sst_dir_basename(sst._generation); + auto tmp = fmt::format("{}/{}{}", dir, sst._generation, tempdir_extension); sstlog.debug("Touching temp_dir={}", tmp); - auto fut = sst.sstable_touch_directory_io_check(tmp); - return fut.then([this, tmp = std::move(tmp)] () mutable { - temp_dir = std::move(tmp); - }); + co_await sst.sstable_touch_directory_io_check(tmp); + temp_dir = std::move(tmp); } future<> filesystem_storage::remove_temp_dir() { if (!temp_dir) { - return make_ready_future<>(); + co_return; } sstlog.debug("Removing temp_dir={}", temp_dir); - return remove_file(*temp_dir).then_wrapped([this] (future<> f) { - if (!f.failed()) { - temp_dir.reset(); - return make_ready_future<>(); - } - auto ep = f.get_exception(); - sstlog.error("Could not remove temporary directory: {}", ep); - return make_exception_future<>(ep); - }); + try { + co_await remove_file(*temp_dir); + } catch (...) { + sstlog.error("Could not remove temporary directory: {}", std::current_exception()); + throw; + } + + temp_dir.reset(); } static bool is_same_file(const seastar::stat_data& sd1, const seastar::stat_data& sd2) noexcept { diff --git a/test/boost/database_test.cc b/test/boost/database_test.cc index 96e75e2b0e..90c1166366 100644 --- a/test/boost/database_test.cc +++ b/test/boost/database_test.cc @@ -322,10 +322,10 @@ SEASTAR_THREAD_TEST_CASE(test_distributed_loader_with_incomplete_sstables) { require_exist(file_name, true); }; - auto temp_sst_dir_2 = sst_dir + "/" + sst::sst_dir_basename(generation_from_value(2)); + auto temp_sst_dir_2 = fmt::format("{}/{}{}", sst_dir, generation_from_value(2), tempdir_extension); touch_dir(temp_sst_dir_2); - auto temp_sst_dir_3 = sst_dir + "/" + sst::sst_dir_basename(generation_from_value(3)); + auto temp_sst_dir_3 = fmt::format("{}/{}{}", sst_dir, generation_from_value(3), tempdir_extension); touch_dir(temp_sst_dir_3); auto temp_file_name = sst::filename(temp_sst_dir_3, ks, cf, sstables::get_highest_sstable_version(), generation_from_value(3), sst::format_types::big, component_type::TemporaryTOC); @@ -358,7 +358,7 @@ SEASTAR_THREAD_TEST_CASE(test_distributed_loader_with_pending_delete) { sstring ks = "system"; sstring cf = "peers-37f71aca7dc2383ba70672528af04d4f"; sstring sst_dir = (data_dir.path() / std::string_view(ks) / std::string_view(cf)).string(); - sstring pending_delete_dir = sst_dir + "/" + sst::pending_delete_dir_basename(); + sstring pending_delete_dir = sst_dir + "/" + sstables::pending_delete_dir; auto require_exist = [] (const sstring& name, bool should_exist) { auto exists = file_exists(name).get0();