storage_service: node_ops_cmd_handler: update new token_metadata
This commit is contained in:
@@ -5462,8 +5462,8 @@ void storage_service::node_ops_insert(node_ops_id ops_uuid,
|
||||
on_node_ops_registered(ops_uuid);
|
||||
}
|
||||
|
||||
future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_address coordinator, std::optional<locator::host_id>, node_ops_cmd_request req) {
|
||||
return seastar::async([this, coordinator, req = std::move(req)] () mutable {
|
||||
future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_address coordinator, std::optional<locator::host_id> coordinator_host_id, node_ops_cmd_request req) {
|
||||
return seastar::async([this, coordinator, coordinator_host_id, req = std::move(req)] () mutable {
|
||||
auto ops_uuid = req.ops_uuid;
|
||||
auto topo_guard = null_topology_guard;
|
||||
slogger.debug("node_ops_cmd_handler cmd={}, ops_uuid={}", req.cmd, ops_uuid);
|
||||
@@ -5506,6 +5506,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
for (auto& node : req.leaving_nodes) {
|
||||
slogger.info("removenode[{}]: Added node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator);
|
||||
tmptr->add_leaving_endpoint(node);
|
||||
tmptr->get_new()->add_leaving_endpoint(tmptr->get_new()->get_host_id(node));
|
||||
}
|
||||
return update_topology_change_info(tmptr, ::format("removenode {}", req.leaving_nodes));
|
||||
}).get();
|
||||
@@ -5514,6 +5515,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
for (auto& node : req.leaving_nodes) {
|
||||
slogger.info("removenode[{}]: Removed node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator);
|
||||
tmptr->del_leaving_endpoint(node);
|
||||
tmptr->get_new()->del_leaving_endpoint(tmptr->get_new()->get_host_id(node));
|
||||
}
|
||||
return update_topology_change_info(tmptr, ::format("removenode {}", req.leaving_nodes));
|
||||
});
|
||||
@@ -5554,6 +5556,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
for (auto& node : req.leaving_nodes) {
|
||||
slogger.info("decommission[{}]: Added node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator);
|
||||
tmptr->add_leaving_endpoint(node);
|
||||
tmptr->get_new()->add_leaving_endpoint(tmptr->get_new()->get_host_id(node));
|
||||
}
|
||||
return update_topology_change_info(tmptr, ::format("decommission {}", req.leaving_nodes));
|
||||
}).get();
|
||||
@@ -5562,6 +5565,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
for (auto& node : req.leaving_nodes) {
|
||||
slogger.info("decommission[{}]: Removed node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator);
|
||||
tmptr->del_leaving_endpoint(node);
|
||||
tmptr->get_new()->del_leaving_endpoint(tmptr->get_new()->get_host_id(node));
|
||||
}
|
||||
return update_topology_change_info(tmptr, ::format("decommission {}", req.leaving_nodes));
|
||||
});
|
||||
@@ -5607,23 +5611,51 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
slogger.warn("{}", msg);
|
||||
throw std::runtime_error(msg);
|
||||
}
|
||||
mutate_token_metadata([coordinator, &req, this] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
if (!coordinator_host_id) {
|
||||
throw std::runtime_error("Coordinator host_id not found");
|
||||
}
|
||||
mutate_token_metadata([coordinator, coordinator_host_id, &req, this] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
for (auto& x: req.replace_nodes) {
|
||||
auto existing_node = x.first;
|
||||
auto replacing_node = x.second;
|
||||
slogger.info("replace[{}]: Added replacing_node={} to replace existing_node={}, coordinator={}", req.ops_uuid, replacing_node, existing_node, coordinator);
|
||||
const auto existing_node_id = tmptr->get_new()->get_host_id(existing_node);
|
||||
const auto replacing_node_id = *coordinator_host_id;
|
||||
slogger.info("replace[{}]: Added replacing_node={}/{} to replace existing_node={}/{}, coordinator={}/{}",
|
||||
req.ops_uuid, replacing_node, replacing_node_id, existing_node, existing_node_id, coordinator, *coordinator_host_id);
|
||||
tmptr->update_topology(replacing_node, get_dc_rack_for(replacing_node), locator::node::state::replacing);
|
||||
tmptr->add_replacing_endpoint(existing_node, replacing_node);
|
||||
|
||||
// In case of replace-with-same-ip we need to map both host_id-s
|
||||
// to the same IP. The locator::topology allows this specifically in case
|
||||
// where one node is being_replaced and another is replacing,
|
||||
// so here we adjust the state of the original node accordingly.
|
||||
// The host_id -> IP map works as usual, and IP -> host_id will map
|
||||
// IP to the being_replaced node - this is what is implied by the
|
||||
// current code. The IP will be placed in pending_endpoints and
|
||||
// excluded from normal_endpoints (maybe_remove_node_being_replaced function).
|
||||
// In handle_state_normal we'll remap the IP to the new host_id.
|
||||
tmptr->get_new()->update_topology(existing_node_id, std::nullopt, locator::node::state::being_replaced);
|
||||
tmptr->get_new()->update_topology(replacing_node_id, get_dc_rack_for(replacing_node), locator::node::state::replacing);
|
||||
tmptr->get_new()->update_host_id(replacing_node_id, replacing_node);
|
||||
tmptr->get_new()->add_replacing_endpoint(existing_node_id, replacing_node_id);
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
node_ops_insert(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable {
|
||||
return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
node_ops_insert(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, coordinator_host_id, req = std::move(req)] () mutable {
|
||||
return mutate_token_metadata([this, coordinator, coordinator_host_id, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
for (auto& x: req.replace_nodes) {
|
||||
auto existing_node = x.first;
|
||||
auto replacing_node = x.second;
|
||||
slogger.info("replace[{}]: Removed replacing_node={} to replace existing_node={}, coordinator={}", req.ops_uuid, replacing_node, existing_node, coordinator);
|
||||
const auto existing_node_id = tmptr->get_new()->get_host_id(existing_node);
|
||||
const auto replacing_node_id = *coordinator_host_id;
|
||||
slogger.info("replace[{}]: Removed replacing_node={}/{} to replace existing_node={}/{}, coordinator={}/{}",
|
||||
req.ops_uuid, replacing_node, replacing_node_id, existing_node, existing_node_id, coordinator, *coordinator_host_id);
|
||||
tmptr->del_replacing_endpoint(existing_node);
|
||||
|
||||
tmptr->get_new()->del_replacing_endpoint(existing_node_id);
|
||||
const auto dc_rack = get_dc_rack_for(replacing_node);
|
||||
tmptr->get_new()->update_topology(existing_node_id, dc_rack, locator::node::state::normal);
|
||||
tmptr->get_new()->remove_endpoint(replacing_node_id);
|
||||
}
|
||||
return update_topology_change_info(tmptr, ::format("replace {}", req.replace_nodes));
|
||||
});
|
||||
@@ -5659,13 +5691,23 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
slogger.warn("{}", msg);
|
||||
throw std::runtime_error(msg);
|
||||
}
|
||||
mutate_token_metadata([coordinator, &req, this] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
if (!coordinator_host_id) {
|
||||
throw std::runtime_error("Coordinator host_id not found");
|
||||
}
|
||||
mutate_token_metadata([coordinator, coordinator_host_id, &req, this] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
for (auto& x: req.bootstrap_nodes) {
|
||||
auto& endpoint = x.first;
|
||||
auto tokens = std::unordered_set<dht::token>(x.second.begin(), x.second.end());
|
||||
slogger.info("bootstrap[{}]: Added node={} as bootstrap, coordinator={}", req.ops_uuid, endpoint, coordinator);
|
||||
tmptr->update_topology(endpoint, get_dc_rack_for(endpoint), locator::node::state::bootstrapping);
|
||||
const auto host_id = *coordinator_host_id;
|
||||
const auto dc_rack = get_dc_rack_for(endpoint);
|
||||
slogger.info("bootstrap[{}]: Added node={}/{} as bootstrap, coordinator={}/{}",
|
||||
req.ops_uuid, endpoint, host_id, coordinator, *coordinator_host_id);
|
||||
tmptr->update_topology(endpoint, dc_rack, locator::node::state::bootstrapping);
|
||||
tmptr->add_bootstrap_tokens(tokens, endpoint);
|
||||
|
||||
tmptr->get_new()->update_host_id(host_id, endpoint);
|
||||
tmptr->get_new()->update_topology(host_id, dc_rack, locator::node::state::bootstrapping);
|
||||
tmptr->get_new()->add_bootstrap_tokens(tokens, host_id);
|
||||
}
|
||||
return update_topology_change_info(tmptr, ::format("bootstrap {}", req.bootstrap_nodes));
|
||||
}).get();
|
||||
@@ -5676,6 +5718,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
auto tokens = std::unordered_set<dht::token>(x.second.begin(), x.second.end());
|
||||
slogger.info("bootstrap[{}]: Removed node={} as bootstrap, coordinator={}", req.ops_uuid, endpoint, coordinator);
|
||||
tmptr->remove_bootstrap_tokens(tokens);
|
||||
tmptr->get_new()->remove_bootstrap_tokens(tokens);
|
||||
}
|
||||
return update_topology_change_info(tmptr, ::format("bootstrap {}", req.bootstrap_nodes));
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user