commitlog: Use flush queue for write/flush ordering, improve batch
Using an ordering mechanism better than rw-locks for write/flush means we can wait for pending write in batch mode, and coalesce data from more than one mutation into a chunk. It also means we can wait for a specific read+flush pair (based on file position). Downside is that we will not do parallel writes in batch mode (unless we run out of buffer), which might underutilize the disk bandwidth. Upside is that running in batch mode (i.e. per-write consistency) now has way better bandwidth, and also, at least with high mutation rate, better average latency. Message-Id: <1465990064-2258-1-git-send-email-calle@scylladb.com>
This commit is contained in:
@@ -63,6 +63,7 @@
|
||||
#include "utils/data_input.hh"
|
||||
#include "utils/crc.hh"
|
||||
#include "utils/runtime.hh"
|
||||
#include "utils/flush_queue.hh"
|
||||
#include "log.hh"
|
||||
#include "commitlog_entry.hh"
|
||||
#include "service/priority_manager.hh"
|
||||
@@ -194,7 +195,6 @@ public:
|
||||
stats totals;
|
||||
|
||||
future<> begin_write() {
|
||||
_gate.enter();
|
||||
++totals.pending_writes; // redundant, given semaphore. but easier to read
|
||||
if (totals.pending_writes >= cfg.max_active_writes) {
|
||||
++totals.write_limit_exceeded;
|
||||
@@ -205,11 +205,9 @@ public:
|
||||
void end_write() {
|
||||
_write_semaphore.signal();
|
||||
--totals.pending_writes;
|
||||
_gate.leave();
|
||||
}
|
||||
|
||||
future<> begin_flush() {
|
||||
_gate.enter();
|
||||
++totals.pending_flushes;
|
||||
if (totals.pending_flushes >= cfg.max_active_flushes) {
|
||||
++totals.flush_limit_exceeded;
|
||||
@@ -220,11 +218,10 @@ public:
|
||||
void end_flush() {
|
||||
_flush_semaphore.signal();
|
||||
--totals.pending_flushes;
|
||||
_gate.leave();
|
||||
}
|
||||
|
||||
bool should_wait_for_write() const {
|
||||
return _write_semaphore.waiters() > 0 || _flush_semaphore.waiters() > 0;
|
||||
return cfg.mode == sync_mode::BATCH || _write_semaphore.waiters() > 0 || _flush_semaphore.waiters() > 0;
|
||||
}
|
||||
|
||||
segment_manager(config c)
|
||||
@@ -276,7 +273,7 @@ public:
|
||||
future<sseg_ptr> allocate_segment(bool active);
|
||||
|
||||
future<> clear();
|
||||
future<> sync_all_segments();
|
||||
future<> sync_all_segments(bool shutdown = false);
|
||||
future<> shutdown();
|
||||
|
||||
scollectd::registrations create_counters();
|
||||
@@ -391,18 +388,21 @@ class db::commitlog::segment: public enable_lw_shared_from_this<segment> {
|
||||
uint64_t _buf_pos = 0;
|
||||
bool _closed = false;
|
||||
|
||||
size_t _needed_size = 0;
|
||||
|
||||
using buffer_type = segment_manager::buffer_type;
|
||||
using sseg_ptr = segment_manager::sseg_ptr;
|
||||
using clock_type = segment_manager::clock_type;
|
||||
using time_point = segment_manager::time_point;
|
||||
|
||||
buffer_type _buffer;
|
||||
rwlock _dwrite; // used as a barrier between write & flush
|
||||
std::unordered_map<cf_id_type, position_type> _cf_dirty;
|
||||
time_point _sync_time;
|
||||
seastar::gate _gate;
|
||||
uint64_t _write_waiters = 0;
|
||||
semaphore _queue;
|
||||
utils::flush_queue<replay_position> _pending_ops;
|
||||
|
||||
uint64_t _num_allocs = 0;
|
||||
|
||||
std::unordered_set<table_schema_version> _known_schema_versions;
|
||||
|
||||
@@ -413,9 +413,7 @@ class db::commitlog::segment: public enable_lw_shared_from_this<segment> {
|
||||
// This is maintaining the semantica of only using the write-lock
|
||||
// as a gate for flushing, i.e. once we've begun a flush for position X
|
||||
// we are ok with writes to positions > X
|
||||
return _segment_manager->begin_flush().then(std::bind(&rwlock::write_lock, &_dwrite)).finally([this] {
|
||||
_dwrite.write_unlock();
|
||||
});
|
||||
return _segment_manager->begin_flush();
|
||||
}
|
||||
|
||||
void end_flush() {
|
||||
@@ -426,11 +424,10 @@ class db::commitlog::segment: public enable_lw_shared_from_this<segment> {
|
||||
// This is maintaining the semantica of only using the write-lock
|
||||
// as a gate for flushing, i.e. once we've begun a flush for position X
|
||||
// we are ok with writes to positions > X
|
||||
return _segment_manager->begin_write().then(std::bind(&rwlock::read_lock, &_dwrite));
|
||||
return _segment_manager->begin_write();
|
||||
}
|
||||
|
||||
void end_write() {
|
||||
_dwrite.read_unlock();
|
||||
_segment_manager->end_write();
|
||||
}
|
||||
|
||||
@@ -456,7 +453,7 @@ public:
|
||||
segment(::shared_ptr<segment_manager> m, const descriptor& d, file && f, bool active)
|
||||
: _segment_manager(std::move(m)), _desc(std::move(d)), _file(std::move(f)),
|
||||
_file_name(_segment_manager->cfg.commit_log_location + "/" + _desc.filename()), _sync_time(
|
||||
clock_type::now()), _queue(0)
|
||||
clock_type::now())
|
||||
{
|
||||
++_segment_manager->totals.segments_created;
|
||||
logger.debug("Created new {} segment {}", active ? "active" : "reserve", *this);
|
||||
@@ -513,21 +510,30 @@ public:
|
||||
_sync_time = clock_type::now();
|
||||
}
|
||||
// See class comment for info
|
||||
future<sseg_ptr> sync() {
|
||||
future<sseg_ptr> sync(bool shutdown = false) {
|
||||
/**
|
||||
* If we are shutting down, we first
|
||||
* close the allocation gate, thus no new
|
||||
* data can be appended. Then we just issue a
|
||||
* flush, which will wait for any queued ops
|
||||
* to complete as well. Then we close the ops
|
||||
* queue, just to be sure.
|
||||
*/
|
||||
if (shutdown) {
|
||||
auto me = shared_from_this();
|
||||
return _gate.close().then([me] {
|
||||
return me->sync().finally([me] {
|
||||
// When we get here, nothing should add ops,
|
||||
// and we should have waited out all pending.
|
||||
return me->_pending_ops.close();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// Note: this is not a marker for when sync was finished.
|
||||
// It is when it was initiated
|
||||
reset_sync_time();
|
||||
|
||||
if (position() <= _flush_pos) {
|
||||
logger.trace("Sync not needed {}: ({} / {})", *this, position(), _flush_pos);
|
||||
return make_ready_future<sseg_ptr>(shared_from_this());
|
||||
}
|
||||
return cycle().then([](sseg_ptr seg) {
|
||||
return seg->flush();
|
||||
});
|
||||
}
|
||||
future<> shutdown() {
|
||||
return _gate.close();
|
||||
return cycle(true);
|
||||
}
|
||||
// See class comment for info
|
||||
future<sseg_ptr> flush(uint64_t pos = 0) {
|
||||
@@ -536,46 +542,55 @@ public:
|
||||
if (pos == 0) {
|
||||
pos = _file_pos;
|
||||
}
|
||||
if (pos != 0 && pos <= _flush_pos) {
|
||||
logger.trace("{} already synced! ({} < {})", *this, pos, _flush_pos);
|
||||
return make_ready_future<sseg_ptr>(std::move(me));
|
||||
}
|
||||
logger.trace("Syncing {} {} -> {}", *this, _flush_pos, pos);
|
||||
// Make sure all disk writes are done.
|
||||
// This is not 100% neccesary, we really only need the ones below our flush pos,
|
||||
// but since we pretty much assume that task ordering will make this the case anyway...
|
||||
|
||||
return begin_flush().then(
|
||||
[this, me, pos]() mutable {
|
||||
pos = std::max(pos, _file_pos);
|
||||
if (pos <= _flush_pos) {
|
||||
logger.trace("{} already synced! ({} < {})", *this, pos, _flush_pos);
|
||||
return make_ready_future<sseg_ptr>(std::move(me));
|
||||
}
|
||||
return _file.flush().then_wrapped([this, pos, me](future<> f) {
|
||||
try {
|
||||
f.get();
|
||||
// TODO: retry/ignore/fail/stop - optional behaviour in origin.
|
||||
// we fast-fail the whole commit.
|
||||
_flush_pos = std::max(pos, _flush_pos);
|
||||
++_segment_manager->totals.flush_count;
|
||||
logger.trace("{} synced to {}", *this, _flush_pos);
|
||||
return make_ready_future<sseg_ptr>(std::move(me));
|
||||
} catch (...) {
|
||||
logger.error("Failed to flush commits to disk: {}", std::current_exception());
|
||||
throw;
|
||||
}
|
||||
});
|
||||
}).finally([this, me] {
|
||||
end_flush();
|
||||
logger.trace("Syncing {} {} -> {}", *this, _flush_pos, pos);
|
||||
|
||||
// Only run the flush when all write ops at lower rp:s
|
||||
// have completed.
|
||||
replay_position rp(_desc.id, position_type(pos));
|
||||
|
||||
// Run like this to ensure flush ordering, and making flushes "waitable"
|
||||
return _pending_ops.run_with_ordered_post_op(rp, [] { return make_ready_future<>(); }, [this, pos, me] {
|
||||
return do_flush(pos);
|
||||
});
|
||||
}
|
||||
|
||||
future<sseg_ptr> do_flush(uint64_t pos) {
|
||||
auto me = shared_from_this();
|
||||
return begin_flush().then([this, pos]() {
|
||||
if (pos <= _flush_pos) {
|
||||
logger.trace("{} already synced! ({} < {})", *this, pos, _flush_pos);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return _file.flush().then_wrapped([this, pos](future<> f) {
|
||||
try {
|
||||
f.get();
|
||||
// TODO: retry/ignore/fail/stop - optional behaviour in origin.
|
||||
// we fast-fail the whole commit.
|
||||
_flush_pos = std::max(pos, _flush_pos);
|
||||
++_segment_manager->totals.flush_count;
|
||||
logger.trace("{} synced to {}", *this, _flush_pos);
|
||||
} catch (...) {
|
||||
logger.error("Failed to flush commits to disk: {}", std::current_exception());
|
||||
throw;
|
||||
}
|
||||
});
|
||||
}).finally([this] {
|
||||
end_flush();
|
||||
}).then([me] {
|
||||
return make_ready_future<sseg_ptr>(me);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Allocate a new buffer
|
||||
*/
|
||||
void new_buffer(size_t s) {
|
||||
assert(_buffer.empty());
|
||||
|
||||
s += _needed_size;
|
||||
_needed_size = 0;
|
||||
|
||||
auto overhead = segment_overhead_size;
|
||||
if (_file_pos == 0) {
|
||||
overhead += descriptor_header_size;
|
||||
@@ -604,25 +619,32 @@ public:
|
||||
_segment_manager->totals.total_size += k;
|
||||
}
|
||||
|
||||
bool buffer_is_empty() const {
|
||||
return _buf_pos <= segment_overhead_size
|
||||
|| (_file_pos == 0 && _buf_pos <= (segment_overhead_size + descriptor_header_size));
|
||||
}
|
||||
/**
|
||||
* Send any buffer contents to disk and get a new tmp buffer
|
||||
*/
|
||||
// See class comment for info
|
||||
future<sseg_ptr> cycle() {
|
||||
future<sseg_ptr> cycle(bool flush_after = false) {
|
||||
if (_buffer.empty()) {
|
||||
return flush_after ? flush() : make_ready_future<sseg_ptr>(shared_from_this());
|
||||
}
|
||||
|
||||
auto size = clear_buffer_slack();
|
||||
auto buf = std::move(_buffer);
|
||||
auto off = _file_pos;
|
||||
auto top = off + size;
|
||||
auto num = _num_allocs;
|
||||
|
||||
_file_pos += size;
|
||||
_file_pos = top;
|
||||
_buf_pos = 0;
|
||||
_num_allocs = 0;
|
||||
|
||||
auto me = shared_from_this();
|
||||
assert(!me.owned());
|
||||
|
||||
if (size == 0) {
|
||||
return make_ready_future<sseg_ptr>(std::move(me));
|
||||
}
|
||||
|
||||
auto * p = buf.get_write();
|
||||
assert(std::count(p, p + 2 * sizeof(uint32_t), 0) == 2 * sizeof(uint32_t));
|
||||
|
||||
@@ -654,41 +676,49 @@ public:
|
||||
|
||||
forget_schema_versions();
|
||||
|
||||
// acquire read lock
|
||||
return begin_write().then([this, size, off, buf = std::move(buf)]() mutable {
|
||||
auto written = make_lw_shared<size_t>(0);
|
||||
auto p = buf.get();
|
||||
return repeat([this, size, off, written, p]() mutable {
|
||||
auto&& priority_class = service::get_local_commitlog_priority();
|
||||
return _file.dma_write(off + *written, p + *written, size - *written, priority_class).then_wrapped([this, size, written](future<size_t>&& f) {
|
||||
try {
|
||||
auto bytes = std::get<0>(f.get());
|
||||
*written += bytes;
|
||||
_segment_manager->totals.bytes_written += bytes;
|
||||
_segment_manager->totals.total_size_on_disk += bytes;
|
||||
++_segment_manager->totals.cycle_count;
|
||||
if (*written == size) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
replay_position rp(_desc.id, position_type(off));
|
||||
|
||||
logger.trace("Writing {} entries, {} k in {} -> {}", num, size, off, off + size);
|
||||
|
||||
// The write will be allowed to start now, but flush (below) must wait for not only this,
|
||||
// but all previous write/flush pairs.
|
||||
return _pending_ops.run_with_ordered_post_op(rp, [this, size, off, buf = std::move(buf)]() mutable {
|
||||
// This could "block", if we have to many pending writes.
|
||||
return begin_write().then([this, size, off, buf = std::move(buf)]() mutable {
|
||||
auto written = make_lw_shared<size_t>(0);
|
||||
auto p = buf.get();
|
||||
return repeat([this, size, off, written, p]() mutable {
|
||||
auto&& priority_class = service::get_local_commitlog_priority();
|
||||
return _file.dma_write(off + *written, p + *written, size - *written, priority_class).then_wrapped([this, size, written](future<size_t>&& f) {
|
||||
try {
|
||||
auto bytes = std::get<0>(f.get());
|
||||
*written += bytes;
|
||||
_segment_manager->totals.bytes_written += bytes;
|
||||
_segment_manager->totals.total_size_on_disk += bytes;
|
||||
++_segment_manager->totals.cycle_count;
|
||||
if (*written == size) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
// gah, partial write. should always get here with dma chunk sized
|
||||
// "bytes", but lets make sure...
|
||||
logger.debug("Partial write {}: {}/{} bytes", *this, *written, size);
|
||||
*written = align_down(*written, alignment);
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
// TODO: retry/ignore/fail/stop - optional behaviour in origin.
|
||||
// we fast-fail the whole commit.
|
||||
} catch (...) {
|
||||
logger.error("Failed to persist commits to disk for {}: {}", *this, std::current_exception());
|
||||
throw;
|
||||
}
|
||||
// gah, partial write. should always get here with dma chunk sized
|
||||
// "bytes", but lets make sure...
|
||||
logger.debug("Partial write {}: {}/{} bytes", *this, *written, size);
|
||||
*written = align_down(*written, alignment);
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
// TODO: retry/ignore/fail/stop - optional behaviour in origin.
|
||||
// we fast-fail the whole commit.
|
||||
} catch (...) {
|
||||
logger.error("Failed to persist commits to disk for {}: {}", *this, std::current_exception());
|
||||
throw;
|
||||
}
|
||||
});
|
||||
}).finally([this, buf = std::move(buf)]() mutable {
|
||||
_segment_manager->release_buffer(std::move(buf));
|
||||
});
|
||||
}).finally([this, buf = std::move(buf)]() mutable {
|
||||
_segment_manager->release_buffer(std::move(buf));
|
||||
}).finally([this]() {
|
||||
end_write(); // release
|
||||
});
|
||||
}).then([me] {
|
||||
return make_ready_future<sseg_ptr>(std::move(me));
|
||||
}).finally([me, this]() {
|
||||
end_write(); // release
|
||||
}, [me, flush_after, top] { // lambda instead of bind, so we keep "me" alive.
|
||||
return flush_after ? me->do_flush(top) : make_ready_future<sseg_ptr>(me);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -697,9 +727,7 @@ public:
|
||||
++_write_waiters;
|
||||
logger.trace("Too many pending writes. Must wait.");
|
||||
return f.finally([this] {
|
||||
if (--_write_waiters == 0) {
|
||||
_queue.signal(_queue.waiters());
|
||||
}
|
||||
--_write_waiters;
|
||||
});
|
||||
}
|
||||
return make_ready_future<sseg_ptr>(shared_from_this());
|
||||
@@ -726,12 +754,39 @@ public:
|
||||
auto me = shared_from_this();
|
||||
++_segment_manager->totals.pending_allocations;
|
||||
logger.trace("Previous allocation is blocking. Must wait.");
|
||||
return _queue.wait().then([me] { // TODO: do we need a finally?
|
||||
return _pending_ops.wait_for_pending().then([me] { // TODO: do we need a finally?
|
||||
--me->_segment_manager->totals.pending_allocations;
|
||||
return make_ready_future<sseg_ptr>(me);
|
||||
});
|
||||
}
|
||||
|
||||
future<sseg_ptr> batch_cycle() {
|
||||
/**
|
||||
* For batch mode we force a write "immediately".
|
||||
* However, we first wait for all previous writes/flushes
|
||||
* to complete.
|
||||
*
|
||||
* This has the benefit of allowing several allocations to
|
||||
* queue up in a single buffer.
|
||||
*/
|
||||
auto me = shared_from_this();
|
||||
auto fp = _file_pos;
|
||||
return _pending_ops.wait_for_pending().then([me = std::move(me), fp] {
|
||||
if (fp != me->_file_pos) {
|
||||
// some other request already wrote this buffer.
|
||||
// If so, wait for the operation at our intended file offset
|
||||
// to finish, then we know the flush is complete and we
|
||||
// are in accord.
|
||||
// (Note: wait_for_pending(pos) waits for operation _at_ pos (and before),
|
||||
replay_position rp(me->_desc.id, position_type(fp));
|
||||
return me->_pending_ops.wait_for_pending(rp).then([me, fp] {
|
||||
assert(me->_flush_pos > fp);
|
||||
return make_ready_future<sseg_ptr>(me);
|
||||
});
|
||||
}
|
||||
return me->sync();
|
||||
});
|
||||
}
|
||||
/**
|
||||
* Add a "mutation" to the segment.
|
||||
*/
|
||||
@@ -758,7 +813,16 @@ public:
|
||||
} else if (_buffer.empty()) {
|
||||
new_buffer(s);
|
||||
} else if (s > (_buffer.size() - _buf_pos)) { // enough data?
|
||||
op = maybe_wait_for_write(cycle());
|
||||
_needed_size += s; // hint to next new_buffer, in case we are not first.
|
||||
if (_segment_manager->cfg.mode == sync_mode::BATCH) {
|
||||
// TODO: this could cause starvation if we're really unlucky.
|
||||
// If we run batch mode and find ourselves not fit in a non-empty
|
||||
// buffer, we must force a cycle and wait for it (to keep flush order)
|
||||
// This will most likely cause parallel writes, and consecutive flushes.
|
||||
op = cycle(true);
|
||||
} else {
|
||||
op = maybe_wait_for_write(cycle());
|
||||
}
|
||||
}
|
||||
|
||||
if (op) {
|
||||
@@ -793,11 +857,12 @@ public:
|
||||
out.write(crc.checksum());
|
||||
|
||||
++_segment_manager->totals.allocation_count;
|
||||
++_num_allocs;
|
||||
|
||||
_gate.leave();
|
||||
|
||||
if (_segment_manager->cfg.mode == sync_mode::BATCH) {
|
||||
return sync().then([rp](sseg_ptr) {
|
||||
return batch_cycle().then([rp](auto s) {
|
||||
return make_ready_future<replay_position>(rp);
|
||||
});
|
||||
}
|
||||
@@ -1157,10 +1222,10 @@ void db::commitlog::segment_manager::discard_unused_segments() {
|
||||
}
|
||||
}
|
||||
|
||||
future<> db::commitlog::segment_manager::sync_all_segments() {
|
||||
future<> db::commitlog::segment_manager::sync_all_segments(bool shutdown) {
|
||||
logger.debug("Issuing sync for all segments");
|
||||
return parallel_for_each(_segments, [this](sseg_ptr s) {
|
||||
return s->sync().then([](sseg_ptr s) {
|
||||
return parallel_for_each(_segments, [this, shutdown](sseg_ptr s) {
|
||||
return s->sync(shutdown).then([](sseg_ptr s) {
|
||||
logger.debug("Synced segment {}", *s);
|
||||
});
|
||||
});
|
||||
@@ -1170,11 +1235,9 @@ future<> db::commitlog::segment_manager::shutdown() {
|
||||
if (!_shutdown) {
|
||||
_shutdown = true; // no re-arm, no create new segments.
|
||||
_timer.cancel(); // no more timer calls
|
||||
return parallel_for_each(_segments, [this](sseg_ptr s) {
|
||||
return s->shutdown(); // close each segment (no more alloc)
|
||||
}).then(std::bind(&segment_manager::sync_all_segments, this)).then([this] { // flush all
|
||||
return _gate.close(); // wait for any pending ops
|
||||
});
|
||||
// Now first wait for periodic task to finish, then sync and close all
|
||||
// segments, flushing out any remaining data.
|
||||
return _gate.close().then(std::bind(&segment_manager::sync_all_segments, this, true));
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user