Merge 'Make compaction manager switch to table abstraction ' from Raphael "Raph" Carvalho

This work gets us a step closer to compaction groups.

Everything in the compaction layer except compaction_manager had already been converted to table_state; this series converts compaction_manager as well.

After this work, we can start implementing compaction groups, as each group will be represented by its own table_state. User-triggered operations that span the entire table, not only a group, can be done by calling the manager operation on behalf of each group and then merging the results, if any.

Closes #11028

* github.com:scylladb/scylla:
  compaction: remove forward declaration of replica::table
  compaction_manager: make add() and remove() switch to table_state
  compaction_manager: make run_custom_job() switch to table_state
  compaction_manager: major: switch to table_state
  compaction_manager: scrub: switch to table_state
  compaction_manager: upgrade: switch to table_state
  compaction: table_state: add get_sstables_manager()
  compaction_manager: cleanup: switch to table_state
  compaction_manager: offstrategy: switch to table_state()
  compaction_manager: rewrite_sstables(): switch to table_state
  compaction_manager: make run_with_compaction_disabled() switch to table_state
  compaction_manager: compaction_reenabler: switch to table_state
  compaction_manager: make submit(T) switch to table_state
  compaction_manager: task: switch to table_state
  compaction: table_state: Add is_auto_compaction_disabled_by_user()
  compaction: table_state: Add on_compaction_completion()
  compaction: table_state: Add make_sstable()
  compaction_manager: make can_proceed switch to table_state
  compaction_manager: make stop compaction procedures switch to table_state
  compaction_manager: make get_compactions() switch to table_state
  compaction_manager: change task::update_history() to use table_state instead
  compaction_manager: make can_register_compaction() switch to table_state
  compaction_manager: make get_candidates() switch to table_state
  compaction_manager: make propagate_replacement() switch to table_state
  compaction: Move table::in_strategy_sstables() and switch to table_state
  compaction: table_state: Add maintenance sstable set
  compaction_manager: make has_table_ongoing_compaction() switch to table_state
  compaction_manager: make compaction_disabled() switch to table_state
  compaction_manager: switch to table_state for mapping of compaction_state
  compaction_manager: move task ctor into source
This commit is contained in:
Botond Dénes
2022-07-18 15:18:29 +03:00
16 changed files with 254 additions and 205 deletions

View File

@@ -119,7 +119,7 @@ void set_compaction_manager(http_context& ctx, routes& r) {
auto& cm = db.get_compaction_manager();
return parallel_for_each(table_names, [&db, &cm, &ks_name, type] (sstring& table_name) {
auto& t = db.find_column_family(ks_name, table_name);
return cm.stop_compaction(type, &t);
return cm.stop_compaction(type, &t.as_table_state());
});
});
co_return json_void();

View File

@@ -645,7 +645,7 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
// as a table can be dropped during loop below, let's find it before issuing the cleanup request.
for (auto& id : table_ids) {
replica::table& t = db.find_column_family(id);
co_await cm.perform_cleanup(db, &t);
co_await cm.perform_cleanup(db, t.as_table_state());
}
co_return;
}).then([]{
@@ -672,7 +672,7 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
return do_for_each(column_families, [=, &db](sstring cfname) {
auto& cm = db.get_compaction_manager();
auto& cf = db.find_column_family(keyspace, cfname);
return cm.perform_sstable_upgrade(db, &cf, exclude_current_version);
return cm.perform_sstable_upgrade(db, cf.as_table_state(), exclude_current_version);
});
}).then([]{
return make_ready_future<json::json_return_type>(0);
@@ -1408,7 +1408,7 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
return do_for_each(column_families, [=, &db](sstring cfname) {
auto& cm = db.get_compaction_manager();
auto& cf = db.find_column_family(keyspace, cfname);
return cm.perform_sstable_scrub(&cf, opts);
return cm.perform_sstable_scrub(cf.as_table_state(), opts);
});
});
}).then([]{

View File

@@ -159,9 +159,9 @@ unsigned compaction_manager::current_compaction_fan_in_threshold() const {
return std::min(unsigned(32), largest_fan_in);
}
bool compaction_manager::can_register_compaction(replica::table* t, int weight, unsigned fan_in) const {
bool compaction_manager::can_register_compaction(compaction::table_state& t, int weight, unsigned fan_in) const {
// Only one weight is allowed if parallel compaction is disabled.
if (!t->get_compaction_strategy().parallel_compaction() && has_table_ongoing_compaction(t)) {
if (!t.get_compaction_strategy().parallel_compaction() && has_table_ongoing_compaction(t)) {
return false;
}
// Weightless compaction doesn't have to be serialized, and won't dilute overall efficiency.
@@ -195,9 +195,16 @@ void compaction_manager::deregister_weight(int weight) {
reevaluate_postponed_compactions();
}
std::vector<sstables::shared_sstable> compaction_manager::get_candidates(const replica::table& t) {
// Returns the sstables of the table's main sstable set that are accepted by
// sstables::is_eligible_for_compaction(). The maintenance set is not consulted.
//
// table_s: table state whose main sstable set is scanned.
// Returns a freshly built vector; the underlying set is not modified here.
std::vector<sstables::shared_sstable> in_strategy_sstables(compaction::table_state& table_s) {
// Note: this local is named like the `sstables` namespace. The qualified names
// below still resolve to the namespace, since lookup of a name before `::`
// ignores variables.
auto sstables = table_s.main_sstable_set().all();
// Lazily filter the set, then materialize the survivors into a vector.
return boost::copy_range<std::vector<sstables::shared_sstable>>(*sstables | boost::adaptors::filtered([] (const sstables::shared_sstable& sst) {
return sstables::is_eligible_for_compaction(sst);
}));
}
std::vector<sstables::shared_sstable> compaction_manager::get_candidates(compaction::table_state& t) {
std::vector<sstables::shared_sstable> candidates;
candidates.reserve(t.sstables_count());
candidates.reserve(t.main_sstable_set().all()->size());
// Prevents sstables that belong to a partial run being generated by an ongoing compaction from being
// selected for compaction, which could potentially result in wrong behavior.
auto partial_run_identifiers = boost::copy_range<std::unordered_set<utils::UUID>>(_tasks
@@ -206,7 +213,7 @@ std::vector<sstables::shared_sstable> compaction_manager::get_candidates(const r
auto& cs = t.get_compaction_strategy();
// Filter out sstables that are being compacted.
for (auto& sst : t.in_strategy_sstables()) {
for (auto& sst : in_strategy_sstables(t)) {
if (_compacting_sstables.contains(sst)) {
continue;
}
@@ -252,7 +259,7 @@ private:
virtual void replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) override {}
};
compaction_manager::compaction_state& compaction_manager::get_compaction_state(replica::table* t) {
compaction_manager::compaction_state& compaction_manager::get_compaction_state(compaction::table_state* t) {
try {
return _compaction_state.at(t);
} catch (std::out_of_range&) {
@@ -261,6 +268,15 @@ compaction_manager::compaction_state& compaction_manager::get_compaction_state(r
}
}
// Constructs a compaction task for table state `t`, owned by manager `mgr`.
//
// mgr:  compaction manager the task belongs to.
// t:    table state the task operates on; also used as the key to look up the
//       manager's per-table compaction_state via get_compaction_state().
// type: kind of compaction this task performs.
// desc: human-readable description, moved into the task.
//
// The task takes a hold on the per-table compaction_state gate at construction
// (stored in _gate_holder).
// NOTE(review): the initializers read _cm and _compaction_state while building
// later members, so this relies on _cm being declared before _compaction_state
// and _compaction_state before _gate_holder -- confirm against the class
// declaration, which is not visible here.
compaction_manager::task::task(compaction_manager& mgr, compaction::table_state* t, sstables::compaction_type type, sstring desc)
: _cm(mgr)
, _compacting_table(t)
, _compaction_state(_cm.get_compaction_state(t))
, _type(type)
, _gate_holder(_compaction_state.gate.hold())
, _description(std::move(desc))
{}
future<> compaction_manager::perform_task(shared_ptr<compaction_manager::task> task) {
_tasks.push_back(task);
auto unregister_task = defer([this, task] {
@@ -303,9 +319,9 @@ future<> compaction_manager::task::compact_sstables_and_update_history(sstables:
}
}
future<sstables::compaction_result> compaction_manager::task::compact_sstables(sstables::compaction_descriptor descriptor, sstables::compaction_data& cdata, release_exhausted_func_t release_exhausted, can_purge_tombstones can_purge) {
replica::table& t = *_compacting_table;
compaction::table_state& t = *_compacting_table;
if (can_purge) {
descriptor.enable_garbage_collection(t.get_sstable_set());
descriptor.enable_garbage_collection(t.main_sstable_set());
}
descriptor.creator = [&t] (shard_id dummy) {
auto sst = t.make_sstable();
@@ -313,27 +329,27 @@ future<sstables::compaction_result> compaction_manager::task::compact_sstables(s
};
descriptor.replacer = [this, &t, release_exhausted] (sstables::compaction_completion_desc desc) {
t.get_compaction_strategy().notify_completion(desc.old_sstables, desc.new_sstables);
_cm.propagate_replacement(&t, desc.old_sstables, desc.new_sstables);
_cm.propagate_replacement(t, desc.old_sstables, desc.new_sstables);
auto old_sstables = desc.old_sstables;
t.on_compaction_completion(std::move(desc));
t.on_compaction_completion(std::move(desc), sstables::offstrategy::no).get();
// Calls compaction manager's task for this compaction to release reference to exhausted SSTables.
if (release_exhausted) {
release_exhausted(old_sstables);
}
};
co_return co_await sstables::compact_sstables(std::move(descriptor), cdata, t.as_table_state());
co_return co_await sstables::compact_sstables(std::move(descriptor), cdata, t);
}
future<> compaction_manager::task::update_history(replica::table& t, const sstables::compaction_result& res, const sstables::compaction_data& cdata) {
future<> compaction_manager::task::update_history(compaction::table_state& t, const sstables::compaction_result& res, const sstables::compaction_data& cdata) {
auto ended_at = std::chrono::duration_cast<std::chrono::milliseconds>(res.ended_at.time_since_epoch());
co_return co_await t.as_table_state().update_compaction_history(cdata.compaction_uuid, t.schema()->ks_name(), t.schema()->cf_name(), ended_at,
co_return co_await t.update_compaction_history(cdata.compaction_uuid, t.schema()->ks_name(), t.schema()->cf_name(), ended_at,
res.start_size, res.end_size);
}
class compaction_manager::major_compaction_task : public compaction_manager::task {
public:
major_compaction_task(compaction_manager& mgr, replica::table* t)
major_compaction_task(compaction_manager& mgr, compaction::table_state* t)
: task(mgr, t, sstables::compaction_type::Compaction, "Major compaction")
{}
@@ -353,9 +369,9 @@ protected:
// candidates are sstables that aren't being operated on by other compaction types.
// those are eligible for major compaction.
auto* t = _compacting_table;
compaction::table_state* t = _compacting_table;
sstables::compaction_strategy cs = t->get_compaction_strategy();
sstables::compaction_descriptor descriptor = cs.get_major_compaction_job(t->as_table_state(), _cm.get_candidates(*t));
sstables::compaction_descriptor descriptor = cs.get_major_compaction_job(*t, _cm.get_candidates(*t));
auto compacting = compacting_sstable_registration(_cm, descriptor.sstables);
auto release_exhausted = [&compacting] (const std::vector<sstables::shared_sstable>& exhausted_sstables) {
compacting.release_compacting(exhausted_sstables);
@@ -377,18 +393,18 @@ protected:
}
};
future<> compaction_manager::perform_major_compaction(replica::table* t) {
future<> compaction_manager::perform_major_compaction(compaction::table_state& t) {
if (_state != state::enabled) {
return make_ready_future<>();
}
return perform_task(make_shared<major_compaction_task>(*this, t));
return perform_task(make_shared<major_compaction_task>(*this, &t));
}
class compaction_manager::custom_compaction_task : public compaction_manager::task {
noncopyable_function<future<>(sstables::compaction_data&)> _job;
public:
custom_compaction_task(compaction_manager& mgr, replica::table* t, sstables::compaction_type type, sstring desc, noncopyable_function<future<>(sstables::compaction_data&)> job)
custom_compaction_task(compaction_manager& mgr, compaction::table_state* t, sstables::compaction_type type, sstring desc, noncopyable_function<future<>(sstables::compaction_data&)> job)
: task(mgr, t, type, std::move(desc))
, _job(std::move(job))
{}
@@ -414,17 +430,17 @@ protected:
}
};
future<> compaction_manager::run_custom_job(replica::table* t, sstables::compaction_type type, const char* desc, noncopyable_function<future<>(sstables::compaction_data&)> job) {
future<> compaction_manager::run_custom_job(compaction::table_state& t, sstables::compaction_type type, const char* desc, noncopyable_function<future<>(sstables::compaction_data&)> job) {
if (_state != state::enabled) {
return make_ready_future<>();
}
return perform_task(make_shared<custom_compaction_task>(*this, t, type, desc, std::move(job)));
return perform_task(make_shared<custom_compaction_task>(*this, &t, type, desc, std::move(job)));
}
compaction_manager::compaction_reenabler::compaction_reenabler(compaction_manager& cm, replica::table* t)
compaction_manager::compaction_reenabler::compaction_reenabler(compaction_manager& cm, compaction::table_state& t)
: _cm(cm)
, _table(t)
, _table(&t)
, _compaction_state(cm.get_compaction_state(_table))
, _holder(_compaction_state.gate.hold())
{
@@ -446,7 +462,7 @@ compaction_manager::compaction_reenabler::~compaction_reenabler() {
cmlog.debug("Reenabling compaction for {}.{}",
_table->schema()->ks_name(), _table->schema()->cf_name());
try {
_cm.submit(_table);
_cm.submit(*_table);
} catch (...) {
cmlog.warn("compaction_reenabler could not reenable compaction for {}.{}: {}",
_table->schema()->ks_name(), _table->schema()->cf_name(), std::current_exception());
@@ -455,14 +471,14 @@ compaction_manager::compaction_reenabler::~compaction_reenabler() {
}
future<compaction_manager::compaction_reenabler>
compaction_manager::stop_and_disable_compaction(replica::table* t) {
compaction_manager::stop_and_disable_compaction(compaction::table_state& t) {
compaction_reenabler cre(*this, t);
co_await stop_ongoing_compactions("user-triggered operation", t);
co_await stop_ongoing_compactions("user-triggered operation", &t);
co_return cre;
}
future<>
compaction_manager::run_with_compaction_disabled(replica::table* t, std::function<future<> ()> func) {
compaction_manager::run_with_compaction_disabled(compaction::table_state& t, std::function<future<> ()> func) {
compaction_reenabler cre = co_await stop_and_disable_compaction(t);
co_await func();
@@ -700,7 +716,7 @@ void compaction_manager::enable() {
std::function<void()> compaction_manager::compaction_submission_callback() {
return [this] () mutable {
for (auto& e: _compaction_state) {
submit(e.first);
submit(*e.first);
}
};
}
@@ -717,7 +733,7 @@ void compaction_manager::postponed_compactions_reevaluation() {
for (auto& t : postponed) {
auto s = t->schema();
cmlog.debug("resubmitting postponed compaction for table {}.{} [{}]", s->ks_name(), s->cf_name(), fmt::ptr(t));
submit(t);
submit(*t);
}
} catch (...) {
_postponed = std::move(postponed);
@@ -731,7 +747,7 @@ void compaction_manager::reevaluate_postponed_compactions() noexcept {
_postponed_reevaluation.signal();
}
void compaction_manager::postpone_compaction_for_table(replica::table* t) {
void compaction_manager::postpone_compaction_for_table(compaction::table_state* t) {
_postponed.insert(t);
}
@@ -756,7 +772,7 @@ future<> compaction_manager::stop_tasks(std::vector<shared_ptr<task>> tasks, sst
});
}
future<> compaction_manager::stop_ongoing_compactions(sstring reason, replica::table* t, std::optional<sstables::compaction_type> type_opt) {
future<> compaction_manager::stop_ongoing_compactions(sstring reason, compaction::table_state* t, std::optional<sstables::compaction_type> type_opt) {
auto ongoing_compactions = get_compactions(t).size();
auto tasks = boost::copy_range<std::vector<shared_ptr<task>>>(_tasks | boost::adaptors::filtered([t, type_opt] (auto& task) {
return (!t || task->compacting_table() == t) && (!type_opt || task->type() == *type_opt);
@@ -821,7 +837,7 @@ void compaction_manager::do_stop() noexcept {
}
}
inline bool compaction_manager::can_proceed(replica::table* t) const {
inline bool compaction_manager::can_proceed(compaction::table_state* t) const {
return (_state == state::enabled) && _compaction_state.contains(t) && !_compaction_state.at(t).compaction_disabled();
}
@@ -868,8 +884,8 @@ future<stop_iteration> compaction_manager::task::maybe_retry(std::exception_ptr
class compaction_manager::regular_compaction_task : public compaction_manager::task {
public:
regular_compaction_task(compaction_manager& mgr, replica::table* t)
: task(mgr, t, sstables::compaction_type::Compaction, "Compaction")
regular_compaction_task(compaction_manager& mgr, compaction::table_state& t)
: task(mgr, &t, sstables::compaction_type::Compaction, "Compaction")
{}
protected:
virtual future<> do_run() override {
@@ -886,16 +902,16 @@ protected:
co_return;
}
replica::table& t = *_compacting_table;
compaction::table_state& t = *_compacting_table;
sstables::compaction_strategy cs = t.get_compaction_strategy();
sstables::compaction_descriptor descriptor = cs.get_sstables_for_compaction(t.as_table_state(), _cm.get_strategy_control(), _cm.get_candidates(t));
sstables::compaction_descriptor descriptor = cs.get_sstables_for_compaction(t, _cm.get_strategy_control(), _cm.get_candidates(t));
int weight = calculate_weight(descriptor);
if (descriptor.sstables.empty() || !can_proceed() || t.is_auto_compaction_disabled_by_user()) {
cmlog.debug("{}: sstables={} can_proceed={} auto_compaction={}", *this, descriptor.sstables.size(), can_proceed(), t.is_auto_compaction_disabled_by_user());
co_return;
}
if (!_cm.can_register_compaction(&t, weight, descriptor.fan_in())) {
if (!_cm.can_register_compaction(t, weight, descriptor.fan_in())) {
cmlog.debug("Refused compaction job ({} sstable(s)) of weight {} for {}.{}, postponing it...",
descriptor.sstables.size(), weight, t.schema()->ks_name(), t.schema()->cf_name());
switch_state(state::postponed);
@@ -947,8 +963,8 @@ protected:
}
};
void compaction_manager::submit(replica::table* t) {
if (_state != state::enabled || t->is_auto_compaction_disabled_by_user()) {
void compaction_manager::submit(compaction::table_state& t) {
if (_state != state::enabled || t.is_auto_compaction_disabled_by_user()) {
return;
}
@@ -960,7 +976,7 @@ void compaction_manager::submit(replica::table* t) {
class compaction_manager::offstrategy_compaction_task : public compaction_manager::task {
bool _performed = false;
public:
offstrategy_compaction_task(compaction_manager& mgr, replica::table* t)
offstrategy_compaction_task(compaction_manager& mgr, compaction::table_state* t)
: task(mgr, t, sstables::compaction_type::Reshape, "Offstrategy compaction")
{}
@@ -980,7 +996,7 @@ public:
// by the fact that off-strategy is serialized across all tables, meaning that the
// actual requirement is the size of the largest table's maintenance set.
replica::table& t = *_compacting_table;
compaction::table_state& t = *_compacting_table;
const auto& maintenance_sstables = t.maintenance_sstable_set();
const auto old_sstables = boost::copy_range<std::vector<sstables::shared_sstable>>(*maintenance_sstables.all());
@@ -1008,7 +1024,7 @@ public:
};
auto input = boost::copy_range<std::unordered_set<sstables::shared_sstable>>(desc->sstables);
auto ret = co_await sstables::compact_sstables(std::move(*desc), cdata, t.as_table_state());
auto ret = co_await sstables::compact_sstables(std::move(*desc), cdata, t);
_performed = true;
// update list of reshape candidates without input but with output added to it
@@ -1036,7 +1052,7 @@ public:
.old_sstables = std::move(old_sstables),
.new_sstables = std::move(reshape_candidates)
};
co_await t.update_sstable_lists_on_off_strategy_completion(std::move(completion_desc));
co_await t.on_compaction_completion(std::move(completion_desc), sstables::offstrategy::yes);
cleanup_new_unused_sstables_on_failure.cancel();
// By marking input sstables for deletion instead, the ones which require view building will stay in the staging
@@ -1063,7 +1079,7 @@ protected:
std::exception_ptr ex;
try {
replica::table& t = *_compacting_table;
compaction::table_state& t = *_compacting_table;
auto maintenance_sstables = t.maintenance_sstable_set().all();
cmlog.info("Starting off-strategy compaction for {}.{}, {} candidates were found",
t.schema()->ks_name(), t.schema()->cf_name(), maintenance_sstables->size());
@@ -1083,11 +1099,11 @@ protected:
}
};
future<bool> compaction_manager::perform_offstrategy(replica::table* t) {
future<bool> compaction_manager::perform_offstrategy(compaction::table_state& t) {
if (_state != state::enabled) {
co_return false;
}
auto task = make_shared<offstrategy_compaction_task>(*this, t);
auto task = make_shared<offstrategy_compaction_task>(*this, &t);
co_await perform_task(task);
co_return task->performed();
}
@@ -1098,7 +1114,7 @@ class compaction_manager::rewrite_sstables_compaction_task : public compaction_m
can_purge_tombstones _can_purge;
public:
rewrite_sstables_compaction_task(compaction_manager& mgr, replica::table* t, sstables::compaction_type_options options,
rewrite_sstables_compaction_task(compaction_manager& mgr, compaction::table_state* t, sstables::compaction_type_options options,
std::vector<sstables::shared_sstable> sstables, compacting_sstable_registration compacting,
can_purge_tombstones can_purge)
: sstables_task(mgr, t, options.type(), sstring(sstables::to_string(options.type())), std::move(sstables))
@@ -1161,7 +1177,7 @@ private:
template<typename TaskType, typename... Args>
requires std::derived_from<TaskType, compaction_manager::task>
future<> compaction_manager::perform_task_on_all_files(replica::table* t, sstables::compaction_type_options options, get_candidates_func get_func, Args... args) {
future<> compaction_manager::perform_task_on_all_files(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func get_func, Args... args) {
if (_state != state::enabled) {
co_return;
}
@@ -1185,16 +1201,16 @@ future<> compaction_manager::perform_task_on_all_files(replica::table* t, sstabl
return a->data_size() > b->data_size();
});
});
co_await perform_task(seastar::make_shared<TaskType>(*this, t, std::move(options), std::move(sstables), std::move(compacting), std::forward<Args>(args)...));
co_await perform_task(seastar::make_shared<TaskType>(*this, &t, std::move(options), std::move(sstables), std::move(compacting), std::forward<Args>(args)...));
}
future<> compaction_manager::rewrite_sstables(replica::table* t, sstables::compaction_type_options options, get_candidates_func get_func, can_purge_tombstones can_purge) {
future<> compaction_manager::rewrite_sstables(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func get_func, can_purge_tombstones can_purge) {
return perform_task_on_all_files<rewrite_sstables_compaction_task>(t, std::move(options), std::move(get_func), can_purge);
}
class compaction_manager::validate_sstables_compaction_task : public compaction_manager::sstables_task {
public:
validate_sstables_compaction_task(compaction_manager& mgr, replica::table* t, std::vector<sstables::shared_sstable> sstables)
validate_sstables_compaction_task(compaction_manager& mgr, compaction::table_state* t, std::vector<sstables::shared_sstable> sstables)
: sstables_task(mgr, t, sstables::compaction_type::Scrub, "Scrub compaction in validate mode", std::move(sstables))
{}
@@ -1220,7 +1236,7 @@ private:
sstables::compaction_descriptor::default_max_sstable_bytes,
sst->run_identifier(),
sstables::compaction_type_options::make_scrub(sstables::compaction_type_options::scrub::mode::validate));
co_await sstables::compact_sstables(std::move(desc), _compaction_data, _compacting_table->as_table_state());
co_await sstables::compact_sstables(std::move(desc), _compaction_data, *_compacting_table);
} catch (sstables::compaction_stopped_exception&) {
// ignore, will be handled by can_proceed()
} catch (storage_io_error& e) {
@@ -1238,13 +1254,20 @@ private:
}
};
future<> compaction_manager::perform_sstable_scrub_validate_mode(replica::table* t) {
// Collects every sstable of table state `t` into a single vector: first the
// contents of the main sstable set, then the contents of the maintenance set.
// No filtering is applied (e.g. sstables that are currently being compacted
// are not excluded here).
static std::vector<sstables::shared_sstable> get_all_sstables(compaction::table_state& t) {
// Start with a copy of the main set...
auto s = boost::copy_range<std::vector<sstables::shared_sstable>>(*t.main_sstable_set().all());
// ...and append the maintenance set after it.
auto maintenance_set = t.maintenance_sstable_set().all();
s.insert(s.end(), maintenance_set->begin(), maintenance_set->end());
return s;
}
future<> compaction_manager::perform_sstable_scrub_validate_mode(compaction::table_state& t) {
if (_state != state::enabled) {
return make_ready_future<>();
}
// All sstables must be included, even the ones being compacted, such that everything in table is validated.
auto all_sstables = boost::copy_range<std::vector<sstables::shared_sstable>>(*t->get_sstables());
return perform_task(seastar::make_shared<validate_sstables_compaction_task>(*this, t, std::move(all_sstables)));
auto all_sstables = get_all_sstables(t);
return perform_task(seastar::make_shared<validate_sstables_compaction_task>(*this, &t, std::move(all_sstables)));
}
class compaction_manager::cleanup_sstables_compaction_task : public compaction_manager::task {
@@ -1252,12 +1275,12 @@ class compaction_manager::cleanup_sstables_compaction_task : public compaction_m
compacting_sstable_registration _compacting;
std::vector<sstables::compaction_descriptor> _pending_cleanup_jobs;
public:
cleanup_sstables_compaction_task(compaction_manager& mgr, replica::table* t, sstables::compaction_type_options options,
cleanup_sstables_compaction_task(compaction_manager& mgr, compaction::table_state* t, sstables::compaction_type_options options,
std::vector<sstables::shared_sstable> candidates, compacting_sstable_registration compacting)
: task(mgr, t, options.type(), sstring(sstables::to_string(options.type())))
, _cleanup_options(std::move(options))
, _compacting(std::move(compacting))
, _pending_cleanup_jobs(t->get_compaction_strategy().get_cleanup_compaction_jobs(t->as_table_state(), std::move(candidates)))
, _pending_cleanup_jobs(t->get_compaction_strategy().get_cleanup_compaction_jobs(*t, std::move(candidates)))
{
// Cleanup is made more resilient under disk space pressure, by cleaning up smaller jobs first, so larger jobs
// will have more space available released by previous jobs.
@@ -1339,23 +1362,23 @@ bool needs_cleanup(const sstables::shared_sstable& sst,
return true;
}
future<> compaction_manager::perform_cleanup(replica::database& db, replica::table* t) {
auto check_for_cleanup = [this, t] {
return boost::algorithm::any_of(_tasks, [t] (auto& task) {
return task->compacting_table() == t && task->type() == sstables::compaction_type::Cleanup;
future<> compaction_manager::perform_cleanup(replica::database& db, compaction::table_state& t) {
auto check_for_cleanup = [this, &t] {
return boost::algorithm::any_of(_tasks, [&t] (auto& task) {
return task->compacting_table() == &t && task->type() == sstables::compaction_type::Cleanup;
});
};
if (check_for_cleanup()) {
throw std::runtime_error(format("cleanup request failed: there is an ongoing cleanup on {}.{}",
t->schema()->ks_name(), t->schema()->cf_name()));
t.schema()->ks_name(), t.schema()->cf_name()));
}
auto sorted_owned_ranges = db.get_keyspace_local_ranges(t->schema()->ks_name());
auto get_sstables = [this, &db, t, sorted_owned_ranges] () -> future<std::vector<sstables::shared_sstable>> {
return seastar::async([this, &db, t, sorted_owned_ranges = std::move(sorted_owned_ranges)] {
auto schema = t->schema();
auto sorted_owned_ranges = db.get_keyspace_local_ranges(t.schema()->ks_name());
auto get_sstables = [this, &db, &t, sorted_owned_ranges] () -> future<std::vector<sstables::shared_sstable>> {
return seastar::async([this, &db, &t, sorted_owned_ranges = std::move(sorted_owned_ranges)] {
auto schema = t.schema();
auto sstables = std::vector<sstables::shared_sstable>{};
const auto candidates = get_candidates(*t);
const auto candidates = get_candidates(t);
std::copy_if(candidates.begin(), candidates.end(), std::back_inserter(sstables), [&sorted_owned_ranges, schema] (const sstables::shared_sstable& sst) {
seastar::thread::maybe_yield();
return sorted_owned_ranges.empty() || needs_cleanup(sst, sorted_owned_ranges, schema);
@@ -1369,13 +1392,13 @@ future<> compaction_manager::perform_cleanup(replica::database& db, replica::tab
}
// Submit a table to be upgraded and wait for its termination.
future<> compaction_manager::perform_sstable_upgrade(replica::database& db, replica::table* t, bool exclude_current_version) {
auto get_sstables = [this, &db, t, exclude_current_version] {
future<> compaction_manager::perform_sstable_upgrade(replica::database& db, compaction::table_state& t, bool exclude_current_version) {
auto get_sstables = [this, &db, &t, exclude_current_version] {
std::vector<sstables::shared_sstable> tables;
auto last_version = t->get_sstables_manager().get_highest_supported_format();
auto last_version = t.get_sstables_manager().get_highest_supported_format();
for (auto& sst : get_candidates(*t)) {
for (auto& sst : get_candidates(t)) {
// if we are a "normal" upgrade, we only care about
// tables with older versions, but potentially
// we are to actually rewrite everything. (-a)
@@ -1393,18 +1416,18 @@ future<> compaction_manager::perform_sstable_upgrade(replica::database& db, repl
// Note that we potentially could be doing multiple
// upgrades here in parallel, but that is really the user's
// problem.
return rewrite_sstables(t, sstables::compaction_type_options::make_upgrade(db.get_keyspace_local_ranges(t->schema()->ks_name())), std::move(get_sstables));
return rewrite_sstables(t, sstables::compaction_type_options::make_upgrade(db.get_keyspace_local_ranges(t.schema()->ks_name())), std::move(get_sstables));
}
// Submit a table to be scrubbed and wait for its termination.
future<> compaction_manager::perform_sstable_scrub(replica::table* t, sstables::compaction_type_options::scrub opts) {
future<> compaction_manager::perform_sstable_scrub(compaction::table_state& t, sstables::compaction_type_options::scrub opts) {
auto scrub_mode = opts.operation_mode;
if (scrub_mode == sstables::compaction_type_options::scrub::mode::validate) {
return perform_sstable_scrub_validate_mode(t);
}
return rewrite_sstables(t, sstables::compaction_type_options::make_scrub(scrub_mode), [this, t, opts] {
auto all_sstables = t->get_sstable_set().all();
std::vector<sstables::shared_sstable> sstables = boost::copy_range<std::vector<sstables::shared_sstable>>(*all_sstables
return rewrite_sstables(t, sstables::compaction_type_options::make_scrub(scrub_mode), [this, &t, opts] {
auto all_sstables = get_all_sstables(t);
std::vector<sstables::shared_sstable> sstables = boost::copy_range<std::vector<sstables::shared_sstable>>(all_sstables
| boost::adaptors::filtered([&opts] (const sstables::shared_sstable& sst) {
if (sst->requires_view_building()) {
return false;
@@ -1422,16 +1445,16 @@ future<> compaction_manager::perform_sstable_scrub(replica::table* t, sstables::
}, can_purge_tombstones::no);
}
void compaction_manager::add(replica::table* t) {
auto [_, inserted] = _compaction_state.insert({t, compaction_state{}});
void compaction_manager::add(compaction::table_state& t) {
auto [_, inserted] = _compaction_state.insert({&t, compaction_state{}});
if (!inserted) {
auto s = t->schema();
on_internal_error(cmlog, format("compaction_state for table {}.{} [{}] already exists", s->ks_name(), s->cf_name(), fmt::ptr(t)));
auto s = t.schema();
on_internal_error(cmlog, format("compaction_state for table {}.{} [{}] already exists", s->ks_name(), s->cf_name(), fmt::ptr(&t)));
}
}
future<> compaction_manager::remove(replica::table* t) {
auto handle = _compaction_state.extract(t);
future<> compaction_manager::remove(compaction::table_state& t) {
auto handle = _compaction_state.extract(&t);
if (!handle.empty()) {
auto& c_state = handle.mapped();
@@ -1439,10 +1462,10 @@ future<> compaction_manager::remove(replica::table* t) {
// We need to guarantee that a task being stopped will not retry to compact
// a table being removed.
// The requirement above is provided by stop_ongoing_compactions().
_postponed.erase(t);
_postponed.erase(&t);
// Wait for the termination of an ongoing compaction on table T, if any.
co_await stop_ongoing_compactions("table removal", t);
co_await stop_ongoing_compactions("table removal", &t);
// Wait for all functions running under gate to terminate.
co_await c_state.gate.close();
@@ -1451,7 +1474,7 @@ future<> compaction_manager::remove(replica::table* t) {
auto found = false;
sstring msg;
for (auto& task : _tasks) {
if (task->compacting_table() == t) {
if (task->compacting_table() == &t) {
if (!msg.empty()) {
msg += "\n";
}
@@ -1465,7 +1488,7 @@ future<> compaction_manager::remove(replica::table* t) {
#endif
}
const std::vector<sstables::compaction_info> compaction_manager::get_compactions(replica::table* t) const {
const std::vector<sstables::compaction_info> compaction_manager::get_compactions(compaction::table_state* t) const {
auto to_info = [] (const shared_ptr<task>& task) {
sstables::compaction_info ret;
ret.compaction_uuid = task->compaction_data().compaction_uuid;
@@ -1482,7 +1505,17 @@ const std::vector<sstables::compaction_info> compaction_manager::get_compactions
}) | boost::adaptors::transformed(to_info));
}
future<> compaction_manager::stop_compaction(sstring type, replica::table* table) {
// Returns true if at least one tracked task is compacting table T and
// reports compaction_running().
bool compaction_manager::has_table_ongoing_compaction(const compaction::table_state& t) const {
    return std::any_of(_tasks.begin(), _tasks.end(), [&t] (const shared_ptr<task>& task) {
        // Tasks are matched to a table by the address of its table_state.
        return task->compacting_table() == &t && task->compaction_running();
    });
}
// Returns true if T has an entry in _compaction_state and that entry
// reports compaction_disabled(). Returns false when T has no entry.
bool compaction_manager::compaction_disabled(compaction::table_state& t) const {
    // One lookup instead of contains() followed by at().
    const auto it = _compaction_state.find(&t);
    return it != _compaction_state.end() && it->second.compaction_disabled();
}
future<> compaction_manager::stop_compaction(sstring type, compaction::table_state* table) {
sstables::compaction_type target_type;
try {
target_type = sstables::to_compaction_type(type);
@@ -1501,10 +1534,10 @@ future<> compaction_manager::stop_compaction(sstring type, replica::table* table
return stop_ongoing_compactions("user request", table, target_type);
}
void compaction_manager::propagate_replacement(replica::table* t,
void compaction_manager::propagate_replacement(compaction::table_state& t,
const std::vector<sstables::shared_sstable>& removed, const std::vector<sstables::shared_sstable>& added) {
for (auto& task : _tasks) {
if (task->compacting_table() == t && task->compaction_running()) {
if (task->compacting_table() == &t && task->compaction_running()) {
task->compaction_data().pending_replacements.push_back({ removed, added });
}
}

View File

@@ -34,10 +34,6 @@
#include "seastarx.hh"
#include "sstables/exceptions.hh"
namespace replica {
class table;
}
class compacting_sstable_registration;
// Compaction manager provides facilities to submit and track compaction jobs on
@@ -98,7 +94,7 @@ public:
protected:
compaction_manager& _cm;
replica::table* _compacting_table = nullptr;
compaction::table_state* _compacting_table = nullptr;
compaction_state& _compaction_state;
sstables::compaction_data _compaction_data;
state _state = state::none;
@@ -112,14 +108,7 @@ public:
sstring _description;
public:
explicit task(compaction_manager& mgr, replica::table* t, sstables::compaction_type type, sstring desc)
: _cm(mgr)
, _compacting_table(t)
, _compaction_state(_cm.get_compaction_state(t))
, _type(type)
, _gate_holder(_compaction_state.gate.hold())
, _description(std::move(desc))
{}
explicit task(compaction_manager& mgr, compaction::table_state* t, sstables::compaction_type type, sstring desc);
task(task&&) = delete;
task(const task&) = delete;
@@ -153,14 +142,14 @@ public:
can_purge_tombstones can_purge = can_purge_tombstones::yes);
future<sstables::compaction_result> compact_sstables(sstables::compaction_descriptor descriptor, sstables::compaction_data& cdata, release_exhausted_func_t release_exhausted,
can_purge_tombstones can_purge = can_purge_tombstones::yes);
future<> update_history(replica::table& t, const sstables::compaction_result& res, const sstables::compaction_data& cdata);
future<> update_history(compaction::table_state& t, const sstables::compaction_result& res, const sstables::compaction_data& cdata);
// Tells whether a compaction of type CT should update the compaction history:
// true only for sstables::compaction_type::Compaction.
bool should_update_history(sstables::compaction_type ct) {
    const bool is_compaction_type = (ct == sstables::compaction_type::Compaction);
    return is_compaction_type;
}
public:
future<> run() noexcept;
const replica::table* compacting_table() const noexcept {
// Returns the table_state pointer held in _compacting_table.
const compaction::table_state* compacting_table() const noexcept {
return _compacting_table;
}
@@ -214,7 +203,7 @@ public:
sstables::shared_sstable consume_sstable();
public:
explicit sstables_task(compaction_manager& mgr, replica::table* t, sstables::compaction_type compaction_type, sstring desc, std::vector<sstables::shared_sstable> sstables)
explicit sstables_task(compaction_manager& mgr, compaction::table_state* t, sstables::compaction_type compaction_type, sstring desc, std::vector<sstables::shared_sstable> sstables)
: task(mgr, t, compaction_type, std::move(desc))
{
set_sstables(std::move(sstables));
@@ -264,12 +253,12 @@ private:
future<> _waiting_reevalution = make_ready_future<>();
condition_variable _postponed_reevaluation;
// tables that wait for compaction but had their submission postponed due to ongoing compaction.
std::unordered_set<replica::table*> _postponed;
std::unordered_set<compaction::table_state*> _postponed;
// tracks taken weights of ongoing compactions, only one compaction per weight is allowed.
// weight is value assigned to a compaction job that is log base N of total size of all input sstables.
std::unordered_set<int> _weight_tracker;
std::unordered_map<replica::table*, compaction_state> _compaction_state;
std::unordered_map<compaction::table_state*, compaction_state> _compaction_state;
// Purpose is to serialize all maintenance (non regular) compaction activity to reduce aggressiveness and space requirement.
// If the operation must be serialized with regular, then the per-table write lock must be taken.
@@ -308,7 +297,7 @@ private:
unsigned current_compaction_fan_in_threshold() const;
// Return true if compaction can be initiated
bool can_register_compaction(replica::table* t, int weight, unsigned fan_in) const;
bool can_register_compaction(compaction::table_state& t, int weight, unsigned fan_in) const;
// Register weight for a table. Do that only if can_register_compaction()
// returned true.
void register_weight(int weight);
@@ -316,7 +305,7 @@ private:
void deregister_weight(int weight);
// Get candidates for compaction strategy, which are all sstables but the ones being compacted.
std::vector<sstables::shared_sstable> get_candidates(const replica::table& t);
std::vector<sstables::shared_sstable> get_candidates(compaction::table_state& t);
template <typename Iterator, typename Sentinel>
requires std::same_as<Sentinel, Iterator> || std::sentinel_for<Sentinel, Iterator>
@@ -328,19 +317,19 @@ private:
// gets the table's compaction state
// throws std::out_of_range exception if not found.
compaction_state& get_compaction_state(replica::table* t);
compaction_state& get_compaction_state(compaction::table_state* t);
// Return true if compaction manager is enabled and
// table still exists and compaction is not disabled for the table.
inline bool can_proceed(replica::table* t) const;
inline bool can_proceed(compaction::table_state* t) const;
void postponed_compactions_reevaluation();
void reevaluate_postponed_compactions() noexcept;
// Postpone compaction for a table that couldn't be executed due to ongoing
// similar-sized compaction.
void postpone_compaction_for_table(replica::table* t);
void postpone_compaction_for_table(compaction::table_state* t);
future<> perform_sstable_scrub_validate_mode(replica::table* t);
future<> perform_sstable_scrub_validate_mode(compaction::table_state& t);
using get_candidates_func = std::function<future<std::vector<sstables::shared_sstable>>()>;
@@ -348,16 +337,16 @@ private:
// by retrieving set of candidates only after all compactions for table T were stopped, if any.
template<typename TaskType, typename... Args>
requires std::derived_from<TaskType, task>
future<> perform_task_on_all_files(replica::table* t, sstables::compaction_type_options options, get_candidates_func, Args... args);
future<> perform_task_on_all_files(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func, Args... args);
future<> rewrite_sstables(replica::table* t, sstables::compaction_type_options options, get_candidates_func, can_purge_tombstones can_purge = can_purge_tombstones::yes);
future<> rewrite_sstables(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func, can_purge_tombstones can_purge = can_purge_tombstones::yes);
// Stop all fibers, without waiting. Safe to be called multiple times.
void do_stop() noexcept;
future<> really_do_stop();
// Propagate replacement of sstables to all ongoing compaction of a given table
void propagate_replacement(replica::table* t, const std::vector<sstables::shared_sstable>& removed, const std::vector<sstables::shared_sstable>& added);
void propagate_replacement(compaction::table_state& t, const std::vector<sstables::shared_sstable>& removed, const std::vector<sstables::shared_sstable>& added);
// This constructor is supposed to be used only for testing, so let's be more explicit
// about invoking it. Ref #10146
@@ -384,11 +373,11 @@ public:
future<> drain();
// Submit a table to be compacted.
void submit(replica::table* t);
void submit(compaction::table_state& t);
// Submit a table to be off-strategy compacted.
// Returns true iff off-strategy compaction was required and performed.
future<bool> perform_offstrategy(replica::table* t);
future<bool> perform_offstrategy(compaction::table_state& t);
// Submit a table to be cleaned up and wait for its termination.
//
@@ -397,16 +386,16 @@ public:
// Cleanup is about discarding keys that are no longer relevant for a
// given sstable, e.g. after node loses part of its token range because
// of a newly added node.
future<> perform_cleanup(replica::database& db, replica::table* t);
future<> perform_cleanup(replica::database& db, compaction::table_state& t);
// Submit a table to be upgraded and wait for its termination.
future<> perform_sstable_upgrade(replica::database& db, replica::table* t, bool exclude_current_version);
future<> perform_sstable_upgrade(replica::database& db, compaction::table_state& t, bool exclude_current_version);
// Submit a table to be scrubbed and wait for its termination.
future<> perform_sstable_scrub(replica::table* t, sstables::compaction_type_options::scrub opts);
future<> perform_sstable_scrub(compaction::table_state& t, sstables::compaction_type_options::scrub opts);
// Submit a table for major compaction.
future<> perform_major_compaction(replica::table* t);
future<> perform_major_compaction(compaction::table_state& t);
// Run a custom job for a given table, defined by a function
@@ -416,21 +405,21 @@ public:
// parameter type is the compaction type the operation can most closely be
// associated with, use compaction_type::Compaction, if none apply.
// parameter job is a function that will carry the operation
future<> run_custom_job(replica::table* t, sstables::compaction_type type, const char *desc, noncopyable_function<future<>(sstables::compaction_data&)> job);
future<> run_custom_job(compaction::table_state& s, sstables::compaction_type type, const char *desc, noncopyable_function<future<>(sstables::compaction_data&)> job);
class compaction_reenabler {
compaction_manager& _cm;
replica::table* _table;
compaction::table_state* _table;
compaction_manager::compaction_state& _compaction_state;
gate::holder _holder;
public:
compaction_reenabler(compaction_manager&, replica::table*);
compaction_reenabler(compaction_manager&, compaction::table_state&);
compaction_reenabler(compaction_reenabler&&) noexcept;
~compaction_reenabler();
replica::table* compacting_table() const noexcept {
// Returns the table_state pointer held in _table.
compaction::table_state* compacting_table() const noexcept {
return _table;
}
@@ -441,42 +430,36 @@ public:
// Disable compaction temporarily for a table t.
// Caller should call the compaction_reenabler::reenable
future<compaction_reenabler> stop_and_disable_compaction(replica::table* t);
future<compaction_reenabler> stop_and_disable_compaction(compaction::table_state& t);
// Run a function with compaction temporarily disabled for a table T.
future<> run_with_compaction_disabled(replica::table* t, std::function<future<> ()> func);
future<> run_with_compaction_disabled(compaction::table_state& t, std::function<future<> ()> func);
// Adds a table to the compaction manager.
// Creates a compaction_state structure that can be used for submitting
// compaction jobs of all types.
void add(replica::table* t);
void add(compaction::table_state& t);
// Remove a table from the compaction manager.
// Cancel requests on table and wait for possible ongoing compactions.
future<> remove(replica::table* t);
future<> remove(compaction::table_state& t);
// Read-only access to the manager's _stats member.
const stats& get_stats() const {
return _stats;
}
const std::vector<sstables::compaction_info> get_compactions(replica::table* t = nullptr) const;
const std::vector<sstables::compaction_info> get_compactions(compaction::table_state* t = nullptr) const;
// Returns true if table has an ongoing compaction, running on its behalf
bool has_table_ongoing_compaction(const replica::table* t) const {
return std::any_of(_tasks.begin(), _tasks.end(), [t] (const shared_ptr<task>& task) {
return task->compacting_table() == t && task->compaction_running();
});
};
bool has_table_ongoing_compaction(const compaction::table_state& t) const;
bool compaction_disabled(replica::table* t) const {
return _compaction_state.contains(t) && _compaction_state.at(t).compaction_disabled();
}
bool compaction_disabled(compaction::table_state& t) const;
// Stops ongoing compaction of a given type.
future<> stop_compaction(sstring type, replica::table* table = nullptr);
future<> stop_compaction(sstring type, compaction::table_state* table = nullptr);
// Stops ongoing compaction of a given table and/or compaction_type.
future<> stop_ongoing_compactions(sstring reason, replica::table* t = nullptr, std::optional<sstables::compaction_type> type_opt = {});
future<> stop_ongoing_compactions(sstring reason, compaction::table_state* t = nullptr, std::optional<sstables::compaction_type> type_opt = {});
double backlog() {
return _backlog_manager.backlog();
@@ -497,5 +480,8 @@ public:
bool needs_cleanup(const sstables::shared_sstable& sst, const dht::token_range_vector& owned_ranges, schema_ptr s);
// Return all sstables but those that are off-strategy like the ones in maintenance set and staging dir.
std::vector<sstables::shared_sstable> in_strategy_sstables(compaction::table_state& table_s);
std::ostream& operator<<(std::ostream& os, compaction_manager::task::state s);
std::ostream& operator<<(std::ostream& os, const compaction_manager::task& task);

View File

@@ -11,6 +11,8 @@
#include "schema_fwd.hh"
#include "sstables/sstable_set.hh"
#include "sstables/sstables_manager.hh"
#include "compaction_descriptor.hh"
class reader_permit;
@@ -29,13 +31,18 @@ public:
virtual unsigned min_compaction_threshold() const noexcept = 0;
virtual bool compaction_enforce_min_threshold() const noexcept = 0;
virtual const sstables::sstable_set& main_sstable_set() const = 0;
virtual const sstables::sstable_set& maintenance_sstable_set() const = 0;
virtual std::unordered_set<sstables::shared_sstable> fully_expired_sstables(const std::vector<sstables::shared_sstable>& sstables, gc_clock::time_point compaction_time) const = 0;
virtual const std::vector<sstables::shared_sstable>& compacted_undeleted_sstables() const noexcept = 0;
virtual sstables::compaction_strategy& get_compaction_strategy() const noexcept = 0;
virtual reader_permit make_compaction_reader_permit() const = 0;
virtual sstables::sstables_manager& get_sstables_manager() noexcept = 0;
virtual sstables::shared_sstable make_sstable() const = 0;
virtual sstables::sstable_writer_config configure_writer(sstring origin) const = 0;
virtual api::timestamp_type min_memtable_timestamp() const = 0;
virtual future<> update_compaction_history(utils::UUID compaction_id, sstring ks_name, sstring cf_name, std::chrono::milliseconds ended_at, int64_t bytes_in, int64_t bytes_out) = 0;
virtual future<> on_compaction_completion(sstables::compaction_completion_desc desc, sstables::offstrategy offstrategy) = 0;
virtual bool is_auto_compaction_disabled_by_user() const noexcept = 0;
};
}

View File

@@ -2381,10 +2381,10 @@ future<> database::truncate(const keyspace& ks, column_family& cf, timestamp_fun
std::vector<compaction_manager::compaction_reenabler> cres;
cres.reserve(1 + cf.views().size());
cres.emplace_back(co_await _compaction_manager->stop_and_disable_compaction(&cf));
cres.emplace_back(co_await _compaction_manager->stop_and_disable_compaction(cf.as_table_state()));
co_await coroutine::parallel_for_each(cf.views(), [&, this] (view_ptr v) -> future<> {
auto& vcf = find_column_family(v);
cres.emplace_back(co_await _compaction_manager->stop_and_disable_compaction(&vcf));
cres.emplace_back(co_await _compaction_manager->stop_and_disable_compaction(vcf.as_table_state()));
});
bool did_flush = false;

View File

@@ -592,7 +592,6 @@ private:
// Maps an sstable generation to a shard id: the generation's numeric value
// (sstables::generation_value) modulo smp::count.
static seastar::shard_id calculate_shard_from_sstable_generation(sstables::generation_type sstable_generation) {
    const auto generation = sstables::generation_value(sstable_generation);
    return generation % smp::count;
}
public:
// This will update sstable lists on behalf of off-strategy compaction, where
// input files will be removed from the maintenance set and output files will
// be inserted into the main set.
@@ -902,8 +901,6 @@ public:
lw_shared_ptr<const sstable_list> get_sstables_including_compacted_undeleted() const;
const std::vector<sstables::shared_sstable>& compacted_undeleted_sstables() const;
std::vector<sstables::shared_sstable> select_sstables(const dht::partition_range& range) const;
// Return all sstables but those that are off-strategy like the ones in maintenance set and staging dir.
std::vector<sstables::shared_sstable> in_strategy_sstables() const;
size_t sstables_count() const;
std::vector<uint64_t> sstable_count_per_level() const;
int64_t get_unleveled_sstables() const;

View File

@@ -747,7 +747,7 @@ table::stop() {
return _async_gate.close().then([this] {
return await_pending_ops().finally([this] {
return _memtables->flush().finally([this] {
return _compaction_manager.remove(this).then([this] {
return _compaction_manager.remove(as_table_state()).then([this] {
return _sstable_deletion_gate.close().then([this] {
return get_row_cache().invalidate(row_cache::external_updater([this] {
_main_sstables = _compaction_strategy.make_sstable_set(_schema);
@@ -988,7 +988,7 @@ table::on_compaction_completion(sstables::compaction_completion_desc desc) {
future<>
table::compact_all_sstables() {
co_await flush();
co_await _compaction_manager.perform_major_compaction(this);
co_await _compaction_manager.perform_major_compaction(as_table_state());
}
void table::start_compaction() {
@@ -1011,7 +1011,7 @@ void table::try_trigger_compaction() noexcept {
void table::do_trigger_compaction() {
// But not if we're locked out or stopping
if (!_async_gate.is_closed()) {
_compaction_manager.submit(this);
_compaction_manager.submit(as_table_state());
}
}
@@ -1031,7 +1031,7 @@ future<bool> table::perform_offstrategy_compaction() {
// If the user calls trigger_offstrategy_compaction() to trigger
// off-strategy explicitly, cancel the timeout based automatic trigger.
_off_strategy_trigger.cancel();
return _compaction_manager.perform_offstrategy(this);
return _compaction_manager.perform_offstrategy(as_table_state());
}
void table::set_compaction_strategy(sstables::compaction_strategy_type strategy) {
@@ -1118,14 +1118,6 @@ std::vector<sstables::shared_sstable> table::select_sstables(const dht::partitio
return _sstables->select(range);
}
std::vector<sstables::shared_sstable> table::in_strategy_sstables() const {
auto sstables = _main_sstables->all();
return boost::copy_range<std::vector<sstables::shared_sstable>>(*sstables
| boost::adaptors::filtered([this] (auto& sst) {
return sstables::is_eligible_for_compaction(sst);
}));
}
// Gets the list of all sstables in the column family, including ones that are
// not used for active queries because they have already been compacted, but are
// waiting for delete_atomically() to return.
@@ -1191,7 +1183,7 @@ table::table(schema_ptr schema, config config, db::commitlog* cl, compaction_man
tlogger.warn("Writes disabled, column family no durable.");
}
set_metrics();
_compaction_manager.add(this);
_compaction_manager.add(as_table_state());
update_optimized_twcs_queries_flag();
}
@@ -1521,7 +1513,7 @@ future<> table::clear() {
// NOTE: does not need to be futurized, but might eventually, depending on
// if we implement notifications, whatnot.
future<db::replay_position> table::discard_sstables(db_clock::time_point truncated_at) {
assert(_compaction_manager.compaction_disabled(this));
assert(_compaction_manager.compaction_disabled(as_table_state()));
struct pruner {
column_family& cf;
@@ -2209,7 +2201,7 @@ table::disable_auto_compaction() {
// for new submissions
_compaction_disabled_by_user = true;
return with_gate(_async_gate, [this] {
return _compaction_manager.stop_ongoing_compactions("disable auto-compaction", this, sstables::compaction_type::Compaction);
return _compaction_manager.stop_ongoing_compactions("disable auto-compaction", &as_table_state(), sstables::compaction_type::Compaction);
});
}
@@ -2399,6 +2391,9 @@ public:
// Exposes the set returned by the wrapped table's get_sstable_set() as the main sstable set.
const sstables::sstable_set& main_sstable_set() const override {
return _t.get_sstable_set();
}
// Forwards to the wrapped table's maintenance_sstable_set().
const sstables::sstable_set& maintenance_sstable_set() const override {
return _t.maintenance_sstable_set();
}
// Delegates to sstables::get_fully_expired_sstables(), passing this table_state,
// the candidate sstables and the given time point.
std::unordered_set<sstables::shared_sstable> fully_expired_sstables(const std::vector<sstables::shared_sstable>& sstables, gc_clock::time_point query_time) const override {
return sstables::get_fully_expired_sstables(*this, sstables, query_time);
}
@@ -2411,6 +2406,12 @@ public:
// Obtains a tracking-only permit named "compaction" from the wrapped table's
// compaction concurrency semaphore, for this table's schema, with db::no_timeout.
reader_permit make_compaction_reader_permit() const override {
return _t.compaction_concurrency_semaphore().make_tracking_only_permit(schema().get(), "compaction", db::no_timeout);
}
// Forwards to the wrapped table's get_sstables_manager().
sstables::sstables_manager& get_sstables_manager() noexcept override {
return _t.get_sstables_manager();
}
// Forwards to the wrapped table's make_sstable().
sstables::shared_sstable make_sstable() const override {
return _t.make_sstable();
}
// Builds the writer config through the wrapped table's sstables_manager,
// handing over ORIGIN by move.
sstables::sstable_writer_config configure_writer(sstring origin) const override {
return _t.get_sstables_manager().configure_writer(std::move(origin));
}
@@ -2428,6 +2429,16 @@ public:
return db::system_keyspace::update_compaction_history(compaction_id, ks_name, cf_name, ended_at.count(),
bytes_in, bytes_out, std::unordered_map<int32_t, int64_t>{});
}
// Routes a completed compaction to the wrapped table.
// When OFFSTRATEGY is set, the descriptor goes to
// update_sstable_lists_on_off_strategy_completion() and its future is returned.
// Otherwise the descriptor goes to on_compaction_completion() and a ready
// future is returned.
future<> on_compaction_completion(sstables::compaction_completion_desc desc, sstables::offstrategy offstrategy) override {
    const bool is_offstrategy = static_cast<bool>(offstrategy);
    if (!is_offstrategy) {
        _t.on_compaction_completion(std::move(desc));
        return make_ready_future<>();
    }
    return _t.update_sstable_lists_on_off_strategy_completion(std::move(desc));
}
// Forwards to the wrapped table's is_auto_compaction_disabled_by_user().
bool is_auto_compaction_disabled_by_user() const noexcept override {
return _t.is_auto_compaction_disabled_by_user();
}
};
compaction::table_state& table::as_table_state() const noexcept {

View File

@@ -363,7 +363,7 @@ future<uint64_t> sstable_directory::reshape(compaction_manager& cm, replica::tab
desc.creator = creator;
return cm.run_custom_job(&table, compaction_type::Reshape, "Reshape compaction", [this, &table, sstlist = std::move(sstlist), desc = std::move(desc)] (sstables::compaction_data& info) mutable {
return cm.run_custom_job(table.as_table_state(), compaction_type::Reshape, "Reshape compaction", [this, &table, sstlist = std::move(sstlist), desc = std::move(desc)] (sstables::compaction_data& info) mutable {
return sstables::compact_sstables(std::move(desc), info, table.as_table_state()).then([this, sstlist = std::move(sstlist)] (sstables::compaction_result result) mutable {
return remove_input_sstables_from_reshaping(std::move(sstlist)).then([this, new_sstables = std::move(result.new_sstables)] () mutable {
return collect_output_sstables_from_reshaping(std::move(new_sstables));
@@ -418,7 +418,7 @@ sstable_directory::reshard(sstable_info_vector shared_info, compaction_manager&
// parallel_for_each so the statistics about pending jobs are updated to reflect all
// jobs. But only one will run in parallel at a time
return parallel_for_each(buckets, [this, iop, &cm, &table, creator = std::move(creator)] (std::vector<sstables::shared_sstable>& sstlist) mutable {
return cm.run_custom_job(&table, compaction_type::Reshard, "Reshard compaction", [this, iop, &cm, &table, creator, &sstlist] (sstables::compaction_data& info) {
return cm.run_custom_job(table.as_table_state(), compaction_type::Reshard, "Reshard compaction", [this, iop, &cm, &table, creator, &sstlist] (sstables::compaction_data& info) {
sstables::compaction_descriptor desc(sstlist, iop);
desc.options = sstables::compaction_type_options::make_reshard();
desc.creator = std::move(creator);

View File

@@ -117,7 +117,7 @@ SEASTAR_THREAD_TEST_CASE(test_large_data) {
flush(e);
e.db().invoke_on_all([] (replica::database& dbi) {
return parallel_for_each(dbi.get_column_families(), [&dbi] (auto& table) {
return dbi.get_compaction_manager().perform_major_compaction(&*table.second);
return dbi.get_compaction_manager().perform_major_compaction((table.second)->as_table_state());
});
}).get();

View File

@@ -1025,7 +1025,7 @@ SEASTAR_TEST_CASE(upgrade_sstables) {
auto& cm = db.get_compaction_manager();
return do_for_each(db.get_column_families(), [&] (std::pair<utils::UUID, lw_shared_ptr<replica::column_family>> t) {
constexpr bool exclude_current_version = false;
return cm.perform_sstable_upgrade(db, t.second.get(), exclude_current_version);
return cm.perform_sstable_upgrade(db, t.second->as_table_state(), exclude_current_version);
});
}).get();
});
@@ -1051,7 +1051,7 @@ SEASTAR_TEST_CASE(populate_from_quarantine_works) {
for (auto i = 0; i < smp::count && !found; i++) {
found = co_await db.invoke_on((shard + i) % smp::count, [] (replica::database& db) -> future<bool> {
auto& cf = db.find_column_family("ks", "cf");
auto sstables = cf.in_strategy_sstables();
auto sstables = in_strategy_sstables(cf.as_table_state());
if (sstables.empty()) {
co_return false;
}
@@ -1098,7 +1098,7 @@ SEASTAR_TEST_CASE(snapshot_with_quarantine_works) {
for (auto i = 0; i < smp::count; i++) {
co_await db.invoke_on((shard + i) % smp::count, [&] (replica::database& db) -> future<> {
auto& cf = db.find_column_family("ks", "cf");
auto sstables = cf.in_strategy_sstables();
auto sstables = in_strategy_sstables(cf.as_table_state());
if (sstables.empty()) {
co_return;
}

View File

@@ -142,6 +142,9 @@ public:
// Exposes the set returned by the test table's get_sstable_set() as the main sstable set.
const sstables::sstable_set& main_sstable_set() const override {
return _t->get_sstable_set();
}
// Forwards to the test table's maintenance_sstable_set().
const sstables::sstable_set& maintenance_sstable_set() const override {
return _t->maintenance_sstable_set();
}
// Delegates to sstables::get_fully_expired_sstables(). Note that it passes the
// test table's own as_table_state(), not this wrapper.
std::unordered_set<sstables::shared_sstable> fully_expired_sstables(const std::vector<sstables::shared_sstable>& sstables, gc_clock::time_point query_time) const override {
return sstables::get_fully_expired_sstables(_t->as_table_state(), sstables, query_time);
}
@@ -154,6 +157,12 @@ public:
// Takes the permit from the test environment's make_reader_permit().
reader_permit make_compaction_reader_permit() const override {
return _env.make_reader_permit();
}
// Returns the test environment's manager(), not one obtained from the table.
sstables::sstables_manager& get_sstables_manager() noexcept override {
return _env.manager();
}
// Forwards to the test table's make_sstable().
sstables::shared_sstable make_sstable() const override {
return _t->make_sstable();
}
// Builds the writer config through the test environment's manager(),
// handing over ORIGIN by move.
sstables::sstable_writer_config configure_writer(sstring origin) const override {
return _env.manager().configure_writer(std::move(origin));
}
@@ -164,6 +173,12 @@ public:
// Test stub: ignores all arguments and returns an already-resolved future.
future<> update_compaction_history(utils::UUID compaction_id, sstring ks_name, sstring cf_name, std::chrono::milliseconds ended_at, int64_t bytes_in, int64_t bytes_out) override {
return make_ready_future<>();
}
// Forwards the descriptor and the offstrategy flag to on_compaction_completion()
// of the test table's own as_table_state().
future<> on_compaction_completion(sstables::compaction_completion_desc desc, sstables::offstrategy offstrategy) override {
return _t->as_table_state().on_compaction_completion(std::move(desc), offstrategy);
}
// Test stub: always reports false.
bool is_auto_compaction_disabled_by_user() const noexcept override {
return false;
}
};
static std::unique_ptr<table_state> make_table_state_for_test(column_family_for_tests& t, test_env& env) {
@@ -2283,8 +2298,8 @@ SEASTAR_TEST_CASE(sstable_scrub_validate_mode_test) {
table->add_sstable_and_update_cache(sst).get();
BOOST_REQUIRE(table->in_strategy_sstables().size() == 1);
BOOST_REQUIRE(table->in_strategy_sstables().front() == sst);
BOOST_REQUIRE(in_strategy_sstables(table->as_table_state()).size() == 1);
BOOST_REQUIRE(in_strategy_sstables(table->as_table_state()).front() == sst);
auto verify_fragments = [&] (sstables::shared_sstable sst, const std::vector<mutation_fragment_v2>& mfs) {
auto r = assert_that(sst->as_mutation_source().make_reader_v2(schema, env.make_reader_permit()));
@@ -2306,10 +2321,10 @@ SEASTAR_TEST_CASE(sstable_scrub_validate_mode_test) {
sstables::compaction_type_options::scrub opts = {
.operation_mode = sstables::compaction_type_options::scrub::mode::validate,
};
compaction_manager.perform_sstable_scrub(table.get(), opts).get();
compaction_manager.perform_sstable_scrub(table->as_table_state(), opts).get();
BOOST_REQUIRE(sst->is_quarantined());
BOOST_REQUIRE(table->in_strategy_sstables().empty());
BOOST_REQUIRE(in_strategy_sstables(table->as_table_state()).empty());
verify_fragments(sst, corrupt_fragments);
});
}, test_cfg);
@@ -2482,8 +2497,8 @@ SEASTAR_TEST_CASE(sstable_scrub_skip_mode_test) {
table->add_sstable_and_update_cache(sst).get();
BOOST_REQUIRE(table->in_strategy_sstables().size() == 1);
BOOST_REQUIRE(table->in_strategy_sstables().front() == sst);
BOOST_REQUIRE(in_strategy_sstables(table->as_table_state()).size() == 1);
BOOST_REQUIRE(in_strategy_sstables(table->as_table_state()).front() == sst);
auto verify_fragments = [&] (sstables::shared_sstable sst, const std::vector<mutation_fragment_v2>& mfs) {
auto r = assert_that(sst->as_mutation_source().make_reader_v2(schema, permit));
@@ -2504,20 +2519,20 @@ SEASTAR_TEST_CASE(sstable_scrub_skip_mode_test) {
// We expect the scrub with mode=scrub::mode::abort to stop on the first invalid fragment.
sstables::compaction_type_options::scrub opts = {};
opts.operation_mode = sstables::compaction_type_options::scrub::mode::abort;
compaction_manager.perform_sstable_scrub(table.get(), opts).get();
compaction_manager.perform_sstable_scrub(table->as_table_state(), opts).get();
BOOST_REQUIRE(table->in_strategy_sstables().size() == 1);
BOOST_REQUIRE(in_strategy_sstables(table->as_table_state()).size() == 1);
verify_fragments(sst, corrupt_fragments);
testlog.info("Scrub in skip mode");
// We expect the scrub with mode=scrub::mode::skip to get rid of all invalid data.
opts.operation_mode = sstables::compaction_type_options::scrub::mode::skip;
compaction_manager.perform_sstable_scrub(table.get(), opts).get();
compaction_manager.perform_sstable_scrub(table->as_table_state(), opts).get();
BOOST_REQUIRE(table->in_strategy_sstables().size() == 1);
BOOST_REQUIRE(table->in_strategy_sstables().front() != sst);
verify_fragments(table->in_strategy_sstables().front(), scrubbed_fragments);
BOOST_REQUIRE(in_strategy_sstables(table->as_table_state()).size() == 1);
BOOST_REQUIRE(in_strategy_sstables(table->as_table_state()).front() != sst);
verify_fragments(in_strategy_sstables(table->as_table_state()).front(), scrubbed_fragments);
});
}, test_cfg);
}
@@ -2579,8 +2594,8 @@ SEASTAR_TEST_CASE(sstable_scrub_segregate_mode_test) {
table->add_sstable_and_update_cache(sst).get();
BOOST_REQUIRE(table->in_strategy_sstables().size() == 1);
BOOST_REQUIRE(table->in_strategy_sstables().front() == sst);
BOOST_REQUIRE(in_strategy_sstables(table->as_table_state()).size() == 1);
BOOST_REQUIRE(in_strategy_sstables(table->as_table_state()).front() == sst);
auto verify_fragments = [&] (sstables::shared_sstable sst, const std::vector<mutation_fragment_v2>& mfs) {
auto r = assert_that(sst->as_mutation_source().make_reader_v2(schema, env.make_reader_permit()));
@@ -2601,19 +2616,19 @@ SEASTAR_TEST_CASE(sstable_scrub_segregate_mode_test) {
// We expect the scrub with mode=scrub::mode::abort to stop on the first invalid fragment.
sstables::compaction_type_options::scrub opts = {};
opts.operation_mode = sstables::compaction_type_options::scrub::mode::abort;
compaction_manager.perform_sstable_scrub(table.get(), opts).get();
compaction_manager.perform_sstable_scrub(table->as_table_state(), opts).get();
BOOST_REQUIRE(table->in_strategy_sstables().size() == 1);
BOOST_REQUIRE(in_strategy_sstables(table->as_table_state()).size() == 1);
verify_fragments(sst, corrupt_fragments);
testlog.info("Scrub in segregate mode");
// We expect the scrub with mode=scrub::mode::segregate to fix all out-of-order data.
opts.operation_mode = sstables::compaction_type_options::scrub::mode::segregate;
compaction_manager.perform_sstable_scrub(table.get(), opts).get();
compaction_manager.perform_sstable_scrub(table->as_table_state(), opts).get();
testlog.info("Scrub resulted in {} sstables", table->in_strategy_sstables().size());
BOOST_REQUIRE(table->in_strategy_sstables().size() > 1);
testlog.info("Scrub resulted in {} sstables", in_strategy_sstables(table->as_table_state()).size());
BOOST_REQUIRE(in_strategy_sstables(table->as_table_state()).size() > 1);
{
auto sst_reader = assert_that(table->as_mutation_source().make_reader_v2(schema, env.make_reader_permit()));
auto mt_reader = scrubbed_mt->as_data_source().make_reader_v2(schema, env.make_reader_permit());
@@ -2691,8 +2706,8 @@ SEASTAR_TEST_CASE(sstable_scrub_quarantine_mode_test) {
table->add_sstable_and_update_cache(sst).get();
BOOST_REQUIRE(table->in_strategy_sstables().size() == 1);
BOOST_REQUIRE(table->in_strategy_sstables().front() == sst);
BOOST_REQUIRE(in_strategy_sstables(table->as_table_state()).size() == 1);
BOOST_REQUIRE(in_strategy_sstables(table->as_table_state()).front() == sst);
auto verify_fragments = [&] (sstables::shared_sstable sst, const std::vector<mutation_fragment_v2>& mfs) {
auto r = assert_that(sst->as_mutation_source().make_reader_v2(schema, env.make_reader_permit()));
@@ -2713,9 +2728,9 @@ SEASTAR_TEST_CASE(sstable_scrub_quarantine_mode_test) {
// We expect the scrub with mode=scrub::mode::validate to quarantine the sstable.
sstables::compaction_type_options::scrub opts = {};
opts.operation_mode = sstables::compaction_type_options::scrub::mode::validate;
compaction_manager.perform_sstable_scrub(table.get(), opts).get();
compaction_manager.perform_sstable_scrub(table->as_table_state(), opts).get();
BOOST_REQUIRE(table->in_strategy_sstables().empty());
BOOST_REQUIRE(in_strategy_sstables(table->as_table_state()).empty());
BOOST_REQUIRE(sst->is_quarantined());
verify_fragments(sst, corrupt_fragments);
@@ -2724,14 +2739,14 @@ SEASTAR_TEST_CASE(sstable_scrub_quarantine_mode_test) {
// We expect the scrub with mode=scrub::mode::segregate to fix all out-of-order data.
opts.operation_mode = sstables::compaction_type_options::scrub::mode::segregate;
opts.quarantine_operation_mode = qmode;
compaction_manager.perform_sstable_scrub(table.get(), opts).get();
compaction_manager.perform_sstable_scrub(table->as_table_state(), opts).get();
switch (qmode) {
case sstables::compaction_type_options::scrub::quarantine_mode::include:
case sstables::compaction_type_options::scrub::quarantine_mode::only:
// The sstable should be found and scrubbed when scrub::quarantine_mode is scrub::quarantine_mode::{include,only}
testlog.info("Scrub resulted in {} sstables", table->in_strategy_sstables().size());
BOOST_REQUIRE(table->in_strategy_sstables().size() > 1);
testlog.info("Scrub resulted in {} sstables", in_strategy_sstables(table->as_table_state()).size());
BOOST_REQUIRE(in_strategy_sstables(table->as_table_state()).size() > 1);
{
auto sst_reader = assert_that(table->as_mutation_source().make_reader_v2(schema, env.make_reader_permit()));
auto mt_reader = scrubbed_mt->as_data_source().make_reader_v2(schema, env.make_reader_permit());
@@ -2745,7 +2760,7 @@ SEASTAR_TEST_CASE(sstable_scrub_quarantine_mode_test) {
break;
case sstables::compaction_type_options::scrub::quarantine_mode::exclude:
// The sstable should not be found when scrub::quarantine_mode is scrub::quarantine_mode::exclude
BOOST_REQUIRE(table->in_strategy_sstables().empty());
BOOST_REQUIRE(in_strategy_sstables(table->as_table_state()).empty());
BOOST_REQUIRE(sst->is_quarantined());
verify_fragments(sst, corrupt_fragments);
break;
@@ -3699,7 +3714,7 @@ SEASTAR_TEST_CASE(autocompaction_control_test) {
cf->start();
auto stop_cf = deferred_stop(*cf);
cf->trigger_compaction();
cm.submit(cf.get());
cm.submit(cf->as_table_state());
BOOST_REQUIRE(cm.get_stats().pending_tasks == 0 && cm.get_stats().active_tasks == 0 && ss.completed_tasks == 0);
// enable auto compaction
cf->enable_auto_compaction();

View File

@@ -840,7 +840,7 @@ void test_commutative_row_deletion(cql_test_env& e, std::function<void()>&& mayb
}});
});
e.local_db().get_compaction_manager().perform_major_compaction(&e.local_db().find_column_family("ks", "vcf")).get();
e.local_db().get_compaction_manager().perform_major_compaction(e.local_db().find_column_family("ks", "vcf").as_table_state()).get();
}
SEASTAR_TEST_CASE(test_commutative_row_deletion_without_flush) {
@@ -1076,7 +1076,7 @@ void test_update_with_column_timestamp_bigger_than_pk(cql_test_env& e, std::func
}});
});
e.local_db().get_compaction_manager().perform_major_compaction(&e.local_db().find_column_family("ks", "vcf")).get();
e.local_db().get_compaction_manager().perform_major_compaction(e.local_db().find_column_family("ks", "vcf").as_table_state()).get();
eventually([&] {
auto msg = e.execute_cql("select * from vcf limit 1").get0();
assert_that(msg).is_rows().with_rows({{

View File

@@ -212,7 +212,7 @@ class compaction_manager::compaction_manager_test_task : public compaction_manag
public:
// Constructs a test task: hands the manager, the table and the fixed
// description "Test compaction" (type sstables::compaction_type::Compaction)
// to the compaction_manager::task base, then stores the run id and takes
// ownership of the job callback.
// NOTE(review): two base-initializer lines follow, one passing `cf` and one
// passing `&cf->as_table_state()`. They look like the removed/added sides of a
// diff hunk whose markers were lost; confirm that only the table_state form
// belongs in the real file.
compaction_manager_test_task(compaction_manager& cm, replica::column_family* cf, utils::UUID run_id, noncopyable_function<future<> (sstables::compaction_data&)> job)
: compaction_manager::task(cm, cf, sstables::compaction_type::Compaction, "Test compaction")
: compaction_manager::task(cm, &cf->as_table_state(), sstables::compaction_type::Compaction, "Test compaction")
, _run_id(run_id)
, _job(std::move(job))
{ }

View File

@@ -377,7 +377,7 @@ public:
future<> run(utils::UUID output_run_id, replica::column_family* cf, noncopyable_function<future<> (sstables::compaction_data&)> job);
// Passes `removed` and `added` for table `t` straight through to
// `_cm.propagate_replacement`.
// NOTE(review): the two calls below differ only in the first argument (`t` vs
// `t->as_table_state()`). They look like the removed/added sides of a diff hunk
// whose markers were lost; confirm that only the table_state call should remain.
void propagate_replacement(replica::table* t, const std::vector<sstables::shared_sstable>& removed, const std::vector<sstables::shared_sstable>& added) {
_cm.propagate_replacement(t, removed, added);
_cm.propagate_replacement(t->as_table_state(), removed, added);
}
private:
sstables::compaction_data& register_compaction(shared_ptr<compaction_manager::task> task);

View File

@@ -1758,7 +1758,7 @@ void populate(const std::vector<dataset*>& datasets, cql_test_env& env, const ta
output_mgr->set_test_param_names({{"flush@ (MiB)", "{:<12}"}}, test_result::stats_names());
db.get_compaction_manager().run_with_compaction_disabled(&cf, [&] {
db.get_compaction_manager().run_with_compaction_disabled(cf.as_table_state(), [&] {
return seastar::async([&] {
auto gen = ds.make_generator(s, cfg);
while (auto mopt = gen()) {
@@ -1864,7 +1864,7 @@ auto make_compaction_disabling_guard(replica::database& db, std::vector<replica:
shared_promise<> pr;
for (auto&& t : tables) {
// FIXME: discarded future.
(void)db.get_compaction_manager().run_with_compaction_disabled(t, [f = shared_future<>(pr.get_shared_future())] {
(void)db.get_compaction_manager().run_with_compaction_disabled(t->as_table_state(), [f = shared_future<>(pr.get_shared_future())] {
return f.get_future();
});
}