diff --git a/db/config.cc b/db/config.cc index 862b46f35f..e5cbcb3f3d 100644 --- a/db/config.cc +++ b/db/config.cc @@ -29,6 +29,7 @@ #include "config.hh" #include "extensions.hh" #include "log.hh" +#include "service/tablet_allocator_fwd.hh" #include "utils/config_file_impl.hh" #include #include @@ -1132,6 +1133,10 @@ db::config::config(std::shared_ptr exts) , maximum_replication_factor_warn_threshold(this, "maximum_replication_factor_warn_threshold", liveness::LiveUpdate, value_status::Used, -1, "") , maximum_replication_factor_fail_threshold(this, "maximum_replication_factor_fail_threshold", liveness::LiveUpdate, value_status::Used, -1, "") , tablets_initial_scale_factor(this, "tablets_initial_scale_factor", value_status::Used, 1, "Calculated initial tablets are multiplied by this number") + , target_tablet_size_in_bytes(this, "target_tablet_size_in_bytes", liveness::LiveUpdate, value_status::Used, service::default_target_tablet_size, + "Allows target tablet size to be configured. Defaults to 5G (in bytes). Maintaining tablets at reasonable sizes is important to be able to " \ + "redistribute load. A higher value means tablet migration throughput can be reduced. A lower value may cause number of tablets to increase significantly, " \ + "potentially resulting in performance drawbacks.") , replication_strategy_warn_list(this, "replication_strategy_warn_list", liveness::LiveUpdate, value_status::Used, {locator::replication_strategy_type::simple}, "Controls which replication strategies to warn about when creating/altering a keyspace. Doesn't affect the pre-existing keyspaces.") , replication_strategy_fail_list(this, "replication_strategy_fail_list", liveness::LiveUpdate, value_status::Used, {}, "Controls which replication strategies are disallowed to be used when creating/altering a keyspace. Doesn't affect the pre-existing keyspaces.") , service_levels_interval(this, "service_levels_interval_ms", liveness::LiveUpdate, value_status::Used, 10000, "Controls how often service levels module polls configuration table") diff --git a/db/config.hh b/db/config.hh index 1deace41f6..662a828689 100644 --- a/db/config.hh +++ b/db/config.hh @@ -463,7 +463,10 @@ public: named_value minimum_replication_factor_warn_threshold; named_value maximum_replication_factor_warn_threshold; named_value maximum_replication_factor_fail_threshold; + named_value tablets_initial_scale_factor; + named_value target_tablet_size_in_bytes; + named_value>> replication_strategy_warn_list; named_value>> replication_strategy_fail_list; diff --git a/docs/dev/system_keyspace.md b/docs/dev/system_keyspace.md index cf8c69ac2d..5b6c7adc5c 100644 --- a/docs/dev/system_keyspace.md +++ b/docs/dev/system_keyspace.md @@ -205,6 +205,8 @@ CREATE TABLE system.tablets ( transition text, table_name text static, tablet_count int static, + resize_type text static, + resize_seq_number bigint static, PRIMARY KEY ((keyspace_name, table_id), last_token) ) ~~~ @@ -216,6 +218,10 @@ Only tables which use tablet-based replication strategy have an entry here. `tablet_count` is the number of tablets in the map. `table_name` is the name of the table, provided for convenience. +`resize_type` is the resize decision type that spans all tablets of a given table, which can be one of: `merge`, `split` or `none`. + +`resize_seq_number` is the sequence number (>= 0) of the resize decision that globally identifies it. It's monotonically increasing, incremented by one for every new decision, so a higher value means it came later in time. + `last_token` is the last token owned by the tablet. The i-th tablet, where i = 0, 1, ..., `tablet_count`-1), owns the token range: ``` @@ -229,6 +235,8 @@ It's a list of tuples where the first element is `host_id` of the replica and th During tablet migration, the columns `new_replicas`, `stage` and `transition` are set to represent the transition. The `new_replicas` column holds what will be put in `replicas` after transition is done. +During tablet splitting, the load balancer sets `resize_type` column with `split`, and sets `resize_seq_number` with the next sequence number, which is the previous value incremented by one. + The `transition` column can have the following values: * `migration` - One tablet replica is moving from one shard to another. * `rebuild` - New tablet replica is created from the remaining replicas. diff --git a/docs/dev/topology-over-raft.md b/docs/dev/topology-over-raft.md index 44df932853..f984abab60 100644 --- a/docs/dev/topology-over-raft.md +++ b/docs/dev/topology-over-raft.md @@ -182,6 +182,47 @@ When tablet is not in transition, the following invariants hold: 1. The storage layer (database) on any node contains writes for keys which belong to the tablet only if that shard is one of the current tablet replicas. +# Tablet splitting + +Each table has its resize metadata stored in group0. + +Resize metadata is composed of: + - resize_type: it's the resize decision type, and can be either of 'split', 'merge' or 'none' + - resize_seq_number: a sequence number that globally identifies the resize; it's monotonically increasing +and increased by one on every new decision. + +In order to determine if a table needs resize, the load balancer will calculate the average tablet size +for a given table, which can be done by dividing average table size[1] by the tablet count. + +[1]: The average size of a table is the total size across all DCs divided by the number of replicas across +all DCs. + +A table will need split if its average size surpasses the split threshold, which is 100% of the target +tablet size, which defaults to 5G. The reasoning is that after split we want average size to return +to the target size. By the same reason, merge threshold is 50% of target size. + +The load balancer might also decide to cancel an ongoing split if it realizes that after split, a merge +will be needed. It does that, to avoid some back-and-forth, which is wasteful. Revoking an ongoing +decision is done by setting resize metadata with type 'none'. + +When the load balancer decides to split a table, it sets resize_type field in metadata with 'split' and +sets resize_seq_number with the next sequence number, which is the current seq number -- loaded from +tablet metadata -- increased by 1. + +All table replicas will listen for the need to split with a replica component named split monitor, that +wakes up on replication map updates, checks the need for split, and if so, it will start segregating the +storage of every tablet replica into left and right sides of the token range spanned by an individual +tablet. When a tablet replica has completed this work, it will then communicate its ready completion +status with the coordinator by loading (mirroring) the resize_seq_number from tablet metadata into its +local state, which is pulled periodically by the coordinator. + +When the coordinator realizes all tablet replicas have completed the splitting work, the load balancer +emits a decision to finalize the split request. The finalization is serialized with migration, as +doubling tablet count would interfere with the migration process. When the state machine leaves the +migration track, then finalize can proceed and split each preexisting tablet into two in the topology +metadata. The replicas will react to that by remapping its compaction groups into a new set which size +is equal to the new tablet count. + # Topology guards In addition to synchronizing with data access operations (e.g. CQL requests), we need to synchronize with diff --git a/idl/storage_service.idl.hh b/idl/storage_service.idl.hh index 9e48fca547..08982cc5c1 100644 --- a/idl/storage_service.idl.hh +++ b/idl/storage_service.idl.hh @@ -17,6 +17,15 @@ struct global_tablet_id final { locator::tablet_id tablet; }; +struct table_load_stats final { + uint64_t size_in_bytes; + int64_t split_ready_seq_number; +}; + +struct load_stats final { + std::unordered_map<::table_id, locator::table_load_stats> tables; +}; + } namespace service { @@ -54,4 +63,5 @@ verb raft_topology_cmd (raft::server_id dst_id, raft::term_t term, uint64_t cmd_ verb [[cancellable]] raft_pull_topology_snapshot (raft::server_id dst_id, service::raft_topology_pull_params) -> service::raft_topology_snapshot; verb [[cancellable]] tablet_stream_data (raft::server_id dst_id, locator::global_tablet_id); verb [[cancellable]] tablet_cleanup (raft::server_id dst_id, locator::global_tablet_id); +verb [[cancellable]] table_load_stats (raft::server_id dst_id) -> locator::load_stats; } diff --git a/locator/tablets.cc b/locator/tablets.cc index 609abcb2f0..9200f75f2e 100644 --- a/locator/tablets.cc +++ b/locator/tablets.cc @@ -222,6 +222,10 @@ void tablet_map::set_tablet_transition_info(tablet_id id, tablet_transition_info _transitions.insert_or_assign(id, std::move(info)); } +void tablet_map::set_resize_decision(locator::resize_decision decision) { + _resize_decision = std::move(decision); +} + future<> tablet_map::for_each_tablet(seastar::noncopyable_function func) const { std::optional tid = first_tablet(); for (const tablet_info& ti : tablets()) { @@ -374,6 +378,63 @@ size_t tablet_map::external_memory_usage() const { return result; } +bool resize_decision::operator==(const resize_decision& o) const { + return way.index() == o.way.index() && sequence_number == o.sequence_number; +} + +bool tablet_map::needs_split() const { + return std::holds_alternative(_resize_decision.way); +} + +const locator::resize_decision& tablet_map::resize_decision() const { + return _resize_decision; +} + +static auto to_resize_type(sstring decision) { + static const std::unordered_map string_to_type = { + {"none", resize_decision::none{}}, + {"split", resize_decision::split{}}, + {"merge", resize_decision::merge{}}, + }; + return string_to_type.at(decision); +} + +resize_decision::resize_decision(sstring decision, uint64_t seq_number) + : way(to_resize_type(decision)) + , sequence_number(seq_number) { +} + +sstring resize_decision::type_name() const { + static const std::array index_to_string = { + "none", + "split", + "merge", + }; + static_assert(std::variant_size_v == index_to_string.size()); + return index_to_string[way.index()]; +} + +resize_decision::seq_number_t resize_decision::next_sequence_number() const { + // Doubt we'll ever wrap around, but just in case. + // Even if sequence number is bumped every second, it would take 292471208677 years + // for it to happen, about 21x the age of the universe, or ~11x according to the new + // prediction after james webb. + return (sequence_number == std::numeric_limits::max()) ? 0 : sequence_number + 1; +} + +table_load_stats& table_load_stats::operator+=(const table_load_stats& s) noexcept { + size_in_bytes = size_in_bytes + s.size_in_bytes; + split_ready_seq_number = std::min(split_ready_seq_number, s.split_ready_seq_number); + return *this; +} + +load_stats& load_stats::operator+=(const load_stats& s) { + for (auto& [id, stats] : s.tables) { + tables[id] += stats; + } + return *this; +} + // Estimates the external memory usage of std::unordered_map<>. // Does not include external memory usage of elements. template diff --git a/locator/tablets.hh b/locator/tablets.hh index 6b7c85fadf..28f9afca4f 100644 --- a/locator/tablets.hh +++ b/locator/tablets.hh @@ -235,6 +235,56 @@ enum tablet_range_side { right = 1, }; +// The decision of whether tablets of a given should be split, merged, or none, is made +// by the load balancer. This decision is recorded in the tablet_map and stored in group0. +struct resize_decision { + struct none {}; + struct split {}; + struct merge {}; + + using seq_number_t = int64_t; + + std::variant way; + // The sequence number globally identifies a resize decision. + // It's monotonically increasing, globally. + // Needed to distinguish stale decision from latest one, in case coordinator + // revokes the current decision and signal it again later. + seq_number_t sequence_number = 0; + + resize_decision() = default; + resize_decision(sstring decision, uint64_t seq_number); + bool split_or_merge() const { + return !std::holds_alternative(way); + } + bool operator==(const resize_decision&) const; + sstring type_name() const; + seq_number_t next_sequence_number() const; +}; + +struct table_load_stats { + uint64_t size_in_bytes = 0; + // Stores the minimum seq number among all replicas, as coordinator wants to know if + // all replicas have completed splitting, which happens when they all store the + // seq number of the current split decision. + resize_decision::seq_number_t split_ready_seq_number = std::numeric_limits::max(); + + table_load_stats& operator+=(const table_load_stats& s) noexcept; + friend table_load_stats operator+(table_load_stats a, const table_load_stats& b) { + return a += b; + } +}; + +struct load_stats { + std::unordered_map tables; + + load_stats& operator+=(const load_stats& s); + friend load_stats operator+(load_stats a, const load_stats& b) { + return a += b; + } +}; + +using load_stats_ptr = lw_shared_ptr; + /// Stores information about tablets of a single table. /// /// The map contains a constant number of tablets, tablet_count(). @@ -261,6 +311,7 @@ private: tablet_container _tablets; size_t _log2_tablets; // log_2(_tablets.size()) std::unordered_map _transitions; + resize_decision _resize_decision; public: /// Constructs a tablet map. /// @@ -349,9 +400,14 @@ public: size_t external_memory_usage() const; bool operator==(const tablet_map&) const = default; + + bool needs_split() const; + + const locator::resize_decision& resize_decision() const; public: void set_tablet(tablet_id, tablet_info); void set_tablet_transition_info(tablet_id, tablet_transition_info); + void set_resize_decision(locator::resize_decision); void clear_transitions(); // Destroys gently. diff --git a/locator/topology.hh b/locator/topology.hh index 3a4cdf2ced..de24a0a7c5 100644 --- a/locator/topology.hh +++ b/locator/topology.hh @@ -264,6 +264,12 @@ public: return _dc_endpoints; } + const std::unordered_map>& + get_datacenter_nodes() const { + return _dc_nodes; + } + const std::unordered_map>>& diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 732f4842df..8bd9ad3b9a 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -618,6 +618,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) { case messaging_verb::TABLET_STREAM_FILES: case messaging_verb::TABLET_STREAM_DATA: case messaging_verb::TABLET_CLEANUP: + case messaging_verb::TABLE_LOAD_STATS: return 1; case messaging_verb::CLIENT_ID: case messaging_verb::MUTATION: diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 4fa49b5092..99ab26d1d9 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -192,7 +192,8 @@ enum class messaging_verb : int32_t { JOIN_NODE_RESPONSE = 69, TABLET_STREAM_FILES = 70, STREAM_BLOB = 71, - LAST = 72, + TABLE_LOAD_STATS = 72, + LAST = 73, }; } // namespace netw diff --git a/replica/compaction_group.hh b/replica/compaction_group.hh index 8e2505e01a..50b4ccc45f 100644 --- a/replica/compaction_group.hh +++ b/replica/compaction_group.hh @@ -36,7 +36,7 @@ class compaction_group { table& _t; class table_state; std::unique_ptr _table_state; - const size_t _group_id; + size_t _group_id; // Tokens included in this compaction_groups dht::token_range _token_range; compaction::compaction_strategy_state _compaction_strategy_state; @@ -80,6 +80,11 @@ public: compaction_group(table& t, size_t gid, dht::token_range token_range); + void update_id_and_range(size_t id, dht::token_range token_range) { + _group_id = id; + _token_range = std::move(token_range); + } + size_t group_id() const noexcept { return _group_id; } @@ -174,22 +179,25 @@ using compaction_group_list = compaction_group::list_t; // by the same storage group. class storage_group { compaction_group_ptr _main_cg; - compaction_group_ptr _left_cg; - compaction_group_ptr _right_cg; + std::vector _split_ready_groups; private: bool splitting_mode() const { - return bool(_left_cg) && bool(_right_cg); + return !_split_ready_groups.empty(); } + size_t to_idx(locator::tablet_range_side) const; public: - storage_group(compaction_group_ptr cg, compaction_group_list& list); + storage_group(compaction_group_ptr cg, compaction_group_list* list); const dht::token_range& token_range() const noexcept; size_t memtable_count() const noexcept; compaction_group_ptr& main_compaction_group() noexcept; + std::vector split_ready_compaction_groups() &&; compaction_group_ptr& select_compaction_group(locator::tablet_range_side) noexcept; + uint64_t live_disk_space_used() const noexcept; + utils::small_vector compaction_groups() noexcept; // Puts the storage group in split mode, in which it internally segregates data diff --git a/replica/database.hh b/replica/database.hh index 3be66dd894..ef5f8e3fde 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -537,6 +537,11 @@ private: bool _is_bootstrap_or_replace = false; sstables::shared_sstable make_sstable(sstables::sstable_state state); + // Every table replica that completes split work will load the seq number from tablet metadata into its local + // state. So when coordinator pull the local state of a table, it will know whether the table is ready for the + // current split, and not a previously revoked (stale) decision. + // The minimum value, which is a negative number, is not used by coordinator for first decision. + locator::resize_decision::seq_number_t _split_ready_seq_number = std::numeric_limits::min(); public: void deregister_metrics(); @@ -575,9 +580,24 @@ public: const std::vector& old_sstables); }; + // Precondition: table needs tablet splitting. + // Returns true if all storage of table is ready for splitting. bool all_storage_groups_split(); future<> split_all_storage_groups(); + + // Splits compaction group of a single tablet, if and only if the underlying table has + // split request emitted by coordinator (found in tablet metadata). + // If split is required, then the compaction group of the given tablet is guaranteed to + // be split once it returns. + future<> maybe_split_compaction_group_of(locator::tablet_id); private: + // Called when coordinator executes tablet splitting, i.e. commit the new tablet map with + // each tablet split into two, so this replica will remap all of its compaction groups + // that were previously split. + void handle_tablet_split_completion(size_t old_tablet_count, const locator::tablet_map& new_tmap); + + sstables::compaction_type_options::split split_compaction_options() const noexcept; + // Select a compaction group from a given token. std::pair storage_group_of(dht::token token) const noexcept; size_t storage_group_id_for_token(dht::token token) const noexcept; @@ -1002,6 +1022,10 @@ public: return _stats; } + // The tablet filter is used to not double account migrating tablets, so it's important that + // only one of pending or leaving replica is accounted based on current migration stage. + locator::table_load_stats table_load_stats(std::function tablet_filter) const noexcept; + const db::view::stats& get_view_stats() const { return _view_stats; } diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index 3bf5bb9fc3..b3f079bdb5 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -391,10 +391,14 @@ future<> table_populator::populate_subdir(sstables::sstable_state state, allow_o return sst->get_origin() != sstables::repair_origin; }; - co_await distributed_loader::reshape(directory, _db, sstables::reshape_mode::relaxed, _ks, _cf, [this, state] (shard_id shard) { - auto gen = _global_table->calculate_generation_for_new_table(); - return make_sstable(*_global_table, state, gen, _highest_version); - }, eligible_for_reshape_on_boot); + // FIXME: Bypass reshape as it is not tablet aware, so it could mix sstables from different tablets together. + // Refs: https://github.com/scylladb/scylladb/issues/16966. + if (!_global_table->uses_tablets()) { + co_await distributed_loader::reshape(directory, _db, sstables::reshape_mode::relaxed, _ks, _cf, [this, state](shard_id shard) { + auto gen = _global_table->calculate_generation_for_new_table(); + return make_sstable(*_global_table, state, gen, _highest_version); + }, eligible_for_reshape_on_boot); + } co_await directory.invoke_on_all([this, &eligible_for_reshape_on_boot, do_allow_offstrategy_compaction] (sstables::sstable_directory& dir) -> future<> { co_await dir.do_for_each_sstable([this, &eligible_for_reshape_on_boot, do_allow_offstrategy_compaction] (sstables::shared_sstable sst) { diff --git a/replica/table.cc b/replica/table.cc index 20c1516f15..97e20204e6 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -552,7 +552,7 @@ public: storage_group_vector make_storage_groups(compaction_group_list& list) const override { storage_group_vector r; auto cg = std::make_unique(_t, size_t(0), dht::token_range::make_open_ended_both_sides()); - r.push_back(std::make_unique(std::move(cg), list)); + r.push_back(std::make_unique(std::move(cg), &list)); return r; } std::pair storage_group_of(dht::token) const override { @@ -598,7 +598,7 @@ public: } // FIXME: don't allocate compaction groups for tablets that aren't present in this shard. auto cg = std::make_unique(_t, tid.value(), std::move(range)); - ret.emplace_back(std::make_unique(std::move(cg), list)); + ret.emplace_back(std::make_unique(std::move(cg), &list)); } return ret; } @@ -615,9 +615,12 @@ bool table::uses_tablets() const { return _erm && _erm->get_replication_strategy().uses_tablets(); } -storage_group::storage_group(compaction_group_ptr cg, compaction_group_list& list) +storage_group::storage_group(compaction_group_ptr cg, compaction_group_list* list) : _main_cg(std::move(cg)) { - list.push_back(*_main_cg); + // FIXME: get rid of compaction group list. + if (list) { + list->push_back(*_main_cg); + } } const dht::token_range& storage_group::token_range() const noexcept { @@ -628,20 +631,25 @@ compaction_group_ptr& storage_group::main_compaction_group() noexcept { return _main_cg; } +std::vector storage_group::split_ready_compaction_groups() && { + return std::exchange(_split_ready_groups, {}); +} + +size_t storage_group::to_idx(locator::tablet_range_side side) const { + return size_t(side); +} + compaction_group_ptr& storage_group::select_compaction_group(locator::tablet_range_side side) noexcept { if (splitting_mode()) { - return (side == locator::tablet_range_side::left) ? _left_cg : _right_cg; + return _split_ready_groups[to_idx(side)]; } return _main_cg; } utils::small_vector storage_group::compaction_groups() noexcept { utils::small_vector cgs = {_main_cg.get()}; - if (_left_cg) { - cgs.push_back(_left_cg.get()); - } - if (_right_cg) { - cgs.push_back(_right_cg.get()); + for (auto& cg : _split_ready_groups) { + cgs.push_back(cg.get()); } return cgs; } @@ -654,10 +662,10 @@ bool storage_group::set_split_mode(compaction_group_list& list) { list.push_back(*cg); return cg; }; - auto left_cg = create_cg(); - auto right_cg = create_cg(); - _left_cg = std::move(left_cg); - _right_cg = std::move(right_cg); + std::vector split_ready_groups(2); + split_ready_groups[to_idx(locator::tablet_range_side::left)] = create_cg(); + split_ready_groups[to_idx(locator::tablet_range_side::right)] = create_cg(); + _split_ready_groups = std::move(split_ready_groups); } // The storage group is considered "split ready" if its main compaction group is empty. @@ -674,16 +682,36 @@ future<> storage_group::split(compaction_group_list& list, sstables::compaction_ } bool table::all_storage_groups_split() { - return std::ranges::all_of(_storage_groups, - std::bind(&storage_group::set_split_mode, std::placeholders::_1, std::ref(_compaction_groups))); + auto id = _schema->id(); + auto& tmap = _erm->get_token_metadata().tablets().get_tablet_map(id); + if (_split_ready_seq_number == tmap.resize_decision().sequence_number) { + return true; + } + + auto split_ready = std::ranges::all_of(_storage_groups, + std::bind(&storage_group::set_split_mode, std::placeholders::_1, std::ref(_compaction_groups))); + + // The table replica will say to coordinator that its split status is ready by + // mirroring the sequence number from tablet metadata into its local state, + // which is pulled periodically by coordinator. + if (split_ready) { + _split_ready_seq_number = tmap.resize_decision().sequence_number; + tlogger.info0("Setting split ready sequence number to {} for table {}.{}", + _split_ready_seq_number, _schema->ks_name(), _schema->cf_name()); + } + return split_ready; } -future<> table::split_all_storage_groups() { - sstables::compaction_type_options::split opt {[this] (dht::token t) { +sstables::compaction_type_options::split table::split_compaction_options() const noexcept { + return {[this](dht::token t) { // Classifies the input stream into either left or right side. auto [_, side] = storage_group_of(t); return mutation_writer::token_group_id(side); }}; +} + +future<> table::split_all_storage_groups() { + sstables::compaction_type_options::split opt = split_compaction_options(); auto holder = async_gate().hold(); @@ -692,6 +720,23 @@ future<> table::split_all_storage_groups() { } } +future<> table::maybe_split_compaction_group_of(locator::tablet_id tablet_id) { + auto table_id = _schema->id(); + if (!_erm->get_token_metadata().tablets().get_tablet_map(table_id).needs_split()) { + return make_ready_future<>(); + } + + auto holder = async_gate().hold(); + + auto& sg = _storage_groups[tablet_id.value()]; + if (!sg) { + on_internal_error(tlogger, format("Tablet {} of table {}.{} is not allocated in this shard", + tablet_id, _schema->ks_name(), _schema->cf_name())); + } + + return sg->split(_compaction_groups, split_compaction_options()); +} + std::unique_ptr table::make_storage_group_manager() { if (uses_tablets()) { return std::make_unique(*this); @@ -767,8 +812,21 @@ compaction_group& table::compaction_group_for_key(partition_key_view key, const } compaction_group& table::compaction_group_for_sstable(const sstables::shared_sstable& sst) const noexcept { - // FIXME: a sstable can belong to more than one group, change interface to reflect that. - return compaction_group_for_token(sst->get_first_decorated_key().token()); + auto [first_id, first_range_side] = storage_group_of(sst->get_first_decorated_key().token()); + auto [last_id, last_range_side] = storage_group_of(sst->get_last_decorated_key().token()); + + if (first_id != last_id) { + on_internal_error(tlogger, format("Unable to load SSTable {} that belongs to tablets {} and {}", + sst->get_filename(), first_id, last_id)); + } + + auto& sg = _storage_groups[first_id]; + + if (first_range_side != last_range_side) { + return *sg->main_compaction_group(); + } + + return *sg->select_compaction_group(first_range_side); } compaction_group_list& table::compaction_groups() const noexcept { @@ -1291,6 +1349,11 @@ uint64_t compaction_group::live_disk_space_used() const noexcept { return _main_sstables->bytes_on_disk() + _maintenance_sstables->bytes_on_disk(); } +uint64_t storage_group::live_disk_space_used() const noexcept { + auto cgs = const_cast(*this).compaction_groups(); + return boost::accumulate(cgs | boost::adaptors::transformed(std::mem_fn(&compaction_group::live_disk_space_used)), uint64_t(0)); +} + uint64_t compaction_group::total_disk_space_used() const noexcept { return live_disk_space_used() + boost::accumulate(_sstables_compacted_but_not_deleted | boost::adaptors::transformed(std::mem_fn(&sstables::sstable::bytes_on_disk)), uint64_t(0)); } @@ -1833,8 +1896,98 @@ table::table(schema_ptr schema, config config, lw_shared_ptr tablet_filter) const noexcept { + locator::table_load_stats stats; + stats.split_ready_seq_number = _split_ready_seq_number; + + for (unsigned id = 0; id < _storage_groups.size(); id++) { + auto& sg = _storage_groups[id]; + if (!sg) { + continue; + } + locator::global_tablet_id gid { _schema->id(), locator::tablet_id(id) }; + if (!tablet_filter(gid)) { + continue; + } + stats.size_in_bytes += sg->live_disk_space_used(); + } + return stats; +} + +void table::handle_tablet_split_completion(size_t old_tablet_count, const locator::tablet_map& new_tmap) { + auto table_id = _schema->id(); + storage_group_vector new_storage_groups; + new_storage_groups.resize(new_tmap.tablet_count()); + + if (!old_tablet_count) { + on_internal_error(tlogger, format("Table {} had zero tablets, it should never happen when splitting.", table_id)); + } + + // NOTE: exception when applying replica changes to reflect token metadata will abort for obvious reasons, + // so exception safety is not required here. + + unsigned growth_factor = log2ceil(new_tmap.tablet_count() / old_tablet_count); + unsigned split_size = 1 << growth_factor; + tlogger.debug("Growth factor: {}, split size {}", growth_factor, split_size); + + if (old_tablet_count*split_size != new_tmap.tablet_count()) { + on_internal_error(tlogger, format("New tablet count for table {} is unexpected, actual: {}, expected {}.", + table_id, new_tmap.tablet_count(), old_tablet_count*split_size)); + } + + for (auto id = 0; id < _storage_groups.size(); id++) { + auto& sg = _storage_groups[id]; + if (!sg) { + continue; + } + if (!sg->main_compaction_group()->empty()) { + on_internal_error(tlogger, format("Found that storage of group {} for table {} wasn't split correctly, " \ + "therefore groups cannot be remapped with the new tablet count.", + id, table_id)); + } + unsigned first_new_id = id << growth_factor; + auto split_ready_groups = std::move(*sg).split_ready_compaction_groups(); + if (split_ready_groups.size() != split_size) { + on_internal_error(tlogger, format("Found {} split ready compaction groups, but expected {} instead.", split_ready_groups.size(), split_size)); + } + for (auto i = 0; i < split_size; i++) { + auto group_id = first_new_id + i; + split_ready_groups[i]->update_id_and_range(group_id, new_tmap.get_token_range(locator::tablet_id(group_id))); + new_storage_groups[group_id] = std::make_unique(std::move(split_ready_groups[i]), nullptr); + } + + tlogger.debug("Remapping tablet {} of table {} into new tablets [{}].", + id, table_id, fmt::join(boost::irange(first_new_id, first_new_id+split_size), ", ")); + } + + auto old_groups = std::exchange(_storage_groups, std::move(new_storage_groups)); + + // Remove old main groups in background, they're unused, but they need to be deregistered properly + (void) do_with(std::move(old_groups), _async_gate.hold(), [] (storage_group_vector& groups, gate::holder&) { + return do_for_each(groups, [] (std::unique_ptr& sg) { + return sg->main_compaction_group()->stop(); + }); + }); +} + void table::update_effective_replication_map(locator::effective_replication_map_ptr erm) { auto old_erm = std::exchange(_erm, std::move(erm)); + + if (uses_tablets()) { + auto table_id = _schema->id(); + auto tablet_count = [table_id] (locator::effective_replication_map_ptr& erm) { + return erm->get_token_metadata().tablets().get_tablet_map(table_id).tablet_count(); + }; + + size_t old_tablet_count = tablet_count(old_erm); + size_t new_tablet_count = tablet_count(_erm); + + if (new_tablet_count > old_tablet_count) { + tlogger.info0("Detected tablet split for table {}.{}, increasing from {} to {} tablets", + _schema->ks_name(), _schema->cf_name(), old_tablet_count, new_tablet_count); + handle_tablet_split_completion(old_tablet_count, _erm->get_token_metadata().tablets().get_tablet_map(table_id)); + } + } if (old_erm) { old_erm->invalidate(); } @@ -2111,7 +2264,8 @@ size_t compaction_group::memtable_count() const noexcept { size_t storage_group::memtable_count() const noexcept { auto memtable_count = [] (const compaction_group_ptr& cg) { return cg ? cg->memtable_count() : 0; }; - return memtable_count(_main_cg) + memtable_count(_left_cg) + memtable_count(_right_cg); + return memtable_count(_main_cg) + + boost::accumulate(_split_ready_groups | boost::adaptors::transformed(std::mem_fn(&compaction_group::memtable_count)), size_t(0)); } future<> table::flush(std::optional pos) { diff --git a/replica/tablet_mutation_builder.hh b/replica/tablet_mutation_builder.hh index 3d0e01710a..0e5be6fdd4 100644 --- a/replica/tablet_mutation_builder.hh +++ b/replica/tablet_mutation_builder.hh @@ -39,6 +39,7 @@ public: tablet_mutation_builder& set_session(dht::token last_token, service::session_id); tablet_mutation_builder& del_session(dht::token last_token); tablet_mutation_builder& del_transition(dht::token last_token); + tablet_mutation_builder& set_resize_decision(locator::resize_decision); mutation build() { return std::move(_m); diff --git a/replica/tablets.cc b/replica/tablets.cc index 74d88e33ed..d9528881df 100644 --- a/replica/tablets.cc +++ b/replica/tablets.cc @@ -50,6 +50,8 @@ schema_ptr make_tablets_schema() { .with_column("stage", utf8_type) .with_column("transition", utf8_type) .with_column("session", uuid_type) + .with_column("resize_type", utf8_type, column_kind::static_column) + .with_column("resize_seq_number", long_type, column_kind::static_column) .with_version(db::system_keyspace::generate_schema_version(id)) .build(); } @@ -80,6 +82,8 @@ tablet_map_to_mutation(const tablet_map& tablets, table_id id, const sstring& ke m.set_static_cell("tablet_count", data_value(int(tablets.tablet_count())), ts); m.set_static_cell("keyspace_name", data_value(keyspace_name), ts); m.set_static_cell("table_name", data_value(table_name), ts); + m.set_static_cell("resize_type", data_value(tablets.resize_decision().type_name()), ts); + m.set_static_cell("resize_seq_number", data_value(int64_t(tablets.resize_decision().sequence_number)), ts); tablet_id tid = tablets.first_tablet(); for (auto&& tablet : tablets.tablets()) { @@ -151,6 +155,13 @@ tablet_mutation_builder::del_transition(dht::token last_token) { return *this; } +tablet_mutation_builder& +tablet_mutation_builder::set_resize_decision(locator::resize_decision resize_decision) { + _m.set_static_cell("resize_type", data_value(resize_decision.type_name()), _ts); + _m.set_static_cell("resize_seq_number", data_value(int64_t(resize_decision.sequence_number)), _ts); + return *this; +} + mutation make_drop_tablet_map_mutation(table_id id, api::timestamp_type ts) { auto s = db::system_keyspace::tablets(); mutation m(s, partition_key::from_single_value(*s, @@ -208,6 +219,15 @@ future read_tablet_metadata(cql3::query_processor& qp) { auto tablet_count = row.get_as("tablet_count"); auto tmap = tablet_map(tablet_count); current = active_tablet_map{table, tmap, tmap.first_tablet()}; + + // Resize decision fields are static columns, so set them only once per table. + if (row.has("resize_type") && row.has("resize_seq_number")) { + auto resize_type_name = row.get_as("resize_type"); + int64_t resize_seq_number = row.get_as("resize_seq_number"); + + locator::resize_decision resize_decision(std::move(resize_type_name), resize_seq_number); + current->map.set_resize_decision(std::move(resize_decision)); + } } tablet_replica_set tablet_replicas; diff --git a/service/storage_service.cc b/service/storage_service.cc index 65d6c3b7bd..eaa20eeb5a 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1448,6 +1448,10 @@ future<> storage_service::join_token_ring(shardedfinish_setup_after_join(*this, _qp, _migration_manager.local(), _raft_topology_change_enabled); + + // Initializes monitor only after updating local topology. + start_tablet_split_monitor(); + co_return; } @@ -2572,6 +2576,9 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt for (auto it = table_erms.begin(); it != table_erms.end(); ) { auto& cf = db.find_column_family(it->first); cf.update_effective_replication_map(std::move(it->second)); + if (cf.uses_tablets()) { + register_tablet_split_candidate(it->first); + } it = table_erms.erase(it); } @@ -2596,6 +2603,8 @@ future<> storage_service::stop() { _listeners.clear(); co_await _async_gate.close(); co_await std::move(_node_ops_abort_thread); + _tablet_split_monitor_event.signal(); + co_await std::move(_tablet_split_monitor); } future<> storage_service::wait_for_group0_stop() { @@ -4527,6 +4536,89 @@ future<> storage_service::load_tablet_metadata() { }, acquire_merge_lock::no); } +future<> storage_service::process_tablet_split_candidate(table_id table) { + auto all_compaction_groups_split = [&] () mutable { + return _db.map_reduce0([table_ = table] (replica::database& db) { + auto all_split = db.find_column_family(table_).all_storage_groups_split(); + return make_ready_future(all_split); + }, bool{true}, std::logical_and()); + }; + + auto split_all_compaction_groups = [&] () -> future<> { + return _db.invoke_on_all([table] (replica::database& db) -> future<> { + return db.find_column_family(table).split_all_storage_groups(); + }); + }; + + exponential_backoff_retry split_retry = exponential_backoff_retry(std::chrono::seconds(5), std::chrono::seconds(300)); + bool sleep = false; + + while (!_async_gate.is_closed() && !_group0_as.abort_requested()) { + try { + // Ensures that latest changes to tablet metadata, in group0, are visible + auto guard = co_await _group0->client().start_operation(&_group0_as); + auto& tmap = get_token_metadata().tablets().get_tablet_map(table); + if (!tmap.needs_split()) { + release_guard(std::move(guard)); + break; + } + + if (co_await all_compaction_groups_split()) { + slogger.info0("All compaction groups of table {} are split ready.", table); + release_guard(std::move(guard)); + break; + } else { + release_guard(std::move(guard)); + co_await split_all_compaction_groups(); + } + } catch (...) { + slogger.error("Failed to complete splitting of table {} due to {}, retrying after {} seconds", + table, std::current_exception(), split_retry.sleep_time()); + sleep = true; + break; + } + if (sleep) { + co_await split_retry.retry(_group0_as); + } + } +} + +void storage_service::register_tablet_split_candidate(table_id table) noexcept { + if (this_shard_id() != 0) { + return; + } + try { + if (get_token_metadata().tablets().get_tablet_map(table).needs_split()) { + _tablet_split_candidates.push_back(table); + _tablet_split_monitor_event.signal(); + } + } catch (...) { + slogger.error("Unable to register table {} as candidate for tablet splitting, due to {}", table, std::current_exception()); + } +} + +future<> storage_service::run_tablet_split_monitor() { + while (!_async_gate.is_closed() && !_group0_as.abort_requested()) { + while (!_tablet_split_candidates.empty()) { + auto candidate = _tablet_split_candidates.front(); + _tablet_split_candidates.pop_front(); + co_await process_tablet_split_candidate(candidate); + } + co_await _tablet_split_monitor_event.when(); + } +} + +void storage_service::start_tablet_split_monitor() { + if (this_shard_id() != 0) { + return; + } + if (!_db.local().get_config().check_experimental(db::experimental_features_t::feature::TABLETS)) { + return; + } + slogger.info("Starting the tablet split monitor..."); + _tablet_split_monitor = run_tablet_split_monitor(); +} + future<> storage_service::snitch_reconfigured() { assert(this_shard_id() == 0); auto& snitch = _snitch.local(); @@ -5006,6 +5098,23 @@ future<> storage_service::stream_tablet(locator::global_tablet_id tablet) { } streamer->add_rx_ranges(table.schema()->ks_name(), std::move(ranges_per_endpoint)); co_await streamer->stream_async(); + + // If new pending tablet replica needs splitting, streaming waits for it to complete. + // That's to provide a guarantee that once migration is over, the coordinator can finalize + // splitting under the promise that compaction groups of tablets are all split, ready + // for the subsequent topology change. + // + // FIXME: + // We could do the splitting not in the streaming stage, but in a later stage, so that + // from the tablet scheduler's perspective migrations blocked on compaction are not + // participating in streaming anymore (which is true), so it could schedule more + // migrations. This way compaction would run in parallel with streaming which can + // reduce the delay. + co_await _db.invoke_on(trinfo->pending_replica.shard, [tablet] (replica::database& db) { + auto& table = db.find_column_family(tablet.table); + return table.maybe_split_compaction_group_of(tablet.tablet); + }); + co_return; }); } @@ -5123,6 +5232,79 @@ future<> storage_service::move_tablet(table_id table, dht::token token, locator: }); } +future storage_service::load_stats_for_tablet_based_tables() { + auto holder = _async_gate.hold(); + + if (this_shard_id() != 0) { + // topology coordinator only exists in shard 0. + co_return co_await container().invoke_on(0, [&] (auto& ss) { + return ss.load_stats_for_tablet_based_tables(); + }); + } + + using table_erms_t = std::unordered_map; + // Creates a snapshot of transitions, so different shards will find their tablet replicas in the + // same migration stage. Important for intra-node migration. + const auto erms = co_await std::invoke([this] () -> future { + table_erms_t erms; + co_await _db.local().get_tables_metadata().for_each_table_gently([&] (table_id id, lw_shared_ptr table) mutable { + if (table->uses_tablets()) { + erms.emplace(id, table->get_effective_replication_map()); + } + return make_ready_future<>(); + }); + co_return std::move(erms); + }); + + // Each node combines a per-table load map from all of its shards and returns it to the coordinator. + // So if there are 1k nodes, there will be 1k RPCs in total. + auto load_stats = co_await _db.map_reduce0([&erms] (replica::database& db) -> future { + locator::load_stats load_stats{}; + auto& tables_metadata = db.get_tables_metadata(); + + for (const auto& [id, erm] : erms) { + auto table = tables_metadata.get_table_if_exists(id); + if (!table) { + continue; + } + + auto& token_metadata = erm->get_token_metadata(); + auto& tmap = token_metadata.tablets().get_tablet_map(id); + auto me = locator::tablet_replica { token_metadata.get_my_id(), this_shard_id() }; + + // It's important to tackle the anomaly in reported size, since both leaving and + // pending replicas could otherwise be accounted during tablet migration. + // If transition hasn't reached cleanup stage, then leaving replicas are accounted. + // If transition is past cleanup stage, then pending replicas are accounted. + // This helps to reduce the discrepancy window. + auto tablet_filter = [&tmap, &me] (locator::global_tablet_id id) { + auto transition = tmap.get_tablet_transition_info(id.tablet); + auto& info = tmap.get_tablet_info(id.tablet); + + // if tablet is not in transit, it's filtered in. + if (!transition) { + return true; + } + + bool is_pending = transition->pending_replica == me; + bool is_leaving = locator::get_leaving_replica(info, *transition) == me; + auto s = transition->stage; + + return (!is_pending && !is_leaving) + || (is_leaving && s < locator::tablet_transition_stage::cleanup) + || (is_pending && s >= locator::tablet_transition_stage::cleanup); + }; + + load_stats.tables.emplace(id, table->table_load_stats(tablet_filter)); + co_await coroutine::maybe_yield(); + } + + co_return std::move(load_stats); + }, locator::load_stats{}, std::plus()); + + co_return std::move(load_stats); +} + future<> storage_service::set_tablet_balancing_enabled(bool enabled) { auto holder = _async_gate.hold(); @@ -5542,6 +5724,11 @@ void storage_service::init_messaging_service(bool raft_topology_change_enabled) return ss.cleanup_tablet(tablet); }); }); + ser::storage_service_rpc_verbs::register_table_load_stats(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id) { + return handle_raft_rpc(dst_id, [] (auto& ss) mutable { + return ss.load_stats_for_tablet_based_tables(); + }); + }); ser::join_node_rpc_verbs::register_join_node_request(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, service::join_node_request_params params) { return handle_raft_rpc(dst_id, [params = std::move(params)] (auto& ss) mutable { return ss.join_node_request_handler(std::move(params)); diff --git a/service/storage_service.hh b/service/storage_service.hh index d77c8e06ab..dcd1fa8651 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -154,6 +154,10 @@ private: std::vector _listeners; gate _async_gate; + condition_variable _tablet_split_monitor_event; + std::deque _tablet_split_candidates; + future<> _tablet_split_monitor = make_ready_future<>(); + std::unordered_map _node_ops; std::list> _node_ops_abort_queue; seastar::condition_variable _node_ops_abort_cond; @@ -172,6 +176,11 @@ private: future<> stream_tablet(locator::global_tablet_id); future<> cleanup_tablet(locator::global_tablet_id); inet_address host2ip(locator::host_id); + // Handler for table load stats RPC. + future load_stats_for_tablet_based_tables(); + future<> process_tablet_split_candidate(table_id); + void register_tablet_split_candidate(table_id) noexcept; + future<> run_tablet_split_monitor(); public: storage_service(abort_source& as, distributed& db, gms::gossiper& gossiper, @@ -197,6 +206,7 @@ public: future<> uninit_messaging_service(); future<> load_tablet_metadata(); + void start_tablet_split_monitor(); private: using acquire_merge_lock = bool_class; diff --git a/service/tablet_allocator.cc b/service/tablet_allocator.cc index aa54179913..9ed08345e4 100644 --- a/service/tablet_allocator.cc +++ b/service/tablet_allocator.cc @@ -41,11 +41,18 @@ struct load_balancer_node_stats { double load = 0; }; +struct load_balancer_cluster_stats { + uint64_t resizes_emitted = 0; + uint64_t resizes_revoked = 0; + uint64_t resizes_finalized = 0; +}; + using dc_name = sstring; class load_balancer_stats_manager { std::unordered_map> _dc_stats; std::unordered_map> _node_stats; + load_balancer_cluster_stats _cluster_stats; seastar::metrics::label dc_label{"target_dc"}; seastar::metrics::label node_label{"target_node"}; seastar::metrics::metric_groups _metrics; @@ -72,7 +79,24 @@ class load_balancer_stats_manager { stats.load)(dc_lb)(node_lb) }); } + + void setup_metrics(load_balancer_cluster_stats& stats) { + namespace sm = seastar::metrics; + // FIXME: we can probably improve it by making it per resize type (split, merge or none). + _metrics.add_group("load_balancer", { + sm::make_counter("resizes_emitted", sm::description("number of resizes produced by the load balancer"), + stats.resizes_emitted), + sm::make_counter("resizes_revoked", sm::description("number of resizes revoked by the load balancer"), + stats.resizes_revoked), + sm::make_counter("resizes_finalized", sm::description("number of resizes finalized by the load balancer"), + stats.resizes_finalized) + }); + } public: + load_balancer_stats_manager() { + setup_metrics(_cluster_stats); + } + load_balancer_dc_stats& for_dc(const dc_name& dc) { auto it = _dc_stats.find(dc); if (it == _dc_stats.end()) { @@ -93,6 +117,10 @@ public: return *it->second; } + load_balancer_cluster_stats& for_cluster() { + return _cluster_stats; + } + void unregister() { _metrics.clear(); } @@ -226,6 +254,119 @@ class load_balancer { } }; + // We have split and merge thresholds, which work respectively as (target) upper and lower + // bound for average size of tablets. + // + // The merge threshold is 50% of target tablet size (a midpoint between split and merge), + // such that after a merge, the average size is equally far from split and merge. + // The same applies to split. It's 100% of target size, so after split, the average is + // close to the target size (assuming small variations during the operation). + // + // It might happen that during a resize decision, average size changes drastically, and + // split or merge might get cancelled. E.g. after deleting a large partition or lots of + // data becoming suddenly expired. + // If we're splitting, we will only cancel it, if the average size dropped below the + // target size. That's because a merge would be required right after split completes, + // due to the average size dropping below the merge threshold, as tablet count doubles. + const uint64_t _target_tablet_size = default_target_tablet_size; + + static constexpr uint64_t target_max_tablet_size(uint64_t target_tablet_size) { + return target_tablet_size * 2; + } + static constexpr uint64_t target_min_tablet_size(uint64_t max_tablet_size) { + return double(max_tablet_size / 2) * 0.5; + } + + struct table_size_desc { + uint64_t target_max_tablet_size; + uint64_t avg_tablet_size; + locator::resize_decision resize_decision; + size_t tablet_count; + size_t shard_count; + + uint64_t target_min_tablet_size() const noexcept { + return load_balancer::target_min_tablet_size(target_max_tablet_size); + } + }; + + struct cluster_resize_load { + using table_id_and_size_desc = std::pair; + std::vector tables_need_resize; + std::vector tables_being_resized; + + static bool table_needs_merge(const table_size_desc& d) { + // FIXME: ignore merge request if tablet_count == initial_tablets. + return d.tablet_count > 1 && d.avg_tablet_size < d.target_min_tablet_size(); + } + static bool table_needs_split(const table_size_desc& d) { + return d.avg_tablet_size > d.target_max_tablet_size; + } + + bool table_needs_resize(const table_size_desc& d) const { + return table_needs_merge(d) || table_needs_split(d); + } + + // Resize cancellation will account for possible oscillations caused by compaction, etc. + // We shouldn't rush into cancelling an ongoing resize. That will only happen if the + // average size is past the point it would be if either split or merge had completed. + // If we cancel a split, that's because average size dropped so much a merge would be + // required post completion, and vice-versa. + bool table_needs_resize_cancellation(const table_size_desc& d) const { + auto& way = d.resize_decision.way; + if (std::holds_alternative(way)) { + return d.avg_tablet_size < d.target_max_tablet_size / 2; + } else if (std::holds_alternative(way)) { + return d.avg_tablet_size > d.target_min_tablet_size() * 2; + } + return false; + } + + void update(table_id id, table_size_desc d) { + bool table_undergoing_resize = d.resize_decision.split_or_merge(); + + // Resizing tables that no longer need resize will have the resize decision revoked, + // therefore they must be listed as being resized. + if (!table_needs_resize(d) && !table_undergoing_resize) { + return; + } + + auto entry = std::make_pair(id, std::move(d)); + if (table_undergoing_resize) { + tables_being_resized.push_back(entry); + } else { + tables_need_resize.push_back(entry); + } + } + + // Comparator that measures the weight of the need for resizing. + auto resize_urgency_cmp() const { + return [] (const table_id_and_size_desc& a, const table_id_and_size_desc& b) { + auto urgency = [] (const table_size_desc& d) -> double { + // FIXME: only takes into account split today. + return double(d.avg_tablet_size) / d.target_max_tablet_size; + }; + return urgency(a.second) < urgency(b.second); + }; + } + + static locator::resize_decision to_resize_decision(const table_size_desc& d) { + locator::resize_decision decision; + if (table_needs_split(d)) { + decision.way = locator::resize_decision::split{}; + } else if (table_needs_merge(d)) { + decision.way = locator::resize_decision::merge{}; + } + return decision; + } + + // Resize decisions can be revoked with an empty (none) decision, so replicas + // will know they're no longer required to prepare storage for the execution of + // topology changes. + static locator::resize_decision revoke_resize_decision() { + return locator::resize_decision{}; + } + }; + // Per-shard limits for active tablet streaming sessions. // // There is no hard reason for these values being what they are other than @@ -255,6 +396,7 @@ class load_balancer { const size_t max_read_streaming_load = 4; token_metadata_ptr _tm; + locator::load_stats_ptr _table_load_stats; load_balancer_stats_manager& _stats; private: tablet_replica_set get_replicas_for_tablet_load(const tablet_info& ti, const tablet_transition_info* trinfo) const { @@ -289,8 +431,10 @@ private: } public: - load_balancer(token_metadata_ptr tm, load_balancer_stats_manager& stats) - : _tm(std::move(tm)) + load_balancer(token_metadata_ptr tm, locator::load_stats_ptr table_load_stats, load_balancer_stats_manager& stats, uint64_t target_tablet_size) + : _target_tablet_size(target_tablet_size) + , _tm(std::move(tm)) + , _table_load_stats(std::move(table_load_stats)) , _stats(stats) { } @@ -304,11 +448,132 @@ public: lblogger.info("Prepared {} migrations in DC {}", dc_plan.size(), dc); plan.merge(std::move(dc_plan)); } + plan.set_resize_plan(co_await make_resize_plan()); - lblogger.info("Prepared {} migrations", plan.size()); + lblogger.info("Prepared {} migration plans, out of which there were {} tablet migration(s) and {} resize decision(s)", + plan.size(), plan.tablet_migration_count(), plan.resize_decision_count()); co_return std::move(plan); } + const locator::table_load_stats* load_stats_for_table(table_id id) const { + if (!_table_load_stats) { + return nullptr; + } + auto it = _table_load_stats->tables.find(id); + return (it != _table_load_stats->tables.end()) ? &it->second : nullptr; + } + + future make_resize_plan() { + table_resize_plan resize_plan; + + if (!_tm->tablets().balancing_enabled()) { + co_return std::move(resize_plan); + } + + cluster_resize_load resize_load; + + for (auto&& [table, tmap_] : _tm->tablets().all_tables()) { + auto& tmap = tmap_; + + const auto* table_stats = load_stats_for_table(table); + if (!table_stats) { + continue; + } + + auto avg_tablet_size = table_stats->size_in_bytes / std::max(tmap.tablet_count(), size_t(1)); + // shard presence of a table across the cluster + size_t shard_count = std::accumulate(tmap.tablets().begin(), tmap.tablets().end(), size_t(0), + [] (size_t shard_count, const locator::tablet_info& info) { + return shard_count + info.replicas.size(); + }); + + table_size_desc size_desc { + .target_max_tablet_size = target_max_tablet_size(_target_tablet_size), + .avg_tablet_size = avg_tablet_size, + .resize_decision = tmap.resize_decision(), + .tablet_count = tmap.tablet_count(), + .shard_count = shard_count + }; + + resize_load.update(table, std::move(size_desc)); + lblogger.info("Table {} with tablet_count={} has an average tablet size of {}", table, tmap.tablet_count(), avg_tablet_size); + co_await coroutine::maybe_yield(); + } + + // Emit new resize decisions + + // The limit of resize requests is determined by the shard presence (count) of tables involved. + // If tables still have a low tablet count, the concurrency must be high in order to saturate the cluster. + // If a table covers the entire cluster, and needs split, concurrency will be reduced to 1. + + size_t total_shard_count = std::invoke([&topo = _tm->get_topology()] { + size_t shard_count = 0; + topo.for_each_node([&] (const locator::node* node_ptr) { + shard_count += node_ptr->get_shard_count(); + }); + return shard_count; + }); + size_t resizing_shard_count = std::accumulate(resize_load.tables_being_resized.begin(), resize_load.tables_being_resized.end(), size_t(0), + [] (size_t shard_count, const auto& table_desc) { + return shard_count + table_desc.second.shard_count; + }); + // Limits the amount of new resize requests to be generated in a single round, as each one is a mutation to group0. + constexpr size_t max_new_resize_requests = 10; + + auto available_shards = std::max(ssize_t(total_shard_count) - ssize_t(resizing_shard_count), ssize_t(0)); + + std::make_heap(resize_load.tables_need_resize.begin(), resize_load.tables_need_resize.end(), resize_load.resize_urgency_cmp()); + while (resize_load.tables_need_resize.size() && resize_plan.size() < max_new_resize_requests) { + const auto& [table, size_desc] = resize_load.tables_need_resize.front(); + + if (resize_plan.size() > 0 && available_shards < size_desc.shard_count) { + break; + } + + auto resize_decision = cluster_resize_load::to_resize_decision(size_desc); + lblogger.info("Emitting resize decision of type {} for table {} due to avg tablet size of {}", + resize_decision.type_name(), table, size_desc.avg_tablet_size); + resize_plan.resize[table] = std::move(resize_decision); + _stats.for_cluster().resizes_emitted++; + + std::pop_heap(resize_load.tables_need_resize.begin(), resize_load.tables_need_resize.end(), resize_load.resize_urgency_cmp()); + resize_load.tables_need_resize.pop_back(); + + available_shards -= size_desc.shard_count; + } + + // Revoke resize decision if any table no longer needs it + // Also communicate coordinator if any table is ready for finalizing resizing + + for (const auto& [table, size_desc] : resize_load.tables_being_resized) { + if (resize_load.table_needs_resize_cancellation(size_desc)) { + resize_plan.resize[table] = cluster_resize_load::revoke_resize_decision(); + _stats.for_cluster().resizes_revoked++; + lblogger.info("Revoking resize decision for table {} due to avg tablet size of {}", table, size_desc.avg_tablet_size); + continue; + } + + auto& tmap = _tm->tablets().get_tablet_map(table); + + const auto* table_stats = load_stats_for_table(table); + if (!table_stats) { + continue; + } + + // If all replicas have completed split work for the current sequence number, it means that + // load balancer can emit finalize decision, for split to be completed. + if (table_stats->split_ready_seq_number == tmap.resize_decision().sequence_number) { + resize_plan.resize[table] = cluster_resize_load::revoke_resize_decision(); + _stats.for_cluster().resizes_finalized++; + resize_plan.finalize_resize.insert(table); + lblogger.info("Finalizing resize decision for table {} as all replicas agree on sequence number {}", + table, table_stats->split_ready_seq_number); + } + } + + co_return std::move(resize_plan); + } + future make_plan(dc_name dc) { migration_plan plan; @@ -363,6 +628,7 @@ public: for (auto&& [table, tmap_] : _tm->tablets().all_tables()) { auto& tmap = tmap_; + co_await tmap.for_each_tablet([&, table = table] (tablet_id tid, const tablet_info& ti) { auto trinfo = tmap.get_tablet_transition_info(tid); @@ -863,8 +1129,8 @@ public: _stopped = true; } - future balance_tablets(token_metadata_ptr tm) { - load_balancer lb(tm, _load_balancer_stats); + future balance_tablets(token_metadata_ptr tm, locator::load_stats_ptr table_load_stats) { + load_balancer lb(tm, std::move(table_load_stats), _load_balancer_stats, _db.get_config().target_tablet_size_in_bytes()); co_return co_await lb.make_plan(); } @@ -903,6 +1169,33 @@ public: _load_balancer_stats.unregister(); } + // The splitting of tablets today is completely based on the power-of-two constraint. + // A tablet of id X is split into 2 new tablets, which new ids are (x << 1) and + // (x << 1) + 1. + // So a tablet of id 0 is remapped into ids 0 and 1. Another of id 1 is remapped + // into ids 2 and 3, and so on. + future split_tablets(token_metadata_ptr tm, table_id table) { + auto& tablets = tm->tablets().get_tablet_map(table); + + tablet_map new_tablets(tablets.tablet_count() * 2); + + for (tablet_id tid : tablets.tablet_ids()) { + co_await coroutine::maybe_yield(); + + tablet_id new_left_tid = tablet_id(tid.value() << 1); + tablet_id new_right_tid = tablet_id(new_left_tid.value() + 1); + + auto& tablet_info = tablets.get_tablet_info(tid); + + new_tablets.set_tablet(new_left_tid, tablet_info); + new_tablets.set_tablet(new_right_tid, tablet_info); + } + + lblogger.info("Split tablets for table {}, increasing tablet count from {} to {}", + table, tablets.tablet_count(), new_tablets.tablet_count()); + co_return std::move(new_tablets); + } + // FIXME: Handle materialized views. }; @@ -914,8 +1207,12 @@ future<> tablet_allocator::stop() { return impl().stop(); } -future tablet_allocator::balance_tablets(locator::token_metadata_ptr tm) { - return impl().balance_tablets(tm); +future tablet_allocator::balance_tablets(locator::token_metadata_ptr tm, locator::load_stats_ptr load_stats) { + return impl().balance_tablets(std::move(tm), std::move(load_stats)); +} + +future tablet_allocator::split_tablets(locator::token_metadata_ptr tm, table_id table) { + return impl().split_tablets(std::move(tm), table); } tablet_allocator_impl& tablet_allocator::impl() { diff --git a/service/tablet_allocator.hh b/service/tablet_allocator.hh index 975ae64488..0c2ae31a5c 100644 --- a/service/tablet_allocator.hh +++ b/service/tablet_allocator.hh @@ -8,26 +8,49 @@ #pragma once -#include "replica/database.hh" +#include "replica/database_fwd.hh" #include "locator/tablets.hh" +#include "tablet_allocator_fwd.hh" +#include "locator/token_metadata_fwd.hh" namespace service { using tablet_migration_info = locator::tablet_migration_info; +/// Represents intention to emit resize (split or merge) request for a +/// table, and finalize or revoke the request previously initiated. +struct table_resize_plan { + std::unordered_map resize; + std::unordered_set finalize_resize; + + size_t size() const { return resize.size() + finalize_resize.size(); } + + void merge(table_resize_plan&& other) { + for (auto&& [id, other_resize] : other.resize) { + if (!resize.contains(id) || other_resize.sequence_number > resize[id].sequence_number) { + resize[id] = std::move(other_resize); + } + } + finalize_resize.merge(std::move(other.finalize_resize)); + } +}; + class migration_plan { public: using migrations_vector = utils::chunked_vector; private: migrations_vector _migrations; + table_resize_plan _resize_plan; bool _has_nodes_to_drain = false; public: /// Returns true iff there are decommissioning nodes which own some tablet replicas. bool has_nodes_to_drain() const { return _has_nodes_to_drain; } const migrations_vector& migrations() const { return _migrations; } - bool empty() const { return _migrations.empty(); } - size_t size() const { return _migrations.size(); } + bool empty() const { return _migrations.empty() && !_resize_plan.size(); } + size_t size() const { return _migrations.size() + _resize_plan.size(); } + size_t tablet_migration_count() const { return _migrations.size(); } + size_t resize_decision_count() const { return _resize_plan.size(); } void add(tablet_migration_info info) { _migrations.emplace_back(std::move(info)); @@ -36,14 +59,21 @@ public: void merge(migration_plan&& other) { std::move(other._migrations.begin(), other._migrations.end(), std::back_inserter(_migrations)); _has_nodes_to_drain |= other._has_nodes_to_drain; + _resize_plan.merge(std::move(other._resize_plan)); } void set_has_nodes_to_drain(bool b) { _has_nodes_to_drain = b; } + + const table_resize_plan& resize_plan() const { return _resize_plan; } + + void set_resize_plan(table_resize_plan resize_plan) { + _resize_plan = std::move(resize_plan); + } }; -class tablet_allocator_impl; +class migration_notifier; class tablet_allocator { public: @@ -86,7 +116,9 @@ public: /// /// The algorithm takes care of limiting the streaming load on the system, also by taking active migrations into account. /// - future balance_tablets(locator::token_metadata_ptr); + future balance_tablets(locator::token_metadata_ptr, locator::load_stats_ptr = {}); + + future split_tablets(locator::token_metadata_ptr, table_id); /// Should be called when the node is no longer a leader. void on_leadership_lost(); diff --git a/service/tablet_allocator_fwd.hh b/service/tablet_allocator_fwd.hh new file mode 100644 index 0000000000..8873cd02a4 --- /dev/null +++ b/service/tablet_allocator_fwd.hh @@ -0,0 +1,22 @@ +/* + * Copyright (C) 2024-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#pragma once + +#include + +namespace service { + +// This the default target size of tablets. +static constexpr uint64_t default_target_tablet_size = 5UL * 1024 * 1024 * 1024; + +class tablet_allocator_impl; + +class tablet_allocator; + +} diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index 1e100cb477..1ef5aa7af1 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -20,9 +21,11 @@ #include "gms/gossiper.hh" #include "locator/tablets.hh" #include "locator/token_metadata.hh" +#include "locator/network_topology_strategy.hh" #include "message/messaging_service.hh" #include "replica/database.hh" #include "replica/tablet_mutation_builder.hh" +#include "replica/tablets.hh" #include "service/raft/join_node.hh" #include "service/raft/raft_address_map.hh" #include "service/raft/raft_group0.hh" @@ -84,6 +87,13 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { tablet_allocator& _tablet_allocator; + // The reason load_stats_ptr is a shared ptr is that load balancer can yield, and we don't want it + // to suffer lifetime issues when stats refresh fiber overrides the current stats. + locator::load_stats_ptr _tablet_load_stats; + // FIXME: make frequency per table in order to reduce work in each iteration. + // Bigger tables will take longer to be resized. similar-sized tables can be batched into same iteration. + static constexpr std::chrono::seconds tablet_load_stats_refresh_interval = std::chrono::seconds(60); + std::chrono::milliseconds _ring_delay; using drop_guard_and_retake = bool_class; @@ -866,9 +876,42 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { } future<> generate_migration_updates(std::vector& out, const group0_guard& guard, const migration_plan& plan) { + std::unordered_set new_transitions; for (const tablet_migration_info& mig : plan.migrations()) { co_await coroutine::maybe_yield(); generate_migration_update(out, guard, mig); + new_transitions.insert(mig.tablet.table); + } + + for (auto [table_id, resize_decision] : plan.resize_plan().resize) { + auto s = _db.find_schema(table_id); + auto& tmap = get_token_metadata_ptr()->tablets().get_tablet_map(table_id); + // Sequence number is monotonically increasing, globally. Therefore, it can be used to identify a decision. + resize_decision.sequence_number = tmap.resize_decision().next_sequence_number(); + rtlogger.debug("Generating resize decision for table {} of type {} and sequence number {}", + table_id, resize_decision.type_name(), resize_decision.sequence_number); + out.emplace_back( + replica::tablet_mutation_builder(guard.write_timestamp(), table_id) + .set_resize_decision(std::move(resize_decision)) + .build()); + } + + // FIXME: Finalize split requests when exiting the tablet migration track. + for (auto table_id : plan.resize_plan().finalize_resize) { + auto& tmap = get_token_metadata_ptr()->tablets().get_tablet_map(table_id); + // Only finalize split request if there's no ongoing transition or new ones for a given table. + if (tmap.transitions().size() > 0 || new_transitions.contains(table_id)) { + continue; + } + + auto s = _db.find_schema(table_id); + auto new_tablet_map = co_await _tablet_allocator.split_tablets(get_token_metadata_ptr(), table_id); + out.emplace_back(co_await replica::tablet_map_to_mutation( + new_tablet_map, + table_id, + s->ks_name(), + s->cf_name(), + guard.write_timestamp())); } } @@ -1015,7 +1058,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { } } if (!preempt) { - auto plan = co_await _tablet_allocator.balance_tablets(get_token_metadata_ptr()); + auto plan = co_await _tablet_allocator.balance_tablets(get_token_metadata_ptr(), _tablet_load_stats); if (!drain || plan.has_nodes_to_drain()) { co_await generate_migration_updates(updates, guard, plan); } @@ -2002,6 +2045,9 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { // Returns true if the state machine was transitioned into tablet migration path. future maybe_start_tablet_migration(group0_guard); + future refresh_tablet_load_stats(); + future<> start_tablet_load_stats_refresher(); + future<> await_event() { _as.check(); co_await _topo_sm.event.when(); @@ -2039,8 +2085,12 @@ public: future topology_coordinator::maybe_start_tablet_migration(group0_guard guard) { rtlogger.debug("Evaluating tablet balance"); + if (utils::get_local_injector().enter("tablet_load_stats_refresh_before_rebalancing")) { + _tablet_load_stats = make_lw_shared(co_await refresh_tablet_load_stats()); + } + auto tm = get_token_metadata_ptr(); - auto plan = co_await _tablet_allocator.balance_tablets(tm); + auto plan = co_await _tablet_allocator.balance_tablets(tm, _tablet_load_stats); if (plan.empty()) { rtlogger.debug("Tablets are balanced"); co_return false; @@ -2060,6 +2110,110 @@ future topology_coordinator::maybe_start_tablet_migration(group0_guard gua co_return true; } +future topology_coordinator::refresh_tablet_load_stats() { + auto tm = get_token_metadata_ptr(); + auto& topology = tm->get_topology(); + + locator::load_stats stats; + static constexpr std::chrono::seconds wait_for_live_nodes_timeout{30}; + + std::unordered_map total_replicas; + + for (auto& [dc, nodes] : topology.get_datacenter_nodes()) { + locator::load_stats dc_stats; + rtlogger.info("raft topology: Refreshing table load stats for DC {} that has {} endpoints", dc, nodes.size()); + co_await coroutine::parallel_for_each(nodes, [&] (const auto& node) -> future<> { + auto dst = node->host_id(); + + _as.check(); + + // FIXME: if a node is down, completion status cannot be relied upon, but the average tablet size + // could still be inferred from the replicas available. + + auto timeout = netw::messaging_service::clock_type::now() + wait_for_live_nodes_timeout; + + abort_source as; + auto request_abort = [&as] () mutable noexcept { + as.request_abort(); + }; + auto t = timer(request_abort); + t.arm(timeout); + auto sub = _as.subscribe(request_abort); + + auto node_stats = co_await ser::storage_service_rpc_verbs::send_table_load_stats(&_messaging, + netw::msg_addr(id2ip(dst)), + as, + raft::server_id(dst.uuid())); + + dc_stats += node_stats; + }); + + for (auto& [table_id, table_stats] : dc_stats.tables) { + co_await coroutine::maybe_yield(); + + auto& t = _db.find_column_family(table_id); + auto& rs = t.get_effective_replication_map()->get_replication_strategy(); + if (!rs.uses_tablets()) { + continue; + } + const auto* nts_ptr = dynamic_cast(&rs); + if (!nts_ptr) { + on_internal_error(rtlogger, "Cannot convert replication_strategy that uses tablets into network_topology_strategy"); + } + + auto rf_for_this_dc = nts_ptr->get_replication_factor(dc); + if (rf_for_this_dc <= 0) { + continue; + } + total_replicas[table_id] += rf_for_this_dc; + rtlogger.debug("raft topology: Refreshed table load stats for DC {}, table={}, RF={}, size_in_bytes={}, split_ready_seq_number={}", + dc, table_id, rf_for_this_dc, table_stats.size_in_bytes, table_stats.split_ready_seq_number); + } + + stats += dc_stats; + } + + for (auto& [table_id, table_load_stats] : stats.tables) { + auto table_total_replicas = total_replicas.at(table_id); + if (table_total_replicas == 0) { + continue; + } + // Takes into account the RF of each DC, so we can compute the average total size + // for a single table replica. This allows the load balancer to compute, in turn, + // the average tablet size by dividing total size by tablet count. + table_load_stats.size_in_bytes /= table_total_replicas; + } + rtlogger.debug("raft topology: Refreshed table load stats for all DC(s)."); + + co_return std::move(stats); +} + +future<> topology_coordinator::start_tablet_load_stats_refresher() { + auto can_proceed = [this] { return !_async_gate.is_closed() && !_as.abort_requested(); }; + while (can_proceed()) { + bool sleep = true; + try { + _tablet_load_stats = make_lw_shared(co_await refresh_tablet_load_stats()); + _topo_sm.event.broadcast(); // wake up load balancer. + } catch (raft::request_aborted&) { + rtlogger.debug("raft topology: Tablet load stats refresher aborted"); + sleep = false; + } catch (seastar::abort_requested_exception) { + rtlogger.debug("raft topology: Tablet load stats refresher aborted"); + sleep = false; + } catch (...) { + rtlogger.warn("Found error while refreshing load stats for tablets: {}, retrying...", std::current_exception()); + } + if (sleep && can_proceed()) { + try { + co_await seastar::sleep_abortable(tablet_load_stats_refresh_interval, _as); + } catch (...) { + rtlogger.debug("raft topology: Tablet load stats refresher: sleep failed: {}", std::current_exception()); + } + } + } +} + future<> topology_coordinator::fence_previous_coordinator() { // Write empty change to make sure that a guard taken by any previous coordinator cannot // be used to do a successful write any more. Otherwise the following can theoretically happen @@ -2166,6 +2320,7 @@ future<> topology_coordinator::run() { co_await fence_previous_coordinator(); auto cdc_generation_publisher = cdc_generation_publisher_fiber(); + auto tablet_load_stats_refresher = start_tablet_load_stats_refresher(); while (!_as.abort_requested()) { bool sleep = false; @@ -2213,6 +2368,7 @@ future<> topology_coordinator::run() { } co_await _async_gate.close(); + co_await std::move(tablet_load_stats_refresher); co_await std::move(cdc_generation_publisher); } diff --git a/test/boost/tablets_test.cc b/test/boost/tablets_test.cc index a99087ee7f..df6d8f1a26 100644 --- a/test/boost/tablets_test.cc +++ b/test/boost/tablets_test.cc @@ -230,6 +230,18 @@ SEASTAR_TEST_CASE(test_tablet_metadata_persistence) { } verify_tablet_metadata_persistence(e, tm, ts); + + // Change resize decision of table1 + { + tablet_map tmap(1); + locator::resize_decision decision; + decision.way = locator::resize_decision::split{}, + decision.sequence_number = 1; + tmap.set_resize_decision(decision); + tm.set_tablet_map(table1, std::move(tmap)); + } + + verify_tablet_metadata_persistence(e, tm, ts); } }, tablet_cql_test_config()); } @@ -432,6 +444,42 @@ SEASTAR_TEST_CASE(test_mutation_builder) { auto tm_from_disk = read_tablet_metadata(e.local_qp()).get0(); BOOST_REQUIRE_EQUAL(expected_tmap, tm_from_disk.get_tablet_map(table1)); } + + static const auto resize_decision = locator::resize_decision("split", 1); + + { + tablet_mutation_builder b(ts++, table1); + auto last_token = tm.get_tablet_map(table1).get_last_token(tid1); + b.set_replicas(last_token, tablet_replica_set { + tablet_replica {h1, 2}, + tablet_replica {h2, 3}, + }); + b.del_transition(last_token); + b.set_resize_decision(resize_decision); + e.local_db().apply({freeze(b.build())}, db::no_timeout).get(); + } + + { + tablet_map expected_tmap(2); + tid = expected_tmap.first_tablet(); + expected_tmap.set_tablet(tid, tablet_info { + tablet_replica_set { + tablet_replica {h1, 0}, + tablet_replica {h3, 5}, + } + }); + tid1 = *expected_tmap.next_tablet(tid); + expected_tmap.set_tablet(tid1, tablet_info { + tablet_replica_set { + tablet_replica {h1, 2}, + tablet_replica {h2, 3}, + } + }); + expected_tmap.set_resize_decision(resize_decision); + + auto tm_from_disk = read_tablet_metadata(e.local_qp()).get0(); + BOOST_REQUIRE_EQUAL(expected_tmap, tm_from_disk.get_tablet_map(table1)); + } }, tablet_cql_test_config()); } @@ -601,6 +649,21 @@ SEASTAR_THREAD_TEST_CASE(test_token_ownership_splitting) { } } +static +void apply_resize_plan(token_metadata& tm, const migration_plan& plan) { + for (auto [table_id, resize_decision] : plan.resize_plan().resize) { + tablet_map& tmap = tm.tablets().get_tablet_map(table_id); + resize_decision.sequence_number = tmap.resize_decision().sequence_number + 1; + tmap.set_resize_decision(resize_decision); + } + for (auto table_id : plan.resize_plan().finalize_resize) { + auto& old_tmap = tm.tablets().get_tablet_map(table_id); + testlog.info("Setting new tablet map of size {}", old_tmap.tablet_count() * 2); + tablet_map tmap(old_tmap.tablet_count() * 2); + tm.tablets().set_tablet_map(table_id, std::move(tmap)); + } +} + // Reflects the plan in a given token metadata as if the migrations were fully executed. static void apply_plan(token_metadata& tm, const migration_plan& plan) { @@ -610,6 +673,7 @@ void apply_plan(token_metadata& tm, const migration_plan& plan) { tinfo.replicas = replace_replica(tinfo.replicas, mig.src, mig.dst); tmap.set_tablet(mig.tablet.tablet, tinfo); } + apply_resize_plan(tm, plan); } // Reflects the plan in a given token metadata as if the migrations were started but not yet executed. @@ -620,12 +684,13 @@ void apply_plan_as_in_progress(token_metadata& tm, const migration_plan& plan) { auto tinfo = tmap.get_tablet_info(mig.tablet.tablet); tmap.set_tablet_transition_info(mig.tablet.tablet, migration_to_transition_info(tinfo, mig)); } + apply_resize_plan(tm, plan); } static -void rebalance_tablets(tablet_allocator& talloc, shared_token_metadata& stm) { +void rebalance_tablets(tablet_allocator& talloc, shared_token_metadata& stm, locator::load_stats_ptr load_stats = {}) { while (true) { - auto plan = talloc.balance_tablets(stm.get()).get0(); + auto plan = talloc.balance_tablets(stm.get(), load_stats).get0(); if (plan.empty()) { break; } @@ -1555,3 +1620,121 @@ SEASTAR_THREAD_TEST_CASE(basic_tablet_storage_splitting_test) { }, bool(false), std::logical_or()).get0(), true); }, std::move(cfg)).get(); } + +SEASTAR_THREAD_TEST_CASE(test_load_balancing_resize_requests) { + do_with_cql_env_thread([] (auto& e) { + inet_address ip1("192.168.0.1"); + inet_address ip2("192.168.0.2"); + + auto host1 = host_id(next_uuid()); + auto host2 = host_id(next_uuid()); + + auto table1 = table_id(next_uuid()); + + unsigned shard_count = 2; + + semaphore sem(1); + shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ + locator::topology::config{ + .this_endpoint = ip1, + .local_dc_rack = locator::endpoint_dc_rack::default_location + } + }); + + stm.mutate_token_metadata([&] (token_metadata& tm) { + tm.update_host_id(host1, ip1); + tm.update_host_id(host2, ip2); + tm.update_topology(host1, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count); + tm.update_topology(host2, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count); + + tablet_map tmap(2); + for (auto tid : tmap.tablet_ids()) { + tmap.set_tablet(tid, tablet_info { + tablet_replica_set { + tablet_replica {host1, tests::random::get_int(0, shard_count - 1)}, + tablet_replica {host2, tests::random::get_int(0, shard_count - 1)}, + } + }); + } + tablet_metadata tmeta; + tmeta.set_tablet_map(table1, std::move(tmap)); + tm.set_tablets(std::move(tmeta)); + return make_ready_future<>(); + }).get(); + + auto tablet_count = [&] { + return stm.get()->tablets().get_tablet_map(table1).tablet_count(); + }; + auto resize_decision = [&] { + return stm.get()->tablets().get_tablet_map(table1).resize_decision(); + }; + + auto do_rebalance_tablets = [&] (locator::load_stats load_stats) { + rebalance_tablets(e.get_tablet_allocator().local(), stm, make_lw_shared(std::move(load_stats))); + }; + + const size_t initial_tablets = tablet_count(); + const uint64_t max_tablet_size = service::default_target_tablet_size * 2; + auto to_size_in_bytes = [&] (double max_tablet_size_pctg) -> uint64_t { + return (max_tablet_size * max_tablet_size_pctg) * tablet_count(); + }; + + + const auto initial_ready_seq_number = std::numeric_limits::min(); + + // there are 2 tablets, each with avg size hitting merge threshold, so merge request is emitted + { + locator::load_stats load_stats = { + .tables = { + { table1, table_load_stats{ .size_in_bytes = to_size_in_bytes(0.0), .split_ready_seq_number = initial_ready_seq_number }}, + } + }; + + do_rebalance_tablets(std::move(load_stats)); + BOOST_REQUIRE(tablet_count() == initial_tablets); + BOOST_REQUIRE(std::holds_alternative(resize_decision().way)); + } + + // avg size moved above target size, so merge is cancelled + { + locator::load_stats load_stats = { + .tables = { + { table1, table_load_stats{ .size_in_bytes = to_size_in_bytes(0.75), .split_ready_seq_number = initial_ready_seq_number }}, + } + }; + + do_rebalance_tablets(std::move(load_stats)); + BOOST_REQUIRE(tablet_count() == initial_tablets); + BOOST_REQUIRE(std::holds_alternative(resize_decision().way)); + } + + // avg size hits split threshold, and balancer emits split request + { + locator::load_stats load_stats = { + .tables = { + { table1, table_load_stats{ .size_in_bytes = to_size_in_bytes(1.1), .split_ready_seq_number = initial_ready_seq_number }}, + } + }; + + do_rebalance_tablets(std::move(load_stats)); + BOOST_REQUIRE(tablet_count() == initial_tablets); + BOOST_REQUIRE(std::holds_alternative(resize_decision().way)); + BOOST_REQUIRE(resize_decision().sequence_number > 0); + } + + // replicas set their split status as ready, and load balancer finalizes split generating a new + // tablet map, twice as large as the previous one. + { + locator::load_stats load_stats = { + .tables = { + { table1, table_load_stats{ .size_in_bytes = to_size_in_bytes(1.1), .split_ready_seq_number = resize_decision().sequence_number }}, + } + }; + + do_rebalance_tablets(std::move(load_stats)); + + BOOST_REQUIRE(tablet_count() == initial_tablets * 2); + BOOST_REQUIRE(std::holds_alternative(resize_decision().way)); + } + }).get(); +} diff --git a/test/topology_experimental_raft/test_fencing.py b/test/topology_experimental_raft/test_fencing.py index d545d47e51..054138326e 100644 --- a/test/topology_experimental_raft/test_fencing.py +++ b/test/topology_experimental_raft/test_fencing.py @@ -84,6 +84,9 @@ async def test_fence_writes(request, manager: ManagerClient): logger.info(f'Waiting for cql and hosts') host2 = (await wait_for_cql_and_get_hosts(cql, [servers[2]], time.time() + 60))[0] + # Disable load balancer as it might bump topology version, undoing the decrement below. + await manager.api.disable_tablet_balancing(servers[2].ip_addr) + version = await get_version(manager, host2) logger.info(f"version on host2 {version}") @@ -129,6 +132,10 @@ async def test_fence_hints(request, manager: ManagerClient): logger.info(f'Waiting for cql and hosts') hosts = await wait_for_cql_and_get_hosts(cql, [s0, s2], time.time() + 60) + # Disable load balancer as it might bump topology version, potentially creating a race condition + # with read modify write below + await manager.api.disable_tablet_balancing(s2.ip_addr) + host2 = host_by_server(hosts, s2) new_version = (await get_version(manager, host2)) + 1 logger.info(f"Set version and fence_version to {new_version} on node {host2}") diff --git a/test/topology_experimental_raft/test_tablets.py b/test/topology_experimental_raft/test_tablets.py index 6ff09ca456..cada60f14b 100644 --- a/test/topology_experimental_raft/test_tablets.py +++ b/test/topology_experimental_raft/test_tablets.py @@ -576,3 +576,73 @@ async def test_tablet_resharding(manager: ManagerClient): await manager.server_start( server.server_id, expected_error="Detected a tablet with invalid replica shard, reducing shard count with tablet-enabled tables is not yet supported. Replace the node instead.") + +async def get_tablet_count(manager: ManagerClient, server: ServerInfo, keyspace_name: str, table_name: str): + host = manager.cql.cluster.metadata.get_host(server.ip_addr) + + # read_barrier is needed to ensure that local tablet metadata on the queried node + # reflects the finalized tablet movement. + await read_barrier(manager.cql, host) + + table_id = await manager.get_table_id(keyspace_name, table_name) + rows = await manager.cql.run_async(f"SELECT tablet_count FROM system.tablets where " + f"table_id = {table_id}", host=host) + return rows[0].tablet_count + +@pytest.mark.asyncio +@skip_mode('release', 'error injections are not supported in release mode') +async def test_tablet_split(manager: ManagerClient): + logger.info("Bootstrapping cluster") + cmdline = [ + '--logger-log-level', 'storage_service=debug', + '--target-tablet-size-in-bytes', '1024', + ] + servers = [await manager.server_add(cmdline=cmdline)] + + await manager.api.disable_tablet_balancing(servers[0].ip_addr) + + cql = manager.get_cql() + await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};") + await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);") + + # enough to trigger multiple splits with max size of 1024 bytes. + keys = range(256) + await asyncio.gather(*[cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, {k});") for k in keys]) + + async def check(): + logger.info("Checking table") + cql = manager.get_cql() + rows = await cql.run_async("SELECT * FROM test.test;") + assert len(rows) == len(keys) + for r in rows: + assert r.c == r.pk + + await check() + + await manager.api.flush_keyspace(servers[0].ip_addr, "test") + + tablet_count = await get_tablet_count(manager, servers[0], 'test', 'test') + assert tablet_count == 1 + + logger.info("Adding new server") + servers.append(await manager.server_add(cmdline=cmdline)) + + # Increases the chance of tablet migration concurrent with split + await inject_error_one_shot_on(manager, "tablet_allocator_shuffle", servers) + await inject_error_on(manager, "tablet_load_stats_refresh_before_rebalancing", servers) + + s1_log = await manager.server_open_log(servers[0].server_id) + s1_mark = await s1_log.mark() + + # Now there's a split and migration need, so they'll potentially run concurrently. + await manager.api.enable_tablet_balancing(servers[0].ip_addr) + + await check() + time.sleep(5) # Give load balancer some time to do work + + await s1_log.wait_for('Detected tablet split for table', from_mark=s1_mark) + + await check() + + tablet_count = await get_tablet_count(manager, servers[0], 'test', 'test') + assert tablet_count > 1