effective_replication_map: make get_range_addresses asynchronous
So it may yield, preenting reactor stalls as seen in #11005. Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
@@ -84,7 +84,7 @@ std::unordered_map<dht::token_range, std::vector<inet_address>>
|
||||
range_streamer::get_all_ranges_with_sources_for(const sstring& keyspace_name, locator::effective_replication_map_ptr erm, dht::token_range_vector desired_ranges) {
|
||||
logger.debug("{} ks={}", __func__, keyspace_name);
|
||||
|
||||
auto range_addresses = erm->get_range_addresses();
|
||||
auto range_addresses = erm->get_range_addresses().get0();
|
||||
|
||||
logger.debug("keyspace={}, desired_ranges.size={}, range_addresses.size={}", keyspace_name, desired_ranges.size(), range_addresses.size());
|
||||
|
||||
|
||||
@@ -256,7 +256,7 @@ abstract_replication_strategy::get_address_ranges(const token_metadata& tm, inet
|
||||
co_return ret;
|
||||
}
|
||||
|
||||
std::unordered_map<dht::token_range, inet_address_vector_replica_set>
|
||||
future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>>
|
||||
effective_replication_map::get_range_addresses() const {
|
||||
const token_metadata& tm = *_tmptr;
|
||||
std::unordered_map<dht::token_range, inet_address_vector_replica_set> ret;
|
||||
@@ -265,8 +265,9 @@ effective_replication_map::get_range_addresses() const {
|
||||
for (auto& r : ranges) {
|
||||
ret.emplace(r, eps);
|
||||
}
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
return ret;
|
||||
co_return ret;
|
||||
}
|
||||
|
||||
future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>>
|
||||
|
||||
@@ -222,7 +222,7 @@ public:
|
||||
// Note: must be called after token_metadata has been initialized.
|
||||
dht::token_range_vector get_primary_ranges_within_dc(inet_address ep) const;
|
||||
|
||||
std::unordered_map<dht::token_range, inet_address_vector_replica_set>
|
||||
future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>>
|
||||
get_range_addresses() const;
|
||||
|
||||
private:
|
||||
|
||||
@@ -2805,7 +2805,7 @@ future<> storage_service::removenode_add_ranges(lw_shared_ptr<dht::range_streame
|
||||
my_new_ranges.emplace_back(x.first);
|
||||
}
|
||||
}
|
||||
std::unordered_multimap<inet_address, dht::token_range> source_ranges = get_new_source_ranges(erm, my_new_ranges);
|
||||
std::unordered_multimap<inet_address, dht::token_range> source_ranges = co_await get_new_source_ranges(erm, my_new_ranges);
|
||||
std::unordered_map<inet_address, dht::token_range_vector> ranges_per_endpoint;
|
||||
for (auto& x : source_ranges) {
|
||||
ranges_per_endpoint[x.first].emplace_back(x.second);
|
||||
@@ -3034,10 +3034,10 @@ future<> storage_service::shutdown_protocol_servers() {
|
||||
}
|
||||
}
|
||||
|
||||
std::unordered_multimap<inet_address, dht::token_range>
|
||||
future<std::unordered_multimap<inet_address, dht::token_range>>
|
||||
storage_service::get_new_source_ranges(locator::effective_replication_map_ptr erm, const dht::token_range_vector& ranges) const {
|
||||
auto my_address = get_broadcast_address();
|
||||
std::unordered_map<dht::token_range, inet_address_vector_replica_set> range_addresses = erm->get_range_addresses();
|
||||
std::unordered_map<dht::token_range, inet_address_vector_replica_set> range_addresses = co_await erm->get_range_addresses();
|
||||
std::unordered_multimap<inet_address, dht::token_range> source_ranges;
|
||||
|
||||
// find alive sources for our new ranges
|
||||
@@ -3064,8 +3064,10 @@ storage_service::get_new_source_ranges(locator::effective_replication_map_ptr er
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
return source_ranges;
|
||||
co_return source_ranges;
|
||||
}
|
||||
|
||||
future<> storage_service::move(token new_token) {
|
||||
|
||||
@@ -574,7 +574,7 @@ private:
|
||||
* @param ranges the ranges to find sources for
|
||||
* @return multimap of addresses to ranges the address is responsible for
|
||||
*/
|
||||
std::unordered_multimap<inet_address, dht::token_range> get_new_source_ranges(locator::effective_replication_map_ptr erm, const dht::token_range_vector& ranges) const;
|
||||
future<std::unordered_multimap<inet_address, dht::token_range>> get_new_source_ranges(locator::effective_replication_map_ptr erm, const dht::token_range_vector& ranges) const;
|
||||
|
||||
/**
|
||||
* Sends a notification to a node indicating we have finished replicating data.
|
||||
|
||||
Reference in New Issue
Block a user