Merge 'repair: Switch to use NODE_OPS_CMD for replace operation' from Asias He

In commit c82250e0cf (gossip: Allow deferring
advertise of local node to be up), the replacing node is changed to postpone
the responding of gossip echo message to avoid other nodes sending read
requests to the replacing node. It works as following:

1) replacing node does not respond echo message to avoid other nodes to
   mark replacing node as alive

2) replacing node advertises hibernate state so other nodes knows
   replacing node is replacing

3) replacing node responds echo message so other nodes can mark
   replacing node as alive

This is problematic because after step 2, the existing nodes in the
cluster will start to send writes to the replacing node, but at this
time it is possible that existing nodes haven't marked the replacing
node as alive, thus failing the write request unnecessarily.

For instance, we saw the following errors in issue #8013 (Cassandra
stress fails to achieve consistency when only one of the nodes is down)

```
scylla:
[shard 1] consistency - Live nodes 2 do not satisfy ConsistencyLevel (2
required, 1 pending, live_endpoints={127.0.0.2, 127.0.0.1},
pending_endpoints={127.0.0.3}) [shard 0] gossip - Fail to send
EchoMessage to 127.0.0.3: std::runtime_error (Not ready to respond
gossip echo message)

c-s:
java.io.IOException: Operation x10 on key(s) [4c4f4d37324c35304c30]:
Error executing: (UnavailableException): Not enough replicas available
for query at consistency QUORUM (2 required but only 1 alive
```

To solve this problem, we can do the replacing operation in multiple stages.

One solution is to introduce a new gossip status state as proposed
here: gossip: Introduce STATUS_PREPARE_REPLACE #7416

1) replacing node does not respond echo message

2) replacing node advertises prepare_replace state (Remove replacing
   node from natural endpoint, but do not put in pending list yet)

3) replacing node responds echo message

4) replacing node advertises hibernate state (Put replacing node in
   pending list)

Since we now have the node ops verb introduced in
829b4c1438 (repair: Make removenode safe
by default), we can do the multiple stage without introducing a new
gossip status state.

This patch uses the NODE_OPS_CMD infrastructure to implement replace
operation.

Improvements:

1) It solves the race between marking replacing node alive and sending
   writes to replacing node

2) The cluster reverts to a state before the replace operation
   automatically in case of error. As a result, it solves when the
   replacing node fails in the middle of the operation, the repacing
   node will be in HIBERNATE status forever issue.

3) The gossip status of the node to be replaced is not changed until the
   replace operation is successful. HIBERNATE gossip status is not used
   anymore.

4) Users can now pass a list of dead nodes to ignore explicitly.

Fixes #8013

Closes #8330

* github.com:scylladb/scylla:
  repair: Switch to use NODE_OPS_CMD for replace operation
  gossip: Add advertise_to_nodes
  gossip: Add helper to wait for a node to be up
  gossip: Add is_normal_ring_member helper
This commit is contained in:
Avi Kivity
2021-04-04 12:54:09 +03:00
9 changed files with 360 additions and 16 deletions

View File

@@ -408,8 +408,31 @@ future<> gossiper::handle_ack2_msg(gossip_digest_ack2 msg) {
return apply_state_locally(std::move(remote_ep_state_map)).finally([mp = std::move(mp)] {});
}
future<> gossiper::handle_echo_msg() {
future<> gossiper::handle_echo_msg(gms::inet_address from, std::optional<int64_t> generation_number_opt) {
bool respond = true;
if (!_advertise_myself) {
respond = false;
} else {
if (!_advertise_to_nodes.empty()) {
auto it = _advertise_to_nodes.find(from);
if (it == _advertise_to_nodes.end()) {
respond = false;
} else {
auto es = get_endpoint_state_for_endpoint_ptr(from);
if (es) {
int64_t saved_generation_number = it->second;
int64_t current_generation_number = generation_number_opt ?
generation_number_opt.value() : es->get_heart_beat_state().get_generation();
respond = saved_generation_number == current_generation_number;
logger.debug("handle_echo_msg: from={}, saved_generation_number={}, current_generation_number={}",
from, saved_generation_number, current_generation_number);
} else {
respond = false;
}
}
}
}
if (!respond) {
return make_exception_future(std::runtime_error("Not ready to respond gossip echo message"));
}
return make_ready_future<>();
@@ -482,8 +505,9 @@ future<> gossiper::init_messaging_service_handler(bind_messaging_port do_bind) {
});
return messaging_service::no_wait();
});
_messaging.register_gossip_echo([] {
return gms::get_local_gossiper().handle_echo_msg();
_messaging.register_gossip_echo([] (const rpc::client_info& cinfo, rpc::optional<int64_t> generation_number_opt) {
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
return gms::get_local_gossiper().handle_echo_msg(from, generation_number_opt);
});
_messaging.register_gossip_shutdown([] (inet_address from) {
// In a new fiber.
@@ -1419,9 +1443,10 @@ void gossiper::mark_alive(inet_address addr, endpoint_state& local_state) {
local_state.mark_dead();
msg_addr id = get_msg_addr(addr);
logger.trace("Sending a EchoMessage to {}", id);
int64_t generation = endpoint_state_map[get_broadcast_address()].get_heart_beat_state().get_generation();
logger.debug("Sending a EchoMessage to {}, with generation_number={}", id, generation);
// Do it in the background.
(void)_messaging.send_gossip_echo(id).then([this, addr] {
(void)_messaging.send_gossip_echo(id, generation).then([this, addr] {
logger.trace("Got EchoMessage Reply");
return seastar::async([this, addr] {
// After sending echo message, the Node might not be in the
@@ -1566,6 +1591,11 @@ bool gossiper::is_normal(const inet_address& endpoint) const {
return get_gossip_status(endpoint) == sstring(versioned_value::STATUS_NORMAL);
}
bool gossiper::is_normal_ring_member(const inet_address& endpoint) const {
auto status = get_gossip_status(endpoint);
return status == sstring(versioned_value::STATUS_NORMAL) || status == sstring(versioned_value::SHUTDOWN);
}
bool gossiper::is_silent_shutdown_state(const endpoint_state& ep_state) const{
auto state = get_gossip_status(ep_state);
for (auto& deadstate : SILENT_SHUTDOWN_STATES) {
@@ -1769,6 +1799,29 @@ future<> gossiper::start_gossiping(int generation_nbr, std::map<application_stat
});
}
future<std::unordered_map<gms::inet_address, int32_t>>
gossiper::get_generation_for_nodes(std::list<gms::inet_address> nodes) {
std::unordered_map<gms::inet_address, int32_t> ret;
for (const auto& node : nodes) {
auto es = get_endpoint_state_for_endpoint_ptr(node);
if (es) {
auto current_generation_number = es->get_heart_beat_state().get_generation();
ret.emplace(node, current_generation_number);
} else {
return make_exception_future<std::unordered_map<gms::inet_address, int32_t>>(
std::runtime_error(format("Can not find generation number for node={}", node)));
}
}
return make_ready_future<std::unordered_map<gms::inet_address, int32_t>>(std::move(ret));
}
future<> gossiper::advertise_to_nodes(std::unordered_map<gms::inet_address, int32_t> advertise_to_nodes) {
return container().invoke_on_all([advertise_to_nodes] (auto& g) {
g._advertise_to_nodes = advertise_to_nodes;
g._advertise_myself = true;
});
}
future<> gossiper::advertise_myself() {
return container().invoke_on_all([] (auto& g) {
g._advertise_myself = true;
@@ -2131,6 +2184,32 @@ bool gossiper::is_alive(inet_address ep) const {
return false;
}
// Runs inside seastar::async context
void gossiper::wait_alive(std::vector<gms::inet_address> nodes, std::chrono::milliseconds timeout) {
auto start_time = std::chrono::steady_clock::now();
for (;;) {
std::vector<gms::inet_address> live_nodes;
for (const auto& node: nodes) {
size_t nr_alive = container().map_reduce0([node] (gossiper& g) -> size_t {
return g.is_alive(node) ? 1 : 0;
}, 0, std::plus<size_t>()).get0();
logger.debug("Marked node={} as alive on {} out of {} shards", node, nr_alive, smp::count);
if (nr_alive == smp::count) {
live_nodes.push_back(node);
}
}
logger.debug("Waited for marking node as up, replace_nodes={}, live_nodes={}", nodes, live_nodes);
if (live_nodes.size() == nodes.size()) {
break;
}
if (std::chrono::steady_clock::now() > timeout + start_time) {
throw std::runtime_error(format("Failed to mark node as alive in {} ms, nodes={}, live_nodes={}",
timeout.count(), nodes, live_nodes));
}
sleep_abortable(std::chrono::milliseconds(100), _abort_source).get();
}
}
const versioned_value* gossiper::get_application_state_ptr(inet_address endpoint, application_state appstate) const noexcept {
auto* eps = get_endpoint_state_for_endpoint_ptr(std::move(endpoint));
if (!eps) {

View File

@@ -129,7 +129,7 @@ private:
future<> handle_syn_msg(msg_addr from, gossip_digest_syn syn_msg);
future<> handle_ack_msg(msg_addr from, gossip_digest_ack ack_msg);
future<> handle_ack2_msg(gossip_digest_ack2 msg);
future<> handle_echo_msg();
future<> handle_echo_msg(inet_address from, std::optional<int64_t> generation_number_opt);
future<> handle_shutdown_msg(inet_address from);
future<> do_send_ack_msg(msg_addr from, gossip_digest_syn syn_msg);
future<> do_send_ack2_msg(msg_addr from, utils::chunked_vector<gossip_digest> ack_msg_digest);
@@ -146,8 +146,15 @@ private:
std::unordered_map<gms::inet_address, syn_msg_pending> _syn_handlers;
std::unordered_map<gms::inet_address, ack_msg_pending> _ack_handlers;
bool _advertise_myself = true;
// Map ip address and generation number
std::unordered_map<gms::inet_address, int32_t> _advertise_to_nodes;
public:
future<> advertise_myself();
// Get current generation number for the given nodes
future<std::unordered_map<gms::inet_address, int32_t>>
get_generation_for_nodes(std::list<gms::inet_address> nodes);
// Only respond echo message listed in nodes with the generation number
future<> advertise_to_nodes(std::unordered_map<gms::inet_address, int32_t> advertise_to_nodes = {});
const sstring& get_cluster_name() const noexcept;
const sstring& get_partitioner_name() const noexcept;
inet_address get_broadcast_address() const noexcept {
@@ -442,6 +449,8 @@ private:
public:
bool is_alive(inet_address ep) const;
bool is_dead_state(const endpoint_state& eps) const;
// Wait for nodes to be alive on all shards
void wait_alive(std::vector<gms::inet_address> nodes, std::chrono::milliseconds timeout);
future<> apply_state_locally(std::map<inet_address, endpoint_state> map);
@@ -572,6 +581,10 @@ public:
bool is_seed(const inet_address& endpoint) const;
bool is_shutdown(const inet_address& endpoint) const;
bool is_normal(const inet_address& endpoint) const;
// Check if a node is in NORMAL or SHUTDOWN status which means the node is
// part of the token ring from the gossip point of view and operates in
// normal status or was in normal status but is shutdown.
bool is_normal_ring_member(const inet_address& endpoint) const;
bool is_cql_ready(const inet_address& endpoint) const;
bool is_silent_shutdown_state(const endpoint_state& ep_state) const;
void mark_as_shutdown(const inet_address& endpoint);

View File

@@ -110,6 +110,12 @@ enum class node_ops_cmd : uint32_t {
removenode_sync_data,
removenode_abort,
removenode_done,
replace_prepare,
replace_prepare_mark_alive,
replace_prepare_pending_ranges,
replace_heartbeat,
replace_abort,
replace_done,
};
struct node_ops_cmd_request {
@@ -117,6 +123,8 @@ struct node_ops_cmd_request {
utils::UUID ops_uuid;
std::list<gms::inet_address> ignore_nodes;
std::list<gms::inet_address> leaving_nodes;
// Map existing nodes to replacing nodes
std::unordered_map<gms::inet_address, gms::inet_address> replace_nodes;
};
struct node_ops_cmd_response {

View File

@@ -1082,14 +1082,14 @@ future<> messaging_service::unregister_complete_message() {
return unregister_handler(messaging_verb::COMPLETE_MESSAGE);
}
void messaging_service::register_gossip_echo(std::function<future<> ()>&& func) {
void messaging_service::register_gossip_echo(std::function<future<> (const rpc::client_info& cinfo, rpc::optional<int64_t> generation_number)>&& func) {
register_handler(this, messaging_verb::GOSSIP_ECHO, std::move(func));
}
future<> messaging_service::unregister_gossip_echo() {
return unregister_handler(netw::messaging_verb::GOSSIP_ECHO);
}
future<> messaging_service::send_gossip_echo(msg_addr id) {
return send_message_timeout<void>(this, messaging_verb::GOSSIP_ECHO, std::move(id), 15000ms);
future<> messaging_service::send_gossip_echo(msg_addr id, int64_t generation_number) {
return send_message_timeout<void>(this, messaging_verb::GOSSIP_ECHO, std::move(id), 15000ms, generation_number);
}
void messaging_service::register_gossip_shutdown(std::function<rpc::no_wait_type (inet_address from)>&& func) {

View File

@@ -408,9 +408,9 @@ public:
future<node_ops_cmd_response> send_node_ops_cmd(msg_addr id, node_ops_cmd_request);
// Wrapper for GOSSIP_ECHO verb
void register_gossip_echo(std::function<future<> ()>&& func);
void register_gossip_echo(std::function<future<> (const rpc::client_info& cinfo, rpc::optional<int64_t> generation_number)>&& func);
future<> unregister_gossip_echo();
future<> send_gossip_echo(msg_addr id);
future<> send_gossip_echo(msg_addr id, int64_t generation_number);
// Wrapper for GOSSIP_SHUTDOWN
void register_gossip_shutdown(std::function<rpc::no_wait_type (inet_address from)>&& func);

View File

@@ -489,6 +489,12 @@ enum class node_ops_cmd : uint32_t {
removenode_sync_data,
removenode_abort,
removenode_done,
replace_prepare,
replace_prepare_mark_alive,
replace_prepare_pending_ranges,
replace_heartbeat,
replace_abort,
replace_done,
};
// The cmd and ops_uuid are mandatory for each request.
@@ -498,6 +504,8 @@ struct node_ops_cmd_request {
utils::UUID ops_uuid;
std::list<gms::inet_address> ignore_nodes;
std::list<gms::inet_address> leaving_nodes;
// Map existing nodes to replacing nodes
std::unordered_map<gms::inet_address, gms::inet_address> replace_nodes;
};
struct node_ops_cmd_response {

View File

@@ -680,6 +680,8 @@ void storage_service::bootstrap() {
// Wait until we know tokens of existing node before announcing replacing status.
set_mode(mode::JOINING, sprint("Wait until local node knows tokens of peer nodes"), true);
_gossiper.wait_for_range_setup().get();
if (!is_repair_based_node_ops_enabled()) {
set_mode(mode::JOINING, sprint("Announce tokens and status of the replacing node"), true);
_gossiper.add_local_application_state({
{ gms::application_state::TOKENS, versioned_value::tokens(_bootstrap_tokens) },
@@ -689,6 +691,7 @@ void storage_service::bootstrap() {
_gossiper.advertise_myself().get();
set_mode(mode::JOINING, format("Wait until peer nodes know the bootstrap tokens of local node"), true);
_gossiper.wait_for_range_setup().get();
}
auto replace_addr = db().local().get_replace_address();
if (replace_addr) {
slogger.debug("Removing replaced endpoint {} from system.peers", *replace_addr);
@@ -705,7 +708,7 @@ void storage_service::bootstrap() {
set_mode(mode::JOINING, "Starting to bootstrap...", true);
if (is_repair_based_node_ops_enabled()) {
if (db().local().is_replacing()) {
replace_with_repair(_db, _messaging, get_token_metadata_ptr(), _bootstrap_tokens).get();
run_replace_ops();
} else {
bootstrap_with_repair(_db, _messaging, get_token_metadata_ptr(), _bootstrap_tokens).get();
}
@@ -1863,6 +1866,150 @@ future<> storage_service::decommission() {
});
}
// Runs inside seastar::async context
void storage_service::run_replace_ops() {
if (!db().local().get_replace_address()) {
throw std::runtime_error(format("replace_address is empty"));
}
auto replace_address = db().local().get_replace_address().value();
auto uuid = utils::make_random_uuid();
// TODO: Specify ignore_nodes
std::list<gms::inet_address> ignore_nodes;
// Step 1: Decide who needs to sync data for replace operation
std::list<gms::inet_address> sync_nodes;
for (const auto& x :_gossiper.endpoint_state_map) {
seastar::thread::maybe_yield();
const auto& node = x.first;
slogger.debug("replace[{}]: Check node={}, status={}", uuid, node, _gossiper.get_gossip_status(node));
if (node != get_broadcast_address() &&
node != replace_address &&
_gossiper.is_normal_ring_member(node) &&
std::find(ignore_nodes.begin(), ignore_nodes.end(), x.first) == ignore_nodes.end()) {
sync_nodes.push_back(node);
}
}
sync_nodes.push_front(get_broadcast_address());
auto sync_nodes_generations = _gossiper.get_generation_for_nodes(sync_nodes).get();
// Map existing nodes to replacing nodes
std::unordered_map<gms::inet_address, gms::inet_address> replace_nodes = {
{replace_address, get_broadcast_address()},
};
std::unordered_set<gms::inet_address> nodes_unknown_verb;
std::unordered_set<gms::inet_address> nodes_down;
std::unordered_set<gms::inet_address> nodes_aborted;
auto req = node_ops_cmd_request{node_ops_cmd::replace_prepare, uuid, ignore_nodes, {}, replace_nodes};
slogger.info("replace[{}]: Started replace operation, replace_nodes={}, sync_nodes={}, ignore_nodes={}", uuid, replace_nodes, sync_nodes, ignore_nodes);
future<> heartbeat_updater = make_ready_future<>();
auto heartbeat_updater_done = make_lw_shared<bool>(false);
try {
// Step 2: Prepare to sync data
parallel_for_each(sync_nodes, [this, &req, &nodes_unknown_verb, &nodes_down, uuid] (const gms::inet_address& node) {
return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
slogger.debug("replace[{}]: Got node_ops_cmd::replace_prepare response from node={}", uuid, node);
}).handle_exception_type([&nodes_unknown_verb, node, uuid] (seastar::rpc::unknown_verb_error&) {
slogger.warn("replace[{}]: Node {} does not support node_ops_cmd verb", uuid, node);
nodes_unknown_verb.emplace(node);
}).handle_exception_type([&nodes_down, node, uuid] (seastar::rpc::closed_error&) {
slogger.warn("replace[{}]: Node {} is down for node_ops_cmd verb", uuid, node);
nodes_down.emplace(node);
});
}).get();
if (!nodes_unknown_verb.empty()) {
auto msg = format("replace[{}]: Nodes={} do not support replace verb. Please upgrade your cluster and run replace again.", uuid, nodes_unknown_verb);
slogger.warn("{}", msg);
throw std::runtime_error(msg);
}
if (!nodes_down.empty()) {
auto msg = format("replace[{}]: Nodes={} needed for replace operation are down. It is highly recommended to fix the down nodes and try again. To proceed with best-effort mode which might cause data inconsistency, add --ignore-dead-nodes <list_of_dead_nodes>. E.g., scylla --ignore-dead-nodes 127.0.0.1,127.0.0.2", uuid, nodes_down);
slogger.warn("{}", msg);
throw std::runtime_error(msg);
}
// Step 3: Start heartbeat updater
heartbeat_updater = seastar::async([this, &sync_nodes, uuid, heartbeat_updater_done] {
slogger.debug("replace[{}]: Started heartbeat_updater", uuid);
while (!(*heartbeat_updater_done)) {
auto req = node_ops_cmd_request{node_ops_cmd::replace_heartbeat, uuid, {}, {}, {}};
parallel_for_each(sync_nodes, [this, req, uuid] (const gms::inet_address& node) {
return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
slogger.debug("replace[{}]: Got heartbeat response from node={}", uuid, node);
return make_ready_future<>();
});
}).handle_exception([uuid] (std::exception_ptr ep) {
slogger.warn("replace[{}]: Failed to send heartbeat: {}", uuid, ep);
}).get();
int nr_seconds = 10;
while (!(*heartbeat_updater_done) && nr_seconds--) {
sleep_abortable(std::chrono::seconds(1), _abort_source).get();
}
}
slogger.debug("replace[{}]: Stopped heartbeat_updater", uuid);
});
auto stop_heartbeat_updater = defer([&] {
*heartbeat_updater_done = true;
heartbeat_updater.get();
});
// Step 4: Allow nodes in sync_nodes list to mark the replacing node as alive
_gossiper.advertise_to_nodes(sync_nodes_generations).get();
slogger.info("replace[{}]: Allow nodes={} to mark replacing node={} as alive", uuid, sync_nodes, get_broadcast_address());
// Step 5: Wait for nodes to finish marking the replacing node as live
req.cmd = node_ops_cmd::replace_prepare_mark_alive;
parallel_for_each(sync_nodes, [this, &req, uuid] (const gms::inet_address& node) {
return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
slogger.debug("replace[{}]: Got prepare_mark_alive response from node={}", uuid, node);
return make_ready_future<>();
});
}).get();
// Step 6: Update pending ranges on nodes
req.cmd = node_ops_cmd::replace_prepare_pending_ranges;
parallel_for_each(sync_nodes, [this, &req, uuid] (const gms::inet_address& node) {
return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
slogger.debug("replace[{}]: Got pending_ranges response from node={}", uuid, node);
return make_ready_future<>();
});
}).get();
// Step 7: Sync data for replace
replace_with_repair(_db, _messaging, get_token_metadata_ptr(), _bootstrap_tokens).get();
// Step 8: Finish
req.cmd = node_ops_cmd::replace_done;
parallel_for_each(sync_nodes, [this, &req, &nodes_aborted, uuid] (const gms::inet_address& node) {
return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([&nodes_aborted, uuid, node] (node_ops_cmd_response resp) {
nodes_aborted.emplace(node);
slogger.debug("replace[{}]: Got done response from node={}", uuid, node);
return make_ready_future<>();
});
}).get();
// Allow any nodes to mark the replacing node as alive
_gossiper.advertise_to_nodes({}).get();
slogger.info("replace[{}]: Allow any nodes to mark replacing node={} as alive", uuid, get_broadcast_address());
} catch (...) {
slogger.error("replace[{}]: Abort replace operation started, replace_nodes={}, sync_nodes={}, ignore_nodes={}: {}",
uuid, replace_nodes, sync_nodes, ignore_nodes, std::current_exception());
// we need to revert the effect of prepare verb the replace ops is failed
req.cmd = node_ops_cmd::replace_abort;
parallel_for_each(sync_nodes, [this, &req, &nodes_unknown_verb, &nodes_down, &nodes_aborted, uuid] (const gms::inet_address& node) {
if (nodes_unknown_verb.contains(node) || nodes_down.contains(node) || nodes_aborted.contains(node)) {
// No need to revert previous prepare cmd for those who do not apply prepare cmd.
return make_ready_future<>();
}
return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
slogger.debug("replace[{}]: Got abort response from node={}", uuid, node);
});
}).get();
slogger.error("replace[{}]: Abort replace operation finished, replace_nodes={}, sync_nodes={}, ignore_nodes={}: {}",
uuid, replace_nodes, sync_nodes, ignore_nodes, std::current_exception());
throw;
}
}
future<> storage_service::removenode(sstring host_id_string, std::list<gms::inet_address> ignore_nodes) {
return run_with_api_lock(sstring("removenode"), [host_id_string, ignore_nodes = std::move(ignore_nodes)] (storage_service& ss) mutable {
return seastar::async([&ss, host_id_string, ignore_nodes = std::move(ignore_nodes)] {
@@ -1901,7 +2048,7 @@ future<> storage_service::removenode(sstring host_id_string, std::list<gms::inet
// Step 2: Prepare to sync data
std::unordered_set<gms::inet_address> nodes_unknown_verb;
std::unordered_set<gms::inet_address> nodes_down;
auto req = node_ops_cmd_request{node_ops_cmd::removenode_prepare, uuid, ignore_nodes, leaving_nodes};
auto req = node_ops_cmd_request{node_ops_cmd::removenode_prepare, uuid, ignore_nodes, leaving_nodes, {}};
try {
parallel_for_each(nodes, [&ss, &req, &nodes_unknown_verb, &nodes_down, uuid] (const gms::inet_address& node) {
return ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
@@ -1929,7 +2076,7 @@ future<> storage_service::removenode(sstring host_id_string, std::list<gms::inet
heartbeat_updater = seastar::async([&ss, &nodes, uuid, heartbeat_updater_done] {
slogger.debug("removenode[{}]: Started heartbeat_updater", uuid);
while (!(*heartbeat_updater_done)) {
auto req = node_ops_cmd_request{node_ops_cmd::removenode_heartbeat, uuid, {}, {}};
auto req = node_ops_cmd_request{node_ops_cmd::removenode_heartbeat, uuid, {}, {}, {}};
parallel_for_each(nodes, [&ss, &req, uuid] (const gms::inet_address& node) {
return ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
slogger.debug("removenode[{}]: Got heartbeat response from node={}", uuid, node);
@@ -1993,11 +2140,42 @@ future<> storage_service::removenode(sstring host_id_string, std::list<gms::inet
});
}
void storage_service::node_ops_cmd_check(gms::inet_address coordinator, const node_ops_cmd_request& req) {
auto ops_uuids = boost::copy_range<std::vector<utils::UUID>>(_node_ops| boost::adaptors::map_keys);
std::string msg;
if (req.cmd == node_ops_cmd::removenode_prepare || req.cmd == node_ops_cmd::replace_prepare) {
// Peer node wants to start a new node operation. Make sure no pending node operation is in progress.
if (!_node_ops.empty()) {
msg = format("node_ops_cmd_check: Node {} rejected node_ops_cmd={} from node={} with ops_uuid={}, pending_node_ops={}, pending node ops is in progress",
get_broadcast_address(), uint32_t(req.cmd), coordinator, req.ops_uuid, ops_uuids);
}
} else {
if (ops_uuids.size() == 1 && ops_uuids.front() == req.ops_uuid) {
// Check is good, since we know this ops_uuid and this is the only ops_uuid we are working on.
} else if (ops_uuids.size() == 0) {
// The ops_uuid received is unknown. Fail the request.
msg = format("node_ops_cmd_check: Node {} rejected node_ops_cmd={} from node={} with ops_uuid={}, pending_node_ops={}, the node ops is unknown",
get_broadcast_address(), uint32_t(req.cmd), coordinator, req.ops_uuid, ops_uuids);
} else {
// Other node ops is in progress. Fail the request.
msg = format("node_ops_cmd_check: Node {} rejected node_ops_cmd={} from node={} with ops_uuid={}, pending_node_ops={}, pending node ops is in progress",
get_broadcast_address(), uint32_t(req.cmd), coordinator, req.ops_uuid, ops_uuids);
}
}
if (!msg.empty()) {
slogger.warn("{}", msg);
throw std::runtime_error(msg);
}
}
future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_address coordinator, node_ops_cmd_request req) {
return get_storage_service().invoke_on(0, [coordinator, req = std::move(req)] (auto& ss) mutable {
return seastar::async([&ss, coordinator, req = std::move(req)] () mutable {
auto ops_uuid = req.ops_uuid;
slogger.debug("node_ops_cmd_handler cmd={}, ops_uuid={}", uint32_t(req.cmd), ops_uuid);
ss.node_ops_cmd_check(coordinator, req);
if (req.cmd == node_ops_cmd::removenode_prepare) {
if (req.leaving_nodes.size() > 1) {
auto msg = format("removenode[{}]: Could not removenode more than one node at a time: leaving_nodes={}", req.ops_uuid, req.leaving_nodes);
@@ -2041,6 +2219,60 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
}
} else if (req.cmd == node_ops_cmd::removenode_abort) {
ss.node_ops_abort(ops_uuid);
} else if (req.cmd == node_ops_cmd::replace_prepare) {
// Mark the replacing node as replacing
if (req.replace_nodes.size() > 1) {
auto msg = format("replace[{}]: Could not replace more than one node at a time: replace_nodes={}", req.ops_uuid, req.replace_nodes);
slogger.warn("{}", msg);
throw std::runtime_error(msg);
}
ss.mutate_token_metadata([coordinator, &req, &ss] (mutable_token_metadata_ptr tmptr) mutable {
for (auto& x: req.replace_nodes) {
auto existing_node = x.first;
auto replacing_node = x.second;
slogger.info("replace[{}]: Added replacing_node={} to replace existing_node={}, coordinator={}", req.ops_uuid, replacing_node, existing_node, coordinator);
tmptr->add_replacing_endpoint(existing_node, replacing_node);
}
return make_ready_future<>();
}).get();
auto ops = seastar::make_shared<node_ops_info>(node_ops_info{ops_uuid, false, std::move(req.ignore_nodes)});
auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(ops), [&ss, coordinator, req = std::move(req)] () mutable {
return ss.mutate_token_metadata([&ss, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable {
for (auto& x: req.replace_nodes) {
auto existing_node = x.first;
auto replacing_node = x.second;
slogger.info("replace[{}]: Removed replacing_node={} to replace existing_node={}, coordinator={}", req.ops_uuid, replacing_node, existing_node, coordinator);
tmptr->del_replacing_endpoint(existing_node);
}
return ss.update_pending_ranges(tmptr, format("replace {}", req.replace_nodes));
});
},
[&ss, ops_uuid ] { ss.node_ops_singal_abort(ops_uuid); });
ss._node_ops.emplace(ops_uuid, std::move(meta));
} else if (req.cmd == node_ops_cmd::replace_prepare_mark_alive) {
// Wait for local node has marked replacing node as alive
auto nodes = boost::copy_range<std::vector<inet_address>>(req.replace_nodes| boost::adaptors::map_values);
try {
ss._gossiper.wait_alive(nodes, std::chrono::milliseconds(120 * 1000));
} catch (...) {
slogger.warn("replace[{}]: Failed to wait for marking replacing node as up, replace_nodes={}: {}",
req.ops_uuid, req.replace_nodes, std::current_exception());
throw;
}
} else if (req.cmd == node_ops_cmd::replace_prepare_pending_ranges) {
// Update the pending_ranges for the replacing node
slogger.debug("replace[{}]: Updated pending_ranges from coordinator={}", req.ops_uuid, coordinator);
ss.mutate_token_metadata([coordinator, &req, &ss] (mutable_token_metadata_ptr tmptr) mutable {
return ss.update_pending_ranges(tmptr, format("replace {}", req.replace_nodes));
}).get();
} else if (req.cmd == node_ops_cmd::replace_heartbeat) {
slogger.debug("replace[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
ss.node_ops_update_heartbeat(ops_uuid);
} else if (req.cmd == node_ops_cmd::replace_done) {
slogger.info("replace[{}]: Marked ops done from coordinator={}", req.ops_uuid, coordinator);
ss.node_ops_done(ops_uuid);
} else if (req.cmd == node_ops_cmd::replace_abort) {
ss.node_ops_abort(ops_uuid);
} else {
auto msg = format("node_ops_cmd_handler: ops_uuid={}, unknown cmd={}", req.ops_uuid, uint32_t(req.cmd));
slogger.warn("{}", msg);

View File

@@ -379,6 +379,8 @@ private:
future<replacement_info> prepare_replacement_info(std::unordered_set<gms::inet_address> initial_contact_nodes,
const std::unordered_map<gms::inet_address, sstring>& loaded_peer_features, bind_messaging_port do_bind = bind_messaging_port::yes);
void run_replace_ops();
public:
future<bool> is_initialized();
@@ -799,6 +801,7 @@ public:
*/
future<> removenode(sstring host_id_string, std::list<gms::inet_address> ignore_nodes);
future<node_ops_cmd_response> node_ops_cmd_handler(gms::inet_address coordinator, node_ops_cmd_request req);
void node_ops_cmd_check(gms::inet_address coordinator, const node_ops_cmd_request& req);
future<sstring> get_operation_mode();

View File

@@ -111,7 +111,7 @@ public:
return messaging_service::no_wait();
});
ms.register_gossip_echo([] {
ms.register_gossip_echo([] (const rpc::client_info& cinfo, rpc::optional<int64_t> gen_opt) {
fmt::print("Server got gossip echo msg\n");
throw std::runtime_error("I'm throwing runtime_error exception");
return make_ready_future<>();
@@ -149,7 +149,8 @@ public:
future<> test_echo() {
fmt::print("=== {} ===\n", __func__);
auto id = get_msg_addr();
return ms.send_gossip_echo(id).then_wrapped([] (auto&& f) {
int64_t gen = 0x1;
return ms.send_gossip_echo(id, gen).then_wrapped([] (auto&& f) {
try {
f.get();
return make_ready_future<>();