commitlog: fix replay order by using ordered map per shard

The commitlog replayer groups segments by shard using a
std::unordered_multimap, then iterates per-shard segments via
equal_range(). However, equal_range() does not guarantee iteration
order for elements with the same key, so segments could be replayed
out of order within a shard. This can increase memory and disk
consumption during fragmented entry reconstruction, which accumulates
fragments across segments and benefits from ascending ID order.

This is also required by the strongly consistent tables feature,
particularly commitlog-based storage that relies on replayed raft
items being stored in order.

Fix by changing the data structure from
  std::unordered_multimap<unsigned, commitlog::descriptor>
to
  std::unordered_map<unsigned, utils::chunked_vector<commitlog::descriptor>>

Since the descriptors are inserted from a std::set ordered by ID, the
vector preserves insertion (and thus ID) order. The per-shard iteration
now simply iterates the vector, guaranteeing correct replay order.

Fixes SCYLLADB-1411.
This commit is contained in:
bitpathfinder
2026-04-13 16:18:56 +02:00
parent bc10e1a171
commit c06adffd6a

View File

@@ -13,6 +13,7 @@
#include <algorithm>
#include <unordered_map>
#include <ranges>
#include "utils/chunked_vector.hh"
#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>
@@ -331,7 +332,7 @@ future<db::commitlog_replayer> db::commitlog_replayer::create_replayer(seastar::
}
future<> db::commitlog_replayer::recover(std::vector<sstring> files, sstring fname_prefix) {
using shard_file_map = std::unordered_multimap<unsigned, commitlog::descriptor>;
using shard_file_map = std::unordered_map<unsigned, utils::chunked_vector<commitlog::descriptor>>;
rlogger.info("Replaying {}", fmt::join(files, ", "));
@@ -349,7 +350,7 @@ future<> db::commitlog_replayer::recover(std::vector<sstring> files, sstring fna
for (auto& d : descs) {
replay_position p = d;
map.emplace(p.shard_id() % smp::count, std::move(d));
map[p.shard_id() % smp::count].push_back(std::move(d));
}
}
@@ -363,8 +364,11 @@ future<> db::commitlog_replayer::recover(std::vector<sstring> files, sstring fna
// TODO: or something. For now, we do this serialized per shard,
// to reduce mutation congestion. We could probably (says avi)
// do 2 segments in parallel or something, but lets use this first.
auto range = map.equal_range(id);
for (auto& [id, d] : std::ranges::subrange(range.first, range.second)) {
auto it = map.find(id);
if (it == map.end()) {
co_return total;
}
for (auto& d : it->second) {
auto f = d.filename();
rlogger.debug("Replaying {}", f);
auto stats = co_await _impl->recover(d, states[replay_position(d).shard_id()]);