generic_server: convert connection tracking to seastar::gate
If we call server::stop() right after "server" construction, it hangs:
With the server never listening (never accepting connections and never
serving connections), nothing ever calls server::maybe_stop().
Consequently,
    co_await _all_connections_stopped.get_future();
at the end of server::stop() deadlocks.
Such a server::stop() call does occur in controller::do_start_server()
[transport/controller.cc], when
- cserver->start() (sharded<cql_server>::start()) constructs a
  "server"-derived object,
- start_listening_on_tcp_sockets() throws an exception before reaching
  listen_on_all_shards() (for example because it fails to set up client
  encryption -- certificate file is inaccessible etc.),
- the "deferred_action"
      cserver->stop().get();
  is invoked during cleanup.
(The cserver->stop() call exposing the connection tracking problem dates
back to commit ae4d5a60ca ("transport::controller: Shut down distributed
object on startup exception", 2020-11-25), and it's been triggerable
through the above code path since commit 6b178f9a4a
("transport/controller: split configuring sockets into separate
functions", 2024-02-05).)
Tracking live connections and connection acceptances seems like a good fit
for "seastar::gate", so rewrite the tracking with that. "seastar::gate"
can be closed (and the returned future can be waited for) without anyone
ever having entered the gate.
NOTE: this change makes it quite clear that neither server::stop() nor
server::shutdown() may be called more than once. The permitted sequences
are:
- server::shutdown() + server::stop()
- or just server::stop().
Fixes #10305
Signed-off-by: Laszlo Ersek <laszlo.ersek@scylladb.com>
This commit is contained in:
@@ -21,15 +21,14 @@ connection::connection(server& server, connected_socket&& fd)
|
||||
, _fd{std::move(fd)}
|
||||
, _read_buf(_fd.input())
|
||||
, _write_buf(_fd.output())
|
||||
, _hold_server(_server._gate)
|
||||
{
|
||||
++_server._total_connections;
|
||||
++_server._current_connections;
|
||||
_server._connections_list.push_back(*this);
|
||||
}
|
||||
|
||||
connection::~connection()
|
||||
{
|
||||
--_server._current_connections;
|
||||
server::connections_list_t::iterator iter = _server._connections_list.iterator_to(*this);
|
||||
for (auto&& gi : _server._gentle_iterators) {
|
||||
if (gi.iter == iter) {
|
||||
@@ -37,7 +36,6 @@ connection::~connection()
|
||||
}
|
||||
}
|
||||
_server._connections_list.erase(iter);
|
||||
_server.maybe_stop();
|
||||
}
|
||||
|
||||
future<> server::for_each_gently(noncopyable_function<future<>(connection&)> fn) {
|
||||
@@ -115,15 +113,15 @@ server::~server()
|
||||
}
|
||||
|
||||
future<> server::stop() {
|
||||
if (!_stopping) {
|
||||
if (!_gate.is_closed()) {
|
||||
co_await shutdown();
|
||||
}
|
||||
|
||||
co_await _all_connections_stopped.get_future();
|
||||
co_await std::move(_all_connections_stopped);
|
||||
}
|
||||
|
||||
future<> server::shutdown() {
|
||||
_stopping = true;
|
||||
_all_connections_stopped = _gate.close();
|
||||
size_t nr = 0;
|
||||
size_t nr_total = _listeners.size();
|
||||
_logger.debug("abort accept nr_total={}", nr_total);
|
||||
@@ -175,12 +173,10 @@ server::listen(socket_address addr, std::shared_ptr<seastar::tls::credentials_bu
|
||||
|
||||
future<> server::do_accepts(int which, bool keepalive, socket_address server_addr) {
|
||||
return repeat([this, which, keepalive, server_addr] {
|
||||
++_connections_being_accepted;
|
||||
return _listeners[which].accept().then_wrapped([this, keepalive, server_addr] (future<accept_result> f_cs_sa) mutable {
|
||||
--_connections_being_accepted;
|
||||
if (_stopping) {
|
||||
seastar::gate::holder holder(_gate);
|
||||
return _listeners[which].accept().then_wrapped([this, keepalive, server_addr, holder = std::move(holder)] (future<accept_result> f_cs_sa) mutable {
|
||||
if (_gate.is_closed()) {
|
||||
f_cs_sa.ignore_ready_future();
|
||||
maybe_stop();
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
auto cs_sa = f_cs_sa.get();
|
||||
@@ -231,11 +227,4 @@ server::unadvertise_connection(shared_ptr<generic_server::connection> raw_conn)
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
// Signal that all connections are stopped if the server is stopping and can be stopped.
|
||||
void server::maybe_stop() {
|
||||
if (_stopping && !_connections_being_accepted && !_current_connections) {
|
||||
_all_connections_stopped.set_value();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -42,6 +42,7 @@ protected:
|
||||
output_stream<char> _write_buf;
|
||||
future<> _ready_to_respond = make_ready_future<>();
|
||||
seastar::gate _pending_requests_gate;
|
||||
seastar::gate::holder _hold_server;
|
||||
|
||||
public:
|
||||
connection(server& server, connected_socket&& fd);
|
||||
@@ -77,10 +78,8 @@ class server {
|
||||
protected:
|
||||
sstring _server_name;
|
||||
logging::logger& _logger;
|
||||
bool _stopping = false;
|
||||
promise<> _all_connections_stopped;
|
||||
uint64_t _current_connections = 0;
|
||||
uint64_t _connections_being_accepted = 0;
|
||||
seastar::gate _gate;
|
||||
future<> _all_connections_stopped = make_ready_future<>();
|
||||
uint64_t _total_connections = 0;
|
||||
future<> _listeners_stopped = make_ready_future<>();
|
||||
using connections_list_t = boost::intrusive::list<connection>;
|
||||
@@ -120,8 +119,6 @@ protected:
|
||||
virtual future<> unadvertise_connection(shared_ptr<connection> conn);
|
||||
|
||||
future<> for_each_gently(noncopyable_function<future<>(connection&)>);
|
||||
|
||||
void maybe_stop();
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user