Merge "Serialize memtable flushes" from Glauber

"One of the things we need to do as part of the throttle rework I am doing is to
serialize memtable flushes to some extent - that will guarantee that in case
we're throttling, the flushes finish earlier and release memory earlier, if
compared to the case in which we just let all tables flush freely and
simultaneously."
This commit is contained in:
Avi Kivity
2016-06-01 18:30:43 +03:00
2 changed files with 66 additions and 29 deletions

View File

@@ -89,23 +89,23 @@ public:
lw_shared_ptr<memtable_list>
column_family::make_memory_only_memtable_list() {
auto seal = [this] { return make_ready_future<>(); };
auto seal = [this] (memtable_list::flush_behavior ignored) { return make_ready_future<>(); };
auto get_schema = [this] { return schema(); };
return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.max_memtable_size, _config.dirty_memory_region_group);
return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.max_memtable_size, _config.dirty_memory_region_group, _memtables_serializer);
}
lw_shared_ptr<memtable_list>
column_family::make_memtable_list() {
auto seal = [this] { return seal_active_memtable(); };
auto seal = [this] (memtable_list::flush_behavior behavior) { return seal_active_memtable(behavior); };
auto get_schema = [this] { return schema(); };
return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.max_memtable_size, _config.dirty_memory_region_group);
return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.max_memtable_size, _config.dirty_memory_region_group, _memtables_serializer);
}
lw_shared_ptr<memtable_list>
column_family::make_streaming_memtable_list() {
auto seal = [this] { return seal_active_streaming_memtable_delayed(); };
auto seal = [this] (memtable_list::flush_behavior behavior) { return seal_active_streaming_memtable(behavior); };
auto get_schema = [this] { return schema(); };
return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.max_streaming_memtable_size, _config.streaming_dirty_memory_region_group);
return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.max_streaming_memtable_size, _config.streaming_dirty_memory_region_group, _streaming_serializer);
}
column_family::column_family(schema_ptr schema, config config, db::commitlog* cl, compaction_manager& compaction_manager)
@@ -589,7 +589,7 @@ column_family::seal_active_streaming_memtable_delayed() {
}
if (_streaming_memtables->should_flush()) {
return seal_active_streaming_memtable();
return seal_active_streaming_memtable_immediate();
}
if (!_delayed_streaming_flush.armed()) {
@@ -608,7 +608,7 @@ column_family::seal_active_streaming_memtable_delayed() {
}
future<>
column_family::seal_active_streaming_memtable() {
column_family::seal_active_streaming_memtable_immediate() {
auto old = _streaming_memtables->back();
if (old->empty()) {
return make_ready_future<>();
@@ -667,7 +667,7 @@ column_family::seal_active_streaming_memtable() {
}
future<>
column_family::seal_active_memtable() {
column_family::seal_active_memtable(memtable_list::flush_behavior ignored) {
auto old = _memtables->back();
dblog.debug("Sealing active memtable, partitions: {}, occupancy: {}", old->partition_count(), old->occupancy());
@@ -771,14 +771,8 @@ column_family::start() {
future<>
column_family::stop() {
// Please note that in here, we shouldn't use the implicit seal function in each memtable's
// list. The reason is that for the streaming memtables, the memtable_list's seal function does
// not guarantee anything to be immediately flushed, and will set a timer instead (so we can
// coalesce writes). During stop, we need to force flushing behavior so we call
// seal_active_streaming_memtable() instead. That problem does not exist for memtables and we
// could call _memtables->seal_active_memtable() here. We don't, for consistency with streaming.
seal_active_memtable();
seal_active_streaming_memtable();
_memtables->seal_active_memtable(memtable_list::flush_behavior::immediate);
_streaming_memtables->seal_active_memtable(memtable_list::flush_behavior::immediate);
return _compaction_manager.remove(this).then([this] {
// Nest, instead of using when_all, so we don't lose any exceptions.
return _flush_queue->close().then([this] {
@@ -2594,7 +2588,7 @@ future<> column_family::flush() {
// FIXME: this will synchronously wait for this write to finish, but doesn't guarantee
// anything about previous writes.
_stats.pending_flushes++;
return _memtables->seal_active_memtable().finally([this]() mutable {
return _memtables->seal_active_memtable(memtable_list::flush_behavior::immediate).finally([this]() mutable {
_stats.pending_flushes--;
// In origin memtable_switch_count is incremented inside
// ColumnFamilyMeetrics Flush.run
@@ -2616,7 +2610,7 @@ future<> column_family::flush(const db::replay_position& pos) {
// We ignore this for now and just say that if we're asked for
// a CF and it exists, we pretty much have to have data that needs
// flushing. Let's do it.
return _memtables->seal_active_memtable();
return _memtables->seal_active_memtable(memtable_list::flush_behavior::immediate);
}
// FIXME: We can do much better than this in terms of cache management. Right
@@ -2633,7 +2627,7 @@ future<> column_family::flush_streaming_mutations(std::vector<query::partition_r
// need this code to go away as soon as we can (see FIXME above). So the double gate is a better
// temporary counter measure.
return with_gate(_streaming_flush_gate, [this, ranges = std::move(ranges)] {
return _streaming_memtables->seal_active_memtable().finally([this, ranges = std::move(ranges)] {
return _streaming_memtables->seal_active_memtable(memtable_list::flush_behavior::delayed).finally([this, ranges = std::move(ranges)] {
if (_config.enable_cache) {
for (auto& range : ranges) {
_cache.invalidate(range);

View File

@@ -147,19 +147,24 @@ class replay_position_reordered_exception : public std::exception {};
// If we are going to have different methods, better have different instances
// of a common class.
class memtable_list {
public:
enum class flush_behavior { delayed, immediate };
private:
using shared_memtable = lw_shared_ptr<memtable>;
std::vector<shared_memtable> _memtables;
std::function<future<> ()> _seal_fn;
std::function<future<> (flush_behavior)> _seal_fn;
std::function<schema_ptr()> _current_schema;
size_t _max_memtable_size;
logalloc::region_group* _dirty_memory_region_group;
semaphore& _region_group_serializer;
public:
memtable_list(std::function<future<> ()> seal_fn, std::function<schema_ptr()> cs, size_t max_memtable_size, logalloc::region_group* region_group)
memtable_list(std::function<future<> (flush_behavior)> seal_fn, std::function<schema_ptr()> cs, size_t max_memtable_size, logalloc::region_group* region_group, semaphore& sem)
: _memtables({})
, _seal_fn(seal_fn)
, _current_schema(cs)
, _max_memtable_size(max_memtable_size)
, _dirty_memory_region_group(region_group) {
, _dirty_memory_region_group(region_group)
, _region_group_serializer(sem) {
add_memtable();
}
@@ -179,8 +184,17 @@ public:
return _memtables.size();
}
future<> seal_active_memtable() {
return _seal_fn();
future<> seal_active_memtable(flush_behavior behavior) {
// We may need a lot of delayed flushes to get to the point where we force an immediate
// flush. So just let it go immediately.
if (behavior == flush_behavior::delayed) {
return _seal_fn(behavior);
}
return _region_group_serializer.wait().then([this] {
return _seal_fn(flush_behavior::immediate);
}).finally([this] {
_region_group_serializer.signal();
});
}
auto begin() noexcept {
@@ -215,7 +229,7 @@ public:
if (should_flush()) {
// FIXME: if sparse, do some in-memory compaction first
// FIXME: maybe merge with other in-memory memtables
_seal_fn();
seal_active_memtable(flush_behavior::immediate);
}
}
private:
@@ -279,6 +293,25 @@ private:
schema_ptr _schema;
config _config;
stats _stats;
// We would like to serialize the flushing of memtables. While flushing many memtables
// simultaneously can sustain high levels of throughput, the memory is not freed until the
// memtable is totally gone. That means that if we have throttled requests, they will stay
// throttled for a long time. Even when we have virtual dirty, that only provides a rough
// estimate, and we can't release requests that early.
//
// Ideally, we'd allow one memtable flush per shard (or per database object), and write-behind
// would take care of the rest. But that still has issues, so we'll limit parallelism to some
// number (4), that we will hopefully reduce to 1 when write behind works.
//
// When streaming is going on, we'll separate half of that for the streaming code, which
// effectively increases the total to 6. That is a bit ugly and a bit redundant with the I/O
// Scheduler, but it's the easiest way not to hurt the common case (no streaming) and will have
// to do for the moment. Hopefully we can set both to 1 soon (with write behind)
//
// FIXME: enable write behind and set both to 1.
semaphore _memtables_serializer = { 4 };
semaphore _streaming_serializer = { 2 };
lw_shared_ptr<memtable_list> _memtables;
// In older incarnations, we simply commited the mutations to memtables.
@@ -596,7 +629,7 @@ private:
// But it is possible to synchronously wait for the seal to complete by
// waiting on this future. This is useful in situations where we want to
// synchronously flush data to disk.
future<> seal_active_memtable();
future<> seal_active_memtable(memtable_list::flush_behavior behavior = memtable_list::flush_behavior::delayed);
// I am assuming here that the repair process will potentially send ranges containing
// few mutations, definitely not enough to fill a memtable. It wants to know whether or
@@ -619,9 +652,19 @@ private:
// repair can now choose whatever strategy - small or big ranges - it wants, resting assure
// that the incoming memtables will be coalesced together.
shared_promise<> _waiting_streaming_flushes;
timer<> _delayed_streaming_flush{[this] { seal_active_streaming_memtable(); }};
future<> seal_active_streaming_memtable();
timer<> _delayed_streaming_flush{[this] { seal_active_streaming_memtable_immediate(); }};
future<> seal_active_streaming_memtable_delayed();
future<> seal_active_streaming_memtable_immediate();
future<> seal_active_streaming_memtable(memtable_list::flush_behavior behavior) {
if (behavior == memtable_list::flush_behavior::delayed) {
return seal_active_streaming_memtable_delayed();
} else if (behavior == memtable_list::flush_behavior::immediate) {
return seal_active_streaming_memtable_immediate();
} else {
// Impossible
assert(0);
}
}
// filter manifest.json files out
static bool manifest_json_filter(const sstring& fname);