From 05aa07835d3c7079ea846ba38d6c04441b2b90db Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Thu, 25 May 2023 10:24:14 +0300 Subject: [PATCH] storage_service: delete code that handled REMOVING_TOKENS state The state is never advertised so the code is never used. --- api/api-doc/messaging_service.json | 2 +- gms/gossiper.hh | 2 - gms/versioned_value.cc | 1 - gms/versioned_value.hh | 1 - message/messaging_service.cc | 14 +-- message/messaging_service.hh | 7 +- service/storage_service.cc | 156 +---------------------------- service/storage_service.hh | 18 ---- 8 files changed, 4 insertions(+), 197 deletions(-) diff --git a/api/api-doc/messaging_service.json b/api/api-doc/messaging_service.json index 442ebeef2c..9d6a39a7b9 100644 --- a/api/api-doc/messaging_service.json +++ b/api/api-doc/messaging_service.json @@ -245,7 +245,7 @@ "GOSSIP_SHUTDOWN", "DEFINITIONS_UPDATE", "TRUNCATE", - "REPLICATION_FINISHED", + "UNUSED__REPLICATION_FINISHED", "MIGRATION_REQUEST", "PREPARE_MESSAGE", "PREPARE_DONE_MESSAGE", diff --git a/gms/gossiper.hh b/gms/gossiper.hh index 050ee6e974..86690b13cf 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -162,12 +162,10 @@ private: public: const std::vector DEAD_STATES = { - versioned_value::REMOVING_TOKEN, versioned_value::REMOVED_TOKEN, versioned_value::STATUS_LEFT, }; const std::vector SILENT_SHUTDOWN_STATES = { - versioned_value::REMOVING_TOKEN, versioned_value::REMOVED_TOKEN, versioned_value::STATUS_LEFT, versioned_value::HIBERNATE, diff --git a/gms/versioned_value.cc b/gms/versioned_value.cc index 23fec70b56..af270b6e09 100644 --- a/gms/versioned_value.cc +++ b/gms/versioned_value.cc @@ -27,7 +27,6 @@ constexpr const char* versioned_value::STATUS_NORMAL; constexpr const char* versioned_value::STATUS_LEAVING; constexpr const char* versioned_value::STATUS_LEFT; constexpr const char* versioned_value::STATUS_MOVING; -constexpr const char* versioned_value::REMOVING_TOKEN; constexpr const char* versioned_value::REMOVED_TOKEN; constexpr const char* versioned_value::HIBERNATE; constexpr const char* versioned_value::SHUTDOWN; diff --git a/gms/versioned_value.hh b/gms/versioned_value.hh index f14e258c16..6b1253eab7 100644 --- a/gms/versioned_value.hh +++ b/gms/versioned_value.hh @@ -51,7 +51,6 @@ public: static constexpr const char* STATUS_LEFT = "LEFT"; static constexpr const char* STATUS_MOVING = "MOVING"; - static constexpr const char* REMOVING_TOKEN = "removing"; static constexpr const char* REMOVED_TOKEN = "removed"; static constexpr const char* HIBERNATE = "hibernate"; diff --git a/message/messaging_service.cc b/message/messaging_service.cc index a5576f2650..d432140811 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -523,7 +523,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) { case messaging_verb::UNUSED__STREAM_MUTATION: case messaging_verb::STREAM_MUTATION_DONE: case messaging_verb::COMPLETE_MESSAGE: - case messaging_verb::REPLICATION_FINISHED: + case messaging_verb::UNUSED__REPLICATION_FINISHED: case messaging_verb::UNUSED__REPAIR_CHECKSUM_RANGE: case messaging_verb::STREAM_MUTATION_FRAGMENTS: case messaging_verb::REPAIR_ROW_LEVEL_START: @@ -1183,18 +1183,6 @@ future messaging_service::send_schema_check(msg_addr dst, return send_message_cancellable(this, netw::messaging_verb::SCHEMA_CHECK, dst, as); } -// Wrapper for REPLICATION_FINISHED -void messaging_service::register_replication_finished(std::function (inet_address)>&& func) { - register_handler(this, messaging_verb::REPLICATION_FINISHED, std::move(func)); -} -future<> messaging_service::unregister_replication_finished() { - return unregister_handler(messaging_verb::REPLICATION_FINISHED); -} -future<> messaging_service::send_replication_finished(msg_addr id, inet_address from) { - // FIXME: getRpcTimeout : conf.request_timeout_in_ms - return send_message_timeout(this, messaging_verb::REPLICATION_FINISHED, std::move(id), 10000ms, std::move(from)); -} - // Wrapper for REPAIR_GET_FULL_ROW_HASHES void messaging_service::register_repair_get_full_row_hashes(std::function (const rpc::client_info& cinfo, uint32_t repair_meta_id)>&& func) { register_handler(this, messaging_verb::REPAIR_GET_FULL_ROW_HASHES, std::move(func)); diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 32d92f927d..fd6ad3b904 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -128,7 +128,7 @@ enum class messaging_verb : int32_t { // end of gossip verb DEFINITIONS_UPDATE = 11, TRUNCATE = 12, - REPLICATION_FINISHED = 13, + UNUSED__REPLICATION_FINISHED = 13, MIGRATION_REQUEST = 14, // Used by streaming PREPARE_MESSAGE = 15, @@ -496,11 +496,6 @@ public: future send_schema_check(msg_addr); future send_schema_check(msg_addr, abort_source&); - // Wrapper for REPLICATION_FINISHED verb - void register_replication_finished(std::function (inet_address from)>&& func); - future<> unregister_replication_finished(); - future<> send_replication_finished(msg_addr id, inet_address from); - void foreach_server_connection_stats(std::function&& f) const; private: template diff --git a/service/storage_service.cc b/service/storage_service.cc index 25024d4b86..12f17179c9 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2447,47 +2447,6 @@ future<> storage_service::handle_state_removing(inet_address endpoint, std::vect if (sstring(gms::versioned_value::REMOVED_TOKEN) == state) { std::unordered_set tmp(remove_tokens.begin(), remove_tokens.end()); co_await excise(std::move(tmp), endpoint, extract_expire_time(pieces)); - } else if (sstring(gms::versioned_value::REMOVING_TOKEN) == state) { - co_await mutate_token_metadata([this, remove_tokens = std::move(remove_tokens), endpoint] (mutable_token_metadata_ptr tmptr) mutable { - slogger.debug("Tokens {} removed manually (endpoint was {})", remove_tokens, endpoint); - // Note that the endpoint is being removed - tmptr->add_leaving_endpoint(endpoint); - return update_topology_change_info(std::move(tmptr), ::format("handle_state_removing {}", endpoint)); - }); - // find the endpoint coordinating this removal that we need to notify when we're done - auto* value = _gossiper.get_application_state_ptr(endpoint, application_state::REMOVAL_COORDINATOR); - if (!value) { - auto err = ::format("Can not find application_state for endpoint={}", endpoint); - slogger.warn("{}", err); - throw std::runtime_error(err); - } - std::vector coordinator; - boost::split(coordinator, value->value(), boost::is_any_of(sstring(versioned_value::DELIMITER_STR))); - if (coordinator.size() != 2) { - auto err = ::format("Can not split REMOVAL_COORDINATOR for endpoint={}, value={}", endpoint, value->value()); - slogger.warn("{}", err); - throw std::runtime_error(err); - } - auto host_id = locator::host_id(utils::UUID(coordinator[1])); - // grab any data we are now responsible for and notify responsible node - auto ep = get_token_metadata().get_endpoint_for_host_id(host_id); - if (!ep) { - auto err = ::format("Can not find host_id={}", host_id); - slogger.warn("{}", err); - throw std::runtime_error(err); - } - // Kick off streaming commands. No need to wait for - // restore_replica_count to complete which can take a long time, - // since when it completes, this node will send notification to - // tell the removal_coordinator with IP address notify_endpoint - // that the restore process is finished on this node. - auto notify_endpoint = ep.value(); - // OK to discard future since _async_gate is closed on stop() - (void)with_gate(_async_gate, [this, endpoint, notify_endpoint] { - return restore_replica_count(endpoint, notify_endpoint).handle_exception([endpoint, notify_endpoint] (auto ep) { - slogger.warn("Failed to restore_replica_count for node {}, notify_endpoint={} : {}", endpoint, notify_endpoint, ep); - }); - }); } } else { // now that the gossiper has told us about this nonexistent member, notify the gossiper to remove it if (sstring(gms::versioned_value::REMOVED_TOKEN) == pieces[0]) { @@ -2549,8 +2508,7 @@ future<> storage_service::on_change(inet_address endpoint, application_state sta } else if (move_name == sstring(versioned_value::STATUS_NORMAL) || move_name == sstring(versioned_value::SHUTDOWN)) { co_await handle_state_normal(endpoint); - } else if (move_name == sstring(versioned_value::REMOVING_TOKEN) || - move_name == sstring(versioned_value::REMOVED_TOKEN)) { + } else if (move_name == sstring(versioned_value::REMOVED_TOKEN)) { co_await handle_state_removing(endpoint, pieces); } else if (move_name == sstring(versioned_value::STATUS_LEAVING)) { co_await handle_state_leaving(endpoint); @@ -4641,83 +4599,6 @@ future<> storage_service::removenode_with_stream(gms::inet_address leaving_node, }); } -future<> storage_service::restore_replica_count(inet_address endpoint, inet_address notify_endpoint) { - _abort_source.check(); - // Allocate a shared abort_source for node_ops_info - auto sas = make_shared(); - auto sub = _abort_source.subscribe([sas] () noexcept { - if (!sas->abort_requested()) { - sas->request_abort(); - } - }); - if (is_repair_based_node_ops_enabled(streaming::stream_reason::removenode)) { - auto ops_uuid = node_ops_id::create_random_id(); - auto ops = seastar::make_shared(ops_uuid, sas, std::list()); - auto f = co_await coroutine::as_future(_repair.local().removenode_with_repair(get_token_metadata_ptr(), endpoint, ops)); - co_await send_replication_notification(notify_endpoint); - co_return co_await std::move(f); - } - - auto tmptr = get_token_metadata_ptr(); - auto& as = *sas; - auto streamer = make_lw_shared(_db, _stream_manager, tmptr, as, get_broadcast_address(), _sys_ks.local().local_dc_rack(), "Restore_replica_count", streaming::stream_reason::removenode); - removenode_add_ranges(streamer, endpoint).get(); - auto check_status_loop = [this, endpoint, &as] () -> future<> { - slogger.debug("restore_replica_count: Started status checker for removing node {}", endpoint); - while (!as.abort_requested()) { - auto status = _gossiper.get_gossip_status(endpoint); - // If the node to be removed is already in removed status, it has - // probably been removed forcely with `nodetool removenode force`. - // Abort the restore_replica_count in such case to avoid streaming - // attempt since the user has removed the node forcely. - if (status == sstring(versioned_value::REMOVED_TOKEN)) { - slogger.info("restore_replica_count: Detected node {} has left the cluster, status={}, abort restore_replica_count for removing node {}", - endpoint, status, endpoint); - if (!as.abort_requested()) { - as.request_abort(); - } - co_return; - } - slogger.debug("restore_replica_count: Sleep and detect removing node {}, status={}", endpoint, status); - co_await sleep_abortable(std::chrono::seconds(10), as); - } - }; - auto status_checker = check_status_loop(); - std::exception_ptr ex; - try { - co_await streamer->stream_async(); - } catch (...) { - ex = std::current_exception(); - slogger.debug("Streaming to restore replica count failed: {}.", ex); - // We still want to send the notification - } - try { - co_await this->send_replication_notification(notify_endpoint); - } catch (...) { - auto ex2 = std::current_exception(); - slogger.debug("Sending replication notification to {} failed: {}", notify_endpoint, ex2); - if (!ex) { - ex = std::move(ex2); - } - } - try { - slogger.debug("restore_replica_count: Started to stop status checker for removing node {}", endpoint); - if (!as.abort_requested()) { - as.request_abort(); - } - co_await std::move(status_checker); - } catch (const seastar::sleep_aborted& ignored) { - slogger.debug("restore_replica_count: Got sleep_abort to stop status checker for removing node {}: {}", endpoint, ignored); - } catch (...) { - slogger.warn("restore_replica_count: Found error in status checker for removing node {}: {}", - endpoint, std::current_exception()); - } - slogger.debug("restore_replica_count: Finished to stop status checker for removing node {}", endpoint); - if (ex) { - co_await coroutine::return_exception_ptr(std::move(ex)); - } -} - future<> storage_service::excise(std::unordered_set tokens, inet_address endpoint) { slogger.info("Removing tokens {} for {}", tokens, endpoint); // FIXME: HintedHandOffManager.instance.deleteHintsForEndpoint(endpoint); @@ -4739,35 +4620,6 @@ future<> storage_service::excise(std::unordered_set tokens, inet_address return excise(tokens, endpoint); } -future<> storage_service::send_replication_notification(inet_address remote) { - // notify the remote token - auto done = make_shared(false); - auto local = get_broadcast_address(); - auto sent = make_lw_shared(0); - slogger.debug("Notifying {} of replication completion", remote); - return do_until( - [this, done, sent, remote] { - // The node can send REPLICATION_FINISHED to itself, in which case - // is_alive will be true. If the messaging_service is stopped, - // REPLICATION_FINISHED can be sent infinitely here. To fix, limit - // the number of retries. - return *done || !_gossiper.is_alive(remote) || *sent >= 3; - }, - [this, done, sent, remote, local] { - netw::msg_addr id{remote, 0}; - (*sent)++; - return _messaging.local().send_replication_finished(id, local).then_wrapped([id, done] (auto&& f) { - try { - f.get(); - *done = true; - } catch (...) { - slogger.warn("Fail to send REPLICATION_FINISHED to {}: {}", id, std::current_exception()); - } - }); - } - ); -} - future<> storage_service::leave_ring() { co_await _sys_ks.local().set_bootstrap_state(db::system_keyspace::bootstrap_state::NEEDS_BOOTSTRAP); co_await mutate_token_metadata([this] (mutable_token_metadata_ptr tmptr) { @@ -5183,11 +5035,6 @@ future storage_service::raft_topology_cmd_handler(shar } void storage_service::init_messaging_service(sharded& proxy, sharded& sys_dist_ks) { - _messaging.local().register_replication_finished([] (gms::inet_address from) { - slogger.info("Got confirm_replication from {}", from); - return make_ready_future<>(); - }); - _messaging.local().register_node_ops_cmd([this] (const rpc::client_info& cinfo, node_ops_cmd_request req) { auto coordinator = cinfo.retrieve_auxiliary("baddr"); return container().invoke_on(0, [coordinator, req = std::move(req)] (auto& ss) mutable { @@ -5263,7 +5110,6 @@ void storage_service::init_messaging_service(sharded& pr future<> storage_service::uninit_messaging_service() { return when_all_succeed( - _messaging.local().unregister_replication_finished(), _messaging.local().unregister_node_ops_cmd(), ser::storage_service_rpc_verbs::unregister(&_messaging.local()) ).discard_result(); diff --git a/service/storage_service.hh b/service/storage_service.hh index 700c6c97e1..70af06db0a 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -562,24 +562,6 @@ private: */ future> get_new_source_ranges(locator::vnode_effective_replication_map_ptr erm, const dht::token_range_vector& ranges) const; - /** - * Sends a notification to a node indicating we have finished replicating data. - * - * @param remote node to send notification to - */ - future<> send_replication_notification(inet_address remote); - - /** - * Called when an endpoint is removed from the ring. This function checks - * whether this node becomes responsible for new ranges as a - * consequence and streams data if needed. - * - * This is rather ineffective, but it does not matter so much - * since this is called very seldom - * - * @param endpoint the node that left - */ - future<> restore_replica_count(inet_address endpoint, inet_address notify_endpoint); future<> removenode_with_stream(gms::inet_address leaving_node, shared_ptr as_ptr); future<> removenode_add_ranges(lw_shared_ptr streamer, gms::inet_address leaving_node);