tablets: load_balancer: Scale down tablet count to respect per-shard tablet count goal

The limit is enforced by controlling average per-shard tablet replica
count in a given DC, which is controlled by per-table tablet
count. This is effective in respecting the limit on individual shards
as long as tablet replicas are distributed evenly between shards.

There is no attempt to move tablets around in order to enforce limits
on individual shards in case of imbalance between shards.

If the average per-shard tablet count exceeds the limit, all tables
which contribute to it (have replicas in the DC) are scaled down
by the same factor. Due to rounding up to the nearest power of 2,
we may overshoot the per-shard goal by at most a factor of 2.

If different DCs want different scale factors of a given table, the
lowest scale factor is chosen for a given table.

The limit is configurable. It's a global per-cluster config which
controls how many tablet replicas per shard in total we consider to be
still ok. It controls tablet allocator behavior, when choosing initial
tablet count. Even though it's a per-node config, we don't support
different limits per node. All nodes must have the same value of that
config. It's similar in that regard to other scheduler config items
like tablets_initial_scale_factor and target_tablet_size_in_bytes.
This commit is contained in:
Tomasz Grabiec
2025-01-02 11:27:20 +01:00
parent 94b5165ac7
commit f1bda8d4c1
7 changed files with 188 additions and 13 deletions

View File

@@ -1336,6 +1336,8 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, maximum_replication_factor_fail_threshold(this, "maximum_replication_factor_fail_threshold", liveness::LiveUpdate, value_status::Used, -1, "")
, tablets_initial_scale_factor(this, "tablets_initial_scale_factor", value_status::Used, 10,
"Minimum average number of tablet replicas per shard per table. Suppressed by tablet options in table's schema: min_per_shard_tablet_count and min_tablet_count")
, tablets_per_shard_goal(this, "tablets_per_shard_goal", liveness::LiveUpdate, value_status::Used, 100,
"The goal for the maximum number of tablet replicas per shard. Tablet allocator tries to keep this goal.")
, target_tablet_size_in_bytes(this, "target_tablet_size_in_bytes", liveness::LiveUpdate, value_status::Used, service::default_target_tablet_size,
"Allows target tablet size to be configured. Defaults to 5G (in bytes). Maintaining tablets at reasonable sizes is important to be able to " \
"redistribute load. A higher value means tablet migration throughput can be reduced. A lower value may cause number of tablets to increase significantly, " \

View File

@@ -503,6 +503,7 @@ public:
named_value<int> maximum_replication_factor_fail_threshold;
named_value<int> tablets_initial_scale_factor;
named_value<unsigned> tablets_per_shard_goal;
named_value<uint64_t> target_tablet_size_in_bytes;
named_value<std::vector<enum_option<replication_strategy_restriction_t>>> replication_strategy_warn_list;

View File

@@ -29,7 +29,7 @@ public:
return _rep_factor;
}
size_t get_replication_factor(const sstring& dc) const {
size_t get_replication_factor(const sstring& dc) const override {
auto dc_factor = _dc_rep_factor.find(dc);
return (dc_factor == _dc_rep_factor.end()) ? 0 : dc_factor->second;
}

View File

@@ -49,6 +49,12 @@ public:
/// otherwise, cur_tablets is a copy of the current tablet_map.
/// Runs under group0 guard.
virtual future<tablet_map> reallocate_tablets(schema_ptr, token_metadata_ptr, tablet_map cur_tablets) const = 0;
/// Returns replication factor in a given DC.
/// Note that individual tablets may lag behind desired replication factor in their
/// current replica list, as replication factor changes involve table rebuilding transitions
/// which are not instantaneous.
virtual size_t get_replication_factor(const sstring& dc) const = 0;
};
} // namespace locator

View File

@@ -1693,6 +1693,9 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
auto stop_tsm = defer_verbose_shutdown("topology_state_machine", [&tsm] {
tsm.stop().get();
});
auto tablets_per_shard_goal_observer = cfg->tablets_per_shard_goal.observe([&tsm] (unsigned goal) {
tsm.local().event.broadcast();
});
auto compression_dict_updated_callback = [] () -> future<> {
auto dict = co_await sys_ks.local().query_dict();

View File

@@ -480,6 +480,8 @@ class load_balancer {
// due to the average size dropping below the merge threshold, as tablet count doubles.
const uint64_t _target_tablet_size = default_target_tablet_size;
const unsigned _tablets_per_shard_goal;
uint64_t target_max_tablet_size() const noexcept {
return _target_tablet_size * 2;
}
@@ -662,8 +664,13 @@ private:
return streaming_infos;
}
public:
load_balancer(replica::database& db, token_metadata_ptr tm, locator::load_stats_ptr table_load_stats, load_balancer_stats_manager& stats, uint64_t target_tablet_size, std::unordered_set<host_id> skiplist)
load_balancer(replica::database& db, token_metadata_ptr tm, locator::load_stats_ptr table_load_stats,
load_balancer_stats_manager& stats,
uint64_t target_tablet_size,
unsigned tablets_per_shard_goal,
std::unordered_set<host_id> skiplist)
: _target_tablet_size(target_tablet_size)
, _tablets_per_shard_goal(tablets_per_shard_goal)
, _db(db)
, _tm(std::move(tm))
, _table_load_stats(std::move(table_load_stats))
@@ -1068,11 +1075,13 @@ public:
};
future<sizing_plan> make_sizing_plan(schema_ptr new_table = nullptr, const tablet_aware_replication_strategy* new_rs = nullptr) {
std::unordered_map<table_id, const tablet_aware_replication_strategy*> rs_by_table;
sizing_plan plan;
auto process_table = [&] (table_id table, schema_ptr s, const tablet_aware_replication_strategy* rs, size_t tablet_count) -> future<> {
table_sizing& table_plan = plan.tables[table];
table_plan.current_tablet_count = tablet_count;
rs_by_table[table] = rs;
size_t target_tablet_count = 1;
@@ -1111,16 +1120,6 @@ public:
}
table_plan.target_tablet_count = target_tablet_count;
table_plan.target_tablet_count_aligned = 1u << log2ceil(table_plan.target_tablet_count);
if (table_plan.target_tablet_count_aligned > table_plan.current_tablet_count) {
table_plan.resize_decision = locator::resize_decision::split();
} else if (table_plan.target_tablet_count_aligned < table_plan.current_tablet_count) {
table_plan.resize_decision = locator::resize_decision::merge();
}
lblogger.debug("make_sizing_plan: table={}.{}, id={}, tablet_count={}, avg_tablet_size={}, min_tablet_count={}, target_tablet_count={}: decision={}",
s->ks_name(), s->cf_name(), s->id(), tablet_count, table_plan.avg_tablet_size, min_tablet_count, target_tablet_count, table_plan.resize_decision);
};
for (auto&& [table, tmap] : _tm->tablets().all_tables()) {
@@ -1132,6 +1131,114 @@ public:
co_await process_table(new_table->id(), new_table, new_rs, 0);
}
// Below section ensures we respect the _tablets_per_shard_goal.
//
// It will scale down target_tablet_count for all tables so that
// the average number of tablets per shard in each DC does not exceed _tablets_per_shard_goal.
//
// The impact of table's tablet count on average per-shard tablet replica count
// is different in each DC because replication factors are different in each DC.
//
// The algorithm works like this:
// Compute average tablet replica count per-shard in each DC,
// determine if per-shard goal is exceeded in that DC,
// compute scale factor by which tablet count should be multiplied so that the goal
// is not exceeded in that DC.
// Take the smallest scale factor among all DCs, which ensures that no DC is overloaded.
//
// We align tablet counts to the nearest power of 2 post-scaling, which
// means that scaling may not be effective and in the worst case we may overshoot the goal by
// a factor of 2. This is acceptable since the goal is a soft limit and not a hard constraint.
// Scaling post-alignment would be problematic. If we scale down all tables fairly, we undershoot the goal
// by a factor of 2 in the worst case. If we choose a subset of tables to scale down by a factor of 2 then
// we have a problem of making sure that the choice is stable across scheduler invocations to avoid
// oscillations of decisions.
std::unordered_map<table_id, double> table_scaling;
std::unordered_map<sstring, unsigned> shards_per_dc;
_tm->for_each_token_owner([&] (const node& n) {
if (n.is_normal()) {
shards_per_dc[n.dc_rack().dc] += n.get_shard_count();
}
});
for (auto&& [dc, shard_count] : shards_per_dc) {
double cur_avg_tablets_per_shard = 0;
double new_avg_tablets_per_shard = 0;
for (auto&& [table, table_plan] : plan.tables) {
auto* rs = rs_by_table[table];
auto rf = rs->get_replication_factor(dc);
auto get_avg_tablets_per_shard = [&] (size_t tablet_count) {
return double(tablet_count) * rf / shard_count;
};
auto cur_tablets_per_shard = get_avg_tablets_per_shard(table_plan.current_tablet_count);
cur_avg_tablets_per_shard += cur_tablets_per_shard;
lblogger.debug("cur_avg_tablets_per_shard [dc={}, table={}]: {:.3f}", dc, table, cur_tablets_per_shard);
auto new_tablets_per_shard = get_avg_tablets_per_shard(table_plan.target_tablet_count);
new_avg_tablets_per_shard += new_tablets_per_shard;
lblogger.debug("new_avg_tablets_per_shard [dc={}, table={}]: {:.3f}", dc, table, new_tablets_per_shard);
}
{
bool overloaded = cur_avg_tablets_per_shard > _tablets_per_shard_goal;
lblogger.debug("cur_avg_tablets_per_shard[dc={}]: {:.3f}{}", dc, cur_avg_tablets_per_shard,
overloaded ? " (overloaded!)" : "");
}
bool overloaded = new_avg_tablets_per_shard > _tablets_per_shard_goal;
lblogger.debug("new_avg_tablets_per_shard[dc={}]: {:.3f}{}", dc, new_avg_tablets_per_shard,
overloaded ? " (overloaded!)" : "");
if (overloaded) {
auto scale = _tablets_per_shard_goal / new_avg_tablets_per_shard;
for (auto&& [table, table_plan]: plan.tables) {
auto* rs = rs_by_table[table];
auto rf = rs->get_replication_factor(dc);
// If table has no replicas in this DC, scaling it won't help and is harmful to its distribution
// in other DCs.
if (rf) {
auto [i, inserted] = table_scaling.try_emplace(table, scale);
if (!inserted) {
i->second = std::min(i->second, scale);
}
}
}
}
}
for (auto&& [table, scale] : table_scaling) {
auto& table_plan = plan.tables[table];
auto new_count = std::max<size_t>(1, table_plan.target_tablet_count * scale);
lblogger.debug("Scaling down table {} by a factor of {:.3f}: {} => {}", table, scale, table_plan.target_tablet_count, new_count);
table_plan.target_tablet_count = new_count;
}
// Generate:
// table_plan.target_tablet_count_aligned
// table_plan.resize_decision
for (auto&& [table, table_plan] : plan.tables) {
table_plan.target_tablet_count_aligned = 1u << log2ceil(table_plan.target_tablet_count);
if (table_plan.target_tablet_count_aligned > table_plan.current_tablet_count) {
table_plan.resize_decision = locator::resize_decision::split();
} else if (table_plan.target_tablet_count_aligned < table_plan.current_tablet_count) {
table_plan.resize_decision = locator::resize_decision::merge();
}
lblogger.debug("Table {}, {} => {} ({}), resize: {}", table,
table_plan.current_tablet_count,
table_plan.target_tablet_count_aligned,
table_plan.target_tablet_count,
table_plan.resize_decision);
}
co_return std::move(plan);
}
@@ -2486,7 +2593,8 @@ public:
auto shuffle = in_shuffle_mode();
_stats.for_dc(dc).calls++;
lblogger.debug("Examining DC {} (shuffle={}, balancing={})", dc, shuffle, _tm->tablets().balancing_enabled());
lblogger.debug("Examining DC {} (shuffle={}, balancing={}, tablets_per_shard_goal={})",
dc, shuffle, _tm->tablets().balancing_enabled(), _tablets_per_shard_goal);
const locator::topology& topo = _tm->get_topology();

View File

@@ -2531,6 +2531,61 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_random_load) {
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_per_shard_goal_mixed_dc_rf) {
do_with_cql_env_thread([] (auto& e) {
auto per_shard_goal = e.local_db().get_config().tablets_per_shard_goal();
topology_builder topo(e);
std::vector<endpoint_dc_rack> racks = {
topo.rack(),
topo.start_new_dc(),
};
std::vector<host_id> hosts;
hosts.push_back(topo.add_node(node_state::normal, 2, racks[0]));
hosts.push_back(topo.add_node(node_state::normal, 2, racks[0]));
hosts.push_back(topo.add_node(node_state::normal, 2, racks[0]));
hosts.push_back(topo.add_node(node_state::normal, 1, racks[1]));
hosts.push_back(topo.add_node(node_state::normal, 1, racks[1]));
auto ks_name1 = add_keyspace(e, {{racks[0].dc, 3}});
auto ks_name2 = add_keyspace(e, {{racks[1].dc, 2}});
// table1 overflows per-shard goal in dc1, should be scaled down.
// wants 400 tablets (3 nodes * 2 shards * 200 tablets/shard / rf=3 = 400 tablets)
// which will be scaled down by a factor of 0.5 to achieve 100 tablets/shard, giving
// 200 tablets, scaled up to the nearest power of 2, which is 256.
e.execute_cql(fmt::format("CREATE TABLE {}.table1 (p1 text, r1 int, PRIMARY KEY (p1)) "
"WITH tablets = {{'min_per_shard_tablet_count': 200}}", ks_name1)).get();
auto table1 = e.local_db().find_schema(ks_name1, "table1")->id();
// table2 has 64 tablets/shard in dc2, should not be scaled down.
e.execute_cql(fmt::format("CREATE TABLE {}.table2 (p1 text, r1 int, PRIMARY KEY (p1)) "
"WITH tablets = {{'min_per_shard_tablet_count': 64}}", ks_name2)).get();
auto table2 = e.local_db().find_schema(ks_name2, "table2")->id();
rebalance_tablets(e);
{
auto& stm = e.shared_token_metadata().local();
auto tm = stm.get();
BOOST_REQUIRE_EQUAL(tm->tablets().get_tablet_map(table1).tablet_count(), 256);
BOOST_REQUIRE_EQUAL(tm->tablets().get_tablet_map(table2).tablet_count(), 64);
load_sketch load(tm);
load.populate().get();
for (auto h: hosts) {
auto l = load.get_shard_minmax(h);
testlog.info("Load on host {}: min={}, max={}", h, l.min(), l.max());
BOOST_REQUIRE_LE(l.max(), 2 * per_shard_goal);
}
}
}, tablet_cql_test_config()).get();
}
SEASTAR_TEST_CASE(test_tablet_id_and_range_side) {
static constexpr size_t tablet_count = 128;
locator::tablet_map tmap(tablet_count);