commitlog: Keep track of last flush high position to avoid double request

Apparent mismerge or something. We already have an unused "_flush_position",
intended to keep track of the last requested high rp.
Now actually update and use it. The latter to avoid sending requests for
segments/cf id:s we've already requested external flush of. Also enables
us to ensure we don't do double bookkeep here.
This commit is contained in:
Calle Wilund
2022-05-24 12:52:57 +00:00
parent c904b3cf35
commit 336383c87e

View File

@@ -1513,28 +1513,45 @@ void db::commitlog::segment_manager::flush_segments(uint64_t size_to_remove) {
high = replay_position(high.id + 1, 0);
}
auto n = size_to_remove;
// Now get a set of used CF ids:
std::unordered_set<cf_id_type> ids;
if (size_to_remove != 0) {
for (auto& s : _segments) {
if (n <= s->size_on_disk()) {
high = replay_position(s->_desc.id, db::position_type(s->size_on_disk()));
uint64_t n = size_to_remove;
uint64_t flushing = 0;
for (auto& s : _segments) {
// if a segment is allocating, it should be included in flush request,
// because we cannot free anything there anyway. If a segment is allocating,
// it is the last one, so just break.
if (s->is_still_allocating()) {
break;
}
auto rp = replay_position(s->_desc.id, db::position_type(s->size_on_disk()));
if (rp <= _flush_position) {
// already requested.
continue;
}
auto size = s->size_on_disk();
auto waste = s->_waste;
flushing += size - waste;
for (auto& id : s->_cf_dirty | boost::adaptors::map_keys) {
ids.insert(id);
}
if (size_to_remove != 0) {
if (n <= size) {
high = rp;
break;
}
n -= s->size_on_disk();
}
}
// Now get a set of used CF ids:
std::unordered_set<cf_id_type> ids;
auto e = std::find_if(_segments.begin(), _segments.end(), std::mem_fn(&segment::is_still_allocating));
std::for_each(_segments.begin(), e, [&ids](sseg_ptr& s) {
for (auto& id : s->_cf_dirty | boost::adaptors::map_keys) {
ids.insert(id);
}
});
clogger.debug("Flushing ({} MB) to {}", size_to_remove/(1024*1024), high);
clogger.debug("Flushing ({} MB) to {}", flushing/(1024*1024), high);
// For each CF id: for each callback c: call c(id, high)
for (auto& f : callbacks) {
@@ -1546,6 +1563,8 @@ void db::commitlog::segment_manager::flush_segments(uint64_t size_to_remove) {
}
}
}
_flush_position = high;
}
future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager::allocate_segment_ex(descriptor d, named_file f, open_flags flags) {