Merge 'sstable_loader: fix cross-shard resource cleanup in download_task_impl ' from Kefu Chai

This PR addresses two related issues in our task system:

1. Prepares for asynchronous resource cleanup by converting release_resources() to a coroutine. This refactoring enables future improvements in how we handle resource cleanup.

2. Fixes a cross-shard resource cleanup issue in the SSTable loader where destruction of per-shard progress elements could trigger "shared_ptr accessed on non-owner cpu" errors in multi-shard environments. The fix uses coroutines to ensure resources are released on their owner shards.

Fixes #22759

---

this change addresses a regression introduced by d815d7013c, which is contained by 2025.1 and master branches. so it should be backported to 2025.1 branch.

Closes scylladb/scylladb#22791

* github.com:scylladb/scylladb:
  sstable_loader: fix cross-shard resource cleanup in download_task_impl
  tasks: make release_resources() a coroutine
This commit is contained in:
Pavel Emelyanov
2025-02-14 17:07:16 +03:00
committed by Avi Kivity
7 changed files with 66 additions and 40 deletions

View File

@@ -15,6 +15,7 @@
#include "sstables/sstables_manager.hh"
#include <memory>
#include <fmt/ranges.h>
#include <seastar/core/future.hh>
#include <seastar/core/metrics.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/switch_to.hh>
@@ -503,7 +504,7 @@ public:
virtual ~sstables_task_executor() = default;
virtual void release_resources() noexcept override;
virtual future<> release_resources() noexcept override;
virtual future<tasks::task_manager::task::progress> get_progress() const override {
return compaction_task_impl::get_progress(_compaction_data, _progress_monitor);
@@ -788,9 +789,10 @@ compaction::compaction_state::~compaction_state() {
compaction_done.broken();
}
void sstables_task_executor::release_resources() noexcept {
future<> sstables_task_executor::release_resources() noexcept {
_cm._stats.pending_tasks -= _sstables.size() - (_state == state::pending);
_sstables = {};
return make_ready_future();
}
future<compaction_manager::compaction_stats_opt> compaction_task_executor::run_compaction() noexcept {
@@ -1565,10 +1567,10 @@ public:
, _can_purge(can_purge)
{}
virtual void release_resources() noexcept override {
virtual future<> release_resources() noexcept override {
_compacting.release_all();
_owned_ranges_ptr = nullptr;
sstables_task_executor::release_resources();
co_await sstables_task_executor::release_resources();
}
protected:
@@ -1846,11 +1848,12 @@ public:
virtual ~cleanup_sstables_compaction_task_executor() = default;
virtual void release_resources() noexcept override {
virtual future<> release_resources() noexcept override {
_cm._stats.pending_tasks -= _pending_cleanup_jobs.size();
_pending_cleanup_jobs = {};
_compacting.release_all();
_owned_ranges_ptr = nullptr;
return make_ready_future();
}
virtual future<tasks::task_manager::task::progress> get_progress() const override {

View File

@@ -1021,7 +1021,7 @@ private:
}
};
void repair::shard_repair_task_impl::release_resources() noexcept {
future<> repair::shard_repair_task_impl::release_resources() noexcept {
erm = {};
cfs = {};
data_centers = {};
@@ -1030,6 +1030,7 @@ void repair::shard_repair_task_impl::release_resources() noexcept {
neighbors = {};
dropped_tables = {};
nodes_down = {};
return make_ready_future();
}
future<> repair::shard_repair_task_impl::do_repair_ranges() {
@@ -2482,10 +2483,11 @@ tasks::is_user_task repair::tablet_repair_task_impl::is_user_task() const noexce
return tasks::is_user_task::yes;
}
void repair::tablet_repair_task_impl::release_resources() noexcept {
future<> repair::tablet_repair_task_impl::release_resources() noexcept {
_metas_size = _metas.size();
_metas = {};
_tables = {};
return make_ready_future();
}
size_t repair::tablet_repair_task_impl::get_metas_size() const noexcept {

View File

@@ -133,7 +133,7 @@ public:
gc_clock::time_point get_flush_time() const { return _flush_time; }
tasks::is_user_task is_user_task() const noexcept override;
virtual void release_resources() noexcept override;
virtual future<> release_resources() noexcept override;
private:
size_t get_metas_size() const noexcept;
protected:
@@ -220,7 +220,7 @@ public:
size_t ranges_size() const noexcept;
virtual void release_resources() noexcept override;
virtual future<> release_resources() noexcept override;
protected:
future<> do_repair_ranges();
virtual future<tasks::task_manager::task::progress> get_progress() const override;

View File

@@ -8,6 +8,7 @@
#include <fmt/ranges.h>
#include <seastar/core/coroutine.hh>
#include <seastar/core/map_reduce.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/coroutine/switch_to.hh>
@@ -581,7 +582,22 @@ class sstables_loader::download_task_impl : public tasks::task_manager::task::im
sstring _prefix;
sstables_loader::stream_scope _scope;
std::vector<sstring> _sstables;
std::vector<shared_ptr<stream_progress>> _progress_per_shard;
struct progress_holder {
// Wrap stream_progress in a smart pointer to enable polymorphism.
// This allows derived progress types to be passed down for per-tablet
// progress tracking while maintaining the base interface.
shared_ptr<stream_progress> progress = make_shared<stream_progress>();
};
// user could query for the progress even before _progress_per_shard
// is completed started, and this._status.state does not reflect the
// state of progress, so we have to track it separately.
enum class progress_state {
uninitialized,
initialized,
finalized,
} _progress_state = progress_state::uninitialized;
sharded<progress_holder> _progress_per_shard;
tasks::task_manager::task::progress _final_progress;
protected:
virtual future<> run() override;
@@ -599,7 +615,6 @@ public:
, _prefix(std::move(prefix))
, _scope(scope)
, _sstables(std::move(sstables))
, _progress_per_shard(smp::count)
{
_status.progress_units = "batches";
}
@@ -620,29 +635,29 @@ public:
return tasks::is_abortable::yes;
}
virtual future<> release_resources() noexcept override {
// preserve the final progress, so we can access it after the task is
// finished
_final_progress = co_await get_progress();
_progress_state = progress_state::finalized;
co_await _progress_per_shard.stop();
}
virtual future<tasks::task_manager::task::progress> get_progress() const override {
struct adder {
stream_progress result;
future<> operator()(stream_progress p) {
llog.debug("get_progress: {} / {}", p.completed, p.total);
result.completed += p.completed;
result.total += p.total;
return make_ready_future<>();
}
stream_progress get() const {
return result;
}
};
auto p = co_await _loader.map_reduce(
adder{},
[this] (auto&) -> stream_progress {
auto p = _progress_per_shard[this_shard_id()];
if (p) {
return *p;
} else {
// the task was aborted
return {};
}
switch (_progress_state) {
case progress_state::uninitialized:
co_return tasks::task_manager::task::progress{};
case progress_state::finalized:
co_return _final_progress;
case progress_state::initialized:
break;
}
auto p = co_await _progress_per_shard.map_reduce(
adder<stream_progress>{},
[] (const progress_holder& holder) -> stream_progress {
auto p = holder.progress;
SCYLLA_ASSERT(p);
return *p;
});
co_return tasks::task_manager::task::progress {
.completed = p.completed,
@@ -678,11 +693,11 @@ future<> sstables_loader::download_task_impl::run() {
} catch (...) {
}
});
co_await _progress_per_shard.start();
_progress_state = progress_state::initialized;
co_await _loader.invoke_on_all([this, &sstables_on_shards, table_id] (sstables_loader& loader) mutable -> future<> {
auto progress = make_shared<stream_progress>();
_progress_per_shard[this_shard_id()] = progress;
co_await loader.load_and_stream(_ks, _cf, table_id, std::move(sstables_on_shards[this_shard_id()]), false, false, _scope,
progress);
_progress_per_shard.local().progress);
});
} catch (...) {
ex = std::current_exception();

View File

@@ -34,7 +34,11 @@ struct stream_progress {
float completed = 0.;
virtual ~stream_progress() = default;
stream_progress& operator+=(const stream_progress& p) {
total += p.total;
completed += p.completed;
return *this;
}
void start(float amount) {
assert(amount >= 0);
total = amount;

View File

@@ -254,7 +254,7 @@ future<> task_manager::task::impl::finish() noexcept {
_status.state = task_manager::task_state::done;
co_await maybe_fold_into_parent();
_done.set_value();
release_resources();
co_await release_resources();
}
}
@@ -265,7 +265,7 @@ future<> task_manager::task::impl::finish_failed(std::exception_ptr ex, std::str
_status.error = std::move(error);
co_await maybe_fold_into_parent();
_done.set_exception(ex);
release_resources();
co_await release_resources();
}
}

View File

@@ -213,7 +213,9 @@ public:
virtual void abort() noexcept;
bool is_complete() const noexcept;
bool is_done() const noexcept;
virtual void release_resources() noexcept {}
virtual future<> release_resources() noexcept {
return make_ready_future();
}
future<std::vector<task_essentials>> get_failed_children() const;
void set_virtual_parent() noexcept;
protected: