|
|
|
|
@@ -443,12 +443,18 @@ future<> storage_service::topology_transition(storage_proxy& proxy, cdc::generat
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Applies the contents of a raft topology snapshot to the local database.
//
// The snapshot carries canonical (schema-independent) mutations. Each one is
// bound to the schema of the table it targets and all of them are then written
// in a single apply() call:
//   - `snp.topology_mutations` are converted using the schema looked up as
//     db::system_keyspace::TOPOLOGY;
//   - `snp.cdc_generation_mutation`, when present, is converted using the
//     schema looked up as db::system_keyspace::CDC_GENERATIONS_V3.
//
// The returned future resolves once the apply() call completes; the write is
// issued with db::no_timeout.
future<> storage_service::merge_topology_snapshot(raft_topology_snapshot snp) {
    std::vector<mutation> muts;
    // One slot per topology mutation plus one for the optional CDC generation
    // mutation, so the vector is sized once up front.
    muts.reserve(snp.topology_mutations.size() + (snp.cdc_generation_mutation ? 1 : 0));
    {
        // Scoped so that this schema pointer cannot be confused with the
        // CDC generations schema used below.
        auto s = _db.local().find_schema(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY);
        boost::transform(snp.topology_mutations, std::back_inserter(muts), [s] (const canonical_mutation& m) {
            return m.to_mutation(s);
        });
    }
    if (snp.cdc_generation_mutation) {
        auto s = _db.local().find_schema(db::system_keyspace::NAME, db::system_keyspace::CDC_GENERATIONS_V3);
        muts.push_back(snp.cdc_generation_mutation->to_mutation(s));
    }
    // Topology and CDC generation mutations are handed to the database together.
    co_await _db.local().apply(freeze(muts), db::no_timeout);
}
|
|
|
|
|
|
|
|
|
|
@@ -4822,17 +4828,57 @@ void storage_service::init_messaging_service(sharded<service::storage_proxy>& pr
|
|
|
|
|
if (!ss._raft_topology_change_enabled) {
|
|
|
|
|
co_return raft_topology_snapshot{};
|
|
|
|
|
}
|
|
|
|
|
// FIXME: make it an rwlock, here we only need to lock for reads,
|
|
|
|
|
// might be useful if multiple nodes are trying to pull concurrently.
|
|
|
|
|
auto read_apply_mutex_holder = co_await ss._group0->client().hold_read_apply_mutex();
|
|
|
|
|
auto rs = co_await db::system_keyspace::query_mutations(proxy, db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY);
|
|
|
|
|
auto s = ss._db.local().find_schema(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY);
|
|
|
|
|
std::vector<canonical_mutation> results;
|
|
|
|
|
results.reserve(rs->partitions().size());
|
|
|
|
|
boost::range::transform(rs->partitions(), std::back_inserter(results), [s] (const partition& p) {
|
|
|
|
|
return canonical_mutation{p.mut().unfreeze(s)};
|
|
|
|
|
});
|
|
|
|
|
co_return raft_topology_snapshot{std::move(results)};
|
|
|
|
|
|
|
|
|
|
std::vector<canonical_mutation> topology_mutations;
|
|
|
|
|
std::optional<cdc::generation_id_v2> curr_cdc_gen_id;
|
|
|
|
|
{
|
|
|
|
|
// FIXME: make it an rwlock, here we only need to lock for reads,
|
|
|
|
|
// might be useful if multiple nodes are trying to pull concurrently.
|
|
|
|
|
auto read_apply_mutex_holder = co_await ss._group0->client().hold_read_apply_mutex();
|
|
|
|
|
auto rs = co_await db::system_keyspace::query_mutations(
|
|
|
|
|
proxy, db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY);
|
|
|
|
|
auto s = ss._db.local().find_schema(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY);
|
|
|
|
|
topology_mutations.reserve(rs->partitions().size());
|
|
|
|
|
boost::range::transform(
|
|
|
|
|
rs->partitions(), std::back_inserter(topology_mutations), [s] (const partition& p) {
|
|
|
|
|
return canonical_mutation{p.mut().unfreeze(s)};
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
curr_cdc_gen_id = ss._topology_state_machine._topology.current_cdc_generation_id;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
std::optional<canonical_mutation> cdc_generation_mutation;
|
|
|
|
|
if (curr_cdc_gen_id) {
|
|
|
|
|
// We don't need to fetch this data under group0 apply mutex, it's immutable.
|
|
|
|
|
// We only need to include the current CDC generation data in the snapshot,
|
|
|
|
|
// because nodes only load whatever `current_cdc_generation_id` points to in topology.
|
|
|
|
|
//
|
|
|
|
|
// FIXME: when we bootstrap nodes in quick succession, the timestamp of the newest CDC generation
|
|
|
|
|
// may be for some time larger than the clocks of our nodes. The last bootstrapped node
|
|
|
|
|
// will only receive the newest CDC generation and not earlier ones, so it will only be able
|
|
|
|
|
// to coordinate writes to CDC-enabled tables after its clock advances to reach the newest
|
|
|
|
|
// generation's timestamp. In other words, it may not be able to coordinate writes for some
|
|
|
|
|
// time after bootstrapping and drivers connecting to it will receive errors.
|
|
|
|
|
// To fix that, we could store in topology a small history of recent CDC generation IDs
|
|
|
|
|
// (garbage-collected with time) instead of just the last one, and load all of them.
|
|
|
|
|
// Alternatively, a node would wait for some time before switching to normal state.
|
|
|
|
|
auto s = ss._db.local().find_schema(db::system_keyspace::NAME, db::system_keyspace::CDC_GENERATIONS_V3);
|
|
|
|
|
auto key = dht::decorate_key(*s, partition_key::from_singular(*s, curr_cdc_gen_id->id));
|
|
|
|
|
auto partition_range = dht::partition_range::make_singular(key);
|
|
|
|
|
auto rs = co_await db::system_keyspace::query_mutations(
|
|
|
|
|
proxy, db::system_keyspace::NAME, db::system_keyspace::CDC_GENERATIONS_V3, partition_range);
|
|
|
|
|
if (rs->partitions().size() != 1) {
|
|
|
|
|
on_internal_error(slogger, format(
|
|
|
|
|
"pull_raft_topology_snapshot: expected a single partition in CDC generation query,"
|
|
|
|
|
", got {} (generation ID: {})", rs->partitions().size(), *curr_cdc_gen_id));
|
|
|
|
|
}
|
|
|
|
|
cdc_generation_mutation.emplace(rs->partitions().begin()->mut().unfreeze(s));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
co_return raft_topology_snapshot{
|
|
|
|
|
.topology_mutations = std::move(topology_mutations),
|
|
|
|
|
.cdc_generation_mutation = std::move(cdc_generation_mutation),
|
|
|
|
|
};
|
|
|
|
|
});
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|