Merge 'Memtable flush: wait for sstable count reduction if needed' from Benny Halevy

Called from try_flush_memtable_to_sstable,
maybe_wait_for_sstable_count_reduction will wait for
compaction to catch up with memtable flush if there
the bucket to compact is inflated, having too many
sstables.  In that case we don't want to add fuel
to the fire by creating yet another sstable.

Fixes #4116

Closes #10954

* github.com:scylladb/scylla:
  table: Add test where compaction doesn't keep up with flush rate.
  compaction_manager: add maybe_wait_for_sstable_count_reduction
  time_window_compaction_strategy: get_sstables_for_compaction: clean up code
  time_window_compaction_strategy: make get_sstables_for_compaction idempotent
  time_window_compaction_strategy: get_sstables_for_compaction: improve debug messages
  leveled_manifest: pass compaction_counter as const&
This commit is contained in:
Avi Kivity
2022-07-28 19:11:04 +03:00
8 changed files with 172 additions and 24 deletions

View File

@@ -513,6 +513,10 @@ inline compaction_controller make_compaction_controller(compaction_manager::sche
return compaction_controller(csg, static_shares, 250ms, std::move(fn));
}
compaction_manager::compaction_state::~compaction_state() {
compaction_done.broken();
}
std::string compaction_manager::task::describe() const {
auto* t = _compacting_table;
auto s = t->schema();
@@ -611,6 +615,7 @@ void compaction_manager::task::finish_compaction(state finish_state) noexcept {
if (finish_state != state::failed) {
_compaction_retry.reset();
}
_compaction_state.compaction_done.signal();
}
void compaction_manager::task::stop(sstring reason) noexcept {
@@ -981,6 +986,50 @@ void compaction_manager::submit(compaction::table_state& t) {
(void)perform_task(make_shared<regular_compaction_task>(*this, t));
}
bool compaction_manager::can_perform_regular_compaction(compaction::table_state& t) {
return can_proceed(&t) && !t.is_auto_compaction_disabled_by_user();
}
future<> compaction_manager::maybe_wait_for_sstable_count_reduction(compaction::table_state& t) {
auto schema = t.schema();
if (!can_perform_regular_compaction(t)) {
cmlog.trace("maybe_wait_for_sstable_count_reduction in {}.{}: cannot perform regular compaction",
schema->ks_name(), schema->cf_name());
co_return;
}
auto num_runs_for_compaction = [&, this] {
auto& cs = t.get_compaction_strategy();
auto desc = cs.get_sstables_for_compaction(t, get_strategy_control(), get_candidates(t));
return boost::copy_range<std::unordered_set<utils::UUID>>(
desc.sstables
| boost::adaptors::transformed(std::mem_fn(&sstables::sstable::run_identifier))).size();
};
const auto threshold = std::max(schema->max_compaction_threshold(), 32);
auto count = num_runs_for_compaction();
if (count <= threshold) {
cmlog.trace("No need to wait for sstable count reduction in {}.{}: {} <= {}",
schema->ks_name(), schema->cf_name(), count, threshold);
co_return;
}
// Reduce the chances of falling into an endless wait, if compaction
// wasn't scheduled for the table due to a problem.
submit(t);
using namespace std::chrono_literals;
auto start = db_clock::now();
auto& cstate = get_compaction_state(&t);
try {
co_await cstate.compaction_done.wait([this, &num_runs_for_compaction, threshold, &t] {
return num_runs_for_compaction() <= threshold || !can_perform_regular_compaction(t);
});
} catch (const broken_condition_variable&) {
co_return;
}
auto end = db_clock::now();
auto elapsed_ms = (end - start) / 1ms;
cmlog.warn("Waited {}ms for compaction of {}.{} to catch up on {} sstable runs",
elapsed_ms, schema->ks_name(), schema->cf_name(), count);
}
class compaction_manager::offstrategy_compaction_task : public compaction_manager::task {
bool _performed = false;
public:

View File

@@ -66,6 +66,13 @@ private:
// Raised by any function running under run_with_compaction_disabled();
long compaction_disabled_counter = 0;
// Signaled whenever a compaction task completes.
condition_variable compaction_done;
compaction_state() = default;
compaction_state(compaction_state&&) = default;
~compaction_state();
bool compaction_disabled() const noexcept {
return compaction_disabled_counter > 0;
}
@@ -379,6 +386,13 @@ public:
// Submit a table to be compacted.
void submit(compaction::table_state& t);
// Can regular compaction be performed in the given table
bool can_perform_regular_compaction(compaction::table_state& t);
// Maybe wait before adding more sstables
// if there are too many sstables.
future<> maybe_wait_for_sstable_count_reduction(compaction::table_state& t);
// Submit a table to be off-strategy compacted.
// Returns true iff off-strategy compaction was required and performed.
future<bool> perform_offstrategy(compaction::table_state& t);

View File

@@ -141,7 +141,7 @@ public:
sstables::compaction_descriptor get_descriptor_for_level(int level, const std::vector<std::optional<dht::decorated_key>>& last_compacted_keys,
std::vector<int>& compaction_counter) {
const std::vector<int>& compaction_counter) {
auto info = get_candidates_for(level, last_compacted_keys);
if (!info.candidates.empty()) {
int next_level = get_next_level(info.candidates, info.can_promote);
@@ -162,7 +162,7 @@ public:
* If no compactions are necessary, will return null
*/
sstables::compaction_descriptor get_compaction_candidates(const std::vector<std::optional<dht::decorated_key>>& last_compacted_keys,
std::vector<int>& compaction_counter) {
const std::vector<int>& compaction_counter) {
// LevelDB gives each level a score of how much data it contains vs its ideal amount, and
// compacts the level with the highest score. But this falls apart spectacularly once you
// get behind. Consider this set of levels:

View File

@@ -221,23 +221,26 @@ time_window_compaction_strategy::get_sstables_for_compaction(table_state& table_
return compaction_descriptor();
}
// Find fully expired SSTables. Those will be included no matter what.
std::unordered_set<shared_sstable> expired;
auto now = db_clock::now();
if (now - _last_expired_check > _options.expired_sstable_check_frequency) {
clogger.debug("[{}] TWCS expired check sufficiently far in the past, checking for fully expired SSTables", fmt::ptr(this));
if (db_clock::now() - _last_expired_check > _options.expired_sstable_check_frequency) {
clogger.debug("TWCS expired check sufficiently far in the past, checking for fully expired SSTables");
expired = table_s.fully_expired_sstables(candidates, compaction_time);
_last_expired_check = db_clock::now();
// Find fully expired SSTables. Those will be included no matter what.
auto expired = table_s.fully_expired_sstables(candidates, compaction_time);
if (!expired.empty()) {
clogger.debug("[{}] Going to compact {} expired sstables", fmt::ptr(this), expired.size());
return compaction_descriptor(has_only_fully_expired::yes, std::vector<shared_sstable>(expired.begin(), expired.end()), service::get_local_compaction_priority());
}
// Keep checking for fully_expired_sstables until we don't find
// any among the candidates, meaning they are either already compacted
// or registered for compaction.
_last_expired_check = now;
} else {
clogger.debug("TWCS skipping check for fully expired SSTables");
}
if (!expired.empty()) {
clogger.debug("Going to compact {} expired sstables", expired.size());
return compaction_descriptor(has_only_fully_expired::yes, std::vector<shared_sstable>(expired.begin(), expired.end()), service::get_local_compaction_priority());
clogger.debug("[{}] TWCS skipping check for fully expired SSTables", fmt::ptr(this));
}
auto compaction_candidates = get_next_non_expired_sstables(table_s, control, std::move(candidates), compaction_time);
clogger.debug("[{}] Going to compact {} non-expired sstables", fmt::ptr(this), compaction_candidates.size());
return compaction_descriptor(std::move(compaction_candidates), service::get_local_compaction_priority());
}

View File

@@ -761,6 +761,9 @@ table::try_flush_memtable_to_sstable(lw_shared_ptr<memtable> old, sstable_write_
_memtables->erase(old);
co_return;
}
if (!_async_gate.is_closed()) {
co_await _compaction_manager.maybe_wait_for_sstable_count_reduction(as_table_state());
}
} catch (...) {
err = std::current_exception();
}

View File

@@ -32,6 +32,7 @@
#include "test/lib/reader_concurrency_semaphore.hh"
#include "test/lib/simple_schema.hh"
#include "utils/error_injection.hh"
#include "db/commitlog/commitlog.hh"
#include "test/lib/make_random_string.hh"
static api::timestamp_type next_timestamp() {
@@ -1005,3 +1006,81 @@ SEASTAR_TEST_CASE(failed_flush_prevents_writes) {
});
#endif
}
SEASTAR_TEST_CASE(flushing_rate_is_reduced_if_compaction_doesnt_keep_up) {
BOOST_ASSERT(smp::count == 2);
// The test simulates a situation where 2 threads issue flushes to 2
// tables. Both issue small flushes, but one has injected reactor stalls.
// This can lead to a situation where lots of small sstables accumulate on
// disk, and, if compaction never has a chance to keep up, resources can be
// exhausted.
return do_with_cql_env([](cql_test_env& env) -> future<> {
struct flusher {
cql_test_env& env;
const int num_flushes;
const int sleep_ms;
static sstring cf_name(unsigned thread_id) {
return format("cf_{}", thread_id);
}
static sstring ks_name() {
return "ks";
}
future<> create_table(schema_ptr s) {
return env.migration_manager().invoke_on(0, [s = global_schema_ptr(std::move(s))] (service::migration_manager& mm) -> future<> {
auto group0_guard = co_await mm.start_group0_operation();
auto ts = group0_guard.write_timestamp();
auto announcement = co_await mm.prepare_new_column_family_announcement(s, ts);
co_await mm.announce(std::move(announcement), std::move(group0_guard));
});
}
future<> drop_table() {
return env.migration_manager().invoke_on(0, [shard = this_shard_id()] (service::migration_manager& mm) -> future<> {
auto group0_guard = co_await mm.start_group0_operation();
auto ts = group0_guard.write_timestamp();
auto announcement = co_await mm.prepare_column_family_drop_announcement(ks_name(), cf_name(shard), ts);
co_await mm.announce(std::move(announcement), std::move(group0_guard));
});
}
future<> operator()() {
const sstring ks_name = this->ks_name();
const sstring cf_name = this->cf_name(this_shard_id());
random_mutation_generator gen{
random_mutation_generator::generate_counters::no,
local_shard_only::yes,
random_mutation_generator::generate_uncompactable::no,
std::nullopt,
ks_name.c_str(),
cf_name.c_str()
};
schema_ptr s = gen.schema();
co_await create_table(s);
replica::database& db = env.local_db();
replica::table& t = db.find_column_family(ks_name, cf_name);
for (int value : boost::irange<int>(0, num_flushes)) {
::usleep(sleep_ms * 1000);
co_await db.apply(t.schema(), freeze(gen()), tracing::trace_state_ptr(), db::commitlog::force_sync::yes, db::no_timeout);
co_await t.flush();
BOOST_ASSERT(t.sstables_count() < t.schema()->max_compaction_threshold() * 2);
}
co_await drop_table();
}
};
int sleep_ms = 2;
for (int i : boost::irange<int>(8)) {
future<> f0 = smp::submit_to(0, flusher{.env=env, .num_flushes=100, .sleep_ms=0});
future<> f1 = smp::submit_to(1, flusher{.env=env, .num_flushes=3, .sleep_ms=sleep_ms});
co_await std::move(f0);
co_await std::move(f1);
sleep_ms *= 2;
}
});
}

View File

@@ -1963,8 +1963,8 @@ private:
return gc_clock::time_point() + std::chrono::seconds(dist(gen));
}
schema_ptr do_make_schema(data_type type) {
auto builder = schema_builder("ks", "cf")
schema_ptr do_make_schema(data_type type, const char* ks_name, const char* cf_name) {
auto builder = schema_builder(ks_name, cf_name)
.with_column("pk", bytes_type, column_kind::partition_key)
.with_column("ck1", bytes_type, column_kind::clustering_key)
.with_column("ck2", bytes_type, column_kind::clustering_key);
@@ -1981,9 +1981,9 @@ private:
return builder.build();
}
schema_ptr make_schema() {
return _generate_counters ? do_make_schema(counter_type)
: do_make_schema(bytes_type);
schema_ptr make_schema(const char* ks_name, const char* cf_name) {
return _generate_counters ? do_make_schema(counter_type, ks_name, cf_name)
: do_make_schema(bytes_type, ks_name, cf_name);
}
api::timestamp_type gen_timestamp(timestamp_level l) {
@@ -2001,13 +2001,13 @@ private:
}
public:
explicit impl(generate_counters counters, local_shard_only lso = local_shard_only::yes,
generate_uncompactable uc = generate_uncompactable::no, std::optional<uint32_t> seed_opt = std::nullopt) : _generate_counters(counters), _local_shard_only(lso), _uncompactable(uc) {
generate_uncompactable uc = generate_uncompactable::no, std::optional<uint32_t> seed_opt = std::nullopt, const char* ks_name="ks", const char* cf_name="cf") : _generate_counters(counters), _local_shard_only(lso), _uncompactable(uc) {
// In case of errors, reproduce using the --random-seed command line option with the test_runner seed.
auto seed = seed_opt.value_or(tests::random::get_int<uint32_t>());
std::cout << "random_mutation_generator seed: " << seed << "\n";
_gen = std::mt19937(seed);
_schema = make_schema();
_schema = make_schema(ks_name, cf_name);
auto keys = _local_shard_only ? make_local_keys(n_blobs, _schema, _external_blob_size) : make_keys(n_blobs, _schema, _external_blob_size);
_blobs = boost::copy_range<std::vector<bytes>>(keys | boost::adaptors::transformed([this] (sstring& k) { return to_bytes(k); }));
@@ -2300,8 +2300,8 @@ public:
random_mutation_generator::~random_mutation_generator() {}
random_mutation_generator::random_mutation_generator(generate_counters counters, local_shard_only lso, generate_uncompactable uc, std::optional<uint32_t> seed_opt)
: _impl(std::make_unique<random_mutation_generator::impl>(counters, lso, uc, seed_opt))
random_mutation_generator::random_mutation_generator(generate_counters counters, local_shard_only lso, generate_uncompactable uc, std::optional<uint32_t> seed_opt, const char* ks_name, const char* cf_name)
: _impl(std::make_unique<random_mutation_generator::impl>(counters, lso, uc, seed_opt, ks_name, cf_name))
{ }
mutation random_mutation_generator::operator()() {

View File

@@ -52,7 +52,7 @@ public:
// tombstone will cover data, i.e. compacting the mutation will not result
// in any changes.
explicit random_mutation_generator(generate_counters, local_shard_only lso = local_shard_only::yes,
generate_uncompactable uc = generate_uncompactable::no, std::optional<uint32_t> seed_opt = std::nullopt);
generate_uncompactable uc = generate_uncompactable::no, std::optional<uint32_t> seed_opt = std::nullopt, const char* ks_name="ks", const char* cf_name="cf");
random_mutation_generator(generate_counters gc, uint32_t seed)
: random_mutation_generator(gc, local_shard_only::yes, generate_uncompactable::no, seed) {}
~random_mutation_generator();