diff --git a/main.cc b/main.cc index c03652e0bc..763b193921 100644 --- a/main.cc +++ b/main.cc @@ -1658,6 +1658,11 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl // Set up group0 service earlier since it is needed by group0 setup just below ss.local().set_group0(group0_service, raft_topology_change_enabled); + // Need to make sure storage service does not use group0 before running group0_service.abort() + auto stop_group0_usage_in_storage_service = defer_verbose_shutdown("group 0 usage in local storage", [&ss] { + ss.local().wait_for_group0_stop().get(); + }); + // Setup group0 early in case the node is bootstrapped already and the group exists. // Need to do it before allowing incomming messaging service connections since // storage proxy's and migration manager's verbs may access group0. diff --git a/service/storage_service.cc b/service/storage_service.cc index 9b63e3ec39..037917e7c1 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1385,6 +1385,8 @@ class topology_coordinator { } } catch (raft::request_aborted&) { slogger.debug("raft topology: CDC generation publisher fiber aborted"); + } catch (seastar::abort_requested_exception&) { + slogger.debug("raft topology: CDC generation publisher fiber aborted"); } catch (group0_concurrent_modification&) { } catch (term_changed_error&) { slogger.debug("raft topology: CDC generation publisher fiber notices term change {} -> {}", _term, _raft.get_current_term()); @@ -2640,6 +2642,8 @@ future<> topology_coordinator::run() { } } catch (raft::request_aborted&) { slogger.debug("raft topology: topology change coordinator fiber aborted"); + } catch (seastar::abort_requested_exception&) { + slogger.debug("raft topology: topology change coordinator fiber aborted"); } catch (raft::commit_status_unknown&) { slogger.warn("raft topology: topology change coordinator fiber got commit_status_unknown"); } catch (group0_concurrent_modification&) { @@ -2666,12 +2670,13 @@ 
future<> topology_coordinator::run() { future<> storage_service::raft_state_monitor_fiber(raft::server& raft, sharded& sys_dist_ks) { std::optional as; + try { - while (!_abort_source.abort_requested()) { + while (!_group0_as.abort_requested()) { // Wait for a state change in case we are not a leader yet, or we are are the leader // and coordinator work is running (in which case 'as' is engaged) while (!raft.is_leader() || as) { - co_await raft.wait_for_state_change(&_abort_source); + co_await raft.wait_for_state_change(&_group0_as); if (as) { as->request_abort(); // we are no longer a leader, so abort the coordinator co_await std::exchange(_topology_change_coordinator, make_ready_future<>()); @@ -2824,7 +2829,7 @@ public: future<> storage_service::raft_initialize_discovery_leader(raft::server& raft_server, const join_node_request_params& params) { if (_topology_state_machine._topology.is_empty()) { - co_await raft_server.read_barrier(&_abort_source); + co_await raft_server.read_barrier(&_group0_as); } while (_topology_state_machine._topology.is_empty()) { @@ -2833,7 +2838,7 @@ future<> storage_service::raft_initialize_discovery_leader(raft::server& raft_se } slogger.info("raft topology: adding myself as the first node to the topology"); - auto guard = co_await _group0->client().start_operation(&_abort_source); + auto guard = co_await _group0->client().start_operation(&_group0_as); auto insert_join_request_mutation = build_mutation_from_join_params(params, guard); @@ -2847,7 +2852,7 @@ future<> storage_service::raft_initialize_discovery_leader(raft::server& raft_se group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, "bootstrap: adding myself as the first node to the topology"); try { - co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_abort_source); + co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as); } catch (group0_concurrent_modification&) { slogger.info("raft 
topology: bootstrap: concurrent operation is detected, retrying."); } @@ -2894,7 +2899,7 @@ future<> storage_service::update_topology_with_local_metadata(raft::server& raft while (true) { slogger.info("raft topology: refreshing topology to check if it's synchronized with local metadata"); - auto guard = co_await _group0->client().start_operation(&_abort_source); + auto guard = co_await _group0->client().start_operation(&_group0_as); if (synchronized()) { break; @@ -2929,7 +2934,7 @@ future<> storage_service::update_topology_with_local_metadata(raft::server& raft std::move(change), guard, ::format("{}: update topology with local metadata", raft_server.id())); try { - co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_abort_source); + co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as); } catch (group0_concurrent_modification&) { slogger.info("raft topology: update topology with local metadata:" " concurrent operation is detected, retrying."); @@ -4264,9 +4269,14 @@ future<> storage_service::stop() { // make sure nobody uses the semaphore node_ops_signal_abort(std::nullopt); _listeners.clear(); - _topology_state_machine.event.broken(make_exception_ptr(abort_requested_exception())); co_await _async_gate.close(); - co_await when_all(std::move(_node_ops_abort_thread), std::move(_raft_state_monitor)); + co_await std::move(_node_ops_abort_thread); +} + +future<> storage_service::wait_for_group0_stop() { + _group0_as.request_abort(); + _topology_state_machine.event.broken(make_exception_ptr(abort_requested_exception())); + co_await std::move(_raft_state_monitor); } future<> storage_service::check_for_endpoint_collision(std::unordered_set initial_contact_nodes, const std::unordered_map& loaded_peer_features) { @@ -4618,7 +4628,7 @@ future<> storage_service::raft_decommission() { }); while (true) { - auto guard = co_await _group0->client().start_operation(&_abort_source); + auto guard = co_await 
_group0->client().start_operation(&_group0_as); auto it = _topology_state_machine._topology.find(raft_server.id()); if (!it) { @@ -4643,7 +4653,7 @@ future<> storage_service::raft_decommission() { group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("decommission: request decommission for {}", raft_server.id())); try { - co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_abort_source); + co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as); } catch (group0_concurrent_modification&) { slogger.info("raft topology: decommission: concurrent operation is detected, retrying."); continue; @@ -4968,7 +4978,7 @@ future<> storage_service::raft_removenode(locator::host_id host_id, std::listclient().start_operation(&_abort_source); + auto guard = co_await _group0->client().start_operation(&_group0_as); auto it = _topology_state_machine._topology.find(id); @@ -5008,7 +5018,7 @@ future<> storage_service::raft_removenode(locator::host_id host_id, std::listclient().prepare_command(std::move(change), guard, ::format("removenode: request remove for {}", id)); try { - co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_abort_source); + co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as); } catch (group0_concurrent_modification&) { slogger.info("raft topology: removenode: concurrent operation is detected, retrying."); continue; @@ -5548,7 +5558,7 @@ future<> storage_service::raft_rebuild(sstring source_dc) { auto& raft_server = _group0->group0_server(); while (true) { - auto guard = co_await _group0->client().start_operation(&_abort_source); + auto guard = co_await _group0->client().start_operation(&_group0_as); auto it = _topology_state_machine._topology.find(raft_server.id()); if (!it) { @@ -5574,7 +5584,7 @@ future<> storage_service::raft_rebuild(sstring source_dc) { group0_command g0_cmd = 
_group0->client().prepare_command(std::move(change), guard, ::format("rebuild: request rebuild for {} ({})", raft_server.id(), source_dc)); try { - co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_abort_source); + co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as); } catch (group0_concurrent_modification&) { slogger.info("raft topology: rebuild: concurrent operation is detected, retrying."); continue; @@ -5593,7 +5603,7 @@ future<> storage_service::raft_check_and_repair_cdc_streams() { while (true) { slogger.info("raft topology: request check_and_repair_cdc_streams, refreshing topology"); - auto guard = co_await _group0->client().start_operation(&_abort_source); + auto guard = co_await _group0->client().start_operation(&_group0_as); auto curr_req = _topology_state_machine._topology.global_request; if (curr_req && *curr_req != global_topology_request::new_cdc_generation) { // FIXME: replace this with a queue @@ -5619,7 +5629,7 @@ future<> storage_service::raft_check_and_repair_cdc_streams() { group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("request check+repair CDC generation from {}", _group0->group0_server().id())); try { - co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_abort_source); + co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as); } catch (group0_concurrent_modification&) { slogger.info("raft topology: request check+repair CDC: concurrent operation is detected, retrying."); continue; @@ -6113,7 +6123,7 @@ future storage_service::raft_topology_cmd_handler(raft try { auto& raft_server = _group0->group0_server(); // do barrier to make sure we always see the latest topology - co_await raft_server.read_barrier(&_abort_source); + co_await raft_server.read_barrier(&_group0_as); if (raft_server.get_current_term() != term) { // Return an error since the command is from outdated leader co_return 
result; @@ -6441,7 +6451,7 @@ future<> storage_service::do_tablet_operation(locator::global_tablet_id tablet, // The coordinator may not execute global token metadata barrier before triggering the operation, so we need // a barrier here to see the token metadata which is at least as recent as that of the sender. auto& raft_server = _group0->group0_server(); - co_await raft_server.read_barrier(&_abort_source); + co_await raft_server.read_barrier(&_group0_as); if (_tablet_ops.contains(tablet)) { slogger.debug("{} retry joining with existing session for tablet {}", op_name, tablet); @@ -6451,7 +6461,7 @@ future<> storage_service::do_tablet_operation(locator::global_tablet_id tablet, locator::tablet_metadata_guard guard(_db.local().find_column_family(tablet.table), tablet); auto& as = guard.get_abort_source(); - auto sub = _abort_source.subscribe([&as] () noexcept { + auto sub = _group0_as.subscribe([&as] () noexcept { as.request_abort(); }); @@ -6664,7 +6674,7 @@ future storage_service::join_node_request_handler(join } while (true) { - auto guard = co_await _group0->client().start_operation(&_abort_source); + auto guard = co_await _group0->client().start_operation(&_group0_as); if (const auto *p = _topology_state_machine._topology.find(params.host_id)) { const auto& rs = p->second; @@ -6695,7 +6705,7 @@ future storage_service::join_node_request_handler(join group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, format("raft topology: placing join request for {}", params.host_id)); try { - co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_abort_source); + co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as); break; } catch (group0_concurrent_modification&) { slogger.info("raft topology: join_node_request: concurrent operation is detected, retrying."); @@ -6722,7 +6732,7 @@ future storage_service::join_node_response_handler(jo auto lock = co_await 
get_units(_join_node_response_handler_mutex, 1); // Wait until we sent and completed the join_node_request RPC - co_await _join_node_request_done.get_shared_future(_abort_source); + co_await _join_node_request_done.get_shared_future(_group0_as); if (_join_node_response_done.available()) { // We already handled this RPC. No need to retry it. Return immediately for idempotence. @@ -6743,7 +6753,7 @@ future storage_service::join_node_response_handler(jo // Do a read barrier to read/initialize the topology state auto& raft_server = _group0->group0_server(); - co_await raft_server.read_barrier(&_abort_source); + co_await raft_server.read_barrier(&_group0_as); // Calculate nodes to ignore // TODO: ignore_dead_nodes setting for bootstrap @@ -6795,7 +6805,7 @@ future storage_service::join_node_response_handler(jo slogger.log(log_level::warn, rate_limit, "raft topology: cannot map nodes {} to ips, retrying.", untranslated_ids); - co_await sleep_abortable(std::chrono::milliseconds(5), _abort_source); + co_await sleep_abortable(std::chrono::milliseconds(5), _group0_as); } else { break; } @@ -6909,7 +6919,7 @@ void storage_service::init_messaging_service(bool raft_topology_change_enabled) }); ser::join_node_rpc_verbs::register_join_node_response(&_messaging.local(), [this] (raft::server_id dst_id, service::join_node_response_params params) { return container().invoke_on(0, [dst_id, params = std::move(params)] (auto& ss) mutable -> future { - co_await ss._join_node_group0_started.get_shared_future(ss._abort_source); + co_await ss._join_node_group0_started.get_shared_future(ss._group0_as); if (ss._group0->load_my_id() != dst_id) { throw raft_destination_id_not_correct(ss._group0->load_my_id(), dst_id); } diff --git a/service/storage_service.hh b/service/storage_service.hh index 1e06804ef7..e6a0a527cf 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -331,6 +331,8 @@ public: future<> stop_transport(); + future<> wait_for_group0_stop(); + private: bool 
should_bootstrap(); bool is_replacing(); @@ -786,6 +788,9 @@ private: shared_promise<> _join_node_response_done; semaphore _join_node_response_handler_mutex{1}; + // All group0 operations must be abortable during shutdown, so they use this dedicated abort source (aborted in wait_for_group0_stop()) + abort_source _group0_as; + friend class join_node_rpc_handshaker; }; diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index 72e6eb2d8f..56182b9037 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -867,6 +867,10 @@ private: _ss.local().set_group0(group0_service, raft_topology_change_enabled); + auto stop_group0_usage_in_storage_service = defer([this] { + _ss.local().wait_for_group0_stop().get(); + }); + try { _ss.local().join_cluster(_sys_dist_ks, _proxy).get(); } catch (std::exception& e) {