From ef534ac8764cec02544ec95492de2dd61c165280 Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Sat, 4 Nov 2023 20:53:34 +0400 Subject: [PATCH] rebuild_with_repair, replace_with_repair: use new token_metadata Just mechanical changes to the new token_metadata. All the boost and topology tests pass with this change. --- locator/abstract_replication_strategy.cc | 10 +++++----- locator/abstract_replication_strategy.hh | 4 ++-- repair/repair.cc | 20 ++++++++++---------- repair/row_level.hh | 6 +++--- service/storage_service.cc | 8 ++++---- 5 files changed, 24 insertions(+), 24 deletions(-) diff --git a/locator/abstract_replication_strategy.cc b/locator/abstract_replication_strategy.cc index 162c4e0a71..751c1519a5 100644 --- a/locator/abstract_replication_strategy.cc +++ b/locator/abstract_replication_strategy.cc @@ -256,18 +256,18 @@ vnode_effective_replication_map::get_ranges(inet_address ep) const { // Caller must ensure that token_metadata will not change throughout the call. future -abstract_replication_strategy::get_ranges(inet_address ep, token_metadata_ptr tmptr) const { +abstract_replication_strategy::get_ranges(locator::host_id ep, token_metadata_ptr tmptr) const { co_return co_await get_ranges(ep, *tmptr); } // Caller must ensure that token_metadata will not change throughout the call. future -abstract_replication_strategy::get_ranges(inet_address ep, const token_metadata& tm) const { +abstract_replication_strategy::get_ranges(locator::host_id ep, const token_metadata& tm) const { dht::token_range_vector ret; - if (!tm.is_normal_token_owner(ep)) { + if (!tm.get_new()->is_normal_token_owner(ep)) { co_return ret; } - const auto& sorted_tokens = tm.sorted_tokens(); + const auto& sorted_tokens = tm.get_new()->sorted_tokens(); if (sorted_tokens.empty()) { on_internal_error(rslogger, "Token metadata is empty"); } @@ -279,7 +279,7 @@ abstract_replication_strategy::get_ranges(inet_address ep, const token_metadata& // Using the common path would make the function quadratic in the number of endpoints. should_add = true; } else { - auto eps = get(co_await calculate_natural_endpoints(tok, tm, false)); + auto eps = get(co_await calculate_natural_endpoints(tok, tm, true)); should_add = eps.contains(ep); } if (should_add) { diff --git a/locator/abstract_replication_strategy.hh b/locator/abstract_replication_strategy.hh index 3da8fce7a3..87d81f1e97 100644 --- a/locator/abstract_replication_strategy.hh +++ b/locator/abstract_replication_strategy.hh @@ -168,8 +168,8 @@ public: // Use the token_metadata provided by the caller instead of _token_metadata // Note: must be called with initialized, non-empty token_metadata. - future get_ranges(inet_address ep, token_metadata_ptr tmptr) const; - future get_ranges(inet_address ep, const token_metadata& tm) const; + future get_ranges(locator::host_id ep, token_metadata_ptr tmptr) const; + future get_ranges(locator::host_id ep, const token_metadata& tm) const; // Caller must ensure that token_metadata will not change throughout the call. future> get_range_addresses(const token_metadata& tm) const; diff --git a/repair/repair.cc b/repair/repair.cc index b0fe70444a..96bfc988de 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -1883,12 +1883,13 @@ future<> repair_service::removenode_with_repair(locator::token_metadata_ptr tmpt }); } -future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_ptr tmptr, sstring op, sstring source_dc, streaming::stream_reason reason, std::unordered_set ignore_nodes) { +future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata2_ptr tmptr, sstring op, sstring source_dc, streaming::stream_reason reason, std::unordered_set ignore_nodes) { assert(this_shard_id() == 0); return seastar::async([this, tmptr = std::move(tmptr), source_dc = std::move(source_dc), op = std::move(op), reason, ignore_nodes = std::move(ignore_nodes)] () mutable { auto& db = get_db().local(); auto ks_erms = db.get_non_local_strategy_keyspaces_erms(); auto myip = tmptr->get_topology().my_address(); + auto myid = tmptr->get_my_id(); size_t nr_ranges_total = 0; for (const auto& [keyspace_name, erm] : ks_erms) { if (!db.has_keyspace(keyspace_name)) { @@ -1896,7 +1897,7 @@ future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_ } auto& strat = erm->get_replication_strategy(); // Okay to yield since tm is immutable - dht::token_range_vector ranges = strat.get_ranges(myip, tmptr).get0(); + dht::token_range_vector ranges = strat.get_ranges(myid, locator::token_metadata(tmptr)).get0(); auto nr_tables = get_nr_tables(db, keyspace_name); nr_ranges_total += ranges.size() * nr_tables; @@ -1920,7 +1921,7 @@ future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_ continue; } auto& strat = erm->get_replication_strategy(); - dht::token_range_vector ranges = strat.get_ranges(myip, tmptr).get0(); + dht::token_range_vector ranges = strat.get_ranges(myid, locator::token_metadata(tmptr)).get0(); auto& topology = erm->get_token_metadata().get_topology(); std::unordered_map range_sources; auto nr_tables = get_nr_tables(db, keyspace_name); @@ -1929,7 +1930,7 @@ future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_ auto& r = *it; seastar::thread::maybe_yield(); auto end_token = r.end() ? r.end()->value() : dht::maximum_token(); - auto neighbors = boost::copy_range>(get(strat.calculate_natural_endpoints(end_token, *tmptr, false).get0()) | + auto neighbors = boost::copy_range>(strat.calculate_natural_ips(end_token, tmptr).get0() | boost::adaptors::filtered([myip, &source_dc, &topology, &ignore_nodes] (const gms::inet_address& node) { if (node == myip) { return false; @@ -1967,7 +1968,7 @@ future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_ }); } -future<> repair_service::rebuild_with_repair(locator::token_metadata_ptr tmptr, sstring source_dc) { +future<> repair_service::rebuild_with_repair(locator::token_metadata2_ptr tmptr, sstring source_dc) { assert(this_shard_id() == 0); auto op = sstring("rebuild_with_repair"); if (source_dc.empty()) { @@ -1983,19 +1984,18 @@ future<> repair_service::rebuild_with_repair(locator::token_metadata_ptr tmptr, }); } -future<> repair_service::replace_with_repair(locator::token_metadata_ptr tmptr, std::unordered_set replacing_tokens, std::unordered_set ignore_nodes) { +future<> repair_service::replace_with_repair(locator::token_metadata2_ptr tmptr, std::unordered_set replacing_tokens, std::unordered_set ignore_nodes) { assert(this_shard_id() == 0); auto cloned_tm = co_await tmptr->clone_async(); auto op = sstring("replace_with_repair"); auto& topology = tmptr->get_topology(); - auto myip = topology.my_address(); auto myloc = topology.get_location(); auto reason = streaming::stream_reason::replace; // update a cloned version of tmptr // no need to set the original version - auto cloned_tmptr = make_token_metadata_ptr(std::move(cloned_tm)); - cloned_tmptr->update_topology(myip, myloc, locator::node::state::replacing); - co_await cloned_tmptr->update_normal_tokens(replacing_tokens, myip); + auto cloned_tmptr = make_token_metadata2_ptr(std::move(cloned_tm)); + cloned_tmptr->update_topology(tmptr->get_my_id(), myloc, locator::node::state::replacing); + co_await cloned_tmptr->update_normal_tokens(replacing_tokens, tmptr->get_my_id()); co_return co_await do_rebuild_replace_with_repair(std::move(cloned_tmptr), std::move(op), myloc.dc, reason, std::move(ignore_nodes)); } diff --git a/repair/row_level.hh b/repair/row_level.hh index 61251f1bb8..9e27aae540 100644 --- a/repair/row_level.hh +++ b/repair/row_level.hh @@ -141,11 +141,11 @@ public: future<> bootstrap_with_repair(locator::token_metadata2_ptr tmptr, std::unordered_set bootstrap_tokens); future<> decommission_with_repair(locator::token_metadata_ptr tmptr); future<> removenode_with_repair(locator::token_metadata_ptr tmptr, gms::inet_address leaving_node, shared_ptr ops); - future<> rebuild_with_repair(locator::token_metadata_ptr tmptr, sstring source_dc); - future<> replace_with_repair(locator::token_metadata_ptr tmptr, std::unordered_set replacing_tokens, std::unordered_set ignore_nodes); + future<> rebuild_with_repair(locator::token_metadata2_ptr tmptr, sstring source_dc); + future<> replace_with_repair(locator::token_metadata2_ptr tmptr, std::unordered_set replacing_tokens, std::unordered_set ignore_nodes); private: future<> do_decommission_removenode_with_repair(locator::token_metadata_ptr tmptr, gms::inet_address leaving_node, shared_ptr ops); - future<> do_rebuild_replace_with_repair(locator::token_metadata_ptr tmptr, sstring op, sstring source_dc, streaming::stream_reason reason, std::unordered_set ignore_nodes); + future<> do_rebuild_replace_with_repair(locator::token_metadata2_ptr tmptr, sstring op, sstring source_dc, streaming::stream_reason reason, std::unordered_set ignore_nodes); // Must be called on shard 0 future<> sync_data_using_repair(sstring keyspace, diff --git a/service/storage_service.cc b/service/storage_service.cc index eb79d949f5..baa8badd44 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -5149,7 +5149,7 @@ void storage_service::run_replace_ops(std::unordered_set& bootstrap_token // Step 7: Sync data for replace if (is_repair_based_node_ops_enabled(streaming::stream_reason::replace)) { slogger.info("replace[{}]: Using repair based node ops to sync data", uuid); - _repair.local().replace_with_repair(get_token_metadata_ptr(), bootstrap_tokens, ctl.ignore_nodes).get(); + _repair.local().replace_with_repair(get_token_metadata_ptr()->get_new_strong(), bootstrap_tokens, ctl.ignore_nodes).get(); } else { slogger.info("replace[{}]: Using streaming based node ops to sync data", uuid); dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_token_metadata_ptr()->get_new()->get_my_id(), _snitch.local()->get_location(), bootstrap_tokens, get_token_metadata_ptr()->get_new_strong()); @@ -5890,7 +5890,7 @@ future<> storage_service::rebuild(sstring source_dc) { slogger.info("rebuild from dc: {}", source_dc == "" ? "(any dc)" : source_dc); auto tmptr = ss.get_token_metadata_ptr(); if (ss.is_repair_based_node_ops_enabled(streaming::stream_reason::rebuild)) { - co_await ss._repair.local().rebuild_with_repair(tmptr, std::move(source_dc)); + co_await ss._repair.local().rebuild_with_repair(tmptr->get_new_strong(), std::move(source_dc)); } else { auto streamer = make_lw_shared(ss._db, ss._stream_manager, tmptr->get_new_strong(), ss._abort_source, tmptr->get_new()->get_my_id(), ss._snitch.local()->get_location(), "Rebuild", streaming::stream_reason::rebuild, null_topology_guard); @@ -6526,7 +6526,7 @@ future storage_service::raft_topology_cmd_handler(raft } ignored_ips.insert(*ip); } - co_await _repair.local().replace_with_repair(get_token_metadata_ptr(), rs.ring.value().tokens, std::move(ignored_ips)); + co_await _repair.local().replace_with_repair(get_token_metadata_ptr()->get_new_strong(), rs.ring.value().tokens, std::move(ignored_ips)); } else { dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_token_metadata_ptr()->get_new()->get_my_id(), locator::endpoint_dc_rack{rs.datacenter, rs.rack}, rs.ring.value().tokens, get_token_metadata_ptr()->get_new_strong()); @@ -6597,7 +6597,7 @@ future storage_service::raft_topology_cmd_handler(raft co_await retrier(_rebuild_result, [&] () -> future<> { auto tmptr = get_token_metadata_ptr(); if (is_repair_based_node_ops_enabled(streaming::stream_reason::rebuild)) { - co_await _repair.local().rebuild_with_repair(tmptr, std::move(source_dc)); + co_await _repair.local().rebuild_with_repair(tmptr->get_new_strong(), std::move(source_dc)); } else { auto streamer = make_lw_shared(_db, _stream_manager, tmptr->get_new_strong(), _abort_source, tmptr->get_new()->get_my_id(), _snitch.local()->get_location(), "Rebuild", streaming::stream_reason::rebuild, _topology_state_machine._topology.session);