|
|
|
|
@@ -682,6 +682,8 @@ void storage_service::bootstrap() {
|
|
|
|
|
// Wait until we know tokens of existing node before announcing replacing status.
|
|
|
|
|
set_mode(mode::JOINING, sprint("Wait until local node knows tokens of peer nodes"), true);
|
|
|
|
|
_gossiper.wait_for_range_setup().get();
|
|
|
|
|
|
|
|
|
|
if (!is_repair_based_node_ops_enabled()) {
|
|
|
|
|
set_mode(mode::JOINING, sprint("Announce tokens and status of the replacing node"), true);
|
|
|
|
|
_gossiper.add_local_application_state({
|
|
|
|
|
{ gms::application_state::TOKENS, versioned_value::tokens(_bootstrap_tokens) },
|
|
|
|
|
@@ -691,6 +693,7 @@ void storage_service::bootstrap() {
|
|
|
|
|
_gossiper.advertise_myself().get();
|
|
|
|
|
set_mode(mode::JOINING, format("Wait until peer nodes know the bootstrap tokens of local node"), true);
|
|
|
|
|
_gossiper.wait_for_range_setup().get();
|
|
|
|
|
}
|
|
|
|
|
auto replace_addr = db().local().get_replace_address();
|
|
|
|
|
if (replace_addr) {
|
|
|
|
|
slogger.debug("Removing replaced endpoint {} from system.peers", *replace_addr);
|
|
|
|
|
@@ -707,7 +710,7 @@ void storage_service::bootstrap() {
|
|
|
|
|
set_mode(mode::JOINING, "Starting to bootstrap...", true);
|
|
|
|
|
if (is_repair_based_node_ops_enabled()) {
|
|
|
|
|
if (db().local().is_replacing()) {
|
|
|
|
|
replace_with_repair(_db, _messaging, get_token_metadata_ptr(), _bootstrap_tokens).get();
|
|
|
|
|
run_replace_ops();
|
|
|
|
|
} else {
|
|
|
|
|
bootstrap_with_repair(_db, _messaging, get_token_metadata_ptr(), _bootstrap_tokens).get();
|
|
|
|
|
}
|
|
|
|
|
@@ -1911,6 +1914,150 @@ future<> storage_service::decommission() {
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Runs inside seastar::async context
|
|
|
|
|
void storage_service::run_replace_ops() {
|
|
|
|
|
if (!db().local().get_replace_address()) {
|
|
|
|
|
throw std::runtime_error(format("replace_address is empty"));
|
|
|
|
|
}
|
|
|
|
|
auto replace_address = db().local().get_replace_address().value();
|
|
|
|
|
auto uuid = utils::make_random_uuid();
|
|
|
|
|
// TODO: Specify ignore_nodes
|
|
|
|
|
std::list<gms::inet_address> ignore_nodes;
|
|
|
|
|
// Step 1: Decide who needs to sync data for replace operation
|
|
|
|
|
std::list<gms::inet_address> sync_nodes;
|
|
|
|
|
for (const auto& x :_gossiper.endpoint_state_map) {
|
|
|
|
|
seastar::thread::maybe_yield();
|
|
|
|
|
const auto& node = x.first;
|
|
|
|
|
slogger.debug("replace[{}]: Check node={}, status={}", uuid, node, _gossiper.get_gossip_status(node));
|
|
|
|
|
if (node != get_broadcast_address() &&
|
|
|
|
|
node != replace_address &&
|
|
|
|
|
_gossiper.is_normal_ring_member(node) &&
|
|
|
|
|
std::find(ignore_nodes.begin(), ignore_nodes.end(), x.first) == ignore_nodes.end()) {
|
|
|
|
|
sync_nodes.push_back(node);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
sync_nodes.push_front(get_broadcast_address());
|
|
|
|
|
auto sync_nodes_generations = _gossiper.get_generation_for_nodes(sync_nodes).get();
|
|
|
|
|
// Map existing nodes to replacing nodes
|
|
|
|
|
std::unordered_map<gms::inet_address, gms::inet_address> replace_nodes = {
|
|
|
|
|
{replace_address, get_broadcast_address()},
|
|
|
|
|
};
|
|
|
|
|
std::unordered_set<gms::inet_address> nodes_unknown_verb;
|
|
|
|
|
std::unordered_set<gms::inet_address> nodes_down;
|
|
|
|
|
std::unordered_set<gms::inet_address> nodes_aborted;
|
|
|
|
|
auto req = node_ops_cmd_request{node_ops_cmd::replace_prepare, uuid, ignore_nodes, {}, replace_nodes};
|
|
|
|
|
slogger.info("replace[{}]: Started replace operation, replace_nodes={}, sync_nodes={}, ignore_nodes={}", uuid, replace_nodes, sync_nodes, ignore_nodes);
|
|
|
|
|
future<> heartbeat_updater = make_ready_future<>();
|
|
|
|
|
auto heartbeat_updater_done = make_lw_shared<bool>(false);
|
|
|
|
|
try {
|
|
|
|
|
// Step 2: Prepare to sync data
|
|
|
|
|
parallel_for_each(sync_nodes, [this, &req, &nodes_unknown_verb, &nodes_down, uuid] (const gms::inet_address& node) {
|
|
|
|
|
return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
|
|
|
|
|
slogger.debug("replace[{}]: Got node_ops_cmd::replace_prepare response from node={}", uuid, node);
|
|
|
|
|
}).handle_exception_type([&nodes_unknown_verb, node, uuid] (seastar::rpc::unknown_verb_error&) {
|
|
|
|
|
slogger.warn("replace[{}]: Node {} does not support node_ops_cmd verb", uuid, node);
|
|
|
|
|
nodes_unknown_verb.emplace(node);
|
|
|
|
|
}).handle_exception_type([&nodes_down, node, uuid] (seastar::rpc::closed_error&) {
|
|
|
|
|
slogger.warn("replace[{}]: Node {} is down for node_ops_cmd verb", uuid, node);
|
|
|
|
|
nodes_down.emplace(node);
|
|
|
|
|
});
|
|
|
|
|
}).get();
|
|
|
|
|
if (!nodes_unknown_verb.empty()) {
|
|
|
|
|
auto msg = format("replace[{}]: Nodes={} do not support replace verb. Please upgrade your cluster and run replace again.", uuid, nodes_unknown_verb);
|
|
|
|
|
slogger.warn("{}", msg);
|
|
|
|
|
throw std::runtime_error(msg);
|
|
|
|
|
}
|
|
|
|
|
if (!nodes_down.empty()) {
|
|
|
|
|
auto msg = format("replace[{}]: Nodes={} needed for replace operation are down. It is highly recommended to fix the down nodes and try again. To proceed with best-effort mode which might cause data inconsistency, add --ignore-dead-nodes <list_of_dead_nodes>. E.g., scylla --ignore-dead-nodes 127.0.0.1,127.0.0.2", uuid, nodes_down);
|
|
|
|
|
slogger.warn("{}", msg);
|
|
|
|
|
throw std::runtime_error(msg);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Step 3: Start heartbeat updater
|
|
|
|
|
heartbeat_updater = seastar::async([this, &sync_nodes, uuid, heartbeat_updater_done] {
|
|
|
|
|
slogger.debug("replace[{}]: Started heartbeat_updater", uuid);
|
|
|
|
|
while (!(*heartbeat_updater_done)) {
|
|
|
|
|
auto req = node_ops_cmd_request{node_ops_cmd::replace_heartbeat, uuid, {}, {}, {}};
|
|
|
|
|
parallel_for_each(sync_nodes, [this, req, uuid] (const gms::inet_address& node) {
|
|
|
|
|
return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
|
|
|
|
|
slogger.debug("replace[{}]: Got heartbeat response from node={}", uuid, node);
|
|
|
|
|
return make_ready_future<>();
|
|
|
|
|
});
|
|
|
|
|
}).handle_exception([uuid] (std::exception_ptr ep) {
|
|
|
|
|
slogger.warn("replace[{}]: Failed to send heartbeat: {}", uuid, ep);
|
|
|
|
|
}).get();
|
|
|
|
|
int nr_seconds = 10;
|
|
|
|
|
while (!(*heartbeat_updater_done) && nr_seconds--) {
|
|
|
|
|
sleep_abortable(std::chrono::seconds(1), _abort_source).get();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
slogger.debug("replace[{}]: Stopped heartbeat_updater", uuid);
|
|
|
|
|
});
|
|
|
|
|
auto stop_heartbeat_updater = defer([&] {
|
|
|
|
|
*heartbeat_updater_done = true;
|
|
|
|
|
heartbeat_updater.get();
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Step 4: Allow nodes in sync_nodes list to mark the replacing node as alive
|
|
|
|
|
_gossiper.advertise_to_nodes(sync_nodes_generations).get();
|
|
|
|
|
slogger.info("replace[{}]: Allow nodes={} to mark replacing node={} as alive", uuid, sync_nodes, get_broadcast_address());
|
|
|
|
|
|
|
|
|
|
// Step 5: Wait for nodes to finish marking the replacing node as live
|
|
|
|
|
req.cmd = node_ops_cmd::replace_prepare_mark_alive;
|
|
|
|
|
parallel_for_each(sync_nodes, [this, &req, uuid] (const gms::inet_address& node) {
|
|
|
|
|
return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
|
|
|
|
|
slogger.debug("replace[{}]: Got prepare_mark_alive response from node={}", uuid, node);
|
|
|
|
|
return make_ready_future<>();
|
|
|
|
|
});
|
|
|
|
|
}).get();
|
|
|
|
|
|
|
|
|
|
// Step 6: Update pending ranges on nodes
|
|
|
|
|
req.cmd = node_ops_cmd::replace_prepare_pending_ranges;
|
|
|
|
|
parallel_for_each(sync_nodes, [this, &req, uuid] (const gms::inet_address& node) {
|
|
|
|
|
return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
|
|
|
|
|
slogger.debug("replace[{}]: Got pending_ranges response from node={}", uuid, node);
|
|
|
|
|
return make_ready_future<>();
|
|
|
|
|
});
|
|
|
|
|
}).get();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Step 7: Sync data for replace
|
|
|
|
|
replace_with_repair(_db, _messaging, get_token_metadata_ptr(), _bootstrap_tokens).get();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Step 8: Finish
|
|
|
|
|
req.cmd = node_ops_cmd::replace_done;
|
|
|
|
|
parallel_for_each(sync_nodes, [this, &req, &nodes_aborted, uuid] (const gms::inet_address& node) {
|
|
|
|
|
return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([&nodes_aborted, uuid, node] (node_ops_cmd_response resp) {
|
|
|
|
|
nodes_aborted.emplace(node);
|
|
|
|
|
slogger.debug("replace[{}]: Got done response from node={}", uuid, node);
|
|
|
|
|
return make_ready_future<>();
|
|
|
|
|
});
|
|
|
|
|
}).get();
|
|
|
|
|
// Allow any nodes to mark the replacing node as alive
|
|
|
|
|
_gossiper.advertise_to_nodes({}).get();
|
|
|
|
|
slogger.info("replace[{}]: Allow any nodes to mark replacing node={} as alive", uuid, get_broadcast_address());
|
|
|
|
|
} catch (...) {
|
|
|
|
|
slogger.error("replace[{}]: Abort replace operation started, replace_nodes={}, sync_nodes={}, ignore_nodes={}: {}",
|
|
|
|
|
uuid, replace_nodes, sync_nodes, ignore_nodes, std::current_exception());
|
|
|
|
|
// we need to revert the effect of prepare verb the replace ops is failed
|
|
|
|
|
req.cmd = node_ops_cmd::replace_abort;
|
|
|
|
|
parallel_for_each(sync_nodes, [this, &req, &nodes_unknown_verb, &nodes_down, &nodes_aborted, uuid] (const gms::inet_address& node) {
|
|
|
|
|
if (nodes_unknown_verb.contains(node) || nodes_down.contains(node) || nodes_aborted.contains(node)) {
|
|
|
|
|
// No need to revert previous prepare cmd for those who do not apply prepare cmd.
|
|
|
|
|
return make_ready_future<>();
|
|
|
|
|
}
|
|
|
|
|
return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
|
|
|
|
|
slogger.debug("replace[{}]: Got abort response from node={}", uuid, node);
|
|
|
|
|
});
|
|
|
|
|
}).get();
|
|
|
|
|
slogger.error("replace[{}]: Abort replace operation finished, replace_nodes={}, sync_nodes={}, ignore_nodes={}: {}",
|
|
|
|
|
uuid, replace_nodes, sync_nodes, ignore_nodes, std::current_exception());
|
|
|
|
|
throw;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
future<> storage_service::removenode(sstring host_id_string, std::list<gms::inet_address> ignore_nodes) {
|
|
|
|
|
return run_with_api_lock(sstring("removenode"), [host_id_string, ignore_nodes = std::move(ignore_nodes)] (storage_service& ss) mutable {
|
|
|
|
|
return seastar::async([&ss, host_id_string, ignore_nodes = std::move(ignore_nodes)] {
|
|
|
|
|
@@ -1949,7 +2096,7 @@ future<> storage_service::removenode(sstring host_id_string, std::list<gms::inet
|
|
|
|
|
// Step 2: Prepare to sync data
|
|
|
|
|
std::unordered_set<gms::inet_address> nodes_unknown_verb;
|
|
|
|
|
std::unordered_set<gms::inet_address> nodes_down;
|
|
|
|
|
auto req = node_ops_cmd_request{node_ops_cmd::removenode_prepare, uuid, ignore_nodes, leaving_nodes};
|
|
|
|
|
auto req = node_ops_cmd_request{node_ops_cmd::removenode_prepare, uuid, ignore_nodes, leaving_nodes, {}};
|
|
|
|
|
try {
|
|
|
|
|
parallel_for_each(nodes, [&ss, &req, &nodes_unknown_verb, &nodes_down, uuid] (const gms::inet_address& node) {
|
|
|
|
|
return ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
|
|
|
|
|
@@ -1977,7 +2124,7 @@ future<> storage_service::removenode(sstring host_id_string, std::list<gms::inet
|
|
|
|
|
heartbeat_updater = seastar::async([&ss, &nodes, uuid, heartbeat_updater_done] {
|
|
|
|
|
slogger.debug("removenode[{}]: Started heartbeat_updater", uuid);
|
|
|
|
|
while (!(*heartbeat_updater_done)) {
|
|
|
|
|
auto req = node_ops_cmd_request{node_ops_cmd::removenode_heartbeat, uuid, {}, {}};
|
|
|
|
|
auto req = node_ops_cmd_request{node_ops_cmd::removenode_heartbeat, uuid, {}, {}, {}};
|
|
|
|
|
parallel_for_each(nodes, [&ss, &req, uuid] (const gms::inet_address& node) {
|
|
|
|
|
return ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
|
|
|
|
|
slogger.debug("removenode[{}]: Got heartbeat response from node={}", uuid, node);
|
|
|
|
|
@@ -2041,11 +2188,42 @@ future<> storage_service::removenode(sstring host_id_string, std::list<gms::inet
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void storage_service::node_ops_cmd_check(gms::inet_address coordinator, const node_ops_cmd_request& req) {
|
|
|
|
|
auto ops_uuids = boost::copy_range<std::vector<utils::UUID>>(_node_ops| boost::adaptors::map_keys);
|
|
|
|
|
std::string msg;
|
|
|
|
|
if (req.cmd == node_ops_cmd::removenode_prepare || req.cmd == node_ops_cmd::replace_prepare) {
|
|
|
|
|
// Peer node wants to start a new node operation. Make sure no pending node operation is in progress.
|
|
|
|
|
if (!_node_ops.empty()) {
|
|
|
|
|
msg = format("node_ops_cmd_check: Node {} rejected node_ops_cmd={} from node={} with ops_uuid={}, pending_node_ops={}, pending node ops is in progress",
|
|
|
|
|
get_broadcast_address(), uint32_t(req.cmd), coordinator, req.ops_uuid, ops_uuids);
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
if (ops_uuids.size() == 1 && ops_uuids.front() == req.ops_uuid) {
|
|
|
|
|
// Check is good, since we know this ops_uuid and this is the only ops_uuid we are working on.
|
|
|
|
|
} else if (ops_uuids.size() == 0) {
|
|
|
|
|
// The ops_uuid received is unknown. Fail the request.
|
|
|
|
|
msg = format("node_ops_cmd_check: Node {} rejected node_ops_cmd={} from node={} with ops_uuid={}, pending_node_ops={}, the node ops is unknown",
|
|
|
|
|
get_broadcast_address(), uint32_t(req.cmd), coordinator, req.ops_uuid, ops_uuids);
|
|
|
|
|
} else {
|
|
|
|
|
// Other node ops is in progress. Fail the request.
|
|
|
|
|
msg = format("node_ops_cmd_check: Node {} rejected node_ops_cmd={} from node={} with ops_uuid={}, pending_node_ops={}, pending node ops is in progress",
|
|
|
|
|
get_broadcast_address(), uint32_t(req.cmd), coordinator, req.ops_uuid, ops_uuids);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (!msg.empty()) {
|
|
|
|
|
slogger.warn("{}", msg);
|
|
|
|
|
throw std::runtime_error(msg);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_address coordinator, node_ops_cmd_request req) {
|
|
|
|
|
return get_storage_service().invoke_on(0, [coordinator, req = std::move(req)] (auto& ss) mutable {
|
|
|
|
|
return seastar::async([&ss, coordinator, req = std::move(req)] () mutable {
|
|
|
|
|
auto ops_uuid = req.ops_uuid;
|
|
|
|
|
slogger.debug("node_ops_cmd_handler cmd={}, ops_uuid={}", uint32_t(req.cmd), ops_uuid);
|
|
|
|
|
|
|
|
|
|
ss.node_ops_cmd_check(coordinator, req);
|
|
|
|
|
|
|
|
|
|
if (req.cmd == node_ops_cmd::removenode_prepare) {
|
|
|
|
|
if (req.leaving_nodes.size() > 1) {
|
|
|
|
|
auto msg = format("removenode[{}]: Could not removenode more than one node at a time: leaving_nodes={}", req.ops_uuid, req.leaving_nodes);
|
|
|
|
|
@@ -2089,6 +2267,60 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
|
|
|
|
}
|
|
|
|
|
} else if (req.cmd == node_ops_cmd::removenode_abort) {
|
|
|
|
|
ss.node_ops_abort(ops_uuid);
|
|
|
|
|
} else if (req.cmd == node_ops_cmd::replace_prepare) {
|
|
|
|
|
// Mark the replacing node as replacing
|
|
|
|
|
if (req.replace_nodes.size() > 1) {
|
|
|
|
|
auto msg = format("replace[{}]: Could not replace more than one node at a time: replace_nodes={}", req.ops_uuid, req.replace_nodes);
|
|
|
|
|
slogger.warn("{}", msg);
|
|
|
|
|
throw std::runtime_error(msg);
|
|
|
|
|
}
|
|
|
|
|
ss.mutate_token_metadata([coordinator, &req, &ss] (mutable_token_metadata_ptr tmptr) mutable {
|
|
|
|
|
for (auto& x: req.replace_nodes) {
|
|
|
|
|
auto existing_node = x.first;
|
|
|
|
|
auto replacing_node = x.second;
|
|
|
|
|
slogger.info("replace[{}]: Added replacing_node={} to replace existing_node={}, coordinator={}", req.ops_uuid, replacing_node, existing_node, coordinator);
|
|
|
|
|
tmptr->add_replacing_endpoint(existing_node, replacing_node);
|
|
|
|
|
}
|
|
|
|
|
return make_ready_future<>();
|
|
|
|
|
}).get();
|
|
|
|
|
auto ops = seastar::make_shared<node_ops_info>(node_ops_info{ops_uuid, false, std::move(req.ignore_nodes)});
|
|
|
|
|
auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(ops), [&ss, coordinator, req = std::move(req)] () mutable {
|
|
|
|
|
return ss.mutate_token_metadata([&ss, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable {
|
|
|
|
|
for (auto& x: req.replace_nodes) {
|
|
|
|
|
auto existing_node = x.first;
|
|
|
|
|
auto replacing_node = x.second;
|
|
|
|
|
slogger.info("replace[{}]: Removed replacing_node={} to replace existing_node={}, coordinator={}", req.ops_uuid, replacing_node, existing_node, coordinator);
|
|
|
|
|
tmptr->del_replacing_endpoint(existing_node);
|
|
|
|
|
}
|
|
|
|
|
return ss.update_pending_ranges(tmptr, format("replace {}", req.replace_nodes));
|
|
|
|
|
});
|
|
|
|
|
},
|
|
|
|
|
[&ss, ops_uuid ] { ss.node_ops_singal_abort(ops_uuid); });
|
|
|
|
|
ss._node_ops.emplace(ops_uuid, std::move(meta));
|
|
|
|
|
} else if (req.cmd == node_ops_cmd::replace_prepare_mark_alive) {
|
|
|
|
|
// Wait for local node has marked replacing node as alive
|
|
|
|
|
auto nodes = boost::copy_range<std::vector<inet_address>>(req.replace_nodes| boost::adaptors::map_values);
|
|
|
|
|
try {
|
|
|
|
|
ss._gossiper.wait_alive(nodes, std::chrono::milliseconds(120 * 1000));
|
|
|
|
|
} catch (...) {
|
|
|
|
|
slogger.warn("replace[{}]: Failed to wait for marking replacing node as up, replace_nodes={}: {}",
|
|
|
|
|
req.ops_uuid, req.replace_nodes, std::current_exception());
|
|
|
|
|
throw;
|
|
|
|
|
}
|
|
|
|
|
} else if (req.cmd == node_ops_cmd::replace_prepare_pending_ranges) {
|
|
|
|
|
// Update the pending_ranges for the replacing node
|
|
|
|
|
slogger.debug("replace[{}]: Updated pending_ranges from coordinator={}", req.ops_uuid, coordinator);
|
|
|
|
|
ss.mutate_token_metadata([coordinator, &req, &ss] (mutable_token_metadata_ptr tmptr) mutable {
|
|
|
|
|
return ss.update_pending_ranges(tmptr, format("replace {}", req.replace_nodes));
|
|
|
|
|
}).get();
|
|
|
|
|
} else if (req.cmd == node_ops_cmd::replace_heartbeat) {
|
|
|
|
|
slogger.debug("replace[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
|
|
|
|
|
ss.node_ops_update_heartbeat(ops_uuid);
|
|
|
|
|
} else if (req.cmd == node_ops_cmd::replace_done) {
|
|
|
|
|
slogger.info("replace[{}]: Marked ops done from coordinator={}", req.ops_uuid, coordinator);
|
|
|
|
|
ss.node_ops_done(ops_uuid);
|
|
|
|
|
} else if (req.cmd == node_ops_cmd::replace_abort) {
|
|
|
|
|
ss.node_ops_abort(ops_uuid);
|
|
|
|
|
} else {
|
|
|
|
|
auto msg = format("node_ops_cmd_handler: ops_uuid={}, unknown cmd={}", req.ops_uuid, uint32_t(req.cmd));
|
|
|
|
|
slogger.warn("{}", msg);
|
|
|
|
|
|