From 278c8322859536fdb7bf9ba54f2e65d5c45fe04e Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Tue, 31 Oct 2023 23:16:56 +0400 Subject: [PATCH] storage_service: node_ops_cmd_handler: update new token_metadata --- service/storage_service.cc | 63 ++++++++++++++++++++++++++++++++------ 1 file changed, 53 insertions(+), 10 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index c612e83d0f..5f834470c0 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -5462,8 +5462,8 @@ void storage_service::node_ops_insert(node_ops_id ops_uuid, on_node_ops_registered(ops_uuid); } -future storage_service::node_ops_cmd_handler(gms::inet_address coordinator, std::optional, node_ops_cmd_request req) { - return seastar::async([this, coordinator, req = std::move(req)] () mutable { +future storage_service::node_ops_cmd_handler(gms::inet_address coordinator, std::optional coordinator_host_id, node_ops_cmd_request req) { + return seastar::async([this, coordinator, coordinator_host_id, req = std::move(req)] () mutable { auto ops_uuid = req.ops_uuid; auto topo_guard = null_topology_guard; slogger.debug("node_ops_cmd_handler cmd={}, ops_uuid={}", req.cmd, ops_uuid); @@ -5506,6 +5506,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad for (auto& node : req.leaving_nodes) { slogger.info("removenode[{}]: Added node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator); tmptr->add_leaving_endpoint(node); + tmptr->get_new()->add_leaving_endpoint(tmptr->get_new()->get_host_id(node)); } return update_topology_change_info(tmptr, ::format("removenode {}", req.leaving_nodes)); }).get(); @@ -5514,6 +5515,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad for (auto& node : req.leaving_nodes) { slogger.info("removenode[{}]: Removed node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator); tmptr->del_leaving_endpoint(node); + tmptr->get_new()->del_leaving_endpoint(tmptr->get_new()->get_host_id(node)); } return update_topology_change_info(tmptr, ::format("removenode {}", req.leaving_nodes)); }); @@ -5554,6 +5556,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad for (auto& node : req.leaving_nodes) { slogger.info("decommission[{}]: Added node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator); tmptr->add_leaving_endpoint(node); + tmptr->get_new()->add_leaving_endpoint(tmptr->get_new()->get_host_id(node)); } return update_topology_change_info(tmptr, ::format("decommission {}", req.leaving_nodes)); }).get(); @@ -5562,6 +5565,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad for (auto& node : req.leaving_nodes) { slogger.info("decommission[{}]: Removed node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator); tmptr->del_leaving_endpoint(node); + tmptr->get_new()->del_leaving_endpoint(tmptr->get_new()->get_host_id(node)); } return update_topology_change_info(tmptr, ::format("decommission {}", req.leaving_nodes)); }); @@ -5607,23 +5611,51 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad slogger.warn("{}", msg); throw std::runtime_error(msg); } - mutate_token_metadata([coordinator, &req, this] (mutable_token_metadata_ptr tmptr) mutable { + if (!coordinator_host_id) { + throw std::runtime_error("Coordinator host_id not found"); + } + mutate_token_metadata([coordinator, coordinator_host_id, &req, this] (mutable_token_metadata_ptr tmptr) mutable { for (auto& x: req.replace_nodes) { auto existing_node = x.first; auto replacing_node = x.second; - slogger.info("replace[{}]: Added replacing_node={} to replace existing_node={}, coordinator={}", req.ops_uuid, replacing_node, existing_node, coordinator); + const auto existing_node_id = tmptr->get_new()->get_host_id(existing_node); + const auto replacing_node_id = *coordinator_host_id; + slogger.info("replace[{}]: Added replacing_node={}/{} to replace existing_node={}/{}, coordinator={}/{}", + req.ops_uuid, replacing_node, replacing_node_id, existing_node, existing_node_id, coordinator, *coordinator_host_id); tmptr->update_topology(replacing_node, get_dc_rack_for(replacing_node), locator::node::state::replacing); tmptr->add_replacing_endpoint(existing_node, replacing_node); + + // In case of replace-with-same-ip we need to map both host_id-s + // to the same IP. The locator::topology allows this specifically in case + // where one node is being_replaced and another is replacing, + // so here we adjust the state of the original node accordingly. + // The host_id -> IP map works as usual, and IP -> host_id will map + // IP to the being_replaced node - this is what is implied by the + // current code. The IP will be placed in pending_endpoints and + // excluded from normal_endpoints (maybe_remove_node_being_replaced function). + // In handle_state_normal we'll remap the IP to the new host_id. + tmptr->get_new()->update_topology(existing_node_id, std::nullopt, locator::node::state::being_replaced); + tmptr->get_new()->update_topology(replacing_node_id, get_dc_rack_for(replacing_node), locator::node::state::replacing); + tmptr->get_new()->update_host_id(replacing_node_id, replacing_node); + tmptr->get_new()->add_replacing_endpoint(existing_node_id, replacing_node_id); } return make_ready_future<>(); }).get(); - node_ops_insert(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable { - return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable { + node_ops_insert(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, coordinator_host_id, req = std::move(req)] () mutable { + return mutate_token_metadata([this, coordinator, coordinator_host_id, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable { for (auto& x: req.replace_nodes) { auto existing_node = x.first; auto replacing_node = x.second; - slogger.info("replace[{}]: Removed replacing_node={} to replace existing_node={}, coordinator={}", req.ops_uuid, replacing_node, existing_node, coordinator); + const auto existing_node_id = tmptr->get_new()->get_host_id(existing_node); + const auto replacing_node_id = *coordinator_host_id; + slogger.info("replace[{}]: Removed replacing_node={}/{} to replace existing_node={}/{}, coordinator={}/{}", + req.ops_uuid, replacing_node, replacing_node_id, existing_node, existing_node_id, coordinator, *coordinator_host_id); tmptr->del_replacing_endpoint(existing_node); + + tmptr->get_new()->del_replacing_endpoint(existing_node_id); + const auto dc_rack = get_dc_rack_for(replacing_node); + tmptr->get_new()->update_topology(existing_node_id, dc_rack, locator::node::state::normal); + tmptr->get_new()->remove_endpoint(replacing_node_id); } return update_topology_change_info(tmptr, ::format("replace {}", req.replace_nodes)); }); @@ -5659,13 +5691,23 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad slogger.warn("{}", msg); throw std::runtime_error(msg); } - mutate_token_metadata([coordinator, &req, this] (mutable_token_metadata_ptr tmptr) mutable { + if (!coordinator_host_id) { + throw std::runtime_error("Coordinator host_id not found"); + } + mutate_token_metadata([coordinator, coordinator_host_id, &req, this] (mutable_token_metadata_ptr tmptr) mutable { for (auto& x: req.bootstrap_nodes) { auto& endpoint = x.first; auto tokens = std::unordered_set(x.second.begin(), x.second.end()); - slogger.info("bootstrap[{}]: Added node={} as bootstrap, coordinator={}", req.ops_uuid, endpoint, coordinator); - tmptr->update_topology(endpoint, get_dc_rack_for(endpoint), locator::node::state::bootstrapping); + const auto host_id = *coordinator_host_id; + const auto dc_rack = get_dc_rack_for(endpoint); + slogger.info("bootstrap[{}]: Added node={}/{} as bootstrap, coordinator={}/{}", + req.ops_uuid, endpoint, host_id, coordinator, *coordinator_host_id); + tmptr->update_topology(endpoint, dc_rack, locator::node::state::bootstrapping); tmptr->add_bootstrap_tokens(tokens, endpoint); + + tmptr->get_new()->update_host_id(host_id, endpoint); + tmptr->get_new()->update_topology(host_id, dc_rack, locator::node::state::bootstrapping); + tmptr->get_new()->add_bootstrap_tokens(tokens, host_id); } return update_topology_change_info(tmptr, ::format("bootstrap {}", req.bootstrap_nodes)); }).get(); @@ -5676,6 +5718,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad auto tokens = std::unordered_set(x.second.begin(), x.second.end()); slogger.info("bootstrap[{}]: Removed node={} as bootstrap, coordinator={}", req.ops_uuid, endpoint, coordinator); tmptr->remove_bootstrap_tokens(tokens); + tmptr->get_new()->remove_bootstrap_tokens(tokens); } return update_topology_change_info(tmptr, ::format("bootstrap {}", req.bootstrap_nodes)); });