cdc: generation: remove topology_description_generator

After moving the creation of uuid out of
make_new_generation_description, this function only calls the
topology_description_generator's constructor and its generate
method. We could remove this function, but we instead simplify
the code by removing the topology_description_generator class.
We can do this refactor because make_new_generation_description
is the only place using it. We inline its generate method into
make_new_generation_description and turn its private methods into
static functions.
This commit is contained in:
Patryk Jędrzejczak
2023-09-12 11:18:01 +02:00
parent 3bf4cac72e
commit fab066cffe
3 changed files with 58 additions and 77 deletions

View File

@@ -68,10 +68,10 @@ static constexpr auto stream_id_index_shift = stream_id_version_shift + stream_i
static constexpr auto stream_id_random_shift = stream_id_index_shift + stream_id_index_bits;
/**
* Responsibilty for encoding stream_id moved from factory method to
* this constructor, to keep knowledge of composition in a single place.
* Note this is private and friended to topology_description_generator,
* because he is the one who defined the "order" we view vnodes etc.
* Responsibility for encoding stream_id moved from the create_stream_ids
* function to this constructor, to keep knowledge of composition in a
* single place. Note the make_new_generation_description function
* defines the "order" in which we view vnodes etc.
*/
stream_id::stream_id(dht::token token, size_t vnode_index)
: _value(bytes::initialized_later(), 2 * sizeof(int64_t))
@@ -185,76 +185,6 @@ static std::vector<stream_id> create_stream_ids(
return result;
}
class topology_description_generator final {
const std::unordered_set<dht::token>& _bootstrap_tokens;
const locator::token_metadata_ptr _tmptr;
const noncopyable_function<std::pair<size_t, uint8_t> (dht::token)>& _get_sharding_info;
// Compute a set of tokens that split the token ring into vnodes
auto get_tokens() const {
auto tokens = _tmptr->sorted_tokens();
auto it = tokens.insert(
tokens.end(), _bootstrap_tokens.begin(), _bootstrap_tokens.end());
std::sort(it, tokens.end());
std::inplace_merge(tokens.begin(), it, tokens.end());
tokens.erase(std::unique(tokens.begin(), tokens.end()), tokens.end());
return tokens;
}
token_range_description create_description(size_t index, dht::token start, dht::token end) const {
token_range_description desc;
desc.token_range_end = end;
auto [shard_count, ignore_msb] = _get_sharding_info(end);
desc.streams = create_stream_ids(index, start, end, shard_count, ignore_msb);
desc.sharding_ignore_msb = ignore_msb;
return desc;
}
public:
topology_description_generator(
const std::unordered_set<dht::token>& bootstrap_tokens,
const locator::token_metadata_ptr tmptr,
// This function must return sharding parameters for a node that owns the vnode ending with
// the given token. Returns <shard_count, ignore_msb> pair.
const noncopyable_function<std::pair<size_t, uint8_t> (dht::token)>& get_sharding_info)
: _bootstrap_tokens(bootstrap_tokens)
, _tmptr(std::move(tmptr))
, _get_sharding_info(get_sharding_info)
{}
/*
* Generate a set of CDC stream identifiers such that for each shard
* and vnode pair there exists a stream whose token falls into this vnode
* and is owned by this shard. It is sometimes not possible to generate
* a CDC stream identifier for some (vnode, shard) pair because not all
* shards have to own tokens in a vnode. Small vnode can be totally owned
* by a single shard. In such case, a stream identifier that maps to
* end of the vnode is generated.
*
* Then build a cdc::topology_description which maps tokens to generated
* stream identifiers, such that if token T is owned by shard S in vnode V,
* it gets mapped to the stream identifier generated for (S, V).
*/
// Run in seastar::async context.
topology_description generate() const {
const auto tokens = get_tokens();
std::vector<token_range_description> vnode_descriptions;
vnode_descriptions.reserve(tokens.size());
vnode_descriptions.push_back(
create_description(0, tokens.back(), tokens.front()));
for (size_t idx = 1; idx < tokens.size(); ++idx) {
vnode_descriptions.push_back(
create_description(idx, tokens[idx - 1], tokens[idx]));
}
return {std::move(vnode_descriptions)};
}
};
bool should_propose_first_generation(const gms::inet_address& me, const gms::gossiper& g) {
auto my_host_id = g.get_host_id(me);
return g.for_each_endpoint_state_until([&] (const gms::inet_address& node, const gms::endpoint_state& eps) {
@@ -393,11 +323,47 @@ topology_description limit_number_of_streams_if_needed(topology_description&& de
return topology_description(std::move(entries));
}
// Compute a set of tokens that split the token ring into vnodes.
static auto get_tokens(const std::unordered_set<dht::token>& bootstrap_tokens, const locator::token_metadata_ptr tmptr) {
auto tokens = tmptr->sorted_tokens();
auto it = tokens.insert(tokens.end(), bootstrap_tokens.begin(), bootstrap_tokens.end());
std::sort(it, tokens.end());
std::inplace_merge(tokens.begin(), it, tokens.end());
tokens.erase(std::unique(tokens.begin(), tokens.end()), tokens.end());
return tokens;
}
static token_range_description create_token_range_description(
size_t index,
dht::token start,
dht::token end,
const noncopyable_function<std::pair<size_t, uint8_t> (dht::token)>& get_sharding_info) {
token_range_description desc;
desc.token_range_end = end;
auto [shard_count, ignore_msb] = get_sharding_info(end);
desc.streams = create_stream_ids(index, start, end, shard_count, ignore_msb);
desc.sharding_ignore_msb = ignore_msb;
return desc;
}
cdc::topology_description make_new_generation_description(
const std::unordered_set<dht::token>& bootstrap_tokens,
const noncopyable_function<std::pair<size_t, uint8_t>(dht::token)>& get_sharding_info,
const locator::token_metadata_ptr tmptr) {
return topology_description_generator(bootstrap_tokens, tmptr, get_sharding_info).generate();
const auto tokens = get_tokens(bootstrap_tokens, tmptr);
std::vector<token_range_description> vnode_descriptions;
vnode_descriptions.reserve(tokens.size());
vnode_descriptions.push_back(create_token_range_description(0, tokens.back(), tokens.front(), get_sharding_info));
for (size_t idx = 1; idx < tokens.size(); ++idx) {
vnode_descriptions.push_back(create_token_range_description(idx, tokens[idx - 1], tokens[idx], get_sharding_info));
}
return {std::move(vnode_descriptions)};
}
db_clock::time_point new_generation_timestamp(bool add_delay, std::chrono::milliseconds ring_delay) {

View File

@@ -139,6 +139,21 @@ bool should_propose_first_generation(const gms::inet_address& me, const gms::gos
*/
bool is_cdc_generation_optimal(const cdc::topology_description& gen, const locator::token_metadata& tm);
/*
* Generate a set of CDC stream identifiers such that for each shard
* and vnode pair there exists a stream whose token falls into this vnode
* and is owned by this shard. It is sometimes not possible to generate
* a CDC stream identifier for some (vnode, shard) pair because not all
* shards have to own tokens in a vnode. Small vnode can be totally owned
* by a single shard. In such case, a stream identifier that maps to
* end of the vnode is generated.
*
* Then build a cdc::topology_description which maps tokens to generated
* stream identifiers, such that if token T is owned by shard S in vnode V,
* it gets mapped to the stream identifier generated for (S, V).
*
* Run in seastar::async context.
*/
cdc::topology_description make_new_generation_description(
const std::unordered_set<dht::token>& bootstrap_tokens,
const noncopyable_function<std::pair<size_t, uint8_t> (dht::token)>& get_sharding_info,

View File

@@ -109,7 +109,7 @@ Having different generations operating at different points in time is necessary
#### Gossiper-based topology changes
The joining node learns about the current vnodes, chooses tokens which will split them into smaller vnodes and creates a new `cdc::topology_description` which refines those smaller vnodes. This is done in the `cdc::topology_description_generator` class. It then inserts the generation description into an internal distributed table `cdc_generation_descriptions_v2` in the `system_distributed_everywhere` keyspace. The table is defined as follows:
The joining node learns about the current vnodes, chooses tokens which will split them into smaller vnodes and creates a new `cdc::topology_description` which refines those smaller vnodes. This is done in the `cdc::make_new_generation_description` function It then inserts the generation description into an internal distributed table `cdc_generation_descriptions_v2` in the `system_distributed_everywhere` keyspace. The table is defined as follows:
```
CREATE TABLE system_distributed_everywhere.cdc_generation_descriptions_v2 (
id uuid,
@@ -160,7 +160,7 @@ Thus we give up availability for safety. This likely won't happen if the adminis
#### Raft group 0 based topology changes (WIP)
When a node requests the cluster to join, the topology coordinator chooses tokens for the new node. This splits vnodes in the token ring into smaller vnodes. The coordinator then creates a new `cdc::topology_description` which refines those smaller vnodes. This is node using the `cdc::topology_description_generator` class.
When a node requests the cluster to join, the topology coordinator chooses tokens for the new node. This splits vnodes in the token ring into smaller vnodes. The coordinator then creates a new `cdc::topology_description` which refines those smaller vnodes. This is node using the `cdc::make_new_generation_description` function.
The generation data described by `cdc::topology_description` is then translated into mutations and committed to group 0 using Raft commands. When a node applies these commands (every node in the cluster eventually does that, being a member of group 0), it writes the data into a local table `system.cdc_generations_v3`. The table has the following schema:
```