diff --git a/gms/gossiper.cc b/gms/gossiper.cc index c4bf1dc917..f0b7f1f743 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -408,8 +408,31 @@ future<> gossiper::handle_ack2_msg(gossip_digest_ack2 msg) { return apply_state_locally(std::move(remote_ep_state_map)).finally([mp = std::move(mp)] {}); } -future<> gossiper::handle_echo_msg() { +future<> gossiper::handle_echo_msg(gms::inet_address from, std::optional generation_number_opt) { + bool respond = true; if (!_advertise_myself) { + respond = false; + } else { + if (!_advertise_to_nodes.empty()) { + auto it = _advertise_to_nodes.find(from); + if (it == _advertise_to_nodes.end()) { + respond = false; + } else { + auto es = get_endpoint_state_for_endpoint_ptr(from); + if (es) { + int64_t saved_generation_number = it->second; + int64_t current_generation_number = generation_number_opt ? + generation_number_opt.value() : es->get_heart_beat_state().get_generation(); + respond = saved_generation_number == current_generation_number; + logger.debug("handle_echo_msg: from={}, saved_generation_number={}, current_generation_number={}", + from, saved_generation_number, current_generation_number); + } else { + respond = false; + } + } + } + } + if (!respond) { return make_exception_future(std::runtime_error("Not ready to respond gossip echo message")); } return make_ready_future<>(); @@ -482,8 +505,9 @@ future<> gossiper::init_messaging_service_handler(bind_messaging_port do_bind) { }); return messaging_service::no_wait(); }); - _messaging.register_gossip_echo([] { - return gms::get_local_gossiper().handle_echo_msg(); + _messaging.register_gossip_echo([] (const rpc::client_info& cinfo, rpc::optional generation_number_opt) { + auto from = cinfo.retrieve_auxiliary("baddr"); + return gms::get_local_gossiper().handle_echo_msg(from, generation_number_opt); }); _messaging.register_gossip_shutdown([] (inet_address from) { // In a new fiber. @@ -1419,9 +1443,10 @@ void gossiper::mark_alive(inet_address addr, endpoint_state& local_state) { local_state.mark_dead(); msg_addr id = get_msg_addr(addr); - logger.trace("Sending a EchoMessage to {}", id); + int64_t generation = endpoint_state_map[get_broadcast_address()].get_heart_beat_state().get_generation(); + logger.debug("Sending a EchoMessage to {}, with generation_number={}", id, generation); // Do it in the background. - (void)_messaging.send_gossip_echo(id).then([this, addr] { + (void)_messaging.send_gossip_echo(id, generation).then([this, addr] { logger.trace("Got EchoMessage Reply"); return seastar::async([this, addr] { // After sending echo message, the Node might not be in the @@ -1566,6 +1591,11 @@ bool gossiper::is_normal(const inet_address& endpoint) const { return get_gossip_status(endpoint) == sstring(versioned_value::STATUS_NORMAL); } +bool gossiper::is_normal_ring_member(const inet_address& endpoint) const { + auto status = get_gossip_status(endpoint); + return status == sstring(versioned_value::STATUS_NORMAL) || status == sstring(versioned_value::SHUTDOWN); +} + bool gossiper::is_silent_shutdown_state(const endpoint_state& ep_state) const{ auto state = get_gossip_status(ep_state); for (auto& deadstate : SILENT_SHUTDOWN_STATES) { @@ -1769,6 +1799,29 @@ future<> gossiper::start_gossiping(int generation_nbr, std::map> +gossiper::get_generation_for_nodes(std::list nodes) { + std::unordered_map ret; + for (const auto& node : nodes) { + auto es = get_endpoint_state_for_endpoint_ptr(node); + if (es) { + auto current_generation_number = es->get_heart_beat_state().get_generation(); + ret.emplace(node, current_generation_number); + } else { + return make_exception_future>( + std::runtime_error(format("Can not find generation number for node={}", node))); + } + } + return make_ready_future>(std::move(ret)); +} + +future<> gossiper::advertise_to_nodes(std::unordered_map advertise_to_nodes) { + return container().invoke_on_all([advertise_to_nodes] (auto& g) { + g._advertise_to_nodes = advertise_to_nodes; + g._advertise_myself = true; + }); +} + future<> gossiper::advertise_myself() { return container().invoke_on_all([] (auto& g) { g._advertise_myself = true; @@ -2131,6 +2184,32 @@ bool gossiper::is_alive(inet_address ep) const { return false; } +// Runs inside seastar::async context +void gossiper::wait_alive(std::vector nodes, std::chrono::milliseconds timeout) { + auto start_time = std::chrono::steady_clock::now(); + for (;;) { + std::vector live_nodes; + for (const auto& node: nodes) { + size_t nr_alive = container().map_reduce0([node] (gossiper& g) -> size_t { + return g.is_alive(node) ? 1 : 0; + }, 0, std::plus()).get0(); + logger.debug("Marked node={} as alive on {} out of {} shards", node, nr_alive, smp::count); + if (nr_alive == smp::count) { + live_nodes.push_back(node); + } + } + logger.debug("Waited for marking node as up, replace_nodes={}, live_nodes={}", nodes, live_nodes); + if (live_nodes.size() == nodes.size()) { + break; + } + if (std::chrono::steady_clock::now() > timeout + start_time) { + throw std::runtime_error(format("Failed to mark node as alive in {} ms, nodes={}, live_nodes={}", + timeout.count(), nodes, live_nodes)); + } + sleep_abortable(std::chrono::milliseconds(100), _abort_source).get(); + } +} + const versioned_value* gossiper::get_application_state_ptr(inet_address endpoint, application_state appstate) const noexcept { auto* eps = get_endpoint_state_for_endpoint_ptr(std::move(endpoint)); if (!eps) { diff --git a/gms/gossiper.hh b/gms/gossiper.hh index 0b6e9c7016..1dfe80632f 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -129,7 +129,7 @@ private: future<> handle_syn_msg(msg_addr from, gossip_digest_syn syn_msg); future<> handle_ack_msg(msg_addr from, gossip_digest_ack ack_msg); future<> handle_ack2_msg(gossip_digest_ack2 msg); - future<> handle_echo_msg(); + future<> handle_echo_msg(inet_address from, std::optional generation_number_opt); future<> handle_shutdown_msg(inet_address from); future<> do_send_ack_msg(msg_addr from, gossip_digest_syn syn_msg); future<> do_send_ack2_msg(msg_addr from, utils::chunked_vector ack_msg_digest); @@ -146,8 +146,15 @@ private: std::unordered_map _syn_handlers; std::unordered_map _ack_handlers; bool _advertise_myself = true; + // Map ip address and generation number + std::unordered_map _advertise_to_nodes; public: future<> advertise_myself(); + // Get current generation number for the given nodes + future> + get_generation_for_nodes(std::list nodes); + // Only respond echo message listed in nodes with the generation number + future<> advertise_to_nodes(std::unordered_map advertise_to_nodes = {}); const sstring& get_cluster_name() const noexcept; const sstring& get_partitioner_name() const noexcept; inet_address get_broadcast_address() const noexcept { @@ -442,6 +449,8 @@ private: public: bool is_alive(inet_address ep) const; bool is_dead_state(const endpoint_state& eps) const; + // Wait for nodes to be alive on all shards + void wait_alive(std::vector nodes, std::chrono::milliseconds timeout); future<> apply_state_locally(std::map map); @@ -572,6 +581,10 @@ public: bool is_seed(const inet_address& endpoint) const; bool is_shutdown(const inet_address& endpoint) const; bool is_normal(const inet_address& endpoint) const; + // Check if a node is in NORMAL or SHUTDOWN status which means the node is + // part of the token ring from the gossip point of view and operates in + // normal status or was in normal status but is shutdown. + bool is_normal_ring_member(const inet_address& endpoint) const; bool is_cql_ready(const inet_address& endpoint) const; bool is_silent_shutdown_state(const endpoint_state& ep_state) const; void mark_as_shutdown(const inet_address& endpoint); diff --git a/idl/partition_checksum.idl.hh b/idl/partition_checksum.idl.hh index 5c2bcbc835..322f65a037 100644 --- a/idl/partition_checksum.idl.hh +++ b/idl/partition_checksum.idl.hh @@ -110,6 +110,12 @@ enum class node_ops_cmd : uint32_t { removenode_sync_data, removenode_abort, removenode_done, + replace_prepare, + replace_prepare_mark_alive, + replace_prepare_pending_ranges, + replace_heartbeat, + replace_abort, + replace_done, }; struct node_ops_cmd_request { @@ -117,6 +123,8 @@ struct node_ops_cmd_request { utils::UUID ops_uuid; std::list ignore_nodes; std::list leaving_nodes; + // Map existing nodes to replacing nodes + std::unordered_map replace_nodes; }; struct node_ops_cmd_response { diff --git a/message/messaging_service.cc b/message/messaging_service.cc index c59c173e7c..f6601f9461 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -1082,14 +1082,14 @@ future<> messaging_service::unregister_complete_message() { return unregister_handler(messaging_verb::COMPLETE_MESSAGE); } -void messaging_service::register_gossip_echo(std::function ()>&& func) { +void messaging_service::register_gossip_echo(std::function (const rpc::client_info& cinfo, rpc::optional generation_number)>&& func) { register_handler(this, messaging_verb::GOSSIP_ECHO, std::move(func)); } future<> messaging_service::unregister_gossip_echo() { return unregister_handler(netw::messaging_verb::GOSSIP_ECHO); } -future<> messaging_service::send_gossip_echo(msg_addr id) { - return send_message_timeout(this, messaging_verb::GOSSIP_ECHO, std::move(id), 15000ms); +future<> messaging_service::send_gossip_echo(msg_addr id, int64_t generation_number) { + return send_message_timeout(this, messaging_verb::GOSSIP_ECHO, std::move(id), 15000ms, generation_number); } void messaging_service::register_gossip_shutdown(std::function&& func) { diff --git a/message/messaging_service.hh b/message/messaging_service.hh index f0ab9c8a9e..44dc0a4271 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -408,9 +408,9 @@ public: future send_node_ops_cmd(msg_addr id, node_ops_cmd_request); // Wrapper for GOSSIP_ECHO verb - void register_gossip_echo(std::function ()>&& func); + void register_gossip_echo(std::function (const rpc::client_info& cinfo, rpc::optional generation_number)>&& func); future<> unregister_gossip_echo(); - future<> send_gossip_echo(msg_addr id); + future<> send_gossip_echo(msg_addr id, int64_t generation_number); // Wrapper for GOSSIP_SHUTDOWN void register_gossip_shutdown(std::function&& func); diff --git a/repair/repair.hh b/repair/repair.hh index ecd8ef9359..19a42a7c0d 100644 --- a/repair/repair.hh +++ b/repair/repair.hh @@ -489,6 +489,12 @@ enum class node_ops_cmd : uint32_t { removenode_sync_data, removenode_abort, removenode_done, + replace_prepare, + replace_prepare_mark_alive, + replace_prepare_pending_ranges, + replace_heartbeat, + replace_abort, + replace_done, }; // The cmd and ops_uuid are mandatory for each request. @@ -498,6 +504,8 @@ struct node_ops_cmd_request { utils::UUID ops_uuid; std::list ignore_nodes; std::list leaving_nodes; + // Map existing nodes to replacing nodes + std::unordered_map replace_nodes; }; struct node_ops_cmd_response { diff --git a/service/storage_service.cc b/service/storage_service.cc index 67ab6b7e6b..96f1ce8a5c 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -680,6 +680,8 @@ void storage_service::bootstrap() { // Wait until we know tokens of existing node before announcing replacing status. set_mode(mode::JOINING, sprint("Wait until local node knows tokens of peer nodes"), true); _gossiper.wait_for_range_setup().get(); + + if (!is_repair_based_node_ops_enabled()) { set_mode(mode::JOINING, sprint("Announce tokens and status of the replacing node"), true); _gossiper.add_local_application_state({ { gms::application_state::TOKENS, versioned_value::tokens(_bootstrap_tokens) }, @@ -689,6 +691,7 @@ void storage_service::bootstrap() { _gossiper.advertise_myself().get(); set_mode(mode::JOINING, format("Wait until peer nodes know the bootstrap tokens of local node"), true); _gossiper.wait_for_range_setup().get(); + } auto replace_addr = db().local().get_replace_address(); if (replace_addr) { slogger.debug("Removing replaced endpoint {} from system.peers", *replace_addr); @@ -705,7 +708,7 @@ void storage_service::bootstrap() { set_mode(mode::JOINING, "Starting to bootstrap...", true); if (is_repair_based_node_ops_enabled()) { if (db().local().is_replacing()) { - replace_with_repair(_db, _messaging, get_token_metadata_ptr(), _bootstrap_tokens).get(); + run_replace_ops(); } else { bootstrap_with_repair(_db, _messaging, get_token_metadata_ptr(), _bootstrap_tokens).get(); } @@ -1863,6 +1866,150 @@ future<> storage_service::decommission() { }); } +// Runs inside seastar::async context +void storage_service::run_replace_ops() { + if (!db().local().get_replace_address()) { + throw std::runtime_error(format("replace_address is empty")); + } + auto replace_address = db().local().get_replace_address().value(); + auto uuid = utils::make_random_uuid(); + // TODO: Specify ignore_nodes + std::list ignore_nodes; + // Step 1: Decide who needs to sync data for replace operation + std::list sync_nodes; + for (const auto& x :_gossiper.endpoint_state_map) { + seastar::thread::maybe_yield(); + const auto& node = x.first; + slogger.debug("replace[{}]: Check node={}, status={}", uuid, node, _gossiper.get_gossip_status(node)); + if (node != get_broadcast_address() && + node != replace_address && + _gossiper.is_normal_ring_member(node) && + std::find(ignore_nodes.begin(), ignore_nodes.end(), x.first) == ignore_nodes.end()) { + sync_nodes.push_back(node); + } + } + sync_nodes.push_front(get_broadcast_address()); + auto sync_nodes_generations = _gossiper.get_generation_for_nodes(sync_nodes).get(); + // Map existing nodes to replacing nodes + std::unordered_map replace_nodes = { + {replace_address, get_broadcast_address()}, + }; + std::unordered_set nodes_unknown_verb; + std::unordered_set nodes_down; + std::unordered_set nodes_aborted; + auto req = node_ops_cmd_request{node_ops_cmd::replace_prepare, uuid, ignore_nodes, {}, replace_nodes}; + slogger.info("replace[{}]: Started replace operation, replace_nodes={}, sync_nodes={}, ignore_nodes={}", uuid, replace_nodes, sync_nodes, ignore_nodes); + future<> heartbeat_updater = make_ready_future<>(); + auto heartbeat_updater_done = make_lw_shared(false); + try { + // Step 2: Prepare to sync data + parallel_for_each(sync_nodes, [this, &req, &nodes_unknown_verb, &nodes_down, uuid] (const gms::inet_address& node) { + return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) { + slogger.debug("replace[{}]: Got node_ops_cmd::replace_prepare response from node={}", uuid, node); + }).handle_exception_type([&nodes_unknown_verb, node, uuid] (seastar::rpc::unknown_verb_error&) { + slogger.warn("replace[{}]: Node {} does not support node_ops_cmd verb", uuid, node); + nodes_unknown_verb.emplace(node); + }).handle_exception_type([&nodes_down, node, uuid] (seastar::rpc::closed_error&) { + slogger.warn("replace[{}]: Node {} is down for node_ops_cmd verb", uuid, node); + nodes_down.emplace(node); + }); + }).get(); + if (!nodes_unknown_verb.empty()) { + auto msg = format("replace[{}]: Nodes={} do not support replace verb. Please upgrade your cluster and run replace again.", uuid, nodes_unknown_verb); + slogger.warn("{}", msg); + throw std::runtime_error(msg); + } + if (!nodes_down.empty()) { + auto msg = format("replace[{}]: Nodes={} needed for replace operation are down. It is highly recommended to fix the down nodes and try again. To proceed with best-effort mode which might cause data inconsistency, add --ignore-dead-nodes . E.g., scylla --ignore-dead-nodes 127.0.0.1,127.0.0.2", uuid, nodes_down); + slogger.warn("{}", msg); + throw std::runtime_error(msg); + } + + // Step 3: Start heartbeat updater + heartbeat_updater = seastar::async([this, &sync_nodes, uuid, heartbeat_updater_done] { + slogger.debug("replace[{}]: Started heartbeat_updater", uuid); + while (!(*heartbeat_updater_done)) { + auto req = node_ops_cmd_request{node_ops_cmd::replace_heartbeat, uuid, {}, {}, {}}; + parallel_for_each(sync_nodes, [this, req, uuid] (const gms::inet_address& node) { + return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) { + slogger.debug("replace[{}]: Got heartbeat response from node={}", uuid, node); + return make_ready_future<>(); + }); + }).handle_exception([uuid] (std::exception_ptr ep) { + slogger.warn("replace[{}]: Failed to send heartbeat: {}", uuid, ep); + }).get(); + int nr_seconds = 10; + while (!(*heartbeat_updater_done) && nr_seconds--) { + sleep_abortable(std::chrono::seconds(1), _abort_source).get(); + } + } + slogger.debug("replace[{}]: Stopped heartbeat_updater", uuid); + }); + auto stop_heartbeat_updater = defer([&] { + *heartbeat_updater_done = true; + heartbeat_updater.get(); + }); + + + // Step 4: Allow nodes in sync_nodes list to mark the replacing node as alive + _gossiper.advertise_to_nodes(sync_nodes_generations).get(); + slogger.info("replace[{}]: Allow nodes={} to mark replacing node={} as alive", uuid, sync_nodes, get_broadcast_address()); + + // Step 5: Wait for nodes to finish marking the replacing node as live + req.cmd = node_ops_cmd::replace_prepare_mark_alive; + parallel_for_each(sync_nodes, [this, &req, uuid] (const gms::inet_address& node) { + return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) { + slogger.debug("replace[{}]: Got prepare_mark_alive response from node={}", uuid, node); + return make_ready_future<>(); + }); + }).get(); + + // Step 6: Update pending ranges on nodes + req.cmd = node_ops_cmd::replace_prepare_pending_ranges; + parallel_for_each(sync_nodes, [this, &req, uuid] (const gms::inet_address& node) { + return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) { + slogger.debug("replace[{}]: Got pending_ranges response from node={}", uuid, node); + return make_ready_future<>(); + }); + }).get(); + + + // Step 7: Sync data for replace + replace_with_repair(_db, _messaging, get_token_metadata_ptr(), _bootstrap_tokens).get(); + + + // Step 8: Finish + req.cmd = node_ops_cmd::replace_done; + parallel_for_each(sync_nodes, [this, &req, &nodes_aborted, uuid] (const gms::inet_address& node) { + return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([&nodes_aborted, uuid, node] (node_ops_cmd_response resp) { + nodes_aborted.emplace(node); + slogger.debug("replace[{}]: Got done response from node={}", uuid, node); + return make_ready_future<>(); + }); + }).get(); + // Allow any nodes to mark the replacing node as alive + _gossiper.advertise_to_nodes({}).get(); + slogger.info("replace[{}]: Allow any nodes to mark replacing node={} as alive", uuid, get_broadcast_address()); + } catch (...) { + slogger.error("replace[{}]: Abort replace operation started, replace_nodes={}, sync_nodes={}, ignore_nodes={}: {}", + uuid, replace_nodes, sync_nodes, ignore_nodes, std::current_exception()); + // we need to revert the effect of prepare verb the replace ops is failed + req.cmd = node_ops_cmd::replace_abort; + parallel_for_each(sync_nodes, [this, &req, &nodes_unknown_verb, &nodes_down, &nodes_aborted, uuid] (const gms::inet_address& node) { + if (nodes_unknown_verb.contains(node) || nodes_down.contains(node) || nodes_aborted.contains(node)) { + // No need to revert previous prepare cmd for those who do not apply prepare cmd. + return make_ready_future<>(); + } + return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) { + slogger.debug("replace[{}]: Got abort response from node={}", uuid, node); + }); + }).get(); + slogger.error("replace[{}]: Abort replace operation finished, replace_nodes={}, sync_nodes={}, ignore_nodes={}: {}", + uuid, replace_nodes, sync_nodes, ignore_nodes, std::current_exception()); + throw; + } +} + future<> storage_service::removenode(sstring host_id_string, std::list ignore_nodes) { return run_with_api_lock(sstring("removenode"), [host_id_string, ignore_nodes = std::move(ignore_nodes)] (storage_service& ss) mutable { return seastar::async([&ss, host_id_string, ignore_nodes = std::move(ignore_nodes)] { @@ -1901,7 +2048,7 @@ future<> storage_service::removenode(sstring host_id_string, std::list nodes_unknown_verb; std::unordered_set nodes_down; - auto req = node_ops_cmd_request{node_ops_cmd::removenode_prepare, uuid, ignore_nodes, leaving_nodes}; + auto req = node_ops_cmd_request{node_ops_cmd::removenode_prepare, uuid, ignore_nodes, leaving_nodes, {}}; try { parallel_for_each(nodes, [&ss, &req, &nodes_unknown_verb, &nodes_down, uuid] (const gms::inet_address& node) { return ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) { @@ -1929,7 +2076,7 @@ future<> storage_service::removenode(sstring host_id_string, std::list storage_service::removenode(sstring host_id_string, std::list>(_node_ops| boost::adaptors::map_keys); + std::string msg; + if (req.cmd == node_ops_cmd::removenode_prepare || req.cmd == node_ops_cmd::replace_prepare) { + // Peer node wants to start a new node operation. Make sure no pending node operation is in progress. + if (!_node_ops.empty()) { + msg = format("node_ops_cmd_check: Node {} rejected node_ops_cmd={} from node={} with ops_uuid={}, pending_node_ops={}, pending node ops is in progress", + get_broadcast_address(), uint32_t(req.cmd), coordinator, req.ops_uuid, ops_uuids); + } + } else { + if (ops_uuids.size() == 1 && ops_uuids.front() == req.ops_uuid) { + // Check is good, since we know this ops_uuid and this is the only ops_uuid we are working on. + } else if (ops_uuids.size() == 0) { + // The ops_uuid received is unknown. Fail the request. + msg = format("node_ops_cmd_check: Node {} rejected node_ops_cmd={} from node={} with ops_uuid={}, pending_node_ops={}, the node ops is unknown", + get_broadcast_address(), uint32_t(req.cmd), coordinator, req.ops_uuid, ops_uuids); + } else { + // Other node ops is in progress. Fail the request. + msg = format("node_ops_cmd_check: Node {} rejected node_ops_cmd={} from node={} with ops_uuid={}, pending_node_ops={}, pending node ops is in progress", + get_broadcast_address(), uint32_t(req.cmd), coordinator, req.ops_uuid, ops_uuids); + } + } + if (!msg.empty()) { + slogger.warn("{}", msg); + throw std::runtime_error(msg); + } +} + future storage_service::node_ops_cmd_handler(gms::inet_address coordinator, node_ops_cmd_request req) { return get_storage_service().invoke_on(0, [coordinator, req = std::move(req)] (auto& ss) mutable { return seastar::async([&ss, coordinator, req = std::move(req)] () mutable { auto ops_uuid = req.ops_uuid; slogger.debug("node_ops_cmd_handler cmd={}, ops_uuid={}", uint32_t(req.cmd), ops_uuid); + + ss.node_ops_cmd_check(coordinator, req); + if (req.cmd == node_ops_cmd::removenode_prepare) { if (req.leaving_nodes.size() > 1) { auto msg = format("removenode[{}]: Could not removenode more than one node at a time: leaving_nodes={}", req.ops_uuid, req.leaving_nodes); @@ -2041,6 +2219,60 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad } } else if (req.cmd == node_ops_cmd::removenode_abort) { ss.node_ops_abort(ops_uuid); + } else if (req.cmd == node_ops_cmd::replace_prepare) { + // Mark the replacing node as replacing + if (req.replace_nodes.size() > 1) { + auto msg = format("replace[{}]: Could not replace more than one node at a time: replace_nodes={}", req.ops_uuid, req.replace_nodes); + slogger.warn("{}", msg); + throw std::runtime_error(msg); + } + ss.mutate_token_metadata([coordinator, &req, &ss] (mutable_token_metadata_ptr tmptr) mutable { + for (auto& x: req.replace_nodes) { + auto existing_node = x.first; + auto replacing_node = x.second; + slogger.info("replace[{}]: Added replacing_node={} to replace existing_node={}, coordinator={}", req.ops_uuid, replacing_node, existing_node, coordinator); + tmptr->add_replacing_endpoint(existing_node, replacing_node); + } + return make_ready_future<>(); + }).get(); + auto ops = seastar::make_shared(node_ops_info{ops_uuid, false, std::move(req.ignore_nodes)}); + auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(ops), [&ss, coordinator, req = std::move(req)] () mutable { + return ss.mutate_token_metadata([&ss, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable { + for (auto& x: req.replace_nodes) { + auto existing_node = x.first; + auto replacing_node = x.second; + slogger.info("replace[{}]: Removed replacing_node={} to replace existing_node={}, coordinator={}", req.ops_uuid, replacing_node, existing_node, coordinator); + tmptr->del_replacing_endpoint(existing_node); + } + return ss.update_pending_ranges(tmptr, format("replace {}", req.replace_nodes)); + }); + }, + [&ss, ops_uuid ] { ss.node_ops_singal_abort(ops_uuid); }); + ss._node_ops.emplace(ops_uuid, std::move(meta)); + } else if (req.cmd == node_ops_cmd::replace_prepare_mark_alive) { + // Wait for local node has marked replacing node as alive + auto nodes = boost::copy_range>(req.replace_nodes| boost::adaptors::map_values); + try { + ss._gossiper.wait_alive(nodes, std::chrono::milliseconds(120 * 1000)); + } catch (...) { + slogger.warn("replace[{}]: Failed to wait for marking replacing node as up, replace_nodes={}: {}", + req.ops_uuid, req.replace_nodes, std::current_exception()); + throw; + } + } else if (req.cmd == node_ops_cmd::replace_prepare_pending_ranges) { + // Update the pending_ranges for the replacing node + slogger.debug("replace[{}]: Updated pending_ranges from coordinator={}", req.ops_uuid, coordinator); + ss.mutate_token_metadata([coordinator, &req, &ss] (mutable_token_metadata_ptr tmptr) mutable { + return ss.update_pending_ranges(tmptr, format("replace {}", req.replace_nodes)); + }).get(); + } else if (req.cmd == node_ops_cmd::replace_heartbeat) { + slogger.debug("replace[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator); + ss.node_ops_update_heartbeat(ops_uuid); + } else if (req.cmd == node_ops_cmd::replace_done) { + slogger.info("replace[{}]: Marked ops done from coordinator={}", req.ops_uuid, coordinator); + ss.node_ops_done(ops_uuid); + } else if (req.cmd == node_ops_cmd::replace_abort) { + ss.node_ops_abort(ops_uuid); } else { auto msg = format("node_ops_cmd_handler: ops_uuid={}, unknown cmd={}", req.ops_uuid, uint32_t(req.cmd)); slogger.warn("{}", msg); diff --git a/service/storage_service.hh b/service/storage_service.hh index 62d1f8c3f0..ded48ada96 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -379,6 +379,8 @@ private: future prepare_replacement_info(std::unordered_set initial_contact_nodes, const std::unordered_map& loaded_peer_features, bind_messaging_port do_bind = bind_messaging_port::yes); + void run_replace_ops(); + public: future is_initialized(); @@ -799,6 +801,7 @@ public: */ future<> removenode(sstring host_id_string, std::list ignore_nodes); future node_ops_cmd_handler(gms::inet_address coordinator, node_ops_cmd_request req); + void node_ops_cmd_check(gms::inet_address coordinator, const node_ops_cmd_request& req); future get_operation_mode(); diff --git a/test/manual/message.cc b/test/manual/message.cc index 04b6c66b79..b150c5931e 100644 --- a/test/manual/message.cc +++ b/test/manual/message.cc @@ -111,7 +111,7 @@ public: return messaging_service::no_wait(); }); - ms.register_gossip_echo([] { + ms.register_gossip_echo([] (const rpc::client_info& cinfo, rpc::optional gen_opt) { fmt::print("Server got gossip echo msg\n"); throw std::runtime_error("I'm throwing runtime_error exception"); return make_ready_future<>(); @@ -149,7 +149,8 @@ public: future<> test_echo() { fmt::print("=== {} ===\n", __func__); auto id = get_msg_addr(); - return ms.send_gossip_echo(id).then_wrapped([] (auto&& f) { + int64_t gen = 0x1; + return ms.send_gossip_echo(id, gen).then_wrapped([] (auto&& f) { try { f.get(); return make_ready_future<>();