Merge "move more verbs to idl" from Gleb
" The series moves node ops, repair and streaming verbs to IDL. Also contains IDL related cleanups. In addition to the CI tested manually by bootstrapping a node with the series into a cluster of old nodes with repair and streaming both in gossiper and raft mode. This exercises repair, streaming and node_ops paths. " * 'gleb/move-more-rpcs-to-idl-v3' of github.com:scylladb/scylla-dev: repair: repair_flush_hints_batchlog_request::target_nodes is not used any more, so mark it as such streaming: move streaming verbs to IDL messaging_service: move repair verbs to IDL node_ops: move node_ops_cmd to IDL idl: rename partition_checksum.dist.hh to repair.dist.hh idl: move node_ops related stuff from the repair related IDL
This commit is contained in:
@@ -1288,7 +1288,7 @@ idls = ['idl/gossip_digest.idl.hh',
|
||||
'idl/streaming.idl.hh',
|
||||
'idl/paging_state.idl.hh',
|
||||
'idl/frozen_schema.idl.hh',
|
||||
'idl/partition_checksum.idl.hh',
|
||||
'idl/repair.idl.hh',
|
||||
'idl/replay_position.idl.hh',
|
||||
'idl/mutation.idl.hh',
|
||||
'idl/query.idl.hh',
|
||||
@@ -1316,6 +1316,7 @@ idls = ['idl/gossip_digest.idl.hh',
|
||||
'idl/utils.idl.hh',
|
||||
'idl/gossip.idl.hh',
|
||||
'idl/migration_manager.idl.hh',
|
||||
"idl/node_ops.idl.hh",
|
||||
|
||||
]
|
||||
|
||||
|
||||
@@ -35,7 +35,7 @@ set(idl_headers
|
||||
streaming.idl.hh
|
||||
paging_state.idl.hh
|
||||
frozen_schema.idl.hh
|
||||
partition_checksum.idl.hh
|
||||
repair.idl.hh
|
||||
replay_position.idl.hh
|
||||
mutation.idl.hh
|
||||
query.idl.hh
|
||||
@@ -63,6 +63,7 @@ set(idl_headers
|
||||
utils.idl.hh
|
||||
gossip.idl.hh
|
||||
migration_manager.idl.hh
|
||||
node_ops.idl.hh
|
||||
)
|
||||
|
||||
foreach(idl_header ${idl_headers})
|
||||
|
||||
54
idl/node_ops.idl.hh
Normal file
54
idl/node_ops.idl.hh
Normal file
@@ -0,0 +1,54 @@
|
||||
/*
|
||||
* Copyright 2024-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
#include "node_ops/id.hh"
|
||||
|
||||
enum class node_ops_cmd : uint32_t {
|
||||
removenode_prepare,
|
||||
removenode_heartbeat,
|
||||
removenode_sync_data,
|
||||
removenode_abort,
|
||||
removenode_done,
|
||||
replace_prepare,
|
||||
replace_prepare_mark_alive,
|
||||
replace_prepare_pending_ranges,
|
||||
replace_heartbeat,
|
||||
replace_abort,
|
||||
replace_done,
|
||||
decommission_prepare,
|
||||
decommission_heartbeat,
|
||||
decommission_abort,
|
||||
decommission_done,
|
||||
bootstrap_prepare,
|
||||
bootstrap_heartbeat,
|
||||
bootstrap_abort,
|
||||
bootstrap_done,
|
||||
query_pending_ops,
|
||||
repair_updater,
|
||||
};
|
||||
|
||||
class node_ops_id final {
|
||||
utils::UUID uuid();
|
||||
};
|
||||
|
||||
struct node_ops_cmd_request {
|
||||
node_ops_cmd cmd;
|
||||
node_ops_id ops_uuid;
|
||||
std::list<gms::inet_address> ignore_nodes;
|
||||
std::list<gms::inet_address> leaving_nodes;
|
||||
std::unordered_map<gms::inet_address, gms::inet_address> replace_nodes;
|
||||
std::unordered_map<gms::inet_address, std::list<dht::token>> bootstrap_nodes;
|
||||
std::list<table_id> repair_tables;
|
||||
};
|
||||
|
||||
struct node_ops_cmd_response {
|
||||
bool ok;
|
||||
std::list<node_ops_id> pending_ops;
|
||||
};
|
||||
|
||||
verb [[with_client_info]] node_ops_cmd(node_ops_cmd_request) -> node_ops_cmd_response
|
||||
@@ -73,58 +73,6 @@ struct repair_row_level_start_response {
|
||||
repair_row_level_start_status status;
|
||||
};
|
||||
|
||||
enum class node_ops_cmd : uint32_t {
|
||||
removenode_prepare,
|
||||
removenode_heartbeat,
|
||||
removenode_sync_data,
|
||||
removenode_abort,
|
||||
removenode_done,
|
||||
replace_prepare,
|
||||
replace_prepare_mark_alive,
|
||||
replace_prepare_pending_ranges,
|
||||
replace_heartbeat,
|
||||
replace_abort,
|
||||
replace_done,
|
||||
decommission_prepare,
|
||||
decommission_heartbeat,
|
||||
decommission_abort,
|
||||
decommission_done,
|
||||
bootstrap_prepare,
|
||||
bootstrap_heartbeat,
|
||||
bootstrap_abort,
|
||||
bootstrap_done,
|
||||
query_pending_ops,
|
||||
repair_updater,
|
||||
};
|
||||
|
||||
class node_ops_id final {
|
||||
utils::UUID uuid();
|
||||
};
|
||||
|
||||
struct node_ops_cmd_request {
|
||||
// Mandatory field, set by all cmds
|
||||
node_ops_cmd cmd;
|
||||
// Mandatory field, set by all cmds
|
||||
node_ops_id ops_uuid;
|
||||
// Optional field, list nodes to ignore, set by all cmds
|
||||
std::list<gms::inet_address> ignore_nodes;
|
||||
// Optional field, list leaving nodes, set by decommission and removenode cmd
|
||||
std::list<gms::inet_address> leaving_nodes;
|
||||
// Optional field, map existing nodes to replacing nodes, set by replace cmd
|
||||
std::unordered_map<gms::inet_address, gms::inet_address> replace_nodes;
|
||||
// Optional field, map bootstrapping nodes to bootstrap tokens, set by bootstrap cmd
|
||||
std::unordered_map<gms::inet_address, std::list<dht::token>> bootstrap_nodes;
|
||||
// Optional field, list uuids of tables being repaired, set by repair cmd
|
||||
std::list<table_id> repair_tables;
|
||||
};
|
||||
|
||||
struct node_ops_cmd_response {
|
||||
// Mandatory field, set by all cmds
|
||||
bool ok;
|
||||
// Optional field, set by query_pending_ops cmd
|
||||
std::list<node_ops_id> pending_ops;
|
||||
};
|
||||
|
||||
struct repair_update_system_table_request {
|
||||
tasks::task_id repair_uuid;
|
||||
table_id table_uuid;
|
||||
@@ -139,7 +87,7 @@ struct repair_update_system_table_response {
|
||||
|
||||
struct repair_flush_hints_batchlog_request {
|
||||
tasks::task_id repair_uuid;
|
||||
std::list<gms::inet_address> target_nodes;
|
||||
std::list<gms::inet_address> unused;
|
||||
std::chrono::seconds hints_timeout;
|
||||
std::chrono::seconds batchlog_timeout;
|
||||
};
|
||||
@@ -150,3 +98,13 @@ struct repair_flush_hints_batchlog_response {
|
||||
|
||||
verb [[with_client_info]] repair_update_system_table (repair_update_system_table_request req [[ref]]) -> repair_update_system_table_response;
|
||||
verb [[with_client_info]] repair_flush_hints_batchlog (repair_flush_hints_batchlog_request req [[ref]]) -> repair_flush_hints_batchlog_response;
|
||||
verb [[with_client_info]] repair_get_full_row_hashes (uint32_t repair_meta_id, shard_id dst_shard_id [[version 5.2]]) -> repair_hash_set;
|
||||
verb [[with_client_info]] repair_get_combined_row_hash (uint32_t repair_meta_id, std::optional<repair_sync_boundary> common_sync_boundary, shard_id dst_shard_id [[version 5.2]]) -> get_combined_row_hash_response;
|
||||
verb [[with_client_info]] repair_get_sync_boundary (uint32_t repair_meta_id, std::optional<repair_sync_boundary> skipped_sync_boundary, shard_id dst_shard_id [[version 5.2]]) -> get_sync_boundary_response;
|
||||
verb [[with_client_info]] repair_get_row_diff (uint32_t repair_meta_id, repair_hash_set set_diff, bool needs_all_rows, shard_id dst_shard_id [[version 5.2]]) -> repair_rows_on_wire;
|
||||
verb [[with_client_info]] repair_put_row_diff (uint32_t repair_meta_id, repair_rows_on_wire row_diff, shard_id dst_shard_id [[version 5.2]]);
|
||||
verb [[with_client_info]] repair_row_level_start (uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, streaming::stream_reason reason [[version 4.1.0]], gc_clock::time_point compaction_time [[version 5.2]], shard_id dst_shard_id [[version 5.2]]) -> repair_row_level_start_response [[version 4.2.0]];
|
||||
verb [[with_client_info]] repair_row_level_stop (uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, shard_id dst_shard_id [[version 5.2]]);
|
||||
verb [[with_client_info]] repair_get_estimated_partitions (uint32_t repair_meta_id, shard_id dst_shard_id [[version 5.2]]) -> uint64_t;
|
||||
verb [[with_client_info]] repair_set_estimated_partitions (uint32_t repair_meta_id, uint64_t estimated_partitions, shard_id dst_shard_id [[version 5.2]]);
|
||||
verb [[with_client_info]] repair_get_diff_algorithms () -> std::vector<row_level_diff_detect_algorithm>;
|
||||
@@ -12,6 +12,19 @@
|
||||
|
||||
#include "streaming/stream_fwd.hh"
|
||||
|
||||
namespace service {
|
||||
|
||||
// Before the mode of prepare_message verb to the IDL
|
||||
// there was no serizlizer for session_id and one from
|
||||
// raft_storage.idl.hh for tagged_id was erroneously
|
||||
// used. It does not marked as `final`, so here we have
|
||||
// to omit it as well for compatibility.
|
||||
class session_id {
|
||||
utils::UUID uuid();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
namespace streaming {
|
||||
|
||||
class plan_id final {
|
||||
@@ -55,4 +68,8 @@ enum class stream_mutation_fragments_cmd : uint8_t {
|
||||
end_of_stream,
|
||||
};
|
||||
|
||||
verb [[with_client_info]] prepare_message (streaming::prepare_message msg, streaming::plan_id plan_id, sstring description, streaming::stream_reason reason [[version 3.1.0]], service::session_id session [[version 6.0.0]]) -> streaming::prepare_message;
|
||||
verb [[with_client_info]] prepare_done_message (streaming::plan_id plan_id, unsigned dst_cpu_id);
|
||||
verb [[with_client_info]] stream_mutation_done (streaming::plan_id plan_id, dht::token_range_vector ranges, table_id cf_id, unsigned dst_cpu_id);
|
||||
verb [[with_client_info]] complete_message (streaming::plan_id plan_id, unsigned dst_cpu_id, bool failed [[version 2.1.0]]);
|
||||
}
|
||||
|
||||
@@ -42,4 +42,3 @@ class host_id final {
|
||||
};
|
||||
|
||||
} // namespace locator
|
||||
|
||||
|
||||
@@ -62,7 +62,8 @@
|
||||
#include "idl/read_command.dist.hh"
|
||||
#include "idl/range.dist.hh"
|
||||
#include "idl/position_in_partition.dist.hh"
|
||||
#include "idl/partition_checksum.dist.hh"
|
||||
#include "idl/repair.dist.hh"
|
||||
#include "idl/node_ops.dist.hh"
|
||||
#include "idl/query.dist.hh"
|
||||
#include "idl/cache_temperature.dist.hh"
|
||||
#include "idl/view.dist.hh"
|
||||
@@ -116,7 +117,8 @@
|
||||
#include "mutation/frozen_mutation.hh"
|
||||
#include "streaming/stream_manager.hh"
|
||||
#include "streaming/stream_mutation_fragments_cmd.hh"
|
||||
#include "idl/partition_checksum.dist.impl.hh"
|
||||
#include "idl/repair.dist.impl.hh"
|
||||
#include "idl/node_ops.dist.impl.hh"
|
||||
#include "idl/mapreduce_request.dist.hh"
|
||||
#include "idl/mapreduce_request.dist.impl.hh"
|
||||
#include "idl/storage_service.dist.impl.hh"
|
||||
@@ -1203,182 +1205,6 @@ future<> messaging_service::unregister_repair_get_full_row_hashes_with_rpc_strea
|
||||
|
||||
// Wrappers for verbs
|
||||
|
||||
// PREPARE_MESSAGE
|
||||
void messaging_service::register_prepare_message(std::function<future<streaming::prepare_message> (const rpc::client_info& cinfo,
|
||||
streaming::prepare_message msg, streaming::plan_id plan_id, sstring description, rpc::optional<streaming::stream_reason> reason, rpc::optional<service::session_id>)>&& func) {
|
||||
register_handler(this, messaging_verb::PREPARE_MESSAGE, std::move(func));
|
||||
}
|
||||
future<streaming::prepare_message> messaging_service::send_prepare_message(msg_addr id, streaming::prepare_message msg, streaming::plan_id plan_id,
|
||||
sstring description, streaming::stream_reason reason, service::session_id session) {
|
||||
return send_message<streaming::prepare_message>(this, messaging_verb::PREPARE_MESSAGE, id,
|
||||
std::move(msg), plan_id, std::move(description), reason, session);
|
||||
}
|
||||
future<> messaging_service::unregister_prepare_message() {
|
||||
return unregister_handler(messaging_verb::PREPARE_MESSAGE);
|
||||
}
|
||||
|
||||
// PREPARE_DONE_MESSAGE
|
||||
void messaging_service::register_prepare_done_message(std::function<future<> (const rpc::client_info& cinfo, streaming::plan_id plan_id, unsigned dst_cpu_id)>&& func) {
|
||||
register_handler(this, messaging_verb::PREPARE_DONE_MESSAGE, std::move(func));
|
||||
}
|
||||
future<> messaging_service::send_prepare_done_message(msg_addr id, streaming::plan_id plan_id, unsigned dst_cpu_id) {
|
||||
return send_message<void>(this, messaging_verb::PREPARE_DONE_MESSAGE, id,
|
||||
plan_id, dst_cpu_id);
|
||||
}
|
||||
future<> messaging_service::unregister_prepare_done_message() {
|
||||
return unregister_handler(messaging_verb::PREPARE_DONE_MESSAGE);
|
||||
}
|
||||
|
||||
// STREAM_MUTATION_DONE
|
||||
void messaging_service::register_stream_mutation_done(std::function<future<> (const rpc::client_info& cinfo,
|
||||
streaming::plan_id plan_id, dht::token_range_vector ranges, table_id cf_id, unsigned dst_cpu_id)>&& func) {
|
||||
register_handler(this, messaging_verb::STREAM_MUTATION_DONE,
|
||||
[func = std::move(func)] (const rpc::client_info& cinfo,
|
||||
streaming::plan_id plan_id, std::vector<wrapping_interval<dht::token>> ranges,
|
||||
table_id cf_id, unsigned dst_cpu_id) mutable {
|
||||
return func(cinfo, plan_id, ::compat::unwrap(std::move(ranges)), cf_id, dst_cpu_id);
|
||||
});
|
||||
}
|
||||
future<> messaging_service::send_stream_mutation_done(msg_addr id, streaming::plan_id plan_id, dht::token_range_vector ranges, table_id cf_id, unsigned dst_cpu_id) {
|
||||
return send_message<void>(this, messaging_verb::STREAM_MUTATION_DONE, id,
|
||||
plan_id, std::move(ranges), cf_id, dst_cpu_id);
|
||||
}
|
||||
future<> messaging_service::unregister_stream_mutation_done() {
|
||||
return unregister_handler(messaging_verb::STREAM_MUTATION_DONE);
|
||||
}
|
||||
|
||||
// COMPLETE_MESSAGE
|
||||
void messaging_service::register_complete_message(std::function<future<> (const rpc::client_info& cinfo, streaming::plan_id plan_id, unsigned dst_cpu_id, rpc::optional<bool> failed)>&& func) {
|
||||
register_handler(this, messaging_verb::COMPLETE_MESSAGE, std::move(func));
|
||||
}
|
||||
future<> messaging_service::send_complete_message(msg_addr id, streaming::plan_id plan_id, unsigned dst_cpu_id, bool failed) {
|
||||
return send_message<void>(this, messaging_verb::COMPLETE_MESSAGE, id,
|
||||
plan_id, dst_cpu_id, failed);
|
||||
}
|
||||
future<> messaging_service::unregister_complete_message() {
|
||||
return unregister_handler(messaging_verb::COMPLETE_MESSAGE);
|
||||
}
|
||||
|
||||
// Wrapper for REPAIR_GET_FULL_ROW_HASHES
|
||||
void messaging_service::register_repair_get_full_row_hashes(std::function<future<repair_hash_set> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::optional<shard_id> dst_shard_id)>&& func) {
|
||||
register_handler(this, messaging_verb::REPAIR_GET_FULL_ROW_HASHES, std::move(func));
|
||||
}
|
||||
future<> messaging_service::unregister_repair_get_full_row_hashes() {
|
||||
return unregister_handler(messaging_verb::REPAIR_GET_FULL_ROW_HASHES);
|
||||
}
|
||||
future<repair_hash_set> messaging_service::send_repair_get_full_row_hashes(msg_addr id, uint32_t repair_meta_id, shard_id dst_shard_id) {
|
||||
return send_message<future<repair_hash_set>>(this, messaging_verb::REPAIR_GET_FULL_ROW_HASHES, std::move(id), repair_meta_id, dst_shard_id);
|
||||
}
|
||||
|
||||
// Wrapper for REPAIR_GET_COMBINED_ROW_HASH
|
||||
void messaging_service::register_repair_get_combined_row_hash(std::function<future<get_combined_row_hash_response> (const rpc::client_info& cinfo, uint32_t repair_meta_id, std::optional<repair_sync_boundary> common_sync_boundary, rpc::optional<shard_id> dst_shard_id)>&& func) {
|
||||
register_handler(this, messaging_verb::REPAIR_GET_COMBINED_ROW_HASH, std::move(func));
|
||||
}
|
||||
future<> messaging_service::unregister_repair_get_combined_row_hash() {
|
||||
return unregister_handler(messaging_verb::REPAIR_GET_COMBINED_ROW_HASH);
|
||||
}
|
||||
future<get_combined_row_hash_response> messaging_service::send_repair_get_combined_row_hash(msg_addr id, uint32_t repair_meta_id, std::optional<repair_sync_boundary> common_sync_boundary, shard_id dst_shard_id) {
|
||||
return send_message<future<get_combined_row_hash_response>>(this, messaging_verb::REPAIR_GET_COMBINED_ROW_HASH, std::move(id), repair_meta_id, std::move(common_sync_boundary), dst_shard_id);
|
||||
}
|
||||
|
||||
void messaging_service::register_repair_get_sync_boundary(std::function<future<get_sync_boundary_response> (const rpc::client_info& cinfo, uint32_t repair_meta_id, std::optional<repair_sync_boundary> skipped_sync_boundary, rpc::optional<shard_id> dst_shard_id)>&& func) {
|
||||
register_handler(this, messaging_verb::REPAIR_GET_SYNC_BOUNDARY, std::move(func));
|
||||
}
|
||||
future<> messaging_service::unregister_repair_get_sync_boundary() {
|
||||
return unregister_handler(messaging_verb::REPAIR_GET_SYNC_BOUNDARY);
|
||||
}
|
||||
future<get_sync_boundary_response> messaging_service::send_repair_get_sync_boundary(msg_addr id, uint32_t repair_meta_id, std::optional<repair_sync_boundary> skipped_sync_boundary, shard_id dst_shard_id) {
|
||||
return send_message<future<get_sync_boundary_response>>(this, messaging_verb::REPAIR_GET_SYNC_BOUNDARY, std::move(id), repair_meta_id, std::move(skipped_sync_boundary), dst_shard_id);
|
||||
}
|
||||
|
||||
// Wrapper for REPAIR_GET_ROW_DIFF
|
||||
void messaging_service::register_repair_get_row_diff(std::function<future<repair_rows_on_wire> (const rpc::client_info& cinfo, uint32_t repair_meta_id, repair_hash_set set_diff, bool needs_all_rows, rpc::optional<shard_id> dst_shard_id)>&& func) {
|
||||
register_handler(this, messaging_verb::REPAIR_GET_ROW_DIFF, std::move(func));
|
||||
}
|
||||
future<> messaging_service::unregister_repair_get_row_diff() {
|
||||
return unregister_handler(messaging_verb::REPAIR_GET_ROW_DIFF);
|
||||
}
|
||||
future<repair_rows_on_wire> messaging_service::send_repair_get_row_diff(msg_addr id, uint32_t repair_meta_id, repair_hash_set set_diff, bool needs_all_rows, shard_id dst_shard_id) {
|
||||
return send_message<future<repair_rows_on_wire>>(this, messaging_verb::REPAIR_GET_ROW_DIFF, std::move(id), repair_meta_id, std::move(set_diff), needs_all_rows, dst_shard_id);
|
||||
}
|
||||
|
||||
// Wrapper for REPAIR_PUT_ROW_DIFF
|
||||
void messaging_service::register_repair_put_row_diff(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, repair_rows_on_wire row_diff, rpc::optional<shard_id> dst_shard_id)>&& func) {
|
||||
register_handler(this, messaging_verb::REPAIR_PUT_ROW_DIFF, std::move(func));
|
||||
}
|
||||
future<> messaging_service::unregister_repair_put_row_diff() {
|
||||
return unregister_handler(messaging_verb::REPAIR_PUT_ROW_DIFF);
|
||||
}
|
||||
future<> messaging_service::send_repair_put_row_diff(msg_addr id, uint32_t repair_meta_id, repair_rows_on_wire row_diff, shard_id dst_shard_id) {
|
||||
return send_message<void>(this, messaging_verb::REPAIR_PUT_ROW_DIFF, std::move(id), repair_meta_id, std::move(row_diff), dst_shard_id);
|
||||
}
|
||||
|
||||
// Wrapper for REPAIR_ROW_LEVEL_START
|
||||
void messaging_service::register_repair_row_level_start(std::function<future<repair_row_level_start_response> (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, rpc::optional<streaming::stream_reason> reason, rpc::optional<gc_clock::time_point> compaction_time, rpc::optional<shard_id> dst_shard_id)>&& func) {
|
||||
register_handler(this, messaging_verb::REPAIR_ROW_LEVEL_START, std::move(func));
|
||||
}
|
||||
future<> messaging_service::unregister_repair_row_level_start() {
|
||||
return unregister_handler(messaging_verb::REPAIR_ROW_LEVEL_START);
|
||||
}
|
||||
future<rpc::optional<repair_row_level_start_response>> messaging_service::send_repair_row_level_start(msg_addr id, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, streaming::stream_reason reason, gc_clock::time_point compaction_time, shard_id dst_shard_id) {
|
||||
return send_message<rpc::optional<repair_row_level_start_response>>(this, messaging_verb::REPAIR_ROW_LEVEL_START, std::move(id), repair_meta_id, std::move(keyspace_name), std::move(cf_name), std::move(range), algo, max_row_buf_size, seed, remote_shard, remote_shard_count, remote_ignore_msb, std::move(remote_partitioner_name), std::move(schema_version), reason, compaction_time, dst_shard_id);
|
||||
}
|
||||
|
||||
// Wrapper for REPAIR_ROW_LEVEL_STOP
|
||||
void messaging_service::register_repair_row_level_stop(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, rpc::optional<shard_id> dst_shard_id)>&& func) {
|
||||
register_handler(this, messaging_verb::REPAIR_ROW_LEVEL_STOP, std::move(func));
|
||||
}
|
||||
future<> messaging_service::unregister_repair_row_level_stop() {
|
||||
return unregister_handler(messaging_verb::REPAIR_ROW_LEVEL_STOP);
|
||||
}
|
||||
future<> messaging_service::send_repair_row_level_stop(msg_addr id, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, shard_id dst_shard_id) {
|
||||
return send_message<void>(this, messaging_verb::REPAIR_ROW_LEVEL_STOP, std::move(id), repair_meta_id, std::move(keyspace_name), std::move(cf_name), std::move(range), dst_shard_id);
|
||||
}
|
||||
|
||||
// Wrapper for REPAIR_GET_ESTIMATED_PARTITIONS
|
||||
void messaging_service::register_repair_get_estimated_partitions(std::function<future<uint64_t> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::optional<shard_id> dst_shard_id)>&& func) {
|
||||
register_handler(this, messaging_verb::REPAIR_GET_ESTIMATED_PARTITIONS, std::move(func));
|
||||
}
|
||||
future<> messaging_service::unregister_repair_get_estimated_partitions() {
|
||||
return unregister_handler(messaging_verb::REPAIR_GET_ESTIMATED_PARTITIONS);
|
||||
}
|
||||
future<uint64_t> messaging_service::send_repair_get_estimated_partitions(msg_addr id, uint32_t repair_meta_id, shard_id dst_shard_id) {
|
||||
return send_message<future<uint64_t>>(this, messaging_verb::REPAIR_GET_ESTIMATED_PARTITIONS, std::move(id), repair_meta_id, dst_shard_id);
|
||||
}
|
||||
|
||||
// Wrapper for REPAIR_SET_ESTIMATED_PARTITIONS
|
||||
void messaging_service::register_repair_set_estimated_partitions(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, uint64_t estimated_partitions, rpc::optional<shard_id> dst_shard_id)>&& func) {
|
||||
register_handler(this, messaging_verb::REPAIR_SET_ESTIMATED_PARTITIONS, std::move(func));
|
||||
}
|
||||
future<> messaging_service::unregister_repair_set_estimated_partitions() {
|
||||
return unregister_handler(messaging_verb::REPAIR_SET_ESTIMATED_PARTITIONS);
|
||||
}
|
||||
future<> messaging_service::send_repair_set_estimated_partitions(msg_addr id, uint32_t repair_meta_id, uint64_t estimated_partitions, shard_id dst_shard_id) {
|
||||
return send_message<void>(this, messaging_verb::REPAIR_SET_ESTIMATED_PARTITIONS, std::move(id), repair_meta_id, estimated_partitions, dst_shard_id);
|
||||
}
|
||||
|
||||
// Wrapper for REPAIR_GET_DIFF_ALGORITHMS
|
||||
void messaging_service::register_repair_get_diff_algorithms(std::function<future<std::vector<row_level_diff_detect_algorithm>> (const rpc::client_info& cinfo)>&& func) {
|
||||
register_handler(this, messaging_verb::REPAIR_GET_DIFF_ALGORITHMS, std::move(func));
|
||||
}
|
||||
future<> messaging_service::unregister_repair_get_diff_algorithms() {
|
||||
return unregister_handler(messaging_verb::REPAIR_GET_DIFF_ALGORITHMS);
|
||||
}
|
||||
future<std::vector<row_level_diff_detect_algorithm>> messaging_service::send_repair_get_diff_algorithms(msg_addr id) {
|
||||
return send_message<future<std::vector<row_level_diff_detect_algorithm>>>(this, messaging_verb::REPAIR_GET_DIFF_ALGORITHMS, std::move(id));
|
||||
}
|
||||
|
||||
// Wrapper for NODE_OPS_CMD
|
||||
void messaging_service::register_node_ops_cmd(std::function<future<node_ops_cmd_response> (const rpc::client_info& cinfo, node_ops_cmd_request)>&& func) {
|
||||
register_handler(this, messaging_verb::NODE_OPS_CMD, std::move(func));
|
||||
}
|
||||
future<> messaging_service::unregister_node_ops_cmd() {
|
||||
return unregister_handler(messaging_verb::NODE_OPS_CMD);
|
||||
}
|
||||
future<node_ops_cmd_response> messaging_service::send_node_ops_cmd(msg_addr id, node_ops_cmd_request req) {
|
||||
return send_message<future<node_ops_cmd_response>>(this, messaging_verb::NODE_OPS_CMD, std::move(id), std::move(req));
|
||||
}
|
||||
|
||||
// Wrapper for TASKS_CHILDREN_REQUEST
|
||||
void messaging_service::register_tasks_get_children(std::function<future<tasks::get_children_response> (const rpc::client_info& cinfo, tasks::get_children_request)>&& func) {
|
||||
register_handler(this, messaging_verb::TASKS_GET_CHILDREN, std::move(func));
|
||||
|
||||
@@ -394,18 +394,6 @@ public:
|
||||
|
||||
future<> unregister_handler(messaging_verb verb);
|
||||
|
||||
// Wrapper for PREPARE_MESSAGE verb
|
||||
void register_prepare_message(std::function<future<streaming::prepare_message> (const rpc::client_info& cinfo,
|
||||
streaming::prepare_message msg, streaming::plan_id plan_id, sstring description, rpc::optional<streaming::stream_reason> reason, rpc::optional<service::session_id>)>&& func);
|
||||
future<streaming::prepare_message> send_prepare_message(msg_addr id, streaming::prepare_message msg, streaming::plan_id plan_id,
|
||||
sstring description, streaming::stream_reason, service::session_id);
|
||||
future<> unregister_prepare_message();
|
||||
|
||||
// Wrapper for PREPARE_DONE_MESSAGE verb
|
||||
void register_prepare_done_message(std::function<future<> (const rpc::client_info& cinfo, streaming::plan_id plan_id, unsigned dst_cpu_id)>&& func);
|
||||
future<> send_prepare_done_message(msg_addr id, streaming::plan_id plan_id, unsigned dst_cpu_id);
|
||||
future<> unregister_prepare_done_message();
|
||||
|
||||
// Wrapper for STREAM_MUTATION_FRAGMENTS
|
||||
// The receiver of STREAM_MUTATION_FRAGMENTS sends status code to the sender to notify any error on the receiver side. The status code is of type int32_t. 0 means successful, -1 means error, -2 means error and table is dropped, other status code value are reserved for future use.
|
||||
void register_stream_mutation_fragments(std::function<future<rpc::sink<int32_t>> (const rpc::client_info& cinfo, streaming::plan_id plan_id, table_schema_version schema_id, table_id cf_id, uint64_t estimated_partitions, rpc::optional<streaming::stream_reason> reason_opt, rpc::source<frozen_mutation_fragment, rpc::optional<streaming::stream_mutation_fragments_cmd>> source, rpc::optional<service::session_id>)>&& func);
|
||||
@@ -431,69 +419,6 @@ public:
|
||||
void register_repair_get_full_row_hashes_with_rpc_stream(std::function<future<rpc::sink<repair_hash_with_cmd>> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source<repair_stream_cmd> source, rpc::optional<shard_id> dst_cpu_id_opt)>&& func);
|
||||
future<> unregister_repair_get_full_row_hashes_with_rpc_stream();
|
||||
|
||||
void register_stream_mutation_done(std::function<future<> (const rpc::client_info& cinfo, streaming::plan_id plan_id, dht::token_range_vector ranges, table_id cf_id, unsigned dst_cpu_id)>&& func);
|
||||
future<> send_stream_mutation_done(msg_addr id, streaming::plan_id plan_id, dht::token_range_vector ranges, table_id cf_id, unsigned dst_cpu_id);
|
||||
future<> unregister_stream_mutation_done();
|
||||
|
||||
void register_complete_message(std::function<future<> (const rpc::client_info& cinfo, streaming::plan_id plan_id, unsigned dst_cpu_id, rpc::optional<bool> failed)>&& func);
|
||||
future<> send_complete_message(msg_addr id, streaming::plan_id plan_id, unsigned dst_cpu_id, bool failed = false);
|
||||
future<> unregister_complete_message();
|
||||
|
||||
// Wrapper for REPAIR_GET_FULL_ROW_HASHES
|
||||
void register_repair_get_full_row_hashes(std::function<future<repair_hash_set> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::optional<shard_id> dst_cpu_id)>&& func);
|
||||
future<> unregister_repair_get_full_row_hashes();
|
||||
future<repair_hash_set> send_repair_get_full_row_hashes(msg_addr id, uint32_t repair_meta_id, shard_id dst_cpu_id);
|
||||
|
||||
// Wrapper for REPAIR_GET_COMBINED_ROW_HASH
|
||||
void register_repair_get_combined_row_hash(std::function<future<get_combined_row_hash_response> (const rpc::client_info& cinfo, uint32_t repair_meta_id, std::optional<repair_sync_boundary> common_sync_boundary, rpc::optional<shard_id> dst_cpu_id)>&& func);
|
||||
future<> unregister_repair_get_combined_row_hash();
|
||||
future<get_combined_row_hash_response> send_repair_get_combined_row_hash(msg_addr id, uint32_t repair_meta_id, std::optional<repair_sync_boundary> common_sync_boundary, shard_id dst_cpu_id);
|
||||
|
||||
// Wrapper for REPAIR_GET_SYNC_BOUNDARY
|
||||
void register_repair_get_sync_boundary(std::function<future<get_sync_boundary_response> (const rpc::client_info& cinfo, uint32_t repair_meta_id, std::optional<repair_sync_boundary> skipped_sync_boundary, rpc::optional<shard_id> dst_cpu_id)>&& func);
|
||||
future<> unregister_repair_get_sync_boundary();
|
||||
future<get_sync_boundary_response> send_repair_get_sync_boundary(msg_addr id, uint32_t repair_meta_id, std::optional<repair_sync_boundary> skipped_sync_boundary, shard_id dst_cpu_id);
|
||||
|
||||
// Wrapper for REPAIR_GET_ROW_DIFF
|
||||
void register_repair_get_row_diff(std::function<future<repair_rows_on_wire> (const rpc::client_info& cinfo, uint32_t repair_meta_id, repair_hash_set set_diff, bool needs_all_rows, rpc::optional<shard_id> dst_cpu_id)>&& func);
|
||||
future<> unregister_repair_get_row_diff();
|
||||
future<repair_rows_on_wire> send_repair_get_row_diff(msg_addr id, uint32_t repair_meta_id, repair_hash_set set_diff, bool needs_all_rows, shard_id dst_cpu_id);
|
||||
|
||||
// Wrapper for REPAIR_PUT_ROW_DIFF
|
||||
void register_repair_put_row_diff(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, repair_rows_on_wire row_diff, rpc::optional<shard_id> dst_cpu_id)>&& func);
|
||||
future<> unregister_repair_put_row_diff();
|
||||
future<> send_repair_put_row_diff(msg_addr id, uint32_t repair_meta_id, repair_rows_on_wire row_diff, shard_id dst_cpu_id);
|
||||
|
||||
// Wrapper for REPAIR_ROW_LEVEL_START
|
||||
void register_repair_row_level_start(std::function<future<repair_row_level_start_response> (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, rpc::optional<streaming::stream_reason> reason, rpc::optional<gc_clock::time_point> compaction_time, rpc::optional<shard_id> dst_cpu_id)>&& func);
|
||||
future<> unregister_repair_row_level_start();
|
||||
future<rpc::optional<repair_row_level_start_response>> send_repair_row_level_start(msg_addr id, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, streaming::stream_reason reason, gc_clock::time_point compaction_time, shard_id dst_cpu_id);
|
||||
|
||||
// Wrapper for REPAIR_ROW_LEVEL_STOP
|
||||
void register_repair_row_level_stop(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, rpc::optional<shard_id> dst_cpu_id)>&& func);
|
||||
future<> unregister_repair_row_level_stop();
|
||||
future<> send_repair_row_level_stop(msg_addr id, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, shard_id dst_cpu_id);
|
||||
|
||||
// Wrapper for REPAIR_GET_ESTIMATED_PARTITIONS
|
||||
void register_repair_get_estimated_partitions(std::function<future<uint64_t> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::optional<shard_id> dst_cpu_id)>&& func);
|
||||
future<> unregister_repair_get_estimated_partitions();
|
||||
future<uint64_t> send_repair_get_estimated_partitions(msg_addr id, uint32_t repair_meta_id, shard_id dst_cpu_id);
|
||||
|
||||
// Wrapper for REPAIR_SET_ESTIMATED_PARTITIONS
|
||||
void register_repair_set_estimated_partitions(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, uint64_t estimated_partitions, rpc::optional<shard_id> dst_cpu_id)>&& func);
|
||||
future<> unregister_repair_set_estimated_partitions();
|
||||
future<> send_repair_set_estimated_partitions(msg_addr id, uint32_t repair_meta_id, uint64_t estimated_partitions, shard_id dst_cpu_id);
|
||||
|
||||
// Wrapper for REPAIR_GET_DIFF_ALGORITHMS
|
||||
void register_repair_get_diff_algorithms(std::function<future<std::vector<row_level_diff_detect_algorithm>> (const rpc::client_info& cinfo)>&& func);
|
||||
future<> unregister_repair_get_diff_algorithms();
|
||||
future<std::vector<row_level_diff_detect_algorithm>> send_repair_get_diff_algorithms(msg_addr id);
|
||||
|
||||
// Wrapper for NODE_OPS_CMD
|
||||
void register_node_ops_cmd(std::function<future<node_ops_cmd_response> (const rpc::client_info& cinfo, node_ops_cmd_request)>&& func);
|
||||
future<> unregister_node_ops_cmd();
|
||||
future<node_ops_cmd_response> send_node_ops_cmd(msg_addr id, node_ops_cmd_request);
|
||||
|
||||
// Wrapper for TASKS_GET_CHILDREN
|
||||
void register_tasks_get_children(std::function<future<tasks::get_children_response> (const rpc::client_info& cinfo, tasks::get_children_request)>&& func);
|
||||
future<> unregister_tasks_get_children();
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
#include <fmt/ranges.h>
|
||||
#include <seastar/core/sleep.hh>
|
||||
#include <seastar/coroutine/parallel_for_each.hh>
|
||||
#include "idl/node_ops.dist.hh"
|
||||
|
||||
static logging::logger nlogger("node_ops");
|
||||
|
||||
@@ -104,7 +105,7 @@ void node_ops_ctl::start_heartbeat_updater(node_ops_cmd cmd) {
|
||||
future<> node_ops_ctl::query_pending_op() {
|
||||
req.cmd = node_ops_cmd::query_pending_ops;
|
||||
co_await coroutine::parallel_for_each(sync_nodes, [this] (const gms::inet_address& node) -> future<> {
|
||||
auto resp = co_await ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req);
|
||||
auto resp = co_await ser::node_ops_rpc_verbs::send_node_ops_cmd(&ss._messaging.local(), netw::msg_addr(node), req);
|
||||
nlogger.debug("{}[{}]: Got query_pending_ops response from node={}, resp.pending_ops={}", desc, uuid(), node, resp.pending_ops);
|
||||
if (boost::find(resp.pending_ops, uuid()) == resp.pending_ops.end()) {
|
||||
throw std::runtime_error(::format("{}[{}]: Node {} no longer tracks the operation", desc, uuid(), node));
|
||||
@@ -152,7 +153,7 @@ future<> node_ops_ctl::send_to_all(node_ops_cmd cmd) {
|
||||
co_return;
|
||||
}
|
||||
try {
|
||||
co_await ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req);
|
||||
co_await ser::node_ops_rpc_verbs::send_node_ops_cmd(&ss._messaging.local(), netw::msg_addr(node), req);
|
||||
nlogger.debug("{}[{}]: Got {} response from node={}", desc, uuid(), op_desc, node);
|
||||
} catch (const seastar::rpc::unknown_verb_error&) {
|
||||
if (cmd_category == node_ops_cmd_category::prepare) {
|
||||
@@ -195,7 +196,7 @@ future<> node_ops_ctl::heartbeat_updater(node_ops_cmd cmd) {
|
||||
auto req = node_ops_cmd_request{cmd, uuid(), {}, {}, {}};
|
||||
co_await coroutine::parallel_for_each(sync_nodes, [&] (const gms::inet_address& node) -> future<> {
|
||||
try {
|
||||
co_await ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req);
|
||||
co_await ser::node_ops_rpc_verbs::send_node_ops_cmd(&ss._messaging.local(), netw::msg_addr(node), req);
|
||||
nlogger.debug("{}[{}]: Got heartbeat response from node={}", desc, uuid(), node);
|
||||
} catch (...) {
|
||||
nlogger.warn("{}[{}]: Failed to get heartbeat response from node={}", desc, uuid(), node);
|
||||
|
||||
@@ -50,7 +50,8 @@
|
||||
#include <atomic>
|
||||
#include <utility>
|
||||
|
||||
#include "idl/partition_checksum.dist.hh"
|
||||
#include "idl/repair.dist.hh"
|
||||
#include "idl/node_ops.dist.hh"
|
||||
#include "utils/user_provided_param.hh"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
@@ -375,7 +376,7 @@ static future<std::list<gms::inet_address>> get_hosts_participating_in_repair(
|
||||
|
||||
future<std::tuple<bool, gc_clock::time_point>> repair_service::flush_hints(repair_uniq_id id,
|
||||
sstring keyspace, std::vector<sstring> cfs,
|
||||
std::unordered_set<gms::inet_address> ignore_nodes, std::list<gms::inet_address> participants) {
|
||||
std::unordered_set<gms::inet_address> ignore_nodes) {
|
||||
auto& db = get_db().local();
|
||||
auto uuid = id.uuid();
|
||||
bool needs_flush_before_repair = false;
|
||||
@@ -400,16 +401,16 @@ future<std::tuple<bool, gc_clock::time_point>> repair_service::flush_hints(repai
|
||||
});
|
||||
auto hints_timeout = std::chrono::seconds(300);
|
||||
auto batchlog_timeout = std::chrono::seconds(300);
|
||||
repair_flush_hints_batchlog_request req{id.uuid(), participants, hints_timeout, batchlog_timeout};
|
||||
repair_flush_hints_batchlog_request req{id.uuid(), {}, hints_timeout, batchlog_timeout};
|
||||
auto start_time = gc_clock::now();
|
||||
std::vector<gc_clock::time_point> times;
|
||||
try {
|
||||
co_await parallel_for_each(waiting_nodes, [this, uuid, start_time, ×, &req, &participants] (gms::inet_address node) -> future<> {
|
||||
rlogger.info("repair[{}]: Sending repair_flush_hints_batchlog to node={}, participants={}, started",
|
||||
uuid, node, participants);
|
||||
co_await parallel_for_each(waiting_nodes, [this, uuid, start_time, ×, &req] (gms::inet_address node) -> future<> {
|
||||
rlogger.info("repair[{}]: Sending repair_flush_hints_batchlog to node={}, started",
|
||||
uuid, node);
|
||||
try {
|
||||
auto& ms = get_messaging();
|
||||
auto resp = co_await ser::partition_checksum_rpc_verbs::send_repair_flush_hints_batchlog(&ms, netw::msg_addr(node), req);
|
||||
auto resp = co_await ser::repair_rpc_verbs::send_repair_flush_hints_batchlog(&ms, netw::msg_addr(node), req);
|
||||
if (resp.flush_time == gc_clock::time_point()) {
|
||||
// This means the node does not support sending flush_time back. Use the time when the flush is requested for flush_time.
|
||||
rlogger.debug("repair[{}]: Got empty flush_time from node={}. Please upgrade the node={}.", uuid, node, node);
|
||||
@@ -418,8 +419,8 @@ future<std::tuple<bool, gc_clock::time_point>> repair_service::flush_hints(repai
|
||||
times.push_back(resp.flush_time);
|
||||
}
|
||||
} catch (...) {
|
||||
rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to node={}, participants={}, failed: {}",
|
||||
uuid, node, participants, std::current_exception());
|
||||
rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to node={}, failed: {}",
|
||||
uuid, node, std::current_exception());
|
||||
throw;
|
||||
}
|
||||
});
|
||||
@@ -431,11 +432,10 @@ future<std::tuple<bool, gc_clock::time_point>> repair_service::flush_hints(repai
|
||||
auto duration = std::chrono::duration<float>(gc_clock::now() - start_time);
|
||||
rlogger.info("repair[{}]: Finished repair_flush_hints_batchlog flush_times={} flush_time={} flush_duration={}", uuid, times, flush_time, duration);
|
||||
} catch (...) {
|
||||
rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to participants={} failed, continue to run repair",
|
||||
uuid, participants);
|
||||
rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog failed, continue to run repair", uuid);
|
||||
}
|
||||
} else {
|
||||
rlogger.info("repair[{}]: Skipped sending repair_flush_hints_batchlog to nodes={}", uuid, participants);
|
||||
rlogger.info("repair[{}]: Skipped sending repair_flush_hints_batchlog", uuid);
|
||||
}
|
||||
co_return std::make_tuple(hints_batchlog_flushed, flush_time);
|
||||
}
|
||||
@@ -1360,7 +1360,7 @@ future<> repair::user_requested_repair_task_impl::run() {
|
||||
} else {
|
||||
participants = get_hosts_participating_in_repair(germs->get(), keyspace, ranges, data_centers, hosts, ignore_nodes).get();
|
||||
}
|
||||
auto [hints_batchlog_flushed, flush_time] = rs.flush_hints(id, keyspace, cfs, ignore_nodes, participants).get();
|
||||
auto [hints_batchlog_flushed, flush_time] = rs.flush_hints(id, keyspace, cfs, ignore_nodes).get();
|
||||
|
||||
std::vector<future<>> repair_results;
|
||||
repair_results.reserve(smp::count);
|
||||
@@ -1373,7 +1373,7 @@ future<> repair::user_requested_repair_task_impl::run() {
|
||||
while (!as.abort_requested()) {
|
||||
sleep_abortable(update_interval, as).get();
|
||||
parallel_for_each(participants, [&rs, uuid, &req] (gms::inet_address node) {
|
||||
return rs._messaging.send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
|
||||
return ser::node_ops_rpc_verbs::send_node_ops_cmd(&rs._messaging, netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
|
||||
rlogger.debug("repair[{}]: Got node_ops_cmd::repair_updater response from node={}", uuid, node);
|
||||
}).handle_exception([uuid, node] (std::exception_ptr ep) {
|
||||
rlogger.warn("repair[{}]: Failed to send node_ops_cmd::repair_updater to node={}", uuid, node);
|
||||
@@ -2528,7 +2528,7 @@ future<> repair::tablet_repair_task_impl::run() {
|
||||
while (!as.abort_requested()) {
|
||||
sleep_abortable(update_interval, as).get();
|
||||
parallel_for_each(participants, [&rs, uuid, &req] (gms::inet_address node) {
|
||||
return rs._messaging.send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
|
||||
return ser::node_ops_rpc_verbs::send_node_ops_cmd(&rs._messaging, netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
|
||||
rlogger.debug("repair[{}]: Got node_ops_cmd::repair_updater response from node={}", uuid, node);
|
||||
}).handle_exception([uuid, node] (std::exception_ptr ep) {
|
||||
rlogger.warn("repair[{}]: Failed to send node_ops_cmd::repair_updater to node={}", uuid, node);
|
||||
@@ -2588,10 +2588,7 @@ future<> repair::tablet_repair_task_impl::run() {
|
||||
auto data_centers = std::vector<sstring>();
|
||||
auto hosts = std::vector<sstring>();
|
||||
auto ignore_nodes = std::unordered_set<gms::inet_address>();
|
||||
auto my_address = erm->get_topology().my_address();
|
||||
auto participants = std::list<gms::inet_address>(m.neighbors.all.begin(), m.neighbors.all.end());
|
||||
participants.push_front(my_address);
|
||||
auto [hints_batchlog_flushed, flush_time] = co_await rs.flush_hints(id, m.keyspace_name, tables, ignore_nodes, participants);
|
||||
auto [hints_batchlog_flushed, flush_time] = co_await rs.flush_hints(id, m.keyspace_name, tables, ignore_nodes);
|
||||
bool small_table_optimization = false;
|
||||
|
||||
auto task_impl_ptr = seastar::make_shared<repair::shard_repair_task_impl>(rs._repair_module, tasks::task_id::create_random_id(),
|
||||
|
||||
@@ -260,7 +260,7 @@ struct repair_update_system_table_response {
|
||||
|
||||
struct repair_flush_hints_batchlog_request {
|
||||
tasks::task_id repair_uuid;
|
||||
std::list<gms::inet_address> target_nodes;
|
||||
std::list<gms::inet_address> unused;
|
||||
std::chrono::seconds hints_timeout;
|
||||
std::chrono::seconds batchlog_timeout;
|
||||
};
|
||||
|
||||
@@ -49,7 +49,7 @@
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "db/batchlog_manager.hh"
|
||||
#include "idl/partition_checksum.dist.hh"
|
||||
#include "idl/repair.dist.hh"
|
||||
#include "readers/empty_v2.hh"
|
||||
#include "readers/evictable.hh"
|
||||
#include "readers/queue.hh"
|
||||
@@ -224,7 +224,7 @@ static const std::vector<row_level_diff_detect_algorithm>& suportted_diff_detect
|
||||
static row_level_diff_detect_algorithm get_common_diff_detect_algorithm(netw::messaging_service& ms, const inet_address_vector_replica_set& nodes) {
|
||||
std::vector<std::vector<row_level_diff_detect_algorithm>> nodes_algorithms(nodes.size());
|
||||
parallel_for_each(std::views::iota(size_t(0), nodes.size()), coroutine::lambda([&] (size_t idx) -> future<> {
|
||||
std::vector<row_level_diff_detect_algorithm> algorithms = co_await ms.send_repair_get_diff_algorithms(netw::messaging_service::msg_addr(nodes[idx]));
|
||||
std::vector<row_level_diff_detect_algorithm> algorithms = co_await ser::repair_rpc_verbs::send_repair_get_diff_algorithms(&ms, netw::messaging_service::msg_addr(nodes[idx]));
|
||||
std::sort(algorithms.begin(), algorithms.end());
|
||||
nodes_algorithms[idx] = std::move(algorithms);
|
||||
rlogger.trace("Got node_algorithms={}, from node={}", nodes_algorithms[idx], nodes[idx]);
|
||||
@@ -1431,7 +1431,7 @@ public:
|
||||
if (remote_node == myip()) {
|
||||
co_return co_await get_full_row_hashes_handler();
|
||||
}
|
||||
repair_hash_set hashes = co_await _messaging.send_repair_get_full_row_hashes(msg_addr(remote_node),
|
||||
repair_hash_set hashes = co_await ser::repair_rpc_verbs::send_repair_get_full_row_hashes(&_messaging, msg_addr(remote_node),
|
||||
_repair_meta_id, dst_cpu_id);
|
||||
rlogger.debug("Got full hashes from peer={}, nr_hashes={}", remote_node, hashes.size());
|
||||
_metrics.rx_hashes_nr += hashes.size();
|
||||
@@ -1508,7 +1508,7 @@ public:
|
||||
if (remote_node == myip()) {
|
||||
co_return co_await get_combined_row_hash_handler(common_sync_boundary);
|
||||
}
|
||||
get_combined_row_hash_response resp = co_await _messaging.send_repair_get_combined_row_hash(msg_addr(remote_node),
|
||||
get_combined_row_hash_response resp = co_await ser::repair_rpc_verbs::send_repair_get_combined_row_hash(&_messaging, msg_addr(remote_node),
|
||||
_repair_meta_id, common_sync_boundary, dst_cpu_id);
|
||||
stats().rpc_call_nr++;
|
||||
stats().rx_hashes_nr++;
|
||||
@@ -1542,7 +1542,7 @@ public:
|
||||
// the time this change is introduced.
|
||||
sstring remote_partitioner_name = "org.apache.cassandra.dht.Murmur3Partitioner";
|
||||
rpc::optional<repair_row_level_start_response> resp =
|
||||
co_await _messaging.send_repair_row_level_start(msg_addr(remote_node),
|
||||
co_await ser::repair_rpc_verbs::send_repair_row_level_start(&_messaging, msg_addr(remote_node),
|
||||
_repair_meta_id, ks_name, cf_name, std::move(range), _algo, _max_row_buf_size, _seed,
|
||||
_master_node_shard_config.shard, _master_node_shard_config.shard_count, _master_node_shard_config.ignore_msb,
|
||||
remote_partitioner_name, std::move(schema_version), reason, compaction_time, dst_cpu_id);
|
||||
@@ -1575,7 +1575,7 @@ public:
|
||||
co_return co_await stop();
|
||||
}
|
||||
stats().rpc_call_nr++;
|
||||
co_return co_await _messaging.send_repair_row_level_stop(msg_addr(remote_node),
|
||||
co_return co_await ser::repair_rpc_verbs::send_repair_row_level_stop(&_messaging, msg_addr(remote_node),
|
||||
_repair_meta_id, std::move(ks_name), std::move(cf_name), std::move(range), dst_cpu_id);
|
||||
}
|
||||
|
||||
@@ -1596,7 +1596,7 @@ public:
|
||||
co_return co_await get_estimated_partitions();
|
||||
}
|
||||
stats().rpc_call_nr++;
|
||||
co_return co_await _messaging.send_repair_get_estimated_partitions(msg_addr(remote_node), _repair_meta_id, dst_cpu_id);
|
||||
co_return co_await ser::repair_rpc_verbs::send_repair_get_estimated_partitions(&_messaging, msg_addr(remote_node), _repair_meta_id, dst_cpu_id);
|
||||
}
|
||||
|
||||
|
||||
@@ -1615,7 +1615,7 @@ public:
|
||||
co_return co_await set_estimated_partitions(estimated_partitions);
|
||||
}
|
||||
stats().rpc_call_nr++;
|
||||
co_return co_await _messaging.send_repair_set_estimated_partitions(msg_addr(remote_node), _repair_meta_id, estimated_partitions, dst_cpu_id);
|
||||
co_return co_await ser::repair_rpc_verbs::send_repair_set_estimated_partitions(&_messaging, msg_addr(remote_node), _repair_meta_id, estimated_partitions, dst_cpu_id);
|
||||
}
|
||||
|
||||
|
||||
@@ -1635,7 +1635,7 @@ public:
|
||||
co_return co_await get_sync_boundary_handler(skipped_sync_boundary);
|
||||
}
|
||||
stats().rpc_call_nr++;
|
||||
co_return co_await _messaging.send_repair_get_sync_boundary(msg_addr(remote_node), _repair_meta_id, skipped_sync_boundary, dst_cpu_id);
|
||||
co_return co_await ser::repair_rpc_verbs::send_repair_get_sync_boundary(&_messaging, msg_addr(remote_node), _repair_meta_id, skipped_sync_boundary, dst_cpu_id);
|
||||
}
|
||||
|
||||
// RPC handler
|
||||
@@ -1662,7 +1662,7 @@ public:
|
||||
_metrics.tx_hashes_nr += set_diff.size();
|
||||
}
|
||||
stats().rpc_call_nr++;
|
||||
repair_rows_on_wire rows = _messaging.send_repair_get_row_diff(msg_addr(remote_node),
|
||||
repair_rows_on_wire rows = ser::repair_rpc_verbs::send_repair_get_row_diff(&_messaging, msg_addr(remote_node),
|
||||
_repair_meta_id, std::move(set_diff), bool(needs_all_rows), dst_cpu_id).get();
|
||||
if (!rows.empty()) {
|
||||
apply_rows_on_master_in_thread(std::move(rows), remote_node, update_working_row_buf::yes, update_peer_row_hash_sets::no, node_idx);
|
||||
@@ -1676,7 +1676,7 @@ public:
|
||||
return;
|
||||
}
|
||||
stats().rpc_call_nr++;
|
||||
repair_rows_on_wire rows = _messaging.send_repair_get_row_diff(msg_addr(remote_node),
|
||||
repair_rows_on_wire rows = ser::repair_rpc_verbs::send_repair_get_row_diff(&_messaging, msg_addr(remote_node),
|
||||
_repair_meta_id, {}, bool(needs_all_rows_t::yes), dst_cpu_id).get();
|
||||
if (!rows.empty()) {
|
||||
apply_rows_on_master_in_thread(std::move(rows), remote_node, update_working_row_buf::yes, update_peer_row_hash_sets::yes, node_idx);
|
||||
@@ -1803,7 +1803,7 @@ public:
|
||||
stats().tx_row_bytes += row_bytes;
|
||||
stats().rpc_call_nr++;
|
||||
repair_rows_on_wire rows = co_await to_repair_rows_on_wire(std::move(row_diff));
|
||||
co_await _messaging.send_repair_put_row_diff(msg_addr(remote_node), _repair_meta_id, std::move(rows), dst_cpu_id);
|
||||
co_await ser::repair_rpc_verbs::send_repair_put_row_diff(&_messaging, msg_addr(remote_node), _repair_meta_id, std::move(rows), dst_cpu_id);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2309,8 +2309,8 @@ future<repair_flush_hints_batchlog_response> repair_service::repair_flush_hints_
|
||||
return rs.repair_flush_hints_batchlog_handler(from, std::move(req));
|
||||
});
|
||||
}
|
||||
rlogger.info("repair[{}]: Started to process repair_flush_hints_batchlog_request from node={} target_nodes={} hints_timeout={}s batchlog_timeout={}s",
|
||||
req.repair_uuid, from, req.target_nodes, req.hints_timeout.count(), req.batchlog_timeout.count());
|
||||
rlogger.info("repair[{}]: Started to process repair_flush_hints_batchlog_request from node={} hints_timeout={}s batchlog_timeout={}s",
|
||||
req.repair_uuid, from, req.hints_timeout.count(), req.batchlog_timeout.count());
|
||||
auto permit = co_await seastar::get_units(_flush_hints_batchlog_sem, 1);
|
||||
bool updated = false;
|
||||
auto now = gc_clock::now();
|
||||
@@ -2319,8 +2319,7 @@ future<repair_flush_hints_batchlog_response> repair_service::repair_flush_hints_
|
||||
auto flush_time = now;
|
||||
if (cache_disabled || (now - _flush_hints_batchlog_time > cache_time)) {
|
||||
// Empty targets meants all nodes
|
||||
std::vector<gms::inet_address> target_nodes;
|
||||
db::hints::sync_point sync_point = co_await _sp.local().create_hint_sync_point(std::move(target_nodes));
|
||||
db::hints::sync_point sync_point = co_await _sp.local().create_hint_sync_point(std::vector<gms::inet_address>{});
|
||||
lowres_clock::time_point deadline = lowres_clock::now() + req.hints_timeout;
|
||||
try {
|
||||
bool bm_throw = utils::get_local_injector().enter("repair_flush_hints_batchlog_handler_bm_uninitialized");
|
||||
@@ -2329,13 +2328,13 @@ future<repair_flush_hints_batchlog_response> repair_service::repair_flush_hints_
|
||||
}
|
||||
co_await coroutine::all(
|
||||
[this, &from, &req, &sync_point, &deadline] () -> future<> {
|
||||
rlogger.info("repair[{}]: Started to flush hints for repair_flush_hints_batchlog_request from node={}, target_nodes={}", req.repair_uuid, from, req.target_nodes);
|
||||
rlogger.info("repair[{}]: Started to flush hints for repair_flush_hints_batchlog_request from node={}", req.repair_uuid, from);
|
||||
co_await _sp.local().wait_for_hint_sync_point(std::move(sync_point), deadline);
|
||||
rlogger.info("repair[{}]: Finished to flush hints for repair_flush_hints_batchlog_request from node={}, target_hosts={}", req.repair_uuid, from, req.target_nodes);
|
||||
rlogger.info("repair[{}]: Finished to flush hints for repair_flush_hints_batchlog_request from node={}", req.repair_uuid, from);
|
||||
co_return;
|
||||
},
|
||||
[this, now, cache_disabled, &flush_time, &cache_time, &from, &req] () -> future<> {
|
||||
rlogger.info("repair[{}]: Started to flush batchlog for repair_flush_hints_batchlog_request from node={}, target_nodes={}", req.repair_uuid, from, req.target_nodes);
|
||||
rlogger.info("repair[{}]: Started to flush batchlog for repair_flush_hints_batchlog_request from node={}", req.repair_uuid, from);
|
||||
auto last_replay = _bm.local().get_last_replay();
|
||||
bool issue_flush = false;
|
||||
if (cache_disabled) {
|
||||
@@ -2364,12 +2363,12 @@ future<repair_flush_hints_batchlog_response> repair_service::repair_flush_hints_
|
||||
co_await _bm.local().do_batch_log_replay(db::batchlog_manager::post_replay_cleanup::no);
|
||||
utils::get_local_injector().set_parameter("repair_flush_hints_batchlog_handler", "issue_flush", fmt::to_string(flush_time));
|
||||
}
|
||||
rlogger.info("repair[{}]: Finished to flush batchlog for repair_flush_hints_batchlog_request from node={}, target_nodes={}, flushed={}", req.repair_uuid, from, req.target_nodes, issue_flush);
|
||||
rlogger.info("repair[{}]: Finished to flush batchlog for repair_flush_hints_batchlog_request from node={}, flushed={}", req.repair_uuid, from, issue_flush);
|
||||
}
|
||||
);
|
||||
} catch (...) {
|
||||
rlogger.warn("repair[{}]: Failed to process repair_flush_hints_batchlog_request from node={} target_hosts={}: {}",
|
||||
req.repair_uuid, from, req.target_nodes, std::current_exception());
|
||||
rlogger.warn("repair[{}]: Failed to process repair_flush_hints_batchlog_request from node={}: {}",
|
||||
req.repair_uuid, from, std::current_exception());
|
||||
throw;
|
||||
}
|
||||
co_await container().invoke_on_all([flush_time] (repair_service& rs) {
|
||||
@@ -2380,8 +2379,8 @@ future<repair_flush_hints_batchlog_response> repair_service::repair_flush_hints_
|
||||
utils::get_local_injector().set_parameter("repair_flush_hints_batchlog_handler", "skip_flush", fmt::to_string(flush_time));
|
||||
}
|
||||
auto duration = std::chrono::duration<float>(gc_clock::now() - now);
|
||||
rlogger.info("repair[{}]: Finished to process repair_flush_hints_batchlog_request from node={} target_nodes={} updated={} flush_hints_batchlog_time={} flush_cache_time={} flush_duration={}",
|
||||
req.repair_uuid, from, req.target_nodes, updated, _flush_hints_batchlog_time, cache_time, duration);
|
||||
rlogger.info("repair[{}]: Finished to process repair_flush_hints_batchlog_request from node={} updated={} flush_hints_batchlog_time={} flush_cache_time={} flush_duration={}",
|
||||
req.repair_uuid, from, updated, _flush_hints_batchlog_time, cache_time, duration);
|
||||
repair_flush_hints_batchlog_response resp{ .flush_time = _flush_hints_batchlog_time };
|
||||
co_return resp;
|
||||
}
|
||||
@@ -2425,7 +2424,7 @@ future<> repair_service::init_ms_handlers() {
|
||||
});
|
||||
return make_ready_future<rpc::sink<repair_hash_with_cmd>>(sink);
|
||||
});
|
||||
ms.register_repair_get_full_row_hashes([this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::optional<shard_id> dst_cpu_id_opt) {
|
||||
ser::repair_rpc_verbs::register_repair_get_full_row_hashes(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::optional<shard_id> dst_cpu_id_opt) {
|
||||
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
|
||||
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
|
||||
auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt);
|
||||
@@ -2439,7 +2438,7 @@ future<> repair_service::init_ms_handlers() {
|
||||
});
|
||||
}) ;
|
||||
});
|
||||
ms.register_repair_get_combined_row_hash([this] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
|
||||
ser::repair_rpc_verbs::register_repair_get_combined_row_hash(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
|
||||
std::optional<repair_sync_boundary> common_sync_boundary, rpc::optional<shard_id> dst_cpu_id_opt) {
|
||||
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
|
||||
auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt);
|
||||
@@ -2455,7 +2454,7 @@ future<> repair_service::init_ms_handlers() {
|
||||
});
|
||||
});
|
||||
});
|
||||
ms.register_repair_get_sync_boundary([this] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
|
||||
ser::repair_rpc_verbs::register_repair_get_sync_boundary(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
|
||||
std::optional<repair_sync_boundary> skipped_sync_boundary, rpc::optional<shard_id> dst_cpu_id_opt) {
|
||||
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
|
||||
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
|
||||
@@ -2470,7 +2469,7 @@ future<> repair_service::init_ms_handlers() {
|
||||
});
|
||||
});
|
||||
});
|
||||
ms.register_repair_get_row_diff([this] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
|
||||
ser::repair_rpc_verbs::register_repair_get_row_diff(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
|
||||
repair_hash_set set_diff, bool needs_all_rows, rpc::optional<shard_id> dst_cpu_id_opt) {
|
||||
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
|
||||
auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt);
|
||||
@@ -2493,7 +2492,7 @@ future<> repair_service::init_ms_handlers() {
|
||||
}
|
||||
});
|
||||
});
|
||||
ms.register_repair_put_row_diff([this] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
|
||||
ser::repair_rpc_verbs::register_repair_put_row_diff(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
|
||||
repair_rows_on_wire row_diff, rpc::optional<shard_id> dst_cpu_id_opt) {
|
||||
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
|
||||
auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt);
|
||||
@@ -2513,7 +2512,7 @@ future<> repair_service::init_ms_handlers() {
|
||||
}
|
||||
});
|
||||
});
|
||||
ms.register_repair_row_level_start([this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring ks_name,
|
||||
ser::repair_rpc_verbs::register_repair_row_level_start(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring ks_name,
|
||||
sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed,
|
||||
unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version,
|
||||
rpc::optional<streaming::stream_reason> reason, rpc::optional<gc_clock::time_point> compaction_time, rpc::optional<shard_id> dst_cpu_id_opt) {
|
||||
@@ -2535,7 +2534,7 @@ future<> repair_service::init_ms_handlers() {
|
||||
schema_version, r, ct, _repair_module->abort_source());
|
||||
});
|
||||
});
|
||||
ms.register_repair_row_level_stop([this] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
|
||||
ser::repair_rpc_verbs::register_repair_row_level_stop(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
|
||||
sstring ks_name, sstring cf_name, dht::token_range range, rpc::optional<shard_id> dst_cpu_id_opt) {
|
||||
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
|
||||
auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt);
|
||||
@@ -2545,7 +2544,7 @@ future<> repair_service::init_ms_handlers() {
|
||||
std::move(ks_name), std::move(cf_name), std::move(range));
|
||||
});
|
||||
});
|
||||
ms.register_repair_get_estimated_partitions([this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::optional<shard_id> dst_cpu_id_opt) {
|
||||
ser::repair_rpc_verbs::register_repair_get_estimated_partitions(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::optional<shard_id> dst_cpu_id_opt) {
|
||||
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
|
||||
auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt);
|
||||
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
|
||||
@@ -2553,7 +2552,7 @@ future<> repair_service::init_ms_handlers() {
|
||||
return repair_meta::repair_get_estimated_partitions_handler(local_repair, from, repair_meta_id);
|
||||
});
|
||||
});
|
||||
ms.register_repair_set_estimated_partitions([this] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
|
||||
ser::repair_rpc_verbs::register_repair_set_estimated_partitions(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
|
||||
uint64_t estimated_partitions, rpc::optional<shard_id> dst_cpu_id_opt) {
|
||||
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
|
||||
auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt);
|
||||
@@ -2562,14 +2561,14 @@ future<> repair_service::init_ms_handlers() {
|
||||
return repair_meta::repair_set_estimated_partitions_handler(local_repair, from, repair_meta_id, estimated_partitions);
|
||||
});
|
||||
});
|
||||
ms.register_repair_get_diff_algorithms([] (const rpc::client_info& cinfo) {
|
||||
ser::repair_rpc_verbs::register_repair_get_diff_algorithms(&ms, [] (const rpc::client_info& cinfo) {
|
||||
return make_ready_future<std::vector<row_level_diff_detect_algorithm>>(suportted_diff_detect_algorithms());
|
||||
});
|
||||
ser::partition_checksum_rpc_verbs::register_repair_update_system_table(&ms, [this] (const rpc::client_info& cinfo, repair_update_system_table_request req) {
|
||||
ser::repair_rpc_verbs::register_repair_update_system_table(&ms, [this] (const rpc::client_info& cinfo, repair_update_system_table_request req) {
|
||||
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
|
||||
return repair_update_system_table_handler(from, std::move(req));
|
||||
});
|
||||
ser::partition_checksum_rpc_verbs::register_repair_flush_hints_batchlog(&ms, [this] (const rpc::client_info& cinfo, repair_flush_hints_batchlog_request req) {
|
||||
ser::repair_rpc_verbs::register_repair_flush_hints_batchlog(&ms, [this] (const rpc::client_info& cinfo, repair_flush_hints_batchlog_request req) {
|
||||
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
|
||||
return repair_flush_hints_batchlog_handler(from, std::move(req));
|
||||
});
|
||||
@@ -2584,18 +2583,7 @@ future<> repair_service::uninit_ms_handlers() {
|
||||
ms.unregister_repair_get_row_diff_with_rpc_stream(),
|
||||
ms.unregister_repair_put_row_diff_with_rpc_stream(),
|
||||
ms.unregister_repair_get_full_row_hashes_with_rpc_stream(),
|
||||
ms.unregister_repair_get_full_row_hashes(),
|
||||
ms.unregister_repair_get_combined_row_hash(),
|
||||
ms.unregister_repair_get_sync_boundary(),
|
||||
ms.unregister_repair_get_row_diff(),
|
||||
ms.unregister_repair_put_row_diff(),
|
||||
ms.unregister_repair_row_level_start(),
|
||||
ms.unregister_repair_row_level_stop(),
|
||||
ms.unregister_repair_get_estimated_partitions(),
|
||||
ms.unregister_repair_set_estimated_partitions(),
|
||||
ms.unregister_repair_get_diff_algorithms(),
|
||||
ser::partition_checksum_rpc_verbs::unregister_repair_update_system_table(&ms),
|
||||
ser::partition_checksum_rpc_verbs::unregister_repair_flush_hints_batchlog(&ms)
|
||||
ser::repair_rpc_verbs::unregister(&ms)
|
||||
).discard_result();
|
||||
}
|
||||
|
||||
@@ -3019,7 +3007,7 @@ private:
|
||||
co_await coroutine::parallel_for_each(all_nodes, [this, req] (gms::inet_address node) -> future<> {
|
||||
try {
|
||||
auto& ms = _shard_task.messaging.local();
|
||||
repair_update_system_table_response resp = co_await ser::partition_checksum_rpc_verbs::send_repair_update_system_table(&ms, netw::messaging_service::msg_addr(node), req);
|
||||
repair_update_system_table_response resp = co_await ser::repair_rpc_verbs::send_repair_update_system_table(&ms, netw::messaging_service::msg_addr(node), req);
|
||||
(void)resp; // nothing to do with the response yet
|
||||
rlogger.debug("repair[{}]: Finished to update system.repair_history table of node {}", _shard_task.global_repair_id.uuid(), node);
|
||||
} catch (...) {
|
||||
|
||||
@@ -124,7 +124,7 @@ class repair_service : public seastar::peering_sharded_service<repair_service> {
|
||||
gc_clock::time_point _flush_hints_batchlog_time;
|
||||
future<std::tuple<bool, gc_clock::time_point>> flush_hints(repair_uniq_id id,
|
||||
sstring keyspace, std::vector<sstring> cfs,
|
||||
std::unordered_set<gms::inet_address> ignore_nodes, std::list<gms::inet_address> participants);
|
||||
std::unordered_set<gms::inet_address> ignore_nodes);
|
||||
|
||||
public:
|
||||
repair_service(sharded<service::topology_state_machine>& tsm,
|
||||
|
||||
@@ -99,6 +99,7 @@
|
||||
#include "service/raft/join_node.hh"
|
||||
#include "idl/join_node.dist.hh"
|
||||
#include "idl/migration_manager.dist.hh"
|
||||
#include "idl/node_ops.dist.hh"
|
||||
#include "protocol_server.hh"
|
||||
#include "node_ops/node_ops_ctl.hh"
|
||||
#include "node_ops/task_manager_module.hh"
|
||||
@@ -3871,7 +3872,7 @@ void storage_service::run_bootstrap_ops(std::unordered_set<token>& bootstrap_tok
|
||||
std::unordered_map<gms::inet_address, std::list<node_ops_id>> pending_ops;
|
||||
auto req = node_ops_cmd_request(node_ops_cmd::query_pending_ops, uuid);
|
||||
parallel_for_each(ctl.sync_nodes, [this, req, uuid, &pending_ops] (const gms::inet_address& node) {
|
||||
return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node, &pending_ops] (node_ops_cmd_response resp) {
|
||||
return ser::node_ops_rpc_verbs::send_node_ops_cmd(&_messaging.local(), netw::msg_addr(node), req).then([uuid, node, &pending_ops] (node_ops_cmd_response resp) {
|
||||
slogger.debug("bootstrap[{}]: Got query_pending_ops response from node={}, resp.pending_ops={}", uuid, node, resp.pending_ops);
|
||||
if (!resp.pending_ops.empty()) {
|
||||
pending_ops.emplace(node, resp.pending_ops);
|
||||
@@ -6941,7 +6942,7 @@ node_state storage_service::get_node_state(locator::host_id id) {
|
||||
}
|
||||
|
||||
void storage_service::init_messaging_service() {
|
||||
_messaging.local().register_node_ops_cmd([this] (const rpc::client_info& cinfo, node_ops_cmd_request req) {
|
||||
ser::node_ops_rpc_verbs::register_node_ops_cmd(&_messaging.local(), [this] (const rpc::client_info& cinfo, node_ops_cmd_request req) {
|
||||
auto coordinator = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
|
||||
std::optional<locator::host_id> coordinator_host_id;
|
||||
if (const auto* id = cinfo.retrieve_auxiliary_opt<locator::host_id>("host_id")) {
|
||||
@@ -7091,7 +7092,7 @@ void storage_service::init_messaging_service() {
|
||||
|
||||
future<> storage_service::uninit_messaging_service() {
|
||||
return when_all_succeed(
|
||||
_messaging.local().unregister_node_ops_cmd(),
|
||||
ser::node_ops_rpc_verbs::unregister(&_messaging.local()),
|
||||
ser::storage_service_rpc_verbs::unregister(&_messaging.local()),
|
||||
ser::join_node_rpc_verbs::unregister(&_messaging.local())
|
||||
).discard_result();
|
||||
|
||||
@@ -21,3 +21,9 @@ class stream_state;
|
||||
using plan_id = utils::tagged_uuid<struct plan_id_tag>;
|
||||
|
||||
} // namespace streaming
|
||||
|
||||
namespace service {
|
||||
|
||||
using session_id = utils::tagged_uuid<struct session_id_tag>;
|
||||
|
||||
}
|
||||
@@ -31,6 +31,7 @@
|
||||
#include "service/topology_guard.hh"
|
||||
#include "utils/assert.hh"
|
||||
#include "utils/error_injection.hh"
|
||||
#include "idl/streaming.dist.hh"
|
||||
|
||||
namespace streaming {
|
||||
|
||||
@@ -89,7 +90,7 @@ stream_manager::make_streaming_consumer(uint64_t estimated_partitions, stream_re
|
||||
void stream_manager::init_messaging_service_handler(abort_source& as) {
|
||||
auto& ms = _ms.local();
|
||||
|
||||
ms.register_prepare_message([this] (const rpc::client_info& cinfo, prepare_message msg, streaming::plan_id plan_id, sstring description, rpc::optional<stream_reason> reason_opt, rpc::optional<service::session_id> session) {
|
||||
ser::streaming_rpc_verbs::register_prepare_message(&ms, [this] (const rpc::client_info& cinfo, prepare_message msg, streaming::plan_id plan_id, sstring description, rpc::optional<stream_reason> reason_opt, rpc::optional<service::session_id> session) {
|
||||
const auto& src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
|
||||
const auto& from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
|
||||
auto dst_cpu_id = this_shard_id();
|
||||
@@ -105,7 +106,7 @@ void stream_manager::init_messaging_service_handler(abort_source& as) {
|
||||
return session->prepare(std::move(msg.requests), std::move(msg.summaries));
|
||||
});
|
||||
});
|
||||
ms.register_prepare_done_message([this] (const rpc::client_info& cinfo, streaming::plan_id plan_id, unsigned dst_cpu_id) {
|
||||
ser::streaming_rpc_verbs::register_prepare_done_message(&ms, [this] (const rpc::client_info& cinfo, streaming::plan_id plan_id, unsigned dst_cpu_id) {
|
||||
const auto& from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
|
||||
return container().invoke_on(dst_cpu_id, [plan_id, from] (auto& sm) mutable {
|
||||
auto session = sm.get_session(plan_id, from, "PREPARE_DONE_MESSAGE");
|
||||
@@ -252,14 +253,14 @@ void stream_manager::init_messaging_service_handler(abort_source& as) {
|
||||
return make_ready_future<rpc::sink<int>>(sink);
|
||||
});
|
||||
});
|
||||
ms.register_stream_mutation_done([this] (const rpc::client_info& cinfo, streaming::plan_id plan_id, dht::token_range_vector ranges, table_id cf_id, unsigned dst_cpu_id) {
|
||||
ser::streaming_rpc_verbs::register_stream_mutation_done(&ms, [this] (const rpc::client_info& cinfo, streaming::plan_id plan_id, dht::token_range_vector ranges, table_id cf_id, unsigned dst_cpu_id) {
|
||||
const auto& from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
|
||||
return container().invoke_on(dst_cpu_id, [ranges = std::move(ranges), plan_id, cf_id, from] (auto& sm) mutable {
|
||||
auto session = sm.get_session(plan_id, from, "STREAM_MUTATION_DONE", cf_id);
|
||||
session->receive_task_completed(cf_id);
|
||||
});
|
||||
});
|
||||
ms.register_complete_message([this] (const rpc::client_info& cinfo, streaming::plan_id plan_id, unsigned dst_cpu_id, rpc::optional<bool> failed) {
|
||||
ser::streaming_rpc_verbs::register_complete_message(&ms, [this] (const rpc::client_info& cinfo, streaming::plan_id plan_id, unsigned dst_cpu_id, rpc::optional<bool> failed) {
|
||||
const auto& from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
|
||||
if (failed && *failed) {
|
||||
return container().invoke_on(dst_cpu_id, [plan_id, from, dst_cpu_id] (auto& sm) {
|
||||
@@ -279,11 +280,8 @@ void stream_manager::init_messaging_service_handler(abort_source& as) {
|
||||
future<> stream_manager::uninit_messaging_service_handler() {
|
||||
auto& ms = _ms.local();
|
||||
return when_all_succeed(
|
||||
ms.unregister_prepare_message(),
|
||||
ms.unregister_prepare_done_message(),
|
||||
ms.unregister_stream_mutation_fragments(),
|
||||
ms.unregister_stream_mutation_done(),
|
||||
ms.unregister_complete_message()).discard_result();
|
||||
ser::streaming_rpc_verbs::unregister(&ms),
|
||||
ms.unregister_stream_mutation_fragments()).discard_result();
|
||||
}
|
||||
|
||||
stream_session::stream_session(stream_manager& mgr, inet_address peer_)
|
||||
@@ -305,7 +303,7 @@ future<> stream_session::on_initialization_complete() {
|
||||
}
|
||||
auto id = msg_addr{this->peer, 0};
|
||||
sslog.debug("[Stream #{}] SEND PREPARE_MESSAGE to {}", plan_id(), id);
|
||||
return manager().ms().send_prepare_message(id, std::move(prepare), plan_id(), description(), get_reason(), topo_guard()).then_wrapped([this, id] (auto&& f) {
|
||||
return ser::streaming_rpc_verbs::send_prepare_message(&manager().ms(), id, std::move(prepare), plan_id(), description(), get_reason(), topo_guard()).then_wrapped([this, id] (auto&& f) {
|
||||
try {
|
||||
auto msg = f.get();
|
||||
sslog.debug("[Stream #{}] GOT PREPARE_MESSAGE Reply from {}", this->plan_id(), this->peer);
|
||||
@@ -324,7 +322,7 @@ future<> stream_session::on_initialization_complete() {
|
||||
}).then([this, id] {
|
||||
auto plan_id = this->plan_id();
|
||||
sslog.debug("[Stream #{}] SEND PREPARE_DONE_MESSAGE to {}", plan_id, id);
|
||||
return manager().ms().send_prepare_done_message(id, plan_id, this->dst_cpu_id).then([this] {
|
||||
return ser::streaming_rpc_verbs::send_prepare_done_message(&manager().ms(), id, plan_id, this->dst_cpu_id).then([this] {
|
||||
sslog.debug("[Stream #{}] GOT PREPARE_DONE_MESSAGE Reply from {}", this->plan_id(), this->peer);
|
||||
}).handle_exception([id, plan_id] (auto ep) {
|
||||
sslog.warn("[Stream #{}] Fail to send PREPARE_DONE_MESSAGE to {}, {}", plan_id, id, ep);
|
||||
@@ -467,7 +465,7 @@ void stream_session::send_failed_complete_message() {
|
||||
auto session = shared_from_this();
|
||||
bool failed = true;
|
||||
//FIXME: discarded future.
|
||||
(void)manager().ms().send_complete_message(id, plan_id, this->dst_cpu_id, failed).then([session, id, plan_id] {
|
||||
(void)ser::streaming_rpc_verbs::send_complete_message(&manager().ms(), id, plan_id, this->dst_cpu_id, failed).then([session, id, plan_id] {
|
||||
sslog.debug("[Stream #{}] GOT COMPLETE_MESSAGE Reply from {}", plan_id, id.addr);
|
||||
}).handle_exception([session, id, plan_id] (auto ep) {
|
||||
sslog.debug("[Stream #{}] COMPLETE_MESSAGE for {} has failed: {}", plan_id, id.addr, ep);
|
||||
|
||||
@@ -34,6 +34,7 @@
|
||||
#include "repair/table_check.hh"
|
||||
#include "gms/feature_service.hh"
|
||||
#include "utils/error_injection.hh"
|
||||
#include "idl/streaming.dist.hh"
|
||||
|
||||
namespace streaming {
|
||||
|
||||
@@ -233,7 +234,7 @@ future<> stream_transfer_task::execute() {
|
||||
});
|
||||
}).then([this, plan_id, cf_id, id, &sm] {
|
||||
sslog.debug("[Stream #{}] SEND STREAM_MUTATION_DONE to {}, cf_id={}", plan_id, id, cf_id);
|
||||
return sm.ms().send_stream_mutation_done(id, plan_id, _ranges,
|
||||
return ser::streaming_rpc_verbs::send_stream_mutation_done(&sm.ms(), id, plan_id, _ranges,
|
||||
cf_id, session->dst_cpu_id).handle_exception([plan_id, id] (auto ep) {
|
||||
sslog.warn("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION_DONE to {}: {}", plan_id, id, ep);
|
||||
std::rethrow_exception(ep);
|
||||
@@ -256,7 +257,7 @@ future<> stream_transfer_task::execute() {
|
||||
if (table_dropped) {
|
||||
sslog.warn("[Stream #{}] Ignore the table with table_id {} which is dropped during streaming", plan_id, cf_id);
|
||||
if (!_mutation_done_sent) {
|
||||
co_await session->manager().ms().send_stream_mutation_done(id, plan_id, _ranges, cf_id, session->dst_cpu_id);
|
||||
co_await ser::streaming_rpc_verbs::send_stream_mutation_done(&session->manager().ms(), id, plan_id, _ranges, cf_id, session->dst_cpu_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user