diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index 0a798a5de7..c5f8dae381 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -1513,28 +1513,45 @@ void db::commitlog::segment_manager::flush_segments(uint64_t size_to_remove) { high = replay_position(high.id + 1, 0); } - auto n = size_to_remove; + // Now get a set of used CF ids: + std::unordered_set ids; - if (size_to_remove != 0) { - for (auto& s : _segments) { - if (n <= s->size_on_disk()) { - high = replay_position(s->_desc.id, db::position_type(s->size_on_disk())); + uint64_t n = size_to_remove; + uint64_t flushing = 0; + + for (auto& s : _segments) { + // if a segment is allocating, it should be included in flush request, + // because we cannot free anything there anyway. If a segment is allocating, + // it is the last one, so just break. + if (s->is_still_allocating()) { + break; + } + + auto rp = replay_position(s->_desc.id, db::position_type(s->size_on_disk())); + if (rp <= _flush_position) { + // already requested. + continue; + } + + auto size = s->size_on_disk(); + auto waste = s->_waste; + + flushing += size - waste; + + for (auto& id : s->_cf_dirty | boost::adaptors::map_keys) { + ids.insert(id); + } + + if (size_to_remove != 0) { + if (n <= size) { + high = rp; break; } n -= s->size_on_disk(); } } - // Now get a set of used CF ids: - std::unordered_set ids; - auto e = std::find_if(_segments.begin(), _segments.end(), std::mem_fn(&segment::is_still_allocating)); - std::for_each(_segments.begin(), e, [&ids](sseg_ptr& s) { - for (auto& id : s->_cf_dirty | boost::adaptors::map_keys) { - ids.insert(id); - } - }); - - clogger.debug("Flushing ({} MB) to {}", size_to_remove/(1024*1024), high); + clogger.debug("Flushing ({} MB) to {}", flushing/(1024*1024), high); // For each CF id: for each callback c: call c(id, high) for (auto& f : callbacks) { @@ -1546,6 +1563,8 @@ void db::commitlog::segment_manager::flush_segments(uint64_t size_to_remove) { } } } + + _flush_position = high; } future db::commitlog::segment_manager::allocate_segment_ex(descriptor d, named_file f, open_flags flags) {