sstables: delete_with_pending_deletion_log: batch sync_directory
When deleting multiple sstables with the same prefix the deletion atomicity is ensured by the pending_delete_log file, so if scylla crashes in the middle, deletions will be replyed on restart. Therefore, we don't have to ensure atomicity of each individual `unlink`. We just need to sync the directory once, before removing the pending_delete_log file. Signed-off-by: Benny Halevy <bhalevy@scylladb.com> Closes #14967
This commit is contained in:
@@ -527,9 +527,11 @@ future<> sstable_directory::delete_with_pending_deletion_log(std::vector<shared_
|
||||
}
|
||||
|
||||
parallel_for_each(ssts, [] (shared_sstable sst) {
|
||||
return sst->unlink();
|
||||
return sst->unlink(sstables::storage::sync_dir::no);
|
||||
}).get();
|
||||
|
||||
sync_directory(first->_storage->prefix()).get();
|
||||
|
||||
// Once all sstables are deleted, the log file can be removed.
|
||||
// Note: the log file will be removed also if unlink failed to remove
|
||||
// any sstable and ignored the error.
|
||||
|
||||
@@ -2609,7 +2609,7 @@ std::optional<std::pair<uint64_t, uint64_t>> sstable::get_sample_indexes_for_ran
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
future<> remove_by_toc_name(sstring sstable_toc_name) {
|
||||
future<> remove_by_toc_name(sstring sstable_toc_name, storage::sync_dir sync) {
|
||||
sstring prefix = sstable_toc_name.substr(0, sstable_toc_name.size() - sstable_version_constants::TOC_SUFFIX.size());
|
||||
sstring new_toc_name = prefix + sstable_version_constants::TEMPORARY_TOC_SUFFIX;
|
||||
|
||||
@@ -2617,7 +2617,9 @@ future<> remove_by_toc_name(sstring sstable_toc_name) {
|
||||
if (co_await sstable_io_check(sstable_write_error_handler, file_exists, sstable_toc_name)) {
|
||||
// If new_toc_name exists it will be atomically replaced. See rename(2)
|
||||
co_await sstable_io_check(sstable_write_error_handler, rename_file, sstable_toc_name, new_toc_name);
|
||||
co_await sstable_io_check(sstable_write_error_handler, sync_directory, parent_path(new_toc_name));
|
||||
if (sync) {
|
||||
co_await sstable_io_check(sstable_write_error_handler, sync_directory, parent_path(new_toc_name));
|
||||
}
|
||||
} else {
|
||||
if (!co_await sstable_io_check(sstable_write_error_handler, file_exists, new_toc_name)) {
|
||||
sstlog.warn("Unable to delete {} because it doesn't exist.", sstable_toc_name);
|
||||
@@ -2653,7 +2655,9 @@ future<> remove_by_toc_name(sstring sstable_toc_name) {
|
||||
sstlog.debug("Forgiving ENOENT when deleting file {}", fname);
|
||||
}
|
||||
});
|
||||
co_await sstable_io_check(sstable_write_error_handler, sync_directory, parent_path(new_toc_name));
|
||||
if (sync) {
|
||||
co_await sstable_io_check(sstable_write_error_handler, sync_directory, parent_path(new_toc_name));
|
||||
}
|
||||
co_await sstable_io_check(sstable_write_error_handler, remove_file, new_toc_name);
|
||||
}
|
||||
|
||||
@@ -2791,8 +2795,8 @@ utils::hashed_key sstable::make_hashed_key(const schema& s, const partition_key&
|
||||
}
|
||||
|
||||
future<>
|
||||
sstable::unlink() noexcept {
|
||||
auto remove_fut = _storage->wipe(*this);
|
||||
sstable::unlink(storage::sync_dir sync) noexcept {
|
||||
auto remove_fut = _storage->wipe(*this, sync);
|
||||
|
||||
try {
|
||||
co_await get_large_data_handler().maybe_delete_large_data_entries(shared_from_this());
|
||||
|
||||
@@ -388,7 +388,9 @@ public:
|
||||
|
||||
// Delete the sstable by unlinking all sstable files
|
||||
// Ignores all errors.
|
||||
future<> unlink() noexcept;
|
||||
// Caller may pass sync_dir::no for batching multiple deletes in the same directory,
|
||||
// and make sure the directory is sync'ed on or after the last call.
|
||||
future<> unlink(storage::sync_dir sync = storage::sync_dir::yes) noexcept;
|
||||
|
||||
db::large_data_handler& get_large_data_handler() {
|
||||
return _large_data_handler;
|
||||
@@ -958,6 +960,8 @@ public:
|
||||
future<> remove_table_directory_if_has_no_snapshots(fs::path table_dir);
|
||||
|
||||
// similar to sstable::unlink, but works on a TOC file name
|
||||
future<> remove_by_toc_name(sstring sstable_toc_name);
|
||||
// Caller may pass sync_dir::no for batching multiple deletes in the same directory,
|
||||
// and make sure the directory is sync'ed on or after the last call.
|
||||
future<> remove_by_toc_name(sstring sstable_toc_name, storage::sync_dir sync = storage::sync_dir::yes);
|
||||
|
||||
} // namespace sstables
|
||||
|
||||
@@ -62,7 +62,7 @@ public:
|
||||
virtual future<> change_state(const sstable& sst, sstring to, generation_type generation, delayed_commit_changes* delay) override;
|
||||
// runs in async context
|
||||
virtual void open(sstable& sst) override;
|
||||
virtual future<> wipe(const sstable& sst) noexcept override;
|
||||
virtual future<> wipe(const sstable& sst, sync_dir) noexcept override;
|
||||
virtual future<file> open_component(const sstable& sst, component_type type, open_flags flags, file_open_options options, bool check_integrity) override;
|
||||
virtual future<data_sink> make_data_or_index_sink(sstable& sst, component_type type) override;
|
||||
virtual future<data_sink> make_component_sink(sstable& sst, component_type type, open_flags oflags, file_output_stream_options options) override;
|
||||
@@ -387,7 +387,7 @@ future<> filesystem_storage::change_state(const sstable& sst, sstring to, genera
|
||||
co_await move(sst, path.native(), std::move(new_generation), delay_commit);
|
||||
}
|
||||
|
||||
future<> filesystem_storage::wipe(const sstable& sst) noexcept {
|
||||
future<> filesystem_storage::wipe(const sstable& sst, sync_dir sync) noexcept {
|
||||
// We must be able to generate toc_filename()
|
||||
// in order to delete the sstable.
|
||||
// Running out of memory here will terminate.
|
||||
@@ -397,7 +397,7 @@ future<> filesystem_storage::wipe(const sstable& sst) noexcept {
|
||||
}();
|
||||
|
||||
try {
|
||||
co_await remove_by_toc_name(name);
|
||||
co_await remove_by_toc_name(name, sync);
|
||||
} catch (...) {
|
||||
// Log and ignore the failure since there is nothing much we can do about it at this point.
|
||||
// a. Compaction will retry deleting the sstable in the next pass, and
|
||||
@@ -446,7 +446,7 @@ public:
|
||||
virtual future<> change_state(const sstable& sst, sstring to, generation_type generation, delayed_commit_changes* delay) override;
|
||||
// runs in async context
|
||||
virtual void open(sstable& sst) override;
|
||||
virtual future<> wipe(const sstable& sst) noexcept override;
|
||||
virtual future<> wipe(const sstable& sst, sync_dir) noexcept override;
|
||||
virtual future<file> open_component(const sstable& sst, component_type type, open_flags flags, file_open_options options, bool check_integrity) override;
|
||||
virtual future<data_sink> make_data_or_index_sink(sstable& sst, component_type type) override;
|
||||
virtual future<data_sink> make_component_sink(sstable& sst, component_type type, open_flags oflags, file_output_stream_options options) override;
|
||||
@@ -518,7 +518,7 @@ future<> s3_storage::change_state(const sstable& sst, sstring to, generation_typ
|
||||
co_await coroutine::return_exception(std::runtime_error("Moving S3 objects not implemented"));
|
||||
}
|
||||
|
||||
future<> s3_storage::wipe(const sstable& sst) noexcept {
|
||||
future<> s3_storage::wipe(const sstable& sst, sync_dir) noexcept {
|
||||
auto& sys_ks = sst.manager().system_keyspace();
|
||||
|
||||
co_await sys_ks.sstables_registry_update_entry_status(_location, sst.generation(), status_removing);
|
||||
|
||||
@@ -48,13 +48,14 @@ public:
|
||||
virtual ~storage() {}
|
||||
|
||||
using absolute_path = bool_class<class absolute_path_tag>; // FIXME -- should go away eventually
|
||||
using sync_dir = bool_class<struct sync_dir_tag>; // meaningful only to filesystem storage
|
||||
|
||||
virtual future<> seal(const sstable& sst) = 0;
|
||||
virtual future<> snapshot(const sstable& sst, sstring dir, absolute_path abs) const = 0;
|
||||
virtual future<> change_state(const sstable& sst, sstring to, generation_type generation, delayed_commit_changes* delay) = 0;
|
||||
// runs in async context
|
||||
virtual void open(sstable& sst) = 0;
|
||||
virtual future<> wipe(const sstable& sst) noexcept = 0;
|
||||
virtual future<> wipe(const sstable& sst, sync_dir) noexcept = 0;
|
||||
virtual future<file> open_component(const sstable& sst, component_type type, open_flags flags, file_open_options options, bool check_integrity) = 0;
|
||||
virtual future<data_sink> make_data_or_index_sink(sstable& sst, component_type type) = 0;
|
||||
virtual future<data_sink> make_component_sink(sstable& sst, component_type type, open_flags oflags, file_output_stream_options options) = 0;
|
||||
|
||||
Reference in New Issue
Block a user