diff --git a/dht/boot_strapper.hh b/dht/boot_strapper.hh index 0c50c043e7..599ea0ee84 100644 --- a/dht/boot_strapper.hh +++ b/dht/boot_strapper.hh @@ -29,21 +29,23 @@ using check_token_endpoint = bool_class; class boot_strapper { using inet_address = gms::inet_address; using token_metadata = locator::token_metadata; + using token_metadata2 = locator::token_metadata2; using token_metadata_ptr = locator::token_metadata_ptr; + using token_metadata2_ptr = locator::token_metadata2_ptr; using token = dht::token; distributed& _db; sharded& _stream_manager; abort_source& _abort_source; /* endpoint that needs to be bootstrapped */ - inet_address _address; + locator::host_id _address; /* its DC/RACK info */ locator::endpoint_dc_rack _dr; /* token of the node being bootstrapped. */ std::unordered_set _tokens; - const token_metadata_ptr _token_metadata_ptr; + const locator::token_metadata2_ptr _token_metadata_ptr; public: boot_strapper(distributed& db, sharded& sm, abort_source& abort_source, - inet_address addr, locator::endpoint_dc_rack dr, std::unordered_set tokens, const token_metadata_ptr tmptr) + locator::host_id addr, locator::endpoint_dc_rack dr, std::unordered_set tokens, const token_metadata2_ptr tmptr) : _db(db) , _stream_manager(sm) , _abort_source(abort_source) @@ -91,7 +93,7 @@ public: #endif private: - const token_metadata& get_token_metadata() { + const token_metadata2& get_token_metadata() { return *_token_metadata_ptr; } }; diff --git a/dht/range_streamer.cc b/dht/range_streamer.cc index 8d2d4ba9e7..f27d3a48e7 100644 --- a/dht/range_streamer.cc +++ b/dht/range_streamer.cc @@ -88,6 +88,7 @@ range_streamer::get_all_ranges_with_sources_for(const sstring& keyspace_name, lo logger.debug("keyspace={}, desired_ranges.size={}, range_addresses.size={}", keyspace_name, desired_ranges.size(), range_addresses.size()); std::unordered_map> range_sources; + const auto address_ep = get_token_metadata().get_endpoint_for_host_id(_address); for (auto& desired_range : desired_ranges) { auto found = false; for (auto& x : range_addresses) { @@ -97,7 +98,7 @@ range_streamer::get_all_ranges_with_sources_for(const sstring& keyspace_name, lo const range& src_range = x.first; if (src_range.contains(desired_range, dht::operator<=>)) { inet_address_vector_replica_set preferred(x.second.begin(), x.second.end()); - get_token_metadata().get_topology().sort_by_proximity(_address, preferred); + get_token_metadata().get_topology().sort_by_proximity(address_ep, preferred); for (inet_address& p : preferred) { range_sources[desired_range].push_back(p); } @@ -122,14 +123,14 @@ range_streamer::get_all_ranges_with_strict_sources_for(const sstring& keyspace_n auto& strat = erm->get_replication_strategy(); //Active ranges - auto metadata_clone = get_token_metadata().clone_only_token_map().get0(); - auto range_addresses = strat.get_range_addresses(metadata_clone).get0(); + auto metadata_clone = locator::make_token_metadata2_ptr(get_token_metadata().clone_only_token_map().get0()); + auto range_addresses = strat.get_range_addresses(token_metadata(metadata_clone)).get0(); //Pending ranges - metadata_clone.update_topology(_address, _dr); - metadata_clone.update_normal_tokens(_tokens, _address).get(); - auto pending_range_addresses = strat.get_range_addresses(metadata_clone).get0(); - metadata_clone.clear_gently().get(); + metadata_clone->update_topology(_address, _dr); + metadata_clone->update_normal_tokens(_tokens, _address).get(); + auto pending_range_addresses = strat.get_range_addresses(token_metadata(metadata_clone)).get0(); + metadata_clone->clear_gently().get(); //Collects the source that will have its range moved to the new node std::unordered_map> range_sources; diff --git a/dht/range_streamer.hh b/dht/range_streamer.hh index 31a14d813d..75ceb1b01b 100644 --- a/dht/range_streamer.hh +++ b/dht/range_streamer.hh @@ -37,7 +37,9 @@ class range_streamer { public: using inet_address = gms::inet_address; using token_metadata = locator::token_metadata; + using token_metadata2 = locator::token_metadata2; using token_metadata_ptr = locator::token_metadata_ptr; + using token_metadata2_ptr = locator::token_metadata2_ptr; using stream_plan = streaming::stream_plan; using stream_state = streaming::stream_state; public: @@ -77,8 +79,8 @@ public: } }; - range_streamer(distributed& db, sharded& sm, const token_metadata_ptr tmptr, abort_source& abort_source, std::unordered_set tokens, - inet_address address, locator::endpoint_dc_rack dr, sstring description, streaming::stream_reason reason, + range_streamer(distributed& db, sharded& sm, const token_metadata2_ptr tmptr, abort_source& abort_source, std::unordered_set tokens, + locator::host_id address, locator::endpoint_dc_rack dr, sstring description, streaming::stream_reason reason, service::frozen_topology_guard topo_guard, std::vector tables = {}) : _db(db) @@ -96,8 +98,8 @@ public: _abort_source.check(); } - range_streamer(distributed& db, sharded& sm, const token_metadata_ptr tmptr, abort_source& abort_source, - inet_address address, locator::endpoint_dc_rack dr, sstring description, streaming::stream_reason reason, service::frozen_topology_guard topo_guard, std::vector tables = {}) + range_streamer(distributed& db, sharded& sm, const token_metadata2_ptr tmptr, abort_source& abort_source, + locator::host_id address, locator::endpoint_dc_rack dr, sstring description, streaming::stream_reason reason, service::frozen_topology_guard topo_guard, std::vector tables = {}) : range_streamer(db, sm, std::move(tmptr), abort_source, std::unordered_set(), address, std::move(dr), description, reason, std::move(topo_guard), std::move(tables)) { } @@ -145,7 +147,7 @@ private: #endif // Can be called only before stream_async(). - const token_metadata& get_token_metadata() { + const token_metadata2& get_token_metadata() { return *_token_metadata_ptr; } public: @@ -154,10 +156,10 @@ public: private: distributed& _db; sharded& _stream_manager; - token_metadata_ptr _token_metadata_ptr; + token_metadata2_ptr _token_metadata_ptr; abort_source& _abort_source; std::unordered_set _tokens; - inet_address _address; + locator::host_id _address; locator::endpoint_dc_rack _dr; sstring _description; streaming::stream_reason _reason; diff --git a/locator/abstract_replication_strategy.cc b/locator/abstract_replication_strategy.cc index f06a4924e4..162c4e0a71 100644 --- a/locator/abstract_replication_strategy.cc +++ b/locator/abstract_replication_strategy.cc @@ -342,9 +342,10 @@ vnode_effective_replication_map::get_range_addresses() const { future> abstract_replication_strategy::get_range_addresses(const token_metadata& tm) const { std::unordered_map ret; - for (auto& t : tm.sorted_tokens()) { - dht::token_range_vector ranges = tm.get_primary_ranges_for(t); - auto eps = get(co_await calculate_natural_endpoints(t, tm, false)); + auto tm_new = tm.get_new_strong(); + for (auto& t : tm_new->sorted_tokens()) { + dht::token_range_vector ranges = tm_new->get_primary_ranges_for(t); + auto eps = co_await calculate_natural_ips(t, tm_new); for (auto& r : ranges) { ret.emplace(r, eps.get_vector()); } @@ -353,20 +354,20 @@ abstract_replication_strategy::get_range_addresses(const token_metadata& tm) con } future -abstract_replication_strategy::get_pending_address_ranges(const token_metadata_ptr tmptr, std::unordered_set pending_tokens, inet_address pending_address, locator::endpoint_dc_rack dr) const { +abstract_replication_strategy::get_pending_address_ranges(const token_metadata2_ptr tmptr, std::unordered_set pending_tokens, locator::host_id pending_address, locator::endpoint_dc_rack dr) const { dht::token_range_vector ret; - token_metadata temp = co_await tmptr->clone_only_token_map(); - temp.update_topology(pending_address, std::move(dr)); - co_await temp.update_normal_tokens(pending_tokens, pending_address); - for (const auto& t : temp.sorted_tokens()) { - auto eps = get(co_await calculate_natural_endpoints(t, temp, false)); + auto temp = make_token_metadata2_ptr(co_await tmptr->clone_only_token_map()); + temp->update_topology(pending_address, std::move(dr)); + co_await temp->update_normal_tokens(pending_tokens, pending_address); + for (const auto& t : temp->sorted_tokens()) { + auto eps = get(co_await calculate_natural_endpoints(t, token_metadata(temp), true)); if (eps.contains(pending_address)) { - dht::token_range_vector r = temp.get_primary_ranges_for(t); + dht::token_range_vector r = temp->get_primary_ranges_for(t); rslogger.debug("get_pending_address_ranges: token={} primary_range={} endpoint={}", t, r, pending_address); ret.insert(ret.end(), r.begin(), r.end()); } } - co_await temp.clear_gently(); + co_await temp->clear_gently(); co_return ret; } diff --git a/locator/abstract_replication_strategy.hh b/locator/abstract_replication_strategy.hh index f296de7910..3da8fce7a3 100644 --- a/locator/abstract_replication_strategy.hh +++ b/locator/abstract_replication_strategy.hh @@ -174,7 +174,7 @@ public: // Caller must ensure that token_metadata will not change throughout the call. future> get_range_addresses(const token_metadata& tm) const; - future get_pending_address_ranges(const token_metadata_ptr tmptr, std::unordered_set pending_tokens, inet_address pending_address, locator::endpoint_dc_rack dr) const; + future get_pending_address_ranges(const token_metadata2_ptr tmptr, std::unordered_set pending_tokens, locator::host_id pending_address, locator::endpoint_dc_rack dr) const; }; using ring_mapping = boost::icl::interval_map>; diff --git a/repair/repair.cc b/repair/repair.cc index 92415b5f9e..b0fe70444a 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -1492,7 +1492,7 @@ std::optional repair::data_sync_repair_task_impl::expected_children_numb return smp::count; } -future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr, std::unordered_set bootstrap_tokens) { +future<> repair_service::bootstrap_with_repair(locator::token_metadata2_ptr tmptr, std::unordered_set bootstrap_tokens) { assert(this_shard_id() == 0); using inet_address = gms::inet_address; return seastar::async([this, tmptr = std::move(tmptr), tokens = std::move(bootstrap_tokens)] () mutable { @@ -1500,7 +1500,7 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr auto ks_erms = db.get_non_local_strategy_keyspaces_erms(); auto& topology = tmptr->get_topology(); auto myloc = topology.get_location(); - auto myip = topology.my_address(); + auto myid = tmptr->get_my_id(); auto reason = streaming::stream_reason::bootstrap; // Calculate number of ranges to sync data size_t nr_ranges_total = 0; @@ -1509,7 +1509,7 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr continue; } auto& strat = erm->get_replication_strategy(); - dht::token_range_vector desired_ranges = strat.get_pending_address_ranges(tmptr, tokens, myip, myloc).get0(); + dht::token_range_vector desired_ranges = strat.get_pending_address_ranges(tmptr, tokens, myid, myloc).get0(); seastar::thread::maybe_yield(); auto nr_tables = get_nr_tables(db, keyspace_name); nr_ranges_total += desired_ranges.size() * nr_tables; @@ -1525,20 +1525,20 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr continue; } auto& strat = erm->get_replication_strategy(); - dht::token_range_vector desired_ranges = strat.get_pending_address_ranges(tmptr, tokens, myip, myloc).get0(); + dht::token_range_vector desired_ranges = strat.get_pending_address_ranges(tmptr, tokens, myid, myloc).get0(); bool find_node_in_local_dc_only = strat.get_type() == locator::replication_strategy_type::network_topology; bool everywhere_topology = strat.get_type() == locator::replication_strategy_type::everywhere_topology; auto replication_factor = erm->get_replication_factor(); //Active ranges - auto metadata_clone = tmptr->clone_only_token_map().get0(); - auto range_addresses = strat.get_range_addresses(metadata_clone).get0(); + auto metadata_clone = locator::make_token_metadata2_ptr(tmptr->clone_only_token_map().get0()); + auto range_addresses = strat.get_range_addresses(locator::token_metadata(metadata_clone)).get0(); //Pending ranges - metadata_clone.update_topology(myip, myloc, locator::node::state::bootstrapping); - metadata_clone.update_normal_tokens(tokens, myip).get(); - auto pending_range_addresses = strat.get_range_addresses(metadata_clone).get0(); - metadata_clone.clear_gently().get(); + metadata_clone->update_topology(myid, myloc, locator::node::state::bootstrapping); + metadata_clone->update_normal_tokens(tokens, myid).get(); + auto pending_range_addresses = strat.get_range_addresses(locator::token_metadata(metadata_clone)).get0(); + metadata_clone->clear_gently().get(); //Collects the source that will have its range moved to the new node std::unordered_map range_sources; diff --git a/repair/row_level.hh b/repair/row_level.hh index e21f4df2d2..61251f1bb8 100644 --- a/repair/row_level.hh +++ b/repair/row_level.hh @@ -138,7 +138,7 @@ public: // The tokens are the tokens assigned to the bootstrap node. // all repair-based node operation entry points must be called on shard 0 - future<> bootstrap_with_repair(locator::token_metadata_ptr tmptr, std::unordered_set bootstrap_tokens); + future<> bootstrap_with_repair(locator::token_metadata2_ptr tmptr, std::unordered_set bootstrap_tokens); future<> decommission_with_repair(locator::token_metadata_ptr tmptr); future<> removenode_with_repair(locator::token_metadata_ptr tmptr, gms::inet_address leaving_node, shared_ptr ops); future<> rebuild_with_repair(locator::token_metadata_ptr tmptr, sstring source_dc); diff --git a/service/storage_service.cc b/service/storage_service.cc index ee1f63a677..eb79d949f5 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -3598,7 +3598,7 @@ future<> storage_service::bootstrap(std::unordered_set& bootstrap_tokens, slogger.info("sleeping {} ms for pending range setup", get_ring_delay().count()); _gossiper.wait_for_range_setup().get(); - dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_broadcast_address(), _snitch.local()->get_location(), bootstrap_tokens, get_token_metadata_ptr()); + dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_token_metadata_ptr()->get_my_id(), _snitch.local()->get_location(), bootstrap_tokens, get_token_metadata_ptr()->get_new_strong()); slogger.info("Starting to bootstrap..."); bs.bootstrap(streaming::stream_reason::bootstrap, _gossiper, null_topology_guard).get(); } else { @@ -5099,7 +5099,7 @@ void storage_service::run_bootstrap_ops(std::unordered_set& bootstrap_tok ctl.prepare(node_ops_cmd::bootstrap_prepare).get(); // Step 5: Sync data for bootstrap - _repair.local().bootstrap_with_repair(get_token_metadata_ptr(), bootstrap_tokens).get(); + _repair.local().bootstrap_with_repair(get_token_metadata_ptr()->get_new_strong(), bootstrap_tokens).get(); on_streaming_finished(); // Step 6: Finish @@ -5152,7 +5152,7 @@ void storage_service::run_replace_ops(std::unordered_set& bootstrap_token _repair.local().replace_with_repair(get_token_metadata_ptr(), bootstrap_tokens, ctl.ignore_nodes).get(); } else { slogger.info("replace[{}]: Using streaming based node ops to sync data", uuid); - dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_broadcast_address(), _snitch.local()->get_location(), bootstrap_tokens, get_token_metadata_ptr()); + dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_token_metadata_ptr()->get_new()->get_my_id(), _snitch.local()->get_location(), bootstrap_tokens, get_token_metadata_ptr()->get_new_strong()); bs.bootstrap(streaming::stream_reason::replace, _gossiper, null_topology_guard, replace_address).get(); } on_streaming_finished(); @@ -5892,8 +5892,8 @@ future<> storage_service::rebuild(sstring source_dc) { if (ss.is_repair_based_node_ops_enabled(streaming::stream_reason::rebuild)) { co_await ss._repair.local().rebuild_with_repair(tmptr, std::move(source_dc)); } else { - auto streamer = make_lw_shared(ss._db, ss._stream_manager, tmptr, ss._abort_source, - ss.get_broadcast_address(), ss._snitch.local()->get_location(), "Rebuild", streaming::stream_reason::rebuild, null_topology_guard); + auto streamer = make_lw_shared(ss._db, ss._stream_manager, tmptr->get_new_strong(), ss._abort_source, + tmptr->get_new()->get_my_id(), ss._snitch.local()->get_location(), "Rebuild", streaming::stream_reason::rebuild, null_topology_guard); streamer->add_source_filter(std::make_unique(ss._gossiper.get_unreachable_members())); if (source_dc != "") { streamer->add_source_filter(std::make_unique(source_dc)); @@ -6071,7 +6071,7 @@ future<> storage_service::removenode_with_stream(gms::inet_address leaving_node, as.request_abort(); } }); - auto streamer = make_lw_shared(_db, _stream_manager, tmptr, as, get_broadcast_address(), _snitch.local()->get_location(), "Removenode", streaming::stream_reason::removenode, topo_guard); + auto streamer = make_lw_shared(_db, _stream_manager, tmptr->get_new_strong(), as, tmptr->get_my_id(), _snitch.local()->get_location(), "Removenode", streaming::stream_reason::removenode, topo_guard); removenode_add_ranges(streamer, leaving_node).get(); try { streamer->stream_async().get(); @@ -6127,12 +6127,7 @@ future<> storage_service::leave_ring() { future<> storage_service::stream_ranges(std::unordered_map> ranges_to_stream_by_keyspace) { - auto streamer = dht::range_streamer(_db, _stream_manager, get_token_metadata_ptr(), _abort_source, - get_broadcast_address(), - _snitch.local()->get_location(), - "Unbootstrap", - streaming::stream_reason::decommission, - null_topology_guard); + auto streamer = dht::range_streamer(_db, _stream_manager, get_token_metadata_ptr()->get_new_strong(), _abort_source, get_token_metadata_ptr()->get_new()->get_my_id(), _snitch.local()->get_location(), "Unbootstrap", streaming::stream_reason::decommission, null_topology_guard); for (auto& entry : ranges_to_stream_by_keyspace) { const auto& keyspace = entry.first; auto& ranges_with_endpoints = entry.second; @@ -6507,10 +6502,10 @@ future storage_service::raft_topology_cmd_handler(raft if (!_topology_state_machine._topology.normal_nodes.empty()) { // stream only if there is a node in normal state co_await retrier(_bootstrap_result, coroutine::lambda([&] () -> future<> { if (is_repair_based_node_ops_enabled(streaming::stream_reason::bootstrap)) { - co_await _repair.local().bootstrap_with_repair(get_token_metadata_ptr(), rs.ring.value().tokens); + co_await _repair.local().bootstrap_with_repair(get_token_metadata_ptr()->get_new_strong(), rs.ring.value().tokens); } else { - dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_broadcast_address(), - locator::endpoint_dc_rack{rs.datacenter, rs.rack}, rs.ring.value().tokens, get_token_metadata_ptr()); + dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_token_metadata_ptr()->get_new()->get_my_id(), + locator::endpoint_dc_rack{rs.datacenter, rs.rack}, rs.ring.value().tokens, get_token_metadata_ptr()->get_new_strong()); co_await bs.bootstrap(streaming::stream_reason::bootstrap, _gossiper, _topology_state_machine._topology.session); } })); @@ -6533,8 +6528,8 @@ future storage_service::raft_topology_cmd_handler(raft } co_await _repair.local().replace_with_repair(get_token_metadata_ptr(), rs.ring.value().tokens, std::move(ignored_ips)); } else { - dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_broadcast_address(), - locator::endpoint_dc_rack{rs.datacenter, rs.rack}, rs.ring.value().tokens, get_token_metadata_ptr()); + dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_token_metadata_ptr()->get_new()->get_my_id(), + locator::endpoint_dc_rack{rs.datacenter, rs.rack}, rs.ring.value().tokens, get_token_metadata_ptr()->get_new_strong()); auto replaced_id = std::get(_topology_state_machine._topology.req_param[raft_server.id()]).replaced_id; auto existing_ip = _group0->address_map().find(replaced_id); assert(existing_ip); @@ -6604,9 +6599,8 @@ future storage_service::raft_topology_cmd_handler(raft if (is_repair_based_node_ops_enabled(streaming::stream_reason::rebuild)) { co_await _repair.local().rebuild_with_repair(tmptr, std::move(source_dc)); } else { - auto streamer = make_lw_shared(_db, _stream_manager, tmptr, _abort_source, - get_broadcast_address(), _snitch.local()->get_location(), "Rebuild", streaming::stream_reason::rebuild, - _topology_state_machine._topology.session); + auto streamer = make_lw_shared(_db, _stream_manager, tmptr->get_new_strong(), _abort_source, + tmptr->get_new()->get_my_id(), _snitch.local()->get_location(), "Rebuild", streaming::stream_reason::rebuild, _topology_state_machine._topology.session); streamer->add_source_filter(std::make_unique(_gossiper.get_unreachable_members())); if (source_dc != "") { streamer->add_source_filter(std::make_unique(source_dc)); @@ -6795,10 +6789,9 @@ future<> storage_service::stream_tablet(locator::global_tablet_id tablet) { auto& table = _db.local().find_column_family(tablet.table); std::vector tables = {table.schema()->cf_name()}; - auto streamer = make_lw_shared(_db, _stream_manager, std::move(tm), guard.get_abort_source(), - get_broadcast_address(), _snitch.local()->get_location(), + auto streamer = make_lw_shared(_db, _stream_manager, tm->get_new_strong(), guard.get_abort_source(), + tm->get_new()->get_my_id(), _snitch.local()->get_location(), "Tablet migration", streaming::stream_reason::tablet_migration, topo_guard, std::move(tables)); - tm = nullptr; streamer->add_source_filter(std::make_unique( _gossiper.get_unreachable_members()));