diff --git a/idl/storage_service.idl.hh b/idl/storage_service.idl.hh index c2cb480350..ecfd53ec48 100644 --- a/idl/storage_service.idl.hh +++ b/idl/storage_service.idl.hh @@ -25,7 +25,8 @@ namespace service { }; struct raft_topology_snapshot { - std::vector mutations; + std::vector topology_mutations; + std::optional cdc_generation_mutation; }; struct raft_topology_pull_params {}; diff --git a/service/raft/group0_state_machine.cc b/service/raft/group0_state_machine.cc index cb7ea03a28..22024ee4a1 100644 --- a/service/raft/group0_state_machine.cc +++ b/service/raft/group0_state_machine.cc @@ -156,7 +156,7 @@ future<> group0_state_machine::transfer_snapshot(gms::inet_address from, raft::s co_await _mm.merge_schema_from(addr, std::move(*cm)); - if (!topology_snp.mutations.empty()) { + if (!topology_snp.topology_mutations.empty()) { co_await _ss.merge_topology_snapshot(std::move(topology_snp)); } diff --git a/service/storage_service.cc b/service/storage_service.cc index 975cd72972..5749284cf6 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -443,12 +443,18 @@ future<> storage_service::topology_transition(storage_proxy& proxy, cdc::generat } future<> storage_service::merge_topology_snapshot(raft_topology_snapshot snp) { - auto s = _db.local().find_schema(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY); std::vector muts; - muts.reserve(snp.mutations.size()); - boost::transform(snp.mutations, std::back_inserter(muts), [s] (const canonical_mutation& m) { - return m.to_mutation(s); - }); + muts.reserve(snp.topology_mutations.size() + (snp.cdc_generation_mutation ? 1 : 0)); + { + auto s = _db.local().find_schema(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY); + boost::transform(snp.topology_mutations, std::back_inserter(muts), [s] (const canonical_mutation& m) { + return m.to_mutation(s); + }); + } + if (snp.cdc_generation_mutation) { + auto s = _db.local().find_schema(db::system_keyspace::NAME, db::system_keyspace::CDC_GENERATIONS_V3); + muts.push_back(snp.cdc_generation_mutation->to_mutation(s)); + } co_await _db.local().apply(freeze(muts), db::no_timeout); } @@ -4822,17 +4828,57 @@ void storage_service::init_messaging_service(sharded& pr if (!ss._raft_topology_change_enabled) { co_return raft_topology_snapshot{}; } - // FIXME: make it an rwlock, here we only need to lock for reads, - // might be useful if multiple nodes are trying to pull concurrently. - auto read_apply_mutex_holder = co_await ss._group0->client().hold_read_apply_mutex(); - auto rs = co_await db::system_keyspace::query_mutations(proxy, db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY); - auto s = ss._db.local().find_schema(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY); - std::vector results; - results.reserve(rs->partitions().size()); - boost::range::transform(rs->partitions(), std::back_inserter(results), [s] (const partition& p) { - return canonical_mutation{p.mut().unfreeze(s)}; - }); - co_return raft_topology_snapshot{std::move(results)}; + + std::vector topology_mutations; + std::optional curr_cdc_gen_id; + { + // FIXME: make it an rwlock, here we only need to lock for reads, + // might be useful if multiple nodes are trying to pull concurrently. + auto read_apply_mutex_holder = co_await ss._group0->client().hold_read_apply_mutex(); + auto rs = co_await db::system_keyspace::query_mutations( + proxy, db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY); + auto s = ss._db.local().find_schema(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY); + topology_mutations.reserve(rs->partitions().size()); + boost::range::transform( + rs->partitions(), std::back_inserter(topology_mutations), [s] (const partition& p) { + return canonical_mutation{p.mut().unfreeze(s)}; + }); + + curr_cdc_gen_id = ss._topology_state_machine._topology.current_cdc_generation_id; + } + + std::optional cdc_generation_mutation; + if (curr_cdc_gen_id) { + // We don't need to fetch this data under group0 apply mutex, it's immutable. + // We only need to include the current CDC generation data in the snapshot, + // because nodes only load whatever `current_cdc_generation_id` points to in topology. + // + // FIXME: when we bootstrap nodes in quick succession, the timestamp of the newest CDC generation + // may be for some time larger than the clocks of our nodes. The last bootstrapped node + // will only receive the newest CDC generation and not earlier ones, so it will only be able + // to coordinate writes to CDC-enabled tables after its clock advances to reach the newest + // generation's timestamp. In other words, it may not be able to coordinate writes for some + // time after bootstrapping and drivers connecting to it will receive errors. + // To fix that, we could store in topology a small history of recent CDC generation IDs + // (garbage-collected with time) instead of just the last one, and load all of them. + // Alternatively, a node would wait for some time before switching to normal state. + auto s = ss._db.local().find_schema(db::system_keyspace::NAME, db::system_keyspace::CDC_GENERATIONS_V3); + auto key = dht::decorate_key(*s, partition_key::from_singular(*s, curr_cdc_gen_id->id)); + auto partition_range = dht::partition_range::make_singular(key); + auto rs = co_await db::system_keyspace::query_mutations( + proxy, db::system_keyspace::NAME, db::system_keyspace::CDC_GENERATIONS_V3, partition_range); + if (rs->partitions().size() != 1) { + on_internal_error(slogger, format( + "pull_raft_topology_snapshot: expected a single partition in CDC generation query," + ", got {} (generation ID: {})", rs->partitions().size(), *curr_cdc_gen_id)); + } + cdc_generation_mutation.emplace(rs->partitions().begin()->mut().unfreeze(s)); + } + + co_return raft_topology_snapshot{ + .topology_mutations = std::move(topology_mutations), + .cdc_generation_mutation = std::move(cdc_generation_mutation), + }; }); }); } diff --git a/service/topology_state_machine.hh b/service/topology_state_machine.hh index 5b894ae16c..9bf0d25072 100644 --- a/service/topology_state_machine.hh +++ b/service/topology_state_machine.hh @@ -101,7 +101,11 @@ struct topology { }; struct raft_topology_snapshot { - std::vector mutations; + // Mutations for the system.topology table. + std::vector topology_mutations; + + // Mutation for system.cdc_generations_v3, contains the current CDC generation data. + std::optional cdc_generation_mutation; }; struct raft_topology_pull_params {