database: wrap semaphore and region group into a new dirty memory manager

We currently have a semaphore in the column family level that protects us against
multiple concurrent sstable flushes. However, storing that semaphore into the CF,
not the database, was a (implementation, not design) mistake.

One comment in particular makes it quite clear:

   // Ideally, we'd allow one memtable flush per shard (or per database object), and write-behind
   // would take care of the rest. But that still has issues, so we'll limit parallelism to some
   // number (4), that we will hopefully reduce to 1 when write behind works.

So I aimed for the shard, but ended up coding it into the CF because that's closer to the
flush point - my bad.

This patch fixes this while paving the way for active reclaim to take place. It wraps the semaphore
and the region group in a new structure, the dirty_memory_manager. The immediate benefit is that we
don't need to be passing both the semaphore and the region group downwards in the DB -> CF path. The
long term benefit is that we now have a one unified structure that can hold shared flush data in all
of the CFs.

Signed-off-by: Glauber Costa <glauber@scylladb.com>
This commit is contained in:
Glauber Costa
2016-07-05 15:29:04 -04:00
parent d41fcd45d1
commit c358947284
2 changed files with 99 additions and 56 deletions

View File

@@ -92,21 +92,21 @@ lw_shared_ptr<memtable_list>
column_family::make_memory_only_memtable_list() { column_family::make_memory_only_memtable_list() {
auto seal = [this] (memtable_list::flush_behavior ignored) { return make_ready_future<>(); }; auto seal = [this] (memtable_list::flush_behavior ignored) { return make_ready_future<>(); };
auto get_schema = [this] { return schema(); }; auto get_schema = [this] { return schema(); };
return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.max_memtable_size, _config.dirty_memory_region_group, _memtables_serializer); return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.max_memtable_size, _config.dirty_memory_manager);
} }
lw_shared_ptr<memtable_list> lw_shared_ptr<memtable_list>
column_family::make_memtable_list() { column_family::make_memtable_list() {
auto seal = [this] (memtable_list::flush_behavior behavior) { return seal_active_memtable(behavior); }; auto seal = [this] (memtable_list::flush_behavior behavior) { return seal_active_memtable(behavior); };
auto get_schema = [this] { return schema(); }; auto get_schema = [this] { return schema(); };
return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.max_memtable_size, _config.dirty_memory_region_group, _memtables_serializer); return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.max_memtable_size, _config.dirty_memory_manager);
} }
lw_shared_ptr<memtable_list> lw_shared_ptr<memtable_list>
column_family::make_streaming_memtable_list() { column_family::make_streaming_memtable_list() {
auto seal = [this] (memtable_list::flush_behavior behavior) { return seal_active_streaming_memtable(behavior); }; auto seal = [this] (memtable_list::flush_behavior behavior) { return seal_active_streaming_memtable(behavior); };
auto get_schema = [this] { return schema(); }; auto get_schema = [this] { return schema(); };
return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.max_streaming_memtable_size, _config.streaming_dirty_memory_region_group, _streaming_serializer); return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.max_streaming_memtable_size, _config.streaming_dirty_memory_manager);
} }
column_family::column_family(schema_ptr schema, config config, db::commitlog* cl, compaction_manager& compaction_manager) column_family::column_family(schema_ptr schema, config config, db::commitlog* cl, compaction_manager& compaction_manager)
@@ -1329,10 +1329,8 @@ database::database(const db::config& cfg)
return memtable_total_space; return memtable_total_space;
}()) }())
, _streaming_memtable_total_space(_memtable_total_space / 4) , _streaming_memtable_total_space(_memtable_total_space / 4)
, _dirty_memory_region_group_reclaimer(_memtable_total_space) , _dirty_memory_manager(_memtable_total_space)
, _streaming_dirty_memory_region_group_reclaimer(_streaming_memtable_total_space) , _streaming_dirty_memory_manager(&_dirty_memory_manager, _streaming_memtable_total_space)
, _dirty_memory_region_group(_dirty_memory_region_group_reclaimer)
, _streaming_dirty_memory_region_group(&_dirty_memory_region_group, _streaming_dirty_memory_region_group_reclaimer)
, _version(empty_version) , _version(empty_version)
, _enable_incremental_backups(cfg.incremental_backups()) , _enable_incremental_backups(cfg.incremental_backups())
{ {
@@ -1349,7 +1347,7 @@ database::setup_collectd() {
, scollectd::per_cpu_plugin_instance , scollectd::per_cpu_plugin_instance
, "bytes", "dirty") , "bytes", "dirty")
, scollectd::make_typed(scollectd::data_type::GAUGE, [this] { , scollectd::make_typed(scollectd::data_type::GAUGE, [this] {
return _dirty_memory_region_group.memory_used(); return dirty_memory_region_group().memory_used();
}))); })));
_collectd.push_back( _collectd.push_back(
@@ -1778,8 +1776,8 @@ keyspace::make_column_family_config(const schema& s) const {
cfg.enable_cache = _config.enable_cache; cfg.enable_cache = _config.enable_cache;
cfg.max_memtable_size = _config.max_memtable_size; cfg.max_memtable_size = _config.max_memtable_size;
cfg.max_streaming_memtable_size = _config.max_streaming_memtable_size; cfg.max_streaming_memtable_size = _config.max_streaming_memtable_size;
cfg.dirty_memory_region_group = _config.dirty_memory_region_group; cfg.dirty_memory_manager = _config.dirty_memory_manager;
cfg.streaming_dirty_memory_region_group = _config.streaming_dirty_memory_region_group; cfg.streaming_dirty_memory_manager = _config.streaming_dirty_memory_manager;
cfg.read_concurrency_config = _config.read_concurrency_config; cfg.read_concurrency_config = _config.read_concurrency_config;
cfg.cf_stats = _config.cf_stats; cfg.cf_stats = _config.cf_stats;
cfg.enable_incremental_backups = _config.enable_incremental_backups; cfg.enable_incremental_backups = _config.enable_incremental_backups;
@@ -2107,8 +2105,14 @@ column_family::check_valid_rp(const db::replay_position& rp) const {
} }
} }
future<> dirty_memory_manager::shutdown() {
return _waiting_flush_gate.close().then([this] {
return _region_group.shutdown();
});
}
future<> database::apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::replay_position rp) { future<> database::apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::replay_position rp) {
return _dirty_memory_region_group.run_when_memory_available([this, &m, m_schema = std::move(m_schema), rp = std::move(rp)] { return _dirty_memory_manager.region_group().run_when_memory_available([this, &m, m_schema = std::move(m_schema), rp = std::move(rp)] {
try { try {
auto& cf = find_column_family(m.column_family_id()); auto& cf = find_column_family(m.column_family_id());
cf.apply(m, m_schema, rp); cf.apply(m, m_schema, rp);
@@ -2161,7 +2165,7 @@ future<> database::apply_streaming_mutation(schema_ptr s, const frozen_mutation&
throw std::runtime_error(sprint("attempted to mutate using not synced schema of %s.%s, version=%s", throw std::runtime_error(sprint("attempted to mutate using not synced schema of %s.%s, version=%s",
s->ks_name(), s->cf_name(), s->version())); s->ks_name(), s->cf_name(), s->version()));
} }
return _streaming_dirty_memory_region_group.run_when_memory_available([this, &m, s = std::move(s)] { return _streaming_dirty_memory_manager.region_group().run_when_memory_available([this, &m, s = std::move(s)] {
auto uuid = m.column_family_id(); auto uuid = m.column_family_id();
auto& cf = find_column_family(uuid); auto& cf = find_column_family(uuid);
cf.apply_streaming_mutation(s, std::move(m)); cf.apply_streaming_mutation(s, std::move(m));
@@ -2193,8 +2197,8 @@ database::make_keyspace_config(const keyspace_metadata& ksm) {
// All writes should go to the main memtable list if we're not durable // All writes should go to the main memtable list if we're not durable
cfg.max_streaming_memtable_size = 0; cfg.max_streaming_memtable_size = 0;
} }
cfg.dirty_memory_region_group = &_dirty_memory_region_group; cfg.dirty_memory_manager = &_dirty_memory_manager;
cfg.streaming_dirty_memory_region_group = &_streaming_dirty_memory_region_group; cfg.streaming_dirty_memory_manager = &_streaming_dirty_memory_manager;
cfg.read_concurrency_config.sem = &_read_concurrency_sem; cfg.read_concurrency_config.sem = &_read_concurrency_sem;
cfg.read_concurrency_config.timeout = _cfg->read_request_timeout_in_ms() * 1ms; cfg.read_concurrency_config.timeout = _cfg->read_request_timeout_in_ms() * 1ms;
// Assume a queued read takes up 1kB of memory, and allow 2% of memory to be filled up with such reads. // Assume a queued read takes up 1kB of memory, and allow 2% of memory to be filled up with such reads.
@@ -2268,9 +2272,9 @@ database::stop() {
return val_pair.second->stop(); return val_pair.second->stop();
}); });
}).then([this] { }).then([this] {
return _dirty_memory_region_group.shutdown(); return _dirty_memory_manager.shutdown();
}).then([this] { }).then([this] {
return _streaming_dirty_memory_region_group.shutdown(); return _streaming_dirty_memory_manager.shutdown();
}); });
} }

View File

@@ -101,6 +101,73 @@ void make(database& db, bool durable, bool volatile_testing_only);
class replay_position_reordered_exception : public std::exception {}; class replay_position_reordered_exception : public std::exception {};
using shared_memtable = lw_shared_ptr<memtable>;
class dirty_memory_manager: private logalloc::region_group_reclaimer {
logalloc::region_group _region_group;
// We would like to serialize the flushing of memtables. While flushing many memtables
// simultaneously can sustain high levels of throughput, the memory is not freed until the
// memtable is totally gone. That means that if we have throttled requests, they will stay
// throttled for a long time. Even when we have virtual dirty, that only provides a rough
// estimate, and we can't release requests that early.
//
// Ideally, we'd allow one memtable flush per shard (or per database object), and write-behind
// would take care of the rest. But that still has issues, so we'll limit parallelism to some
// number (4), that we will hopefully reduce to 1 when write behind works.
//
// When streaming is going on, we'll separate half of that for the streaming code, which
// effectively increases the total to 6. That is a bit ugly and a bit redundant with the I/O
// Scheduler, but it's the easiest way not to hurt the common case (no streaming) and will have
// to do for the moment. Hopefully we can set both to 1 soon (with write behind)
//
// FIXME: enable write behind and set both to 1. Right now we will take advantage of the fact
// that memtables and streaming will use different specialized classes here and set them as
// default values here.
size_t _concurrency;
semaphore _flush_serializer;
seastar::gate _waiting_flush_gate;
std::vector<shared_memtable> _pending_flushes;
public:
future<> shutdown();
dirty_memory_manager(size_t threshold, size_t concurrency)
: logalloc::region_group_reclaimer(threshold)
, _region_group(*this)
, _concurrency(concurrency)
, _flush_serializer(concurrency) {}
dirty_memory_manager(dirty_memory_manager *parent, size_t threshold, size_t concurrency)
: logalloc::region_group_reclaimer(threshold)
, _region_group(&parent->_region_group, *this)
, _concurrency(concurrency)
, _flush_serializer(concurrency) {}
logalloc::region_group& region_group() {
return _region_group;
}
const logalloc::region_group& region_group() const {
return _region_group;
}
template <typename Func>
future<> serialize_flush(Func&& func) {
return seastar::with_gate(_waiting_flush_gate, [this, func] () mutable {
return with_semaphore(_flush_serializer, 1, func);
});
}
};
class streaming_dirty_memory_manager: public dirty_memory_manager {
public:
streaming_dirty_memory_manager(dirty_memory_manager *parent, size_t threshold) : dirty_memory_manager(parent, threshold, 2) {}
};
class memtable_dirty_memory_manager: public dirty_memory_manager {
public:
memtable_dirty_memory_manager(size_t threshold) : dirty_memory_manager(threshold, 4) {}
};
// We could just add all memtables, regardless of types, to a single list, and // We could just add all memtables, regardless of types, to a single list, and
// then filter them out when we read them. Here's why I have chosen not to do // then filter them out when we read them. Here's why I have chosen not to do
// it: // it:
@@ -123,21 +190,18 @@ class memtable_list {
public: public:
enum class flush_behavior { delayed, immediate }; enum class flush_behavior { delayed, immediate };
private: private:
using shared_memtable = lw_shared_ptr<memtable>;
std::vector<shared_memtable> _memtables; std::vector<shared_memtable> _memtables;
std::function<future<> (flush_behavior)> _seal_fn; std::function<future<> (flush_behavior)> _seal_fn;
std::function<schema_ptr()> _current_schema; std::function<schema_ptr()> _current_schema;
size_t _max_memtable_size; size_t _max_memtable_size;
logalloc::region_group* _dirty_memory_region_group; dirty_memory_manager* _dirty_memory_manager;
semaphore& _region_group_serializer;
public: public:
memtable_list(std::function<future<> (flush_behavior)> seal_fn, std::function<schema_ptr()> cs, size_t max_memtable_size, logalloc::region_group* region_group, semaphore& sem) memtable_list(std::function<future<> (flush_behavior)> seal_fn, std::function<schema_ptr()> cs, size_t max_memtable_size, dirty_memory_manager* dirty_memory_manager)
: _memtables({}) : _memtables({})
, _seal_fn(seal_fn) , _seal_fn(seal_fn)
, _current_schema(cs) , _current_schema(cs)
, _max_memtable_size(max_memtable_size) , _max_memtable_size(max_memtable_size)
, _dirty_memory_region_group(region_group) , _dirty_memory_manager(dirty_memory_manager) {
, _region_group_serializer(sem) {
add_memtable(); add_memtable();
} }
@@ -163,11 +227,7 @@ public:
if (behavior == flush_behavior::delayed) { if (behavior == flush_behavior::delayed) {
return _seal_fn(behavior); return _seal_fn(behavior);
} }
return _region_group_serializer.wait().then([this] { return _dirty_memory_manager->serialize_flush([this] { return _seal_fn(flush_behavior::immediate); });
return _seal_fn(flush_behavior::immediate);
}).finally([this] {
_region_group_serializer.signal();
});
} }
auto begin() noexcept { auto begin() noexcept {
@@ -207,7 +267,7 @@ public:
} }
private: private:
lw_shared_ptr<memtable> new_memtable() { lw_shared_ptr<memtable> new_memtable() {
return make_lw_shared<memtable>(_current_schema(), _dirty_memory_region_group); return make_lw_shared<memtable>(_current_schema(), &(_dirty_memory_manager->region_group()));
} }
}; };
@@ -234,8 +294,8 @@ public:
bool enable_incremental_backups = false; bool enable_incremental_backups = false;
size_t max_memtable_size = 5'000'000; size_t max_memtable_size = 5'000'000;
size_t max_streaming_memtable_size = 5'000'000; size_t max_streaming_memtable_size = 5'000'000;
logalloc::region_group* dirty_memory_region_group = nullptr; ::dirty_memory_manager* dirty_memory_manager = nullptr;
logalloc::region_group* streaming_dirty_memory_region_group = nullptr; ::dirty_memory_manager* streaming_dirty_memory_manager = nullptr;
restricted_mutation_reader_config read_concurrency_config; restricted_mutation_reader_config read_concurrency_config;
::cf_stats* cf_stats = nullptr; ::cf_stats* cf_stats = nullptr;
}; };
@@ -268,24 +328,6 @@ private:
config _config; config _config;
stats _stats; stats _stats;
// We would like to serialize the flushing of memtables. While flushing many memtables
// simultaneously can sustain high levels of throughput, the memory is not freed until the
// memtable is totally gone. That means that if we have throttled requests, they will stay
// throttled for a long time. Even when we have virtual dirty, that only provides a rough
// estimate, and we can't release requests that early.
//
// Ideally, we'd allow one memtable flush per shard (or per database object), and write-behind
// would take care of the rest. But that still has issues, so we'll limit parallelism to some
// number (4), that we will hopefully reduce to 1 when write behind works.
//
// When streaming is going on, we'll separate half of that for the streaming code, which
// effectively increases the total to 6. That is a bit ugly and a bit redundant with the I/O
// Scheduler, but it's the easiest way not to hurt the common case (no streaming) and will have
// to do for the moment. Hopefully we can set both to 1 soon (with write behind)
//
// FIXME: enable write behind and set both to 1.
semaphore _memtables_serializer = { 4 };
semaphore _streaming_serializer = { 2 };
lw_shared_ptr<memtable_list> _memtables; lw_shared_ptr<memtable_list> _memtables;
// In older incarnations, we simply commited the mutations to memtables. // In older incarnations, we simply commited the mutations to memtables.
@@ -765,8 +807,8 @@ public:
bool enable_incremental_backups = false; bool enable_incremental_backups = false;
size_t max_memtable_size = 5'000'000; size_t max_memtable_size = 5'000'000;
size_t max_streaming_memtable_size = 5'000'000; size_t max_streaming_memtable_size = 5'000'000;
logalloc::region_group* dirty_memory_region_group = nullptr; ::dirty_memory_manager* dirty_memory_manager = nullptr;
logalloc::region_group* streaming_dirty_memory_region_group = nullptr; ::dirty_memory_manager* streaming_dirty_memory_manager = nullptr;
restricted_mutation_reader_config read_concurrency_config; restricted_mutation_reader_config read_concurrency_config;
::cf_stats* cf_stats = nullptr; ::cf_stats* cf_stats = nullptr;
}; };
@@ -860,11 +902,8 @@ class database {
std::unique_ptr<db::config> _cfg; std::unique_ptr<db::config> _cfg;
size_t _memtable_total_space = 500 << 20; size_t _memtable_total_space = 500 << 20;
size_t _streaming_memtable_total_space = 500 << 20; size_t _streaming_memtable_total_space = 500 << 20;
logalloc::region_group_reclaimer _dirty_memory_region_group_reclaimer; memtable_dirty_memory_manager _dirty_memory_manager;
logalloc::region_group_reclaimer _streaming_dirty_memory_region_group_reclaimer; streaming_dirty_memory_manager _streaming_dirty_memory_manager;
logalloc::region_group _dirty_memory_region_group;
logalloc::region_group _streaming_dirty_memory_region_group;
semaphore _read_concurrency_sem{max_concurrent_reads()}; semaphore _read_concurrency_sem{max_concurrent_reads()};
restricted_mutation_reader_config _read_concurrency_config; restricted_mutation_reader_config _read_concurrency_config;
semaphore _system_read_concurrency_sem{max_system_concurrent_reads()}; semaphore _system_read_concurrency_sem{max_system_concurrent_reads()};
@@ -1005,7 +1044,7 @@ public:
future<> drop_column_family(const sstring& ks_name, const sstring& cf_name, timestamp_func); future<> drop_column_family(const sstring& ks_name, const sstring& cf_name, timestamp_func);
const logalloc::region_group& dirty_memory_region_group() const { const logalloc::region_group& dirty_memory_region_group() const {
return _dirty_memory_region_group; return _dirty_memory_manager.region_group();
} }
std::unordered_set<sstring> get_initial_tokens(); std::unordered_set<sstring> get_initial_tokens();