diff --git a/api/api-doc/storage_service.json b/api/api-doc/storage_service.json index d5d88f450a..a00ca76314 100644 --- a/api/api-doc/storage_service.json +++ b/api/api-doc/storage_service.json @@ -1105,6 +1105,14 @@ "allowMultiple":false, "type":"string", "paramType":"query" + }, + { + "name":"ignore_nodes", + "description":"List of dead nodes to ignore in removenode operation", + "required":false, + "allowMultiple":false, + "type":"string", + "paramType":"query" } ] } diff --git a/api/storage_service.cc b/api/storage_service.cc index 252724623b..a7a31f056c 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -27,6 +27,7 @@ #include #include #include +#include #include "service/storage_service.hh" #include "service/load_meter.hh" #include "db/commitlog/commitlog.hh" @@ -496,7 +497,22 @@ void set_storage_service(http_context& ctx, routes& r) { ss::remove_node.set(r, [](std::unique_ptr req) { auto host_id = req->get_query_param("host_id"); - return service::get_local_storage_service().removenode(host_id).then([] { + std::vector ignore_nodes_strs= split(req->get_query_param("ignore_nodes"), ","); + auto ignore_nodes = std::list(); + for (std::string n : ignore_nodes_strs) { + try { + std::replace(n.begin(), n.end(), '\"', ' '); + std::replace(n.begin(), n.end(), '\'', ' '); + boost::trim_all(n); + if (!n.empty()) { + auto node = gms::inet_address(n); + ignore_nodes.push_back(node); + } + } catch (...) 
{ + throw std::runtime_error(format("Failed to parse ignore_nodes parameter: ignore_nodes={}, node={}", ignore_nodes_strs, n)); + } + } + return service::get_local_storage_service().removenode(host_id, std::move(ignore_nodes)).then([] { return make_ready_future(json_void()); }); }); diff --git a/idl/partition_checksum.idl.hh b/idl/partition_checksum.idl.hh index b5404c2d64..5c2bcbc835 100644 --- a/idl/partition_checksum.idl.hh +++ b/idl/partition_checksum.idl.hh @@ -103,3 +103,22 @@ enum class repair_row_level_start_status: uint8_t { struct repair_row_level_start_response { repair_row_level_start_status status; }; + +enum class node_ops_cmd : uint32_t { + removenode_prepare, + removenode_heartbeat, + removenode_sync_data, + removenode_abort, + removenode_done, +}; + +struct node_ops_cmd_request { + node_ops_cmd cmd; + utils::UUID ops_uuid; + std::list ignore_nodes; + std::list leaving_nodes; +}; + +struct node_ops_cmd_response { + bool ok; +}; diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index 4855136ae4..38cfe0935e 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -332,6 +332,7 @@ public: void remove_bootstrap_tokens(std::unordered_set tokens); void add_leaving_endpoint(inet_address endpoint); + void del_leaving_endpoint(inet_address endpoint); public: void remove_endpoint(inet_address endpoint); #if 0 @@ -1546,6 +1547,10 @@ void token_metadata_impl::add_leaving_endpoint(inet_address endpoint) { _leaving_endpoints.emplace(endpoint); } +void token_metadata_impl::del_leaving_endpoint(inet_address endpoint) { + _leaving_endpoints.erase(endpoint); +} + void token_metadata_impl::add_replacing_endpoint(inet_address existing_node, inet_address replacing_node) { tlogger.info("Added node {} as pending replacing endpoint which replaces existing node {}", replacing_node, existing_node); @@ -1806,6 +1811,11 @@ token_metadata::add_leaving_endpoint(inet_address endpoint) { _impl->add_leaving_endpoint(endpoint); } +void 
+token_metadata::del_leaving_endpoint(inet_address endpoint) { + _impl->del_leaving_endpoint(endpoint); +} + void token_metadata::remove_endpoint(inet_address endpoint) { _impl->remove_endpoint(endpoint); diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index 1a79e2e29b..84c371069b 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -236,6 +236,7 @@ public: void remove_bootstrap_tokens(std::unordered_set tokens); void add_leaving_endpoint(inet_address endpoint); + void del_leaving_endpoint(inet_address endpoint); void remove_endpoint(inet_address endpoint); diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 0547dd087f..d46036f33f 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -504,6 +504,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) { case messaging_verb::REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM: case messaging_verb::REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM: case messaging_verb::REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM: + case messaging_verb::NODE_OPS_CMD: case messaging_verb::HINT_MUTATION: return 1; case messaging_verb::CLIENT_ID: @@ -1349,6 +1350,17 @@ future> messaging_service::send_rep return send_message>>(this, messaging_verb::REPAIR_GET_DIFF_ALGORITHMS, std::move(id)); } +// Wrapper for NODE_OPS_CMD +void messaging_service::register_node_ops_cmd(std::function (const rpc::client_info& cinfo, node_ops_cmd_request)>&& func) { + register_handler(this, messaging_verb::NODE_OPS_CMD, std::move(func)); +} +future<> messaging_service::unregister_node_ops_cmd() { + return unregister_handler(messaging_verb::NODE_OPS_CMD); +} +future messaging_service::send_node_ops_cmd(msg_addr id, node_ops_cmd_request req) { + return send_message>(this, messaging_verb::NODE_OPS_CMD, std::move(id), std::move(req)); +} + void messaging_service::register_paxos_prepare(std::function>>( const rpc::client_info&, rpc::opt_time_point, query::read_command cmd, partition_key 
key, utils::UUID ballot, diff --git a/message/messaging_service.hh b/message/messaging_service.hh index ee3c9f5a19..3e3f28aece 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -143,7 +143,8 @@ enum class messaging_verb : int32_t { HINT_MUTATION = 42, PAXOS_PRUNE = 43, GOSSIP_GET_ENDPOINT_STATES = 44, - LAST = 45, + NODE_OPS_CMD = 45, + LAST = 46, }; } // namespace netw @@ -394,6 +395,11 @@ public: future<> unregister_repair_get_diff_algorithms(); future> send_repair_get_diff_algorithms(msg_addr id); + // Wrapper for NODE_OPS_CMD + void register_node_ops_cmd(std::function (const rpc::client_info& cinfo, node_ops_cmd_request)>&& func); + future<> unregister_node_ops_cmd(); + future send_node_ops_cmd(msg_addr id, node_ops_cmd_request); + // Wrapper for GOSSIP_ECHO verb void register_gossip_echo(std::function ()>&& func); future<> unregister_gossip_echo(); diff --git a/repair/repair.cc b/repair/repair.cc index 2633e1201f..978d771605 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -53,6 +53,14 @@ logging::logger rlogger("repair"); static sharded* _messaging; +void node_ops_info::check_abort() { + if (abort) { + auto msg = format("Node operation with ops_uuid={} is aborted", ops_uuid); + rlogger.warn("{}", msg); + throw std::runtime_error(msg); + } +} + class node_ops_metrics { public: node_ops_metrics() { @@ -434,6 +442,16 @@ void tracker::abort_all_repairs() { rlogger.info0("Aborted {} repair job(s)", count); } +void tracker::abort_repair_node_ops(utils::UUID ops_uuid) { + for (auto& x : _repairs[this_shard_id()]) { + auto& ri = x.second; + if (ri->ops_uuid() && ri->ops_uuid().value() == ops_uuid) { + rlogger.info0("Aborted repair jobs for ops_uuid={}", ops_uuid); + ri->abort(); + } + } +} + float tracker::report_progress(streaming::stream_reason reason) { uint64_t nr_ranges_finished = 0; uint64_t nr_ranges_total = 0; @@ -792,7 +810,8 @@ repair_info::repair_info(seastar::sharded& db_, repair_uniq_id id_, const std::vector& 
data_centers_, const std::vector& hosts_, - streaming::stream_reason reason_) + streaming::stream_reason reason_, + std::optional ops_uuid) : db(db_) , messaging(ms_) , sharder(get_sharder_for_tables(db_, keyspace_, table_ids_)) @@ -806,7 +825,8 @@ repair_info::repair_info(seastar::sharded& db_, , hosts(hosts_) , reason(reason_) , nr_ranges_total(ranges.size()) - , _row_level_repair(db.local().features().cluster_supports_row_level_repair()) { + , _row_level_repair(db.local().features().cluster_supports_row_level_repair()) + , _ops_uuid(std::move(ops_uuid)) { } future<> repair_info::do_streaming() { @@ -1625,7 +1645,7 @@ static int do_repair_start(seastar::sharded& db, seastar::sharded(db, ms, std::move(keyspace), std::move(ranges), std::move(table_ids), - id, std::move(data_centers), std::move(hosts), streaming::stream_reason::repair); + id, std::move(data_centers), std::move(hosts), streaming::stream_reason::repair, id.uuid); return repair_ranges(ri); }); repair_results.push_back(std::move(f)); @@ -1695,14 +1715,15 @@ static future<> sync_data_using_repair(seastar::sharded& db, sstring keyspace, dht::token_range_vector ranges, std::unordered_map neighbors, - streaming::stream_reason reason) { + streaming::stream_reason reason, + std::optional ops_uuid) { if (ranges.empty()) { return make_ready_future<>(); } - return smp::submit_to(0, [&db, &ms, keyspace = std::move(keyspace), ranges = std::move(ranges), neighbors = std::move(neighbors), reason] () mutable { + return smp::submit_to(0, [&db, &ms, keyspace = std::move(keyspace), ranges = std::move(ranges), neighbors = std::move(neighbors), reason, ops_uuid] () mutable { repair_uniq_id id = repair_tracker().next_repair_command(); rlogger.info("repair id {} to sync data for keyspace={}, status=started", id, keyspace); - return repair_tracker().run(id, [id, &db, &ms, keyspace, ranges = std::move(ranges), neighbors = std::move(neighbors), reason] () mutable { + return repair_tracker().run(id, [id, &db, &ms, keyspace, 
ranges = std::move(ranges), neighbors = std::move(neighbors), reason, ops_uuid] () mutable { auto cfs = list_column_families(db.local(), keyspace); if (cfs.empty()) { rlogger.warn("repair id {} to sync data for keyspace={}, no table in this keyspace", id, keyspace); @@ -1712,12 +1733,12 @@ static future<> sync_data_using_repair(seastar::sharded& db, std::vector> repair_results; repair_results.reserve(smp::count); for (auto shard : boost::irange(unsigned(0), smp::count)) { - auto f = db.invoke_on(shard, [&db, &ms, keyspace, table_ids, id, ranges, neighbors, reason] (database& localdb) mutable { + auto f = db.invoke_on(shard, [&db, &ms, keyspace, table_ids, id, ranges, neighbors, reason, ops_uuid] (database& localdb) mutable { auto data_centers = std::vector(); auto hosts = std::vector(); auto ri = make_lw_shared(db, ms, std::move(keyspace), std::move(ranges), std::move(table_ids), - id, std::move(data_centers), std::move(hosts), reason); + id, std::move(data_centers), std::move(hosts), reason, ops_uuid); ri->neighbors = std::move(neighbors); return repair_ranges(ri); }); @@ -1910,16 +1931,16 @@ future<> bootstrap_with_repair(seastar::sharded& db, seastar::sharded< } } auto nr_ranges = desired_ranges.size(); - sync_data_using_repair(db, ms, keyspace_name, std::move(desired_ranges), std::move(range_sources), reason).get(); + sync_data_using_repair(db, ms, keyspace_name, std::move(desired_ranges), std::move(range_sources), reason, {}).get(); rlogger.info("bootstrap_with_repair: finished with keyspace={}, nr_ranges={}", keyspace_name, nr_ranges); } rlogger.info("bootstrap_with_repair: finished with keyspaces={}", keyspaces); }); } -static future<> do_decommission_removenode_with_repair(seastar::sharded& db, seastar::sharded& ms, locator::token_metadata_ptr tmptr, gms::inet_address leaving_node) { +static future<> do_decommission_removenode_with_repair(seastar::sharded& db, seastar::sharded& ms, locator::token_metadata_ptr tmptr, gms::inet_address leaving_node, 
shared_ptr ops) { using inet_address = gms::inet_address; - return seastar::async([&db, &ms, tmptr = std::move(tmptr), leaving_node = std::move(leaving_node)] () mutable { + return seastar::async([&db, &ms, tmptr = std::move(tmptr), leaving_node = std::move(leaving_node), ops] () mutable { auto myip = utils::fb_utilities::get_broadcast_address(); auto keyspaces = db.local().get_non_system_keyspaces(); bool is_removenode = myip != leaving_node; @@ -1978,6 +1999,9 @@ static future<> do_decommission_removenode_with_repair(seastar::shardedcheck_abort(); + } auto end_token = r.end() ? r.end()->value() : dht::maximum_token(); const std::vector new_eps = ks.get_replication_strategy().calculate_natural_endpoints(end_token, temp, utils::can_yield::yes); const std::vector& current_eps = current_replica_endpoints[r]; @@ -2059,6 +2083,12 @@ static future<> do_decommission_removenode_with_repair(seastar::shardedignore_nodes) { + neighbors_set.erase(node); + } + } auto neighbors = boost::copy_range>(neighbors_set | boost::adaptors::filtered([&local_dc, &snitch_ptr] (const gms::inet_address& node) { return snitch_ptr->get_datacenter(node) == local_dc; @@ -2070,9 +2100,10 @@ static future<> do_decommission_removenode_with_repair(seastar::sharded mandatory_neighbors = is_removenode ? 
neighbors : std::vector{}; + rlogger.info("{}: keyspace={}, range={}, current_replica_endpoints={}, new_replica_endpoints={}, neighbors={}, mandatory_neighbor={}", + op, keyspace_name, r, current_eps, new_eps, neighbors, mandatory_neighbors); + range_sources[r] = repair_neighbors(std::move(neighbors), std::move(mandatory_neighbors)); if (is_removenode) { ranges_for_removenode.push_back(r); } @@ -2091,7 +2122,7 @@ static future<> do_decommission_removenode_with_repair(seastar::shardedops_uuid).get(); rlogger.info("{}: finished with keyspace={}, leaving_node={}, nr_ranges={}, nr_ranges_synced={}, nr_ranges_skipped={}", op, keyspace_name, leaving_node, nr_ranges_total, nr_ranges_synced, nr_ranges_skipped); } @@ -2100,11 +2131,17 @@ static future<> do_decommission_removenode_with_repair(seastar::sharded decommission_with_repair(seastar::sharded& db, seastar::sharded& ms, locator::token_metadata_ptr tmptr) { - return do_decommission_removenode_with_repair(db, ms, std::move(tmptr), utils::fb_utilities::get_broadcast_address()); + return do_decommission_removenode_with_repair(db, ms, std::move(tmptr), utils::fb_utilities::get_broadcast_address(), {}); } -future<> removenode_with_repair(seastar::sharded& db, seastar::sharded& ms, locator::token_metadata_ptr tmptr, gms::inet_address leaving_node) { - return do_decommission_removenode_with_repair(db, ms, std::move(tmptr), std::move(leaving_node)); +future<> removenode_with_repair(seastar::sharded& db, seastar::sharded& ms, locator::token_metadata_ptr tmptr, gms::inet_address leaving_node, shared_ptr ops) { + return do_decommission_removenode_with_repair(db, ms, std::move(tmptr), std::move(leaving_node), std::move(ops)); +} + +future<> abort_repair_node_ops(utils::UUID ops_uuid) { + return smp::invoke_on_all([ops_uuid] { + return repair_tracker().abort_repair_node_ops(ops_uuid); + }); } static future<> do_rebuild_replace_with_repair(seastar::sharded& db, seastar::sharded& ms, locator::token_metadata_ptr tmptr, sstring op, 
sstring source_dc, streaming::stream_reason reason) { @@ -2179,7 +2216,7 @@ static future<> do_rebuild_replace_with_repair(seastar::sharded& db, s }).get(); } auto nr_ranges = ranges.size(); - sync_data_using_repair(db, ms, keyspace_name, std::move(ranges), std::move(range_sources), reason).get(); + sync_data_using_repair(db, ms, keyspace_name, std::move(ranges), std::move(range_sources), reason, {}).get(); rlogger.info("{}: finished with keyspace={}, source_dc={}, nr_ranges={}", op, keyspace_name, source_dc, nr_ranges); } rlogger.info("{}: finished with keyspaces={}, source_dc={}", op, keyspaces, source_dc); @@ -2218,12 +2255,19 @@ static future<> init_messaging_service_handler(sharded& db, sharded("src_cpu_id"); + auto coordinator = cinfo.retrieve_auxiliary("baddr"); + return smp::submit_to(src_cpu_id % smp::count, [coordinator, req = std::move(req)] () mutable { + return service::get_local_storage_service().node_ops_cmd_handler(coordinator, std::move(req)); + }); + }); }); } static future<> uninit_messaging_service_handler() { return _messaging->invoke_on_all([] (auto& ms) { - return ms.unregister_repair_checksum_range(); + return when_all_succeed(ms.unregister_repair_checksum_range(), ms.unregister_node_ops_cmd()).discard_result(); }); } diff --git a/repair/repair.hh b/repair/repair.hh index 2f62b5cc69..13a0b6b392 100644 --- a/repair/repair.hh +++ b/repair/repair.hh @@ -76,13 +76,22 @@ struct repair_uniq_id { }; std::ostream& operator<<(std::ostream& os, const repair_uniq_id& x); +struct node_ops_info { + utils::UUID ops_uuid; + bool abort = false; + std::list ignore_nodes; + void check_abort(); +}; + // The tokens are the tokens assigned to the bootstrap node. 
future<> bootstrap_with_repair(seastar::sharded& db, seastar::sharded& ms, locator::token_metadata_ptr tmptr, std::unordered_set bootstrap_tokens); future<> decommission_with_repair(seastar::sharded& db, seastar::sharded& ms, locator::token_metadata_ptr tmptr); -future<> removenode_with_repair(seastar::sharded& db, seastar::sharded& ms, locator::token_metadata_ptr tmptr, gms::inet_address leaving_node); +future<> removenode_with_repair(seastar::sharded& db, seastar::sharded& ms, locator::token_metadata_ptr tmptr, gms::inet_address leaving_node, shared_ptr ops); future<> rebuild_with_repair(seastar::sharded& db, seastar::sharded& ms, locator::token_metadata_ptr tmptr, sstring source_dc); future<> replace_with_repair(seastar::sharded& db, seastar::sharded& ms, locator::token_metadata_ptr tmptr, std::unordered_set replacing_tokens); +future<> abort_repair_node_ops(utils::UUID ops_uuid); + // NOTE: repair_start() can be run on any node, but starts a node-global // operation. // repair_start() starts the requested repair on this node. It returns an @@ -244,6 +253,7 @@ public: bool _row_level_repair; uint64_t _sub_ranges_nr = 0; std::unordered_set dropped_tables; + std::optional _ops_uuid; public: repair_info(seastar::sharded& db_, seastar::sharded& ms_, @@ -253,7 +263,8 @@ public: repair_uniq_id id_, const std::vector& data_centers_, const std::vector& hosts_, - streaming::stream_reason reason_); + streaming::stream_reason reason_, + std::optional ops_uuid); future<> do_streaming(); void check_failed_ranges(); future<> request_transfer_ranges(const sstring& cf, @@ -272,6 +283,9 @@ public: const std::vector& table_names() { return cfs; } + const std::optional& ops_uuid() const { + return _ops_uuid; + }; }; // The repair_tracker tracks ongoing repair operations and their progress. 
@@ -324,6 +338,7 @@ public: future<> run(repair_uniq_id id, std::function func); future repair_await_completion(int id, std::chrono::steady_clock::time_point timeout); float report_progress(streaming::stream_reason reason); + void abort_repair_node_ops(utils::UUID ops_uuid); }; future estimate_partitions(seastar::sharded& db, const sstring& keyspace, @@ -464,6 +479,27 @@ enum class row_level_diff_detect_algorithm : uint8_t { std::ostream& operator<<(std::ostream& out, row_level_diff_detect_algorithm algo); +enum class node_ops_cmd : uint32_t { + removenode_prepare, + removenode_heartbeat, + removenode_sync_data, + removenode_abort, + removenode_done, +}; + +// The cmd and ops_uuid are mandatory for each request. +// The ignore_nodes and leaving_nodes are optional. +struct node_ops_cmd_request { + node_ops_cmd cmd; + utils::UUID ops_uuid; + std::list ignore_nodes; + std::list leaving_nodes; +}; + +struct node_ops_cmd_response { + bool ok; +}; + namespace std { template<> struct hash { diff --git a/service/storage_service.cc b/service/storage_service.cc index 0b3cbe00d9..b46707f1ac 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -107,6 +107,7 @@ storage_service::storage_service(abort_source& abort_source, distributed storage_service::gossip_sharder() { future<> storage_service::stop() { // make sure nobody uses the semaphore + node_ops_singal_abort(std::nullopt); return _service_memory_limiter.wait(_service_memory_total).finally([this] { _listeners.clear(); return _schema_version_publisher.join(); + }).finally([this] { + return std::move(_node_ops_abort_thread); }); } @@ -2163,102 +2167,192 @@ future<> storage_service::decommission() { }); } -future<> storage_service::removenode(sstring host_id_string) { - return run_with_api_lock(sstring("removenode"), [host_id_string] (storage_service& ss) mutable { - return seastar::async([&ss, host_id_string] { - slogger.debug("removenode: host_id = {}", host_id_string); - auto my_address = 
ss.get_broadcast_address(); - auto tmlock = std::make_unique(ss.get_token_metadata_lock().get0()); - auto tmptr = ss.get_mutable_token_metadata_ptr().get0(); - auto local_host_id = tmptr->get_host_id(my_address); +future<> storage_service::removenode(sstring host_id_string, std::list ignore_nodes) { + return run_with_api_lock(sstring("removenode"), [host_id_string, ignore_nodes = std::move(ignore_nodes)] (storage_service& ss) mutable { + return seastar::async([&ss, host_id_string, ignore_nodes = std::move(ignore_nodes)] { + auto uuid = utils::make_random_uuid(); + auto tmptr = ss.get_token_metadata_ptr(); auto host_id = utils::UUID(host_id_string); auto endpoint_opt = tmptr->get_endpoint_for_host_id(host_id); if (!endpoint_opt) { - throw std::runtime_error("Host ID not found."); + throw std::runtime_error(format("removenode[{}]: Host ID not found in the cluster", uuid)); } auto endpoint = *endpoint_opt; - auto tokens = tmptr->get_tokens(endpoint); + auto leaving_nodes = std::list{endpoint}; - slogger.debug("removenode: endpoint = {}", endpoint); + future<> heartbeat_updater = make_ready_future<>(); + auto heartbeat_updater_done = make_lw_shared(false); - if (endpoint == my_address) { - throw std::runtime_error("Cannot remove self"); + // Step 1: Decide who needs to sync data + // + // By default, we require all nodes in the cluster to participate in + // the removenode operation and sync data if needed. We fail the + // removenode operation if any of them is down or fails. + // + // If the user wants the removenode operation to succeed even if some of the nodes + // are not available, the user has to explicitly pass a list of + // nodes that can be skipped for the operation. 
+ std::vector nodes; + for (const auto& x : tmptr->get_endpoint_to_host_id_map_for_reading()) { + seastar::thread::maybe_yield(); + if (x.first != endpoint && std::find(ignore_nodes.begin(), ignore_nodes.end(), x.first) == ignore_nodes.end()) { + nodes.push_back(x.first); + } } + slogger.info("removenode[{}]: Started removenode operation, removing node={}, sync_nodes={}, ignore_nodes={}", uuid, endpoint, nodes, ignore_nodes); - if (ss._gossiper.get_live_members().contains(endpoint)) { - throw std::runtime_error(format("Node {} is alive and owns this ID. Use decommission command to remove it from the ring", endpoint)); - } - - // A leaving endpoint that is dead is already being removed. - if (tmptr->is_leaving(endpoint)) { - slogger.warn("Node {} is already being removed, continuing removal anyway", endpoint); - } - - if (!ss._replicating_nodes.empty()) { - throw std::runtime_error("This node is already processing a removal. Wait for it to complete, or use 'removenode force' if this has failed."); - } - - auto non_system_keyspaces = ss.db().local().get_non_system_keyspaces(); - // Find the endpoints that are going to become responsible for data - for (const auto& keyspace_name : non_system_keyspaces) { - auto& ks = ss.db().local().find_keyspace(keyspace_name); - // if the replication factor is 1 the data is lost so we shouldn't wait for confirmation - if (ks.get_replication_strategy().get_replication_factor() == 1) { - slogger.warn("keyspace={} has replication factor 1, the data is probably lost", keyspace_name); - continue; + // Step 2: Prepare to sync data + std::unordered_set nodes_unknown_verb; + std::unordered_set nodes_down; + auto req = node_ops_cmd_request{node_ops_cmd::removenode_prepare, uuid, ignore_nodes, leaving_nodes}; + try { + parallel_for_each(nodes, [&ss, &req, &nodes_unknown_verb, &nodes_down, uuid] (const gms::inet_address& node) { + return ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response 
resp) { + slogger.debug("removenode[{}]: Got prepare response from node={}", uuid, node); + }).handle_exception_type([&nodes_unknown_verb, node, uuid] (seastar::rpc::unknown_verb_error&) { + slogger.warn("removenode[{}]: Node {} does not support removenode verb", uuid, node); + nodes_unknown_verb.emplace(node); + }).handle_exception_type([&nodes_down, node, uuid] (seastar::rpc::closed_error&) { + slogger.warn("removenode[{}]: Node {} is down for node_ops_cmd verb", uuid, node); + nodes_down.emplace(node); + }); + }).get(); + if (!nodes_unknown_verb.empty()) { + auto msg = format("removenode[{}]: Nodes={} do not support removenode verb. Please upgrade your cluster and run removenode again.", uuid, nodes_unknown_verb); + slogger.warn("{}", msg); + throw std::runtime_error(msg); + } + if (!nodes_down.empty()) { + auto msg = format("removenode[{}]: Nodes={} needed for removenode operation are down. It is highly recommended to fix the down nodes and try again. To proceed with best-effort mode which might cause data inconsistency, run nodetool removenode --ignore-dead-nodes . 
E.g., nodetool removenode --ignore-dead-nodes 127.0.0.1,127.0.0.2 817e9515-316f-4fe3-aaab-b00d6f12dddd", uuid, nodes_down); + slogger.warn("{}", msg); + throw std::runtime_error(msg); } - // get all ranges that change ownership (that is, a node needs - // to take responsibility for new range) - std::unordered_multimap changed_ranges = - ss.get_changed_ranges_for_leaving(keyspace_name, endpoint); - for (auto& x: changed_ranges) { - auto ep = x.second; - if (ss._gossiper.is_alive(ep)) { - ss._replicating_nodes.emplace(ep); - } else { - slogger.warn("Endpoint {} is down and will not receive data for re-replication of {}", ep, endpoint); + // Step 3: Start heartbeat updater + heartbeat_updater = seastar::async([&ss, &nodes, uuid, heartbeat_updater_done] { + slogger.debug("removenode[{}]: Started heartbeat_updater", uuid); + while (!(*heartbeat_updater_done)) { + auto req = node_ops_cmd_request{node_ops_cmd::removenode_heartbeat, uuid, {}, {}}; + parallel_for_each(nodes, [&ss, &req, uuid] (const gms::inet_address& node) { + return ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) { + slogger.debug("removenode[{}]: Got heartbeat response from node={}", uuid, node); + return make_ready_future<>(); + }); + }).handle_exception([uuid] (std::exception_ptr ep) { + slogger.warn("removenode[{}]: Failed to send heartbeat", uuid); + }).get(); + int nr_seconds = 10; + while (!(*heartbeat_updater_done) && nr_seconds--) { + sleep_abortable(std::chrono::seconds(1), ss._abort_source).get(); + } } + slogger.debug("removenode[{}]: Stopped heartbeat_updater", uuid); + }); + auto stop_heartbeat_updater = defer([&] { + *heartbeat_updater_done = true; + heartbeat_updater.get(); + }); + + // Step 4: Start to sync data + req.cmd = node_ops_cmd::removenode_sync_data; + parallel_for_each(nodes, [&ss, &req, uuid] (const gms::inet_address& node) { + return ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, 
node] (node_ops_cmd_response resp) { + slogger.debug("removenode[{}]: Got sync_data response from node={}", uuid, node); + return make_ready_future<>(); + }); + }).get(); + + + // Step 5: Announce the node has left + std::unordered_set tmp(tokens.begin(), tokens.end()); + ss.excise(std::move(tmp), endpoint); + ss._gossiper.advertise_token_removed(endpoint, host_id).get(); + + // Step 6: Finish + req.cmd = node_ops_cmd::removenode_done; + parallel_for_each(nodes, [&ss, &req, uuid] (const gms::inet_address& node) { + return ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) { + slogger.debug("removenode[{}]: Got done response from node={}", uuid, node); + return make_ready_future<>(); + }); + }).get(); + slogger.info("removenode[{}]: Finished removenode operation, removing node={}, sync_nodes={}, ignore_nodes={}", uuid, endpoint, nodes, ignore_nodes); + } catch (...) { + // We need to revert the effect of the prepare verb if the removenode operation has failed + req.cmd = node_ops_cmd::removenode_abort; + parallel_for_each(nodes, [&ss, &req, &nodes_unknown_verb, &nodes_down, uuid] (const gms::inet_address& node) { + if (nodes_unknown_verb.contains(node) || nodes_down.contains(node)) { + // No need to revert previous prepare cmd for those who do not apply prepare cmd. 
+ return make_ready_future<>(); + } + return ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) { + slogger.debug("removenode[{}]: Got abort response from node={}", uuid, node); + }); + }).get(); + slogger.info("removenode[{}]: Aborted removenode operation, removing node={}, sync_nodes={}, ignore_nodes={}", uuid, endpoint, nodes, ignore_nodes); + throw; + } + }); + }); +} + +/* Executes one node-operation command received from coordinator. The request is forwarded to shard 0 and processed inside seastar::async, which is why the body can block with .get(). On success the response has ok = true; an unknown command, a prepare that lists more than one leaving node, or a sync_data for an unregistered ops_uuid throws std::runtime_error. */ future storage_service::node_ops_cmd_handler(gms::inet_address coordinator, node_ops_cmd_request req) { + return get_storage_service().invoke_on(0, [coordinator, req = std::move(req)] (auto& ss) mutable { + return seastar::async([&ss, coordinator, req = std::move(req)] () mutable { + auto ops_uuid = req.ops_uuid; + slogger.debug("node_ops_cmd_handler cmd={}, ops_uuid={}", uint32_t(req.cmd), ops_uuid); + /* prepare: flag the leaving node as leaving in token metadata and refresh pending ranges, then register the operation under ops_uuid */ if (req.cmd == node_ops_cmd::removenode_prepare) { + if (req.leaving_nodes.size() > 1) { + auto msg = format("removenode[{}]: Could not removenode more than one node at a time: leaving_nodes={}", req.ops_uuid, req.leaving_nodes); + slogger.warn("{}", msg); + throw std::runtime_error(msg); } + ss.mutate_token_metadata([coordinator, &req, &ss] (mutable_token_metadata_ptr tmptr) mutable { + for (auto& node : req.leaving_nodes) { + slogger.info("removenode[{}]: Added node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator); + tmptr->add_leaving_endpoint(node); + } + return ss.update_pending_ranges(tmptr, format("removenode {}", req.leaving_nodes)); + }).get(); + /* ignore_nodes is moved out of req here; what remains of req is moved into the abort callback on the next line */ auto ops = seastar::make_shared(node_ops_info{ops_uuid, false, std::move(req.ignore_nodes)}); + auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(ops), [&ss, coordinator, req = std::move(req)] () mutable { + /* abort callback: take the leaving node(s) back out of token metadata and refresh pending ranges */ return ss.mutate_token_metadata([&ss, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable { + for (auto& node : req.leaving_nodes) { + slogger.info("removenode[{}]: Removed node={} as leaving node, coordinator={}", 
req.ops_uuid, node, coordinator); + tmptr->del_leaving_endpoint(node); + } + return ss.update_pending_ranges(tmptr, format("removenode {}", req.leaving_nodes)); + }); + }, + /* signal callback: enqueue this ops_uuid for node_ops_abort_thread */ [&ss, ops_uuid] () mutable { ss.node_ops_singal_abort(ops_uuid); }); + ss._node_ops.emplace(ops_uuid, std::move(meta)); + } else if (req.cmd == node_ops_cmd::removenode_heartbeat) { + /* heartbeat: re-arms the watchdog of the registered operation, if there is one */ slogger.debug("removenode[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator); + ss.node_ops_update_heartbeat(ops_uuid); + } else if (req.cmd == node_ops_cmd::removenode_done) { + /* done: stops the watchdog and forgets the operation */ slogger.info("removenode[{}]: Marked ops done from coordinator={}", req.ops_uuid, coordinator); + ss.node_ops_done(ops_uuid); + } else if (req.cmd == node_ops_cmd::removenode_sync_data) { + /* sync_data: needs the entry created by prepare; runs removenode_with_repair for each leaving node and waits for it */ auto it = ss._node_ops.find(ops_uuid); + if (it == ss._node_ops.end()) { + throw std::runtime_error(format("removenode[{}]: Can not find ops_uuid={}", ops_uuid, ops_uuid)); + } + auto ops = it->second.get_ops_info(); + for (auto& node : req.leaving_nodes) { + slogger.info("removenode[{}]: Started to sync data for removing node={}, coordinator={}", req.ops_uuid, node, coordinator); + removenode_with_repair(ss._db, ss._messaging, ss.get_token_metadata_ptr(), node, ops).get(); + } + } else if (req.cmd == node_ops_cmd::removenode_abort) { + /* abort: runs the abort callback, aborts the repair and drops the entry (done by node_ops_abort) */ ss.node_ops_abort(ops_uuid); + } else { + auto msg = format("node_ops_cmd_handler: ops_uuid={}, unknown cmd={}", req.ops_uuid, uint32_t(req.cmd)); + slogger.warn("{}", msg); + throw std::runtime_error(msg); } - slogger.info("removenode: endpoint = {}, replicating_nodes = {}", endpoint, ss._replicating_nodes); - ss._removing_node = endpoint; - tmptr->add_leaving_endpoint(endpoint); - ss.update_pending_ranges(tmptr, format("removenode {}", endpoint)).get(); - ss.replicate_to_all_cores(std::move(tmptr)).get(); - tmlock.reset(); - - // the gossiper will handle spoofing this node's state to REMOVING_TOKEN for us - // we add our own token so other nodes to let us know when they're done - 
ss._gossiper.advertise_removing(endpoint, host_id, local_host_id).get(); - - // kick off streaming commands - // No need to wait for restore_replica_count to complete, since - // when it completes, the node will be removed from _replicating_nodes, - // and we wait for _replicating_nodes to become empty below - //FIXME: discarded future. - (void)ss.restore_replica_count(endpoint, my_address).handle_exception([endpoint, my_address] (auto ep) { - slogger.info("Failed to restore_replica_count for node {} on node {}", endpoint, my_address); - }); - - // wait for ReplicationFinishedVerbHandler to signal we're done - while (!(ss._replicating_nodes.empty() || ss._force_remove_completion)) { - sleep_abortable(std::chrono::milliseconds(100), ss._abort_source).get(); - } - - if (ss._force_remove_completion) { - throw std::runtime_error("nodetool removenode force is called by user"); - } - - std::unordered_set tmp(tokens.begin(), tokens.end()); - ss.excise(std::move(tmp), endpoint); - - // gossiper will indicate the token has left - ss._gossiper.advertise_token_removed(endpoint, host_id).get(); - - ss._replicating_nodes.clear(); - ss._removing_node = std::nullopt; + /* reached only when no branch above threw */ node_ops_cmd_response resp; + resp.ok = true; + return resp; }); }); } @@ -2499,7 +2593,9 @@ void storage_service::unbootstrap() { future<> storage_service::restore_replica_count(inet_address endpoint, inet_address notify_endpoint) { if (is_repair_based_node_ops_enabled()) { - return removenode_with_repair(_db, _messaging, get_token_metadata_ptr(), endpoint).finally([this, notify_endpoint] () { + /* removenode_with_repair now takes a node_ops_info; build one with a fresh random uuid and an empty list */ auto ops_uuid = utils::make_random_uuid(); + auto ops = seastar::make_shared(node_ops_info{ops_uuid, false, std::list()}); + return removenode_with_repair(_db, _messaging, get_token_metadata_ptr(), endpoint, ops).finally([this, notify_endpoint] () { return send_replication_notification(notify_endpoint); }); } @@ -3214,5 +3310,111 @@ bool storage_service::is_repair_based_node_ops_enabled() { return 
_db.local().get_config().enable_repair_based_node_ops(); } +/* Builds the bookkeeping entry for one node operation and arms the watchdog right away; if the watchdog is not re-armed or cancelled within _watchdog_interval, the signal callback is invoked. */ node_ops_meta_data::node_ops_meta_data( + utils::UUID ops_uuid, + gms::inet_address coordinator, + shared_ptr ops, + std::function ()> abort_func, + std::function signal_func) + : _ops_uuid(std::move(ops_uuid)) + , _coordinator(std::move(coordinator)) + , _abort(std::move(abort_func)) + , _signal(std::move(signal_func)) + , _ops(std::move(ops)) + , _watchdog([sig = _signal] { sig(); }) { + _watchdog.arm(_watchdog_interval); +} + +/* Flags the operation as aborted (also on the shared node_ops_info when present), stops the watchdog and returns the abort callback's future. */ future<> node_ops_meta_data::abort() { + slogger.debug("node_ops_meta_data: ops_uuid={} abort", _ops_uuid); + _aborted = true; + if (_ops) { + _ops->abort = true; + } + _watchdog.cancel(); + return _abort(); +} + +/* Restarts the watchdog countdown; does nothing once abort() has run. */ void node_ops_meta_data::update_watchdog() { + slogger.debug("node_ops_meta_data: ops_uuid={} update_watchdog", _ops_uuid); + if (_aborted) { + return; + } + _watchdog.cancel(); + _watchdog.arm(_watchdog_interval); +} + +/* Stops the watchdog without touching the aborted flag. */ void node_ops_meta_data::cancel_watchdog() { + slogger.debug("node_ops_meta_data: ops_uuid={} cancel_watchdog", _ops_uuid); + _watchdog.cancel(); +} + +/* Returns the shared node_ops_info handed to the constructor; abort() null-checks it, so callers should too. */ shared_ptr node_ops_meta_data::get_ops_info() { + return _ops; +} + +/* Re-arms the watchdog of a registered operation; an unknown ops_uuid is ignored. Blocks on the semaphore, so it must run in a seastar thread (the visible caller runs inside seastar::async). */ void storage_service::node_ops_update_heartbeat(utils::UUID ops_uuid) { + slogger.debug("node_ops_update_heartbeat: ops_uuid={}", ops_uuid); + /* get_units() returns a future; wait for it so the unit is really held until the end of this scope */ auto permit = seastar::get_units(_node_ops_abort_sem, 1).get0(); + auto it = _node_ops.find(ops_uuid); + if (it != _node_ops.end()) { + node_ops_meta_data& meta = it->second; + meta.update_watchdog(); + } +} + +/* Cancels the watchdog and removes the entry; an unknown ops_uuid is ignored. Blocks on the semaphore, so it must run in a seastar thread. */ void storage_service::node_ops_done(utils::UUID ops_uuid) { + slogger.debug("node_ops_done: ops_uuid={}", ops_uuid); + /* wait for the unit: an abort that is still running holds an iterator into _node_ops across its .get() calls */ auto permit = seastar::get_units(_node_ops_abort_sem, 1).get0(); + auto it = _node_ops.find(ops_uuid); + if (it != _node_ops.end()) { + node_ops_meta_data& meta = it->second; + meta.cancel_watchdog(); + _node_ops.erase(it); + } +} + +/* Runs the abort callback, aborts the matching repair and removes the entry; an unknown ops_uuid is ignored. Blocks on futures, so it must run in a seastar thread. */ void storage_service::node_ops_abort(utils::UUID ops_uuid) { + slogger.debug("node_ops_abort: ops_uuid={}", ops_uuid); + auto permit = 
seastar::get_units(_node_ops_abort_sem, 1).get0(); /* held across the .get() calls below so that a second abort, a heartbeat or a done for the same entry waits instead of racing with the erase */ + auto it = _node_ops.find(ops_uuid); + if (it != _node_ops.end()) { + node_ops_meta_data& meta = it->second; + meta.abort().get(); + abort_repair_node_ops(ops_uuid).get(); + _node_ops.erase(it); + } +} + +/* Queues ops_uuid (or an empty optional, which makes the abort thread return) and wakes node_ops_abort_thread. Does not block. */ void storage_service::node_ops_singal_abort(std::optional ops_uuid) { + slogger.debug("node_ops_singal_abort: ops_uuid={}", ops_uuid); + _node_ops_abort_queue.push_back(ops_uuid); + _node_ops_abort_cond.signal(); +} + +/* Background loop that drains _node_ops_abort_queue and calls node_ops_abort for each entry; a failing abort is logged and the loop carries on. The returned future resolves when an empty optional is dequeued. */ future<> storage_service::node_ops_abort_thread() { + return seastar::async([this] { + slogger.info("Started node_ops_abort_thread"); + for (;;) { + _node_ops_abort_cond.wait([this] { return !_node_ops_abort_queue.empty(); }).get(); + slogger.debug("Awoke node_ops_abort_thread: node_ops_abort_queue={}", _node_ops_abort_queue); + while (!_node_ops_abort_queue.empty()) { + auto uuid_opt = _node_ops_abort_queue.front(); + _node_ops_abort_queue.pop_front(); + if (!uuid_opt) { + return; + } + try { + storage_service::node_ops_abort(*uuid_opt); + } catch (...) 
{ + slogger.warn("Failed to abort node operation ops_uuid={}: {}", *uuid_opt, std::current_exception()); + } + } + } + /* NOTE(review): the loop above only exits through the return statement, so this line looks unreachable */ slogger.info("Stopped node_ops_abort_thread"); + }); +} + + } // namespace service diff --git a/service/storage_service.hh b/service/storage_service.hh index 2c609c89f5..717b055a26 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -63,6 +63,12 @@ #include #include "sstables/version.hh" #include "cdc/metadata.hh" +#include +#include + +class node_ops_cmd_request; +class node_ops_cmd_response; +class node_ops_info; namespace cql_transport { class controller; } @@ -103,6 +109,28 @@ struct storage_service_config { size_t available_memory; }; +/* Bookkeeping for one node operation on a participating node: the abort and signal callbacks, the shared node_ops_info and a watchdog timer armed with a 30 second interval. */ class node_ops_meta_data { + utils::UUID _ops_uuid; + gms::inet_address _coordinator; + std::function ()> _abort; + std::function _signal; + shared_ptr _ops; + seastar::timer _watchdog; + std::chrono::seconds _watchdog_interval{30}; + bool _aborted = false; +public: + explicit node_ops_meta_data( + utils::UUID ops_uuid, + gms::inet_address coordinator, + shared_ptr ops, + std::function ()> abort_func, + std::function signal_func); + shared_ptr get_ops_info(); + future<> abort(); + void update_watchdog(); + void cancel_watchdog(); +}; + /** * This abstraction contains the token/identifier of this node * on the identifier space. This token gets gossiped around. @@ -158,6 +186,17 @@ private: * and would only slow down tests (by having them wait). 
*/ bool _for_testing; + + std::unordered_map _node_ops; + std::list> _node_ops_abort_queue; + seastar::condition_variable _node_ops_abort_cond; + named_semaphore _node_ops_abort_sem{1, named_semaphore_exception_factory{"node_ops_abort_sem"}}; + future<> _node_ops_abort_thread; + void node_ops_update_heartbeat(utils::UUID ops_uuid); + void node_ops_done(utils::UUID ops_uuid); + void node_ops_abort(utils::UUID ops_uuid); + void node_ops_singal_abort(std::optional ops_uuid); + future<> node_ops_abort_thread(); public: storage_service(abort_source& as, distributed& db, gms::gossiper& gossiper, sharded&, sharded&, gms::feature_service& feature_service, storage_service_config config, sharded& mn, locator::shared_token_metadata& stm, sharded& ms, /* only for tests */ bool for_testing = false); @@ -767,7 +806,8 @@ public: * * @param hostIdString token for the node */ - future<> removenode(sstring host_id_string); + future<> removenode(sstring host_id_string, std::list ignore_nodes); + future node_ops_cmd_handler(gms::inet_address coordinator, node_ops_cmd_request req); future get_operation_mode();