|
|
|
|
@@ -210,8 +210,8 @@ std::vector<sstables::shared_sstable> compaction_manager::get_candidates(compact
|
|
|
|
|
// prevents sstables that belongs to a partial run being generated by ongoing compaction from being
|
|
|
|
|
// selected for compaction, which could potentially result in wrong behavior.
|
|
|
|
|
auto partial_run_identifiers = boost::copy_range<std::unordered_set<sstables::run_id>>(_tasks
|
|
|
|
|
| boost::adaptors::filtered(std::mem_fn(&compaction::task::generating_output_run))
|
|
|
|
|
| boost::adaptors::transformed(std::mem_fn(&compaction::task::output_run_id)));
|
|
|
|
|
| boost::adaptors::filtered(std::mem_fn(&compaction::compaction_task_executor::generating_output_run))
|
|
|
|
|
| boost::adaptors::transformed(std::mem_fn(&compaction::compaction_task_executor::output_run_id)));
|
|
|
|
|
|
|
|
|
|
// Filter out sstables that are being compacted.
|
|
|
|
|
for (auto& sst : in_strategy_sstables(t)) {
|
|
|
|
|
@@ -269,7 +269,7 @@ compaction_manager::compaction_state& compaction_manager::get_compaction_state(c
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
compaction::task::task(compaction_manager& mgr, compaction::table_state* t, sstables::compaction_type type, sstring desc)
|
|
|
|
|
compaction::compaction_task_executor::compaction_task_executor(compaction_manager& mgr, compaction::table_state* t, sstables::compaction_type type, sstring desc)
|
|
|
|
|
: _cm(mgr)
|
|
|
|
|
, _compacting_table(t)
|
|
|
|
|
, _compaction_state(_cm.get_compaction_state(t))
|
|
|
|
|
@@ -278,7 +278,7 @@ compaction::task::task(compaction_manager& mgr, compaction::table_state* t, ssta
|
|
|
|
|
, _description(std::move(desc))
|
|
|
|
|
{}
|
|
|
|
|
|
|
|
|
|
future<compaction_manager::compaction_stats_opt> compaction_manager::perform_task(shared_ptr<compaction::task> task) {
|
|
|
|
|
future<compaction_manager::compaction_stats_opt> compaction_manager::perform_task(shared_ptr<compaction::compaction_task_executor> task) {
|
|
|
|
|
_tasks.push_back(task);
|
|
|
|
|
auto unregister_task = defer([this, task] {
|
|
|
|
|
_tasks.remove(task);
|
|
|
|
|
@@ -309,7 +309,7 @@ future<compaction_manager::compaction_stats_opt> compaction_manager::perform_tas
|
|
|
|
|
co_return std::nullopt;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
future<sstables::compaction_result> compaction::task::compact_sstables_and_update_history(sstables::compaction_descriptor descriptor, sstables::compaction_data& cdata, release_exhausted_func_t release_exhausted, compaction_manager::can_purge_tombstones can_purge) {
|
|
|
|
|
future<sstables::compaction_result> compaction::compaction_task_executor::compact_sstables_and_update_history(sstables::compaction_descriptor descriptor, sstables::compaction_data& cdata, release_exhausted_func_t release_exhausted, compaction_manager::can_purge_tombstones can_purge) {
|
|
|
|
|
if (!descriptor.sstables.size()) {
|
|
|
|
|
// if there is nothing to compact, just return.
|
|
|
|
|
co_return sstables::compaction_result{};
|
|
|
|
|
@@ -324,7 +324,7 @@ future<sstables::compaction_result> compaction::task::compact_sstables_and_updat
|
|
|
|
|
|
|
|
|
|
co_return res;
|
|
|
|
|
}
|
|
|
|
|
future<sstables::compaction_result> compaction::task::compact_sstables(sstables::compaction_descriptor descriptor, sstables::compaction_data& cdata, release_exhausted_func_t release_exhausted, compaction_manager::can_purge_tombstones can_purge) {
|
|
|
|
|
future<sstables::compaction_result> compaction::compaction_task_executor::compact_sstables(sstables::compaction_descriptor descriptor, sstables::compaction_data& cdata, release_exhausted_func_t release_exhausted, compaction_manager::can_purge_tombstones can_purge) {
|
|
|
|
|
compaction::table_state& t = *_compacting_table;
|
|
|
|
|
if (can_purge) {
|
|
|
|
|
descriptor.enable_garbage_collection(t.main_sstable_set());
|
|
|
|
|
@@ -346,7 +346,7 @@ future<sstables::compaction_result> compaction::task::compact_sstables(sstables:
|
|
|
|
|
|
|
|
|
|
co_return co_await sstables::compact_sstables(std::move(descriptor), cdata, t);
|
|
|
|
|
}
|
|
|
|
|
future<> compaction::task::update_history(compaction::table_state& t, const sstables::compaction_result& res, const sstables::compaction_data& cdata) {
|
|
|
|
|
future<> compaction::compaction_task_executor::update_history(compaction::table_state& t, const sstables::compaction_result& res, const sstables::compaction_data& cdata) {
|
|
|
|
|
auto ended_at = std::chrono::duration_cast<std::chrono::milliseconds>(res.stats.ended_at.time_since_epoch());
|
|
|
|
|
|
|
|
|
|
if (_cm._sys_ks) {
|
|
|
|
|
@@ -370,7 +370,7 @@ future<> compaction_manager::get_compaction_history(compaction_history_consumer&
|
|
|
|
|
|
|
|
|
|
namespace compaction {
|
|
|
|
|
|
|
|
|
|
class sstables_task : public task {
|
|
|
|
|
class sstables_task_executor : public compaction_task_executor {
|
|
|
|
|
protected:
|
|
|
|
|
std::vector<sstables::shared_sstable> _sstables;
|
|
|
|
|
|
|
|
|
|
@@ -378,19 +378,19 @@ protected:
|
|
|
|
|
sstables::shared_sstable consume_sstable();
|
|
|
|
|
|
|
|
|
|
public:
|
|
|
|
|
explicit sstables_task(compaction_manager& mgr, compaction::table_state* t, sstables::compaction_type compaction_type, sstring desc, std::vector<sstables::shared_sstable> sstables)
|
|
|
|
|
: task(mgr, t, compaction_type, std::move(desc))
|
|
|
|
|
explicit sstables_task_executor(compaction_manager& mgr, compaction::table_state* t, sstables::compaction_type compaction_type, sstring desc, std::vector<sstables::shared_sstable> sstables)
|
|
|
|
|
: compaction_task_executor(mgr, t, compaction_type, std::move(desc))
|
|
|
|
|
{
|
|
|
|
|
set_sstables(std::move(sstables));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
virtual ~sstables_task();
|
|
|
|
|
virtual ~sstables_task_executor();
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
class major_compaction_task : public task {
|
|
|
|
|
class major_compaction_task_executor : public compaction_task_executor {
|
|
|
|
|
public:
|
|
|
|
|
major_compaction_task(compaction_manager& mgr, compaction::table_state* t)
|
|
|
|
|
: task(mgr, t, sstables::compaction_type::Compaction, "Major compaction")
|
|
|
|
|
major_compaction_task_executor(compaction_manager& mgr, compaction::table_state* t)
|
|
|
|
|
: compaction_task_executor(mgr, t, sstables::compaction_type::Compaction, "Major compaction")
|
|
|
|
|
{}
|
|
|
|
|
|
|
|
|
|
protected:
|
|
|
|
|
@@ -441,17 +441,17 @@ future<> compaction_manager::perform_major_compaction(compaction::table_state& t
|
|
|
|
|
if (_state != state::enabled) {
|
|
|
|
|
return make_ready_future<>();
|
|
|
|
|
}
|
|
|
|
|
return perform_task(make_shared<compaction::major_compaction_task>(*this, &t)).discard_result();;
|
|
|
|
|
return perform_task(make_shared<compaction::major_compaction_task_executor>(*this, &t)).discard_result();;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
namespace compaction {
|
|
|
|
|
|
|
|
|
|
class custom_compaction_task : public task {
|
|
|
|
|
class custom_compaction_task_executor : public compaction_task_executor {
|
|
|
|
|
noncopyable_function<future<>(sstables::compaction_data&)> _job;
|
|
|
|
|
|
|
|
|
|
public:
|
|
|
|
|
custom_compaction_task(compaction_manager& mgr, compaction::table_state* t, sstables::compaction_type type, sstring desc, noncopyable_function<future<>(sstables::compaction_data&)> job)
|
|
|
|
|
: task(mgr, t, type, std::move(desc))
|
|
|
|
|
custom_compaction_task_executor(compaction_manager& mgr, compaction::table_state* t, sstables::compaction_type type, sstring desc, noncopyable_function<future<>(sstables::compaction_data&)> job)
|
|
|
|
|
: compaction_task_executor(mgr, t, type, std::move(desc))
|
|
|
|
|
, _job(std::move(job))
|
|
|
|
|
{}
|
|
|
|
|
|
|
|
|
|
@@ -485,7 +485,7 @@ future<> compaction_manager::run_custom_job(compaction::table_state& t, sstables
|
|
|
|
|
return make_ready_future<>();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return perform_task(make_shared<compaction::custom_compaction_task>(*this, &t, type, desc, std::move(job))).discard_result();
|
|
|
|
|
return perform_task(make_shared<compaction::custom_compaction_task_executor>(*this, &t, type, desc, std::move(job))).discard_result();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
future<> compaction_manager::update_static_shares(float static_shares) {
|
|
|
|
|
@@ -539,7 +539,7 @@ compaction_manager::run_with_compaction_disabled(compaction::table_state& t, std
|
|
|
|
|
co_await func();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
std::string_view compaction::task::to_string(state s) {
|
|
|
|
|
std::string_view compaction::compaction_task_executor::to_string(state s) {
|
|
|
|
|
switch (s) {
|
|
|
|
|
case state::none: return "none";
|
|
|
|
|
case state::pending: return "pending";
|
|
|
|
|
@@ -553,11 +553,11 @@ std::string_view compaction::task::to_string(state s) {
|
|
|
|
|
|
|
|
|
|
namespace compaction {
|
|
|
|
|
|
|
|
|
|
std::ostream& operator<<(std::ostream& os, compaction::task::state s) {
|
|
|
|
|
return os << compaction::task::to_string(s);
|
|
|
|
|
std::ostream& operator<<(std::ostream& os, compaction::compaction_task_executor::state s) {
|
|
|
|
|
return os << compaction::compaction_task_executor::to_string(s);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
std::ostream& operator<<(std::ostream& os, const compaction::task& task) {
|
|
|
|
|
std::ostream& operator<<(std::ostream& os, const compaction::compaction_task_executor& task) {
|
|
|
|
|
return os << task.describe();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -571,21 +571,21 @@ compaction_manager::compaction_state::~compaction_state() {
|
|
|
|
|
compaction_done.broken();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
std::string compaction::task::describe() const {
|
|
|
|
|
std::string compaction::compaction_task_executor::describe() const {
|
|
|
|
|
auto* t = _compacting_table;
|
|
|
|
|
auto s = t->schema();
|
|
|
|
|
return fmt::format("{} task {} for table {}.{} [{}]", _description, fmt::ptr(this), s->ks_name(), s->cf_name(), fmt::ptr(t));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
compaction::task::~task() {
|
|
|
|
|
compaction::compaction_task_executor::~compaction_task_executor() {
|
|
|
|
|
switch_state(state::none);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
compaction::sstables_task::~sstables_task() {
|
|
|
|
|
compaction::sstables_task_executor::~sstables_task_executor() {
|
|
|
|
|
_cm._stats.pending_tasks -= _sstables.size() - (_state == state::pending);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
future<compaction_manager::compaction_stats_opt> compaction::task::run() noexcept {
|
|
|
|
|
future<compaction_manager::compaction_stats_opt> compaction::compaction_task_executor::run() noexcept {
|
|
|
|
|
try {
|
|
|
|
|
_compaction_done = do_run();
|
|
|
|
|
return compaction_done();
|
|
|
|
|
@@ -594,7 +594,7 @@ future<compaction_manager::compaction_stats_opt> compaction::task::run() noexcep
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
compaction::task::state compaction::task::switch_state(state new_state) {
|
|
|
|
|
compaction::compaction_task_executor::state compaction::compaction_task_executor::switch_state(state new_state) {
|
|
|
|
|
auto old_state = std::exchange(_state, new_state);
|
|
|
|
|
switch (old_state) {
|
|
|
|
|
case state::none:
|
|
|
|
|
@@ -629,7 +629,7 @@ compaction::task::state compaction::task::switch_state(state new_state) {
|
|
|
|
|
return old_state;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void compaction::sstables_task::set_sstables(std::vector<sstables::shared_sstable> new_sstables) {
|
|
|
|
|
void compaction::sstables_task_executor::set_sstables(std::vector<sstables::shared_sstable> new_sstables) {
|
|
|
|
|
if (!_sstables.empty()) {
|
|
|
|
|
on_internal_error(cmlog, format("sstables were already set"));
|
|
|
|
|
}
|
|
|
|
|
@@ -638,7 +638,7 @@ void compaction::sstables_task::set_sstables(std::vector<sstables::shared_sstabl
|
|
|
|
|
_cm._stats.pending_tasks += _sstables.size() - (_state == state::pending);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sstables::shared_sstable compaction::sstables_task::consume_sstable() {
|
|
|
|
|
sstables::shared_sstable compaction::sstables_task_executor::consume_sstable() {
|
|
|
|
|
if (_sstables.empty()) {
|
|
|
|
|
on_internal_error(cmlog, format("no more sstables"));
|
|
|
|
|
}
|
|
|
|
|
@@ -649,7 +649,7 @@ sstables::shared_sstable compaction::sstables_task::consume_sstable() {
|
|
|
|
|
return sst;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
future<semaphore_units<named_semaphore_exception_factory>> compaction::task::acquire_semaphore(named_semaphore& sem, size_t units) {
|
|
|
|
|
future<semaphore_units<named_semaphore_exception_factory>> compaction::compaction_task_executor::acquire_semaphore(named_semaphore& sem, size_t units) {
|
|
|
|
|
return seastar::get_units(sem, units, _compaction_data.abort).handle_exception_type([this] (const abort_requested_exception& e) {
|
|
|
|
|
auto s = _compacting_table->schema();
|
|
|
|
|
return make_exception_future<semaphore_units<named_semaphore_exception_factory>>(
|
|
|
|
|
@@ -657,13 +657,13 @@ future<semaphore_units<named_semaphore_exception_factory>> compaction::task::acq
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void compaction::task::setup_new_compaction(sstables::run_id output_run_id) {
|
|
|
|
|
void compaction::compaction_task_executor::setup_new_compaction(sstables::run_id output_run_id) {
|
|
|
|
|
_compaction_data = _cm.create_compaction_data();
|
|
|
|
|
_output_run_identifier = output_run_id;
|
|
|
|
|
switch_state(state::active);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void compaction::task::finish_compaction(state finish_state) noexcept {
|
|
|
|
|
void compaction::compaction_task_executor::finish_compaction(state finish_state) noexcept {
|
|
|
|
|
switch_state(finish_state);
|
|
|
|
|
_output_run_identifier = sstables::run_id::create_null_id();
|
|
|
|
|
if (finish_state != state::failed) {
|
|
|
|
|
@@ -672,11 +672,11 @@ void compaction::task::finish_compaction(state finish_state) noexcept {
|
|
|
|
|
_compaction_state.compaction_done.signal();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void compaction::task::stop(sstring reason) noexcept {
|
|
|
|
|
void compaction::compaction_task_executor::stop(sstring reason) noexcept {
|
|
|
|
|
_compaction_data.stop(std::move(reason));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sstables::compaction_stopped_exception compaction::task::make_compaction_stopped_exception() const {
|
|
|
|
|
sstables::compaction_stopped_exception compaction::compaction_task_executor::make_compaction_stopped_exception() const {
|
|
|
|
|
auto s = _compacting_table->schema();
|
|
|
|
|
return sstables::compaction_stopped_exception(s->ks_name(), s->cf_name(), _compaction_data.stop_requested);
|
|
|
|
|
}
|
|
|
|
|
@@ -828,7 +828,7 @@ void compaction_manager::postpone_compaction_for_table(compaction::table_state*
|
|
|
|
|
_postponed.insert(t);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
future<> compaction_manager::stop_tasks(std::vector<shared_ptr<compaction::task>> tasks, sstring reason) {
|
|
|
|
|
future<> compaction_manager::stop_tasks(std::vector<shared_ptr<compaction::compaction_task_executor>> tasks, sstring reason) {
|
|
|
|
|
// To prevent compaction from being postponed while tasks are being stopped,
|
|
|
|
|
// let's stop all tasks before the deferring point below.
|
|
|
|
|
for (auto& t : tasks) {
|
|
|
|
|
@@ -852,7 +852,7 @@ future<> compaction_manager::stop_tasks(std::vector<shared_ptr<compaction::task>
|
|
|
|
|
future<> compaction_manager::stop_ongoing_compactions(sstring reason, compaction::table_state* t, std::optional<sstables::compaction_type> type_opt) noexcept {
|
|
|
|
|
try {
|
|
|
|
|
auto ongoing_compactions = get_compactions(t).size();
|
|
|
|
|
auto tasks = boost::copy_range<std::vector<shared_ptr<compaction::task>>>(_tasks | boost::adaptors::filtered([t, type_opt] (auto& task) {
|
|
|
|
|
auto tasks = boost::copy_range<std::vector<shared_ptr<compaction::compaction_task_executor>>>(_tasks | boost::adaptors::filtered([t, type_opt] (auto& task) {
|
|
|
|
|
return (!t || task->compacting_table() == t) && (!type_opt || task->type() == *type_opt);
|
|
|
|
|
}));
|
|
|
|
|
logging::log_level level = tasks.empty() ? log_level::debug : log_level::info;
|
|
|
|
|
@@ -926,7 +926,7 @@ inline bool compaction_manager::can_proceed(compaction::table_state* t) const {
|
|
|
|
|
return (_state == state::enabled) && _compaction_state.contains(t) && !_compaction_state.at(t).compaction_disabled();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
inline bool compaction::task::can_proceed(throw_if_stopping do_throw_if_stopping) const {
|
|
|
|
|
inline bool compaction::compaction_task_executor::can_proceed(throw_if_stopping do_throw_if_stopping) const {
|
|
|
|
|
if (stopping()) {
|
|
|
|
|
// Allow caller to know that task (e.g. reshape) was asked to stop while waiting for a chance to run.
|
|
|
|
|
if (do_throw_if_stopping) {
|
|
|
|
|
@@ -937,7 +937,7 @@ inline bool compaction::task::can_proceed(throw_if_stopping do_throw_if_stopping
|
|
|
|
|
return _cm.can_proceed(_compacting_table);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
future<stop_iteration> compaction::task::maybe_retry(std::exception_ptr err, bool throw_on_abort) {
|
|
|
|
|
future<stop_iteration> compaction::compaction_task_executor::maybe_retry(std::exception_ptr err, bool throw_on_abort) {
|
|
|
|
|
try {
|
|
|
|
|
std::rethrow_exception(err);
|
|
|
|
|
} catch (sstables::compaction_stopped_exception& e) {
|
|
|
|
|
@@ -972,10 +972,10 @@ future<stop_iteration> compaction::task::maybe_retry(std::exception_ptr err, boo
|
|
|
|
|
|
|
|
|
|
namespace compaction {
|
|
|
|
|
|
|
|
|
|
class regular_compaction_task : public task {
|
|
|
|
|
class regular_compaction_task_executor : public compaction_task_executor {
|
|
|
|
|
public:
|
|
|
|
|
regular_compaction_task(compaction_manager& mgr, compaction::table_state& t)
|
|
|
|
|
: task(mgr, &t, sstables::compaction_type::Compaction, "Compaction")
|
|
|
|
|
regular_compaction_task_executor(compaction_manager& mgr, compaction::table_state& t)
|
|
|
|
|
: compaction_task_executor(mgr, &t, sstables::compaction_type::Compaction, "Compaction")
|
|
|
|
|
{}
|
|
|
|
|
protected:
|
|
|
|
|
virtual future<compaction_manager::compaction_stats_opt> do_run() override {
|
|
|
|
|
@@ -1064,7 +1064,7 @@ void compaction_manager::submit(compaction::table_state& t) {
|
|
|
|
|
|
|
|
|
|
// OK to drop future.
|
|
|
|
|
// waited via task->stop()
|
|
|
|
|
(void)perform_task(make_shared<compaction::regular_compaction_task>(*this, t));
|
|
|
|
|
(void)perform_task(make_shared<compaction::regular_compaction_task_executor>(*this, t));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool compaction_manager::can_perform_regular_compaction(compaction::table_state& t) {
|
|
|
|
|
@@ -1113,11 +1113,11 @@ future<> compaction_manager::maybe_wait_for_sstable_count_reduction(compaction::
|
|
|
|
|
|
|
|
|
|
namespace compaction {
|
|
|
|
|
|
|
|
|
|
class offstrategy_compaction_task : public task {
|
|
|
|
|
class offstrategy_compaction_task_executor : public compaction_task_executor {
|
|
|
|
|
bool _performed = false;
|
|
|
|
|
public:
|
|
|
|
|
offstrategy_compaction_task(compaction_manager& mgr, compaction::table_state* t)
|
|
|
|
|
: task(mgr, t, sstables::compaction_type::Reshape, "Offstrategy compaction")
|
|
|
|
|
offstrategy_compaction_task_executor(compaction_manager& mgr, compaction::table_state* t)
|
|
|
|
|
: compaction_task_executor(mgr, t, sstables::compaction_type::Reshape, "Offstrategy compaction")
|
|
|
|
|
{}
|
|
|
|
|
|
|
|
|
|
bool performed() const noexcept {
|
|
|
|
|
@@ -1260,23 +1260,23 @@ future<bool> compaction_manager::perform_offstrategy(compaction::table_state& t)
|
|
|
|
|
if (_state != state::enabled) {
|
|
|
|
|
co_return false;
|
|
|
|
|
}
|
|
|
|
|
auto task = make_shared<compaction::offstrategy_compaction_task>(*this, &t);
|
|
|
|
|
auto task = make_shared<compaction::offstrategy_compaction_task_executor>(*this, &t);
|
|
|
|
|
co_await perform_task(task);
|
|
|
|
|
co_return task->performed();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
namespace compaction {
|
|
|
|
|
|
|
|
|
|
class rewrite_sstables_compaction_task : public sstables_task {
|
|
|
|
|
class rewrite_sstables_compaction_task_executor : public sstables_task_executor {
|
|
|
|
|
sstables::compaction_type_options _options;
|
|
|
|
|
compacting_sstable_registration _compacting;
|
|
|
|
|
compaction_manager::can_purge_tombstones _can_purge;
|
|
|
|
|
|
|
|
|
|
public:
|
|
|
|
|
rewrite_sstables_compaction_task(compaction_manager& mgr, compaction::table_state* t, sstables::compaction_type_options options,
|
|
|
|
|
rewrite_sstables_compaction_task_executor(compaction_manager& mgr, compaction::table_state* t, sstables::compaction_type_options options,
|
|
|
|
|
std::vector<sstables::shared_sstable> sstables, compacting_sstable_registration compacting,
|
|
|
|
|
compaction_manager::can_purge_tombstones can_purge)
|
|
|
|
|
: sstables_task(mgr, t, options.type(), sstring(sstables::to_string(options.type())), std::move(sstables))
|
|
|
|
|
: sstables_task_executor(mgr, t, options.type(), sstring(sstables::to_string(options.type())), std::move(sstables))
|
|
|
|
|
, _options(std::move(options))
|
|
|
|
|
, _compacting(std::move(compacting))
|
|
|
|
|
, _can_purge(can_purge)
|
|
|
|
|
@@ -1343,7 +1343,7 @@ private:
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
template<typename TaskType, typename... Args>
|
|
|
|
|
requires std::derived_from<TaskType, compaction::task>
|
|
|
|
|
requires std::derived_from<TaskType, compaction::compaction_task_executor>
|
|
|
|
|
future<compaction_manager::compaction_stats_opt> compaction_manager::perform_task_on_all_files(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func get_func, Args... args) {
|
|
|
|
|
if (_state != state::enabled) {
|
|
|
|
|
co_return std::nullopt;
|
|
|
|
|
@@ -1372,15 +1372,15 @@ future<compaction_manager::compaction_stats_opt> compaction_manager::perform_tas
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
future<compaction_manager::compaction_stats_opt> compaction_manager::rewrite_sstables(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func get_func, can_purge_tombstones can_purge) {
|
|
|
|
|
return perform_task_on_all_files<compaction::rewrite_sstables_compaction_task>(t, std::move(options), std::move(get_func), can_purge);
|
|
|
|
|
return perform_task_on_all_files<compaction::rewrite_sstables_compaction_task_executor>(t, std::move(options), std::move(get_func), can_purge);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
namespace compaction {
|
|
|
|
|
|
|
|
|
|
class validate_sstables_compaction_task : public sstables_task {
|
|
|
|
|
class validate_sstables_compaction_task_executor : public sstables_task_executor {
|
|
|
|
|
public:
|
|
|
|
|
validate_sstables_compaction_task(compaction_manager& mgr, compaction::table_state* t, std::vector<sstables::shared_sstable> sstables)
|
|
|
|
|
: sstables_task(mgr, t, sstables::compaction_type::Scrub, "Scrub compaction in validate mode", std::move(sstables))
|
|
|
|
|
validate_sstables_compaction_task_executor(compaction_manager& mgr, compaction::table_state* t, std::vector<sstables::shared_sstable> sstables)
|
|
|
|
|
: sstables_task_executor(mgr, t, sstables::compaction_type::Scrub, "Scrub compaction in validate mode", std::move(sstables))
|
|
|
|
|
{}
|
|
|
|
|
|
|
|
|
|
protected:
|
|
|
|
|
@@ -1446,19 +1446,19 @@ future<compaction_manager::compaction_stats_opt> compaction_manager::perform_sst
|
|
|
|
|
}
|
|
|
|
|
// All sstables must be included, even the ones being compacted, such that everything in table is validated.
|
|
|
|
|
auto all_sstables = get_all_sstables(t);
|
|
|
|
|
return perform_task(seastar::make_shared<compaction::validate_sstables_compaction_task>(*this, &t, std::move(all_sstables)));
|
|
|
|
|
return perform_task(seastar::make_shared<compaction::validate_sstables_compaction_task_executor>(*this, &t, std::move(all_sstables)));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
namespace compaction {
|
|
|
|
|
|
|
|
|
|
class cleanup_sstables_compaction_task : public task {
|
|
|
|
|
class cleanup_sstables_compaction_task_executor : public compaction_task_executor {
|
|
|
|
|
const sstables::compaction_type_options _cleanup_options;
|
|
|
|
|
compacting_sstable_registration _compacting;
|
|
|
|
|
std::vector<sstables::compaction_descriptor> _pending_cleanup_jobs;
|
|
|
|
|
public:
|
|
|
|
|
cleanup_sstables_compaction_task(compaction_manager& mgr, compaction::table_state* t, sstables::compaction_type_options options,
|
|
|
|
|
cleanup_sstables_compaction_task_executor(compaction_manager& mgr, compaction::table_state* t, sstables::compaction_type_options options,
|
|
|
|
|
std::vector<sstables::shared_sstable> candidates, compacting_sstable_registration compacting)
|
|
|
|
|
: task(mgr, t, options.type(), sstring(sstables::to_string(options.type())))
|
|
|
|
|
: compaction_task_executor(mgr, t, options.type(), sstring(sstables::to_string(options.type())))
|
|
|
|
|
, _cleanup_options(std::move(options))
|
|
|
|
|
, _compacting(std::move(compacting))
|
|
|
|
|
, _pending_cleanup_jobs(t->get_compaction_strategy().get_cleanup_compaction_jobs(*t, std::move(candidates)))
|
|
|
|
|
@@ -1469,7 +1469,7 @@ public:
|
|
|
|
|
_cm._stats.pending_tasks += _pending_cleanup_jobs.size();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
virtual ~cleanup_sstables_compaction_task() {
|
|
|
|
|
virtual ~cleanup_sstables_compaction_task_executor() {
|
|
|
|
|
_cm._stats.pending_tasks -= _pending_cleanup_jobs.size();
|
|
|
|
|
}
|
|
|
|
|
protected:
|
|
|
|
|
@@ -1504,7 +1504,7 @@ private:
|
|
|
|
|
try {
|
|
|
|
|
setup_new_compaction(descriptor.run_identifier);
|
|
|
|
|
co_await compact_sstables_and_update_history(descriptor, _compaction_data,
|
|
|
|
|
std::bind(&cleanup_sstables_compaction_task::release_exhausted, this, std::placeholders::_1));
|
|
|
|
|
std::bind(&cleanup_sstables_compaction_task_executor::release_exhausted, this, std::placeholders::_1));
|
|
|
|
|
finish_compaction();
|
|
|
|
|
_cm.reevaluate_postponed_compactions();
|
|
|
|
|
co_return; // done with current job
|
|
|
|
|
@@ -1569,7 +1569,7 @@ future<> compaction_manager::perform_cleanup(owned_ranges_ptr sorted_owned_range
|
|
|
|
|
});
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
co_await perform_task_on_all_files<compaction::cleanup_sstables_compaction_task>(t, sstables::compaction_type_options::make_cleanup(std::move(sorted_owned_ranges)),
|
|
|
|
|
co_await perform_task_on_all_files<compaction::cleanup_sstables_compaction_task_executor>(t, sstables::compaction_type_options::make_cleanup(std::move(sorted_owned_ranges)),
|
|
|
|
|
std::move(get_sstables));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -1676,7 +1676,7 @@ future<> compaction_manager::remove(compaction::table_state& t) noexcept {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const std::vector<sstables::compaction_info> compaction_manager::get_compactions(compaction::table_state* t) const {
|
|
|
|
|
auto to_info = [] (const shared_ptr<compaction::task>& task) {
|
|
|
|
|
auto to_info = [] (const shared_ptr<compaction::compaction_task_executor>& task) {
|
|
|
|
|
sstables::compaction_info ret;
|
|
|
|
|
ret.compaction_uuid = task->compaction_data().compaction_uuid;
|
|
|
|
|
ret.type = task->type();
|
|
|
|
|
@@ -1687,13 +1687,13 @@ const std::vector<sstables::compaction_info> compaction_manager::get_compactions
|
|
|
|
|
return ret;
|
|
|
|
|
};
|
|
|
|
|
using ret = std::vector<sstables::compaction_info>;
|
|
|
|
|
return boost::copy_range<ret>(_tasks | boost::adaptors::filtered([t] (const shared_ptr<compaction::task>& task) {
|
|
|
|
|
return boost::copy_range<ret>(_tasks | boost::adaptors::filtered([t] (const shared_ptr<compaction::compaction_task_executor>& task) {
|
|
|
|
|
return (!t || task->compacting_table() == t) && task->compaction_running();
|
|
|
|
|
}) | boost::adaptors::transformed(to_info));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool compaction_manager::has_table_ongoing_compaction(const compaction::table_state& t) const {
|
|
|
|
|
return std::any_of(_tasks.begin(), _tasks.end(), [&t] (const shared_ptr<compaction::task>& task) {
|
|
|
|
|
return std::any_of(_tasks.begin(), _tasks.end(), [&t] (const shared_ptr<compaction::compaction_task_executor>& task) {
|
|
|
|
|
return task->compacting_table() == &t && task->compaction_running();
|
|
|
|
|
});
|
|
|
|
|
};
|
|
|
|
|
@@ -1736,7 +1736,7 @@ public:
|
|
|
|
|
explicit strategy_control(compaction_manager& cm) noexcept : _cm(cm) {}
|
|
|
|
|
|
|
|
|
|
bool has_ongoing_compaction(table_state& table_s) const noexcept override {
|
|
|
|
|
return std::any_of(_cm._tasks.begin(), _cm._tasks.end(), [&s = table_s.schema()] (const shared_ptr<compaction::task>& task) {
|
|
|
|
|
return std::any_of(_cm._tasks.begin(), _cm._tasks.end(), [&s = table_s.schema()] (const shared_ptr<compaction::compaction_task_executor>& task) {
|
|
|
|
|
return task->compaction_running()
|
|
|
|
|
&& task->compacting_table()->schema()->ks_name() == s->ks_name()
|
|
|
|
|
&& task->compacting_table()->schema()->cf_name() == s->cf_name();
|
|
|
|
|
|