diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index dd9005fa9e..5c100aea2d 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -1019,7 +1019,11 @@ void compaction_manager::enable() { SCYLLA_ASSERT(_state == state::none || _state == state::disabled); _state = state::enabled; _compaction_submission_timer.arm_periodic(periodic_compaction_submission_interval()); - _waiting_reevalution = postponed_compactions_reevaluation(); + if (_waiting_reevaluation) { + on_internal_error(cmlog, "postponed compactions reevaluation is already running when enabling compaction manager"); + } + _waiting_reevaluation.emplace(postponed_compactions_reevaluation()); + cmlog.info("Enabled"); } std::function compaction_manager::compaction_submission_callback() { @@ -1066,6 +1070,16 @@ void compaction_manager::reevaluate_postponed_compactions() noexcept { _postponed_reevaluation.signal(); } +future<> compaction_manager::stop_postponed_compactions() noexcept { + auto waiting_reevaluation = std::exchange(_waiting_reevaluation, std::nullopt); + if (!waiting_reevaluation) { + return make_ready_future(); + } + // Trigger a signal to properly exit from postponed_compactions_reevaluation() fiber + reevaluate_postponed_compactions(); + return std::move(*waiting_reevaluation); +} + void compaction_manager::postpone_compaction_for_table(table_state* t) { _postponed.insert(t); } @@ -1130,8 +1144,7 @@ future<> compaction_manager::drain() { _compaction_submission_timer.cancel(); // Stop ongoing compactions, if the request has not been sent already and wait for them to stop. co_await stop_ongoing_compactions("drain"); - // Trigger a signal to properly exit from postponed_compactions_reevaluation() fiber - reevaluate_postponed_compactions(); + co_await stop_postponed_compactions(); cmlog.info("Drained"); } @@ -1156,8 +1169,7 @@ future<> compaction_manager::really_do_stop() noexcept { if (!_tasks.empty()) { on_fatal_internal_error(cmlog, format("{} tasks still exist after being stopped", _tasks.size())); } - reevaluate_postponed_compactions(); - co_await std::move(_waiting_reevalution); + co_await stop_postponed_compactions(); _weight_tracker.clear(); _compaction_submission_timer.cancel(); co_await _compaction_controller.shutdown(); diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index f7c0894207..848215a022 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -119,7 +119,7 @@ private: // a sstable from being compacted twice. std::unordered_set _compacting_sstables; - future<> _waiting_reevalution = make_ready_future<>(); + std::optional> _waiting_reevaluation; condition_variable _postponed_reevaluation; // tables that wait for compaction but had its submission postponed due to ongoing compaction. std::unordered_set _postponed; @@ -221,6 +221,7 @@ private: future<> postponed_compactions_reevaluation(); void reevaluate_postponed_compactions() noexcept; + future<> stop_postponed_compactions() noexcept; // Postpone compaction for a table that couldn't be executed due to ongoing // similar-sized compaction. void postpone_compaction_for_table(compaction::table_state* t);