repair: repair_flush_hints_batchlog_request::target_nodes is not used any more, so mark it as such

After b3b3e880d3 target_nodes is not used
by the receiver, so we can skip setting it on sender as well.
This commit is contained in:
Gleb Natapov
2024-12-03 11:09:56 +02:00
parent 92c2558a83
commit c095f63ea5
5 changed files with 25 additions and 30 deletions

View File

@@ -87,7 +87,7 @@ struct repair_update_system_table_response {
struct repair_flush_hints_batchlog_request {
tasks::task_id repair_uuid;
std::list<gms::inet_address> target_nodes;
std::list<gms::inet_address> unused;
std::chrono::seconds hints_timeout;
std::chrono::seconds batchlog_timeout;
};

View File

@@ -375,7 +375,7 @@ static future<std::list<gms::inet_address>> get_hosts_participating_in_repair(
future<std::tuple<bool, gc_clock::time_point>> repair_service::flush_hints(repair_uniq_id id,
sstring keyspace, std::vector<sstring> cfs,
std::unordered_set<gms::inet_address> ignore_nodes, std::list<gms::inet_address> participants) {
std::unordered_set<gms::inet_address> ignore_nodes) {
auto& db = get_db().local();
auto uuid = id.uuid();
bool needs_flush_before_repair = false;
@@ -400,13 +400,13 @@ future<std::tuple<bool, gc_clock::time_point>> repair_service::flush_hints(repai
});
auto hints_timeout = std::chrono::seconds(300);
auto batchlog_timeout = std::chrono::seconds(300);
repair_flush_hints_batchlog_request req{id.uuid(), participants, hints_timeout, batchlog_timeout};
repair_flush_hints_batchlog_request req{id.uuid(), {}, hints_timeout, batchlog_timeout};
auto start_time = gc_clock::now();
std::vector<gc_clock::time_point> times;
try {
co_await parallel_for_each(waiting_nodes, [this, uuid, start_time, &times, &req, &participants] (gms::inet_address node) -> future<> {
rlogger.info("repair[{}]: Sending repair_flush_hints_batchlog to node={}, participants={}, started",
uuid, node, participants);
co_await parallel_for_each(waiting_nodes, [this, uuid, start_time, &times, &req] (gms::inet_address node) -> future<> {
rlogger.info("repair[{}]: Sending repair_flush_hints_batchlog to node={}, started",
uuid, node);
try {
auto& ms = get_messaging();
auto resp = co_await ser::repair_rpc_verbs::send_repair_flush_hints_batchlog(&ms, netw::msg_addr(node), req);
@@ -418,8 +418,8 @@ future<std::tuple<bool, gc_clock::time_point>> repair_service::flush_hints(repai
times.push_back(resp.flush_time);
}
} catch (...) {
rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to node={}, participants={}, failed: {}",
uuid, node, participants, std::current_exception());
rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to node={}, failed: {}",
uuid, node, std::current_exception());
throw;
}
});
@@ -431,11 +431,10 @@ future<std::tuple<bool, gc_clock::time_point>> repair_service::flush_hints(repai
auto duration = std::chrono::duration<float>(gc_clock::now() - start_time);
rlogger.info("repair[{}]: Finished repair_flush_hints_batchlog flush_times={} flush_time={} flush_duration={}", uuid, times, flush_time, duration);
} catch (...) {
rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to participants={} failed, continue to run repair",
uuid, participants);
rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog failed, continue to run repair", uuid);
}
} else {
rlogger.info("repair[{}]: Skipped sending repair_flush_hints_batchlog to nodes={}", uuid, participants);
rlogger.info("repair[{}]: Skipped sending repair_flush_hints_batchlog", uuid);
}
co_return std::make_tuple(hints_batchlog_flushed, flush_time);
}
@@ -1360,7 +1359,7 @@ future<> repair::user_requested_repair_task_impl::run() {
} else {
participants = get_hosts_participating_in_repair(germs->get(), keyspace, ranges, data_centers, hosts, ignore_nodes).get();
}
auto [hints_batchlog_flushed, flush_time] = rs.flush_hints(id, keyspace, cfs, ignore_nodes, participants).get();
auto [hints_batchlog_flushed, flush_time] = rs.flush_hints(id, keyspace, cfs, ignore_nodes).get();
std::vector<future<>> repair_results;
repair_results.reserve(smp::count);
@@ -2588,10 +2587,7 @@ future<> repair::tablet_repair_task_impl::run() {
auto data_centers = std::vector<sstring>();
auto hosts = std::vector<sstring>();
auto ignore_nodes = std::unordered_set<gms::inet_address>();
auto my_address = erm->get_topology().my_address();
auto participants = std::list<gms::inet_address>(m.neighbors.all.begin(), m.neighbors.all.end());
participants.push_front(my_address);
auto [hints_batchlog_flushed, flush_time] = co_await rs.flush_hints(id, m.keyspace_name, tables, ignore_nodes, participants);
auto [hints_batchlog_flushed, flush_time] = co_await rs.flush_hints(id, m.keyspace_name, tables, ignore_nodes);
bool small_table_optimization = false;
auto task_impl_ptr = seastar::make_shared<repair::shard_repair_task_impl>(rs._repair_module, tasks::task_id::create_random_id(),

View File

@@ -260,7 +260,7 @@ struct repair_update_system_table_response {
struct repair_flush_hints_batchlog_request {
tasks::task_id repair_uuid;
std::list<gms::inet_address> target_nodes;
std::list<gms::inet_address> unused;
std::chrono::seconds hints_timeout;
std::chrono::seconds batchlog_timeout;
};

View File

@@ -2309,8 +2309,8 @@ future<repair_flush_hints_batchlog_response> repair_service::repair_flush_hints_
return rs.repair_flush_hints_batchlog_handler(from, std::move(req));
});
}
rlogger.info("repair[{}]: Started to process repair_flush_hints_batchlog_request from node={} target_nodes={} hints_timeout={}s batchlog_timeout={}s",
req.repair_uuid, from, req.target_nodes, req.hints_timeout.count(), req.batchlog_timeout.count());
rlogger.info("repair[{}]: Started to process repair_flush_hints_batchlog_request from node={} hints_timeout={}s batchlog_timeout={}s",
req.repair_uuid, from, req.hints_timeout.count(), req.batchlog_timeout.count());
auto permit = co_await seastar::get_units(_flush_hints_batchlog_sem, 1);
bool updated = false;
auto now = gc_clock::now();
@@ -2319,8 +2319,7 @@ future<repair_flush_hints_batchlog_response> repair_service::repair_flush_hints_
auto flush_time = now;
if (cache_disabled || (now - _flush_hints_batchlog_time > cache_time)) {
// Empty targets meants all nodes
std::vector<gms::inet_address> target_nodes;
db::hints::sync_point sync_point = co_await _sp.local().create_hint_sync_point(std::move(target_nodes));
db::hints::sync_point sync_point = co_await _sp.local().create_hint_sync_point(std::vector<gms::inet_address>{});
lowres_clock::time_point deadline = lowres_clock::now() + req.hints_timeout;
try {
bool bm_throw = utils::get_local_injector().enter("repair_flush_hints_batchlog_handler_bm_uninitialized");
@@ -2329,13 +2328,13 @@ future<repair_flush_hints_batchlog_response> repair_service::repair_flush_hints_
}
co_await coroutine::all(
[this, &from, &req, &sync_point, &deadline] () -> future<> {
rlogger.info("repair[{}]: Started to flush hints for repair_flush_hints_batchlog_request from node={}, target_nodes={}", req.repair_uuid, from, req.target_nodes);
rlogger.info("repair[{}]: Started to flush hints for repair_flush_hints_batchlog_request from node={}", req.repair_uuid, from);
co_await _sp.local().wait_for_hint_sync_point(std::move(sync_point), deadline);
rlogger.info("repair[{}]: Finished to flush hints for repair_flush_hints_batchlog_request from node={}, target_hosts={}", req.repair_uuid, from, req.target_nodes);
rlogger.info("repair[{}]: Finished to flush hints for repair_flush_hints_batchlog_request from node={}", req.repair_uuid, from);
co_return;
},
[this, now, cache_disabled, &flush_time, &cache_time, &from, &req] () -> future<> {
rlogger.info("repair[{}]: Started to flush batchlog for repair_flush_hints_batchlog_request from node={}, target_nodes={}", req.repair_uuid, from, req.target_nodes);
rlogger.info("repair[{}]: Started to flush batchlog for repair_flush_hints_batchlog_request from node={}", req.repair_uuid, from);
auto last_replay = _bm.local().get_last_replay();
bool issue_flush = false;
if (cache_disabled) {
@@ -2364,12 +2363,12 @@ future<repair_flush_hints_batchlog_response> repair_service::repair_flush_hints_
co_await _bm.local().do_batch_log_replay(db::batchlog_manager::post_replay_cleanup::no);
utils::get_local_injector().set_parameter("repair_flush_hints_batchlog_handler", "issue_flush", fmt::to_string(flush_time));
}
rlogger.info("repair[{}]: Finished to flush batchlog for repair_flush_hints_batchlog_request from node={}, target_nodes={}, flushed={}", req.repair_uuid, from, req.target_nodes, issue_flush);
rlogger.info("repair[{}]: Finished to flush batchlog for repair_flush_hints_batchlog_request from node={}, flushed={}", req.repair_uuid, from, issue_flush);
}
);
} catch (...) {
rlogger.warn("repair[{}]: Failed to process repair_flush_hints_batchlog_request from node={} target_hosts={}: {}",
req.repair_uuid, from, req.target_nodes, std::current_exception());
rlogger.warn("repair[{}]: Failed to process repair_flush_hints_batchlog_request from node={}: {}",
req.repair_uuid, from, std::current_exception());
throw;
}
co_await container().invoke_on_all([flush_time] (repair_service& rs) {
@@ -2380,8 +2379,8 @@ future<repair_flush_hints_batchlog_response> repair_service::repair_flush_hints_
utils::get_local_injector().set_parameter("repair_flush_hints_batchlog_handler", "skip_flush", fmt::to_string(flush_time));
}
auto duration = std::chrono::duration<float>(gc_clock::now() - now);
rlogger.info("repair[{}]: Finished to process repair_flush_hints_batchlog_request from node={} target_nodes={} updated={} flush_hints_batchlog_time={} flush_cache_time={} flush_duration={}",
req.repair_uuid, from, req.target_nodes, updated, _flush_hints_batchlog_time, cache_time, duration);
rlogger.info("repair[{}]: Finished to process repair_flush_hints_batchlog_request from node={} updated={} flush_hints_batchlog_time={} flush_cache_time={} flush_duration={}",
req.repair_uuid, from, updated, _flush_hints_batchlog_time, cache_time, duration);
repair_flush_hints_batchlog_response resp{ .flush_time = _flush_hints_batchlog_time };
co_return resp;
}

View File

@@ -124,7 +124,7 @@ class repair_service : public seastar::peering_sharded_service<repair_service> {
gc_clock::time_point _flush_hints_batchlog_time;
future<std::tuple<bool, gc_clock::time_point>> flush_hints(repair_uniq_id id,
sstring keyspace, std::vector<sstring> cfs,
std::unordered_set<gms::inet_address> ignore_nodes, std::list<gms::inet_address> participants);
std::unordered_set<gms::inet_address> ignore_nodes);
public:
repair_service(sharded<service::topology_state_machine>& tsm,