range_streamer: add_ranges and friends: get erm as param
Rather than getting it in the callee, let the caller (e.g. storage_service) hold the erm and pass it down to potentially multiple async functions. Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
@@ -46,20 +46,14 @@ future<> boot_strapper::bootstrap(streaming::stream_reason reason, gms::gossiper
|
||||
}
|
||||
blogger.debug("nodes_to_filter={}", nodes_to_filter);
|
||||
streamer->add_source_filter(std::make_unique<range_streamer::failure_detector_source_filter>(nodes_to_filter));
|
||||
auto keyspace = _db.local().get_non_local_strategy_keyspaces();
|
||||
for (auto& keyspace_name : keyspace) {
|
||||
if (!_db.local().has_keyspace(keyspace_name)) {
|
||||
// The keyspace was dropped while we were looping.
|
||||
continue;
|
||||
}
|
||||
|
||||
auto& ks = _db.local().find_keyspace(keyspace_name);
|
||||
auto strategy = ks.get_replication_strategy_ptr();
|
||||
auto ks_erms = _db.local().get_non_local_strategy_keyspaces_erms();
|
||||
for (const auto& [keyspace_name, erm] : ks_erms) {
|
||||
auto& strategy = erm->get_replication_strategy();
|
||||
// We took a strategy ptr to keep it alive during the `co_await`.
|
||||
// The keyspace may be dropped in the meantime.
|
||||
dht::token_range_vector ranges = co_await strategy->get_pending_address_ranges(_token_metadata_ptr, _tokens, _address);
|
||||
dht::token_range_vector ranges = co_await strategy.get_pending_address_ranges(_token_metadata_ptr, _tokens, _address);
|
||||
blogger.debug("Will stream keyspace={}, ranges={}", keyspace_name, ranges);
|
||||
co_await streamer->add_ranges(keyspace_name, ranges, gossiper, reason == streaming::stream_reason::replace);
|
||||
co_await streamer->add_ranges(keyspace_name, erm, ranges, gossiper, reason == streaming::stream_reason::replace);
|
||||
}
|
||||
_abort_source.check();
|
||||
co_await streamer->stream_async();
|
||||
|
||||
@@ -21,7 +21,6 @@
|
||||
#include "db/config.hh"
|
||||
#include <seastar/core/semaphore.hh>
|
||||
#include <boost/range/adaptors.hpp>
|
||||
#include "locator/abstract_replication_strategy.hh"
|
||||
|
||||
namespace dht {
|
||||
|
||||
@@ -82,12 +81,9 @@ range_streamer::get_range_fetch_map(const std::unordered_map<dht::token_range, s
|
||||
|
||||
// Must be called from a seastar thread
|
||||
std::unordered_map<dht::token_range, std::vector<inet_address>>
|
||||
range_streamer::get_all_ranges_with_sources_for(const sstring& keyspace_name, dht::token_range_vector desired_ranges) {
|
||||
range_streamer::get_all_ranges_with_sources_for(const sstring& keyspace_name, locator::effective_replication_map_ptr erm, dht::token_range_vector desired_ranges) {
|
||||
logger.debug("{} ks={}", __func__, keyspace_name);
|
||||
|
||||
auto& ks = _db.local().find_keyspace(keyspace_name);
|
||||
auto erm = ks.get_effective_replication_map();
|
||||
|
||||
auto range_addresses = erm->get_range_addresses();
|
||||
|
||||
logger.debug("keyspace={}, desired_ranges.size={}, range_addresses.size={}", keyspace_name, desired_ranges.size(), range_addresses.size());
|
||||
@@ -121,13 +117,11 @@ range_streamer::get_all_ranges_with_sources_for(const sstring& keyspace_name, dh
|
||||
|
||||
// Must be called from a seastar thread
|
||||
std::unordered_map<dht::token_range, std::vector<inet_address>>
|
||||
range_streamer::get_all_ranges_with_strict_sources_for(const sstring& keyspace_name, dht::token_range_vector desired_ranges, gms::gossiper& gossiper) {
|
||||
range_streamer::get_all_ranges_with_strict_sources_for(const sstring& keyspace_name, locator::effective_replication_map_ptr erm, dht::token_range_vector desired_ranges, gms::gossiper& gossiper) {
|
||||
logger.debug("{} ks={}", __func__, keyspace_name);
|
||||
assert (_tokens.empty() == false);
|
||||
|
||||
auto& ks = _db.local().find_keyspace(keyspace_name);
|
||||
auto& strat = ks.get_replication_strategy();
|
||||
auto erm = ks.get_effective_replication_map();
|
||||
auto& strat = erm->get_replication_strategy();
|
||||
|
||||
//Active ranges
|
||||
auto metadata_clone = get_token_metadata().clone_only_token_map().get0();
|
||||
@@ -190,12 +184,10 @@ range_streamer::get_all_ranges_with_strict_sources_for(const sstring& keyspace_n
|
||||
return range_sources;
|
||||
}
|
||||
|
||||
bool range_streamer::use_strict_sources_for_ranges(const sstring& keyspace_name) {
|
||||
auto& ks = _db.local().find_keyspace(keyspace_name);
|
||||
auto erm = ks.get_effective_replication_map();
|
||||
bool range_streamer::use_strict_sources_for_ranges(const sstring& keyspace_name, const locator::effective_replication_map_ptr& erm) {
|
||||
auto rf = erm->get_replication_factor();
|
||||
auto nr_nodes_in_ring = get_token_metadata().get_all_endpoints().size();
|
||||
bool everywhere_topology = ks.get_replication_strategy().get_type() == locator::replication_strategy_type::everywhere_topology;
|
||||
bool everywhere_topology = erm->get_replication_strategy().get_type() == locator::replication_strategy_type::everywhere_topology;
|
||||
// Use strict when number of nodes in the ring is equal or more than RF
|
||||
auto strict = _db.local().get_config().consistent_rangemovement()
|
||||
&& !_tokens.empty()
|
||||
@@ -223,15 +215,15 @@ void range_streamer::add_rx_ranges(const sstring& keyspace_name, std::unordered_
|
||||
}
|
||||
|
||||
// TODO: This is the legacy range_streamer interface, it is add_rx_ranges which adds rx ranges.
|
||||
future<> range_streamer::add_ranges(const sstring& keyspace_name, dht::token_range_vector ranges, gms::gossiper& gossiper, bool is_replacing) {
|
||||
return seastar::async([this, keyspace_name, ranges= std::move(ranges), &gossiper, is_replacing] () mutable {
|
||||
future<> range_streamer::add_ranges(const sstring& keyspace_name, locator::effective_replication_map_ptr erm, dht::token_range_vector ranges, gms::gossiper& gossiper, bool is_replacing) {
|
||||
return seastar::async([this, keyspace_name, erm = std::move(erm), ranges= std::move(ranges), &gossiper, is_replacing] () mutable {
|
||||
if (_nr_tx_added) {
|
||||
throw std::runtime_error("Mixed sending and receiving is not supported");
|
||||
}
|
||||
_nr_rx_added++;
|
||||
auto ranges_for_keyspace = !is_replacing && use_strict_sources_for_ranges(keyspace_name)
|
||||
? get_all_ranges_with_strict_sources_for(keyspace_name, ranges, gossiper)
|
||||
: get_all_ranges_with_sources_for(keyspace_name, ranges);
|
||||
auto ranges_for_keyspace = !is_replacing && use_strict_sources_for_ranges(keyspace_name, erm)
|
||||
? get_all_ranges_with_strict_sources_for(keyspace_name, erm, ranges, gossiper)
|
||||
: get_all_ranges_with_sources_for(keyspace_name, erm, ranges);
|
||||
|
||||
if (logger.is_enabled(logging::log_level::debug)) {
|
||||
for (auto& x : ranges_for_keyspace) {
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "locator/token_metadata.hh"
|
||||
#include "locator/abstract_replication_strategy.hh"
|
||||
#include "streaming/stream_plan.hh"
|
||||
#include "streaming/stream_state.hh"
|
||||
#include "streaming/stream_reason.hh"
|
||||
@@ -97,24 +97,24 @@ public:
|
||||
_source_filters.emplace(std::move(filter));
|
||||
}
|
||||
|
||||
future<> add_ranges(const sstring& keyspace_name, dht::token_range_vector ranges, gms::gossiper& gossiper, bool is_replacing);
|
||||
future<> add_ranges(const sstring& keyspace_name, locator::effective_replication_map_ptr erm, dht::token_range_vector ranges, gms::gossiper& gossiper, bool is_replacing);
|
||||
void add_tx_ranges(const sstring& keyspace_name, std::unordered_map<inet_address, dht::token_range_vector> ranges_per_endpoint);
|
||||
void add_rx_ranges(const sstring& keyspace_name, std::unordered_map<inet_address, dht::token_range_vector> ranges_per_endpoint);
|
||||
private:
|
||||
bool use_strict_sources_for_ranges(const sstring& keyspace_name);
|
||||
bool use_strict_sources_for_ranges(const sstring& keyspace_name, const locator::effective_replication_map_ptr& erm);
|
||||
/**
|
||||
* Get a map of all ranges and their respective sources that are candidates for streaming the given ranges
|
||||
* to us. For each range, the list of sources is sorted by proximity relative to the given destAddress.
|
||||
*/
|
||||
std::unordered_map<dht::token_range, std::vector<inet_address>>
|
||||
get_all_ranges_with_sources_for(const sstring& keyspace_name, dht::token_range_vector desired_ranges);
|
||||
get_all_ranges_with_sources_for(const sstring& keyspace_name, locator::effective_replication_map_ptr erm, dht::token_range_vector desired_ranges);
|
||||
/**
|
||||
* Get a map of all ranges and the source that will be cleaned up once this bootstrapped node is added for the given ranges.
|
||||
* For each range, the list should only contain a single source. This allows us to consistently migrate data without violating
|
||||
* consistency.
|
||||
*/
|
||||
std::unordered_map<dht::token_range, std::vector<inet_address>>
|
||||
get_all_ranges_with_strict_sources_for(const sstring& keyspace_name, dht::token_range_vector desired_ranges, gms::gossiper& gossiper);
|
||||
get_all_ranges_with_strict_sources_for(const sstring& keyspace_name, locator::effective_replication_map_ptr erm, dht::token_range_vector desired_ranges, gms::gossiper& gossiper);
|
||||
private:
|
||||
/**
|
||||
* @param rangesWithSources The ranges we want to fetch (key) and their potential sources (value)
|
||||
|
||||
@@ -2663,7 +2663,7 @@ future<> storage_service::rebuild(sstring source_dc) {
|
||||
}
|
||||
auto ks_erms = ss._db.local().get_non_local_strategy_keyspaces_erms();
|
||||
for (const auto& [keyspace_name, erm] : ks_erms) {
|
||||
co_await streamer->add_ranges(keyspace_name, ss.get_ranges_for_endpoint(erm, utils::fb_utilities::get_broadcast_address()), ss._gossiper, false);
|
||||
co_await streamer->add_ranges(keyspace_name, erm, ss.get_ranges_for_endpoint(erm, utils::fb_utilities::get_broadcast_address()), ss._gossiper, false);
|
||||
}
|
||||
try {
|
||||
co_await streamer->stream_async();
|
||||
|
||||
Reference in New Issue
Block a user