diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index b04da30b02..6084881d2b 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -453,7 +453,7 @@ protected: }; setup_new_compaction(descriptor.run_identifier); - cmlog.info0("User initiated compaction started on behalf of {}.{}", t->schema()->ks_name(), t->schema()->cf_name()); + cmlog.info0("User initiated compaction started on behalf of {}", *t); // Now that the sstables for major compaction are registered // and the user_initiated_backlog_tracker is set up @@ -533,8 +533,8 @@ compaction_manager::compaction_reenabler::compaction_reenabler(compaction_manage , _holder(_compaction_state.gate.hold()) { _compaction_state.compaction_disabled_counter++; - cmlog.debug("Temporarily disabled compaction for {}.{}. compaction_disabled_counter={}", - _table->schema()->ks_name(), _table->schema()->cf_name(), _compaction_state.compaction_disabled_counter); + cmlog.debug("Temporarily disabled compaction for {}. compaction_disabled_counter={}", + t, _compaction_state.compaction_disabled_counter); } compaction_manager::compaction_reenabler::compaction_reenabler(compaction_reenabler&& o) noexcept @@ -547,13 +547,12 @@ compaction_manager::compaction_reenabler::compaction_reenabler(compaction_reenab compaction_manager::compaction_reenabler::~compaction_reenabler() { // submit compaction request if we're the last holder of the gate which is still opened. if (_table && --_compaction_state.compaction_disabled_counter == 0 && !_compaction_state.gate.is_closed()) { - cmlog.debug("Reenabling compaction for {}.{}", - _table->schema()->ks_name(), _table->schema()->cf_name()); + cmlog.debug("Reenabling compaction for {}", *_table); try { _cm.submit(*_table); } catch (...) { - cmlog.warn("compaction_reenabler could not reenable compaction for {}.{}: {}", - _table->schema()->ks_name(), _table->schema()->cf_name(), std::current_exception()); + cmlog.warn("compaction_reenabler could not reenable compaction for {}: {}", + *_table, std::current_exception()); } } } @@ -606,8 +605,7 @@ compaction::compaction_state::~compaction_state() { std::string compaction_task_executor::describe() const { auto* t = _compacting_table; - auto s = t->schema(); - return fmt::format("{} task {} for table {}.{} [{}]", _description, fmt::ptr(this), s->ks_name(), s->cf_name(), fmt::ptr(t)); + return fmt::format("{} task {} for table {} [{}]", _description, fmt::ptr(this), *t, fmt::ptr(t)); } compaction_task_executor::~compaction_task_executor() { @@ -844,8 +842,7 @@ future<> compaction_manager::postponed_compactions_reevaluation() { if (!_compaction_state.contains(t)) { continue; } - auto s = t->schema(); - cmlog.debug("resubmitting postponed compaction for table {}.{} [{}]", s->ks_name(), s->cf_name(), fmt::ptr(t)); + cmlog.debug("resubmitting postponed compaction for table {} [{}]", *t, fmt::ptr(t)); submit(*t); co_await coroutine::maybe_yield(); } @@ -894,7 +891,7 @@ future<> compaction_manager::stop_ongoing_compactions(sstring reason, table_stat if (cmlog.is_enabled(level)) { std::string scope = ""; if (t) { - scope = fmt::format(" for table {}.{}", t->schema()->ks_name(), t->schema()->cf_name()); + scope = fmt::format(" for table {}", *t); } if (type_opt) { scope += fmt::format(" {} type={}", scope.size() ? "and" : "for", *type_opt); @@ -1037,8 +1034,8 @@ protected: co_return std::nullopt; } if (!_cm.can_register_compaction(t, weight, descriptor.fan_in())) { - cmlog.debug("Refused compaction job ({} sstable(s)) of weight {} for {}.{}, postponing it...", - descriptor.sstables.size(), weight, t.schema()->ks_name(), t.schema()->cf_name()); + cmlog.debug("Refused compaction job ({} sstable(s)) of weight {} for {}, postponing it...", + descriptor.sstables.size(), weight, t); switch_state(state::postponed); _cm.postpone_compaction_for_table(&t); co_return std::nullopt; @@ -1048,8 +1045,8 @@ protected: auto release_exhausted = [&compacting] (const std::vector& exhausted_sstables) { compacting.release_compacting(exhausted_sstables); }; - cmlog.debug("Accepted compaction job: task={} ({} sstable(s)) of weight {} for {}.{}", - fmt::ptr(this), descriptor.sstables.size(), weight, t.schema()->ks_name(), t.schema()->cf_name()); + cmlog.debug("Accepted compaction job: task={} ({} sstable(s)) of weight {} for {}", + fmt::ptr(this), descriptor.sstables.size(), weight, t); setup_new_compaction(descriptor.run_identifier); std::exception_ptr ex; @@ -1109,8 +1106,7 @@ bool compaction_manager::can_perform_regular_compaction(table_state& t) { future<> compaction_manager::maybe_wait_for_sstable_count_reduction(table_state& t) { auto schema = t.schema(); if (!can_perform_regular_compaction(t)) { - cmlog.trace("maybe_wait_for_sstable_count_reduction in {}.{}: cannot perform regular compaction", - schema->ks_name(), schema->cf_name()); + cmlog.trace("maybe_wait_for_sstable_count_reduction in {}: cannot perform regular compaction", t); co_return; } auto num_runs_for_compaction = [&, this] { @@ -1123,8 +1119,8 @@ future<> compaction_manager::maybe_wait_for_sstable_count_reduction(table_state& const auto threshold = size_t(std::max(schema->max_compaction_threshold(), 32)); auto count = num_runs_for_compaction(); if (count <= threshold) { - cmlog.trace("No need to wait for sstable count reduction in {}.{}: {} <= {}", - schema->ks_name(), schema->cf_name(), count, threshold); + cmlog.trace("No need to wait for sstable count reduction in {}: {} <= {}", + t, count, threshold); co_return; } // Reduce the chances of falling into an endless wait, if compaction @@ -1142,8 +1138,8 @@ future<> compaction_manager::maybe_wait_for_sstable_count_reduction(table_state& } auto end = db_clock::now(); auto elapsed_ms = (end - start) / 1ms; - cmlog.warn("Waited {}ms for compaction of {}.{} to catch up on {} sstable runs", - elapsed_ms, schema->ks_name(), schema->cf_name(), count); + cmlog.warn("Waited {}ms for compaction of {} to catch up on {} sstable runs", + elapsed_ms, t, count); } namespace compaction { @@ -1264,12 +1260,16 @@ protected: std::exception_ptr ex; try { table_state& t = *_compacting_table; - auto maintenance_sstables = t.maintenance_sstable_set().all(); - cmlog.info("Starting off-strategy compaction for {}.{}, {} candidates were found", - t.schema()->ks_name(), t.schema()->cf_name(), maintenance_sstables->size()); + auto size = t.maintenance_sstable_set().size(); + if (!size) { + cmlog.debug("Skipping off-strategy compaction for {}, No candidates were found", t); + finish_compaction(); + co_return std::nullopt; + } + cmlog.info("Starting off-strategy compaction for {}, {} candidates were found", t, size); co_await run_offstrategy_compaction(_compaction_data); finish_compaction(); - cmlog.info("Done with off-strategy compaction for {}.{}", t.schema()->ks_name(), t.schema()->cf_name()); + cmlog.info("Done with off-strategy compaction for {}", t); co_return std::nullopt; } catch (...) { ex = std::current_exception(); @@ -1608,8 +1608,7 @@ future<> compaction_manager::perform_cleanup(owned_ranges_ptr sorted_owned_range }); }; if (check_for_cleanup()) { - throw std::runtime_error(format("cleanup request failed: there is an ongoing cleanup on {}.{}", - t.schema()->ks_name(), t.schema()->cf_name())); + throw std::runtime_error(format("cleanup request failed: there is an ongoing cleanup on {}", t)); } if (sorted_owned_ranges->empty()) { @@ -1706,8 +1705,7 @@ compaction::compaction_state::compaction_state(table_state& t) void compaction_manager::add(table_state& t) { auto [_, inserted] = _compaction_state.try_emplace(&t, t); if (!inserted) { - auto s = t.schema(); - on_internal_error(cmlog, format("compaction_state for table {}.{} [{}] already exists", s->ks_name(), s->cf_name(), fmt::ptr(&t))); + on_internal_error(cmlog, format("compaction_state for table {} [{}] already exists", t, fmt::ptr(&t))); } } diff --git a/compaction/table_state.hh b/compaction/table_state.hh index c11bd5a6e0..7f0a52a4fe 100644 --- a/compaction/table_state.hh +++ b/compaction/table_state.hh @@ -50,7 +50,20 @@ public: virtual bool is_auto_compaction_disabled_by_user() const noexcept = 0; virtual const tombstone_gc_state& get_tombstone_gc_state() const noexcept = 0; virtual compaction_backlog_tracker& get_backlog_tracker() = 0; + virtual const std::string& get_group_id() const noexcept = 0; }; -} +} // namespace compaction +namespace fmt { + +template <> +struct formatter : formatter { + template + auto format(const compaction::table_state& t, FormatContext& ctx) const { + auto s = t.schema(); + return format_to(ctx.out(), "{}.{} compaction_group={}", s->ks_name(), s->cf_name(), t.get_group_id()); + } +}; + +} // namespace fmt diff --git a/replica/compaction_group.hh b/replica/compaction_group.hh index 309327019b..fcaec7ca83 100644 --- a/replica/compaction_group.hh +++ b/replica/compaction_group.hh @@ -31,6 +31,7 @@ class compaction_group { table& _t; class table_state; std::unique_ptr _table_state; + std::string _group_id; // Tokens included in this compaction_groups dht::token_range _token_range; compaction::compaction_strategy_state _compaction_strategy_state; @@ -62,7 +63,11 @@ private: future<> delete_sstables_atomically(std::vector sstables_to_remove); public: - compaction_group(table& t, dht::token_range token_range); + compaction_group(table& t, std::string gid, dht::token_range token_range); + + const std::string& get_group_id() const noexcept { + return _group_id; + } // Will stop ongoing compaction on behalf of this group, etc. future<> stop() noexcept; diff --git a/replica/table.cc b/replica/table.cc index 5a44ee54a9..b0aaf59516 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -521,8 +521,10 @@ std::vector> table::make_compaction_groups() { auto&& ranges = dht::split_token_range_msb(_x_log2_compaction_groups); ret.reserve(ranges.size()); tlogger.debug("Created {} compaction groups for {}.{}", ranges.size(), _schema->ks_name(), _schema->cf_name()); + size_t i = 0; for (auto&& range : ranges) { - ret.emplace_back(std::make_unique(*this, std::move(range))); + auto group_id = fmt::format("{}/{}", i++, ranges.size()); + ret.emplace_back(std::make_unique(*this, std::move(group_id), std::move(range))); } return ret; } @@ -1481,9 +1483,10 @@ table::make_memtable_list(compaction_group& cg) { return make_lw_shared(std::move(seal), std::move(get_schema), _config.dirty_memory_manager, _stats, _config.memory_compaction_scheduling_group); } -compaction_group::compaction_group(table& t, dht::token_range token_range) +compaction_group::compaction_group(table& t, std::string group_id, dht::token_range token_range) : _t(t) , _table_state(std::make_unique(t, *this)) + , _group_id(std::move(group_id)) , _token_range(std::move(token_range)) , _compaction_strategy_state(compaction::compaction_strategy_state::make(_t._compaction_strategy)) , _memtables(_t._config.enable_disk_writes ? _t.make_memtable_list(*this) : _t.make_memory_only_memtable_list()) @@ -2798,6 +2801,9 @@ public: compaction_backlog_tracker& get_backlog_tracker() override { return _t._compaction_manager.get_backlog_tracker(*this); } + const std::string& get_group_id() const noexcept override { + return _cg.get_group_id(); + } }; compaction_backlog_tracker& compaction_group::get_backlog_tracker() { diff --git a/test/lib/test_services.cc b/test/lib/test_services.cc index 8f3a41b870..a88f5b65ad 100644 --- a/test/lib/test_services.cc +++ b/test/lib/test_services.cc @@ -48,6 +48,7 @@ class table_for_tests::table_state : public compaction::table_state { tombstone_gc_state _tombstone_gc_state; mutable compaction_backlog_tracker _backlog_tracker; compaction::compaction_strategy_state _compaction_strategy_state; + std::string _group_id; private: replica::table& table() const noexcept { return *_data.cf; @@ -59,6 +60,7 @@ public: , _tombstone_gc_state(nullptr) , _backlog_tracker(get_compaction_strategy().make_backlog_tracker()) , _compaction_strategy_state(compaction::compaction_strategy_state::make(get_compaction_strategy())) + , _group_id("table_for_tests::table_state") { } const schema_ptr& schema() const noexcept override { @@ -116,6 +118,9 @@ public: compaction_backlog_tracker& get_backlog_tracker() override { return _backlog_tracker; } + const std::string& get_group_id() const noexcept override { + return _group_id; + } }; table_for_tests::table_for_tests(sstables::sstables_manager& sstables_manager, schema_ptr s, std::optional datadir)