From 7c7dbe377944d730c4e34234c4c36acefa5e5d4a Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Sat, 4 Nov 2023 19:32:52 +0400 Subject: [PATCH] decommission_with_repair, removenode_with_repair -> new token_metadata Just mechanical changes to the new token_metadata. All the boost and topology tests pass with this change. --- repair/repair.cc | 19 ++++++++++--------- repair/row_level.hh | 6 +++--- service/storage_service.cc | 6 +++--- 3 files changed, 16 insertions(+), 15 deletions(-) diff --git a/repair/repair.cc b/repair/repair.cc index 96bfc988de..b60f4ce76b 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -1669,13 +1669,14 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata2_ptr tmpt }); } -future<> repair_service::do_decommission_removenode_with_repair(locator::token_metadata_ptr tmptr, gms::inet_address leaving_node, shared_ptr ops) { +future<> repair_service::do_decommission_removenode_with_repair(locator::token_metadata2_ptr tmptr, gms::inet_address leaving_node, shared_ptr ops) { assert(this_shard_id() == 0); using inet_address = gms::inet_address; return seastar::async([this, tmptr = std::move(tmptr), leaving_node = std::move(leaving_node), ops] () mutable { auto& db = get_db().local(); auto& topology = tmptr->get_topology(); auto myip = topology.my_address(); + const auto leaving_node_id = tmptr->get_host_id(leaving_node); auto ks_erms = db.get_non_local_strategy_keyspaces_erms(); auto local_dc = topology.get_datacenter(); bool is_removenode = myip != leaving_node; @@ -1719,15 +1720,15 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m // Find (for each range) all nodes that store replicas for these ranges as well for (auto& r : ranges) { auto end_token = r.end() ? r.end()->value() : dht::maximum_token(); - auto eps = get(strat.calculate_natural_endpoints(end_token, *tmptr, false).get0()); + auto eps = strat.calculate_natural_ips(end_token, tmptr).get0(); current_replica_endpoints.emplace(r, std::move(eps)); seastar::thread::maybe_yield(); } - auto temp = tmptr->clone_after_all_left().get0(); + auto temp = locator::make_token_metadata2_ptr(tmptr->clone_after_all_left().get0()); // leaving_node might or might not be 'leaving'. If it was not leaving (that is, removenode // command was used), it is still present in temp and must be removed. - if (temp.is_normal_token_owner(leaving_node)) { - temp.remove_endpoint(leaving_node); + if (temp->is_normal_token_owner(leaving_node_id)) { + temp->remove_endpoint(leaving_node_id); } std::unordered_map range_sources; dht::token_range_vector ranges_for_removenode; @@ -1738,7 +1739,7 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m ops->check_abort(); } auto end_token = r.end() ? r.end()->value() : dht::maximum_token(); - const auto new_eps = get(strat.calculate_natural_endpoints(end_token, temp, false).get0()); + const auto new_eps = strat.calculate_natural_ips(end_token, temp).get0(); const auto& current_eps = current_replica_endpoints[r]; std::unordered_set neighbors_set = new_eps.get_set(); bool skip_this_range = false; @@ -1842,7 +1843,7 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m } } } - temp.clear_gently().get(); + temp->clear_gently().get(); if (reason == streaming::stream_reason::decommission) { container().invoke_on_all([nr_ranges_skipped] (repair_service& rs) { rs.get_metrics().decommission_finished_ranges += nr_ranges_skipped; @@ -1864,13 +1865,13 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m }); } -future<> repair_service::decommission_with_repair(locator::token_metadata_ptr tmptr) { +future<> repair_service::decommission_with_repair(locator::token_metadata2_ptr tmptr) { assert(this_shard_id() == 0); auto my_address = tmptr->get_topology().my_address(); return do_decommission_removenode_with_repair(std::move(tmptr), my_address, {}); } -future<> repair_service::removenode_with_repair(locator::token_metadata_ptr tmptr, gms::inet_address leaving_node, shared_ptr ops) { +future<> repair_service::removenode_with_repair(locator::token_metadata2_ptr tmptr, gms::inet_address leaving_node, shared_ptr ops) { assert(this_shard_id() == 0); return do_decommission_removenode_with_repair(std::move(tmptr), std::move(leaving_node), std::move(ops)).then([this] { rlogger.debug("Triggering off-strategy compaction for all non-system tables on removenode completion"); diff --git a/repair/row_level.hh b/repair/row_level.hh index 9e27aae540..d82ac83f9b 100644 --- a/repair/row_level.hh +++ b/repair/row_level.hh @@ -139,12 +139,12 @@ public: // The tokens are the tokens assigned to the bootstrap node. // all repair-based node operation entry points must be called on shard 0 future<> bootstrap_with_repair(locator::token_metadata2_ptr tmptr, std::unordered_set bootstrap_tokens); - future<> decommission_with_repair(locator::token_metadata_ptr tmptr); - future<> removenode_with_repair(locator::token_metadata_ptr tmptr, gms::inet_address leaving_node, shared_ptr ops); + future<> decommission_with_repair(locator::token_metadata2_ptr tmptr); + future<> removenode_with_repair(locator::token_metadata2_ptr tmptr, gms::inet_address leaving_node, shared_ptr ops); future<> rebuild_with_repair(locator::token_metadata2_ptr tmptr, sstring source_dc); future<> replace_with_repair(locator::token_metadata2_ptr tmptr, std::unordered_set replacing_tokens, std::unordered_set ignore_nodes); private: - future<> do_decommission_removenode_with_repair(locator::token_metadata_ptr tmptr, gms::inet_address leaving_node, shared_ptr ops); + future<> do_decommission_removenode_with_repair(locator::token_metadata2_ptr tmptr, gms::inet_address leaving_node, shared_ptr ops); future<> do_rebuild_replace_with_repair(locator::token_metadata2_ptr tmptr, sstring op, sstring source_dc, streaming::stream_reason reason, std::unordered_set ignore_nodes); // Must be called on shard 0 diff --git a/service/storage_service.cc b/service/storage_service.cc index baa8badd44..1075182dce 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -5548,7 +5548,7 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad for (auto& node : req.leaving_nodes) { if (is_repair_based_node_ops_enabled(streaming::stream_reason::removenode)) { slogger.info("removenode[{}]: Started to sync data for removing node={} using repair, coordinator={}", req.ops_uuid, node, coordinator); - _repair.local().removenode_with_repair(get_token_metadata_ptr(), node, ops).get(); + _repair.local().removenode_with_repair(get_token_metadata_ptr()->get_new_strong(), node, ops).get(); } else { slogger.info("removenode[{}]: Started to sync data for removing node={} using stream, coordinator={}", req.ops_uuid, node, coordinator); removenode_with_stream(node, topo_guard, as).get(); @@ -5999,7 +5999,7 @@ future<> storage_service::unbootstrap() { slogger.info("Finished batchlog replay for decommission"); if (is_repair_based_node_ops_enabled(streaming::stream_reason::decommission)) { - co_await _repair.local().decommission_with_repair(get_token_metadata_ptr()); + co_await _repair.local().decommission_with_repair(get_token_metadata_ptr()->get_new_strong()); } else { std::unordered_map> ranges_to_stream; @@ -6583,7 +6583,7 @@ future storage_service::raft_topology_cmd_handler(raft ignored_ips.push_back(*ip); } auto ops = seastar::make_shared(node_ops_id::create_random_id(), as, std::move(ignored_ips)); - return _repair.local().removenode_with_repair(get_token_metadata_ptr(), *ip, ops); + return _repair.local().removenode_with_repair(get_token_metadata_ptr()->get_new_strong(), *ip, ops); } else { return removenode_with_stream(*ip, _topology_state_machine._topology.session, as); }