compaction_manager: fix use-after-free in postponed_compactions_reevaluation()
Some checks failed
Check if commits are promoted / check-commit (push) Has been cancelled
Backport with Jira Integration / backport-on-push (push) Has been cancelled
Backport with Jira Integration / backport-on-label (push) Has been cancelled
Backport with Jira Integration / backport-chain (push) Has been cancelled
Some checks failed
Check if commits are promoted / check-commit (push) Has been cancelled
Backport with Jira Integration / backport-on-push (push) Has been cancelled
Backport with Jira Integration / backport-on-label (push) Has been cancelled
Backport with Jira Integration / backport-chain (push) Has been cancelled
drain() signals the postponed_reevaluation condition variable to terminate
the postponed_compactions_reevaluation() coroutine but does not await its
completion. When enable() is called afterwards, it overwrites
_waiting_reevalution with a new coroutine, orphaning the old one. During
shutdown, really_do_stop() only awaits the latest coroutine via
_waiting_reevalution, leaving the orphaned coroutine still alive. After
sharded::stop() destroys the compaction_manager, the orphaned coroutine
resumes and reads freed memory (is_disabled() accesses _state).
Fix by introducing stop_postponed_compactions(), awaiting the reevaluation coroutine in
both drain() and stop() after signaling it, if postponed_compactions_reevaluation() is running.
It uses an std::optional<future<>> for _waiting_reevalution and std::exchange to leave
_waiting_reevalution disengaged when postponed_compactions_reevaluation() is not running.
This prevents a race between drain() and stop().
While at it, fix typo in _waiting_reevalution -> _waiting_reevaluation.
Fixes: SCYLLADB-1600
Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
Closes scylladb/scylladb#29443
(cherry picked from commit 05a00fe140)
Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
Closes scylladb/scylladb#29527
Closes scylladb/scylladb#29629
Closes scylladb/scylladb#29639
This commit is contained in:
@@ -1019,7 +1019,11 @@ void compaction_manager::enable() {
|
||||
SCYLLA_ASSERT(_state == state::none || _state == state::disabled);
|
||||
_state = state::enabled;
|
||||
_compaction_submission_timer.arm_periodic(periodic_compaction_submission_interval());
|
||||
_waiting_reevalution = postponed_compactions_reevaluation();
|
||||
if (_waiting_reevaluation) {
|
||||
on_internal_error(cmlog, "postponed compactions reevaluation is already running when enabling compaction manager");
|
||||
}
|
||||
_waiting_reevaluation.emplace(postponed_compactions_reevaluation());
|
||||
cmlog.info("Enabled");
|
||||
}
|
||||
|
||||
std::function<void()> compaction_manager::compaction_submission_callback() {
|
||||
@@ -1066,6 +1070,16 @@ void compaction_manager::reevaluate_postponed_compactions() noexcept {
|
||||
_postponed_reevaluation.signal();
|
||||
}
|
||||
|
||||
future<> compaction_manager::stop_postponed_compactions() noexcept {
|
||||
auto waiting_reevaluation = std::exchange(_waiting_reevaluation, std::nullopt);
|
||||
if (!waiting_reevaluation) {
|
||||
return make_ready_future();
|
||||
}
|
||||
// Trigger a signal to properly exit from postponed_compactions_reevaluation() fiber
|
||||
reevaluate_postponed_compactions();
|
||||
return std::move(*waiting_reevaluation);
|
||||
}
|
||||
|
||||
void compaction_manager::postpone_compaction_for_table(table_state* t) {
|
||||
_postponed.insert(t);
|
||||
}
|
||||
@@ -1130,8 +1144,7 @@ future<> compaction_manager::drain() {
|
||||
_compaction_submission_timer.cancel();
|
||||
// Stop ongoing compactions, if the request has not been sent already and wait for them to stop.
|
||||
co_await stop_ongoing_compactions("drain");
|
||||
// Trigger a signal to properly exit from postponed_compactions_reevaluation() fiber
|
||||
reevaluate_postponed_compactions();
|
||||
co_await stop_postponed_compactions();
|
||||
cmlog.info("Drained");
|
||||
}
|
||||
|
||||
@@ -1156,8 +1169,7 @@ future<> compaction_manager::really_do_stop() noexcept {
|
||||
if (!_tasks.empty()) {
|
||||
on_fatal_internal_error(cmlog, format("{} tasks still exist after being stopped", _tasks.size()));
|
||||
}
|
||||
reevaluate_postponed_compactions();
|
||||
co_await std::move(_waiting_reevalution);
|
||||
co_await stop_postponed_compactions();
|
||||
_weight_tracker.clear();
|
||||
_compaction_submission_timer.cancel();
|
||||
co_await _compaction_controller.shutdown();
|
||||
|
||||
@@ -119,7 +119,7 @@ private:
|
||||
// a sstable from being compacted twice.
|
||||
std::unordered_set<sstables::shared_sstable> _compacting_sstables;
|
||||
|
||||
future<> _waiting_reevalution = make_ready_future<>();
|
||||
std::optional<future<>> _waiting_reevaluation;
|
||||
condition_variable _postponed_reevaluation;
|
||||
// tables that wait for compaction but had its submission postponed due to ongoing compaction.
|
||||
std::unordered_set<compaction::table_state*> _postponed;
|
||||
@@ -221,6 +221,7 @@ private:
|
||||
|
||||
future<> postponed_compactions_reevaluation();
|
||||
void reevaluate_postponed_compactions() noexcept;
|
||||
future<> stop_postponed_compactions() noexcept;
|
||||
// Postpone compaction for a table that couldn't be executed due to ongoing
|
||||
// similar-sized compaction.
|
||||
void postpone_compaction_for_table(compaction::table_state* t);
|
||||
|
||||
Reference in New Issue
Block a user