diff --git a/idl/partition_checksum.idl.hh b/idl/partition_checksum.idl.hh index 5c2bcbc835..322f65a037 100644 --- a/idl/partition_checksum.idl.hh +++ b/idl/partition_checksum.idl.hh @@ -110,6 +110,12 @@ enum class node_ops_cmd : uint32_t { removenode_sync_data, removenode_abort, removenode_done, + replace_prepare, + replace_prepare_mark_alive, + replace_prepare_pending_ranges, + replace_heartbeat, + replace_abort, + replace_done, }; struct node_ops_cmd_request { @@ -117,6 +123,8 @@ struct node_ops_cmd_request { utils::UUID ops_uuid; std::list ignore_nodes; std::list leaving_nodes; + // Map existing nodes to replacing nodes (key: node being replaced, value: node replacing it) + std::unordered_map replace_nodes; }; struct node_ops_cmd_response { diff --git a/repair/repair.hh b/repair/repair.hh index 61ecd4c6cf..d35f4fdad1 100644 --- a/repair/repair.hh +++ b/repair/repair.hh @@ -489,6 +489,12 @@ enum class node_ops_cmd : uint32_t { removenode_sync_data, removenode_abort, removenode_done, + replace_prepare, + replace_prepare_mark_alive, + replace_prepare_pending_ranges, + replace_heartbeat, + replace_abort, + replace_done, }; // The cmd and ops_uuid are mandatory for each request. @@ -498,6 +504,8 @@ struct node_ops_cmd_request { utils::UUID ops_uuid; std::list ignore_nodes; std::list leaving_nodes; + // Map existing nodes to replacing nodes (key: node being replaced, value: node replacing it) + std::unordered_map replace_nodes; }; struct node_ops_cmd_response { diff --git a/service/storage_service.cc b/service/storage_service.cc index c8531c6d91..f9832d63e9 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -682,6 +682,8 @@ void storage_service::bootstrap() { // Wait until we know tokens of existing node before announcing replacing status. 
set_mode(mode::JOINING, sprint("Wait until local node knows tokens of peer nodes"), true); _gossiper.wait_for_range_setup().get(); + + if (!is_repair_based_node_ops_enabled()) { set_mode(mode::JOINING, sprint("Announce tokens and status of the replacing node"), true); _gossiper.add_local_application_state({ { gms::application_state::TOKENS, versioned_value::tokens(_bootstrap_tokens) }, @@ -691,6 +693,7 @@ void storage_service::bootstrap() { _gossiper.advertise_myself().get(); set_mode(mode::JOINING, format("Wait until peer nodes know the bootstrap tokens of local node"), true); _gossiper.wait_for_range_setup().get(); + } auto replace_addr = db().local().get_replace_address(); if (replace_addr) { slogger.debug("Removing replaced endpoint {} from system.peers", *replace_addr); @@ -707,7 +710,7 @@ void storage_service::bootstrap() { set_mode(mode::JOINING, "Starting to bootstrap...", true); if (is_repair_based_node_ops_enabled()) { if (db().local().is_replacing()) { - replace_with_repair(_db, _messaging, get_token_metadata_ptr(), _bootstrap_tokens).get(); + run_replace_ops(); } else { bootstrap_with_repair(_db, _messaging, get_token_metadata_ptr(), _bootstrap_tokens).get(); } @@ -1911,6 +1914,150 @@ future<> storage_service::decommission() { }); } +// Coordinator side of the replace operation: requires a configured replace_address (throws std::runtime_error otherwise), sends the replace_* node_ops_cmd sequence to the sync nodes, and on any failure sends replace_abort and rethrows. Runs inside seastar::async context +void storage_service::run_replace_ops() { + if (!db().local().get_replace_address()) { + throw std::runtime_error(format("replace_address is empty")); + } + auto replace_address = db().local().get_replace_address().value(); + auto uuid = utils::make_random_uuid(); + // TODO: Specify ignore_nodes + std::list ignore_nodes; + // Step 1: Decide who needs to sync data for replace operation: every normal ring member except this node, the replaced node and ignore_nodes; this node itself is then pushed to the front of the list + std::list sync_nodes; + for (const auto& x :_gossiper.endpoint_state_map) { + seastar::thread::maybe_yield(); + const auto& node = x.first; + slogger.debug("replace[{}]: Check node={}, status={}", uuid, node, _gossiper.get_gossip_status(node)); + if (node != get_broadcast_address() && + node != 
replace_address && + _gossiper.is_normal_ring_member(node) && + std::find(ignore_nodes.begin(), ignore_nodes.end(), x.first) == ignore_nodes.end()) { + sync_nodes.push_back(node); + } + } + sync_nodes.push_front(get_broadcast_address()); + auto sync_nodes_generations = _gossiper.get_generation_for_nodes(sync_nodes).get(); + // Map existing nodes to replacing nodes + std::unordered_map replace_nodes = { + {replace_address, get_broadcast_address()}, + }; + std::unordered_set nodes_unknown_verb; + std::unordered_set nodes_down; + std::unordered_set nodes_aborted; + auto req = node_ops_cmd_request{node_ops_cmd::replace_prepare, uuid, ignore_nodes, {}, replace_nodes}; + slogger.info("replace[{}]: Started replace operation, replace_nodes={}, sync_nodes={}, ignore_nodes={}", uuid, replace_nodes, sync_nodes, ignore_nodes); + future<> heartbeat_updater = make_ready_future<>(); + auto heartbeat_updater_done = make_lw_shared(false); + try { + // Step 2: Prepare to sync data: send replace_prepare to all sync nodes, recording nodes that do not know the verb or are down; either case fails the operation + parallel_for_each(sync_nodes, [this, &req, &nodes_unknown_verb, &nodes_down, uuid] (const gms::inet_address& node) { + return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) { + slogger.debug("replace[{}]: Got node_ops_cmd::replace_prepare response from node={}", uuid, node); + }).handle_exception_type([&nodes_unknown_verb, node, uuid] (seastar::rpc::unknown_verb_error&) { + slogger.warn("replace[{}]: Node {} does not support node_ops_cmd verb", uuid, node); + nodes_unknown_verb.emplace(node); + }).handle_exception_type([&nodes_down, node, uuid] (seastar::rpc::closed_error&) { + slogger.warn("replace[{}]: Node {} is down for node_ops_cmd verb", uuid, node); + nodes_down.emplace(node); + }); + }).get(); + if (!nodes_unknown_verb.empty()) { + auto msg = format("replace[{}]: Nodes={} do not support replace verb. 
Please upgrade your cluster and run replace again.", uuid, nodes_unknown_verb); + slogger.warn("{}", msg); + throw std::runtime_error(msg); + } + if (!nodes_down.empty()) { + auto msg = format("replace[{}]: Nodes={} needed for replace operation are down. It is highly recommended to fix the down nodes and try again. To proceed with best-effort mode which might cause data inconsistency, add --ignore-dead-nodes . E.g., scylla --ignore-dead-nodes 127.0.0.1,127.0.0.2", uuid, nodes_down); + slogger.warn("{}", msg); + throw std::runtime_error(msg); + } + + // Step 3: Start heartbeat updater + heartbeat_updater = seastar::async([this, &sync_nodes, uuid, heartbeat_updater_done] { + slogger.debug("replace[{}]: Started heartbeat_updater", uuid); + while (!(*heartbeat_updater_done)) { + auto req = node_ops_cmd_request{node_ops_cmd::replace_heartbeat, uuid, {}, {}, {}}; + parallel_for_each(sync_nodes, [this, req, uuid] (const gms::inet_address& node) { + return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) { + slogger.debug("replace[{}]: Got heartbeat response from node={}", uuid, node); + return make_ready_future<>(); + }); + }).handle_exception([uuid] (std::exception_ptr ep) { + slogger.warn("replace[{}]: Failed to send heartbeat: {}", uuid, ep); + }).get(); + int nr_seconds = 10; + while (!(*heartbeat_updater_done) && nr_seconds--) { + sleep_abortable(std::chrono::seconds(1), _abort_source).get(); + } + } + slogger.debug("replace[{}]: Stopped heartbeat_updater", uuid); + }); + auto stop_heartbeat_updater = defer([&] { + *heartbeat_updater_done = true; + heartbeat_updater.get(); + }); + + + // Step 4: Allow nodes in sync_nodes list to mark the replacing node as alive + _gossiper.advertise_to_nodes(sync_nodes_generations).get(); + slogger.info("replace[{}]: Allow nodes={} to mark replacing node={} as alive", uuid, sync_nodes, get_broadcast_address()); + + // Step 5: Wait for nodes to finish marking the replacing 
node as live + req.cmd = node_ops_cmd::replace_prepare_mark_alive; + parallel_for_each(sync_nodes, [this, &req, uuid] (const gms::inet_address& node) { + return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) { + slogger.debug("replace[{}]: Got prepare_mark_alive response from node={}", uuid, node); + return make_ready_future<>(); + }); + }).get(); + + // Step 6: Update pending ranges on nodes + req.cmd = node_ops_cmd::replace_prepare_pending_ranges; + parallel_for_each(sync_nodes, [this, &req, uuid] (const gms::inet_address& node) { + return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) { + slogger.debug("replace[{}]: Got pending_ranges response from node={}", uuid, node); + return make_ready_future<>(); + }); + }).get(); + + + // Step 7: Sync data for replace + replace_with_repair(_db, _messaging, get_token_metadata_ptr(), _bootstrap_tokens).get(); + + + // Step 8: Finish. Nodes that acknowledge replace_done are recorded in nodes_aborted so the catch block skips sending replace_abort to them + req.cmd = node_ops_cmd::replace_done; + parallel_for_each(sync_nodes, [this, &req, &nodes_aborted, uuid] (const gms::inet_address& node) { + return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([&nodes_aborted, uuid, node] (node_ops_cmd_response resp) { + nodes_aborted.emplace(node); + slogger.debug("replace[{}]: Got done response from node={}", uuid, node); + return make_ready_future<>(); + }); + }).get(); + // Allow any nodes to mark the replacing node as alive + _gossiper.advertise_to_nodes({}).get(); + slogger.info("replace[{}]: Allow any nodes to mark replacing node={} as alive", uuid, get_broadcast_address()); + } catch (...) 
{ + slogger.error("replace[{}]: Abort replace operation started, replace_nodes={}, sync_nodes={}, ignore_nodes={}: {}", + uuid, replace_nodes, sync_nodes, ignore_nodes, std::current_exception()); + // We need to revert the effect of the prepare verb because the replace ops failed + req.cmd = node_ops_cmd::replace_abort; + parallel_for_each(sync_nodes, [this, &req, &nodes_unknown_verb, &nodes_down, &nodes_aborted, uuid] (const gms::inet_address& node) { + if (nodes_unknown_verb.contains(node) || nodes_down.contains(node) || nodes_aborted.contains(node)) { + // No need to send abort to nodes that never applied the prepare cmd (unknown verb or down) or that already acknowledged replace_done (tracked in nodes_aborted). + return make_ready_future<>(); + } + return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) { + slogger.debug("replace[{}]: Got abort response from node={}", uuid, node); + }); + }).get(); + slogger.error("replace[{}]: Abort replace operation finished, replace_nodes={}, sync_nodes={}, ignore_nodes={}: {}", + uuid, replace_nodes, sync_nodes, ignore_nodes, std::current_exception()); + throw; + } +} + future<> storage_service::removenode(sstring host_id_string, std::list ignore_nodes) { return run_with_api_lock(sstring("removenode"), [host_id_string, ignore_nodes = std::move(ignore_nodes)] (storage_service& ss) mutable { return seastar::async([&ss, host_id_string, ignore_nodes = std::move(ignore_nodes)] { @@ -1949,7 +2096,7 @@ future<> storage_service::removenode(sstring host_id_string, std::list nodes_unknown_verb; std::unordered_set nodes_down; - auto req = node_ops_cmd_request{node_ops_cmd::removenode_prepare, uuid, ignore_nodes, leaving_nodes}; + auto req = node_ops_cmd_request{node_ops_cmd::removenode_prepare, uuid, ignore_nodes, leaving_nodes, {}}; try { parallel_for_each(nodes, [&ss, &req, &nodes_unknown_verb, &nodes_down, uuid] (const gms::inet_address& node) { return ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response 
resp) { @@ -1977,7 +2124,7 @@ future<> storage_service::removenode(sstring host_id_string, std::list storage_service::removenode(sstring host_id_string, std::list>(_node_ops| boost::adaptors::map_keys); + std::string msg; + if (req.cmd == node_ops_cmd::removenode_prepare || req.cmd == node_ops_cmd::replace_prepare) { + // Peer node wants to start a new node operation. Make sure no pending node operation is in progress. + if (!_node_ops.empty()) { + msg = format("node_ops_cmd_check: Node {} rejected node_ops_cmd={} from node={} with ops_uuid={}, pending_node_ops={}, pending node ops is in progress", + get_broadcast_address(), uint32_t(req.cmd), coordinator, req.ops_uuid, ops_uuids); + } + } else { + if (ops_uuids.size() == 1 && ops_uuids.front() == req.ops_uuid) { + // Check is good, since we know this ops_uuid and this is the only ops_uuid we are working on. + } else if (ops_uuids.size() == 0) { + // The ops_uuid received is unknown. Fail the request. + msg = format("node_ops_cmd_check: Node {} rejected node_ops_cmd={} from node={} with ops_uuid={}, pending_node_ops={}, the node ops is unknown", + get_broadcast_address(), uint32_t(req.cmd), coordinator, req.ops_uuid, ops_uuids); + } else { + // Other node ops is in progress. Fail the request. 
+ msg = format("node_ops_cmd_check: Node {} rejected node_ops_cmd={} from node={} with ops_uuid={}, pending_node_ops={}, pending node ops is in progress", + get_broadcast_address(), uint32_t(req.cmd), coordinator, req.ops_uuid, ops_uuids); + } + } + if (!msg.empty()) { + slogger.warn("{}", msg); + throw std::runtime_error(msg); + } +} + future storage_service::node_ops_cmd_handler(gms::inet_address coordinator, node_ops_cmd_request req) { return get_storage_service().invoke_on(0, [coordinator, req = std::move(req)] (auto& ss) mutable { return seastar::async([&ss, coordinator, req = std::move(req)] () mutable { auto ops_uuid = req.ops_uuid; slogger.debug("node_ops_cmd_handler cmd={}, ops_uuid={}", uint32_t(req.cmd), ops_uuid); + + ss.node_ops_cmd_check(coordinator, req); + if (req.cmd == node_ops_cmd::removenode_prepare) { if (req.leaving_nodes.size() > 1) { auto msg = format("removenode[{}]: Could not removenode more than one node at a time: leaving_nodes={}", req.ops_uuid, req.leaving_nodes); @@ -2089,6 +2267,60 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad } } else if (req.cmd == node_ops_cmd::removenode_abort) { ss.node_ops_abort(ops_uuid); + } else if (req.cmd == node_ops_cmd::replace_prepare) { + // Mark the replacing node as replacing (at most one entry in replace_nodes is accepted), then register the op in _node_ops with a callback that deletes the replacing endpoint again and updates pending ranges (presumably the abort path; confirm against node_ops_meta_data) + if (req.replace_nodes.size() > 1) { + auto msg = format("replace[{}]: Could not replace more than one node at a time: replace_nodes={}", req.ops_uuid, req.replace_nodes); + slogger.warn("{}", msg); + throw std::runtime_error(msg); + } + ss.mutate_token_metadata([coordinator, &req, &ss] (mutable_token_metadata_ptr tmptr) mutable { + for (auto& x: req.replace_nodes) { + auto existing_node = x.first; + auto replacing_node = x.second; + slogger.info("replace[{}]: Added replacing_node={} to replace existing_node={}, coordinator={}", req.ops_uuid, replacing_node, existing_node, coordinator); + tmptr->add_replacing_endpoint(existing_node, replacing_node); + } + return make_ready_future<>(); + }).get(); + auto ops = 
seastar::make_shared(node_ops_info{ops_uuid, false, std::move(req.ignore_nodes)}); + auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(ops), [&ss, coordinator, req = std::move(req)] () mutable { + return ss.mutate_token_metadata([&ss, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable { + for (auto& x: req.replace_nodes) { + auto existing_node = x.first; + auto replacing_node = x.second; + slogger.info("replace[{}]: Removed replacing_node={} to replace existing_node={}, coordinator={}", req.ops_uuid, replacing_node, existing_node, coordinator); + tmptr->del_replacing_endpoint(existing_node); + } + return ss.update_pending_ranges(tmptr, format("replace {}", req.replace_nodes)); + }); + }, + [&ss, ops_uuid ] { ss.node_ops_singal_abort(ops_uuid); }); + ss._node_ops.emplace(ops_uuid, std::move(meta)); + } else if (req.cmd == node_ops_cmd::replace_prepare_mark_alive) { + // Wait up to 120 seconds until the local node has marked the replacing node as alive + auto nodes = boost::copy_range>(req.replace_nodes| boost::adaptors::map_values); + try { + ss._gossiper.wait_alive(nodes, std::chrono::milliseconds(120 * 1000)); + } catch (...) 
{ + slogger.warn("replace[{}]: Failed to wait for marking replacing node as up, replace_nodes={}: {}", + req.ops_uuid, req.replace_nodes, std::current_exception()); + throw; + } + } else if (req.cmd == node_ops_cmd::replace_prepare_pending_ranges) { + // Update the pending_ranges for the replacing node + slogger.debug("replace[{}]: Updated pending_ranges from coordinator={}", req.ops_uuid, coordinator); + ss.mutate_token_metadata([coordinator, &req, &ss] (mutable_token_metadata_ptr tmptr) mutable { + return ss.update_pending_ranges(tmptr, format("replace {}", req.replace_nodes)); + }).get(); + } else if (req.cmd == node_ops_cmd::replace_heartbeat) { + slogger.debug("replace[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator); + ss.node_ops_update_heartbeat(ops_uuid); + } else if (req.cmd == node_ops_cmd::replace_done) { + slogger.info("replace[{}]: Marked ops done from coordinator={}", req.ops_uuid, coordinator); + ss.node_ops_done(ops_uuid); + } else if (req.cmd == node_ops_cmd::replace_abort) { + ss.node_ops_abort(ops_uuid); } else { auto msg = format("node_ops_cmd_handler: ops_uuid={}, unknown cmd={}", req.ops_uuid, uint32_t(req.cmd)); slogger.warn("{}", msg); diff --git a/service/storage_service.hh b/service/storage_service.hh index e65b430689..86368577ae 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -386,6 +386,8 @@ private: future prepare_replacement_info(std::unordered_set initial_contact_nodes, const std::unordered_map& loaded_peer_features, bind_messaging_port do_bind = bind_messaging_port::yes); + void run_replace_ops(); + public: future is_initialized(); @@ -806,6 +808,7 @@ public: */ future<> removenode(sstring host_id_string, std::list ignore_nodes); future node_ops_cmd_handler(gms::inet_address coordinator, node_ops_cmd_request req); + void node_ops_cmd_check(gms::inet_address coordinator, const node_ops_cmd_request& req); future get_operation_mode();