rebuild_with_repair, replace_with_repair: use new token_metadata
Just mechanical changes to the new token_metadata. All the boost and topology tests pass with this change.
This commit is contained in:
@@ -256,18 +256,18 @@ vnode_effective_replication_map::get_ranges(inet_address ep) const {
|
||||
|
||||
// Caller must ensure that token_metadata will not change throughout the call.
|
||||
future<dht::token_range_vector>
|
||||
abstract_replication_strategy::get_ranges(inet_address ep, token_metadata_ptr tmptr) const {
|
||||
abstract_replication_strategy::get_ranges(locator::host_id ep, token_metadata_ptr tmptr) const {
|
||||
co_return co_await get_ranges(ep, *tmptr);
|
||||
}
|
||||
|
||||
// Caller must ensure that token_metadata will not change throughout the call.
|
||||
future<dht::token_range_vector>
|
||||
abstract_replication_strategy::get_ranges(inet_address ep, const token_metadata& tm) const {
|
||||
abstract_replication_strategy::get_ranges(locator::host_id ep, const token_metadata& tm) const {
|
||||
dht::token_range_vector ret;
|
||||
if (!tm.is_normal_token_owner(ep)) {
|
||||
if (!tm.get_new()->is_normal_token_owner(ep)) {
|
||||
co_return ret;
|
||||
}
|
||||
const auto& sorted_tokens = tm.sorted_tokens();
|
||||
const auto& sorted_tokens = tm.get_new()->sorted_tokens();
|
||||
if (sorted_tokens.empty()) {
|
||||
on_internal_error(rslogger, "Token metadata is empty");
|
||||
}
|
||||
@@ -279,7 +279,7 @@ abstract_replication_strategy::get_ranges(inet_address ep, const token_metadata&
|
||||
// Using the common path would make the function quadratic in the number of endpoints.
|
||||
should_add = true;
|
||||
} else {
|
||||
auto eps = get<endpoint_set>(co_await calculate_natural_endpoints(tok, tm, false));
|
||||
auto eps = get<host_id_set>(co_await calculate_natural_endpoints(tok, tm, true));
|
||||
should_add = eps.contains(ep);
|
||||
}
|
||||
if (should_add) {
|
||||
|
||||
@@ -168,8 +168,8 @@ public:
|
||||
|
||||
// Use the token_metadata provided by the caller instead of _token_metadata
|
||||
// Note: must be called with initialized, non-empty token_metadata.
|
||||
future<dht::token_range_vector> get_ranges(inet_address ep, token_metadata_ptr tmptr) const;
|
||||
future<dht::token_range_vector> get_ranges(inet_address ep, const token_metadata& tm) const;
|
||||
future<dht::token_range_vector> get_ranges(locator::host_id ep, token_metadata_ptr tmptr) const;
|
||||
future<dht::token_range_vector> get_ranges(locator::host_id ep, const token_metadata& tm) const;
|
||||
|
||||
// Caller must ensure that token_metadata will not change throughout the call.
|
||||
future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>> get_range_addresses(const token_metadata& tm) const;
|
||||
|
||||
@@ -1883,12 +1883,13 @@ future<> repair_service::removenode_with_repair(locator::token_metadata_ptr tmpt
|
||||
});
|
||||
}
|
||||
|
||||
future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_ptr tmptr, sstring op, sstring source_dc, streaming::stream_reason reason, std::unordered_set<gms::inet_address> ignore_nodes) {
|
||||
future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata2_ptr tmptr, sstring op, sstring source_dc, streaming::stream_reason reason, std::unordered_set<gms::inet_address> ignore_nodes) {
|
||||
assert(this_shard_id() == 0);
|
||||
return seastar::async([this, tmptr = std::move(tmptr), source_dc = std::move(source_dc), op = std::move(op), reason, ignore_nodes = std::move(ignore_nodes)] () mutable {
|
||||
auto& db = get_db().local();
|
||||
auto ks_erms = db.get_non_local_strategy_keyspaces_erms();
|
||||
auto myip = tmptr->get_topology().my_address();
|
||||
auto myid = tmptr->get_my_id();
|
||||
size_t nr_ranges_total = 0;
|
||||
for (const auto& [keyspace_name, erm] : ks_erms) {
|
||||
if (!db.has_keyspace(keyspace_name)) {
|
||||
@@ -1896,7 +1897,7 @@ future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_
|
||||
}
|
||||
auto& strat = erm->get_replication_strategy();
|
||||
// Okay to yield since tm is immutable
|
||||
dht::token_range_vector ranges = strat.get_ranges(myip, tmptr).get0();
|
||||
dht::token_range_vector ranges = strat.get_ranges(myid, locator::token_metadata(tmptr)).get0();
|
||||
auto nr_tables = get_nr_tables(db, keyspace_name);
|
||||
nr_ranges_total += ranges.size() * nr_tables;
|
||||
|
||||
@@ -1920,7 +1921,7 @@ future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_
|
||||
continue;
|
||||
}
|
||||
auto& strat = erm->get_replication_strategy();
|
||||
dht::token_range_vector ranges = strat.get_ranges(myip, tmptr).get0();
|
||||
dht::token_range_vector ranges = strat.get_ranges(myid, locator::token_metadata(tmptr)).get0();
|
||||
auto& topology = erm->get_token_metadata().get_topology();
|
||||
std::unordered_map<dht::token_range, repair_neighbors> range_sources;
|
||||
auto nr_tables = get_nr_tables(db, keyspace_name);
|
||||
@@ -1929,7 +1930,7 @@ future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_
|
||||
auto& r = *it;
|
||||
seastar::thread::maybe_yield();
|
||||
auto end_token = r.end() ? r.end()->value() : dht::maximum_token();
|
||||
auto neighbors = boost::copy_range<std::vector<gms::inet_address>>(get<locator::endpoint_set>(strat.calculate_natural_endpoints(end_token, *tmptr, false).get0()) |
|
||||
auto neighbors = boost::copy_range<std::vector<gms::inet_address>>(strat.calculate_natural_ips(end_token, tmptr).get0() |
|
||||
boost::adaptors::filtered([myip, &source_dc, &topology, &ignore_nodes] (const gms::inet_address& node) {
|
||||
if (node == myip) {
|
||||
return false;
|
||||
@@ -1967,7 +1968,7 @@ future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_
|
||||
});
|
||||
}
|
||||
|
||||
future<> repair_service::rebuild_with_repair(locator::token_metadata_ptr tmptr, sstring source_dc) {
|
||||
future<> repair_service::rebuild_with_repair(locator::token_metadata2_ptr tmptr, sstring source_dc) {
|
||||
assert(this_shard_id() == 0);
|
||||
auto op = sstring("rebuild_with_repair");
|
||||
if (source_dc.empty()) {
|
||||
@@ -1983,19 +1984,18 @@ future<> repair_service::rebuild_with_repair(locator::token_metadata_ptr tmptr,
|
||||
});
|
||||
}
|
||||
|
||||
future<> repair_service::replace_with_repair(locator::token_metadata_ptr tmptr, std::unordered_set<dht::token> replacing_tokens, std::unordered_set<gms::inet_address> ignore_nodes) {
|
||||
future<> repair_service::replace_with_repair(locator::token_metadata2_ptr tmptr, std::unordered_set<dht::token> replacing_tokens, std::unordered_set<gms::inet_address> ignore_nodes) {
|
||||
assert(this_shard_id() == 0);
|
||||
auto cloned_tm = co_await tmptr->clone_async();
|
||||
auto op = sstring("replace_with_repair");
|
||||
auto& topology = tmptr->get_topology();
|
||||
auto myip = topology.my_address();
|
||||
auto myloc = topology.get_location();
|
||||
auto reason = streaming::stream_reason::replace;
|
||||
// update a cloned version of tmptr
|
||||
// no need to set the original version
|
||||
auto cloned_tmptr = make_token_metadata_ptr(std::move(cloned_tm));
|
||||
cloned_tmptr->update_topology(myip, myloc, locator::node::state::replacing);
|
||||
co_await cloned_tmptr->update_normal_tokens(replacing_tokens, myip);
|
||||
auto cloned_tmptr = make_token_metadata2_ptr(std::move(cloned_tm));
|
||||
cloned_tmptr->update_topology(tmptr->get_my_id(), myloc, locator::node::state::replacing);
|
||||
co_await cloned_tmptr->update_normal_tokens(replacing_tokens, tmptr->get_my_id());
|
||||
co_return co_await do_rebuild_replace_with_repair(std::move(cloned_tmptr), std::move(op), myloc.dc, reason, std::move(ignore_nodes));
|
||||
}
|
||||
|
||||
|
||||
@@ -141,11 +141,11 @@ public:
|
||||
future<> bootstrap_with_repair(locator::token_metadata2_ptr tmptr, std::unordered_set<dht::token> bootstrap_tokens);
|
||||
future<> decommission_with_repair(locator::token_metadata_ptr tmptr);
|
||||
future<> removenode_with_repair(locator::token_metadata_ptr tmptr, gms::inet_address leaving_node, shared_ptr<node_ops_info> ops);
|
||||
future<> rebuild_with_repair(locator::token_metadata_ptr tmptr, sstring source_dc);
|
||||
future<> replace_with_repair(locator::token_metadata_ptr tmptr, std::unordered_set<dht::token> replacing_tokens, std::unordered_set<gms::inet_address> ignore_nodes);
|
||||
future<> rebuild_with_repair(locator::token_metadata2_ptr tmptr, sstring source_dc);
|
||||
future<> replace_with_repair(locator::token_metadata2_ptr tmptr, std::unordered_set<dht::token> replacing_tokens, std::unordered_set<gms::inet_address> ignore_nodes);
|
||||
private:
|
||||
future<> do_decommission_removenode_with_repair(locator::token_metadata_ptr tmptr, gms::inet_address leaving_node, shared_ptr<node_ops_info> ops);
|
||||
future<> do_rebuild_replace_with_repair(locator::token_metadata_ptr tmptr, sstring op, sstring source_dc, streaming::stream_reason reason, std::unordered_set<gms::inet_address> ignore_nodes);
|
||||
future<> do_rebuild_replace_with_repair(locator::token_metadata2_ptr tmptr, sstring op, sstring source_dc, streaming::stream_reason reason, std::unordered_set<gms::inet_address> ignore_nodes);
|
||||
|
||||
// Must be called on shard 0
|
||||
future<> sync_data_using_repair(sstring keyspace,
|
||||
|
||||
@@ -5149,7 +5149,7 @@ void storage_service::run_replace_ops(std::unordered_set<token>& bootstrap_token
|
||||
// Step 7: Sync data for replace
|
||||
if (is_repair_based_node_ops_enabled(streaming::stream_reason::replace)) {
|
||||
slogger.info("replace[{}]: Using repair based node ops to sync data", uuid);
|
||||
_repair.local().replace_with_repair(get_token_metadata_ptr(), bootstrap_tokens, ctl.ignore_nodes).get();
|
||||
_repair.local().replace_with_repair(get_token_metadata_ptr()->get_new_strong(), bootstrap_tokens, ctl.ignore_nodes).get();
|
||||
} else {
|
||||
slogger.info("replace[{}]: Using streaming based node ops to sync data", uuid);
|
||||
dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_token_metadata_ptr()->get_new()->get_my_id(), _snitch.local()->get_location(), bootstrap_tokens, get_token_metadata_ptr()->get_new_strong());
|
||||
@@ -5890,7 +5890,7 @@ future<> storage_service::rebuild(sstring source_dc) {
|
||||
slogger.info("rebuild from dc: {}", source_dc == "" ? "(any dc)" : source_dc);
|
||||
auto tmptr = ss.get_token_metadata_ptr();
|
||||
if (ss.is_repair_based_node_ops_enabled(streaming::stream_reason::rebuild)) {
|
||||
co_await ss._repair.local().rebuild_with_repair(tmptr, std::move(source_dc));
|
||||
co_await ss._repair.local().rebuild_with_repair(tmptr->get_new_strong(), std::move(source_dc));
|
||||
} else {
|
||||
auto streamer = make_lw_shared<dht::range_streamer>(ss._db, ss._stream_manager, tmptr->get_new_strong(), ss._abort_source,
|
||||
tmptr->get_new()->get_my_id(), ss._snitch.local()->get_location(), "Rebuild", streaming::stream_reason::rebuild, null_topology_guard);
|
||||
@@ -6526,7 +6526,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
|
||||
}
|
||||
ignored_ips.insert(*ip);
|
||||
}
|
||||
co_await _repair.local().replace_with_repair(get_token_metadata_ptr(), rs.ring.value().tokens, std::move(ignored_ips));
|
||||
co_await _repair.local().replace_with_repair(get_token_metadata_ptr()->get_new_strong(), rs.ring.value().tokens, std::move(ignored_ips));
|
||||
} else {
|
||||
dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_token_metadata_ptr()->get_new()->get_my_id(),
|
||||
locator::endpoint_dc_rack{rs.datacenter, rs.rack}, rs.ring.value().tokens, get_token_metadata_ptr()->get_new_strong());
|
||||
@@ -6597,7 +6597,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
|
||||
co_await retrier(_rebuild_result, [&] () -> future<> {
|
||||
auto tmptr = get_token_metadata_ptr();
|
||||
if (is_repair_based_node_ops_enabled(streaming::stream_reason::rebuild)) {
|
||||
co_await _repair.local().rebuild_with_repair(tmptr, std::move(source_dc));
|
||||
co_await _repair.local().rebuild_with_repair(tmptr->get_new_strong(), std::move(source_dc));
|
||||
} else {
|
||||
auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, tmptr->get_new_strong(), _abort_source,
|
||||
tmptr->get_new()->get_my_id(), _snitch.local()->get_location(), "Rebuild", streaming::stream_reason::rebuild, _topology_state_machine._topology.session);
|
||||
|
||||
Reference in New Issue
Block a user