compaction: Add tablet incremental repair support
This patch adds incremental_repair support in compaction. - The sstables are split into repaired and unrepaired sets. - The repaired and unrepaired sets are compacted separately. - The repaired_at from the sstable and sstables_repaired_at from the system.tablets table are used to decide whether an sstable is repaired or not. - Different compaction tasks, e.g., minor, major, scrub, split, are serialized with tablet repair.
This commit is contained in:
@@ -541,6 +541,7 @@ protected:
|
||||
utils::observable<> _stop_request_observable;
|
||||
// optional tombstone_gc_state that is used when gc has to check only the compacting sstables to collect tombstones.
|
||||
std::optional<tombstone_gc_state> _tombstone_gc_state_with_commitlog_check_disabled;
|
||||
int64_t _output_repaired_at = 0;
|
||||
private:
|
||||
// Keeps track of monitors for input sstable.
|
||||
// If _update_backlog_tracker is set to true, monitors are responsible for adjusting backlog as compaction progresses.
|
||||
@@ -623,6 +624,7 @@ protected:
|
||||
}
|
||||
|
||||
void finish_new_sstable(compaction_writer* writer) {
|
||||
writer->writer.set_repaired_at(_output_repaired_at);
|
||||
writer->writer.consume_end_of_stream();
|
||||
writer->sst->open_data().get();
|
||||
_end_size += writer->sst->bytes_on_disk();
|
||||
@@ -800,9 +802,13 @@ private:
|
||||
double sum_of_estimated_droppable_tombstone_ratio = 0;
|
||||
_input_sstable_generations.reserve(_sstables.size());
|
||||
_input_sstables_basic_info.reserve(_sstables.size());
|
||||
int64_t repaired_at = 0;
|
||||
std::vector<int64_t> repaired_at_for_compacted_sstables;
|
||||
for (auto& sst : _sstables) {
|
||||
co_await coroutine::maybe_yield();
|
||||
auto& sst_stats = sst->get_stats_metadata();
|
||||
repaired_at_for_compacted_sstables.push_back(sst_stats.repaired_at);
|
||||
repaired_at = std::max(sst_stats.repaired_at, repaired_at);
|
||||
timestamp_tracker.update(sst_stats.min_timestamp);
|
||||
timestamp_tracker.update(sst_stats.max_timestamp);
|
||||
|
||||
@@ -836,6 +842,10 @@ private:
|
||||
}
|
||||
}
|
||||
log_debug("{} [{}]", report_start_desc(), fmt::join(_sstables | std::views::transform([] (auto sst) { return to_string(sst, true); }), ","));
|
||||
if (repaired_at) {
|
||||
_output_repaired_at = repaired_at;
|
||||
}
|
||||
log_debug("repaired_at_vec={} output_repaired_at={}", repaired_at_for_compacted_sstables, _output_repaired_at);
|
||||
if (ssts->size() < _sstables.size()) {
|
||||
log_debug("{} out of {} input sstables are fully expired sstables that will not be actually compacted",
|
||||
_sstables.size() - ssts->size(), _sstables.size());
|
||||
@@ -1981,6 +1991,8 @@ get_fully_expired_sstables(const compaction_group_view& table_s, const std::vect
|
||||
}
|
||||
|
||||
std::unordered_set<sstables::shared_sstable> candidates;
|
||||
// Note: This contains both repaired and unrepaired sstables which means
|
||||
// compaction consults both repaired and unrepaired sstables for tombstone gc.
|
||||
auto uncompacting_sstables = get_uncompacting_sstables(table_s, compacting);
|
||||
// Get list of uncompacting sstables that overlap the ones being compacted.
|
||||
std::vector<sstables::shared_sstable> overlapping = leveled_manifest::overlapping(*table_s.schema(), compacting, uncompacting_sstables);
|
||||
|
||||
@@ -61,6 +61,7 @@ public:
|
||||
virtual const std::string get_group_id() const noexcept = 0;
|
||||
virtual seastar::condition_variable& get_staging_done_condition() noexcept = 0;
|
||||
virtual dht::token_range get_token_range_after_split(const dht::token& t) const noexcept = 0;
|
||||
virtual int64_t get_sstables_repaired_at() const noexcept = 0;
|
||||
};
|
||||
|
||||
} // namespace compaction
|
||||
|
||||
@@ -753,6 +753,71 @@ compaction_manager::compaction_reenabler::~compaction_reenabler() {
|
||||
}
|
||||
}
|
||||
|
||||
// Waits until the compaction tasks currently registered in _tasks have finished.
//
// When `t` is non-null only tasks whose compacting_table() equals `t` are
// awaited; a null `t` selects every task. The tasks are only awaited here:
// await_tasks() is told they were not stopped, so it does not unlink them.
//
// Any exception is logged at error level and rethrown to the caller.
future<> compaction_manager::await_ongoing_compactions(compaction_group_view* t) {
    auto table_label = t ? t->schema()->ks_name() + "." + t->schema()->cf_name() : "ALL";
    try {
        // Take a strong reference to every matching task up front so the set
        // being awaited is fixed before the first suspension point.
        std::vector<shared_ptr<compaction_task_executor>> pending;
        for (auto& task : _tasks) {
            if (!t || task.compacting_table() == t) {
                pending.push_back(task.shared_from_this());
            }
        }
        auto pending_count = pending.size();
        cmlog.debug("Awaiting ongoing unrepaired compactions table={} tasks={}", table_label, pending_count);
        co_await await_tasks(std::move(pending), /* task_stopped = */ false);
        cmlog.debug("Awaiting ongoing unrepaired compactions table={} tasks={} done", table_label, pending_count);
    } catch (...) {
        cmlog.error("Awaiting ongoing unrepaired compactions table={} failed: {}", table_label, std::current_exception());
        throw;
    }
}
|
||||
|
||||
// Acquires the read side of the incremental_repair_lock kept in the
// compaction state of `t` and returns its holder.
//
// `reason` is used only for debug logging; an empty reason disables the
// start/done log lines. It is taken by reference and read again after the
// suspension point, so the caller's string must stay alive until the returned
// future resolves.
future<seastar::rwlock::holder>
compaction_manager::get_incremental_repair_read_lock(compaction::compaction_group_view& t, const sstring& reason) {
    if (!reason.empty()) {
        cmlog.debug("Get get_incremental_repair_read_lock for {} started", reason);
    }
    auto holder = co_await get_compaction_state(&t).incremental_repair_lock.hold_read_lock();
    if (!reason.empty()) {
        cmlog.debug("Get get_incremental_repair_read_lock for {} done", reason);
    }
    co_return holder;
}
|
||||
|
||||
// Acquires the write side of the incremental_repair_lock kept in the
// compaction state of `t` and returns its holder.
//
// `reason` is used only for debug logging; an empty reason disables the
// start/done log lines. It is taken by reference and read again after the
// suspension point, so the caller's string must stay alive until the returned
// future resolves.
future<seastar::rwlock::holder>
compaction_manager::get_incremental_repair_write_lock(compaction::compaction_group_view& t, const sstring& reason) {
    if (!reason.empty()) {
        cmlog.debug("Get get_incremental_repair_write_lock for {} started", reason);
    }
    auto holder = co_await get_compaction_state(&t).incremental_repair_lock.hold_write_lock();
    if (!reason.empty()) {
        cmlog.debug("Get get_incremental_repair_write_lock for {} done", reason);
    }
    co_return holder;
}
|
||||
|
||||
// Constructs a compaction_reenabler for `t`, waits for the compactions already
// running on `t` to finish, and then hands the reenabler to the caller.
//
// The reenabler is created before the wait begins and is returned only after
// await_ongoing_compactions() has completed.
future<compaction_manager::compaction_reenabler>
compaction_manager::await_and_disable_compaction(compaction_group_view& t) {
    compaction_reenabler reenabler(*this, t);
    co_await await_ongoing_compactions(&t);
    co_return reenabler;
}
|
||||
|
||||
|
||||
// Constructs a compaction_reenabler for `t` and requests the compactions
// running on `t` to stop, without waiting for them to finish.
//
// The tasks returned by do_stop_ongoing_compactions() are intentionally
// dropped: this variant never awaits them. A failure while stopping is logged
// and ignored so that the reenabler is always returned.
compaction_manager::compaction_reenabler
compaction_manager::stop_and_disable_compaction_no_wait(compaction_group_view& t, sstring reason) {
    compaction_reenabler reenabler(*this, t);
    try {
        // An empty type filter selects every compaction type.
        do_stop_ongoing_compactions(std::move(reason), &t, {});
    } catch (...) {
        cmlog.error("Stopping ongoing compactions failed: {}. Ignored", std::current_exception());
    }
    return reenabler;
}
|
||||
|
||||
future<compaction_manager::compaction_reenabler>
|
||||
compaction_manager::stop_and_disable_compaction(sstring reason, compaction_group_view& t) {
|
||||
compaction_reenabler cre(*this, t);
|
||||
@@ -1094,15 +1159,18 @@ void compaction_manager::postpone_compaction_for_table(compaction_group_view* t)
|
||||
_postponed.insert(t);
|
||||
}
|
||||
|
||||
future<> compaction_manager::stop_tasks(std::vector<shared_ptr<compaction_task_executor>> tasks, sstring reason) noexcept {
|
||||
void compaction_manager::stop_tasks(const std::vector<shared_ptr<compaction_task_executor>>& tasks, sstring reason) noexcept {
|
||||
// To prevent compaction from being postponed while tasks are being stopped,
|
||||
// let's stop all tasks before the deferring point below.
|
||||
for (auto& t : tasks) {
|
||||
cmlog.debug("Stopping {}", *t);
|
||||
t->stop_compaction(reason);
|
||||
}
|
||||
co_await coroutine::parallel_for_each(tasks, [] (auto& task) -> future<> {
|
||||
auto unlink_task = deferred_action([task] { task->unlink(); });
|
||||
}
|
||||
|
||||
future<> compaction_manager::await_tasks(std::vector<shared_ptr<compaction_task_executor>> tasks, bool task_stopped) const noexcept {
|
||||
co_await coroutine::parallel_for_each(tasks, [task_stopped] (auto& task) -> future<> {
|
||||
auto unlink_task = deferred_action([task, task_stopped] { if (task_stopped) { task->unlink(); } });
|
||||
try {
|
||||
co_await task->compaction_done();
|
||||
} catch (sstables::compaction_stopped_exception&) {
|
||||
@@ -1110,38 +1178,46 @@ future<> compaction_manager::stop_tasks(std::vector<shared_ptr<compaction_task_e
|
||||
// as it happens with reshard and reshape.
|
||||
} catch (...) {
|
||||
// just log any other errors as the callers have nothing to do with them.
|
||||
cmlog.debug("Stopping {}: task returned error: {}", *task, std::current_exception());
|
||||
cmlog.debug("Awaiting {}: task returned error: {}", *task, std::current_exception());
|
||||
co_return;
|
||||
}
|
||||
cmlog.debug("Stopping {}: done", *task);
|
||||
cmlog.debug("Awaiting {}: done", *task);
|
||||
});
|
||||
}
|
||||
|
||||
// Requests every matching compaction task to stop and returns the tasks that
// were asked to, so the caller can decide whether to await them.
//
// A task matches when `t` is null or equals the task's compacting_table(),
// and `type_opt` is empty or equals the task's compaction_type().
// `reason` is included in the log line and forwarded to stop_tasks().
std::vector<shared_ptr<compaction_task_executor>>
compaction_manager::do_stop_ongoing_compactions(sstring reason, compaction_group_view* t, std::optional<sstables::compaction_type> type_opt) noexcept {
    auto ongoing_count = get_compactions(t).size();

    std::vector<shared_ptr<compaction_task_executor>> selected;
    for (auto& task : _tasks) {
        if (t && !(task.compacting_table() == t)) {
            continue;
        }
        if (type_opt && !(task.compaction_type() == *type_opt)) {
            continue;
        }
        selected.push_back(task.shared_from_this());
    }

    // Stay at debug level when there is nothing to stop.
    logging::log_level level = selected.empty() ? log_level::debug : log_level::info;
    if (cmlog.is_enabled(level)) {
        std::string scope;
        if (t) {
            scope = fmt::format(" for table {}", *t);
        }
        if (type_opt) {
            // Join with "and" only when a table clause precedes the type clause.
            scope += fmt::format(" {} type={}", scope.empty() ? "for" : "and", *type_opt);
        }
        cmlog.log(level, "Stopping {} tasks for {} ongoing compactions{} due to {}", selected.size(), ongoing_count, scope, reason);
    }

    stop_tasks(selected, std::move(reason));
    return selected;
}
|
||||
|
||||
future<> compaction_manager::stop_ongoing_compactions(sstring reason, compaction_group_view* t, std::optional<sstables::compaction_type> type_opt) noexcept {
|
||||
try {
|
||||
auto ongoing_compactions = get_compactions(t).size();
|
||||
auto tasks = _tasks
|
||||
| std::views::filter([t, type_opt] (const auto& task) {
|
||||
return (!t || task.compacting_table() == t) && (!type_opt || task.compaction_type() == *type_opt);
|
||||
})
|
||||
| std::views::transform([] (auto& task) { return task.shared_from_this(); })
|
||||
| std::ranges::to<std::vector<shared_ptr<compaction_task_executor>>>();
|
||||
logging::log_level level = tasks.empty() ? log_level::debug : log_level::info;
|
||||
if (cmlog.is_enabled(level)) {
|
||||
std::string scope = "";
|
||||
if (t) {
|
||||
scope = fmt::format(" for table {}", *t);
|
||||
}
|
||||
if (type_opt) {
|
||||
scope += fmt::format(" {} type={}", scope.size() ? "and" : "for", *type_opt);
|
||||
}
|
||||
cmlog.log(level, "Stopping {} tasks for {} ongoing compactions{} due to {}", tasks.size(), ongoing_compactions, scope, reason);
|
||||
}
|
||||
return stop_tasks(std::move(tasks), std::move(reason));
|
||||
auto tasks = do_stop_ongoing_compactions(std::move(reason), t, type_opt);
|
||||
bool task_stopped = true;
|
||||
co_await await_tasks(std::move(tasks), task_stopped);
|
||||
} catch (...) {
|
||||
cmlog.error("Stopping ongoing compactions failed: {}. Ignored", std::current_exception());
|
||||
}
|
||||
return make_ready_future();
|
||||
co_return;
|
||||
}
|
||||
|
||||
future<> compaction_manager::drain() {
|
||||
@@ -1291,6 +1367,7 @@ protected:
|
||||
co_await coroutine::switch_to(_cm.compaction_sg());
|
||||
|
||||
for (;;) {
|
||||
auto uuid = utils::make_random_uuid();
|
||||
if (!can_proceed()) {
|
||||
co_return std::nullopt;
|
||||
}
|
||||
@@ -1305,6 +1382,11 @@ protected:
|
||||
sstables::compaction_strategy cs = t.get_compaction_strategy();
|
||||
sstables::compaction_descriptor descriptor = co_await cs.get_sstables_for_compaction(t, _cm.get_strategy_control());
|
||||
int weight = calculate_weight(descriptor);
|
||||
cmlog.debug("Started minor compaction sstables={} sstables_reapired_at={} range={} uuid={} compaction_uuid={}",
|
||||
descriptor.sstables, compacting_table()->get_sstables_repaired_at(),
|
||||
compacting_table()->token_range(), uuid, _compaction_data.compaction_uuid);
|
||||
|
||||
auto old_sstables = ::format("{}", descriptor.sstables);
|
||||
|
||||
if (descriptor.sstables.empty() || !can_proceed() || t.is_auto_compaction_disabled_by_user()) {
|
||||
cmlog.debug("{}: sstables={} can_proceed={} auto_compaction={}", *this, descriptor.sstables.size(), can_proceed(), t.is_auto_compaction_disabled_by_user());
|
||||
@@ -1334,6 +1416,8 @@ protected:
|
||||
try {
|
||||
bool should_update_history = this->should_update_history(descriptor.options.type());
|
||||
sstables::compaction_result res = co_await compact_sstables(std::move(descriptor), _compaction_data, on_replace);
|
||||
cmlog.debug("Finished minor compaction old_sstables={} new_sstables={} sstables_reapired_at={} range={} uuid={} compaction_uuid={}",
|
||||
old_sstables, res.new_sstables, compacting_table()->get_sstables_repaired_at(), compacting_table()->token_range(), uuid, _compaction_data.compaction_uuid);
|
||||
finish_compaction();
|
||||
if (should_update_history) {
|
||||
// update_history can take a long time compared to
|
||||
@@ -1860,6 +1944,7 @@ future<compaction_manager::compaction_stats_opt> compaction_manager::perform_sst
|
||||
co_return compaction_stats_opt{};
|
||||
}
|
||||
// All sstables must be included, even the ones being compacted, such that everything in table is validated.
|
||||
// No need to split sstables as repaired or unrepaired. No need to take any compaction and repair locks, since this compaction does not modify the sstable.
|
||||
auto all_sstables = co_await get_all_sstables(t);
|
||||
co_return co_await perform_compaction<validate_sstables_compaction_task_executor>(throw_if_stopping::no, info, &t, info.id, std::move(all_sstables), quarantine_sstables);
|
||||
}
|
||||
@@ -2087,6 +2172,9 @@ future<> compaction_manager::try_perform_cleanup(owned_ranges_ptr sorted_owned_r
|
||||
update_sstable_cleanup_state(t, sst, *sorted_owned_ranges);
|
||||
});
|
||||
};
|
||||
// No need to treat repaired and unrepaired sstables separately here,
|
||||
// since it only inserts or deletes sstables into or from
|
||||
// sstables_requiring_cleanup.
|
||||
co_await update_sstables_cleanup_state(co_await t.main_sstable_set());
|
||||
co_await update_sstables_cleanup_state(co_await t.maintenance_sstable_set());
|
||||
|
||||
@@ -2149,7 +2237,7 @@ future<> compaction_manager::perform_sstable_upgrade(owned_ranges_ptr sorted_own
|
||||
// Note that we potentially could be doing multiple
|
||||
// upgrades here in parallel, but that is really the users
|
||||
// problem.
|
||||
return rewrite_sstables(t, sstables::compaction_type_options::make_upgrade(), std::move(sorted_owned_ranges), std::move(get_sstables), info).discard_result();
|
||||
co_await rewrite_sstables(t, sstables::compaction_type_options::make_upgrade(), std::move(sorted_owned_ranges), std::move(get_sstables), info).discard_result();
|
||||
}
|
||||
|
||||
future<compaction_manager::compaction_stats_opt> compaction_manager::perform_split_compaction(compaction_group_view& t, sstables::compaction_type_options::split opt, tasks::task_info info) {
|
||||
@@ -2190,11 +2278,11 @@ compaction_manager::maybe_split_sstable(sstables::shared_sstable sst, compaction
|
||||
future<compaction_manager::compaction_stats_opt> compaction_manager::perform_sstable_scrub(compaction_group_view& t, sstables::compaction_type_options::scrub opts, tasks::task_info info) {
|
||||
auto scrub_mode = opts.operation_mode;
|
||||
if (scrub_mode == sstables::compaction_type_options::scrub::mode::validate) {
|
||||
return perform_sstable_scrub_validate_mode(t, info, opts.quarantine_sstables);
|
||||
co_return co_await perform_sstable_scrub_validate_mode(t, info, opts.quarantine_sstables);
|
||||
}
|
||||
owned_ranges_ptr owned_ranges_ptr = {};
|
||||
sstring option_desc = fmt::format("mode: {};\nquarantine_mode: {}\n", opts.operation_mode, opts.quarantine_operation_mode);
|
||||
return rewrite_sstables(t, sstables::compaction_type_options::make_scrub(scrub_mode), std::move(owned_ranges_ptr), [&t, opts] -> future<std::vector<sstables::shared_sstable>> {
|
||||
co_return co_await rewrite_sstables(t, sstables::compaction_type_options::make_scrub(scrub_mode), std::move(owned_ranges_ptr), [&t, opts] -> future<std::vector<sstables::shared_sstable>> {
|
||||
auto all_sstables = co_await get_all_sstables(t);
|
||||
std::vector<sstables::shared_sstable> sstables = all_sstables
|
||||
| std::views::filter([&opts] (const sstables::shared_sstable& sst) {
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
#include <seastar/core/metrics_registration.hh>
|
||||
#include <seastar/core/abort_source.hh>
|
||||
#include <seastar/core/condition-variable.hh>
|
||||
#include <seastar/core/rwlock.hh>
|
||||
#include "sstables/shared_sstable.hh"
|
||||
#include "utils/exponential_backoff_retry.hh"
|
||||
#include "utils/updateable_value.hh"
|
||||
@@ -177,7 +178,8 @@ private:
|
||||
}
|
||||
future<compaction_manager::compaction_stats_opt> perform_compaction(throw_if_stopping do_throw_if_stopping, tasks::task_info parent_info, Args&&... args);
|
||||
|
||||
future<> stop_tasks(std::vector<shared_ptr<compaction::compaction_task_executor>> tasks, sstring reason) noexcept;
|
||||
void stop_tasks(const std::vector<shared_ptr<compaction::compaction_task_executor>>& tasks, sstring reason) noexcept;
|
||||
future<> await_tasks(std::vector<shared_ptr<compaction::compaction_task_executor>>, bool task_stopped) const noexcept;
|
||||
future<> update_throughput(uint32_t value_mbs);
|
||||
|
||||
// Return the largest fan-in of currently running compactions
|
||||
@@ -389,6 +391,11 @@ public:
|
||||
// Caller should call the compaction_reenabler::reenable
|
||||
future<compaction_reenabler> stop_and_disable_compaction(sstring reason, compaction::compaction_group_view& t);
|
||||
|
||||
future<compaction_reenabler> await_and_disable_compaction(compaction::compaction_group_view& t);
|
||||
|
||||
future<seastar::rwlock::holder> get_incremental_repair_read_lock(compaction::compaction_group_view& t, const sstring& reason);
|
||||
future<seastar::rwlock::holder> get_incremental_repair_write_lock(compaction::compaction_group_view& t, const sstring& reason);
|
||||
|
||||
// Run a function with compaction temporarily disabled for a table T.
|
||||
future<> run_with_compaction_disabled(compaction::compaction_group_view& t, std::function<future<> ()> func, sstring reason = "custom operation");
|
||||
|
||||
@@ -421,9 +428,18 @@ public:
|
||||
// Stops ongoing compaction of a given type.
|
||||
future<> stop_compaction(sstring type, compaction::compaction_group_view* table = nullptr);
|
||||
|
||||
private:
|
||||
std::vector<shared_ptr<compaction_task_executor>>
|
||||
do_stop_ongoing_compactions(sstring reason, compaction_group_view* t, std::optional<sstables::compaction_type> type_opt) noexcept;
|
||||
|
||||
public:
|
||||
// Stops ongoing compaction of a given table and/or compaction_type.
|
||||
future<> stop_ongoing_compactions(sstring reason, compaction::compaction_group_view* t = nullptr, std::optional<sstables::compaction_type> type_opt = {}) noexcept;
|
||||
|
||||
future<> await_ongoing_compactions(compaction_group_view* t);
|
||||
|
||||
compaction_manager::compaction_reenabler stop_and_disable_compaction_no_wait(compaction_group_view& t, sstring reason);
|
||||
|
||||
double backlog() {
|
||||
return _backlog_manager.backlog();
|
||||
}
|
||||
@@ -620,7 +636,8 @@ public:
|
||||
friend future<compaction_manager::compaction_stats_opt> compaction_manager::perform_compaction(throw_if_stopping do_throw_if_stopping, tasks::task_info parent_info, Args&&... args);
|
||||
friend future<compaction_manager::compaction_stats_opt> compaction_manager::perform_task(shared_ptr<compaction_task_executor> task, throw_if_stopping do_throw_if_stopping);
|
||||
friend fmt::formatter<compaction_task_executor>;
|
||||
friend future<> compaction_manager::stop_tasks(std::vector<shared_ptr<compaction_task_executor>> tasks, sstring reason) noexcept;
|
||||
friend void compaction_manager::stop_tasks(const std::vector<shared_ptr<compaction_task_executor>>& tasks, sstring reason) noexcept;
|
||||
friend future<> compaction_manager::await_tasks(std::vector<shared_ptr<compaction_task_executor>>, bool task_stopped) const noexcept;
|
||||
friend sstables::test_env_compaction_manager;
|
||||
};
|
||||
|
||||
|
||||
@@ -33,6 +33,14 @@ struct compaction_state {
|
||||
// to synchronize with minor, such that major doesn't miss any sstable.
|
||||
seastar::rwlock lock;
|
||||
|
||||
// Compactions like major need to work on all sstables in the unrepaired
|
||||
// set, no matter if the sstable is being repaired or not. The
|
||||
// incremental_repair_lock lock is introduced to serialize repair and such
|
||||
// compactions. This lock guarantees that no sstables are being repaired.
|
||||
// Note that the minor compactions do not need to take this lock because
|
||||
// they ignore sstables that are being repaired.
|
||||
seastar::rwlock incremental_repair_lock;
|
||||
|
||||
// Raised by any function running under run_with_compaction_disabled();
|
||||
long compaction_disabled_counter = 0;
|
||||
|
||||
|
||||
@@ -582,7 +582,8 @@ future<> table_upgrade_sstables_compaction_task_impl::run() {
|
||||
tasks::task_info info{_status.id, _status.shard};
|
||||
co_await run_on_table("upgrade_sstables", _db, _status.keyspace, _ti, [&] (replica::table& t) -> future<> {
|
||||
return t.parallel_foreach_compaction_group_view([&] (compaction::compaction_group_view& ts) -> future<> {
|
||||
return t.get_compaction_manager().perform_sstable_upgrade(owned_ranges_ptr, ts, _exclude_current_version, info);
|
||||
auto lock_holder = co_await t.get_compaction_manager().get_incremental_repair_read_lock(ts, "upgrade_sstables_compaction");
|
||||
co_await t.get_compaction_manager().perform_sstable_upgrade(owned_ranges_ptr, ts, _exclude_current_version, info);
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -621,6 +622,7 @@ future<> table_scrub_sstables_compaction_task_impl::run() {
|
||||
auto& cf = _db.find_column_family(_status.keyspace, _status.table);
|
||||
tasks::task_info info{_status.id, _status.shard};
|
||||
co_await cf.parallel_foreach_compaction_group_view([&] (compaction::compaction_group_view& ts) mutable -> future<> {
|
||||
auto lock_holder = co_await cm.get_incremental_repair_read_lock(ts, "scrub_sstables_compaction");
|
||||
auto r = co_await cm.perform_sstable_scrub(ts, _opts, info);
|
||||
_stats += r.value_or(sstables::compaction_stats{});
|
||||
});
|
||||
@@ -656,6 +658,7 @@ future<> shard_reshaping_compaction_task_impl::run() {
|
||||
|
||||
// reshape sstables individually within the compaction groups
|
||||
for (auto& sstables_in_cg : sstables_grouped_by_compaction_group) {
|
||||
auto lock_holder = co_await table.get_compaction_manager().get_incremental_repair_read_lock(*sstables_in_cg.first, "reshaping_compaction");
|
||||
co_await reshape_compaction_group(*sstables_in_cg.first, sstables_in_cg.second, table, info);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -182,15 +182,16 @@ bool tablet_has_excluded_node(const locator::topology& topo, const tablet_info&
|
||||
return false;
|
||||
}
|
||||
|
||||
tablet_info::tablet_info(tablet_replica_set replicas, db_clock::time_point repair_time, tablet_task_info repair_task_info, tablet_task_info migration_task_info)
|
||||
tablet_info::tablet_info(tablet_replica_set replicas, db_clock::time_point repair_time, tablet_task_info repair_task_info, tablet_task_info migration_task_info, int64_t sstables_repaired_at)
|
||||
: replicas(std::move(replicas))
|
||||
, repair_time(repair_time)
|
||||
, repair_task_info(std::move(repair_task_info))
|
||||
, migration_task_info(std::move(migration_task_info))
|
||||
, sstables_repaired_at(sstables_repaired_at)
|
||||
{}
|
||||
|
||||
tablet_info::tablet_info(tablet_replica_set replicas)
|
||||
: tablet_info(std::move(replicas), db_clock::time_point{}, tablet_task_info{}, tablet_task_info{})
|
||||
: tablet_info(std::move(replicas), db_clock::time_point{}, tablet_task_info{}, tablet_task_info{}, int64_t(0))
|
||||
{}
|
||||
|
||||
std::optional<tablet_info> merge_tablet_info(tablet_info a, tablet_info b) {
|
||||
@@ -207,7 +208,9 @@ std::optional<tablet_info> merge_tablet_info(tablet_info a, tablet_info b) {
|
||||
}
|
||||
|
||||
auto repair_time = std::max(a.repair_time, b.repair_time);
|
||||
return tablet_info(std::move(a.replicas), repair_time, a.repair_task_info, a.migration_task_info);
|
||||
int64_t sstables_repaired_at = std::max(a.sstables_repaired_at, b.sstables_repaired_at);
|
||||
auto info = tablet_info(std::move(a.replicas), repair_time, a.repair_task_info, a.migration_task_info, sstables_repaired_at);
|
||||
return info;
|
||||
}
|
||||
|
||||
std::optional<tablet_replica> get_leaving_replica(const tablet_info& tinfo, const tablet_transition_info& trinfo) {
|
||||
|
||||
@@ -191,9 +191,10 @@ struct tablet_info {
|
||||
db_clock::time_point repair_time;
|
||||
locator::tablet_task_info repair_task_info;
|
||||
locator::tablet_task_info migration_task_info;
|
||||
int64_t sstables_repaired_at;
|
||||
|
||||
tablet_info() = default;
|
||||
tablet_info(tablet_replica_set, db_clock::time_point, tablet_task_info, tablet_task_info);
|
||||
tablet_info(tablet_replica_set, db_clock::time_point, tablet_task_info, tablet_task_info, int64_t sstables_repaired_at);
|
||||
tablet_info(tablet_replica_set);
|
||||
|
||||
bool operator==(const tablet_info&) const = default;
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
|
||||
#include <seastar/core/condition-variable.hh>
|
||||
#include <seastar/core/gate.hh>
|
||||
#include <seastar/core/rwlock.hh>
|
||||
|
||||
#include "database_fwd.hh"
|
||||
#include "compaction/compaction_descriptor.hh"
|
||||
@@ -38,7 +39,7 @@ enum class repair_sstable_classification {
|
||||
repaired,
|
||||
};
|
||||
|
||||
using repair_classifier_func = std::function<repair_sstable_classification(const sstables::shared_sstable&)>;
|
||||
using repair_classifier_func = std::function<repair_sstable_classification(const sstables::shared_sstable&, int64_t sstables_repaired_at)>;
|
||||
|
||||
// Compaction group is a set of SSTables which are eligible to be compacted together.
|
||||
// By this definition, we can say:
|
||||
@@ -166,6 +167,10 @@ public:
|
||||
return _tombstone_gc_enabled;
|
||||
}
|
||||
|
||||
int64_t get_sstables_repaired_at() const noexcept;
|
||||
|
||||
future<> update_repaired_at_for_merge();
|
||||
|
||||
void set_compaction_strategy_state(compaction::compaction_strategy_state compaction_strategy_state) noexcept;
|
||||
|
||||
lw_shared_ptr<memtable_list>& memtables() noexcept;
|
||||
|
||||
@@ -1327,6 +1327,12 @@ public:
|
||||
future<utils::chunked_vector<sstables::entry_descriptor>> clone_tablet_storage(locator::tablet_id tid);
|
||||
|
||||
friend class compaction_group;
|
||||
|
||||
future<> update_repaired_at_for_merge();
|
||||
|
||||
future<std::vector<compaction::compaction_group_view*>> get_compaction_group_views_for_repair(dht::token_range range);
|
||||
|
||||
future<> clear_being_repaired_for_range(dht::token_range range);
|
||||
};
|
||||
|
||||
lw_shared_ptr<sstables::sstable_set> make_tablet_sstable_set(schema_ptr, const storage_group_manager& sgm, const locator::tablet_map&);
|
||||
|
||||
166
replica/table.cc
166
replica/table.cc
@@ -61,6 +61,7 @@
|
||||
#include "readers/combined.hh"
|
||||
#include "readers/compacting.hh"
|
||||
#include "replica/schema_describe_helper.hh"
|
||||
#include "repair/incremental.hh"
|
||||
|
||||
namespace replica {
|
||||
|
||||
@@ -125,6 +126,24 @@ lw_shared_ptr<sstables::sstable_set> compaction_group::make_sstable_set() const
|
||||
return make_lw_shared(sstables::make_compound_sstable_set(_t.schema(), { _main_sstables, _maintenance_sstables }));
|
||||
}
|
||||
|
||||
int64_t compaction_group::get_sstables_repaired_at() const noexcept {
|
||||
try {
|
||||
auto tid = locator::tablet_id(group_id());
|
||||
auto erm = _t.get_effective_replication_map();
|
||||
if (!erm) {
|
||||
return 0;
|
||||
}
|
||||
if (!erm->get_replication_strategy().uses_tablets()) {
|
||||
return 0;
|
||||
}
|
||||
auto& tmap = erm->get_token_metadata_ptr()->tablets().get_tablet_map(_t.schema()->id());
|
||||
auto& tinfo = tmap.get_tablet_info(tid);
|
||||
return tinfo.sstables_repaired_at;
|
||||
} catch (locator::no_such_tablet_map) {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
lw_shared_ptr<const sstables::sstable_set> table::make_compound_sstable_set() const {
|
||||
return _sg_manager->make_sstable_set();
|
||||
}
|
||||
@@ -688,7 +707,7 @@ public:
|
||||
storage_group_map r;
|
||||
|
||||
// Incremental repair is not supported with vnodes, so all sstables will be considered unrepaired.
|
||||
auto noop_repair_sstable_classifier = [] (const sstables::shared_sstable&) { return repair_sstable_classification::unrepaired; };
|
||||
auto noop_repair_sstable_classifier = [] (const sstables::shared_sstable&, int64_t sstables_repaired_at) { return repair_sstable_classification::unrepaired; };
|
||||
|
||||
// this might not reflect real vnode range for this node, but with 256 tokens, the actual
|
||||
// first and last tokens are likely to be ~0.5% of the edges, so any measurement against
|
||||
@@ -762,6 +781,8 @@ class tablet_storage_group_manager final : public storage_group_manager {
|
||||
locator::resize_decision::seq_number_t _split_ready_seq_number = std::numeric_limits<locator::resize_decision::seq_number_t>::min();
|
||||
future<> _merge_completion_fiber;
|
||||
condition_variable _merge_completion_event;
|
||||
// Holds compaction reenabler which disables compaction temporarily during tablet merge
|
||||
std::vector<compaction_manager::compaction_reenabler> _compaction_reenablers_for_merging;
|
||||
private:
|
||||
const schema_ptr& schema() const {
|
||||
return _t.schema();
|
||||
@@ -820,8 +841,17 @@ private:
|
||||
|
||||
repair_classifier_func make_repair_sstable_classifier_func() const {
|
||||
// FIXME: implement it for incremental repair!
|
||||
return [] (const sstables::shared_sstable& sst) {
|
||||
return repair_sstable_classification::unrepaired;
|
||||
return [] (const sstables::shared_sstable& sst, int64_t sstables_repaired_at) {
|
||||
bool is_repaired = repair::is_repaired(sstables_repaired_at, sst);
|
||||
if (is_repaired) {
|
||||
return repair_sstable_classification::repaired;
|
||||
} else {
|
||||
if (!sst->being_repaired.uuid().is_null()) {
|
||||
return repair_sstable_classification::repairing;
|
||||
} else {
|
||||
return repair_sstable_classification::unrepaired;
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@@ -980,6 +1010,8 @@ bool storage_group::set_split_mode() {
|
||||
// TODO: use the actual sub-ranges instead, to help incremental selection on the read path.
|
||||
return compaction_group::make_empty_group(*_main_cg);
|
||||
};
|
||||
tlogger.debug("storage_group::set_split_mode: Set sstables_repaired_at={} for split old_group={} old_range={}",
|
||||
_main_cg->get_sstables_repaired_at(), _main_cg->group_id(), _main_cg->token_range());
|
||||
std::vector<compaction_group_ptr> split_ready_groups(2);
|
||||
split_ready_groups[to_idx(locator::tablet_range_side::left)] = create_cg();
|
||||
split_ready_groups[to_idx(locator::tablet_range_side::right)] = create_cg();
|
||||
@@ -1012,6 +1044,7 @@ future<> compaction_group::split(sstables::compaction_type_options::split opt, t
|
||||
auto& cm = get_compaction_manager();
|
||||
|
||||
for (auto view : all_views()) {
|
||||
auto lock_holder = co_await cm.get_incremental_repair_read_lock(*view, "storage_group_split");
|
||||
// Waits on sstables produced by repair to be integrated into main set; off-strategy is usually a no-op with tablets.
|
||||
co_await cm.perform_offstrategy(*view, tablet_split_task_info);
|
||||
co_await cm.perform_split_compaction(*view, opt, tablet_split_task_info);
|
||||
@@ -1137,7 +1170,9 @@ tablet_storage_group_manager::maybe_split_sstable(const sstables::shared_sstable
|
||||
|
||||
auto& cg = compaction_group_for_sstable(sst);
|
||||
auto holder = cg.async_gate().hold();
|
||||
co_return co_await _t.get_compaction_manager().maybe_split_sstable(sst, cg.view_for_sstable(sst), split_compaction_options());
|
||||
auto& view = cg.view_for_sstable(sst);
|
||||
auto lock_holder = co_await _t.get_compaction_manager().get_incremental_repair_read_lock(view, "maybe_split_sstable");
|
||||
co_return co_await _t.get_compaction_manager().maybe_split_sstable(sst, view, split_compaction_options());
|
||||
}
|
||||
|
||||
future<> table::maybe_split_compaction_group_of(locator::tablet_id tablet_id) {
|
||||
@@ -1978,6 +2013,58 @@ std::vector<sstables::shared_sstable> compaction_group::all_sstables() const {
|
||||
return all;
|
||||
}
|
||||
|
||||
// Walks every sstable of this compaction group and compares the repaired_at
// stored in its stats metadata with the group's sstables_repaired_at.
//  - repaired_at greater than the group value: the sstable's repaired_at is
//    rewritten to 0 via sstable::update_repaired_at() and the change is
//    logged at info level.
//  - otherwise: the sstable is left untouched and only a debug line is logged.
// The loop runs inside seastar::async() and yields between sstables;
// sstable::update_repaired_at() is documented as requiring a seastar thread.
future<>
compaction_group::update_repaired_at_for_merge() {
    auto group_sstables = all_sstables();
    const auto group_repaired_at = get_sstables_repaired_at();
    co_await seastar::async([&] {
        for (auto& sst : group_sstables) {
            thread::maybe_yield();
            const auto current = sst->get_stats_metadata().repaired_at;
            if (current <= group_repaired_at) {
                // Nothing to rewrite: old and new are reported as the same value.
                tlogger.debug("Skipped repaired_at update for tablet merge sstable={} old={} new={} sstables_repaired_at={} group_id={} range={}",
                        sst->get_filename(), current, current, group_repaired_at, group_id(), token_range());
                continue;
            }
            auto neww = 0;
            auto previous = sst->update_repaired_at(neww);
            tlogger.info("Finished repaired_at update for tablet merge sstable={} old={} new={} sstables_repaired_at={} group_id={} range={}",
                    sst->get_filename(), previous, neww, group_repaired_at, group_id(), token_range());
        }
    });
}
|
||||
|
||||
// Returns, for every compaction group of every storage group that
// storage_groups_for_token_range() yields for `range`, the address of that
// group's view_for_unrepaired_data(). The coroutine yields once per storage
// group. The returned pointers are plain addresses of the views; nothing here
// extends their lifetime.
future<std::vector<compaction::compaction_group_view*>> table::get_compaction_group_views_for_repair(dht::token_range range) {
    std::vector<compaction::compaction_group_view*> unrepaired_views;
    auto storage_groups_in_range = storage_groups_for_token_range(range);
    for (auto& storage_grp : storage_groups_in_range) {
        co_await coroutine::maybe_yield();
        auto groups = storage_grp->compaction_groups();
        for (auto& group : groups) {
            auto& view = group->view_for_unrepaired_data();
            unrepaired_views.push_back(&view);
        }
    }
    co_return unrepaired_views;
}
|
||||
|
||||
// Resets the being_repaired marker of every sstable that belongs to a
// compaction group of a storage group returned by
// storage_groups_for_token_range() for `range`. Sstables whose marker already
// has a null UUID are skipped. The coroutine yields once per compaction group
// and once per sstable.
future<> table::clear_being_repaired_for_range(dht::token_range range) {
    auto storage_groups_in_range = storage_groups_for_token_range(range);
    for (auto& storage_grp : storage_groups_in_range) {
        auto groups = storage_grp->compaction_groups();
        for (auto& group : groups) {
            auto group_sstables = group->all_sstables();
            co_await coroutine::maybe_yield();
            for (auto& sst : group_sstables) {
                co_await coroutine::maybe_yield();
                if (sst->being_repaired.uuid().is_null()) {
                    continue;
                }
                // A session_id built from a default-constructed UUID is the "not being repaired" value.
                sst->being_repaired = service::session_id(utils::UUID());
            }
        }
    }
}
|
||||
|
||||
future<>
|
||||
compaction_group::merge_sstables_from(compaction_group& group) {
|
||||
auto permit = co_await _t.get_sstable_list_permit();
|
||||
@@ -2124,8 +2211,9 @@ table::compact_all_sstables(tasks::task_info info, do_flush do_flush, bool consi
|
||||
// Forces off-strategy before major, so sstables previously sitting on maintenance set will be included
|
||||
// in the compaction's input set, to provide same semantics as before maintenance set came into existence.
|
||||
co_await perform_offstrategy_compaction(info);
|
||||
co_await parallel_foreach_compaction_group_view([this, info, consider_only_existing_data] (compaction::compaction_group_view& view) {
|
||||
return _compaction_manager.perform_major_compaction(view, info, consider_only_existing_data);
|
||||
co_await parallel_foreach_compaction_group_view([this, info, consider_only_existing_data] (compaction::compaction_group_view& view) -> future<> {
|
||||
auto lock_holder = co_await _compaction_manager.get_incremental_repair_read_lock(view, "compact_all_sstables");
|
||||
co_await _compaction_manager.perform_major_compaction(view, info, consider_only_existing_data);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2175,6 +2263,7 @@ future<bool> table::perform_offstrategy_compaction(tasks::task_info info) {
|
||||
_off_strategy_trigger.cancel();
|
||||
bool performed = false;
|
||||
co_await parallel_foreach_compaction_group_view([this, &performed, info] (compaction::compaction_group_view& view) -> future<> {
|
||||
auto lock_holder = co_await _compaction_manager.get_incremental_repair_read_lock(view, "compact_all_sstables");
|
||||
performed |= co_await _compaction_manager.perform_offstrategy(view, info);
|
||||
});
|
||||
co_return performed;
|
||||
@@ -2192,6 +2281,7 @@ future<> table::perform_cleanup_compaction(compaction::owned_ranges_ptr sorted_o
|
||||
co_await flush();
|
||||
}
|
||||
|
||||
auto lock_holder = co_await get_compaction_manager().get_incremental_repair_read_lock(cg->as_view_for_static_sharding(), "perform_cleanup_compaction");
|
||||
co_return co_await get_compaction_manager().perform_cleanup(std::move(sorted_owned_ranges), cg->as_view_for_static_sharding(), info);
|
||||
}
|
||||
|
||||
@@ -2570,6 +2660,10 @@ public:
|
||||
dht::token_range get_token_range_after_split(const dht::token& t) const noexcept override {
|
||||
return _t.get_token_range_after_split(t);
|
||||
}
|
||||
|
||||
// Reports the sstables_repaired_at value of the compaction group referenced by _cg.
int64_t get_sstables_repaired_at() const noexcept override {
|
||||
return _cg.get_sstables_repaired_at();
|
||||
}
|
||||
};
|
||||
|
||||
std::unique_ptr<compaction_group::compaction_group_view> compaction_group::make_compacting_view() {
|
||||
@@ -2780,7 +2874,12 @@ void tablet_storage_group_manager::handle_tablet_split_completion(const locator:
|
||||
}
|
||||
for (unsigned i = 0; i < split_size; i++) {
|
||||
auto group_id = first_new_id + i;
|
||||
split_ready_groups[i]->update_id_and_range(group_id, new_tmap.get_token_range(locator::tablet_id(group_id)));
|
||||
auto old_range = old_tmap.get_token_range(locator::tablet_id(id));
|
||||
auto new_range = new_tmap.get_token_range(locator::tablet_id(group_id));
|
||||
auto sstables_repaired_at = new_tmap.get_tablet_info(locator::tablet_id(group_id)).sstables_repaired_at;
|
||||
tlogger.debug("Setting sstables_repaired_at={} for split tablet_id={} old_tid={} new_tid={} old_range={} new_range={} idx={}",
|
||||
sstables_repaired_at, table_id, id, group_id, old_range, new_range, i);
|
||||
split_ready_groups[i]->update_id_and_range(group_id, new_range);
|
||||
new_storage_groups[group_id] = make_lw_shared<storage_group>(std::move(split_ready_groups[i]));
|
||||
}
|
||||
|
||||
@@ -2797,17 +2896,32 @@ future<> tablet_storage_group_manager::merge_completion_fiber() {
|
||||
while (!_t.async_gate().is_closed()) {
|
||||
try {
|
||||
co_await utils::get_local_injector().inject("merge_completion_fiber", utils::wait_for_message(60s));
|
||||
|
||||
co_await for_each_storage_group_gently([] (storage_group& sg) -> future<> {
|
||||
auto ks_name = schema()->ks_name();
|
||||
auto cf_name = schema()->cf_name();
|
||||
// Enable compaction after merge is done.
|
||||
auto cres = std::exchange(_compaction_reenablers_for_merging, {});
|
||||
co_await for_each_storage_group_gently([ks_name, cf_name] (storage_group& sg) -> future<> {
|
||||
auto main_group = sg.main_compaction_group();
|
||||
tlogger.debug("Merge compaction groups for table={}.{} group_id={} range={} started",
|
||||
ks_name, cf_name, main_group->group_id(), main_group->token_range());
|
||||
int nr = 0;
|
||||
int sz = sg.merging_groups().size();
|
||||
for (auto& group : sg.merging_groups()) {
|
||||
tlogger.debug("Merge compaction groups for table={}.{} group_id={} range={} merging {} out of {} groups",
|
||||
ks_name, cf_name, main_group->group_id(), main_group->token_range(), ++nr, sz);
|
||||
// Synchronize with ongoing writes that might be blocked waiting for memory.
|
||||
// Also, disabling compaction provides stability on the sstable set.
|
||||
// Flushes memtable, so all the data can be moved.
|
||||
co_await group->stop("tablet merge");
|
||||
if (utils::get_local_injector().enter("merge_completion_fiber_error")) {
|
||||
tlogger.info("Got merge_completion_fiber_error");
|
||||
co_await sleep(std::chrono::seconds(60));
|
||||
}
|
||||
co_await main_group->merge_sstables_from(*group);
|
||||
}
|
||||
co_await sg.remove_empty_merging_groups();
|
||||
tlogger.debug("Merge compaction groups for table={}.{} group_id={} range={} finished",
|
||||
ks_name, cf_name, main_group->group_id(), main_group->token_range());
|
||||
});
|
||||
} catch (...) {
|
||||
tlogger.error("Failed to merge compaction groups for table {}.{}", schema()->ks_name(), schema()->cf_name());
|
||||
@@ -2840,8 +2954,12 @@ void tablet_storage_group_manager::handle_tablet_merge_completion(const locator:
|
||||
continue;
|
||||
}
|
||||
auto new_tid = id >> log2_reduce_factor;
|
||||
|
||||
auto new_cg = make_lw_shared<compaction_group>(_t, new_tid, new_tmap.get_token_range(locator::tablet_id(new_tid)), make_repair_sstable_classifier_func());
|
||||
auto new_range = new_tmap.get_token_range(locator::tablet_id(new_tid));
|
||||
auto new_cg = make_lw_shared<compaction_group>(_t, new_tid, new_range, make_repair_sstable_classifier_func());
|
||||
for (auto& view : new_cg->all_views()) {
|
||||
auto cre = _t.get_compaction_manager().stop_and_disable_compaction_no_wait(*view, "tablet merging");
|
||||
_compaction_reenablers_for_merging.push_back(std::move(cre));
|
||||
}
|
||||
auto new_sg = make_lw_shared<storage_group>(std::move(new_cg));
|
||||
|
||||
for (unsigned i = 0; i < merge_size; i++) {
|
||||
@@ -2852,8 +2970,10 @@ void tablet_storage_group_manager::handle_tablet_merge_completion(const locator:
|
||||
throw std::runtime_error(format("Unable to find sibling tablet of id for table {}", group_id, table_id));
|
||||
}
|
||||
auto& sg = it->second;
|
||||
sg->for_each_compaction_group([&new_sg, new_tid] (const compaction_group_ptr& cg) {
|
||||
sg->for_each_compaction_group([&new_sg, new_range, new_tid, group_id] (const compaction_group_ptr& cg) {
|
||||
cg->update_id(new_tid);
|
||||
tlogger.debug("Adding merging_group: sstables_repaired_at={} old_range={} new_range={} old_tid={} new_tid={} old_group_id={}",
|
||||
cg->get_sstables_repaired_at(), cg->token_range(), new_range, cg->group_id(), new_tid, group_id);
|
||||
new_sg->add_merging_group(cg);
|
||||
});
|
||||
// Cannot wait for group to be closed, since it can only return after some long-running operation
|
||||
@@ -2864,7 +2984,6 @@ void tablet_storage_group_manager::handle_tablet_merge_completion(const locator:
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
new_storage_groups[new_tid] = std::move(new_sg);
|
||||
}
|
||||
_storage_groups = std::move(new_storage_groups);
|
||||
@@ -2939,6 +3058,25 @@ void tablet_storage_group_manager::update_effective_replication_map(const locato
|
||||
}
|
||||
}
|
||||
|
||||
// This function is called in the topology::transition_state::tablet_resize_finalization transition which
|
||||
// guarantees there is no tablet repair. The cm.stop_and_disable_compaction() ensures no compaction is running.
|
||||
future<> table::update_repaired_at_for_merge() {
|
||||
if (!uses_tablets()) {
|
||||
co_return;
|
||||
}
|
||||
auto sgs = storage_groups();
|
||||
for (auto& x : sgs) {
|
||||
auto sg = x.second;
|
||||
if (sg) {
|
||||
auto cgs = sg->compaction_groups();
|
||||
for (auto& cg : cgs) {
|
||||
auto cre = co_await cg->get_compaction_manager().stop_and_disable_compaction("update_repaired_at_for_merge", cg->view_for_unrepaired_data());
|
||||
co_await cg->update_repaired_at_for_merge();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void table::update_effective_replication_map(locator::effective_replication_map_ptr erm) {
|
||||
auto old_erm = std::exchange(_erm, std::move(erm));
|
||||
|
||||
@@ -4147,7 +4285,7 @@ compaction::compaction_group_view& compaction_group::view_for_unrepaired_data()
|
||||
}
|
||||
|
||||
compaction::compaction_group_view& compaction_group::view_for_sstable(const sstables::shared_sstable& sst) const {
|
||||
switch (_repair_sstable_classifier(sst)) {
|
||||
switch (_repair_sstable_classifier(sst, get_sstables_repaired_at())) {
|
||||
case repair_sstable_classification::unrepaired: return *_unrepaired_view;
|
||||
case repair_sstable_classification::repairing: return *_repairing_view;
|
||||
case repair_sstable_classification::repaired: return *_repaired_view;
|
||||
|
||||
@@ -155,6 +155,11 @@ tablet_map_to_mutation(const tablet_map& tablets, table_id id, const sstring& ke
|
||||
m.set_clustered_cell(ck, "repair_time", data_value(tablet.repair_time), ts);
|
||||
}
|
||||
}
|
||||
|
||||
if (features.tablet_incremental_repair) {
|
||||
m.set_clustered_cell(ck, "sstables_repaired_at", data_value(tablet.sstables_repaired_at), ts);
|
||||
}
|
||||
|
||||
if (auto tr_info = tablets.get_tablet_transition_info(tid)) {
|
||||
m.set_clustered_cell(ck, "stage", tablet_transition_stage_to_string(tr_info->stage), ts);
|
||||
m.set_clustered_cell(ck, "transition", tablet_transition_kind_to_string(tr_info->transition), ts);
|
||||
@@ -591,7 +596,8 @@ tablet_id process_one_row(replica::database* db, table_id table, tablet_map& map
|
||||
std::move(new_tablet_replicas), pending_replica, session_id});
|
||||
}
|
||||
|
||||
map.set_tablet(tid, tablet_info{std::move(tablet_replicas), repair_time, repair_task_info, migration_task_info});
|
||||
tablet_logger.info("Set sstables_repaired_at={} table={} tablet={}", sstables_repaired_at, table, tid);
|
||||
map.set_tablet(tid, tablet_info{std::move(tablet_replicas), repair_time, repair_task_info, migration_task_info, sstables_repaired_at});
|
||||
|
||||
if (update_repair_time && db) {
|
||||
auto myid = db->get_token_metadata().get_my_id();
|
||||
|
||||
@@ -3351,6 +3351,7 @@ private:
|
||||
throw std::runtime_error(format("Unable to merge tablet info of sibling tablets {} (r: {}) and {} (r: {}).",
|
||||
old_left_tid, left_tablet_replicas, old_right_tid, right_tablet_replicas));
|
||||
}
|
||||
lblogger.debug("Got merged_tablet_info with sstables_repaired_at={}", merged_tablet_info->sstables_repaired_at);
|
||||
|
||||
new_tablets.set_tablet(tid, *merged_tablet_info);
|
||||
}
|
||||
|
||||
@@ -43,6 +43,7 @@ public:
|
||||
stop_iteration consume(range_tombstone_change&& rtc);
|
||||
stop_iteration consume_end_of_partition();
|
||||
void consume_end_of_stream();
|
||||
void set_repaired_at(int64_t repaired_at);
|
||||
};
|
||||
|
||||
} // namespace sstables
|
||||
|
||||
@@ -1298,6 +1298,23 @@ void sstable::write_statistics() {
|
||||
write_simple<component_type::Statistics>(_components->statistics);
|
||||
}
|
||||
|
||||
// Stores `id` in the being_repaired member of this sstable.
void sstable::mark_as_being_repaired(const service::session_id& id) {
|
||||
being_repaired = id;
|
||||
}
|
||||
|
||||
int64_t sstable::update_repaired_at(int64_t repaired_at) {
|
||||
const stats_metadata& old_stats = get_stats_metadata();
|
||||
auto old_repaired_at = old_stats.repaired_at;
|
||||
if (old_repaired_at == repaired_at) {
|
||||
return old_repaired_at;
|
||||
}
|
||||
auto stats = std::make_unique<stats_metadata>(old_stats);
|
||||
stats->repaired_at = repaired_at;
|
||||
_components->statistics.contents[metadata_type::Stats] = std::move(stats);
|
||||
rewrite_statistics();
|
||||
return old_repaired_at;
|
||||
}
|
||||
|
||||
void sstable::rewrite_statistics() {
|
||||
sstlog.debug("Rewriting statistics component of sstable {}", get_filename());
|
||||
|
||||
@@ -3403,9 +3420,10 @@ future<> remove_table_directory_if_has_no_snapshots(fs::path table_dir) {
|
||||
}
|
||||
|
||||
std::string to_string(const shared_sstable& sst, bool include_origin) {
|
||||
auto repaired_at = sst->get_stats_metadata().repaired_at;
|
||||
return include_origin ?
|
||||
fmt::format("{}:level={:d}:origin={}", sst->get_filename(), sst->get_sstable_level(), sst->get_origin()) :
|
||||
fmt::format("{}:level={:d}", sst->get_filename(), sst->get_sstable_level());
|
||||
fmt::format("{}:level={:d}:origin={}:repaired_at={}", sst->get_filename(), sst->get_sstable_level(), sst->get_origin(), repaired_at) :
|
||||
fmt::format("{}:level={:d}:repaired_at={}", sst->get_filename(), sst->get_sstable_level(), repaired_at);
|
||||
}
|
||||
|
||||
std::string sstable_stream_source::component_basename() const {
|
||||
|
||||
@@ -1076,8 +1076,12 @@ public:
|
||||
|
||||
friend in_memory_config_type;
|
||||
|
||||
public:
|
||||
service::session_id being_repaired;
|
||||
public:
|
||||
void mark_as_being_repaired(const service::session_id& id);
|
||||
// This function must run inside a seastar thread since it calls
|
||||
// rewrite_statistics which must run inside a seastar thread.
|
||||
int64_t update_repaired_at(int64_t repaired_at);
|
||||
};
|
||||
|
||||
// Validate checksums
|
||||
|
||||
@@ -311,7 +311,9 @@ struct stats_metadata : public metadata_base<stats_metadata> {
|
||||
uint32_t sstable_level;
|
||||
// There is not meaningful value to put in this field, since we have no
|
||||
// incremental repair. Before we have it, let's set it to 0.
|
||||
uint64_t repaired_at = 0;
|
||||
// According to architecture/sstable/sstable3/sstables-3-statistics.rst,
|
||||
// the repaired_at is an int64_t value.
|
||||
int64_t repaired_at = 0;
|
||||
disk_array<uint32_t, disk_string<uint16_t>> min_column_names;
|
||||
disk_array<uint32_t, disk_string<uint16_t>> max_column_names;
|
||||
bool has_legacy_counter_shards;
|
||||
|
||||
@@ -73,6 +73,10 @@ void sstable_writer::consume_end_of_stream() {
|
||||
return _impl->consume_end_of_stream();
|
||||
}
|
||||
|
||||
// Forwards `repaired_at` to the writer implementation (_impl).
void sstable_writer::set_repaired_at(int64_t repaired_at) {
|
||||
_impl->set_repaired_at(repaired_at);
|
||||
}
|
||||
|
||||
sstable_writer::sstable_writer(sstable_writer&& o) = default;
|
||||
sstable_writer& sstable_writer::operator=(sstable_writer&& o) = default;
|
||||
sstable_writer::~sstable_writer() {
|
||||
|
||||
@@ -49,6 +49,9 @@ struct sstable_writer::writer_impl {
|
||||
virtual stop_iteration consume_end_of_partition() = 0;
|
||||
virtual void consume_end_of_stream() = 0;
|
||||
virtual ~writer_impl() {}
|
||||
// Passes `repaired_at` on to the metadata collector (_collector).
// The parameter is int64_t so that it agrees with
// sstable_writer::set_repaired_at() and stats_metadata::repaired_at, which are
// both int64_t; the previous uint64_t parameter forced a signed -> unsigned
// conversion on the way in.
void set_repaired_at(int64_t repaired_at) {
    _collector.set_repaired_at(repaired_at);
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -128,6 +128,7 @@ public:
|
||||
virtual const std::string get_group_id() const noexcept override { return "0"; }
|
||||
virtual seastar::condition_variable& get_staging_done_condition() noexcept override { return _staging_done_condition; }
|
||||
dht::token_range get_token_range_after_split(const dht::token& t) const noexcept override { return dht::token_range(); }
|
||||
// This view always reports 0 as its sstables_repaired_at.
int64_t get_sstables_repaired_at() const noexcept override { return 0; }
|
||||
};
|
||||
|
||||
SEASTAR_TEST_CASE(basic_compaction_group_splitting_test) {
|
||||
@@ -244,7 +245,7 @@ SEASTAR_TEST_CASE(compactions_dont_cross_group_boundary_test) {
|
||||
}
|
||||
return replica::repair_sstable_classification::repaired;
|
||||
};
|
||||
auto repair_sstable_classifier = [&] (const sstables::shared_sstable& sst) -> replica::repair_sstable_classification {
|
||||
auto repair_sstable_classifier = [&] (const sstables::shared_sstable& sst, int64_t sstables_repaired_at) -> replica::repair_sstable_classification {
|
||||
return repair_token_classifier(sst->get_first_decorated_key().token());
|
||||
};
|
||||
t.set_repair_sstable_classifier(repair_sstable_classifier);
|
||||
@@ -257,7 +258,7 @@ SEASTAR_TEST_CASE(compactions_dont_cross_group_boundary_test) {
|
||||
auto reader = sstable_reader(sst, s, env.make_reader_permit()); // reader holds sst and s alive.
|
||||
auto close_reader = deferred_close(reader);
|
||||
|
||||
auto expected_classification = repair_sstable_classifier(sst);
|
||||
auto expected_classification = repair_sstable_classifier(sst, 0);
|
||||
|
||||
while (auto m = read_mutation_from_mutation_reader(reader).get()) {
|
||||
BOOST_REQUIRE(repair_token_classifier(m->decorated_key().token()) == expected_classification);
|
||||
|
||||
@@ -32,6 +32,7 @@
|
||||
#include "dht/murmur3_partitioner.hh"
|
||||
#include "db/large_data_handler.hh"
|
||||
#include "db/config.hh"
|
||||
#include "repair/incremental.hh"
|
||||
|
||||
#include "test/lib/sstable_utils.hh"
|
||||
#include "test/lib/test_services.hh"
|
||||
|
||||
@@ -24,6 +24,7 @@
|
||||
#include "readers/mutation_fragment_v1_stream.hh"
|
||||
#include "schema/schema_registry.hh"
|
||||
#include "utils/chunked_vector.hh"
|
||||
#include "repair/incremental.hh"
|
||||
|
||||
BOOST_AUTO_TEST_SUITE(repair_test)
|
||||
|
||||
@@ -190,7 +191,7 @@ SEASTAR_TEST_CASE(test_reader_with_different_strategies) {
|
||||
});
|
||||
auto read_all = [&](repair_reader::read_strategy strategy) -> future<std::vector<mutation_fragment>> {
|
||||
auto reader = repair_reader(e.db(), cf, cf.schema(), make_reader_permit(e),
|
||||
random_range, remote_sharder, remote_shard, 0, strategy, gc_clock::now());
|
||||
random_range, remote_sharder, remote_shard, 0, strategy, gc_clock::now(), incremental_repair_meta());
|
||||
std::vector<mutation_fragment> result;
|
||||
while (auto mf = co_await reader.read_mutation_fragment()) {
|
||||
result.push_back(std::move(*mf));
|
||||
|
||||
@@ -1955,9 +1955,12 @@ SEASTAR_TEST_CASE(size_tiered_beyond_max_threshold_test) {
|
||||
std::vector<sstables::shared_sstable> candidates;
|
||||
int max_threshold = cf->schema()->max_compaction_threshold();
|
||||
candidates.reserve(max_threshold+1);
|
||||
const auto keys = tests::generate_partition_keys(2, cf->schema());
|
||||
for (auto i = 0; i < (max_threshold+1); i++) { // (max_threshold+1) sstables of similar size
|
||||
auto sst = env.make_sstable(cf->schema());
|
||||
sstables::test(sst).set_data_file_size(1);
|
||||
// So that the stats_metadata of the sst is written which is checked by incremental repair code
|
||||
sstables::test(sst).set_values(keys[0].key(), keys[1].key(), stats_metadata{});
|
||||
candidates.push_back(std::move(sst));
|
||||
}
|
||||
auto desc = get_sstables_for_compaction(cs, cf.as_compaction_group_view(), std::move(candidates)).get();
|
||||
|
||||
@@ -163,7 +163,8 @@ SEASTAR_TEST_CASE(test_tablet_metadata_persistence) {
|
||||
},
|
||||
db_clock::now(),
|
||||
locator::tablet_task_info::make_auto_repair_request({}, {"dc1", "dc2"}),
|
||||
locator::tablet_task_info::make_intranode_migration_request()
|
||||
locator::tablet_task_info::make_intranode_migration_request(),
|
||||
0
|
||||
});
|
||||
tm.set_tablet_map(table1, std::move(tmap));
|
||||
}
|
||||
@@ -180,7 +181,8 @@ SEASTAR_TEST_CASE(test_tablet_metadata_persistence) {
|
||||
},
|
||||
{},
|
||||
{},
|
||||
locator::tablet_task_info::make_migration_request()
|
||||
locator::tablet_task_info::make_migration_request(),
|
||||
0
|
||||
});
|
||||
tb = *tmap.next_tablet(tb);
|
||||
tmap.set_tablet(tb, tablet_info {
|
||||
@@ -195,7 +197,8 @@ SEASTAR_TEST_CASE(test_tablet_metadata_persistence) {
|
||||
},
|
||||
{},
|
||||
{},
|
||||
locator::tablet_task_info::make_migration_request()
|
||||
locator::tablet_task_info::make_migration_request(),
|
||||
0
|
||||
});
|
||||
tb = *tmap.next_tablet(tb);
|
||||
tmap.set_tablet(tb, tablet_info {
|
||||
@@ -347,7 +350,8 @@ SEASTAR_TEST_CASE(test_tablet_metadata_persistence_with_colocated_tables) {
|
||||
},
|
||||
db_clock::now(),
|
||||
locator::tablet_task_info::make_auto_repair_request({}, {"dc1", "dc2"}),
|
||||
locator::tablet_task_info::make_intranode_migration_request()
|
||||
locator::tablet_task_info::make_intranode_migration_request(),
|
||||
0
|
||||
});
|
||||
tm.set_tablet_map(table1, std::move(tmap));
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
#include "sstables/sstable_set.hh"
|
||||
#include "compaction/compaction.hh"
|
||||
#include "replica/database.hh"
|
||||
#include "repair/incremental.hh"
|
||||
|
||||
namespace sstables {
|
||||
|
||||
|
||||
@@ -131,6 +131,7 @@ public:
|
||||
dht::token_range get_token_range_after_split(const dht::token& t) const noexcept override {
|
||||
return table().get_token_range_after_split(t);
|
||||
}
|
||||
// This view always reports 0 as its sstables_repaired_at.
int64_t get_sstables_repaired_at() const noexcept override { return 0; }
|
||||
};
|
||||
|
||||
table_for_tests::data::data()
|
||||
|
||||
@@ -976,6 +976,7 @@ public:
|
||||
virtual const std::string get_group_id() const noexcept override { return _group_id; }
|
||||
virtual seastar::condition_variable& get_staging_done_condition() noexcept override { return _staging_done_condition; }
|
||||
dht::token_range get_token_range_after_split(const dht::token& t) const noexcept override { return dht::token_range(); }
|
||||
// This view always reports 0 as its sstables_repaired_at.
int64_t get_sstables_repaired_at() const noexcept override { return 0; }
|
||||
};
|
||||
|
||||
void validate_output_dir(std::filesystem::path output_dir, bool accept_nonempty_output_dir) {
|
||||
|
||||
Reference in New Issue
Block a user