From 862f4f2ed30e7d86b017f3ce8c3ca2a0f52ebb54 Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Tue, 19 Sep 2023 08:39:18 +0000 Subject: [PATCH 1/8] commitlog_replayer: differentiate between truncated file and corrupt entries Refs #11845 When replaying, differentiate between the two cases for failure we have: - A broken actual entry - i.e. entry header/data does not hold up to crc scrutiny - Truncated file - i.e. a chunk header is broken or unreadable. This can be due to either "corruption" (i.e. borked write, post-corruption, hw whatever), or simply an unterminated segment. The difference is that the former is recoverable, the latter is not. We now signal and report the two separately. The end result for a user is not much different, in either case they imply data loss and the need for repair. But there is some value in differentiating which of the two we encountered. Modifies and adds test cases. --- db/commitlog/commitlog.cc | 21 ++++++++++++ db/commitlog/commitlog.hh | 16 ++++++++-- db/commitlog/commitlog_replayer.cc | 6 ++++ test/boost/commitlog_test.cc | 51 +++++++++++++++++++++++++++++- 4 files changed, 90 insertions(+), 4 deletions(-) diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index c9f0edfbc6..f0c00d8ce7 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -2580,6 +2580,20 @@ const db::commitlog::config& db::commitlog::active_config() const { return _segment_manager->cfg; } + +db::commitlog::segment_truncation::segment_truncation(uint64_t pos) + : _msg(fmt::format("Segment truncation at {}", pos)) + , _pos(pos) +{} + +uint64_t db::commitlog::segment_truncation::position() const { + return _pos; +} + +const char* db::commitlog::segment_truncation::what() const noexcept { + return _msg.c_str(); +} + // No commit_io_check needed in the log reader since the database will fail // on error at startup if required future<> @@ -2718,6 +2732,13 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, 
commit_load_reader_f // if a chunk header checksum is broken, we shall just assume that all // remaining is as well. We cannot trust the "next" pointer, so... clogger.debug("Checksum error in segment chunk at {}.", start); + + if (corrupt_size == 0) { + // if we got here and had no broken data previously, we can + // just call it truncation + throw segment_truncation(pos - segment::segment_overhead_size); + } + corrupt_size += (file_size - pos); stop(); co_return; diff --git a/db/commitlog/commitlog.hh b/db/commitlog/commitlog.hh index e62b0e06f6..55a53ad68c 100644 --- a/db/commitlog/commitlog.hh +++ b/db/commitlog/commitlog.hh @@ -373,7 +373,7 @@ public: uint64_t bytes() const { return _bytes; } - virtual const char* what() const noexcept { + const char* what() const noexcept override { return _msg.c_str(); } private: @@ -383,7 +383,7 @@ public: class invalid_segment_format : public segment_error { static constexpr const char* _msg = "Not a scylla format commitlog file"; public: - virtual const char* what() const noexcept { + const char* what() const noexcept override { return _msg; } }; @@ -391,11 +391,21 @@ public: class header_checksum_error : public segment_error { static constexpr const char* _msg = "Checksum error in file header"; public: - virtual const char* what() const noexcept { + const char* what() const noexcept override { return _msg; } }; + class segment_truncation : public segment_error { + std::string _msg; + uint64_t _pos; + public: + segment_truncation(uint64_t); + + uint64_t position() const; + const char* what() const noexcept override; + }; + static future<> read_log_file(sstring filename, sstring prefix, commit_load_reader_func, position_type = 0, const db::extensions* = nullptr); private: commitlog(config); diff --git a/db/commitlog/commitlog_replayer.cc b/db/commitlog/commitlog_replayer.cc index bf8c84d1bc..be94e54301 100644 --- a/db/commitlog/commitlog_replayer.cc +++ b/db/commitlog/commitlog_replayer.cc @@ -56,6 +56,7 @@ public: uint64_t 
skipped_mutations = 0; uint64_t applied_mutations = 0; uint64_t corrupt_bytes = 0; + uint64_t truncated_at = 0; stats& operator+=(const stats& s) { invalid_mutations += s.invalid_mutations; @@ -180,6 +181,8 @@ db::commitlog_replayer::impl::recover(sstring file, const sstring& fname_prefix) f.get(); } catch (commitlog::segment_data_corruption_error& e) { s->corrupt_bytes += e.bytes(); + } catch (commitlog::segment_truncation& e) { + s->truncated_at = e.position(); } catch (...) { throw; } @@ -330,6 +333,9 @@ future<> db::commitlog_replayer::recover(std::vector files, sstring fna if (stats.corrupt_bytes != 0) { rlogger.warn("Corrupted file: {}. {} bytes skipped.", f, stats.corrupt_bytes); } + if (stats.truncated_at != 0) { + rlogger.warn("Truncated file: {} at position {}.", f, stats.truncated_at); + } rlogger.debug("Log replay of {} complete, {} replayed mutations ({} invalid, {} skipped)" , f , stats.applied_mutations diff --git a/test/boost/commitlog_test.cc b/test/boost/commitlog_test.cc index 542fddc293..f9b4152f08 100644 --- a/test/boost/commitlog_test.cc +++ b/test/boost/commitlog_test.cc @@ -492,10 +492,59 @@ SEASTAR_TEST_CASE(test_commitlog_chunk_corruption){ auto segments = log.get_active_segment_names(); BOOST_REQUIRE(!segments.empty()); auto seg = segments[0]; - return corrupt_segment(seg, rps->at(0).pos - 4, 0x451234ab).then([seg, rps] { + auto cpos = rps->at(0).pos - 4; + return corrupt_segment(seg, cpos, 0x451234ab).then([seg, rps, cpos] { return db::commitlog::read_log_file(seg, db::commitlog::descriptor::FILENAME_PREFIX, [rps](db::commitlog::buffer_and_replay_position buf_rp) { BOOST_FAIL("Should not reach"); return make_ready_future<>(); + }).then_wrapped([cpos](auto&& f) { + try { + f.get(); + BOOST_FAIL("Expected exception"); + } catch (commitlog::segment_truncation& e) { + // ok. We've destroyed the first chunk of the segment. 
This counts as a truncation + BOOST_REQUIRE(e.position() <= cpos); + } + }); + }); + }); + }); +} + + +SEASTAR_TEST_CASE(test_commitlog_chunk_corruption2){ + commitlog::config cfg; + cfg.commitlog_segment_size_in_mb = 1; + return cl_test(cfg, [](commitlog& log) { + auto rps = make_lw_shared>(); + // write enough entries to fill more than one chunk + return do_until([rps]() {return rps->size() > (128*1024/8);}, + [&log, rps]() { + auto uuid = make_table_id(); + sstring tmp = "hej bubba cow"; + return log.add_mutation(uuid, tmp.size(), db::commitlog::force_sync::no, [tmp](db::commitlog::output& dst) { + dst.write(tmp.data(), tmp.size()); + }).then([rps](rp_handle h) { + BOOST_CHECK_NE(h.rp(), db::replay_position()); + rps->push_back(h.release()); + }); + }).then([&log]() { + return log.sync_all_segments(); + }).then([&log, rps] { + auto segments = log.get_active_segment_names(); + BOOST_REQUIRE(!segments.empty()); + auto seg = segments[0]; + auto desc = commitlog::descriptor(seg); + auto e = std::find_if(rps->begin(), rps->end(), [desc](auto& rp) { + return rp.id != desc.id; + }); + auto idx = std::distance(rps->begin(), e); + auto cpos = rps->at(idx/2).pos; + + return corrupt_segment(seg, cpos, 0x451234ab).then([seg, rps, cpos] { + return db::commitlog::read_log_file(seg, db::commitlog::descriptor::FILENAME_PREFIX, [rps, cpos](db::commitlog::buffer_and_replay_position buf_rp) { + BOOST_CHECK_NE(buf_rp.position.pos, cpos); + return make_ready_future<>(); }).then_wrapped([](auto&& f) { try { f.get(); From 560364d2789aa590c814145771b97c348715677a Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Wed, 20 Sep 2023 13:51:07 +0000 Subject: [PATCH 2/8] fragmented_temporary_buffer: Add const iterator access to underlying buffers Breaks abstraction a bit, but some (me) might need something like it... 
--- utils/fragmented_temporary_buffer.hh | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/utils/fragmented_temporary_buffer.hh b/utils/fragmented_temporary_buffer.hh index 98eabb1ea4..70c8bdf951 100644 --- a/utils/fragmented_temporary_buffer.hh +++ b/utils/fragmented_temporary_buffer.hh @@ -60,6 +60,15 @@ public: return ostream::simple(reinterpret_cast(current.get_write()), current.size()); } + using const_fragment_iterator = typename vector_type::const_iterator; + + const_fragment_iterator begin() const { + return _fragments.begin(); + } + const_fragment_iterator end() const { + return _fragments.end(); + } + size_t size_bytes() const { return _size_bytes; } bool empty() const { return !_size_bytes; } From 18e79d730eb136ea625e21fe4e8f30b4b5a0422b Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Wed, 20 Sep 2023 13:44:16 +0000 Subject: [PATCH 3/8] commitlog: Add iterator adaptor for doing buffer splitting into sub-page ranges With somewhat less overhead than creating 100+ temporary_buffer proxies --- db/commitlog/commitlog.cc | 41 ++++++++++++++++++++++++ db/commitlog/commitlog_entry.hh | 56 +++++++++++++++++++++++++++++++++ 2 files changed, 97 insertions(+) diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index f0c00d8ce7..84eb6ed0d2 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -624,6 +624,47 @@ std::enable_if_t::value, T> read(Input& in) { return net::ntoh(in.template read()); } +detail::sector_split_iterator::sector_split_iterator(const sector_split_iterator&) noexcept = default; + +detail::sector_split_iterator::sector_split_iterator() + : _ptr(nullptr) + , _size(0) + , _sector_size(0) +{} + +detail::sector_split_iterator::sector_split_iterator(base_iterator i, base_iterator e, size_t sector_size) + : _iter(i) + , _end(e) + , _ptr(i != e ? const_cast(i->get()) : nullptr) + , _size(i != e ? 
sector_size - sector_overhead_size : 0) + , _sector_size(sector_size) +{} + +detail::sector_split_iterator& detail::sector_split_iterator::operator++() { + assert(_iter != _end); + _ptr += _sector_size; + // check if we have more pages in this temp-buffer (in our case they are always aligned + sized in page units) + auto rem = _iter->size() - std::distance(_iter->get(), const_cast(_ptr)); + if (rem == 0) { + if (++_iter == _end) { + _ptr = nullptr; + _size = 0; + return *this; + } + rem = _iter->size(); + assert(rem >= _sector_size); + // booh. ugly. + _ptr = const_cast(_iter->get()); + } + return *this; +} + +detail::sector_split_iterator detail::sector_split_iterator::operator++(int) { + auto res = *this; + ++(*this); + return res; +} + /* * A single commit log file on disk. Manages creation of the file and writing mutations to disk, * as well as tracking the last mutation position of any "dirty" CFs covered by the segment file. Segment diff --git a/db/commitlog/commitlog_entry.hh b/db/commitlog/commitlog_entry.hh index b3c994935b..3ee5716c80 100644 --- a/db/commitlog/commitlog_entry.hh +++ b/db/commitlog/commitlog_entry.hh @@ -13,6 +13,62 @@ #include "commitlog_types.hh" #include "mutation/frozen_mutation.hh" #include "schema/schema_fwd.hh" +#include "replay_position.hh" + +namespace detail { + + using buffer_type = fragmented_temporary_buffer; + using base_iterator = typename std::vector>::const_iterator; + + static constexpr auto sector_overhead_size = sizeof(uint32_t) + sizeof(db::segment_id_type); + + // iterator adaptor to enable splitting normal + // frag-buffer temporary buffer objects into + // sub-disk-page sized chunks. 
+ class sector_split_iterator { + base_iterator _iter, _end; + char* _ptr; + size_t _size; + size_t _sector_size; + public: + sector_split_iterator(const sector_split_iterator&) noexcept; + sector_split_iterator(base_iterator i, base_iterator e, size_t sector_size); + sector_split_iterator(); + + char* get_write() const { + return _ptr; + } + size_t size() const { + return _size; + } + char* begin() { + return _ptr; + } + char* end() { + return _ptr + _size; + } + const char* begin() const { + return _ptr; + } + const char* end() const { + return _ptr + _size; + } + + bool operator==(const sector_split_iterator& rhs) const { + return _iter == rhs._iter && _ptr == rhs._ptr; + } + + auto& operator*() const { + return *this; + } + auto* operator->() const { + return this; + } + + sector_split_iterator& operator++(); + sector_split_iterator operator++(int); + }; +} class commitlog_entry { std::optional _mapping; From e29bf6f9e82305923fe44839a156b0de8af60e44 Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Wed, 20 Sep 2023 13:46:15 +0000 Subject: [PATCH 4/8] commitlog: Implement new format using CRC:ed sectors Breaks the file into individually tagged + crc:ed pages. Each page (sized as disk write alignment) gets 12 bytes of trailing metadata, including CRC of the first page-12 bytes, and the ID of the segment being written. When reading, each page read is CRC:ed and checked to be part of the expected segment by comparing ID:s. If crc is broken, we have broken data. If crc is ok, but ID does not match, we have a prematurely terminated segment (truncated), which, depending on whether we use batch mode or not, implies data loss. 
--- db/commitlog/commitlog.cc | 272 +++++++++++++++++++++++++------- db/commitlog/commitlog.hh | 6 +- db/commitlog/commitlog_entry.cc | 2 +- db/commitlog/commitlog_entry.hh | 5 +- test/boost/commitlog_test.cc | 10 +- 5 files changed, 228 insertions(+), 67 deletions(-) diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index 84eb6ed0d2..8339bbac99 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -613,6 +613,12 @@ auto db::commitlog::segment_manager::named_file::remove_file() noexcept { }); } +template +static void write(db::commitlog::output& out, T value) { + auto v = net::hton(value); + out.write(reinterpret_cast(&v), sizeof(v)); +} + template static void write(fragmented_temporary_buffer::ostream& out, T value) { auto v = net::hton(value); @@ -732,8 +738,11 @@ class db::commitlog::segment : public enable_shared_from_this, public c using clock_type = segment_manager::clock_type; using time_point = segment_manager::time_point; + using base_ostream_type = memory_output_stream; + using frag_ostream_type = typename base_ostream_type::fragmented; + buffer_type _buffer; - fragmented_temporary_buffer::ostream _buffer_ostream; + base_ostream_type _buffer_ostream; std::unordered_map _cf_dirty; std::unordered_map _cf_min_time; time_point _sync_time; @@ -746,8 +755,18 @@ class db::commitlog::segment : public enable_shared_from_this, public c friend std::ostream& operator<<(std::ostream&, const segment&); friend class segment_manager; + size_t sector_overhead(size_t size) const { + return (size / (_alignment - detail::sector_overhead_size)) * detail::sector_overhead_size; + } + size_t buffer_position() const { - return _buffer.size_bytes() - _buffer_ostream.size(); + // need some arithmetic to figure out what our actual position is, including + // page checksums etc. The ostream does not include this, as it is subdivided and + // skips sector_overhead parts of the memory buffer.
So to get actual position + // in the buffer, we need to add it back. + auto size = _buffer_ostream.size(); + size += sector_overhead(size); + return _buffer.size_bytes() - size; } future<> begin_flush() { @@ -774,7 +793,7 @@ public: static constexpr size_t entry_overhead_size = 3 * sizeof(uint32_t); static constexpr size_t multi_entry_overhead_size = entry_overhead_size + sizeof(uint32_t); static constexpr size_t segment_overhead_size = 2 * sizeof(uint32_t); - static constexpr size_t descriptor_header_size = 5 * sizeof(uint32_t); + static constexpr size_t descriptor_header_size = 6 * sizeof(uint32_t); static constexpr uint32_t segment_magic = ('S'<<24) |('C'<< 16) | ('L' << 8) | 'C'; static constexpr uint32_t multi_entry_size_magic = 0xffffffff; @@ -979,11 +998,21 @@ public: overhead += descriptor_header_size; } - auto a = align_up(s + overhead, _alignment); + s += overhead; + // add bookkeep data reqs. + s += sector_overhead(s); + + auto a = align_up(s, _alignment); auto k = std::max(a, default_size); _buffer = _segment_manager->acquire_buffer(k, _alignment); - _buffer_ostream = _buffer.get_ostream(); + auto size = _buffer.size_bytes(); + auto n_blocks = size / _alignment; + // the amount of data we can actually write into. 
+ auto useable_size = size - n_blocks * detail::sector_overhead_size; + + _buffer_ostream = frag_ostream_type(detail::sector_split_iterator(_buffer.begin(), _buffer.end(), _alignment), useable_size); + auto out = _buffer_ostream.write_substream(overhead); out.fill('\0', overhead); _segment_manager->totals.buffer_list_bytes += _buffer.size_bytes(); @@ -1028,10 +1057,12 @@ public: write(out, segment_magic); write(out, _desc.ver); write(out, _desc.id); + write(out, uint32_t(_alignment)); crc32_nbo crc; crc.process(_desc.ver); crc.process(_desc.id & 0xffffffff); crc.process(_desc.id >> 32); + crc.process(uint32_t(_alignment)); write(out, crc.checksum()); header_size = descriptor_header_size; } @@ -1056,6 +1087,32 @@ public: write(out, uint64_t(0)); } + buf.remove_suffix(buf.size_bytes() - size); + + // Build sector checksums. + auto id = net::hton(_desc.id); + auto ss = _alignment - detail::sector_overhead_size; + + for (auto& tbuf : buf) { + auto* p = const_cast(tbuf.get()); + auto* e = p + tbuf.size(); + while (p != e) { + assert(align_up(p, _alignment) == p); + + // include segment id in crc:ed data + auto be = p + ss; + be = std::copy_n(reinterpret_cast(&id), sizeof(id), be); + + crc32_nbo crc; + crc.process_bytes(p, std::distance(p, be)); + + auto checksum = crc.checksum(); + auto v = net::hton(checksum); + // write checksum. 
+ p = std::copy_n(reinterpret_cast(&v), sizeof(v), be); + } + } + replay_position rp(_desc.id, position_type(off)); // The write will be allowed to start now, but flush (below) must wait for not only this, @@ -1256,7 +1313,7 @@ public: auto entry_data = entry_out.to_input_stream(); writer.write(*this, entry_out, entry); entry_data.with_stream([&] (auto data_str) { - crc.process_fragmented(ser::buffer_view>::iterator>(data_str)); + crc.process_fragmented(ser::buffer_view(data_str)); }); auto checksum = crc.checksum(); @@ -2659,44 +2716,52 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f size_t start_off = 0; size_t file_size = 0; size_t corrupt_size = 0; + size_t alignment = 0; bool eof = false; bool header = true; bool failed = false; fragmented_temporary_buffer::reader frag_reader; + fragmented_temporary_buffer buffer, initial; work(file f, descriptor din, commit_load_reader_func fn, position_type o = 0) : f(f), d(din), func(std::move(fn)), fin(make_file_input_stream(f, 0, make_file_input_stream_options())), start_off(o) { } work(work&&) = default; - bool advance(const fragmented_temporary_buffer& buf) { - pos += buf.size_bytes(); - if (buf.size_bytes() == 0) { - eof = true; - } - return !eof; - } bool end_of_file() const { return eof; } bool end_of_chunk() const { return eof || next == pos; } - future<> skip(size_t bytes) { - auto n = std::min(file_size - pos, bytes); - pos += n; - if (pos == file_size) { + future<> skip_to_chunk(size_t seek_to_pos) { + if (seek_to_pos >= file_size) { eof = true; + pos = file_size; + co_return; } - if (n < bytes) { - // if we are trying to skip past end, we have at least - // the bytes skipped or the source from where we read - // this corrupt. So add at least four bytes. This is - // inexact, but adding the full "bytes" is equally wrong - // since it could be complete garbled junk. 
- corrupt_size += std::max(n, sizeof(uint32_t)); + + auto bytes = seek_to_pos - pos; + auto rem = buffer.size_bytes(); + if (bytes < rem) { + buffer.remove_suffix(bytes); + rem -= bytes; + pos += bytes; + co_return; + } + + buffer = {}; + pos += rem; + bytes = seek_to_pos - pos; + + auto skip_bytes = align_down(bytes, alignment); + pos += skip_bytes; + + co_await fin.skip(skip_bytes); + if (bytes > skip_bytes) { + // must crc check if we read into a sector + co_await read_data(bytes - skip_bytes); } - return fin.skip(n); } void stop() { eof = true; @@ -2707,15 +2772,18 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f } future<> read_header() { fragmented_temporary_buffer buf = co_await frag_reader.read_exactly(fin, segment::descriptor_header_size); - if (!advance(buf)) { - // zero length file. accept it just to be nice. + + if (buf.empty()) { + eof = true; co_return; } + // Will throw if we got eof auto in = buf.get_istream(); auto magic = read(in); auto ver = read(in); auto id = read(in); + auto alignment = read(in); auto checksum = read(in); if (magic == 0 && ver == 0 && id == 0 && checksum == 0) { @@ -2733,10 +2801,15 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f if (magic != segment::segment_magic) { throw invalid_segment_format(); } + if (ver != descriptor::current_version) { + throw std::invalid_argument("Cannot replay old commitlog segments"); + } + crc32_nbo crc; crc.process(ver); crc.process(id & 0xffffffff); crc.process(id >> 32); + crc.process(alignment); auto cs = crc.checksum(); if (cs != checksum) { @@ -2745,14 +2818,115 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f this->id = id; this->next = 0; + this->alignment = alignment; + this->initial = std::move(buf); + this->pos = this->initial.size_bytes(); } - future<> read_chunk() { - fragmented_temporary_buffer buf = co_await frag_reader.read_exactly(fin, segment::segment_overhead_size); auto 
start = pos; - if (!advance(buf)) { - co_return; + future read_data(size_t size) { + auto rem = buffer.size_bytes(); + auto buf_vec = std::move(buffer).release(); + auto block_boundry = align_up(pos - initial.size_bytes(), alignment); + + while (rem < size) { + if (eof) { + throw segment_truncation(block_boundry); + } + + auto block_size = alignment - initial.size_bytes(); + // using a stream is perhaps not 100% effective, but we need to + // potentially address data in pages smaller than the current + // disk/fs we are reading from can handle (but please no). + auto tmp = co_await frag_reader.read_exactly(fin, block_size); + + if (tmp.size_bytes() == 0) { + eof = true; + throw segment_truncation(block_boundry); + } + + crc32_nbo crc; + // crc all but the final crc + size_t n = block_size - sizeof(uint32_t); + + bool all_zero = true; + + if (!initial.empty()) { + for (auto& bv : initial) { + crc.process_bytes(bv.get(), bv.size()); + } + initial = {}; + all_zero = false; + } + + for (auto& bv : tmp) { + auto np = std::min(bv.size(), n); + crc.process_bytes(bv.get(), np); + all_zero &= std::all_of(bv.get(), bv.get() + bv.size(), [](char c) { return c == 0; }); + n -= np; + } + + block_boundry += alignment; + + if (!all_zero) { + auto in = tmp.get_istream(); + in.skip(block_size - detail::sector_overhead_size); + + auto id = read(in); + auto check = read(in); + auto checksum = crc.checksum(); + + if (check != checksum) { + throw segment_data_corruption_error("Data corruption", alignment); + } + if (id != this->id) { + throw segment_truncation(pos + rem); + } + } + tmp.remove_suffix(detail::sector_overhead_size); + + rem += tmp.size_bytes(); + pos += detail::sector_overhead_size; + + auto vec2 = std::move(tmp).release(); + for (auto&& v : vec2) { + buf_vec.emplace_back(std::move(v)); + } } + decltype(buf_vec) next; + + auto bytes_to_leave = rem - size; + + auto i = next.end(); + while (bytes_to_leave > 0) { + auto tmp = std::move(buf_vec.back()); + buf_vec.pop_back(); 
+ auto s = tmp.size(); + if (s <= bytes_to_leave) { + i = next.emplace(i, std::move(tmp)); + } else { + auto diff = s - bytes_to_leave; + auto b1 = tmp.share(0, diff); + auto b2 = tmp.share(diff, bytes_to_leave); + buf_vec.emplace_back(std::move(b1)); + i = next.emplace(i, std::move(b2)); + } + bytes_to_leave -= i->size(); + } + + // this is the remaining buffer now. + buffer = fragmented_temporary_buffer(std::move(next), rem - size); + // this is the returned result. + auto res = fragmented_temporary_buffer(std::move(buf_vec), size); + + pos += size; + + co_return res; + } + + future<> read_chunk() { + auto start = pos; + auto buf = co_await read_data(segment::segment_overhead_size); auto in = buf.get_istream(); auto next = read(in); auto checksum = read(in); @@ -2773,13 +2947,6 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f // if a chunk header checksum is broken, we shall just assume that all // remaining is as well. We cannot trust the "next" pointer, so... 
clogger.debug("Checksum error in segment chunk at {}.", start); - - if (corrupt_size == 0) { - // if we got here and had no broken data previously, we can - // just call it truncation - throw segment_truncation(pos - segment::segment_overhead_size); - } - corrupt_size += (file_size - pos); stop(); co_return; @@ -2788,7 +2955,7 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f this->next = next; if (start_off >= next) { - co_return co_await skip(next - pos); + co_return co_await skip_to_chunk(next); } while (!end_of_chunk()) { @@ -2821,18 +2988,13 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f */ assert(pos <= next); if ((pos + entry_header_size) >= next) { - co_await skip(next - pos); + co_await skip_to_chunk(next); co_return; } - auto buf = co_await frag_reader.read_exactly(fin, entry_header_size); - replay_position rp(id, position_type(pos)); - if (!advance(buf)) { - co_return; - } - + auto buf = co_await read_data(entry_header_size); auto in = buf.get_istream(); auto size = read(in); auto checksum = read(in); @@ -2847,11 +3009,10 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f assert(end <= next); // really small read... - buf = co_await frag_reader.read_exactly(fin, sizeof(uint32_t)); + buf = co_await read_data(sizeof(uint32_t)); in = buf.get_istream(); checksum = read(in); - advance(buf); crc.process(actual_size); // verify header crc. if (actual_size < 2 * segment::entry_overhead_size || crc.checksum() != checksum) { @@ -2860,7 +3021,7 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f clogger.debug("Segment entry at {} has broken header. Skipping to next chunk ({} bytes)", rp, slack); corrupt_size += slack; } - co_await skip(slack); + co_await skip_to_chunk(next); co_return; } @@ -2875,17 +3036,14 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f }); } // and verify crc. 
- buf = co_await frag_reader.read_exactly(fin, sizeof(uint32_t)); + buf = co_await read_data(sizeof(uint32_t)); in = buf.get_istream(); checksum = read(in); - advance(buf); - if (checksum != crc.checksum()) { - auto slack = next - pos; clogger.debug("Segment entry at {} has broken header. Skipping to next chunk ({} bytes)", rp, actual_size); corrupt_size += actual_size; - co_await skip(slack); + co_await skip_to_chunk(next); co_return; } // all is ok. send data to subscriber. @@ -2905,13 +3063,11 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f corrupt_size += slack; } // size == 0 -> special scylla case: zero padding due to dma blocks - co_await skip(slack); + co_await skip_to_chunk(next); co_return; } - buf = co_await frag_reader.read_exactly(fin, size - entry_header_size); - - advance(buf); + buf = co_await read_data(size - entry_header_size); in = buf.get_istream(); auto data_size = size - segment::entry_overhead_size; diff --git a/db/commitlog/commitlog.hh b/db/commitlog/commitlog.hh index 55a53ad68c..9c35812d86 100644 --- a/db/commitlog/commitlog.hh +++ b/db/commitlog/commitlog.hh @@ -134,10 +134,12 @@ public: static inline constexpr uint32_t segment_version_1 = 1u; static inline constexpr uint32_t segment_version_2 = 2u; + static inline constexpr uint32_t segment_version_3 = 4u; + static inline constexpr uint32_t current_version = segment_version_3; descriptor(descriptor&&) noexcept = default; descriptor(const descriptor&) = default; - descriptor(segment_id_type i, const std::string& fname_prefix, uint32_t v = segment_version_2, sstring = {}); + descriptor(segment_id_type i, const std::string& fname_prefix, uint32_t v = current_version, sstring = {}); descriptor(replay_position p, const std::string& fname_prefix = FILENAME_PREFIX); descriptor(const std::string& filename, const std::string& fname_prefix = FILENAME_PREFIX); @@ -171,7 +173,7 @@ public: * of data to be written. (See add). 
* Don't write less, absolutely don't write more... */ - using output = fragmented_temporary_buffer::ostream; + using output = typename seastar::memory_output_stream; using serializer_func = std::function; /** diff --git a/db/commitlog/commitlog_entry.cc b/db/commitlog/commitlog_entry.cc index 4399b7a932..cb243a53e9 100644 --- a/db/commitlog/commitlog_entry.cc +++ b/db/commitlog/commitlog_entry.cc @@ -30,7 +30,7 @@ void commitlog_entry_writer::compute_size() { _size = ms.size(); } -void commitlog_entry_writer::write(typename seastar::memory_output_stream>::iterator>& out) const { +void commitlog_entry_writer::write(ostream& out) const { serialize(out); } diff --git a/db/commitlog/commitlog_entry.hh b/db/commitlog/commitlog_entry.hh index 3ee5716c80..b980afebb3 100644 --- a/db/commitlog/commitlog_entry.hh +++ b/db/commitlog/commitlog_entry.hh @@ -121,7 +121,10 @@ public: force_sync sync() const { return _sync; } - void write(typename seastar::memory_output_stream>::iterator>& out) const; + + using ostream = typename seastar::memory_output_stream; + + void write(ostream& out) const; }; class commitlog_entry_reader { diff --git a/test/boost/commitlog_test.cc b/test/boost/commitlog_test.cc index f9b4152f08..f1e5405761 100644 --- a/test/boost/commitlog_test.cc +++ b/test/boost/commitlog_test.cc @@ -493,17 +493,17 @@ SEASTAR_TEST_CASE(test_commitlog_chunk_corruption){ BOOST_REQUIRE(!segments.empty()); auto seg = segments[0]; auto cpos = rps->at(0).pos - 4; - return corrupt_segment(seg, cpos, 0x451234ab).then([seg, rps, cpos] { + return corrupt_segment(seg, cpos, 0x451234ab).then([seg, rps] { return db::commitlog::read_log_file(seg, db::commitlog::descriptor::FILENAME_PREFIX, [rps](db::commitlog::buffer_and_replay_position buf_rp) { BOOST_FAIL("Should not reach"); return make_ready_future<>(); - }).then_wrapped([cpos](auto&& f) { + }).then_wrapped([](auto&& f) { try { f.get(); BOOST_FAIL("Expected exception"); - } catch (commitlog::segment_truncation& e) { - // ok. 
We've destroyed the first chunk of the segment. This counts as a truncation - BOOST_REQUIRE(e.position() <= cpos); + } catch (commitlog::segment_data_corruption_error& e) { + // ok. + BOOST_REQUIRE(e.bytes() > 0); } }); }); From 6b66daabfc77726cb8815fafebb0253dc5188657 Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Mon, 25 Sep 2023 13:40:57 +0000 Subject: [PATCH 5/8] commitlog: Remove entry CRC from file format Since CRC is already handled by disk blocks, we can remove some of the entry CRC:ing, both simplifying code and making at least that part of both write and read faster. --- db/commitlog/commitlog.cc | 90 +++++---------------------------------- 1 file changed, 11 insertions(+), 79 deletions(-) diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index 8339bbac99..bce5803de5 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -789,8 +789,8 @@ public: }; friend std::ostream& operator<<(std::ostream&, const cf_mark&); - // The commit log entry overhead in bytes (int: length + int: head checksum + int: tail checksum) - static constexpr size_t entry_overhead_size = 3 * sizeof(uint32_t); + // The commit log entry overhead in bytes (int: length + int: head checksum) + static constexpr size_t entry_overhead_size = 2 * sizeof(uint32_t); static constexpr size_t multi_entry_overhead_size = entry_overhead_size + sizeof(uint32_t); static constexpr size_t segment_overhead_size = 2 * sizeof(uint32_t); static constexpr size_t descriptor_header_size = 6 * sizeof(uint32_t); @@ -1280,8 +1280,6 @@ public: // size : uint32_t // crc1 : uint32_t - crc of magic, size // -> entries[] - // post: - // crc2 : uint32_t - crc1 + each entry crc. 
if (writer.num_entries > 1) { mecrc.emplace(); write(out, multi_entry_size_magic); @@ -1310,26 +1308,10 @@ public: // actual data auto entry_out = out.write_substream(entry_size); - auto entry_data = entry_out.to_input_stream(); writer.write(*this, entry_out, entry); - entry_data.with_stream([&] (auto data_str) { - crc.process_fragmented(ser::buffer_view(data_str)); - }); - - auto checksum = crc.checksum(); - write(out, checksum); - if (mecrc) { - mecrc->process(checksum); - } - writer.result(entry, std::move(h)); } - if (mecrc) { - // write the crc of header + all sub-entry crc - write(out, mecrc->checksum()); - } - ++_segment_manager->totals.allocation_count; ++_num_allocs; @@ -2963,23 +2945,8 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f } } - using produce_func = std::function(buffer_and_replay_position, uint32_t)>; - - future<> produce(buffer_and_replay_position bar) { - try { - co_await func(std::move(bar)); - } catch (...) { - fail(); - throw; - } - } - future<> read_entry() { - return do_read_entry(std::bind(&work::produce, this, std::placeholders::_1)); - } - - future<> do_read_entry(produce_func pf) { - static constexpr size_t entry_header_size = segment::entry_overhead_size - sizeof(uint32_t); + static constexpr size_t entry_header_size = segment::entry_overhead_size; /** * #598 - Must check that data left in chunk is enough to even read an entry. @@ -3003,7 +2970,7 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f crc.process(size); // check for multi-entry - if (d.ver >= descriptor::segment_version_2 && size == segment::multi_entry_size_magic) { + if (size == segment::multi_entry_size_magic) { auto actual_size = checksum; auto end = pos + actual_size - entry_header_size - sizeof(uint32_t); @@ -3014,6 +2981,7 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f checksum = read(in); crc.process(actual_size); + // verify header crc. 
if (actual_size < 2 * segment::entry_overhead_size || crc.checksum() != checksum) { auto slack = next - pos; @@ -3025,38 +2993,19 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f co_return; } - std::vector tmp; - tmp.reserve(10); - // now read all sub-entries into buffers, and collect crc. + // now read all sub-entries + // and send data to subscriber. while (pos < end) { - co_await do_read_entry([&](buffer_and_replay_position br, uint32_t checksum) -> future<> { - tmp.emplace_back(std::move(br)); - crc.process(checksum); - co_return; - }); - } - // and verify crc. - buf = co_await read_data(sizeof(uint32_t)); - in = buf.get_istream(); - checksum = read(in); - - if (checksum != crc.checksum()) { - clogger.debug("Segment entry at {} has broken header. Skipping to next chunk ({} bytes)", rp, actual_size); - corrupt_size += actual_size; - co_await skip_to_chunk(next); - co_return; - } - // all is ok. send data to subscriber. - for (auto&& br : tmp) { - co_await produce(std::move(br)); + co_await read_entry(); if (failed) { break; } } + co_return; } - if (size < 3 * sizeof(uint32_t) || checksum != crc.checksum()) { + if (size < 2 * sizeof(uint32_t) || checksum != crc.checksum()) { auto slack = next - pos; if (size != 0) { clogger.debug("Segment entry at {} has broken header. Skipping to next chunk ({} bytes)", rp, slack); @@ -3069,24 +3018,7 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f buf = co_await read_data(size - entry_header_size); - in = buf.get_istream(); - auto data_size = size - segment::entry_overhead_size; - in.skip(data_size); - checksum = read(in); - - buf.remove_suffix(buf.size_bytes() - data_size); - crc.process_fragmented(fragmented_temporary_buffer::view(buf)); - - if (crc.checksum() != checksum) { - // If we're getting a checksum error here, most likely the rest of - // the file will be corrupt as well. But it does not hurt to retry. 
- // Just go to the next entry (since "size" in header seemed ok). - clogger.debug("Segment entry at {} checksum error. Skipping {} bytes", rp, size); - corrupt_size += size; - co_return; - } - - co_await pf({std::move(buf), rp}, checksum); + co_await func({std::move(buf), rp}); } future<> read_file() { From 57a4645c81eac4de8918f97e5be8e36df415bc21 Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Mon, 25 Sep 2023 13:41:31 +0000 Subject: [PATCH 6/8] docs: Add docs on commitlog format 3 --- docs/dev/commitlog-file-format.md | 51 +++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/docs/dev/commitlog-file-format.md b/docs/dev/commitlog-file-format.md index 6a84566193..89190c3117 100644 --- a/docs/dev/commitlog-file-format.md +++ b/docs/dev/commitlog-file-format.md @@ -62,4 +62,55 @@ Version 2 size : size of all entries in this multi-entry + headers crc : CRC32 of magic, size * N + crc2 : CRC32 of magic, size and data in each entry +``` + +Version 3 +--------- + +Modified from v2 to improve error detection/false positive elimination. +This version does CRC per written disk block instead of actual entry. +Every block is also tagged with which file is being written, this to +be able to better distinguish new data from that left over from recycling +(reusing old files) and actual disk/file corruption. + +``` + + Segment file header + + magic : uint32_t - ('S'<<24) |('C'<< 16) | ('L' << 8) | 'C'; + version : uint32_t - same as descriptor + id : uint64_t - same as descriptor + alignment : uint32_t - disk block size + crc : uint32_t - CRC32 of version, low 32 of id, high 32 of id, alignment. + + Chunk header + + file_pos : uint32_t - the file position of next chunk + crc : uint32_t - CRC32 of low 32 of segment id, high 32 of id and file offset of end of this header. + + Entry + + size : uint32_t - size of entry (data + full headers). Must be smaller than MAX_UINT32. 
+ crc : uint32_t - CRC32 of size + data : bytes - actual entry data + + Multi-entry + + magic : multi marker - 0xffffffff (MAX_UINT32) + size : size of all entries in this multi-entry + headers + crc : CRC32 of magic, size + * N + + Disk block (block size = `alignment`) + + 0 - (bs - 12) : Interleaved file data, i.e. the content above + segment id : uint64_t - same as descriptor. + crc : uint32_t - CRC32 of block data up until crc (bs - 4), including segment id + +The main benefit of the tagged and CRC:ed block is that if the CRC is broken, we know that part +of the file is corrupt. If the CRC is correct, but segment ID does not match, we can assume +the file is not fully written/prematurely ended. Both cases can mean data loss, depending on +how writing is done as well as OS and hardware. + ``` From 0d41769daad740167e9074a999ac78a387ecf02e Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Mon, 25 Sep 2023 14:43:57 +0000 Subject: [PATCH 7/8] commitlog_test: Add additional test for segment truncation Emulate replay of a non-sealed segment, verifying we don't get data beyond termination point, as well as the correct exception. 
--- test/boost/commitlog_test.cc | 61 ++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/test/boost/commitlog_test.cc b/test/boost/commitlog_test.cc index f1e5405761..9ca476619a 100644 --- a/test/boost/commitlog_test.cc +++ b/test/boost/commitlog_test.cc @@ -559,6 +559,67 @@ SEASTAR_TEST_CASE(test_commitlog_chunk_corruption2){ }); } + +SEASTAR_TEST_CASE(test_commitlog_chunk_corruption3){ + commitlog::config cfg; + cfg.commitlog_segment_size_in_mb = 1; + cfg.commitlog_total_space_in_mb = 2 * smp::count; + cfg.allow_going_over_size_limit = false; + + return cl_test(cfg, [](commitlog& log) -> future<> { + auto uuid = make_table_id(); + sstring tmp = "hej bubba cow"; + + std::optional last; + db::replay_position last_rp; + + int n = 0; + + for (;;) { + auto h = co_await log.add_mutation(uuid, tmp.size(), db::commitlog::force_sync::no, [&](db::commitlog::output& dst) { + dst.write(tmp.data(), tmp.size()); + }); + BOOST_CHECK_NE(h.rp(), db::replay_position()); + + // release data immediately + auto rp = h.rp(); + auto id = rp.id; + + if (last && last != id) { + // this should mean we are in a recycled segment. + if (++n > 3) { + last_rp = h.release(); // prevent removal for now. + break; + } + } + last = id; + } + + co_await log.sync_all_segments(); + + auto segments = log.get_active_segment_names(); + BOOST_REQUIRE(!segments.empty()); + + for (auto& seg : segments) { + auto desc = commitlog::descriptor(seg); + if (desc.id == last_rp.id) { + try { + co_await db::commitlog::read_log_file(seg, db::commitlog::descriptor::FILENAME_PREFIX, [&](db::commitlog::buffer_and_replay_position buf_rp) { + BOOST_CHECK_LE(buf_rp.position, last_rp); + return make_ready_future<>(); + }); + BOOST_FAIL("Expected exception"); + } catch (commitlog::segment_truncation& e) { + // ok. 
+ } + co_return; + } + } + + BOOST_FAIL("Did not find segment"); + }); +} + SEASTAR_TEST_CASE(test_commitlog_reader_produce_exception){ commitlog::config cfg; cfg.commitlog_segment_size_in_mb = 1; From 33fba2826511051bb29f39e5d1a0db8f618a6dd2 Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Mon, 9 Oct 2023 11:38:32 +0000 Subject: [PATCH 8/8] commitlog_test: Add test for replaying large-ish mutation (i.e. cross several normal-sized buffers). --- test/boost/commitlog_test.cc | 126 +++++++++++++++++++++++++++++++++++ 1 file changed, 126 insertions(+) diff --git a/test/boost/commitlog_test.cc b/test/boost/commitlog_test.cc index 9ca476619a..8b10cfe68b 100644 --- a/test/boost/commitlog_test.cc +++ b/test/boost/commitlog_test.cc @@ -620,6 +620,132 @@ SEASTAR_TEST_CASE(test_commitlog_chunk_corruption3){ }); } +SEASTAR_TEST_CASE(test_commitlog_replay_single_large_mutation){ + commitlog::config cfg; + cfg.commitlog_segment_size_in_mb = 4; + cfg.commitlog_total_space_in_mb = 2 * cfg.commitlog_segment_size_in_mb * smp::count; + cfg.allow_going_over_size_limit = false; + + return cl_test(cfg, [](commitlog& log) -> future<> { + auto uuid = make_table_id(); + auto size = log.max_record_size(); + + auto buf = fragmented_temporary_buffer::allocate_to_fit(size); + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution dist; + + auto out = buf.get_ostream(); + for (size_t i = 0; i < size; ++i) { + auto c = dist(gen); + out.write(&c, 1); + } + + auto h = co_await log.add_mutation(uuid, size, db::commitlog::force_sync::no, [&](db::commitlog::output& dst) { + for (auto& tmp : buf) { + dst.write(tmp.get(), tmp.size()); + } + }); + + auto rp = h.release(); + + co_await log.sync_all_segments(); + + auto segments = log.get_active_segment_names(); + BOOST_REQUIRE(!segments.empty()); + + for (auto& seg : segments) { + auto desc = commitlog::descriptor(seg); + if (desc.id == rp.id) { + co_await db::commitlog::read_log_file(seg, 
db::commitlog::descriptor::FILENAME_PREFIX, [&](db::commitlog::buffer_and_replay_position buf_rp) { + BOOST_CHECK_EQUAL(buf_rp.position, rp); + auto& rp_buf = buf_rp.buffer; + auto in1 = buf.get_istream(); + auto in2 = rp_buf.get_istream(); + for (size_t i = 0; i < size; ++i) { + auto c1 = in1.read(); + auto c2 = in2.read(); + BOOST_CHECK_EQUAL(c1, c2); + } + return make_ready_future<>(); + }); + co_return; + } + } + + BOOST_FAIL("Did not find segment"); + }); +} + +/** + * Checks same thing as above, but will also ensure the seek mechanism in + * replayer is working, since we will span multiple chunks. + */ +SEASTAR_TEST_CASE(test_commitlog_replay_large_mutations){ + commitlog::config cfg; + cfg.commitlog_segment_size_in_mb = 14; + cfg.commitlog_total_space_in_mb = 2 * cfg.commitlog_segment_size_in_mb * smp::count; + cfg.allow_going_over_size_limit = false; + + return cl_test(cfg, [](commitlog& log) -> future<> { + auto uuid = make_table_id(); + auto size = log.max_record_size() / 2; + + std::unordered_map buffers; + + for (size_t i = 0; i < 4; ++i) { + auto buf = fragmented_temporary_buffer::allocate_to_fit(size); + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution dist; + + auto out = buf.get_ostream(); + for (size_t i = 0; i < size; ++i) { + auto c = dist(gen); + out.write(&c, 1); + } + + auto h = co_await log.add_mutation(uuid, size, db::commitlog::force_sync::no, [&](db::commitlog::output& dst) { + for (auto& tmp : buf) { + dst.write(tmp.get(), tmp.size()); + } + }); + + auto rp = h.release(); + + buffers.emplace(rp, std::move(buf)); + } + + co_await log.sync_all_segments(); + + auto segments = log.get_active_segment_names(); + BOOST_REQUIRE(!segments.empty()); + + size_t n = 0; + + for (auto& seg : segments) { + auto desc = commitlog::descriptor(seg); + if (std::any_of(buffers.begin(), buffers.end(), [&](auto& p) { return p.first.id == desc.id; })) { + co_await db::commitlog::read_log_file(seg, 
db::commitlog::descriptor::FILENAME_PREFIX, [&](db::commitlog::buffer_and_replay_position buf_rp) { + auto& buf = buffers.at(buf_rp.position); + auto& rp_buf = buf_rp.buffer; + auto in1 = buf.get_istream(); + auto in2 = rp_buf.get_istream(); + for (size_t i = 0; i < size; ++i) { + auto c1 = in1.read(); + auto c2 = in2.read(); + BOOST_CHECK_EQUAL(c1, c2); + } + ++n; + return make_ready_future<>(); + }); + } + } + + BOOST_CHECK_EQUAL(n, buffers.size()); + }); +} + SEASTAR_TEST_CASE(test_commitlog_reader_produce_exception){ commitlog::config cfg; cfg.commitlog_segment_size_in_mb = 1;