diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index 88512d7a2e..44e0bf4c21 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -743,6 +743,7 @@ class db::commitlog::segment : public enable_shared_from_this, public c buffer_type _buffer; base_ostream_type _buffer_ostream; + size_t _buffer_ostream_size = 0; std::unordered_map _cf_dirty; std::unordered_map _cf_min_time; time_point _sync_time; @@ -764,9 +765,13 @@ class db::commitlog::segment : public enable_shared_from_this, public c // page checksums etc. The ostream does not include this, as it is subdivided and // skips sector_overhead parts of the memory buffer. So to get actual position // in the buffer, we need to add it back. - auto size = _buffer_ostream.size(); - size += sector_overhead(size); - return _buffer.size_bytes() - size; + + // #16298 - this was based on _buffer_size vs. remaining in ostream. + // ostream type has no "position" type, need to keep track of + // what the original size was, i.e. _buffer.size() adjusted down + // for sector overhead. + auto used = _buffer_ostream_size - _buffer_ostream.size(); + return used + sector_overhead(used); } future<> begin_flush() { @@ -1000,9 +1005,7 @@ public: s += overhead; // add bookkeep data reqs. - s += sector_overhead(s); - - auto a = align_up(s, _alignment); + auto a = align_up(s + sector_overhead(s), _alignment); auto k = std::max(a, default_size); _buffer = _segment_manager->acquire_buffer(k, _alignment); @@ -1011,11 +1014,17 @@ public: // the amount of data we can actually write into. auto useable_size = size - n_blocks * detail::sector_overhead_size; + assert(useable_size >= s); + _buffer_ostream = frag_ostream_type(detail::sector_split_iterator(_buffer.begin(), _buffer.end(), _alignment), useable_size); + // #16298 - keep track of ostream initial size. + _buffer_ostream_size = useable_size; auto out = _buffer_ostream.write_substream(overhead); out.fill('\0', overhead); _segment_manager->totals.buffer_list_bytes += _buffer.size_bytes(); + + assert(buffer_position() == overhead); } bool buffer_is_empty() const { @@ -1044,6 +1053,7 @@ public: _file_pos = top; _buffer_ostream = { }; + _buffer_ostream_size = 0; _num_allocs = 0; assert(me.use_count() > 1); @@ -1243,7 +1253,7 @@ public: _segment_manager->sanity_check_size(s); - if (!is_still_allocating() || position() + s > _segment_manager->max_size) { // would we make the file too big? + if (!is_still_allocating() || next_position(s) > _segment_manager->max_size) { // would we make the file too big? return write_result::no_space; } else if (!_buffer.empty() && (s > _buffer_ostream.size())) { // enough data? if (_segment_manager->cfg.mode == sync_mode::BATCH || writer.sync) { @@ -1332,6 +1342,12 @@ public: return position_type(_file_pos + buffer_position()); } + position_type next_position(size_t size) const { + auto used = _buffer_ostream_size - _buffer_ostream.size(); + used += size; + return _file_pos + used + sector_overhead(used); + } + size_t file_position() const { return _file_pos; } @@ -1342,9 +1358,13 @@ public: auto buf_pos = buffer_position(); auto size = align_up(buf_pos, _alignment); auto fill_size = size - buf_pos; - _buffer_ostream.fill('\0', fill_size); - _segment_manager->totals.bytes_slack += fill_size; - _segment_manager->account_memory_usage(fill_size); + if (fill_size > 0) { + // we want to fill to a sector boundry, must leave room for metadata + assert((fill_size - detail::sector_overhead_size) <= _buffer_ostream.size()); + _buffer_ostream.fill('\0', fill_size - detail::sector_overhead_size); + _segment_manager->totals.bytes_slack += fill_size; + _segment_manager->account_memory_usage(fill_size); + } return size; } void mark_clean(const cf_id_type& id, uint64_t count) noexcept { @@ -1888,7 +1908,7 @@ future db::commitlog::segment_manager: // proper descriptor id order. If we renamed in the delete call // that recycled the file we could potentially have // out-of-order files. (Sort does not help). - clogger.debug("Using recycled segment file {} -> {}", f.name(), dst); + clogger.debug("Using recycled segment file {} -> {} ({} MB)", f.name(), dst, f.known_size()/(1024*1024)); co_await f.rename(dst); co_return co_await allocate_segment_ex(std::move(d), std::move(f), flags); } @@ -2267,7 +2287,7 @@ future<> db::commitlog::segment_manager::do_pending_deletes() { descriptor d(next_id(), "Recycled-" + cfg.fname_prefix); auto dst = this->filename(d); - clogger.debug("Recycling segment file {}", f.name()); + clogger.debug("Recycling segment file {} -> {}", f.name(), dst); // must rename the file since we must ensure the // data is not replayed. Changing the name will // cause header ID to be invalid in the file -> ignored @@ -2283,6 +2303,7 @@ future<> db::commitlog::segment_manager::do_pending_deletes() { } } + clogger.debug("Deleting segment file {}", f.name()); // last resort. co_await f.remove_file(); } catch (...) { @@ -2724,29 +2745,40 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f return eof || next == pos; } future<> skip_to_chunk(size_t seek_to_pos) { + clogger.debug("Skip to {} ({}, {})", seek_to_pos, pos, buffer.size_bytes()); + if (seek_to_pos >= file_size) { eof = true; pos = file_size; co_return; } - auto bytes = seek_to_pos - pos; - auto rem = buffer.size_bytes(); - if (bytes < rem) { - buffer.remove_suffix(bytes); - rem -= bytes; - pos += bytes; - co_return; + // Fix the buffer skip to be correct and more resilient. + if (buffer.size_bytes()) { + auto dseek_to_pos = filepos_to_datapos(seek_to_pos); + auto dpos = filepos_to_datapos(pos); + auto rem = buffer.size_bytes(); + auto n = std::min(dseek_to_pos - dpos, rem); + + buffer.remove_prefix(n); + advance_pos(n); } - buffer = {}; - pos += rem; - bytes = seek_to_pos - pos; + if (pos == seek_to_pos) { + co_return; + } + // must be on page boundary now! + assert(align_down(pos, alignment) == pos); + // this is in full sectors. no need to fiddle with overhead here. + auto bytes = seek_to_pos - pos; auto skip_bytes = align_down(bytes, alignment); - pos += skip_bytes; + pos += skip_bytes; co_await fin.skip(skip_bytes); + // Should not be the case - we only ever skip to page boundaries, + // but it is nice if the code can handle it. If we get here, we + // want to get into the current page. Read and discard. if (bytes > skip_bytes) { // must crc check if we read into a sector co_await read_data(bytes - skip_bytes); @@ -2817,6 +2849,8 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f auto buf_vec = std::move(buffer).release(); auto block_boundry = align_up(pos - initial.size_bytes(), alignment); + clogger.debug("Read {} bytes of data ({}, {})", size, pos, rem); + while (rem < size) { if (eof) { throw segment_truncation(block_boundry); @@ -2874,7 +2908,6 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f tmp.remove_suffix(detail::sector_overhead_size); rem += tmp.size_bytes(); - pos += detail::sector_overhead_size; auto vec2 = std::move(tmp).release(); for (auto&& v : vec2) { @@ -2908,12 +2941,16 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f // this is the returned result. auto res = fragmented_temporary_buffer(std::move(buf_vec), size); - pos += size; + // #16298 - adjust position here, based on data returned. + advance_pos(size); + + assert(((filepos_to_datapos(pos) + buffer.size_bytes()) % (alignment - detail::sector_overhead_size)) == 0); co_return res; } future<> read_chunk() { + clogger.debug("read_chunk {}", pos); auto start = pos; auto buf = co_await read_data(segment::segment_overhead_size); auto in = buf.get_istream(); @@ -2952,16 +2989,41 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f } } + // adjust an actual file position to "data stream" position, i.e. without overhead. + size_t filepos_to_datapos(size_t pos) const { + return pos - (pos / alignment) * detail::sector_overhead_size; + } + + // adjust "data stream" pos to file pos, i.e. add overhead + size_t datapos_to_filepos(size_t pos) const { + return pos + (pos / (alignment - detail::sector_overhead_size)) * detail::sector_overhead_size; + } + + size_t next_pos(size_t off) const { + auto data_pos = filepos_to_datapos(pos); + auto next_data_pos = data_pos + off; + return datapos_to_filepos(next_data_pos); + } + + // #16298 - handle adjusted file position update correctly. + void advance_pos(size_t off) { + auto old = pos; + pos = next_pos(off); + clogger.trace("Pos {} -> {} ({})", old, pos, off); + } + future<> read_entry() { static constexpr size_t entry_header_size = segment::entry_overhead_size; + clogger.debug("read_entry {}", pos); + /** * #598 - Must check that data left in chunk is enough to even read an entry. * If not, this is small slack space in the chunk end, and we should just go * to the next. */ assert(pos <= next); - if ((pos + entry_header_size) >= next) { + if (next_pos(entry_header_size) >= next) { co_await skip_to_chunk(next); co_return; } diff --git a/test/boost/commitlog_test.cc b/test/boost/commitlog_test.cc index ef3e3fc845..ef5dbba288 100644 --- a/test/boost/commitlog_test.cc +++ b/test/boost/commitlog_test.cc @@ -971,6 +971,69 @@ SEASTAR_TEST_CASE(test_commitlog_replay_invalid_key){ using namespace std::chrono_literals; +SEASTAR_TEST_CASE(test_commitlog_add_entry) { + return cl_test([](commitlog& log) { + return seastar::async([&] { + using force_sync = commitlog_entry_writer::force_sync; + + constexpr auto n = 10; + for (auto fs : { force_sync(false), force_sync(true) }) { + std::vector writers; + std::vector mutations; + std::vector rps; + + writers.reserve(n); + mutations.reserve(n); + + for (auto i = 0; i < n; ++i) { + random_mutation_generator gen(random_mutation_generator::generate_counters(false)); + mutations.emplace_back(gen(1).front()); + writers.emplace_back(gen.schema(), mutations.back(), fs); + } + + std::set ids; + + for (auto& w : writers) { + auto h = log.add_entry(w.schema()->id(), w, db::timeout_clock::now() + 60s).get0(); + ids.emplace(h.rp().id); + rps.emplace_back(h.rp()); + } + + BOOST_CHECK_EQUAL(ids.size(), 1); + + log.sync_all_segments().get(); + auto segments = log.get_active_segment_names(); + BOOST_REQUIRE(!segments.empty()); + + std::unordered_set result; + + for (auto& seg : segments) { + db::commitlog::read_log_file(seg, db::commitlog::descriptor::FILENAME_PREFIX, [&](db::commitlog::buffer_and_replay_position buf_rp) { + commitlog_entry_reader r(buf_rp.buffer); + auto& rp = buf_rp.position; + auto i = std::find(rps.begin(), rps.end(), rp); + // since we are looping, we can be reading last test cases + // segment (force_sync permutations) + if (i != rps.end()) { + auto n = std::distance(rps.begin(), i); + auto& fm1 = mutations.at(n); + auto& fm2 = r.mutation(); + auto s = writers.at(n).schema(); + auto m1 = fm1.unfreeze(s); + auto m2 = fm2.unfreeze(s); + BOOST_CHECK_EQUAL(m1, m2); + result.emplace(rp); + } + return make_ready_future<>(); + }).get(); + } + + BOOST_CHECK_EQUAL(result.size(), rps.size()); + } + }); + }); +} + SEASTAR_TEST_CASE(test_commitlog_add_entries) { return cl_test([](commitlog& log) { return seastar::async([&] { @@ -984,7 +1047,7 @@ SEASTAR_TEST_CASE(test_commitlog_add_entries) { writers.reserve(n); mutations.reserve(n); - + for (auto i = 0; i < n; ++i) { random_mutation_generator gen(random_mutation_generator::generate_counters(false)); mutations.emplace_back(gen(1).front()); @@ -1033,6 +1096,94 @@ SEASTAR_TEST_CASE(test_commitlog_add_entries) { }); } +// #16298 - check entry offsets so that we report the correct file positions both +// when reading and writing CL data. +SEASTAR_TEST_CASE(test_commitlog_entry_offsets) { + commitlog::config cfg; + cfg.commitlog_segment_size_in_mb = 1; + cfg.commitlog_total_space_in_mb = 2 * smp::count; + cfg.allow_going_over_size_limit = false; + + return cl_test(cfg, [](commitlog& log) -> future<> { + auto uuid = make_table_id(); + sstring tmp = "hej bubba cow"; + + auto h = co_await log.add_mutation(uuid, tmp.size(), db::commitlog::force_sync::no, [&](db::commitlog::output& dst) { + dst.write(tmp.data(), tmp.size()); + }); + + auto rp = h.release(); + + // verify the first rp is at file offset 32 (file header + chunk header) + BOOST_CHECK_EQUAL(rp.pos, 24 + 8); // TODO: export these sizes for tests. + + co_await log.sync_all_segments(); + + auto segments = log.get_active_segment_names(); + BOOST_REQUIRE(!segments.empty()); + + for (auto& seg : segments) { + auto desc = commitlog::descriptor(seg); + if (desc.id == rp.id) { + bool found = false; + co_await db::commitlog::read_log_file(seg, db::commitlog::descriptor::FILENAME_PREFIX, [&](db::commitlog::buffer_and_replay_position buf_rp) { + BOOST_CHECK_EQUAL(buf_rp.position, rp); + found = true; + return make_ready_future<>(); + }); + BOOST_REQUIRE(found); + co_return; + } + } + + BOOST_FAIL("Did not find segment"); + }); +} + +SEASTAR_TEST_CASE(test_commitlog_max_segment_size) { + commitlog::config cfg; + cfg.commitlog_segment_size_in_mb = 1; + cfg.commitlog_total_space_in_mb = 2 * smp::count; + cfg.allow_going_over_size_limit = false; + + return cl_test(cfg, [max_size = cfg.commitlog_segment_size_in_mb](commitlog& log) -> future<> { + auto uuid = make_table_id(); + sstring tmp = "hej bubba cow"; + db::replay_position last_rp; + std::optional last; + auto max_size_bytes = max_size * 1024 * 1024; + + for (;;) { + auto h = co_await log.add_mutation(uuid, tmp.size(), db::commitlog::force_sync::no, [&](db::commitlog::output& dst) { + dst.write(tmp.data(), tmp.size()); + }); + + auto rp = h.release(); + + if (last && last != rp.id) { + break; + } + last = rp.id; + } + + co_await log.sync_all_segments(); + + auto segments = log.get_active_segment_names(); + BOOST_REQUIRE(!segments.empty()); + + for (auto& seg : segments) { + auto desc = commitlog::descriptor(seg); + if (last == desc.id) { + auto size = co_await file_size(seg); + BOOST_REQUIRE_LE(size, max_size_bytes); + co_return; + } + } + + BOOST_FAIL("Did not find segment"); + }); +} + SEASTAR_TEST_CASE(test_commitlog_new_segment_odsync){ commitlog::config cfg; cfg.commitlog_segment_size_in_mb = 1;