diff --git a/compaction/compaction.cc b/compaction/compaction.cc index 1b76205828..b8391e4a17 100644 --- a/compaction/compaction.cc +++ b/compaction/compaction.cc @@ -541,6 +541,7 @@ protected: utils::observable<> _stop_request_observable; // optional tombstone_gc_state that is used when gc has to check only the compacting sstables to collect tombstones. std::optional _tombstone_gc_state_with_commitlog_check_disabled; + int64_t _output_repaired_at = 0; private: // Keeps track of monitors for input sstable. // If _update_backlog_tracker is set to true, monitors are responsible for adjusting backlog as compaction progresses. @@ -623,6 +624,7 @@ protected: } void finish_new_sstable(compaction_writer* writer) { + writer->writer.set_repaired_at(_output_repaired_at); writer->writer.consume_end_of_stream(); writer->sst->open_data().get(); _end_size += writer->sst->bytes_on_disk(); @@ -800,9 +802,13 @@ private: double sum_of_estimated_droppable_tombstone_ratio = 0; _input_sstable_generations.reserve(_sstables.size()); _input_sstables_basic_info.reserve(_sstables.size()); + int64_t repaired_at = 0; + std::vector repaired_at_for_compacted_sstables; for (auto& sst : _sstables) { co_await coroutine::maybe_yield(); auto& sst_stats = sst->get_stats_metadata(); + repaired_at_for_compacted_sstables.push_back(sst_stats.repaired_at); + repaired_at = std::max(sst_stats.repaired_at, repaired_at); timestamp_tracker.update(sst_stats.min_timestamp); timestamp_tracker.update(sst_stats.max_timestamp); @@ -836,6 +842,10 @@ private: } } log_debug("{} [{}]", report_start_desc(), fmt::join(_sstables | std::views::transform([] (auto sst) { return to_string(sst, true); }), ",")); + if (repaired_at) { + _output_repaired_at = repaired_at; + } + log_debug("repaired_at_vec={} output_repaired_at={}", repaired_at_for_compacted_sstables, _output_repaired_at); if (ssts->size() < _sstables.size()) { log_debug("{} out of {} input sstables are fully expired sstables that will not be actually compacted", 
_sstables.size() - ssts->size(), _sstables.size()); @@ -1981,6 +1991,8 @@ get_fully_expired_sstables(const compaction_group_view& table_s, const std::vect } std::unordered_set candidates; + // Note: This contains both repaired and unrepaired sstables which means + // compaction consults both repaired and unrepaired sstables for tombstone gc. auto uncompacting_sstables = get_uncompacting_sstables(table_s, compacting); // Get list of uncompacting sstables that overlap the ones being compacted. std::vector overlapping = leveled_manifest::overlapping(*table_s.schema(), compacting, uncompacting_sstables); diff --git a/compaction/compaction_group_view.hh b/compaction/compaction_group_view.hh index 7a1ed877b1..0a7978ed90 100644 --- a/compaction/compaction_group_view.hh +++ b/compaction/compaction_group_view.hh @@ -61,6 +61,7 @@ public: virtual const std::string get_group_id() const noexcept = 0; virtual seastar::condition_variable& get_staging_done_condition() noexcept = 0; virtual dht::token_range get_token_range_after_split(const dht::token& t) const noexcept = 0; + virtual int64_t get_sstables_repaired_at() const noexcept = 0; }; } // namespace compaction diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index 0ff1b1bdf0..f3da4039b6 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -753,6 +753,71 @@ compaction_manager::compaction_reenabler::~compaction_reenabler() { } } +future<> compaction_manager::await_ongoing_compactions(compaction_group_view* t) { + auto name = t ? t->schema()->ks_name() + "." 
+ t->schema()->cf_name() : "ALL"; + try { + auto tasks = _tasks + | std::views::filter([t] (const auto& task) { + return (!t || task.compacting_table() == t); + }) + | std::views::transform([] (auto& task) { return task.shared_from_this(); }) + | std::ranges::to>>(); + auto sz = tasks.size(); + cmlog.debug("Awaiting ongoing unrepaired compactions table={} tasks={}", name, sz); + bool task_stopped = false; + co_await await_tasks(std::move(tasks), task_stopped); + cmlog.debug("Awaiting ongoing unrepaired compactions table={} tasks={} done", name, sz); + } catch (...) { + cmlog.error("Awaiting ongoing unrepaired compactions table={} failed: {}", name, std::current_exception()); + throw; + } +} + +future +compaction_manager::get_incremental_repair_read_lock(compaction::compaction_group_view& t, const sstring& reason) { + if (!reason.empty()) { + cmlog.debug("Get get_incremental_repair_read_lock for {} started", reason); + } + compaction::compaction_state& cs = get_compaction_state(&t); + auto ret = co_await cs.incremental_repair_lock.hold_read_lock(); + if (!reason.empty()) { + cmlog.debug("Get get_incremental_repair_read_lock for {} done", reason); + } + co_return ret; +} + +future +compaction_manager::get_incremental_repair_write_lock(compaction::compaction_group_view& t, const sstring& reason) { + if (!reason.empty()) { + cmlog.debug("Get get_incremental_repair_write_lock for {} started", reason); + } + compaction::compaction_state& cs = get_compaction_state(&t); + auto ret = co_await cs.incremental_repair_lock.hold_write_lock(); + if (!reason.empty()) { + cmlog.debug("Get get_incremental_repair_write_lock for {} done", reason); + } + co_return ret; +} + +future +compaction_manager::await_and_disable_compaction(compaction_group_view& t) { + compaction_reenabler cre(*this, t); + co_await await_ongoing_compactions(&t); + co_return cre; +} + + +compaction_manager::compaction_reenabler +compaction_manager::stop_and_disable_compaction_no_wait(compaction_group_view& t, 
sstring reason) { + compaction_reenabler cre(*this, t); + try { + do_stop_ongoing_compactions(std::move(reason), &t, {}); + } catch (...) { + cmlog.error("Stopping ongoing compactions failed: {}. Ignored", std::current_exception()); + } + return cre; +} + future compaction_manager::stop_and_disable_compaction(sstring reason, compaction_group_view& t) { compaction_reenabler cre(*this, t); @@ -1094,15 +1159,18 @@ void compaction_manager::postpone_compaction_for_table(compaction_group_view* t) _postponed.insert(t); } -future<> compaction_manager::stop_tasks(std::vector> tasks, sstring reason) noexcept { +void compaction_manager::stop_tasks(const std::vector>& tasks, sstring reason) noexcept { // To prevent compaction from being postponed while tasks are being stopped, // let's stop all tasks before the deferring point below. for (auto& t : tasks) { cmlog.debug("Stopping {}", *t); t->stop_compaction(reason); } - co_await coroutine::parallel_for_each(tasks, [] (auto& task) -> future<> { - auto unlink_task = deferred_action([task] { task->unlink(); }); +} + +future<> compaction_manager::await_tasks(std::vector> tasks, bool task_stopped) const noexcept { + co_await coroutine::parallel_for_each(tasks, [task_stopped] (auto& task) -> future<> { + auto unlink_task = deferred_action([task, task_stopped] { if (task_stopped) { task->unlink(); } }); try { co_await task->compaction_done(); } catch (sstables::compaction_stopped_exception&) { @@ -1110,38 +1178,46 @@ future<> compaction_manager::stop_tasks(std::vector> +compaction_manager::do_stop_ongoing_compactions(sstring reason, compaction_group_view* t, std::optional type_opt) noexcept { + auto ongoing_compactions = get_compactions(t).size(); + auto tasks = _tasks + | std::views::filter([t, type_opt] (const auto& task) { + return (!t || task.compacting_table() == t) && (!type_opt || task.compaction_type() == *type_opt); + }) + | std::views::transform([] (auto& task) { return task.shared_from_this(); }) + | std::ranges::to>>(); + 
logging::log_level level = tasks.empty() ? log_level::debug : log_level::info; + if (cmlog.is_enabled(level)) { + std::string scope = ""; + if (t) { + scope = fmt::format(" for table {}", *t); + } + if (type_opt) { + scope += fmt::format(" {} type={}", scope.size() ? "and" : "for", *type_opt); + } + cmlog.log(level, "Stopping {} tasks for {} ongoing compactions{} due to {}", tasks.size(), ongoing_compactions, scope, reason); + } + stop_tasks(tasks, std::move(reason)); + return tasks; +} + future<> compaction_manager::stop_ongoing_compactions(sstring reason, compaction_group_view* t, std::optional type_opt) noexcept { try { - auto ongoing_compactions = get_compactions(t).size(); - auto tasks = _tasks - | std::views::filter([t, type_opt] (const auto& task) { - return (!t || task.compacting_table() == t) && (!type_opt || task.compaction_type() == *type_opt); - }) - | std::views::transform([] (auto& task) { return task.shared_from_this(); }) - | std::ranges::to>>(); - logging::log_level level = tasks.empty() ? log_level::debug : log_level::info; - if (cmlog.is_enabled(level)) { - std::string scope = ""; - if (t) { - scope = fmt::format(" for table {}", *t); - } - if (type_opt) { - scope += fmt::format(" {} type={}", scope.size() ? "and" : "for", *type_opt); - } - cmlog.log(level, "Stopping {} tasks for {} ongoing compactions{} due to {}", tasks.size(), ongoing_compactions, scope, reason); - } - return stop_tasks(std::move(tasks), std::move(reason)); + auto tasks = do_stop_ongoing_compactions(std::move(reason), t, type_opt); + bool task_stopped = true; + co_await await_tasks(std::move(tasks), task_stopped); } catch (...) { cmlog.error("Stopping ongoing compactions failed: {}. 
Ignored", std::current_exception()); } - return make_ready_future(); + co_return; } future<> compaction_manager::drain() { @@ -1291,6 +1367,7 @@ protected: co_await coroutine::switch_to(_cm.compaction_sg()); for (;;) { + auto uuid = utils::make_random_uuid(); if (!can_proceed()) { co_return std::nullopt; } @@ -1305,6 +1382,11 @@ protected: sstables::compaction_strategy cs = t.get_compaction_strategy(); sstables::compaction_descriptor descriptor = co_await cs.get_sstables_for_compaction(t, _cm.get_strategy_control()); int weight = calculate_weight(descriptor); + cmlog.debug("Started minor compaction sstables={} sstables_repaired_at={} range={} uuid={} compaction_uuid={}", + descriptor.sstables, compacting_table()->get_sstables_repaired_at(), + compacting_table()->token_range(), uuid, _compaction_data.compaction_uuid); + + auto old_sstables = ::format("{}", descriptor.sstables); if (descriptor.sstables.empty() || !can_proceed() || t.is_auto_compaction_disabled_by_user()) { cmlog.debug("{}: sstables={} can_proceed={} auto_compaction={}", *this, descriptor.sstables.size(), can_proceed(), t.is_auto_compaction_disabled_by_user()); @@ -1334,6 +1416,8 @@ protected: try { bool should_update_history = this->should_update_history(descriptor.options.type()); sstables::compaction_result res = co_await compact_sstables(std::move(descriptor), _compaction_data, on_replace); + cmlog.debug("Finished minor compaction old_sstables={} new_sstables={} sstables_repaired_at={} range={} uuid={} compaction_uuid={}", + old_sstables, res.new_sstables, compacting_table()->get_sstables_repaired_at(), compacting_table()->token_range(), uuid, _compaction_data.compaction_uuid); finish_compaction(); if (should_update_history) { // update_history can take a long time compared to @@ -1860,6 +1944,7 @@ future compaction_manager::perform_sst co_return compaction_stats_opt{}; } // All sstables must be included, even the ones being compacted, such that everything in table is validated.
+ // No need to split sstables as repaired or unrepaired. No need to take any compaction and repair locks, since this compaction does not modify the sstable. auto all_sstables = co_await get_all_sstables(t); co_return co_await perform_compaction(throw_if_stopping::no, info, &t, info.id, std::move(all_sstables), quarantine_sstables); } @@ -2087,6 +2172,9 @@ future<> compaction_manager::try_perform_cleanup(owned_ranges_ptr sorted_owned_r update_sstable_cleanup_state(t, sst, *sorted_owned_ranges); }); }; + // No need to treat repaired and unrepaired sstables separately here, + // since it only inserts or deletes sstables into or from + // sstables_requiring_cleanup. co_await update_sstables_cleanup_state(co_await t.main_sstable_set()); co_await update_sstables_cleanup_state(co_await t.maintenance_sstable_set()); @@ -2149,7 +2237,7 @@ future<> compaction_manager::perform_sstable_upgrade(owned_ranges_ptr sorted_own // Note that we potentially could be doing multiple // upgrades here in parallel, but that is really the users // problem.
- return rewrite_sstables(t, sstables::compaction_type_options::make_upgrade(), std::move(sorted_owned_ranges), std::move(get_sstables), info).discard_result(); + co_await rewrite_sstables(t, sstables::compaction_type_options::make_upgrade(), std::move(sorted_owned_ranges), std::move(get_sstables), info).discard_result(); } future compaction_manager::perform_split_compaction(compaction_group_view& t, sstables::compaction_type_options::split opt, tasks::task_info info) { @@ -2190,11 +2278,11 @@ compaction_manager::maybe_split_sstable(sstables::shared_sstable sst, compaction future compaction_manager::perform_sstable_scrub(compaction_group_view& t, sstables::compaction_type_options::scrub opts, tasks::task_info info) { auto scrub_mode = opts.operation_mode; if (scrub_mode == sstables::compaction_type_options::scrub::mode::validate) { - return perform_sstable_scrub_validate_mode(t, info, opts.quarantine_sstables); + co_return co_await perform_sstable_scrub_validate_mode(t, info, opts.quarantine_sstables); } owned_ranges_ptr owned_ranges_ptr = {}; sstring option_desc = fmt::format("mode: {};\nquarantine_mode: {}\n", opts.operation_mode, opts.quarantine_operation_mode); - return rewrite_sstables(t, sstables::compaction_type_options::make_scrub(scrub_mode), std::move(owned_ranges_ptr), [&t, opts] -> future> { + co_return co_await rewrite_sstables(t, sstables::compaction_type_options::make_scrub(scrub_mode), std::move(owned_ranges_ptr), [&t, opts] -> future> { auto all_sstables = co_await get_all_sstables(t); std::vector sstables = all_sstables | std::views::filter([&opts] (const sstables::shared_sstable& sst) { diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index 67e2538bbd..0b3b65c809 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -16,6 +16,7 @@ #include #include #include +#include #include "sstables/shared_sstable.hh" #include "utils/exponential_backoff_retry.hh" #include 
"utils/updateable_value.hh" @@ -177,7 +178,8 @@ private: } future perform_compaction(throw_if_stopping do_throw_if_stopping, tasks::task_info parent_info, Args&&... args); - future<> stop_tasks(std::vector> tasks, sstring reason) noexcept; + void stop_tasks(const std::vector>& tasks, sstring reason) noexcept; + future<> await_tasks(std::vector>, bool task_stopped) const noexcept; future<> update_throughput(uint32_t value_mbs); // Return the largest fan-in of currently running compactions @@ -389,6 +391,11 @@ public: // Caller should call the compaction_reenabler::reenable future stop_and_disable_compaction(sstring reason, compaction::compaction_group_view& t); + future await_and_disable_compaction(compaction::compaction_group_view& t); + + future get_incremental_repair_read_lock(compaction::compaction_group_view& t, const sstring& reason); + future get_incremental_repair_write_lock(compaction::compaction_group_view& t, const sstring& reason); + // Run a function with compaction temporarily disabled for a table T. future<> run_with_compaction_disabled(compaction::compaction_group_view& t, std::function ()> func, sstring reason = "custom operation"); @@ -421,9 +428,18 @@ public: // Stops ongoing compaction of a given type. future<> stop_compaction(sstring type, compaction::compaction_group_view* table = nullptr); +private: + std::vector> + do_stop_ongoing_compactions(sstring reason, compaction_group_view* t, std::optional type_opt) noexcept; + +public: // Stops ongoing compaction of a given table and/or compaction_type. 
future<> stop_ongoing_compactions(sstring reason, compaction::compaction_group_view* t = nullptr, std::optional type_opt = {}) noexcept; + future<> await_ongoing_compactions(compaction_group_view* t); + + compaction_manager::compaction_reenabler stop_and_disable_compaction_no_wait(compaction_group_view& t, sstring reason); + double backlog() { return _backlog_manager.backlog(); } @@ -620,7 +636,8 @@ public: friend future compaction_manager::perform_compaction(throw_if_stopping do_throw_if_stopping, tasks::task_info parent_info, Args&&... args); friend future compaction_manager::perform_task(shared_ptr task, throw_if_stopping do_throw_if_stopping); friend fmt::formatter; - friend future<> compaction_manager::stop_tasks(std::vector> tasks, sstring reason) noexcept; + friend void compaction_manager::stop_tasks(const std::vector>& tasks, sstring reason) noexcept; + friend future<> compaction_manager::await_tasks(std::vector>, bool task_stopped) const noexcept; friend sstables::test_env_compaction_manager; }; diff --git a/compaction/compaction_state.hh b/compaction/compaction_state.hh index 95096ab466..7375606365 100644 --- a/compaction/compaction_state.hh +++ b/compaction/compaction_state.hh @@ -33,6 +33,14 @@ struct compaction_state { // to synchronize with minor, such that major doesn't miss any sstable. seastar::rwlock lock; + // Compactions like major need to work on all sstables in the unrepaired + // set, no matter if the sstable is being repaired or not. The + // incremental_repair_lock is introduced to serialize repair and such + // compactions. This lock guarantees that no sstables are being repaired. + // Note that the minor compactions do not need to take this lock because + // they ignore sstables that are being repaired.
+ seastar::rwlock incremental_repair_lock; + // Raised by any function running under run_with_compaction_disabled(); long compaction_disabled_counter = 0; diff --git a/compaction/task_manager_module.cc b/compaction/task_manager_module.cc index 907e513624..7168f55fe5 100644 --- a/compaction/task_manager_module.cc +++ b/compaction/task_manager_module.cc @@ -582,7 +582,8 @@ future<> table_upgrade_sstables_compaction_task_impl::run() { tasks::task_info info{_status.id, _status.shard}; co_await run_on_table("upgrade_sstables", _db, _status.keyspace, _ti, [&] (replica::table& t) -> future<> { return t.parallel_foreach_compaction_group_view([&] (compaction::compaction_group_view& ts) -> future<> { - return t.get_compaction_manager().perform_sstable_upgrade(owned_ranges_ptr, ts, _exclude_current_version, info); + auto lock_holder = co_await t.get_compaction_manager().get_incremental_repair_read_lock(ts, "upgrade_sstables_compaction"); + co_await t.get_compaction_manager().perform_sstable_upgrade(owned_ranges_ptr, ts, _exclude_current_version, info); }); }); } @@ -621,6 +622,7 @@ future<> table_scrub_sstables_compaction_task_impl::run() { auto& cf = _db.find_column_family(_status.keyspace, _status.table); tasks::task_info info{_status.id, _status.shard}; co_await cf.parallel_foreach_compaction_group_view([&] (compaction::compaction_group_view& ts) mutable -> future<> { + auto lock_holder = co_await cm.get_incremental_repair_read_lock(ts, "scrub_sstables_compaction"); auto r = co_await cm.perform_sstable_scrub(ts, _opts, info); _stats += r.value_or(sstables::compaction_stats{}); }); @@ -656,6 +658,7 @@ future<> shard_reshaping_compaction_task_impl::run() { // reshape sstables individually within the compaction groups for (auto& sstables_in_cg : sstables_grouped_by_compaction_group) { + auto lock_holder = co_await table.get_compaction_manager().get_incremental_repair_read_lock(*sstables_in_cg.first, "reshaping_compaction"); co_await 
reshape_compaction_group(*sstables_in_cg.first, sstables_in_cg.second, table, info); } } diff --git a/locator/tablets.cc b/locator/tablets.cc index 2a6459875e..9e62896382 100644 --- a/locator/tablets.cc +++ b/locator/tablets.cc @@ -182,15 +182,16 @@ bool tablet_has_excluded_node(const locator::topology& topo, const tablet_info& return false; } -tablet_info::tablet_info(tablet_replica_set replicas, db_clock::time_point repair_time, tablet_task_info repair_task_info, tablet_task_info migration_task_info) +tablet_info::tablet_info(tablet_replica_set replicas, db_clock::time_point repair_time, tablet_task_info repair_task_info, tablet_task_info migration_task_info, int64_t sstables_repaired_at) : replicas(std::move(replicas)) , repair_time(repair_time) , repair_task_info(std::move(repair_task_info)) , migration_task_info(std::move(migration_task_info)) + , sstables_repaired_at(sstables_repaired_at) {} tablet_info::tablet_info(tablet_replica_set replicas) - : tablet_info(std::move(replicas), db_clock::time_point{}, tablet_task_info{}, tablet_task_info{}) + : tablet_info(std::move(replicas), db_clock::time_point{}, tablet_task_info{}, tablet_task_info{}, int64_t(0)) {} std::optional merge_tablet_info(tablet_info a, tablet_info b) { @@ -207,7 +208,9 @@ std::optional merge_tablet_info(tablet_info a, tablet_info b) { } auto repair_time = std::max(a.repair_time, b.repair_time); - return tablet_info(std::move(a.replicas), repair_time, a.repair_task_info, a.migration_task_info); + int64_t sstables_repaired_at = std::max(a.sstables_repaired_at, b.sstables_repaired_at); + auto info = tablet_info(std::move(a.replicas), repair_time, a.repair_task_info, a.migration_task_info, sstables_repaired_at); + return info; } std::optional get_leaving_replica(const tablet_info& tinfo, const tablet_transition_info& trinfo) { diff --git a/locator/tablets.hh b/locator/tablets.hh index 519e7ec6cb..b80ecdf2b8 100644 --- a/locator/tablets.hh +++ b/locator/tablets.hh @@ -191,9 +191,10 @@ struct 
tablet_info { db_clock::time_point repair_time; locator::tablet_task_info repair_task_info; locator::tablet_task_info migration_task_info; + int64_t sstables_repaired_at = 0; /* initialized so a default-constructed tablet_info never feeds an indeterminate value to the defaulted operator== */ tablet_info() = default; - tablet_info(tablet_replica_set, db_clock::time_point, tablet_task_info, tablet_task_info); + tablet_info(tablet_replica_set, db_clock::time_point, tablet_task_info, tablet_task_info, int64_t sstables_repaired_at); tablet_info(tablet_replica_set); bool operator==(const tablet_info&) const = default; diff --git a/replica/compaction_group.hh b/replica/compaction_group.hh index 1502e39a18..79ef27027d 100644 --- a/replica/compaction_group.hh +++ b/replica/compaction_group.hh @@ -8,6 +8,7 @@ #include #include +#include #include "database_fwd.hh" #include "compaction/compaction_descriptor.hh" @@ -38,7 +39,7 @@ enum class repair_sstable_classification { repaired, }; -using repair_classifier_func = std::function; +using repair_classifier_func = std::function; // Compaction group is a set of SSTables which are eligible to be compacted together.
// By this definition, we can say: @@ -166,6 +167,10 @@ public: return _tombstone_gc_enabled; } + int64_t get_sstables_repaired_at() const noexcept; + + future<> update_repaired_at_for_merge(); + void set_compaction_strategy_state(compaction::compaction_strategy_state compaction_strategy_state) noexcept; lw_shared_ptr& memtables() noexcept; diff --git a/replica/database.hh b/replica/database.hh index 6b8787a00d..43c1c91d0a 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1327,6 +1327,12 @@ public: future> clone_tablet_storage(locator::tablet_id tid); friend class compaction_group; + + future<> update_repaired_at_for_merge(); + + future> get_compaction_group_views_for_repair(dht::token_range range); + + future<> clear_being_repaired_for_range(dht::token_range range); }; lw_shared_ptr make_tablet_sstable_set(schema_ptr, const storage_group_manager& sgm, const locator::tablet_map&); diff --git a/replica/table.cc b/replica/table.cc index 98a782bb40..563cdc98b0 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -61,6 +61,7 @@ #include "readers/combined.hh" #include "readers/compacting.hh" #include "replica/schema_describe_helper.hh" +#include "repair/incremental.hh" namespace replica { @@ -125,6 +126,24 @@ lw_shared_ptr compaction_group::make_sstable_set() const return make_lw_shared(sstables::make_compound_sstable_set(_t.schema(), { _main_sstables, _maintenance_sstables })); } +int64_t compaction_group::get_sstables_repaired_at() const noexcept { + try { + auto tid = locator::tablet_id(group_id()); + auto erm = _t.get_effective_replication_map(); + if (!erm) { + return 0; + } + if (!erm->get_replication_strategy().uses_tablets()) { + return 0; + } + auto& tmap = erm->get_token_metadata_ptr()->tablets().get_tablet_map(_t.schema()->id()); + auto& tinfo = tmap.get_tablet_info(tid); + return tinfo.sstables_repaired_at; + } catch (const locator::no_such_tablet_map&) { /* caught by const reference (no copy, no slicing); no tablet map for this table falls back to 0 */ + return 0; + } +} + lw_shared_ptr table::make_compound_sstable_set() const { return
_sg_manager->make_sstable_set(); } @@ -688,7 +707,7 @@ public: storage_group_map r; // Incremental repair is not supported with vnodes, so all sstables will be considered unrepaired. - auto noop_repair_sstable_classifier = [] (const sstables::shared_sstable&) { return repair_sstable_classification::unrepaired; }; + auto noop_repair_sstable_classifier = [] (const sstables::shared_sstable&, int64_t sstables_repaired_at) { return repair_sstable_classification::unrepaired; }; // this might not reflect real vnode range for this node, but with 256 tokens, the actual // first and last tokens are likely to be ~0.5% of the edges, so any measurement against @@ -762,6 +781,8 @@ class tablet_storage_group_manager final : public storage_group_manager { locator::resize_decision::seq_number_t _split_ready_seq_number = std::numeric_limits::min(); future<> _merge_completion_fiber; condition_variable _merge_completion_event; + // Holds compaction reenabler which disables compaction temporarily during tablet merge + std::vector _compaction_reenablers_for_merging; private: const schema_ptr& schema() const { return _t.schema(); @@ -820,8 +841,17 @@ private: repair_classifier_func make_repair_sstable_classifier_func() const { // FIXME: implement it for incremental repair! - return [] (const sstables::shared_sstable& sst) { - return repair_sstable_classification::unrepaired; + return [] (const sstables::shared_sstable& sst, int64_t sstables_repaired_at) { + bool is_repaired = repair::is_repaired(sstables_repaired_at, sst); + if (is_repaired) { + return repair_sstable_classification::repaired; + } else { + if (!sst->being_repaired.uuid().is_null()) { + return repair_sstable_classification::repairing; + } else { + return repair_sstable_classification::unrepaired; + } + } }; } @@ -980,6 +1010,8 @@ bool storage_group::set_split_mode() { // TODO: use the actual sub-ranges instead, to help incremental selection on the read path. 
return compaction_group::make_empty_group(*_main_cg); }; + tlogger.debug("storage_group::set_split_mode: Set sstables_repaired_at={} for split old_group={} old_range={}", + _main_cg->get_sstables_repaired_at(), _main_cg->group_id(), _main_cg->token_range()); std::vector split_ready_groups(2); split_ready_groups[to_idx(locator::tablet_range_side::left)] = create_cg(); split_ready_groups[to_idx(locator::tablet_range_side::right)] = create_cg(); @@ -1012,6 +1044,7 @@ future<> compaction_group::split(sstables::compaction_type_options::split opt, t auto& cm = get_compaction_manager(); for (auto view : all_views()) { + auto lock_holder = co_await cm.get_incremental_repair_read_lock(*view, "storage_group_split"); // Waits on sstables produced by repair to be integrated into main set; off-strategy is usually a no-op with tablets. co_await cm.perform_offstrategy(*view, tablet_split_task_info); co_await cm.perform_split_compaction(*view, opt, tablet_split_task_info); @@ -1137,7 +1170,9 @@ tablet_storage_group_manager::maybe_split_sstable(const sstables::shared_sstable auto& cg = compaction_group_for_sstable(sst); auto holder = cg.async_gate().hold(); - co_return co_await _t.get_compaction_manager().maybe_split_sstable(sst, cg.view_for_sstable(sst), split_compaction_options()); + auto& view = cg.view_for_sstable(sst); + auto lock_holder = co_await _t.get_compaction_manager().get_incremental_repair_read_lock(view, "maybe_split_sstable"); + co_return co_await _t.get_compaction_manager().maybe_split_sstable(sst, view, split_compaction_options()); } future<> table::maybe_split_compaction_group_of(locator::tablet_id tablet_id) { @@ -1978,6 +2013,58 @@ std::vector compaction_group::all_sstables() const { return all; } +future<> +compaction_group::update_repaired_at_for_merge() { + auto sstables = all_sstables(); + auto sstables_repaired_at = get_sstables_repaired_at(); + co_await seastar::async([&] { + for (auto& sst : sstables) { + thread::maybe_yield(); + auto& stats = 
sst->get_stats_metadata(); + if (stats.repaired_at > sstables_repaired_at) { + auto neww = 0; + auto old = sst->update_repaired_at(neww); + tlogger.info("Finished repaired_at update for tablet merge sstable={} old={} new={} sstables_repaired_at={} group_id={} range={}", + sst->get_filename(), old, neww, sstables_repaired_at, group_id(), token_range()); + } else { + auto old = stats.repaired_at; + tlogger.debug("Skipped repaired_at update for tablet merge sstable={} old={} new={} sstables_repaired_at={} group_id={} range={}", + sst->get_filename(), old, old, sstables_repaired_at, group_id(), token_range()); + } + } + }); +} + +future> table::get_compaction_group_views_for_repair(dht::token_range range) { + std::vector ret; + auto sgs = storage_groups_for_token_range(range); + for (auto& sg : sgs) { + co_await coroutine::maybe_yield(); + auto cgs = sg->compaction_groups(); + for (auto& cg : cgs) { + ret.push_back(&cg->view_for_unrepaired_data()); + } + } + co_return ret; +} + +future<> table::clear_being_repaired_for_range(dht::token_range range) { + auto sgs = storage_groups_for_token_range(range); + for (auto& sg : sgs) { + auto cgs = sg->compaction_groups(); + for (auto& cg : cgs) { + auto sstables = cg->all_sstables(); + co_await coroutine::maybe_yield(); + for (auto& sst : sstables) { + co_await coroutine::maybe_yield(); + if (!sst->being_repaired.uuid().is_null()) { + sst->being_repaired = service::session_id(utils::UUID()); + } + } + } + } +} + future<> compaction_group::merge_sstables_from(compaction_group& group) { auto permit = co_await _t.get_sstable_list_permit(); @@ -2124,8 +2211,9 @@ table::compact_all_sstables(tasks::task_info info, do_flush do_flush, bool consi // Forces off-strategy before major, so sstables previously sitting on maintenance set will be included // in the compaction's input set, to provide same semantics as before maintenance set came into existence. 
co_await perform_offstrategy_compaction(info); - co_await parallel_foreach_compaction_group_view([this, info, consider_only_existing_data] (compaction::compaction_group_view& view) { - return _compaction_manager.perform_major_compaction(view, info, consider_only_existing_data); + co_await parallel_foreach_compaction_group_view([this, info, consider_only_existing_data] (compaction::compaction_group_view& view) -> future<> { + auto lock_holder = co_await _compaction_manager.get_incremental_repair_read_lock(view, "compact_all_sstables"); + co_await _compaction_manager.perform_major_compaction(view, info, consider_only_existing_data); }); } @@ -2175,6 +2263,7 @@ future table::perform_offstrategy_compaction(tasks::task_info info) { _off_strategy_trigger.cancel(); bool performed = false; co_await parallel_foreach_compaction_group_view([this, &performed, info] (compaction::compaction_group_view& view) -> future<> { + auto lock_holder = co_await _compaction_manager.get_incremental_repair_read_lock(view, "perform_offstrategy_compaction"); /* reason label names this caller; it is only used in the lock's debug log */ performed |= co_await _compaction_manager.perform_offstrategy(view, info); }); co_return performed; @@ -2192,6 +2281,7 @@ future<> table::perform_cleanup_compaction(compaction::owned_ranges_ptr sorted_o co_await flush(); } + auto lock_holder = co_await get_compaction_manager().get_incremental_repair_read_lock(cg->as_view_for_static_sharding(), "perform_cleanup_compaction"); co_return co_await get_compaction_manager().perform_cleanup(std::move(sorted_owned_ranges), cg->as_view_for_static_sharding(), info); } @@ -2570,6 +2660,10 @@ public: dht::token_range get_token_range_after_split(const dht::token& t) const noexcept override { return _t.get_token_range_after_split(t); } + + int64_t get_sstables_repaired_at() const noexcept override { + return _cg.get_sstables_repaired_at(); + } }; std::unique_ptr compaction_group::make_compacting_view() { @@ -2780,7 +2874,12 @@ void tablet_storage_group_manager::handle_tablet_split_completion(const locator: } for
(unsigned i = 0; i < split_size; i++) { auto group_id = first_new_id + i; - split_ready_groups[i]->update_id_and_range(group_id, new_tmap.get_token_range(locator::tablet_id(group_id))); + auto old_range = old_tmap.get_token_range(locator::tablet_id(id)); + auto new_range = new_tmap.get_token_range(locator::tablet_id(group_id)); + auto sstables_repaired_at = new_tmap.get_tablet_info(locator::tablet_id(group_id)).sstables_repaired_at; + tlogger.debug("Setting sstables_repaired_at={} for split tablet_id={} old_tid={} new_tid={} old_range={} new_range={} idx={}", + sstables_repaired_at, table_id, id, group_id, old_range, new_range, i); + split_ready_groups[i]->update_id_and_range(group_id, new_range); new_storage_groups[group_id] = make_lw_shared(std::move(split_ready_groups[i])); } @@ -2797,17 +2896,32 @@ future<> tablet_storage_group_manager::merge_completion_fiber() { while (!_t.async_gate().is_closed()) { try { co_await utils::get_local_injector().inject("merge_completion_fiber", utils::wait_for_message(60s)); - - co_await for_each_storage_group_gently([] (storage_group& sg) -> future<> { + auto ks_name = schema()->ks_name(); + auto cf_name = schema()->cf_name(); + // Enable compaction after merge is done. + auto cres = std::exchange(_compaction_reenablers_for_merging, {}); + co_await for_each_storage_group_gently([ks_name, cf_name] (storage_group& sg) -> future<> { auto main_group = sg.main_compaction_group(); + tlogger.debug("Merge compaction groups for table={}.{} group_id={} range={} started", + ks_name, cf_name, main_group->group_id(), main_group->token_range()); + int nr = 0; + int sz = sg.merging_groups().size(); for (auto& group : sg.merging_groups()) { + tlogger.debug("Merge compaction groups for table={}.{} group_id={} range={} merging {} out of {} groups", + ks_name, cf_name, main_group->group_id(), main_group->token_range(), ++nr, sz); // Synchronize with ongoing writes that might be blocked waiting for memory. 
// Also, disabling compaction provides stability on the sstable set. // Flushes memtable, so all the data can be moved. co_await group->stop("tablet merge"); + if (utils::get_local_injector().enter("merge_completion_fiber_error")) { + tlogger.info("Got merge_completion_fiber_error"); + co_await sleep(std::chrono::seconds(60)); + } co_await main_group->merge_sstables_from(*group); } co_await sg.remove_empty_merging_groups(); + tlogger.debug("Merge compaction groups for table={}.{} group_id={} range={} finished", + ks_name, cf_name, main_group->group_id(), main_group->token_range()); }); } catch (...) { tlogger.error("Failed to merge compaction groups for table {}.{}", schema()->ks_name(), schema()->cf_name()); @@ -2840,8 +2954,12 @@ void tablet_storage_group_manager::handle_tablet_merge_completion(const locator: continue; } auto new_tid = id >> log2_reduce_factor; - - auto new_cg = make_lw_shared(_t, new_tid, new_tmap.get_token_range(locator::tablet_id(new_tid)), make_repair_sstable_classifier_func()); + auto new_range = new_tmap.get_token_range(locator::tablet_id(new_tid)); + auto new_cg = make_lw_shared(_t, new_tid, new_range, make_repair_sstable_classifier_func()); + for (auto& view : new_cg->all_views()) { + auto cre = _t.get_compaction_manager().stop_and_disable_compaction_no_wait(*view, "tablet merging"); + _compaction_reenablers_for_merging.push_back(std::move(cre)); + } auto new_sg = make_lw_shared(std::move(new_cg)); for (unsigned i = 0; i < merge_size; i++) { @@ -2852,8 +2970,10 @@ void tablet_storage_group_manager::handle_tablet_merge_completion(const locator: throw std::runtime_error(format("Unable to find sibling tablet of id for table {}", group_id, table_id)); } auto& sg = it->second; - sg->for_each_compaction_group([&new_sg, new_tid] (const compaction_group_ptr& cg) { + sg->for_each_compaction_group([&new_sg, new_range, new_tid, group_id] (const compaction_group_ptr& cg) { cg->update_id(new_tid); + tlogger.debug("Adding merging_group: 
sstables_repaired_at={} old_range={} new_range={} old_tid={} new_tid={} old_group_id={}", + cg->get_sstables_repaired_at(), cg->token_range(), new_range, cg->group_id(), new_tid, group_id); new_sg->add_merging_group(cg); }); // Cannot wait for group to be closed, since it can only return after some long-running operation @@ -2864,7 +2984,6 @@ void tablet_storage_group_manager::handle_tablet_merge_completion(const locator: }); }); } - new_storage_groups[new_tid] = std::move(new_sg); } _storage_groups = std::move(new_storage_groups); @@ -2939,6 +3058,25 @@ void tablet_storage_group_manager::update_effective_replication_map(const locato } } +// This function is called in the topology::transition_state::tablet_resize_finalization transition which +// guarantees there is no tablet repair. The cm.stop_and_disable_compaction() ensures no compaction is running. +future<> table::update_repaired_at_for_merge() { + if (!uses_tablets()) { + co_return; + } + auto sgs = storage_groups(); + for (auto& x : sgs) { + auto sg = x.second; + if (sg) { + auto cgs = sg->compaction_groups(); + for (auto& cg : cgs) { + auto cre = co_await cg->get_compaction_manager().stop_and_disable_compaction("update_repaired_at_for_merge", cg->view_for_unrepaired_data()); + co_await cg->update_repaired_at_for_merge(); + } + } + } +} + void table::update_effective_replication_map(locator::effective_replication_map_ptr erm) { auto old_erm = std::exchange(_erm, std::move(erm)); @@ -4147,7 +4285,7 @@ compaction::compaction_group_view& compaction_group::view_for_unrepaired_data() } compaction::compaction_group_view& compaction_group::view_for_sstable(const sstables::shared_sstable& sst) const { - switch (_repair_sstable_classifier(sst)) { + switch (_repair_sstable_classifier(sst, get_sstables_repaired_at())) { case repair_sstable_classification::unrepaired: return *_unrepaired_view; case repair_sstable_classification::repairing: return *_repairing_view; case repair_sstable_classification::repaired: return 
*_repaired_view; diff --git a/replica/tablets.cc b/replica/tablets.cc index 54cbdfebdf..832c0bfb2b 100644 --- a/replica/tablets.cc +++ b/replica/tablets.cc @@ -155,6 +155,11 @@ tablet_map_to_mutation(const tablet_map& tablets, table_id id, const sstring& ke m.set_clustered_cell(ck, "repair_time", data_value(tablet.repair_time), ts); } } + + if (features.tablet_incremental_repair) { + m.set_clustered_cell(ck, "sstables_repaired_at", data_value(tablet.sstables_repaired_at), ts); + } + if (auto tr_info = tablets.get_tablet_transition_info(tid)) { m.set_clustered_cell(ck, "stage", tablet_transition_stage_to_string(tr_info->stage), ts); m.set_clustered_cell(ck, "transition", tablet_transition_kind_to_string(tr_info->transition), ts); @@ -591,7 +596,8 @@ tablet_id process_one_row(replica::database* db, table_id table, tablet_map& map std::move(new_tablet_replicas), pending_replica, session_id}); } - map.set_tablet(tid, tablet_info{std::move(tablet_replicas), repair_time, repair_task_info, migration_task_info}); + tablet_logger.debug("Set sstables_repaired_at={} table={} tablet={}", sstables_repaired_at, table, tid); + map.set_tablet(tid, tablet_info{std::move(tablet_replicas), repair_time, repair_task_info, migration_task_info, sstables_repaired_at}); if (update_repair_time && db) { auto myid = db->get_token_metadata().get_my_id(); diff --git a/service/tablet_allocator.cc b/service/tablet_allocator.cc index 28be77ae54..97168d2b98 100644 --- a/service/tablet_allocator.cc +++ b/service/tablet_allocator.cc @@ -3351,6 +3351,7 @@ private: throw std::runtime_error(format("Unable to merge tablet info of sibling tablets {} (r: {}) and {} (r: {}).", old_left_tid, left_tablet_replicas, old_right_tid, right_tablet_replicas)); } + lblogger.debug("Got merged_tablet_info with sstables_repaired_at={}", merged_tablet_info->sstables_repaired_at); new_tablets.set_tablet(tid, *merged_tablet_info); } diff --git a/sstables/sstable_writer.hh b/sstables/sstable_writer.hh index 
90635532b5..9bc315962d 100644 --- a/sstables/sstable_writer.hh +++ b/sstables/sstable_writer.hh @@ -43,6 +43,7 @@ public: stop_iteration consume(range_tombstone_change&& rtc); stop_iteration consume_end_of_partition(); void consume_end_of_stream(); + void set_repaired_at(int64_t repaired_at); }; } // namespace sstables diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 5d7f262b7a..ab7e1469c0 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -1298,6 +1298,23 @@ void sstable::write_statistics() { write_simple(_components->statistics); } +void sstable::mark_as_being_repaired(const service::session_id& id) { + being_repaired = id; +} + +int64_t sstable::update_repaired_at(int64_t repaired_at) { + const stats_metadata& old_stats = get_stats_metadata(); + auto old_repaired_at = old_stats.repaired_at; + if (old_repaired_at == repaired_at) { + return old_repaired_at; + } + auto stats = std::make_unique(old_stats); + stats->repaired_at = repaired_at; + _components->statistics.contents[metadata_type::Stats] = std::move(stats); + rewrite_statistics(); + return old_repaired_at; +} + void sstable::rewrite_statistics() { sstlog.debug("Rewriting statistics component of sstable {}", get_filename()); @@ -3403,9 +3420,10 @@ future<> remove_table_directory_if_has_no_snapshots(fs::path table_dir) { } std::string to_string(const shared_sstable& sst, bool include_origin) { + auto repaired_at = sst->get_stats_metadata().repaired_at; return include_origin ? 
- fmt::format("{}:level={:d}:origin={}", sst->get_filename(), sst->get_sstable_level(), sst->get_origin()) : - fmt::format("{}:level={:d}", sst->get_filename(), sst->get_sstable_level()); + fmt::format("{}:level={:d}:origin={}:repaired_at={}", sst->get_filename(), sst->get_sstable_level(), sst->get_origin(), repaired_at) : + fmt::format("{}:level={:d}:repaired_at={}", sst->get_filename(), sst->get_sstable_level(), repaired_at); } std::string sstable_stream_source::component_basename() const { diff --git a/sstables/sstables.hh b/sstables/sstables.hh index 4abdac8863..4823fcfff8 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -1076,8 +1076,12 @@ public: friend in_memory_config_type; -public: service::session_id being_repaired; +public: + void mark_as_being_repaired(const service::session_id& id); + // This function must run inside a seastar thread since it calls + // rewrite_statistics which must run inside a seastar thread. Returns the previous repaired_at value. + int64_t update_repaired_at(int64_t repaired_at); }; // Validate checksums diff --git a/sstables/types.hh b/sstables/types.hh index 021559ef6c..777a70226d 100644 --- a/sstables/types.hh +++ b/sstables/types.hh @@ -311,7 +311,9 @@ struct stats_metadata : public metadata_base { uint32_t sstable_level; // There is not meaningful value to put in this field, since we have no // incremental repair. Before we have it, let's set it to 0. - uint64_t repaired_at = 0; + // According to architecture/sstable/sstable3/sstables-3-statistics.rst, + // repaired_at is an int64_t value. 
+ int64_t repaired_at = 0; disk_array> min_column_names; disk_array> max_column_names; bool has_legacy_counter_shards; diff --git a/sstables/writer.cc b/sstables/writer.cc index f44e89aca3..a652c2559f 100644 --- a/sstables/writer.cc +++ b/sstables/writer.cc @@ -73,6 +73,10 @@ void sstable_writer::consume_end_of_stream() { return _impl->consume_end_of_stream(); } +void sstable_writer::set_repaired_at(int64_t repaired_at) { + _impl->set_repaired_at(repaired_at); +} + sstable_writer::sstable_writer(sstable_writer&& o) = default; sstable_writer& sstable_writer::operator=(sstable_writer&& o) = default; sstable_writer::~sstable_writer() { diff --git a/sstables/writer_impl.hh b/sstables/writer_impl.hh index 07e9306598..ab533be098 100644 --- a/sstables/writer_impl.hh +++ b/sstables/writer_impl.hh @@ -49,6 +49,9 @@ struct sstable_writer::writer_impl { virtual stop_iteration consume_end_of_partition() = 0; virtual void consume_end_of_stream() = 0; virtual ~writer_impl() {} + void set_repaired_at(int64_t repaired_at) { + _collector.set_repaired_at(repaired_at); + } }; } diff --git a/test/boost/compaction_group_test.cc b/test/boost/compaction_group_test.cc index 97e84e6dd1..e0987d5418 100644 --- a/test/boost/compaction_group_test.cc +++ b/test/boost/compaction_group_test.cc @@ -128,6 +128,7 @@ public: virtual const std::string get_group_id() const noexcept override { return "0"; } virtual seastar::condition_variable& get_staging_done_condition() noexcept override { return _staging_done_condition; } dht::token_range get_token_range_after_split(const dht::token& t) const noexcept override { return dht::token_range(); } + int64_t get_sstables_repaired_at() const noexcept override { return 0; } }; SEASTAR_TEST_CASE(basic_compaction_group_splitting_test) { @@ -244,7 +245,7 @@ SEASTAR_TEST_CASE(compactions_dont_cross_group_boundary_test) { } return replica::repair_sstable_classification::repaired; }; - auto repair_sstable_classifier = [&] (const sstables::shared_sstable& sst) -> 
replica::repair_sstable_classification { + auto repair_sstable_classifier = [&] (const sstables::shared_sstable& sst, int64_t sstables_repaired_at) -> replica::repair_sstable_classification { return repair_token_classifier(sst->get_first_decorated_key().token()); }; t.set_repair_sstable_classifier(repair_sstable_classifier); @@ -257,7 +258,7 @@ SEASTAR_TEST_CASE(compactions_dont_cross_group_boundary_test) { auto reader = sstable_reader(sst, s, env.make_reader_permit()); // reader holds sst and s alive. auto close_reader = deferred_close(reader); - auto expected_classification = repair_sstable_classifier(sst); + auto expected_classification = repair_sstable_classifier(sst, 0); while (auto m = read_mutation_from_mutation_reader(reader).get()) { BOOST_REQUIRE(repair_token_classifier(m->decorated_key().token()) == expected_classification); diff --git a/test/boost/incremental_compaction_test.cc b/test/boost/incremental_compaction_test.cc index 29b32f2a5d..6b1ea045db 100644 --- a/test/boost/incremental_compaction_test.cc +++ b/test/boost/incremental_compaction_test.cc @@ -32,6 +32,7 @@ #include "dht/murmur3_partitioner.hh" #include "db/large_data_handler.hh" #include "db/config.hh" +#include "repair/incremental.hh" #include "test/lib/sstable_utils.hh" #include "test/lib/test_services.hh" diff --git a/test/boost/repair_test.cc b/test/boost/repair_test.cc index 9b225d2b58..abe53fd3ff 100644 --- a/test/boost/repair_test.cc +++ b/test/boost/repair_test.cc @@ -24,6 +24,7 @@ #include "readers/mutation_fragment_v1_stream.hh" #include "schema/schema_registry.hh" #include "utils/chunked_vector.hh" +#include "repair/incremental.hh" BOOST_AUTO_TEST_SUITE(repair_test) @@ -190,7 +191,7 @@ SEASTAR_TEST_CASE(test_reader_with_different_strategies) { }); auto read_all = [&](repair_reader::read_strategy strategy) -> future> { auto reader = repair_reader(e.db(), cf, cf.schema(), make_reader_permit(e), - random_range, remote_sharder, remote_shard, 0, strategy, gc_clock::now()); + 
random_range, remote_sharder, remote_shard, 0, strategy, gc_clock::now(), incremental_repair_meta()); std::vector result; while (auto mf = co_await reader.read_mutation_fragment()) { result.push_back(std::move(*mf)); diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc index d6dbbaae2b..b3fbd26059 100644 --- a/test/boost/sstable_compaction_test.cc +++ b/test/boost/sstable_compaction_test.cc @@ -1955,9 +1955,12 @@ SEASTAR_TEST_CASE(size_tiered_beyond_max_threshold_test) { std::vector candidates; int max_threshold = cf->schema()->max_compaction_threshold(); candidates.reserve(max_threshold+1); + const auto keys = tests::generate_partition_keys(2, cf->schema()); for (auto i = 0; i < (max_threshold+1); i++) { // (max_threshold+1) sstables of similar size auto sst = env.make_sstable(cf->schema()); sstables::test(sst).set_data_file_size(1); + // So that the stats_metadata of the sst is written which is checked by incremental repair code + sstables::test(sst).set_values(keys[0].key(), keys[1].key(), stats_metadata{}); candidates.push_back(std::move(sst)); } auto desc = get_sstables_for_compaction(cs, cf.as_compaction_group_view(), std::move(candidates)).get(); diff --git a/test/boost/tablets_test.cc b/test/boost/tablets_test.cc index 034bc92dfe..cddaadac3f 100644 --- a/test/boost/tablets_test.cc +++ b/test/boost/tablets_test.cc @@ -163,7 +163,8 @@ SEASTAR_TEST_CASE(test_tablet_metadata_persistence) { }, db_clock::now(), locator::tablet_task_info::make_auto_repair_request({}, {"dc1", "dc2"}), - locator::tablet_task_info::make_intranode_migration_request() + locator::tablet_task_info::make_intranode_migration_request(), + 0 }); tm.set_tablet_map(table1, std::move(tmap)); } @@ -180,7 +181,8 @@ SEASTAR_TEST_CASE(test_tablet_metadata_persistence) { }, {}, {}, - locator::tablet_task_info::make_migration_request() + locator::tablet_task_info::make_migration_request(), + 0 }); tb = *tmap.next_tablet(tb); tmap.set_tablet(tb, tablet_info { @@ 
-195,7 +197,8 @@ SEASTAR_TEST_CASE(test_tablet_metadata_persistence) { }, {}, {}, - locator::tablet_task_info::make_migration_request() + locator::tablet_task_info::make_migration_request(), + 0 }); tb = *tmap.next_tablet(tb); tmap.set_tablet(tb, tablet_info { @@ -347,7 +350,8 @@ SEASTAR_TEST_CASE(test_tablet_metadata_persistence_with_colocated_tables) { }, db_clock::now(), locator::tablet_task_info::make_auto_repair_request({}, {"dc1", "dc2"}), - locator::tablet_task_info::make_intranode_migration_request() + locator::tablet_task_info::make_intranode_migration_request(), + 0 }); tm.set_tablet_map(table1, std::move(tmap)); } diff --git a/test/lib/sstable_run_based_compaction_strategy_for_tests.hh b/test/lib/sstable_run_based_compaction_strategy_for_tests.hh index 763047aecc..6725e786bf 100644 --- a/test/lib/sstable_run_based_compaction_strategy_for_tests.hh +++ b/test/lib/sstable_run_based_compaction_strategy_for_tests.hh @@ -14,6 +14,7 @@ #include "sstables/sstable_set.hh" #include "compaction/compaction.hh" #include "replica/database.hh" +#include "repair/incremental.hh" namespace sstables { diff --git a/test/lib/test_services.cc b/test/lib/test_services.cc index 88c6bb582f..712240cc48 100644 --- a/test/lib/test_services.cc +++ b/test/lib/test_services.cc @@ -131,6 +131,7 @@ public: dht::token_range get_token_range_after_split(const dht::token& t) const noexcept override { return table().get_token_range_after_split(t); } + int64_t get_sstables_repaired_at() const noexcept override { return 0; } }; table_for_tests::data::data() diff --git a/tools/scylla-sstable.cc b/tools/scylla-sstable.cc index 92e1c0efc6..2cd881ed00 100644 --- a/tools/scylla-sstable.cc +++ b/tools/scylla-sstable.cc @@ -976,6 +976,7 @@ public: virtual const std::string get_group_id() const noexcept override { return _group_id; } virtual seastar::condition_variable& get_staging_done_condition() noexcept override { return _staging_done_condition; } dht::token_range get_token_range_after_split(const 
dht::token& t) const noexcept override { return dht::token_range(); } + int64_t get_sstables_repaired_at() const noexcept override { return 0; } }; void validate_output_dir(std::filesystem::path output_dir, bool accept_nonempty_output_dir) {