repair: Always reset node ops progress to 100% upon completion
Always set the node ops progress to 100% when the operation finishes,
regardless of success or failure. This ensures the progress never
remains below 100%, which would otherwise indicate a pending node
operation when an error occurs.
Fixes #26193
Closes scylladb/scylladb#26194
(cherry picked from commit b31e651657)
Closes scylladb/scylladb#26265
This commit is contained in:
@@ -1623,13 +1623,13 @@ future<std::optional<double>> repair::data_sync_repair_task_impl::expected_total
|
||||
|
||||
future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr, std::unordered_set<dht::token> bootstrap_tokens) {
|
||||
SCYLLA_ASSERT(this_shard_id() == 0);
|
||||
return seastar::async([this, tmptr = std::move(tmptr), tokens = std::move(bootstrap_tokens)] () mutable {
|
||||
auto reason = streaming::stream_reason::bootstrap;
|
||||
return seastar::async([this, tmptr = std::move(tmptr), tokens = std::move(bootstrap_tokens), reason] () mutable {
|
||||
auto& db = get_db().local();
|
||||
auto ks_erms = db.get_non_local_strategy_keyspaces_erms();
|
||||
auto& topology = tmptr->get_topology();
|
||||
auto myloc = topology.get_location();
|
||||
auto myid = tmptr->get_my_id();
|
||||
auto reason = streaming::stream_reason::bootstrap;
|
||||
// Calculate number of ranges to sync data
|
||||
size_t nr_ranges_total = 0;
|
||||
for (const auto& [keyspace_name, erm] : ks_erms) {
|
||||
@@ -1795,10 +1795,31 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr
|
||||
rlogger.info("bootstrap_with_repair: finished with keyspace={}, nr_ranges={}", keyspace_name, nr_ranges * nr_tables);
|
||||
}
|
||||
rlogger.info("bootstrap_with_repair: finished with keyspaces={}", ks_erms | std::views::keys);
|
||||
}).finally([this, reason] { return reset_node_ops_progress(reason); });
|
||||
}
|
||||
|
||||
// Zeroes the node-operation range counters for `reason` on every
// repair_service instance reached through container().invoke_on_all().
//
// For bootstrap, replace, rebuild, decommission and removenode, both the
// "<reason>_finished_ranges" and "<reason>_total_ranges" metrics are set to 0.
// Any other reason leaves the metrics untouched.
//
// The node-operation entry points chain this call in .finally(), so it runs
// whether the operation succeeded or failed.
// NOTE(review): presumably a 0/0 counter pair is reported as 100% progress —
// confirm against the code that computes the progress value.
future<> repair_service::reset_node_ops_progress(streaming::stream_reason reason) {
    return container().invoke_on_all([reason] (repair_service& rs) {
        switch (reason) {
        case streaming::stream_reason::bootstrap:
            rs.get_metrics().bootstrap_finished_ranges = 0;
            rs.get_metrics().bootstrap_total_ranges = 0;
            break;
        case streaming::stream_reason::replace:
            rs.get_metrics().replace_finished_ranges = 0;
            rs.get_metrics().replace_total_ranges = 0;
            break;
        case streaming::stream_reason::rebuild:
            rs.get_metrics().rebuild_finished_ranges = 0;
            rs.get_metrics().rebuild_total_ranges = 0;
            break;
        case streaming::stream_reason::decommission:
            rs.get_metrics().decommission_finished_ranges = 0;
            rs.get_metrics().decommission_total_ranges = 0;
            break;
        case streaming::stream_reason::removenode:
            rs.get_metrics().removenode_finished_ranges = 0;
            rs.get_metrics().removenode_total_ranges = 0;
            break;
        default:
            // No counters are reset here for any other stream reason.
            break;
        }
    });
}
|
||||
|
||||
future<> repair_service::do_decommission_removenode_with_repair(locator::token_metadata_ptr tmptr, locator::host_id leaving_node_id, shared_ptr<node_ops_info> ops) {
|
||||
future<> repair_service::do_decommission_removenode_with_repair(locator::token_metadata_ptr tmptr, locator::host_id leaving_node_id, shared_ptr<node_ops_info> ops, streaming::stream_reason reason) {
|
||||
SCYLLA_ASSERT(this_shard_id() == 0);
|
||||
return seastar::async([this, tmptr = std::move(tmptr), leaving_node_id = std::move(leaving_node_id), ops] () mutable {
|
||||
auto& db = get_db().local();
|
||||
@@ -1989,18 +2010,18 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m
|
||||
op, keyspace_name, leaving_node_id, nr_ranges_total, nr_ranges_synced * nr_tables, nr_ranges_skipped * nr_tables);
|
||||
}
|
||||
rlogger.info("{}: finished with keyspaces={}, leaving_node={}", op, ks_erms | std::views::keys, leaving_node_id);
|
||||
});
|
||||
}).finally([this, reason] { return reset_node_ops_progress(reason); });
|
||||
}
|
||||
|
||||
// Runs the repair-based data sync for decommissioning the local node.
//
// Asserts that it is invoked on shard 0. The leaving node is this node's own
// host id, taken from the topology of `tmptr`. No node_ops_info is supplied
// (an empty pointer is passed), and the operation is tagged with
// stream_reason::decommission so the callee knows which operation it is
// running on behalf of.
//
// Only the reason-carrying call is kept: the earlier 3-argument call returned
// first and left this one unreachable.
future<> repair_service::decommission_with_repair(locator::token_metadata_ptr tmptr) {
    SCYLLA_ASSERT(this_shard_id() == 0);
    auto my_address = tmptr->get_topology().my_host_id();
    return do_decommission_removenode_with_repair(std::move(tmptr), my_address, {}, streaming::stream_reason::decommission);
}
|
||||
|
||||
future<> repair_service::removenode_with_repair(locator::token_metadata_ptr tmptr, locator::host_id leaving_node, shared_ptr<node_ops_info> ops) {
|
||||
SCYLLA_ASSERT(this_shard_id() == 0);
|
||||
return do_decommission_removenode_with_repair(std::move(tmptr), std::move(leaving_node), std::move(ops)).then([this] {
|
||||
return do_decommission_removenode_with_repair(std::move(tmptr), std::move(leaving_node), std::move(ops), streaming::stream_reason::removenode).then([this] {
|
||||
rlogger.debug("Triggering off-strategy compaction for all non-system tables on removenode completion");
|
||||
seastar::sharded<replica::database>& db = get_db();
|
||||
return db.invoke_on_all([](replica::database &db) {
|
||||
@@ -2220,7 +2241,7 @@ future<> repair_service::rebuild_with_repair(std::unordered_map<sstring, locator
|
||||
}
|
||||
auto reason = streaming::stream_reason::rebuild;
|
||||
rlogger.info("{}: this-node={} source_dc={}", op, *topology.this_node(), source_dc);
|
||||
co_await do_rebuild_replace_with_repair(std::move(ks_erms), std::move(tmptr), std::move(op), std::move(source_dc), reason);
|
||||
co_await do_rebuild_replace_with_repair(std::move(ks_erms), std::move(tmptr), std::move(op), std::move(source_dc), reason).finally([this, reason] { return reset_node_ops_progress(reason);});
|
||||
co_await get_db().invoke_on_all([](replica::database& db) {
|
||||
for (auto& t : db.get_non_system_column_families()) {
|
||||
t->trigger_offstrategy_compaction();
|
||||
@@ -2242,7 +2263,7 @@ future<> repair_service::replace_with_repair(std::unordered_map<sstring, locator
|
||||
co_await cloned_tmptr->update_normal_tokens(replacing_tokens, tmptr->get_my_id());
|
||||
auto source_dc = utils::optional_param(myloc.dc);
|
||||
rlogger.info("{}: this-node={} ignore_nodes={} source_dc={}", op, *topology.this_node(), ignore_nodes, source_dc);
|
||||
co_return co_await do_rebuild_replace_with_repair(std::move(ks_erms), std::move(cloned_tmptr), std::move(op), std::move(source_dc), reason, std::move(ignore_nodes), replaced_node);
|
||||
co_await do_rebuild_replace_with_repair(std::move(ks_erms), std::move(cloned_tmptr), std::move(op), std::move(source_dc), reason, std::move(ignore_nodes), replaced_node).finally([this, reason] { return reset_node_ops_progress(reason); });
|
||||
}
|
||||
|
||||
static std::unordered_set<locator::host_id> get_token_owners_in_dcs(std::vector<sstring> data_centers, locator::effective_replication_map_ptr erm) {
|
||||
|
||||
@@ -165,7 +165,7 @@ public:
|
||||
future<> rebuild_with_repair(std::unordered_map<sstring, locator::vnode_effective_replication_map_ptr> ks_erms, locator::token_metadata_ptr tmptr, utils::optional_param source_dc);
|
||||
future<> replace_with_repair(std::unordered_map<sstring, locator::vnode_effective_replication_map_ptr> ks_erms, locator::token_metadata_ptr tmptr, std::unordered_set<dht::token> replacing_tokens, std::unordered_set<locator::host_id> ignore_nodes, locator::host_id replaced_node);
|
||||
private:
|
||||
future<> do_decommission_removenode_with_repair(locator::token_metadata_ptr tmptr, locator::host_id leaving_node, shared_ptr<node_ops_info> ops);
|
||||
future<> do_decommission_removenode_with_repair(locator::token_metadata_ptr tmptr, locator::host_id leaving_node, shared_ptr<node_ops_info> ops, streaming::stream_reason reason);
|
||||
|
||||
future<> do_rebuild_replace_with_repair(std::unordered_map<sstring, locator::vnode_effective_replication_map_ptr> ks_erms, locator::token_metadata_ptr tmptr, sstring op, utils::optional_param source_dc, streaming::stream_reason reason, std::unordered_set<locator::host_id> ignore_nodes = {}, locator::host_id replaced_node = {});
|
||||
|
||||
@@ -177,6 +177,8 @@ private:
|
||||
streaming::stream_reason reason,
|
||||
shared_ptr<node_ops_info> ops_info);
|
||||
|
||||
future<> reset_node_ops_progress(streaming::stream_reason reason);
|
||||
|
||||
public:
|
||||
future<> repair_tablets(repair_uniq_id id, sstring keyspace_name, std::vector<sstring> table_names, bool primary_replica_only = true, dht::token_range_vector ranges_specified = {}, std::vector<sstring> dcs = {}, std::unordered_set<locator::host_id> hosts = {}, std::unordered_set<locator::host_id> ignore_nodes = {}, std::optional<int> ranges_parallelism = std::nullopt);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user