commitlog: Use named named_file objects in delete/dispose/recycle lists

Changes delete/close queue, as well as deletetion queue into one, using
named_file objects + marker. Recycle list now also contains said named
file type.

This removes the need to re-eval file sizes on disk when deleting etc,
which in turn means we can dispose of recalculate_footprint on errors,
thus making things simpler and safer.
This commit is contained in:
Calle Wilund
2022-02-15 13:18:48 +00:00
parent cdd4066006
commit ed8f0df105

View File

@@ -15,6 +15,7 @@
#include <sys/stat.h>
#include <malloc.h>
#include <regex>
#include <filesystem>
#include <boost/range/adaptor/map.hpp>
#include <boost/range/adaptor/reversed.hpp>
#include <unordered_map>
@@ -300,9 +301,14 @@ public:
}
};
// Segments dropped while not clean may not be
// deleted. Marker enum to keep track of this.
enum class dispose_mode : char {
Delete, ForceDelete, Keep,
};
std::optional<shared_future<with_clock<db::timeout_clock>>> _segment_allocating;
std::unordered_map<sstring, descriptor> _files_to_delete;
std::vector<file> _files_to_close;
std::vector<std::pair<named_file, dispose_mode>> _files_to_dispose;
void account_memory_usage(size_t size) noexcept {
_request_controller.consume(size);
@@ -390,13 +396,10 @@ public:
future<> orphan_all();
void add_file_to_delete(sstring, descriptor);
void add_file_to_close(file);
void add_file_to_dispose(named_file, dispose_mode);
future<> do_pending_deletes();
future<> delete_segments(std::vector<sstring>);
future<> delete_file(const sstring&);
void discard_unused_segments();
void discard_completed_segments(const cf_id_type&);
@@ -437,14 +440,11 @@ private:
future<> clear_reserve_segments();
void abort_recycled_list(std::exception_ptr);
future<> recalculate_footprint();
future<> rename_file(sstring, sstring) const;
size_t max_request_controller_units() const;
segment_id_type _ids = 0;
std::vector<sseg_ptr> _segments;
queue<sseg_ptr> _reserve_segments;
queue<sstring> _recycled_segments;
queue<named_file> _recycled_segments;
std::unordered_map<flush_handler_id, flush_handler> _flush_handlers;
flush_handler_id _flush_ids = 0;
replay_position _flush_position;
@@ -455,7 +455,6 @@ private:
seastar::gate _gate;
uint64_t _new_counter = 0;
std::optional<size_t> _disk_write_alignment;
seastar::semaphore _reserve_recalculation_guard;
};
future<> db::commitlog::segment_manager::named_file::open(open_flags flags, file_open_options opt, std::optional<uint64_t> size_in) noexcept {
@@ -473,9 +472,16 @@ future<> db::commitlog::segment_manager::named_file::open(open_flags flags, file
future<> db::commitlog::segment_manager::named_file::rename(std::string_view to) {
assert(!*this);
auto s = sstring(to);
co_await seastar::rename_file(_name, s);
_name = std::move(s);
try {
auto s = sstring(to);
auto dir = std::filesystem::path(to).parent_path();
co_await seastar::rename_file(_name, s);
_name = std::move(s);
co_await seastar::sync_directory(dir.string());
} catch (...) {
commit_error_handler(std::current_exception());
throw;
}
}
template <typename CharType>
@@ -568,6 +574,7 @@ std::enable_if_t<std::is_fundamental<T>::value, T> read(Input& in) {
class db::commitlog::segment : public enable_shared_from_this<segment>, public cf_holder {
friend class rp_handle;
using named_file = segment_manager::named_file;
using dispose_mode = segment_manager::dispose_mode;
::shared_ptr<segment_manager> _segment_manager;
@@ -581,9 +588,6 @@ class db::commitlog::segment : public enable_shared_from_this<segment>, public c
size_t _alignment;
bool _closed = false;
// Not the same as _closed since files can be reused
bool _closed_file = false;
bool _terminated = false;
using buffer_type = segment_manager::buffer_type;
@@ -651,23 +655,23 @@ public:
clogger.debug("Created new segment {}", *this);
}
~segment() {
auto filename = _file.name();
dispose_mode mode = dispose_mode::Keep;
if (!_closed_file) {
_segment_manager->add_file_to_close(std::move(_file));
}
_segment_manager->totals.buffer_list_bytes -= _buffer.size_bytes();
if (is_clean()) {
clogger.debug("Segment {} is no longer active and will submitted for delete now", *this);
++_segment_manager->totals.segments_destroyed;
_segment_manager->totals.active_size_on_disk -= file_position();
_segment_manager->totals.wasted_size_on_disk -= _waste;
_segment_manager->add_file_to_delete(filename, _desc);
mode = dispose_mode::Delete;
} else if (_segment_manager->cfg.warn_about_segments_left_on_disk_after_shutdown) {
clogger.warn("Segment {} is dirty and is left on disk.", *this);
}
_segment_manager->totals.buffer_list_bytes -= _buffer.size_bytes();
if (mode != dispose_mode::Keep || _file) {
_segment_manager->add_file_to_dispose(std::move(_file), mode);
}
}
uint64_t size_on_disk() const noexcept {
@@ -738,7 +742,6 @@ public:
co_await _pending_ops.close();
co_await _file.truncate(_flush_pos);
co_await _file.close();
_closed_file = true;
if (p) {
co_return coroutine::exception(std::move(p));
@@ -1292,7 +1295,6 @@ db::commitlog::segment_manager::segment_manager(config c)
, _recycled_segments(std::numeric_limits<size_t>::max())
, _reserve_replenisher(make_ready_future<>())
, _background_sync(make_ready_future<>())
, _reserve_recalculation_guard(1)
{
assert(max_size > 0);
assert(max_mutation_size < segment::multi_entry_size_magic);
@@ -1318,11 +1320,6 @@ future<> db::commitlog::segment_manager::replenish_reserve() {
}
try {
gate::holder g(_gate);
auto guard = co_await get_units(_reserve_recalculation_guard, 1);
if (_reserve_segments.full()) {
// can happen if we recalculate
continue;
}
// note: if we were strict with disk size, we would refuse to do this
// unless disk footprint is lower than threshold. but we cannot (yet?)
// trust that flush logic will absolutely free up an existing
@@ -1619,23 +1616,13 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
co_await f.close();
}
if (ep) {
add_file_to_delete(f.name(), d);
add_file_to_dispose(std::move(f), dispose_mode::Delete);
co_return coroutine::exception(std::move(ep));
}
co_return make_shared<segment>(shared_from_this(), std::move(d), std::move(f), align);
}
future<> db::commitlog::segment_manager::rename_file(sstring from, sstring to) const {
try {
co_await seastar::rename_file(from, to);
co_await seastar::sync_directory(cfg.commit_log_location);
} catch (...) {
commit_error_handler(std::current_exception());
throw;
}
}
future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager::allocate_segment() {
for (;;) {
descriptor d(next_id(), cfg.fname_prefix);
@@ -1646,13 +1633,12 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
}
if (!_recycled_segments.empty()) {
auto src = _recycled_segments.pop();
auto f = _recycled_segments.pop();
// Note: we have to do the rename here to ensure
// proper descriptor id order. If we renamed in the delete call
// that recycled the file we could potentially have
// out-of-order files. (Sort does not help).
clogger.debug("Using recycled segment file {} -> {}", src, dst);
named_file f(src);
clogger.debug("Using recycled segment file {} -> {}", f.name(), dst);
co_await f.rename(dst);
co_return co_await allocate_segment_ex(std::move(d), std::move(f), flags);
}
@@ -1812,20 +1798,18 @@ future<> db::commitlog::segment_manager::clear_reserve_segments() {
_reserve_segments.pop();
}
std::vector<sstring> tmp;
tmp.reserve(_recycled_segments.size());
for (auto& [f, mode] : _files_to_dispose) {
if (mode == dispose_mode::Delete) {
mode = dispose_mode::ForceDelete;
}
}
_recycled_segments.consume([&](sstring s) {
tmp.emplace_back(std::move(s));
_recycled_segments.consume([&](named_file f) {
_files_to_dispose.emplace_back(std::move(f), dispose_mode::ForceDelete);
return true;
});
co_await parallel_for_each(tmp, [this](const sstring& filename) {
clogger.debug("Deleting recycled segment file {}", filename);
return delete_file(filename);
}).finally([this] {
return do_pending_deletes();
});
return do_pending_deletes();
}
future<> db::commitlog::segment_manager::sync_all_segments() {
@@ -1920,94 +1904,117 @@ future<> db::commitlog::segment_manager::shutdown() {
co_return co_await _shutdown_promise->get_shared_future();
}
void db::commitlog::segment_manager::add_file_to_delete(sstring filename, descriptor d) {
assert(!_files_to_delete.contains(filename));
_files_to_delete.emplace(std::move(filename), std::move(d));
}
void db::commitlog::segment_manager::add_file_to_close(file f) {
_files_to_close.emplace_back(std::move(f));
}
future<> db::commitlog::segment_manager::delete_file(const sstring& filename) {
clogger.debug("Deleting segment file {}", filename);
try {
auto size = co_await seastar::file_size(filename);
co_await seastar::remove_file(filename);
clogger.trace("Reclaimed {} MB", size/(1024*1024));
totals.total_size_on_disk -= size;
} catch (...) {
commit_error_handler(std::current_exception());
throw;
}
void db::commitlog::segment_manager::add_file_to_dispose(named_file f, dispose_mode mode) {
_files_to_dispose.emplace_back(std::move(f), mode);
}
future<> db::commitlog::segment_manager::delete_segments(std::vector<sstring> files) {
if (files.empty()) {
for (auto& s : files) {
// Note: this is only for replay files. We can decide to
// recycle these, but they don't count into footprint,
// thus unopened named_files are what we want (known_size == 0)
_files_to_dispose.emplace_back(s, dispose_mode::Delete);
}
return do_pending_deletes();
}
void db::commitlog::segment_manager::abort_recycled_list(std::exception_ptr ep) {
// may not call here with elements in list. that would leak files.
assert(_recycled_segments.empty());
_recycled_segments.abort(ep);
// and ensure next lap(s) still has a queue
_recycled_segments = queue<named_file>(std::numeric_limits<size_t>::max());
}
namespace db {
std::ostream& operator<<(std::ostream& os, const commitlog::segment_manager::named_file& f) {
return os << f.name() << " (" << f.known_size() << ")";
}
std::ostream& operator<<(std::ostream& os, commitlog::segment_manager::dispose_mode mode) {
using dispose_mode = db::commitlog::segment_manager::dispose_mode;
switch (mode) {
case dispose_mode::Delete: os << "Delete"; break;
case dispose_mode::ForceDelete: os << "Force Delete"; break;
case dispose_mode::Keep: os << "Keep"; break;
default: break;
}
return os;
}
std::ostream& operator<<(std::ostream& os, const std::pair<commitlog::segment_manager::named_file, db::commitlog::segment_manager::dispose_mode>& p) {
return os << p.first << " (" << p.second << ")";
}
}
future<> db::commitlog::segment_manager::do_pending_deletes() {
auto ftd = std::exchange(_files_to_dispose, {});
if (ftd.empty()) {
co_return;
}
clogger.debug("Delete segments {}", files);
std::exception_ptr recycle_error;
size_t num_deleted = 0;
bool except = false;
while (!files.empty()) {
auto filename = std::move(files.back());
files.pop_back();
auto exts = cfg.extensions;
clogger.debug("Discarding segments {}", ftd);
for (auto& [f, mode] : ftd) {
try {
auto exts = cfg.extensions;
if (f) {
co_await f.close();
}
// retain the file (replay...)
if (mode == dispose_mode::Keep) {
continue;
}
if (exts && !exts->commitlog_file_extensions().empty()) {
for (auto& ext : exts->commitlog_file_extensions()) {
co_await ext->before_delete(filename);
co_await ext->before_delete(f.name());
}
}
// We allow reuse of the segment if the current disk size is less than shard max.
{
auto usage = totals.total_size_on_disk;
auto recycle = usage <= max_disk_size;
auto size = f.known_size();
auto usage = totals.total_size_on_disk;
auto next_usage = usage - size;
// if total size is not a multiple of segment size, we need
// to check if we are the overlap segment, and noone else
// can be recycled. If so, let this one live so allocation
// can proceed. We assume/hope a future delete will kill
// files down to under the threshold, but we should expect
// to stomp around nearest multiple of segment size, not
// the actual limit.
if (!recycle && _recycled_segments.empty() && files.empty()) {
auto size = co_await seastar::file_size(filename);
recycle = (usage - size) <= max_disk_size;
}
if (next_usage <= max_disk_size && mode != dispose_mode::ForceDelete) {
descriptor d(next_id(), "Recycled-" + cfg.fname_prefix);
auto dst = this->filename(d);
if (recycle) {
descriptor d(next_id(), "Recycled-" + cfg.fname_prefix);
auto dst = this->filename(d);
clogger.debug("Recycling segment file {}", filename);
// must rename the file since we must ensure the
// data is not replayed. Changing the name will
// cause header ID to be invalid in the file -> ignored
try {
co_await rename_file(filename, dst);
auto b = _recycled_segments.push(std::move(dst));
assert(b); // we set this to max_size_t so...
continue;
} catch (...) {
recycle_error = std::current_exception();
// fallthrough
}
clogger.debug("Recycling segment file {}", f.name());
// must rename the file since we must ensure the
// data is not replayed. Changing the name will
// cause header ID to be invalid in the file -> ignored
try {
co_await f.rename(dst);
auto b = _recycled_segments.push(std::move(f));
assert(b); // we set this to max_size_t so...
continue;
} catch (...) {
clogger.error("Could not recycle segment {}: {}", f.name(), std::current_exception());
recycle_error = std::current_exception();
// fallthrough
}
}
// last resort.
co_await delete_file(filename);
co_await f.remove_file();
++num_deleted;
} catch (...) {
clogger.error("Could not delete segment {}: {}", filename, std::current_exception());
except = true;
clogger.error("Could not delete segment {}: {}", f.name(), std::current_exception());
}
// if we get here, we either successfully deleted the file,
// or had such an exception that we consider the file dead
// anyway. In either case we _remove_ the file size from
// footprint, because it is no longer our problem.
totals.total_size_on_disk -= f.known_size();
}
// #8376 - if we had an error in recycling (disk rename?), and no elements
@@ -2019,89 +2026,6 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector<sstring> fi
if (recycle_error && _recycled_segments.empty()) {
abort_recycled_list(recycle_error);
}
// #9348 - if we had an exception, we can't trust our bookeep any more. recalculate.
if (except) {
co_await recalculate_footprint();
}
}
void db::commitlog::segment_manager::abort_recycled_list(std::exception_ptr ep) {
// may not call here with elements in list. that would leak files.
assert(_recycled_segments.empty());
_recycled_segments.abort(ep);
// and ensure next lap(s) still has a queue
_recycled_segments = queue<sstring>(std::numeric_limits<size_t>::max());
}
future<> db::commitlog::segment_manager::recalculate_footprint() {
try {
co_await do_pending_deletes();
auto guard = co_await get_units(_reserve_recalculation_guard, 1);
auto segments_copy = _segments;
std::vector<sseg_ptr> reserves;
std::vector<sstring> recycles;
// this causes haywire things while we steal stuff, but...
while (!_reserve_segments.empty()) {
reserves.push_back(_reserve_segments.pop());
}
while (!_recycled_segments.empty()) {
recycles.push_back(_recycled_segments.pop());
}
// #9955 - must re-stock the queues before we do anything
// interruptable/continuation. Because both queues are
// used with push/pop eventually which _waits_ for signal
// but does _not_ verify that the condition is true once
// we return. So copy the objects and look at instead.
for (auto& filename : recycles) {
_recycled_segments.push(sstring(filename));
}
for (auto& s : reserves) {
_reserve_segments.push(sseg_ptr(s)); // you can have it back now.
}
// first, guesstimate sizes
uint64_t recycle_size = recycles.size() * max_size;
auto old = totals.total_size_on_disk;
totals.total_size_on_disk = recycle_size;
for (auto& s : _segments) {
totals.total_size_on_disk += s->size_on_disk();
}
for (auto& s : reserves) {
totals.total_size_on_disk += s->size_on_disk();
}
// now we need to adjust the actual sizes of recycled files
uint64_t actual_recycled_size = 0;
try {
for (auto& filename : recycles) {
auto s = co_await seastar::file_size(filename);
actual_recycled_size += s;
}
} catch (...) {
clogger.error("Exception reading disk footprint ({}).", std::current_exception());
actual_recycled_size = recycle_size; // best we got
}
totals.total_size_on_disk += actual_recycled_size - recycle_size;
// pushing things to reserve/recycled queues will have resumed any
// waiters, so we should be done.
} catch (...) {
clogger.error("Exception recalculating disk footprint ({}). Values might be off...", std::current_exception());
}
}
future<> db::commitlog::segment_manager::do_pending_deletes() {
auto ftc = std::exchange(_files_to_close, {});
auto ftd = std::exchange(_files_to_delete, {});
auto i = ftc.begin();
auto e = ftc.end();
co_await parallel_for_each(ftc, std::mem_fn(&file::close));
co_await delete_segments(boost::copy_range<std::vector<sstring>>(ftd | boost::adaptors::map_keys));
}
future<> db::commitlog::segment_manager::orphan_all() {