diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index 507d12ba6a..d9dbfbdd26 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -613,6 +613,12 @@ auto db::commitlog::segment_manager::named_file::remove_file() noexcept { }); } +template +static void write(db::commitlog::output& out, T value) { + auto v = net::hton(value); + out.write(reinterpret_cast(&v), sizeof(v)); +} + template static void write(fragmented_temporary_buffer::ostream& out, T value) { auto v = net::hton(value); @@ -624,6 +630,47 @@ std::enable_if_t::value, T> read(Input& in) { return net::ntoh(in.template read()); } +detail::sector_split_iterator::sector_split_iterator(const sector_split_iterator&) noexcept = default; + +detail::sector_split_iterator::sector_split_iterator() + : _ptr(nullptr) + , _size(0) + , _sector_size(0) +{} + +detail::sector_split_iterator::sector_split_iterator(base_iterator i, base_iterator e, size_t sector_size) + : _iter(i) + , _end(e) + , _ptr(i != e ? const_cast(i->get()) : nullptr) + , _size(i != e ? sector_size - sector_overhead_size : 0) + , _sector_size(sector_size) +{} + +detail::sector_split_iterator& detail::sector_split_iterator::operator++() { + assert(_iter != _end); + _ptr += _sector_size; + // check if we have more pages in this temp-buffer (in out case they are always aligned + sized in page units) + auto rem = _iter->size() - std::distance(_iter->get(), const_cast(_ptr)); + if (rem == 0) { + if (++_iter == _end) { + _ptr = nullptr; + _size = 0; + return *this; + } + rem = _iter->size(); + assert(rem >= _sector_size); + // booh. ugly. + _ptr = const_cast(_iter->get()); + } + return *this; +} + +detail::sector_split_iterator detail::sector_split_iterator::operator++(int) { + auto res = *this; + ++(*this); + return res; +} + /* * A single commit log file on disk. Manages creation of the file and writing mutations to disk, * as well as tracking the last mutation position of any "dirty" CFs covered by the segment file. 
Segment @@ -691,8 +738,11 @@ class db::commitlog::segment : public enable_shared_from_this, public c using clock_type = segment_manager::clock_type; using time_point = segment_manager::time_point; + using base_ostream_type = memory_output_stream; + using frag_ostream_type = typename base_ostream_type::fragmented; + buffer_type _buffer; - fragmented_temporary_buffer::ostream _buffer_ostream; + base_ostream_type _buffer_ostream; std::unordered_map _cf_dirty; std::unordered_map _cf_min_time; time_point _sync_time; @@ -705,8 +755,18 @@ class db::commitlog::segment : public enable_shared_from_this, public c friend std::ostream& operator<<(std::ostream&, const segment&); friend class segment_manager; + size_t sector_overhead(size_t size) const { + return (size / (_alignment - detail::sector_overhead_size)) * detail::sector_overhead_size; + } + size_t buffer_position() const { - return _buffer.size_bytes() - _buffer_ostream.size(); + // need some arithmetics to figure out what out actual position is, including + // page checksums etc. The ostream does not include this, as it is subdivided and + // skips sector_overhead parts of the memory buffer. So to get actual position + // in the buffer, we need to add it back. 
+ auto size = _buffer_ostream.size(); + size += sector_overhead(size); + return _buffer.size_bytes() - size; } future<> begin_flush() { @@ -729,11 +789,11 @@ public: }; friend std::ostream& operator<<(std::ostream&, const cf_mark&); - // The commit log entry overhead in bytes (int: length + int: head checksum + int: tail checksum) - static constexpr size_t entry_overhead_size = 3 * sizeof(uint32_t); + // The commit log entry overhead in bytes (int: length + int: head checksum) + static constexpr size_t entry_overhead_size = 2 * sizeof(uint32_t); static constexpr size_t multi_entry_overhead_size = entry_overhead_size + sizeof(uint32_t); static constexpr size_t segment_overhead_size = 2 * sizeof(uint32_t); - static constexpr size_t descriptor_header_size = 5 * sizeof(uint32_t); + static constexpr size_t descriptor_header_size = 6 * sizeof(uint32_t); static constexpr uint32_t segment_magic = ('S'<<24) |('C'<< 16) | ('L' << 8) | 'C'; static constexpr uint32_t multi_entry_size_magic = 0xffffffff; @@ -938,11 +998,21 @@ public: overhead += descriptor_header_size; } - auto a = align_up(s + overhead, _alignment); + s += overhead; + // add bookkeep data reqs. + s += sector_overhead(s); + + auto a = align_up(s, _alignment); auto k = std::max(a, default_size); _buffer = _segment_manager->acquire_buffer(k, _alignment); - _buffer_ostream = _buffer.get_ostream(); + auto size = _buffer.size_bytes(); + auto n_blocks = size / _alignment; + // the amount of data we can actually write into. 
+ auto useable_size = size - n_blocks * detail::sector_overhead_size; + + _buffer_ostream = frag_ostream_type(detail::sector_split_iterator(_buffer.begin(), _buffer.end(), _alignment), useable_size); + auto out = _buffer_ostream.write_substream(overhead); out.fill('\0', overhead); _segment_manager->totals.buffer_list_bytes += _buffer.size_bytes(); @@ -987,10 +1057,12 @@ public: write(out, segment_magic); write(out, _desc.ver); write(out, _desc.id); + write(out, uint32_t(_alignment)); crc32_nbo crc; crc.process(_desc.ver); crc.process(_desc.id & 0xffffffff); crc.process(_desc.id >> 32); + crc.process(uint32_t(_alignment)); write(out, crc.checksum()); header_size = descriptor_header_size; } @@ -1015,6 +1087,32 @@ public: write(out, uint64_t(0)); } + buf.remove_suffix(buf.size_bytes() - size); + + // Build sector checksums. + auto id = net::hton(_desc.id); + auto ss = _alignment - detail::sector_overhead_size; + + for (auto& tbuf : buf) { + auto* p = const_cast(tbuf.get()); + auto* e = p + tbuf.size(); + while (p != e) { + assert(align_up(p, _alignment) == p); + + // include segment id in crc:ed data + auto be = p + ss; + be = std::copy_n(reinterpret_cast(&id), sizeof(id), be); + + crc32_nbo crc; + crc.process_bytes(p, std::distance(p, be)); + + auto checksum = crc.checksum(); + auto v = net::hton(checksum); + // write checksum. + p = std::copy_n(reinterpret_cast(&v), sizeof(v), be); + } + } + replay_position rp(_desc.id, position_type(off)); // The write will be allowed to start now, but flush (below) must wait for not only this, @@ -1182,8 +1280,6 @@ public: // size : uint32_t // crc1 : uint32_t - crc of magic, size // -> entries[] - // post: - // crc2 : uint32_t - crc1 + each entry crc. 
if (writer.num_entries > 1) { mecrc.emplace(); write(out, multi_entry_size_magic); @@ -1212,26 +1308,10 @@ public: // actual data auto entry_out = out.write_substream(entry_size); - auto entry_data = entry_out.to_input_stream(); writer.write(*this, entry_out, entry); - entry_data.with_stream([&] (auto data_str) { - crc.process_fragmented(ser::buffer_view>::iterator>(data_str)); - }); - - auto checksum = crc.checksum(); - write(out, checksum); - if (mecrc) { - mecrc->process(checksum); - } - writer.result(entry, std::move(h)); } - if (mecrc) { - // write the crc of header + all sub-entry crc - write(out, mecrc->checksum()); - } - ++_segment_manager->totals.allocation_count; ++_num_allocs; @@ -2587,6 +2667,20 @@ const db::commitlog::config& db::commitlog::active_config() const { return _segment_manager->cfg; } + +db::commitlog::segment_truncation::segment_truncation(uint64_t pos) + : _msg(fmt::format("Segment truncation at {}", pos)) + , _pos(pos) +{} + +uint64_t db::commitlog::segment_truncation::position() const { + return _pos; +} + +const char* db::commitlog::segment_truncation::what() const noexcept { + return _msg.c_str(); +} + // No commit_io_check needed in the log reader since the database will fail // on error at startup if required future<> @@ -2611,44 +2705,52 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f size_t start_off = 0; size_t file_size = 0; size_t corrupt_size = 0; + size_t alignment = 0; bool eof = false; bool header = true; bool failed = false; fragmented_temporary_buffer::reader frag_reader; + fragmented_temporary_buffer buffer, initial; work(file f, descriptor din, commit_load_reader_func fn, position_type o = 0) : f(f), d(din), func(std::move(fn)), fin(make_file_input_stream(f, 0, make_file_input_stream_options())), start_off(o) { } work(work&&) = default; - bool advance(const fragmented_temporary_buffer& buf) { - pos += buf.size_bytes(); - if (buf.size_bytes() == 0) { - eof = true; - } - return !eof; - } 
bool end_of_file() const { return eof; } bool end_of_chunk() const { return eof || next == pos; } - future<> skip(size_t bytes) { - auto n = std::min(file_size - pos, bytes); - pos += n; - if (pos == file_size) { + future<> skip_to_chunk(size_t seek_to_pos) { + if (seek_to_pos >= file_size) { eof = true; + pos = file_size; + co_return; } - if (n < bytes) { - // if we are trying to skip past end, we have at least - // the bytes skipped or the source from where we read - // this corrupt. So add at least four bytes. This is - // inexact, but adding the full "bytes" is equally wrong - // since it could be complete garbled junk. - corrupt_size += std::max(n, sizeof(uint32_t)); + + auto bytes = seek_to_pos - pos; + auto rem = buffer.size_bytes(); + if (bytes < rem) { + buffer.remove_suffix(bytes); + rem -= bytes; + pos += bytes; + co_return; + } + + buffer = {}; + pos += rem; + bytes = seek_to_pos - pos; + + auto skip_bytes = align_down(bytes, alignment); + pos += skip_bytes; + + co_await fin.skip(skip_bytes); + if (bytes > skip_bytes) { + // must crc check if we read into a sector + co_await read_data(bytes - skip_bytes); } - return fin.skip(n); } void stop() { eof = true; @@ -2659,15 +2761,18 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f } future<> read_header() { fragmented_temporary_buffer buf = co_await frag_reader.read_exactly(fin, segment::descriptor_header_size); - if (!advance(buf)) { - // zero length file. accept it just to be nice. 
+ + if (buf.empty()) { + eof = true; co_return; } + // Will throw if we got eof auto in = buf.get_istream(); auto magic = read(in); auto ver = read(in); auto id = read(in); + auto alignment = read(in); auto checksum = read(in); if (magic == 0 && ver == 0 && id == 0 && checksum == 0) { @@ -2685,10 +2790,15 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f if (magic != segment::segment_magic) { throw invalid_segment_format(); } + if (ver != descriptor::current_version) { + throw std::invalid_argument("Cannot replay old commitlog segments"); + } + crc32_nbo crc; crc.process(ver); crc.process(id & 0xffffffff); crc.process(id >> 32); + crc.process(alignment); auto cs = crc.checksum(); if (cs != checksum) { @@ -2697,14 +2807,115 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f this->id = id; this->next = 0; + this->alignment = alignment; + this->initial = std::move(buf); + this->pos = this->initial.size_bytes(); } - future<> read_chunk() { - fragmented_temporary_buffer buf = co_await frag_reader.read_exactly(fin, segment::segment_overhead_size); auto start = pos; - if (!advance(buf)) { - co_return; + future read_data(size_t size) { + auto rem = buffer.size_bytes(); + auto buf_vec = std::move(buffer).release(); + auto block_boundry = align_up(pos - initial.size_bytes(), alignment); + + while (rem < size) { + if (eof) { + throw segment_truncation(block_boundry); + } + + auto block_size = alignment - initial.size_bytes(); + // using a stream is perhaps not 100% effective, but we need to + // potentially address data in pages smaller than the current + // disk/fs we are reading from can handle (but please no). 
+ auto tmp = co_await frag_reader.read_exactly(fin, block_size); + + if (tmp.size_bytes() == 0) { + eof = true; + throw segment_truncation(block_boundry); + } + + crc32_nbo crc; + // crc all but the final crc + size_t n = block_size - sizeof(uint32_t); + + bool all_zero = true; + + if (!initial.empty()) { + for (auto& bv : initial) { + crc.process_bytes(bv.get(), bv.size()); + } + initial = {}; + all_zero = false; + } + + for (auto& bv : tmp) { + auto np = std::min(bv.size(), n); + crc.process_bytes(bv.get(), np); + all_zero &= std::all_of(bv.get(), bv.get() + bv.size(), [](char c) { return c == 0; }); + n -= np; + } + + block_boundry += alignment; + + if (!all_zero) { + auto in = tmp.get_istream(); + in.skip(block_size - detail::sector_overhead_size); + + auto id = read(in); + auto check = read(in); + auto checksum = crc.checksum(); + + if (check != checksum) { + throw segment_data_corruption_error("Data corruption", alignment); + } + if (id != this->id) { + throw segment_truncation(pos + rem); + } + } + tmp.remove_suffix(detail::sector_overhead_size); + + rem += tmp.size_bytes(); + pos += detail::sector_overhead_size; + + auto vec2 = std::move(tmp).release(); + for (auto&& v : vec2) { + buf_vec.emplace_back(std::move(v)); + } } + decltype(buf_vec) next; + + auto bytes_to_leave = rem - size; + + auto i = next.end(); + while (bytes_to_leave > 0) { + auto tmp = std::move(buf_vec.back()); + buf_vec.pop_back(); + auto s = tmp.size(); + if (s <= bytes_to_leave) { + i = next.emplace(i, std::move(tmp)); + } else { + auto diff = s - bytes_to_leave; + auto b1 = tmp.share(0, diff); + auto b2 = tmp.share(diff, bytes_to_leave); + buf_vec.emplace_back(std::move(b1)); + i = next.emplace(i, std::move(b2)); + } + bytes_to_leave -= i->size(); + } + + // this is the remaining buffer now. + buffer = fragmented_temporary_buffer(std::move(next), rem - size); + // this is the returned result. 
+ auto res = fragmented_temporary_buffer(std::move(buf_vec), size); + + pos += size; + + co_return res; + } + + future<> read_chunk() { + auto start = pos; + auto buf = co_await read_data(segment::segment_overhead_size); auto in = buf.get_istream(); auto next = read(in); auto checksum = read(in); @@ -2733,7 +2944,7 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f this->next = next; if (start_off >= next) { - co_return co_await skip(next - pos); + co_return co_await skip_to_chunk(next); } while (!end_of_chunk()) { @@ -2741,23 +2952,8 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f } } - using produce_func = std::function(buffer_and_replay_position, uint32_t)>; - - future<> produce(buffer_and_replay_position bar) { - try { - co_await func(std::move(bar)); - } catch (...) { - fail(); - throw; - } - } - future<> read_entry() { - return do_read_entry(std::bind(&work::produce, this, std::placeholders::_1)); - } - - future<> do_read_entry(produce_func pf) { - static constexpr size_t entry_header_size = segment::entry_overhead_size - sizeof(uint32_t); + static constexpr size_t entry_header_size = segment::entry_overhead_size; /** * #598 - Must check that data left in chunk is enough to even read an entry. 
@@ -2766,18 +2962,13 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f */ assert(pos <= next); if ((pos + entry_header_size) >= next) { - co_await skip(next - pos); + co_await skip_to_chunk(next); co_return; } - auto buf = co_await frag_reader.read_exactly(fin, entry_header_size); - replay_position rp(id, position_type(pos)); - if (!advance(buf)) { - co_return; - } - + auto buf = co_await read_data(entry_header_size); auto in = buf.get_istream(); auto size = read(in); auto checksum = read(in); @@ -2786,18 +2977,18 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f crc.process(size); // check for multi-entry - if (d.ver >= descriptor::segment_version_2 && size == segment::multi_entry_size_magic) { + if (size == segment::multi_entry_size_magic) { auto actual_size = checksum; auto end = pos + actual_size - entry_header_size - sizeof(uint32_t); assert(end <= next); // really small read... - buf = co_await frag_reader.read_exactly(fin, sizeof(uint32_t)); + buf = co_await read_data(sizeof(uint32_t)); in = buf.get_istream(); checksum = read(in); - advance(buf); crc.process(actual_size); + // verify header crc. if (actual_size < 2 * segment::entry_overhead_size || crc.checksum() != checksum) { auto slack = next - pos; @@ -2805,77 +2996,36 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f clogger.debug("Segment entry at {} has broken header. Skipping to next chunk ({} bytes)", rp, slack); corrupt_size += slack; } - co_await skip(slack); + co_await skip_to_chunk(next); co_return; } - std::vector tmp; - tmp.reserve(10); - // now read all sub-entries into buffers, and collect crc. + // now read all sub-entries + // and send data to subscriber. while (pos < end) { - co_await do_read_entry([&](buffer_and_replay_position br, uint32_t checksum) -> future<> { - tmp.emplace_back(std::move(br)); - crc.process(checksum); - co_return; - }); - } - // and verify crc. 
- buf = co_await frag_reader.read_exactly(fin, sizeof(uint32_t)); - in = buf.get_istream(); - checksum = read(in); - - advance(buf); - - if (checksum != crc.checksum()) { - auto slack = next - pos; - clogger.debug("Segment entry at {} has broken header. Skipping to next chunk ({} bytes)", rp, actual_size); - corrupt_size += actual_size; - co_await skip(slack); - co_return; - } - // all is ok. send data to subscriber. - for (auto&& br : tmp) { - co_await produce(std::move(br)); + co_await read_entry(); if (failed) { break; } } + co_return; } - if (size < 3 * sizeof(uint32_t) || checksum != crc.checksum()) { + if (size < 2 * sizeof(uint32_t) || checksum != crc.checksum()) { auto slack = next - pos; if (size != 0) { clogger.debug("Segment entry at {} has broken header. Skipping to next chunk ({} bytes)", rp, slack); corrupt_size += slack; } // size == 0 -> special scylla case: zero padding due to dma blocks - co_await skip(slack); + co_await skip_to_chunk(next); co_return; } - buf = co_await frag_reader.read_exactly(fin, size - entry_header_size); + buf = co_await read_data(size - entry_header_size); - advance(buf); - - in = buf.get_istream(); - auto data_size = size - segment::entry_overhead_size; - in.skip(data_size); - checksum = read(in); - - buf.remove_suffix(buf.size_bytes() - data_size); - crc.process_fragmented(fragmented_temporary_buffer::view(buf)); - - if (crc.checksum() != checksum) { - // If we're getting a checksum error here, most likely the rest of - // the file will be corrupt as well. But it does not hurt to retry. - // Just go to the next entry (since "size" in header seemed ok). - clogger.debug("Segment entry at {} checksum error. 
Skipping {} bytes", rp, size); - corrupt_size += size; - co_return; - } - - co_await pf({std::move(buf), rp}, checksum); + co_await func({std::move(buf), rp}); } future<> read_file() { diff --git a/db/commitlog/commitlog.hh b/db/commitlog/commitlog.hh index d16c047d76..2a62eb7c68 100644 --- a/db/commitlog/commitlog.hh +++ b/db/commitlog/commitlog.hh @@ -134,10 +134,12 @@ public: static inline constexpr uint32_t segment_version_1 = 1u; static inline constexpr uint32_t segment_version_2 = 2u; + static inline constexpr uint32_t segment_version_3 = 4u; + static inline constexpr uint32_t current_version = segment_version_3; descriptor(descriptor&&) noexcept = default; descriptor(const descriptor&) = default; - descriptor(segment_id_type i, const std::string& fname_prefix, uint32_t v = segment_version_2, sstring = {}); + descriptor(segment_id_type i, const std::string& fname_prefix, uint32_t v = current_version, sstring = {}); descriptor(replay_position p, const std::string& fname_prefix = FILENAME_PREFIX); descriptor(const std::string& filename, const std::string& fname_prefix = FILENAME_PREFIX); @@ -171,7 +173,7 @@ public: * of data to be written. (See add). * Don't write less, absolutely don't write more... 
*/ - using output = fragmented_temporary_buffer::ostream; + using output = typename seastar::memory_output_stream; using serializer_func = std::function; /** @@ -373,7 +375,7 @@ public: uint64_t bytes() const { return _bytes; } - virtual const char* what() const noexcept { + const char* what() const noexcept override { return _msg.c_str(); } private: @@ -383,7 +385,7 @@ public: class invalid_segment_format : public segment_error { static constexpr const char* _msg = "Not a scylla format commitlog file"; public: - virtual const char* what() const noexcept { + const char* what() const noexcept override { return _msg; } }; @@ -391,11 +393,21 @@ public: class header_checksum_error : public segment_error { static constexpr const char* _msg = "Checksum error in file header"; public: - virtual const char* what() const noexcept { + const char* what() const noexcept override { return _msg; } }; + class segment_truncation : public segment_error { + std::string _msg; + uint64_t _pos; + public: + segment_truncation(uint64_t); + + uint64_t position() const; + const char* what() const noexcept override; + }; + static future<> read_log_file(sstring filename, sstring prefix, commit_load_reader_func, position_type = 0, const db::extensions* = nullptr); private: commitlog(config); diff --git a/db/commitlog/commitlog_entry.cc b/db/commitlog/commitlog_entry.cc index 4399b7a932..cb243a53e9 100644 --- a/db/commitlog/commitlog_entry.cc +++ b/db/commitlog/commitlog_entry.cc @@ -30,7 +30,7 @@ void commitlog_entry_writer::compute_size() { _size = ms.size(); } -void commitlog_entry_writer::write(typename seastar::memory_output_stream>::iterator>& out) const { +void commitlog_entry_writer::write(ostream& out) const { serialize(out); } diff --git a/db/commitlog/commitlog_entry.hh b/db/commitlog/commitlog_entry.hh index b3c994935b..b980afebb3 100644 --- a/db/commitlog/commitlog_entry.hh +++ b/db/commitlog/commitlog_entry.hh @@ -13,6 +13,62 @@ #include "commitlog_types.hh" #include 
"mutation/frozen_mutation.hh" #include "schema/schema_fwd.hh" +#include "replay_position.hh" + +namespace detail { + + using buffer_type = fragmented_temporary_buffer; + using base_iterator = typename std::vector>::const_iterator; + + static constexpr auto sector_overhead_size = sizeof(uint32_t) + sizeof(db::segment_id_type); + + // iterator adaptor to enable splitting normal + // frag-buffer temporary buffer objects into + // sub-disk-page sized chunks. + class sector_split_iterator { + base_iterator _iter, _end; + char* _ptr; + size_t _size; + size_t _sector_size; + public: + sector_split_iterator(const sector_split_iterator&) noexcept; + sector_split_iterator(base_iterator i, base_iterator e, size_t sector_size); + sector_split_iterator(); + + char* get_write() const { + return _ptr; + } + size_t size() const { + return _size; + } + char* begin() { + return _ptr; + } + char* end() { + return _ptr + _size; + } + const char* begin() const { + return _ptr; + } + const char* end() const { + return _ptr + _size; + } + + bool operator==(const sector_split_iterator& rhs) const { + return _iter == rhs._iter && _ptr == rhs._ptr; + } + + auto& operator*() const { + return *this; + } + auto* operator->() const { + return this; + } + + sector_split_iterator& operator++(); + sector_split_iterator operator++(int); + }; +} class commitlog_entry { std::optional _mapping; @@ -65,7 +121,10 @@ public: force_sync sync() const { return _sync; } - void write(typename seastar::memory_output_stream>::iterator>& out) const; + + using ostream = typename seastar::memory_output_stream; + + void write(ostream& out) const; }; class commitlog_entry_reader { diff --git a/db/commitlog/commitlog_replayer.cc b/db/commitlog/commitlog_replayer.cc index bf8c84d1bc..be94e54301 100644 --- a/db/commitlog/commitlog_replayer.cc +++ b/db/commitlog/commitlog_replayer.cc @@ -56,6 +56,7 @@ public: uint64_t skipped_mutations = 0; uint64_t applied_mutations = 0; uint64_t corrupt_bytes = 0; + uint64_t 
truncated_at = 0; stats& operator+=(const stats& s) { invalid_mutations += s.invalid_mutations; @@ -180,6 +181,8 @@ db::commitlog_replayer::impl::recover(sstring file, const sstring& fname_prefix) f.get(); } catch (commitlog::segment_data_corruption_error& e) { s->corrupt_bytes += e.bytes(); + } catch (commitlog::segment_truncation& e) { + s->truncated_at = e.position(); } catch (...) { throw; } @@ -330,6 +333,9 @@ future<> db::commitlog_replayer::recover(std::vector files, sstring fna if (stats.corrupt_bytes != 0) { rlogger.warn("Corrupted file: {}. {} bytes skipped.", f, stats.corrupt_bytes); } + if (stats.truncated_at != 0) { + rlogger.warn("Truncated file: {} at position {}.", f, stats.truncated_at); + } rlogger.debug("Log replay of {} complete, {} replayed mutations ({} invalid, {} skipped)" , f , stats.applied_mutations diff --git a/docs/dev/commitlog-file-format.md b/docs/dev/commitlog-file-format.md index 6a84566193..89190c3117 100644 --- a/docs/dev/commitlog-file-format.md +++ b/docs/dev/commitlog-file-format.md @@ -62,4 +62,55 @@ Version 2 size : size of all entries in this multi-entry + headers crc : CRC32 of magic, size * N + crc2 : CRC32 of magic, size and data in each entry +``` + +Version 3 +--------- + +Modified from v2 to improve error detection and eliminate false positives. +This version computes a CRC per written disk block instead of per entry. +Every block is also tagged with the ID of the segment being written, in order +to better distinguish new data from data left over from recycling +(reusing old files) and from actual disk/file corruption. + +``` + + Segment file header + + magic : uint32_t - ('S'<<24) |('C'<< 16) | ('L' << 8) | 'C'; + version : uint32_t - same as descriptor + id : uint64_t - same as descriptor + alignment : uint32_t - disk block size + crc : uint32_t - CRC32 of version, low 32 of id, high 32 of id, alignment. 
+ + Chunk header + + file_pos : uint32_t - the file position of next chunk + crc : uint32_t - CRC32 of low 32 of segment id, high 32 of id and file offset of end of this header. + + Entry + + size : uint32_t - size of entry (data + full headers). Must be smaller than MAX_UINT32. + crc : uint32_t - CRC32 of size + data : bytes - actual entry data + + Multi-entry + + magic : multi marker - 0xffffffff (MAX_UINT32) + size : size of all entries in this multi-entry + headers + crc : CRC32 of magic, size + Entry * N + + Disk block (block size = `alignment`, bs) + + 0 - (bs - 12) : Interleaved file data, i.e. the content above + bs - 12 : uint64_t - segment id, same as descriptor. + bs - 4 : uint32_t - CRC32 of block data up until crc (bs - 4), including segment id + +The main benefit of the tagged and checksummed block is that if the CRC is broken, we know that +part of the file is corrupt. If the CRC is correct, but the segment ID does not match, we can assume +the file is not fully written/prematurely ended. Both cases can mean data loss, depending on +how writing is done as well as OS and hardware. 
+ ``` diff --git a/test/boost/commitlog_test.cc b/test/boost/commitlog_test.cc index 951a876ad8..7eb059c687 100644 --- a/test/boost/commitlog_test.cc +++ b/test/boost/commitlog_test.cc @@ -492,7 +492,8 @@ SEASTAR_TEST_CASE(test_commitlog_chunk_corruption){ auto segments = log.get_active_segment_names(); BOOST_REQUIRE(!segments.empty()); auto seg = segments[0]; - return corrupt_segment(seg, rps->at(0).pos - 4, 0x451234ab).then([seg, rps] { + auto cpos = rps->at(0).pos - 4; + return corrupt_segment(seg, cpos, 0x451234ab).then([seg, rps] { return db::commitlog::read_log_file(seg, db::commitlog::descriptor::FILENAME_PREFIX, [rps](db::commitlog::buffer_and_replay_position buf_rp) { BOOST_FAIL("Should not reach"); return make_ready_future<>(); @@ -510,6 +511,241 @@ SEASTAR_TEST_CASE(test_commitlog_chunk_corruption){ }); } + +SEASTAR_TEST_CASE(test_commitlog_chunk_corruption2){ + commitlog::config cfg; + cfg.commitlog_segment_size_in_mb = 1; + return cl_test(cfg, [](commitlog& log) { + auto rps = make_lw_shared>(); + // write enough entries to fill more than one chunk + return do_until([rps]() {return rps->size() > (128*1024/8);}, + [&log, rps]() { + auto uuid = make_table_id(); + sstring tmp = "hej bubba cow"; + return log.add_mutation(uuid, tmp.size(), db::commitlog::force_sync::no, [tmp](db::commitlog::output& dst) { + dst.write(tmp.data(), tmp.size()); + }).then([rps](rp_handle h) { + BOOST_CHECK_NE(h.rp(), db::replay_position()); + rps->push_back(h.release()); + }); + }).then([&log]() { + return log.sync_all_segments(); + }).then([&log, rps] { + auto segments = log.get_active_segment_names(); + BOOST_REQUIRE(!segments.empty()); + auto seg = segments[0]; + auto desc = commitlog::descriptor(seg); + auto e = std::find_if(rps->begin(), rps->end(), [desc](auto& rp) { + return rp.id != desc.id; + }); + auto idx = std::distance(rps->begin(), e); + auto cpos = rps->at(idx/2).pos; + + return corrupt_segment(seg, cpos, 0x451234ab).then([seg, rps, cpos] { + return 
db::commitlog::read_log_file(seg, db::commitlog::descriptor::FILENAME_PREFIX, [rps, cpos](db::commitlog::buffer_and_replay_position buf_rp) { + BOOST_CHECK_NE(buf_rp.position.pos, cpos); + return make_ready_future<>(); + }).then_wrapped([](auto&& f) { + try { + f.get(); + BOOST_FAIL("Expected exception"); + } catch (commitlog::segment_data_corruption_error& e) { + // ok. + BOOST_REQUIRE(e.bytes() > 0); + } + }); + }); + }); + }); +} + + +SEASTAR_TEST_CASE(test_commitlog_chunk_corruption3){ + commitlog::config cfg; + cfg.commitlog_segment_size_in_mb = 1; + cfg.commitlog_total_space_in_mb = 2 * smp::count; + cfg.allow_going_over_size_limit = false; + + return cl_test(cfg, [](commitlog& log) -> future<> { + auto uuid = make_table_id(); + sstring tmp = "hej bubba cow"; + + std::optional last; + db::replay_position last_rp; + + int n = 0; + + for (;;) { + auto h = co_await log.add_mutation(uuid, tmp.size(), db::commitlog::force_sync::no, [&](db::commitlog::output& dst) { + dst.write(tmp.data(), tmp.size()); + }); + BOOST_CHECK_NE(h.rp(), db::replay_position()); + + // release data immediately + auto rp = h.rp(); + auto id = rp.id; + + if (last && last != id) { + // this should mean we are in a recycled segment. + if (++n > 3) { + last_rp = h.release(); // prevent removal for now. + break; + } + } + last = id; + } + + co_await log.sync_all_segments(); + + auto segments = log.get_active_segment_names(); + BOOST_REQUIRE(!segments.empty()); + + for (auto& seg : segments) { + auto desc = commitlog::descriptor(seg); + if (desc.id == last_rp.id) { + try { + co_await db::commitlog::read_log_file(seg, db::commitlog::descriptor::FILENAME_PREFIX, [&](db::commitlog::buffer_and_replay_position buf_rp) { + BOOST_CHECK_LE(buf_rp.position, last_rp); + return make_ready_future<>(); + }); + BOOST_FAIL("Expected exception"); + } catch (commitlog::segment_truncation& e) { + // ok. 
+ } + co_return; + } + } + + BOOST_FAIL("Did not find segment"); + }); +} + +SEASTAR_TEST_CASE(test_commitlog_replay_single_large_mutation){ + commitlog::config cfg; + cfg.commitlog_segment_size_in_mb = 4; + cfg.commitlog_total_space_in_mb = 2 * cfg.commitlog_segment_size_in_mb * smp::count; + cfg.allow_going_over_size_limit = false; + + return cl_test(cfg, [](commitlog& log) -> future<> { + auto uuid = make_table_id(); + auto size = log.max_record_size(); + + auto buf = fragmented_temporary_buffer::allocate_to_fit(size); + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution dist; + + auto out = buf.get_ostream(); + for (size_t i = 0; i < size; ++i) { + auto c = dist(gen); + out.write(&c, 1); + } + + auto h = co_await log.add_mutation(uuid, size, db::commitlog::force_sync::no, [&](db::commitlog::output& dst) { + for (auto& tmp : buf) { + dst.write(tmp.get(), tmp.size()); + } + }); + + auto rp = h.release(); + + co_await log.sync_all_segments(); + + auto segments = log.get_active_segment_names(); + BOOST_REQUIRE(!segments.empty()); + + for (auto& seg : segments) { + auto desc = commitlog::descriptor(seg); + if (desc.id == rp.id) { + co_await db::commitlog::read_log_file(seg, db::commitlog::descriptor::FILENAME_PREFIX, [&](db::commitlog::buffer_and_replay_position buf_rp) { + BOOST_CHECK_EQUAL(buf_rp.position, rp); + auto& rp_buf = buf_rp.buffer; + auto in1 = buf.get_istream(); + auto in2 = rp_buf.get_istream(); + for (size_t i = 0; i < size; ++i) { + auto c1 = in1.read(); + auto c2 = in2.read(); + BOOST_CHECK_EQUAL(c1, c2); + } + return make_ready_future<>(); + }); + co_return; + } + } + + BOOST_FAIL("Did not find segment"); + }); +} + +/** + * Checks same thing as above, but will also ensure the seek mechanism in + * replayer is working, since we will span multiple chunks. 
+ */ +SEASTAR_TEST_CASE(test_commitlog_replay_large_mutations){ + commitlog::config cfg; + cfg.commitlog_segment_size_in_mb = 14; + cfg.commitlog_total_space_in_mb = 2 * cfg.commitlog_segment_size_in_mb * smp::count; + cfg.allow_going_over_size_limit = false; + + return cl_test(cfg, [](commitlog& log) -> future<> { + auto uuid = make_table_id(); + auto size = log.max_record_size() / 2; + + std::unordered_map buffers; + + for (size_t i = 0; i < 4; ++i) { + auto buf = fragmented_temporary_buffer::allocate_to_fit(size); + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution dist; + + auto out = buf.get_ostream(); + for (size_t i = 0; i < size; ++i) { + auto c = dist(gen); + out.write(&c, 1); + } + + auto h = co_await log.add_mutation(uuid, size, db::commitlog::force_sync::no, [&](db::commitlog::output& dst) { + for (auto& tmp : buf) { + dst.write(tmp.get(), tmp.size()); + } + }); + + auto rp = h.release(); + + buffers.emplace(rp, std::move(buf)); + } + + co_await log.sync_all_segments(); + + auto segments = log.get_active_segment_names(); + BOOST_REQUIRE(!segments.empty()); + + size_t n = 0; + + for (auto& seg : segments) { + auto desc = commitlog::descriptor(seg); + if (std::any_of(buffers.begin(), buffers.end(), [&](auto& p) { return p.first.id == desc.id; })) { + co_await db::commitlog::read_log_file(seg, db::commitlog::descriptor::FILENAME_PREFIX, [&](db::commitlog::buffer_and_replay_position buf_rp) { + auto& buf = buffers.at(buf_rp.position); + auto& rp_buf = buf_rp.buffer; + auto in1 = buf.get_istream(); + auto in2 = rp_buf.get_istream(); + for (size_t i = 0; i < size; ++i) { + auto c1 = in1.read(); + auto c2 = in2.read(); + BOOST_CHECK_EQUAL(c1, c2); + } + ++n; + return make_ready_future<>(); + }); + } + } + + BOOST_CHECK_EQUAL(n, buffers.size()); + }); +} + SEASTAR_TEST_CASE(test_commitlog_reader_produce_exception){ commitlog::config cfg; cfg.commitlog_segment_size_in_mb = 1; diff --git a/utils/fragmented_temporary_buffer.hh 
b/utils/fragmented_temporary_buffer.hh index 98eabb1ea4..70c8bdf951 100644 --- a/utils/fragmented_temporary_buffer.hh +++ b/utils/fragmented_temporary_buffer.hh @@ -60,6 +60,15 @@ public: return ostream::simple(reinterpret_cast(current.get_write()), current.size()); } + using const_fragment_iterator = typename vector_type::const_iterator; + + const_fragment_iterator begin() const { + return _fragments.begin(); + } + const_fragment_iterator end() const { + return _fragments.end(); + } + size_t size_bytes() const { return _size_bytes; } bool empty() const { return !_size_bytes; }