Merge 'Commitlog: Fix reading/writing position calculations and allocation size checks' from Calle Wilund

Fixes #16298

The adjusted buffer position calculation in buffer_position(), introduced in https://github.com/scylladb/scylladb/pull/15494
was in fact broken. It calculated (like previously) a "position" based on diff between
underlying buffer size and ostream size() (i.e. avail), then adjusted this according to
sector overhead rules.

However, the underlying buffer size is in unadjusted terms, and the ostream is adjusted.
The two cannot be compared as such, which means the "positions" we get here are borked.

Luckily for us (sarcasm), the position calculation in replayer made a similar error,
in that it adjusts up current position by one sector overhead to much, leading to us
more or less getting the same, erroneous results in both ends.

However, when/iff one needs to adjust the segment file format further, one might very
quickly realize that this does not work well if, say, one needs to be able to safely
read some extra bytes before first chunk in a segment. Conversely, trying to adjust
this also exposes a latent potential error in the skip mechanism, manifesting here.

Issue fixed by keeping track of the initial ostream capacity for segment buffer, and
use this for position calculation, and in the case of replayer, move file pos adjustment
from read_data() to subroutine (shared with skipping), that better takes data stream
position vs. file position adjustment. In implementaion terms, we first inc the
"data stream" pos (i.e. pos in data without overhead), then adjust for overhead.

Also fix replayer::skip, so that we handle the buffer/pos relation correctly now.

Added test for intial entry position, as well as data replay consistency for single
entry_writer paths.

Fixes #16301

The calculation on whether data may be added is based on position vs. size of incoming data.
However, it did not take sector overhead into account, which lead us to writing past allowed
segment end, which in turn also leads to metrics overflows.

Closes scylladb/scylladb#16302

* github.com:scylladb/scylladb:
  commitlog: Fix allocation size check to take sector overhead into account.
  commitlog: Fix commitlog_segment::buffer_position() calculation and replay counterpart
This commit is contained in:
Avi Kivity
2023-12-07 12:27:54 +02:00
2 changed files with 240 additions and 27 deletions

View File

@@ -743,6 +743,7 @@ class db::commitlog::segment : public enable_shared_from_this<segment>, public c
buffer_type _buffer;
base_ostream_type _buffer_ostream;
size_t _buffer_ostream_size = 0;
std::unordered_map<cf_id_type, uint64_t> _cf_dirty;
std::unordered_map<cf_id_type, gc_clock::time_point> _cf_min_time;
time_point _sync_time;
@@ -764,9 +765,13 @@ class db::commitlog::segment : public enable_shared_from_this<segment>, public c
// page checksums etc. The ostream does not include this, as it is subdivided and
// skips sector_overhead parts of the memory buffer. So to get actual position
// in the buffer, we need to add it back.
auto size = _buffer_ostream.size();
size += sector_overhead(size);
return _buffer.size_bytes() - size;
// #16298 - this was based on _buffer_size vs. remaining in ostream.
// ostream type has no "position" type, need to keep track of
// what the original size was, i.e. _buffer.size() adjusted down
// for sector overhead.
auto used = _buffer_ostream_size - _buffer_ostream.size();
return used + sector_overhead(used);
}
future<> begin_flush() {
@@ -1000,9 +1005,7 @@ public:
s += overhead;
// add bookkeep data reqs.
s += sector_overhead(s);
auto a = align_up(s, _alignment);
auto a = align_up(s + sector_overhead(s), _alignment);
auto k = std::max(a, default_size);
_buffer = _segment_manager->acquire_buffer(k, _alignment);
@@ -1011,11 +1014,17 @@ public:
// the amount of data we can actually write into.
auto useable_size = size - n_blocks * detail::sector_overhead_size;
assert(useable_size >= s);
_buffer_ostream = frag_ostream_type(detail::sector_split_iterator(_buffer.begin(), _buffer.end(), _alignment), useable_size);
// #16298 - keep track of ostream initial size.
_buffer_ostream_size = useable_size;
auto out = _buffer_ostream.write_substream(overhead);
out.fill('\0', overhead);
_segment_manager->totals.buffer_list_bytes += _buffer.size_bytes();
assert(buffer_position() == overhead);
}
bool buffer_is_empty() const {
@@ -1044,6 +1053,7 @@ public:
_file_pos = top;
_buffer_ostream = { };
_buffer_ostream_size = 0;
_num_allocs = 0;
assert(me.use_count() > 1);
@@ -1243,7 +1253,7 @@ public:
_segment_manager->sanity_check_size(s);
if (!is_still_allocating() || position() + s > _segment_manager->max_size) { // would we make the file too big?
if (!is_still_allocating() || next_position(s) > _segment_manager->max_size) { // would we make the file too big?
return write_result::no_space;
} else if (!_buffer.empty() && (s > _buffer_ostream.size())) { // enough data?
if (_segment_manager->cfg.mode == sync_mode::BATCH || writer.sync) {
@@ -1332,6 +1342,12 @@ public:
return position_type(_file_pos + buffer_position());
}
position_type next_position(size_t size) const {
auto used = _buffer_ostream_size - _buffer_ostream.size();
used += size;
return _file_pos + used + sector_overhead(used);
}
size_t file_position() const {
return _file_pos;
}
@@ -1342,9 +1358,13 @@ public:
auto buf_pos = buffer_position();
auto size = align_up(buf_pos, _alignment);
auto fill_size = size - buf_pos;
_buffer_ostream.fill('\0', fill_size);
_segment_manager->totals.bytes_slack += fill_size;
_segment_manager->account_memory_usage(fill_size);
if (fill_size > 0) {
// we want to fill to a sector boundry, must leave room for metadata
assert((fill_size - detail::sector_overhead_size) <= _buffer_ostream.size());
_buffer_ostream.fill('\0', fill_size - detail::sector_overhead_size);
_segment_manager->totals.bytes_slack += fill_size;
_segment_manager->account_memory_usage(fill_size);
}
return size;
}
void mark_clean(const cf_id_type& id, uint64_t count) noexcept {
@@ -1888,7 +1908,7 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
// proper descriptor id order. If we renamed in the delete call
// that recycled the file we could potentially have
// out-of-order files. (Sort does not help).
clogger.debug("Using recycled segment file {} -> {}", f.name(), dst);
clogger.debug("Using recycled segment file {} -> {} ({} MB)", f.name(), dst, f.known_size()/(1024*1024));
co_await f.rename(dst);
co_return co_await allocate_segment_ex(std::move(d), std::move(f), flags);
}
@@ -2267,7 +2287,7 @@ future<> db::commitlog::segment_manager::do_pending_deletes() {
descriptor d(next_id(), "Recycled-" + cfg.fname_prefix);
auto dst = this->filename(d);
clogger.debug("Recycling segment file {}", f.name());
clogger.debug("Recycling segment file {} -> {}", f.name(), dst);
// must rename the file since we must ensure the
// data is not replayed. Changing the name will
// cause header ID to be invalid in the file -> ignored
@@ -2283,6 +2303,7 @@ future<> db::commitlog::segment_manager::do_pending_deletes() {
}
}
clogger.debug("Deleting segment file {}", f.name());
// last resort.
co_await f.remove_file();
} catch (...) {
@@ -2724,29 +2745,40 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f
return eof || next == pos;
}
future<> skip_to_chunk(size_t seek_to_pos) {
clogger.debug("Skip to {} ({}, {})", seek_to_pos, pos, buffer.size_bytes());
if (seek_to_pos >= file_size) {
eof = true;
pos = file_size;
co_return;
}
auto bytes = seek_to_pos - pos;
auto rem = buffer.size_bytes();
if (bytes < rem) {
buffer.remove_suffix(bytes);
rem -= bytes;
pos += bytes;
co_return;
// Fix the buffer skip to be correct and more resilient.
if (buffer.size_bytes()) {
auto dseek_to_pos = filepos_to_datapos(seek_to_pos);
auto dpos = filepos_to_datapos(pos);
auto rem = buffer.size_bytes();
auto n = std::min(dseek_to_pos - dpos, rem);
buffer.remove_prefix(n);
advance_pos(n);
}
buffer = {};
pos += rem;
bytes = seek_to_pos - pos;
if (pos == seek_to_pos) {
co_return;
}
// must be on page boundary now!
assert(align_down(pos, alignment) == pos);
// this is in full sectors. no need to fiddle with overhead here.
auto bytes = seek_to_pos - pos;
auto skip_bytes = align_down(bytes, alignment);
pos += skip_bytes;
pos += skip_bytes;
co_await fin.skip(skip_bytes);
// Should not be the case - we only ever skip to page boundaries,
// but it is nice if the code can handle it. If we get here, we
// want to get into the current page. Read and discard.
if (bytes > skip_bytes) {
// must crc check if we read into a sector
co_await read_data(bytes - skip_bytes);
@@ -2817,6 +2849,8 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f
auto buf_vec = std::move(buffer).release();
auto block_boundry = align_up(pos - initial.size_bytes(), alignment);
clogger.debug("Read {} bytes of data ({}, {})", size, pos, rem);
while (rem < size) {
if (eof) {
throw segment_truncation(block_boundry);
@@ -2874,7 +2908,6 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f
tmp.remove_suffix(detail::sector_overhead_size);
rem += tmp.size_bytes();
pos += detail::sector_overhead_size;
auto vec2 = std::move(tmp).release();
for (auto&& v : vec2) {
@@ -2908,12 +2941,16 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f
// this is the returned result.
auto res = fragmented_temporary_buffer(std::move(buf_vec), size);
pos += size;
// #16298 - adjust position here, based on data returned.
advance_pos(size);
assert(((filepos_to_datapos(pos) + buffer.size_bytes()) % (alignment - detail::sector_overhead_size)) == 0);
co_return res;
}
future<> read_chunk() {
clogger.debug("read_chunk {}", pos);
auto start = pos;
auto buf = co_await read_data(segment::segment_overhead_size);
auto in = buf.get_istream();
@@ -2952,16 +2989,41 @@ db::commitlog::read_log_file(sstring filename, sstring pfx, commit_load_reader_f
}
}
// adjust an actual file position to "data stream" position, i.e. without overhead.
size_t filepos_to_datapos(size_t pos) const {
return pos - (pos / alignment) * detail::sector_overhead_size;
}
// adjust "data stream" pos to file pos, i.e. add overhead
size_t datapos_to_filepos(size_t pos) const {
return pos + (pos / (alignment - detail::sector_overhead_size)) * detail::sector_overhead_size;
}
size_t next_pos(size_t off) const {
auto data_pos = filepos_to_datapos(pos);
auto next_data_pos = data_pos + off;
return datapos_to_filepos(next_data_pos);
}
// #16298 - handle adjusted file position update correctly.
void advance_pos(size_t off) {
auto old = pos;
pos = next_pos(off);
clogger.trace("Pos {} -> {} ({})", old, pos, off);
}
future<> read_entry() {
static constexpr size_t entry_header_size = segment::entry_overhead_size;
clogger.debug("read_entry {}", pos);
/**
* #598 - Must check that data left in chunk is enough to even read an entry.
* If not, this is small slack space in the chunk end, and we should just go
* to the next.
*/
assert(pos <= next);
if ((pos + entry_header_size) >= next) {
if (next_pos(entry_header_size) >= next) {
co_await skip_to_chunk(next);
co_return;
}

View File

@@ -971,6 +971,69 @@ SEASTAR_TEST_CASE(test_commitlog_replay_invalid_key){
using namespace std::chrono_literals;
SEASTAR_TEST_CASE(test_commitlog_add_entry) {
return cl_test([](commitlog& log) {
return seastar::async([&] {
using force_sync = commitlog_entry_writer::force_sync;
constexpr auto n = 10;
for (auto fs : { force_sync(false), force_sync(true) }) {
std::vector<commitlog_entry_writer> writers;
std::vector<frozen_mutation> mutations;
std::vector<replay_position> rps;
writers.reserve(n);
mutations.reserve(n);
for (auto i = 0; i < n; ++i) {
random_mutation_generator gen(random_mutation_generator::generate_counters(false));
mutations.emplace_back(gen(1).front());
writers.emplace_back(gen.schema(), mutations.back(), fs);
}
std::set<segment_id_type> ids;
for (auto& w : writers) {
auto h = log.add_entry(w.schema()->id(), w, db::timeout_clock::now() + 60s).get0();
ids.emplace(h.rp().id);
rps.emplace_back(h.rp());
}
BOOST_CHECK_EQUAL(ids.size(), 1);
log.sync_all_segments().get();
auto segments = log.get_active_segment_names();
BOOST_REQUIRE(!segments.empty());
std::unordered_set<replay_position> result;
for (auto& seg : segments) {
db::commitlog::read_log_file(seg, db::commitlog::descriptor::FILENAME_PREFIX, [&](db::commitlog::buffer_and_replay_position buf_rp) {
commitlog_entry_reader r(buf_rp.buffer);
auto& rp = buf_rp.position;
auto i = std::find(rps.begin(), rps.end(), rp);
// since we are looping, we can be reading last test cases
// segment (force_sync permutations)
if (i != rps.end()) {
auto n = std::distance(rps.begin(), i);
auto& fm1 = mutations.at(n);
auto& fm2 = r.mutation();
auto s = writers.at(n).schema();
auto m1 = fm1.unfreeze(s);
auto m2 = fm2.unfreeze(s);
BOOST_CHECK_EQUAL(m1, m2);
result.emplace(rp);
}
return make_ready_future<>();
}).get();
}
BOOST_CHECK_EQUAL(result.size(), rps.size());
}
});
});
}
SEASTAR_TEST_CASE(test_commitlog_add_entries) {
return cl_test([](commitlog& log) {
return seastar::async([&] {
@@ -984,7 +1047,7 @@ SEASTAR_TEST_CASE(test_commitlog_add_entries) {
writers.reserve(n);
mutations.reserve(n);
for (auto i = 0; i < n; ++i) {
random_mutation_generator gen(random_mutation_generator::generate_counters(false));
mutations.emplace_back(gen(1).front());
@@ -1033,6 +1096,94 @@ SEASTAR_TEST_CASE(test_commitlog_add_entries) {
});
}
// #16298 - check entry offsets so that we report the correct file positions both
// when reading and writing CL data.
SEASTAR_TEST_CASE(test_commitlog_entry_offsets) {
commitlog::config cfg;
cfg.commitlog_segment_size_in_mb = 1;
cfg.commitlog_total_space_in_mb = 2 * smp::count;
cfg.allow_going_over_size_limit = false;
return cl_test(cfg, [](commitlog& log) -> future<> {
auto uuid = make_table_id();
sstring tmp = "hej bubba cow";
auto h = co_await log.add_mutation(uuid, tmp.size(), db::commitlog::force_sync::no, [&](db::commitlog::output& dst) {
dst.write(tmp.data(), tmp.size());
});
auto rp = h.release();
// verify the first rp is at file offset 32 (file header + chunk header)
BOOST_CHECK_EQUAL(rp.pos, 24 + 8); // TODO: export these sizes for tests.
co_await log.sync_all_segments();
auto segments = log.get_active_segment_names();
BOOST_REQUIRE(!segments.empty());
for (auto& seg : segments) {
auto desc = commitlog::descriptor(seg);
if (desc.id == rp.id) {
bool found = false;
co_await db::commitlog::read_log_file(seg, db::commitlog::descriptor::FILENAME_PREFIX, [&](db::commitlog::buffer_and_replay_position buf_rp) {
BOOST_CHECK_EQUAL(buf_rp.position, rp);
found = true;
return make_ready_future<>();
});
BOOST_REQUIRE(found);
co_return;
}
}
BOOST_FAIL("Did not find segment");
});
}
SEASTAR_TEST_CASE(test_commitlog_max_segment_size) {
commitlog::config cfg;
cfg.commitlog_segment_size_in_mb = 1;
cfg.commitlog_total_space_in_mb = 2 * smp::count;
cfg.allow_going_over_size_limit = false;
return cl_test(cfg, [max_size = cfg.commitlog_segment_size_in_mb](commitlog& log) -> future<> {
auto uuid = make_table_id();
sstring tmp = "hej bubba cow";
db::replay_position last_rp;
std::optional<db::segment_id_type> last;
auto max_size_bytes = max_size * 1024 * 1024;
for (;;) {
auto h = co_await log.add_mutation(uuid, tmp.size(), db::commitlog::force_sync::no, [&](db::commitlog::output& dst) {
dst.write(tmp.data(), tmp.size());
});
auto rp = h.release();
if (last && last != rp.id) {
break;
}
last = rp.id;
}
co_await log.sync_all_segments();
auto segments = log.get_active_segment_names();
BOOST_REQUIRE(!segments.empty());
for (auto& seg : segments) {
auto desc = commitlog::descriptor(seg);
if (last == desc.id) {
auto size = co_await file_size(seg);
BOOST_REQUIRE_LE(size, max_size_bytes);
co_return;
}
}
BOOST_FAIL("Did not find segment");
});
}
SEASTAR_TEST_CASE(test_commitlog_new_segment_odsync){
commitlog::config cfg;
cfg.commitlog_segment_size_in_mb = 1;