This makes the scheduler's decisions consistent with the decisions made at table creation with regard to tablet count. We want to avoid over-allocating tablets when a table is created, only for the scheduler's scaling logic to reduce the count afterwards — not just to avoid wasteful migrations after table creation, but also to respect the per-shard goal. Respecting the per-shard goal means the algorithm will no longer be as simple as looking at hints, and we want to share that algorithm between the scheduler and the initial tablet allocator. So the scheduler is now invoked to get the tablet count when a table is created.
604 lines
25 KiB
C++
604 lines
25 KiB
C++
/*
|
|
*
|
|
* Modified by ScyllaDB
|
|
* Copyright (C) 2015-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
|
|
*/
|
|
|
|
#include <algorithm>
|
|
#include <functional>
|
|
#include <random>
|
|
|
|
#include <fmt/ranges.h>
|
|
|
|
#include <seastar/core/coroutine.hh>
|
|
#include <seastar/coroutine/maybe_yield.hh>
|
|
|
|
#include "locator/network_topology_strategy.hh"
|
|
#include "locator/load_sketch.hh"
|
|
|
|
#include <absl/container/flat_hash_map.h>
|
|
#include <boost/algorithm/string.hpp>
|
|
#include "exceptions/exceptions.hh"
|
|
#include "utils/assert.hh"
|
|
#include "utils/class_registrator.hh"
|
|
#include "utils/hash.hh"
|
|
|
|
namespace std {
|
|
template<>
|
|
struct hash<locator::endpoint_dc_rack> {
|
|
size_t operator()(const locator::endpoint_dc_rack& v) const {
|
|
return utils::tuple_hash()(std::tie(v.dc, v.rack));
|
|
}
|
|
};
|
|
}
|
|
|
|
namespace locator {
|
|
|
|
// Build the strategy from its options.
// The strategy class name and (via process_tablet_options()) the tablet-related
// options are handled separately. Every other option is treated as a DC name mapped to
// its replication factor, which is recorded in _dc_rep_factor and _datacenteres.
// _rep_factor ends up as the sum of all per-DC replication factors.
network_topology_strategy::network_topology_strategy(replication_strategy_params params) :
        abstract_replication_strategy(params,
                                      replication_strategy_type::network_topology) {
    // Work on a copy of the options. process_tablet_options() receives it first;
    // presumably it consumes the tablet-related entries — TODO confirm.
    auto options = _config_options;
    process_tablet_options(*this, options, params);

    size_t total_rf = 0;
    for (auto& [name, value] : options) {
        // FIXME!!!
        // The strategy class name currently arrives as one of the options; it is not a DC.
        if (boost::iequals(name, "class")) {
            continue;
        }

        // A generic replication_factor must already have been expanded into DC:RF pairs.
        // A differently-cased spelling is treated as a user typo; the exact lowercase
        // spelling surviving until here is an internal error.
        if (boost::iequals(name, "replication_factor")) {
            if (!boost::equals(name, "replication_factor")) {
                throw exceptions::configuration_exception(format(
                    "'{}' is not a valid option, did you mean (lowercase) 'replication_factor'?", name));
            }
            on_internal_error(rslogger, "replication_factor should have been replaced with a DC:RF mapping by now");
        }

        const auto dc_rf = parse_replication_factor(value);
        total_rf += dc_rf;
        _dc_rep_factor.emplace(name, dc_rf);
        _datacenteres.push_back(name);
    }

    _rep_factor = total_rf;

    rslogger.debug("Configured datacenter replicas are: {}", _dc_rep_factor);
}
|
|
|
|
using endpoint_dc_rack_set = std::unordered_set<endpoint_dc_rack>;
|
|
|
|
class natural_endpoints_tracker {
|
|
/**
|
|
* Endpoint adder applying the replication rules for a given DC.
|
|
*/
|
|
struct data_center_endpoints {
|
|
/** List accepted endpoints get pushed into. */
|
|
host_id_set& _endpoints;
|
|
|
|
/**
|
|
* Racks encountered so far. Replicas are put into separate racks while possible.
|
|
* For efficiency the set is shared between the instances, using the location pair (dc, rack) to make sure
|
|
* clashing names aren't a problem.
|
|
*/
|
|
endpoint_dc_rack_set& _racks;
|
|
|
|
/** Number of replicas left to fill from this DC. */
|
|
size_t _rf_left;
|
|
ssize_t _acceptable_rack_repeats;
|
|
|
|
data_center_endpoints(size_t rf, size_t rack_count, size_t node_count, host_id_set& endpoints, endpoint_dc_rack_set& racks)
|
|
: _endpoints(endpoints)
|
|
, _racks(racks)
|
|
// If there aren't enough nodes in this DC to fill the RF, the number of nodes is the effective RF.
|
|
, _rf_left(std::min(rf, node_count))
|
|
// If there aren't enough racks in this DC to fill the RF, we'll still use at least one node from each rack,
|
|
// and the difference is to be filled by the first encountered nodes.
|
|
, _acceptable_rack_repeats(rf - rack_count)
|
|
{}
|
|
|
|
/**
|
|
* Attempts to add an endpoint to the replicas for this datacenter, adding to the endpoints set if successful.
|
|
* Returns true if the endpoint was added, and this datacenter does not require further replicas.
|
|
*/
|
|
bool add_endpoint_and_check_if_done(const host_id& ep, const endpoint_dc_rack& location) {
|
|
if (done()) {
|
|
return false;
|
|
}
|
|
|
|
if (_racks.emplace(location).second) {
|
|
// New rack.
|
|
--_rf_left;
|
|
auto added = _endpoints.insert(ep).second;
|
|
if (!added) {
|
|
throw std::runtime_error(fmt::format("Topology error: found {} in more than one rack", ep));
|
|
}
|
|
return done();
|
|
}
|
|
|
|
/**
|
|
* Ensure we don't allow too many endpoints in the same rack, i.e. we have
|
|
* minimum current rf_left + 1 distinct racks. See above, _acceptable_rack_repeats
|
|
* is defined as RF - rack_count, i.e. how many nodes in a single rack we are ok
|
|
* with.
|
|
*
|
|
* With RF = 3 and 2 Racks in DC,
|
|
*
|
|
* IP1, Rack1
|
|
* IP2, Rack1
|
|
* IP3, Rack1, The line _acceptable_rack_repeats <= 0 will reject IP3.
|
|
* IP4, Rack2
|
|
*
|
|
*/
|
|
if (_acceptable_rack_repeats <= 0) {
|
|
// There must be rf_left distinct racks left, do not add any more rack repeats.
|
|
return false;
|
|
}
|
|
|
|
if (!_endpoints.insert(ep).second) {
|
|
// Cannot repeat a node.
|
|
return false;
|
|
}
|
|
|
|
// Added a node that is from an already met rack to match RF when there aren't enough racks.
|
|
--_acceptable_rack_repeats;
|
|
--_rf_left;
|
|
|
|
return done();
|
|
}
|
|
|
|
bool done() const {
|
|
return _rf_left == 0;
|
|
}
|
|
};
|
|
|
|
const token_metadata& _tm;
|
|
const topology& _tp;
|
|
std::unordered_map<sstring, size_t> _dc_rep_factor;
|
|
|
|
//
|
|
// We want to preserve insertion order so that the first added endpoint
|
|
// becomes primary.
|
|
//
|
|
host_id_set _replicas;
|
|
// tracks the racks we have already placed replicas in
|
|
endpoint_dc_rack_set _seen_racks;
|
|
|
|
//
|
|
// all token owners in each DC, so we can check when we have exhausted all
|
|
// the token-owning members of a DC
|
|
//
|
|
std::unordered_map<sstring, std::unordered_set<locator::host_id>> _token_owners;
|
|
|
|
//
|
|
// all racks (with non-token owners filtered out) in a DC so we can check
|
|
// when we have exhausted all racks in a DC
|
|
//
|
|
std::unordered_map<sstring, std::unordered_map<sstring, std::unordered_set<locator::host_id>>> _racks;
|
|
|
|
std::unordered_map<std::string_view, data_center_endpoints> _dcs;
|
|
|
|
size_t _dcs_to_fill;
|
|
|
|
public:
|
|
natural_endpoints_tracker(const token_metadata& tm, const std::unordered_map<sstring, size_t>& dc_rep_factor)
|
|
: _tm(tm)
|
|
, _tp(_tm.get_topology())
|
|
, _dc_rep_factor(dc_rep_factor)
|
|
, _token_owners(_tm.get_datacenter_token_owners())
|
|
, _racks(_tm.get_datacenter_racks_token_owners())
|
|
{
|
|
// not aware of any cluster members
|
|
SCYLLA_ASSERT(!_token_owners.empty() && !_racks.empty());
|
|
|
|
auto size_for = [](auto& map, auto& k) {
|
|
auto i = map.find(k);
|
|
return i != map.end() ? i->second.size() : size_t(0);
|
|
};
|
|
|
|
// Create a data_center_endpoints object for each non-empty DC.
|
|
for (auto& [dc, rf] : _dc_rep_factor) {
|
|
auto node_count = size_for(_token_owners, dc);
|
|
|
|
if (rf == 0 || node_count == 0) {
|
|
continue;
|
|
}
|
|
|
|
_dcs.emplace(dc, data_center_endpoints(rf, size_for(_racks, dc), node_count, _replicas, _seen_racks));
|
|
_dcs_to_fill = _dcs.size();
|
|
}
|
|
}
|
|
|
|
bool add_endpoint_and_check_if_done(host_id ep) {
|
|
auto& loc = _tp.get_location(ep);
|
|
auto i = _dcs.find(loc.dc);
|
|
if (i != _dcs.end() && i->second.add_endpoint_and_check_if_done(ep, loc)) {
|
|
--_dcs_to_fill;
|
|
}
|
|
return done();
|
|
}
|
|
|
|
bool done() const noexcept {
|
|
return _dcs_to_fill == 0;
|
|
}
|
|
|
|
host_id_set& replicas() noexcept {
|
|
return _replicas;
|
|
}
|
|
|
|
static void check_enough_endpoints(const token_metadata& tm, const std::unordered_map<sstring, size_t>& dc_rf) {
|
|
auto dc_endpoints = tm.get_datacenter_token_owners();
|
|
auto endpoints_in = [&dc_endpoints](sstring dc) {
|
|
auto i = dc_endpoints.find(dc);
|
|
return i != dc_endpoints.end() ? i->second.size() : size_t(0);
|
|
};
|
|
for (const auto& [dc, rf] : dc_rf) {
|
|
if (rf > endpoints_in(dc)) {
|
|
throw exceptions::configuration_exception(seastar::format(
|
|
"Datacenter {} doesn't have enough token-owning nodes for replication_factor={}", dc, rf));
|
|
}
|
|
}
|
|
}
|
|
};
|
|
|
|
// Return the natural replicas of `search_token`. Token owners are visited in the order
// yielded by tm.ring_range(search_token) and handed to a natural_endpoints_tracker.
// The walk stops as soon as the tracker reports that every DC has its replicas.
future<host_id_set>
network_topology_strategy::calculate_natural_endpoints(
        const token& search_token, const token_metadata& tm) const {
    natural_endpoints_tracker endpoints_tracker(tm, _dc_rep_factor);

    for (const auto& ring_token : tm.ring_range(search_token)) {
        // The ring can be long; let other tasks run between steps.
        co_await coroutine::maybe_yield();

        const host_id owner = *tm.get_endpoint(ring_token);
        if (endpoints_tracker.add_endpoint_and_check_if_done(owner)) {
            break;
        }
    }

    co_return std::move(endpoints_tracker.replicas());
}
|
|
|
|
// Validate this strategy's options against `topology`.
// Tablet options are checked by validate_tablet_options(). Every remaining option key
// must name a datacenter known to `topology`, otherwise
// exceptions::configuration_exception is thrown. Its value is passed to
// parse_replication_factor(), which presumably throws on an invalid value — TODO confirm.
// A leftover generic 'replication_factor' key is an internal error, since it is expected
// to have been unrolled into per-DC entries before this point.
void network_topology_strategy::validate_options(const gms::feature_service& fs, const locator::topology& topology) const {
    // #22688 / #20039 - we want to remove dc:s once rf=0, and we
    // also want to allow fully setting rf=0 in _all_ dc:s (hello data loss)
    // so empty options here are in fact ok. Removed check for it
    auto dcs = topology.get_datacenters();
    validate_tablet_options(*this, fs, _config_options);
    for (auto& c : _config_options) {
        if (c.first == sstring("replication_factor")) {
            // Note the trailing space: the two literals are concatenated into one message.
            on_internal_error(rslogger, fmt::format("'replication_factor' tag should be unrolled into a list of DC:RF by now. "
                                                    "_config_options:{}", _config_options));
        }
        if (!dcs.contains(c.first)) {
            throw exceptions::configuration_exception(format("Unrecognized strategy option {{{}}} "
                "passed to NetworkTopologyStrategy", this->to_qualified_class_name(c.first)));
        }
        // Called only to validate the value; the parsed result is not needed here.
        parse_replication_factor(c.second);
    }
}
|
|
|
|
// Build the per-table effective replication map for `table` using this strategy
// and `tm`. Only legal when the strategy is configured to use tablets; any other
// call is reported through on_internal_error().
effective_replication_map_ptr network_topology_strategy::make_replication_map(table_id table, token_metadata_ptr tm) const {
    const bool tablets_enabled = uses_tablets();
    if (!tablets_enabled) {
        on_internal_error(rslogger, format("make_replication_map() called for table {} but replication strategy not configured to use tablets", table));
    }
    auto self = shared_from_this();
    return do_make_replication_map(table, std::move(self), std::move(tm), _rep_factor);
}
|
|
|
|
//
|
|
// Try to use as many tablets initially, so that all shards in the current topology
|
|
// are covered with at least `min_per_shard_tablet_count` tablets. In other words, the value is
|
|
//
|
|
// initial_tablets = max(nr_shards_in(dc) / RF_in(dc) for dc in datacenters)
|
|
//
|
|
|
|
static future<unsigned> calculate_initial_tablets_from_topology(const schema& s, token_metadata_ptr tm, const std::unordered_map<sstring, size_t>& rf, double min_per_shard_tablet_count = 0) {
|
|
unsigned initial_tablets = std::numeric_limits<unsigned>::min();
|
|
std::unordered_map<sstring, unsigned> shards_per_dc_map;
|
|
tm->for_each_token_owner([&] (const node& node) {
|
|
if (node.is_normal()) {
|
|
shards_per_dc_map[node.dc_rack().dc] += node.get_shard_count();
|
|
}
|
|
});
|
|
min_per_shard_tablet_count = std::max(1.0, min_per_shard_tablet_count);
|
|
for (const auto& [dc, rf_in_dc] : rf) {
|
|
if (!rf_in_dc) {
|
|
continue;
|
|
}
|
|
unsigned shards_in_dc = shards_per_dc_map[dc];
|
|
unsigned tablets_in_dc = std::ceil((double)(min_per_shard_tablet_count * shards_in_dc) / rf_in_dc);
|
|
initial_tablets = std::max(initial_tablets, tablets_in_dc);
|
|
}
|
|
rslogger.debug("Estimated {} initial tablets for table {}.{}", initial_tablets, s.ks_name(), s.cf_name());
|
|
co_return initial_tablets;
|
|
}
|
|
|
|
// Compute the minimum tablet count for table `s` as the maximum of:
//  - the strategy's configured initial tablet count (get_initial_tablets()),
//  - the table's min_tablet_count option, if set,
//  - expected_data_size_in_gb (converted to bytes) divided by target_tablet_size, if set,
//  - a topology-based estimate for min_per_shard_tablet_count, when that value is non-zero.
// min_per_shard_tablet_count comes from the table option if present. Otherwise it is
// initial_scale (default 1), or 0 when an initial count or min_tablet_count is configured.
// The result is not rounded to a power of two here.
future<size_t> network_topology_strategy::calculate_min_tablet_count(schema_ptr s, token_metadata_ptr tm, uint64_t target_tablet_size, std::optional<unsigned> initial_scale) const {
    const auto& opts = s->tablet_options();
    const size_t configured_initial = get_initial_tablets();
    size_t result = configured_initial;

    if (opts.min_tablet_count) {
        result = std::max<size_t>(result, opts.min_tablet_count.value());
    }

    if (opts.expected_data_size_in_gb) {
        const auto tablets_for_expected_size = (opts.expected_data_size_in_gb.value() << 30) / target_tablet_size;
        result = std::max<size_t>(result, tablets_for_expected_size);
    }

    // If min_tablet_count is set, initial_scale should not be effective for
    // compatibility with the deprecated "initial" tablet count.
    const bool explicit_count_configured = configured_initial || opts.min_tablet_count;
    auto min_per_shard_tablet_count = opts.min_per_shard_tablet_count.value_or(
            explicit_count_configured ? 0 : initial_scale.value_or(1));

    if (min_per_shard_tablet_count) {
        auto from_topology = co_await calculate_initial_tablets_from_topology(*s, tm, _dc_rep_factor,
                min_per_shard_tablet_count);
        result = std::max<size_t>(result, from_topology);
    }

    co_return result;
}
|
|
|
|
// Create the tablet map for a new table with at least `tablet_count` tablets and
// place replicas for every tablet via reallocate_tablets().
// The count is first raised to at least 1, then rounded up to a power of two.
future<tablet_map> network_topology_strategy::allocate_tablets_for_new_table(schema_ptr s, token_metadata_ptr tm, size_t tablet_count) const {
    // calculate_min_tablet_count() can return 0, e.g. when min_per_shard_tablet_count is
    // explicitly 0 and no other hint is set. A table needs at least one tablet. Also,
    // log2ceil(0) is presumably not meaningful (for Seastar's implementation it yields
    // the full bit width, making the shift below undefined), so clamp before rounding.
    if (tablet_count == 0) {
        rslogger.info("Raising tablet count from 0 to 1 for table {}.{}", s->ks_name(), s->cf_name());
        tablet_count = 1;
    }
    const size_t aligned_tablet_count = size_t(1) << log2ceil(tablet_count);
    if (tablet_count != aligned_tablet_count) {
        rslogger.info("Rounding up tablet count from {} to {} for table {}.{}", tablet_count, aligned_tablet_count, s->ks_name(), s->cf_name());
        tablet_count = aligned_tablet_count;
    }
    co_return co_await reallocate_tablets(std::move(s), std::move(tm), tablet_map(tablet_count));
}
|
|
|
|
// Recompute the replica set of every tablet in `tablets` so that each DC matches its
// configured RF, and return the updated map.
// Throws exceptions::configuration_exception (via check_enough_endpoints()) if some
// DC lacks enough token owners for its RF. A single load_sketch, populated from `tm`,
// is passed to every per-tablet reallocation in the loop below.
future<tablet_map> network_topology_strategy::reallocate_tablets(schema_ptr s, token_metadata_ptr tm, tablet_map tablets) const {
    natural_endpoints_tracker::check_enough_endpoints(*tm, _dc_rep_factor);

    load_sketch sketch(tm);
    co_await sketch.populate();

    tablet_logger.debug("Allocating tablets for {}.{} ({}): dc_rep_factor={} tablet_count={}", s->ks_name(), s->cf_name(), s->id(), _dc_rep_factor, tablets.tablet_count());

    for (tablet_id id : tablets.tablet_ids()) {
        auto info = tablets.get_tablet_info(id);
        info.replicas = co_await reallocate_tablets(s, tm, sketch, tablets, id);
        tablets.set_tablet(id, std::move(info));
    }

    tablet_logger.debug("Allocated tablets for {}.{} ({}): dc_rep_factor={}: {}", s->ks_name(), s->cf_name(), s->id(), _dc_rep_factor, tablets);
    co_return tablets;
}
|
|
|
|
// Return the replica set of tablet `tb` adjusted so that each datacenter in the
// topology holds exactly its configured number of replicas.
// DCs with too few replicas get new ones from add_tablets_in_dc(); DCs with too many
// are trimmed by drop_tablets_in_dc(). Both update `load`.
future<tablet_replica_set> network_topology_strategy::reallocate_tablets(schema_ptr s, token_metadata_ptr tm, load_sketch& load, const tablet_map& cur_tablets, tablet_id tb) const {
    const auto& topo = tm->get_topology();
    tablet_replica_set replicas = cur_tablets.get_tablet_info(tb).replicas;

    // Index the current replicas: how many per DC, and which hosts per DC/rack.
    std::unordered_map<sstring, size_t> nodes_per_dc;
    std::unordered_map<sstring, std::map<sstring, std::unordered_set<locator::host_id>>> replicas_per_dc_rack;
    for (const auto& replica : replicas) {
        const auto& location = topo.get_node(replica.host).dc_rack();
        replicas_per_dc_rack[location.dc][location.rack].insert(replica.host);
        ++nodes_per_dc[location.dc];
    }

    // #22688 - take all dcs in topology into account when determining migration,
    // not just those listed in the options (an RF-0 DC may still hold replicas).
    // Any change is expected to have been pre-checked to move each DC's RF by at most one.
    for (const auto& dc : topo.get_datacenters()) {
        const auto dc_rf = get_replication_factor(dc);
        const auto dc_node_count = nodes_per_dc[dc];
        if (dc_rf > dc_node_count) {
            replicas = co_await add_tablets_in_dc(s, tm, load, tb, replicas_per_dc_rack[dc], replicas, dc, dc_node_count, dc_rf);
        } else if (dc_rf < dc_node_count) {
            replicas = drop_tablets_in_dc(s, topo, load, tb, replicas, dc, dc_node_count, dc_rf);
        }
    }

    co_return replicas;
}
|
|
|
|
// Allocate (dc_rf - dc_node_count) additional replicas of tablet `tb` in datacenter `dc`.
// They are appended to a copy of cur_replicas, and the resulting replica set is returned.
//
// Candidates are normal nodes of `dc`'s token-owner racks (from `tm`) that do not already
// hold a replica of this tablet.
// - Racks with no replica of this tablet come before racks that already have one.
// - Within each of those two groups, the rack order is rotated by tb.id.
// - Replicas are then placed round-robin over the candidate racks.
// - Within a rack, the candidate with the lowest load_sketch::get_load() value is used
//   first; ties are broken randomly.
//
// Parameters:
//   replicas_per_rack - current replicas of this tablet in `dc`, keyed by rack. It is
//                       updated in place with every replica allocated here.
//   dc_node_count     - number of replicas the tablet currently has in `dc`.
//   dc_rf             - target replication factor for `dc`. reallocate_tablets() only
//                       calls this when dc_rf > dc_node_count.
// Reports on_internal_error() if there are no candidate racks, or if candidates run out
// before all replicas are placed. NOTE(review): `.at(dc)` below presumably throws if `dc`
// has no token owners — confirm callers rule that out.
future<tablet_replica_set> network_topology_strategy::add_tablets_in_dc(schema_ptr s, token_metadata_ptr tm, load_sketch& load, tablet_id tb,
        std::map<sstring, std::unordered_set<locator::host_id>>& replicas_per_rack,
        const tablet_replica_set& cur_replicas,
        sstring dc, size_t dc_node_count, size_t dc_rf) const {
    // Used only to shuffle equally-loaded candidates.
    static thread_local std::default_random_engine rnd_engine{std::random_device{}()};

    auto replicas = cur_replicas;
    // all_dc_racks is ordered lexicographically on purpose
    auto all_dc_racks = tm->get_datacenter_racks_token_owners_nodes().at(dc)
            | std::ranges::to<std::map>();

    // Track all nodes with no replicas on them for this tablet, per rack.
    struct node_load {
        locator::host_id host;
        uint64_t load;
    };
    // for sorting in descending load order
    // (load as reported by load_sketch::get_load(); presumably the number of tablet replicas)
    auto node_load_cmp = [] (const node_load& a, const node_load& b) {
        return a.load > b.load;
    };

    struct rack_candidates {
        sstring rack;
        utils::small_vector<node_load, 3> nodes;
    };
    using candidates_list = std::vector<rack_candidates>;
    // Racks that already hold at least one replica of this tablet.
    candidates_list existing_racks;

    // Racks with no replica of this tablet yet; allocation starts from these
    // (unpopulated) racks.
    candidates_list new_racks;

    for (const auto& [rack, nodes] : all_dc_racks) {
        co_await coroutine::maybe_yield();
        if (nodes.empty()) {
            continue;
        }
        const auto& existing = replicas_per_rack[rack];
        candidates_list& rack_list = existing.empty() ? new_racks : existing_racks;
        auto& candidate = rack_list.emplace_back(rack);
        for (const auto& node : nodes) {
            // Only normal nodes may receive new replicas.
            if (!node.get().is_normal()) {
                continue;
            }
            const auto& host_id = node.get().host_id();
            if (!existing.contains(host_id)) {
                candidate.nodes.emplace_back(host_id, load.get_load(host_id));
            }
        }
        if (candidate.nodes.empty()) {
            rack_list.pop_back();
            tablet_logger.trace("allocate_replica {}.{}: no candidate nodes left on rack={}", s->ks_name(), s->cf_name(), rack);
            // This rack may sit in either list. A rack without existing replicas can still end
            // up here if none of its nodes is normal. rack_list.pop_back() removes it from
            // whichever list it was just appended to.
            continue;
        }
        // Sort candidate nodes in each rack in descending load order, so that
        // nodes.back() (used by allocate_replica below) is the least loaded node.
        // Do shuffle + stable_sort to shuffle nodes with equal load.
        std::shuffle(candidate.nodes.begin(), candidate.nodes.end(), rnd_engine);
        std::stable_sort(candidate.nodes.begin(), candidate.nodes.end(), node_load_cmp);
    }

    candidates_list candidate_racks;

    // ensure fairness across racks (in particular if rf < number_of_racks)
    // by rotating the racks order: start at index tb.id % size and wrap around
    auto append_candidate_racks = [&] (candidates_list& racks) {
        if (auto size = racks.size()) {
            auto it = racks.begin() + tb.id % size;
            std::move(it, racks.end(), std::back_inserter(candidate_racks));
            std::move(racks.begin(), it, std::back_inserter(candidate_racks));
        }
    };

    // Racks without replicas come first, so they are filled before racks that already have one.
    append_candidate_racks(new_racks);
    append_candidate_racks(existing_racks);

    if (candidate_racks.empty()) {
        on_internal_error(tablet_logger,
                seastar::format("allocate_replica {}.{}: no candidate racks found for dc={} allocated={} rf={}: existing={}",
                        s->ks_name(), s->cf_name(), dc, dc_node_count, dc_rf, replicas_per_rack));
    }

    auto candidate_rack = candidate_racks.begin();

    // Allocate one replica from the rack at `candidate` (least loaded remaining node),
    // then advance `candidate` round-robin. A rack is erased from candidate_racks once
    // it has no nodes left.
    auto allocate_replica = [&] (candidates_list::iterator& candidate) {
        const auto& rack = candidate->rack;
        auto& nodes = candidate->nodes;
        if (nodes.empty()) {
            on_internal_error(tablet_logger,
                    seastar::format("allocate_replica {}.{} tablet_id={}: candidates vector for rack={} is empty for allocating tablet replicas in dc={} allocated={} rf={}",
                            s->ks_name(), s->cf_name(), tb.id, rack, dc, dc_node_count, dc_rf));
        }
        auto host_id = nodes.back().host;
        auto replica = tablet_replica{host_id, load.next_shard(host_id)};
        const auto& node = tm->get_topology().get_node(host_id);
        auto inserted = replicas_per_rack[node.dc_rack().rack].insert(host_id).second;
        // Sanity check that a node is not used more than once
        if (!inserted) {
            on_internal_error(tablet_logger,
                    seastar::format("allocate_replica {}.{} tablet_id={}: allocated replica={} node already used when allocating tablet replicas in dc={} allocated={} rf={}: replicas={}",
                            s->ks_name(), s->cf_name(), tb.id, replica, dc, dc_node_count, dc_rf, replicas));
        }
        nodes.pop_back();
        tablet_logger.trace("allocate_replica {}.{} tablet_id={}: allocated tablet replica={} dc={} rack={}: nodes remaining in rack={}",
                s->ks_name(), s->cf_name(), tb.id, replica, node.dc_rack().dc, node.dc_rack().rack, nodes.size());
        // `rack` and `nodes` must not be used past this point: erase() may destroy them.
        if (nodes.empty()) {
            candidate = candidate_racks.erase(candidate);
        } else {
            ++candidate;
        }
        // Wrap around; if candidate_racks became empty, begin() == end().
        if (candidate == candidate_racks.end()) {
            candidate = candidate_racks.begin();
        }
        if (tablet_logger.is_enabled(log_level::trace)) {
            if (candidate != candidate_racks.end()) {
                tablet_logger.trace("allocate_replica {}.{} tablet_id={}: next rack={} nodes={}", s->ks_name(), s->cf_name(), tb.id, candidate->rack, candidate->nodes.size());
            } else {
                tablet_logger.trace("allocate_replica {}.{} tablet_id={}: no candidate racks left", s->ks_name(), s->cf_name(), tb.id);
            }
        }
        return replica;
    };

    tablet_logger.debug("allocate_replica {}.{} tablet_id={}: allocating tablet replicas in dc={} allocated={} rf={}",
            s->ks_name(), s->cf_name(), tb.id, dc, dc_node_count, dc_rf);

    for (size_t remaining = dc_rf - dc_node_count; remaining; --remaining) {
        co_await coroutine::maybe_yield();
        if (candidate_rack == candidate_racks.end()) {
            on_internal_error(tablet_logger,
                    format("allocate_replica {}.{} tablet_id={}: ran out of candidates for allocating tablet replicas in dc={} allocated={} rf={}: remaining={}",
                            s->ks_name(), s->cf_name(), tb.id, dc, dc_node_count, dc_rf, remaining));
        }
        replicas.emplace_back(allocate_replica(candidate_rack));
    }

    co_return replicas;
}
|
|
|
|
// Return cur_replicas trimmed so that `dc` keeps only its first dc_rf replicas.
// Replicas outside `dc` are kept unchanged. Every dropped replica is released from
// `load`. The caller passes dc_node_count (the current replica count in `dc`),
// which must exceed dc_rf.
tablet_replica_set network_topology_strategy::drop_tablets_in_dc(schema_ptr s, const locator::topology& topo, load_sketch& load, tablet_id tb,
        const tablet_replica_set& cur_replicas,
        sstring dc, size_t dc_node_count, size_t dc_rf) const {
    tablet_logger.debug("drop_tablets_in_dc {}.{} tablet_id={}: deallocating tablet replicas in dc={} allocated={} rf={}", s->ks_name(), s->cf_name(), tb.id, dc, dc_node_count, dc_rf);

    // Replicas are removed from the back (replica-set order is preserved). This keeps the
    // replica pairing between a base table and its materialized views intact. Any rack
    // imbalance this leaves behind is left for the tablet load balancer to fix later.
    const size_t surplus = dc_node_count - dc_rf;
    tablet_replica_set kept;
    kept.reserve(cur_replicas.size() - surplus);

    size_t seen_in_dc = 0;
    for (const auto& replica : cur_replicas) {
        const bool in_dc = topo.get_node(replica.host).dc_rack().dc == dc;
        if (in_dc && ++seen_in_dc > dc_rf) {
            // Surplus replica in `dc`: drop it and give its slot back to the load sketch.
            load.unload(replica.host, replica.shard);
            continue;
        }
        kept.emplace_back(replica);
    }
    return kept;
}
|
|
|
|
// Check that no datacenter contributes more read replicas than its replication factor.
// Returns an error description for the first offending DC found (in hash-map iteration
// order), or an empty string if all DCs are consistent.
sstring network_topology_strategy::sanity_check_read_replicas(const effective_replication_map& erm,
        const host_id_vector_replica_set& read_replicas) const {
    const auto& topology = erm.get_topology();

    // For one DC: its configured RF and how many of read_replicas are located in it.
    struct rf_node_count {
        size_t replication_factor{0};
        size_t node_count{0};
    };

    absl::flat_hash_map<sstring, rf_node_count> per_dc;
    for (const auto& replica : read_replicas) {
        auto [entry, is_new_dc] = per_dc.emplace(topology.get_datacenter(replica), rf_node_count{0, 0});
        if (is_new_dc) {
            // Fetch the RF only the first time a DC is seen.
            entry->second.replication_factor = get_replication_factor(entry->first);
        }
        ++entry->second.node_count;
    }

    for (const auto& [dc, counts] : per_dc) {
        if (counts.replication_factor < counts.node_count) {
            return seastar::format("network_topology_strategy: ERM inconsistency, Datacenter [{}] has higher count of read replicas (accounting for "
                                   "current consistency level): [{}] than its replication factor [{}]",
                                   dc, counts.node_count, counts.replication_factor);
        }
    }

    return {};
}
|
|
|
|
// Register network_topology_strategy as an abstract_replication_strategy implementation
// constructible from replication_strategy_params. It is registered under both the fully
// qualified Cassandra class name and the short "NetworkTopologyStrategy" name.
using registry = class_registrator<abstract_replication_strategy, network_topology_strategy, replication_strategy_params>;
static registry registrator("org.apache.cassandra.locator.NetworkTopologyStrategy");
static registry registrator_short_name("NetworkTopologyStrategy");
|
|
}
|