diff --git a/alternator/executor.cc b/alternator/executor.cc index 2c6293fa13..d37fcddcf3 100644 --- a/alternator/executor.cc +++ b/alternator/executor.cc @@ -828,17 +828,6 @@ future executor::list_tags_of_resource(client_sta return make_ready_future(make_jsonable(std::move(ret))); } -static future<> wait_for_schema_agreement(service::migration_manager& mm, db::timeout_clock::time_point deadline) { - return do_until([&mm, deadline] { - if (db::timeout_clock::now() > deadline) { - throw std::runtime_error("Unable to reach schema agreement"); - } - return mm.have_schema_agreement(); - }, [] { - return seastar::sleep(500ms); - }); -} - static void verify_billing_mode(const rjson::value& request) { // Alternator does not yet support billing or throughput limitations, but // let's verify that BillingMode is at least legal. @@ -1084,7 +1073,7 @@ static future create_table_on_shard0(tracing::tra } co_await mm.announce(std::move(schema_mutations), std::move(group0_guard)); - co_await wait_for_schema_agreement(mm, db::timeout_clock::now() + 10s); + co_await mm.wait_for_schema_agreement(sp.local_db(), db::timeout_clock::now() + 10s, nullptr); rjson::value status = rjson::empty_object(); executor::supplement_table_info(request, *schema, sp); rjson::add(status, "TableDescription", std::move(request)); @@ -1151,7 +1140,7 @@ future executor::update_table(client_state& clien co_await mm.announce(std::move(m), std::move(group0_guard)); - co_await wait_for_schema_agreement(mm, db::timeout_clock::now() + 10s); + co_await mm.wait_for_schema_agreement(p.local().local_db(), db::timeout_clock::now() + 10s, nullptr); rjson::value status = rjson::empty_object(); supplement_table_info(request, *schema, p.local()); diff --git a/api/api-doc/messaging_service.json b/api/api-doc/messaging_service.json index 442ebeef2c..9d6a39a7b9 100644 --- a/api/api-doc/messaging_service.json +++ b/api/api-doc/messaging_service.json @@ -245,7 +245,7 @@ "GOSSIP_SHUTDOWN", "DEFINITIONS_UPDATE", "TRUNCATE", - 
"REPLICATION_FINISHED", + "UNUSED__REPLICATION_FINISHED", "MIGRATION_REQUEST", "PREPARE_MESSAGE", "PREPARE_DONE_MESSAGE", diff --git a/auth/common.cc b/auth/common.cc index 00784aa503..7158b99b1f 100644 --- a/auth/common.cc +++ b/auth/common.cc @@ -84,20 +84,6 @@ future<> create_metadata_table_if_missing( return futurize_invoke(create_metadata_table_if_missing_impl, table_name, qp, cql, mm); } -future<> wait_for_schema_agreement(::service::migration_manager& mm, const replica::database& db, seastar::abort_source& as) { - static const auto pause = [] { return sleep(std::chrono::milliseconds(500)); }; - - return do_until([&db, &as] { - as.check(); - return db.get_version() != replica::database::empty_version; - }, pause).then([&mm, &as] { - return do_until([&mm, &as] { - as.check(); - return mm.have_schema_agreement(); - }, pause); - }); -} - ::service::query_state& internal_distributed_query_state() noexcept { #ifdef DEBUG // Give the much slower debug tests more headroom for completing auth queries. diff --git a/auth/common.hh b/auth/common.hh index 071112cc08..bdaa98d930 100644 --- a/auth/common.hh +++ b/auth/common.hh @@ -67,8 +67,6 @@ future<> create_metadata_table_if_missing( std::string_view cql, ::service::migration_manager&) noexcept; -future<> wait_for_schema_agreement(::service::migration_manager&, const replica::database&, seastar::abort_source&); - /// /// Time-outs for internal, non-local CQL queries. 
/// diff --git a/auth/default_authorizer.cc b/auth/default_authorizer.cc index 5216e14b68..6c71b1772b 100644 --- a/auth/default_authorizer.cc +++ b/auth/default_authorizer.cc @@ -129,7 +129,7 @@ future<> default_authorizer::start() { _migration_manager).then([this] { _finished = do_after_system_ready(_as, [this] { return async([this] { - wait_for_schema_agreement(_migration_manager, _qp.db().real_database(), _as).get0(); + _migration_manager.wait_for_schema_agreement(_qp.db().real_database(), db::timeout_clock::time_point::max(), &_as).get0(); if (legacy_metadata_exists()) { if (!any_granted().get0()) { diff --git a/auth/password_authenticator.cc b/auth/password_authenticator.cc index b5ab045d2f..d6b66b5198 100644 --- a/auth/password_authenticator.cc +++ b/auth/password_authenticator.cc @@ -132,7 +132,7 @@ future<> password_authenticator::start() { _stopped = do_after_system_ready(_as, [this] { return async([this] { - wait_for_schema_agreement(_migration_manager, _qp.db().real_database(), _as).get0(); + _migration_manager.wait_for_schema_agreement(_qp.db().real_database(), db::timeout_clock::time_point::max(), &_as).get0(); if (any_nondefault_role_row_satisfies(_qp, &has_salted_hash).get0()) { if (legacy_metadata_exists()) { diff --git a/auth/standard_role_manager.cc b/auth/standard_role_manager.cc index 3706be66e3..eaf4bf6e5f 100644 --- a/auth/standard_role_manager.cc +++ b/auth/standard_role_manager.cc @@ -28,6 +28,7 @@ #include "log.hh" #include "utils/class_registrator.hh" #include "replica/database.hh" +#include "service/migration_manager.hh" namespace auth { @@ -232,7 +233,7 @@ future<> standard_role_manager::start() { return this->create_metadata_tables_if_missing().then([this] { _stopped = auth::do_after_system_ready(_as, [this] { return seastar::async([this] { - wait_for_schema_agreement(_migration_manager, _qp.db().real_database(), _as).get0(); + _migration_manager.wait_for_schema_agreement(_qp.db().real_database(), db::timeout_clock::time_point::max(), 
&_as).get0(); if (any_nondefault_role_row_satisfies(_qp, &has_can_login).get0()) { if (this->legacy_metadata_exists()) { diff --git a/db/view/view.cc b/db/view/view.cc index 672447e60d..9ef0a72ba8 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1794,9 +1794,8 @@ future<> view_builder::start(service::migration_manager& mm) { // or `on_update_view` events. auto units = get_units(_sem, 1).get0(); // Wait for schema agreement even if we're a seed node. - while (!mm.have_schema_agreement()) { - seastar::sleep_abortable(500ms, _as).get(); - } + mm.wait_for_schema_agreement(_db, db::timeout_clock::time_point::max(), &_as).get(); + auto built = _sys_ks.load_built_views().get0(); auto in_progress = _sys_ks.load_view_build_progress().get0(); setup_shard_build_step(vbi, std::move(built), std::move(in_progress)); diff --git a/gms/gossiper.cc b/gms/gossiper.cc index ae25d586d4..1b60644fc9 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -1159,30 +1159,6 @@ future<> gossiper::replicate(inet_address ep, application_state key, const versi }); } -future<> gossiper::advertise_removing(inet_address endpoint, locator::host_id host_id, locator::host_id local_host_id) { - auto& state = get_endpoint_state(endpoint); - // remember this node's generation - auto generation = state.get_heart_beat_state().get_generation(); - logger.info("Removing host: {}", host_id); - auto ring_delay = std::chrono::milliseconds(_gcfg.ring_delay_ms); - logger.info("Sleeping for {}ms to ensure {} does not change", ring_delay.count(), endpoint); - co_await sleep_abortable(ring_delay, _abort_source); - // make sure it did not change - auto& eps = get_endpoint_state(endpoint); - if (eps.get_heart_beat_state().get_generation() != generation) { - throw std::runtime_error(format("Endpoint {} generation changed while trying to remove it", endpoint)); - } - - // update the other node's generation to mimic it as if it had changed it itself - logger.info("Advertising removal for {}", endpoint); - 
eps.update_timestamp(); // make sure we don't evict it too soon - eps.get_heart_beat_state().force_newer_generation_unsafe(); - eps.add_application_state(application_state::STATUS, versioned_value::removing_nonlocal(host_id)); - eps.add_application_state(application_state::REMOVAL_COORDINATOR, versioned_value::removal_coordinator(local_host_id)); - _endpoint_state_map[endpoint] = eps; - co_await replicate(endpoint, eps); -} - future<> gossiper::advertise_token_removed(inet_address endpoint, locator::host_id host_id) { auto& eps = get_endpoint_state(endpoint); eps.update_timestamp(); // make sure we don't evict it too soon diff --git a/gms/gossiper.hh b/gms/gossiper.hh index aa2c626ac5..86690b13cf 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -162,12 +162,10 @@ private: public: const std::vector DEAD_STATES = { - versioned_value::REMOVING_TOKEN, versioned_value::REMOVED_TOKEN, versioned_value::STATUS_LEFT, }; const std::vector SILENT_SHUTDOWN_STATES = { - versioned_value::REMOVING_TOKEN, versioned_value::REMOVED_TOKEN, versioned_value::STATUS_LEFT, versioned_value::HIBERNATE, @@ -324,16 +322,6 @@ private: void make_random_gossip_digest(utils::chunked_vector& g_digests); public: - /** - * This method will begin removing an existing endpoint from the cluster by spoofing its state - * This should never be called unless this coordinator has had 'removenode' invoked - * - * @param endpoint - the endpoint being removed - * @param host_id - the ID of the host being removed - * @param local_host_id - my own host ID for replication coordination - */ - future<> advertise_removing(inet_address endpoint, locator::host_id host_id, locator::host_id local_host_id); - /** * Handles switching the endpoint's state from REMOVING_TOKEN to REMOVED_TOKEN * This should only be called after advertise_removing diff --git a/gms/versioned_value.cc b/gms/versioned_value.cc index 23fec70b56..af270b6e09 100644 --- a/gms/versioned_value.cc +++ b/gms/versioned_value.cc @@ -27,7 +27,6 @@ 
constexpr const char* versioned_value::STATUS_NORMAL; constexpr const char* versioned_value::STATUS_LEAVING; constexpr const char* versioned_value::STATUS_LEFT; constexpr const char* versioned_value::STATUS_MOVING; -constexpr const char* versioned_value::REMOVING_TOKEN; constexpr const char* versioned_value::REMOVED_TOKEN; constexpr const char* versioned_value::HIBERNATE; constexpr const char* versioned_value::SHUTDOWN; diff --git a/gms/versioned_value.hh b/gms/versioned_value.hh index 2763d378b2..6b1253eab7 100644 --- a/gms/versioned_value.hh +++ b/gms/versioned_value.hh @@ -51,7 +51,6 @@ public: static constexpr const char* STATUS_LEFT = "LEFT"; static constexpr const char* STATUS_MOVING = "MOVING"; - static constexpr const char* REMOVING_TOKEN = "removing"; static constexpr const char* REMOVED_TOKEN = "removed"; static constexpr const char* HIBERNATE = "hibernate"; @@ -157,11 +156,6 @@ public: return versioned_value(make_cdc_generation_id_string(gen_id)); } - static versioned_value removing_nonlocal(const locator::host_id& host_id) { - return versioned_value(sstring(REMOVING_TOKEN) + - sstring(DELIMITER_STR) + host_id.to_sstring()); - } - static versioned_value removed_nonlocal(const locator::host_id& host_id, int64_t expire_time) { return versioned_value(sstring(REMOVED_TOKEN) + sstring(DELIMITER_STR) + host_id.to_sstring() + sstring(DELIMITER_STR) + to_sstring(expire_time)); diff --git a/message/messaging_service.cc b/message/messaging_service.cc index a5576f2650..d432140811 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -523,7 +523,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) { case messaging_verb::UNUSED__STREAM_MUTATION: case messaging_verb::STREAM_MUTATION_DONE: case messaging_verb::COMPLETE_MESSAGE: - case messaging_verb::REPLICATION_FINISHED: + case messaging_verb::UNUSED__REPLICATION_FINISHED: case messaging_verb::UNUSED__REPAIR_CHECKSUM_RANGE: case messaging_verb::STREAM_MUTATION_FRAGMENTS: 
case messaging_verb::REPAIR_ROW_LEVEL_START: @@ -1183,18 +1183,6 @@ future messaging_service::send_schema_check(msg_addr dst, return send_message_cancellable(this, netw::messaging_verb::SCHEMA_CHECK, dst, as); } -// Wrapper for REPLICATION_FINISHED -void messaging_service::register_replication_finished(std::function (inet_address)>&& func) { - register_handler(this, messaging_verb::REPLICATION_FINISHED, std::move(func)); -} -future<> messaging_service::unregister_replication_finished() { - return unregister_handler(messaging_verb::REPLICATION_FINISHED); -} -future<> messaging_service::send_replication_finished(msg_addr id, inet_address from) { - // FIXME: getRpcTimeout : conf.request_timeout_in_ms - return send_message_timeout(this, messaging_verb::REPLICATION_FINISHED, std::move(id), 10000ms, std::move(from)); -} - // Wrapper for REPAIR_GET_FULL_ROW_HASHES void messaging_service::register_repair_get_full_row_hashes(std::function (const rpc::client_info& cinfo, uint32_t repair_meta_id)>&& func) { register_handler(this, messaging_verb::REPAIR_GET_FULL_ROW_HASHES, std::move(func)); diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 207f1da6cc..0b6cf2f1f8 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -126,7 +126,7 @@ enum class messaging_verb : int32_t { // end of gossip verb DEFINITIONS_UPDATE = 11, TRUNCATE = 12, - REPLICATION_FINISHED = 13, + UNUSED__REPLICATION_FINISHED = 13, MIGRATION_REQUEST = 14, // Used by streaming PREPARE_MESSAGE = 15, @@ -494,11 +494,6 @@ public: future send_schema_check(msg_addr); future send_schema_check(msg_addr, abort_source&); - // Wrapper for REPLICATION_FINISHED verb - void register_replication_finished(std::function (inet_address from)>&& func); - future<> unregister_replication_finished(); - future<> send_replication_finished(msg_addr id, inet_address from); - void foreach_server_connection_stats(std::function&& f) const; private: template diff --git 
a/service/migration_manager.cc b/service/migration_manager.cc index 5e54d2341f..62d6fe7dcd 100644 --- a/service/migration_manager.cc +++ b/service/migration_manager.cc @@ -239,6 +239,18 @@ bool migration_manager::have_schema_agreement() { return match; } +future<> migration_manager::wait_for_schema_agreement(const replica::database& db, db::timeout_clock::time_point deadline, seastar::abort_source* as) { /* Polls until db.get_version() is no longer replica::database::empty_version and have_schema_agreement() returns true. While that condition is unmet, throws std::runtime_error as soon as the timeout clock is past `deadline`. `as` may be nullptr, in which case the wait cannot be aborted. */ + while (db.get_version() == replica::database::empty_version || !have_schema_agreement()) { + if (as) { + as->check(); /* NOTE(review): presumably throws if an abort was already requested - confirm against seastar::abort_source */ + } + if (db::timeout_clock::now() > deadline) { + throw std::runtime_error("Unable to reach schema agreement"); + } + co_await (as ? sleep_abortable(std::chrono::milliseconds(500), *as) : sleep(std::chrono::milliseconds(500))); /* re-evaluate the condition every 500ms; the sleep is abortable only when `as` is supplied */ + } +} + /** * If versions differ this node sends request with local migration list to the endpoint * and expecting to receive a list of migrations to apply locally. diff --git a/service/migration_manager.hh b/service/migration_manager.hh index b0798b50c9..5778873e54 100644 --- a/service/migration_manager.hh +++ b/service/migration_manager.hh @@ -172,6 +172,7 @@ public: * Known peers in the cluster have the same schema version as us. 
*/ bool have_schema_agreement(); + future<> wait_for_schema_agreement(const replica::database& db, db::timeout_clock::time_point deadline, seastar::abort_source* as); void init_messaging_service(); diff --git a/service/storage_service.cc b/service/storage_service.cc index 25024d4b86..0a8f249b76 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2425,12 +2425,9 @@ void storage_service::handle_state_moving(inet_address endpoint, std::vector storage_service::handle_state_removing(inet_address endpoint, std::vector pieces) { - slogger.debug("endpoint={} handle_state_removing", endpoint); - if (pieces.empty()) { - slogger.warn("Fail to handle_state_removing endpoint={} pieces={}", endpoint, pieces); - co_return; - } +future<> storage_service::handle_state_removed(inet_address endpoint, std::vector pieces) { + slogger.debug("endpoint={} handle_state_removed", endpoint); + if (endpoint == get_broadcast_address()) { slogger.info("Received removenode gossip about myself. 
Is this node rejoining after an explicit removenode?"); try { @@ -2444,55 +2441,10 @@ future<> storage_service::handle_state_removing(inet_address endpoint, std::vect if (get_token_metadata().is_normal_token_owner(endpoint)) { auto state = pieces[0]; auto remove_tokens = get_token_metadata().get_tokens(endpoint); - if (sstring(gms::versioned_value::REMOVED_TOKEN) == state) { - std::unordered_set tmp(remove_tokens.begin(), remove_tokens.end()); - co_await excise(std::move(tmp), endpoint, extract_expire_time(pieces)); - } else if (sstring(gms::versioned_value::REMOVING_TOKEN) == state) { - co_await mutate_token_metadata([this, remove_tokens = std::move(remove_tokens), endpoint] (mutable_token_metadata_ptr tmptr) mutable { - slogger.debug("Tokens {} removed manually (endpoint was {})", remove_tokens, endpoint); - // Note that the endpoint is being removed - tmptr->add_leaving_endpoint(endpoint); - return update_topology_change_info(std::move(tmptr), ::format("handle_state_removing {}", endpoint)); - }); - // find the endpoint coordinating this removal that we need to notify when we're done - auto* value = _gossiper.get_application_state_ptr(endpoint, application_state::REMOVAL_COORDINATOR); - if (!value) { - auto err = ::format("Can not find application_state for endpoint={}", endpoint); - slogger.warn("{}", err); - throw std::runtime_error(err); - } - std::vector coordinator; - boost::split(coordinator, value->value(), boost::is_any_of(sstring(versioned_value::DELIMITER_STR))); - if (coordinator.size() != 2) { - auto err = ::format("Can not split REMOVAL_COORDINATOR for endpoint={}, value={}", endpoint, value->value()); - slogger.warn("{}", err); - throw std::runtime_error(err); - } - auto host_id = locator::host_id(utils::UUID(coordinator[1])); - // grab any data we are now responsible for and notify responsible node - auto ep = get_token_metadata().get_endpoint_for_host_id(host_id); - if (!ep) { - auto err = ::format("Can not find host_id={}", host_id); - 
slogger.warn("{}", err); - throw std::runtime_error(err); - } - // Kick off streaming commands. No need to wait for - // restore_replica_count to complete which can take a long time, - // since when it completes, this node will send notification to - // tell the removal_coordinator with IP address notify_endpoint - // that the restore process is finished on this node. - auto notify_endpoint = ep.value(); - // OK to discard future since _async_gate is closed on stop() - (void)with_gate(_async_gate, [this, endpoint, notify_endpoint] { - return restore_replica_count(endpoint, notify_endpoint).handle_exception([endpoint, notify_endpoint] (auto ep) { - slogger.warn("Failed to restore_replica_count for node {}, notify_endpoint={} : {}", endpoint, notify_endpoint, ep); - }); - }); - } + std::unordered_set tmp(remove_tokens.begin(), remove_tokens.end()); + co_await excise(std::move(tmp), endpoint, extract_expire_time(pieces)); } else { // now that the gossiper has told us about this nonexistent member, notify the gossiper to remove it - if (sstring(gms::versioned_value::REMOVED_TOKEN) == pieces[0]) { - add_expire_time_if_found(endpoint, extract_expire_time(pieces)); - } + add_expire_time_if_found(endpoint, extract_expire_time(pieces)); co_await remove_endpoint(endpoint); } } @@ -2543,21 +2495,20 @@ future<> storage_service::on_change(inet_address endpoint, application_state sta slogger.warn("Fail to split status in on_change: endpoint={}, app_state={}, value={}", endpoint, state, value); co_return; } - sstring move_name = pieces[0]; + const sstring& move_name = pieces[0]; if (move_name == sstring(versioned_value::STATUS_BOOTSTRAPPING)) { co_await handle_state_bootstrap(endpoint); } else if (move_name == sstring(versioned_value::STATUS_NORMAL) || move_name == sstring(versioned_value::SHUTDOWN)) { co_await handle_state_normal(endpoint); - } else if (move_name == sstring(versioned_value::REMOVING_TOKEN) || - move_name == sstring(versioned_value::REMOVED_TOKEN)) { - co_await 
handle_state_removing(endpoint, pieces); + } else if (move_name == sstring(versioned_value::REMOVED_TOKEN)) { + co_await handle_state_removed(endpoint, std::move(pieces)); } else if (move_name == sstring(versioned_value::STATUS_LEAVING)) { co_await handle_state_leaving(endpoint); } else if (move_name == sstring(versioned_value::STATUS_LEFT)) { - co_await handle_state_left(endpoint, pieces); + co_await handle_state_left(endpoint, std::move(pieces)); } else if (move_name == sstring(versioned_value::STATUS_MOVING)) { - handle_state_moving(endpoint, pieces); + handle_state_moving(endpoint, std::move(pieces)); } else if (move_name == sstring(versioned_value::HIBERNATE)) { slogger.warn("endpoint={} went into HIBERNATE state, this is no longer supported. Use a new version to perform the replace operation.", endpoint); } else { @@ -4641,83 +4592,6 @@ future<> storage_service::removenode_with_stream(gms::inet_address leaving_node, }); } -future<> storage_service::restore_replica_count(inet_address endpoint, inet_address notify_endpoint) { - _abort_source.check(); - // Allocate a shared abort_source for node_ops_info - auto sas = make_shared(); - auto sub = _abort_source.subscribe([sas] () noexcept { - if (!sas->abort_requested()) { - sas->request_abort(); - } - }); - if (is_repair_based_node_ops_enabled(streaming::stream_reason::removenode)) { - auto ops_uuid = node_ops_id::create_random_id(); - auto ops = seastar::make_shared(ops_uuid, sas, std::list()); - auto f = co_await coroutine::as_future(_repair.local().removenode_with_repair(get_token_metadata_ptr(), endpoint, ops)); - co_await send_replication_notification(notify_endpoint); - co_return co_await std::move(f); - } - - auto tmptr = get_token_metadata_ptr(); - auto& as = *sas; - auto streamer = make_lw_shared(_db, _stream_manager, tmptr, as, get_broadcast_address(), _sys_ks.local().local_dc_rack(), "Restore_replica_count", streaming::stream_reason::removenode); - removenode_add_ranges(streamer, endpoint).get(); - auto 
check_status_loop = [this, endpoint, &as] () -> future<> { - slogger.debug("restore_replica_count: Started status checker for removing node {}", endpoint); - while (!as.abort_requested()) { - auto status = _gossiper.get_gossip_status(endpoint); - // If the node to be removed is already in removed status, it has - // probably been removed forcely with `nodetool removenode force`. - // Abort the restore_replica_count in such case to avoid streaming - // attempt since the user has removed the node forcely. - if (status == sstring(versioned_value::REMOVED_TOKEN)) { - slogger.info("restore_replica_count: Detected node {} has left the cluster, status={}, abort restore_replica_count for removing node {}", - endpoint, status, endpoint); - if (!as.abort_requested()) { - as.request_abort(); - } - co_return; - } - slogger.debug("restore_replica_count: Sleep and detect removing node {}, status={}", endpoint, status); - co_await sleep_abortable(std::chrono::seconds(10), as); - } - }; - auto status_checker = check_status_loop(); - std::exception_ptr ex; - try { - co_await streamer->stream_async(); - } catch (...) { - ex = std::current_exception(); - slogger.debug("Streaming to restore replica count failed: {}.", ex); - // We still want to send the notification - } - try { - co_await this->send_replication_notification(notify_endpoint); - } catch (...) { - auto ex2 = std::current_exception(); - slogger.debug("Sending replication notification to {} failed: {}", notify_endpoint, ex2); - if (!ex) { - ex = std::move(ex2); - } - } - try { - slogger.debug("restore_replica_count: Started to stop status checker for removing node {}", endpoint); - if (!as.abort_requested()) { - as.request_abort(); - } - co_await std::move(status_checker); - } catch (const seastar::sleep_aborted& ignored) { - slogger.debug("restore_replica_count: Got sleep_abort to stop status checker for removing node {}: {}", endpoint, ignored); - } catch (...) 
{ - slogger.warn("restore_replica_count: Found error in status checker for removing node {}: {}", - endpoint, std::current_exception()); - } - slogger.debug("restore_replica_count: Finished to stop status checker for removing node {}", endpoint); - if (ex) { - co_await coroutine::return_exception_ptr(std::move(ex)); - } -} - future<> storage_service::excise(std::unordered_set tokens, inet_address endpoint) { slogger.info("Removing tokens {} for {}", tokens, endpoint); // FIXME: HintedHandOffManager.instance.deleteHintsForEndpoint(endpoint); @@ -4739,35 +4613,6 @@ future<> storage_service::excise(std::unordered_set tokens, inet_address return excise(tokens, endpoint); } -future<> storage_service::send_replication_notification(inet_address remote) { - // notify the remote token - auto done = make_shared(false); - auto local = get_broadcast_address(); - auto sent = make_lw_shared(0); - slogger.debug("Notifying {} of replication completion", remote); - return do_until( - [this, done, sent, remote] { - // The node can send REPLICATION_FINISHED to itself, in which case - // is_alive will be true. If the messaging_service is stopped, - // REPLICATION_FINISHED can be sent infinitely here. To fix, limit - // the number of retries. - return *done || !_gossiper.is_alive(remote) || *sent >= 3; - }, - [this, done, sent, remote, local] { - netw::msg_addr id{remote, 0}; - (*sent)++; - return _messaging.local().send_replication_finished(id, local).then_wrapped([id, done] (auto&& f) { - try { - f.get(); - *done = true; - } catch (...) 
{ - slogger.warn("Fail to send REPLICATION_FINISHED to {}: {}", id, std::current_exception()); - } - }); - } - ); -} - future<> storage_service::leave_ring() { co_await _sys_ks.local().set_bootstrap_state(db::system_keyspace::bootstrap_state::NEEDS_BOOTSTRAP); co_await mutate_token_metadata([this] (mutable_token_metadata_ptr tmptr) { @@ -5183,11 +5028,6 @@ future storage_service::raft_topology_cmd_handler(shar } void storage_service::init_messaging_service(sharded& proxy, sharded& sys_dist_ks) { - _messaging.local().register_replication_finished([] (gms::inet_address from) { - slogger.info("Got confirm_replication from {}", from); - return make_ready_future<>(); - }); - _messaging.local().register_node_ops_cmd([this] (const rpc::client_info& cinfo, node_ops_cmd_request req) { auto coordinator = cinfo.retrieve_auxiliary("baddr"); return container().invoke_on(0, [coordinator, req = std::move(req)] (auto& ss) mutable { @@ -5263,7 +5103,6 @@ void storage_service::init_messaging_service(sharded& pr future<> storage_service::uninit_messaging_service() { return when_all_succeed( - _messaging.local().unregister_replication_finished(), _messaging.local().unregister_node_ops_cmd(), ser::storage_service_rpc_verbs::unregister(&_messaging.local()) ).discard_result(); diff --git a/service/storage_service.hh b/service/storage_service.hh index 700c6c97e1..3894bb90e4 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -533,9 +533,9 @@ private: * Handle notification that a node being actively removed from the ring via 'removenode' * * @param endpoint node - * @param pieces either REMOVED_TOKEN (node is gone) or REMOVING_TOKEN (replicas need to be restored) + * @param pieces is REMOVED_TOKEN (node is gone) */ - future<> handle_state_removing(inet_address endpoint, std::vector pieces); + future<> handle_state_removed(inet_address endpoint, std::vector pieces); future<> handle_state_replacing_update_pending_ranges(mutable_token_metadata_ptr tmptr, inet_address 
replacing_node); @@ -562,24 +562,6 @@ private: */ future> get_new_source_ranges(locator::vnode_effective_replication_map_ptr erm, const dht::token_range_vector& ranges) const; - /** - * Sends a notification to a node indicating we have finished replicating data. - * - * @param remote node to send notification to - */ - future<> send_replication_notification(inet_address remote); - - /** - * Called when an endpoint is removed from the ring. This function checks - * whether this node becomes responsible for new ranges as a - * consequence and streams data if needed. - * - * This is rather ineffective, but it does not matter so much - * since this is called very seldom - * - * @param endpoint the node that left - */ - future<> restore_replica_count(inet_address endpoint, inet_address notify_endpoint); future<> removenode_with_stream(gms::inet_address leaving_node, shared_ptr as_ptr); future<> removenode_add_ranges(lw_shared_ptr streamer, gms::inet_address leaving_node);