commitlog: add sync method to entry_writer

If the method returns true commitlog should sync to file immediately
after writing the entry and wait for flush to complete before returning.
This commit is contained in:
Gleb Natapov
2020-01-05 12:08:06 +02:00
parent c8a5a27bd9
commit e0bc4aa098
7 changed files with 42 additions and 33 deletions

View File

@@ -1459,7 +1459,7 @@ static future<> maybe_handle_reorder(std::exception_ptr exp) {
future<> database::apply_with_commitlog(column_family& cf, const mutation& m, db::timeout_clock::time_point timeout) {
if (cf.commitlog() != nullptr) {
return do_with(freeze(m), [this, &m, &cf, timeout] (frozen_mutation& fm) {
commitlog_entry_writer cew(m.schema(), fm);
commitlog_entry_writer cew(m.schema(), fm, db::commitlog::force_sync::no);
return cf.commitlog()->add_entry(m.schema()->id(), cew, timeout);
}).then([this, &m, &cf, timeout] (db::rp_handle h) {
return apply_in_memory(m, cf, std::move(h), timeout).handle_exception(maybe_handle_reorder);
@@ -1471,7 +1471,7 @@ future<> database::apply_with_commitlog(column_family& cf, const mutation& m, db
future<> database::apply_with_commitlog(schema_ptr s, column_family& cf, utils::UUID uuid, const frozen_mutation& m, db::timeout_clock::time_point timeout) {
auto cl = cf.commitlog();
if (cl != nullptr) {
commitlog_entry_writer cew(s, m);
commitlog_entry_writer cew(s, m, db::commitlog::force_sync::no);
return cf.commitlog()->add_entry(uuid, cew, timeout).then([&m, this, s, timeout, cl](db::rp_handle h) {
return this->apply_in_memory(m, s, std::move(h), timeout).handle_exception(maybe_handle_reorder);
});

View File

@@ -847,7 +847,7 @@ public:
return new_seg->allocate(id, std::move(writer), std::move(permit), timeout);
});
} else if (!_buffer.empty() && (s > _buffer_ostream.size())) { // enough data?
if (_segment_manager->cfg.mode == sync_mode::BATCH) {
if (_segment_manager->cfg.mode == sync_mode::BATCH || writer->sync) {
// TODO: this could cause starvation if we're really unlucky.
// If we run batch mode and find ourselves not fit in a non-empty
// buffer, we must force a cycle and wait for it (to keep flush order)
@@ -901,7 +901,7 @@ public:
++_segment_manager->totals.allocation_count;
++_num_allocs;
if (_segment_manager->cfg.mode == sync_mode::BATCH) {
if (_segment_manager->cfg.mode == sync_mode::BATCH || writer->sync) {
return batch_cycle(timeout).then([h = std::move(h)](auto s) mutable {
return make_ready_future<rp_handle>(std::move(h));
});
@@ -1720,13 +1720,13 @@ db::commitlog::segment_manager::buffer_type db::commitlog::segment_manager::acqu
* Add mutation.
*/
future<db::rp_handle> db::commitlog::add(const cf_id_type& id,
size_t size, db::timeout_clock::time_point timeout, serializer_func func) {
size_t size, db::timeout_clock::time_point timeout, db::commitlog::force_sync sync, serializer_func func) {
class serializer_func_entry_writer final : public entry_writer {
serializer_func _func;
size_t _size;
public:
serializer_func_entry_writer(size_t sz, serializer_func func)
: _func(std::move(func)), _size(sz)
serializer_func_entry_writer(size_t sz, serializer_func func, db::commitlog::force_sync sync)
: entry_writer(sync), _func(std::move(func)), _size(sz)
{ }
virtual size_t size(segment&) override { return _size; }
virtual size_t size() override { return _size; }
@@ -1734,7 +1734,7 @@ future<db::rp_handle> db::commitlog::add(const cf_id_type& id,
_func(out);
}
};
auto writer = ::make_shared<serializer_func_entry_writer>(size, std::move(func));
auto writer = ::make_shared<serializer_func_entry_writer>(size, std::move(func), sync);
return _segment_manager->allocate_when_possible(id, writer, timeout);
}
@@ -1743,7 +1743,7 @@ future<db::rp_handle> db::commitlog::add_entry(const cf_id_type& id, const commi
class cl_entry_writer final : public entry_writer {
commitlog_entry_writer _writer;
public:
cl_entry_writer(const commitlog_entry_writer& wr) : _writer(wr) { }
cl_entry_writer(const commitlog_entry_writer& wr) : entry_writer(wr.sync()), _writer(wr) { }
virtual size_t size(segment& seg) override {
_writer.set_with_schema(!seg.is_schema_version_known(_writer.schema()));
return _writer.size();

View File

@@ -113,6 +113,7 @@ public:
enum class sync_mode {
PERIODIC, BATCH
};
using force_sync = commitlog_entry_writer::force_sync;
struct config {
config() = default;
config(const config&) = default;
@@ -194,7 +195,7 @@ public:
*
* @param mutation_func a function that writes 'size' bytes to the log, representing the mutation.
*/
future<rp_handle> add(const cf_id_type& id, size_t size, db::timeout_clock::time_point timeout, serializer_func mutation_func);
future<rp_handle> add(const cf_id_type& id, size_t size, db::timeout_clock::time_point timeout, force_sync sync, serializer_func mutation_func);
/**
* Template version of add.
@@ -202,8 +203,8 @@ public:
* @param mu an invokable op that generates the serialized data. (Of size bytes)
*/
template<typename _MutationOp>
future<rp_handle> add_mutation(const cf_id_type& id, size_t size, db::timeout_clock::time_point timeout, _MutationOp&& mu) {
return add(id, size, timeout, [mu = std::forward<_MutationOp>(mu)](output& out) {
future<rp_handle> add_mutation(const cf_id_type& id, size_t size, db::timeout_clock::time_point timeout, force_sync sync, _MutationOp&& mu) {
return add(id, size, timeout, sync, [mu = std::forward<_MutationOp>(mu)](output& out) {
mu(out);
});
}
@@ -213,8 +214,8 @@ public:
* @param mu an invokable op that generates the serialized data. (Of size bytes)
*/
template<typename _MutationOp>
future<rp_handle> add_mutation(const cf_id_type& id, size_t size, _MutationOp&& mu) {
return add_mutation(id, size, db::timeout_clock::time_point::max(), std::forward<_MutationOp>(mu));
future<rp_handle> add_mutation(const cf_id_type& id, size_t size, force_sync sync, _MutationOp&& mu) {
return add_mutation(id, size, db::timeout_clock::time_point::max(), sync, std::forward<_MutationOp>(mu));
}
/**
@@ -391,6 +392,8 @@ private:
commitlog(config);
struct entry_writer {
force_sync sync;
explicit entry_writer(force_sync sync_) : sync(sync_) {}
virtual size_t size(segment&) = 0;
// Returns segment-independent size of the entry. Must be <= than segment-dependant size.
virtual size_t size() = 0;

View File

@@ -38,17 +38,21 @@ public:
};
class commitlog_entry_writer {
public:
using force_sync = bool_class<class force_sync_tag>;
private:
schema_ptr _schema;
const frozen_mutation& _mutation;
bool _with_schema = true;
size_t _size = std::numeric_limits<size_t>::max();
force_sync _sync;
private:
template<typename Output>
void serialize(Output&) const;
void compute_size();
public:
commitlog_entry_writer(schema_ptr s, const frozen_mutation& fm)
: _schema(std::move(s)), _mutation(fm)
commitlog_entry_writer(schema_ptr s, const frozen_mutation& fm, force_sync sync)
: _schema(std::move(s)), _mutation(fm), _sync(sync)
{}
void set_with_schema(bool value) {
@@ -70,7 +74,9 @@ public:
size_t mutation_size() const {
return _mutation.representation().size();
}
force_sync sync() const {
return _sync;
}
void write(typename seastar::memory_output_stream<std::vector<temporary_buffer<char>>::iterator>& out) const;
};

View File

@@ -166,7 +166,7 @@ bool manager::end_point_hints_manager::store_hint(schema_ptr s, lw_shared_ptr<co
return with_shared(file_update_mutex(), [this, fm, s, tr_state] () mutable -> future<> {
return get_or_load().then([this, fm = std::move(fm), s = std::move(s), tr_state] (hints_store_ptr log_ptr) mutable {
commitlog_entry_writer cew(s, *fm);
commitlog_entry_writer cew(s, *fm, db::commitlog::force_sync::no);
return log_ptr->add_entry(s->id(), cew, db::timeout_clock::now() + _shard_manager.hint_file_write_timeout);
}).then([this, tr_state] (db::rp_handle rh) {
rh.release();

View File

@@ -78,7 +78,7 @@ static int loggo = [] {
SEASTAR_TEST_CASE(test_create_commitlog){
return cl_test([](commitlog& log) {
sstring tmp = "hej bubba cow";
return log.add_mutation(utils::UUID_gen::get_time_UUID(), tmp.size(), [tmp](db::commitlog::output& dst) {
return log.add_mutation(utils::UUID_gen::get_time_UUID(), tmp.size(), db::commitlog::force_sync::no, [tmp](db::commitlog::output& dst) {
dst.write(tmp.data(), tmp.size());
}).then([](db::replay_position rp) {
BOOST_CHECK_NE(rp, db::replay_position());
@@ -92,7 +92,7 @@ SEASTAR_TEST_CASE(test_commitlog_written_to_disk_batch){
cfg.mode = commitlog::sync_mode::BATCH;
return cl_test(cfg, [](commitlog& log) {
sstring tmp = "hej bubba cow";
return log.add_mutation(utils::UUID_gen::get_time_UUID(), tmp.size(), [tmp](db::commitlog::output& dst) {
return log.add_mutation(utils::UUID_gen::get_time_UUID(), tmp.size(), db::commitlog::force_sync::no, [tmp](db::commitlog::output& dst) {
dst.write(tmp.data(), tmp.size());
}).then([&log](replay_position rp) {
BOOST_CHECK_NE(rp, db::replay_position());
@@ -109,7 +109,7 @@ SEASTAR_TEST_CASE(test_commitlog_written_to_disk_periodic){
return do_until([state]() {return *state;},
[&log, state, uuid]() {
sstring tmp = "hej bubba cow";
return log.add_mutation(uuid, tmp.size(), [tmp](db::commitlog::output& dst) {
return log.add_mutation(uuid, tmp.size(), db::commitlog::force_sync::no, [tmp](db::commitlog::output& dst) {
dst.write(tmp.data(), tmp.size());
}).then([&log, state](replay_position rp) {
BOOST_CHECK_NE(rp, db::replay_position());
@@ -129,7 +129,7 @@ SEASTAR_TEST_CASE(test_commitlog_new_segment){
auto uuid = utils::UUID_gen::get_time_UUID();
return do_until([&set]() { return set.size() > 1; }, [&log, &set, uuid]() {
sstring tmp = "hej bubba cow";
return log.add_mutation(uuid, tmp.size(), [tmp](db::commitlog::output& dst) {
return log.add_mutation(uuid, tmp.size(), db::commitlog::force_sync::no, [tmp](db::commitlog::output& dst) {
dst.write(tmp.data(), tmp.size());
}).then([&set](rp_handle h) {
BOOST_CHECK_NE(h.rp(), db::replay_position());
@@ -187,7 +187,7 @@ SEASTAR_TEST_CASE(test_commitlog_discard_completed_segments){
[&log, state]() {
sstring tmp = "hej bubba cow";
auto uuid = state->next_uuid();
return log.add_mutation(uuid, tmp.size(), [tmp](db::commitlog::output& dst) {
return log.add_mutation(uuid, tmp.size(), db::commitlog::force_sync::no, [tmp](db::commitlog::output& dst) {
dst.write(tmp.data(), tmp.size());
}).then([state, uuid](db::rp_handle h) {
state->rps[uuid].put(std::move(h));
@@ -223,7 +223,7 @@ SEASTAR_TEST_CASE(test_commitlog_discard_completed_segments){
SEASTAR_TEST_CASE(test_equal_record_limit){
return cl_test([](commitlog& log) {
auto size = log.max_record_size();
return log.add_mutation(utils::UUID_gen::get_time_UUID(), size, [size](db::commitlog::output& dst) {
return log.add_mutation(utils::UUID_gen::get_time_UUID(), size, db::commitlog::force_sync::no, [size](db::commitlog::output& dst) {
dst.fill(char(1), size);
}).then([](db::replay_position rp) {
BOOST_CHECK_NE(rp, db::replay_position());
@@ -234,7 +234,7 @@ SEASTAR_TEST_CASE(test_equal_record_limit){
SEASTAR_TEST_CASE(test_exceed_record_limit){
return cl_test([](commitlog& log) {
auto size = log.max_record_size() + 1;
return log.add_mutation(utils::UUID_gen::get_time_UUID(), size, [size](db::commitlog::output& dst) {
return log.add_mutation(utils::UUID_gen::get_time_UUID(), size, db::commitlog::force_sync::no, [size](db::commitlog::output& dst) {
dst.fill(char(1), size);
}).then_wrapped([](future<db::rp_handle> f) {
try {
@@ -254,7 +254,7 @@ SEASTAR_TEST_CASE(test_commitlog_closed) {
return log.shutdown().then([&log] {
sstring tmp = "test321";
auto uuid = utils::UUID_gen::get_time_UUID();
return log.add_mutation(uuid, tmp.size(), [tmp](db::commitlog::output& dst) {
return log.add_mutation(uuid, tmp.size(), db::commitlog::force_sync::no, [tmp](db::commitlog::output& dst) {
dst.write(tmp.data(), tmp.size());
}).then_wrapped([] (future<db::rp_handle> f) {
BOOST_REQUIRE_EXCEPTION(f.get(), gate_closed_exception, exception_predicate::message_equals("gate closed"));
@@ -284,7 +284,7 @@ SEASTAR_TEST_CASE(test_commitlog_delete_when_over_disk_limit) {
return do_until([set, sem]() {return set->size() > 2 && sem->try_wait();},
[&log, set, uuid]() {
sstring tmp = "hej bubba cow";
return log.add_mutation(uuid, tmp.size(), [tmp](db::commitlog::output& dst) {
return log.add_mutation(uuid, tmp.size(), db::commitlog::force_sync::no, [tmp](db::commitlog::output& dst) {
dst.write(tmp.data(), tmp.size());
}).then([set](rp_handle h) {
BOOST_CHECK_NE(h.rp(), db::replay_position());
@@ -332,7 +332,7 @@ SEASTAR_TEST_CASE(test_commitlog_reader){
return do_until([count, set]() {return set->size() > 1;},
[&log, uuid, count, set]() {
sstring tmp = "hej bubba cow";
return log.add_mutation(uuid, tmp.size(), [tmp](db::commitlog::output& dst) {
return log.add_mutation(uuid, tmp.size(), db::commitlog::force_sync::no, [tmp](db::commitlog::output& dst) {
dst.write(tmp.data(), tmp.size());
}).then([&log, set, count](auto h) {
BOOST_CHECK_NE(db::replay_position(), h.rp());
@@ -398,7 +398,7 @@ SEASTAR_TEST_CASE(test_commitlog_entry_corruption){
[&log, rps]() {
auto uuid = utils::UUID_gen::get_time_UUID();
sstring tmp = "hej bubba cow";
return log.add_mutation(uuid, tmp.size(), [tmp](db::commitlog::output& dst) {
return log.add_mutation(uuid, tmp.size(), db::commitlog::force_sync::no, [tmp](db::commitlog::output& dst) {
dst.write(tmp.data(), tmp.size());
}).then([&log, rps](rp_handle h) {
BOOST_CHECK_NE(h.rp(), db::replay_position());
@@ -442,7 +442,7 @@ SEASTAR_TEST_CASE(test_commitlog_chunk_corruption){
[&log, rps]() {
auto uuid = utils::UUID_gen::get_time_UUID();
sstring tmp = "hej bubba cow";
return log.add_mutation(uuid, tmp.size(), [tmp](db::commitlog::output& dst) {
return log.add_mutation(uuid, tmp.size(), db::commitlog::force_sync::no, [tmp](db::commitlog::output& dst) {
dst.write(tmp.data(), tmp.size());
}).then([&log, rps](rp_handle h) {
BOOST_CHECK_NE(h.rp(), db::replay_position());
@@ -485,7 +485,7 @@ SEASTAR_TEST_CASE(test_commitlog_reader_produce_exception){
[&log, rps]() {
auto uuid = utils::UUID_gen::get_time_UUID();
sstring tmp = "hej bubba cow";
return log.add_mutation(uuid, tmp.size(), [tmp](db::commitlog::output& dst) {
return log.add_mutation(uuid, tmp.size(), db::commitlog::force_sync::no, [tmp](db::commitlog::output& dst) {
dst.write(tmp.data(), tmp.size());
}).then([&log, rps](rp_handle h) {
BOOST_CHECK_NE(h.rp(), db::replay_position());
@@ -549,7 +549,7 @@ SEASTAR_TEST_CASE(test_allocation_failure){
}
} catch (std::bad_alloc&) {
}
return log.add_mutation(utils::UUID_gen::get_time_UUID(), size, [size](db::commitlog::output& dst) {
return log.add_mutation(utils::UUID_gen::get_time_UUID(), size, db::commitlog::force_sync::no, [size](db::commitlog::output& dst) {
dst.fill(char(1), size);
}).then_wrapped([junk, size](future<db::rp_handle> f) {
std::exception_ptr ep;

View File

@@ -71,7 +71,7 @@ SEASTAR_TEST_CASE(test_commitlog_new_segment_custom_prefix){
auto uuid = utils::UUID_gen::get_time_UUID();
return do_until([&set]() { return set.size() > 1; }, [&log, &set, uuid]() {
sstring tmp = "hej bubba cow";
return log.add_mutation(uuid, tmp.size(), [tmp](db::commitlog::output& dst) {
return log.add_mutation(uuid, tmp.size(), db::commitlog::force_sync::no, [tmp](db::commitlog::output& dst) {
dst.write(tmp.data(), tmp.size());
}).then([&set](rp_handle h) {
BOOST_CHECK_NE(h.rp(), db::replay_position());