commitlog: Remove entry CRC from file format
Since CRC is already handled at the disk block level, we can remove part of the per-entry CRC handling. This both simplifies the code and makes at least that part of the write and read paths faster.
This commit is contained in:
@@ -789,8 +789,8 @@ public:
|
||||
};
|
||||
friend std::ostream& operator<<(std::ostream&, const cf_mark&);
|
||||
|
||||
// The commit log entry overhead in bytes (int: length + int: head checksum + int: tail checksum)
|
||||
static constexpr size_t entry_overhead_size = 3 * sizeof(uint32_t);
|
||||
// The commit log entry overhead in bytes (int: length + int: head checksum)
|
||||
static constexpr size_t entry_overhead_size = 2 * sizeof(uint32_t);
|
||||
static constexpr size_t multi_entry_overhead_size = entry_overhead_size + sizeof(uint32_t);
|
||||
static constexpr size_t segment_overhead_size = 2 * sizeof(uint32_t);
|
||||
static constexpr size_t descriptor_header_size = 6 * sizeof(uint32_t);
|
||||
@@ -1280,8 +1280,6 @@ public:
|
||||
// size : uint32_t
|
||||
// crc1 : uint32_t - crc of magic, size
|
||||
// -> entries[]
|
||||
// post:
|
||||
// crc2 : uint32_t - crc1 + each entry crc.
|
||||
if (writer.num_entries > 1) {
|
||||
mecrc.emplace();
|
||||
write<uint32_t>(out, multi_entry_size_magic);
|
||||
@@ -1310,26 +1308,10 @@ public:
|
||||
|
||||
// actual data
|
||||
auto entry_out = out.write_substream(entry_size);
|
||||
auto entry_data = entry_out.to_input_stream();
|
||||
writer.write(*this, entry_out, entry);
|
||||
entry_data.with_stream([&] (auto data_str) {
|
||||
crc.process_fragmented(ser::buffer_view<detail::sector_split_iterator>(data_str));
|
||||
});
|
||||
|
||||
auto checksum = crc.checksum();
|
||||
write<uint32_t>(out, checksum);
|
||||
if (mecrc) {
|
||||
mecrc->process(checksum);
|
||||
}
|
||||
|
||||
writer.result(entry, std::move(h));
|
||||
}
|
||||
|
||||
if (mecrc) {
|
||||
// write the crc of header + all sub-entry crc
|
||||
write<uint32_t>(out, mecrc->checksum());
|
||||
}
|
||||
|
||||
++_segment_manager->totals.allocation_count;
|
||||
++_num_allocs;
|
||||
|
||||
@@ -2963,23 +2945,8 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f
|
||||
}
|
||||
}
|
||||
|
||||
using produce_func = std::function<future<>(buffer_and_replay_position, uint32_t)>;
|
||||
|
||||
future<> produce(buffer_and_replay_position bar) {
|
||||
try {
|
||||
co_await func(std::move(bar));
|
||||
} catch (...) {
|
||||
fail();
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
future<> read_entry() {
|
||||
return do_read_entry(std::bind(&work::produce, this, std::placeholders::_1));
|
||||
}
|
||||
|
||||
future<> do_read_entry(produce_func pf) {
|
||||
static constexpr size_t entry_header_size = segment::entry_overhead_size - sizeof(uint32_t);
|
||||
static constexpr size_t entry_header_size = segment::entry_overhead_size;
|
||||
|
||||
/**
|
||||
* #598 - Must check that data left in chunk is enough to even read an entry.
|
||||
@@ -3003,7 +2970,7 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f
|
||||
crc.process(size);
|
||||
|
||||
// check for multi-entry
|
||||
if (d.ver >= descriptor::segment_version_2 && size == segment::multi_entry_size_magic) {
|
||||
if (size == segment::multi_entry_size_magic) {
|
||||
auto actual_size = checksum;
|
||||
auto end = pos + actual_size - entry_header_size - sizeof(uint32_t);
|
||||
|
||||
@@ -3014,6 +2981,7 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f
|
||||
checksum = read<uint32_t>(in);
|
||||
|
||||
crc.process(actual_size);
|
||||
|
||||
// verify header crc.
|
||||
if (actual_size < 2 * segment::entry_overhead_size || crc.checksum() != checksum) {
|
||||
auto slack = next - pos;
|
||||
@@ -3025,38 +2993,19 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f
|
||||
co_return;
|
||||
}
|
||||
|
||||
std::vector<buffer_and_replay_position> tmp;
|
||||
tmp.reserve(10);
|
||||
// now read all sub-entries into buffers, and collect crc.
|
||||
// now read all sub-entries
|
||||
// and send data to subscriber.
|
||||
while (pos < end) {
|
||||
co_await do_read_entry([&](buffer_and_replay_position br, uint32_t checksum) -> future<> {
|
||||
tmp.emplace_back(std::move(br));
|
||||
crc.process(checksum);
|
||||
co_return;
|
||||
});
|
||||
}
|
||||
// and verify crc.
|
||||
buf = co_await read_data(sizeof(uint32_t));
|
||||
in = buf.get_istream();
|
||||
checksum = read<uint32_t>(in);
|
||||
|
||||
if (checksum != crc.checksum()) {
|
||||
clogger.debug("Segment entry at {} has broken header. Skipping to next chunk ({} bytes)", rp, actual_size);
|
||||
corrupt_size += actual_size;
|
||||
co_await skip_to_chunk(next);
|
||||
co_return;
|
||||
}
|
||||
// all is ok. send data to subscriber.
|
||||
for (auto&& br : tmp) {
|
||||
co_await produce(std::move(br));
|
||||
co_await read_entry();
|
||||
if (failed) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
co_return;
|
||||
}
|
||||
|
||||
if (size < 3 * sizeof(uint32_t) || checksum != crc.checksum()) {
|
||||
if (size < 2 * sizeof(uint32_t) || checksum != crc.checksum()) {
|
||||
auto slack = next - pos;
|
||||
if (size != 0) {
|
||||
clogger.debug("Segment entry at {} has broken header. Skipping to next chunk ({} bytes)", rp, slack);
|
||||
@@ -3069,24 +3018,7 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f
|
||||
|
||||
buf = co_await read_data(size - entry_header_size);
|
||||
|
||||
in = buf.get_istream();
|
||||
auto data_size = size - segment::entry_overhead_size;
|
||||
in.skip(data_size);
|
||||
checksum = read<uint32_t>(in);
|
||||
|
||||
buf.remove_suffix(buf.size_bytes() - data_size);
|
||||
crc.process_fragmented(fragmented_temporary_buffer::view(buf));
|
||||
|
||||
if (crc.checksum() != checksum) {
|
||||
// If we're getting a checksum error here, most likely the rest of
|
||||
// the file will be corrupt as well. But it does not hurt to retry.
|
||||
// Just go to the next entry (since "size" in header seemed ok).
|
||||
clogger.debug("Segment entry at {} checksum error. Skipping {} bytes", rp, size);
|
||||
corrupt_size += size;
|
||||
co_return;
|
||||
}
|
||||
|
||||
co_await pf({std::move(buf), rp}, checksum);
|
||||
co_await func({std::move(buf), rp});
|
||||
}
|
||||
|
||||
future<> read_file() {
|
||||
|
||||
Reference in New Issue
Block a user