commitlog: rework blocking logic

The current incarnation of commitlog establishes a maximum amount of
writes that can be in-flight, and blocks new requests after that limit
is reached.

That is obviously something we must do, but the current approach to it
is problematic for two main reasons:

1) It forces the requests that trigger a write to wait on the current
   write to finish. That is excessive; ideally we would wait for one
   particular write to finish, not necessarily the current one. That
   is made worse by the fact that when a write is followed by a flush
   (happens when we move to a new segment), then we must wait for
   *all* writes in that segment to finish.

1) it casts concurrency in terms of writes instead of memory, which
   makes the aforementioned problem a lot worse: if we have very big
   buffers in flight and we must wait for them to finish, that can
   take a long time, often in the order of seconds, causing timeouts.

The approach taken by this patch is to replace the _write_semaphore
with a request_controller. This data structure will account the amount
of memory used by the buffers and set a limit on it. New allocations
will be held until we go below that limit, and will be released
as soon as this happens.

This guarantees that the latencies introduced by this mechanism are
spread out a lot better among requests and will keep higher percentile
latencies in check.

To test this, I have ran a workload that times out frequently. That
workload use 10 threads to write 100 partitions (to isolate from the
effects of the memtable introduced latencies) in a loop and each
partition is 2MB in size.

After 10 minutes running this load, we are left with the following
percentiles:

latency mean              : 51.9 [WRITE:51.9]
latency median            : 9.8 [WRITE:9.8]
latency 95th percentile   : 125.6 [WRITE:125.6]
latency 99th percentile   : 1184.0 [WRITE:1184.0]
latency 99.9th percentile : 1991.2 [WRITE:1991.2]
latency max               : 2338.2 [WRITE:2338.2]

After this patch:

latency mean              : 54.9 [WRITE:54.9]
latency median            : 43.5 [WRITE:43.5]
latency 95th percentile   : 126.9 [WRITE:126.9]
latency 99th percentile   : 253.9 [WRITE:253.9]
latency 99.9th percentile : 364.6 [WRITE:364.6]
latency max               : 471.4 [WRITE:471.4]

Signed-off-by: Glauber Costa <glauber@scylladb.com>
This commit is contained in:
Glauber Costa
2016-10-13 12:23:50 -04:00
parent aec724bbda
commit 1578d7363a
2 changed files with 83 additions and 136 deletions

View File

@@ -57,6 +57,7 @@
#include <core/gate.hh>
#include <core/fstream.hh>
#include <seastar/core/memory.hh>
#include <seastar/core/chunked_fifo.hh>
#include <net/byteorder.hh>
#include "commitlog.hh"
@@ -165,7 +166,6 @@ public:
bool _shutdown = false;
semaphore _new_segment_semaphore {1};
semaphore _write_semaphore;
semaphore _flush_semaphore;
scollectd::registrations _regs;
@@ -175,6 +175,19 @@ public:
using time_point = clock_type::time_point;
using sseg_ptr = lw_shared_ptr<segment>;
semaphore _request_controller;
void account_memory_usage(size_t size) {
_request_controller.consume(size);
}
void notify_memory_written(size_t size) {
_request_controller.signal(size);
}
future<db::replay_position>
allocate_when_possible(const cf_id_type& id, shared_ptr<entry_writer> writer);
struct stats {
uint64_t cycle_count = 0;
uint64_t flush_count = 0;
@@ -183,10 +196,7 @@ public:
uint64_t bytes_slack = 0;
uint64_t segments_created = 0;
uint64_t segments_destroyed = 0;
uint64_t pending_writes = 0;
uint64_t pending_flushes = 0;
uint64_t pending_allocations = 0;
uint64_t write_limit_exceeded = 0;
uint64_t flush_limit_exceeded = 0;
uint64_t total_size = 0;
uint64_t buffer_list_bytes = 0;
@@ -196,20 +206,7 @@ public:
stats totals;
size_t pending_allocations() const {
return totals.pending_allocations;
}
future<> begin_write() {
++totals.pending_writes; // redundant, given semaphore. but easier to read
if (totals.pending_writes >= cfg.max_active_writes) {
++totals.write_limit_exceeded;
logger.trace("Write ops overflow: {}. Will block.", totals.pending_writes);
}
return _write_semaphore.wait();
}
void end_write() {
_write_semaphore.signal();
--totals.pending_writes;
return _request_controller.waiters();
}
future<> begin_flush() {
@@ -224,11 +221,6 @@ public:
_flush_semaphore.signal();
--totals.pending_flushes;
}
bool should_wait_for_write() const {
return cfg.mode == sync_mode::BATCH || _write_semaphore.waiters() > 0 || _flush_semaphore.waiters() > 0;
}
segment_manager(config c);
~segment_manager() {
logger.trace("Commitlog {} disposed", cfg.commit_log_location);
@@ -399,17 +391,6 @@ class db::commitlog::segment: public enable_lw_shared_from_this<segment> {
_segment_manager->end_flush();
}
future<> begin_write() {
// This is maintaining the semantica of only using the write-lock
// as a gate for flushing, i.e. once we've begun a flush for position X
// we are ok with writes to positions > X
return _segment_manager->begin_write();
}
void end_write() {
_segment_manager->end_write();
}
public:
struct cf_mark {
const segment& s;
@@ -481,9 +462,8 @@ public:
*/
future<sseg_ptr> finish_and_get_new() {
_closed = true;
return maybe_wait_for_write(sync()).then([](sseg_ptr s) {
return s->_segment_manager->active_segment();
});
sync();
return _segment_manager->active_segment();
}
void reset_sync_time() {
_sync_time = clock_type::now();
@@ -660,8 +640,6 @@ public:
// The write will be allowed to start now, but flush (below) must wait for not only this,
// but all previous write/flush pairs.
return _pending_ops.run_with_ordered_post_op(rp, [this, size, off, buf = std::move(buf)]() mutable {
// This could "block", if we have to many pending writes.
return begin_write().then([this, size, off, buf = std::move(buf)]() mutable {
auto written = make_lw_shared<size_t>(0);
auto p = buf.get();
return repeat([this, size, off, written, p]() mutable {
@@ -688,62 +666,16 @@ public:
throw;
}
});
}).finally([this, buf = std::move(buf)]() mutable {
}).finally([this, buf = std::move(buf), size]() mutable {
_segment_manager->release_buffer(std::move(buf));
_segment_manager->notify_memory_written(size);
});
}).finally([this]() {
end_write(); // release
});
}, [me, flush_after, top, rp] { // lambda instead of bind, so we keep "me" alive.
assert(me->_pending_ops.has_operation(rp));
return flush_after ? me->do_flush(top) : make_ready_future<sseg_ptr>(me);
});
}
future<sseg_ptr> maybe_wait_for_write(future<sseg_ptr> f) {
if (_segment_manager->should_wait_for_write()) {
++_write_waiters;
logger.trace("Too many pending writes. Must wait.");
return f.finally([this] {
--_write_waiters;
});
}
return make_ready_future<sseg_ptr>(shared_from_this());
}
/**
* If an allocation causes a write, and the write causes a block,
* any allocations post that need to wait for this to finish,
* other wise we will just continue building up more write queue
* eventually (+ loose more ordering)
*
* Some caution here, since maybe_wait_for_write actually
* releases _all_ queued up ops when finishing, we could get
* "bursts" of alloc->write, causing build-ups anyway.
* This should be measured properly. For now I am hoping this
* will work out as these should "block as a group". However,
* buffer memory usage might grow...
*/
bool must_wait_for_alloc() {
// Note: write_waiters is decremented _after_ both semaphores and
// flush queue might be cleared. So we should not look only at it.
// But we still don't want to look at "should_wait_for_write" directly,
// since that is "global" and includes other segments, and we want to
// know if _this_ segment has blocking write ops pending.
// So we also check that the flush queue is non-empty.
return _write_waiters > 0 && !_pending_ops.empty();
}
future<sseg_ptr> wait_for_alloc() {
auto me = shared_from_this();
++_segment_manager->totals.pending_allocations;
logger.trace("Previous allocation is blocking. Must wait.");
return _pending_ops.wait_for_pending().then_wrapped([me](auto f) { // TODO: do we need a finally?
--me->_segment_manager->totals.pending_allocations;
return f.failed() ? me->_segment_manager->active_segment() : make_ready_future<sseg_ptr>(me);
});
}
future<sseg_ptr> batch_cycle() {
/**
* For batch mode we force a write "immediately".
@@ -778,10 +710,17 @@ public:
return make_exception_future<sseg_ptr>(p);
});
}
/**
* Add a "mutation" to the segment.
*/
future<replay_position> allocate(const cf_id_type& id, shared_ptr<entry_writer> writer) {
future<replay_position> allocate(const cf_id_type& id, shared_ptr<entry_writer> writer, semaphore_units<> permit) {
if (must_sync()) {
return sync().then([this, id, writer = std::move(writer), permit = std::move(permit)] (auto s) mutable {
return s->allocate(id, std::move(writer), std::move(permit));
});
}
const auto size = writer->size(*this);
const auto s = size + entry_overhead_size; // total size
auto ep = _segment_manager->sanity_check_size(s);
@@ -789,36 +728,34 @@ public:
return make_exception_future<replay_position>(std::move(ep));
}
std::experimental::optional<future<sseg_ptr>> op;
if (must_sync()) {
op = sync();
} else if (must_wait_for_alloc()) {
op = wait_for_alloc();
} else if (!is_still_allocating() || position() + s > _segment_manager->max_size) { // would we make the file too big?
// do this in next segment instead.
op = finish_and_get_new();
} else if (_buffer.empty()) {
new_buffer(s);
} else if (s > (_buffer.size() - _buf_pos)) { // enough data?
if (!is_still_allocating() || position() + s > _segment_manager->max_size) { // would we make the file too big?
return finish_and_get_new().then([id, writer = std::move(writer), permit = std::move(permit)] (auto new_seg) mutable {
return new_seg->allocate(id, std::move(writer), std::move(permit));
});
} else if (!_buffer.empty() && (size > (_buffer.size() - _buf_pos))) { // enough data?
if (_segment_manager->cfg.mode == sync_mode::BATCH) {
// TODO: this could cause starvation if we're really unlucky.
// If we run batch mode and find ourselves not fit in a non-empty
// buffer, we must force a cycle and wait for it (to keep flush order)
// This will most likely cause parallel writes, and consecutive flushes.
op = cycle(true);
return cycle(true).then([this, id, writer = std::move(writer), permit = std::move(permit)] (auto new_seg) mutable {
return new_seg->allocate(id, std::move(writer), std::move(permit));
});
} else {
op = maybe_wait_for_write(cycle());
cycle();
}
}
if (op) {
return op->then([id, writer = std::move(writer)] (sseg_ptr new_seg) mutable {
return new_seg->allocate(id, std::move(writer));
});
size_t buf_memory = s;
if (_buffer.empty()) {
new_buffer(s);
buf_memory += _buf_pos;
}
_gate.enter(); // this might throw. I guess we accept this?
buf_memory -= permit.release();
_segment_manager->account_memory_usage(buf_memory);
replay_position rp(_desc.id, position());
auto pos = _buf_pos;
@@ -849,12 +786,20 @@ public:
_gate.leave();
if (_segment_manager->cfg.mode == sync_mode::BATCH) {
return batch_cycle().then([rp](auto s) {
return make_ready_future<replay_position>(rp);
return _pending_ops.wait_for_pending().then([this, rp = std::move(rp)] {
return batch_cycle().then([rp](auto s) {
return make_ready_future<replay_position>(rp);
});
});
} else {
// If this buffer alone is too big, potentially bigger than the maximum allowed size,
// then no other request will be allowed in to force the cycle()ing of this buffer. We
// have to do it ourselves.
if ((_buf_pos >= (db::commitlog::segment::default_size))) {
cycle();
}
return make_ready_future<replay_position>(rp);
}
return make_ready_future<replay_position>(rp);
}
position_type position() const {
@@ -872,6 +817,7 @@ public:
std::fill(_buffer.get_write() + _buf_pos, _buffer.get_write() + size,
0);
_segment_manager->totals.bytes_slack += (size - _buf_pos);
_segment_manager->account_memory_usage(size - _buf_pos);
return size;
}
void mark_clean(const cf_id_type& id, position_type pos) {
@@ -913,6 +859,26 @@ public:
}
};
future<db::replay_position>
db::commitlog::segment_manager::allocate_when_possible(const cf_id_type& id, shared_ptr<entry_writer> writer) {
auto size = writer->size();
// If this is already too big now, we should throw early. It's also a correctness issue, since
// if we are too big at this moment we'll never reach allocate() to actually throw at that
// point.
auto ep = sanity_check_size(size);
if (ep) {
return make_exception_future<replay_position>(std::move(ep));
}
auto fut = get_units(_request_controller, size);
return fut.then([this, id, writer = std::move(writer)] (auto permit) mutable {
return this->active_segment().then([this, id, writer = std::move(writer), permit = std::move(permit)] (auto s) mutable {
return s->allocate(id, std::move(writer), std::move(permit));
});
});
}
const size_t db::commitlog::segment::default_size;
db::commitlog::segment_manager::segment_manager(config c)
@@ -939,8 +905,12 @@ db::commitlog::segment_manager::segment_manager(config c)
, max_size(std::min<size_t>(std::numeric_limits<position_type>::max(), std::max<size_t>(cfg.commitlog_segment_size_in_mb, 1) * 1024 * 1024))
, max_mutation_size(max_size >> 1)
, max_disk_size(size_t(std::ceil(cfg.commitlog_total_space_in_mb / double(smp::count))) * 1024 * 1024)
, _write_semaphore(cfg.max_active_writes)
, _flush_semaphore(cfg.max_active_flushes)
// That is enough concurrency to allow for our largest mutation (max_mutation_size), plus
// an existing in-flight buffer. Since we'll force the cycling() of any buffer that is bigger
// than default_size at the end of the allocation, that allows for every valid mutation to
// always be admitted for processing.
, _request_controller(max_mutation_size + db::commitlog::segment::default_size)
{
assert(max_size > 0);
@@ -1078,10 +1048,6 @@ scollectd::registrations db::commitlog::segment_manager::create_counters() {
, make_typed(data_type::DERIVE, totals.bytes_slack)
),
add_polled_metric(type_instance_id("commitlog"
, per_cpu_plugin_instance, "queue_length", "pending_writes")
, make_typed(data_type::GAUGE, totals.pending_writes)
),
add_polled_metric(type_instance_id("commitlog"
, per_cpu_plugin_instance, "queue_length", "pending_flushes")
, make_typed(data_type::GAUGE, totals.pending_flushes)
@@ -1092,10 +1058,6 @@ scollectd::registrations db::commitlog::segment_manager::create_counters() {
, make_typed(data_type::GAUGE, [this] { return pending_allocations(); })
),
add_polled_metric(type_instance_id("commitlog"
, per_cpu_plugin_instance, "total_operations", "write_limit_exceeded")
, make_typed(data_type::DERIVE, totals.write_limit_exceeded)
),
add_polled_metric(type_instance_id("commitlog"
, per_cpu_plugin_instance, "total_operations", "flush_limit_exceeded")
, make_typed(data_type::DERIVE, totals.flush_limit_exceeded)
@@ -1438,9 +1400,7 @@ future<db::replay_position> db::commitlog::add(const cf_id_type& id,
}
};
auto writer = ::make_shared<serializer_func_entry_writer>(size, std::move(func));
return _segment_manager->active_segment().then([id, writer] (auto s) {
return s->allocate(id, writer);
});
return _segment_manager->allocate_when_possible(id, writer);
}
future<db::replay_position> db::commitlog::add_entry(const cf_id_type& id, const commitlog_entry_writer& cew)
@@ -1464,9 +1424,7 @@ future<db::replay_position> db::commitlog::add_entry(const cf_id_type& id, const
}
};
auto writer = ::make_shared<cl_entry_writer>(cew);
return _segment_manager->active_segment().then([id, writer] (auto s) {
return s->allocate(id, writer);
});
return _segment_manager->allocate_when_possible(id, writer);
}
db::commitlog::commitlog(config cfg)
@@ -1805,12 +1763,7 @@ uint64_t db::commitlog::get_flush_count() const {
}
uint64_t db::commitlog::get_pending_tasks() const {
return _segment_manager->totals.pending_writes
+ _segment_manager->totals.pending_flushes;
}
uint64_t db::commitlog::get_pending_writes() const {
return _segment_manager->totals.pending_writes;
return _segment_manager->totals.pending_flushes;
}
uint64_t db::commitlog::get_pending_flushes() const {
@@ -1821,10 +1774,6 @@ uint64_t db::commitlog::get_pending_allocations() const {
return _segment_manager->pending_allocations();
}
uint64_t db::commitlog::get_write_limit_exceeded_count() const {
return _segment_manager->totals.write_limit_exceeded;
}
uint64_t db::commitlog::get_flush_limit_exceeded_count() const {
return _segment_manager->totals.flush_limit_exceeded;
}