raft topology: implement check_and_repair_cdc_streams API

The original API is gossiper-based. Since we're moving CDC generations
handling to Raft-based topology, we need to implement this API as well.

For now the API creates a new generation unconditionally, in a follow-up
I'll introduce a check to skip the creation if the current generation is
optimal.
This commit is contained in:
Kamil Braun
2023-04-26 13:43:28 +02:00
parent a09ed01ffa
commit 372a06f735
3 changed files with 76 additions and 6 deletions

View File

@@ -666,11 +666,14 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
req.get_query_param("key")));
});
ss::cdc_streams_check_and_repair.set(r, [&cdc_gs] (std::unique_ptr<http::request> req) {
if (!cdc_gs.local_is_initialized()) {
throw std::runtime_error("get_cdc_generation_service: not initialized yet");
}
return cdc_gs.local().check_and_repair_cdc_streams().then([] {
ss::cdc_streams_check_and_repair.set(r, [&cdc_gs, &ss] (std::unique_ptr<http::request> req) {
return ss.invoke_on(0, [&cdc_gs] (service::storage_service& ss) {
if (!cdc_gs.local_is_initialized()) {
throw std::runtime_error("CDC generation service not initialized yet");
}
return ss.check_and_repair_cdc_streams(cdc_gs.local());
}).then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});

View File

@@ -495,7 +495,9 @@ public:
topology_mutation_builder& set_transition_state(topology::transition_state);
topology_mutation_builder& set_current_cdc_generation_id(const cdc::generation_id_v2&);
topology_mutation_builder& set_new_cdc_generation_data_uuid(const utils::UUID& value);
topology_mutation_builder& set_global_topology_request(global_topology_request);
topology_mutation_builder& del_transition_state();
topology_mutation_builder& del_global_topology_request();
topology_node_mutation_builder& with_node(raft::server_id);
canonical_mutation build() { return canonical_mutation{std::move(_m)}; }
};
@@ -603,6 +605,18 @@ topology_mutation_builder& topology_mutation_builder::set_new_cdc_generation_dat
return *this;
}
topology_mutation_builder& topology_mutation_builder::set_global_topology_request(global_topology_request value) {
_m.set_static_cell("global_topology_request", ::format("{}", value), _ts);
return *this;
}
topology_mutation_builder& topology_mutation_builder::del_global_topology_request() {
auto cdef = _s->get_column_definition("global_topology_request");
assert(cdef);
_m.partition().static_row().apply(*cdef, atomic_cell::make_dead(_ts, gc_clock::now()));
return *this;
}
topology_node_mutation_builder& topology_mutation_builder::with_node(raft::server_id n) {
_node_builder.emplace(*this, n);
return *_node_builder;
@@ -931,7 +945,8 @@ class topology_coordinator {
std::vector<canonical_mutation> updates{gen_mutations.begin(), gen_mutations.end()};
topology_mutation_builder builder(guard.write_timestamp());
builder.set_transition_state(topology::transition_state::commit_cdc_generation)
.set_new_cdc_generation_data_uuid(gen_uuid);
.set_new_cdc_generation_data_uuid(gen_uuid)
.del_global_topology_request();
updates.push_back(builder.build());
auto reason = ::format(
"insert CDC generation data (UUID: {})", gen_uuid);
@@ -3973,6 +3988,16 @@ future<> storage_service::removenode(locator::host_id host_id, std::list<locator
});
}
future<> storage_service::check_and_repair_cdc_streams(cdc::generation_service& cdc_gen_svc) {
assert(this_shard_id() == 0);
if (_raft_topology_change_enabled) {
return raft_check_and_repair_cdc_streams();
}
return cdc_gen_svc.check_and_repair_cdc_streams();
}
class node_ops_meta_data {
node_ops_id _ops_uuid;
gms::inet_address _coordinator;
@@ -4359,6 +4384,44 @@ future<> storage_service::raft_rebuild(sstring source_dc) {
});
}
future<> storage_service::raft_check_and_repair_cdc_streams() {
std::optional<cdc::generation_id_v2> curr_gen;
while (true) {
slogger.info("raft topology: request check_and_repair_cdc_streams, refreshing topology");
auto guard = co_await _group0->client().start_operation(&_abort_source);
auto curr_req = _topology_state_machine._topology.global_request;
if (curr_req && *curr_req != global_topology_request::new_cdc_generation) {
// FIXME: replace this with a queue
throw std::runtime_error{
"check_and_repair_cdc_streams: a different topology request is already pending, try again later"};
}
curr_gen = _topology_state_machine._topology.current_cdc_generation_id;
// FIXME: check if the current generation is optimal, don't request new one if it isn't
topology_mutation_builder builder(guard.write_timestamp());
builder.set_global_topology_request(global_topology_request::new_cdc_generation);
topology_change change{{builder.build()}};
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard,
::format("request check+repair CDC generation from {}", _group0->group0_server().id()));
try {
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_abort_source);
} catch (group0_concurrent_modification&) {
slogger.info("raft topology: request check+repair CDC: concurrent operation is detected, retrying.");
continue;
}
break;
}
// Wait until the current CDC generation changes.
// This might happen due to a different reason than our request but we don't care.
co_await _topology_state_machine.event.when([this, &curr_gen] {
return curr_gen != _topology_state_machine._topology.current_cdc_generation_id;
});
}
future<> storage_service::rebuild(sstring source_dc) {
return run_with_api_lock(sstring("rebuild"), [source_dc] (storage_service& ss) -> future<> {
if (ss._raft_topology_change_enabled) {

View File

@@ -710,6 +710,9 @@ public:
future<std::map<gms::inet_address, float>> effective_ownership(sstring keyspace_name);
// Must run on shard 0.
future<> check_and_repair_cdc_streams(cdc::generation_service&);
private:
promise<> _drain_finished;
std::optional<shared_future<>> _transport_stopped;
@@ -789,6 +792,7 @@ private:
future<> raft_removenode(locator::host_id host_id);
future<> raft_replace(raft::server&, raft::server_id, gms::inet_address);
future<> raft_rebuild(sstring source_dc);
future<> raft_check_and_repair_cdc_streams();
future<> update_topology_with_local_metadata(raft::server&);
// This is called on all nodes for each new command received through raft