Merge 'fix shutdown order between group0 and storage service' from Gleb
Storage service uses group0 internally, but group0 is create long after storage service is initialized and passed to it using ss::set_group0() function. What it means is that during shutdown group0 is destroyed before ss::stop() is called and thus storage service is left with a dangling reference. Fix it by introducing a function that cancels all group0 operations and waits for background fibers to complete. For that we need separate abort source for group0 operation which the patch series also introduces. * 'gleb/group0-ss-shutdown' of github.com:scylladb/scylla-dev: storage_service: topology coordinator: ignore abort_requested_exception in background fibers storage_service: fix de-initialization order between storage service and group0_service
This commit is contained in:
5
main.cc
5
main.cc
@@ -1658,6 +1658,11 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
// Set up group0 service earlier since it is needed by group0 setup just below
|
||||
ss.local().set_group0(group0_service, raft_topology_change_enabled);
|
||||
|
||||
// Need to make sure storage service does not use group0 before running group0_service.abort()
|
||||
auto stop_group0_usage_in_storage_service = defer_verbose_shutdown("group 0 usage in local storage", [&ss] {
|
||||
ss.local().wait_for_group0_stop().get();
|
||||
});
|
||||
|
||||
// Setup group0 early in case the node is bootstrapped already and the group exists.
|
||||
// Need to do it before allowing incomming messaging service connections since
|
||||
// storage proxy's and migration manager's verbs may access group0.
|
||||
|
||||
@@ -1385,6 +1385,8 @@ class topology_coordinator {
|
||||
}
|
||||
} catch (raft::request_aborted&) {
|
||||
slogger.debug("raft topology: CDC generation publisher fiber aborted");
|
||||
} catch (seastar::abort_requested_exception) {
|
||||
slogger.debug("raft topology: CDC generation publisher fiber aborted");
|
||||
} catch (group0_concurrent_modification&) {
|
||||
} catch (term_changed_error&) {
|
||||
slogger.debug("raft topology: CDC generation publisher fiber notices term change {} -> {}", _term, _raft.get_current_term());
|
||||
@@ -2640,6 +2642,8 @@ future<> topology_coordinator::run() {
|
||||
}
|
||||
} catch (raft::request_aborted&) {
|
||||
slogger.debug("raft topology: topology change coordinator fiber aborted");
|
||||
} catch (seastar::abort_requested_exception&) {
|
||||
slogger.debug("raft topology: topology change coordinator fiber aborted");
|
||||
} catch (raft::commit_status_unknown&) {
|
||||
slogger.warn("raft topology: topology change coordinator fiber got commit_status_unknown");
|
||||
} catch (group0_concurrent_modification&) {
|
||||
@@ -2666,12 +2670,13 @@ future<> topology_coordinator::run() {
|
||||
|
||||
future<> storage_service::raft_state_monitor_fiber(raft::server& raft, sharded<db::system_distributed_keyspace>& sys_dist_ks) {
|
||||
std::optional<abort_source> as;
|
||||
|
||||
try {
|
||||
while (!_abort_source.abort_requested()) {
|
||||
while (!_group0_as.abort_requested()) {
|
||||
// Wait for a state change in case we are not a leader yet, or we are are the leader
|
||||
// and coordinator work is running (in which case 'as' is engaged)
|
||||
while (!raft.is_leader() || as) {
|
||||
co_await raft.wait_for_state_change(&_abort_source);
|
||||
co_await raft.wait_for_state_change(&_group0_as);
|
||||
if (as) {
|
||||
as->request_abort(); // we are no longer a leader, so abort the coordinator
|
||||
co_await std::exchange(_topology_change_coordinator, make_ready_future<>());
|
||||
@@ -2824,7 +2829,7 @@ public:
|
||||
|
||||
future<> storage_service::raft_initialize_discovery_leader(raft::server& raft_server, const join_node_request_params& params) {
|
||||
if (_topology_state_machine._topology.is_empty()) {
|
||||
co_await raft_server.read_barrier(&_abort_source);
|
||||
co_await raft_server.read_barrier(&_group0_as);
|
||||
}
|
||||
|
||||
while (_topology_state_machine._topology.is_empty()) {
|
||||
@@ -2833,7 +2838,7 @@ future<> storage_service::raft_initialize_discovery_leader(raft::server& raft_se
|
||||
}
|
||||
|
||||
slogger.info("raft topology: adding myself as the first node to the topology");
|
||||
auto guard = co_await _group0->client().start_operation(&_abort_source);
|
||||
auto guard = co_await _group0->client().start_operation(&_group0_as);
|
||||
|
||||
auto insert_join_request_mutation = build_mutation_from_join_params(params, guard);
|
||||
|
||||
@@ -2847,7 +2852,7 @@ future<> storage_service::raft_initialize_discovery_leader(raft::server& raft_se
|
||||
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard,
|
||||
"bootstrap: adding myself as the first node to the topology");
|
||||
try {
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_abort_source);
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as);
|
||||
} catch (group0_concurrent_modification&) {
|
||||
slogger.info("raft topology: bootstrap: concurrent operation is detected, retrying.");
|
||||
}
|
||||
@@ -2894,7 +2899,7 @@ future<> storage_service::update_topology_with_local_metadata(raft::server& raft
|
||||
while (true) {
|
||||
slogger.info("raft topology: refreshing topology to check if it's synchronized with local metadata");
|
||||
|
||||
auto guard = co_await _group0->client().start_operation(&_abort_source);
|
||||
auto guard = co_await _group0->client().start_operation(&_group0_as);
|
||||
|
||||
if (synchronized()) {
|
||||
break;
|
||||
@@ -2929,7 +2934,7 @@ future<> storage_service::update_topology_with_local_metadata(raft::server& raft
|
||||
std::move(change), guard, ::format("{}: update topology with local metadata", raft_server.id()));
|
||||
|
||||
try {
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_abort_source);
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as);
|
||||
} catch (group0_concurrent_modification&) {
|
||||
slogger.info("raft topology: update topology with local metadata:"
|
||||
" concurrent operation is detected, retrying.");
|
||||
@@ -4264,9 +4269,14 @@ future<> storage_service::stop() {
|
||||
// make sure nobody uses the semaphore
|
||||
node_ops_signal_abort(std::nullopt);
|
||||
_listeners.clear();
|
||||
_topology_state_machine.event.broken(make_exception_ptr(abort_requested_exception()));
|
||||
co_await _async_gate.close();
|
||||
co_await when_all(std::move(_node_ops_abort_thread), std::move(_raft_state_monitor));
|
||||
co_await std::move(_node_ops_abort_thread);
|
||||
}
|
||||
|
||||
future<> storage_service::wait_for_group0_stop() {
|
||||
_group0_as.request_abort();
|
||||
_topology_state_machine.event.broken(make_exception_ptr(abort_requested_exception()));
|
||||
co_await std::move(_raft_state_monitor);
|
||||
}
|
||||
|
||||
future<> storage_service::check_for_endpoint_collision(std::unordered_set<gms::inet_address> initial_contact_nodes, const std::unordered_map<gms::inet_address, sstring>& loaded_peer_features) {
|
||||
@@ -4618,7 +4628,7 @@ future<> storage_service::raft_decommission() {
|
||||
});
|
||||
|
||||
while (true) {
|
||||
auto guard = co_await _group0->client().start_operation(&_abort_source);
|
||||
auto guard = co_await _group0->client().start_operation(&_group0_as);
|
||||
|
||||
auto it = _topology_state_machine._topology.find(raft_server.id());
|
||||
if (!it) {
|
||||
@@ -4643,7 +4653,7 @@ future<> storage_service::raft_decommission() {
|
||||
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("decommission: request decommission for {}", raft_server.id()));
|
||||
|
||||
try {
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_abort_source);
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as);
|
||||
} catch (group0_concurrent_modification&) {
|
||||
slogger.info("raft topology: decommission: concurrent operation is detected, retrying.");
|
||||
continue;
|
||||
@@ -4968,7 +4978,7 @@ future<> storage_service::raft_removenode(locator::host_id host_id, std::list<lo
|
||||
auto id = raft::server_id{host_id.uuid()};
|
||||
|
||||
while (true) {
|
||||
auto guard = co_await _group0->client().start_operation(&_abort_source);
|
||||
auto guard = co_await _group0->client().start_operation(&_group0_as);
|
||||
|
||||
auto it = _topology_state_machine._topology.find(id);
|
||||
|
||||
@@ -5008,7 +5018,7 @@ future<> storage_service::raft_removenode(locator::host_id host_id, std::list<lo
|
||||
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("removenode: request remove for {}", id));
|
||||
|
||||
try {
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_abort_source);
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as);
|
||||
} catch (group0_concurrent_modification&) {
|
||||
slogger.info("raft topology: removenode: concurrent operation is detected, retrying.");
|
||||
continue;
|
||||
@@ -5548,7 +5558,7 @@ future<> storage_service::raft_rebuild(sstring source_dc) {
|
||||
auto& raft_server = _group0->group0_server();
|
||||
|
||||
while (true) {
|
||||
auto guard = co_await _group0->client().start_operation(&_abort_source);
|
||||
auto guard = co_await _group0->client().start_operation(&_group0_as);
|
||||
|
||||
auto it = _topology_state_machine._topology.find(raft_server.id());
|
||||
if (!it) {
|
||||
@@ -5574,7 +5584,7 @@ future<> storage_service::raft_rebuild(sstring source_dc) {
|
||||
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("rebuild: request rebuild for {} ({})", raft_server.id(), source_dc));
|
||||
|
||||
try {
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_abort_source);
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as);
|
||||
} catch (group0_concurrent_modification&) {
|
||||
slogger.info("raft topology: rebuild: concurrent operation is detected, retrying.");
|
||||
continue;
|
||||
@@ -5593,7 +5603,7 @@ future<> storage_service::raft_check_and_repair_cdc_streams() {
|
||||
|
||||
while (true) {
|
||||
slogger.info("raft topology: request check_and_repair_cdc_streams, refreshing topology");
|
||||
auto guard = co_await _group0->client().start_operation(&_abort_source);
|
||||
auto guard = co_await _group0->client().start_operation(&_group0_as);
|
||||
auto curr_req = _topology_state_machine._topology.global_request;
|
||||
if (curr_req && *curr_req != global_topology_request::new_cdc_generation) {
|
||||
// FIXME: replace this with a queue
|
||||
@@ -5619,7 +5629,7 @@ future<> storage_service::raft_check_and_repair_cdc_streams() {
|
||||
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard,
|
||||
::format("request check+repair CDC generation from {}", _group0->group0_server().id()));
|
||||
try {
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_abort_source);
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as);
|
||||
} catch (group0_concurrent_modification&) {
|
||||
slogger.info("raft topology: request check+repair CDC: concurrent operation is detected, retrying.");
|
||||
continue;
|
||||
@@ -6113,7 +6123,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
|
||||
try {
|
||||
auto& raft_server = _group0->group0_server();
|
||||
// do barrier to make sure we always see the latest topology
|
||||
co_await raft_server.read_barrier(&_abort_source);
|
||||
co_await raft_server.read_barrier(&_group0_as);
|
||||
if (raft_server.get_current_term() != term) {
|
||||
// Return an error since the command is from outdated leader
|
||||
co_return result;
|
||||
@@ -6441,7 +6451,7 @@ future<> storage_service::do_tablet_operation(locator::global_tablet_id tablet,
|
||||
// The coordinator may not execute global token metadata barrier before triggering the operation, so we need
|
||||
// a barrier here to see the token metadata which is at least as recent as that of the sender.
|
||||
auto& raft_server = _group0->group0_server();
|
||||
co_await raft_server.read_barrier(&_abort_source);
|
||||
co_await raft_server.read_barrier(&_group0_as);
|
||||
|
||||
if (_tablet_ops.contains(tablet)) {
|
||||
slogger.debug("{} retry joining with existing session for tablet {}", op_name, tablet);
|
||||
@@ -6451,7 +6461,7 @@ future<> storage_service::do_tablet_operation(locator::global_tablet_id tablet,
|
||||
|
||||
locator::tablet_metadata_guard guard(_db.local().find_column_family(tablet.table), tablet);
|
||||
auto& as = guard.get_abort_source();
|
||||
auto sub = _abort_source.subscribe([&as] () noexcept {
|
||||
auto sub = _group0_as.subscribe([&as] () noexcept {
|
||||
as.request_abort();
|
||||
});
|
||||
|
||||
@@ -6664,7 +6674,7 @@ future<join_node_request_result> storage_service::join_node_request_handler(join
|
||||
}
|
||||
|
||||
while (true) {
|
||||
auto guard = co_await _group0->client().start_operation(&_abort_source);
|
||||
auto guard = co_await _group0->client().start_operation(&_group0_as);
|
||||
|
||||
if (const auto *p = _topology_state_machine._topology.find(params.host_id)) {
|
||||
const auto& rs = p->second;
|
||||
@@ -6695,7 +6705,7 @@ future<join_node_request_result> storage_service::join_node_request_handler(join
|
||||
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard,
|
||||
format("raft topology: placing join request for {}", params.host_id));
|
||||
try {
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_abort_source);
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as);
|
||||
break;
|
||||
} catch (group0_concurrent_modification&) {
|
||||
slogger.info("raft topology: join_node_request: concurrent operation is detected, retrying.");
|
||||
@@ -6722,7 +6732,7 @@ future<join_node_response_result> storage_service::join_node_response_handler(jo
|
||||
auto lock = co_await get_units(_join_node_response_handler_mutex, 1);
|
||||
|
||||
// Wait until we sent and completed the join_node_request RPC
|
||||
co_await _join_node_request_done.get_shared_future(_abort_source);
|
||||
co_await _join_node_request_done.get_shared_future(_group0_as);
|
||||
|
||||
if (_join_node_response_done.available()) {
|
||||
// We already handled this RPC. No need to retry it. Return immediately for idempotence.
|
||||
@@ -6743,7 +6753,7 @@ future<join_node_response_result> storage_service::join_node_response_handler(jo
|
||||
|
||||
// Do a read barrier to read/initialize the topology state
|
||||
auto& raft_server = _group0->group0_server();
|
||||
co_await raft_server.read_barrier(&_abort_source);
|
||||
co_await raft_server.read_barrier(&_group0_as);
|
||||
|
||||
// Calculate nodes to ignore
|
||||
// TODO: ignore_dead_nodes setting for bootstrap
|
||||
@@ -6795,7 +6805,7 @@ future<join_node_response_result> storage_service::join_node_response_handler(jo
|
||||
slogger.log(log_level::warn, rate_limit, "raft topology: cannot map nodes {} to ips, retrying.",
|
||||
untranslated_ids);
|
||||
|
||||
co_await sleep_abortable(std::chrono::milliseconds(5), _abort_source);
|
||||
co_await sleep_abortable(std::chrono::milliseconds(5), _group0_as);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
@@ -6909,7 +6919,7 @@ void storage_service::init_messaging_service(bool raft_topology_change_enabled)
|
||||
});
|
||||
ser::join_node_rpc_verbs::register_join_node_response(&_messaging.local(), [this] (raft::server_id dst_id, service::join_node_response_params params) {
|
||||
return container().invoke_on(0, [dst_id, params = std::move(params)] (auto& ss) mutable -> future<join_node_response_result> {
|
||||
co_await ss._join_node_group0_started.get_shared_future(ss._abort_source);
|
||||
co_await ss._join_node_group0_started.get_shared_future(ss._group0_as);
|
||||
if (ss._group0->load_my_id() != dst_id) {
|
||||
throw raft_destination_id_not_correct(ss._group0->load_my_id(), dst_id);
|
||||
}
|
||||
|
||||
@@ -331,6 +331,8 @@ public:
|
||||
|
||||
future<> stop_transport();
|
||||
|
||||
future<> wait_for_group0_stop();
|
||||
|
||||
private:
|
||||
bool should_bootstrap();
|
||||
bool is_replacing();
|
||||
@@ -786,6 +788,9 @@ private:
|
||||
shared_promise<> _join_node_response_done;
|
||||
semaphore _join_node_response_handler_mutex{1};
|
||||
|
||||
// We need to be able to abort all group0 operation during shutdown, so we need special abort source for that
|
||||
abort_source _group0_as;
|
||||
|
||||
friend class join_node_rpc_handshaker;
|
||||
};
|
||||
|
||||
|
||||
@@ -867,6 +867,10 @@ private:
|
||||
|
||||
_ss.local().set_group0(group0_service, raft_topology_change_enabled);
|
||||
|
||||
auto stop_group0_usage_in_storage_service = defer([this] {
|
||||
_ss.local().wait_for_group0_stop().get();
|
||||
});
|
||||
|
||||
try {
|
||||
_ss.local().join_cluster(_sys_dist_ks, _proxy).get();
|
||||
} catch (std::exception& e) {
|
||||
|
||||
Reference in New Issue
Block a user