From 57582ac9c42c590e8e81029f0ac8fecb4a0d5d33 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Thu, 28 Dec 2023 13:02:55 -0300 Subject: [PATCH 01/25] locator: Introduce resize_decision resize_decision is the metadata the says whether tablets of a table needs split, merge, or none. That will be recorded in tablet metadata, and therefore stored in group0. Signed-off-by: Raphael S. Carvalho --- locator/tablets.cc | 28 ++++++++++++++++++++++++++++ locator/tablets.hh | 26 ++++++++++++++++++++++++++ 2 files changed, 54 insertions(+) diff --git a/locator/tablets.cc b/locator/tablets.cc index 8b7ae19a10..ec692ff303 100644 --- a/locator/tablets.cc +++ b/locator/tablets.cc @@ -374,6 +374,34 @@ size_t tablet_map::external_memory_usage() const { return result; } +bool resize_decision::operator==(const resize_decision& o) const { + return way.index() == o.way.index() && sequence_number == o.sequence_number; +} + +static auto to_resize_type(sstring decision) { + static const std::unordered_map string_to_type = { + {"none", resize_decision::none{}}, + {"split", resize_decision::split{}}, + {"merge", resize_decision::merge{}}, + }; + return string_to_type.at(decision); +} + +resize_decision::resize_decision(sstring decision, uint64_t seq_number) + : way(to_resize_type(decision)) + , sequence_number(seq_number) { +} + +sstring resize_decision::type_name() const { + static const std::array index_to_string = { + "none", + "split", + "merge", + }; + static_assert(std::variant_size_v == index_to_string.size()); + return index_to_string[way.index()]; +} + // Estimates the external memory usage of std::unordered_map<>. // Does not include external memory usage of elements. template diff --git a/locator/tablets.hh b/locator/tablets.hh index 45332d2879..5f1afdee9d 100644 --- a/locator/tablets.hh +++ b/locator/tablets.hh @@ -235,6 +235,32 @@ enum tablet_range_side { right = 1, }; +// The decision of whether tablets of a given should be split, merged, or none, is made +// by the load balancer. This decision is recorded in the tablet_map and stored in group0. +struct resize_decision { + struct none {}; + struct split {}; + struct merge {}; + + using seq_number_t = int64_t; + + std::variant way; + // The sequence number globally identifies a resize decision. + // It's monotonically increasing, globally. + // Needed to distinguish stale decision from latest one, in case coordinator + // revokes the current decision and signal it again later. + seq_number_t sequence_number = 0; + + resize_decision() = default; + resize_decision(sstring decision, uint64_t seq_number); + bool split_or_merge() const { + return !std::holds_alternative(way); + } + bool operator==(const resize_decision&) const; + sstring type_name() const; +}; + + /// Stores information about tablets of a single table. /// /// The map contains a constant number of tablets, tablet_count(). From 0d5ba1ee4b9e8f1e33038fdd17bf3bc463210930 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Sun, 7 Jan 2024 17:53:58 -0300 Subject: [PATCH 02/25] tablets: Add resize decision metadata to tablet metadata The new metadata describes the ongoing resize operation (can be either of merge, split or none) that spans tablets of a given table. That's managed by group0, so down nodes will be able to see the decision when they come back up and see the changes to the metadata. Signed-off-by: Raphael S. Carvalho --- docs/dev/system_keyspace.md | 8 ++++++++ locator/tablets.cc | 12 ++++++++++++ locator/tablets.hh | 6 ++++++ replica/tablets.cc | 13 +++++++++++++ test/boost/tablets_test.cc | 12 ++++++++++++ 5 files changed, 51 insertions(+) diff --git a/docs/dev/system_keyspace.md b/docs/dev/system_keyspace.md index cf8c69ac2d..5b6c7adc5c 100644 --- a/docs/dev/system_keyspace.md +++ b/docs/dev/system_keyspace.md @@ -205,6 +205,8 @@ CREATE TABLE system.tablets ( transition text, table_name text static, tablet_count int static, + resize_type text static, + resize_seq_number bigint static, PRIMARY KEY ((keyspace_name, table_id), last_token) ) ~~~ @@ -216,6 +218,10 @@ Only tables which use tablet-based replication strategy have an entry here. `tablet_count` is the number of tablets in the map. `table_name` is the name of the table, provided for convenience. +`resize_type` is the resize decision type that spans all tablets of a given table, which can be one of: `merge`, `split` or `none`. + +`resize_seq_number` is the sequence number (>= 0) of the resize decision that globally identifies it. It's monotonically increasing, incremented by one for every new decision, so a higher value means it came later in time. + `last_token` is the last token owned by the tablet. The i-th tablet, where i = 0, 1, ..., `tablet_count`-1), owns the token range: ``` @@ -229,6 +235,8 @@ It's a list of tuples where the first element is `host_id` of the replica and th During tablet migration, the columns `new_replicas`, `stage` and `transition` are set to represent the transition. The `new_replicas` column holds what will be put in `replicas` after transition is done. +During tablet splitting, the load balancer sets `resize_type` column with `split`, and sets `resize_seq_number` with the next sequence number, which is the previous value incremented by one. + The `transition` column can have the following values: * `migration` - One tablet replica is moving from one shard to another. * `rebuild` - New tablet replica is created from the remaining replicas. diff --git a/locator/tablets.cc b/locator/tablets.cc index ec692ff303..5bc571d173 100644 --- a/locator/tablets.cc +++ b/locator/tablets.cc @@ -222,6 +222,10 @@ void tablet_map::set_tablet_transition_info(tablet_id id, tablet_transition_info _transitions.insert_or_assign(id, std::move(info)); } +void tablet_map::set_resize_decision(locator::resize_decision decision) { + _resize_decision = std::move(decision); +} + future<> tablet_map::for_each_tablet(seastar::noncopyable_function func) const { std::optional tid = first_tablet(); for (const tablet_info& ti : tablets()) { @@ -378,6 +382,14 @@ bool resize_decision::operator==(const resize_decision& o) const { return way.index() == o.way.index() && sequence_number == o.sequence_number; } +bool tablet_map::needs_split() const { + return std::holds_alternative(_resize_decision.way); +} + +const locator::resize_decision& tablet_map::resize_decision() const { + return _resize_decision; +} + static auto to_resize_type(sstring decision) { static const std::unordered_map string_to_type = { {"none", resize_decision::none{}}, diff --git a/locator/tablets.hh b/locator/tablets.hh index 5f1afdee9d..ec79b8899b 100644 --- a/locator/tablets.hh +++ b/locator/tablets.hh @@ -287,6 +287,7 @@ private: tablet_container _tablets; size_t _log2_tablets; // log_2(_tablets.size()) std::unordered_map _transitions; + resize_decision _resize_decision; public: /// Constructs a tablet map. /// @@ -375,9 +376,14 @@ public: size_t external_memory_usage() const; bool operator==(const tablet_map&) const = default; + + bool needs_split() const; + + const locator::resize_decision& resize_decision() const; public: void set_tablet(tablet_id, tablet_info); void set_tablet_transition_info(tablet_id, tablet_transition_info); + void set_resize_decision(locator::resize_decision); void clear_transitions(); // Destroys gently. diff --git a/replica/tablets.cc b/replica/tablets.cc index 74d88e33ed..083c87c7c0 100644 --- a/replica/tablets.cc +++ b/replica/tablets.cc @@ -50,6 +50,8 @@ schema_ptr make_tablets_schema() { .with_column("stage", utf8_type) .with_column("transition", utf8_type) .with_column("session", uuid_type) + .with_column("resize_type", utf8_type, column_kind::static_column) + .with_column("resize_seq_number", long_type, column_kind::static_column) .with_version(db::system_keyspace::generate_schema_version(id)) .build(); } @@ -80,6 +82,8 @@ tablet_map_to_mutation(const tablet_map& tablets, table_id id, const sstring& ke m.set_static_cell("tablet_count", data_value(int(tablets.tablet_count())), ts); m.set_static_cell("keyspace_name", data_value(keyspace_name), ts); m.set_static_cell("table_name", data_value(table_name), ts); + m.set_static_cell("resize_type", data_value(tablets.resize_decision().type_name()), ts); + m.set_static_cell("resize_seq_number", data_value(int64_t(tablets.resize_decision().sequence_number)), ts); tablet_id tid = tablets.first_tablet(); for (auto&& tablet : tablets.tablets()) { @@ -208,6 +212,15 @@ future read_tablet_metadata(cql3::query_processor& qp) { auto tablet_count = row.get_as("tablet_count"); auto tmap = tablet_map(tablet_count); current = active_tablet_map{table, tmap, tmap.first_tablet()}; + + // Resize decision fields are static columns, so set them only once per table. + if (row.has("resize_type") && row.has("resize_seq_number")) { + auto resize_type_name = row.get_as("resize_type"); + int64_t resize_seq_number = row.get_as("resize_seq_number"); + + locator::resize_decision resize_decision(std::move(resize_type_name), resize_seq_number); + current->map.set_resize_decision(std::move(resize_decision)); + } } tablet_replica_set tablet_replicas; diff --git a/test/boost/tablets_test.cc b/test/boost/tablets_test.cc index a99087ee7f..136696bd1d 100644 --- a/test/boost/tablets_test.cc +++ b/test/boost/tablets_test.cc @@ -230,6 +230,18 @@ SEASTAR_TEST_CASE(test_tablet_metadata_persistence) { } verify_tablet_metadata_persistence(e, tm, ts); + + // Change resize decision of table1 + { + tablet_map tmap(1); + locator::resize_decision decision; + decision.way = locator::resize_decision::split{}, + decision.sequence_number = 1; + tmap.set_resize_decision(decision); + tm.set_tablet_map(table1, std::move(tmap)); + } + + verify_tablet_metadata_persistence(e, tm, ts); } }, tablet_cql_test_config()); } From 6c74fc4b824e42e74c9ceab4135711784b7dd15f Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Tue, 9 Jan 2024 14:21:29 -0300 Subject: [PATCH 03/25] locator: Introduce table_load_stats This is per table stats that will be aggregated from all nodes, by the coordinator, in order to help load balancer make resize decisions. size_in_bytes is the total aggregated table size, so coordinator becomes responsible for taking into account RF of each DC and also tablet count, for computing an accurate average size. split_ready_seq_number is the minimum sequence number among all replicas. If coordinator sees all replicas store the seq number of current split, then it knows all replicas are ready for the next stage in the split process. Signed-off-by: Raphael S. Carvalho --- locator/tablets.cc | 13 +++++++++++++ locator/tablets.hh | 21 +++++++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/locator/tablets.cc b/locator/tablets.cc index 5bc571d173..07a377db71 100644 --- a/locator/tablets.cc +++ b/locator/tablets.cc @@ -414,6 +414,19 @@ sstring resize_decision::type_name() const { return index_to_string[way.index()]; } +table_load_stats& table_load_stats::operator+=(const table_load_stats& s) noexcept { + size_in_bytes = size_in_bytes + s.size_in_bytes; + split_ready_seq_number = std::min(split_ready_seq_number, s.split_ready_seq_number); + return *this; +} + +load_stats& load_stats::operator+=(const load_stats& s) { + for (auto& [id, stats] : s.tables) { + tables[id] += stats; + } + return *this; +} + // Estimates the external memory usage of std::unordered_map<>. // Does not include external memory usage of elements. template diff --git a/locator/tablets.hh b/locator/tablets.hh index ec79b8899b..3fab28f51b 100644 --- a/locator/tablets.hh +++ b/locator/tablets.hh @@ -260,6 +260,27 @@ struct resize_decision { sstring type_name() const; }; +struct table_load_stats { + uint64_t size_in_bytes = 0; + // Stores the minimum seq number among all replicas, as coordinator wants to know if + // all replicas have completed splitting, which happens when they all store the + // seq number of the current split decision. + resize_decision::seq_number_t split_ready_seq_number = std::numeric_limits::max(); + + table_load_stats& operator+=(const table_load_stats& s) noexcept; + friend table_load_stats operator+(table_load_stats a, const table_load_stats& b) { + return a += b; + } +}; + +struct load_stats { + std::unordered_map tables; + + load_stats& operator+=(const load_stats& s); + friend load_stats operator+(load_stats a, const load_stats& b) { + return a += b; + } +}; /// Stores information about tablets of a single table. /// From beef9c9f705b8b29d3f43abd565f71613fe38f39 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Sat, 13 Jan 2024 12:35:04 -0300 Subject: [PATCH 04/25] replica: Introduce storage_group::live_disk_space_used() Signed-off-by: Raphael S. Carvalho --- replica/compaction_group.hh | 2 ++ replica/table.cc | 5 +++++ 2 files changed, 7 insertions(+) diff --git a/replica/compaction_group.hh b/replica/compaction_group.hh index 8e2505e01a..c6dcd6ebe8 100644 --- a/replica/compaction_group.hh +++ b/replica/compaction_group.hh @@ -190,6 +190,8 @@ public: compaction_group_ptr& main_compaction_group() noexcept; compaction_group_ptr& select_compaction_group(locator::tablet_range_side) noexcept; + uint64_t live_disk_space_used() const noexcept; + utils::small_vector compaction_groups() noexcept; // Puts the storage group in split mode, in which it internally segregates data diff --git a/replica/table.cc b/replica/table.cc index 20c1516f15..f6bfc4e4f3 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -1291,6 +1291,11 @@ uint64_t compaction_group::live_disk_space_used() const noexcept { return _main_sstables->bytes_on_disk() + _maintenance_sstables->bytes_on_disk(); } +uint64_t storage_group::live_disk_space_used() const noexcept { + auto cgs = const_cast(*this).compaction_groups(); + return boost::accumulate(cgs | boost::adaptors::transformed(std::mem_fn(&compaction_group::live_disk_space_used)), uint64_t(0)); +} + uint64_t compaction_group::total_disk_space_used() const noexcept { return live_disk_space_used() + boost::accumulate(_sstables_compacted_but_not_deleted | boost::adaptors::transformed(std::mem_fn(&sstables::sstable::bytes_on_disk)), uint64_t(0)); } From 468461592798f96fb7dbefaec125ab1aebcca950 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Thu, 28 Dec 2023 13:55:01 -0300 Subject: [PATCH 05/25] replica: Expose table_load_stats in table This is the table replica state that coordinator will aggregate from all nodes and feed into the load balancer. A tablet filter is added to not double account migrating tablets, so only one of pending or leaving tablet replica will be accounted based on current migration stage. More details can be known in the patch that will implement the filter. Signed-off-by: Raphael S. Carvalho --- replica/database.hh | 9 +++++++++ replica/table.cc | 18 ++++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/replica/database.hh b/replica/database.hh index 756c03a5fd..f74630dafb 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -535,6 +535,11 @@ private: bool _is_bootstrap_or_replace = false; sstables::shared_sstable make_sstable(sstables::sstable_state state); + // Every table replica that completes split work will load the seq number from tablet metadata into its local + // state. So when coordinator pull the local state of a table, it will know whether the table is ready for the + // current split, and not a previously revoked (stale) decision. + // The minimum value, which is a negative number, is not used by coordinator for first decision. + locator::resize_decision::seq_number_t _split_ready_seq_number = std::numeric_limits::min(); public: void deregister_metrics(); @@ -1000,6 +1005,10 @@ public: return _stats; } + // The tablet filter is used to not double account migrating tablets, so it's important that + // only one of pending or leaving replica is accounted based on current migration stage. + locator::table_load_stats table_load_stats(std::function tablet_filter) const noexcept; + const db::view::stats& get_view_stats() const { return _view_stats; } diff --git a/replica/table.cc b/replica/table.cc index f6bfc4e4f3..f7a28c5380 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -1838,6 +1838,24 @@ table::table(schema_ptr schema, config config, lw_shared_ptr tablet_filter) const noexcept { + locator::table_load_stats stats; + stats.split_ready_seq_number = _split_ready_seq_number; + + for (unsigned id = 0; id < _storage_groups.size(); id++) { + auto& sg = _storage_groups[id]; + if (!sg) { + continue; + } + locator::global_tablet_id gid { _schema->id(), locator::tablet_id(id) }; + if (!tablet_filter(gid)) { + continue; + } + stats.size_in_bytes += sg->live_disk_space_used(); + } + return stats; +} + void table::update_effective_replication_map(locator::effective_replication_map_ptr erm) { auto old_erm = std::exchange(_erm, std::move(erm)); if (old_erm) { From 9519a0c9e404a573c5826e7896ff22a00cec974a Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Thu, 28 Dec 2023 13:59:21 -0300 Subject: [PATCH 06/25] storage_service: Implement table_load_stats RPC This implements the RPC for collecting table stats. Since both leaving and pending replica can be accounted during tablet migration, the RPC handler will look at tablet transition info and account only either leaving or replica based on the tablet migration stage. Replicas that are not leaving or pending, of course, don't contribute to the anomaly in the reported size. Signed-off-by: Raphael S. Carvalho --- idl/storage_service.idl.hh | 10 +++++ message/messaging_service.cc | 1 + message/messaging_service.hh | 3 +- service/storage_service.cc | 78 ++++++++++++++++++++++++++++++++++++ service/storage_service.hh | 2 + 5 files changed, 93 insertions(+), 1 deletion(-) diff --git a/idl/storage_service.idl.hh b/idl/storage_service.idl.hh index 9e48fca547..08982cc5c1 100644 --- a/idl/storage_service.idl.hh +++ b/idl/storage_service.idl.hh @@ -17,6 +17,15 @@ struct global_tablet_id final { locator::tablet_id tablet; }; +struct table_load_stats final { + uint64_t size_in_bytes; + int64_t split_ready_seq_number; +}; + +struct load_stats final { + std::unordered_map<::table_id, locator::table_load_stats> tables; +}; + } namespace service { @@ -54,4 +63,5 @@ verb raft_topology_cmd (raft::server_id dst_id, raft::term_t term, uint64_t cmd_ verb [[cancellable]] raft_pull_topology_snapshot (raft::server_id dst_id, service::raft_topology_pull_params) -> service::raft_topology_snapshot; verb [[cancellable]] tablet_stream_data (raft::server_id dst_id, locator::global_tablet_id); verb [[cancellable]] tablet_cleanup (raft::server_id dst_id, locator::global_tablet_id); +verb [[cancellable]] table_load_stats (raft::server_id dst_id) -> locator::load_stats; } diff --git a/message/messaging_service.cc b/message/messaging_service.cc index d69e7a0931..08a1b150ab 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -618,6 +618,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) { case messaging_verb::TABLET_STREAM_FILES: case messaging_verb::TABLET_STREAM_DATA: case messaging_verb::TABLET_CLEANUP: + case messaging_verb::TABLE_LOAD_STATS: return 1; case messaging_verb::CLIENT_ID: case messaging_verb::MUTATION: diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 98c38ebea5..9b42b30c34 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -190,7 +190,8 @@ enum class messaging_verb : int32_t { JOIN_NODE_RESPONSE = 69, TABLET_STREAM_FILES = 70, STREAM_BLOB = 71, - LAST = 72, + TABLE_LOAD_STATS = 72, + LAST = 73, }; } // namespace netw diff --git a/service/storage_service.cc b/service/storage_service.cc index 6be5bef94e..ac85d04e8d 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -5095,6 +5095,79 @@ future<> storage_service::move_tablet(table_id table, dht::token token, locator: }); } +future storage_service::load_stats_for_tablet_based_tables() { + auto holder = _async_gate.hold(); + + if (this_shard_id() != 0) { + // topology coordinator only exists in shard 0. + co_return co_await container().invoke_on(0, [&] (auto& ss) { + return ss.load_stats_for_tablet_based_tables(); + }); + } + + using table_erms_t = std::unordered_map; + // Creates a snapshot of transitions, so different shards will find their tablet replicas in the + // same migration stage. Important for intra-node migration. + const auto erms = co_await std::invoke([this] () -> future { + table_erms_t erms; + co_await _db.local().get_tables_metadata().for_each_table_gently([&] (table_id id, lw_shared_ptr table) mutable { + if (table->uses_tablets()) { + erms.emplace(id, table->get_effective_replication_map()); + } + return make_ready_future<>(); + }); + co_return std::move(erms); + }); + + // Each node combines a per-table load map from all of its shards and returns it to the coordinator. + // So if there are 1k nodes, there will be 1k RPCs in total. + auto load_stats = co_await _db.map_reduce0([&erms] (replica::database& db) -> future { + locator::load_stats load_stats{}; + auto& tables_metadata = db.get_tables_metadata(); + + for (const auto& [id, erm] : erms) { + auto table = tables_metadata.get_table_if_exists(id); + if (!table) { + continue; + } + + auto& token_metadata = erm->get_token_metadata(); + auto& tmap = token_metadata.tablets().get_tablet_map(id); + auto me = locator::tablet_replica { token_metadata.get_my_id(), this_shard_id() }; + + // It's important to tackle the anomaly in reported size, since both leaving and + // pending replicas could otherwise be accounted during tablet migration. + // If transition hasn't reached cleanup stage, then leaving replicas are accounted. + // If transition is past cleanup stage, then pending replicas are accounted. + // This helps to reduce the discrepancy window. + auto tablet_filter = [&tmap, &me] (locator::global_tablet_id id) { + auto transition = tmap.get_tablet_transition_info(id.tablet); + auto& info = tmap.get_tablet_info(id.tablet); + + // if tablet is not in transit, it's filtered in. + if (!transition) { + return true; + } + + bool is_pending = transition->pending_replica == me; + bool is_leaving = locator::get_leaving_replica(info, *transition) == me; + auto s = transition->stage; + + return (!is_pending && !is_leaving) + || (is_leaving && s < locator::tablet_transition_stage::cleanup) + || (is_pending && s >= locator::tablet_transition_stage::cleanup); + }; + + load_stats.tables.emplace(id, table->table_load_stats(tablet_filter)); + co_await coroutine::maybe_yield(); + } + + co_return std::move(load_stats); + }, locator::load_stats{}, std::plus()); + + co_return std::move(load_stats); +} + future<> storage_service::set_tablet_balancing_enabled(bool enabled) { auto holder = _async_gate.hold(); @@ -5514,6 +5587,11 @@ void storage_service::init_messaging_service(bool raft_topology_change_enabled) return ss.cleanup_tablet(tablet); }); }); + ser::storage_service_rpc_verbs::register_table_load_stats(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id) { + return handle_raft_rpc(dst_id, [] (auto& ss) mutable { + return ss.load_stats_for_tablet_based_tables(); + }); + }); ser::join_node_rpc_verbs::register_join_node_request(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, service::join_node_request_params params) { return handle_raft_rpc(dst_id, [params = std::move(params)] (auto& ss) mutable { return ss.join_node_request_handler(std::move(params)); diff --git a/service/storage_service.hh b/service/storage_service.hh index 0f511789e6..cbd053697e 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -170,6 +170,8 @@ private: future<> stream_tablet(locator::global_tablet_id); future<> cleanup_tablet(locator::global_tablet_id); inet_address host2ip(locator::host_id); + // Handler for table load stats RPC. + future load_stats_for_tablet_based_tables(); public: storage_service(abort_source& as, distributed& db, gms::gossiper& gossiper, From 489a527e20549938e1f0d0f6e0978ecad48632e1 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Tue, 9 Jan 2024 22:03:09 -0300 Subject: [PATCH 07/25] locator: Introduce topology::get_datacenter_nodes() Signed-off-by: Raphael S. Carvalho --- locator/topology.hh | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/locator/topology.hh b/locator/topology.hh index 3a4cdf2ced..de24a0a7c5 100644 --- a/locator/topology.hh +++ b/locator/topology.hh @@ -264,6 +264,12 @@ public: return _dc_endpoints; } + const std::unordered_map>& + get_datacenter_nodes() const { + return _dc_nodes; + } + const std::unordered_map>>& From 2209c7440cbac66d7a27f8f8be050cb22376598a Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Thu, 28 Dec 2023 14:05:05 -0300 Subject: [PATCH 08/25] topology_coordinator: Periodically retrieve table_load_stats This implements the fiber that aggregates per-table stats that will be feeded into load balancer to make resize decisions (split, merge, or revoke ongoing ones). Initially, the stats will be refreshed every 60s, but the idea is that eventually we make the frequency table based, where the size of each table is taken into account. Signed-off-by: Raphael S. Carvalho --- locator/tablets.hh | 2 + service/topology_coordinator.cc | 122 ++++++++++++++++++++++++++++++++ 2 files changed, 124 insertions(+) diff --git a/locator/tablets.hh b/locator/tablets.hh index 3fab28f51b..36eb3447ef 100644 --- a/locator/tablets.hh +++ b/locator/tablets.hh @@ -282,6 +282,8 @@ struct load_stats { } }; +using load_stats_ptr = lw_shared_ptr; + /// Stores information about tablets of a single table. /// /// The map contains a constant number of tablets, tablet_count(). diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index e73165055d..d4513d4ad4 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -20,6 +21,7 @@ #include "gms/gossiper.hh" #include "locator/tablets.hh" #include "locator/token_metadata.hh" +#include "locator/network_topology_strategy.hh" #include "message/messaging_service.hh" #include "replica/database.hh" #include "replica/tablet_mutation_builder.hh" @@ -84,6 +86,13 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { tablet_allocator& _tablet_allocator; + // The reason load_stats_ptr is a shared ptr is that load balancer can yield, and we don't want it + // to suffer lifetime issues when stats refresh fiber overrides the current stats. + locator::load_stats_ptr _tablet_load_stats; + // FIXME: make frequency per table in order to reduce work in each iteration. + // Bigger tables will take longer to be resized. similar-sized tables can be batched into same iteration. + static constexpr std::chrono::seconds tablet_load_stats_refresh_interval = std::chrono::seconds(60); + std::chrono::milliseconds _ring_delay; using drop_guard_and_retake = bool_class; @@ -1982,6 +1991,9 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { // Returns true if the state machine was transitioned into tablet migration path. future maybe_start_tablet_migration(group0_guard); + future refresh_tablet_load_stats(); + future<> start_tablet_load_stats_refresher(); + future<> await_event() { _as.check(); co_await _topo_sm.event.when(); @@ -2019,6 +2031,10 @@ public: future topology_coordinator::maybe_start_tablet_migration(group0_guard guard) { rtlogger.debug("Evaluating tablet balance"); + if (utils::get_local_injector().enter("tablet_load_stats_refresh_before_rebalancing")) { + _tablet_load_stats = make_lw_shared(co_await refresh_tablet_load_stats()); + } + auto tm = get_token_metadata_ptr(); auto plan = co_await _tablet_allocator.balance_tablets(tm); if (plan.empty()) { @@ -2040,6 +2056,110 @@ future topology_coordinator::maybe_start_tablet_migration(group0_guard gua co_return true; } +future topology_coordinator::refresh_tablet_load_stats() { + auto tm = get_token_metadata_ptr(); + auto& topology = tm->get_topology(); + + locator::load_stats stats; + static constexpr std::chrono::seconds wait_for_live_nodes_timeout{30}; + + std::unordered_map total_replicas; + + for (auto& [dc, nodes] : topology.get_datacenter_nodes()) { + locator::load_stats dc_stats; + rtlogger.info("raft topology: Refreshing table load stats for DC {} that has {} endpoints", dc, nodes.size()); + co_await coroutine::parallel_for_each(nodes, [&] (const auto& node) -> future<> { + auto dst = node->host_id(); + + _as.check(); + + // FIXME: if a node is down, completion status cannot be relied upon, but the average tablet size + // could still be inferred from the replicas available. + + auto timeout = netw::messaging_service::clock_type::now() + wait_for_live_nodes_timeout; + + abort_source as; + auto request_abort = [&as] () mutable noexcept { + as.request_abort(); + }; + auto t = timer(request_abort); + t.arm(timeout); + auto sub = _as.subscribe(request_abort); + + auto node_stats = co_await ser::storage_service_rpc_verbs::send_table_load_stats(&_messaging, + netw::msg_addr(id2ip(dst)), + as, + raft::server_id(dst.uuid())); + + dc_stats += node_stats; + }); + + for (auto& [table_id, table_stats] : dc_stats.tables) { + co_await coroutine::maybe_yield(); + + auto& t = _db.find_column_family(table_id); + auto& rs = t.get_effective_replication_map()->get_replication_strategy(); + if (!rs.uses_tablets()) { + continue; + } + const auto* nts_ptr = dynamic_cast(&rs); + if (!nts_ptr) { + on_internal_error(rtlogger, "Cannot convert replication_strategy that uses tablets into network_topology_strategy"); + } + + auto rf_for_this_dc = nts_ptr->get_replication_factor(dc); + if (rf_for_this_dc <= 0) { + continue; + } + total_replicas[table_id] += rf_for_this_dc; + rtlogger.debug("raft topology: Refreshed table load stats for DC {}, table={}, RF={}, size_in_bytes={}, split_ready_seq_number={}", + dc, table_id, rf_for_this_dc, table_stats.size_in_bytes, table_stats.split_ready_seq_number); + } + + stats += dc_stats; + } + + for (auto& [table_id, table_load_stats] : stats.tables) { + auto table_total_replicas = total_replicas.at(table_id); + if (table_total_replicas == 0) { + continue; + } + // Takes into account the RF of each DC, so we can compute the average total size + // for a single table replica. This allows the load balancer to compute, in turn, + // the average tablet size by dividing total size by tablet count. + table_load_stats.size_in_bytes /= table_total_replicas; + } + rtlogger.debug("raft topology: Refreshed table load stats for all DC(s)."); + + co_return std::move(stats); +} + +future<> topology_coordinator::start_tablet_load_stats_refresher() { + auto can_proceed = [this] { return !_async_gate.is_closed() && !_as.abort_requested(); }; + while (can_proceed()) { + bool sleep = true; + try { + _tablet_load_stats = make_lw_shared(co_await refresh_tablet_load_stats()); + _topo_sm.event.broadcast(); // wake up load balancer. + } catch (raft::request_aborted&) { + rtlogger.debug("raft topology: Tablet load stats refresher aborted"); + sleep = false; + } catch (seastar::abort_requested_exception) { + rtlogger.debug("raft topology: Tablet load stats refresher aborted"); + sleep = false; + } catch (...) { + rtlogger.warn("Found error while refreshing load stats for tablets: {}, retrying...", std::current_exception()); + } + if (sleep && can_proceed()) { + try { + co_await seastar::sleep_abortable(tablet_load_stats_refresh_interval, _as); + } catch (...) { + rtlogger.debug("raft topology: Tablet load stats refresher: sleep failed: {}", std::current_exception()); + } + } + } +} + future<> topology_coordinator::fence_previous_coordinator() { // Write empty change to make sure that a guard taken by any previous coordinator cannot // be used to do a successful write any more. Otherwise the following can theoretically happen @@ -2142,6 +2262,7 @@ future<> topology_coordinator::run() { co_await fence_previous_coordinator(); auto cdc_generation_publisher = cdc_generation_publisher_fiber(); + auto tablet_load_stats_refresher = start_tablet_load_stats_refresher(); while (!_as.abort_requested()) { bool sleep = false; @@ -2189,6 +2310,7 @@ future<> topology_coordinator::run() { } co_await _async_gate.close(); + co_await std::move(tablet_load_stats_refresher); co_await std::move(cdc_generation_publisher); } From ce353bf47c5655093eea880be31a3f5bc7fa233d Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Sun, 14 Jan 2024 13:57:01 -0300 Subject: [PATCH 09/25] storage_service: Allow tablet split and migration to happen concurrently Lack of synchronization could lead the coordinator to think that a pending replica in migration has split ready status, when in reality it escaped the check if it happens that the leaving replica escaped the split ready check, after the status has already been pulled at destination by coordinator. Example: 1) Coordinator pulls split status (ready) from destination replica 2) Migration sends a non-split tablet into destination 3) Coordinator pulls split status (ready) from source after transition stage of migration moved to cleanup (so there's no longer a leaving replica in it). 4) Migration completes, but compaction group is not split yet. Coordinator thinks destination is ready. To solve it, streaming now guarantees that pending replica is split before returning, so migration can only advance to next stage after the pending replica is split, if and only if there's a split request emitted. Signed-off-by: Raphael S. Carvalho --- replica/database.hh | 8 ++++++++ replica/table.cc | 25 +++++++++++++++++++++++-- service/storage_service.cc | 17 +++++++++++++++++ 3 files changed, 48 insertions(+), 2 deletions(-) diff --git a/replica/database.hh b/replica/database.hh index f74630dafb..2d50a86747 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -580,7 +580,15 @@ public: bool all_storage_groups_split(); future<> split_all_storage_groups(); + + // Splits compaction group of a single tablet, if and only if the underlying table has + // split request emitted by coordinator (found in tablet metadata). + // If split is required, then the compaction group of the given tablet is guaranteed to + // be split once it returns. + future<> maybe_split_compaction_group_of(locator::tablet_id); private: + sstables::compaction_type_options::split split_compaction_options() const noexcept; + // Select a compaction group from a given token. std::pair storage_group_of(dht::token token) const noexcept; size_t storage_group_id_for_token(dht::token token) const noexcept; diff --git a/replica/table.cc b/replica/table.cc index f7a28c5380..3b02a60950 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -678,12 +678,16 @@ bool table::all_storage_groups_split() { std::bind(&storage_group::set_split_mode, std::placeholders::_1, std::ref(_compaction_groups))); } -future<> table::split_all_storage_groups() { - sstables::compaction_type_options::split opt {[this] (dht::token t) { +sstables::compaction_type_options::split table::split_compaction_options() const noexcept { + return {[this](dht::token t) { // Classifies the input stream into either left or right side. auto [_, side] = storage_group_of(t); return mutation_writer::token_group_id(side); }}; +} + +future<> table::split_all_storage_groups() { + sstables::compaction_type_options::split opt = split_compaction_options(); auto holder = async_gate().hold(); @@ -692,6 +696,23 @@ future<> table::split_all_storage_groups() { } } +future<> table::maybe_split_compaction_group_of(locator::tablet_id tablet_id) { + auto table_id = _schema->id(); + if (!_erm->get_token_metadata().tablets().get_tablet_map(table_id).needs_split()) { + return make_ready_future<>(); + } + + auto holder = async_gate().hold(); + + auto& sg = _storage_groups[tablet_id.value()]; + if (!sg) { + on_internal_error(tlogger, format("Tablet {} of table {}.{} is not allocated in this shard", + tablet_id, _schema->ks_name(), _schema->cf_name())); + } + + return sg->split(_compaction_groups, split_compaction_options()); +} + std::unique_ptr table::make_storage_group_manager() { if (uses_tablets()) { return std::make_unique(*this); diff --git a/service/storage_service.cc b/service/storage_service.cc index ac85d04e8d..a98b6e0cdf 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -4978,6 +4978,23 @@ future<> storage_service::stream_tablet(locator::global_tablet_id tablet) { } streamer->add_rx_ranges(table.schema()->ks_name(), std::move(ranges_per_endpoint)); co_await streamer->stream_async(); + + // If new pending tablet replica needs splitting, streaming waits for it to complete. + // That's to provide a guarantee that once migration is over, the coordinator can finalize + // splitting under the promise that compaction groups of tablets are all split, ready + // for the subsequent topology change. + // + // FIXME: + // We could do the splitting not in the streaming stage, but in a later stage, so that + // from the tablet scheduler's perspective migrations blocked on compaction are not + // participating in streaming anymore (which is true), so it could schedule more + // migrations. This way compaction would run in parallel with streaming which can + // reduce the delay. + co_await _db.invoke_on(trinfo->pending_replica.shard, [tablet] (replica::database& db) { + auto& table = db.find_column_family(tablet.table); + return table.maybe_split_compaction_group_of(tablet.tablet); + }); + co_return; }); } From 490d10905579bbe3b813b884b5e1de2426cbfc8e Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Sun, 14 Jan 2024 21:02:56 -0300 Subject: [PATCH 10/25] topology_coordinator: Wire load stats into load balancer Signed-off-by: Raphael S. Carvalho --- service/tablet_allocator.cc | 12 +++++++----- service/tablet_allocator.hh | 2 +- service/topology_coordinator.cc | 4 ++-- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/service/tablet_allocator.cc b/service/tablet_allocator.cc index aa54179913..9382c7d0d9 100644 --- a/service/tablet_allocator.cc +++ b/service/tablet_allocator.cc @@ -255,6 +255,7 @@ class load_balancer { const size_t max_read_streaming_load = 4; token_metadata_ptr _tm; + locator::load_stats_ptr _table_load_stats; load_balancer_stats_manager& _stats; private: tablet_replica_set get_replicas_for_tablet_load(const tablet_info& ti, const tablet_transition_info* trinfo) const { @@ -289,8 +290,9 @@ private: } public: - load_balancer(token_metadata_ptr tm, load_balancer_stats_manager& stats) + load_balancer(token_metadata_ptr tm, locator::load_stats_ptr table_load_stats, load_balancer_stats_manager& stats) : _tm(std::move(tm)) + , _table_load_stats(std::move(table_load_stats)) , _stats(stats) { } @@ -863,8 +865,8 @@ public: _stopped = true; } - future balance_tablets(token_metadata_ptr tm) { - load_balancer lb(tm, _load_balancer_stats); + future balance_tablets(token_metadata_ptr tm, locator::load_stats_ptr table_load_stats) { + load_balancer lb(tm, std::move(table_load_stats), _load_balancer_stats); co_return co_await lb.make_plan(); } @@ -914,8 +916,8 @@ future<> tablet_allocator::stop() { return impl().stop(); } -future tablet_allocator::balance_tablets(locator::token_metadata_ptr tm) { - return impl().balance_tablets(tm); +future tablet_allocator::balance_tablets(locator::token_metadata_ptr tm, locator::load_stats_ptr load_stats) { + return impl().balance_tablets(std::move(tm), std::move(load_stats)); } tablet_allocator_impl& tablet_allocator::impl() { diff --git a/service/tablet_allocator.hh b/service/tablet_allocator.hh index 975ae64488..7de3d46d21 100644 --- a/service/tablet_allocator.hh +++ b/service/tablet_allocator.hh @@ -86,7 +86,7 @@ public: /// /// The algorithm takes care of limiting the streaming load on the system, also by taking active migrations into account. /// - future balance_tablets(locator::token_metadata_ptr); + future balance_tablets(locator::token_metadata_ptr, locator::load_stats_ptr = {}); /// Should be called when the node is no longer a leader. void on_leadership_lost(); diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index d4513d4ad4..c08d5357cd 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -1024,7 +1024,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { } } if (!preempt) { - auto plan = co_await _tablet_allocator.balance_tablets(get_token_metadata_ptr()); + auto plan = co_await _tablet_allocator.balance_tablets(get_token_metadata_ptr(), _tablet_load_stats); if (!drain || plan.has_nodes_to_drain()) { co_await generate_migration_updates(updates, guard, plan); } @@ -2036,7 +2036,7 @@ future topology_coordinator::maybe_start_tablet_migration(group0_guard gua } auto tm = get_token_metadata_ptr(); - auto plan = co_await _tablet_allocator.balance_tablets(tm); + auto plan = co_await _tablet_allocator.balance_tablets(tm, _tablet_load_stats); if (plan.empty()) { rtlogger.debug("Tablets are balanced"); co_return false; From ed2138a35a29ef2450d63307f51a015a919ad55b Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Thu, 28 Dec 2023 14:12:57 -0300 Subject: [PATCH 11/25] tablet_mutation_builder: Add set_resize_decision() Signed-off-by: Raphael S. Carvalho --- replica/tablet_mutation_builder.hh | 1 + replica/tablets.cc | 7 ++++++ test/boost/tablets_test.cc | 36 ++++++++++++++++++++++++++++++ 3 files changed, 44 insertions(+) diff --git a/replica/tablet_mutation_builder.hh b/replica/tablet_mutation_builder.hh index 3d0e01710a..0e5be6fdd4 100644 --- a/replica/tablet_mutation_builder.hh +++ b/replica/tablet_mutation_builder.hh @@ -39,6 +39,7 @@ public: tablet_mutation_builder& set_session(dht::token last_token, service::session_id); tablet_mutation_builder& del_session(dht::token last_token); tablet_mutation_builder& del_transition(dht::token last_token); + tablet_mutation_builder& set_resize_decision(locator::resize_decision); mutation build() { return std::move(_m); diff --git a/replica/tablets.cc b/replica/tablets.cc index 083c87c7c0..d9528881df 100644 --- a/replica/tablets.cc +++ b/replica/tablets.cc @@ -155,6 +155,13 @@ tablet_mutation_builder::del_transition(dht::token last_token) { return *this; } +tablet_mutation_builder& +tablet_mutation_builder::set_resize_decision(locator::resize_decision resize_decision) { + _m.set_static_cell("resize_type", data_value(resize_decision.type_name()), _ts); + _m.set_static_cell("resize_seq_number", data_value(int64_t(resize_decision.sequence_number)), _ts); + return *this; +} + mutation make_drop_tablet_map_mutation(table_id id, api::timestamp_type ts) { auto s = db::system_keyspace::tablets(); mutation m(s, partition_key::from_single_value(*s, diff --git a/test/boost/tablets_test.cc b/test/boost/tablets_test.cc index 136696bd1d..d93d5fb36a 100644 --- a/test/boost/tablets_test.cc +++ b/test/boost/tablets_test.cc @@ -444,6 +444,42 @@ SEASTAR_TEST_CASE(test_mutation_builder) { auto tm_from_disk = read_tablet_metadata(e.local_qp()).get0(); BOOST_REQUIRE_EQUAL(expected_tmap, tm_from_disk.get_tablet_map(table1)); } + + static const auto resize_decision = locator::resize_decision("split", 1); + + { + tablet_mutation_builder b(ts++, table1); + auto last_token = tm.get_tablet_map(table1).get_last_token(tid1); + b.set_replicas(last_token, tablet_replica_set { + tablet_replica {h1, 2}, + tablet_replica {h2, 3}, + }); + b.del_transition(last_token); + b.set_resize_decision(resize_decision); + e.local_db().apply({freeze(b.build())}, db::no_timeout).get(); + } + + { + tablet_map expected_tmap(2); + tid = expected_tmap.first_tablet(); + expected_tmap.set_tablet(tid, tablet_info { + tablet_replica_set { + tablet_replica {h1, 0}, + tablet_replica {h3, 5}, + } + }); + tid1 = *expected_tmap.next_tablet(tid); + expected_tmap.set_tablet(tid1, tablet_info { + tablet_replica_set { + tablet_replica {h1, 2}, + tablet_replica {h2, 3}, + } + }); + expected_tmap.set_resize_decision(resize_decision); + + auto tm_from_disk = read_tablet_metadata(e.local_qp()).get0(); + BOOST_REQUIRE_EQUAL(expected_tmap, tm_from_disk.get_tablet_map(table1)); + } }, tablet_cql_test_config()); } From 8d283b2593c5f8642bb2300e9606fbb0c24800f4 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Thu, 28 Dec 2023 14:16:13 -0300 Subject: [PATCH 12/25] service: Introduce table_resize_plan Signed-off-by: Raphael S. Carvalho --- service/tablet_allocator.hh | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/service/tablet_allocator.hh b/service/tablet_allocator.hh index 7de3d46d21..bd7b68aebe 100644 --- a/service/tablet_allocator.hh +++ b/service/tablet_allocator.hh @@ -15,6 +15,24 @@ namespace service { using tablet_migration_info = locator::tablet_migration_info; +/// Represents intention to emit resize (split or merge) request for a +/// table, and finalize or revoke the request previously initiated. +struct table_resize_plan { + std::unordered_map resize; + std::unordered_set finalize_resize; + + size_t size() const { return resize.size() + finalize_resize.size(); } + + void merge(table_resize_plan&& other) { + for (auto&& [id, other_resize] : other.resize) { + if (!resize.contains(id) || other_resize.sequence_number > resize[id].sequence_number) { + resize[id] = std::move(other_resize); + } + } + finalize_resize.merge(std::move(other.finalize_resize)); + } +}; + class migration_plan { public: using migrations_vector = utils::chunked_vector; From 8f7f74c4904a287577250ab371e3beba1e1c2739 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Thu, 28 Dec 2023 14:16:56 -0300 Subject: [PATCH 13/25] service: Wire table_resize_plan into migration_plan Signed-off-by: Raphael S. Carvalho --- service/tablet_allocator.cc | 3 ++- service/tablet_allocator.hh | 14 ++++++++++++-- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/service/tablet_allocator.cc b/service/tablet_allocator.cc index 9382c7d0d9..ac0b6e3b01 100644 --- a/service/tablet_allocator.cc +++ b/service/tablet_allocator.cc @@ -307,7 +307,8 @@ public: plan.merge(std::move(dc_plan)); } - lblogger.info("Prepared {} migrations", plan.size()); + lblogger.info("Prepared {} migration plans, out of which there were {} tablet migration(s) and {} resize decision(s)", + plan.size(), plan.tablet_migration_count(), plan.resize_decision_count()); co_return std::move(plan); } diff --git a/service/tablet_allocator.hh b/service/tablet_allocator.hh index bd7b68aebe..e9860f392c 100644 --- a/service/tablet_allocator.hh +++ b/service/tablet_allocator.hh @@ -38,14 +38,17 @@ public: using migrations_vector = utils::chunked_vector; private: migrations_vector _migrations; + table_resize_plan _resize_plan; bool _has_nodes_to_drain = false; public: /// Returns true iff there are decommissioning nodes which own some tablet replicas. bool has_nodes_to_drain() const { return _has_nodes_to_drain; } const migrations_vector& migrations() const { return _migrations; } - bool empty() const { return _migrations.empty(); } - size_t size() const { return _migrations.size(); } + bool empty() const { return _migrations.empty() && !_resize_plan.size(); } + size_t size() const { return _migrations.size() + _resize_plan.size(); } + size_t tablet_migration_count() const { return _migrations.size(); } + size_t resize_decision_count() const { return _resize_plan.size(); } void add(tablet_migration_info info) { _migrations.emplace_back(std::move(info)); @@ -54,11 +57,18 @@ public: void merge(migration_plan&& other) { std::move(other._migrations.begin(), other._migrations.end(), std::back_inserter(_migrations)); _has_nodes_to_drain |= other._has_nodes_to_drain; + _resize_plan.merge(std::move(other._resize_plan)); } void set_has_nodes_to_drain(bool b) { _has_nodes_to_drain = b; } + + const table_resize_plan& resize_plan() const { return _resize_plan; } + + void set_resize_plan(table_resize_plan resize_plan) { + _resize_plan = std::move(resize_plan); + } }; class tablet_allocator_impl; From 7ed5b44d52571cd9074d0e0a32466c8b2195f3d5 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Tue, 16 Jan 2024 20:35:17 -0300 Subject: [PATCH 14/25] load_balancer: Implement resize decisions This implements the ability in load balancer to emit split or merge requests, cancel ongoing ones if they're no longer needed, and also finalize those that are ready for the topology changes. That's all based on average tablet size, collected by coordinator from all nodes, and split and merge thresholds. Signed-off-by: Raphael S. Carvalho --- docs/dev/topology-over-raft.md | 41 ++++++ service/tablet_allocator.cc | 239 +++++++++++++++++++++++++++++++- service/tablet_allocator.hh | 3 +- service/tablet_allocator_fwd.hh | 22 +++ test/boost/tablets_test.cc | 139 ++++++++++++++++++- 5 files changed, 437 insertions(+), 7 deletions(-) create mode 100644 service/tablet_allocator_fwd.hh diff --git a/docs/dev/topology-over-raft.md b/docs/dev/topology-over-raft.md index 75cbc9c497..8995efde94 100644 --- a/docs/dev/topology-over-raft.md +++ b/docs/dev/topology-over-raft.md @@ -179,6 +179,47 @@ When tablet is not in transition, the following invariants hold: 1. The storage layer (database) on any node contains writes for keys which belong to the tablet only if that shard is one of the current tablet replicas. +# Tablet splitting + +Each table has its resize metadata stored in group0. + +Resize metadata is composed of: + - resize_type: it's the resize decision type, and can be either of 'split', 'merge' or 'none' + - resize_seq_number: a sequence number that globally identifies the resize; it's monotonically increasing +and increased by one on every new decision. + +In order to determine if a table needs resize, the load balancer will calculate the average tablet size +for a given table, which can be done by dividing average table size[1] by the tablet count. + +[1]: The average size of a table is the total size across all DCs divided by the number of replicas across +all DCs. + +A table will need split if its average size surpasses the split threshold, which is 100% of the target +tablet size, which defaults to 5G. The reasoning is that after split we want average size to return +to the target size. By the same reason, merge threshold is 50% of target size. + +The load balancer might also decide to cancel an ongoing split if it realizes that after split, a merge +will be needed. It does that, to avoid some back-and-forth, which is wasteful. Revoking an ongoing +decision is done by setting resize metadata with type 'none'. + +When the load balancer decides to split a table, it sets resize_type field in metadata with 'split' and +sets resize_seq_number with the next sequence number, which is the current seq number -- loaded from +tablet metadata -- increased by 1. + +All table replicas will listen for the need to split with a replica component named split monitor, that +wakes up on replication map updates, checks the need for split, and if so, it will start segregating the +storage of every tablet replica into left and right sides of the token range spanned by an individual +tablet. When a tablet replica has completed this work, it will then communicate its ready completion +status with the coordinator by loading (mirroring) the resize_seq_number from tablet metadata into its +local state, which is pulled periodically by the coordinator. + +When the coordinator realizes all tablet replicas have completed the splitting work, the load balancer +emits a decision to finalize the split request. The finalization is serialized with migration, as +doubling tablet count would interfere with the migration process. When the state machine leaves the +migration track, then finalize can proceed and split each preexisting tablet into two in the topology +metadata. The replicas will react to that by remapping its compaction groups into a new set which size +is equal to the new tablet count. + # Topology guards In addition to synchronizing with data access operations (e.g. CQL requests), we need to synchronize with diff --git a/service/tablet_allocator.cc b/service/tablet_allocator.cc index ac0b6e3b01..25fd164366 100644 --- a/service/tablet_allocator.cc +++ b/service/tablet_allocator.cc @@ -226,6 +226,119 @@ class load_balancer { } }; + // We have split and merge thresholds, which work respectively as (target) upper and lower + // bound for average size of tablets. + // + // The merge threshold is 50% of target tablet size (a midpoint between split and merge), + // such that after a merge, the average size is equally far from split and merge. + // The same applies to split. It's 100% of target size, so after split, the average is + // close to the target size (assuming small variations during the operation). + // + // It might happen that during a resize decision, average size changes drastically, and + // split or merge might get cancelled. E.g. after deleting a large partition or lots of + // data becoming suddenly expired. + // If we're splitting, we will only cancel it, if the average size dropped below the + // target size. That's because a merge would be required right after split completes, + // due to the average size dropping below the merge threshold, as tablet count doubles. + const uint64_t _target_tablet_size = default_target_tablet_size; + + static constexpr uint64_t target_max_tablet_size(uint64_t target_tablet_size) { + return target_tablet_size * 2; + } + static constexpr uint64_t target_min_tablet_size(uint64_t max_tablet_size) { + return double(max_tablet_size / 2) * 0.5; + } + + struct table_size_desc { + uint64_t target_max_tablet_size; + uint64_t avg_tablet_size; + locator::resize_decision resize_decision; + size_t tablet_count; + size_t shard_count; + + uint64_t target_min_tablet_size() const noexcept { + return load_balancer::target_min_tablet_size(target_max_tablet_size); + } + }; + + struct cluster_resize_load { + using table_id_and_size_desc = std::pair; + std::vector tables_need_resize; + std::vector tables_being_resized; + + static bool table_needs_merge(const table_size_desc& d) { + // FIXME: ignore merge request if tablet_count == initial_tablets. + return d.tablet_count > 1 && d.avg_tablet_size < d.target_min_tablet_size(); + } + static bool table_needs_split(const table_size_desc& d) { + return d.avg_tablet_size > d.target_max_tablet_size; + } + + bool table_needs_resize(const table_size_desc& d) const { + return table_needs_merge(d) || table_needs_split(d); + } + + // Resize cancellation will account for possible oscillations caused by compaction, etc. + // We shouldn't rush into cancelling an ongoing resize. That will only happen if the + // average size is past the point it would be if either split or merge had completed. + // If we cancel a split, that's because average size dropped so much a merge would be + // required post completion, and vice-versa. + bool table_needs_resize_cancellation(const table_size_desc& d) const { + auto& way = d.resize_decision.way; + if (std::holds_alternative(way)) { + return d.avg_tablet_size < d.target_max_tablet_size / 2; + } else if (std::holds_alternative(way)) { + return d.avg_tablet_size > d.target_min_tablet_size() * 2; + } + return false; + } + + void update(table_id id, table_size_desc d) { + bool table_undergoing_resize = d.resize_decision.split_or_merge(); + + // Resizing tables that no longer need resize will have the resize decision revoked, + // therefore they must be listed as being resized. + if (!table_needs_resize(d) && !table_undergoing_resize) { + return; + } + + auto entry = std::make_pair(id, std::move(d)); + if (table_undergoing_resize) { + tables_being_resized.push_back(entry); + } else { + tables_need_resize.push_back(entry); + } + } + + // Comparator that measures the weight of the need for resizing. + auto resize_urgency_cmp() const { + return [] (const table_id_and_size_desc& a, const table_id_and_size_desc& b) { + auto urgency = [] (const table_size_desc& d) -> double { + // FIXME: only takes into account split today. + return double(d.avg_tablet_size) / d.target_max_tablet_size; + }; + return urgency(a.second) < urgency(b.second); + }; + } + + static locator::resize_decision to_resize_decision(const table_size_desc& d) { + locator::resize_decision decision; + if (table_needs_split(d)) { + decision.way = locator::resize_decision::split{}; + } else if (table_needs_merge(d)) { + decision.way = locator::resize_decision::merge{}; + } + return decision; + } + + // Resize decisions can be revoked with an empty (none) decision, so replicas + // will know they're no longer required to prepare storage for the execution of + // topology changes. + static locator::resize_decision revoke_resize_decision() { + return locator::resize_decision{}; + } + }; + // Per-shard limits for active tablet streaming sessions. // // There is no hard reason for these values being what they are other than @@ -290,8 +403,9 @@ private: } public: - load_balancer(token_metadata_ptr tm, locator::load_stats_ptr table_load_stats, load_balancer_stats_manager& stats) - : _tm(std::move(tm)) + load_balancer(token_metadata_ptr tm, locator::load_stats_ptr table_load_stats, load_balancer_stats_manager& stats, uint64_t target_tablet_size) + : _target_tablet_size(target_tablet_size) + , _tm(std::move(tm)) , _table_load_stats(std::move(table_load_stats)) , _stats(stats) { } @@ -306,12 +420,129 @@ public: lblogger.info("Prepared {} migrations in DC {}", dc_plan.size(), dc); plan.merge(std::move(dc_plan)); } + plan.set_resize_plan(co_await make_resize_plan()); lblogger.info("Prepared {} migration plans, out of which there were {} tablet migration(s) and {} resize decision(s)", plan.size(), plan.tablet_migration_count(), plan.resize_decision_count()); co_return std::move(plan); } + const locator::table_load_stats* load_stats_for_table(table_id id) const { + if (!_table_load_stats) { + return nullptr; + } + auto it = _table_load_stats->tables.find(id); + return (it != _table_load_stats->tables.end()) ? &it->second : nullptr; + } + + future make_resize_plan() { + table_resize_plan resize_plan; + + if (!_tm->tablets().balancing_enabled()) { + co_return std::move(resize_plan); + } + + cluster_resize_load resize_load; + + for (auto&& [table, tmap_] : _tm->tablets().all_tables()) { + auto& tmap = tmap_; + + const auto* table_stats = load_stats_for_table(table); + if (!table_stats) { + continue; + } + + auto avg_tablet_size = table_stats->size_in_bytes / std::max(tmap.tablet_count(), size_t(1)); + // shard presence of a table across the cluster + size_t shard_count = std::accumulate(tmap.tablets().begin(), tmap.tablets().end(), size_t(0), + [] (size_t shard_count, const locator::tablet_info& info) { + return shard_count + info.replicas.size(); + }); + + table_size_desc size_desc { + .target_max_tablet_size = target_max_tablet_size(_target_tablet_size), + .avg_tablet_size = avg_tablet_size, + .resize_decision = tmap.resize_decision(), + .tablet_count = tmap.tablet_count(), + .shard_count = shard_count + }; + + resize_load.update(table, std::move(size_desc)); + lblogger.info("Table {} with tablet_count={} has an average tablet size of {}", table, tmap.tablet_count(), avg_tablet_size); + co_await coroutine::maybe_yield(); + } + + // Emit new resize decisions + + // The limit of resize requests is determined by the shard presence (count) of tables involved. + // If tables still have a low tablet count, the concurrency must be high in order to saturate the cluster. + // If a table covers the entire cluster, and needs split, concurrency will be reduced to 1. + + size_t total_shard_count = std::invoke([&topo = _tm->get_topology()] { + size_t shard_count = 0; + topo.for_each_node([&] (const locator::node* node_ptr) { + shard_count += node_ptr->get_shard_count(); + }); + return shard_count; + }); + size_t resizing_shard_count = std::accumulate(resize_load.tables_being_resized.begin(), resize_load.tables_being_resized.end(), size_t(0), + [] (size_t shard_count, const auto& table_desc) { + return shard_count + table_desc.second.shard_count; + }); + // Limits the amount of new resize requests to be generated in a single round, as each one is a mutation to group0. + constexpr size_t max_new_resize_requests = 10; + + auto available_shards = std::max(ssize_t(total_shard_count) - ssize_t(resizing_shard_count), ssize_t(0)); + + std::make_heap(resize_load.tables_need_resize.begin(), resize_load.tables_need_resize.end(), resize_load.resize_urgency_cmp()); + while (resize_load.tables_need_resize.size() && resize_plan.size() < max_new_resize_requests) { + const auto& [table, size_desc] = resize_load.tables_need_resize.front(); + + if (resize_plan.size() > 0 && available_shards < size_desc.shard_count) { + break; + } + + auto resize_decision = cluster_resize_load::to_resize_decision(size_desc); + lblogger.info("Emitting resize decision of type {} for table {} due to avg tablet size of {}", + resize_decision.type_name(), table, size_desc.avg_tablet_size); + resize_plan.resize[table] = std::move(resize_decision); + + std::pop_heap(resize_load.tables_need_resize.begin(), resize_load.tables_need_resize.end(), resize_load.resize_urgency_cmp()); + resize_load.tables_need_resize.pop_back(); + + available_shards -= size_desc.shard_count; + } + + // Revoke resize decision if any table no longer needs it + // Also communicate coordinator if any table is ready for finalizing resizing + + for (const auto& [table, size_desc] : resize_load.tables_being_resized) { + if (resize_load.table_needs_resize_cancellation(size_desc)) { + resize_plan.resize[table] = cluster_resize_load::revoke_resize_decision(); + lblogger.info("Revoking resize decision for table {} due to avg tablet size of {}", table, size_desc.avg_tablet_size); + continue; + } + + auto& tmap = _tm->tablets().get_tablet_map(table); + + const auto* table_stats = load_stats_for_table(table); + if (!table_stats) { + continue; + } + + // If all replicas have completed split work for the current sequence number, it means that + // load balancer can emit finalize decision, for split to be completed. + if (table_stats->split_ready_seq_number == tmap.resize_decision().sequence_number) { + resize_plan.resize[table] = cluster_resize_load::revoke_resize_decision(); + resize_plan.finalize_resize.insert(table); + lblogger.info("Finalizing resize decision for table {} as all replicas agree on sequence number {}", + table, table_stats->split_ready_seq_number); + } + } + + co_return std::move(resize_plan); + } + future make_plan(dc_name dc) { migration_plan plan; @@ -366,6 +597,7 @@ public: for (auto&& [table, tmap_] : _tm->tablets().all_tables()) { auto& tmap = tmap_; + co_await tmap.for_each_tablet([&, table = table] (tablet_id tid, const tablet_info& ti) { auto trinfo = tmap.get_tablet_transition_info(tid); @@ -867,7 +1099,8 @@ public: } future balance_tablets(token_metadata_ptr tm, locator::load_stats_ptr table_load_stats) { - load_balancer lb(tm, std::move(table_load_stats), _load_balancer_stats); + // TODO: make target size a live-update-able config for tests. + load_balancer lb(tm, std::move(table_load_stats), _load_balancer_stats, default_target_tablet_size); co_return co_await lb.make_plan(); } diff --git a/service/tablet_allocator.hh b/service/tablet_allocator.hh index e9860f392c..a5ff9dd62d 100644 --- a/service/tablet_allocator.hh +++ b/service/tablet_allocator.hh @@ -10,6 +10,7 @@ #include "replica/database.hh" #include "locator/tablets.hh" +#include "tablet_allocator_fwd.hh" namespace service { @@ -71,8 +72,6 @@ public: } }; -class tablet_allocator_impl; - class tablet_allocator { public: struct config { diff --git a/service/tablet_allocator_fwd.hh b/service/tablet_allocator_fwd.hh new file mode 100644 index 0000000000..8873cd02a4 --- /dev/null +++ b/service/tablet_allocator_fwd.hh @@ -0,0 +1,22 @@ +/* + * Copyright (C) 2024-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#pragma once + +#include + +namespace service { + +// This the default target size of tablets. +static constexpr uint64_t default_target_tablet_size = 5UL * 1024 * 1024 * 1024; + +class tablet_allocator_impl; + +class tablet_allocator; + +} diff --git a/test/boost/tablets_test.cc b/test/boost/tablets_test.cc index d93d5fb36a..df6d8f1a26 100644 --- a/test/boost/tablets_test.cc +++ b/test/boost/tablets_test.cc @@ -649,6 +649,21 @@ SEASTAR_THREAD_TEST_CASE(test_token_ownership_splitting) { } } +static +void apply_resize_plan(token_metadata& tm, const migration_plan& plan) { + for (auto [table_id, resize_decision] : plan.resize_plan().resize) { + tablet_map& tmap = tm.tablets().get_tablet_map(table_id); + resize_decision.sequence_number = tmap.resize_decision().sequence_number + 1; + tmap.set_resize_decision(resize_decision); + } + for (auto table_id : plan.resize_plan().finalize_resize) { + auto& old_tmap = tm.tablets().get_tablet_map(table_id); + testlog.info("Setting new tablet map of size {}", old_tmap.tablet_count() * 2); + tablet_map tmap(old_tmap.tablet_count() * 2); + tm.tablets().set_tablet_map(table_id, std::move(tmap)); + } +} + // Reflects the plan in a given token metadata as if the migrations were fully executed. static void apply_plan(token_metadata& tm, const migration_plan& plan) { @@ -658,6 +673,7 @@ void apply_plan(token_metadata& tm, const migration_plan& plan) { tinfo.replicas = replace_replica(tinfo.replicas, mig.src, mig.dst); tmap.set_tablet(mig.tablet.tablet, tinfo); } + apply_resize_plan(tm, plan); } // Reflects the plan in a given token metadata as if the migrations were started but not yet executed. @@ -668,12 +684,13 @@ void apply_plan_as_in_progress(token_metadata& tm, const migration_plan& plan) { auto tinfo = tmap.get_tablet_info(mig.tablet.tablet); tmap.set_tablet_transition_info(mig.tablet.tablet, migration_to_transition_info(tinfo, mig)); } + apply_resize_plan(tm, plan); } static -void rebalance_tablets(tablet_allocator& talloc, shared_token_metadata& stm) { +void rebalance_tablets(tablet_allocator& talloc, shared_token_metadata& stm, locator::load_stats_ptr load_stats = {}) { while (true) { - auto plan = talloc.balance_tablets(stm.get()).get0(); + auto plan = talloc.balance_tablets(stm.get(), load_stats).get0(); if (plan.empty()) { break; } @@ -1603,3 +1620,121 @@ SEASTAR_THREAD_TEST_CASE(basic_tablet_storage_splitting_test) { }, bool(false), std::logical_or()).get0(), true); }, std::move(cfg)).get(); } + +SEASTAR_THREAD_TEST_CASE(test_load_balancing_resize_requests) { + do_with_cql_env_thread([] (auto& e) { + inet_address ip1("192.168.0.1"); + inet_address ip2("192.168.0.2"); + + auto host1 = host_id(next_uuid()); + auto host2 = host_id(next_uuid()); + + auto table1 = table_id(next_uuid()); + + unsigned shard_count = 2; + + semaphore sem(1); + shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ + locator::topology::config{ + .this_endpoint = ip1, + .local_dc_rack = locator::endpoint_dc_rack::default_location + } + }); + + stm.mutate_token_metadata([&] (token_metadata& tm) { + tm.update_host_id(host1, ip1); + tm.update_host_id(host2, ip2); + tm.update_topology(host1, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count); + tm.update_topology(host2, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count); + + tablet_map tmap(2); + for (auto tid : tmap.tablet_ids()) { + tmap.set_tablet(tid, tablet_info { + tablet_replica_set { + tablet_replica {host1, tests::random::get_int(0, shard_count - 1)}, + tablet_replica {host2, tests::random::get_int(0, shard_count - 1)}, + } + }); + } + tablet_metadata tmeta; + tmeta.set_tablet_map(table1, std::move(tmap)); + tm.set_tablets(std::move(tmeta)); + return make_ready_future<>(); + }).get(); + + auto tablet_count = [&] { + return stm.get()->tablets().get_tablet_map(table1).tablet_count(); + }; + auto resize_decision = [&] { + return stm.get()->tablets().get_tablet_map(table1).resize_decision(); + }; + + auto do_rebalance_tablets = [&] (locator::load_stats load_stats) { + rebalance_tablets(e.get_tablet_allocator().local(), stm, make_lw_shared(std::move(load_stats))); + }; + + const size_t initial_tablets = tablet_count(); + const uint64_t max_tablet_size = service::default_target_tablet_size * 2; + auto to_size_in_bytes = [&] (double max_tablet_size_pctg) -> uint64_t { + return (max_tablet_size * max_tablet_size_pctg) * tablet_count(); + }; + + + const auto initial_ready_seq_number = std::numeric_limits::min(); + + // there are 2 tablets, each with avg size hitting merge threshold, so merge request is emitted + { + locator::load_stats load_stats = { + .tables = { + { table1, table_load_stats{ .size_in_bytes = to_size_in_bytes(0.0), .split_ready_seq_number = initial_ready_seq_number }}, + } + }; + + do_rebalance_tablets(std::move(load_stats)); + BOOST_REQUIRE(tablet_count() == initial_tablets); + BOOST_REQUIRE(std::holds_alternative(resize_decision().way)); + } + + // avg size moved above target size, so merge is cancelled + { + locator::load_stats load_stats = { + .tables = { + { table1, table_load_stats{ .size_in_bytes = to_size_in_bytes(0.75), .split_ready_seq_number = initial_ready_seq_number }}, + } + }; + + do_rebalance_tablets(std::move(load_stats)); + BOOST_REQUIRE(tablet_count() == initial_tablets); + BOOST_REQUIRE(std::holds_alternative(resize_decision().way)); + } + + // avg size hits split threshold, and balancer emits split request + { + locator::load_stats load_stats = { + .tables = { + { table1, table_load_stats{ .size_in_bytes = to_size_in_bytes(1.1), .split_ready_seq_number = initial_ready_seq_number }}, + } + }; + + do_rebalance_tablets(std::move(load_stats)); + BOOST_REQUIRE(tablet_count() == initial_tablets); + BOOST_REQUIRE(std::holds_alternative(resize_decision().way)); + BOOST_REQUIRE(resize_decision().sequence_number > 0); + } + + // replicas set their split status as ready, and load balancer finalizes split generating a new + // tablet map, twice as large as the previous one. + { + locator::load_stats load_stats = { + .tables = { + { table1, table_load_stats{ .size_in_bytes = to_size_in_bytes(1.1), .split_ready_seq_number = resize_decision().sequence_number }}, + } + }; + + do_rebalance_tablets(std::move(load_stats)); + + BOOST_REQUIRE(tablet_count() == initial_tablets * 2); + BOOST_REQUIRE(std::holds_alternative(resize_decision().way)); + } + }).get(); +} From 638e6e30cbe74f19b4234e1daa40b78fdd2bd530 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Wed, 17 Jan 2024 00:16:06 -0300 Subject: [PATCH 15/25] db: Make target tablet size a live-updateable config option Signed-off-by: Raphael S. Carvalho --- db/config.cc | 5 +++++ db/config.hh | 3 +++ service/tablet_allocator.cc | 3 +-- service/tablet_allocator.hh | 5 ++++- 4 files changed, 13 insertions(+), 3 deletions(-) diff --git a/db/config.cc b/db/config.cc index 13cc57f994..9bdcd49d72 100644 --- a/db/config.cc +++ b/db/config.cc @@ -29,6 +29,7 @@ #include "config.hh" #include "extensions.hh" #include "log.hh" +#include "service/tablet_allocator_fwd.hh" #include "utils/config_file_impl.hh" #include #include @@ -1131,6 +1132,10 @@ db::config::config(std::shared_ptr exts) , maximum_replication_factor_warn_threshold(this, "maximum_replication_factor_warn_threshold", liveness::LiveUpdate, value_status::Used, -1, "") , maximum_replication_factor_fail_threshold(this, "maximum_replication_factor_fail_threshold", liveness::LiveUpdate, value_status::Used, -1, "") , tablets_initial_scale_factor(this, "tablets_initial_scale_factor", value_status::Used, 1, "Calculated initial tablets are multiplied by this number") + , target_tablet_size_in_bytes(this, "target_tablet_size_in_bytes", liveness::LiveUpdate, value_status::Used, service::default_target_tablet_size, + "Allows target tablet size to be configured. Defaults to 5G (in bytes). Maintaining tablets at reasonable sizes is important to be able to " \ + "redistribute load. A higher value means tablet migration throughput can be reduced. A lower value may cause number of tablets to increase significantly, " \ + "potentially resulting in performance drawbacks.") , replication_strategy_warn_list(this, "replication_strategy_warn_list", liveness::LiveUpdate, value_status::Used, {locator::replication_strategy_type::simple}, "Controls which replication strategies to warn about when creating/altering a keyspace. Doesn't affect the pre-existing keyspaces.") , replication_strategy_fail_list(this, "replication_strategy_fail_list", liveness::LiveUpdate, value_status::Used, {}, "Controls which replication strategies are disallowed to be used when creating/altering a keyspace. Doesn't affect the pre-existing keyspaces.") , service_levels_interval(this, "service_levels_interval_ms", liveness::LiveUpdate, value_status::Used, 10000, "Controls how often service levels module polls configuration table") diff --git a/db/config.hh b/db/config.hh index 2a1b438d10..64c13bf137 100644 --- a/db/config.hh +++ b/db/config.hh @@ -466,7 +466,10 @@ public: named_value minimum_replication_factor_warn_threshold; named_value maximum_replication_factor_warn_threshold; named_value maximum_replication_factor_fail_threshold; + named_value tablets_initial_scale_factor; + named_value target_tablet_size_in_bytes; + named_value>> replication_strategy_warn_list; named_value>> replication_strategy_fail_list; diff --git a/service/tablet_allocator.cc b/service/tablet_allocator.cc index 25fd164366..9fd65e3a7f 100644 --- a/service/tablet_allocator.cc +++ b/service/tablet_allocator.cc @@ -1099,8 +1099,7 @@ public: } future balance_tablets(token_metadata_ptr tm, locator::load_stats_ptr table_load_stats) { - // TODO: make target size a live-update-able config for tests. - load_balancer lb(tm, std::move(table_load_stats), _load_balancer_stats, default_target_tablet_size); + load_balancer lb(tm, std::move(table_load_stats), _load_balancer_stats, _db.get_config().target_tablet_size_in_bytes()); co_return co_await lb.make_plan(); } diff --git a/service/tablet_allocator.hh b/service/tablet_allocator.hh index a5ff9dd62d..f4f03abde5 100644 --- a/service/tablet_allocator.hh +++ b/service/tablet_allocator.hh @@ -8,9 +8,10 @@ #pragma once -#include "replica/database.hh" +#include "replica/database_fwd.hh" #include "locator/tablets.hh" #include "tablet_allocator_fwd.hh" +#include "locator/token_metadata_fwd.hh" namespace service { @@ -72,6 +73,8 @@ public: } }; +class migration_notifier; + class tablet_allocator { public: struct config { From 3ef792c4e848439cfbd5a18a9df1227c3682880c Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Wed, 17 Jan 2024 00:34:45 -0300 Subject: [PATCH 16/25] load_balancer: Introduce metrics for resize decisions Signed-off-by: Raphael S. Carvalho --- service/tablet_allocator.cc | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/service/tablet_allocator.cc b/service/tablet_allocator.cc index 9fd65e3a7f..e3c5cf60c7 100644 --- a/service/tablet_allocator.cc +++ b/service/tablet_allocator.cc @@ -41,11 +41,18 @@ struct load_balancer_node_stats { double load = 0; }; +struct load_balancer_cluster_stats { + uint64_t resizes_emitted = 0; + uint64_t resizes_revoked = 0; + uint64_t resizes_finalized = 0; +}; + using dc_name = sstring; class load_balancer_stats_manager { std::unordered_map> _dc_stats; std::unordered_map> _node_stats; + load_balancer_cluster_stats _cluster_stats; seastar::metrics::label dc_label{"target_dc"}; seastar::metrics::label node_label{"target_node"}; seastar::metrics::metric_groups _metrics; @@ -72,7 +79,24 @@ class load_balancer_stats_manager { stats.load)(dc_lb)(node_lb) }); } + + void setup_metrics(load_balancer_cluster_stats& stats) { + namespace sm = seastar::metrics; + // FIXME: we can probably improve it by making it per resize type (split, merge or none). + _metrics.add_group("load_balancer", { + sm::make_counter("resizes_emitted", sm::description("number of resizes produced by the load balancer"), + stats.resizes_emitted), + sm::make_counter("resizes_revoked", sm::description("number of resizes revoked by the load balancer"), + stats.resizes_revoked), + sm::make_counter("resizes_finalized", sm::description("number of resizes finalized by the load balancer"), + stats.resizes_finalized) + }); + } public: + load_balancer_stats_manager() { + setup_metrics(_cluster_stats); + } + load_balancer_dc_stats& for_dc(const dc_name& dc) { auto it = _dc_stats.find(dc); if (it == _dc_stats.end()) { @@ -93,6 +117,10 @@ public: return *it->second; } + load_balancer_cluster_stats& for_cluster() { + return _cluster_stats; + } + void unregister() { _metrics.clear(); } @@ -506,6 +534,7 @@ public: lblogger.info("Emitting resize decision of type {} for table {} due to avg tablet size of {}", resize_decision.type_name(), table, size_desc.avg_tablet_size); resize_plan.resize[table] = std::move(resize_decision); + _stats.for_cluster().resizes_emitted++; std::pop_heap(resize_load.tables_need_resize.begin(), resize_load.tables_need_resize.end(), resize_load.resize_urgency_cmp()); resize_load.tables_need_resize.pop_back(); @@ -519,6 +548,7 @@ public: for (const auto& [table, size_desc] : resize_load.tables_being_resized) { if (resize_load.table_needs_resize_cancellation(size_desc)) { resize_plan.resize[table] = cluster_resize_load::revoke_resize_decision(); + _stats.for_cluster().resizes_revoked++; lblogger.info("Revoking resize decision for table {} due to avg tablet size of {}", table, size_desc.avg_tablet_size); continue; } @@ -534,6 +564,7 @@ public: // load balancer can emit finalize decision, for split to be completed. if (table_stats->split_ready_seq_number == tmap.resize_decision().sequence_number) { resize_plan.resize[table] = cluster_resize_load::revoke_resize_decision(); + _stats.for_cluster().resizes_finalized++; resize_plan.finalize_resize.insert(table); lblogger.info("Finalizing resize decision for table {} as all replicas agree on sequence number {}", table, table_stats->split_ready_seq_number); From e0de3dd8446c8a0a6eb393a40f617b4d95a77cd4 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Thu, 28 Dec 2023 14:24:12 -0300 Subject: [PATCH 17/25] topology_cordinator: Generate updates for resize decisions made by balancer Signed-off-by: Raphael S. Carvalho --- locator/tablets.cc | 8 ++++++++ locator/tablets.hh | 1 + service/topology_coordinator.cc | 13 +++++++++++++ 3 files changed, 22 insertions(+) diff --git a/locator/tablets.cc b/locator/tablets.cc index 07a377db71..61c5cc59ff 100644 --- a/locator/tablets.cc +++ b/locator/tablets.cc @@ -414,6 +414,14 @@ sstring resize_decision::type_name() const { return index_to_string[way.index()]; } +resize_decision::seq_number_t resize_decision::next_sequence_number() const { + // Doubt we'll ever wrap around, but just in case. + // Even if sequence number is bumped every second, it would take 292471208677 years + // for it to happen, about 21x the age of the universe, or ~11x according to the new + // prediction after james webb. + return (sequence_number == std::numeric_limits::max()) ? 0 : sequence_number + 1; +} + table_load_stats& table_load_stats::operator+=(const table_load_stats& s) noexcept { size_in_bytes = size_in_bytes + s.size_in_bytes; split_ready_seq_number = std::min(split_ready_seq_number, s.split_ready_seq_number); diff --git a/locator/tablets.hh b/locator/tablets.hh index 36eb3447ef..3c9908cfad 100644 --- a/locator/tablets.hh +++ b/locator/tablets.hh @@ -258,6 +258,7 @@ struct resize_decision { } bool operator==(const resize_decision&) const; sstring type_name() const; + seq_number_t next_sequence_number() const; }; struct table_load_stats { diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index c08d5357cd..617eaaf251 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -879,6 +879,19 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { co_await coroutine::maybe_yield(); generate_migration_update(out, guard, mig); } + + for (auto [table_id, resize_decision] : plan.resize_plan().resize) { + auto s = _db.find_schema(table_id); + auto& tmap = get_token_metadata_ptr()->tablets().get_tablet_map(table_id); + // Sequence number is monotonically increasing, globally. Therefore, it can be used to identify a decision. + resize_decision.sequence_number = tmap.resize_decision().next_sequence_number(); + rtlogger.debug("Generating resize decision for table {} of type {} and sequence number {}", + table_id, resize_decision.type_name(), resize_decision.sequence_number); + out.emplace_back( + replica::tablet_mutation_builder(guard.write_timestamp(), table_id) + .set_resize_decision(std::move(resize_decision)) + .build()); + } } // When "drain" is true, we migrate tablets only as long as there are nodes to drain From cfa8200da59d83454f16780e71d1ecf4883b928d Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Thu, 28 Dec 2023 14:26:19 -0300 Subject: [PATCH 18/25] storage_service: Implement split monitor Signed-off-by: Raphael S. Carvalho --- service/storage_service.cc | 92 ++++++++++++++++++++++++++++++++++++++ service/storage_service.hh | 8 ++++ 2 files changed, 100 insertions(+) diff --git a/service/storage_service.cc b/service/storage_service.cc index a98b6e0cdf..2f8b5109ae 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1430,6 +1430,10 @@ future<> storage_service::join_token_ring(shardedfinish_setup_after_join(*this, _qp, _migration_manager.local(), _raft_topology_change_enabled); + + // Initializes monitor only after updating local topology. + start_tablet_split_monitor(); + co_return; } @@ -2552,6 +2556,9 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt for (auto it = table_erms.begin(); it != table_erms.end(); ) { auto& cf = db.find_column_family(it->first); cf.update_effective_replication_map(std::move(it->second)); + if (cf.uses_tablets()) { + register_tablet_split_candidate(it->first); + } it = table_erms.erase(it); } @@ -2576,6 +2583,8 @@ future<> storage_service::stop() { _listeners.clear(); co_await _async_gate.close(); co_await std::move(_node_ops_abort_thread); + _tablet_split_monitor_event.signal(); + co_await std::move(_tablet_split_monitor); } future<> storage_service::wait_for_group0_stop() { @@ -4498,6 +4507,89 @@ future<> storage_service::load_tablet_metadata() { }, acquire_merge_lock::no); } +future<> storage_service::process_tablet_split_candidate(table_id table) { + auto all_compaction_groups_split = [&] () mutable { + return _db.map_reduce0([table_ = table] (replica::database& db) { + auto all_split = db.find_column_family(table_).all_storage_groups_split(); + return make_ready_future(all_split); + }, bool{true}, std::logical_and()); + }; + + auto split_all_compaction_groups = [&] () -> future<> { + return _db.invoke_on_all([table] (replica::database& db) -> future<> { + return db.find_column_family(table).split_all_storage_groups(); + }); + }; + + exponential_backoff_retry split_retry = exponential_backoff_retry(std::chrono::seconds(5), std::chrono::seconds(300)); + bool sleep = false; + + while (!_async_gate.is_closed() && !_group0_as.abort_requested()) { + try { + // Ensures that latest changes to tablet metadata, in group0, are visible + auto guard = co_await _group0->client().start_operation(&_group0_as); + auto& tmap = get_token_metadata().tablets().get_tablet_map(table); + if (!tmap.needs_split()) { + release_guard(std::move(guard)); + break; + } + + if (co_await all_compaction_groups_split()) { + slogger.info0("All compaction groups of table {} are split ready.", table); + release_guard(std::move(guard)); + break; + } else { + release_guard(std::move(guard)); + co_await split_all_compaction_groups(); + } + } catch (...) { + slogger.error("Failed to complete splitting of table {} due to {}, retrying after {} seconds", + table, std::current_exception(), split_retry.sleep_time()); + sleep = true; + break; + } + if (sleep) { + co_await split_retry.retry(_group0_as); + } + } +} + +void storage_service::register_tablet_split_candidate(table_id table) noexcept { + if (this_shard_id() != 0) { + return; + } + try { + if (get_token_metadata().tablets().get_tablet_map(table).needs_split()) { + _tablet_split_candidates.push_back(table); + _tablet_split_monitor_event.signal(); + } + } catch (...) { + slogger.error("Unable to register table {} as candidate for tablet splitting, due to {}", table, std::current_exception()); + } +} + +future<> storage_service::run_tablet_split_monitor() { + while (!_async_gate.is_closed() && !_group0_as.abort_requested()) { + while (!_tablet_split_candidates.empty()) { + auto candidate = _tablet_split_candidates.front(); + _tablet_split_candidates.pop_front(); + co_await process_tablet_split_candidate(candidate); + } + co_await _tablet_split_monitor_event.when(); + } +} + +void storage_service::start_tablet_split_monitor() { + if (this_shard_id() != 0) { + return; + } + if (!_db.local().get_config().check_experimental(db::experimental_features_t::feature::TABLETS)) { + return; + } + slogger.info("Starting the tablet split monitor..."); + _tablet_split_monitor = run_tablet_split_monitor(); +} + future<> storage_service::snitch_reconfigured() { assert(this_shard_id() == 0); auto& snitch = _snitch.local(); diff --git a/service/storage_service.hh b/service/storage_service.hh index cbd053697e..0f313aa64e 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -152,6 +152,10 @@ private: std::vector _listeners; gate _async_gate; + condition_variable _tablet_split_monitor_event; + std::deque _tablet_split_candidates; + future<> _tablet_split_monitor = make_ready_future<>(); + std::unordered_map _node_ops; std::list> _node_ops_abort_queue; seastar::condition_variable _node_ops_abort_cond; @@ -172,6 +176,9 @@ private: inet_address host2ip(locator::host_id); // Handler for table load stats RPC. future load_stats_for_tablet_based_tables(); + future<> process_tablet_split_candidate(table_id); + void register_tablet_split_candidate(table_id) noexcept; + future<> run_tablet_split_monitor(); public: storage_service(abort_source& as, distributed& db, gms::gossiper& gossiper, @@ -197,6 +204,7 @@ public: future<> uninit_messaging_service(); future<> load_tablet_metadata(); + void start_tablet_split_monitor(); private: using acquire_merge_lock = bool_class; From 9342792173d927b117b73a9fb0a6cffa31e83b97 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Wed, 17 Jan 2024 01:06:24 -0300 Subject: [PATCH 19/25] replica: Update table split status if completed split compaction work The table replica will say to coordinator that its split status is ready by loading the sequence number from tablet metadata into its local state, which is pulled periodically by the coordinator via RPC. Signed-off-by: Raphael S. Carvalho --- replica/database.hh | 2 ++ replica/table.cc | 20 ++++++++++++++++++-- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/replica/database.hh b/replica/database.hh index 2d50a86747..26d7bec644 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -578,6 +578,8 @@ public: const std::vector& old_sstables); }; + // Precondition: table needs tablet splitting. + // Returns true if all storage of table is ready for splitting. bool all_storage_groups_split(); future<> split_all_storage_groups(); diff --git a/replica/table.cc b/replica/table.cc index 3b02a60950..2f628b0f94 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -674,8 +674,24 @@ future<> storage_group::split(compaction_group_list& list, sstables::compaction_ } bool table::all_storage_groups_split() { - return std::ranges::all_of(_storage_groups, - std::bind(&storage_group::set_split_mode, std::placeholders::_1, std::ref(_compaction_groups))); + auto id = _schema->id(); + auto& tmap = _erm->get_token_metadata().tablets().get_tablet_map(id); + if (_split_ready_seq_number == tmap.resize_decision().sequence_number) { + return true; + } + + auto split_ready = std::ranges::all_of(_storage_groups, + std::bind(&storage_group::set_split_mode, std::placeholders::_1, std::ref(_compaction_groups))); + + // The table replica will say to coordinator that its split status is ready by + // mirroring the sequence number from tablet metadata into its local state, + // which is pulled periodically by coordinator. + if (split_ready) { + _split_ready_seq_number = tmap.resize_decision().sequence_number; + tlogger.info0("Setting split ready sequence number to {} for table {}.{}", + _split_ready_seq_number, _schema->ks_name(), _schema->cf_name()); + } + return split_ready; } sstables::compaction_type_options::split table::split_compaction_options() const noexcept { From bf6f692f60091ced1847b211cb738c0d1a13dc4b Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Wed, 17 Jan 2024 02:40:41 -0300 Subject: [PATCH 20/25] service: Split tablet map when split request is finalized When load balancer emits finalize request, the coordinator will now react to it by splitting each tablet in the current tablet map and then committing the new map. There can be no active migration while we do it. Signed-off-by: Raphael S. Carvalho --- service/tablet_allocator.cc | 31 +++++++++++++++++++++++++++++++ service/tablet_allocator.hh | 2 ++ service/topology_coordinator.cc | 21 +++++++++++++++++++++ 3 files changed, 54 insertions(+) diff --git a/service/tablet_allocator.cc b/service/tablet_allocator.cc index e3c5cf60c7..9ed08345e4 100644 --- a/service/tablet_allocator.cc +++ b/service/tablet_allocator.cc @@ -1169,6 +1169,33 @@ public: _load_balancer_stats.unregister(); } + // The splitting of tablets today is completely based on the power-of-two constraint. + // A tablet of id X is split into 2 new tablets, which new ids are (x << 1) and + // (x << 1) + 1. + // So a tablet of id 0 is remapped into ids 0 and 1. Another of id 1 is remapped + // into ids 2 and 3, and so on. + future split_tablets(token_metadata_ptr tm, table_id table) { + auto& tablets = tm->tablets().get_tablet_map(table); + + tablet_map new_tablets(tablets.tablet_count() * 2); + + for (tablet_id tid : tablets.tablet_ids()) { + co_await coroutine::maybe_yield(); + + tablet_id new_left_tid = tablet_id(tid.value() << 1); + tablet_id new_right_tid = tablet_id(new_left_tid.value() + 1); + + auto& tablet_info = tablets.get_tablet_info(tid); + + new_tablets.set_tablet(new_left_tid, tablet_info); + new_tablets.set_tablet(new_right_tid, tablet_info); + } + + lblogger.info("Split tablets for table {}, increasing tablet count from {} to {}", + table, tablets.tablet_count(), new_tablets.tablet_count()); + co_return std::move(new_tablets); + } + // FIXME: Handle materialized views. }; @@ -1184,6 +1211,10 @@ future tablet_allocator::balance_tablets(locator::token_metadata return impl().balance_tablets(std::move(tm), std::move(load_stats)); } +future tablet_allocator::split_tablets(locator::token_metadata_ptr tm, table_id table) { + return impl().split_tablets(std::move(tm), table); +} + tablet_allocator_impl& tablet_allocator::impl() { return static_cast(*_impl); } diff --git a/service/tablet_allocator.hh b/service/tablet_allocator.hh index f4f03abde5..0c2ae31a5c 100644 --- a/service/tablet_allocator.hh +++ b/service/tablet_allocator.hh @@ -118,6 +118,8 @@ public: /// future balance_tablets(locator::token_metadata_ptr, locator::load_stats_ptr = {}); + future split_tablets(locator::token_metadata_ptr, table_id); + /// Should be called when the node is no longer a leader. void on_leadership_lost(); }; diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index 617eaaf251..4a5759f4bb 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -25,6 +25,7 @@ #include "message/messaging_service.hh" #include "replica/database.hh" #include "replica/tablet_mutation_builder.hh" +#include "replica/tablets.hh" #include "service/raft/join_node.hh" #include "service/raft/raft_address_map.hh" #include "service/raft/raft_group0.hh" @@ -875,9 +876,11 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { } future<> generate_migration_updates(std::vector& out, const group0_guard& guard, const migration_plan& plan) { + std::unordered_set new_transitions; for (const tablet_migration_info& mig : plan.migrations()) { co_await coroutine::maybe_yield(); generate_migration_update(out, guard, mig); + new_transitions.insert(mig.tablet.table); } for (auto [table_id, resize_decision] : plan.resize_plan().resize) { @@ -892,6 +895,24 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { .set_resize_decision(std::move(resize_decision)) .build()); } + + // FIXME: Finalize split requests when exiting the tablet migration track. + for (auto table_id : plan.resize_plan().finalize_resize) { + auto& tmap = get_token_metadata_ptr()->tablets().get_tablet_map(table_id); + // Only finalize split request if there's no ongoing transition or new ones for a given table. + if (tmap.transitions().size() > 0 || new_transitions.contains(table_id)) { + continue; + } + + auto s = _db.find_schema(table_id); + auto new_tablet_map = co_await _tablet_allocator.split_tablets(get_token_metadata_ptr(), table_id); + out.emplace_back(co_await replica::tablet_map_to_mutation( + new_tablet_map, + table_id, + s->ks_name(), + s->cf_name(), + guard.write_timestamp())); + } } // When "drain" is true, we migrate tablets only as long as there are nodes to drain From 85020861fca3b92f42f7b3db47c1c8907a755182 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Wed, 17 Jan 2024 02:44:23 -0300 Subject: [PATCH 21/25] replica: Remap compaction groups when tablet split is finalized When coordinator executes split, i.e. commit the new tablet map with each tablet split into two, all replicas must then proceed with remapping of compaction groups that were previously split. Signed-off-by: Raphael S. Carvalho --- replica/compaction_group.hh | 16 ++++-- replica/database.hh | 5 ++ replica/table.cc | 111 +++++++++++++++++++++++++++++++----- 3 files changed, 112 insertions(+), 20 deletions(-) diff --git a/replica/compaction_group.hh b/replica/compaction_group.hh index c6dcd6ebe8..50b4ccc45f 100644 --- a/replica/compaction_group.hh +++ b/replica/compaction_group.hh @@ -36,7 +36,7 @@ class compaction_group { table& _t; class table_state; std::unique_ptr _table_state; - const size_t _group_id; + size_t _group_id; // Tokens included in this compaction_groups dht::token_range _token_range; compaction::compaction_strategy_state _compaction_strategy_state; @@ -80,6 +80,11 @@ public: compaction_group(table& t, size_t gid, dht::token_range token_range); + void update_id_and_range(size_t id, dht::token_range token_range) { + _group_id = id; + _token_range = std::move(token_range); + } + size_t group_id() const noexcept { return _group_id; } @@ -174,20 +179,21 @@ using compaction_group_list = compaction_group::list_t; // by the same storage group. class storage_group { compaction_group_ptr _main_cg; - compaction_group_ptr _left_cg; - compaction_group_ptr _right_cg; + std::vector _split_ready_groups; private: bool splitting_mode() const { - return bool(_left_cg) && bool(_right_cg); + return !_split_ready_groups.empty(); } + size_t to_idx(locator::tablet_range_side) const; public: - storage_group(compaction_group_ptr cg, compaction_group_list& list); + storage_group(compaction_group_ptr cg, compaction_group_list* list); const dht::token_range& token_range() const noexcept; size_t memtable_count() const noexcept; compaction_group_ptr& main_compaction_group() noexcept; + std::vector split_ready_compaction_groups() &&; compaction_group_ptr& select_compaction_group(locator::tablet_range_side) noexcept; uint64_t live_disk_space_used() const noexcept; diff --git a/replica/database.hh b/replica/database.hh index 26d7bec644..050500b6de 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -589,6 +589,11 @@ public: // be split once it returns. future<> maybe_split_compaction_group_of(locator::tablet_id); private: + // Called when coordinator executes tablet splitting, i.e. commit the new tablet map with + // each tablet split into two, so this replica will remap all of its compaction groups + // that were previously split. + void handle_tablet_split_completion(size_t old_tablet_count, const locator::tablet_map& new_tmap); + sstables::compaction_type_options::split split_compaction_options() const noexcept; // Select a compaction group from a given token. diff --git a/replica/table.cc b/replica/table.cc index 2f628b0f94..c192d51724 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -552,7 +552,7 @@ public: storage_group_vector make_storage_groups(compaction_group_list& list) const override { storage_group_vector r; auto cg = std::make_unique(_t, size_t(0), dht::token_range::make_open_ended_both_sides()); - r.push_back(std::make_unique(std::move(cg), list)); + r.push_back(std::make_unique(std::move(cg), &list)); return r; } std::pair storage_group_of(dht::token) const override { @@ -598,7 +598,7 @@ public: } // FIXME: don't allocate compaction groups for tablets that aren't present in this shard. auto cg = std::make_unique(_t, tid.value(), std::move(range)); - ret.emplace_back(std::make_unique(std::move(cg), list)); + ret.emplace_back(std::make_unique(std::move(cg), &list)); } return ret; } @@ -615,9 +615,12 @@ bool table::uses_tablets() const { return _erm && _erm->get_replication_strategy().uses_tablets(); } -storage_group::storage_group(compaction_group_ptr cg, compaction_group_list& list) +storage_group::storage_group(compaction_group_ptr cg, compaction_group_list* list) : _main_cg(std::move(cg)) { - list.push_back(*_main_cg); + // FIXME: get rid of compaction group list. + if (list) { + list->push_back(*_main_cg); + } } const dht::token_range& storage_group::token_range() const noexcept { @@ -628,20 +631,25 @@ compaction_group_ptr& storage_group::main_compaction_group() noexcept { return _main_cg; } +std::vector storage_group::split_ready_compaction_groups() && { + return std::exchange(_split_ready_groups, {}); +} + +size_t storage_group::to_idx(locator::tablet_range_side side) const { + return size_t(side); +} + compaction_group_ptr& storage_group::select_compaction_group(locator::tablet_range_side side) noexcept { if (splitting_mode()) { - return (side == locator::tablet_range_side::left) ? _left_cg : _right_cg; + return _split_ready_groups[to_idx(side)]; } return _main_cg; } utils::small_vector storage_group::compaction_groups() noexcept { utils::small_vector cgs = {_main_cg.get()}; - if (_left_cg) { - cgs.push_back(_left_cg.get()); - } - if (_right_cg) { - cgs.push_back(_right_cg.get()); + for (auto& cg : _split_ready_groups) { + cgs.push_back(cg.get()); } return cgs; } @@ -654,10 +662,10 @@ bool storage_group::set_split_mode(compaction_group_list& list) { list.push_back(*cg); return cg; }; - auto left_cg = create_cg(); - auto right_cg = create_cg(); - _left_cg = std::move(left_cg); - _right_cg = std::move(right_cg); + std::vector split_ready_groups(2); + split_ready_groups[to_idx(locator::tablet_range_side::left)] = create_cg(); + split_ready_groups[to_idx(locator::tablet_range_side::right)] = create_cg(); + _split_ready_groups = std::move(split_ready_groups); } // The storage group is considered "split ready" if its main compaction group is empty. @@ -1893,8 +1901,80 @@ locator::table_load_stats table::table_load_stats(std::functionid(); + storage_group_vector new_storage_groups; + new_storage_groups.resize(new_tmap.tablet_count()); + + if (!old_tablet_count) { + on_internal_error(tlogger, format("Table {} had zero tablets, it should never happen when splitting.", table_id)); + } + + // NOTE: exception when applying replica changes to reflect token metadata will abort for obvious reasons, + // so exception safety is not required here. + + unsigned growth_factor = log2ceil(new_tmap.tablet_count() / old_tablet_count); + unsigned split_size = 1 << growth_factor; + tlogger.debug("Growth factor: {}, split size {}", growth_factor, split_size); + + if (old_tablet_count*split_size != new_tmap.tablet_count()) { + on_internal_error(tlogger, format("New tablet count for table {} is unexpected, actual: {}, expected {}.", + table_id, new_tmap.tablet_count(), old_tablet_count*split_size)); + } + + for (auto id = 0; id < _storage_groups.size(); id++) { + auto& sg = _storage_groups[id]; + if (!sg) { + continue; + } + if (!sg->main_compaction_group()->empty()) { + on_internal_error(tlogger, format("Found that storage of group {} for table {} wasn't split correctly, " \ + "therefore groups cannot be remapped with the new tablet count.", + id, table_id)); + } + unsigned first_new_id = id << growth_factor; + auto split_ready_groups = std::move(*sg).split_ready_compaction_groups(); + if (split_ready_groups.size() != split_size) { + on_internal_error(tlogger, format("Found {} split ready compaction groups, but expected {} instead.", split_ready_groups.size(), split_size)); + } + for (auto i = 0; i < split_size; i++) { + auto group_id = first_new_id + i; + split_ready_groups[i]->update_id_and_range(group_id, new_tmap.get_token_range(locator::tablet_id(group_id))); + new_storage_groups[group_id] = std::make_unique(std::move(split_ready_groups[i]), nullptr); + } + + tlogger.debug("Remapping tablet {} of table {} into new tablets [{}].", + id, table_id, fmt::join(boost::irange(first_new_id, first_new_id+split_size), ", ")); + } + + auto old_groups = std::exchange(_storage_groups, std::move(new_storage_groups)); + + // Remove old main groups in background, they're unused, but they need to be deregistered properly + (void) do_with(std::move(old_groups), _async_gate.hold(), [] (storage_group_vector& groups, gate::holder&) { + return do_for_each(groups, [] (std::unique_ptr& sg) { + return sg->main_compaction_group()->stop(); + }); + }); +} + void table::update_effective_replication_map(locator::effective_replication_map_ptr erm) { auto old_erm = std::exchange(_erm, std::move(erm)); + + if (uses_tablets()) { + auto table_id = _schema->id(); + auto tablet_count = [table_id] (locator::effective_replication_map_ptr& erm) { + return erm->get_token_metadata().tablets().get_tablet_map(table_id).tablet_count(); + }; + + size_t old_tablet_count = tablet_count(old_erm); + size_t new_tablet_count = tablet_count(_erm); + + if (new_tablet_count > old_tablet_count) { + tlogger.info0("Detected tablet split for table {}.{}, increasing from {} to {} tablets", + _schema->ks_name(), _schema->cf_name(), old_tablet_count, new_tablet_count); + handle_tablet_split_completion(old_tablet_count, _erm->get_token_metadata().tablets().get_tablet_map(table_id)); + } + } if (old_erm) { old_erm->invalidate(); } @@ -2171,7 +2251,8 @@ size_t compaction_group::memtable_count() const noexcept { size_t storage_group::memtable_count() const noexcept { auto memtable_count = [] (const compaction_group_ptr& cg) { return cg ? cg->memtable_count() : 0; }; - return memtable_count(_main_cg) + memtable_count(_left_cg) + memtable_count(_right_cg); + return memtable_count(_main_cg) + + boost::accumulate(_split_ready_groups | boost::adaptors::transformed(std::mem_fn(&compaction_group::memtable_count)), size_t(0)); } future<> table::flush(std::optional pos) { From 4245ad333a6a2e058a75a47a4b02745795ac1676 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Wed, 17 Jan 2024 14:40:10 -0300 Subject: [PATCH 22/25] test/topology_experimental_raft: Disable load balancer in test fencing This is easier to reproducer after changes in load balancer, to emit resize decisions, which in turn results in topology version being incremented, and that might race with fencing tests that manipulate the topology version manually. Signed-off-by: Raphael S. Carvalho --- test/topology_experimental_raft/test_fencing.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/test/topology_experimental_raft/test_fencing.py b/test/topology_experimental_raft/test_fencing.py index d545d47e51..054138326e 100644 --- a/test/topology_experimental_raft/test_fencing.py +++ b/test/topology_experimental_raft/test_fencing.py @@ -84,6 +84,9 @@ async def test_fence_writes(request, manager: ManagerClient): logger.info(f'Waiting for cql and hosts') host2 = (await wait_for_cql_and_get_hosts(cql, [servers[2]], time.time() + 60))[0] + # Disable load balancer as it might bump topology version, undoing the decrement below. + await manager.api.disable_tablet_balancing(servers[2].ip_addr) + version = await get_version(manager, host2) logger.info(f"version on host2 {version}") @@ -129,6 +132,10 @@ async def test_fence_hints(request, manager: ManagerClient): logger.info(f'Waiting for cql and hosts') hosts = await wait_for_cql_and_get_hosts(cql, [s0, s2], time.time() + 60) + # Disable load balancer as it might bump topology version, potentially creating a race condition + # with read modify write below + await manager.api.disable_tablet_balancing(s2.ip_addr) + host2 = host_by_server(hosts, s2) new_version = (await get_version(manager, host2)) + 1 logger.info(f"Set version and fence_version to {new_version} on node {host2}") From 2cb8a824ec99c7bd18ac56454af2f0790a0bee93 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Tue, 23 Jan 2024 21:23:05 -0300 Subject: [PATCH 23/25] replica: Fix table::compaction_group_for_sstable() for tablet streaming It might happen that sstable being streamed during migration is not split yet, therefore it should be added to the main compaction group, allowing the streaming stage to start split work on it, and not fool the coordinator thinking it can proceed with split execution which would cause problems. Signed-off-by: Raphael S. Carvalho --- replica/table.cc | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/replica/table.cc b/replica/table.cc index c192d51724..97e20204e6 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -812,8 +812,21 @@ compaction_group& table::compaction_group_for_key(partition_key_view key, const } compaction_group& table::compaction_group_for_sstable(const sstables::shared_sstable& sst) const noexcept { - // FIXME: a sstable can belong to more than one group, change interface to reflect that. - return compaction_group_for_token(sst->get_first_decorated_key().token()); + auto [first_id, first_range_side] = storage_group_of(sst->get_first_decorated_key().token()); + auto [last_id, last_range_side] = storage_group_of(sst->get_last_decorated_key().token()); + + if (first_id != last_id) { + on_internal_error(tlogger, format("Unable to load SSTable {} that belongs to tablets {} and {}", + sst->get_filename(), first_id, last_id)); + } + + auto& sg = _storage_groups[first_id]; + + if (first_range_side != last_range_side) { + return *sg->main_compaction_group(); + } + + return *sg->select_compaction_group(first_range_side); } compaction_group_list& table::compaction_groups() const noexcept { From 90c9a5d7af808e6a227d194510a27075107b10a6 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Thu, 25 Jan 2024 17:28:55 -0300 Subject: [PATCH 24/25] replica: Bypass reshape on boot with tablets temporarily Without it, table loading fails as reshape mixes sstables from different tablets together, and now we have a guard for that: Unable to load SSTable ...-big-Data.db that belongs to tablets 1 and 31, The fix is about making reshape compaction group aware. It will be fixed, but not now. Refs #16966. Signed-off-by: Raphael S. Carvalho --- replica/distributed_loader.cc | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index 3bf5bb9fc3..b3f079bdb5 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -391,10 +391,14 @@ future<> table_populator::populate_subdir(sstables::sstable_state state, allow_o return sst->get_origin() != sstables::repair_origin; }; - co_await distributed_loader::reshape(directory, _db, sstables::reshape_mode::relaxed, _ks, _cf, [this, state] (shard_id shard) { - auto gen = _global_table->calculate_generation_for_new_table(); - return make_sstable(*_global_table, state, gen, _highest_version); - }, eligible_for_reshape_on_boot); + // FIXME: Bypass reshape as it is not tablet aware, so it could mix sstables from different tablets together. + // Refs: https://github.com/scylladb/scylladb/issues/16966. + if (!_global_table->uses_tablets()) { + co_await distributed_loader::reshape(directory, _db, sstables::reshape_mode::relaxed, _ks, _cf, [this, state](shard_id shard) { + auto gen = _global_table->calculate_generation_for_new_table(); + return make_sstable(*_global_table, state, gen, _highest_version); + }, eligible_for_reshape_on_boot); + } co_await directory.invoke_on_all([this, &eligible_for_reshape_on_boot, do_allow_offstrategy_compaction] (sstables::sstable_directory& dir) -> future<> { co_await dir.do_for_each_sstable([this, &eligible_for_reshape_on_boot, do_allow_offstrategy_compaction] (sstables::shared_sstable sst) { From 3b14c5b84a5b356c59c45c70dff47992bfd5ff6a Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Tue, 23 Jan 2024 21:29:40 -0300 Subject: [PATCH 25/25] test/topology_experimental_raft: Add tablet split test Signed-off-by: Raphael S. Carvalho --- .../test_tablets.py | 70 +++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/test/topology_experimental_raft/test_tablets.py b/test/topology_experimental_raft/test_tablets.py index 61e18955c0..7046425de3 100644 --- a/test/topology_experimental_raft/test_tablets.py +++ b/test/topology_experimental_raft/test_tablets.py @@ -551,3 +551,73 @@ async def test_tablet_cleanup(manager: ManagerClient): # Bonus: check that commitlog_cleanups doesn't have any garbage after restart. assert 0 == (await cql.run_async("SELECT COUNT(*) FROM system.commitlog_cleanups", host=hosts[0]))[0].count + +async def get_tablet_count(manager: ManagerClient, server: ServerInfo, keyspace_name: str, table_name: str): + host = manager.cql.cluster.metadata.get_host(server.ip_addr) + + # read_barrier is needed to ensure that local tablet metadata on the queried node + # reflects the finalized tablet movement. + await read_barrier(manager.cql, host) + + table_id = await manager.get_table_id(keyspace_name, table_name) + rows = await manager.cql.run_async(f"SELECT tablet_count FROM system.tablets where " + f"table_id = {table_id}", host=host) + return rows[0].tablet_count + +@pytest.mark.asyncio +@skip_mode('release', 'error injections are not supported in release mode') +async def test_tablet_split(manager: ManagerClient): + logger.info("Bootstrapping cluster") + cmdline = [ + '--logger-log-level', 'storage_service=debug', + '--target-tablet-size-in-bytes', '1024', + ] + servers = [await manager.server_add(cmdline=cmdline)] + + await manager.api.disable_tablet_balancing(servers[0].ip_addr) + + cql = manager.get_cql() + await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};") + await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);") + + # enough to trigger multiple splits with max size of 1024 bytes. + keys = range(256) + await asyncio.gather(*[cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, {k});") for k in keys]) + + async def check(): + logger.info("Checking table") + cql = manager.get_cql() + rows = await cql.run_async("SELECT * FROM test.test;") + assert len(rows) == len(keys) + for r in rows: + assert r.c == r.pk + + await check() + + await manager.api.flush_keyspace(servers[0].ip_addr, "test") + + tablet_count = await get_tablet_count(manager, servers[0], 'test', 'test') + assert tablet_count == 1 + + logger.info("Adding new server") + servers.append(await manager.server_add(cmdline=cmdline)) + + # Increases the chance of tablet migration concurrent with split + await inject_error_one_shot_on(manager, "tablet_allocator_shuffle", servers) + await inject_error_on(manager, "tablet_load_stats_refresh_before_rebalancing", servers) + + s1_log = await manager.server_open_log(servers[0].server_id) + s1_mark = await s1_log.mark() + + # Now there's a split and migration need, so they'll potentially run concurrently. + await manager.api.enable_tablet_balancing(servers[0].ip_addr) + + await check() + time.sleep(5) # Give load balancer some time to do work + + await s1_log.wait_for('Detected tablet split for table', from_mark=s1_mark) + + await check() + + tablet_count = await get_tablet_count(manager, servers[0], 'test', 'test') + assert tablet_count > 1