From f1bda8d4c168d0480a5ed3df40510248f7747838 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 2 Jan 2025 11:27:20 +0100 Subject: [PATCH] tablets: load_balancer: Scale down tablet count to respect per-shard tablet count goal The limit is enforced by controlling average per-shard tablet replica count in a given DC, which is controlled by per-table tablet count. This is effective in respecting the limit on individual shards as long as tablet replicas are distributed evenly between shards. There is no attempt to move tablets around in order to enforce limits on individual shards in case of imbalance between shards. If the average per-shard tablet count exceeds the limit, all tables which contribute to it (have replicas in the DC) are scaled down by the same factor. Due to rounding up to the nearest power of 2, we may overshoot the per-shard goal by at most a factor of 2. If different DCs want different scale factors of a given table, the lowest scale factor is chosen for a given table. The limit is configurable. It's a global per-cluster config which controls how many tablet replicas per shard in total we consider to be still ok. It controls tablet allocator behavior, when choosing initial tablet count. Even though it's a per-node config, we don't support different limits per node. All nodes must have the same value of that config. It's similar in that regard to other scheduler config items like tablets_initial_scale_factor and target_tablet_size_in_bytes. --- db/config.cc | 2 + db/config.hh | 1 + locator/network_topology_strategy.hh | 2 +- locator/tablet_replication_strategy.hh | 6 ++ main.cc | 3 + service/tablet_allocator.cc | 132 ++++++++++++++++++++++--- test/boost/tablets_test.cc | 55 +++++++++++ 7 files changed, 188 insertions(+), 13 deletions(-) diff --git a/db/config.cc b/db/config.cc index edee9ed80a..82d35d01b7 100644 --- a/db/config.cc +++ b/db/config.cc @@ -1336,6 +1336,8 @@ db::config::config(std::shared_ptr exts) , maximum_replication_factor_fail_threshold(this, "maximum_replication_factor_fail_threshold", liveness::LiveUpdate, value_status::Used, -1, "") , tablets_initial_scale_factor(this, "tablets_initial_scale_factor", value_status::Used, 10, "Minimum average number of tablet replicas per shard per table. Suppressed by tablet options in table's schema: min_per_shard_tablet_count and min_tablet_count") + , tablets_per_shard_goal(this, "tablets_per_shard_goal", liveness::LiveUpdate, value_status::Used, 100, + "The goal for the maximum number of tablet replicas per shard. Tablet allocator tries to keep this goal.") , target_tablet_size_in_bytes(this, "target_tablet_size_in_bytes", liveness::LiveUpdate, value_status::Used, service::default_target_tablet_size, "Allows target tablet size to be configured. Defaults to 5G (in bytes). Maintaining tablets at reasonable sizes is important to be able to " \ "redistribute load. A higher value means tablet migration throughput can be reduced. A lower value may cause number of tablets to increase significantly, " \ diff --git a/db/config.hh b/db/config.hh index c822946954..3eb7f28f09 100644 --- a/db/config.hh +++ b/db/config.hh @@ -503,6 +503,7 @@ public: named_value maximum_replication_factor_fail_threshold; named_value tablets_initial_scale_factor; + named_value tablets_per_shard_goal; named_value target_tablet_size_in_bytes; named_value>> replication_strategy_warn_list; diff --git a/locator/network_topology_strategy.hh b/locator/network_topology_strategy.hh index 332fbbbecf..7dfd4e369c 100644 --- a/locator/network_topology_strategy.hh +++ b/locator/network_topology_strategy.hh @@ -29,7 +29,7 @@ public: return _rep_factor; } - size_t get_replication_factor(const sstring& dc) const { + size_t get_replication_factor(const sstring& dc) const override { auto dc_factor = _dc_rep_factor.find(dc); return (dc_factor == _dc_rep_factor.end()) ? 0 : dc_factor->second; } diff --git a/locator/tablet_replication_strategy.hh b/locator/tablet_replication_strategy.hh index e63c8fa480..c896f8bf2a 100644 --- a/locator/tablet_replication_strategy.hh +++ b/locator/tablet_replication_strategy.hh @@ -49,6 +49,12 @@ public: /// otherwise, cur_tablets is a copy of the current tablet_map. /// Runs under group0 guard. virtual future reallocate_tablets(schema_ptr, token_metadata_ptr, tablet_map cur_tablets) const = 0; + + /// Returns replication factor in a given DC. + /// Note that individual tablets may lag behind desired replication factor in their + /// current replica list, as replication factor changes involve table rebuilding transitions + /// which are not instantaneous. + virtual size_t get_replication_factor(const sstring& dc) const = 0; }; } // namespace locator diff --git a/main.cc b/main.cc index cbe6933e50..03a595dc91 100644 --- a/main.cc +++ b/main.cc @@ -1693,6 +1693,9 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl auto stop_tsm = defer_verbose_shutdown("topology_state_machine", [&tsm] { tsm.stop().get(); }); + auto tablets_per_shard_goal_observer = cfg->tablets_per_shard_goal.observe([&tsm] (unsigned goal) { + tsm.local().event.broadcast(); + }); auto compression_dict_updated_callback = [] () -> future<> { auto dict = co_await sys_ks.local().query_dict(); diff --git a/service/tablet_allocator.cc b/service/tablet_allocator.cc index 7a4457d25c..15fadf19b1 100644 --- a/service/tablet_allocator.cc +++ b/service/tablet_allocator.cc @@ -480,6 +480,8 @@ class load_balancer { // due to the average size dropping below the merge threshold, as tablet count doubles. const uint64_t _target_tablet_size = default_target_tablet_size; + const unsigned _tablets_per_shard_goal; + uint64_t target_max_tablet_size() const noexcept { return _target_tablet_size * 2; } @@ -662,8 +664,13 @@ private: return streaming_infos; } public: - load_balancer(replica::database& db, token_metadata_ptr tm, locator::load_stats_ptr table_load_stats, load_balancer_stats_manager& stats, uint64_t target_tablet_size, std::unordered_set skiplist) + load_balancer(replica::database& db, token_metadata_ptr tm, locator::load_stats_ptr table_load_stats, + load_balancer_stats_manager& stats, + uint64_t target_tablet_size, + unsigned tablets_per_shard_goal, + std::unordered_set skiplist) : _target_tablet_size(target_tablet_size) + , _tablets_per_shard_goal(tablets_per_shard_goal) , _db(db) , _tm(std::move(tm)) , _table_load_stats(std::move(table_load_stats)) @@ -1068,11 +1075,13 @@ public: }; future make_sizing_plan(schema_ptr new_table = nullptr, const tablet_aware_replication_strategy* new_rs = nullptr) { + std::unordered_map rs_by_table; sizing_plan plan; auto process_table = [&] (table_id table, schema_ptr s, const tablet_aware_replication_strategy* rs, size_t tablet_count) -> future<> { table_sizing& table_plan = plan.tables[table]; table_plan.current_tablet_count = tablet_count; + rs_by_table[table] = rs; size_t target_tablet_count = 1; @@ -1111,16 +1120,6 @@ public: } table_plan.target_tablet_count = target_tablet_count; - table_plan.target_tablet_count_aligned = 1u << log2ceil(table_plan.target_tablet_count); - - if (table_plan.target_tablet_count_aligned > table_plan.current_tablet_count) { - table_plan.resize_decision = locator::resize_decision::split(); - } else if (table_plan.target_tablet_count_aligned < table_plan.current_tablet_count) { - table_plan.resize_decision = locator::resize_decision::merge(); - } - - lblogger.debug("make_sizing_plan: table={}.{}, id={}, tablet_count={}, avg_tablet_size={}, min_tablet_count={}, target_tablet_count={}: decision={}", - s->ks_name(), s->cf_name(), s->id(), tablet_count, table_plan.avg_tablet_size, min_tablet_count, target_tablet_count, table_plan.resize_decision); }; for (auto&& [table, tmap] : _tm->tablets().all_tables()) { @@ -1132,6 +1131,114 @@ public: co_await process_table(new_table->id(), new_table, new_rs, 0); } + // Below section ensures we respect the _tablets_per_shard_goal. + // + // It will scale down target_tablet_count for all tables so that + // the average number of tablets per shard in each DC does not exceed _tablets_per_shard_goal. + // + // The impact of table's tablet count on average per-shard tablet replica count + // is different in each DC because replication factors are different in each DC. + // + // The algorithm works like this: + // Compute average tablet replica count per-shard in each DC, + // determine if per-shard goal is exceeded in that DC, + // compute scale factor by which tablet count should be multiplied so that the goal + // is not exceeded in that DC. + // Take the smallest scale factor among all DCs, which ensures that no DC is overloaded. + // + // We align tablet counts to the nearest power of 2 post-scaling, which + // means that scaling may not be effective and in the worst case we may overshoot the goal by + // a factor of 2. This is acceptable since the goal is a soft limit and not a hard constraint. + // Scaling post-alignment would be problematic. If we scale down all tables fairly, we undershoot the goal + // by a factor of 2 in the worst case. If we choose a subset of tables to scale down by a factor of 2 then + // we have a problem of making sure that the choice is stable across scheduler invocations to avoid + // oscillations of decisions. + + std::unordered_map table_scaling; + + std::unordered_map shards_per_dc; + _tm->for_each_token_owner([&] (const node& n) { + if (n.is_normal()) { + shards_per_dc[n.dc_rack().dc] += n.get_shard_count(); + } + }); + + for (auto&& [dc, shard_count] : shards_per_dc) { + double cur_avg_tablets_per_shard = 0; + double new_avg_tablets_per_shard = 0; + + for (auto&& [table, table_plan] : plan.tables) { + auto* rs = rs_by_table[table]; + auto rf = rs->get_replication_factor(dc); + auto get_avg_tablets_per_shard = [&] (size_t tablet_count) { + return double(tablet_count) * rf / shard_count; + }; + + auto cur_tablets_per_shard = get_avg_tablets_per_shard(table_plan.current_tablet_count); + cur_avg_tablets_per_shard += cur_tablets_per_shard; + lblogger.debug("cur_avg_tablets_per_shard [dc={}, table={}]: {:.3f}", dc, table, cur_tablets_per_shard); + + auto new_tablets_per_shard = get_avg_tablets_per_shard(table_plan.target_tablet_count); + new_avg_tablets_per_shard += new_tablets_per_shard; + lblogger.debug("new_avg_tablets_per_shard [dc={}, table={}]: {:.3f}", dc, table, new_tablets_per_shard); + } + + { + bool overloaded = cur_avg_tablets_per_shard > _tablets_per_shard_goal; + lblogger.debug("cur_avg_tablets_per_shard[dc={}]: {:.3f}{}", dc, cur_avg_tablets_per_shard, + overloaded ? " (overloaded!)" : ""); + } + + bool overloaded = new_avg_tablets_per_shard > _tablets_per_shard_goal; + lblogger.debug("new_avg_tablets_per_shard[dc={}]: {:.3f}{}", dc, new_avg_tablets_per_shard, + overloaded ? " (overloaded!)" : ""); + + if (overloaded) { + auto scale = _tablets_per_shard_goal / new_avg_tablets_per_shard; + + for (auto&& [table, table_plan]: plan.tables) { + auto* rs = rs_by_table[table]; + auto rf = rs->get_replication_factor(dc); + + // If table has no replicas in this DC, scaling it won't help and is harmful to its distribution + // in other DCs. + if (rf) { + auto [i, inserted] = table_scaling.try_emplace(table, scale); + if (!inserted) { + i->second = std::min(i->second, scale); + } + } + } + } + } + + for (auto&& [table, scale] : table_scaling) { + auto& table_plan = plan.tables[table]; + auto new_count = std::max(1, table_plan.target_tablet_count * scale); + lblogger.debug("Scaling down table {} by a factor of {:.3f}: {} => {}", table, scale, table_plan.target_tablet_count, new_count); + table_plan.target_tablet_count = new_count; + } + + // Generate: + // table_plan.target_tablet_count_aligned + // table_plan.resize_decision + + for (auto&& [table, table_plan] : plan.tables) { + table_plan.target_tablet_count_aligned = 1u << log2ceil(table_plan.target_tablet_count); + + if (table_plan.target_tablet_count_aligned > table_plan.current_tablet_count) { + table_plan.resize_decision = locator::resize_decision::split(); + } else if (table_plan.target_tablet_count_aligned < table_plan.current_tablet_count) { + table_plan.resize_decision = locator::resize_decision::merge(); + } + + lblogger.debug("Table {}, {} => {} ({}), resize: {}", table, + table_plan.current_tablet_count, + table_plan.target_tablet_count_aligned, + table_plan.target_tablet_count, + table_plan.resize_decision); + } + co_return std::move(plan); } @@ -2486,7 +2593,8 @@ public: auto shuffle = in_shuffle_mode(); _stats.for_dc(dc).calls++; - lblogger.debug("Examining DC {} (shuffle={}, balancing={})", dc, shuffle, _tm->tablets().balancing_enabled()); + lblogger.debug("Examining DC {} (shuffle={}, balancing={}, tablets_per_shard_goal={})", + dc, shuffle, _tm->tablets().balancing_enabled(), _tablets_per_shard_goal); const locator::topology& topo = _tm->get_topology(); diff --git a/test/boost/tablets_test.cc b/test/boost/tablets_test.cc index c83cae0443..b3774568bf 100644 --- a/test/boost/tablets_test.cc +++ b/test/boost/tablets_test.cc @@ -2531,6 +2531,61 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_random_load) { }).get(); } +SEASTAR_THREAD_TEST_CASE(test_per_shard_goal_mixed_dc_rf) { + do_with_cql_env_thread([] (auto& e) { + auto per_shard_goal = e.local_db().get_config().tablets_per_shard_goal(); + + topology_builder topo(e); + + std::vector racks = { + topo.rack(), + topo.start_new_dc(), + }; + + std::vector hosts; + hosts.push_back(topo.add_node(node_state::normal, 2, racks[0])); + hosts.push_back(topo.add_node(node_state::normal, 2, racks[0])); + hosts.push_back(topo.add_node(node_state::normal, 2, racks[0])); + + hosts.push_back(topo.add_node(node_state::normal, 1, racks[1])); + hosts.push_back(topo.add_node(node_state::normal, 1, racks[1])); + + auto ks_name1 = add_keyspace(e, {{racks[0].dc, 3}}); + auto ks_name2 = add_keyspace(e, {{racks[1].dc, 2}}); + + // table1 overflows per-shard goal in dc1, should be scaled down. + // wants 400 tablets (3 nodes * 2 shards * 200 tablets/shard / rf=3 = 400 tablets) + // which will be scaled down by a factor of 0.5 to achieve 100 tablets/shard, giving + // 200 tablets, scaled up to the nearest power of 2, which is 256. + e.execute_cql(fmt::format("CREATE TABLE {}.table1 (p1 text, r1 int, PRIMARY KEY (p1)) " + "WITH tablets = {{'min_per_shard_tablet_count': 200}}", ks_name1)).get(); + auto table1 = e.local_db().find_schema(ks_name1, "table1")->id(); + + // table2 has 64 tablets/shard in dc2, should not be scaled down. + e.execute_cql(fmt::format("CREATE TABLE {}.table2 (p1 text, r1 int, PRIMARY KEY (p1)) " + "WITH tablets = {{'min_per_shard_tablet_count': 64}}", ks_name2)).get(); + auto table2 = e.local_db().find_schema(ks_name2, "table2")->id(); + + rebalance_tablets(e); + + { + auto& stm = e.shared_token_metadata().local(); + auto tm = stm.get(); + BOOST_REQUIRE_EQUAL(tm->tablets().get_tablet_map(table1).tablet_count(), 256); + BOOST_REQUIRE_EQUAL(tm->tablets().get_tablet_map(table2).tablet_count(), 64); + + load_sketch load(tm); + load.populate().get(); + + for (auto h: hosts) { + auto l = load.get_shard_minmax(h); + testlog.info("Load on host {}: min={}, max={}", h, l.min(), l.max()); + BOOST_REQUIRE_LE(l.max(), 2 * per_shard_goal); + } + } + }, tablet_cql_test_config()).get(); +} + SEASTAR_TEST_CASE(test_tablet_id_and_range_side) { static constexpr size_t tablet_count = 128; locator::tablet_map tmap(tablet_count);