test: cluster_manager: add ability to wait for supervisor STATUS=serving
When running under systemd, ScyllaDB sends a STATUS=serving message to systemd. Co-opt this mechanism by setting up NOTIFY_SOCKET, thus making the cluster manager pretend it is systemd. Users of the cluster manager can now wait for the node to report itself up, rather than having to parse log files or retry connections.
This commit is contained in:
@@ -39,3 +39,4 @@ class ServerUpState(IntEnum):
|
||||
HOST_ID_QUERIED = auto()
|
||||
CQL_CONNECTED = auto()
|
||||
CQL_QUERIED = auto()
|
||||
SERVING = auto() # Scylla sent sd_notify("serving")
|
||||
|
||||
@@ -44,6 +44,7 @@ import platform
|
||||
import contextlib
|
||||
import fcntl
|
||||
import urllib
|
||||
import socket
|
||||
|
||||
import psutil
|
||||
|
||||
@@ -385,6 +386,10 @@ class ScyllaServer:
|
||||
prefix=f"scylladb-{f'{xdist_worker_id}-' if xdist_worker_id else ''}{self.server_id}-test.py-"
|
||||
)
|
||||
self.maintenance_socket_path = f"{self.maintenance_socket_dir.name}/cql.m"
|
||||
# Unix socket for receiving sd_notify messages from Scylla
|
||||
self.notify_socket_path = pathlib.Path(self.maintenance_socket_dir.name) / "notify.sock"
|
||||
self.notify_socket: Optional[socket.socket] = None
|
||||
self._received_serving = False
|
||||
self.exe = pathlib.Path(version.path).resolve()
|
||||
self.vardir = pathlib.Path(vardir)
|
||||
self.logger = logger
|
||||
@@ -712,6 +717,50 @@ class ScyllaServer:
|
||||
caslog.setLevel(oldlevel)
|
||||
# Any other exception may indicate a problem, and is passed to the caller.
|
||||
|
||||
def _setup_notify_socket(self) -> None:
    """Create a Unix datagram socket for receiving sd_notify messages from Scylla.

    The socket is bound at ``self.notify_socket_path`` and is created
    non-blocking and close-on-exec. Calling this method while a socket is
    already set up is a no-op. On success the cached "serving" flag is
    reset, since a new socket means a new series of notifications.

    Raises:
        OSError: if the socket cannot be bound. In that case no socket is
            left on the instance, so a later call can retry from scratch.
    """
    if self.notify_socket is not None:
        return
    # Remove existing socket file if present; bind() fails on an existing path.
    self.notify_socket_path.unlink(missing_ok=True)
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM | socket.SOCK_NONBLOCK | socket.SOCK_CLOEXEC)
    try:
        sock.bind(str(self.notify_socket_path))
    except OSError:
        # Do not leak the descriptor, and do not publish an unbound socket:
        # that would make every later call return early without a usable
        # notification endpoint.
        sock.close()
        raise
    self.notify_socket = sock
    self._received_serving = False
|
||||
|
||||
def _cleanup_notify_socket(self) -> None:
    """Close the sd_notify socket, if any, and remove its socket file."""
    sock = self.notify_socket
    if sock is not None:
        sock.close()
        self.notify_socket = None
    # The socket file is not removed by close(); delete it explicitly.
    self.notify_socket_path.unlink(missing_ok=True)
|
||||
|
||||
def check_serving_notification(self) -> bool:
    """Report whether Scylla has sent the 'serving' sd_notify message.

    Drains every datagram currently queued on the notify socket. Once a
    message containing ``STATUS=serving`` has been seen the result is
    cached, so later calls return True without touching the socket.

    Returns True if the SERVING state has been reached, False otherwise
    (including when no notify socket is set up or reading fails).
    """
    if self._received_serving:
        return True
    sock = self.notify_socket
    if sock is None:
        return False
    try:
        # Keep reading until the non-blocking socket has nothing left.
        while True:
            # sd_notify message format: "STATUS=serving\n" or "READY=1\nSTATUS=serving\n"
            text = sock.recv(4096).decode('utf-8', errors='replace')
            if 'STATUS=serving' not in text:
                continue
            self._received_serving = True
            self.logger.debug("Received sd_notify 'serving' message")
            return True
    except BlockingIOError:
        # No more messages available
        pass
    except Exception as e:
        # Best effort: a read failure is treated as "not serving yet".
        self.logger.debug("Error reading from notify socket: %s", e)
    return False
|
||||
|
||||
async def try_get_host_id(self, api: ScyllaRESTAPIClient) -> Optional[HostID]:
|
||||
"""Try to get the host id (also tests Scylla REST API is serving)"""
|
||||
|
||||
@@ -754,6 +803,10 @@ class ScyllaServer:
|
||||
env['UBSAN_OPTIONS'] = f'halt_on_error=1:abort_on_error=1:suppressions={TOP_SRC_DIR / "ubsan-suppressions.supp"}'
|
||||
env['ASAN_OPTIONS'] = f'disable_coredump=0:abort_on_error=1:detect_stack_use_after_return=1'
|
||||
|
||||
# Set up socket for receiving sd_notify messages from Scylla
|
||||
self._setup_notify_socket()
|
||||
env['NOTIFY_SOCKET'] = self.notify_socket_path
|
||||
|
||||
# Reopen log file if it was closed (e.g., after a previous stop)
|
||||
if self.log_file is None or self.log_file.closed:
|
||||
self.log_file = self.log_filename.open("ab") # append mode to preserve previous logs
|
||||
@@ -808,7 +861,10 @@ class ScyllaServer:
|
||||
if server_up_state == ServerUpState.PROCESS_STARTED:
|
||||
server_up_state = ServerUpState.HOST_ID_QUERIED
|
||||
server_up_state = await self.get_cql_up_state() or server_up_state
|
||||
if server_up_state == expected_server_up_state:
|
||||
# Check for SERVING state (sd_notify "serving" message)
|
||||
if server_up_state >= ServerUpState.CQL_QUERIED and self.check_serving_notification():
|
||||
server_up_state = ServerUpState.SERVING
|
||||
if server_up_state >= expected_server_up_state:
|
||||
if expected_error is not None:
|
||||
await report_error(
|
||||
f"the node has reached {server_up_state} state,"
|
||||
@@ -847,13 +903,14 @@ class ScyllaServer:
|
||||
session.execute("DROP KEYSPACE k")
|
||||
|
||||
async def shutdown_control_connection(self) -> None:
    """Shut down driver connection and notify socket.

    Shuts down the control connection and the control cluster if they
    exist, clearing the corresponding attributes, and then cleans up the
    sd_notify socket. Safe to call repeatedly.
    """
    if self.control_connection is not None:
        self.control_connection.shutdown()
        self.control_connection = None
    if self.control_cluster is not None:
        self.control_cluster.shutdown()
        self.control_cluster = None
    self._cleanup_notify_socket()
|
||||
|
||||
@stop_event
|
||||
@start_stop_lock
|
||||
|
||||
Reference in New Issue
Block a user