From fab066cffedb366de0dfe6b1ec05c4c398dba89d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Patryk=20J=C4=99drzejczak?= Date: Tue, 12 Sep 2023 11:18:01 +0200 Subject: [PATCH] cdc: generation: remove topology_description_generator After moving the creation of uuid out of make_new_generation_description, this function only calls the topology_description_generator's constructor and its generate method. We could remove this function, but we instead simplify the code by removing the topology_description_generator class. We can do this refactor because make_new_generation_description is the only place using it. We inline its generate method into make_new_generation_description and turn its private methods into static functions. --- cdc/generation.cc | 116 ++++++++++++++++------------------------------ cdc/generation.hh | 15 ++++++ docs/dev/cdc.md | 4 +- 3 files changed, 58 insertions(+), 77 deletions(-) diff --git a/cdc/generation.cc b/cdc/generation.cc index 1c26cc6872..1a445bb0b3 100644 --- a/cdc/generation.cc +++ b/cdc/generation.cc @@ -68,10 +68,10 @@ static constexpr auto stream_id_index_shift = stream_id_version_shift + stream_i static constexpr auto stream_id_random_shift = stream_id_index_shift + stream_id_index_bits; /** - * Responsibilty for encoding stream_id moved from factory method to - * this constructor, to keep knowledge of composition in a single place. - * Note this is private and friended to topology_description_generator, - * because he is the one who defined the "order" we view vnodes etc. + * Responsibility for encoding stream_id moved from the create_stream_ids + * function to this constructor, to keep knowledge of composition in a + * single place. Note the make_new_generation_description function + * defines the "order" in which we view vnodes etc. */ stream_id::stream_id(dht::token token, size_t vnode_index) : _value(bytes::initialized_later(), 2 * sizeof(int64_t)) @@ -185,76 +185,6 @@ static std::vector create_stream_ids( return result; } -class topology_description_generator final { - const std::unordered_set& _bootstrap_tokens; - const locator::token_metadata_ptr _tmptr; - const noncopyable_function (dht::token)>& _get_sharding_info; - - // Compute a set of tokens that split the token ring into vnodes - auto get_tokens() const { - auto tokens = _tmptr->sorted_tokens(); - auto it = tokens.insert( - tokens.end(), _bootstrap_tokens.begin(), _bootstrap_tokens.end()); - std::sort(it, tokens.end()); - std::inplace_merge(tokens.begin(), it, tokens.end()); - tokens.erase(std::unique(tokens.begin(), tokens.end()), tokens.end()); - return tokens; - } - - token_range_description create_description(size_t index, dht::token start, dht::token end) const { - token_range_description desc; - - desc.token_range_end = end; - - auto [shard_count, ignore_msb] = _get_sharding_info(end); - desc.streams = create_stream_ids(index, start, end, shard_count, ignore_msb); - desc.sharding_ignore_msb = ignore_msb; - - return desc; - } -public: - topology_description_generator( - const std::unordered_set& bootstrap_tokens, - const locator::token_metadata_ptr tmptr, - // This function must return sharding parameters for a node that owns the vnode ending with - // the given token. Returns pair. - const noncopyable_function (dht::token)>& get_sharding_info) - : _bootstrap_tokens(bootstrap_tokens) - , _tmptr(std::move(tmptr)) - , _get_sharding_info(get_sharding_info) - {} - - /* - * Generate a set of CDC stream identifiers such that for each shard - * and vnode pair there exists a stream whose token falls into this vnode - * and is owned by this shard. It is sometimes not possible to generate - * a CDC stream identifier for some (vnode, shard) pair because not all - * shards have to own tokens in a vnode. Small vnode can be totally owned - * by a single shard. In such case, a stream identifier that maps to - * end of the vnode is generated. - * - * Then build a cdc::topology_description which maps tokens to generated - * stream identifiers, such that if token T is owned by shard S in vnode V, - * it gets mapped to the stream identifier generated for (S, V). - */ - // Run in seastar::async context. - topology_description generate() const { - const auto tokens = get_tokens(); - - std::vector vnode_descriptions; - vnode_descriptions.reserve(tokens.size()); - - vnode_descriptions.push_back( - create_description(0, tokens.back(), tokens.front())); - for (size_t idx = 1; idx < tokens.size(); ++idx) { - vnode_descriptions.push_back( - create_description(idx, tokens[idx - 1], tokens[idx])); - } - - return {std::move(vnode_descriptions)}; - } -}; - bool should_propose_first_generation(const gms::inet_address& me, const gms::gossiper& g) { auto my_host_id = g.get_host_id(me); return g.for_each_endpoint_state_until([&] (const gms::inet_address& node, const gms::endpoint_state& eps) { @@ -393,11 +323,47 @@ topology_description limit_number_of_streams_if_needed(topology_description&& de return topology_description(std::move(entries)); } +// Compute a set of tokens that split the token ring into vnodes. +static auto get_tokens(const std::unordered_set& bootstrap_tokens, const locator::token_metadata_ptr tmptr) { + auto tokens = tmptr->sorted_tokens(); + auto it = tokens.insert(tokens.end(), bootstrap_tokens.begin(), bootstrap_tokens.end()); + std::sort(it, tokens.end()); + std::inplace_merge(tokens.begin(), it, tokens.end()); + tokens.erase(std::unique(tokens.begin(), tokens.end()), tokens.end()); + return tokens; +} + +static token_range_description create_token_range_description( + size_t index, + dht::token start, + dht::token end, + const noncopyable_function (dht::token)>& get_sharding_info) { + token_range_description desc; + + desc.token_range_end = end; + + auto [shard_count, ignore_msb] = get_sharding_info(end); + desc.streams = create_stream_ids(index, start, end, shard_count, ignore_msb); + desc.sharding_ignore_msb = ignore_msb; + + return desc; +} + cdc::topology_description make_new_generation_description( const std::unordered_set& bootstrap_tokens, const noncopyable_function(dht::token)>& get_sharding_info, const locator::token_metadata_ptr tmptr) { - return topology_description_generator(bootstrap_tokens, tmptr, get_sharding_info).generate(); + const auto tokens = get_tokens(bootstrap_tokens, tmptr); + + std::vector vnode_descriptions; + vnode_descriptions.reserve(tokens.size()); + + vnode_descriptions.push_back(create_token_range_description(0, tokens.back(), tokens.front(), get_sharding_info)); + for (size_t idx = 1; idx < tokens.size(); ++idx) { + vnode_descriptions.push_back(create_token_range_description(idx, tokens[idx - 1], tokens[idx], get_sharding_info)); + } + + return {std::move(vnode_descriptions)}; } db_clock::time_point new_generation_timestamp(bool add_delay, std::chrono::milliseconds ring_delay) { diff --git a/cdc/generation.hh b/cdc/generation.hh index 5b2ef5379c..fa89b33372 100644 --- a/cdc/generation.hh +++ b/cdc/generation.hh @@ -139,6 +139,21 @@ bool should_propose_first_generation(const gms::inet_address& me, const gms::gos */ bool is_cdc_generation_optimal(const cdc::topology_description& gen, const locator::token_metadata& tm); +/* + * Generate a set of CDC stream identifiers such that for each shard + * and vnode pair there exists a stream whose token falls into this vnode + * and is owned by this shard. It is sometimes not possible to generate + * a CDC stream identifier for some (vnode, shard) pair because not all + * shards have to own tokens in a vnode. Small vnode can be totally owned + * by a single shard. In such case, a stream identifier that maps to + * end of the vnode is generated. + * + * Then build a cdc::topology_description which maps tokens to generated + * stream identifiers, such that if token T is owned by shard S in vnode V, + * it gets mapped to the stream identifier generated for (S, V). + * + * Run in seastar::async context. + */ cdc::topology_description make_new_generation_description( const std::unordered_set& bootstrap_tokens, const noncopyable_function (dht::token)>& get_sharding_info, diff --git a/docs/dev/cdc.md b/docs/dev/cdc.md index 3304efcabd..48ed314b37 100644 --- a/docs/dev/cdc.md +++ b/docs/dev/cdc.md @@ -109,7 +109,7 @@ Having different generations operating at different points in time is necessary #### Gossiper-based topology changes -The joining node learns about the current vnodes, chooses tokens which will split them into smaller vnodes and creates a new `cdc::topology_description` which refines those smaller vnodes. This is done in the `cdc::topology_description_generator` class. It then inserts the generation description into an internal distributed table `cdc_generation_descriptions_v2` in the `system_distributed_everywhere` keyspace. The table is defined as follows: +The joining node learns about the current vnodes, chooses tokens which will split them into smaller vnodes and creates a new `cdc::topology_description` which refines those smaller vnodes. This is done in the `cdc::make_new_generation_description` function It then inserts the generation description into an internal distributed table `cdc_generation_descriptions_v2` in the `system_distributed_everywhere` keyspace. The table is defined as follows: ``` CREATE TABLE system_distributed_everywhere.cdc_generation_descriptions_v2 ( id uuid, @@ -160,7 +160,7 @@ Thus we give up availability for safety. This likely won't happen if the adminis #### Raft group 0 based topology changes (WIP) -When a node requests the cluster to join, the topology coordinator chooses tokens for the new node. This splits vnodes in the token ring into smaller vnodes. The coordinator then creates a new `cdc::topology_description` which refines those smaller vnodes. This is node using the `cdc::topology_description_generator` class. +When a node requests the cluster to join, the topology coordinator chooses tokens for the new node. This splits vnodes in the token ring into smaller vnodes. The coordinator then creates a new `cdc::topology_description` which refines those smaller vnodes. This is node using the `cdc::make_new_generation_description` function. The generation data described by `cdc::topology_description` is then translated into mutations and committed to group 0 using Raft commands. When a node applies these commands (every node in the cluster eventually does that, being a member of group 0), it writes the data into a local table `system.cdc_generations_v3`. The table has the following schema: ```