Merge 'Small gossiper and migration_manager cleanups' from Gleb
Some assorted cleanups here: consolidation of schema agreement waiting into a single place and removing unused code from the gossiper.

CI: https://jenkins.scylladb.com/job/scylla-master/job/scylla-ci/1458/

Reviewed-by: Konstantin Osipov <kostja@scylladb.com>

* gleb/gossiper-cleanups of github.com:scylladb/scylla-dev:
  - storage_service: avoid unneeded copies in on_change
  - storage_service: remove check that is always true
  - storage_service: rename handle_state_removing to handle_state_removed
  - storage_service: avoid string copy
  - storage_service: delete code that handled REMOVING_TOKENS state
  - gossiper: remove code related to advertising REMOVING_TOKEN state
  - migration_manager: add wait_for_schema_agreement() function
This commit is contained in:
@@ -828,17 +828,6 @@ future<executor::request_return_type> executor::list_tags_of_resource(client_sta
|
||||
return make_ready_future<executor::request_return_type>(make_jsonable(std::move(ret)));
|
||||
}
|
||||
|
||||
static future<> wait_for_schema_agreement(service::migration_manager& mm, db::timeout_clock::time_point deadline) {
|
||||
return do_until([&mm, deadline] {
|
||||
if (db::timeout_clock::now() > deadline) {
|
||||
throw std::runtime_error("Unable to reach schema agreement");
|
||||
}
|
||||
return mm.have_schema_agreement();
|
||||
}, [] {
|
||||
return seastar::sleep(500ms);
|
||||
});
|
||||
}
|
||||
|
||||
static void verify_billing_mode(const rjson::value& request) {
|
||||
// Alternator does not yet support billing or throughput limitations, but
|
||||
// let's verify that BillingMode is at least legal.
|
||||
@@ -1084,7 +1073,7 @@ static future<executor::request_return_type> create_table_on_shard0(tracing::tra
|
||||
}
|
||||
co_await mm.announce(std::move(schema_mutations), std::move(group0_guard));
|
||||
|
||||
co_await wait_for_schema_agreement(mm, db::timeout_clock::now() + 10s);
|
||||
co_await mm.wait_for_schema_agreement(sp.local_db(), db::timeout_clock::now() + 10s, nullptr);
|
||||
rjson::value status = rjson::empty_object();
|
||||
executor::supplement_table_info(request, *schema, sp);
|
||||
rjson::add(status, "TableDescription", std::move(request));
|
||||
@@ -1151,7 +1140,7 @@ future<executor::request_return_type> executor::update_table(client_state& clien
|
||||
|
||||
co_await mm.announce(std::move(m), std::move(group0_guard));
|
||||
|
||||
co_await wait_for_schema_agreement(mm, db::timeout_clock::now() + 10s);
|
||||
co_await mm.wait_for_schema_agreement(p.local().local_db(), db::timeout_clock::now() + 10s, nullptr);
|
||||
|
||||
rjson::value status = rjson::empty_object();
|
||||
supplement_table_info(request, *schema, p.local());
|
||||
|
||||
@@ -245,7 +245,7 @@
|
||||
"GOSSIP_SHUTDOWN",
|
||||
"DEFINITIONS_UPDATE",
|
||||
"TRUNCATE",
|
||||
"REPLICATION_FINISHED",
|
||||
"UNUSED__REPLICATION_FINISHED",
|
||||
"MIGRATION_REQUEST",
|
||||
"PREPARE_MESSAGE",
|
||||
"PREPARE_DONE_MESSAGE",
|
||||
|
||||
@@ -84,20 +84,6 @@ future<> create_metadata_table_if_missing(
|
||||
return futurize_invoke(create_metadata_table_if_missing_impl, table_name, qp, cql, mm);
|
||||
}
|
||||
|
||||
future<> wait_for_schema_agreement(::service::migration_manager& mm, const replica::database& db, seastar::abort_source& as) {
|
||||
static const auto pause = [] { return sleep(std::chrono::milliseconds(500)); };
|
||||
|
||||
return do_until([&db, &as] {
|
||||
as.check();
|
||||
return db.get_version() != replica::database::empty_version;
|
||||
}, pause).then([&mm, &as] {
|
||||
return do_until([&mm, &as] {
|
||||
as.check();
|
||||
return mm.have_schema_agreement();
|
||||
}, pause);
|
||||
});
|
||||
}
|
||||
|
||||
::service::query_state& internal_distributed_query_state() noexcept {
|
||||
#ifdef DEBUG
|
||||
// Give the much slower debug tests more headroom for completing auth queries.
|
||||
|
||||
@@ -67,8 +67,6 @@ future<> create_metadata_table_if_missing(
|
||||
std::string_view cql,
|
||||
::service::migration_manager&) noexcept;
|
||||
|
||||
future<> wait_for_schema_agreement(::service::migration_manager&, const replica::database&, seastar::abort_source&);
|
||||
|
||||
///
|
||||
/// Time-outs for internal, non-local CQL queries.
|
||||
///
|
||||
|
||||
@@ -129,7 +129,7 @@ future<> default_authorizer::start() {
|
||||
_migration_manager).then([this] {
|
||||
_finished = do_after_system_ready(_as, [this] {
|
||||
return async([this] {
|
||||
wait_for_schema_agreement(_migration_manager, _qp.db().real_database(), _as).get0();
|
||||
_migration_manager.wait_for_schema_agreement(_qp.db().real_database(), db::timeout_clock::time_point::max(), &_as).get0();
|
||||
|
||||
if (legacy_metadata_exists()) {
|
||||
if (!any_granted().get0()) {
|
||||
|
||||
@@ -132,7 +132,7 @@ future<> password_authenticator::start() {
|
||||
|
||||
_stopped = do_after_system_ready(_as, [this] {
|
||||
return async([this] {
|
||||
wait_for_schema_agreement(_migration_manager, _qp.db().real_database(), _as).get0();
|
||||
_migration_manager.wait_for_schema_agreement(_qp.db().real_database(), db::timeout_clock::time_point::max(), &_as).get0();
|
||||
|
||||
if (any_nondefault_role_row_satisfies(_qp, &has_salted_hash).get0()) {
|
||||
if (legacy_metadata_exists()) {
|
||||
|
||||
@@ -28,6 +28,7 @@
|
||||
#include "log.hh"
|
||||
#include "utils/class_registrator.hh"
|
||||
#include "replica/database.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
|
||||
namespace auth {
|
||||
|
||||
@@ -232,7 +233,7 @@ future<> standard_role_manager::start() {
|
||||
return this->create_metadata_tables_if_missing().then([this] {
|
||||
_stopped = auth::do_after_system_ready(_as, [this] {
|
||||
return seastar::async([this] {
|
||||
wait_for_schema_agreement(_migration_manager, _qp.db().real_database(), _as).get0();
|
||||
_migration_manager.wait_for_schema_agreement(_qp.db().real_database(), db::timeout_clock::time_point::max(), &_as).get0();
|
||||
|
||||
if (any_nondefault_role_row_satisfies(_qp, &has_can_login).get0()) {
|
||||
if (this->legacy_metadata_exists()) {
|
||||
|
||||
@@ -1794,9 +1794,8 @@ future<> view_builder::start(service::migration_manager& mm) {
|
||||
// or `on_update_view` events.
|
||||
auto units = get_units(_sem, 1).get0();
|
||||
// Wait for schema agreement even if we're a seed node.
|
||||
while (!mm.have_schema_agreement()) {
|
||||
seastar::sleep_abortable(500ms, _as).get();
|
||||
}
|
||||
mm.wait_for_schema_agreement(_db, db::timeout_clock::time_point::max(), &_as).get();
|
||||
|
||||
auto built = _sys_ks.load_built_views().get0();
|
||||
auto in_progress = _sys_ks.load_view_build_progress().get0();
|
||||
setup_shard_build_step(vbi, std::move(built), std::move(in_progress));
|
||||
|
||||
@@ -1159,30 +1159,6 @@ future<> gossiper::replicate(inet_address ep, application_state key, const versi
|
||||
});
|
||||
}
|
||||
|
||||
future<> gossiper::advertise_removing(inet_address endpoint, locator::host_id host_id, locator::host_id local_host_id) {
|
||||
auto& state = get_endpoint_state(endpoint);
|
||||
// remember this node's generation
|
||||
auto generation = state.get_heart_beat_state().get_generation();
|
||||
logger.info("Removing host: {}", host_id);
|
||||
auto ring_delay = std::chrono::milliseconds(_gcfg.ring_delay_ms);
|
||||
logger.info("Sleeping for {}ms to ensure {} does not change", ring_delay.count(), endpoint);
|
||||
co_await sleep_abortable(ring_delay, _abort_source);
|
||||
// make sure it did not change
|
||||
auto& eps = get_endpoint_state(endpoint);
|
||||
if (eps.get_heart_beat_state().get_generation() != generation) {
|
||||
throw std::runtime_error(format("Endpoint {} generation changed while trying to remove it", endpoint));
|
||||
}
|
||||
|
||||
// update the other node's generation to mimic it as if it had changed it itself
|
||||
logger.info("Advertising removal for {}", endpoint);
|
||||
eps.update_timestamp(); // make sure we don't evict it too soon
|
||||
eps.get_heart_beat_state().force_newer_generation_unsafe();
|
||||
eps.add_application_state(application_state::STATUS, versioned_value::removing_nonlocal(host_id));
|
||||
eps.add_application_state(application_state::REMOVAL_COORDINATOR, versioned_value::removal_coordinator(local_host_id));
|
||||
_endpoint_state_map[endpoint] = eps;
|
||||
co_await replicate(endpoint, eps);
|
||||
}
|
||||
|
||||
future<> gossiper::advertise_token_removed(inet_address endpoint, locator::host_id host_id) {
|
||||
auto& eps = get_endpoint_state(endpoint);
|
||||
eps.update_timestamp(); // make sure we don't evict it too soon
|
||||
|
||||
@@ -162,12 +162,10 @@ private:
|
||||
|
||||
public:
|
||||
const std::vector<sstring> DEAD_STATES = {
|
||||
versioned_value::REMOVING_TOKEN,
|
||||
versioned_value::REMOVED_TOKEN,
|
||||
versioned_value::STATUS_LEFT,
|
||||
};
|
||||
const std::vector<sstring> SILENT_SHUTDOWN_STATES = {
|
||||
versioned_value::REMOVING_TOKEN,
|
||||
versioned_value::REMOVED_TOKEN,
|
||||
versioned_value::STATUS_LEFT,
|
||||
versioned_value::HIBERNATE,
|
||||
@@ -324,16 +322,6 @@ private:
|
||||
void make_random_gossip_digest(utils::chunked_vector<gossip_digest>& g_digests);
|
||||
|
||||
public:
|
||||
/**
|
||||
* This method will begin removing an existing endpoint from the cluster by spoofing its state
|
||||
* This should never be called unless this coordinator has had 'removenode' invoked
|
||||
*
|
||||
* @param endpoint - the endpoint being removed
|
||||
* @param host_id - the ID of the host being removed
|
||||
* @param local_host_id - my own host ID for replication coordination
|
||||
*/
|
||||
future<> advertise_removing(inet_address endpoint, locator::host_id host_id, locator::host_id local_host_id);
|
||||
|
||||
/**
|
||||
* Handles switching the endpoint's state from REMOVING_TOKEN to REMOVED_TOKEN
|
||||
* This should only be called after advertise_removing
|
||||
|
||||
@@ -27,7 +27,6 @@ constexpr const char* versioned_value::STATUS_NORMAL;
|
||||
constexpr const char* versioned_value::STATUS_LEAVING;
|
||||
constexpr const char* versioned_value::STATUS_LEFT;
|
||||
constexpr const char* versioned_value::STATUS_MOVING;
|
||||
constexpr const char* versioned_value::REMOVING_TOKEN;
|
||||
constexpr const char* versioned_value::REMOVED_TOKEN;
|
||||
constexpr const char* versioned_value::HIBERNATE;
|
||||
constexpr const char* versioned_value::SHUTDOWN;
|
||||
|
||||
@@ -51,7 +51,6 @@ public:
|
||||
static constexpr const char* STATUS_LEFT = "LEFT";
|
||||
static constexpr const char* STATUS_MOVING = "MOVING";
|
||||
|
||||
static constexpr const char* REMOVING_TOKEN = "removing";
|
||||
static constexpr const char* REMOVED_TOKEN = "removed";
|
||||
|
||||
static constexpr const char* HIBERNATE = "hibernate";
|
||||
@@ -157,11 +156,6 @@ public:
|
||||
return versioned_value(make_cdc_generation_id_string(gen_id));
|
||||
}
|
||||
|
||||
static versioned_value removing_nonlocal(const locator::host_id& host_id) {
|
||||
return versioned_value(sstring(REMOVING_TOKEN) +
|
||||
sstring(DELIMITER_STR) + host_id.to_sstring());
|
||||
}
|
||||
|
||||
static versioned_value removed_nonlocal(const locator::host_id& host_id, int64_t expire_time) {
|
||||
return versioned_value(sstring(REMOVED_TOKEN) + sstring(DELIMITER_STR) +
|
||||
host_id.to_sstring() + sstring(DELIMITER_STR) + to_sstring(expire_time));
|
||||
|
||||
@@ -523,7 +523,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
|
||||
case messaging_verb::UNUSED__STREAM_MUTATION:
|
||||
case messaging_verb::STREAM_MUTATION_DONE:
|
||||
case messaging_verb::COMPLETE_MESSAGE:
|
||||
case messaging_verb::REPLICATION_FINISHED:
|
||||
case messaging_verb::UNUSED__REPLICATION_FINISHED:
|
||||
case messaging_verb::UNUSED__REPAIR_CHECKSUM_RANGE:
|
||||
case messaging_verb::STREAM_MUTATION_FRAGMENTS:
|
||||
case messaging_verb::REPAIR_ROW_LEVEL_START:
|
||||
@@ -1183,18 +1183,6 @@ future<table_schema_version> messaging_service::send_schema_check(msg_addr dst,
|
||||
return send_message_cancellable<table_schema_version>(this, netw::messaging_verb::SCHEMA_CHECK, dst, as);
|
||||
}
|
||||
|
||||
// Wrapper for REPLICATION_FINISHED
|
||||
void messaging_service::register_replication_finished(std::function<future<> (inet_address)>&& func) {
|
||||
register_handler(this, messaging_verb::REPLICATION_FINISHED, std::move(func));
|
||||
}
|
||||
future<> messaging_service::unregister_replication_finished() {
|
||||
return unregister_handler(messaging_verb::REPLICATION_FINISHED);
|
||||
}
|
||||
future<> messaging_service::send_replication_finished(msg_addr id, inet_address from) {
|
||||
// FIXME: getRpcTimeout : conf.request_timeout_in_ms
|
||||
return send_message_timeout<void>(this, messaging_verb::REPLICATION_FINISHED, std::move(id), 10000ms, std::move(from));
|
||||
}
|
||||
|
||||
// Wrapper for REPAIR_GET_FULL_ROW_HASHES
|
||||
void messaging_service::register_repair_get_full_row_hashes(std::function<future<repair_hash_set> (const rpc::client_info& cinfo, uint32_t repair_meta_id)>&& func) {
|
||||
register_handler(this, messaging_verb::REPAIR_GET_FULL_ROW_HASHES, std::move(func));
|
||||
|
||||
@@ -126,7 +126,7 @@ enum class messaging_verb : int32_t {
|
||||
// end of gossip verb
|
||||
DEFINITIONS_UPDATE = 11,
|
||||
TRUNCATE = 12,
|
||||
REPLICATION_FINISHED = 13,
|
||||
UNUSED__REPLICATION_FINISHED = 13,
|
||||
MIGRATION_REQUEST = 14,
|
||||
// Used by streaming
|
||||
PREPARE_MESSAGE = 15,
|
||||
@@ -494,11 +494,6 @@ public:
|
||||
future<table_schema_version> send_schema_check(msg_addr);
|
||||
future<table_schema_version> send_schema_check(msg_addr, abort_source&);
|
||||
|
||||
// Wrapper for REPLICATION_FINISHED verb
|
||||
void register_replication_finished(std::function<future<> (inet_address from)>&& func);
|
||||
future<> unregister_replication_finished();
|
||||
future<> send_replication_finished(msg_addr id, inet_address from);
|
||||
|
||||
void foreach_server_connection_stats(std::function<void(const rpc::client_info&, const rpc::stats&)>&& f) const;
|
||||
private:
|
||||
template <typename Fn>
|
||||
|
||||
@@ -239,6 +239,18 @@ bool migration_manager::have_schema_agreement() {
|
||||
return match;
|
||||
}
|
||||
|
||||
future<> migration_manager::wait_for_schema_agreement(const replica::database& db, db::timeout_clock::time_point deadline, seastar::abort_source* as) {
|
||||
while (db.get_version() == replica::database::empty_version || !have_schema_agreement()) {
|
||||
if (as) {
|
||||
as->check();
|
||||
}
|
||||
if (db::timeout_clock::now() > deadline) {
|
||||
throw std::runtime_error("Unable to reach schema agreement");
|
||||
}
|
||||
co_await (as ? sleep_abortable(std::chrono::milliseconds(500), *as) : sleep(std::chrono::milliseconds(500)));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If versions differ this node sends request with local migration list to the endpoint
|
||||
* and expecting to receive a list of migrations to apply locally.
|
||||
|
||||
@@ -172,6 +172,7 @@ public:
|
||||
* Known peers in the cluster have the same schema version as us.
|
||||
*/
|
||||
bool have_schema_agreement();
|
||||
future<> wait_for_schema_agreement(const replica::database& db, db::timeout_clock::time_point deadline, seastar::abort_source* as);
|
||||
|
||||
void init_messaging_service();
|
||||
|
||||
|
||||
@@ -2425,12 +2425,9 @@ void storage_service::handle_state_moving(inet_address endpoint, std::vector<sst
|
||||
throw std::runtime_error(::format("Move operation is not supported anymore, endpoint={}", endpoint));
|
||||
}
|
||||
|
||||
future<> storage_service::handle_state_removing(inet_address endpoint, std::vector<sstring> pieces) {
|
||||
slogger.debug("endpoint={} handle_state_removing", endpoint);
|
||||
if (pieces.empty()) {
|
||||
slogger.warn("Fail to handle_state_removing endpoint={} pieces={}", endpoint, pieces);
|
||||
co_return;
|
||||
}
|
||||
future<> storage_service::handle_state_removed(inet_address endpoint, std::vector<sstring> pieces) {
|
||||
slogger.debug("endpoint={} handle_state_removed", endpoint);
|
||||
|
||||
if (endpoint == get_broadcast_address()) {
|
||||
slogger.info("Received removenode gossip about myself. Is this node rejoining after an explicit removenode?");
|
||||
try {
|
||||
@@ -2444,55 +2441,10 @@ future<> storage_service::handle_state_removing(inet_address endpoint, std::vect
|
||||
if (get_token_metadata().is_normal_token_owner(endpoint)) {
|
||||
auto state = pieces[0];
|
||||
auto remove_tokens = get_token_metadata().get_tokens(endpoint);
|
||||
if (sstring(gms::versioned_value::REMOVED_TOKEN) == state) {
|
||||
std::unordered_set<token> tmp(remove_tokens.begin(), remove_tokens.end());
|
||||
co_await excise(std::move(tmp), endpoint, extract_expire_time(pieces));
|
||||
} else if (sstring(gms::versioned_value::REMOVING_TOKEN) == state) {
|
||||
co_await mutate_token_metadata([this, remove_tokens = std::move(remove_tokens), endpoint] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
slogger.debug("Tokens {} removed manually (endpoint was {})", remove_tokens, endpoint);
|
||||
// Note that the endpoint is being removed
|
||||
tmptr->add_leaving_endpoint(endpoint);
|
||||
return update_topology_change_info(std::move(tmptr), ::format("handle_state_removing {}", endpoint));
|
||||
});
|
||||
// find the endpoint coordinating this removal that we need to notify when we're done
|
||||
auto* value = _gossiper.get_application_state_ptr(endpoint, application_state::REMOVAL_COORDINATOR);
|
||||
if (!value) {
|
||||
auto err = ::format("Can not find application_state for endpoint={}", endpoint);
|
||||
slogger.warn("{}", err);
|
||||
throw std::runtime_error(err);
|
||||
}
|
||||
std::vector<sstring> coordinator;
|
||||
boost::split(coordinator, value->value(), boost::is_any_of(sstring(versioned_value::DELIMITER_STR)));
|
||||
if (coordinator.size() != 2) {
|
||||
auto err = ::format("Can not split REMOVAL_COORDINATOR for endpoint={}, value={}", endpoint, value->value());
|
||||
slogger.warn("{}", err);
|
||||
throw std::runtime_error(err);
|
||||
}
|
||||
auto host_id = locator::host_id(utils::UUID(coordinator[1]));
|
||||
// grab any data we are now responsible for and notify responsible node
|
||||
auto ep = get_token_metadata().get_endpoint_for_host_id(host_id);
|
||||
if (!ep) {
|
||||
auto err = ::format("Can not find host_id={}", host_id);
|
||||
slogger.warn("{}", err);
|
||||
throw std::runtime_error(err);
|
||||
}
|
||||
// Kick off streaming commands. No need to wait for
|
||||
// restore_replica_count to complete which can take a long time,
|
||||
// since when it completes, this node will send notification to
|
||||
// tell the removal_coordinator with IP address notify_endpoint
|
||||
// that the restore process is finished on this node.
|
||||
auto notify_endpoint = ep.value();
|
||||
// OK to discard future since _async_gate is closed on stop()
|
||||
(void)with_gate(_async_gate, [this, endpoint, notify_endpoint] {
|
||||
return restore_replica_count(endpoint, notify_endpoint).handle_exception([endpoint, notify_endpoint] (auto ep) {
|
||||
slogger.warn("Failed to restore_replica_count for node {}, notify_endpoint={} : {}", endpoint, notify_endpoint, ep);
|
||||
});
|
||||
});
|
||||
}
|
||||
} else { // now that the gossiper has told us about this nonexistent member, notify the gossiper to remove it
|
||||
if (sstring(gms::versioned_value::REMOVED_TOKEN) == pieces[0]) {
|
||||
add_expire_time_if_found(endpoint, extract_expire_time(pieces));
|
||||
}
|
||||
co_await remove_endpoint(endpoint);
|
||||
}
|
||||
}
|
||||
@@ -2543,21 +2495,20 @@ future<> storage_service::on_change(inet_address endpoint, application_state sta
|
||||
slogger.warn("Fail to split status in on_change: endpoint={}, app_state={}, value={}", endpoint, state, value);
|
||||
co_return;
|
||||
}
|
||||
sstring move_name = pieces[0];
|
||||
const sstring& move_name = pieces[0];
|
||||
if (move_name == sstring(versioned_value::STATUS_BOOTSTRAPPING)) {
|
||||
co_await handle_state_bootstrap(endpoint);
|
||||
} else if (move_name == sstring(versioned_value::STATUS_NORMAL) ||
|
||||
move_name == sstring(versioned_value::SHUTDOWN)) {
|
||||
co_await handle_state_normal(endpoint);
|
||||
} else if (move_name == sstring(versioned_value::REMOVING_TOKEN) ||
|
||||
move_name == sstring(versioned_value::REMOVED_TOKEN)) {
|
||||
co_await handle_state_removing(endpoint, pieces);
|
||||
} else if (move_name == sstring(versioned_value::REMOVED_TOKEN)) {
|
||||
co_await handle_state_removed(endpoint, std::move(pieces));
|
||||
} else if (move_name == sstring(versioned_value::STATUS_LEAVING)) {
|
||||
co_await handle_state_leaving(endpoint);
|
||||
} else if (move_name == sstring(versioned_value::STATUS_LEFT)) {
|
||||
co_await handle_state_left(endpoint, pieces);
|
||||
co_await handle_state_left(endpoint, std::move(pieces));
|
||||
} else if (move_name == sstring(versioned_value::STATUS_MOVING)) {
|
||||
handle_state_moving(endpoint, pieces);
|
||||
handle_state_moving(endpoint, std::move(pieces));
|
||||
} else if (move_name == sstring(versioned_value::HIBERNATE)) {
|
||||
slogger.warn("endpoint={} went into HIBERNATE state, this is no longer supported. Use a new version to perform the replace operation.", endpoint);
|
||||
} else {
|
||||
@@ -4641,83 +4592,6 @@ future<> storage_service::removenode_with_stream(gms::inet_address leaving_node,
|
||||
});
|
||||
}
|
||||
|
||||
future<> storage_service::restore_replica_count(inet_address endpoint, inet_address notify_endpoint) {
|
||||
_abort_source.check();
|
||||
// Allocate a shared abort_source for node_ops_info
|
||||
auto sas = make_shared<abort_source>();
|
||||
auto sub = _abort_source.subscribe([sas] () noexcept {
|
||||
if (!sas->abort_requested()) {
|
||||
sas->request_abort();
|
||||
}
|
||||
});
|
||||
if (is_repair_based_node_ops_enabled(streaming::stream_reason::removenode)) {
|
||||
auto ops_uuid = node_ops_id::create_random_id();
|
||||
auto ops = seastar::make_shared<node_ops_info>(ops_uuid, sas, std::list<gms::inet_address>());
|
||||
auto f = co_await coroutine::as_future(_repair.local().removenode_with_repair(get_token_metadata_ptr(), endpoint, ops));
|
||||
co_await send_replication_notification(notify_endpoint);
|
||||
co_return co_await std::move(f);
|
||||
}
|
||||
|
||||
auto tmptr = get_token_metadata_ptr();
|
||||
auto& as = *sas;
|
||||
auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, tmptr, as, get_broadcast_address(), _sys_ks.local().local_dc_rack(), "Restore_replica_count", streaming::stream_reason::removenode);
|
||||
removenode_add_ranges(streamer, endpoint).get();
|
||||
auto check_status_loop = [this, endpoint, &as] () -> future<> {
|
||||
slogger.debug("restore_replica_count: Started status checker for removing node {}", endpoint);
|
||||
while (!as.abort_requested()) {
|
||||
auto status = _gossiper.get_gossip_status(endpoint);
|
||||
// If the node to be removed is already in removed status, it has
|
||||
// probably been removed forcely with `nodetool removenode force`.
|
||||
// Abort the restore_replica_count in such case to avoid streaming
|
||||
// attempt since the user has removed the node forcely.
|
||||
if (status == sstring(versioned_value::REMOVED_TOKEN)) {
|
||||
slogger.info("restore_replica_count: Detected node {} has left the cluster, status={}, abort restore_replica_count for removing node {}",
|
||||
endpoint, status, endpoint);
|
||||
if (!as.abort_requested()) {
|
||||
as.request_abort();
|
||||
}
|
||||
co_return;
|
||||
}
|
||||
slogger.debug("restore_replica_count: Sleep and detect removing node {}, status={}", endpoint, status);
|
||||
co_await sleep_abortable(std::chrono::seconds(10), as);
|
||||
}
|
||||
};
|
||||
auto status_checker = check_status_loop();
|
||||
std::exception_ptr ex;
|
||||
try {
|
||||
co_await streamer->stream_async();
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
slogger.debug("Streaming to restore replica count failed: {}.", ex);
|
||||
// We still want to send the notification
|
||||
}
|
||||
try {
|
||||
co_await this->send_replication_notification(notify_endpoint);
|
||||
} catch (...) {
|
||||
auto ex2 = std::current_exception();
|
||||
slogger.debug("Sending replication notification to {} failed: {}", notify_endpoint, ex2);
|
||||
if (!ex) {
|
||||
ex = std::move(ex2);
|
||||
}
|
||||
}
|
||||
try {
|
||||
slogger.debug("restore_replica_count: Started to stop status checker for removing node {}", endpoint);
|
||||
if (!as.abort_requested()) {
|
||||
as.request_abort();
|
||||
}
|
||||
co_await std::move(status_checker);
|
||||
} catch (const seastar::sleep_aborted& ignored) {
|
||||
slogger.debug("restore_replica_count: Got sleep_abort to stop status checker for removing node {}: {}", endpoint, ignored);
|
||||
} catch (...) {
|
||||
slogger.warn("restore_replica_count: Found error in status checker for removing node {}: {}",
|
||||
endpoint, std::current_exception());
|
||||
}
|
||||
slogger.debug("restore_replica_count: Finished to stop status checker for removing node {}", endpoint);
|
||||
if (ex) {
|
||||
co_await coroutine::return_exception_ptr(std::move(ex));
|
||||
}
|
||||
}
|
||||
|
||||
future<> storage_service::excise(std::unordered_set<token> tokens, inet_address endpoint) {
|
||||
slogger.info("Removing tokens {} for {}", tokens, endpoint);
|
||||
// FIXME: HintedHandOffManager.instance.deleteHintsForEndpoint(endpoint);
|
||||
@@ -4739,35 +4613,6 @@ future<> storage_service::excise(std::unordered_set<token> tokens, inet_address
|
||||
return excise(tokens, endpoint);
|
||||
}
|
||||
|
||||
future<> storage_service::send_replication_notification(inet_address remote) {
|
||||
// notify the remote token
|
||||
auto done = make_shared<bool>(false);
|
||||
auto local = get_broadcast_address();
|
||||
auto sent = make_lw_shared<int>(0);
|
||||
slogger.debug("Notifying {} of replication completion", remote);
|
||||
return do_until(
|
||||
[this, done, sent, remote] {
|
||||
// The node can send REPLICATION_FINISHED to itself, in which case
|
||||
// is_alive will be true. If the messaging_service is stopped,
|
||||
// REPLICATION_FINISHED can be sent infinitely here. To fix, limit
|
||||
// the number of retries.
|
||||
return *done || !_gossiper.is_alive(remote) || *sent >= 3;
|
||||
},
|
||||
[this, done, sent, remote, local] {
|
||||
netw::msg_addr id{remote, 0};
|
||||
(*sent)++;
|
||||
return _messaging.local().send_replication_finished(id, local).then_wrapped([id, done] (auto&& f) {
|
||||
try {
|
||||
f.get();
|
||||
*done = true;
|
||||
} catch (...) {
|
||||
slogger.warn("Fail to send REPLICATION_FINISHED to {}: {}", id, std::current_exception());
|
||||
}
|
||||
});
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
future<> storage_service::leave_ring() {
|
||||
co_await _sys_ks.local().set_bootstrap_state(db::system_keyspace::bootstrap_state::NEEDS_BOOTSTRAP);
|
||||
co_await mutate_token_metadata([this] (mutable_token_metadata_ptr tmptr) {
|
||||
@@ -5183,11 +5028,6 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(shar
|
||||
}
|
||||
|
||||
void storage_service::init_messaging_service(sharded<service::storage_proxy>& proxy, sharded<db::system_distributed_keyspace>& sys_dist_ks) {
|
||||
_messaging.local().register_replication_finished([] (gms::inet_address from) {
|
||||
slogger.info("Got confirm_replication from {}", from);
|
||||
return make_ready_future<>();
|
||||
});
|
||||
|
||||
_messaging.local().register_node_ops_cmd([this] (const rpc::client_info& cinfo, node_ops_cmd_request req) {
|
||||
auto coordinator = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
|
||||
return container().invoke_on(0, [coordinator, req = std::move(req)] (auto& ss) mutable {
|
||||
@@ -5263,7 +5103,6 @@ void storage_service::init_messaging_service(sharded<service::storage_proxy>& pr
|
||||
|
||||
future<> storage_service::uninit_messaging_service() {
|
||||
return when_all_succeed(
|
||||
_messaging.local().unregister_replication_finished(),
|
||||
_messaging.local().unregister_node_ops_cmd(),
|
||||
ser::storage_service_rpc_verbs::unregister(&_messaging.local())
|
||||
).discard_result();
|
||||
|
||||
@@ -533,9 +533,9 @@ private:
|
||||
* Handle notification that a node being actively removed from the ring via 'removenode'
|
||||
*
|
||||
* @param endpoint node
|
||||
* @param pieces either REMOVED_TOKEN (node is gone) or REMOVING_TOKEN (replicas need to be restored)
|
||||
* @param pieces is REMOVED_TOKEN (node is gone)
|
||||
*/
|
||||
future<> handle_state_removing(inet_address endpoint, std::vector<sstring> pieces);
|
||||
future<> handle_state_removed(inet_address endpoint, std::vector<sstring> pieces);
|
||||
|
||||
future<>
|
||||
handle_state_replacing_update_pending_ranges(mutable_token_metadata_ptr tmptr, inet_address replacing_node);
|
||||
@@ -562,24 +562,6 @@ private:
|
||||
*/
|
||||
future<std::unordered_multimap<inet_address, dht::token_range>> get_new_source_ranges(locator::vnode_effective_replication_map_ptr erm, const dht::token_range_vector& ranges) const;
|
||||
|
||||
/**
|
||||
* Sends a notification to a node indicating we have finished replicating data.
|
||||
*
|
||||
* @param remote node to send notification to
|
||||
*/
|
||||
future<> send_replication_notification(inet_address remote);
|
||||
|
||||
/**
|
||||
* Called when an endpoint is removed from the ring. This function checks
|
||||
* whether this node becomes responsible for new ranges as a
|
||||
* consequence and streams data if needed.
|
||||
*
|
||||
* This is rather ineffective, but it does not matter so much
|
||||
* since this is called very seldom
|
||||
*
|
||||
* @param endpoint the node that left
|
||||
*/
|
||||
future<> restore_replica_count(inet_address endpoint, inet_address notify_endpoint);
|
||||
future<> removenode_with_stream(gms::inet_address leaving_node, shared_ptr<abort_source> as_ptr);
|
||||
future<> removenode_add_ranges(lw_shared_ptr<dht::range_streamer> streamer, gms::inet_address leaving_node);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user