Merge 'Tone down offstrategy log message' from Benny Halevy
In many cases we trigger offstrategy compaction opportunistically also when there's nothing to do. In this case we still print to the log lots of info-level message and call `run_offstrategy_compaction` that wastes more cpu cycles on learning that it has nothing to do. This change bails out early if the maintenance set is empty and prints a "Skipping off-strategy compaction" message in debug level instead. Fixes #13466 Also, add an group_id class and return it from compaction_group and table_state. Use that to identify the compaction_group / table_state by "ks_name.cf_name compaction_group=idx/total" in log messages. Fixes #13467 Closes #13520 * github.com:scylladb/scylladb: compaction_manager: print compaction_group id compaction_group, table_state: add group_id member compaction_manager: offstrategy compaction: skip compaction if no candidates are found
This commit is contained in:
@@ -453,7 +453,7 @@ protected:
|
||||
};
|
||||
setup_new_compaction(descriptor.run_identifier);
|
||||
|
||||
cmlog.info0("User initiated compaction started on behalf of {}.{}", t->schema()->ks_name(), t->schema()->cf_name());
|
||||
cmlog.info0("User initiated compaction started on behalf of {}", *t);
|
||||
|
||||
// Now that the sstables for major compaction are registered
|
||||
// and the user_initiated_backlog_tracker is set up
|
||||
@@ -533,8 +533,8 @@ compaction_manager::compaction_reenabler::compaction_reenabler(compaction_manage
|
||||
, _holder(_compaction_state.gate.hold())
|
||||
{
|
||||
_compaction_state.compaction_disabled_counter++;
|
||||
cmlog.debug("Temporarily disabled compaction for {}.{}. compaction_disabled_counter={}",
|
||||
_table->schema()->ks_name(), _table->schema()->cf_name(), _compaction_state.compaction_disabled_counter);
|
||||
cmlog.debug("Temporarily disabled compaction for {}. compaction_disabled_counter={}",
|
||||
t, _compaction_state.compaction_disabled_counter);
|
||||
}
|
||||
|
||||
compaction_manager::compaction_reenabler::compaction_reenabler(compaction_reenabler&& o) noexcept
|
||||
@@ -547,13 +547,12 @@ compaction_manager::compaction_reenabler::compaction_reenabler(compaction_reenab
|
||||
compaction_manager::compaction_reenabler::~compaction_reenabler() {
|
||||
// submit compaction request if we're the last holder of the gate which is still opened.
|
||||
if (_table && --_compaction_state.compaction_disabled_counter == 0 && !_compaction_state.gate.is_closed()) {
|
||||
cmlog.debug("Reenabling compaction for {}.{}",
|
||||
_table->schema()->ks_name(), _table->schema()->cf_name());
|
||||
cmlog.debug("Reenabling compaction for {}", *_table);
|
||||
try {
|
||||
_cm.submit(*_table);
|
||||
} catch (...) {
|
||||
cmlog.warn("compaction_reenabler could not reenable compaction for {}.{}: {}",
|
||||
_table->schema()->ks_name(), _table->schema()->cf_name(), std::current_exception());
|
||||
cmlog.warn("compaction_reenabler could not reenable compaction for {}: {}",
|
||||
*_table, std::current_exception());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -606,8 +605,7 @@ compaction::compaction_state::~compaction_state() {
|
||||
|
||||
std::string compaction_task_executor::describe() const {
|
||||
auto* t = _compacting_table;
|
||||
auto s = t->schema();
|
||||
return fmt::format("{} task {} for table {}.{} [{}]", _description, fmt::ptr(this), s->ks_name(), s->cf_name(), fmt::ptr(t));
|
||||
return fmt::format("{} task {} for table {} [{}]", _description, fmt::ptr(this), *t, fmt::ptr(t));
|
||||
}
|
||||
|
||||
compaction_task_executor::~compaction_task_executor() {
|
||||
@@ -844,8 +842,7 @@ future<> compaction_manager::postponed_compactions_reevaluation() {
|
||||
if (!_compaction_state.contains(t)) {
|
||||
continue;
|
||||
}
|
||||
auto s = t->schema();
|
||||
cmlog.debug("resubmitting postponed compaction for table {}.{} [{}]", s->ks_name(), s->cf_name(), fmt::ptr(t));
|
||||
cmlog.debug("resubmitting postponed compaction for table {} [{}]", *t, fmt::ptr(t));
|
||||
submit(*t);
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
@@ -894,7 +891,7 @@ future<> compaction_manager::stop_ongoing_compactions(sstring reason, table_stat
|
||||
if (cmlog.is_enabled(level)) {
|
||||
std::string scope = "";
|
||||
if (t) {
|
||||
scope = fmt::format(" for table {}.{}", t->schema()->ks_name(), t->schema()->cf_name());
|
||||
scope = fmt::format(" for table {}", *t);
|
||||
}
|
||||
if (type_opt) {
|
||||
scope += fmt::format(" {} type={}", scope.size() ? "and" : "for", *type_opt);
|
||||
@@ -1037,8 +1034,8 @@ protected:
|
||||
co_return std::nullopt;
|
||||
}
|
||||
if (!_cm.can_register_compaction(t, weight, descriptor.fan_in())) {
|
||||
cmlog.debug("Refused compaction job ({} sstable(s)) of weight {} for {}.{}, postponing it...",
|
||||
descriptor.sstables.size(), weight, t.schema()->ks_name(), t.schema()->cf_name());
|
||||
cmlog.debug("Refused compaction job ({} sstable(s)) of weight {} for {}, postponing it...",
|
||||
descriptor.sstables.size(), weight, t);
|
||||
switch_state(state::postponed);
|
||||
_cm.postpone_compaction_for_table(&t);
|
||||
co_return std::nullopt;
|
||||
@@ -1048,8 +1045,8 @@ protected:
|
||||
auto release_exhausted = [&compacting] (const std::vector<sstables::shared_sstable>& exhausted_sstables) {
|
||||
compacting.release_compacting(exhausted_sstables);
|
||||
};
|
||||
cmlog.debug("Accepted compaction job: task={} ({} sstable(s)) of weight {} for {}.{}",
|
||||
fmt::ptr(this), descriptor.sstables.size(), weight, t.schema()->ks_name(), t.schema()->cf_name());
|
||||
cmlog.debug("Accepted compaction job: task={} ({} sstable(s)) of weight {} for {}",
|
||||
fmt::ptr(this), descriptor.sstables.size(), weight, t);
|
||||
|
||||
setup_new_compaction(descriptor.run_identifier);
|
||||
std::exception_ptr ex;
|
||||
@@ -1109,8 +1106,7 @@ bool compaction_manager::can_perform_regular_compaction(table_state& t) {
|
||||
future<> compaction_manager::maybe_wait_for_sstable_count_reduction(table_state& t) {
|
||||
auto schema = t.schema();
|
||||
if (!can_perform_regular_compaction(t)) {
|
||||
cmlog.trace("maybe_wait_for_sstable_count_reduction in {}.{}: cannot perform regular compaction",
|
||||
schema->ks_name(), schema->cf_name());
|
||||
cmlog.trace("maybe_wait_for_sstable_count_reduction in {}: cannot perform regular compaction", t);
|
||||
co_return;
|
||||
}
|
||||
auto num_runs_for_compaction = [&, this] {
|
||||
@@ -1123,8 +1119,8 @@ future<> compaction_manager::maybe_wait_for_sstable_count_reduction(table_state&
|
||||
const auto threshold = size_t(std::max(schema->max_compaction_threshold(), 32));
|
||||
auto count = num_runs_for_compaction();
|
||||
if (count <= threshold) {
|
||||
cmlog.trace("No need to wait for sstable count reduction in {}.{}: {} <= {}",
|
||||
schema->ks_name(), schema->cf_name(), count, threshold);
|
||||
cmlog.trace("No need to wait for sstable count reduction in {}: {} <= {}",
|
||||
t, count, threshold);
|
||||
co_return;
|
||||
}
|
||||
// Reduce the chances of falling into an endless wait, if compaction
|
||||
@@ -1142,8 +1138,8 @@ future<> compaction_manager::maybe_wait_for_sstable_count_reduction(table_state&
|
||||
}
|
||||
auto end = db_clock::now();
|
||||
auto elapsed_ms = (end - start) / 1ms;
|
||||
cmlog.warn("Waited {}ms for compaction of {}.{} to catch up on {} sstable runs",
|
||||
elapsed_ms, schema->ks_name(), schema->cf_name(), count);
|
||||
cmlog.warn("Waited {}ms for compaction of {} to catch up on {} sstable runs",
|
||||
elapsed_ms, t, count);
|
||||
}
|
||||
|
||||
namespace compaction {
|
||||
@@ -1264,12 +1260,16 @@ protected:
|
||||
std::exception_ptr ex;
|
||||
try {
|
||||
table_state& t = *_compacting_table;
|
||||
auto maintenance_sstables = t.maintenance_sstable_set().all();
|
||||
cmlog.info("Starting off-strategy compaction for {}.{}, {} candidates were found",
|
||||
t.schema()->ks_name(), t.schema()->cf_name(), maintenance_sstables->size());
|
||||
auto size = t.maintenance_sstable_set().size();
|
||||
if (!size) {
|
||||
cmlog.debug("Skipping off-strategy compaction for {}, No candidates were found", t);
|
||||
finish_compaction();
|
||||
co_return std::nullopt;
|
||||
}
|
||||
cmlog.info("Starting off-strategy compaction for {}, {} candidates were found", t, size);
|
||||
co_await run_offstrategy_compaction(_compaction_data);
|
||||
finish_compaction();
|
||||
cmlog.info("Done with off-strategy compaction for {}.{}", t.schema()->ks_name(), t.schema()->cf_name());
|
||||
cmlog.info("Done with off-strategy compaction for {}", t);
|
||||
co_return std::nullopt;
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
@@ -1608,8 +1608,7 @@ future<> compaction_manager::perform_cleanup(owned_ranges_ptr sorted_owned_range
|
||||
});
|
||||
};
|
||||
if (check_for_cleanup()) {
|
||||
throw std::runtime_error(format("cleanup request failed: there is an ongoing cleanup on {}.{}",
|
||||
t.schema()->ks_name(), t.schema()->cf_name()));
|
||||
throw std::runtime_error(format("cleanup request failed: there is an ongoing cleanup on {}", t));
|
||||
}
|
||||
|
||||
if (sorted_owned_ranges->empty()) {
|
||||
@@ -1706,8 +1705,7 @@ compaction::compaction_state::compaction_state(table_state& t)
|
||||
void compaction_manager::add(table_state& t) {
|
||||
auto [_, inserted] = _compaction_state.try_emplace(&t, t);
|
||||
if (!inserted) {
|
||||
auto s = t.schema();
|
||||
on_internal_error(cmlog, format("compaction_state for table {}.{} [{}] already exists", s->ks_name(), s->cf_name(), fmt::ptr(&t)));
|
||||
on_internal_error(cmlog, format("compaction_state for table {} [{}] already exists", t, fmt::ptr(&t)));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -50,7 +50,20 @@ public:
|
||||
virtual bool is_auto_compaction_disabled_by_user() const noexcept = 0;
|
||||
virtual const tombstone_gc_state& get_tombstone_gc_state() const noexcept = 0;
|
||||
virtual compaction_backlog_tracker& get_backlog_tracker() = 0;
|
||||
virtual const std::string& get_group_id() const noexcept = 0;
|
||||
};
|
||||
|
||||
}
|
||||
} // namespace compaction
|
||||
|
||||
namespace fmt {
|
||||
|
||||
template <>
|
||||
struct formatter<compaction::table_state> : formatter<std::string_view> {
|
||||
template <typename FormatContext>
|
||||
auto format(const compaction::table_state& t, FormatContext& ctx) const {
|
||||
auto s = t.schema();
|
||||
return format_to(ctx.out(), "{}.{} compaction_group={}", s->ks_name(), s->cf_name(), t.get_group_id());
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace fmt
|
||||
|
||||
@@ -31,6 +31,7 @@ class compaction_group {
|
||||
table& _t;
|
||||
class table_state;
|
||||
std::unique_ptr<table_state> _table_state;
|
||||
std::string _group_id;
|
||||
// Tokens included in this compaction_groups
|
||||
dht::token_range _token_range;
|
||||
compaction::compaction_strategy_state _compaction_strategy_state;
|
||||
@@ -62,7 +63,11 @@ private:
|
||||
|
||||
future<> delete_sstables_atomically(std::vector<sstables::shared_sstable> sstables_to_remove);
|
||||
public:
|
||||
compaction_group(table& t, dht::token_range token_range);
|
||||
compaction_group(table& t, std::string gid, dht::token_range token_range);
|
||||
|
||||
const std::string& get_group_id() const noexcept {
|
||||
return _group_id;
|
||||
}
|
||||
|
||||
// Will stop ongoing compaction on behalf of this group, etc.
|
||||
future<> stop() noexcept;
|
||||
|
||||
@@ -521,8 +521,10 @@ std::vector<std::unique_ptr<compaction_group>> table::make_compaction_groups() {
|
||||
auto&& ranges = dht::split_token_range_msb(_x_log2_compaction_groups);
|
||||
ret.reserve(ranges.size());
|
||||
tlogger.debug("Created {} compaction groups for {}.{}", ranges.size(), _schema->ks_name(), _schema->cf_name());
|
||||
size_t i = 0;
|
||||
for (auto&& range : ranges) {
|
||||
ret.emplace_back(std::make_unique<compaction_group>(*this, std::move(range)));
|
||||
auto group_id = fmt::format("{}/{}", i++, ranges.size());
|
||||
ret.emplace_back(std::make_unique<compaction_group>(*this, std::move(group_id), std::move(range)));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@@ -1481,9 +1483,10 @@ table::make_memtable_list(compaction_group& cg) {
|
||||
return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.dirty_memory_manager, _stats, _config.memory_compaction_scheduling_group);
|
||||
}
|
||||
|
||||
compaction_group::compaction_group(table& t, dht::token_range token_range)
|
||||
compaction_group::compaction_group(table& t, std::string group_id, dht::token_range token_range)
|
||||
: _t(t)
|
||||
, _table_state(std::make_unique<table_state>(t, *this))
|
||||
, _group_id(std::move(group_id))
|
||||
, _token_range(std::move(token_range))
|
||||
, _compaction_strategy_state(compaction::compaction_strategy_state::make(_t._compaction_strategy))
|
||||
, _memtables(_t._config.enable_disk_writes ? _t.make_memtable_list(*this) : _t.make_memory_only_memtable_list())
|
||||
@@ -2798,6 +2801,9 @@ public:
|
||||
compaction_backlog_tracker& get_backlog_tracker() override {
|
||||
return _t._compaction_manager.get_backlog_tracker(*this);
|
||||
}
|
||||
const std::string& get_group_id() const noexcept override {
|
||||
return _cg.get_group_id();
|
||||
}
|
||||
};
|
||||
|
||||
compaction_backlog_tracker& compaction_group::get_backlog_tracker() {
|
||||
|
||||
@@ -48,6 +48,7 @@ class table_for_tests::table_state : public compaction::table_state {
|
||||
tombstone_gc_state _tombstone_gc_state;
|
||||
mutable compaction_backlog_tracker _backlog_tracker;
|
||||
compaction::compaction_strategy_state _compaction_strategy_state;
|
||||
std::string _group_id;
|
||||
private:
|
||||
replica::table& table() const noexcept {
|
||||
return *_data.cf;
|
||||
@@ -59,6 +60,7 @@ public:
|
||||
, _tombstone_gc_state(nullptr)
|
||||
, _backlog_tracker(get_compaction_strategy().make_backlog_tracker())
|
||||
, _compaction_strategy_state(compaction::compaction_strategy_state::make(get_compaction_strategy()))
|
||||
, _group_id("table_for_tests::table_state")
|
||||
{
|
||||
}
|
||||
const schema_ptr& schema() const noexcept override {
|
||||
@@ -116,6 +118,9 @@ public:
|
||||
compaction_backlog_tracker& get_backlog_tracker() override {
|
||||
return _backlog_tracker;
|
||||
}
|
||||
const std::string& get_group_id() const noexcept override {
|
||||
return _group_id;
|
||||
}
|
||||
};
|
||||
|
||||
table_for_tests::table_for_tests(sstables::sstables_manager& sstables_manager, schema_ptr s, std::optional<sstring> datadir)
|
||||
|
||||
Reference in New Issue
Block a user