commitlog: Make segment allocation wait iff disk usage > max

Instead of allowing new segments to be added, explicitly wait
for either a disk deletion or a segment recycle to happen iff the
current disk usage is larger than the limit.
This commit is contained in:
Calle Wilund
2021-01-05 14:48:27 +00:00
parent ab55a1b4e6
commit be8c359a62

View File

@@ -233,6 +233,7 @@ public:
using request_controller_type = basic_semaphore<timeout_exception_factory, db::timeout_clock>;
using request_controller_units = semaphore_units<timeout_exception_factory, db::timeout_clock>;
request_controller_type _request_controller;
shared_promise<> _disk_deletions;
std::optional<shared_future<with_clock<db::timeout_clock>>> _segment_allocating;
std::unordered_map<sstring, descriptor> _files_to_delete;
@@ -372,7 +373,7 @@ private:
segment_id_type _ids = 0;
std::vector<sseg_ptr> _segments;
queue<sseg_ptr> _reserve_segments;
std::deque<sstring> _recycled_segments;
queue<sstring> _recycled_segments;
std::unordered_map<flush_handler_id, flush_handler> _flush_handlers;
flush_handler_id _flush_ids = 0;
replay_position _flush_position;
@@ -1060,6 +1061,7 @@ db::commitlog::segment_manager::segment_manager(config c)
// always be admitted for processing.
, _request_controller(max_request_controller_units(), request_controller_timeout_exception_factory{})
, _reserve_segments(1)
, _recycled_segments(std::numeric_limits<size_t>::max())
, _reserve_replenisher(make_ready_future<>())
{
assert(max_size > 0);
@@ -1396,8 +1398,7 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
}
if (!_recycled_segments.empty()) {
auto src = std::move(_recycled_segments.front());
_recycled_segments.pop_front();
auto src = _recycled_segments.pop();
// Note: we have to do the rename here to ensure
// proper descriptor id order. If we renamed in the delete call
// that recycled the file we could potentially have
@@ -1408,6 +1409,14 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
});
}
if (max_disk_size != 0 && totals.total_size_on_disk >= max_disk_size) {
clogger.debug("Disk usage ({} MB) exceeds maximum ({} MB) - allocation will wait...", totals.total_size_on_disk/(1024*1024), max_disk_size/(1024*1024));
auto f = cfg.reuse_segments ? _recycled_segments.not_empty() : _disk_deletions.get_shared_future();
return f.then([this] {
return allocate_segment();
});
}
return allocate_segment_ex(std::move(d), std::move(dst), flags|open_flags::create);
}
@@ -1533,14 +1542,21 @@ future<> db::commitlog::segment_manager::clear_reserve_segments() {
_reserve_segments.pop();
}
auto re = std::exchange(_recycled_segments, {});
auto i = re.begin();
auto e = re.end();
std::vector<sstring> tmp;
tmp.reserve(_recycled_segments.size());
_recycled_segments.consume([&](sstring s) {
tmp.emplace_back(std::move(s));
return true;
});
auto i = tmp.begin();
auto e = tmp.end();
return parallel_for_each(i, e, [this](const sstring& filename) {
clogger.debug("Deleting recycled segment file {}", filename);
return delete_file(filename);
}).finally([this, re = std::move(re)] {
}).finally([this, tmp = std::move(tmp)] {
return do_pending_deletes();
});
}
@@ -1609,7 +1625,10 @@ future<> db::commitlog::segment_manager::delete_file(const sstring& filename) {
return seastar::file_size(filename).then([this, filename](uint64_t size) {
clogger.debug("Deleting segment file {}", filename);
return commit_io_check(&seastar::remove_file, filename).then([this, size] {
totals.total_size_on_disk -= size;
clogger.trace("Reclaimed {} MB", size/(1024*1024));
totals.total_size_on_disk -= size;
auto p = std::exchange(_disk_deletions, {});
p.set_value();
});
});
}
@@ -1629,7 +1648,7 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector<sstring> fi
return f.finally([&] {
// We allow reuse of the segment if the current disk size is less than shard max.
auto usage = totals.total_size_on_disk;
if (!_shutdown && cfg.reuse_segments && usage <= max_disk_size) {
if (!_shutdown && cfg.reuse_segments) {
descriptor d(next_id(), "Recycled-" + cfg.fname_prefix);
auto dst = this->filename(d);
@@ -1637,8 +1656,9 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector<sstring> fi
// must rename the file since we must ensure the
// data is not replayed. Changing the name will
// cause header ID to be invalid in the file -> ignored
return rename_file(filename, dst).then([this, dst] {
_recycled_segments.emplace_back(dst);
return rename_file(filename, dst).then([this, dst]() mutable {
auto b = _recycled_segments.push(std::move(dst));
assert(b); // we set this to max_size_t so...
return make_ready_future<>();
}).handle_exception([this, filename](auto&&) {
return delete_file(filename);