service: implement make_rf_change_plan
In make_rf_change_plan, load balancer schedules necessary migrations, considering the load of nodes and other pending tablet transitions. Requests from ongoing_rf_changes are processed concurrently, independently from one another. In each request racks are processed concurrently. No tablet replica will be removed until all required replicas are added. While adding replicas to each rack we always start with base tables and won't proceed with views until they are done (while removing - the other way around). Node availability is checked at two levels for extending actions: 1) In prepare_per_rack_rf_change_plan: the entire RF change request is aborted if any node in the target dc+rack is down, or if there are no live (non-excluded) nodes at all. Shrinking is never aborted. 2) In make_rf_change_plan: extending is skipped for a given round if any normal, non-excluded node in the target dc+rack is missing from the balanced node set. Shrinking always proceeds regardless. The resulting behavior per node state combination (extending only): - all up -> proceed - some excluded + some up -> proceed (excluded nodes are skipped) - any down node -> abort - all excluded (no live) -> abort When the last step is finished: - in system_schema.keyspaces: - next_replication is cleared; - new keyspace properties are saved (if request succeeded); - request is removed from ongoing_rf_changes; - the request is marked as done in system.topology_requests.
This commit is contained in:
@@ -381,6 +381,10 @@ public:
|
||||
return _nodes.at(node)._du.capacity;
|
||||
}
|
||||
|
||||
bool has_node(host_id node) const {
|
||||
return _nodes.contains(node);
|
||||
}
|
||||
|
||||
shard_id get_shard_count(host_id node) const {
|
||||
if (!_nodes.contains(node)) {
|
||||
return 0;
|
||||
|
||||
@@ -30,6 +30,7 @@
|
||||
#include <ranges>
|
||||
#include <utility>
|
||||
#include <fmt/ranges.h>
|
||||
#include <seastar/core/on_internal_error.hh>
|
||||
#include <seastar/coroutine/maybe_yield.hh>
|
||||
#include <seastar/coroutine/switch_to.hh>
|
||||
#include <absl/container/flat_hash_map.h>
|
||||
@@ -532,6 +533,25 @@ struct hash<migration_tablet_set> {
|
||||
|
||||
namespace service {
|
||||
|
||||
// Subtract right from left. The result contains only keys from left.
|
||||
static std::unordered_map<sstring, std::vector<sstring>> subtract_replication(const std::unordered_map<sstring, std::vector<sstring>>& left, const std::unordered_map<sstring, std::vector<sstring>>& right) {
|
||||
std::unordered_map<sstring, std::vector<sstring>> res;
|
||||
for (const auto& [dc, rf_value] : left) {
|
||||
auto it = right.find(dc);
|
||||
if (it == right.end()) {
|
||||
res[dc] = rf_value;
|
||||
} else {
|
||||
std::vector<sstring> diff = rf_value | std::views::filter([&] (const sstring& rack) {
|
||||
return std::find(it->second.begin(), it->second.end(), rack) == it->second.end();
|
||||
}) | std::ranges::to<std::vector<sstring>>();
|
||||
if (!diff.empty()) {
|
||||
res[dc] = diff;
|
||||
}
|
||||
}
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
/// The algorithm aims to equalize tablet count on each shard.
|
||||
/// This goal is based on the assumption that every shard has similar processing power and space capacity,
|
||||
/// and that each tablet has equal consumption of those resources. So by equalizing tablet count per shard we
|
||||
@@ -1058,12 +1078,13 @@ public:
|
||||
migration_plan plan;
|
||||
|
||||
auto rack_list_colocation = ongoing_rack_list_colocation();
|
||||
auto rf_change_prep = co_await prepare_per_rack_rf_change_plan(plan);
|
||||
|
||||
// Prepare plans for each DC separately and combine them to be executed in parallel.
|
||||
for (auto&& dc : topo.get_datacenters()) {
|
||||
if (_db.get_config().rf_rack_valid_keyspaces() || _db.get_config().enforce_rack_list() || rack_list_colocation) {
|
||||
if (_db.get_config().rf_rack_valid_keyspaces() || _db.get_config().enforce_rack_list() || rack_list_colocation || !rf_change_prep.actions.empty()) {
|
||||
for (auto rack : topo.get_datacenter_racks().at(dc) | std::views::keys) {
|
||||
auto rack_plan = co_await make_plan(dc, rack);
|
||||
auto rack_plan = co_await make_plan(dc, rack, rf_change_prep.actions[{dc, rack}]);
|
||||
auto level = rack_plan.empty() ? seastar::log_level::debug : seastar::log_level::info;
|
||||
lblogger.log(level, "Plan for {}/{}: {}", dc, rack, plan_summary(rack_plan));
|
||||
plan.merge(std::move(rack_plan));
|
||||
@@ -1453,6 +1474,382 @@ public:
|
||||
co_return std::move(plan);
|
||||
}
|
||||
|
||||
enum class rf_change_state {
|
||||
ready, // RF change is ready (succeed or failed).
|
||||
needs_extending,
|
||||
needs_shrinking,
|
||||
};
|
||||
|
||||
using process_views = bool_class<struct process_views_tag>;
|
||||
struct rf_change_action {
|
||||
sstring keyspace;
|
||||
rf_change_state state;
|
||||
process_views pv = process_views::no;
|
||||
};
|
||||
using rf_change_actions = std::unordered_map<locator::endpoint_dc_rack, std::vector<rf_change_action>>;
|
||||
struct rf_change_preparation {
|
||||
rf_change_actions actions;
|
||||
};
|
||||
|
||||
// Determines which dc+rack combinations need RF change actions for a given keyspace,
|
||||
// by comparing current tablet replicas against the target replication configuration.
|
||||
// Scans in priority order: extend tables, extend views, shrink views, shrink tables.
|
||||
// Returns the first non-empty set of per-rack actions; colocated tables are skipped.
|
||||
// An empty result means all tablets already match the target configuration.
|
||||
future<rf_change_preparation> determine_rf_change_actions_per_rack(const sstring& ks_name, const std::vector<schema_ptr>& tables, const std::vector<schema_ptr>& views, const locator::replication_strategy_config_options& next) {
|
||||
auto add_entry = [&ks_name] (rf_change_preparation& prep, const sstring& dc, const sstring& rack, rf_change_state state, process_views pv) {
|
||||
locator::endpoint_dc_rack key{dc, rack};
|
||||
auto& actions = prep.actions[key];
|
||||
if (std::none_of(actions.begin(), actions.end(), [&](const rf_change_action& a) { return a.keyspace == ks_name; })) {
|
||||
actions.push_back(rf_change_action{.keyspace = ks_name, .state = state, .pv = pv});
|
||||
}
|
||||
};
|
||||
|
||||
auto next_replication = next | std::views::transform([] (const auto& pair) {
|
||||
return std::make_pair(pair.first, std::get<rack_list>(pair.second));
|
||||
}) | std::ranges::to<std::unordered_map<sstring, std::vector<sstring>>>();
|
||||
|
||||
auto scan_tables = [&] (const std::vector<schema_ptr>& table_list, rf_change_state target_state, process_views pv) -> future<rf_change_preparation> {
|
||||
rf_change_preparation prep;
|
||||
for (const auto& table : table_list) {
|
||||
if (!_tm->tablets().is_base_table(table->id())) {
|
||||
continue;
|
||||
}
|
||||
const auto& tmap = _tm->tablets().get_tablet_map(table->id());
|
||||
for (const tablet_info& ti : tmap.tablets()) {
|
||||
std::unordered_map<sstring, std::vector<sstring>> dc_to_racks;
|
||||
for (const auto& r : ti.replicas) {
|
||||
const auto& node_dc_rack = _tm->get_topology().get_node(r.host).dc_rack();
|
||||
dc_to_racks[node_dc_rack.dc].push_back(node_dc_rack.rack);
|
||||
}
|
||||
|
||||
auto diff = (target_state == rf_change_state::needs_extending ?
|
||||
subtract_replication(next_replication, dc_to_racks) : subtract_replication(dc_to_racks, next_replication))
|
||||
| std::views::filter([] (const auto& pair) {
|
||||
return !pair.second.empty();
|
||||
}
|
||||
) | std::ranges::to<std::unordered_map<sstring, std::vector<sstring>>>();
|
||||
|
||||
for (const auto& [dc, racks] : diff) {
|
||||
for (const auto& rack : racks) {
|
||||
add_entry(prep, dc, rack, target_state, pv);
|
||||
}
|
||||
}
|
||||
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
}
|
||||
co_return prep;
|
||||
};
|
||||
|
||||
// Extend base tables.
|
||||
if (auto prep = co_await scan_tables(tables, rf_change_state::needs_extending, process_views::no); !prep.actions.empty()) {
|
||||
co_return prep;
|
||||
}
|
||||
|
||||
// Extend views.
|
||||
if (auto prep = co_await scan_tables(views, rf_change_state::needs_extending, process_views::yes); !prep.actions.empty()) {
|
||||
co_return prep;
|
||||
}
|
||||
|
||||
// Shrink views.
|
||||
if (auto prep = co_await scan_tables(views, rf_change_state::needs_shrinking, process_views::yes); !prep.actions.empty()) {
|
||||
co_return prep;
|
||||
}
|
||||
|
||||
// Shrink base tables.
|
||||
if (auto prep = co_await scan_tables(tables, rf_change_state::needs_shrinking, process_views::no); !prep.actions.empty()) {
|
||||
co_return prep;
|
||||
}
|
||||
|
||||
co_return rf_change_preparation{};
|
||||
}
|
||||
|
||||
future<rf_change_preparation> prepare_per_rack_rf_change_plan(migration_plan& mplan) {
|
||||
lblogger.debug("In prepare_per_rack_rf_change_plan");
|
||||
|
||||
rf_change_preparation res;
|
||||
keyspace_rf_change_plan plan;
|
||||
if (!ongoing_rf_change()) {
|
||||
co_return res;
|
||||
}
|
||||
|
||||
for (const auto& request_id : _topology->ongoing_rf_changes) {
|
||||
auto req_entry = co_await _sys_ks->get_topology_request_entry(request_id);
|
||||
sstring ks_name = *req_entry.new_keyspace_rf_change_ks_name;
|
||||
|
||||
if (!_db.has_keyspace(ks_name)) {
|
||||
if (!plan.completion) {
|
||||
plan.completion = rf_change_completion_info{
|
||||
.request_id = request_id,
|
||||
.ks_name = ks_name,
|
||||
.error = format("Keyspace {} not found", ks_name),
|
||||
.saved_ks_props = req_entry.new_keyspace_rf_change_data.value(),
|
||||
};
|
||||
}
|
||||
continue;
|
||||
}
|
||||
auto& ks = _db.find_keyspace(ks_name);
|
||||
if (!ks.metadata()->next_strategy_options_opt()) {
|
||||
on_internal_error(lblogger, format("There is an ongoing rf change request {} for keyspace {}, "
|
||||
"but the keyspace does not have next replication settings", request_id, ks_name));
|
||||
}
|
||||
|
||||
auto tables = ks.metadata()->tables();
|
||||
auto views = ks.metadata()->views() | std::views::transform([] (const auto& view) { return schema_ptr(view); }) | std::ranges::to<std::vector<schema_ptr>>();
|
||||
auto rf_change_prep = co_await determine_rf_change_actions_per_rack(ks_name, tables, views, *ks.metadata()->next_strategy_options_opt());
|
||||
if (rf_change_prep.actions.empty()) {
|
||||
if (!plan.completion) {
|
||||
plan.completion = rf_change_completion_info{
|
||||
.request_id = request_id,
|
||||
.ks_name = ks_name,
|
||||
.error = req_entry.error,
|
||||
.saved_ks_props = req_entry.new_keyspace_rf_change_data.value()
|
||||
};
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check if any extending action targets a dc+rack with no available nodes.
|
||||
// If so, the RF change can never complete and should be aborted.
|
||||
sstring error_msg = "";
|
||||
const auto& topo = _tm->get_topology();
|
||||
const auto& dc_rack_nodes = topo.get_datacenter_rack_nodes();
|
||||
for (const auto& [dc_rack, actions] : rf_change_prep.actions) {
|
||||
bool needs_extending = std::ranges::any_of(actions, [] (const rf_change_action& a) {
|
||||
return a.state == rf_change_state::needs_extending;
|
||||
});
|
||||
if (!needs_extending) {
|
||||
break;
|
||||
}
|
||||
bool has_live_node = false;
|
||||
bool has_down_node = false;
|
||||
auto dc_it = dc_rack_nodes.find(dc_rack.dc);
|
||||
if (dc_it != dc_rack_nodes.end()) {
|
||||
auto rack_it = dc_it->second.find(dc_rack.rack);
|
||||
if (rack_it != dc_it->second.end()) {
|
||||
for (const auto& node_ref : rack_it->second) {
|
||||
const auto& node = node_ref.get();
|
||||
if (_skiplist.contains(node.host_id())) {
|
||||
has_down_node = true;
|
||||
break;
|
||||
}
|
||||
if (!node.is_excluded()) {
|
||||
has_live_node = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (has_down_node) {
|
||||
lblogger.warn("RF change for keyspace {} requires extending to {}/{} but there are down nodes there; aborting",
|
||||
ks_name, dc_rack.dc, dc_rack.rack);
|
||||
error_msg = format("RF change aborted: there are down nodes in required rack {}/{}", dc_rack.dc, dc_rack.rack);
|
||||
break;
|
||||
}
|
||||
if (!has_live_node) {
|
||||
lblogger.warn("RF change for keyspace {} requires extending to {}/{} but no available nodes exist there; aborting",
|
||||
ks_name, dc_rack.dc, dc_rack.rack);
|
||||
error_msg = format("RF change aborted: no available nodes in required rack {}/{}", dc_rack.dc, dc_rack.rack);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!error_msg.empty()) {
|
||||
plan.aborts.push_back(rf_change_abort_info{
|
||||
.request_id = request_id,
|
||||
.ks_name = ks_name,
|
||||
.error = error_msg,
|
||||
.current_replication = ks.metadata()->strategy_options(),
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
for (auto& [dc_rack, actions] : rf_change_prep.actions) {
|
||||
auto& dst = res.actions[dc_rack];
|
||||
dst.insert(dst.end(), std::make_move_iterator(actions.begin()), std::make_move_iterator(actions.end()));
|
||||
}
|
||||
}
|
||||
mplan.set_rf_change_plan(std::move(plan));
|
||||
co_return res;
|
||||
}
|
||||
|
||||
future<migration_plan> make_rf_change_plan(node_load_map& nodes, std::vector<rf_change_action> actions, sstring dc, sstring rack) {
|
||||
lblogger.debug("In make_rf_change_plan");
|
||||
|
||||
migration_plan mplan;
|
||||
keyspace_rf_change_plan plan;
|
||||
|
||||
auto nodes_by_load_dst = nodes | std::views::filter([&] (const auto& host_load) {
|
||||
auto& [host, load] = host_load;
|
||||
auto& node = *load.node;
|
||||
return node.dc_rack().dc == dc && node.dc_rack().rack == rack;
|
||||
}) | std::views::keys | std::ranges::to<std::vector<host_id>>();
|
||||
|
||||
bool has_extending = std::ranges::any_of(actions, [] (const rf_change_action& a) {
|
||||
return a.state == rf_change_state::needs_extending;
|
||||
});
|
||||
if (has_extending) {
|
||||
// Check that all normal, non-excluded nodes in the target dc/rack are present in the
|
||||
// balanced node set. If any such node is missing, extending cannot safely proceed.
|
||||
const auto& topo = _tm->get_topology();
|
||||
const auto& dc_rack_nodes = topo.get_datacenter_rack_nodes();
|
||||
bool missing_node = false;
|
||||
auto dc_it = dc_rack_nodes.find(dc);
|
||||
if (dc_it != dc_rack_nodes.end()) {
|
||||
auto rack_it = dc_it->second.find(rack);
|
||||
if (rack_it != dc_it->second.end()) {
|
||||
for (const auto& node_ref : rack_it->second) {
|
||||
const auto& node = node_ref.get();
|
||||
if (node.is_normal() && !node.is_excluded() && !nodes.contains(node.host_id())) {
|
||||
missing_node = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (missing_node || nodes_by_load_dst.empty()) {
|
||||
lblogger.warn("Not all non-excluded nodes are available for RF change extending plan in dc {}, rack {}", dc, rack);
|
||||
// Filter out extending actions since not all nodes are available.
|
||||
// Shrinking actions can still proceed without target nodes.
|
||||
std::erase_if(actions, [] (const rf_change_action& a) {
|
||||
return a.state == rf_change_state::needs_extending;
|
||||
});
|
||||
if (actions.empty()) {
|
||||
co_return mplan;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
auto nodes_cmp = nodes_by_load_cmp(nodes);
|
||||
auto nodes_dst_cmp = [&] (const host_id& a, const host_id& b) {
|
||||
return nodes_cmp(b, a);
|
||||
};
|
||||
|
||||
// Ascending load heap of candidate target nodes.
|
||||
std::make_heap(nodes_by_load_dst.begin(), nodes_by_load_dst.end(), nodes_dst_cmp);
|
||||
|
||||
const locator::topology& topo = _tm->get_topology();
|
||||
locator::endpoint_dc_rack location{dc, rack};
|
||||
|
||||
for (const auto& action : actions) {
|
||||
const auto& ks_name = action.keyspace;
|
||||
const auto& rf_change_state = action.state;
|
||||
|
||||
auto& ks = _db.find_keyspace(ks_name);
|
||||
auto table_list = action.pv
|
||||
? ks.metadata()->views() | std::views::transform([] (const auto& view) { return schema_ptr(view); }) | std::ranges::to<std::vector<schema_ptr>>()
|
||||
: ks.metadata()->tables();
|
||||
for (const auto& table_or_mv : table_list) {
|
||||
const auto& tmap = _tm->tablets().get_tablet_map(table_or_mv->id());
|
||||
co_await tmap.for_each_tablet([&] (tablet_id tid, const tablet_info& ti) -> future<> {
|
||||
if (!_tm->tablets().is_base_table(table_or_mv->id())) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
auto gid = locator::global_tablet_id{table_or_mv->id(), tid};
|
||||
|
||||
auto it = std::find_if(ti.replicas.begin(), ti.replicas.end(), [&] (const tablet_replica& r) {
|
||||
return topo.get_node(r.host).dc_rack() == location;
|
||||
});
|
||||
|
||||
auto replica = it != ti.replicas.end() ? std::optional<tablet_replica>{*it} : std::nullopt;
|
||||
|
||||
auto* tti = tmap.get_tablet_transition_info(tid);
|
||||
bool pending_replica_in_this_rack = false;
|
||||
bool leaving_replica_in_this_rack = false;
|
||||
if (tti) {
|
||||
auto leaving_replica = get_leaving_replica(ti, *tti);
|
||||
leaving_replica_in_this_rack = leaving_replica.has_value() && topo.get_node(leaving_replica->host).dc_rack() == location;
|
||||
pending_replica_in_this_rack = tti->pending_replica.has_value() && topo.get_node(tti->pending_replica->host).dc_rack() == location;
|
||||
}
|
||||
|
||||
if ((rf_change_state == rf_change_state::needs_extending && (replica && !leaving_replica_in_this_rack)) ||
|
||||
(rf_change_state == rf_change_state::needs_shrinking && (!replica && !pending_replica_in_this_rack))) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
// Skip tablet that is in transitions.
|
||||
if (tti) {
|
||||
lblogger.debug("Skipped rf change extending for tablet={} which is already in transition={} stage={}", gid, tti->transition, tti->stage);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
// Skip tablet that is about to be in transition.
|
||||
if (_scheduled_tablets.contains(gid)) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
migration_tablet_set source_tablets {
|
||||
.tablet_s = gid, // Ignore the merge co-location.
|
||||
};
|
||||
if (rf_change_state == rf_change_state::needs_extending) {
|
||||
// Pick the least loaded node as target.
|
||||
std::pop_heap(nodes_by_load_dst.begin(), nodes_by_load_dst.end(), nodes_dst_cmp);
|
||||
auto target = nodes_by_load_dst.back();
|
||||
|
||||
lblogger.debug("target node: {}, avg_load={}", target, nodes[target].avg_load);
|
||||
|
||||
auto dst = global_shard_id {target, _load_sketch->get_least_loaded_shard(target)};
|
||||
|
||||
lblogger.trace("target shard: {}, tablets={}, load={}", dst.shard,
|
||||
nodes[target].shards[dst.shard].tablet_count,
|
||||
nodes[target].shard_load(dst.shard, _target_tablet_size));
|
||||
|
||||
tablet_replica pending_replica{
|
||||
.host = target,
|
||||
.shard = dst.shard,
|
||||
};
|
||||
auto next = ti.replicas;
|
||||
next.push_back(pending_replica);
|
||||
tablet_migration_info mig{
|
||||
.kind = locator::tablet_transition_kind::rebuild_v2,
|
||||
.tablet = gid,
|
||||
.src = std::nullopt,
|
||||
.dst = pending_replica,
|
||||
};
|
||||
auto mig_streaming_info = get_migration_streaming_info(topo, ti, mig);
|
||||
pick(*_load_sketch, dst.host, dst.shard, source_tablets);
|
||||
if (can_accept_load(nodes, mig_streaming_info)) {
|
||||
lblogger.debug("Starting rebuild_v2 transition to {}.{} of tablet {}; new_replica = {}", dc, rack, gid, pending_replica);
|
||||
apply_load(nodes, mig_streaming_info);
|
||||
mark_as_scheduled(mig);
|
||||
mplan.add(std::move(mig));
|
||||
}
|
||||
increase_node_load(nodes, dst, source_tablets);
|
||||
std::push_heap(nodes_by_load_dst.begin(), nodes_by_load_dst.end(), nodes_dst_cmp);
|
||||
} else {
|
||||
auto next = ti.replicas | std::views::filter([&] (const tablet_replica& r) {
|
||||
return r != *replica;
|
||||
}) | std::ranges::to<tablet_replica_set>();
|
||||
tablet_migration_info mig{
|
||||
.kind = locator::tablet_transition_kind::rebuild_v2,
|
||||
.tablet = gid,
|
||||
.src = *replica,
|
||||
.dst = std::nullopt,
|
||||
};
|
||||
auto mig_streaming_info = get_migration_streaming_info(topo, ti, mig);
|
||||
// The node being shrunk may be excluded/down and lack complete tablet stats.
|
||||
// Since we're removing a replica (not placing one), accurate load data isn't needed.
|
||||
if (_load_sketch->has_node(replica->host)) {
|
||||
unload(*_load_sketch, replica->host, replica->shard, source_tablets);
|
||||
}
|
||||
if (can_accept_load(nodes, mig_streaming_info)) {
|
||||
apply_load(nodes, mig_streaming_info);
|
||||
mark_as_scheduled(mig);
|
||||
mplan.add(std::move(mig));
|
||||
}
|
||||
if (nodes.contains(replica->host)) {
|
||||
decrease_node_load(nodes, *replica, source_tablets);
|
||||
}
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
}
|
||||
mplan.set_rf_change_plan(std::move(plan));
|
||||
co_return mplan;
|
||||
}
|
||||
|
||||
// Returns true if a table has replicas of all its sibling tablets co-located.
|
||||
// This is used for determining whether merge can be finalized, since co-location
|
||||
// is a strict requirement for sibling tablets to be merged.
|
||||
@@ -3642,7 +4039,7 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
future<migration_plan> make_plan(dc_name dc, std::optional<sstring> rack = std::nullopt) {
|
||||
future<migration_plan> make_plan(dc_name dc, std::optional<sstring> rack = std::nullopt, std::vector<rf_change_action> rf_change_actions = {}) {
|
||||
migration_plan plan;
|
||||
|
||||
if (utils::get_local_injector().enter("tablet_migration_bypass")) {
|
||||
@@ -3760,12 +4157,6 @@ public:
|
||||
});
|
||||
}
|
||||
|
||||
if (nodes.empty()) {
|
||||
lblogger.debug("No nodes to balance.");
|
||||
_current_stats->stop_balance++;
|
||||
co_return plan;
|
||||
}
|
||||
|
||||
// Detect finished drain.
|
||||
|
||||
for (auto i = nodes_to_drain.begin(); i != nodes_to_drain.end();) {
|
||||
@@ -3840,7 +4231,6 @@ public:
|
||||
}
|
||||
lblogger.debug("No candidate nodes");
|
||||
_current_stats->stop_no_candidates++;
|
||||
co_return plan;
|
||||
}
|
||||
|
||||
// We want to saturate the target node so we migrate several tablets in parallel, one for each shard
|
||||
@@ -4002,7 +4392,7 @@ public:
|
||||
|
||||
print_node_stats(nodes, only_active::yes);
|
||||
|
||||
if (!nodes_to_drain.empty() || (_tm->tablets().balancing_enabled() && (shuffle || !is_balanced(min_load, max_load)))) {
|
||||
if (has_dest_nodes && (!nodes_to_drain.empty() || (_tm->tablets().balancing_enabled() && (shuffle || !is_balanced(min_load, max_load)))) && !nodes.empty()) {
|
||||
host_id target = *min_load_node;
|
||||
lblogger.info("target node: {}, avg_load: {}, max: {}", target, min_load, max_load);
|
||||
plan.merge(co_await make_internode_plan(nodes, nodes_to_drain, target));
|
||||
@@ -4014,6 +4404,10 @@ public:
|
||||
plan.merge(co_await make_intranode_plan(nodes, nodes_to_drain));
|
||||
}
|
||||
|
||||
if (!rf_change_actions.empty() && rack.has_value()) {
|
||||
plan.merge(co_await make_rf_change_plan(nodes, rf_change_actions, dc, rack.value()));
|
||||
}
|
||||
|
||||
if (_tm->tablets().balancing_enabled() && plan.empty() && !ongoing_rack_list_colocation()) {
|
||||
auto dc_merge_plan = co_await make_merge_colocation_plan(nodes);
|
||||
auto level = dc_merge_plan.tablet_migration_count() > 0 ? seastar::log_level::info : seastar::log_level::debug;
|
||||
|
||||
Reference in New Issue
Block a user