database: move memtable throttler to the LSA throttler
The LSA infrastructure, through the use of its region groups, now have a throttler mechanism built-in. This patch converts the current throttlers so that the LSA throttler is used instead. Signed-off-by: Glauber Costa <glauber@scylladb.com>
This commit is contained in:
83
database.cc
83
database.cc
@@ -1329,14 +1329,12 @@ database::database(const db::config& cfg)
|
||||
return memtable_total_space;
|
||||
}())
|
||||
, _streaming_memtable_total_space(_memtable_total_space / 4)
|
||||
, _streaming_dirty_memory_region_group(&_dirty_memory_region_group)
|
||||
, _dirty_memory_region_group_reclaimer(_memtable_total_space)
|
||||
, _streaming_dirty_memory_region_group_reclaimer(_streaming_memtable_total_space)
|
||||
, _dirty_memory_region_group(_dirty_memory_region_group_reclaimer)
|
||||
, _streaming_dirty_memory_region_group(&_dirty_memory_region_group, _streaming_dirty_memory_region_group_reclaimer)
|
||||
, _version(empty_version)
|
||||
, _enable_incremental_backups(cfg.incremental_backups())
|
||||
, _memtables_throttler(_memtable_total_space, _dirty_memory_region_group)
|
||||
, _streaming_throttler(_streaming_memtable_total_space,
|
||||
_streaming_dirty_memory_region_group,
|
||||
&_memtables_throttler
|
||||
)
|
||||
{
|
||||
_compaction_manager.start();
|
||||
setup_collectd();
|
||||
@@ -2109,14 +2107,15 @@ column_family::check_valid_rp(const db::replay_position& rp) const {
|
||||
}
|
||||
}
|
||||
|
||||
future<> database::apply_in_memory(const frozen_mutation& m, const schema_ptr& m_schema, const db::replay_position& rp) {
|
||||
try {
|
||||
auto& cf = find_column_family(m.column_family_id());
|
||||
cf.apply(m, m_schema, rp);
|
||||
} catch (no_such_column_family&) {
|
||||
dblog.error("Attempting to mutate non-existent table {}", m.column_family_id());
|
||||
}
|
||||
return make_ready_future<>();
|
||||
future<> database::apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::replay_position rp) {
|
||||
return _dirty_memory_region_group.run_when_memory_available([this, &m, m_schema = std::move(m_schema), rp = std::move(rp)] {
|
||||
try {
|
||||
auto& cf = find_column_family(m.column_family_id());
|
||||
cf.apply(m, m_schema, rp);
|
||||
} catch (no_such_column_family&) {
|
||||
dblog.error("Attempting to mutate non-existent table {}", m.column_family_id());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
future<> database::do_apply(schema_ptr s, const frozen_mutation& m) {
|
||||
@@ -2148,43 +2147,11 @@ future<> database::do_apply(schema_ptr s, const frozen_mutation& m) {
|
||||
return apply_in_memory(m, s, db::replay_position());
|
||||
}
|
||||
|
||||
future<> throttle_state::throttle() {
|
||||
if (!should_throttle() && _throttled_requests.empty()) {
|
||||
// All is well, go ahead
|
||||
return make_ready_future<>();
|
||||
}
|
||||
// We must throttle, wait a bit
|
||||
if (_throttled_requests.empty()) {
|
||||
_throttling_timer.arm_periodic(10ms);
|
||||
}
|
||||
_throttled_requests.emplace_back();
|
||||
return _throttled_requests.back().get_future();
|
||||
}
|
||||
|
||||
void throttle_state::unthrottle() {
|
||||
// Release one request per free 1MB we have
|
||||
// FIXME: improve this
|
||||
if (should_throttle()) {
|
||||
return;
|
||||
}
|
||||
size_t avail = std::max((_max_space - _region_group.memory_used()) >> 20, size_t(1));
|
||||
avail = std::min(_throttled_requests.size(), avail);
|
||||
for (size_t i = 0; i < avail; ++i) {
|
||||
_throttled_requests.front().set_value();
|
||||
_throttled_requests.pop_front();
|
||||
}
|
||||
if (_throttled_requests.empty()) {
|
||||
_throttling_timer.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
future<> database::apply(schema_ptr s, const frozen_mutation& m) {
|
||||
if (dblog.is_enabled(logging::log_level::trace)) {
|
||||
dblog.trace("apply {}", m.pretty_printer(s));
|
||||
}
|
||||
return _memtables_throttler.throttle().then([this, &m, s = std::move(s)] {
|
||||
return do_apply(std::move(s), m);
|
||||
}).then([this, s = _stats] {
|
||||
return do_apply(std::move(s), m).then([this, s = _stats] {
|
||||
++s->total_writes;
|
||||
});
|
||||
}
|
||||
@@ -2194,23 +2161,7 @@ future<> database::apply_streaming_mutation(schema_ptr s, const frozen_mutation&
|
||||
throw std::runtime_error(sprint("attempted to mutate using not synced schema of %s.%s, version=%s",
|
||||
s->ks_name(), s->cf_name(), s->version()));
|
||||
}
|
||||
|
||||
// TODO (maybe): This will use the same memory region group as memtables, so when
|
||||
// one of them throttles, both will.
|
||||
//
|
||||
// It would be possible to provide further QoS for CQL originated memtables
|
||||
// by keeping the streaming memtables into a different region group, with its own
|
||||
// separate limit.
|
||||
//
|
||||
// Because, however, there are many other limits in play that may kick in,
|
||||
// I am not convinced that this will ever be a problem.
|
||||
//
|
||||
// If we do find ourselves in the situation that we are throttling incoming
|
||||
// writes due to high level of streaming writes, and we are sure that this
|
||||
// is the best solution, we can just change the memtable creation method so
|
||||
// that each kind of memtable creates from a different region group - and then
|
||||
// update the throttle conditions accordingly.
|
||||
return _streaming_throttler.throttle().then([this, &m, s = std::move(s)] {
|
||||
return _streaming_dirty_memory_region_group.run_when_memory_available([this, &m, s = std::move(s)] {
|
||||
auto uuid = m.column_family_id();
|
||||
auto& cf = find_column_family(uuid);
|
||||
cf.apply_streaming_mutation(s, std::move(m));
|
||||
@@ -2316,6 +2267,10 @@ database::stop() {
|
||||
return parallel_for_each(_column_families, [this] (auto& val_pair) {
|
||||
return val_pair.second->stop();
|
||||
});
|
||||
}).then([this] {
|
||||
return _dirty_memory_region_group.shutdown();
|
||||
}).then([this] {
|
||||
return _streaming_dirty_memory_region_group.shutdown();
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user