storage_service: delete code that handled REMOVING_TOKENS state

The state is never advertised so the code is never used.
This commit is contained in:
Gleb Natapov
2023-05-25 10:24:14 +03:00
parent 66ff072540
commit 05aa07835d
8 changed files with 4 additions and 197 deletions

View File

@@ -245,7 +245,7 @@
"GOSSIP_SHUTDOWN",
"DEFINITIONS_UPDATE",
"TRUNCATE",
"REPLICATION_FINISHED",
"UNUSED__REPLICATION_FINISHED",
"MIGRATION_REQUEST",
"PREPARE_MESSAGE",
"PREPARE_DONE_MESSAGE",

View File

@@ -162,12 +162,10 @@ private:
public:
const std::vector<sstring> DEAD_STATES = {
versioned_value::REMOVING_TOKEN,
versioned_value::REMOVED_TOKEN,
versioned_value::STATUS_LEFT,
};
const std::vector<sstring> SILENT_SHUTDOWN_STATES = {
versioned_value::REMOVING_TOKEN,
versioned_value::REMOVED_TOKEN,
versioned_value::STATUS_LEFT,
versioned_value::HIBERNATE,

View File

@@ -27,7 +27,6 @@ constexpr const char* versioned_value::STATUS_NORMAL;
constexpr const char* versioned_value::STATUS_LEAVING;
constexpr const char* versioned_value::STATUS_LEFT;
constexpr const char* versioned_value::STATUS_MOVING;
constexpr const char* versioned_value::REMOVING_TOKEN;
constexpr const char* versioned_value::REMOVED_TOKEN;
constexpr const char* versioned_value::HIBERNATE;
constexpr const char* versioned_value::SHUTDOWN;

View File

@@ -51,7 +51,6 @@ public:
static constexpr const char* STATUS_LEFT = "LEFT";
static constexpr const char* STATUS_MOVING = "MOVING";
static constexpr const char* REMOVING_TOKEN = "removing";
static constexpr const char* REMOVED_TOKEN = "removed";
static constexpr const char* HIBERNATE = "hibernate";

View File

@@ -523,7 +523,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
case messaging_verb::UNUSED__STREAM_MUTATION:
case messaging_verb::STREAM_MUTATION_DONE:
case messaging_verb::COMPLETE_MESSAGE:
case messaging_verb::REPLICATION_FINISHED:
case messaging_verb::UNUSED__REPLICATION_FINISHED:
case messaging_verb::UNUSED__REPAIR_CHECKSUM_RANGE:
case messaging_verb::STREAM_MUTATION_FRAGMENTS:
case messaging_verb::REPAIR_ROW_LEVEL_START:
@@ -1183,18 +1183,6 @@ future<table_schema_version> messaging_service::send_schema_check(msg_addr dst,
return send_message_cancellable<table_schema_version>(this, netw::messaging_verb::SCHEMA_CHECK, dst, as);
}
// Wrapper for REPLICATION_FINISHED
void messaging_service::register_replication_finished(std::function<future<> (inet_address)>&& func) {
register_handler(this, messaging_verb::REPLICATION_FINISHED, std::move(func));
}
future<> messaging_service::unregister_replication_finished() {
return unregister_handler(messaging_verb::REPLICATION_FINISHED);
}
future<> messaging_service::send_replication_finished(msg_addr id, inet_address from) {
// FIXME: getRpcTimeout : conf.request_timeout_in_ms
return send_message_timeout<void>(this, messaging_verb::REPLICATION_FINISHED, std::move(id), 10000ms, std::move(from));
}
// Wrapper for REPAIR_GET_FULL_ROW_HASHES
void messaging_service::register_repair_get_full_row_hashes(std::function<future<repair_hash_set> (const rpc::client_info& cinfo, uint32_t repair_meta_id)>&& func) {
register_handler(this, messaging_verb::REPAIR_GET_FULL_ROW_HASHES, std::move(func));

View File

@@ -128,7 +128,7 @@ enum class messaging_verb : int32_t {
// end of gossip verb
DEFINITIONS_UPDATE = 11,
TRUNCATE = 12,
REPLICATION_FINISHED = 13,
UNUSED__REPLICATION_FINISHED = 13,
MIGRATION_REQUEST = 14,
// Used by streaming
PREPARE_MESSAGE = 15,
@@ -496,11 +496,6 @@ public:
future<table_schema_version> send_schema_check(msg_addr);
future<table_schema_version> send_schema_check(msg_addr, abort_source&);
// Wrapper for REPLICATION_FINISHED verb
void register_replication_finished(std::function<future<> (inet_address from)>&& func);
future<> unregister_replication_finished();
future<> send_replication_finished(msg_addr id, inet_address from);
void foreach_server_connection_stats(std::function<void(const rpc::client_info&, const rpc::stats&)>&& f) const;
private:
template <typename Fn>

View File

@@ -2447,47 +2447,6 @@ future<> storage_service::handle_state_removing(inet_address endpoint, std::vect
if (sstring(gms::versioned_value::REMOVED_TOKEN) == state) {
std::unordered_set<token> tmp(remove_tokens.begin(), remove_tokens.end());
co_await excise(std::move(tmp), endpoint, extract_expire_time(pieces));
} else if (sstring(gms::versioned_value::REMOVING_TOKEN) == state) {
co_await mutate_token_metadata([this, remove_tokens = std::move(remove_tokens), endpoint] (mutable_token_metadata_ptr tmptr) mutable {
slogger.debug("Tokens {} removed manually (endpoint was {})", remove_tokens, endpoint);
// Note that the endpoint is being removed
tmptr->add_leaving_endpoint(endpoint);
return update_topology_change_info(std::move(tmptr), ::format("handle_state_removing {}", endpoint));
});
// find the endpoint coordinating this removal that we need to notify when we're done
auto* value = _gossiper.get_application_state_ptr(endpoint, application_state::REMOVAL_COORDINATOR);
if (!value) {
auto err = ::format("Can not find application_state for endpoint={}", endpoint);
slogger.warn("{}", err);
throw std::runtime_error(err);
}
std::vector<sstring> coordinator;
boost::split(coordinator, value->value(), boost::is_any_of(sstring(versioned_value::DELIMITER_STR)));
if (coordinator.size() != 2) {
auto err = ::format("Can not split REMOVAL_COORDINATOR for endpoint={}, value={}", endpoint, value->value());
slogger.warn("{}", err);
throw std::runtime_error(err);
}
auto host_id = locator::host_id(utils::UUID(coordinator[1]));
// grab any data we are now responsible for and notify responsible node
auto ep = get_token_metadata().get_endpoint_for_host_id(host_id);
if (!ep) {
auto err = ::format("Can not find host_id={}", host_id);
slogger.warn("{}", err);
throw std::runtime_error(err);
}
// Kick off streaming commands. No need to wait for
// restore_replica_count to complete which can take a long time,
// since when it completes, this node will send notification to
// tell the removal_coordinator with IP address notify_endpoint
// that the restore process is finished on this node.
auto notify_endpoint = ep.value();
// OK to discard future since _async_gate is closed on stop()
(void)with_gate(_async_gate, [this, endpoint, notify_endpoint] {
return restore_replica_count(endpoint, notify_endpoint).handle_exception([endpoint, notify_endpoint] (auto ep) {
slogger.warn("Failed to restore_replica_count for node {}, notify_endpoint={} : {}", endpoint, notify_endpoint, ep);
});
});
}
} else { // now that the gossiper has told us about this nonexistent member, notify the gossiper to remove it
if (sstring(gms::versioned_value::REMOVED_TOKEN) == pieces[0]) {
@@ -2549,8 +2508,7 @@ future<> storage_service::on_change(inet_address endpoint, application_state sta
} else if (move_name == sstring(versioned_value::STATUS_NORMAL) ||
move_name == sstring(versioned_value::SHUTDOWN)) {
co_await handle_state_normal(endpoint);
} else if (move_name == sstring(versioned_value::REMOVING_TOKEN) ||
move_name == sstring(versioned_value::REMOVED_TOKEN)) {
} else if (move_name == sstring(versioned_value::REMOVED_TOKEN)) {
co_await handle_state_removing(endpoint, pieces);
} else if (move_name == sstring(versioned_value::STATUS_LEAVING)) {
co_await handle_state_leaving(endpoint);
@@ -4641,83 +4599,6 @@ future<> storage_service::removenode_with_stream(gms::inet_address leaving_node,
});
}
future<> storage_service::restore_replica_count(inet_address endpoint, inet_address notify_endpoint) {
_abort_source.check();
// Allocate a shared abort_source for node_ops_info
auto sas = make_shared<abort_source>();
auto sub = _abort_source.subscribe([sas] () noexcept {
if (!sas->abort_requested()) {
sas->request_abort();
}
});
if (is_repair_based_node_ops_enabled(streaming::stream_reason::removenode)) {
auto ops_uuid = node_ops_id::create_random_id();
auto ops = seastar::make_shared<node_ops_info>(ops_uuid, sas, std::list<gms::inet_address>());
auto f = co_await coroutine::as_future(_repair.local().removenode_with_repair(get_token_metadata_ptr(), endpoint, ops));
co_await send_replication_notification(notify_endpoint);
co_return co_await std::move(f);
}
auto tmptr = get_token_metadata_ptr();
auto& as = *sas;
auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, tmptr, as, get_broadcast_address(), _sys_ks.local().local_dc_rack(), "Restore_replica_count", streaming::stream_reason::removenode);
removenode_add_ranges(streamer, endpoint).get();
auto check_status_loop = [this, endpoint, &as] () -> future<> {
slogger.debug("restore_replica_count: Started status checker for removing node {}", endpoint);
while (!as.abort_requested()) {
auto status = _gossiper.get_gossip_status(endpoint);
// If the node to be removed is already in removed status, it has
// probably been removed forcely with `nodetool removenode force`.
// Abort the restore_replica_count in such case to avoid streaming
// attempt since the user has removed the node forcely.
if (status == sstring(versioned_value::REMOVED_TOKEN)) {
slogger.info("restore_replica_count: Detected node {} has left the cluster, status={}, abort restore_replica_count for removing node {}",
endpoint, status, endpoint);
if (!as.abort_requested()) {
as.request_abort();
}
co_return;
}
slogger.debug("restore_replica_count: Sleep and detect removing node {}, status={}", endpoint, status);
co_await sleep_abortable(std::chrono::seconds(10), as);
}
};
auto status_checker = check_status_loop();
std::exception_ptr ex;
try {
co_await streamer->stream_async();
} catch (...) {
ex = std::current_exception();
slogger.debug("Streaming to restore replica count failed: {}.", ex);
// We still want to send the notification
}
try {
co_await this->send_replication_notification(notify_endpoint);
} catch (...) {
auto ex2 = std::current_exception();
slogger.debug("Sending replication notification to {} failed: {}", notify_endpoint, ex2);
if (!ex) {
ex = std::move(ex2);
}
}
try {
slogger.debug("restore_replica_count: Started to stop status checker for removing node {}", endpoint);
if (!as.abort_requested()) {
as.request_abort();
}
co_await std::move(status_checker);
} catch (const seastar::sleep_aborted& ignored) {
slogger.debug("restore_replica_count: Got sleep_abort to stop status checker for removing node {}: {}", endpoint, ignored);
} catch (...) {
slogger.warn("restore_replica_count: Found error in status checker for removing node {}: {}",
endpoint, std::current_exception());
}
slogger.debug("restore_replica_count: Finished to stop status checker for removing node {}", endpoint);
if (ex) {
co_await coroutine::return_exception_ptr(std::move(ex));
}
}
future<> storage_service::excise(std::unordered_set<token> tokens, inet_address endpoint) {
slogger.info("Removing tokens {} for {}", tokens, endpoint);
// FIXME: HintedHandOffManager.instance.deleteHintsForEndpoint(endpoint);
@@ -4739,35 +4620,6 @@ future<> storage_service::excise(std::unordered_set<token> tokens, inet_address
return excise(tokens, endpoint);
}
future<> storage_service::send_replication_notification(inet_address remote) {
// notify the remote token
auto done = make_shared<bool>(false);
auto local = get_broadcast_address();
auto sent = make_lw_shared<int>(0);
slogger.debug("Notifying {} of replication completion", remote);
return do_until(
[this, done, sent, remote] {
// The node can send REPLICATION_FINISHED to itself, in which case
// is_alive will be true. If the messaging_service is stopped,
// REPLICATION_FINISHED can be sent infinitely here. To fix, limit
// the number of retries.
return *done || !_gossiper.is_alive(remote) || *sent >= 3;
},
[this, done, sent, remote, local] {
netw::msg_addr id{remote, 0};
(*sent)++;
return _messaging.local().send_replication_finished(id, local).then_wrapped([id, done] (auto&& f) {
try {
f.get();
*done = true;
} catch (...) {
slogger.warn("Fail to send REPLICATION_FINISHED to {}: {}", id, std::current_exception());
}
});
}
);
}
future<> storage_service::leave_ring() {
co_await _sys_ks.local().set_bootstrap_state(db::system_keyspace::bootstrap_state::NEEDS_BOOTSTRAP);
co_await mutate_token_metadata([this] (mutable_token_metadata_ptr tmptr) {
@@ -5183,11 +5035,6 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(shar
}
void storage_service::init_messaging_service(sharded<service::storage_proxy>& proxy, sharded<db::system_distributed_keyspace>& sys_dist_ks) {
_messaging.local().register_replication_finished([] (gms::inet_address from) {
slogger.info("Got confirm_replication from {}", from);
return make_ready_future<>();
});
_messaging.local().register_node_ops_cmd([this] (const rpc::client_info& cinfo, node_ops_cmd_request req) {
auto coordinator = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
return container().invoke_on(0, [coordinator, req = std::move(req)] (auto& ss) mutable {
@@ -5263,7 +5110,6 @@ void storage_service::init_messaging_service(sharded<service::storage_proxy>& pr
future<> storage_service::uninit_messaging_service() {
return when_all_succeed(
_messaging.local().unregister_replication_finished(),
_messaging.local().unregister_node_ops_cmd(),
ser::storage_service_rpc_verbs::unregister(&_messaging.local())
).discard_result();

View File

@@ -562,24 +562,6 @@ private:
*/
future<std::unordered_multimap<inet_address, dht::token_range>> get_new_source_ranges(locator::vnode_effective_replication_map_ptr erm, const dht::token_range_vector& ranges) const;
/**
* Sends a notification to a node indicating we have finished replicating data.
*
* @param remote node to send notification to
*/
future<> send_replication_notification(inet_address remote);
/**
* Called when an endpoint is removed from the ring. This function checks
* whether this node becomes responsible for new ranges as a
* consequence and streams data if needed.
*
* This is rather ineffective, but it does not matter so much
* since this is called very seldom
*
* @param endpoint the node that left
*/
future<> restore_replica_count(inet_address endpoint, inet_address notify_endpoint);
future<> removenode_with_stream(gms::inet_address leaving_node, shared_ptr<abort_source> as_ptr);
future<> removenode_add_ranges(lw_shared_ptr<dht::range_streamer> streamer, gms::inet_address leaving_node);