diff --git a/dht/range_streamer.cc b/dht/range_streamer.cc index d647ceb5ef..229a36b7fd 100644 --- a/dht/range_streamer.cc +++ b/dht/range_streamer.cc @@ -84,7 +84,7 @@ std::unordered_map> range_streamer::get_all_ranges_with_sources_for(const sstring& keyspace_name, locator::effective_replication_map_ptr erm, dht::token_range_vector desired_ranges) { logger.debug("{} ks={}", __func__, keyspace_name); - auto range_addresses = erm->get_range_addresses(); + auto range_addresses = erm->get_range_addresses().get0(); logger.debug("keyspace={}, desired_ranges.size={}, range_addresses.size={}", keyspace_name, desired_ranges.size(), range_addresses.size()); diff --git a/locator/abstract_replication_strategy.cc b/locator/abstract_replication_strategy.cc index 636fbf0238..1ae19ea37d 100644 --- a/locator/abstract_replication_strategy.cc +++ b/locator/abstract_replication_strategy.cc @@ -256,7 +256,7 @@ abstract_replication_strategy::get_address_ranges(const token_metadata& tm, inet co_return ret; } -std::unordered_map +future> effective_replication_map::get_range_addresses() const { const token_metadata& tm = *_tmptr; std::unordered_map ret; @@ -265,8 +265,9 @@ effective_replication_map::get_range_addresses() const { for (auto& r : ranges) { ret.emplace(r, eps); } + co_await coroutine::maybe_yield(); } - return ret; + co_return ret; } future> diff --git a/locator/abstract_replication_strategy.hh b/locator/abstract_replication_strategy.hh index 8d2e885b4d..d2eb45d6a5 100644 --- a/locator/abstract_replication_strategy.hh +++ b/locator/abstract_replication_strategy.hh @@ -222,7 +222,7 @@ public: // Note: must be called after token_metadata has been initialized. dht::token_range_vector get_primary_ranges_within_dc(inet_address ep) const; - std::unordered_map + future> get_range_addresses() const; private: diff --git a/service/storage_service.cc b/service/storage_service.cc index dc9e39b2bc..693e3b0de4 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2805,7 +2805,7 @@ future<> storage_service::removenode_add_ranges(lw_shared_ptr source_ranges = get_new_source_ranges(erm, my_new_ranges); + std::unordered_multimap source_ranges = co_await get_new_source_ranges(erm, my_new_ranges); std::unordered_map ranges_per_endpoint; for (auto& x : source_ranges) { ranges_per_endpoint[x.first].emplace_back(x.second); @@ -3034,10 +3034,10 @@ future<> storage_service::shutdown_protocol_servers() { } } -std::unordered_multimap +future> storage_service::get_new_source_ranges(locator::effective_replication_map_ptr erm, const dht::token_range_vector& ranges) const { auto my_address = get_broadcast_address(); - std::unordered_map range_addresses = erm->get_range_addresses(); + std::unordered_map range_addresses = co_await erm->get_range_addresses(); std::unordered_multimap source_ranges; // find alive sources for our new ranges @@ -3064,8 +3064,10 @@ storage_service::get_new_source_ranges(locator::effective_replication_map_ptr er break; } } + + co_await coroutine::maybe_yield(); } - return source_ranges; + co_return source_ranges; } future<> storage_service::move(token new_token) { diff --git a/service/storage_service.hh b/service/storage_service.hh index 96d50cc38b..a2785a909c 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -574,7 +574,7 @@ private: * @param ranges the ranges to find sources for * @return multimap of addresses to ranges the address is responsible for */ - std::unordered_multimap get_new_source_ranges(locator::effective_replication_map_ptr erm, const dht::token_range_vector& ranges) const; + future> get_new_source_ranges(locator::effective_replication_map_ptr erm, const dht::token_range_vector& ranges) const; /** * Sends a notification to a node indicating we have finished replicating data.