service/raft: raft_group0: drop dependency on cdc::generation_service

raft_group0 does not really depend on cdc::generation_service; it needs
it only transiently, so pass it to the appropriate methods of raft_group0
instead of passing it in at construction time.
This commit is contained in:
Gleb Natapov
2023-04-27 11:51:38 +03:00
parent 9849409c2a
commit e9fb885e82
5 changed files with 39 additions and 38 deletions

View File

@@ -1480,7 +1480,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
service::raft_group0 group0_service{
stop_signal.as_local_abort_source(), raft_gr.local(), messaging,
gossiper.local(), feature_service.local(), sys_ks.local(), group0_client, cdc_generation_service.local()};
gossiper.local(), feature_service.local(), sys_ks.local(), group0_client};
group0_service.start().get();
auto stop_group0_service = defer_verbose_shutdown("group 0 service", [&group0_service] {
group0_service.abort().get();

View File

@@ -131,9 +131,8 @@ raft_group0::raft_group0(seastar::abort_source& abort_source,
gms::gossiper& gs,
gms::feature_service& feat,
db::system_keyspace& sys_ks,
raft_group0_client& client,
cdc::generation_service& cdc_gen_svc)
: _abort_source(abort_source), _raft_gr(raft_gr), _ms(ms), _gossiper(gs), _feat(feat), _sys_ks(sys_ks), _client(client), _cdc_gen_svc(cdc_gen_svc)
raft_group0_client& client)
: _abort_source(abort_source), _raft_gr(raft_gr), _ms(ms), _gossiper(gs), _feat(feat), _sys_ks(sys_ks), _client(client)
, _status_for_monitoring(_raft_gr.is_enabled() ? status_for_monitoring::normal : status_for_monitoring::disabled)
{
register_metrics();
@@ -193,8 +192,8 @@ const raft::server_id& raft_group0::load_my_id() {
}
raft_server_for_group raft_group0::create_server_for_group0(raft::group_id gid, raft::server_id my_id, service::storage_service& ss, cql3::query_processor& qp,
service::migration_manager& mm) {
auto state_machine = std::make_unique<group0_state_machine>(_client, mm, qp.proxy(), ss, _cdc_gen_svc);
service::migration_manager& mm, cdc::generation_service& cdc_gen_svc) {
auto state_machine = std::make_unique<group0_state_machine>(_client, mm, qp.proxy(), ss, cdc_gen_svc);
auto rpc = std::make_unique<group0_rpc>(_raft_gr.direct_fd(), *state_machine, _ms.local(), _raft_gr.address_map(), gid, my_id);
// Keep a reference to a specific RPC class.
auto& rpc_ref = *rpc;
@@ -369,7 +368,8 @@ future<> raft_group0::abort() {
co_await _shutdown_gate.close();
}
future<> raft_group0::start_server_for_group0(raft::group_id group0_id, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm) {
future<> raft_group0::start_server_for_group0(raft::group_id group0_id, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm,
cdc::generation_service& cdc_gen_service) {
assert(group0_id != raft::group_id{});
// The address map may miss our own id in case we connect
// to an existing Raft Group 0 leader.
@@ -381,18 +381,19 @@ future<> raft_group0::start_server_for_group0(raft::group_id group0_id, service:
// we ensure we haven't missed any IP update in the map.
load_initial_raft_address_map();
group0_log.info("Server {} is starting group 0 with id {}", my_id, group0_id);
co_await _raft_gr.start_server_for_group(create_server_for_group0(group0_id, my_id, ss, qp, mm));
co_await _raft_gr.start_server_for_group(create_server_for_group0(group0_id, my_id, ss, qp, mm, cdc_gen_service));
_group0.emplace<raft::group_id>(group0_id);
}
future<> raft_group0::join_group0(std::vector<gms::inet_address> seeds, bool as_voter, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm) {
future<> raft_group0::join_group0(std::vector<gms::inet_address> seeds, bool as_voter, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm,
cdc::generation_service& cdc_gen_service) {
assert(this_shard_id() == 0);
assert(!joined_group0());
auto group0_id = raft::group_id{co_await db::system_keyspace::get_raft_group0_id()};
if (group0_id) {
// Group 0 ID present means we've already joined group 0 before.
co_return co_await start_server_for_group0(group0_id, ss, qp, mm);
co_return co_await start_server_for_group0(group0_id, ss, qp, mm, cdc_gen_service);
}
raft::server* server = nullptr;
@@ -426,7 +427,7 @@ future<> raft_group0::join_group0(std::vector<gms::inet_address> seeds, bool as_
}
// Bootstrap the initial configuration
co_await raft_sys_table_storage(qp, group0_id, my_id).bootstrap(std::move(initial_configuration));
co_await start_server_for_group0(group0_id, ss, qp, mm);
co_await start_server_for_group0(group0_id, ss, qp, mm, cdc_gen_service);
server = &_raft_gr.group0();
// FIXME if we crash now or after getting added to the config but before storing group 0 ID,
// we'll end with a bootstrapped server that possibly added some entries, but we won't remember that we have such a server
@@ -508,7 +509,7 @@ static future<bool> synchronize_schema(
future<> raft_group0::setup_group0(
db::system_keyspace& sys_ks, const std::unordered_set<gms::inet_address>& initial_contact_nodes,
std::optional<replace_info> replace_info, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm) {
std::optional<replace_info> replace_info, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, cdc::generation_service& cdc_gen_servic) {
assert(this_shard_id() == 0);
if (!_raft_gr.is_enabled()) {
@@ -529,7 +530,7 @@ future<> raft_group0::setup_group0(
if (group0_id) {
// Group 0 ID is present => we've already joined group 0 earlier.
group0_log.info("setup_group0: group 0 ID present. Starting existing Raft server.");
co_await start_server_for_group0(group0_id, ss, qp, mm);
co_await start_server_for_group0(group0_id, ss, qp, mm, cdc_gen_servic);
} else {
// Scylla has bootstrapped earlier but group 0 ID not present. This means we're upgrading.
// Upgrade will start through a feature listener created after we enter NORMAL state.
@@ -552,7 +553,7 @@ future<> raft_group0::setup_group0(
}
group0_log.info("setup_group0: joining group 0...");
co_await join_group0(std::move(seeds), false /* non-voter */, ss, qp, mm);
co_await join_group0(std::move(seeds), false /* non-voter */, ss, qp, mm, cdc_gen_servic);
group0_log.info("setup_group0: successfully joined group 0.");
if (replace_info) {
@@ -628,7 +629,7 @@ void raft_group0::load_initial_raft_address_map() {
}
}
future<> raft_group0::finish_setup_after_join(service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm) {
future<> raft_group0::finish_setup_after_join(service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, cdc::generation_service& cdc_gen_service) {
if (joined_group0()) {
group0_log.info("finish_setup_after_join: group 0 ID present, loading server info.");
auto my_id = load_my_id();
@@ -663,10 +664,10 @@ future<> raft_group0::finish_setup_after_join(service::storage_service& ss, cql3
}
// The listener may fire immediately, create a thread for that case.
co_await seastar::async([this, &ss, &qp, &mm] {
_raft_support_listener = _feat.supports_raft_cluster_mgmt.when_enabled([this, &ss, &qp, &mm] {
co_await seastar::async([this, &ss, &qp, &mm, &cdc_gen_service] {
_raft_support_listener = _feat.supports_raft_cluster_mgmt.when_enabled([this, &ss, &qp, &mm, &cdc_gen_service] {
group0_log.info("finish_setup_after_join: SUPPORTS_RAFT feature enabled. Starting internal upgrade-to-raft procedure.");
upgrade_to_group0(ss, qp, mm).get();
upgrade_to_group0(ss, qp, mm, cdc_gen_service).get();
});
});
}
@@ -1424,7 +1425,7 @@ static auto warn_if_upgrade_takes_too_long() {
});
}
future<> raft_group0::upgrade_to_group0(service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm) {
future<> raft_group0::upgrade_to_group0(service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, cdc::generation_service& cdc_gen_service) {
assert(this_shard_id() == 0);
// The SUPPORTS_RAFT cluster feature is enabled, so the local RAFT feature must also be enabled
@@ -1450,10 +1451,10 @@ future<> raft_group0::upgrade_to_group0(service::storage_service& ss, cql3::quer
}
(void)[] (raft_group0& self, abort_source& as, group0_upgrade_state start_state, gate::holder pause_shutdown, service::storage_service& ss, cql3::query_processor& qp,
service::migration_manager& mm) -> future<> {
service::migration_manager& mm, cdc::generation_service& cdc_gen_service) -> future<> {
auto warner = warn_if_upgrade_takes_too_long();
try {
co_await self.do_upgrade_to_group0(start_state, ss, qp, mm);
co_await self.do_upgrade_to_group0(start_state, ss, qp, mm, cdc_gen_service);
co_await self._client.set_group0_upgrade_state(group0_upgrade_state::use_post_raft_procedures);
upgrade_log.info("Raft upgrade finished.");
} catch (...) {
@@ -1462,11 +1463,12 @@ future<> raft_group0::upgrade_to_group0(service::storage_service& ss, cql3::quer
" If the procedure gets stuck, manual recovery may be required."
" Consult the relevant documentation: {}", std::current_exception(), raft_upgrade_doc);
}
}(std::ref(*this), std::ref(_abort_source), start_state, _shutdown_gate.hold(), ss, qp, mm);
}(std::ref(*this), std::ref(_abort_source), start_state, _shutdown_gate.hold(), ss, qp, mm, cdc_gen_service);
}
// `start_state` is either `use_pre_raft_procedures` or `synchronize`.
future<> raft_group0::do_upgrade_to_group0(group0_upgrade_state start_state, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm) {
future<> raft_group0::do_upgrade_to_group0(group0_upgrade_state start_state, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm,
cdc::generation_service& cdc_gen_service) {
assert(this_shard_id() == 0);
// Check if every peer knows about the upgrade procedure.
@@ -1493,7 +1495,7 @@ future<> raft_group0::do_upgrade_to_group0(group0_upgrade_state start_state, ser
if (!joined_group0()) {
upgrade_log.info("Joining group 0...");
co_await join_group0(co_await _sys_ks.load_peers(), true, ss, qp, mm);
co_await join_group0(co_await _sys_ks.load_peers(), true, ss, qp, mm, cdc_gen_service);
} else {
upgrade_log.info(
"We're already a member of group 0."

View File

@@ -75,7 +75,6 @@ class raft_group0 {
gms::feature_service& _feat;
db::system_keyspace& _sys_ks;
raft_group0_client& _client;
cdc::generation_service& _cdc_gen_svc;
// Status of leader discovery. Initially there is no group 0,
// and the variant contains no state. During initial cluster
@@ -111,8 +110,7 @@ public:
gms::gossiper& gs,
gms::feature_service& feat,
db::system_keyspace& sys_ks,
raft_group0_client& client,
cdc::generation_service& cdc_gen_svc);
raft_group0_client& client);
// Initialises RPC verbs on all shards.
// Call after construction but before using the object.
@@ -140,7 +138,7 @@ public:
//
// Also make sure to call `finish_setup_after_join` after the node has joined the cluster and entered NORMAL state.
future<> setup_group0(db::system_keyspace&, const std::unordered_set<gms::inet_address>& initial_contact_nodes,
std::optional<replace_info>, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm);
std::optional<replace_info>, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, cdc::generation_service& cdc_gen_service);
// Call at the end of the startup procedure, after the node entered NORMAL state.
// `setup_group0()` must have finished earlier.
@@ -149,7 +147,7 @@ public:
//
// If the node has just upgraded, enables a feature listener for the RAFT feature
// which will start a procedure to create group 0 and switch administrative operations to use it.
future<> finish_setup_after_join(service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm);
future<> finish_setup_after_join(service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, cdc::generation_service& cdc_gen_service);
// If Raft is disabled or in RECOVERY mode, returns `false`.
// Otherwise:
@@ -237,7 +235,7 @@ private:
future<group0_peer_exchange> peer_exchange(discovery::peer_list peers);
raft_server_for_group create_server_for_group0(raft::group_id id, raft::server_id my_id, service::storage_service& ss, cql3::query_processor& qp,
service::migration_manager& mm);
service::migration_manager& mm, cdc::generation_service& cdc_gen_service);
// Run the discovery algorithm.
//
@@ -257,10 +255,10 @@ private:
// from places which must not block.
//
// Precondition: the SUPPORTS_RAFT cluster feature is enabled.
future<> upgrade_to_group0(service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm);
future<> upgrade_to_group0(service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, cdc::generation_service& cdc_gen_service);
// Blocking part of `upgrade_to_group0`, runs in background.
future<> do_upgrade_to_group0(group0_upgrade_state start_state, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm);
future<> do_upgrade_to_group0(group0_upgrade_state start_state, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, cdc::generation_service& cdc_gen_service);
// Start a Raft server for the cluster-wide group 0 and join it to the group.
// Called during bootstrap or upgrade.
@@ -285,7 +283,7 @@ private:
// Preconditions: Raft local feature enabled
// and we haven't initialized group 0 yet after last Scylla start (`joined_group0()` is false).
// Postcondition: `joined_group0()` is true.
future<> join_group0(std::vector<gms::inet_address> seeds, bool as_voter, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm);
future<> join_group0(std::vector<gms::inet_address> seeds, bool as_voter, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, cdc::generation_service& cdc_gen_service);
// Start an existing Raft server for the cluster-wide group 0.
// Assumes the server was already added to the group earlier so we don't attempt to join it again.
@@ -298,7 +296,8 @@ private:
// XXX: perhaps it would be good to make this function callable multiple times,
// if we want to handle crashes of the group 0 server without crashing the entire Scylla process
// (we could then try restarting the server internally).
future<> start_server_for_group0(raft::group_id group0_id, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm);
future<> start_server_for_group0(raft::group_id group0_id, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm,
cdc::generation_service& cdc_gen_service);
// Make the given server a non-voter in Raft group 0 configuration.
// Retries on raft::commit_status_unknown.

View File

@@ -1414,7 +1414,7 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi
co_await _gossiper.start_gossiping(generation_number, app_states, advertise);
assert(_group0);
co_await _group0->setup_group0(_sys_ks.local(), initial_contact_nodes, raft_replace_info, *this, qp, _migration_manager.local());
co_await _group0->setup_group0(_sys_ks.local(), initial_contact_nodes, raft_replace_info, *this, qp, _migration_manager.local(), cdc_gen_service);
raft::server* raft_server = co_await [this] () -> future<raft::server*> {
if (!_raft_topology_change_enabled) {
@@ -1480,7 +1480,7 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi
throw std::runtime_error(err);
}
co_await _group0->finish_setup_after_join(*this, qp, _migration_manager.local());
co_await _group0->finish_setup_after_join(*this, qp, _migration_manager.local(), cdc_gen_service);
co_return;
}
@@ -1638,7 +1638,7 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi
}
assert(_group0);
co_await _group0->finish_setup_after_join(*this, qp, _migration_manager.local());
co_await _group0->finish_setup_after_join(*this, qp, _migration_manager.local(), cdc_gen_service);
co_await cdc_gen_service.after_join(std::move(cdc_gen_id));
}

View File

@@ -893,7 +893,7 @@ public:
service::raft_group0 group0_service{
abort_sources.local(), raft_gr.local(), ms,
gossiper.local(), feature_service.local(), sys_ks.local(), group0_client, cdc_generation_service.local()};
gossiper.local(), feature_service.local(), sys_ks.local(), group0_client};
group0_service.start().get();
auto stop_group0_service = defer([&group0_service] {
group0_service.abort().get();