commitlog: Make commitlog entries optionally multi-entry

Allows writing more than one blob of data using a single
"add" call into segment. The old call sites will still
just provide a single entry.

To ensure we can determine the health of all the entries
as a unit, we need to wrap them in a "parent" entry.
For this, we bump the commitlog segment format and
introduce a magic marker, which if present, means
we have entries in entry, totalling "size" bytes.
We checksum the entra header, and also checksum
the individual checksums of each sub-entry (faster).
This is added as a post-word.

When parsing/replaying, if v2+ and marker, we have to
read all entries + checksums into memory, verify, and
_then_ we can actually send the info to caller.
This commit is contained in:
Calle Wilund
2020-11-11 16:21:36 +00:00
parent 6bef3f9cc3
commit 5fcc2066ed
3 changed files with 279 additions and 97 deletions

View File

@@ -62,6 +62,7 @@
#include <seastar/core/chunked_fifo.hh>
#include <seastar/core/queue.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/net/byteorder.hh>
#include "seastarx.hh"
@@ -188,14 +189,54 @@ db::commitlog::descriptor::operator db::replay_position() const {
return replay_position(id);
}
/**
* virtual dispatch for actually inputting data.
* purposely de/un-templated
*
* Writes N entries to a single segment,
* where each entry has its own header+crc,
* i.e. will be deserialized separately.
*/
struct db::commitlog::entry_writer {
force_sync sync;
explicit entry_writer(force_sync sync_) : sync(sync_) {}
size_t num_entries;
explicit entry_writer(force_sync fs, size_t ne = 1)
: sync(fs)
, num_entries(ne)
{}
virtual ~entry_writer() = default;
/** return the CF id for n:th entry */
virtual const cf_id_type& id(size_t) const = 0;
/**
* Returns segment-independent size of all entries combined. Must be >= than segment-dependant total size.
* This is always called first, and should return "worst-case"
* for the complete set of entries
*/
virtual size_t size() const = 0;
/**
* Return the total size of all entries in this given segment
* Called after size(void), once a segment has been chosen.
* Should return the total, exact, size for all entries + overhead (i.e. schema)
* for this segment.
*
* Can be called more than once, if segment switch is neccesary (because race)
*/
virtual size_t size(segment&) = 0;
// Returns segment-independent size of the entry. Must be <= than segment-dependant size.
virtual size_t size() = 0;
virtual void write(segment&, output&) = 0;
virtual ~entry_writer() {};
/**
* return the size of the n:th entry in this given segment
* Only called IFF num_entries > 1, and if so, after size(void)/size(segment&)
* and before write(...)
*/
virtual size_t size(segment&, size_t) = 0;
/* write nth entry */
virtual void write(segment&, output&, size_t) const = 0;
/** the resulting rp_handle for writing a given entry */
virtual void result(size_t, rp_handle) = 0;
};
const std::string db::commitlog::descriptor::SEPARATOR("-");
@@ -256,8 +297,8 @@ public:
_request_controller.signal(size);
}
future<rp_handle>
allocate_when_possible(const cf_id_type& id, shared_ptr<entry_writer> writer, db::timeout_clock::time_point timeout);
future<>
allocate_when_possible(shared_ptr<entry_writer> writer, db::timeout_clock::time_point timeout);
struct stats {
uint64_t cycle_count = 0;
@@ -507,9 +548,11 @@ public:
// The commit log entry overhead in bytes (int: length + int: head checksum + int: tail checksum)
static constexpr size_t entry_overhead_size = 3 * sizeof(uint32_t);
static constexpr size_t multi_entry_overhead_size = entry_overhead_size + sizeof(uint32_t);
static constexpr size_t segment_overhead_size = 2 * sizeof(uint32_t);
static constexpr size_t descriptor_header_size = 5 * sizeof(uint32_t);
static constexpr uint32_t segment_magic = ('S'<<24) |('C'<< 16) | ('L' << 8) | 'C';
static constexpr uint32_t multi_entry_size_magic = 0xffffffff;
// The commit log (chained) sync marker/header size in bytes (int: length + int: checksum [segmentId, position])
static constexpr size_t sync_marker_size = 2 * sizeof(uint32_t);
@@ -861,24 +904,23 @@ public:
/**
* Add a "mutation" to the segment.
*/
future<rp_handle> allocate(const cf_id_type& id, shared_ptr<entry_writer> writer, segment_manager::request_controller_units permit, db::timeout_clock::time_point timeout) {
future<> allocate(shared_ptr<entry_writer> writer, segment_manager::request_controller_units permit, db::timeout_clock::time_point timeout) {
if (must_sync()) {
return with_timeout(timeout, sync()).then([this, id, writer = std::move(writer), permit = std::move(permit), timeout] (auto s) mutable {
return s->allocate(id, std::move(writer), std::move(permit), timeout);
return with_timeout(timeout, sync()).then([this, writer = std::move(writer), permit = std::move(permit), timeout] (auto s) mutable {
return s->allocate(std::move(writer), std::move(permit), timeout);
});
}
const auto size = writer->size(*this);
const auto s = size + entry_overhead_size; // total size
const auto s = size + writer->num_entries * entry_overhead_size + (writer->num_entries > 1 ? multi_entry_overhead_size : 0u); // total size
auto ep = _segment_manager->sanity_check_size(s);
if (ep) {
return make_exception_future<rp_handle>(std::move(ep));
return make_exception_future<>(std::move(ep));
}
if (!is_still_allocating() || position() + s > _segment_manager->max_size) { // would we make the file too big?
return finish_and_get_new(timeout).then([id, writer = std::move(writer), permit = std::move(permit), timeout] (auto new_seg) mutable {
return new_seg->allocate(id, std::move(writer), std::move(permit), timeout);
return finish_and_get_new(timeout).then([writer = std::move(writer), permit = std::move(permit), timeout] (auto new_seg) mutable {
return new_seg->allocate(std::move(writer), std::move(permit), timeout);
});
} else if (!_buffer.empty() && (s > _buffer_ostream.size())) { // enough data?
if (_segment_manager->cfg.mode == sync_mode::BATCH || writer->sync) {
@@ -886,8 +928,8 @@ public:
// If we run batch mode and find ourselves not fit in a non-empty
// buffer, we must force a cycle and wait for it (to keep flush order)
// This will most likely cause parallel writes, and consecutive flushes.
return with_timeout(timeout, sync()).then([this, id, writer = std::move(writer), permit = std::move(permit), timeout] (auto new_seg) mutable {
return new_seg->allocate(id, std::move(writer), std::move(permit), timeout);
return with_timeout(timeout, sync()).then([this, writer = std::move(writer), permit = std::move(permit), timeout] (auto new_seg) mutable {
return new_seg->allocate(std::move(writer), std::move(permit), timeout);
});
} else {
//FIXME: discarded future
@@ -904,41 +946,77 @@ public:
}
if (_closed) {
return make_exception_future<rp_handle>(std::runtime_error("commitlog: Cannot add data to a closed segment"));
return make_exception_future<>(std::runtime_error("commitlog: Cannot add data to a closed segment"));
}
buf_memory -= permit.release();
_segment_manager->account_memory_usage(buf_memory);
replay_position rp(_desc.id, position());
_cf_dirty[id]++; // increase use count for cf.
auto& out = _buffer_ostream;
rp_handle h(static_pointer_cast<cf_holder>(shared_from_this()), std::move(id), rp);
std::optional<crc32_nbo> mecrc;
auto out = _buffer_ostream.write_substream(s);
crc32_nbo crc;
// if this is multi-entry write, we need to add an extra header + crc
// the header and crc formula is:
// header:
// magic : uint32_t
// size : uint32_t
// crc1 : uint32_t - crc of magic, size
// -> entries[]
// post:
// crc2 : uint32_t - crc1 + each entry crc.
if (writer->num_entries > 1) {
mecrc.emplace();
write<uint32_t>(out, multi_entry_size_magic);
write<uint32_t>(out, s);
mecrc->process(multi_entry_size_magic);
mecrc->process(uint32_t(s));
write<uint32_t>(out, mecrc->checksum());
}
write<uint32_t>(out, s);
crc.process(uint32_t(s));
write<uint32_t>(out, crc.checksum());
for (size_t entry = 0; entry < writer->num_entries; ++entry) {
replay_position rp(_desc.id, position());
auto id = writer->id(entry);
auto entry_size = writer->num_entries == 1 ? size : writer->size(*this, entry);
auto es = entry_size + entry_overhead_size;
// actual data
auto entry_out = out.write_substream(size);
auto entry_data = entry_out.to_input_stream();
writer->write(*this, entry_out);
entry_data.with_stream([&] (auto data_str) {
crc.process_fragmented(ser::buffer_view<typename std::vector<temporary_buffer<char>>::iterator>(data_str));
});
_cf_dirty[id]++; // increase use count for cf.
write<uint32_t>(out, crc.checksum());
rp_handle h(static_pointer_cast<cf_holder>(shared_from_this()), std::move(id), rp);
crc32_nbo crc;
write<uint32_t>(out, es);
crc.process(uint32_t(es));
write<uint32_t>(out, crc.checksum());
// actual data
auto entry_out = out.write_substream(entry_size);
auto entry_data = entry_out.to_input_stream();
writer->write(*this, entry_out, entry);
entry_data.with_stream([&] (auto data_str) {
crc.process_fragmented(ser::buffer_view<typename std::vector<temporary_buffer<char>>::iterator>(data_str));
});
auto checksum = crc.checksum();
write<uint32_t>(out, checksum);
if (mecrc) {
mecrc->process(checksum);
}
writer->result(entry, std::move(h));
}
if (mecrc) {
// write the crc of header + all sub-entry crc
write<uint32_t>(out, mecrc->checksum());
}
++_segment_manager->totals.allocation_count;
++_num_allocs;
if (_segment_manager->cfg.mode == sync_mode::BATCH || writer->sync) {
return batch_cycle(timeout).then([h = std::move(h)](auto s) mutable {
return make_ready_future<rp_handle>(std::move(h));
});
return batch_cycle(timeout).discard_result();
} else {
// If this buffer alone is too big, potentially bigger than the maximum allowed size,
// then no other request will be allowed in to force the cycle()ing of this buffer. We
@@ -949,7 +1027,7 @@ public:
clogger.error("Failed to flush commits to disk: {}", ex);
});
}
return make_ready_future<rp_handle>(std::move(h));
return make_ready_future<>();
}
}
@@ -1011,24 +1089,24 @@ public:
}
};
future<db::rp_handle>
db::commitlog::segment_manager::allocate_when_possible(const cf_id_type& id, shared_ptr<entry_writer> writer, db::timeout_clock::time_point timeout) {
future<>
db::commitlog::segment_manager::allocate_when_possible(shared_ptr<entry_writer> writer, db::timeout_clock::time_point timeout) {
auto size = writer->size();
// If this is already too big now, we should throw early. It's also a correctness issue, since
// if we are too big at this moment we'll never reach allocate() to actually throw at that
// point.
auto ep = sanity_check_size(size);
if (ep) {
return make_exception_future<rp_handle>(std::move(ep));
return make_exception_future<>(std::move(ep));
}
auto fut = get_units(_request_controller, size, timeout);
if (_request_controller.waiters()) {
totals.requests_blocked_memory++;
}
return fut.then([this, id, writer = std::move(writer), timeout] (auto permit) mutable {
return active_segment(timeout).then([this, timeout, id, writer = std::move(writer), permit = std::move(permit)] (auto s) mutable {
return s->allocate(id, std::move(writer), std::move(permit), timeout);
return fut.then([this, writer = std::move(writer), timeout] (auto permit) mutable {
return active_segment(timeout).then([this, timeout, writer = std::move(writer), permit = std::move(permit)] (auto s) mutable {
return s->allocate(std::move(writer), std::move(permit), timeout);
});
});
}
@@ -1071,6 +1149,7 @@ db::commitlog::segment_manager::segment_manager(config c)
, _reserve_replenisher(make_ready_future<>())
{
assert(max_size > 0);
assert(max_mutation_size < segment::multi_entry_size_magic);
clogger.trace("Commitlog {} maximum disk size: {} MB / cpu ({} cpus)",
cfg.commit_log_location, max_disk_size / (1024 * 1024),
@@ -1758,44 +1837,72 @@ db::commitlog::segment_manager::buffer_type db::commitlog::segment_manager::acqu
future<db::rp_handle> db::commitlog::add(const cf_id_type& id,
size_t size, db::timeout_clock::time_point timeout, db::commitlog::force_sync sync, serializer_func func) {
class serializer_func_entry_writer final : public entry_writer {
cf_id_type _id;
serializer_func _func;
size_t _size;
public:
serializer_func_entry_writer(size_t sz, serializer_func func, db::commitlog::force_sync sync)
: entry_writer(sync), _func(std::move(func)), _size(sz)
{ }
virtual size_t size(segment&) override { return _size; }
virtual size_t size() override { return _size; }
virtual void write(segment&, output& out) override {
db::rp_handle res;
serializer_func_entry_writer(const cf_id_type& id, size_t sz, serializer_func func, db::commitlog::force_sync sync)
: entry_writer(sync), _id(id), _func(std::move(func)), _size(sz)
{}
const cf_id_type& id(size_t) const { return _id; }
size_t size(segment&, size_t) override { return _size; }
size_t size(segment&) override { return _size; }
size_t size() const override { return _size; }
void write(segment&, output& out, size_t) const override {
_func(out);
}
void result(size_t, rp_handle h) override {
res = std::move(h);
}
};
auto writer = ::make_shared<serializer_func_entry_writer>(size, std::move(func), sync);
return _segment_manager->allocate_when_possible(id, writer, timeout);
auto writer = ::make_shared<serializer_func_entry_writer>(id, size, std::move(func), sync);
return _segment_manager->allocate_when_possible(writer, timeout).then([writer] {
return std::move(writer->res);
});
}
future<db::rp_handle> db::commitlog::add_entry(const cf_id_type& id, const commitlog_entry_writer& cew, timeout_clock::time_point timeout)
{
assert(id == cew.schema()->id());
class cl_entry_writer final : public entry_writer {
commitlog_entry_writer _writer;
public:
cl_entry_writer(const commitlog_entry_writer& wr) : entry_writer(wr.sync()), _writer(wr) { }
virtual size_t size(segment& seg) override {
rp_handle res;
cl_entry_writer(const commitlog_entry_writer& wr)
: entry_writer(wr.sync()), _writer(wr)
{}
const cf_id_type& id(size_t) const override {
return _writer.schema()->id();
}
size_t size(segment& seg) override {
_writer.set_with_schema(!seg.is_schema_version_known(_writer.schema()));
return _writer.size();
}
virtual size_t size() override {
size_t size(segment& seg, size_t) override {
return size(seg);
}
size_t size() const override {
return _writer.mutation_size();
}
virtual void write(segment& seg, output& out) override {
void write(segment& seg, output& out, size_t) const override {
if (_writer.with_schema()) {
seg.add_schema_version(_writer.schema());
}
_writer.write(out);
}
void result(size_t, rp_handle h) override {
res = std::move(h);
}
};
auto writer = ::make_shared<cl_entry_writer>(cew);
return _segment_manager->allocate_when_possible(id, writer, timeout);
return _segment_manager->allocate_when_possible(writer, timeout).then([writer] {
return std::move(writer->res);
});
}
}
db::commitlog::commitlog(config cfg)
@@ -2039,7 +2146,20 @@ db::commitlog::read_log_file(const sstring& filename, const sstring& pfx, seasta
return do_until(std::bind(&work::end_of_chunk, this), std::bind(&work::read_entry, this));
});
}
using produce_func = std::function<future<>(buffer_and_replay_position, uint32_t)>;
future<> produce(buffer_and_replay_position bar) {
return s.produce(std::move(bar)).handle_exception([this](auto ep) {
return fail();
});
}
future<> read_entry() {
return do_read_entry(std::bind(&work::produce, this, std::placeholders::_1));
}
future<> do_read_entry(produce_func pf) {
static constexpr size_t entry_header_size = segment::entry_overhead_size - sizeof(uint32_t);
/**
@@ -2049,59 +2169,118 @@ db::commitlog::read_log_file(const sstring& filename, const sstring& pfx, seasta
*/
assert(pos <= next);
if ((pos + entry_header_size) >= next) {
return skip(next - pos);
co_await skip(next - pos);
co_return;
}
return frag_reader.read_exactly(fin, entry_header_size).then([this](fragmented_temporary_buffer buf) {
replay_position rp(id, position_type(pos));
auto buf = co_await frag_reader.read_exactly(fin, entry_header_size);
if (!advance(buf)) {
return make_ready_future<>();
}
replay_position rp(id, position_type(pos));
auto in = buf.get_istream();
auto size = read<uint32_t>(in);
auto checksum = read<uint32_t>(in);
if (!advance(buf)) {
co_return;
}
crc32_nbo crc;
crc.process(size);
auto in = buf.get_istream();
auto size = read<uint32_t>(in);
auto checksum = read<uint32_t>(in);
if (size < 3 * sizeof(uint32_t) || checksum != crc.checksum()) {
crc32_nbo crc;
crc.process(size);
// check for multi-entry
if (d.ver >= descriptor::segment_version_2 && size == segment::multi_entry_size_magic) {
auto actual_size = checksum;
auto end = pos + actual_size - entry_header_size - sizeof(uint32_t);
assert(end <= next);
// really small read...
buf = co_await frag_reader.read_exactly(fin, sizeof(uint32_t));
in = buf.get_istream();
checksum = read<uint32_t>(in);
advance(buf);
crc.process(actual_size);
// verify header crc.
if (actual_size < 2 * segment::entry_overhead_size || crc.checksum() != checksum) {
auto slack = next - pos;
if (size != 0) {
clogger.debug("Segment entry at {} has broken header. Skipping to next chunk ({} bytes)", rp, slack);
corrupt_size += slack;
}
// size == 0 -> special scylla case: zero padding due to dma blocks
return skip(slack);
co_await skip(slack);
co_return;
}
return frag_reader.read_exactly(fin, size - entry_header_size).then([this, size, crc = std::move(crc), rp](fragmented_temporary_buffer buf) mutable {
advance(buf);
auto in = buf.get_istream();
auto data_size = size - segment::entry_overhead_size;
in.skip(data_size);
auto checksum = read<uint32_t>(in);
buf.remove_suffix(buf.size_bytes() - data_size);
crc.process_fragmented(fragmented_temporary_buffer::view(buf));
if (crc.checksum() != checksum) {
// If we're getting a checksum error here, most likely the rest of
// the file will be corrupt as well. But it does not hurt to retry.
// Just go to the next entry (since "size" in header seemed ok).
clogger.debug("Segment entry at {} checksum error. Skipping {} bytes", rp, size);
corrupt_size += size;
return make_ready_future<>();
}
return s.produce({std::move(buf), rp}).handle_exception([this](auto ep) {
return fail();
std::vector<buffer_and_replay_position> tmp;
tmp.reserve(10);
// now read all sub-entries into buffers, and collect crc.
while (pos < end) {
co_await do_read_entry([&](buffer_and_replay_position br, uint32_t checksum) -> future<> {
tmp.emplace_back(std::move(br));
crc.process(checksum);
co_return;
});
});
});
}
// and verify crc.
buf = co_await frag_reader.read_exactly(fin, sizeof(uint32_t));
in = buf.get_istream();
checksum = read<uint32_t>(in);
advance(buf);
if (checksum != crc.checksum()) {
auto slack = next - pos;
clogger.debug("Segment entry at {} has broken header. Skipping to next chunk ({} bytes)", rp, actual_size);
corrupt_size += actual_size;
co_await skip(slack);
co_return;
}
// all is ok. send data to subscriber.
for (auto&& br : tmp) {
co_await produce(std::move(br));
if (failed) {
break;
}
}
co_return;
}
if (size < 3 * sizeof(uint32_t) || checksum != crc.checksum()) {
auto slack = next - pos;
if (size != 0) {
clogger.debug("Segment entry at {} has broken header. Skipping to next chunk ({} bytes)", rp, slack);
corrupt_size += slack;
}
// size == 0 -> special scylla case: zero padding due to dma blocks
co_await skip(slack);
co_return;
}
buf = co_await frag_reader.read_exactly(fin, size - entry_header_size);
advance(buf);
in = buf.get_istream();
auto data_size = size - segment::entry_overhead_size;
in.skip(data_size);
checksum = read<uint32_t>(in);
buf.remove_suffix(buf.size_bytes() - data_size);
crc.process_fragmented(fragmented_temporary_buffer::view(buf));
if (crc.checksum() != checksum) {
// If we're getting a checksum error here, most likely the rest of
// the file will be corrupt as well. But it does not hurt to retry.
// Just go to the next entry (since "size" in header seemed ok).
clogger.debug("Segment entry at {} checksum error. Skipping {} bytes", rp, size);
corrupt_size += size;
co_return;
}
co_await pf({std::move(buf), rp}, checksum);
}
future<> read_file() {
return f.size().then([this](uint64_t size) {
file_size = size;