From 4f5676630e95d38a3ca6b0cce757aa29312beef6 Mon Sep 17 00:00:00 2001 From: Asias He Date: Thu, 25 Mar 2021 08:44:11 +0800 Subject: [PATCH 1/4] gossip: Add is_normal_ring_member helper Check if a node is in NORMAL or SHUTDOWN status which means the node is part of the token ring from the gossip point of view and operates in normal status or was in normal status but is shutdown. Refs #8013 --- gms/gossiper.cc | 5 +++++ gms/gossiper.hh | 4 ++++ 2 files changed, 9 insertions(+) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index c4bf1dc917..06aee5f5b5 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -1566,6 +1566,11 @@ bool gossiper::is_normal(const inet_address& endpoint) const { return get_gossip_status(endpoint) == sstring(versioned_value::STATUS_NORMAL); } +bool gossiper::is_normal_ring_member(const inet_address& endpoint) const { + auto status = get_gossip_status(endpoint); + return status == sstring(versioned_value::STATUS_NORMAL) || status == sstring(versioned_value::SHUTDOWN); +} + bool gossiper::is_silent_shutdown_state(const endpoint_state& ep_state) const{ auto state = get_gossip_status(ep_state); for (auto& deadstate : SILENT_SHUTDOWN_STATES) { diff --git a/gms/gossiper.hh b/gms/gossiper.hh index 0b6e9c7016..904006c596 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -572,6 +572,10 @@ public: bool is_seed(const inet_address& endpoint) const; bool is_shutdown(const inet_address& endpoint) const; bool is_normal(const inet_address& endpoint) const; + // Check if a node is in NORMAL or SHUTDOWN status which means the node is + // part of the token ring from the gossip point of view and operates in + // normal status or was in normal status but is shutdown. + bool is_normal_ring_member(const inet_address& endpoint) const; bool is_cql_ready(const inet_address& endpoint) const; bool is_silent_shutdown_state(const endpoint_state& ep_state) const; void mark_as_shutdown(const inet_address& endpoint); From f690f3ee8ea5b9d06756bfb6d90bd0960ca4e344 Mon Sep 17 00:00:00 2001 From: Asias He Date: Fri, 26 Mar 2021 11:04:09 +0800 Subject: [PATCH 2/4] gossip: Add helper to wait for a node to be up This patch adds gossiper::wait_alive helper to wait for nodes to be up on all shards. Refs #8013 --- gms/gossiper.cc | 26 ++++++++++++++++++++++++++ gms/gossiper.hh | 2 ++ 2 files changed, 28 insertions(+) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 06aee5f5b5..568e441aa8 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -2136,6 +2136,32 @@ bool gossiper::is_alive(inet_address ep) const { return false; } +// Runs inside seastar::async context +void gossiper::wait_alive(std::vector nodes, std::chrono::milliseconds timeout) { + auto start_time = std::chrono::steady_clock::now(); + for (;;) { + std::vector live_nodes; + for (const auto& node: nodes) { + size_t nr_alive = container().map_reduce0([node] (gossiper& g) -> size_t { + return g.is_alive(node) ? 1 : 0; + }, 0, std::plus()).get0(); + logger.debug("Marked node={} as alive on {} out of {} shards", node, nr_alive, smp::count); + if (nr_alive == smp::count) { + live_nodes.push_back(node); + } + } + logger.debug("Waited for marking node as up, replace_nodes={}, live_nodes={}", nodes, live_nodes); + if (live_nodes.size() == nodes.size()) { + break; + } + if (std::chrono::steady_clock::now() > timeout + start_time) { + throw std::runtime_error(format("Failed to mark node as alive in {} ms, nodes={}, live_nodes={}", + timeout.count(), nodes, live_nodes)); + } + sleep_abortable(std::chrono::milliseconds(100), _abort_source).get(); + } +} + const versioned_value* gossiper::get_application_state_ptr(inet_address endpoint, application_state appstate) const noexcept { auto* eps = get_endpoint_state_for_endpoint_ptr(std::move(endpoint)); if (!eps) { diff --git a/gms/gossiper.hh b/gms/gossiper.hh index 904006c596..32cdcfaf57 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -442,6 +442,8 @@ private: public: bool is_alive(inet_address ep) const; bool is_dead_state(const endpoint_state& eps) const; + // Wait for nodes to be alive on all shards + void wait_alive(std::vector nodes, std::chrono::milliseconds timeout); future<> apply_state_locally(std::map map); From bdb95233e84e6ad1d768462d833938d47e7bb293 Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 31 Mar 2021 11:14:02 +0800 Subject: [PATCH 3/4] gossip: Add advertise_to_nodes gossiper::advertise_to_nodes() is added to allow respond to gossip echo message with specified nodes and the current gossip generation number for the nodes. This is helpful to avoid the restarted node to be marked as alive during a pending replace operation. After this patch, when a node sends a echo message, the gossip generation number is sent in the echo message. Since the generation number changes after a restart, the receiver of the echo message can compare the generation number to tell if the node has restarted. Refs #8013 --- gms/gossiper.cc | 58 ++++++++++++++++++++++++++++++++---- gms/gossiper.hh | 9 +++++- message/messaging_service.cc | 6 ++-- message/messaging_service.hh | 4 +-- test/manual/message.cc | 5 ++-- 5 files changed, 69 insertions(+), 13 deletions(-) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 568e441aa8..f0b7f1f743 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -408,8 +408,31 @@ future<> gossiper::handle_ack2_msg(gossip_digest_ack2 msg) { return apply_state_locally(std::move(remote_ep_state_map)).finally([mp = std::move(mp)] {}); } -future<> gossiper::handle_echo_msg() { +future<> gossiper::handle_echo_msg(gms::inet_address from, std::optional generation_number_opt) { + bool respond = true; if (!_advertise_myself) { + respond = false; + } else { + if (!_advertise_to_nodes.empty()) { + auto it = _advertise_to_nodes.find(from); + if (it == _advertise_to_nodes.end()) { + respond = false; + } else { + auto es = get_endpoint_state_for_endpoint_ptr(from); + if (es) { + int64_t saved_generation_number = it->second; + int64_t current_generation_number = generation_number_opt ? + generation_number_opt.value() : es->get_heart_beat_state().get_generation(); + respond = saved_generation_number == current_generation_number; + logger.debug("handle_echo_msg: from={}, saved_generation_number={}, current_generation_number={}", + from, saved_generation_number, current_generation_number); + } else { + respond = false; + } + } + } + } + if (!respond) { return make_exception_future(std::runtime_error("Not ready to respond gossip echo message")); } return make_ready_future<>(); @@ -482,8 +505,9 @@ future<> gossiper::init_messaging_service_handler(bind_messaging_port do_bind) { }); return messaging_service::no_wait(); }); - _messaging.register_gossip_echo([] { - return gms::get_local_gossiper().handle_echo_msg(); + _messaging.register_gossip_echo([] (const rpc::client_info& cinfo, rpc::optional generation_number_opt) { + auto from = cinfo.retrieve_auxiliary("baddr"); + return gms::get_local_gossiper().handle_echo_msg(from, generation_number_opt); }); _messaging.register_gossip_shutdown([] (inet_address from) { // In a new fiber. @@ -1419,9 +1443,10 @@ void gossiper::mark_alive(inet_address addr, endpoint_state& local_state) { local_state.mark_dead(); msg_addr id = get_msg_addr(addr); - logger.trace("Sending a EchoMessage to {}", id); + int64_t generation = endpoint_state_map[get_broadcast_address()].get_heart_beat_state().get_generation(); + logger.debug("Sending a EchoMessage to {}, with generation_number={}", id, generation); // Do it in the background. - (void)_messaging.send_gossip_echo(id).then([this, addr] { + (void)_messaging.send_gossip_echo(id, generation).then([this, addr] { logger.trace("Got EchoMessage Reply"); return seastar::async([this, addr] { // After sending echo message, the Node might not be in the @@ -1774,6 +1799,29 @@ future<> gossiper::start_gossiping(int generation_nbr, std::map> +gossiper::get_generation_for_nodes(std::list nodes) { + std::unordered_map ret; + for (const auto& node : nodes) { + auto es = get_endpoint_state_for_endpoint_ptr(node); + if (es) { + auto current_generation_number = es->get_heart_beat_state().get_generation(); + ret.emplace(node, current_generation_number); + } else { + return make_exception_future>( + std::runtime_error(format("Can not find generation number for node={}", node))); + } + } + return make_ready_future>(std::move(ret)); +} + +future<> gossiper::advertise_to_nodes(std::unordered_map advertise_to_nodes) { + return container().invoke_on_all([advertise_to_nodes] (auto& g) { + g._advertise_to_nodes = advertise_to_nodes; + g._advertise_myself = true; + }); +} + future<> gossiper::advertise_myself() { return container().invoke_on_all([] (auto& g) { g._advertise_myself = true; diff --git a/gms/gossiper.hh b/gms/gossiper.hh index 32cdcfaf57..1dfe80632f 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -129,7 +129,7 @@ private: future<> handle_syn_msg(msg_addr from, gossip_digest_syn syn_msg); future<> handle_ack_msg(msg_addr from, gossip_digest_ack ack_msg); future<> handle_ack2_msg(gossip_digest_ack2 msg); - future<> handle_echo_msg(); + future<> handle_echo_msg(inet_address from, std::optional generation_number_opt); future<> handle_shutdown_msg(inet_address from); future<> do_send_ack_msg(msg_addr from, gossip_digest_syn syn_msg); future<> do_send_ack2_msg(msg_addr from, utils::chunked_vector ack_msg_digest); @@ -146,8 +146,15 @@ private: std::unordered_map _syn_handlers; std::unordered_map _ack_handlers; bool _advertise_myself = true; + // Map ip address and generation number + std::unordered_map _advertise_to_nodes; public: future<> advertise_myself(); + // Get current generation number for the given nodes + future> + get_generation_for_nodes(std::list nodes); + // Only respond echo message listed in nodes with the generation number + future<> advertise_to_nodes(std::unordered_map advertise_to_nodes = {}); const sstring& get_cluster_name() const noexcept; const sstring& get_partitioner_name() const noexcept; inet_address get_broadcast_address() const noexcept { diff --git a/message/messaging_service.cc b/message/messaging_service.cc index dfd28610b4..5789122070 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -1050,14 +1050,14 @@ future<> messaging_service::unregister_complete_message() { return unregister_handler(messaging_verb::COMPLETE_MESSAGE); } -void messaging_service::register_gossip_echo(std::function ()>&& func) { +void messaging_service::register_gossip_echo(std::function (const rpc::client_info& cinfo, rpc::optional generation_number)>&& func) { register_handler(this, messaging_verb::GOSSIP_ECHO, std::move(func)); } future<> messaging_service::unregister_gossip_echo() { return unregister_handler(netw::messaging_verb::GOSSIP_ECHO); } -future<> messaging_service::send_gossip_echo(msg_addr id) { - return send_message_timeout(this, messaging_verb::GOSSIP_ECHO, std::move(id), 15000ms); +future<> messaging_service::send_gossip_echo(msg_addr id, int64_t generation_number) { + return send_message_timeout(this, messaging_verb::GOSSIP_ECHO, std::move(id), 15000ms, generation_number); } void messaging_service::register_gossip_shutdown(std::function&& func) { diff --git a/message/messaging_service.hh b/message/messaging_service.hh index f1371fcfd6..6bad7d7ec4 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -407,9 +407,9 @@ public: future send_node_ops_cmd(msg_addr id, node_ops_cmd_request); // Wrapper for GOSSIP_ECHO verb - void register_gossip_echo(std::function ()>&& func); + void register_gossip_echo(std::function (const rpc::client_info& cinfo, rpc::optional generation_number)>&& func); future<> unregister_gossip_echo(); - future<> send_gossip_echo(msg_addr id); + future<> send_gossip_echo(msg_addr id, int64_t generation_number); // Wrapper for GOSSIP_SHUTDOWN void register_gossip_shutdown(std::function&& func); diff --git a/test/manual/message.cc b/test/manual/message.cc index 04b6c66b79..b150c5931e 100644 --- a/test/manual/message.cc +++ b/test/manual/message.cc @@ -111,7 +111,7 @@ public: return messaging_service::no_wait(); }); - ms.register_gossip_echo([] { + ms.register_gossip_echo([] (const rpc::client_info& cinfo, rpc::optional gen_opt) { fmt::print("Server got gossip echo msg\n"); throw std::runtime_error("I'm throwing runtime_error exception"); return make_ready_future<>(); @@ -149,7 +149,8 @@ public: future<> test_echo() { fmt::print("=== {} ===\n", __func__); auto id = get_msg_addr(); - return ms.send_gossip_echo(id).then_wrapped([] (auto&& f) { + int64_t gen = 0x1; + return ms.send_gossip_echo(id, gen).then_wrapped([] (auto&& f) { try { f.get(); return make_ready_future<>(); From 323f72e48a7d89d9edc40257a1f45c202accc146 Mon Sep 17 00:00:00 2001 From: Asias He Date: Mon, 22 Mar 2021 13:06:21 +0800 Subject: [PATCH 4/4] repair: Switch to use NODE_OPS_CMD for replace operation In commit c82250e0cfacb62617155ac98583ac90aed40133 (gossip: Allow deferring advertise of local node to be up), the replacing node is changed to postpone the responding of gossip echo message to avoid other nodes sending read requests to the replacing node. It works as following: 1) replacing node does not respond echo message to avoid other nodes to mark replacing node as alive 2) replacing node advertises hibernate state so other nodes knows replacing node is replacing 3) replacing node responds echo message so other nodes can mark replacing node as alive This is problematic because after step 2, the existing nodes in the cluster will start to send writes to the replacing node, but at this time it is possible that existing nodes haven't marked the replacing node as alive, thus failing the write request unnecessarily. For instance, we saw the following errors in issue #8013 (Cassandra stress fails to achieve consistency when only one of the nodes is down) ``` scylla: [shard 1] consistency - Live nodes 2 do not satisfy ConsistencyLevel (2 required, 1 pending, live_endpoints={127.0.0.2, 127.0.0.1}, pending_endpoints={127.0.0.3}) [shard 0] gossip - Fail to send EchoMessage to 127.0.0.3: std::runtime_error (Not ready to respond gossip echo message) c-s: java.io.IOException: Operation x10 on key(s) [4c4f4d37324c35304c30]: Error executing: (UnavailableException): Not enough replicas available for query at consistency QUORUM (2 required but only 1 alive ``` To solve this problem, we can do the replacing operation in multiple stages. One solution is to introduce a new gossip status state as proposed here: gossip: Introduce STATUS_PREPARE_REPLACE #7416 1) replacing node does not respond echo message 2) replacing node advertises prepare_replace state (Remove replacing node from natural endpoint, but do not put in pending list yet) 3) replacing node responds echo message 4) replacing node advertises hibernate state (Put replacing node in pending list) Since we now have the node ops verb introduced in 829b4c14380020fa4058335c4c43cefec135b3de (repair: Make removenode safe by default), we can do the multiple stage without introducing a new gossip status state. This patch uses the NODE_OPS_CMD infrastructure to implement replace operation. Improvements: 1) It solves the race between marking replacing node alive and sending writes to replacing node 2) The cluster reverts to a state before the replace operation automatically in case of error. As a result, it solves when the replacing node fails in the middle of the operation, the repacing node will be in HIBERNATE status forever issue. 3) The gossip status of the node to be replaced is not changed until the replace operation is successful. HIBERNATE gossip status is not used anymore. 4) Users can now pass a list of dead nodes to ignore explicitly. Refs #8013 --- idl/partition_checksum.idl.hh | 8 ++ repair/repair.hh | 8 ++ service/storage_service.cc | 238 +++++++++++++++++++++++++++++++++- service/storage_service.hh | 3 + 4 files changed, 254 insertions(+), 3 deletions(-) diff --git a/idl/partition_checksum.idl.hh b/idl/partition_checksum.idl.hh index 5c2bcbc835..322f65a037 100644 --- a/idl/partition_checksum.idl.hh +++ b/idl/partition_checksum.idl.hh @@ -110,6 +110,12 @@ enum class node_ops_cmd : uint32_t { removenode_sync_data, removenode_abort, removenode_done, + replace_prepare, + replace_prepare_mark_alive, + replace_prepare_pending_ranges, + replace_heartbeat, + replace_abort, + replace_done, }; struct node_ops_cmd_request { @@ -117,6 +123,8 @@ struct node_ops_cmd_request { utils::UUID ops_uuid; std::list ignore_nodes; std::list leaving_nodes; + // Map existing nodes to replacing nodes + std::unordered_map replace_nodes; }; struct node_ops_cmd_response { diff --git a/repair/repair.hh b/repair/repair.hh index 61ecd4c6cf..d35f4fdad1 100644 --- a/repair/repair.hh +++ b/repair/repair.hh @@ -489,6 +489,12 @@ enum class node_ops_cmd : uint32_t { removenode_sync_data, removenode_abort, removenode_done, + replace_prepare, + replace_prepare_mark_alive, + replace_prepare_pending_ranges, + replace_heartbeat, + replace_abort, + replace_done, }; // The cmd and ops_uuid are mandatory for each request. @@ -498,6 +504,8 @@ struct node_ops_cmd_request { utils::UUID ops_uuid; std::list ignore_nodes; std::list leaving_nodes; + // Map existing nodes to replacing nodes + std::unordered_map replace_nodes; }; struct node_ops_cmd_response { diff --git a/service/storage_service.cc b/service/storage_service.cc index c8531c6d91..f9832d63e9 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -682,6 +682,8 @@ void storage_service::bootstrap() { // Wait until we know tokens of existing node before announcing replacing status. set_mode(mode::JOINING, sprint("Wait until local node knows tokens of peer nodes"), true); _gossiper.wait_for_range_setup().get(); + + if (!is_repair_based_node_ops_enabled()) { set_mode(mode::JOINING, sprint("Announce tokens and status of the replacing node"), true); _gossiper.add_local_application_state({ { gms::application_state::TOKENS, versioned_value::tokens(_bootstrap_tokens) }, @@ -691,6 +693,7 @@ void storage_service::bootstrap() { _gossiper.advertise_myself().get(); set_mode(mode::JOINING, format("Wait until peer nodes know the bootstrap tokens of local node"), true); _gossiper.wait_for_range_setup().get(); + } auto replace_addr = db().local().get_replace_address(); if (replace_addr) { slogger.debug("Removing replaced endpoint {} from system.peers", *replace_addr); @@ -707,7 +710,7 @@ void storage_service::bootstrap() { set_mode(mode::JOINING, "Starting to bootstrap...", true); if (is_repair_based_node_ops_enabled()) { if (db().local().is_replacing()) { - replace_with_repair(_db, _messaging, get_token_metadata_ptr(), _bootstrap_tokens).get(); + run_replace_ops(); } else { bootstrap_with_repair(_db, _messaging, get_token_metadata_ptr(), _bootstrap_tokens).get(); } @@ -1911,6 +1914,150 @@ future<> storage_service::decommission() { }); } +// Runs inside seastar::async context +void storage_service::run_replace_ops() { + if (!db().local().get_replace_address()) { + throw std::runtime_error(format("replace_address is empty")); + } + auto replace_address = db().local().get_replace_address().value(); + auto uuid = utils::make_random_uuid(); + // TODO: Specify ignore_nodes + std::list ignore_nodes; + // Step 1: Decide who needs to sync data for replace operation + std::list sync_nodes; + for (const auto& x :_gossiper.endpoint_state_map) { + seastar::thread::maybe_yield(); + const auto& node = x.first; + slogger.debug("replace[{}]: Check node={}, status={}", uuid, node, _gossiper.get_gossip_status(node)); + if (node != get_broadcast_address() && + node != replace_address && + _gossiper.is_normal_ring_member(node) && + std::find(ignore_nodes.begin(), ignore_nodes.end(), x.first) == ignore_nodes.end()) { + sync_nodes.push_back(node); + } + } + sync_nodes.push_front(get_broadcast_address()); + auto sync_nodes_generations = _gossiper.get_generation_for_nodes(sync_nodes).get(); + // Map existing nodes to replacing nodes + std::unordered_map replace_nodes = { + {replace_address, get_broadcast_address()}, + }; + std::unordered_set nodes_unknown_verb; + std::unordered_set nodes_down; + std::unordered_set nodes_aborted; + auto req = node_ops_cmd_request{node_ops_cmd::replace_prepare, uuid, ignore_nodes, {}, replace_nodes}; + slogger.info("replace[{}]: Started replace operation, replace_nodes={}, sync_nodes={}, ignore_nodes={}", uuid, replace_nodes, sync_nodes, ignore_nodes); + future<> heartbeat_updater = make_ready_future<>(); + auto heartbeat_updater_done = make_lw_shared(false); + try { + // Step 2: Prepare to sync data + parallel_for_each(sync_nodes, [this, &req, &nodes_unknown_verb, &nodes_down, uuid] (const gms::inet_address& node) { + return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) { + slogger.debug("replace[{}]: Got node_ops_cmd::replace_prepare response from node={}", uuid, node); + }).handle_exception_type([&nodes_unknown_verb, node, uuid] (seastar::rpc::unknown_verb_error&) { + slogger.warn("replace[{}]: Node {} does not support node_ops_cmd verb", uuid, node); + nodes_unknown_verb.emplace(node); + }).handle_exception_type([&nodes_down, node, uuid] (seastar::rpc::closed_error&) { + slogger.warn("replace[{}]: Node {} is down for node_ops_cmd verb", uuid, node); + nodes_down.emplace(node); + }); + }).get(); + if (!nodes_unknown_verb.empty()) { + auto msg = format("replace[{}]: Nodes={} do not support replace verb. Please upgrade your cluster and run replace again.", uuid, nodes_unknown_verb); + slogger.warn("{}", msg); + throw std::runtime_error(msg); + } + if (!nodes_down.empty()) { + auto msg = format("replace[{}]: Nodes={} needed for replace operation are down. It is highly recommended to fix the down nodes and try again. To proceed with best-effort mode which might cause data inconsistency, add --ignore-dead-nodes . E.g., scylla --ignore-dead-nodes 127.0.0.1,127.0.0.2", uuid, nodes_down); + slogger.warn("{}", msg); + throw std::runtime_error(msg); + } + + // Step 3: Start heartbeat updater + heartbeat_updater = seastar::async([this, &sync_nodes, uuid, heartbeat_updater_done] { + slogger.debug("replace[{}]: Started heartbeat_updater", uuid); + while (!(*heartbeat_updater_done)) { + auto req = node_ops_cmd_request{node_ops_cmd::replace_heartbeat, uuid, {}, {}, {}}; + parallel_for_each(sync_nodes, [this, req, uuid] (const gms::inet_address& node) { + return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) { + slogger.debug("replace[{}]: Got heartbeat response from node={}", uuid, node); + return make_ready_future<>(); + }); + }).handle_exception([uuid] (std::exception_ptr ep) { + slogger.warn("replace[{}]: Failed to send heartbeat: {}", uuid, ep); + }).get(); + int nr_seconds = 10; + while (!(*heartbeat_updater_done) && nr_seconds--) { + sleep_abortable(std::chrono::seconds(1), _abort_source).get(); + } + } + slogger.debug("replace[{}]: Stopped heartbeat_updater", uuid); + }); + auto stop_heartbeat_updater = defer([&] { + *heartbeat_updater_done = true; + heartbeat_updater.get(); + }); + + + // Step 4: Allow nodes in sync_nodes list to mark the replacing node as alive + _gossiper.advertise_to_nodes(sync_nodes_generations).get(); + slogger.info("replace[{}]: Allow nodes={} to mark replacing node={} as alive", uuid, sync_nodes, get_broadcast_address()); + + // Step 5: Wait for nodes to finish marking the replacing node as live + req.cmd = node_ops_cmd::replace_prepare_mark_alive; + parallel_for_each(sync_nodes, [this, &req, uuid] (const gms::inet_address& node) { + return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) { + slogger.debug("replace[{}]: Got prepare_mark_alive response from node={}", uuid, node); + return make_ready_future<>(); + }); + }).get(); + + // Step 6: Update pending ranges on nodes + req.cmd = node_ops_cmd::replace_prepare_pending_ranges; + parallel_for_each(sync_nodes, [this, &req, uuid] (const gms::inet_address& node) { + return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) { + slogger.debug("replace[{}]: Got pending_ranges response from node={}", uuid, node); + return make_ready_future<>(); + }); + }).get(); + + + // Step 7: Sync data for replace + replace_with_repair(_db, _messaging, get_token_metadata_ptr(), _bootstrap_tokens).get(); + + + // Step 8: Finish + req.cmd = node_ops_cmd::replace_done; + parallel_for_each(sync_nodes, [this, &req, &nodes_aborted, uuid] (const gms::inet_address& node) { + return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([&nodes_aborted, uuid, node] (node_ops_cmd_response resp) { + nodes_aborted.emplace(node); + slogger.debug("replace[{}]: Got done response from node={}", uuid, node); + return make_ready_future<>(); + }); + }).get(); + // Allow any nodes to mark the replacing node as alive + _gossiper.advertise_to_nodes({}).get(); + slogger.info("replace[{}]: Allow any nodes to mark replacing node={} as alive", uuid, get_broadcast_address()); + } catch (...) { + slogger.error("replace[{}]: Abort replace operation started, replace_nodes={}, sync_nodes={}, ignore_nodes={}: {}", + uuid, replace_nodes, sync_nodes, ignore_nodes, std::current_exception()); + // we need to revert the effect of prepare verb the replace ops is failed + req.cmd = node_ops_cmd::replace_abort; + parallel_for_each(sync_nodes, [this, &req, &nodes_unknown_verb, &nodes_down, &nodes_aborted, uuid] (const gms::inet_address& node) { + if (nodes_unknown_verb.contains(node) || nodes_down.contains(node) || nodes_aborted.contains(node)) { + // No need to revert previous prepare cmd for those who do not apply prepare cmd. + return make_ready_future<>(); + } + return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) { + slogger.debug("replace[{}]: Got abort response from node={}", uuid, node); + }); + }).get(); + slogger.error("replace[{}]: Abort replace operation finished, replace_nodes={}, sync_nodes={}, ignore_nodes={}: {}", + uuid, replace_nodes, sync_nodes, ignore_nodes, std::current_exception()); + throw; + } +} + future<> storage_service::removenode(sstring host_id_string, std::list ignore_nodes) { return run_with_api_lock(sstring("removenode"), [host_id_string, ignore_nodes = std::move(ignore_nodes)] (storage_service& ss) mutable { return seastar::async([&ss, host_id_string, ignore_nodes = std::move(ignore_nodes)] { @@ -1949,7 +2096,7 @@ future<> storage_service::removenode(sstring host_id_string, std::list nodes_unknown_verb; std::unordered_set nodes_down; - auto req = node_ops_cmd_request{node_ops_cmd::removenode_prepare, uuid, ignore_nodes, leaving_nodes}; + auto req = node_ops_cmd_request{node_ops_cmd::removenode_prepare, uuid, ignore_nodes, leaving_nodes, {}}; try { parallel_for_each(nodes, [&ss, &req, &nodes_unknown_verb, &nodes_down, uuid] (const gms::inet_address& node) { return ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) { @@ -1977,7 +2124,7 @@ future<> storage_service::removenode(sstring host_id_string, std::list storage_service::removenode(sstring host_id_string, std::list>(_node_ops| boost::adaptors::map_keys); + std::string msg; + if (req.cmd == node_ops_cmd::removenode_prepare || req.cmd == node_ops_cmd::replace_prepare) { + // Peer node wants to start a new node operation. Make sure no pending node operation is in progress. + if (!_node_ops.empty()) { + msg = format("node_ops_cmd_check: Node {} rejected node_ops_cmd={} from node={} with ops_uuid={}, pending_node_ops={}, pending node ops is in progress", + get_broadcast_address(), uint32_t(req.cmd), coordinator, req.ops_uuid, ops_uuids); + } + } else { + if (ops_uuids.size() == 1 && ops_uuids.front() == req.ops_uuid) { + // Check is good, since we know this ops_uuid and this is the only ops_uuid we are working on. + } else if (ops_uuids.size() == 0) { + // The ops_uuid received is unknown. Fail the request. + msg = format("node_ops_cmd_check: Node {} rejected node_ops_cmd={} from node={} with ops_uuid={}, pending_node_ops={}, the node ops is unknown", + get_broadcast_address(), uint32_t(req.cmd), coordinator, req.ops_uuid, ops_uuids); + } else { + // Other node ops is in progress. Fail the request. + msg = format("node_ops_cmd_check: Node {} rejected node_ops_cmd={} from node={} with ops_uuid={}, pending_node_ops={}, pending node ops is in progress", + get_broadcast_address(), uint32_t(req.cmd), coordinator, req.ops_uuid, ops_uuids); + } + } + if (!msg.empty()) { + slogger.warn("{}", msg); + throw std::runtime_error(msg); + } +} + future storage_service::node_ops_cmd_handler(gms::inet_address coordinator, node_ops_cmd_request req) { return get_storage_service().invoke_on(0, [coordinator, req = std::move(req)] (auto& ss) mutable { return seastar::async([&ss, coordinator, req = std::move(req)] () mutable { auto ops_uuid = req.ops_uuid; slogger.debug("node_ops_cmd_handler cmd={}, ops_uuid={}", uint32_t(req.cmd), ops_uuid); + + ss.node_ops_cmd_check(coordinator, req); + if (req.cmd == node_ops_cmd::removenode_prepare) { if (req.leaving_nodes.size() > 1) { auto msg = format("removenode[{}]: Could not removenode more than one node at a time: leaving_nodes={}", req.ops_uuid, req.leaving_nodes); @@ -2089,6 +2267,60 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad } } else if (req.cmd == node_ops_cmd::removenode_abort) { ss.node_ops_abort(ops_uuid); + } else if (req.cmd == node_ops_cmd::replace_prepare) { + // Mark the replacing node as replacing + if (req.replace_nodes.size() > 1) { + auto msg = format("replace[{}]: Could not replace more than one node at a time: replace_nodes={}", req.ops_uuid, req.replace_nodes); + slogger.warn("{}", msg); + throw std::runtime_error(msg); + } + ss.mutate_token_metadata([coordinator, &req, &ss] (mutable_token_metadata_ptr tmptr) mutable { + for (auto& x: req.replace_nodes) { + auto existing_node = x.first; + auto replacing_node = x.second; + slogger.info("replace[{}]: Added replacing_node={} to replace existing_node={}, coordinator={}", req.ops_uuid, replacing_node, existing_node, coordinator); + tmptr->add_replacing_endpoint(existing_node, replacing_node); + } + return make_ready_future<>(); + }).get(); + auto ops = seastar::make_shared(node_ops_info{ops_uuid, false, std::move(req.ignore_nodes)}); + auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(ops), [&ss, coordinator, req = std::move(req)] () mutable { + return ss.mutate_token_metadata([&ss, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable { + for (auto& x: req.replace_nodes) { + auto existing_node = x.first; + auto replacing_node = x.second; + slogger.info("replace[{}]: Removed replacing_node={} to replace existing_node={}, coordinator={}", req.ops_uuid, replacing_node, existing_node, coordinator); + tmptr->del_replacing_endpoint(existing_node); + } + return ss.update_pending_ranges(tmptr, format("replace {}", req.replace_nodes)); + }); + }, + [&ss, ops_uuid ] { ss.node_ops_singal_abort(ops_uuid); }); + ss._node_ops.emplace(ops_uuid, std::move(meta)); + } else if (req.cmd == node_ops_cmd::replace_prepare_mark_alive) { + // Wait for local node has marked replacing node as alive + auto nodes = boost::copy_range>(req.replace_nodes| boost::adaptors::map_values); + try { + ss._gossiper.wait_alive(nodes, std::chrono::milliseconds(120 * 1000)); + } catch (...) { + slogger.warn("replace[{}]: Failed to wait for marking replacing node as up, replace_nodes={}: {}", + req.ops_uuid, req.replace_nodes, std::current_exception()); + throw; + } + } else if (req.cmd == node_ops_cmd::replace_prepare_pending_ranges) { + // Update the pending_ranges for the replacing node + slogger.debug("replace[{}]: Updated pending_ranges from coordinator={}", req.ops_uuid, coordinator); + ss.mutate_token_metadata([coordinator, &req, &ss] (mutable_token_metadata_ptr tmptr) mutable { + return ss.update_pending_ranges(tmptr, format("replace {}", req.replace_nodes)); + }).get(); + } else if (req.cmd == node_ops_cmd::replace_heartbeat) { + slogger.debug("replace[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator); + ss.node_ops_update_heartbeat(ops_uuid); + } else if (req.cmd == node_ops_cmd::replace_done) { + slogger.info("replace[{}]: Marked ops done from coordinator={}", req.ops_uuid, coordinator); + ss.node_ops_done(ops_uuid); + } else if (req.cmd == node_ops_cmd::replace_abort) { + ss.node_ops_abort(ops_uuid); } else { auto msg = format("node_ops_cmd_handler: ops_uuid={}, unknown cmd={}", req.ops_uuid, uint32_t(req.cmd)); slogger.warn("{}", msg); diff --git a/service/storage_service.hh b/service/storage_service.hh index e65b430689..86368577ae 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -386,6 +386,8 @@ private: future prepare_replacement_info(std::unordered_set initial_contact_nodes, const std::unordered_map& loaded_peer_features, bind_messaging_port do_bind = bind_messaging_port::yes); + void run_replace_ops(); + public: future is_initialized(); @@ -806,6 +808,7 @@ public: */ future<> removenode(sstring host_id_string, std::list ignore_nodes); future node_ops_cmd_handler(gms::inet_address coordinator, node_ops_cmd_request req); + void node_ops_cmd_check(gms::inet_address coordinator, const node_ops_cmd_request& req); future get_operation_mode();