Merge 'Overhaul compaction_manager::task' from Benny Halevy

The series overhauls the compaction_manager::task design and implementation
by properly layering the functionality between the compaction_manager
that deals with generic task execution, and the per-task business logic that is defined
in a set of classes derived from the generic task class.

While at it, the series introduces `task::state` and a set of helper functions to manage it
to prevent leaks in the statistics, fixing #9974.

Two more stats counter were exposed: `completed_tasks` and a new `postponed_tasks`.

Test: sstable_compaction_test
Dtest: compaction_test.py compaction_additional_test.py

Fixes #9974

Closes #10122

* github.com:scylladb/scylla:
  compaction_manager: use coroutine::switch_to
  compaction_manager::task: drop _compaction_running
  compaction_manager: move per-type logic to derived task
  compaction_manager: task: add state enum
  compaction_manager: task: add maybe_retry
  compaction_manager: reevaluate_postponed_compactions: mark as noexcept
  compaction_manager: define derived task types
  compaction_manager: register_metrics: expose postponed_compactions
  compaction_manager: register_metrics: expose failed_compactions
  compaction_manager: register_metrics: expose _stats.completed_tasks
  compaction: add documentation for compaction_type to string conversions
  compaction: expose to_string(compaction_type)
  compaction_manager: task: standardize task description in log messages
  compaction_manager: refactor can_proceed
  compaction_manager: pass compaction_manager& to task ctor
  compaction_manager: use shared_ptr<task> rather than lw_shared_ptr
  compaction_manager: rewrite_sstables: acquire _maintenance_ops_sem once
  compaction_manager: use compaction_state::lock only to synchronize major and regular compaction
This commit is contained in:
Botond Dénes
2022-03-10 13:33:56 +02:00
8 changed files with 718 additions and 463 deletions

View File

@@ -86,7 +86,7 @@ compaction_type to_compaction_type(sstring type_name) {
throw std::runtime_error("Invalid Compaction Type Name");
}
static std::string_view to_string(compaction_type type) {
std::string_view to_string(compaction_type type) {
switch (type) {
case compaction_type::Compaction: return "Compact";
case compaction_type::Cleanup: return "Cleanup";

View File

@@ -41,9 +41,19 @@ public:
friend std::ostream& operator<<(std::ostream&, pretty_printed_throughput);
};
// Return the name of the compaction type
// as used over the REST api, e.g. "COMPACTION" or "CLEANUP".
sstring compaction_name(compaction_type type);
// Reverse map the name of the compaction type
// as used over the REST api, e.g. "COMPACTION" or "CLEANUP",
// to the compaction_type enum code.
compaction_type to_compaction_type(sstring type_name);
// Return a string respresenting the compaction type
// as a verb for logging purposes, e.g. "Compact" or "Cleanup".
std::string_view to_string(compaction_type type);
struct compaction_info {
utils::UUID compaction_uuid;
compaction_type type = compaction_type::Compaction;

File diff suppressed because it is too large Load Diff

View File

@@ -73,48 +73,150 @@ private:
}
};
struct task {
replica::table* compacting_table = nullptr;
shared_future<> compaction_done = make_ready_future<>();
exponential_backoff_retry compaction_retry = exponential_backoff_retry(std::chrono::seconds(5), std::chrono::seconds(300));
sstables::compaction_type type = sstables::compaction_type::Compaction;
bool compaction_running = false;
utils::UUID output_run_identifier;
sstables::compaction_data compaction_data;
compaction_state& compaction_state;
gate::holder gate_holder;
public:
class task {
public:
enum class state {
none, // initial and final state
pending, // task is blocked on a lock, may alternate with active
// counted in compaction_manager::stats::pending_tasks
active, // task initiated active compaction, may alternate with pending
// counted in compaction_manager::stats::active_tasks
done, // task completed successfully (may transition only to state::none)
// counted in compaction_manager::stats::completed_tasks
postponed, // task was postponed (may transition only to state::none)
// represented by the postponed_compactions metric
failed, // task failed (may transition only to state::none)
// counted in compaction_manager::stats::errors
};
static std::string_view to_string(state);
explicit task(replica::table* t, sstables::compaction_type type, struct compaction_state& cs)
: compacting_table(t)
, type(type)
, compaction_state(cs)
, gate_holder(compaction_state.gate.hold())
protected:
compaction_manager& _cm;
replica::table* _compacting_table = nullptr;
compaction_state& _compaction_state;
sstables::compaction_data _compaction_data;
state _state = state::none;
private:
shared_future<> _compaction_done = make_ready_future<>();
exponential_backoff_retry _compaction_retry = exponential_backoff_retry(std::chrono::seconds(5), std::chrono::seconds(300));
sstables::compaction_type _type;
utils::UUID _output_run_identifier;
gate::holder _gate_holder;
sstring _description;
public:
explicit task(compaction_manager& mgr, replica::table* t, sstables::compaction_type type, sstring desc)
: _cm(mgr)
, _compacting_table(t)
, _compaction_state(_cm.get_compaction_state(t))
, _type(type)
, _gate_holder(_compaction_state.gate.hold())
, _description(std::move(desc))
{}
task(task&&) = delete;
task(const task&) = delete;
virtual ~task();
protected:
virtual future<> do_run() = 0;
using throw_if_stopping = bool_class<struct throw_if_stopping_tag>;
state switch_state(state new_state);
// Return true if the task isn't stopped
// and the compaction manager allows proceeding.
inline bool can_proceed(throw_if_stopping do_throw_if_stopping = throw_if_stopping::no) const;
void setup_new_compaction(utils::UUID output_run_id = utils::null_uuid());
void finish_compaction() noexcept;
void finish_compaction(state finish_state = state::done) noexcept;
// Compaction manager stop itself if it finds an storage I/O error which results in
// stop of transportation services. It cannot make progress anyway.
// Returns exception if error is judged fatal, and compaction task must be stopped,
// otherwise, returns stop_iteration::no after sleep for exponential retry.
future<stop_iteration> maybe_retry(std::exception_ptr err);
public:
future<> run() noexcept;
const replica::table* compacting_table() const noexcept {
return _compacting_table;
}
sstables::compaction_type type() const noexcept {
return _type;
}
bool compaction_running() const noexcept {
return _state == state::active;
}
const sstables::compaction_data& compaction_data() const noexcept {
return _compaction_data;
}
sstables::compaction_data& compaction_data() noexcept {
return _compaction_data;
}
bool generating_output_run() const noexcept {
return compaction_running && output_run_identifier;
return compaction_running() && _output_run_identifier;
}
const utils::UUID& output_run_id() const noexcept {
return output_run_identifier;
return _output_run_identifier;
}
const sstring& description() const noexcept {
return _description;
}
future<> compaction_done() noexcept {
return _compaction_done.get_future();
}
bool stopping() const noexcept {
return compaction_data.abort.abort_requested();
return _compaction_data.abort.abort_requested();
}
void stop(sstring reason) noexcept;
sstables::compaction_stopped_exception make_compaction_stopped_exception() const;
std::string describe() const;
};
class sstables_task : public task {
protected:
std::vector<sstables::shared_sstable> _sstables;
void set_sstables(std::vector<sstables::shared_sstable> new_sstables);
sstables::shared_sstable consume_sstable();
public:
explicit sstables_task(compaction_manager& mgr, replica::table* t, sstables::compaction_type compaction_type, sstring desc, std::vector<sstables::shared_sstable> sstables)
: task(mgr, t, compaction_type, std::move(desc))
{
set_sstables(std::move(sstables));
}
virtual ~sstables_task();
};
class major_compaction_task;
class custom_compaction_task;
class regular_compaction_task;
class offstrategy_compaction_task;
class rewrite_sstables_compaction_task;
class validate_sstables_compaction_task;
class compaction_manager_test_task;
private:
// compaction manager may have N fibers to allow parallel compaction per shard.
std::list<lw_shared_ptr<task>> _tasks;
std::list<shared_ptr<task>> _tasks;
// Possible states in which the compaction manager can be found.
//
@@ -170,7 +272,9 @@ private:
class strategy_control;
std::unique_ptr<strategy_control> _strategy_control;
private:
future<> stop_tasks(std::vector<lw_shared_ptr<task>> tasks, sstring reason);
future<> perform_task(shared_ptr<task>);
future<> stop_tasks(std::vector<shared_ptr<task>> tasks, sstring reason);
// Return the largest fan-in of currently running compactions
unsigned current_compaction_fan_in_threshold() const;
@@ -198,18 +302,12 @@ private:
// throws std::out_of_range exception if not found.
compaction_state& get_compaction_state(replica::table* t);
// Return true if compaction manager and task weren't asked to stop.
inline bool can_proceed(const lw_shared_ptr<task>& task);
inline future<> put_task_to_sleep(lw_shared_ptr<task>& task);
// Compaction manager stop itself if it finds an storage I/O error which results in
// stop of transportation services. It cannot make progress anyway.
// Returns true if error is judged fatal, and compaction task must be stopped
inline bool maybe_stop_on_error(std::exception_ptr err, bool can_retry);
// Return true if compaction manager is enabled and
// table still exists and compaction is not disabled for the table.
inline bool can_proceed(replica::table* t) const;
void postponed_compactions_reevaluation();
void reevaluate_postponed_compactions();
void reevaluate_postponed_compactions() noexcept;
// Postpone compaction for a table that couldn't be executed due to ongoing
// similar-sized compaction.
void postpone_compaction_for_table(replica::table* t);
@@ -277,7 +375,7 @@ public:
// parameter type is the compaction type the operation can most closely be
// associated with, use compaction_type::Compaction, if none apply.
// parameter job is a function that will carry the operation
future<> run_custom_job(replica::table* t, sstables::compaction_type type, noncopyable_function<future<>(sstables::compaction_data&)> job);
future<> run_custom_job(replica::table* t, sstables::compaction_type type, const char *desc, noncopyable_function<future<>(sstables::compaction_data&)> job);
// Run a function with compaction temporarily disabled for a table T.
future<> run_with_compaction_disabled(replica::table* t, std::function<future<> ()> func);
@@ -299,8 +397,8 @@ public:
// Returns true if table has an ongoing compaction, running on its behalf
bool has_table_ongoing_compaction(const replica::table* t) const {
return std::any_of(_tasks.begin(), _tasks.end(), [t] (const lw_shared_ptr<task>& task) {
return task->compacting_table == t && task->compaction_running;
return std::any_of(_tasks.begin(), _tasks.end(), [t] (const shared_ptr<task>& task) {
return task->compacting_table() == t && task->compaction_running();
});
};
@@ -336,3 +434,5 @@ public:
bool needs_cleanup(const sstables::shared_sstable& sst, const dht::token_range_vector& owned_ranges, schema_ptr s);
std::ostream& operator<<(std::ostream& os, compaction_manager::task::state s);
std::ostream& operator<<(std::ostream& os, const compaction_manager::task& task);

View File

@@ -352,7 +352,7 @@ future<uint64_t> sstable_directory::reshape(compaction_manager& cm, replica::tab
desc.creator = creator;
return cm.run_custom_job(&table, compaction_type::Reshape, [this, &table, sstlist = std::move(sstlist), desc = std::move(desc)] (sstables::compaction_data& info) mutable {
return cm.run_custom_job(&table, compaction_type::Reshape, "Reshape compaction", [this, &table, sstlist = std::move(sstlist), desc = std::move(desc)] (sstables::compaction_data& info) mutable {
return sstables::compact_sstables(std::move(desc), info, table.as_table_state()).then([this, sstlist = std::move(sstlist)] (sstables::compaction_result result) mutable {
return remove_input_sstables_from_reshaping(std::move(sstlist)).then([this, new_sstables = std::move(result.new_sstables)] () mutable {
return collect_output_sstables_from_reshaping(std::move(new_sstables));
@@ -407,7 +407,7 @@ sstable_directory::reshard(sstable_info_vector shared_info, compaction_manager&
// parallel_for_each so the statistics about pending jobs are updated to reflect all
// jobs. But only one will run in parallel at a time
return parallel_for_each(buckets, [this, iop, &cm, &table, creator = std::move(creator)] (std::vector<sstables::shared_sstable>& sstlist) mutable {
return cm.run_custom_job(&table, compaction_type::Reshard, [this, iop, &cm, &table, creator, &sstlist] (sstables::compaction_data& info) {
return cm.run_custom_job(&table, compaction_type::Reshard, "Reshard compaction", [this, iop, &cm, &table, creator, &sstlist] (sstables::compaction_data& info) {
sstables::compaction_descriptor desc(sstlist, {}, iop);
desc.options = sstables::compaction_type_options::make_reshard();
desc.creator = std::move(creator);

View File

@@ -3278,15 +3278,9 @@ SEASTAR_TEST_CASE(partial_sstable_run_filtered_out_test) {
// register partial sstable run
auto cm_test = compaction_manager_test(*cm);
auto& cdata = cm_test.register_compaction(partial_sstable_run_identifier, cf.get());
auto deregister_compaction = defer([&] () noexcept {
cm_test.deregister_compaction(cdata);
});
cf->compact_all_sstables().get();
deregister_compaction.cancel();
cm_test.deregister_compaction(cdata);
cm_test.run(partial_sstable_run_identifier, cf.get(), [cf] (sstables::compaction_data&) {
return cf->compact_all_sstables();
}).get();
// make sure partial sstable run has none of its fragments compacted.
BOOST_REQUIRE(generation_exists(partial_sstable_run_sst->generation()));
@@ -3675,7 +3669,7 @@ SEASTAR_TEST_CASE(autocompaction_control_test) {
// no compactions done yet
auto& ss = cm.get_stats();
BOOST_REQUIRE(ss.pending_tasks == 0 && ss.active_tasks == 0 && ss.completed_tasks == 0);
BOOST_REQUIRE(cm.get_stats().pending_tasks == 0 && cm.get_stats().active_tasks == 0 && ss.completed_tasks == 0);
// auto compaction is enabled by default
BOOST_REQUIRE(!cf->is_auto_compaction_disabled_by_user());
// disable auto compaction by user
@@ -3706,7 +3700,7 @@ SEASTAR_TEST_CASE(autocompaction_control_test) {
auto stop_cf = deferred_stop(*cf);
cf->trigger_compaction();
cf->get_compaction_manager().submit(cf.get());
BOOST_REQUIRE(ss.pending_tasks == 0 && ss.active_tasks == 0 && ss.completed_tasks == 0);
BOOST_REQUIRE(cm.get_stats().pending_tasks == 0 && cm.get_stats().active_tasks == 0 && ss.completed_tasks == 0);
// enable auto compaction
cf->enable_auto_compaction();
// check enabled
@@ -3714,7 +3708,7 @@ SEASTAR_TEST_CASE(autocompaction_control_test) {
// trigger background compaction
cf->trigger_compaction();
// wait until compaction finished
do_until([&ss] { return ss.pending_tasks == 0 && ss.active_tasks == 0; }, [] {
do_until([&cm] { return cm.get_stats().pending_tasks == 0 && cm.get_stats().active_tasks == 0; }, [] {
return sleep(std::chrono::milliseconds(100));
}).wait();
// test compaction successfully finished

View File

@@ -161,13 +161,14 @@ future<compaction_result> compact_sstables(sstables::compaction_descriptor descr
return creator();
};
descriptor.replacer = std::move(replacer);
auto& cm = cf.get_compaction_manager();
auto& cdata = compaction_manager_test(cm).register_compaction(descriptor.run_identifier, &cf);
return sstables::compact_sstables(std::move(descriptor), cdata, cf.as_table_state()).then([&cdata, &cm] (sstables::compaction_result res) {
return res;
}).finally([&cm, &cdata] {
compaction_manager_test(cm).deregister_compaction(cdata);
auto cmt = compaction_manager_test(cf.get_compaction_manager());
sstables::compaction_result ret;
co_await cmt.run(descriptor.run_identifier, &cf, [&] (sstables::compaction_data& cdata) {
return sstables::compact_sstables(std::move(descriptor), cdata, cf.as_table_state()).then([&] (sstables::compaction_result res) {
ret = std::move(res);
});
});
co_return ret;
}
std::vector<std::pair<sstring, dht::token>> token_generation_for_current_shard(unsigned tokens_to_generate) {
@@ -188,23 +189,45 @@ future<shared_sstable> test_env::reusable_sst(schema_ptr schema, sstring dir, un
throw sst_not_found(dir, generation);
}
sstables::compaction_data& compaction_manager_test::register_compaction(utils::UUID output_run_id, replica::column_family* cf) {
auto task = make_lw_shared<compaction_manager::task>(cf, sstables::compaction_type::Compaction, _cm._compaction_state[cf]);
testlog.debug("compaction_manager_test: register_compaction: task {} cf={}", fmt::ptr(task.get()), fmt::ptr(cf));
task->compaction_running = true;
task->compaction_data = compaction_manager::create_compaction_data();
task->output_run_identifier = std::move(output_run_id);
class compaction_manager::compaction_manager_test_task : public compaction_manager::task {
utils::UUID _run_id;
noncopyable_function<future<> (sstables::compaction_data&)> _job;
public:
compaction_manager_test_task(compaction_manager& cm, replica::column_family* cf, utils::UUID run_id, noncopyable_function<future<> (sstables::compaction_data&)> job)
: compaction_manager::task(cm, cf, sstables::compaction_type::Compaction, "Test compaction")
, _run_id(run_id)
, _job(std::move(job))
{ }
protected:
virtual future<> do_run() override {
setup_new_compaction(_run_id);
return _job(_compaction_data);
}
};
future<> compaction_manager_test::run(utils::UUID output_run_id, replica::column_family* cf, noncopyable_function<future<> (sstables::compaction_data&)> job) {
auto task = make_shared<compaction_manager::compaction_manager_test_task>(_cm, cf, output_run_id, std::move(job));
auto& cdata = register_compaction(task);
return task->run().finally([this, &cdata] {
deregister_compaction(cdata);
});
}
sstables::compaction_data& compaction_manager_test::register_compaction(shared_ptr<compaction_manager::task> task) {
testlog.debug("compaction_manager_test: register_compaction uuid={}: {}", task->compaction_data().compaction_uuid, *task);
_cm._tasks.push_back(task);
return task->compaction_data;
return task->compaction_data();
}
void compaction_manager_test::deregister_compaction(const sstables::compaction_data& c) {
auto it = boost::find_if(_cm._tasks, [&c] (auto& task) { return task->compaction_data.compaction_uuid == c.compaction_uuid; });
auto it = boost::find_if(_cm._tasks, [&c] (auto& task) { return task->compaction_data().compaction_uuid == c.compaction_uuid; });
if (it != _cm._tasks.end()) {
auto task = *it;
testlog.debug("compaction_manager_test: deregister_compaction uuid={}: task {} table={}", c.compaction_uuid, fmt::ptr(task.get()), fmt::ptr(task->compacting_table));
testlog.debug("compaction_manager_test: deregister_compaction uuid={}: {}", c.compaction_uuid, *task);
_cm._tasks.erase(it);
} else {
testlog.debug("compaction_manager_test: deregister_compaction uuid={}: task not found", c.compaction_uuid);
testlog.error("compaction_manager_test: deregister_compaction uuid={}: task not found", c.compaction_uuid);
}
}

View File

@@ -345,7 +345,9 @@ class compaction_manager_test {
public:
explicit compaction_manager_test(compaction_manager& cm) noexcept : _cm(cm) {}
sstables::compaction_data& register_compaction(utils::UUID output_run_id, replica::column_family* cf);
future<> run(utils::UUID output_run_id, replica::column_family* cf, noncopyable_function<future<> (sstables::compaction_data&)> job);
private:
sstables::compaction_data& register_compaction(shared_ptr<compaction_manager::task> task);
void deregister_compaction(const sstables::compaction_data& c);
};