logstor: enable/disable compaction per table

add functions to enable or disable compaction for a specific compaction
group or for all compaction groups of a table.
This commit is contained in:
Michael Litvak
2026-02-21 13:43:59 +01:00
parent 21db4f3ed8
commit 489efca47c
5 changed files with 78 additions and 7 deletions

View File

@@ -52,10 +52,18 @@ void logstor::enable_auto_compaction() {
_segment_manager.enable_auto_compaction();
}
void logstor::enable_auto_compaction(table_id tid) {
_segment_manager.enable_auto_compaction(tid);
}
future<> logstor::disable_auto_compaction() {
return _segment_manager.disable_auto_compaction();
}
future<> logstor::disable_auto_compaction(table_id tid) {
return _segment_manager.disable_auto_compaction(tid);
}
future<> logstor::trigger_compaction(bool major) {
return _segment_manager.trigger_compaction(major);
}

View File

@@ -49,7 +49,11 @@ public:
static void free_crypto();
void enable_auto_compaction();
void enable_auto_compaction(table_id);
future<> disable_auto_compaction();
future<> disable_auto_compaction(table_id);
future<> trigger_compaction(bool major = false);
future<> do_barrier();

View File

@@ -579,6 +579,7 @@ private:
struct compaction_group {
segment_descriptor_hist segment_hist;
bool compaction_enabled{true};
};
using compaction_group_map = std::map<group_id, compaction_group>;
@@ -625,11 +626,34 @@ public:
_cfg.compaction_enabled = true;
}
void enable_auto_compaction(table_id table) {
auto lower = _compaction_groups.lower_bound(group_id{table, 0});
for (auto it = lower; it != _compaction_groups.end() && it->first.table == table; ++it) {
it->second.compaction_enabled = true;
}
}
future<> disable_auto_compaction() {
_cfg.compaction_enabled = false;
return _compaction_action.join();
}
future<> disable_auto_compaction(group_id gid) {
auto it = _compaction_groups.find(gid);
if (it != _compaction_groups.end()) {
it->second.compaction_enabled = false;
co_await _compaction_action.join();
}
}
future<> disable_auto_compaction(table_id table) {
auto lower = _compaction_groups.lower_bound(group_id{table, 0});
for (auto it = lower; it != _compaction_groups.end() && it->first.table == table; ++it) {
it->second.compaction_enabled = false;
}
co_await _compaction_action.join();
}
future<> trigger_compaction(bool major) {
if (major) {
return trigger_major_compaction();
@@ -883,12 +907,22 @@ public:
_compaction_mgr.enable_auto_compaction();
}
void enable_auto_compaction(table_id tid) {
logstor_logger.info("Enabling automatic compaction for table {}", tid);
_compaction_mgr.enable_auto_compaction(tid);
}
future<> disable_auto_compaction() {
logstor_logger.info("Disabling automatic compaction");
_cfg.compaction_enabled = false;
co_await _compaction_mgr.disable_auto_compaction();
}
future<> disable_auto_compaction(table_id tid) {
logstor_logger.info("Disabling automatic compaction for table {}", tid);
co_await _compaction_mgr.disable_auto_compaction(tid);
}
future<> trigger_compaction(bool major = false) {
co_await _compaction_mgr.trigger_compaction(major);
}
@@ -1653,9 +1687,11 @@ compaction_manager::select_segments_for_compaction() {
auto& [gid, cg] = *_next_group_for_compaction;
++_next_group_for_compaction;
auto candidates = select_segments_for_compaction(cg);
if (candidates.size() > 0) {
co_return std::make_pair(gid, std::move(candidates));
if (cg.compaction_enabled) {
auto candidates = select_segments_for_compaction(cg);
if (candidates.size() > 0) {
co_return std::make_pair(gid, std::move(candidates));
}
}
if (_next_group_for_compaction == _compaction_groups.end()) {
@@ -2145,9 +2181,19 @@ future<> segment_manager::for_each_record(const std::vector<log_segment_id>& seg
void segment_manager::enable_auto_compaction() {
_impl->enable_auto_compaction();
}
void segment_manager::enable_auto_compaction(table_id tid) {
_impl->enable_auto_compaction(tid);
}
future<> segment_manager::disable_auto_compaction() {
return _impl->disable_auto_compaction();
}
future<> segment_manager::disable_auto_compaction(table_id tid) {
return _impl->disable_auto_compaction(tid);
}
future<> segment_manager::trigger_compaction(bool major) {
return _impl->trigger_compaction(major);
}

View File

@@ -70,7 +70,11 @@ public:
std::function<future<>(log_location, log_record)> callback);
void enable_auto_compaction();
void enable_auto_compaction(table_id);
future<> disable_auto_compaction();
future<> disable_auto_compaction(table_id);
future<> trigger_compaction(bool major = false);
size_t get_segment_size() const noexcept;

View File

@@ -4773,6 +4773,10 @@ table::enable_auto_compaction() {
// see table::disable_auto_compaction() notes.
_compaction_disabled_by_user = false;
trigger_compaction();
if (_logstor) {
_logstor->enable_auto_compaction(_schema->id());
}
}
future<>
@@ -4804,11 +4808,16 @@ table::disable_auto_compaction() {
// - it will break computation of major compaction descriptor
// for new submissions
_compaction_disabled_by_user = true;
return with_gate(_async_gate, [this] {
return parallel_foreach_compaction_group_view([this] (compaction::compaction_group_view& view) {
return _compaction_manager.stop_ongoing_compactions("disable auto-compaction", &view, compaction::compaction_type::Compaction);
});
auto holder = _async_gate.hold();
co_await parallel_foreach_compaction_group_view([this] (compaction::compaction_group_view& view) {
return _compaction_manager.stop_ongoing_compactions("disable auto-compaction", &view, compaction::compaction_type::Compaction);
});
if (_logstor) {
co_await _logstor->disable_auto_compaction(_schema->id());
}
}
void table::set_tombstone_gc_enabled(bool tombstone_gc_enabled) noexcept {