Merge 'New commitlog file format using tagged pages' from Calle Wilund

Prototype implementation of format suggested/requested by @avikivity:

Divides segments into disk-write-alignment sized pages, each tagged with segment ID + CRC of data content.
When read, we both verify sector integrity (CRC) to detect corruption, as well as matching ID read with expected one.

If the latter mismatches we have a prematurely terminated segment (read truncation), which, depending on whether the CL is
written in batch or periodic mode, as well as explicit sync, can mean data loss.

Note: all-zero pages are treated as kosher, both to align with newly allocated segments, as well as fully terminated (zero-page) ones.

Note: This is a preview/RFC - the rest of the file format is not modified. At least parts of entry CRC could probably be removed, but I have not done so yet (needs some thinking).

Note: Some slight abstraction breaks in impl. and probably less than maximal efficiency.

v2:
* Removed entry CRC:s in file format.
* Added docs on format v3
* Added one more test for recycling-truncation

v3:
* Fixed typos in size calc and docs
* Changed sect metadata order
* Explicit iter type

Closes scylladb/scylladb#15494

* github.com:scylladb/scylladb:
  commitlog_test: Add test for replaying large-ish mutation
  commitlog_test: Add additional test for segmnent truncation
  docs: Add docs on commitlog format 3
  commitlog: Remove entry CRC from file format
  commitlog: Implement new format using CRC:ed sectors
  commitlog: Add iterator adaptor for doing buffer splitting into sub-page ranges
  fragmented_temporary_buffer: Add const iterator access to underlying buffers
  commitlog_replayer: differentiate between truncated file and corrupt entries
This commit is contained in:
Avi Kivity
2023-12-04 13:59:52 +02:00
committed by Kamil Braun
8 changed files with 658 additions and 135 deletions

View File

@@ -613,6 +613,12 @@ auto db::commitlog::segment_manager::named_file::remove_file() noexcept {
});
}
template<typename T>
static void write(db::commitlog::output& out, T value) {
auto v = net::hton(value);
out.write(reinterpret_cast<const char*>(&v), sizeof(v));
}
template<typename T>
static void write(fragmented_temporary_buffer::ostream& out, T value) {
auto v = net::hton(value);
@@ -624,6 +630,47 @@ std::enable_if_t<std::is_fundamental<T>::value, T> read(Input& in) {
return net::ntoh(in.template read<T>());
}
detail::sector_split_iterator::sector_split_iterator(const sector_split_iterator&) noexcept = default;
detail::sector_split_iterator::sector_split_iterator()
: _ptr(nullptr)
, _size(0)
, _sector_size(0)
{}
detail::sector_split_iterator::sector_split_iterator(base_iterator i, base_iterator e, size_t sector_size)
: _iter(i)
, _end(e)
, _ptr(i != e ? const_cast<char*>(i->get()) : nullptr)
, _size(i != e ? sector_size - sector_overhead_size : 0)
, _sector_size(sector_size)
{}
detail::sector_split_iterator& detail::sector_split_iterator::operator++() {
assert(_iter != _end);
_ptr += _sector_size;
// check if we have more pages in this temp-buffer (in out case they are always aligned + sized in page units)
auto rem = _iter->size() - std::distance(_iter->get(), const_cast<const char*>(_ptr));
if (rem == 0) {
if (++_iter == _end) {
_ptr = nullptr;
_size = 0;
return *this;
}
rem = _iter->size();
assert(rem >= _sector_size);
// booh. ugly.
_ptr = const_cast<char*>(_iter->get());
}
return *this;
}
detail::sector_split_iterator detail::sector_split_iterator::operator++(int) {
auto res = *this;
++(*this);
return res;
}
/*
* A single commit log file on disk. Manages creation of the file and writing mutations to disk,
* as well as tracking the last mutation position of any "dirty" CFs covered by the segment file. Segment
@@ -691,8 +738,11 @@ class db::commitlog::segment : public enable_shared_from_this<segment>, public c
using clock_type = segment_manager::clock_type;
using time_point = segment_manager::time_point;
using base_ostream_type = memory_output_stream<detail::sector_split_iterator>;
using frag_ostream_type = typename base_ostream_type::fragmented;
buffer_type _buffer;
fragmented_temporary_buffer::ostream _buffer_ostream;
base_ostream_type _buffer_ostream;
std::unordered_map<cf_id_type, uint64_t> _cf_dirty;
std::unordered_map<cf_id_type, gc_clock::time_point> _cf_min_time;
time_point _sync_time;
@@ -705,8 +755,18 @@ class db::commitlog::segment : public enable_shared_from_this<segment>, public c
friend std::ostream& operator<<(std::ostream&, const segment&);
friend class segment_manager;
size_t sector_overhead(size_t size) const {
return (size / (_alignment - detail::sector_overhead_size)) * detail::sector_overhead_size;
}
size_t buffer_position() const {
return _buffer.size_bytes() - _buffer_ostream.size();
// need some arithmetics to figure out what out actual position is, including
// page checksums etc. The ostream does not include this, as it is subdivided and
// skips sector_overhead parts of the memory buffer. So to get actual position
// in the buffer, we need to add it back.
auto size = _buffer_ostream.size();
size += sector_overhead(size);
return _buffer.size_bytes() - size;
}
future<> begin_flush() {
@@ -729,11 +789,11 @@ public:
};
friend std::ostream& operator<<(std::ostream&, const cf_mark&);
// The commit log entry overhead in bytes (int: length + int: head checksum + int: tail checksum)
static constexpr size_t entry_overhead_size = 3 * sizeof(uint32_t);
// The commit log entry overhead in bytes (int: length + int: head checksum)
static constexpr size_t entry_overhead_size = 2 * sizeof(uint32_t);
static constexpr size_t multi_entry_overhead_size = entry_overhead_size + sizeof(uint32_t);
static constexpr size_t segment_overhead_size = 2 * sizeof(uint32_t);
static constexpr size_t descriptor_header_size = 5 * sizeof(uint32_t);
static constexpr size_t descriptor_header_size = 6 * sizeof(uint32_t);
static constexpr uint32_t segment_magic = ('S'<<24) |('C'<< 16) | ('L' << 8) | 'C';
static constexpr uint32_t multi_entry_size_magic = 0xffffffff;
@@ -938,11 +998,21 @@ public:
overhead += descriptor_header_size;
}
auto a = align_up(s + overhead, _alignment);
s += overhead;
// add bookkeep data reqs.
s += sector_overhead(s);
auto a = align_up(s, _alignment);
auto k = std::max(a, default_size);
_buffer = _segment_manager->acquire_buffer(k, _alignment);
_buffer_ostream = _buffer.get_ostream();
auto size = _buffer.size_bytes();
auto n_blocks = size / _alignment;
// the amount of data we can actually write into.
auto useable_size = size - n_blocks * detail::sector_overhead_size;
_buffer_ostream = frag_ostream_type(detail::sector_split_iterator(_buffer.begin(), _buffer.end(), _alignment), useable_size);
auto out = _buffer_ostream.write_substream(overhead);
out.fill('\0', overhead);
_segment_manager->totals.buffer_list_bytes += _buffer.size_bytes();
@@ -987,10 +1057,12 @@ public:
write(out, segment_magic);
write(out, _desc.ver);
write(out, _desc.id);
write(out, uint32_t(_alignment));
crc32_nbo crc;
crc.process(_desc.ver);
crc.process<int32_t>(_desc.id & 0xffffffff);
crc.process<int32_t>(_desc.id >> 32);
crc.process<uint32_t>(uint32_t(_alignment));
write(out, crc.checksum());
header_size = descriptor_header_size;
}
@@ -1015,6 +1087,32 @@ public:
write(out, uint64_t(0));
}
buf.remove_suffix(buf.size_bytes() - size);
// Build sector checksums.
auto id = net::hton(_desc.id);
auto ss = _alignment - detail::sector_overhead_size;
for (auto& tbuf : buf) {
auto* p = const_cast<char*>(tbuf.get());
auto* e = p + tbuf.size();
while (p != e) {
assert(align_up(p, _alignment) == p);
// include segment id in crc:ed data
auto be = p + ss;
be = std::copy_n(reinterpret_cast<char*>(&id), sizeof(id), be);
crc32_nbo crc;
crc.process_bytes(p, std::distance(p, be));
auto checksum = crc.checksum();
auto v = net::hton(checksum);
// write checksum.
p = std::copy_n(reinterpret_cast<char*>(&v), sizeof(v), be);
}
}
replay_position rp(_desc.id, position_type(off));
// The write will be allowed to start now, but flush (below) must wait for not only this,
@@ -1182,8 +1280,6 @@ public:
// size : uint32_t
// crc1 : uint32_t - crc of magic, size
// -> entries[]
// post:
// crc2 : uint32_t - crc1 + each entry crc.
if (writer.num_entries > 1) {
mecrc.emplace();
write<uint32_t>(out, multi_entry_size_magic);
@@ -1212,26 +1308,10 @@ public:
// actual data
auto entry_out = out.write_substream(entry_size);
auto entry_data = entry_out.to_input_stream();
writer.write(*this, entry_out, entry);
entry_data.with_stream([&] (auto data_str) {
crc.process_fragmented(ser::buffer_view<typename std::vector<temporary_buffer<char>>::iterator>(data_str));
});
auto checksum = crc.checksum();
write<uint32_t>(out, checksum);
if (mecrc) {
mecrc->process(checksum);
}
writer.result(entry, std::move(h));
}
if (mecrc) {
// write the crc of header + all sub-entry crc
write<uint32_t>(out, mecrc->checksum());
}
++_segment_manager->totals.allocation_count;
++_num_allocs;
@@ -2587,6 +2667,20 @@ const db::commitlog::config& db::commitlog::active_config() const {
return _segment_manager->cfg;
}
db::commitlog::segment_truncation::segment_truncation(uint64_t pos)
: _msg(fmt::format("Segment truncation at {}", pos))
, _pos(pos)
{}
uint64_t db::commitlog::segment_truncation::position() const {
return _pos;
}
const char* db::commitlog::segment_truncation::what() const noexcept {
return _msg.c_str();
}
// No commit_io_check needed in the log reader since the database will fail
// on error at startup if required
future<>
@@ -2611,44 +2705,52 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f
size_t start_off = 0;
size_t file_size = 0;
size_t corrupt_size = 0;
size_t alignment = 0;
bool eof = false;
bool header = true;
bool failed = false;
fragmented_temporary_buffer::reader frag_reader;
fragmented_temporary_buffer buffer, initial;
work(file f, descriptor din, commit_load_reader_func fn, position_type o = 0)
: f(f), d(din), func(std::move(fn)), fin(make_file_input_stream(f, 0, make_file_input_stream_options())), start_off(o) {
}
work(work&&) = default;
bool advance(const fragmented_temporary_buffer& buf) {
pos += buf.size_bytes();
if (buf.size_bytes() == 0) {
eof = true;
}
return !eof;
}
bool end_of_file() const {
return eof;
}
bool end_of_chunk() const {
return eof || next == pos;
}
future<> skip(size_t bytes) {
auto n = std::min(file_size - pos, bytes);
pos += n;
if (pos == file_size) {
future<> skip_to_chunk(size_t seek_to_pos) {
if (seek_to_pos >= file_size) {
eof = true;
pos = file_size;
co_return;
}
if (n < bytes) {
// if we are trying to skip past end, we have at least
// the bytes skipped or the source from where we read
// this corrupt. So add at least four bytes. This is
// inexact, but adding the full "bytes" is equally wrong
// since it could be complete garbled junk.
corrupt_size += std::max(n, sizeof(uint32_t));
auto bytes = seek_to_pos - pos;
auto rem = buffer.size_bytes();
if (bytes < rem) {
buffer.remove_suffix(bytes);
rem -= bytes;
pos += bytes;
co_return;
}
buffer = {};
pos += rem;
bytes = seek_to_pos - pos;
auto skip_bytes = align_down(bytes, alignment);
pos += skip_bytes;
co_await fin.skip(skip_bytes);
if (bytes > skip_bytes) {
// must crc check if we read into a sector
co_await read_data(bytes - skip_bytes);
}
return fin.skip(n);
}
void stop() {
eof = true;
@@ -2659,15 +2761,18 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f
}
future<> read_header() {
fragmented_temporary_buffer buf = co_await frag_reader.read_exactly(fin, segment::descriptor_header_size);
if (!advance(buf)) {
// zero length file. accept it just to be nice.
if (buf.empty()) {
eof = true;
co_return;
}
// Will throw if we got eof
auto in = buf.get_istream();
auto magic = read<uint32_t>(in);
auto ver = read<uint32_t>(in);
auto id = read<uint64_t>(in);
auto alignment = read<uint32_t>(in);
auto checksum = read<uint32_t>(in);
if (magic == 0 && ver == 0 && id == 0 && checksum == 0) {
@@ -2685,10 +2790,15 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f
if (magic != segment::segment_magic) {
throw invalid_segment_format();
}
if (ver != descriptor::current_version) {
throw std::invalid_argument("Cannot replay old commitlog segments");
}
crc32_nbo crc;
crc.process(ver);
crc.process<int32_t>(id & 0xffffffff);
crc.process<int32_t>(id >> 32);
crc.process<uint32_t>(alignment);
auto cs = crc.checksum();
if (cs != checksum) {
@@ -2697,14 +2807,115 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f
this->id = id;
this->next = 0;
this->alignment = alignment;
this->initial = std::move(buf);
this->pos = this->initial.size_bytes();
}
future<> read_chunk() {
fragmented_temporary_buffer buf = co_await frag_reader.read_exactly(fin, segment::segment_overhead_size); auto start = pos;
if (!advance(buf)) {
co_return;
future<fragmented_temporary_buffer> read_data(size_t size) {
auto rem = buffer.size_bytes();
auto buf_vec = std::move(buffer).release();
auto block_boundry = align_up(pos - initial.size_bytes(), alignment);
while (rem < size) {
if (eof) {
throw segment_truncation(block_boundry);
}
auto block_size = alignment - initial.size_bytes();
// using a stream is perhaps not 100% effective, but we need to
// potentially address data in pages smaller than the current
// disk/fs we are reading from can handle (but please no).
auto tmp = co_await frag_reader.read_exactly(fin, block_size);
if (tmp.size_bytes() == 0) {
eof = true;
throw segment_truncation(block_boundry);
}
crc32_nbo crc;
// crc all but the final crc
size_t n = block_size - sizeof(uint32_t);
bool all_zero = true;
if (!initial.empty()) {
for (auto& bv : initial) {
crc.process_bytes(bv.get(), bv.size());
}
initial = {};
all_zero = false;
}
for (auto& bv : tmp) {
auto np = std::min(bv.size(), n);
crc.process_bytes(bv.get(), np);
all_zero &= std::all_of(bv.get(), bv.get() + bv.size(), [](char c) { return c == 0; });
n -= np;
}
block_boundry += alignment;
if (!all_zero) {
auto in = tmp.get_istream();
in.skip(block_size - detail::sector_overhead_size);
auto id = read<uint64_t>(in);
auto check = read<uint32_t>(in);
auto checksum = crc.checksum();
if (check != checksum) {
throw segment_data_corruption_error("Data corruption", alignment);
}
if (id != this->id) {
throw segment_truncation(pos + rem);
}
}
tmp.remove_suffix(detail::sector_overhead_size);
rem += tmp.size_bytes();
pos += detail::sector_overhead_size;
auto vec2 = std::move(tmp).release();
for (auto&& v : vec2) {
buf_vec.emplace_back(std::move(v));
}
}
decltype(buf_vec) next;
auto bytes_to_leave = rem - size;
auto i = next.end();
while (bytes_to_leave > 0) {
auto tmp = std::move(buf_vec.back());
buf_vec.pop_back();
auto s = tmp.size();
if (s <= bytes_to_leave) {
i = next.emplace(i, std::move(tmp));
} else {
auto diff = s - bytes_to_leave;
auto b1 = tmp.share(0, diff);
auto b2 = tmp.share(diff, bytes_to_leave);
buf_vec.emplace_back(std::move(b1));
i = next.emplace(i, std::move(b2));
}
bytes_to_leave -= i->size();
}
// this is the remaining buffer now.
buffer = fragmented_temporary_buffer(std::move(next), rem - size);
// this is the returned result.
auto res = fragmented_temporary_buffer(std::move(buf_vec), size);
pos += size;
co_return res;
}
future<> read_chunk() {
auto start = pos;
auto buf = co_await read_data(segment::segment_overhead_size);
auto in = buf.get_istream();
auto next = read<uint32_t>(in);
auto checksum = read<uint32_t>(in);
@@ -2733,7 +2944,7 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f
this->next = next;
if (start_off >= next) {
co_return co_await skip(next - pos);
co_return co_await skip_to_chunk(next);
}
while (!end_of_chunk()) {
@@ -2741,23 +2952,8 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f
}
}
using produce_func = std::function<future<>(buffer_and_replay_position, uint32_t)>;
future<> produce(buffer_and_replay_position bar) {
try {
co_await func(std::move(bar));
} catch (...) {
fail();
throw;
}
}
future<> read_entry() {
return do_read_entry(std::bind(&work::produce, this, std::placeholders::_1));
}
future<> do_read_entry(produce_func pf) {
static constexpr size_t entry_header_size = segment::entry_overhead_size - sizeof(uint32_t);
static constexpr size_t entry_header_size = segment::entry_overhead_size;
/**
* #598 - Must check that data left in chunk is enough to even read an entry.
@@ -2766,18 +2962,13 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f
*/
assert(pos <= next);
if ((pos + entry_header_size) >= next) {
co_await skip(next - pos);
co_await skip_to_chunk(next);
co_return;
}
auto buf = co_await frag_reader.read_exactly(fin, entry_header_size);
replay_position rp(id, position_type(pos));
if (!advance(buf)) {
co_return;
}
auto buf = co_await read_data(entry_header_size);
auto in = buf.get_istream();
auto size = read<uint32_t>(in);
auto checksum = read<uint32_t>(in);
@@ -2786,18 +2977,18 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f
crc.process(size);
// check for multi-entry
if (d.ver >= descriptor::segment_version_2 && size == segment::multi_entry_size_magic) {
if (size == segment::multi_entry_size_magic) {
auto actual_size = checksum;
auto end = pos + actual_size - entry_header_size - sizeof(uint32_t);
assert(end <= next);
// really small read...
buf = co_await frag_reader.read_exactly(fin, sizeof(uint32_t));
buf = co_await read_data(sizeof(uint32_t));
in = buf.get_istream();
checksum = read<uint32_t>(in);
advance(buf);
crc.process(actual_size);
// verify header crc.
if (actual_size < 2 * segment::entry_overhead_size || crc.checksum() != checksum) {
auto slack = next - pos;
@@ -2805,77 +2996,36 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f
clogger.debug("Segment entry at {} has broken header. Skipping to next chunk ({} bytes)", rp, slack);
corrupt_size += slack;
}
co_await skip(slack);
co_await skip_to_chunk(next);
co_return;
}
std::vector<buffer_and_replay_position> tmp;
tmp.reserve(10);
// now read all sub-entries into buffers, and collect crc.
// now read all sub-entries
// and send data to subscriber.
while (pos < end) {
co_await do_read_entry([&](buffer_and_replay_position br, uint32_t checksum) -> future<> {
tmp.emplace_back(std::move(br));
crc.process(checksum);
co_return;
});
}
// and verify crc.
buf = co_await frag_reader.read_exactly(fin, sizeof(uint32_t));
in = buf.get_istream();
checksum = read<uint32_t>(in);
advance(buf);
if (checksum != crc.checksum()) {
auto slack = next - pos;
clogger.debug("Segment entry at {} has broken header. Skipping to next chunk ({} bytes)", rp, actual_size);
corrupt_size += actual_size;
co_await skip(slack);
co_return;
}
// all is ok. send data to subscriber.
for (auto&& br : tmp) {
co_await produce(std::move(br));
co_await read_entry();
if (failed) {
break;
}
}
co_return;
}
if (size < 3 * sizeof(uint32_t) || checksum != crc.checksum()) {
if (size < 2 * sizeof(uint32_t) || checksum != crc.checksum()) {
auto slack = next - pos;
if (size != 0) {
clogger.debug("Segment entry at {} has broken header. Skipping to next chunk ({} bytes)", rp, slack);
corrupt_size += slack;
}
// size == 0 -> special scylla case: zero padding due to dma blocks
co_await skip(slack);
co_await skip_to_chunk(next);
co_return;
}
buf = co_await frag_reader.read_exactly(fin, size - entry_header_size);
buf = co_await read_data(size - entry_header_size);
advance(buf);
in = buf.get_istream();
auto data_size = size - segment::entry_overhead_size;
in.skip(data_size);
checksum = read<uint32_t>(in);
buf.remove_suffix(buf.size_bytes() - data_size);
crc.process_fragmented(fragmented_temporary_buffer::view(buf));
if (crc.checksum() != checksum) {
// If we're getting a checksum error here, most likely the rest of
// the file will be corrupt as well. But it does not hurt to retry.
// Just go to the next entry (since "size" in header seemed ok).
clogger.debug("Segment entry at {} checksum error. Skipping {} bytes", rp, size);
corrupt_size += size;
co_return;
}
co_await pf({std::move(buf), rp}, checksum);
co_await func({std::move(buf), rp});
}
future<> read_file() {

View File

@@ -134,10 +134,12 @@ public:
static inline constexpr uint32_t segment_version_1 = 1u;
static inline constexpr uint32_t segment_version_2 = 2u;
static inline constexpr uint32_t segment_version_3 = 4u;
static inline constexpr uint32_t current_version = segment_version_3;
descriptor(descriptor&&) noexcept = default;
descriptor(const descriptor&) = default;
descriptor(segment_id_type i, const std::string& fname_prefix, uint32_t v = segment_version_2, sstring = {});
descriptor(segment_id_type i, const std::string& fname_prefix, uint32_t v = current_version, sstring = {});
descriptor(replay_position p, const std::string& fname_prefix = FILENAME_PREFIX);
descriptor(const std::string& filename, const std::string& fname_prefix = FILENAME_PREFIX);
@@ -171,7 +173,7 @@ public:
* of data to be written. (See add).
* Don't write less, absolutely don't write more...
*/
using output = fragmented_temporary_buffer::ostream;
using output = typename seastar::memory_output_stream<detail::sector_split_iterator>;
using serializer_func = std::function<void(output&)>;
/**
@@ -373,7 +375,7 @@ public:
uint64_t bytes() const {
return _bytes;
}
virtual const char* what() const noexcept {
const char* what() const noexcept override {
return _msg.c_str();
}
private:
@@ -383,7 +385,7 @@ public:
class invalid_segment_format : public segment_error {
static constexpr const char* _msg = "Not a scylla format commitlog file";
public:
virtual const char* what() const noexcept {
const char* what() const noexcept override {
return _msg;
}
};
@@ -391,11 +393,21 @@ public:
class header_checksum_error : public segment_error {
static constexpr const char* _msg = "Checksum error in file header";
public:
virtual const char* what() const noexcept {
const char* what() const noexcept override {
return _msg;
}
};
class segment_truncation : public segment_error {
std::string _msg;
uint64_t _pos;
public:
segment_truncation(uint64_t);
uint64_t position() const;
const char* what() const noexcept override;
};
static future<> read_log_file(sstring filename, sstring prefix, commit_load_reader_func, position_type = 0, const db::extensions* = nullptr);
private:
commitlog(config);

View File

@@ -30,7 +30,7 @@ void commitlog_entry_writer::compute_size() {
_size = ms.size();
}
void commitlog_entry_writer::write(typename seastar::memory_output_stream<std::vector<temporary_buffer<char>>::iterator>& out) const {
void commitlog_entry_writer::write(ostream& out) const {
serialize(out);
}

View File

@@ -13,6 +13,62 @@
#include "commitlog_types.hh"
#include "mutation/frozen_mutation.hh"
#include "schema/schema_fwd.hh"
#include "replay_position.hh"
namespace detail {
using buffer_type = fragmented_temporary_buffer;
using base_iterator = typename std::vector<temporary_buffer<char>>::const_iterator;
static constexpr auto sector_overhead_size = sizeof(uint32_t) + sizeof(db::segment_id_type);
// iterator adaptor to enable splitting normal
// frag-buffer temporary buffer objects into
// sub-disk-page sized chunks.
class sector_split_iterator {
base_iterator _iter, _end;
char* _ptr;
size_t _size;
size_t _sector_size;
public:
sector_split_iterator(const sector_split_iterator&) noexcept;
sector_split_iterator(base_iterator i, base_iterator e, size_t sector_size);
sector_split_iterator();
char* get_write() const {
return _ptr;
}
size_t size() const {
return _size;
}
char* begin() {
return _ptr;
}
char* end() {
return _ptr + _size;
}
const char* begin() const {
return _ptr;
}
const char* end() const {
return _ptr + _size;
}
bool operator==(const sector_split_iterator& rhs) const {
return _iter == rhs._iter && _ptr == rhs._ptr;
}
auto& operator*() const {
return *this;
}
auto* operator->() const {
return this;
}
sector_split_iterator& operator++();
sector_split_iterator operator++(int);
};
}
class commitlog_entry {
std::optional<column_mapping> _mapping;
@@ -65,7 +121,10 @@ public:
force_sync sync() const {
return _sync;
}
void write(typename seastar::memory_output_stream<std::vector<temporary_buffer<char>>::iterator>& out) const;
using ostream = typename seastar::memory_output_stream<detail::sector_split_iterator>;
void write(ostream& out) const;
};
class commitlog_entry_reader {

View File

@@ -56,6 +56,7 @@ public:
uint64_t skipped_mutations = 0;
uint64_t applied_mutations = 0;
uint64_t corrupt_bytes = 0;
uint64_t truncated_at = 0;
stats& operator+=(const stats& s) {
invalid_mutations += s.invalid_mutations;
@@ -180,6 +181,8 @@ db::commitlog_replayer::impl::recover(sstring file, const sstring& fname_prefix)
f.get();
} catch (commitlog::segment_data_corruption_error& e) {
s->corrupt_bytes += e.bytes();
} catch (commitlog::segment_truncation& e) {
s->truncated_at = e.position();
} catch (...) {
throw;
}
@@ -330,6 +333,9 @@ future<> db::commitlog_replayer::recover(std::vector<sstring> files, sstring fna
if (stats.corrupt_bytes != 0) {
rlogger.warn("Corrupted file: {}. {} bytes skipped.", f, stats.corrupt_bytes);
}
if (stats.truncated_at != 0) {
rlogger.warn("Truncated file: {} at position {}.", f, stats.truncated_at);
}
rlogger.debug("Log replay of {} complete, {} replayed mutations ({} invalid, {} skipped)"
, f
, stats.applied_mutations

View File

@@ -62,4 +62,55 @@ Version 2
size : size of all entries in this multi-entry + headers
crc : CRC32 of magic, size
<entries> * N
crc2 : CRC32 of magic, size and data in each entry
```
Version 3
---------
Modified from v2 to improve error detection/false positive elimination.
This version does CRC per written disk block instead of actual entry.
Every block is also tagged with which file is being written, this to
be able to better distinguish new data from that left over from recycling
(reusing old files) and actual disk/file corruption.
```
Segment file header
magic : uint32_t - ('S'<<24) |('C'<< 16) | ('L' << 8) | 'C';
version : uint32_t - same as descriptor
id : uint64_t - same as descriptor
alignment : uint32_t - disk block size
crc : uint32_t - CRC32 of version, low 32 of id, high 32 of id, alignment.
Chunk header
file_pos : uint32_t - the file position of next chunk
crc : uint32_t - CRC32 of low 32 of segment id, high 32 of id and file offset of end of this header.
Entry
size : uint32_t - size of entry (data + full headers). Must be smaller than MAX_UINT32.
crc : uint32_t - CRC32 of size
data : bytes - actual entry data
Multi-entry
magic : multi marker - 0xffffffff (MAX_UINT32)
size : size of all entries in this multi-entry + headers
crc : CRC32 of magic, size
<entries> * N
Disk block (block size = `alignment`)
0 - <bs - 12> : Interleaved file data, i.e. the content above
<bs - 12> : uint64_t - same as descriptor.
<bs - 4> : uint32_t - CRC32 of block data up until crc (bs - 4), including segment id
The main benefit of the tagged and CRC:ed block is that if the CRC is broken, we know the part
of this file is corrupt. If the CRC is correct, but segment ID does not match, we can assume
the file is not fully written/prematurely ended. Both cases can mean data loss, depending on
how writing is done as well as OS and hardware.
```

View File

@@ -492,7 +492,8 @@ SEASTAR_TEST_CASE(test_commitlog_chunk_corruption){
auto segments = log.get_active_segment_names();
BOOST_REQUIRE(!segments.empty());
auto seg = segments[0];
return corrupt_segment(seg, rps->at(0).pos - 4, 0x451234ab).then([seg, rps] {
auto cpos = rps->at(0).pos - 4;
return corrupt_segment(seg, cpos, 0x451234ab).then([seg, rps] {
return db::commitlog::read_log_file(seg, db::commitlog::descriptor::FILENAME_PREFIX, [rps](db::commitlog::buffer_and_replay_position buf_rp) {
BOOST_FAIL("Should not reach");
return make_ready_future<>();
@@ -510,6 +511,241 @@ SEASTAR_TEST_CASE(test_commitlog_chunk_corruption){
});
}
SEASTAR_TEST_CASE(test_commitlog_chunk_corruption2){
commitlog::config cfg;
cfg.commitlog_segment_size_in_mb = 1;
return cl_test(cfg, [](commitlog& log) {
auto rps = make_lw_shared<std::vector<db::replay_position>>();
// write enough entries to fill more than one chunk
return do_until([rps]() {return rps->size() > (128*1024/8);},
[&log, rps]() {
auto uuid = make_table_id();
sstring tmp = "hej bubba cow";
return log.add_mutation(uuid, tmp.size(), db::commitlog::force_sync::no, [tmp](db::commitlog::output& dst) {
dst.write(tmp.data(), tmp.size());
}).then([rps](rp_handle h) {
BOOST_CHECK_NE(h.rp(), db::replay_position());
rps->push_back(h.release());
});
}).then([&log]() {
return log.sync_all_segments();
}).then([&log, rps] {
auto segments = log.get_active_segment_names();
BOOST_REQUIRE(!segments.empty());
auto seg = segments[0];
auto desc = commitlog::descriptor(seg);
auto e = std::find_if(rps->begin(), rps->end(), [desc](auto& rp) {
return rp.id != desc.id;
});
auto idx = std::distance(rps->begin(), e);
auto cpos = rps->at(idx/2).pos;
return corrupt_segment(seg, cpos, 0x451234ab).then([seg, rps, cpos] {
return db::commitlog::read_log_file(seg, db::commitlog::descriptor::FILENAME_PREFIX, [rps, cpos](db::commitlog::buffer_and_replay_position buf_rp) {
BOOST_CHECK_NE(buf_rp.position.pos, cpos);
return make_ready_future<>();
}).then_wrapped([](auto&& f) {
try {
f.get();
BOOST_FAIL("Expected exception");
} catch (commitlog::segment_data_corruption_error& e) {
// ok.
BOOST_REQUIRE(e.bytes() > 0);
}
});
});
});
});
}
SEASTAR_TEST_CASE(test_commitlog_chunk_corruption3){
commitlog::config cfg;
cfg.commitlog_segment_size_in_mb = 1;
cfg.commitlog_total_space_in_mb = 2 * smp::count;
cfg.allow_going_over_size_limit = false;
return cl_test(cfg, [](commitlog& log) -> future<> {
auto uuid = make_table_id();
sstring tmp = "hej bubba cow";
std::optional<db::segment_id_type> last;
db::replay_position last_rp;
int n = 0;
for (;;) {
auto h = co_await log.add_mutation(uuid, tmp.size(), db::commitlog::force_sync::no, [&](db::commitlog::output& dst) {
dst.write(tmp.data(), tmp.size());
});
BOOST_CHECK_NE(h.rp(), db::replay_position());
// release data immediately
auto rp = h.rp();
auto id = rp.id;
if (last && last != id) {
// this should mean we are in a recycled segment.
if (++n > 3) {
last_rp = h.release(); // prevent removal for now.
break;
}
}
last = id;
}
co_await log.sync_all_segments();
auto segments = log.get_active_segment_names();
BOOST_REQUIRE(!segments.empty());
for (auto& seg : segments) {
auto desc = commitlog::descriptor(seg);
if (desc.id == last_rp.id) {
try {
co_await db::commitlog::read_log_file(seg, db::commitlog::descriptor::FILENAME_PREFIX, [&](db::commitlog::buffer_and_replay_position buf_rp) {
BOOST_CHECK_LE(buf_rp.position, last_rp);
return make_ready_future<>();
});
BOOST_FAIL("Expected exception");
} catch (commitlog::segment_truncation& e) {
// ok.
}
co_return;
}
}
BOOST_FAIL("Did not find segment");
});
}
SEASTAR_TEST_CASE(test_commitlog_replay_single_large_mutation){
commitlog::config cfg;
cfg.commitlog_segment_size_in_mb = 4;
cfg.commitlog_total_space_in_mb = 2 * cfg.commitlog_segment_size_in_mb * smp::count;
cfg.allow_going_over_size_limit = false;
return cl_test(cfg, [](commitlog& log) -> future<> {
auto uuid = make_table_id();
auto size = log.max_record_size();
auto buf = fragmented_temporary_buffer::allocate_to_fit(size);
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<char> dist;
auto out = buf.get_ostream();
for (size_t i = 0; i < size; ++i) {
auto c = dist(gen);
out.write(&c, 1);
}
auto h = co_await log.add_mutation(uuid, size, db::commitlog::force_sync::no, [&](db::commitlog::output& dst) {
for (auto& tmp : buf) {
dst.write(tmp.get(), tmp.size());
}
});
auto rp = h.release();
co_await log.sync_all_segments();
auto segments = log.get_active_segment_names();
BOOST_REQUIRE(!segments.empty());
for (auto& seg : segments) {
auto desc = commitlog::descriptor(seg);
if (desc.id == rp.id) {
co_await db::commitlog::read_log_file(seg, db::commitlog::descriptor::FILENAME_PREFIX, [&](db::commitlog::buffer_and_replay_position buf_rp) {
BOOST_CHECK_EQUAL(buf_rp.position, rp);
auto& rp_buf = buf_rp.buffer;
auto in1 = buf.get_istream();
auto in2 = rp_buf.get_istream();
for (size_t i = 0; i < size; ++i) {
auto c1 = in1.read<char>();
auto c2 = in2.read<char>();
BOOST_CHECK_EQUAL(c1, c2);
}
return make_ready_future<>();
});
co_return;
}
}
BOOST_FAIL("Did not find segment");
});
}
/**
* Checks same thing as above, but will also ensure the seek mechanism in
* replayer is working, since we will span multiple chunks.
*/
SEASTAR_TEST_CASE(test_commitlog_replay_large_mutations){
commitlog::config cfg;
cfg.commitlog_segment_size_in_mb = 14;
cfg.commitlog_total_space_in_mb = 2 * cfg.commitlog_segment_size_in_mb * smp::count;
cfg.allow_going_over_size_limit = false;
return cl_test(cfg, [](commitlog& log) -> future<> {
auto uuid = make_table_id();
auto size = log.max_record_size() / 2;
std::unordered_map<replay_position, fragmented_temporary_buffer> buffers;
for (size_t i = 0; i < 4; ++i) {
auto buf = fragmented_temporary_buffer::allocate_to_fit(size);
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<char> dist;
auto out = buf.get_ostream();
for (size_t i = 0; i < size; ++i) {
auto c = dist(gen);
out.write(&c, 1);
}
auto h = co_await log.add_mutation(uuid, size, db::commitlog::force_sync::no, [&](db::commitlog::output& dst) {
for (auto& tmp : buf) {
dst.write(tmp.get(), tmp.size());
}
});
auto rp = h.release();
buffers.emplace(rp, std::move(buf));
}
co_await log.sync_all_segments();
auto segments = log.get_active_segment_names();
BOOST_REQUIRE(!segments.empty());
size_t n = 0;
for (auto& seg : segments) {
auto desc = commitlog::descriptor(seg);
if (std::any_of(buffers.begin(), buffers.end(), [&](auto& p) { return p.first.id == desc.id; })) {
co_await db::commitlog::read_log_file(seg, db::commitlog::descriptor::FILENAME_PREFIX, [&](db::commitlog::buffer_and_replay_position buf_rp) {
auto& buf = buffers.at(buf_rp.position);
auto& rp_buf = buf_rp.buffer;
auto in1 = buf.get_istream();
auto in2 = rp_buf.get_istream();
for (size_t i = 0; i < size; ++i) {
auto c1 = in1.read<char>();
auto c2 = in2.read<char>();
BOOST_CHECK_EQUAL(c1, c2);
}
++n;
return make_ready_future<>();
});
}
}
BOOST_CHECK_EQUAL(n, buffers.size());
});
}
SEASTAR_TEST_CASE(test_commitlog_reader_produce_exception){
commitlog::config cfg;
cfg.commitlog_segment_size_in_mb = 1;

View File

@@ -60,6 +60,15 @@ public:
return ostream::simple(reinterpret_cast<char*>(current.get_write()), current.size());
}
using const_fragment_iterator = typename vector_type::const_iterator;
const_fragment_iterator begin() const {
return _fragments.begin();
}
const_fragment_iterator end() const {
return _fragments.end();
}
size_t size_bytes() const { return _size_bytes; }
bool empty() const { return !_size_bytes; }