commitlog: coroutinize segment::do_flush

This commit is contained in:
Calle Wilund
2021-06-28 12:53:21 +00:00
parent 1a76d735f2
commit cef7ee2014

View File

@@ -719,29 +719,30 @@ public:
}
future<sseg_ptr> do_flush(uint64_t pos) {
auto me = shared_from_this();
return begin_flush().then([this, pos]() {
if (pos <= _flush_pos) {
clogger.trace("{} already synced! ({} < {})", *this, pos, _flush_pos);
return make_ready_future<>();
}
return _file.flush().then_wrapped([this, pos](future<> f) {
try {
f.get();
// TODO: retry/ignore/fail/stop - optional behaviour in origin.
// we fast-fail the whole commit.
_flush_pos = std::max(pos, _flush_pos);
++_segment_manager->totals.flush_count;
clogger.trace("{} synced to {}", *this, _flush_pos);
} catch (...) {
clogger.error("Failed to flush commits to disk: {}", std::current_exception());
throw;
}
});
}).finally([this] {
co_await begin_flush();
auto finally = defer([&] {
end_flush();
}).then([me] {
return make_ready_future<sseg_ptr>(me);
});
if (pos <= _flush_pos) {
clogger.trace("{} already synced! ({} < {})", *this, pos, _flush_pos);
co_return me;
}
try {
co_await _file.flush();
// TODO: retry/ignore/fail/stop - optional behaviour in origin.
// we fast-fail the whole commit.
_flush_pos = std::max(pos, _flush_pos);
++_segment_manager->totals.flush_count;
clogger.trace("{} synced to {}", *this, _flush_pos);
} catch (...) {
clogger.error("Failed to flush commits to disk: {}", std::current_exception());
throw;
}
co_return me;
}
/**