From 8ed8b151da732260e2d6322839cffb5b497103ab Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Thu, 30 Nov 2023 11:10:26 +0200 Subject: [PATCH 1/2] storage_service: fix de-initialization order between storage service and group0_service Storage service uses group0 internally, but group0 is created long after storage service is initialized and is passed to it using the ss::set_group0() function. This means that during shutdown group0 is destroyed before ss::stop() is called, and thus storage service is left with a dangling reference. Fix it by introducing a function that cancels all group0 operations and waits for background fibers to complete. For that we need a separate abort source for group0 operations, which the patch also introduces. --- main.cc | 5 ++++ service/storage_service.cc | 58 +++++++++++++++++++++----------------- service/storage_service.hh | 5 ++++ test/lib/cql_test_env.cc | 4 +++ 4 files changed, 46 insertions(+), 26 deletions(-) diff --git a/main.cc b/main.cc index 63777e55b6..9a1c0a7afa 100644 --- a/main.cc +++ b/main.cc @@ -1662,6 +1662,11 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl // Set up group0 service earlier since it is needed by group0 setup just below ss.local().set_group0(group0_service, raft_topology_change_enabled); + // Need to make sure storage service does not use group0 before running group0_service.abort() + auto stop_group0_usage_in_storage_service = defer_verbose_shutdown("group 0 usage in local storage", [&ss] { + ss.local().wait_for_group0_stop().get(); + }); + // Setup group0 early in case the node is bootstrapped already and the group exists. // Need to do it before allowing incomming messaging service connections since // storage proxy's and migration manager's verbs may access group0. 
diff --git a/service/storage_service.cc b/service/storage_service.cc index 4d9d8ac45e..98132954ed 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2666,12 +2666,13 @@ future<> topology_coordinator::run() { future<> storage_service::raft_state_monitor_fiber(raft::server& raft, sharded& sys_dist_ks) { std::optional as; + try { - while (!_abort_source.abort_requested()) { + while (!_group0_as.abort_requested()) { // Wait for a state change in case we are not a leader yet, or we are are the leader // and coordinator work is running (in which case 'as' is engaged) while (!raft.is_leader() || as) { - co_await raft.wait_for_state_change(&_abort_source); + co_await raft.wait_for_state_change(&_group0_as); if (as) { as->request_abort(); // we are no longer a leader, so abort the coordinator co_await std::exchange(_topology_change_coordinator, make_ready_future<>()); @@ -2824,7 +2825,7 @@ public: future<> storage_service::raft_initialize_discovery_leader(raft::server& raft_server, const join_node_request_params& params) { if (_topology_state_machine._topology.is_empty()) { - co_await raft_server.read_barrier(&_abort_source); + co_await raft_server.read_barrier(&_group0_as); } while (_topology_state_machine._topology.is_empty()) { @@ -2833,7 +2834,7 @@ future<> storage_service::raft_initialize_discovery_leader(raft::server& raft_se } slogger.info("raft topology: adding myself as the first node to the topology"); - auto guard = co_await _group0->client().start_operation(&_abort_source); + auto guard = co_await _group0->client().start_operation(&_group0_as); auto insert_join_request_mutation = build_mutation_from_join_params(params, guard); @@ -2847,7 +2848,7 @@ future<> storage_service::raft_initialize_discovery_leader(raft::server& raft_se group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, "bootstrap: adding myself as the first node to the topology"); try { - co_await _group0->client().add_entry(std::move(g0_cmd), 
std::move(guard), &_abort_source); + co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as); } catch (group0_concurrent_modification&) { slogger.info("raft topology: bootstrap: concurrent operation is detected, retrying."); } @@ -2894,7 +2895,7 @@ future<> storage_service::update_topology_with_local_metadata(raft::server& raft while (true) { slogger.info("raft topology: refreshing topology to check if it's synchronized with local metadata"); - auto guard = co_await _group0->client().start_operation(&_abort_source); + auto guard = co_await _group0->client().start_operation(&_group0_as); if (synchronized()) { break; @@ -2929,7 +2930,7 @@ future<> storage_service::update_topology_with_local_metadata(raft::server& raft std::move(change), guard, ::format("{}: update topology with local metadata", raft_server.id())); try { - co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_abort_source); + co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as); } catch (group0_concurrent_modification&) { slogger.info("raft topology: update topology with local metadata:" " concurrent operation is detected, retrying."); @@ -4264,9 +4265,14 @@ future<> storage_service::stop() { // make sure nobody uses the semaphore node_ops_signal_abort(std::nullopt); _listeners.clear(); - _topology_state_machine.event.broken(make_exception_ptr(abort_requested_exception())); co_await _async_gate.close(); - co_await when_all(std::move(_node_ops_abort_thread), std::move(_raft_state_monitor)); + co_await std::move(_node_ops_abort_thread); +} + +future<> storage_service::wait_for_group0_stop() { + _group0_as.request_abort(); + _topology_state_machine.event.broken(make_exception_ptr(abort_requested_exception())); + co_await std::move(_raft_state_monitor); } future<> storage_service::check_for_endpoint_collision(std::unordered_set initial_contact_nodes, const std::unordered_map& loaded_peer_features) { @@ -4618,7 +4624,7 
@@ future<> storage_service::raft_decommission() { }); while (true) { - auto guard = co_await _group0->client().start_operation(&_abort_source); + auto guard = co_await _group0->client().start_operation(&_group0_as); auto it = _topology_state_machine._topology.find(raft_server.id()); if (!it) { @@ -4643,7 +4649,7 @@ future<> storage_service::raft_decommission() { group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("decommission: request decommission for {}", raft_server.id())); try { - co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_abort_source); + co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as); } catch (group0_concurrent_modification&) { slogger.info("raft topology: decommission: concurrent operation is detected, retrying."); continue; @@ -4968,7 +4974,7 @@ future<> storage_service::raft_removenode(locator::host_id host_id, std::listclient().start_operation(&_abort_source); + auto guard = co_await _group0->client().start_operation(&_group0_as); auto it = _topology_state_machine._topology.find(id); @@ -5008,7 +5014,7 @@ future<> storage_service::raft_removenode(locator::host_id host_id, std::listclient().prepare_command(std::move(change), guard, ::format("removenode: request remove for {}", id)); try { - co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_abort_source); + co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as); } catch (group0_concurrent_modification&) { slogger.info("raft topology: removenode: concurrent operation is detected, retrying."); continue; @@ -5548,7 +5554,7 @@ future<> storage_service::raft_rebuild(sstring source_dc) { auto& raft_server = _group0->group0_server(); while (true) { - auto guard = co_await _group0->client().start_operation(&_abort_source); + auto guard = co_await _group0->client().start_operation(&_group0_as); auto it = 
_topology_state_machine._topology.find(raft_server.id()); if (!it) { @@ -5574,7 +5580,7 @@ future<> storage_service::raft_rebuild(sstring source_dc) { group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("rebuild: request rebuild for {} ({})", raft_server.id(), source_dc)); try { - co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_abort_source); + co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as); } catch (group0_concurrent_modification&) { slogger.info("raft topology: rebuild: concurrent operation is detected, retrying."); continue; @@ -5593,7 +5599,7 @@ future<> storage_service::raft_check_and_repair_cdc_streams() { while (true) { slogger.info("raft topology: request check_and_repair_cdc_streams, refreshing topology"); - auto guard = co_await _group0->client().start_operation(&_abort_source); + auto guard = co_await _group0->client().start_operation(&_group0_as); auto curr_req = _topology_state_machine._topology.global_request; if (curr_req && *curr_req != global_topology_request::new_cdc_generation) { // FIXME: replace this with a queue @@ -5619,7 +5625,7 @@ future<> storage_service::raft_check_and_repair_cdc_streams() { group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("request check+repair CDC generation from {}", _group0->group0_server().id())); try { - co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_abort_source); + co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as); } catch (group0_concurrent_modification&) { slogger.info("raft topology: request check+repair CDC: concurrent operation is detected, retrying."); continue; @@ -6113,7 +6119,7 @@ future storage_service::raft_topology_cmd_handler(raft try { auto& raft_server = _group0->group0_server(); // do barrier to make sure we always see the latest topology - co_await 
raft_server.read_barrier(&_abort_source); + co_await raft_server.read_barrier(&_group0_as); if (raft_server.get_current_term() != term) { // Return an error since the command is from outdated leader co_return result; @@ -6441,7 +6447,7 @@ future<> storage_service::do_tablet_operation(locator::global_tablet_id tablet, // The coordinator may not execute global token metadata barrier before triggering the operation, so we need // a barrier here to see the token metadata which is at least as recent as that of the sender. auto& raft_server = _group0->group0_server(); - co_await raft_server.read_barrier(&_abort_source); + co_await raft_server.read_barrier(&_group0_as); if (_tablet_ops.contains(tablet)) { slogger.debug("{} retry joining with existing session for tablet {}", op_name, tablet); @@ -6451,7 +6457,7 @@ future<> storage_service::do_tablet_operation(locator::global_tablet_id tablet, locator::tablet_metadata_guard guard(_db.local().find_column_family(tablet.table), tablet); auto& as = guard.get_abort_source(); - auto sub = _abort_source.subscribe([&as] () noexcept { + auto sub = _group0_as.subscribe([&as] () noexcept { as.request_abort(); }); @@ -6664,7 +6670,7 @@ future storage_service::join_node_request_handler(join } while (true) { - auto guard = co_await _group0->client().start_operation(&_abort_source); + auto guard = co_await _group0->client().start_operation(&_group0_as); if (const auto *p = _topology_state_machine._topology.find(params.host_id)) { const auto& rs = p->second; @@ -6695,7 +6701,7 @@ future storage_service::join_node_request_handler(join group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, format("raft topology: placing join request for {}", params.host_id)); try { - co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_abort_source); + co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as); break; } catch (group0_concurrent_modification&) { slogger.info("raft 
topology: join_node_request: concurrent operation is detected, retrying."); @@ -6722,7 +6728,7 @@ future storage_service::join_node_response_handler(jo auto lock = co_await get_units(_join_node_response_handler_mutex, 1); // Wait until we sent and completed the join_node_request RPC - co_await _join_node_request_done.get_shared_future(_abort_source); + co_await _join_node_request_done.get_shared_future(_group0_as); if (_join_node_response_done.available()) { // We already handled this RPC. No need to retry it. Return immediately for idempotence. @@ -6743,7 +6749,7 @@ future storage_service::join_node_response_handler(jo // Do a read barrier to read/initialize the topology state auto& raft_server = _group0->group0_server(); - co_await raft_server.read_barrier(&_abort_source); + co_await raft_server.read_barrier(&_group0_as); // Calculate nodes to ignore // TODO: ignore_dead_nodes setting for bootstrap @@ -6795,7 +6801,7 @@ future storage_service::join_node_response_handler(jo slogger.log(log_level::warn, rate_limit, "raft topology: cannot map nodes {} to ips, retrying.", untranslated_ids); - co_await sleep_abortable(std::chrono::milliseconds(5), _abort_source); + co_await sleep_abortable(std::chrono::milliseconds(5), _group0_as); } else { break; } @@ -6909,7 +6915,7 @@ void storage_service::init_messaging_service(bool raft_topology_change_enabled) }); ser::join_node_rpc_verbs::register_join_node_response(&_messaging.local(), [this] (raft::server_id dst_id, service::join_node_response_params params) { return container().invoke_on(0, [dst_id, params = std::move(params)] (auto& ss) mutable -> future { - co_await ss._join_node_group0_started.get_shared_future(ss._abort_source); + co_await ss._join_node_group0_started.get_shared_future(ss._group0_as); if (ss._group0->load_my_id() != dst_id) { throw raft_destination_id_not_correct(ss._group0->load_my_id(), dst_id); } diff --git a/service/storage_service.hh b/service/storage_service.hh index 1e06804ef7..e6a0a527cf 100644 
--- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -331,6 +331,8 @@ public: future<> stop_transport(); + future<> wait_for_group0_stop(); + private: bool should_bootstrap(); bool is_replacing(); @@ -786,6 +788,9 @@ private: shared_promise<> _join_node_response_done; semaphore _join_node_response_handler_mutex{1}; + // We need to be able to abort all group0 operation during shutdown, so we need special abort source for that + abort_source _group0_as; + friend class join_node_rpc_handshaker; }; diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index 8f5f517f6a..ba5a45d898 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -867,6 +867,10 @@ private: _ss.local().set_group0(group0_service, raft_topology_change_enabled); + auto stop_group0_usage_in_storage_service = defer([this] { + _ss.local().wait_for_group0_stop().get(); + }); + try { _ss.local().join_cluster(_sys_dist_ks, _proxy).get(); } catch (std::exception& e) { From 3ddc1458ee12ea0315c9be666dfd496c17e9aa9d Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Thu, 30 Nov 2023 13:34:58 +0200 Subject: [PATCH 2/2] storage_service: topology coordinator: ignore abort_requested_exception in background fibers The exception may be thrown by "event" CV during shutdown. 
--- service/storage_service.cc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/service/storage_service.cc b/service/storage_service.cc index 98132954ed..aaae43b3d8 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1385,6 +1385,8 @@ class topology_coordinator { } } catch (raft::request_aborted&) { slogger.debug("raft topology: CDC generation publisher fiber aborted"); + } catch (seastar::abort_requested_exception) { + slogger.debug("raft topology: CDC generation publisher fiber aborted"); } catch (group0_concurrent_modification&) { } catch (term_changed_error&) { slogger.debug("raft topology: CDC generation publisher fiber notices term change {} -> {}", _term, _raft.get_current_term()); @@ -2640,6 +2642,8 @@ future<> topology_coordinator::run() { } } catch (raft::request_aborted&) { slogger.debug("raft topology: topology change coordinator fiber aborted"); + } catch (seastar::abort_requested_exception&) { + slogger.debug("raft topology: topology change coordinator fiber aborted"); } catch (raft::commit_status_unknown&) { slogger.warn("raft topology: topology change coordinator fiber got commit_status_unknown"); } catch (group0_concurrent_modification&) {