abstract_replication_strategy: add can_yield param to get_pending_ranges and friends

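Threading can_yield down to calculate_natural_endpoints lets the per-token loops call seastar::thread::maybe_yield() when the caller permits it, which prevents reactor stalls such as those seen in #7313.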

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2020-10-01 15:04:39 +03:00
parent 6c2a089a6f
commit ba31350239
16 changed files with 81 additions and 78 deletions

View File

@@ -70,7 +70,7 @@ future<> boot_strapper::bootstrap(streaming::stream_reason reason) {
return do_for_each(*keyspaces, [this, keyspaces, streamer] (sstring& keyspace_name) {
auto& ks = _db.local().find_keyspace(keyspace_name);
auto& strategy = ks.get_replication_strategy();
dht::token_range_vector ranges = strategy.get_pending_address_ranges(_token_metadata_ptr, _tokens, _address);
dht::token_range_vector ranges = strategy.get_pending_address_ranges(_token_metadata_ptr, _tokens, _address, locator::can_yield::no);
blogger.debug("Will stream keyspace={}, ranges={}", keyspace_name, ranges);
return streamer->add_ranges(keyspace_name, ranges);
}).then([this, streamer] {

View File

@@ -116,7 +116,7 @@ range_streamer::get_all_ranges_with_sources_for(const sstring& keyspace_name, dh
auto& strat = ks.get_replication_strategy();
auto tm = get_token_metadata().clone_only_token_map().get0();
auto range_addresses = strat.get_range_addresses(tm);
auto range_addresses = strat.get_range_addresses(tm, locator::can_yield::yes);
logger.debug("keyspace={}, desired_ranges.size={}, range_addresses.size={}", keyspace_name, desired_ranges.size(), range_addresses.size());
@@ -158,11 +158,11 @@ range_streamer::get_all_ranges_with_strict_sources_for(const sstring& keyspace_n
//Active ranges
auto metadata_clone = get_token_metadata().clone_only_token_map().get0();
auto range_addresses = strat.get_range_addresses(metadata_clone);
auto range_addresses = strat.get_range_addresses(metadata_clone, locator::can_yield::yes);
//Pending ranges
metadata_clone.update_normal_tokens(_tokens, _address);
auto pending_range_addresses = strat.get_range_addresses(metadata_clone);
auto pending_range_addresses = strat.get_range_addresses(metadata_clone, locator::can_yield::yes);
//Collects the source that will have its range moved to the new node
std::unordered_map<dht::token_range, std::vector<inet_address>> range_sources;

View File

@@ -73,17 +73,17 @@ void abstract_replication_strategy::validate_replication_strategy(const sstring&
}
}
// Returns the natural endpoints for search_token, resolved against the token
// metadata currently held by _shared_token_metadata.
// The can_yield argument is forwarded unchanged to do_get_natural_endpoints().
std::vector<inet_address> abstract_replication_strategy::get_natural_endpoints(const token& search_token) {
return do_get_natural_endpoints(search_token, *_shared_token_metadata.get());
std::vector<inet_address> abstract_replication_strategy::get_natural_endpoints(const token& search_token, can_yield can_yield) {
return do_get_natural_endpoints(search_token, *_shared_token_metadata.get(), can_yield);
}
std::vector<inet_address> abstract_replication_strategy::do_get_natural_endpoints(const token& search_token, const token_metadata& tm) {
std::vector<inet_address> abstract_replication_strategy::do_get_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield can_yield) {
const token& key_token = tm.first_token(search_token);
auto& cached_endpoints = get_cached_endpoints(tm);
auto res = cached_endpoints.find(key_token);
if (res == cached_endpoints.end()) {
auto endpoints = calculate_natural_endpoints(search_token, tm);
auto endpoints = calculate_natural_endpoints(search_token, tm, can_yield);
cached_endpoints.emplace(key_token, endpoints);
return endpoints;
@@ -93,9 +93,9 @@ std::vector<inet_address> abstract_replication_strategy::do_get_natural_endpoint
return res->second;
}
std::vector<inet_address> abstract_replication_strategy::get_natural_endpoints_without_node_being_replaced(const token& search_token) {
std::vector<inet_address> abstract_replication_strategy::get_natural_endpoints_without_node_being_replaced(const token& search_token, can_yield can_yield) {
token_metadata_ptr tmptr = _shared_token_metadata.get();
std::vector<gms::inet_address> natural_endpoints = do_get_natural_endpoints(search_token, *tmptr);
std::vector<gms::inet_address> natural_endpoints = do_get_natural_endpoints(search_token, *tmptr, can_yield);
if (tmptr->is_any_node_being_replaced() &&
allow_remove_node_being_replaced_from_natural_endpoints()) {
// When a new node is started to replace an existing dead node, we want
@@ -200,10 +200,7 @@ abstract_replication_strategy::do_get_ranges(inet_address ep, const token_metada
const auto& tm = *tmptr;
auto prev_tok = tm.sorted_tokens().back();
for (auto tok : tm.sorted_tokens()) {
for (inet_address a : calculate_natural_endpoints(tok, tm)) {
if (can_yield) {
seastar::thread::maybe_yield();
}
for (inet_address a : calculate_natural_endpoints(tok, tm, can_yield)) {
if (a == ep) {
insert_token_range_to_sorted_container_while_unwrapping(prev_tok, tok, ret);
break;
@@ -215,12 +212,12 @@ abstract_replication_strategy::do_get_ranges(inet_address ep, const token_metada
}
dht::token_range_vector
abstract_replication_strategy::get_primary_ranges(inet_address ep) {
abstract_replication_strategy::get_primary_ranges(inet_address ep, can_yield can_yield) {
dht::token_range_vector ret;
token_metadata_ptr tmptr = _shared_token_metadata.get();
auto prev_tok = tmptr->sorted_tokens().back();
for (auto tok : tmptr->sorted_tokens()) {
auto&& eps = calculate_natural_endpoints(tok, *tmptr);
auto&& eps = calculate_natural_endpoints(tok, *tmptr, can_yield);
if (eps.size() > 0 && eps[0] == ep) {
insert_token_range_to_sorted_container_while_unwrapping(prev_tok, tok, ret);
}
@@ -230,14 +227,14 @@ abstract_replication_strategy::get_primary_ranges(inet_address ep) {
}
dht::token_range_vector
abstract_replication_strategy::get_primary_ranges_within_dc(inet_address ep) {
abstract_replication_strategy::get_primary_ranges_within_dc(inet_address ep, can_yield can_yield) {
dht::token_range_vector ret;
sstring local_dc = _snitch->get_datacenter(ep);
token_metadata_ptr tmptr = _shared_token_metadata.get();
std::unordered_set<inet_address> local_dc_nodes = tmptr->get_topology().get_datacenter_endpoints().at(local_dc);
auto prev_tok = tmptr->sorted_tokens().back();
for (auto tok : tmptr->sorted_tokens()) {
auto&& eps = calculate_natural_endpoints(tok, *tmptr);
auto&& eps = calculate_natural_endpoints(tok, *tmptr, can_yield);
// Unlike get_primary_ranges() which checks if ep is the first
// owner of this range, here we check if ep is the first just
// among nodes which belong to the local dc of ep.
@@ -255,11 +252,11 @@ abstract_replication_strategy::get_primary_ranges_within_dc(inet_address ep) {
}
std::unordered_multimap<inet_address, dht::token_range>
abstract_replication_strategy::get_address_ranges(const token_metadata& tm) const {
abstract_replication_strategy::get_address_ranges(const token_metadata& tm, can_yield can_yield) const {
std::unordered_multimap<inet_address, dht::token_range> ret;
for (auto& t : tm.sorted_tokens()) {
dht::token_range_vector r = tm.get_primary_ranges_for(t);
auto eps = calculate_natural_endpoints(t, tm);
auto eps = calculate_natural_endpoints(t, tm, can_yield);
logger.debug("token={}, primary_range={}, address={}", t, r, eps);
for (auto ep : eps) {
for (auto&& rng : r) {
@@ -271,10 +268,10 @@ abstract_replication_strategy::get_address_ranges(const token_metadata& tm) cons
}
std::unordered_multimap<inet_address, dht::token_range>
abstract_replication_strategy::get_address_ranges(const token_metadata& tm, inet_address endpoint) const {
abstract_replication_strategy::get_address_ranges(const token_metadata& tm, inet_address endpoint, can_yield can_yield) const {
std::unordered_multimap<inet_address, dht::token_range> ret;
for (auto& t : tm.sorted_tokens()) {
auto eps = calculate_natural_endpoints(t, tm);
auto eps = calculate_natural_endpoints(t, tm, can_yield);
bool found = false;
for (auto ep : eps) {
if (ep != endpoint) {
@@ -296,11 +293,11 @@ abstract_replication_strategy::get_address_ranges(const token_metadata& tm, inet
}
std::unordered_map<dht::token_range, std::vector<inet_address>>
abstract_replication_strategy::get_range_addresses(const token_metadata& tm) const {
abstract_replication_strategy::get_range_addresses(const token_metadata& tm, can_yield can_yield) const {
std::unordered_map<dht::token_range, std::vector<inet_address>> ret;
for (auto& t : tm.sorted_tokens()) {
dht::token_range_vector ranges = tm.get_primary_ranges_for(t);
auto eps = calculate_natural_endpoints(t, tm);
auto eps = calculate_natural_endpoints(t, tm, can_yield);
for (auto& r : ranges) {
ret.emplace(r, eps);
}
@@ -309,16 +306,16 @@ abstract_replication_strategy::get_range_addresses(const token_metadata& tm) con
}
// Single-token convenience overload: wraps pending_token in a one-element
// std::unordered_set<token> and delegates to the multi-token overload,
// forwarding tmptr, pending_address and can_yield.
dht::token_range_vector
abstract_replication_strategy::get_pending_address_ranges(const token_metadata_ptr tmptr, token pending_token, inet_address pending_address) const {
return get_pending_address_ranges(std::move(tmptr), std::unordered_set<token>{pending_token}, pending_address);
abstract_replication_strategy::get_pending_address_ranges(const token_metadata_ptr tmptr, token pending_token, inet_address pending_address, can_yield can_yield) const {
return get_pending_address_ranges(std::move(tmptr), std::unordered_set<token>{pending_token}, pending_address, can_yield);
}
dht::token_range_vector
abstract_replication_strategy::get_pending_address_ranges(const token_metadata_ptr tmptr, std::unordered_set<token> pending_tokens, inet_address pending_address) const {
abstract_replication_strategy::get_pending_address_ranges(const token_metadata_ptr tmptr, std::unordered_set<token> pending_tokens, inet_address pending_address, can_yield can_yield) const {
dht::token_range_vector ret;
auto temp = tmptr->clone_only_token_map_sync();
temp.update_normal_tokens(pending_tokens, pending_address);
for (auto& x : get_address_ranges(temp, pending_address)) {
for (auto& x : get_address_ranges(temp, pending_address, can_yield)) {
ret.push_back(x.second);
}
return ret;

View File

@@ -91,15 +91,15 @@ public:
snitch_ptr& snitch,
const std::map<sstring, sstring>& config_options,
replication_strategy_type my_type);
virtual std::vector<inet_address> calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const = 0;
virtual std::vector<inet_address> calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield = can_yield::no) const = 0;
virtual ~abstract_replication_strategy() {}
static std::unique_ptr<abstract_replication_strategy> create_replication_strategy(const sstring& ks_name, const sstring& strategy_name, const shared_token_metadata& stm, const std::map<sstring, sstring>& config_options);
static void validate_replication_strategy(const sstring& ks_name,
const sstring& strategy_name,
const shared_token_metadata& stm,
const std::map<sstring, sstring>& config_options);
std::vector<inet_address> get_natural_endpoints(const token& search_token);
std::vector<inet_address> get_natural_endpoints_without_node_being_replaced(const token& search_token);
std::vector<inet_address> get_natural_endpoints(const token& search_token, can_yield = can_yield::no);
std::vector<inet_address> get_natural_endpoints_without_node_being_replaced(const token& search_token, can_yield = can_yield::no);
virtual void validate_options() const = 0;
virtual std::optional<std::set<sstring>> recognized_options() const = 0;
virtual size_t get_replication_factor() const = 0;
@@ -125,7 +125,7 @@ public:
dht::token_range_vector get_ranges_in_thread(inet_address ep, const token_metadata_ptr tmptr) const;
// Caller must ensure that token_metadata will not change throughout the call if can_yield::yes.
dht::token_range_vector do_get_ranges(inet_address ep, const token_metadata_ptr tmptr, can_yield) const;
virtual std::vector<inet_address> do_get_natural_endpoints(const token& search_token, const token_metadata& tm);
virtual std::vector<inet_address> do_get_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield);
public:
// get_primary_ranges() returns the list of "primary ranges" for the given
@@ -134,20 +134,20 @@ public:
// returned calculate_natural_endpoints().
// This function is the analogue of Origin's
// StorageService.getPrimaryRangesForEndpoint().
dht::token_range_vector get_primary_ranges(inet_address ep);
dht::token_range_vector get_primary_ranges(inet_address ep, can_yield);
// get_primary_ranges_within_dc() is similar to get_primary_ranges()
// except it assigns a primary node for each range within each dc,
// instead of one node globally.
dht::token_range_vector get_primary_ranges_within_dc(inet_address ep);
dht::token_range_vector get_primary_ranges_within_dc(inet_address ep, can_yield);
std::unordered_multimap<inet_address, dht::token_range> get_address_ranges(const token_metadata& tm) const;
std::unordered_multimap<inet_address, dht::token_range> get_address_ranges(const token_metadata& tm, inet_address endpoint) const;
std::unordered_multimap<inet_address, dht::token_range> get_address_ranges(const token_metadata& tm, can_yield) const;
std::unordered_multimap<inet_address, dht::token_range> get_address_ranges(const token_metadata& tm, inet_address endpoint, can_yield) const;
std::unordered_map<dht::token_range, std::vector<inet_address>> get_range_addresses(const token_metadata& tm) const;
std::unordered_map<dht::token_range, std::vector<inet_address>> get_range_addresses(const token_metadata& tm, can_yield) const;
dht::token_range_vector get_pending_address_ranges(const token_metadata_ptr tmptr, token pending_token, inet_address pending_address) const;
dht::token_range_vector get_pending_address_ranges(const token_metadata_ptr tmptr, token pending_token, inet_address pending_address, can_yield) const;
dht::token_range_vector get_pending_address_ranges(const token_metadata_ptr tmptr, std::unordered_set<token> pending_tokens, inet_address pending_address) const;
dht::token_range_vector get_pending_address_ranges(const token_metadata_ptr tmptr, std::unordered_set<token> pending_tokens, inet_address pending_address, can_yield) const;
};
}

View File

@@ -47,15 +47,15 @@ namespace locator {
// Constructs the strategy by forwarding every argument to the
// abstract_replication_strategy base and tagging it as
// replication_strategy_type::everywhere_topology. The body is empty.
everywhere_replication_strategy::everywhere_replication_strategy(const sstring& keyspace_name, const shared_token_metadata& token_metadata, snitch_ptr& snitch, const std::map<sstring, sstring>& config_options) :
abstract_replication_strategy(keyspace_name, token_metadata, snitch, config_options, replication_strategy_type::everywhere_topology) {}
// Returns tm.get_all_endpoints() regardless of search_token.
// The can_yield parameter is unnamed and unused: this implementation has no
// loop in which to yield.
std::vector<inet_address> everywhere_replication_strategy::calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const {
std::vector<inet_address> everywhere_replication_strategy::calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) const {
return tm.get_all_endpoints();
}
// Override of the base lookup.
// When tm has no tokens (sorted_tokens() is empty) the result is a
// single-element vector holding utils::fb_utilities::get_broadcast_address().
// Otherwise the call is delegated to calculate_natural_endpoints(), with
// can_yield forwarded.
std::vector<inet_address> everywhere_replication_strategy::do_get_natural_endpoints(const token& search_token, const token_metadata& tm) {
std::vector<inet_address> everywhere_replication_strategy::do_get_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield can_yield) {
if (tm.sorted_tokens().empty()) {
return std::vector<inet_address>({utils::fb_utilities::get_broadcast_address()});
}
return calculate_natural_endpoints(search_token, tm);
return calculate_natural_endpoints(search_token, tm, can_yield);
}
size_t everywhere_replication_strategy::get_replication_factor() const {

View File

@@ -46,8 +46,8 @@ class everywhere_replication_strategy : public abstract_replication_strategy {
public:
everywhere_replication_strategy(const sstring& keyspace_name, const shared_token_metadata& token_metadata, snitch_ptr& snitch, const std::map<sstring,sstring>& config_options);
virtual std::vector<inet_address> calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const override;
std::vector<inet_address> do_get_natural_endpoints(const token& search_token, const token_metadata& tm) override;
virtual std::vector<inet_address> calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) const override;
std::vector<inet_address> do_get_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) override;
virtual void validate_options() const override { /* noop */ }

View File

@@ -30,11 +30,11 @@ namespace locator {
// Constructs the strategy by forwarding every argument to the
// abstract_replication_strategy base and tagging it as
// replication_strategy_type::local. The body is empty.
local_strategy::local_strategy(const sstring& keyspace_name, const shared_token_metadata& token_metadata, snitch_ptr& snitch, const std::map<sstring, sstring>& config_options) :
abstract_replication_strategy(keyspace_name, token_metadata, snitch, config_options, replication_strategy_type::local) {}
// Override of the base lookup: delegates straight to
// calculate_natural_endpoints(), forwarding t, tm and can_yield, with no
// token lookup or endpoint caching of its own.
std::vector<inet_address> local_strategy::do_get_natural_endpoints(const token& t, const token_metadata& tm) {
return calculate_natural_endpoints(t, tm);
std::vector<inet_address> local_strategy::do_get_natural_endpoints(const token& t, const token_metadata& tm, can_yield can_yield) {
return calculate_natural_endpoints(t, tm, can_yield);
}
// Always returns a single-element vector holding
// utils::fb_utilities::get_broadcast_address().
// t, tm and the unnamed can_yield argument are not consulted.
std::vector<inet_address> local_strategy::calculate_natural_endpoints(const token& t, const token_metadata& tm) const {
std::vector<inet_address> local_strategy::calculate_natural_endpoints(const token& t, const token_metadata& tm, can_yield) const {
return std::vector<inet_address>({utils::fb_utilities::get_broadcast_address()});
}

View File

@@ -36,7 +36,7 @@ using token = dht::token;
class local_strategy : public abstract_replication_strategy {
protected:
virtual std::vector<inet_address> calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const override;
virtual std::vector<inet_address> calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) const override;
public:
local_strategy(const sstring& keyspace_name, const shared_token_metadata& token_metadata, snitch_ptr& snitch, const std::map<sstring, sstring>& config_options);
virtual ~local_strategy() {};
@@ -46,7 +46,7 @@ public:
* because the default implementation depends on token calculations but
* LocalStrategy may be used before tokens are set up.
*/
std::vector<inet_address> do_get_natural_endpoints(const token& search_token, const token_metadata& tm) override;
std::vector<inet_address> do_get_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) override;
virtual void validate_options() const override;

View File

@@ -104,7 +104,7 @@ network_topology_strategy::network_topology_strategy(
std::vector<inet_address>
network_topology_strategy::calculate_natural_endpoints(
const token& search_token, const token_metadata& tm) const {
const token& search_token, const token_metadata& tm, can_yield can_yield) const {
using endpoint_set = utils::sequenced_set<inet_address>;
using endpoint_dc_rack_set = std::unordered_set<endpoint_dc_rack>;
@@ -246,6 +246,9 @@ network_topology_strategy::calculate_natural_endpoints(
if (dcs_to_fill == 0) {
break;
}
if (can_yield) {
seastar::thread::maybe_yield();
}
inet_address ep = *tm.get_endpoint(next);
auto& loc = tp.get_location(ep);

View File

@@ -76,7 +76,7 @@ protected:
* progress in each DC, rack etc.
*/
virtual std::vector<inet_address> calculate_natural_endpoints(
const token& search_token, const token_metadata& tm) const override;
const token& search_token, const token_metadata& tm, can_yield) const override;
virtual void validate_options() const override;

View File

@@ -42,7 +42,7 @@ simple_strategy::simple_strategy(const sstring& keyspace_name, const shared_toke
}
}
std::vector<inet_address> simple_strategy::calculate_natural_endpoints(const token& t, const token_metadata& tm) const {
std::vector<inet_address> simple_strategy::calculate_natural_endpoints(const token& t, const token_metadata& tm, can_yield can_yield) const {
const std::vector<token>& tokens = tm.sorted_tokens();
if (tokens.empty()) {
@@ -57,6 +57,9 @@ std::vector<inet_address> simple_strategy::calculate_natural_endpoints(const tok
if (endpoints.size() == replicas) {
break;
}
if (can_yield) {
seastar::thread::maybe_yield();
}
auto ep = tm.get_endpoint(token);
assert(ep);

View File

@@ -30,7 +30,7 @@ namespace locator {
class simple_strategy : public abstract_replication_strategy {
protected:
virtual std::vector<inet_address> calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const override;
virtual std::vector<inet_address> calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) const override;
public:
simple_strategy(const sstring& keyspace_name, const shared_token_metadata& token_metadata, snitch_ptr& snitch, const std::map<sstring, sstring>& config_options);
virtual ~simple_strategy() {};

View File

@@ -1412,7 +1412,7 @@ void token_metadata_impl::calculate_pending_ranges_for_leaving(
const abstract_replication_strategy& strategy,
std::unordered_multimap<range<token>, inet_address>& new_pending_ranges,
mutable_token_metadata_ptr all_left_metadata) const {
std::unordered_multimap<inet_address, dht::token_range> address_ranges = strategy.get_address_ranges(unpimplified_this);
std::unordered_multimap<inet_address, dht::token_range> address_ranges = strategy.get_address_ranges(unpimplified_this, can_yield::yes);
// get all ranges that will be affected by leaving nodes
std::unordered_set<range<token>> affected_ranges;
for (auto endpoint : _leaving_endpoints) {
@@ -1428,8 +1428,8 @@ void token_metadata_impl::calculate_pending_ranges_for_leaving(
tlogger.debug("In calculate_pending_ranges: affected_ranges.size={} stars", affected_ranges_size);
for (const auto& r : affected_ranges) {
auto t = r.end() ? r.end()->value() : dht::maximum_token();
auto current_endpoints = strategy.calculate_natural_endpoints(t, metadata);
auto new_endpoints = strategy.calculate_natural_endpoints(t, *all_left_metadata);
auto current_endpoints = strategy.calculate_natural_endpoints(t, metadata, can_yield::yes);
auto new_endpoints = strategy.calculate_natural_endpoints(t, *all_left_metadata, can_yield::yes);
std::vector<inet_address> diff;
std::sort(current_endpoints.begin(), current_endpoints.end());
std::sort(new_endpoints.begin(), new_endpoints.end());
@@ -1450,7 +1450,7 @@ void token_metadata_impl::calculate_pending_ranges_for_replacing(
if (_replacing_endpoints.empty()) {
return;
}
auto address_ranges = strategy.get_address_ranges(unpimplified_this);
auto address_ranges = strategy.get_address_ranges(unpimplified_this, can_yield::yes);
for (const auto& node : _replacing_endpoints) {
auto existing_node = node.first;
auto replacing_node = node.second;
@@ -1464,6 +1464,7 @@ void token_metadata_impl::calculate_pending_ranges_for_replacing(
}
}
// Called from a seastar thread
void token_metadata_impl::calculate_pending_ranges_for_bootstrap(
const abstract_replication_strategy& strategy,
std::unordered_multimap<range<token>, inet_address>& new_pending_ranges,
@@ -1486,7 +1487,7 @@ void token_metadata_impl::calculate_pending_ranges_for_bootstrap(
auto& endpoint = x.first;
auto& tokens = x.second;
all_left_metadata->update_normal_tokens(tokens, endpoint);
for (auto& x : strategy.get_address_ranges(*all_left_metadata, endpoint)) {
for (auto& x : strategy.get_address_ranges(*all_left_metadata, endpoint, can_yield::yes)) {
new_pending_ranges.emplace(x.second, endpoint);
}
all_left_metadata->_impl->remove_endpoint(endpoint);

View File

@@ -1176,15 +1176,15 @@ static future<> repair_range(repair_info& ri, const dht::token_range& range) {
}
// Returns the primary ranges of endpoint ep for the given keyspace.
// Looks up the keyspace in db, takes its replication strategy and calls
// get_primary_ranges() on it.
// can_yield defaults to locator::can_yield::no and is forwarded to the
// strategy.
static dht::token_range_vector get_primary_ranges_for_endpoint(
database& db, sstring keyspace, gms::inet_address ep) {
database& db, sstring keyspace, gms::inet_address ep, locator::can_yield can_yield = locator::can_yield::no) {
auto& rs = db.find_keyspace(keyspace).get_replication_strategy();
return rs.get_primary_ranges(ep);
return rs.get_primary_ranges(ep, can_yield);
}
// Thin wrapper over get_primary_ranges_for_endpoint() that uses the address
// returned by utils::fb_utilities::get_broadcast_address() as the endpoint.
// can_yield defaults to locator::can_yield::no and is forwarded.
static dht::token_range_vector get_primary_ranges(
database& db, sstring keyspace) {
database& db, sstring keyspace, locator::can_yield can_yield = locator::can_yield::no) {
return get_primary_ranges_for_endpoint(db, keyspace,
utils::fb_utilities::get_broadcast_address());
utils::fb_utilities::get_broadcast_address(), can_yield);
}
// get_primary_ranges_within_dc() is similar to get_primary_ranges(),
@@ -1192,10 +1192,10 @@ static dht::token_range_vector get_primary_ranges(
// across the entire cluster, here each range is assigned a primary
// owner in each of the clusters.
// Looks up the keyspace's replication strategy in db and asks it for the
// per-DC primary ranges of the address returned by
// utils::fb_utilities::get_broadcast_address().
// can_yield defaults to locator::can_yield::no and is forwarded to the
// strategy.
static dht::token_range_vector get_primary_ranges_within_dc(
database& db, sstring keyspace) {
database& db, sstring keyspace, locator::can_yield can_yield = locator::can_yield::no) {
auto& rs = db.find_keyspace(keyspace).get_replication_strategy();
return rs.get_primary_ranges_within_dc(
utils::fb_utilities::get_broadcast_address());
utils::fb_utilities::get_broadcast_address(), can_yield);
}
static sstring get_local_dc() {
@@ -1763,7 +1763,7 @@ future<> bootstrap_with_repair(seastar::sharded<database>& db, seastar::sharded<
}
auto& ks = db.local().find_keyspace(keyspace_name);
auto& strat = ks.get_replication_strategy();
dht::token_range_vector desired_ranges = strat.get_pending_address_ranges(tmptr, tokens, myip);
dht::token_range_vector desired_ranges = strat.get_pending_address_ranges(tmptr, tokens, myip, locator::can_yield::yes);
seastar::thread::maybe_yield();
nr_ranges_total += desired_ranges.size();
}
@@ -1779,16 +1779,16 @@ future<> bootstrap_with_repair(seastar::sharded<database>& db, seastar::sharded<
}
auto& ks = db.local().find_keyspace(keyspace_name);
auto& strat = ks.get_replication_strategy();
dht::token_range_vector desired_ranges = strat.get_pending_address_ranges(tmptr, tokens, myip);
dht::token_range_vector desired_ranges = strat.get_pending_address_ranges(tmptr, tokens, myip, locator::can_yield::yes);
bool find_node_in_local_dc_only = strat.get_type() == locator::replication_strategy_type::network_topology;
//Active ranges
auto metadata_clone = tmptr->clone_only_token_map().get0();
auto range_addresses = strat.get_range_addresses(metadata_clone);
auto range_addresses = strat.get_range_addresses(metadata_clone, locator::can_yield::yes);
//Pending ranges
metadata_clone.update_normal_tokens(tokens, myip);
auto pending_range_addresses = strat.get_range_addresses(metadata_clone);
auto pending_range_addresses = strat.get_range_addresses(metadata_clone, locator::can_yield::yes);
//Collects the source that will have its range moved to the new node
std::unordered_map<dht::token_range, repair_neighbors> range_sources;
@@ -1962,7 +1962,7 @@ static future<> do_decommission_removenode_with_repair(seastar::sharded<database
// Find (for each range) all nodes that store replicas for these ranges as well
for (auto& r : ranges) {
auto end_token = r.end() ? r.end()->value() : dht::maximum_token();
auto eps = strat.calculate_natural_endpoints(end_token, *tmptr);
auto eps = strat.calculate_natural_endpoints(end_token, *tmptr, locator::can_yield::yes);
current_replica_endpoints.emplace(r, std::move(eps));
}
auto temp = tmptr->clone_after_all_left().get0();
@@ -1977,9 +1977,8 @@ static future<> do_decommission_removenode_with_repair(seastar::sharded<database
auto local_dc = get_local_dc();
bool find_node_in_local_dc_only = strat.get_type() == locator::replication_strategy_type::network_topology;
for (auto&r : ranges) {
seastar::thread::maybe_yield();
auto end_token = r.end() ? r.end()->value() : dht::maximum_token();
const std::vector<inet_address> new_eps = ks.get_replication_strategy().calculate_natural_endpoints(end_token, temp);
const std::vector<inet_address> new_eps = ks.get_replication_strategy().calculate_natural_endpoints(end_token, temp, locator::can_yield::yes);
const std::vector<inet_address>& current_eps = current_replica_endpoints[r];
std::unordered_set<inet_address> neighbors_set(new_eps.begin(), new_eps.end());
bool skip_this_range = false;
@@ -2151,7 +2150,7 @@ static future<> do_rebuild_replace_with_repair(seastar::sharded<database>& db, s
seastar::thread::maybe_yield();
auto end_token = r.end() ? r.end()->value() : dht::maximum_token();
auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr();
auto neighbors = boost::copy_range<std::vector<gms::inet_address>>(strat.calculate_natural_endpoints(end_token, *tmptr) |
auto neighbors = boost::copy_range<std::vector<gms::inet_address>>(strat.calculate_natural_endpoints(end_token, *tmptr, locator::can_yield::yes) |
boost::adaptors::filtered([myip, &source_dc, &snitch_ptr] (const gms::inet_address& node) {
if (node == myip) {
return false;

View File

@@ -2411,7 +2411,7 @@ std::unordered_multimap<dht::token_range, inet_address> storage_service::get_cha
seastar::thread::maybe_yield();
auto& ks = _db.local().find_keyspace(keyspace_name);
auto end_token = r.end() ? r.end()->value() : dht::maximum_token();
auto eps = ks.get_replication_strategy().calculate_natural_endpoints(end_token, metadata);
auto eps = ks.get_replication_strategy().calculate_natural_endpoints(end_token, metadata, locator::can_yield::yes);
current_replica_endpoints.emplace(r, std::move(eps));
}
@@ -2434,7 +2434,7 @@ std::unordered_multimap<dht::token_range, inet_address> storage_service::get_cha
seastar::thread::maybe_yield();
auto& ks = _db.local().find_keyspace(keyspace_name);
auto end_token = r.end() ? r.end()->value() : dht::maximum_token();
auto new_replica_endpoints = ks.get_replication_strategy().calculate_natural_endpoints(end_token, temp);
auto new_replica_endpoints = ks.get_replication_strategy().calculate_natural_endpoints(end_token, temp, locator::can_yield::yes);
auto rg = current_replica_endpoints.equal_range(r);
for (auto it = rg.first; it != rg.second; it++) {
@@ -2739,7 +2739,7 @@ storage_service::get_new_source_ranges(const sstring& keyspace_name, const dht::
auto& ks = _db.local().find_keyspace(keyspace_name);
auto& strat = ks.get_replication_strategy();
auto tm = get_token_metadata().clone_only_token_map().get0();
std::unordered_map<dht::token_range, std::vector<inet_address>> range_addresses = strat.get_range_addresses(tm);
std::unordered_map<dht::token_range, std::vector<inet_address>> range_addresses = strat.get_range_addresses(tm, locator::can_yield::yes);
std::unordered_multimap<inet_address, dht::token_range> source_ranges;
// find alive sources for our new ranges

View File

@@ -73,8 +73,8 @@ static void check_ranges_are_sorted(abstract_replication_strategy* ars, gms::ine
// Too slow in debug mode
#ifndef SEASTAR_DEBUG
verify_sorted(ars->get_ranges(ep));
verify_sorted(ars->get_primary_ranges(ep));
verify_sorted(ars->get_primary_ranges_within_dc(ep));
verify_sorted(ars->get_primary_ranges(ep, locator::can_yield::no));
verify_sorted(ars->get_primary_ranges_within_dc(ep, locator::can_yield::no));
#endif
}
@@ -495,7 +495,7 @@ static void test_equivalence(const shared_token_metadata& stm, snitch_ptr& snitc
for (size_t i = 0; i < 1000; ++i) {
auto token = dht::token::get_random_token();
auto expected = calculate_natural_endpoints(token, tm, snitch, datacenters);
auto actual = nts.calculate_natural_endpoints(token, tm);
auto actual = nts.calculate_natural_endpoints(token, tm, locator::can_yield::no);
// Because the old algorithm does not put the nodes in the correct order in the case where more replicas
// are required than there are racks in a dc, we accept different order as long as the primary