raft topology: publish new CDC generation to the user description tables

Once a new CDC generation is committed to the cluster by the topology
coordinator, we also need to publish it to the user-facing description
tables so CDC applications know which streams to read from.

This uses regular distributed table writes underneath (tables living
in the `system_distributed` keyspace) so it requires `token_metadata`
to be nonempty. We need a hack for the case of bootstrapping the
first node in the cluster - turning the tokens into normal tokens
earlier in the procedure in `token_metadata`, but this is fine for the
single-node case since no streaming is happening.
This commit is contained in:
Kamil Braun
2023-03-29 18:06:43 +02:00
parent 58baf998c1
commit 5f2b297f99
3 changed files with 66 additions and 2 deletions

View File

@@ -67,6 +67,7 @@
#include "service/topology_state_machine.hh"
#include "sstables/open_info.hh"
#include "sstables/generation_type.hh"
#include "cdc/generation.hh"
using days = std::chrono::duration<int, std::ratio<24 * 3600>>;
@@ -3677,6 +3678,42 @@ future<service::topology> system_keyspace::load_topology_state() {
co_return ret;
}
future<cdc::topology_description>
system_keyspace::read_cdc_generation(utils::UUID id) {
std::vector<cdc::token_range_description> entries;
auto num_ranges = 0;
co_await _qp.local().query_internal(
format("SELECT range_end, streams, ignore_msb, num_ranges FROM {}.{} WHERE id = ?",
NAME, CDC_GENERATIONS_V3),
db::consistency_level::ONE,
{ id },
1000, // for ~1KB rows, ~1MB page size
[&] (const cql3::untyped_result_set_row& row) {
std::vector<cdc::stream_id> streams;
row.get_list_data<bytes>("streams", std::back_inserter(streams));
entries.push_back(cdc::token_range_description{
dht::token::from_int64(row.get_as<int64_t>("range_end")),
std::move(streams),
uint8_t(row.get_as<int8_t>("ignore_msb"))});
num_ranges = row.get_as<int32_t>("num_ranges");
return make_ready_future<stop_iteration>(stop_iteration::no);
});
if (entries.empty()) {
// The data must be present by precondition.
on_internal_error(slogger, format(
"read_cdc_generation: data for CDC generation {} not present", id));
}
if (entries.size() != num_ranges) {
throw std::runtime_error(format(
"read_cdc_generation: wrong number of rows. The `num_ranges` column claimed {} rows,"
" but reading the partition returned {}.", num_ranges, entries.size()));
}
co_return cdc::topology_description{std::move(entries)};
}
future<> system_keyspace::sstables_registry_create_entry(sstring location, utils::UUID uuid, sstring status, sstables::entry_descriptor desc) {
static const auto req = format("INSERT INTO system.{} (location, generation, uuid, status, version, format) VALUES (?, ?, ?, ?, ?, ?)", SSTABLES_REGISTRY);
slogger.trace("Inserting {}.{}:{} into {}", location, desc.generation.value(), uuid, SSTABLES_REGISTRY);

View File

@@ -68,6 +68,10 @@ namespace gms {
class gossiper;
}
namespace cdc {
class topology_description;
}
bool is_system_keyspace(std::string_view ks_name);
namespace db {
@@ -445,6 +449,10 @@ public:
static future<service::topology> load_topology_state();
// Read CDC generation data with the given UUID as key.
// Precondition: the data is known to be present in the table (because it was committed earlier through group 0).
future<cdc::topology_description> read_cdc_generation(utils::UUID id);
// The mutation appends the given state ID to the group 0 history table, with the given description if non-empty.
//
// If `gc_older_than` is provided, the mutation will also contain a tombstone that clears all entries whose

View File

@@ -371,8 +371,16 @@ future<> storage_service::topology_state_load(cdc::generation_service& cdc_gen_s
co_await _sys_ks.local().update_peer_info(ip, "host_id", id.uuid());
}
tmptr->update_topology(ip, locator::endpoint_dc_rack{rs.datacenter, rs.rack});
tmptr->add_bootstrap_tokens(rs.ring.value().tokens, ip);
co_await update_pending_ranges(tmptr, format("bootstrapping node {}/{}", id, ip));
if (_topology_state_machine._topology.normal_nodes.empty()) {
// This is the first node in the cluster. Insert the tokens as normal to the token ring early
// so we can perform writes to regular 'distributed' tables during the bootstrap procedure
// (such as the CDC generation write).
// It doesn't break anything to set the tokens to normal early in this single-node case.
co_await tmptr->update_normal_tokens(rs.ring.value().tokens, ip);
} else {
tmptr->add_bootstrap_tokens(rs.ring.value().tokens, ip);
co_await update_pending_ranges(tmptr, format("bootstrapping node {}/{}", id, ip));
}
break;
case node_state::decommissioning:
case node_state::removing:
@@ -730,6 +738,17 @@ future<> storage_service::topology_change_coordinator_fiber(raft::server& raft,
if (!res) {
break;
}
// If a node is bootstrapping, we just committed a new CDC generation in the commit_cdc_generation step.
// Publish it to the user-facing distributed CDC description tables.
if (node.rs->state == node_state::bootstrapping) {
auto curr_gen_id = node.topology->current_cdc_generation_id.value();
auto gen_data = co_await _sys_ks.local().read_cdc_generation(curr_gen_id.id);
co_await sys_dist_ks.local().create_cdc_desc(
curr_gen_id.ts, gen_data, { get_token_metadata().count_normal_token_owners() });
}
raft_topology_cmd cmd{raft_topology_cmd::command::stream_ranges};
if (node.rs->state == node_state::removing) {
// tell all nodes to stream data of the removed node to new range owners