diff --git a/db/config.cc b/db/config.cc
index edee9ed80a..82d35d01b7 100644
--- a/db/config.cc
+++ b/db/config.cc
@@ -1336,6 +1336,8 @@ db::config::config(std::shared_ptr exts)
     , maximum_replication_factor_fail_threshold(this, "maximum_replication_factor_fail_threshold", liveness::LiveUpdate, value_status::Used, -1, "")
     , tablets_initial_scale_factor(this, "tablets_initial_scale_factor", value_status::Used, 10,
         "Minimum average number of tablet replicas per shard per table. Suppressed by tablet options in table's schema: min_per_shard_tablet_count and min_tablet_count")
+    , tablets_per_shard_goal(this, "tablets_per_shard_goal", liveness::LiveUpdate, value_status::Used, 100,
+        "The goal for the maximum number of tablet replicas per shard. Tablet allocator tries to keep this goal.")
     , target_tablet_size_in_bytes(this, "target_tablet_size_in_bytes", liveness::LiveUpdate, value_status::Used, service::default_target_tablet_size,
         "Allows target tablet size to be configured. Defaults to 5G (in bytes). Maintaining tablets at reasonable sizes is important to be able to " \
         "redistribute load. A higher value means tablet migration throughput can be reduced. A lower value may cause number of tablets to increase significantly, " \
diff --git a/db/config.hh b/db/config.hh
index c822946954..3eb7f28f09 100644
--- a/db/config.hh
+++ b/db/config.hh
@@ -503,6 +503,7 @@ public:
     named_value maximum_replication_factor_fail_threshold;
 
     named_value tablets_initial_scale_factor;
+    named_value<unsigned> tablets_per_shard_goal;
     named_value target_tablet_size_in_bytes;
 
     named_value>> replication_strategy_warn_list;
diff --git a/locator/network_topology_strategy.hh b/locator/network_topology_strategy.hh
index 332fbbbecf..7dfd4e369c 100644
--- a/locator/network_topology_strategy.hh
+++ b/locator/network_topology_strategy.hh
@@ -29,7 +29,7 @@ public:
         return _rep_factor;
     }
 
-    size_t get_replication_factor(const sstring& dc) const {
+    size_t get_replication_factor(const sstring& dc) const override {
         auto dc_factor = _dc_rep_factor.find(dc);
         return (dc_factor == _dc_rep_factor.end()) ? 0 : dc_factor->second;
     }
diff --git a/locator/tablet_replication_strategy.hh b/locator/tablet_replication_strategy.hh
index e63c8fa480..c896f8bf2a 100644
--- a/locator/tablet_replication_strategy.hh
+++ b/locator/tablet_replication_strategy.hh
@@ -49,6 +49,12 @@ public:
     /// otherwise, cur_tablets is a copy of the current tablet_map.
     /// Runs under group0 guard.
     virtual future reallocate_tablets(schema_ptr, token_metadata_ptr, tablet_map cur_tablets) const = 0;
+
+    /// Returns the replication factor in a given DC.
+    /// Note that individual tablets may lag behind the desired replication factor in their
+    /// current replica list, as replication factor changes involve table rebuilding transitions
+    /// which are not instantaneous.
+    virtual size_t get_replication_factor(const sstring& dc) const = 0;
 };
 
 } // namespace locator
diff --git a/main.cc b/main.cc
index cbe6933e50..03a595dc91 100644
--- a/main.cc
+++ b/main.cc
@@ -1693,6 +1693,9 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
             auto stop_tsm = defer_verbose_shutdown("topology_state_machine", [&tsm] {
                 tsm.stop().get();
             });
+            auto tablets_per_shard_goal_observer = cfg->tablets_per_shard_goal.observe([&tsm] (unsigned /* new goal, unused here */) {
+                tsm.local().event.broadcast();
+            });
 
             auto compression_dict_updated_callback = [] () -> future<> {
                 auto dict = co_await sys_ks.local().query_dict();
diff --git a/service/tablet_allocator.cc b/service/tablet_allocator.cc
index 7a4457d25c..15fadf19b1 100644
--- a/service/tablet_allocator.cc
+++ b/service/tablet_allocator.cc
@@ -480,6 +480,8 @@ class load_balancer {
     // due to the average size dropping below the merge threshold, as tablet count doubles.
     const uint64_t _target_tablet_size = default_target_tablet_size;
 
+    const unsigned _tablets_per_shard_goal;
+
     uint64_t target_max_tablet_size() const noexcept {
         return _target_tablet_size * 2;
     }
@@ -662,8 +664,13 @@ private:
         return streaming_infos;
     }
 public:
-    load_balancer(replica::database& db, token_metadata_ptr tm, locator::load_stats_ptr table_load_stats, load_balancer_stats_manager& stats, uint64_t target_tablet_size, std::unordered_set skiplist)
+    load_balancer(replica::database& db, token_metadata_ptr tm, locator::load_stats_ptr table_load_stats,
+            load_balancer_stats_manager& stats,
+            uint64_t target_tablet_size,
+            unsigned tablets_per_shard_goal,
+            std::unordered_set skiplist)
         : _target_tablet_size(target_tablet_size)
+        , _tablets_per_shard_goal(tablets_per_shard_goal)
         , _db(db)
         , _tm(std::move(tm))
         , _table_load_stats(std::move(table_load_stats))
@@ -1068,11 +1075,13 @@ public:
     };
 
     future make_sizing_plan(schema_ptr new_table = nullptr, const tablet_aware_replication_strategy* new_rs = nullptr) {
+        std::unordered_map<table_id, const tablet_aware_replication_strategy*> rs_by_table; // strategy of every table seen by process_table
         sizing_plan plan;
 
         auto process_table = [&] (table_id table, schema_ptr s, const tablet_aware_replication_strategy* rs, size_t tablet_count) -> future<> {
             table_sizing& table_plan = plan.tables[table];
             table_plan.current_tablet_count = tablet_count;
+            rs_by_table[table] = rs;
 
             size_t target_tablet_count = 1;
 
@@ -1111,16 +1120,6 @@ public:
             }
 
             table_plan.target_tablet_count = target_tablet_count;
-            table_plan.target_tablet_count_aligned = 1u << log2ceil(table_plan.target_tablet_count);
-
-            if (table_plan.target_tablet_count_aligned > table_plan.current_tablet_count) {
-                table_plan.resize_decision = locator::resize_decision::split();
-            } else if (table_plan.target_tablet_count_aligned < table_plan.current_tablet_count) {
-                table_plan.resize_decision = locator::resize_decision::merge();
-            }
-
-            lblogger.debug("make_sizing_plan: table={}.{}, id={}, tablet_count={}, avg_tablet_size={}, min_tablet_count={}, target_tablet_count={}: decision={}",
-                    s->ks_name(), s->cf_name(), s->id(), tablet_count, table_plan.avg_tablet_size, min_tablet_count, target_tablet_count, table_plan.resize_decision);
         };
 
         for (auto&& [table, tmap] : _tm->tablets().all_tables()) {
@@ -1132,6 +1131,114 @@ public:
             co_await process_table(new_table->id(), new_table, new_rs, 0);
         }
 
+        // The section below ensures we respect _tablets_per_shard_goal.
+        //
+        // It scales down target_tablet_count of tables so that
+        // the average number of tablets per shard in each DC does not exceed _tablets_per_shard_goal.
+        //
+        // The impact of a table's tablet count on the average per-shard tablet replica count
+        // is different in each DC because replication factors are different in each DC.
+        //
+        // The algorithm works like this:
+        // Compute the average tablet replica count per shard in each DC,
+        // determine if the per-shard goal is exceeded in that DC,
+        // compute the scale factor by which tablet counts should be multiplied so that the goal
+        // is not exceeded in that DC.
+        // Take the smallest scale factor among all DCs, which ensures that no DC is overloaded.
+        //
+        // We align tablet counts to the nearest power of 2 post-scaling, which
+        // means that scaling may not be effective and in the worst case we may overshoot the goal by
+        // a factor of 2. This is acceptable since the goal is a soft limit and not a hard constraint.
+        // Scaling post-alignment would be problematic. If we scaled down all tables fairly, we would undershoot
+        // the goal by a factor of 2 in the worst case. If we chose a subset of tables to scale down by a factor of 2,
+        // we would have to make sure that the choice is stable across scheduler invocations to avoid
+        // oscillating decisions.
+
+        std::unordered_map<table_id, double> table_scaling; // table -> smallest scale factor demanded by any overloaded DC
+
+        std::unordered_map<sstring, size_t> shards_per_dc; // DC name -> total shard count of its normal nodes
+        _tm->for_each_token_owner([&] (const node& n) {
+            if (n.is_normal()) {
+                shards_per_dc[n.dc_rack().dc] += n.get_shard_count();
+            }
+        });
+
+        for (auto&& [dc, shard_count] : shards_per_dc) {
+            double cur_avg_tablets_per_shard = 0;
+            double new_avg_tablets_per_shard = 0;
+
+            for (auto&& [table, table_plan] : plan.tables) {
+                auto* rs = rs_by_table[table];
+                auto rf = rs->get_replication_factor(dc);
+                auto get_avg_tablets_per_shard = [&] (size_t tablet_count) {
+                    return double(tablet_count) * rf / shard_count;
+                };
+
+                auto cur_tablets_per_shard = get_avg_tablets_per_shard(table_plan.current_tablet_count);
+                cur_avg_tablets_per_shard += cur_tablets_per_shard;
+                lblogger.debug("cur_avg_tablets_per_shard [dc={}, table={}]: {:.3f}", dc, table, cur_tablets_per_shard);
+
+                auto new_tablets_per_shard = get_avg_tablets_per_shard(table_plan.target_tablet_count);
+                new_avg_tablets_per_shard += new_tablets_per_shard;
+                lblogger.debug("new_avg_tablets_per_shard [dc={}, table={}]: {:.3f}", dc, table, new_tablets_per_shard);
+            }
+
+            {
+                bool overloaded = cur_avg_tablets_per_shard > _tablets_per_shard_goal; // logged only; scaling is driven by the new average
+                lblogger.debug("cur_avg_tablets_per_shard[dc={}]: {:.3f}{}", dc, cur_avg_tablets_per_shard,
+                        overloaded ? " (overloaded!)" : "");
+            }
+
+            bool overloaded = new_avg_tablets_per_shard > _tablets_per_shard_goal;
+            lblogger.debug("new_avg_tablets_per_shard[dc={}]: {:.3f}{}", dc, new_avg_tablets_per_shard,
+                    overloaded ? " (overloaded!)" : "");
+
+            if (overloaded) {
+                auto scale = _tablets_per_shard_goal / new_avg_tablets_per_shard; // < 1 here, since the average exceeds the goal
+
+                for (auto&& [table, table_plan]: plan.tables) {
+                    auto* rs = rs_by_table[table];
+                    auto rf = rs->get_replication_factor(dc);
+
+                    // If table has no replicas in this DC, scaling it won't help and is harmful to its distribution
+                    // in other DCs.
+                    if (rf) {
+                        auto [i, inserted] = table_scaling.try_emplace(table, scale);
+                        if (!inserted) {
+                            i->second = std::min(i->second, scale); // keep the strictest factor across DCs
+                        }
+                    }
+                }
+            }
+        }
+
+        for (auto&& [table, scale] : table_scaling) {
+            auto& table_plan = plan.tables[table];
+            auto new_count = std::max<size_t>(1, table_plan.target_tablet_count * scale); // explicit type: int vs double cannot be deduced; never go below one tablet
+            lblogger.debug("Scaling down table {} by a factor of {:.3f}: {} => {}", table, scale, table_plan.target_tablet_count, new_count);
+            table_plan.target_tablet_count = new_count;
+        }
+
+        // Generate:
+        //   table_plan.target_tablet_count_aligned
+        //   table_plan.resize_decision
+
+        for (auto&& [table, table_plan] : plan.tables) {
+            table_plan.target_tablet_count_aligned = 1u << log2ceil(table_plan.target_tablet_count);
+
+            if (table_plan.target_tablet_count_aligned > table_plan.current_tablet_count) {
+                table_plan.resize_decision = locator::resize_decision::split();
+            } else if (table_plan.target_tablet_count_aligned < table_plan.current_tablet_count) {
+                table_plan.resize_decision = locator::resize_decision::merge();
+            }
+
+            lblogger.debug("Table {}, {} => {} ({}), resize: {}", table,
+                    table_plan.current_tablet_count,
+                    table_plan.target_tablet_count_aligned,
+                    table_plan.target_tablet_count,
+                    table_plan.resize_decision);
+        }
+
         co_return std::move(plan);
     }
 
@@ -2486,7 +2593,8 @@ public:
         auto shuffle = in_shuffle_mode();
 
         _stats.for_dc(dc).calls++;
-        lblogger.debug("Examining DC {} (shuffle={}, balancing={})", dc, shuffle, _tm->tablets().balancing_enabled());
+        lblogger.debug("Examining DC {} (shuffle={}, balancing={}, tablets_per_shard_goal={})",
+                dc, shuffle, _tm->tablets().balancing_enabled(), _tablets_per_shard_goal);
 
         const locator::topology& topo = _tm->get_topology();
 
diff --git a/test/boost/tablets_test.cc b/test/boost/tablets_test.cc
index c83cae0443..b3774568bf 100644
--- a/test/boost/tablets_test.cc
+++ b/test/boost/tablets_test.cc
@@ -2531,6 +2531,61 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_random_load) {
     }).get();
 }
 
+SEASTAR_THREAD_TEST_CASE(test_per_shard_goal_mixed_dc_rf) {
+    do_with_cql_env_thread([] (auto& e) {
+        auto per_shard_goal = e.local_db().get_config().tablets_per_shard_goal();
+
+        topology_builder topo(e);
+
+        std::vector racks = {
+            topo.rack(),
+            topo.start_new_dc(),
+        };
+
+        std::vector<host_id> hosts; // NOTE(review): element type was lost in extraction; host_id is presumably what topo.add_node() returns - confirm
+        hosts.push_back(topo.add_node(node_state::normal, 2, racks[0]));
+        hosts.push_back(topo.add_node(node_state::normal, 2, racks[0]));
+        hosts.push_back(topo.add_node(node_state::normal, 2, racks[0]));
+
+        hosts.push_back(topo.add_node(node_state::normal, 1, racks[1]));
+        hosts.push_back(topo.add_node(node_state::normal, 1, racks[1]));
+
+        auto ks_name1 = add_keyspace(e, {{racks[0].dc, 3}});
+        auto ks_name2 = add_keyspace(e, {{racks[1].dc, 2}});
+
+        // table1 overflows per-shard goal in dc1, should be scaled down.
+        // wants 400 tablets (3 nodes * 2 shards * 200 tablets/shard / rf=3 = 400 tablets)
+        // which will be scaled down by a factor of 0.5 to achieve 100 tablets/shard, giving
+        // 200 tablets, scaled up to the nearest power of 2, which is 256.
+        e.execute_cql(fmt::format("CREATE TABLE {}.table1 (p1 text, r1 int, PRIMARY KEY (p1)) "
+                "WITH tablets = {{'min_per_shard_tablet_count': 200}}", ks_name1)).get();
+        auto table1 = e.local_db().find_schema(ks_name1, "table1")->id();
+
+        // table2 has 64 tablets/shard in dc2, should not be scaled down.
+        e.execute_cql(fmt::format("CREATE TABLE {}.table2 (p1 text, r1 int, PRIMARY KEY (p1)) "
+                "WITH tablets = {{'min_per_shard_tablet_count': 64}}", ks_name2)).get();
+        auto table2 = e.local_db().find_schema(ks_name2, "table2")->id();
+
+        rebalance_tablets(e);
+
+        {
+            // Check the resulting tablet counts first, then the per-shard load on every host.
+            auto tm = e.shared_token_metadata().local().get();
+            BOOST_REQUIRE_EQUAL(tm->tablets().get_tablet_map(table1).tablet_count(), 256);
+            BOOST_REQUIRE_EQUAL(tm->tablets().get_tablet_map(table2).tablet_count(), 64);
+
+            load_sketch sketch(tm);
+            sketch.populate().get();
+
+            for (auto host : hosts) {
+                auto minmax = sketch.get_shard_minmax(host);
+                testlog.info("Load on host {}: min={}, max={}", host, minmax.min(), minmax.max());
+                BOOST_REQUIRE_LE(minmax.max(), 2 * per_shard_goal);
+            }
+        }
+    }, tablet_cql_test_config()).get();
+}
+
 SEASTAR_TEST_CASE(test_tablet_id_and_range_side) {
     static constexpr size_t tablet_count = 128;
     locator::tablet_map tmap(tablet_count);