bootstrap: use new token_metadata

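Just mechanical changes that switch the bootstrap-related code (boot_strapper, range_streamer, get_pending_address_ranges, bootstrap_with_repair and their callers) over to the new token_metadata, which identifies nodes by host_id instead of inet_address.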

All the boost and topology tests pass with this change.
This commit is contained in:
Petr Gusev
2023-11-02 16:28:06 +04:00
parent d9283bd025
commit 93263bf9e7
8 changed files with 63 additions and 64 deletions

View File

@@ -29,21 +29,23 @@ using check_token_endpoint = bool_class<struct check_token_endpoint_tag>;
class boot_strapper {
using inet_address = gms::inet_address;
using token_metadata = locator::token_metadata;
using token_metadata2 = locator::token_metadata2;
using token_metadata_ptr = locator::token_metadata_ptr;
using token_metadata2_ptr = locator::token_metadata2_ptr;
using token = dht::token;
distributed<replica::database>& _db;
sharded<streaming::stream_manager>& _stream_manager;
abort_source& _abort_source;
/* endpoint that needs to be bootstrapped */
inet_address _address;
locator::host_id _address;
/* its DC/RACK info */
locator::endpoint_dc_rack _dr;
/* token of the node being bootstrapped. */
std::unordered_set<token> _tokens;
const token_metadata_ptr _token_metadata_ptr;
const locator::token_metadata2_ptr _token_metadata_ptr;
public:
boot_strapper(distributed<replica::database>& db, sharded<streaming::stream_manager>& sm, abort_source& abort_source,
inet_address addr, locator::endpoint_dc_rack dr, std::unordered_set<token> tokens, const token_metadata_ptr tmptr)
locator::host_id addr, locator::endpoint_dc_rack dr, std::unordered_set<token> tokens, const token_metadata2_ptr tmptr)
: _db(db)
, _stream_manager(sm)
, _abort_source(abort_source)
@@ -91,7 +93,7 @@ public:
#endif
private:
const token_metadata& get_token_metadata() {
const token_metadata2& get_token_metadata() {
return *_token_metadata_ptr;
}
};

View File

@@ -88,6 +88,7 @@ range_streamer::get_all_ranges_with_sources_for(const sstring& keyspace_name, lo
logger.debug("keyspace={}, desired_ranges.size={}, range_addresses.size={}", keyspace_name, desired_ranges.size(), range_addresses.size());
std::unordered_map<dht::token_range, std::vector<inet_address>> range_sources;
const auto address_ep = get_token_metadata().get_endpoint_for_host_id(_address);
for (auto& desired_range : desired_ranges) {
auto found = false;
for (auto& x : range_addresses) {
@@ -97,7 +98,7 @@ range_streamer::get_all_ranges_with_sources_for(const sstring& keyspace_name, lo
const range<token>& src_range = x.first;
if (src_range.contains(desired_range, dht::operator<=>)) {
inet_address_vector_replica_set preferred(x.second.begin(), x.second.end());
get_token_metadata().get_topology().sort_by_proximity(_address, preferred);
get_token_metadata().get_topology().sort_by_proximity(address_ep, preferred);
for (inet_address& p : preferred) {
range_sources[desired_range].push_back(p);
}
@@ -122,14 +123,14 @@ range_streamer::get_all_ranges_with_strict_sources_for(const sstring& keyspace_n
auto& strat = erm->get_replication_strategy();
//Active ranges
auto metadata_clone = get_token_metadata().clone_only_token_map().get0();
auto range_addresses = strat.get_range_addresses(metadata_clone).get0();
auto metadata_clone = locator::make_token_metadata2_ptr(get_token_metadata().clone_only_token_map().get0());
auto range_addresses = strat.get_range_addresses(token_metadata(metadata_clone)).get0();
//Pending ranges
metadata_clone.update_topology(_address, _dr);
metadata_clone.update_normal_tokens(_tokens, _address).get();
auto pending_range_addresses = strat.get_range_addresses(metadata_clone).get0();
metadata_clone.clear_gently().get();
metadata_clone->update_topology(_address, _dr);
metadata_clone->update_normal_tokens(_tokens, _address).get();
auto pending_range_addresses = strat.get_range_addresses(token_metadata(metadata_clone)).get0();
metadata_clone->clear_gently().get();
//Collects the source that will have its range moved to the new node
std::unordered_map<dht::token_range, std::vector<inet_address>> range_sources;

View File

@@ -37,7 +37,9 @@ class range_streamer {
public:
using inet_address = gms::inet_address;
using token_metadata = locator::token_metadata;
using token_metadata2 = locator::token_metadata2;
using token_metadata_ptr = locator::token_metadata_ptr;
using token_metadata2_ptr = locator::token_metadata2_ptr;
using stream_plan = streaming::stream_plan;
using stream_state = streaming::stream_state;
public:
@@ -77,8 +79,8 @@ public:
}
};
range_streamer(distributed<replica::database>& db, sharded<streaming::stream_manager>& sm, const token_metadata_ptr tmptr, abort_source& abort_source, std::unordered_set<token> tokens,
inet_address address, locator::endpoint_dc_rack dr, sstring description, streaming::stream_reason reason,
range_streamer(distributed<replica::database>& db, sharded<streaming::stream_manager>& sm, const token_metadata2_ptr tmptr, abort_source& abort_source, std::unordered_set<token> tokens,
locator::host_id address, locator::endpoint_dc_rack dr, sstring description, streaming::stream_reason reason,
service::frozen_topology_guard topo_guard,
std::vector<sstring> tables = {})
: _db(db)
@@ -96,8 +98,8 @@ public:
_abort_source.check();
}
range_streamer(distributed<replica::database>& db, sharded<streaming::stream_manager>& sm, const token_metadata_ptr tmptr, abort_source& abort_source,
inet_address address, locator::endpoint_dc_rack dr, sstring description, streaming::stream_reason reason, service::frozen_topology_guard topo_guard, std::vector<sstring> tables = {})
range_streamer(distributed<replica::database>& db, sharded<streaming::stream_manager>& sm, const token_metadata2_ptr tmptr, abort_source& abort_source,
locator::host_id address, locator::endpoint_dc_rack dr, sstring description, streaming::stream_reason reason, service::frozen_topology_guard topo_guard, std::vector<sstring> tables = {})
: range_streamer(db, sm, std::move(tmptr), abort_source, std::unordered_set<token>(), address, std::move(dr), description, reason, std::move(topo_guard), std::move(tables)) {
}
@@ -145,7 +147,7 @@ private:
#endif
// Can be called only before stream_async().
const token_metadata& get_token_metadata() {
const token_metadata2& get_token_metadata() {
return *_token_metadata_ptr;
}
public:
@@ -154,10 +156,10 @@ public:
private:
distributed<replica::database>& _db;
sharded<streaming::stream_manager>& _stream_manager;
token_metadata_ptr _token_metadata_ptr;
token_metadata2_ptr _token_metadata_ptr;
abort_source& _abort_source;
std::unordered_set<token> _tokens;
inet_address _address;
locator::host_id _address;
locator::endpoint_dc_rack _dr;
sstring _description;
streaming::stream_reason _reason;

View File

@@ -342,9 +342,10 @@ vnode_effective_replication_map::get_range_addresses() const {
future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>>
abstract_replication_strategy::get_range_addresses(const token_metadata& tm) const {
std::unordered_map<dht::token_range, inet_address_vector_replica_set> ret;
for (auto& t : tm.sorted_tokens()) {
dht::token_range_vector ranges = tm.get_primary_ranges_for(t);
auto eps = get<endpoint_set>(co_await calculate_natural_endpoints(t, tm, false));
auto tm_new = tm.get_new_strong();
for (auto& t : tm_new->sorted_tokens()) {
dht::token_range_vector ranges = tm_new->get_primary_ranges_for(t);
auto eps = co_await calculate_natural_ips(t, tm_new);
for (auto& r : ranges) {
ret.emplace(r, eps.get_vector());
}
@@ -353,20 +354,20 @@ abstract_replication_strategy::get_range_addresses(const token_metadata& tm) con
}
future<dht::token_range_vector>
abstract_replication_strategy::get_pending_address_ranges(const token_metadata_ptr tmptr, std::unordered_set<token> pending_tokens, inet_address pending_address, locator::endpoint_dc_rack dr) const {
abstract_replication_strategy::get_pending_address_ranges(const token_metadata2_ptr tmptr, std::unordered_set<token> pending_tokens, locator::host_id pending_address, locator::endpoint_dc_rack dr) const {
dht::token_range_vector ret;
token_metadata temp = co_await tmptr->clone_only_token_map();
temp.update_topology(pending_address, std::move(dr));
co_await temp.update_normal_tokens(pending_tokens, pending_address);
for (const auto& t : temp.sorted_tokens()) {
auto eps = get<endpoint_set>(co_await calculate_natural_endpoints(t, temp, false));
auto temp = make_token_metadata2_ptr(co_await tmptr->clone_only_token_map());
temp->update_topology(pending_address, std::move(dr));
co_await temp->update_normal_tokens(pending_tokens, pending_address);
for (const auto& t : temp->sorted_tokens()) {
auto eps = get<host_id_set>(co_await calculate_natural_endpoints(t, token_metadata(temp), true));
if (eps.contains(pending_address)) {
dht::token_range_vector r = temp.get_primary_ranges_for(t);
dht::token_range_vector r = temp->get_primary_ranges_for(t);
rslogger.debug("get_pending_address_ranges: token={} primary_range={} endpoint={}", t, r, pending_address);
ret.insert(ret.end(), r.begin(), r.end());
}
}
co_await temp.clear_gently();
co_await temp->clear_gently();
co_return ret;
}

View File

@@ -174,7 +174,7 @@ public:
// Caller must ensure that token_metadata will not change throughout the call.
future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>> get_range_addresses(const token_metadata& tm) const;
future<dht::token_range_vector> get_pending_address_ranges(const token_metadata_ptr tmptr, std::unordered_set<token> pending_tokens, inet_address pending_address, locator::endpoint_dc_rack dr) const;
future<dht::token_range_vector> get_pending_address_ranges(const token_metadata2_ptr tmptr, std::unordered_set<token> pending_tokens, locator::host_id pending_address, locator::endpoint_dc_rack dr) const;
};
using ring_mapping = boost::icl::interval_map<token, std::unordered_set<inet_address>>;

View File

@@ -1492,7 +1492,7 @@ std::optional<double> repair::data_sync_repair_task_impl::expected_children_numb
return smp::count;
}
future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr, std::unordered_set<dht::token> bootstrap_tokens) {
future<> repair_service::bootstrap_with_repair(locator::token_metadata2_ptr tmptr, std::unordered_set<dht::token> bootstrap_tokens) {
assert(this_shard_id() == 0);
using inet_address = gms::inet_address;
return seastar::async([this, tmptr = std::move(tmptr), tokens = std::move(bootstrap_tokens)] () mutable {
@@ -1500,7 +1500,7 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr
auto ks_erms = db.get_non_local_strategy_keyspaces_erms();
auto& topology = tmptr->get_topology();
auto myloc = topology.get_location();
auto myip = topology.my_address();
auto myid = tmptr->get_my_id();
auto reason = streaming::stream_reason::bootstrap;
// Calculate number of ranges to sync data
size_t nr_ranges_total = 0;
@@ -1509,7 +1509,7 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr
continue;
}
auto& strat = erm->get_replication_strategy();
dht::token_range_vector desired_ranges = strat.get_pending_address_ranges(tmptr, tokens, myip, myloc).get0();
dht::token_range_vector desired_ranges = strat.get_pending_address_ranges(tmptr, tokens, myid, myloc).get0();
seastar::thread::maybe_yield();
auto nr_tables = get_nr_tables(db, keyspace_name);
nr_ranges_total += desired_ranges.size() * nr_tables;
@@ -1525,20 +1525,20 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr
continue;
}
auto& strat = erm->get_replication_strategy();
dht::token_range_vector desired_ranges = strat.get_pending_address_ranges(tmptr, tokens, myip, myloc).get0();
dht::token_range_vector desired_ranges = strat.get_pending_address_ranges(tmptr, tokens, myid, myloc).get0();
bool find_node_in_local_dc_only = strat.get_type() == locator::replication_strategy_type::network_topology;
bool everywhere_topology = strat.get_type() == locator::replication_strategy_type::everywhere_topology;
auto replication_factor = erm->get_replication_factor();
//Active ranges
auto metadata_clone = tmptr->clone_only_token_map().get0();
auto range_addresses = strat.get_range_addresses(metadata_clone).get0();
auto metadata_clone = locator::make_token_metadata2_ptr(tmptr->clone_only_token_map().get0());
auto range_addresses = strat.get_range_addresses(locator::token_metadata(metadata_clone)).get0();
//Pending ranges
metadata_clone.update_topology(myip, myloc, locator::node::state::bootstrapping);
metadata_clone.update_normal_tokens(tokens, myip).get();
auto pending_range_addresses = strat.get_range_addresses(metadata_clone).get0();
metadata_clone.clear_gently().get();
metadata_clone->update_topology(myid, myloc, locator::node::state::bootstrapping);
metadata_clone->update_normal_tokens(tokens, myid).get();
auto pending_range_addresses = strat.get_range_addresses(locator::token_metadata(metadata_clone)).get0();
metadata_clone->clear_gently().get();
//Collects the source that will have its range moved to the new node
std::unordered_map<dht::token_range, repair_neighbors> range_sources;

View File

@@ -138,7 +138,7 @@ public:
// The tokens are the tokens assigned to the bootstrap node.
// all repair-based node operation entry points must be called on shard 0
future<> bootstrap_with_repair(locator::token_metadata_ptr tmptr, std::unordered_set<dht::token> bootstrap_tokens);
future<> bootstrap_with_repair(locator::token_metadata2_ptr tmptr, std::unordered_set<dht::token> bootstrap_tokens);
future<> decommission_with_repair(locator::token_metadata_ptr tmptr);
future<> removenode_with_repair(locator::token_metadata_ptr tmptr, gms::inet_address leaving_node, shared_ptr<node_ops_info> ops);
future<> rebuild_with_repair(locator::token_metadata_ptr tmptr, sstring source_dc);

View File

@@ -3598,7 +3598,7 @@ future<> storage_service::bootstrap(std::unordered_set<token>& bootstrap_tokens,
slogger.info("sleeping {} ms for pending range setup", get_ring_delay().count());
_gossiper.wait_for_range_setup().get();
dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_broadcast_address(), _snitch.local()->get_location(), bootstrap_tokens, get_token_metadata_ptr());
dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_token_metadata_ptr()->get_my_id(), _snitch.local()->get_location(), bootstrap_tokens, get_token_metadata_ptr()->get_new_strong());
slogger.info("Starting to bootstrap...");
bs.bootstrap(streaming::stream_reason::bootstrap, _gossiper, null_topology_guard).get();
} else {
@@ -5099,7 +5099,7 @@ void storage_service::run_bootstrap_ops(std::unordered_set<token>& bootstrap_tok
ctl.prepare(node_ops_cmd::bootstrap_prepare).get();
// Step 5: Sync data for bootstrap
_repair.local().bootstrap_with_repair(get_token_metadata_ptr(), bootstrap_tokens).get();
_repair.local().bootstrap_with_repair(get_token_metadata_ptr()->get_new_strong(), bootstrap_tokens).get();
on_streaming_finished();
// Step 6: Finish
@@ -5152,7 +5152,7 @@ void storage_service::run_replace_ops(std::unordered_set<token>& bootstrap_token
_repair.local().replace_with_repair(get_token_metadata_ptr(), bootstrap_tokens, ctl.ignore_nodes).get();
} else {
slogger.info("replace[{}]: Using streaming based node ops to sync data", uuid);
dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_broadcast_address(), _snitch.local()->get_location(), bootstrap_tokens, get_token_metadata_ptr());
dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_token_metadata_ptr()->get_new()->get_my_id(), _snitch.local()->get_location(), bootstrap_tokens, get_token_metadata_ptr()->get_new_strong());
bs.bootstrap(streaming::stream_reason::replace, _gossiper, null_topology_guard, replace_address).get();
}
on_streaming_finished();
@@ -5892,8 +5892,8 @@ future<> storage_service::rebuild(sstring source_dc) {
if (ss.is_repair_based_node_ops_enabled(streaming::stream_reason::rebuild)) {
co_await ss._repair.local().rebuild_with_repair(tmptr, std::move(source_dc));
} else {
auto streamer = make_lw_shared<dht::range_streamer>(ss._db, ss._stream_manager, tmptr, ss._abort_source,
ss.get_broadcast_address(), ss._snitch.local()->get_location(), "Rebuild", streaming::stream_reason::rebuild, null_topology_guard);
auto streamer = make_lw_shared<dht::range_streamer>(ss._db, ss._stream_manager, tmptr->get_new_strong(), ss._abort_source,
tmptr->get_new()->get_my_id(), ss._snitch.local()->get_location(), "Rebuild", streaming::stream_reason::rebuild, null_topology_guard);
streamer->add_source_filter(std::make_unique<dht::range_streamer::failure_detector_source_filter>(ss._gossiper.get_unreachable_members()));
if (source_dc != "") {
streamer->add_source_filter(std::make_unique<dht::range_streamer::single_datacenter_filter>(source_dc));
@@ -6071,7 +6071,7 @@ future<> storage_service::removenode_with_stream(gms::inet_address leaving_node,
as.request_abort();
}
});
auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, tmptr, as, get_broadcast_address(), _snitch.local()->get_location(), "Removenode", streaming::stream_reason::removenode, topo_guard);
auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, tmptr->get_new_strong(), as, tmptr->get_my_id(), _snitch.local()->get_location(), "Removenode", streaming::stream_reason::removenode, topo_guard);
removenode_add_ranges(streamer, leaving_node).get();
try {
streamer->stream_async().get();
@@ -6127,12 +6127,7 @@ future<> storage_service::leave_ring() {
future<>
storage_service::stream_ranges(std::unordered_map<sstring, std::unordered_multimap<dht::token_range, inet_address>> ranges_to_stream_by_keyspace) {
auto streamer = dht::range_streamer(_db, _stream_manager, get_token_metadata_ptr(), _abort_source,
get_broadcast_address(),
_snitch.local()->get_location(),
"Unbootstrap",
streaming::stream_reason::decommission,
null_topology_guard);
auto streamer = dht::range_streamer(_db, _stream_manager, get_token_metadata_ptr()->get_new_strong(), _abort_source, get_token_metadata_ptr()->get_new()->get_my_id(), _snitch.local()->get_location(), "Unbootstrap", streaming::stream_reason::decommission, null_topology_guard);
for (auto& entry : ranges_to_stream_by_keyspace) {
const auto& keyspace = entry.first;
auto& ranges_with_endpoints = entry.second;
@@ -6507,10 +6502,10 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
if (!_topology_state_machine._topology.normal_nodes.empty()) { // stream only if there is a node in normal state
co_await retrier(_bootstrap_result, coroutine::lambda([&] () -> future<> {
if (is_repair_based_node_ops_enabled(streaming::stream_reason::bootstrap)) {
co_await _repair.local().bootstrap_with_repair(get_token_metadata_ptr(), rs.ring.value().tokens);
co_await _repair.local().bootstrap_with_repair(get_token_metadata_ptr()->get_new_strong(), rs.ring.value().tokens);
} else {
dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_broadcast_address(),
locator::endpoint_dc_rack{rs.datacenter, rs.rack}, rs.ring.value().tokens, get_token_metadata_ptr());
dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_token_metadata_ptr()->get_new()->get_my_id(),
locator::endpoint_dc_rack{rs.datacenter, rs.rack}, rs.ring.value().tokens, get_token_metadata_ptr()->get_new_strong());
co_await bs.bootstrap(streaming::stream_reason::bootstrap, _gossiper, _topology_state_machine._topology.session);
}
}));
@@ -6533,8 +6528,8 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
}
co_await _repair.local().replace_with_repair(get_token_metadata_ptr(), rs.ring.value().tokens, std::move(ignored_ips));
} else {
dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_broadcast_address(),
locator::endpoint_dc_rack{rs.datacenter, rs.rack}, rs.ring.value().tokens, get_token_metadata_ptr());
dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_token_metadata_ptr()->get_new()->get_my_id(),
locator::endpoint_dc_rack{rs.datacenter, rs.rack}, rs.ring.value().tokens, get_token_metadata_ptr()->get_new_strong());
auto replaced_id = std::get<replace_param>(_topology_state_machine._topology.req_param[raft_server.id()]).replaced_id;
auto existing_ip = _group0->address_map().find(replaced_id);
assert(existing_ip);
@@ -6604,9 +6599,8 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
if (is_repair_based_node_ops_enabled(streaming::stream_reason::rebuild)) {
co_await _repair.local().rebuild_with_repair(tmptr, std::move(source_dc));
} else {
auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, tmptr, _abort_source,
get_broadcast_address(), _snitch.local()->get_location(), "Rebuild", streaming::stream_reason::rebuild,
_topology_state_machine._topology.session);
auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, tmptr->get_new_strong(), _abort_source,
tmptr->get_new()->get_my_id(), _snitch.local()->get_location(), "Rebuild", streaming::stream_reason::rebuild, _topology_state_machine._topology.session);
streamer->add_source_filter(std::make_unique<dht::range_streamer::failure_detector_source_filter>(_gossiper.get_unreachable_members()));
if (source_dc != "") {
streamer->add_source_filter(std::make_unique<dht::range_streamer::single_datacenter_filter>(source_dc));
@@ -6795,10 +6789,9 @@ future<> storage_service::stream_tablet(locator::global_tablet_id tablet) {
auto& table = _db.local().find_column_family(tablet.table);
std::vector<sstring> tables = {table.schema()->cf_name()};
auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, std::move(tm), guard.get_abort_source(),
get_broadcast_address(), _snitch.local()->get_location(),
auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, tm->get_new_strong(), guard.get_abort_source(),
tm->get_new()->get_my_id(), _snitch.local()->get_location(),
"Tablet migration", streaming::stream_reason::tablet_migration, topo_guard, std::move(tables));
tm = nullptr;
streamer->add_source_filter(std::make_unique<dht::range_streamer::failure_detector_source_filter>(
_gossiper.get_unreachable_members()));