From c358947284f55c0299353cac2024d64b643fac74 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Tue, 5 Jul 2016 15:29:04 -0400 Subject: [PATCH] database: wrap semaphore and region group into a new dirty memory manager We currently have a semaphore in the column family level that protects us against multiple concurrent sstable flushes. However, storing that semaphore into the CF, not the database, was a (implementation, not design) mistake. One comment in particular makes it quite clear: // Ideally, we'd allow one memtable flush per shard (or per database object), and write-behind // would take care of the rest. But that still has issues, so we'll limit parallelism to some // number (4), that we will hopefully reduce to 1 when write behind works. So I aimed for the shard, but ended up coding it into the CF because that's closer to the flush point - my bad. This patch fixes this while paving the way for active reclaim to take place. It wraps the semaphore and the region group in a new structure, the dirty_memory_manager. The immediate benefit is that we don't need to be passing both the semaphore and the region group downwards in the DB -> CF path. The long term benefit is that we now have a one unified structure that can hold shared flush data in all of the CFs. Signed-off-by: Glauber Costa --- database.cc | 36 +++++++++------- database.hh | 119 ++++++++++++++++++++++++++++++++++------------------ 2 files changed, 99 insertions(+), 56 deletions(-) diff --git a/database.cc b/database.cc index 0ffc30cf76..fb5761ba24 100644 --- a/database.cc +++ b/database.cc @@ -92,21 +92,21 @@ lw_shared_ptr column_family::make_memory_only_memtable_list() { auto seal = [this] (memtable_list::flush_behavior ignored) { return make_ready_future<>(); }; auto get_schema = [this] { return schema(); }; - return make_lw_shared(std::move(seal), std::move(get_schema), _config.max_memtable_size, _config.dirty_memory_region_group, _memtables_serializer); + return make_lw_shared(std::move(seal), std::move(get_schema), _config.max_memtable_size, _config.dirty_memory_manager); } lw_shared_ptr column_family::make_memtable_list() { auto seal = [this] (memtable_list::flush_behavior behavior) { return seal_active_memtable(behavior); }; auto get_schema = [this] { return schema(); }; - return make_lw_shared(std::move(seal), std::move(get_schema), _config.max_memtable_size, _config.dirty_memory_region_group, _memtables_serializer); + return make_lw_shared(std::move(seal), std::move(get_schema), _config.max_memtable_size, _config.dirty_memory_manager); } lw_shared_ptr column_family::make_streaming_memtable_list() { auto seal = [this] (memtable_list::flush_behavior behavior) { return seal_active_streaming_memtable(behavior); }; auto get_schema = [this] { return schema(); }; - return make_lw_shared(std::move(seal), std::move(get_schema), _config.max_streaming_memtable_size, _config.streaming_dirty_memory_region_group, _streaming_serializer); + return make_lw_shared(std::move(seal), std::move(get_schema), _config.max_streaming_memtable_size, _config.streaming_dirty_memory_manager); } column_family::column_family(schema_ptr schema, config config, db::commitlog* cl, compaction_manager& compaction_manager) @@ -1329,10 +1329,8 @@ database::database(const db::config& cfg) return memtable_total_space; }()) , _streaming_memtable_total_space(_memtable_total_space / 4) - , _dirty_memory_region_group_reclaimer(_memtable_total_space) - , _streaming_dirty_memory_region_group_reclaimer(_streaming_memtable_total_space) - , _dirty_memory_region_group(_dirty_memory_region_group_reclaimer) - , _streaming_dirty_memory_region_group(&_dirty_memory_region_group, _streaming_dirty_memory_region_group_reclaimer) + , _dirty_memory_manager(_memtable_total_space) + , _streaming_dirty_memory_manager(&_dirty_memory_manager, _streaming_memtable_total_space) , _version(empty_version) , _enable_incremental_backups(cfg.incremental_backups()) { @@ -1349,7 +1347,7 @@ database::setup_collectd() { , scollectd::per_cpu_plugin_instance , "bytes", "dirty") , scollectd::make_typed(scollectd::data_type::GAUGE, [this] { - return _dirty_memory_region_group.memory_used(); + return dirty_memory_region_group().memory_used(); }))); _collectd.push_back( @@ -1778,8 +1776,8 @@ keyspace::make_column_family_config(const schema& s) const { cfg.enable_cache = _config.enable_cache; cfg.max_memtable_size = _config.max_memtable_size; cfg.max_streaming_memtable_size = _config.max_streaming_memtable_size; - cfg.dirty_memory_region_group = _config.dirty_memory_region_group; - cfg.streaming_dirty_memory_region_group = _config.streaming_dirty_memory_region_group; + cfg.dirty_memory_manager = _config.dirty_memory_manager; + cfg.streaming_dirty_memory_manager = _config.streaming_dirty_memory_manager; cfg.read_concurrency_config = _config.read_concurrency_config; cfg.cf_stats = _config.cf_stats; cfg.enable_incremental_backups = _config.enable_incremental_backups; @@ -2107,8 +2105,14 @@ column_family::check_valid_rp(const db::replay_position& rp) const { } } +future<> dirty_memory_manager::shutdown() { + return _waiting_flush_gate.close().then([this] { + return _region_group.shutdown(); + }); +} + future<> database::apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::replay_position rp) { - return _dirty_memory_region_group.run_when_memory_available([this, &m, m_schema = std::move(m_schema), rp = std::move(rp)] { + return _dirty_memory_manager.region_group().run_when_memory_available([this, &m, m_schema = std::move(m_schema), rp = std::move(rp)] { try { auto& cf = find_column_family(m.column_family_id()); cf.apply(m, m_schema, rp); @@ -2161,7 +2165,7 @@ future<> database::apply_streaming_mutation(schema_ptr s, const frozen_mutation& throw std::runtime_error(sprint("attempted to mutate using not synced schema of %s.%s, version=%s", s->ks_name(), s->cf_name(), s->version())); } - return _streaming_dirty_memory_region_group.run_when_memory_available([this, &m, s = std::move(s)] { + return _streaming_dirty_memory_manager.region_group().run_when_memory_available([this, &m, s = std::move(s)] { auto uuid = m.column_family_id(); auto& cf = find_column_family(uuid); cf.apply_streaming_mutation(s, std::move(m)); @@ -2193,8 +2197,8 @@ database::make_keyspace_config(const keyspace_metadata& ksm) { // All writes should go to the main memtable list if we're not durable cfg.max_streaming_memtable_size = 0; } - cfg.dirty_memory_region_group = &_dirty_memory_region_group; - cfg.streaming_dirty_memory_region_group = &_streaming_dirty_memory_region_group; + cfg.dirty_memory_manager = &_dirty_memory_manager; + cfg.streaming_dirty_memory_manager = &_streaming_dirty_memory_manager; cfg.read_concurrency_config.sem = &_read_concurrency_sem; cfg.read_concurrency_config.timeout = _cfg->read_request_timeout_in_ms() * 1ms; // Assume a queued read takes up 1kB of memory, and allow 2% of memory to be filled up with such reads. @@ -2268,9 +2272,9 @@ database::stop() { return val_pair.second->stop(); }); }).then([this] { - return _dirty_memory_region_group.shutdown(); + return _dirty_memory_manager.shutdown(); }).then([this] { - return _streaming_dirty_memory_region_group.shutdown(); + return _streaming_dirty_memory_manager.shutdown(); }); } diff --git a/database.hh b/database.hh index b437a8c197..cf554bd4d2 100644 --- a/database.hh +++ b/database.hh @@ -101,6 +101,73 @@ void make(database& db, bool durable, bool volatile_testing_only); class replay_position_reordered_exception : public std::exception {}; +using shared_memtable = lw_shared_ptr; + +class dirty_memory_manager: private logalloc::region_group_reclaimer { + logalloc::region_group _region_group; + + // We would like to serialize the flushing of memtables. While flushing many memtables + // simultaneously can sustain high levels of throughput, the memory is not freed until the + // memtable is totally gone. That means that if we have throttled requests, they will stay + // throttled for a long time. Even when we have virtual dirty, that only provides a rough + // estimate, and we can't release requests that early. + // + // Ideally, we'd allow one memtable flush per shard (or per database object), and write-behind + // would take care of the rest. But that still has issues, so we'll limit parallelism to some + // number (4), that we will hopefully reduce to 1 when write behind works. + // + // When streaming is going on, we'll separate half of that for the streaming code, which + // effectively increases the total to 6. That is a bit ugly and a bit redundant with the I/O + // Scheduler, but it's the easiest way not to hurt the common case (no streaming) and will have + // to do for the moment. Hopefully we can set both to 1 soon (with write behind) + // + // FIXME: enable write behind and set both to 1. Right now we will take advantage of the fact + // that memtables and streaming will use different specialized classes here and set them as + // default values here. + size_t _concurrency; + semaphore _flush_serializer; + + seastar::gate _waiting_flush_gate; + std::vector _pending_flushes; +public: + future<> shutdown(); + dirty_memory_manager(size_t threshold, size_t concurrency) + : logalloc::region_group_reclaimer(threshold) + , _region_group(*this) + , _concurrency(concurrency) + , _flush_serializer(concurrency) {} + + dirty_memory_manager(dirty_memory_manager *parent, size_t threshold, size_t concurrency) + : logalloc::region_group_reclaimer(threshold) + , _region_group(&parent->_region_group, *this) + , _concurrency(concurrency) + , _flush_serializer(concurrency) {} + logalloc::region_group& region_group() { + return _region_group; + } + + const logalloc::region_group& region_group() const { + return _region_group; + } + + template + future<> serialize_flush(Func&& func) { + return seastar::with_gate(_waiting_flush_gate, [this, func] () mutable { + return with_semaphore(_flush_serializer, 1, func); + }); + } +}; + +class streaming_dirty_memory_manager: public dirty_memory_manager { +public: + streaming_dirty_memory_manager(dirty_memory_manager *parent, size_t threshold) : dirty_memory_manager(parent, threshold, 2) {} +}; + +class memtable_dirty_memory_manager: public dirty_memory_manager { +public: + memtable_dirty_memory_manager(size_t threshold) : dirty_memory_manager(threshold, 4) {} +}; + // We could just add all memtables, regardless of types, to a single list, and // then filter them out when we read them. Here's why I have chosen not to do // it: @@ -123,21 +190,18 @@ class memtable_list { public: enum class flush_behavior { delayed, immediate }; private: - using shared_memtable = lw_shared_ptr; std::vector _memtables; std::function (flush_behavior)> _seal_fn; std::function _current_schema; size_t _max_memtable_size; - logalloc::region_group* _dirty_memory_region_group; - semaphore& _region_group_serializer; + dirty_memory_manager* _dirty_memory_manager; public: - memtable_list(std::function (flush_behavior)> seal_fn, std::function cs, size_t max_memtable_size, logalloc::region_group* region_group, semaphore& sem) + memtable_list(std::function (flush_behavior)> seal_fn, std::function cs, size_t max_memtable_size, dirty_memory_manager* dirty_memory_manager) : _memtables({}) , _seal_fn(seal_fn) , _current_schema(cs) , _max_memtable_size(max_memtable_size) - , _dirty_memory_region_group(region_group) - , _region_group_serializer(sem) { + , _dirty_memory_manager(dirty_memory_manager) { add_memtable(); } @@ -163,11 +227,7 @@ public: if (behavior == flush_behavior::delayed) { return _seal_fn(behavior); } - return _region_group_serializer.wait().then([this] { - return _seal_fn(flush_behavior::immediate); - }).finally([this] { - _region_group_serializer.signal(); - }); + return _dirty_memory_manager->serialize_flush([this] { return _seal_fn(flush_behavior::immediate); }); } auto begin() noexcept { @@ -207,7 +267,7 @@ public: } private: lw_shared_ptr new_memtable() { - return make_lw_shared(_current_schema(), _dirty_memory_region_group); + return make_lw_shared(_current_schema(), &(_dirty_memory_manager->region_group())); } }; @@ -234,8 +294,8 @@ public: bool enable_incremental_backups = false; size_t max_memtable_size = 5'000'000; size_t max_streaming_memtable_size = 5'000'000; - logalloc::region_group* dirty_memory_region_group = nullptr; - logalloc::region_group* streaming_dirty_memory_region_group = nullptr; + ::dirty_memory_manager* dirty_memory_manager = nullptr; + ::dirty_memory_manager* streaming_dirty_memory_manager = nullptr; restricted_mutation_reader_config read_concurrency_config; ::cf_stats* cf_stats = nullptr; }; @@ -268,24 +328,6 @@ private: config _config; stats _stats; - // We would like to serialize the flushing of memtables. While flushing many memtables - // simultaneously can sustain high levels of throughput, the memory is not freed until the - // memtable is totally gone. That means that if we have throttled requests, they will stay - // throttled for a long time. Even when we have virtual dirty, that only provides a rough - // estimate, and we can't release requests that early. - // - // Ideally, we'd allow one memtable flush per shard (or per database object), and write-behind - // would take care of the rest. But that still has issues, so we'll limit parallelism to some - // number (4), that we will hopefully reduce to 1 when write behind works. - // - // When streaming is going on, we'll separate half of that for the streaming code, which - // effectively increases the total to 6. That is a bit ugly and a bit redundant with the I/O - // Scheduler, but it's the easiest way not to hurt the common case (no streaming) and will have - // to do for the moment. Hopefully we can set both to 1 soon (with write behind) - // - // FIXME: enable write behind and set both to 1. - semaphore _memtables_serializer = { 4 }; - semaphore _streaming_serializer = { 2 }; lw_shared_ptr _memtables; // In older incarnations, we simply commited the mutations to memtables. @@ -765,8 +807,8 @@ public: bool enable_incremental_backups = false; size_t max_memtable_size = 5'000'000; size_t max_streaming_memtable_size = 5'000'000; - logalloc::region_group* dirty_memory_region_group = nullptr; - logalloc::region_group* streaming_dirty_memory_region_group = nullptr; + ::dirty_memory_manager* dirty_memory_manager = nullptr; + ::dirty_memory_manager* streaming_dirty_memory_manager = nullptr; restricted_mutation_reader_config read_concurrency_config; ::cf_stats* cf_stats = nullptr; }; @@ -860,11 +902,8 @@ class database { std::unique_ptr _cfg; size_t _memtable_total_space = 500 << 20; size_t _streaming_memtable_total_space = 500 << 20; - logalloc::region_group_reclaimer _dirty_memory_region_group_reclaimer; - logalloc::region_group_reclaimer _streaming_dirty_memory_region_group_reclaimer; - - logalloc::region_group _dirty_memory_region_group; - logalloc::region_group _streaming_dirty_memory_region_group; + memtable_dirty_memory_manager _dirty_memory_manager; + streaming_dirty_memory_manager _streaming_dirty_memory_manager; semaphore _read_concurrency_sem{max_concurrent_reads()}; restricted_mutation_reader_config _read_concurrency_config; semaphore _system_read_concurrency_sem{max_system_concurrent_reads()}; @@ -1005,7 +1044,7 @@ public: future<> drop_column_family(const sstring& ks_name, const sstring& cf_name, timestamp_func); const logalloc::region_group& dirty_memory_region_group() const { - return _dirty_memory_region_group; + return _dirty_memory_manager.region_group(); } std::unordered_set get_initial_tokens();