From 323f72e48a7d89d9edc40257a1f45c202accc146 Mon Sep 17 00:00:00 2001 From: Asias He Date: Mon, 22 Mar 2021 13:06:21 +0800 Subject: [PATCH] repair: Switch to use NODE_OPS_CMD for replace operation In commit c82250e0cfacb62617155ac98583ac90aed40133 (gossip: Allow deferring advertise of local node to be up), the replacing node is changed to postpone the responding of gossip echo message to avoid other nodes sending read requests to the replacing node. It works as following: 1) replacing node does not respond echo message to avoid other nodes to mark replacing node as alive 2) replacing node advertises hibernate state so other nodes knows replacing node is replacing 3) replacing node responds echo message so other nodes can mark replacing node as alive This is problematic because after step 2, the existing nodes in the cluster will start to send writes to the replacing node, but at this time it is possible that existing nodes haven't marked the replacing node as alive, thus failing the write request unnecessarily. For instance, we saw the following errors in issue #8013 (Cassandra stress fails to achieve consistency when only one of the nodes is down) ``` scylla: [shard 1] consistency - Live nodes 2 do not satisfy ConsistencyLevel (2 required, 1 pending, live_endpoints={127.0.0.2, 127.0.0.1}, pending_endpoints={127.0.0.3}) [shard 0] gossip - Fail to send EchoMessage to 127.0.0.3: std::runtime_error (Not ready to respond gossip echo message) c-s: java.io.IOException: Operation x10 on key(s) [4c4f4d37324c35304c30]: Error executing: (UnavailableException): Not enough replicas available for query at consistency QUORUM (2 required but only 1 alive ``` To solve this problem, we can do the replacing operation in multiple stages. One solution is to introduce a new gossip status state as proposed here: gossip: Introduce STATUS_PREPARE_REPLACE #7416 1) replacing node does not respond echo message 2) replacing node advertises prepare_replace state (Remove replacing node from natural endpoint, but do not put in pending list yet) 3) replacing node responds echo message 4) replacing node advertises hibernate state (Put replacing node in pending list) Since we now have the node ops verb introduced in 829b4c14380020fa4058335c4c43cefec135b3de (repair: Make removenode safe by default), we can do the multiple stage without introducing a new gossip status state. This patch uses the NODE_OPS_CMD infrastructure to implement replace operation. Improvements: 1) It solves the race between marking replacing node alive and sending writes to replacing node 2) The cluster reverts to a state before the replace operation automatically in case of error. As a result, it solves when the replacing node fails in the middle of the operation, the repacing node will be in HIBERNATE status forever issue. 3) The gossip status of the node to be replaced is not changed until the replace operation is successful. HIBERNATE gossip status is not used anymore. 4) Users can now pass a list of dead nodes to ignore explicitly. Refs #8013 --- idl/partition_checksum.idl.hh | 8 ++ repair/repair.hh | 8 ++ service/storage_service.cc | 238 +++++++++++++++++++++++++++++++++- service/storage_service.hh | 3 + 4 files changed, 254 insertions(+), 3 deletions(-) diff --git a/idl/partition_checksum.idl.hh b/idl/partition_checksum.idl.hh index 5c2bcbc835..322f65a037 100644 --- a/idl/partition_checksum.idl.hh +++ b/idl/partition_checksum.idl.hh @@ -110,6 +110,12 @@ enum class node_ops_cmd : uint32_t { removenode_sync_data, removenode_abort, removenode_done, + replace_prepare, + replace_prepare_mark_alive, + replace_prepare_pending_ranges, + replace_heartbeat, + replace_abort, + replace_done, }; struct node_ops_cmd_request { @@ -117,6 +123,8 @@ struct node_ops_cmd_request { utils::UUID ops_uuid; std::list ignore_nodes; std::list leaving_nodes; + // Map existing nodes to replacing nodes + std::unordered_map replace_nodes; }; struct node_ops_cmd_response { diff --git a/repair/repair.hh b/repair/repair.hh index 61ecd4c6cf..d35f4fdad1 100644 --- a/repair/repair.hh +++ b/repair/repair.hh @@ -489,6 +489,12 @@ enum class node_ops_cmd : uint32_t { removenode_sync_data, removenode_abort, removenode_done, + replace_prepare, + replace_prepare_mark_alive, + replace_prepare_pending_ranges, + replace_heartbeat, + replace_abort, + replace_done, }; // The cmd and ops_uuid are mandatory for each request. @@ -498,6 +504,8 @@ struct node_ops_cmd_request { utils::UUID ops_uuid; std::list ignore_nodes; std::list leaving_nodes; + // Map existing nodes to replacing nodes + std::unordered_map replace_nodes; }; struct node_ops_cmd_response { diff --git a/service/storage_service.cc b/service/storage_service.cc index c8531c6d91..f9832d63e9 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -682,6 +682,8 @@ void storage_service::bootstrap() { // Wait until we know tokens of existing node before announcing replacing status. set_mode(mode::JOINING, sprint("Wait until local node knows tokens of peer nodes"), true); _gossiper.wait_for_range_setup().get(); + + if (!is_repair_based_node_ops_enabled()) { set_mode(mode::JOINING, sprint("Announce tokens and status of the replacing node"), true); _gossiper.add_local_application_state({ { gms::application_state::TOKENS, versioned_value::tokens(_bootstrap_tokens) }, @@ -691,6 +693,7 @@ void storage_service::bootstrap() { _gossiper.advertise_myself().get(); set_mode(mode::JOINING, format("Wait until peer nodes know the bootstrap tokens of local node"), true); _gossiper.wait_for_range_setup().get(); + } auto replace_addr = db().local().get_replace_address(); if (replace_addr) { slogger.debug("Removing replaced endpoint {} from system.peers", *replace_addr); @@ -707,7 +710,7 @@ void storage_service::bootstrap() { set_mode(mode::JOINING, "Starting to bootstrap...", true); if (is_repair_based_node_ops_enabled()) { if (db().local().is_replacing()) { - replace_with_repair(_db, _messaging, get_token_metadata_ptr(), _bootstrap_tokens).get(); + run_replace_ops(); } else { bootstrap_with_repair(_db, _messaging, get_token_metadata_ptr(), _bootstrap_tokens).get(); } @@ -1911,6 +1914,150 @@ future<> storage_service::decommission() { }); } +// Runs inside seastar::async context +void storage_service::run_replace_ops() { + if (!db().local().get_replace_address()) { + throw std::runtime_error(format("replace_address is empty")); + } + auto replace_address = db().local().get_replace_address().value(); + auto uuid = utils::make_random_uuid(); + // TODO: Specify ignore_nodes + std::list ignore_nodes; + // Step 1: Decide who needs to sync data for replace operation + std::list sync_nodes; + for (const auto& x :_gossiper.endpoint_state_map) { + seastar::thread::maybe_yield(); + const auto& node = x.first; + slogger.debug("replace[{}]: Check node={}, status={}", uuid, node, _gossiper.get_gossip_status(node)); + if (node != get_broadcast_address() && + node != replace_address && + _gossiper.is_normal_ring_member(node) && + std::find(ignore_nodes.begin(), ignore_nodes.end(), x.first) == ignore_nodes.end()) { + sync_nodes.push_back(node); + } + } + sync_nodes.push_front(get_broadcast_address()); + auto sync_nodes_generations = _gossiper.get_generation_for_nodes(sync_nodes).get(); + // Map existing nodes to replacing nodes + std::unordered_map replace_nodes = { + {replace_address, get_broadcast_address()}, + }; + std::unordered_set nodes_unknown_verb; + std::unordered_set nodes_down; + std::unordered_set nodes_aborted; + auto req = node_ops_cmd_request{node_ops_cmd::replace_prepare, uuid, ignore_nodes, {}, replace_nodes}; + slogger.info("replace[{}]: Started replace operation, replace_nodes={}, sync_nodes={}, ignore_nodes={}", uuid, replace_nodes, sync_nodes, ignore_nodes); + future<> heartbeat_updater = make_ready_future<>(); + auto heartbeat_updater_done = make_lw_shared(false); + try { + // Step 2: Prepare to sync data + parallel_for_each(sync_nodes, [this, &req, &nodes_unknown_verb, &nodes_down, uuid] (const gms::inet_address& node) { + return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) { + slogger.debug("replace[{}]: Got node_ops_cmd::replace_prepare response from node={}", uuid, node); + }).handle_exception_type([&nodes_unknown_verb, node, uuid] (seastar::rpc::unknown_verb_error&) { + slogger.warn("replace[{}]: Node {} does not support node_ops_cmd verb", uuid, node); + nodes_unknown_verb.emplace(node); + }).handle_exception_type([&nodes_down, node, uuid] (seastar::rpc::closed_error&) { + slogger.warn("replace[{}]: Node {} is down for node_ops_cmd verb", uuid, node); + nodes_down.emplace(node); + }); + }).get(); + if (!nodes_unknown_verb.empty()) { + auto msg = format("replace[{}]: Nodes={} do not support replace verb. Please upgrade your cluster and run replace again.", uuid, nodes_unknown_verb); + slogger.warn("{}", msg); + throw std::runtime_error(msg); + } + if (!nodes_down.empty()) { + auto msg = format("replace[{}]: Nodes={} needed for replace operation are down. It is highly recommended to fix the down nodes and try again. To proceed with best-effort mode which might cause data inconsistency, add --ignore-dead-nodes . E.g., scylla --ignore-dead-nodes 127.0.0.1,127.0.0.2", uuid, nodes_down); + slogger.warn("{}", msg); + throw std::runtime_error(msg); + } + + // Step 3: Start heartbeat updater + heartbeat_updater = seastar::async([this, &sync_nodes, uuid, heartbeat_updater_done] { + slogger.debug("replace[{}]: Started heartbeat_updater", uuid); + while (!(*heartbeat_updater_done)) { + auto req = node_ops_cmd_request{node_ops_cmd::replace_heartbeat, uuid, {}, {}, {}}; + parallel_for_each(sync_nodes, [this, req, uuid] (const gms::inet_address& node) { + return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) { + slogger.debug("replace[{}]: Got heartbeat response from node={}", uuid, node); + return make_ready_future<>(); + }); + }).handle_exception([uuid] (std::exception_ptr ep) { + slogger.warn("replace[{}]: Failed to send heartbeat: {}", uuid, ep); + }).get(); + int nr_seconds = 10; + while (!(*heartbeat_updater_done) && nr_seconds--) { + sleep_abortable(std::chrono::seconds(1), _abort_source).get(); + } + } + slogger.debug("replace[{}]: Stopped heartbeat_updater", uuid); + }); + auto stop_heartbeat_updater = defer([&] { + *heartbeat_updater_done = true; + heartbeat_updater.get(); + }); + + + // Step 4: Allow nodes in sync_nodes list to mark the replacing node as alive + _gossiper.advertise_to_nodes(sync_nodes_generations).get(); + slogger.info("replace[{}]: Allow nodes={} to mark replacing node={} as alive", uuid, sync_nodes, get_broadcast_address()); + + // Step 5: Wait for nodes to finish marking the replacing node as live + req.cmd = node_ops_cmd::replace_prepare_mark_alive; + parallel_for_each(sync_nodes, [this, &req, uuid] (const gms::inet_address& node) { + return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) { + slogger.debug("replace[{}]: Got prepare_mark_alive response from node={}", uuid, node); + return make_ready_future<>(); + }); + }).get(); + + // Step 6: Update pending ranges on nodes + req.cmd = node_ops_cmd::replace_prepare_pending_ranges; + parallel_for_each(sync_nodes, [this, &req, uuid] (const gms::inet_address& node) { + return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) { + slogger.debug("replace[{}]: Got pending_ranges response from node={}", uuid, node); + return make_ready_future<>(); + }); + }).get(); + + + // Step 7: Sync data for replace + replace_with_repair(_db, _messaging, get_token_metadata_ptr(), _bootstrap_tokens).get(); + + + // Step 8: Finish + req.cmd = node_ops_cmd::replace_done; + parallel_for_each(sync_nodes, [this, &req, &nodes_aborted, uuid] (const gms::inet_address& node) { + return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([&nodes_aborted, uuid, node] (node_ops_cmd_response resp) { + nodes_aborted.emplace(node); + slogger.debug("replace[{}]: Got done response from node={}", uuid, node); + return make_ready_future<>(); + }); + }).get(); + // Allow any nodes to mark the replacing node as alive + _gossiper.advertise_to_nodes({}).get(); + slogger.info("replace[{}]: Allow any nodes to mark replacing node={} as alive", uuid, get_broadcast_address()); + } catch (...) { + slogger.error("replace[{}]: Abort replace operation started, replace_nodes={}, sync_nodes={}, ignore_nodes={}: {}", + uuid, replace_nodes, sync_nodes, ignore_nodes, std::current_exception()); + // we need to revert the effect of prepare verb the replace ops is failed + req.cmd = node_ops_cmd::replace_abort; + parallel_for_each(sync_nodes, [this, &req, &nodes_unknown_verb, &nodes_down, &nodes_aborted, uuid] (const gms::inet_address& node) { + if (nodes_unknown_verb.contains(node) || nodes_down.contains(node) || nodes_aborted.contains(node)) { + // No need to revert previous prepare cmd for those who do not apply prepare cmd. + return make_ready_future<>(); + } + return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) { + slogger.debug("replace[{}]: Got abort response from node={}", uuid, node); + }); + }).get(); + slogger.error("replace[{}]: Abort replace operation finished, replace_nodes={}, sync_nodes={}, ignore_nodes={}: {}", + uuid, replace_nodes, sync_nodes, ignore_nodes, std::current_exception()); + throw; + } +} + future<> storage_service::removenode(sstring host_id_string, std::list ignore_nodes) { return run_with_api_lock(sstring("removenode"), [host_id_string, ignore_nodes = std::move(ignore_nodes)] (storage_service& ss) mutable { return seastar::async([&ss, host_id_string, ignore_nodes = std::move(ignore_nodes)] { @@ -1949,7 +2096,7 @@ future<> storage_service::removenode(sstring host_id_string, std::list nodes_unknown_verb; std::unordered_set nodes_down; - auto req = node_ops_cmd_request{node_ops_cmd::removenode_prepare, uuid, ignore_nodes, leaving_nodes}; + auto req = node_ops_cmd_request{node_ops_cmd::removenode_prepare, uuid, ignore_nodes, leaving_nodes, {}}; try { parallel_for_each(nodes, [&ss, &req, &nodes_unknown_verb, &nodes_down, uuid] (const gms::inet_address& node) { return ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) { @@ -1977,7 +2124,7 @@ future<> storage_service::removenode(sstring host_id_string, std::list storage_service::removenode(sstring host_id_string, std::list>(_node_ops| boost::adaptors::map_keys); + std::string msg; + if (req.cmd == node_ops_cmd::removenode_prepare || req.cmd == node_ops_cmd::replace_prepare) { + // Peer node wants to start a new node operation. Make sure no pending node operation is in progress. + if (!_node_ops.empty()) { + msg = format("node_ops_cmd_check: Node {} rejected node_ops_cmd={} from node={} with ops_uuid={}, pending_node_ops={}, pending node ops is in progress", + get_broadcast_address(), uint32_t(req.cmd), coordinator, req.ops_uuid, ops_uuids); + } + } else { + if (ops_uuids.size() == 1 && ops_uuids.front() == req.ops_uuid) { + // Check is good, since we know this ops_uuid and this is the only ops_uuid we are working on. + } else if (ops_uuids.size() == 0) { + // The ops_uuid received is unknown. Fail the request. + msg = format("node_ops_cmd_check: Node {} rejected node_ops_cmd={} from node={} with ops_uuid={}, pending_node_ops={}, the node ops is unknown", + get_broadcast_address(), uint32_t(req.cmd), coordinator, req.ops_uuid, ops_uuids); + } else { + // Other node ops is in progress. Fail the request. + msg = format("node_ops_cmd_check: Node {} rejected node_ops_cmd={} from node={} with ops_uuid={}, pending_node_ops={}, pending node ops is in progress", + get_broadcast_address(), uint32_t(req.cmd), coordinator, req.ops_uuid, ops_uuids); + } + } + if (!msg.empty()) { + slogger.warn("{}", msg); + throw std::runtime_error(msg); + } +} + future storage_service::node_ops_cmd_handler(gms::inet_address coordinator, node_ops_cmd_request req) { return get_storage_service().invoke_on(0, [coordinator, req = std::move(req)] (auto& ss) mutable { return seastar::async([&ss, coordinator, req = std::move(req)] () mutable { auto ops_uuid = req.ops_uuid; slogger.debug("node_ops_cmd_handler cmd={}, ops_uuid={}", uint32_t(req.cmd), ops_uuid); + + ss.node_ops_cmd_check(coordinator, req); + if (req.cmd == node_ops_cmd::removenode_prepare) { if (req.leaving_nodes.size() > 1) { auto msg = format("removenode[{}]: Could not removenode more than one node at a time: leaving_nodes={}", req.ops_uuid, req.leaving_nodes); @@ -2089,6 +2267,60 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad } } else if (req.cmd == node_ops_cmd::removenode_abort) { ss.node_ops_abort(ops_uuid); + } else if (req.cmd == node_ops_cmd::replace_prepare) { + // Mark the replacing node as replacing + if (req.replace_nodes.size() > 1) { + auto msg = format("replace[{}]: Could not replace more than one node at a time: replace_nodes={}", req.ops_uuid, req.replace_nodes); + slogger.warn("{}", msg); + throw std::runtime_error(msg); + } + ss.mutate_token_metadata([coordinator, &req, &ss] (mutable_token_metadata_ptr tmptr) mutable { + for (auto& x: req.replace_nodes) { + auto existing_node = x.first; + auto replacing_node = x.second; + slogger.info("replace[{}]: Added replacing_node={} to replace existing_node={}, coordinator={}", req.ops_uuid, replacing_node, existing_node, coordinator); + tmptr->add_replacing_endpoint(existing_node, replacing_node); + } + return make_ready_future<>(); + }).get(); + auto ops = seastar::make_shared(node_ops_info{ops_uuid, false, std::move(req.ignore_nodes)}); + auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(ops), [&ss, coordinator, req = std::move(req)] () mutable { + return ss.mutate_token_metadata([&ss, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable { + for (auto& x: req.replace_nodes) { + auto existing_node = x.first; + auto replacing_node = x.second; + slogger.info("replace[{}]: Removed replacing_node={} to replace existing_node={}, coordinator={}", req.ops_uuid, replacing_node, existing_node, coordinator); + tmptr->del_replacing_endpoint(existing_node); + } + return ss.update_pending_ranges(tmptr, format("replace {}", req.replace_nodes)); + }); + }, + [&ss, ops_uuid ] { ss.node_ops_singal_abort(ops_uuid); }); + ss._node_ops.emplace(ops_uuid, std::move(meta)); + } else if (req.cmd == node_ops_cmd::replace_prepare_mark_alive) { + // Wait for local node has marked replacing node as alive + auto nodes = boost::copy_range>(req.replace_nodes| boost::adaptors::map_values); + try { + ss._gossiper.wait_alive(nodes, std::chrono::milliseconds(120 * 1000)); + } catch (...) { + slogger.warn("replace[{}]: Failed to wait for marking replacing node as up, replace_nodes={}: {}", + req.ops_uuid, req.replace_nodes, std::current_exception()); + throw; + } + } else if (req.cmd == node_ops_cmd::replace_prepare_pending_ranges) { + // Update the pending_ranges for the replacing node + slogger.debug("replace[{}]: Updated pending_ranges from coordinator={}", req.ops_uuid, coordinator); + ss.mutate_token_metadata([coordinator, &req, &ss] (mutable_token_metadata_ptr tmptr) mutable { + return ss.update_pending_ranges(tmptr, format("replace {}", req.replace_nodes)); + }).get(); + } else if (req.cmd == node_ops_cmd::replace_heartbeat) { + slogger.debug("replace[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator); + ss.node_ops_update_heartbeat(ops_uuid); + } else if (req.cmd == node_ops_cmd::replace_done) { + slogger.info("replace[{}]: Marked ops done from coordinator={}", req.ops_uuid, coordinator); + ss.node_ops_done(ops_uuid); + } else if (req.cmd == node_ops_cmd::replace_abort) { + ss.node_ops_abort(ops_uuid); } else { auto msg = format("node_ops_cmd_handler: ops_uuid={}, unknown cmd={}", req.ops_uuid, uint32_t(req.cmd)); slogger.warn("{}", msg); diff --git a/service/storage_service.hh b/service/storage_service.hh index e65b430689..86368577ae 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -386,6 +386,8 @@ private: future prepare_replacement_info(std::unordered_set initial_contact_nodes, const std::unordered_map& loaded_peer_features, bind_messaging_port do_bind = bind_messaging_port::yes); + void run_replace_ops(); + public: future is_initialized(); @@ -806,6 +808,7 @@ public: */ future<> removenode(sstring host_id_string, std::list ignore_nodes); future node_ops_cmd_handler(gms::inet_address coordinator, node_ops_cmd_request req); + void node_ops_cmd_check(gms::inet_address coordinator, const node_ops_cmd_request& req); future get_operation_mode();