sstable_directory: create_pending_deletion_log: place pending_delete log under the base directory
To be able to atomically delete sstables both in the base table directory and in its sub-directories, like `staging/`, use a shared pending_delete_dir under the base directory. Note that this requires loading and processing the base directory first. Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
@@ -301,6 +301,9 @@ public:
|
||||
future<> start() {
|
||||
SCYLLA_ASSERT(this_shard_id() == 0);
|
||||
|
||||
// The table base directory (with sstable_state::normal) must be
|
||||
// loaded and processed first as it now may contain the shared
|
||||
// pending_delete_dir, possibly referring to sstables in sub-directories.
|
||||
for (auto state : { sstables::sstable_state::normal, sstables::sstable_state::staging, sstables::sstable_state::quarantine }) {
|
||||
co_await start_subdir(state);
|
||||
}
|
||||
|
||||
@@ -563,22 +563,22 @@ bool sstable_directory::compare_sstable_storage_prefix(const sstring& prefix_a,
|
||||
return size_a == size_b && sstring::traits_type::compare(prefix_a.begin(), prefix_b.begin(), size_a) == 0;
|
||||
}
|
||||
|
||||
future<std::unordered_map<sstring, sstring>> sstable_directory::create_pending_deletion_log(const std::vector<shared_sstable>& ssts) {
|
||||
return seastar::async([&ssts] {
|
||||
std::unordered_map<sstring, min_max_tracker<generation_type>> gen_trackers;
|
||||
std::unordered_map<sstring, sstring> res;
|
||||
future<sstable_directory::pending_delete_result> sstable_directory::create_pending_deletion_log(opened_directory& base_dir, const std::vector<shared_sstable>& ssts) {
|
||||
return seastar::async([&] {
|
||||
min_max_tracker<generation_type> gen_tracker;
|
||||
pending_delete_result res;
|
||||
|
||||
for (const auto& sst : ssts) {
|
||||
auto prefix = sst->_storage->prefix();
|
||||
gen_trackers[prefix].update(sst->generation());
|
||||
res.prefixes.insert(prefix);
|
||||
gen_tracker.update(sst->generation());
|
||||
}
|
||||
|
||||
for (const auto& [prefix, gen_tracker] : gen_trackers) {
|
||||
sstring pending_delete_dir = prefix + "/" + sstables::pending_delete_dir;
|
||||
sstring pending_delete_log = format("{}/sstables-{}-{}.log", pending_delete_dir, gen_tracker.min(), gen_tracker.max());
|
||||
sstring tmp_pending_delete_log = pending_delete_log + ".tmp";
|
||||
sstring pending_delete_dir = (base_dir.path() / sstables::pending_delete_dir).native();
|
||||
res.pending_delete_log = format("{}/sstables-{}-{}.log", pending_delete_dir, gen_tracker.min(), gen_tracker.max());
|
||||
sstring tmp_pending_delete_log = res.pending_delete_log + ".tmp";
|
||||
dirlog.trace("Writing {}", tmp_pending_delete_log);
|
||||
try {
|
||||
|
||||
touch_directory(pending_delete_dir).get();
|
||||
auto oflags = open_flags::wo | open_flags::create | open_flags::exclusive;
|
||||
// Create temporary pending_delete log file.
|
||||
@@ -587,29 +587,34 @@ future<std::unordered_map<sstring, sstring>> sstable_directory::create_pending_d
|
||||
auto out = make_file_output_stream(std::move(f), 4096).get();
|
||||
auto close_out = deferred_close(out);
|
||||
|
||||
try {
|
||||
auto trim_size = base_dir.native().size() + 1; // Account for the '/' delimiter
|
||||
for (const auto& sst : ssts) {
|
||||
auto prefix = sst->_storage->prefix();
|
||||
if (prefix.size() > trim_size) {
|
||||
out.write(prefix.begin() + trim_size, prefix.size() - trim_size).get();
|
||||
out.write("/").get();
|
||||
}
|
||||
auto toc = sst->component_basename(component_type::TOC);
|
||||
out.write(toc).get();
|
||||
out.write("\n").get();
|
||||
dirlog.trace("Wrote '{}{}' to {}",
|
||||
prefix.size() > trim_size ? sstring(prefix.begin() + trim_size, prefix.size() - trim_size) + "/" : "",
|
||||
sst->component_basename(component_type::TOC), tmp_pending_delete_log);
|
||||
}
|
||||
|
||||
out.flush().get();
|
||||
close_out.close_now();
|
||||
} catch (...) {
|
||||
dirlog.warn("Error while writing {}: {}. Ignoring.", tmp_pending_delete_log, std::current_exception());
|
||||
}
|
||||
|
||||
auto dir_f = open_directory(pending_delete_dir).get();
|
||||
auto close_dir = deferred_close(dir_f);
|
||||
// Once flushed and closed, the temporary log file can be renamed.
|
||||
rename_file(tmp_pending_delete_log, pending_delete_log).get();
|
||||
io_check(rename_file, tmp_pending_delete_log, res.pending_delete_log).get();
|
||||
|
||||
// Guarantee that the changes above reached the disk.
|
||||
dir_f.flush().get();
|
||||
close_dir.close_now();
|
||||
dirlog.debug("{} written successfully.", pending_delete_log);
|
||||
res.emplace(std::move(pending_delete_dir), std::move(pending_delete_log));
|
||||
} catch (...) {
|
||||
dirlog.warn("Error while writing {}: {}. Ignoring.", pending_delete_log, std::current_exception());
|
||||
}
|
||||
}
|
||||
base_dir.sync(general_disk_error_handler).get();
|
||||
dirlog.debug("{} written successfully.", res.pending_delete_log);
|
||||
|
||||
return res;
|
||||
});
|
||||
@@ -628,6 +633,7 @@ future<> sstable_directory::filesystem_components_lister::replay_pending_delete_
|
||||
std::vector<sstring> basenames;
|
||||
boost::split(basenames, all, boost::is_any_of("\n"), boost::token_compress_on);
|
||||
auto tocs = boost::copy_range<std::vector<sstring>>(basenames | boost::adaptors::filtered([] (auto&& basename) { return !basename.empty(); }));
|
||||
dirlog.debug("TOCs to remove: {}", tocs);
|
||||
co_await parallel_for_each(tocs, [&sstdir] (const sstring& name) {
|
||||
// Only move TOC to TOC.tmp, the rest will be finished by regular process
|
||||
return make_toc_temporary(sstdir + "/" + name).discard_result();
|
||||
|
||||
@@ -34,6 +34,7 @@ namespace sstables {
|
||||
enum class sstable_state;
|
||||
class storage;
|
||||
class sstables_manager;
|
||||
class opened_directory;
|
||||
bool manifest_json_filter(const std::filesystem::path&, const directory_entry& entry);
|
||||
|
||||
class directory_semaphore {
|
||||
@@ -267,6 +268,11 @@ public:
|
||||
using can_be_remote = bool_class<struct can_be_remote_tag>;
|
||||
future<> collect_output_unshared_sstables(std::vector<sstables::shared_sstable> resharded_sstables, can_be_remote);
|
||||
|
||||
struct pending_delete_result {
|
||||
sstring pending_delete_log;
|
||||
std::unordered_set<sstring> prefixes;
|
||||
};
|
||||
|
||||
// When we compact sstables, we have to atomically instantiate the new
|
||||
// sstable and delete the old ones. Otherwise, if we compact A+B into C,
|
||||
// and if A contained some data that was tombstoned by B, and if B was
|
||||
@@ -280,11 +286,10 @@ public:
|
||||
//
|
||||
// This function only solves the second problem for now.
|
||||
|
||||
// Creates the deletion log for atomic deletion of sstables (helper for the
|
||||
// Creates the deletion log for atomic deletion of sstables at `base_dir` (helper for the
|
||||
// above function that's also used by tests)
|
||||
// Returns an unordered_map of <directory with sstables, logfile_name> for every sstable prefix.
|
||||
// Currently, atomicity is guaranteed only within each unique prefix and not across prefixes (See #18862)
|
||||
static future<std::unordered_map<sstring, sstring>> create_pending_deletion_log(const std::vector<shared_sstable>& ssts);
|
||||
// Returns the name of the pending_delete_log and an unordered_set of sstable prefixes.
|
||||
static future<pending_delete_result> create_pending_deletion_log(opened_directory& base_dir, const std::vector<shared_sstable>& ssts);
|
||||
|
||||
static bool compare_sstable_storage_prefix(const sstring& a, const sstring& b) noexcept;
|
||||
};
|
||||
|
||||
@@ -477,26 +477,24 @@ future<> filesystem_storage::wipe(const sstable& sst, sync_dir sync) noexcept {
|
||||
}
|
||||
|
||||
future<atomic_delete_context> filesystem_storage::atomic_delete_prepare(const std::vector<shared_sstable>& ssts) const {
|
||||
return sstable_directory::create_pending_deletion_log(ssts);
|
||||
co_return co_await sstable_directory::create_pending_deletion_log(base_dir(), ssts);
|
||||
}
|
||||
|
||||
future<> filesystem_storage::atomic_delete_complete(atomic_delete_context ctx) const {
|
||||
co_await coroutine::parallel_for_each(ctx, [] (const auto& x) -> future<> {
|
||||
const auto& dir = x.first;
|
||||
const auto& log = x.second;
|
||||
|
||||
co_await coroutine::parallel_for_each(ctx.prefixes, [] (const auto& dir) -> future<> {
|
||||
co_await sync_directory(dir);
|
||||
});
|
||||
|
||||
// Once all sstables are deleted, the log file can be removed.
|
||||
// Note: the log file will be removed also if unlink failed to remove
|
||||
// any sstable and ignored the error.
|
||||
const auto& log = ctx.pending_delete_log;
|
||||
try {
|
||||
co_await remove_file(log);
|
||||
sstlog.debug("{} removed.", log);
|
||||
} catch (...) {
|
||||
sstlog.warn("Error removing {}: {}. Ignoring.", log, std::current_exception());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
future<> filesystem_storage::remove_by_registry_entry(entry_descriptor desc) {
|
||||
|
||||
@@ -23,6 +23,7 @@
|
||||
#include "sstables/component_type.hh"
|
||||
#include "sstables/generation_type.hh"
|
||||
#include "utils/disk-error-handler.hh"
|
||||
#include "sstables/sstable_directory.hh"
|
||||
|
||||
namespace data_dictionary {
|
||||
class storage_options;
|
||||
@@ -36,7 +37,7 @@ class sstable;
|
||||
class sstables_manager;
|
||||
class entry_descriptor;
|
||||
|
||||
using atomic_delete_context = std::unordered_map<sstring, sstring>;
|
||||
using atomic_delete_context = sstable_directory::pending_delete_result;
|
||||
|
||||
class opened_directory final {
|
||||
std::filesystem::path _pathname;
|
||||
@@ -74,7 +75,7 @@ public:
|
||||
class storage {
|
||||
friend class test;
|
||||
|
||||
std::filesystem::path _base_dir; // Local base directory (of table)
|
||||
mutable opened_directory _base_dir; // Local base directory (of table)
|
||||
|
||||
// Internal, but can also be used by tests
|
||||
virtual future<> change_dir_for_test(sstring nd) {
|
||||
@@ -94,7 +95,7 @@ public:
|
||||
using absolute_path = bool_class<class absolute_path_tag>; // FIXME -- should go away eventually
|
||||
using sync_dir = bool_class<struct sync_dir_tag>; // meaningful only to filesystem storage
|
||||
|
||||
const std::filesystem::path& base_dir() const {
|
||||
opened_directory& base_dir() const {
|
||||
return _base_dir;
|
||||
}
|
||||
|
||||
|
||||
@@ -796,7 +796,8 @@ SEASTAR_TEST_CASE(test_pending_log_garbage_collection) {
|
||||
// Now start atomic deletion -- create the pending deletion log for all
|
||||
// three sstables, move TOC file for one of them into temporary-TOC, and
|
||||
// partially delete another
|
||||
sstable_directory::create_pending_deletion_log(ssts_to_remove).get();
|
||||
auto base_opened_dir = opened_directory(base);
|
||||
sstable_directory::create_pending_deletion_log(base_opened_dir, ssts_to_remove).get();
|
||||
rename_file(test(ssts_to_remove[1]).filename(sstables::component_type::TOC).native(), test(ssts_to_remove[1]).filename(sstables::component_type::TemporaryTOC).native()).get();
|
||||
rename_file(test(ssts_to_remove[2]).filename(sstables::component_type::TOC).native(), test(ssts_to_remove[2]).filename(sstables::component_type::TemporaryTOC).native()).get();
|
||||
remove_file(test(ssts_to_remove[2]).filename(sstables::component_type::Data).native()).get();
|
||||
|
||||
Reference in New Issue
Block a user