Merge 'More logging for Raft-based topology' from Kamil Braun

Currently if topology coordinator gets stuck in a CI test run it's hard to debug this (e.g. scylladb/scylladb#16708). We can add a lot of logging inside topology coordinator code to aid debugging, without spamming the logs -- these are relatively rare control plane events.

Closes scylladb/scylladb#16749

* github.com:scylladb/scylladb:
  test/pylib: scylla_cluster: enable raft_topology=debug level by default
  raft topology: increase level of some TRACE messages
  raft topology: log when entering transition states
  raft topology: don't include null ID in exclude_nodes
  raft topology: INFO log when executing global commands and updating topology state
  storage_service: separate logger for raft topology
This commit is contained in:
Pavel Emelyanov
2024-01-19 16:19:44 +03:00
10 changed files with 214 additions and 205 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -129,7 +129,8 @@ SCYLLA_CMDLINE_OPTIONS = [
'--abort-on-lsa-bad-alloc', '1',
'--abort-on-seastar-bad-alloc',
'--abort-on-internal-error', '1',
'--abort-on-ebadf', '1'
'--abort-on-ebadf', '1',
'--logger-log-level', 'raft_topology=debug',
]
# [--smp, 1], [--smp, 2] -> [--smp, 2]

View File

@@ -26,21 +26,21 @@ async def test_no_cleanup_when_unnecessary(request, manager: ManagerClient):
marks = [await log.mark() for log in logs]
await manager.server_add()
await manager.server_add()
matches = [await log.grep("storage_service - raft topology: start cleanup", from_mark=mark) for log, mark in zip(logs, marks)]
matches = [await log.grep("raft_topology - start cleanup", from_mark=mark) for log, mark in zip(logs, marks)]
assert sum(len(x) for x in matches) == 0
servers = await manager.running_servers()
logs = [await manager.server_open_log(srv.server_id) for srv in servers]
marks = [await log.mark() for log in logs]
await manager.decommission_node(servers[4].server_id)
matches = [await log.grep("storage_service - raft topology: start cleanup", from_mark=mark) for log, mark in zip(logs, marks)]
matches = [await log.grep("raft_topology - start cleanup", from_mark=mark) for log, mark in zip(logs, marks)]
assert sum(len(x) for x in matches) == 4
servers = await manager.running_servers()
logs = [await manager.server_open_log(srv.server_id) for srv in servers]
marks = [await log.mark() for log in logs]
await manager.decommission_node(servers[3].server_id)
matches = [await log.grep("storage_service - raft topology: start cleanup", from_mark=mark) for log, mark in zip(logs, marks)]
matches = [await log.grep("raft_topology - start cleanup", from_mark=mark) for log, mark in zip(logs, marks)]
assert sum(len(x) for x in matches) == 0
await manager.server_add()
@@ -48,11 +48,11 @@ async def test_no_cleanup_when_unnecessary(request, manager: ManagerClient):
logs = [await manager.server_open_log(srv.server_id) for srv in servers]
marks = [await log.mark() for log in logs]
await manager.api.client.post("/storage_service/cleanup_all", servers[0].ip_addr)
matches = [await log.grep("storage_service - raft topology: start cleanup", from_mark=mark) for log, mark in zip(logs, marks)]
matches = [await log.grep("raft_topology - start cleanup", from_mark=mark) for log, mark in zip(logs, marks)]
assert sum(len(x) for x in matches) == 3
marks = [await log.mark() for log in logs]
await manager.decommission_node(servers[3].server_id)
matches = [await log.grep("storage_service - raft topology: start cleanup", from_mark=mark) for log, mark in zip(logs, marks)]
matches = [await log.grep("raft_topology - start cleanup", from_mark=mark) for log, mark in zip(logs, marks)]
assert sum(len(x) for x in matches) == 0

View File

@@ -40,7 +40,7 @@ async def test_coordinator_queue_management(manager: ManagerClient):
done, pending = await asyncio.wait(search, return_when = asyncio.FIRST_COMPLETED)
for t in pending: t.cancel()
[await l.wait_for("raft topology: removenode: wait for completion", m) for l, m in zip(logs[:2], marks[:2])]
[await l.wait_for("raft_topology - removenode: wait for completion", m) for l, m in zip(logs[:2], marks[:2])]
[await manager.api.message_injection(s.ip_addr, inj) for s in servers[:3]]
@@ -61,7 +61,7 @@ async def test_coordinator_queue_management(manager: ManagerClient):
done, pending = await asyncio.wait(search, return_when = asyncio.FIRST_COMPLETED)
for t in pending: t.cancel()
logs[1].wait_for("raft topology: decommission: wait for completion", marks[1])
logs[1].wait_for("raft_topology - decommission: wait for completion", marks[1])
[await manager.api.message_injection(s.ip_addr, inj) for s in servers[:3]]

View File

@@ -27,7 +27,7 @@ async def test_topology_streaming_failure(request, manager: ManagerClient):
await manager.decommission_node(servers[2].server_id, expected_error="Decommission failed. See earlier errors")
servers = await manager.running_servers()
assert len(servers) == 3
matches = [await log.grep("storage_service - rollback.*after decommissioning failure to state rollback_to_normal", from_mark=mark) for log, mark in zip(logs, marks)]
matches = [await log.grep("raft_topology - rollback.*after decommissioning failure to state rollback_to_normal", from_mark=mark) for log, mark in zip(logs, marks)]
assert sum(len(x) for x in matches) == 1
# remove failure
marks = [await log.mark() for log in logs]
@@ -36,7 +36,7 @@ async def test_topology_streaming_failure(request, manager: ManagerClient):
await manager.server_stop_gracefully(servers[3].server_id)
await manager.api.enable_injection(servers[2].ip_addr, 'stream_ranges_fail', one_shot=True)
await manager.remove_node(servers[0].server_id, servers[3].server_id, expected_error="Removenode failed. See earlier errors")
matches = [await log.grep("storage_service - rollback.*after removing failure to state rollback_to_normal", from_mark=mark) for log, mark in zip(logs, marks)]
matches = [await log.grep("raft_topology - rollback.*after removing failure to state rollback_to_normal", from_mark=mark) for log, mark in zip(logs, marks)]
assert sum(len(x) for x in matches) == 1
await manager.server_start(servers[3].server_id)
await manager.servers_see_each_other(servers)
@@ -49,7 +49,7 @@ async def test_topology_streaming_failure(request, manager: ManagerClient):
await manager.server_start(s.server_id, expected_error="Bootstrap failed. See earlier errors")
servers = await manager.running_servers()
assert s not in servers
matches = [await log.grep("storage_service - rollback.*after bootstrapping failure to state left_token_ring", from_mark=mark) for log, mark in zip(logs, marks)]
matches = [await log.grep("raft_topology - rollback.*after bootstrapping failure to state left_token_ring", from_mark=mark) for log, mark in zip(logs, marks)]
assert sum(len(x) for x in matches) == 1
# bootstrap failure in raft barrier
marks = [await log.mark() for log in logs]
@@ -59,7 +59,7 @@ async def test_topology_streaming_failure(request, manager: ManagerClient):
await manager.server_start(s.server_id, expected_error="Bootstrap failed. See earlier errors")
servers = await manager.running_servers()
assert s not in servers
matches = [await log.grep("storage_service - rollback.*after bootstrapping failure to state left_token_ring", from_mark=mark) for log, mark in zip(logs, marks)]
matches = [await log.grep("raft_topology - rollback.*after bootstrapping failure to state left_token_ring", from_mark=mark) for log, mark in zip(logs, marks)]
assert sum(len(x) for x in matches) == 1
# replace failure
marks = [await log.mark() for log in logs]
@@ -72,5 +72,5 @@ async def test_topology_streaming_failure(request, manager: ManagerClient):
await manager.server_start(s.server_id, expected_error="Replace failed. See earlier errors")
servers = await manager.running_servers()
assert s not in servers
matches = [await log.grep("storage_service - rollback.*after replacing failure to state left_token_ring", from_mark=mark) for log, mark in zip(logs, marks)]
matches = [await log.grep("raft_topology - rollback.*after replacing failure to state left_token_ring", from_mark=mark) for log, mark in zip(logs, marks)]
assert sum(len(x) for x in matches) == 1

View File

@@ -44,4 +44,4 @@ async def test_removing_alive_node_fails(manager: ManagerClient) -> None:
# topology_coordinator::handle_node_transition).
logging.info(f"Removing {srv3} initiated by {srv2}")
await manager.remove_node(srv2.server_id, srv3.server_id, [], "Removenode failed. See earlier errors", False)
await log_file1.wait_for("raft topology: rejected removenode operation for node", timeout=60)
await log_file1.wait_for("raft_topology - rejected removenode operation for node", timeout=60)

View File

@@ -38,7 +38,7 @@ async def test_tablet_drain_failure_during_decommission(manager: ManagerClient):
await manager.decommission_node(servers[2].server_id, expected_error="Decommission failed. See earlier errors")
matches = [await log.grep("storage_service - rollback.*after decommissioning failure to state rollback_to_normal", from_mark=mark) for log, mark in zip(logs, marks)]
matches = [await log.grep("raft_topology - rollback.*after decommissioning failure to state rollback_to_normal", from_mark=mark) for log, mark in zip(logs, marks)]
assert sum(len(x) for x in matches) == 1
await cql.run_async("DROP KEYSPACE test;")

View File

@@ -50,7 +50,7 @@ async def test_nodes_with_different_smp(request: FixtureRequest, manager: Manage
log_args = [
'--default-log-level', 'debug',
'--logger-log-level', 'raft_group0=trace:group0_client=trace:storage_service=trace'
':raft=trace:raft_group_registry=trace'
':raft=trace:raft_group_registry=trace:raft_topology=trace'
]
logger.info(f'Adding --smp=3 server')

View File

@@ -26,7 +26,7 @@ async def test_current_cdc_generation_is_not_removed(manager: ManagerClient):
# We enable the injection to ensure that a too-late timestamp does not prevent removing the CDC generation.
logger.info("Bootstrapping first node")
server = await manager.server_add(
cmdline=['--logger-log-level', 'storage_service=trace'],
cmdline=['--logger-log-level', 'storage_service=trace:raft_topology=trace'],
config={'error_injections_at_startup': ['clean_obsolete_cdc_generations_ignore_ts']}
)
@@ -53,7 +53,7 @@ async def test_dependency_on_timestamps(manager: ManagerClient):
and the topology coordinator's clock is too small. Then, test that the CDC generation publisher removes
the clean-up candidate (together with older generations) if its timestamp is old enough."""
logger.info("Bootstrapping first node")
servers = [await manager.server_add(cmdline=['--logger-log-level', 'storage_service=trace'])]
servers = [await manager.server_add(cmdline=['--logger-log-level', 'storage_service=trace:raft_topology=trace'])]
log_file1 = await manager.server_open_log(servers[0].server_id)
mark: Optional[int] = None

View File

@@ -79,6 +79,7 @@ async def test_tablet_metadata_propagates_with_schema_changes_in_snapshot_mode(m
'--logger-log-level', 'query_processor=trace',
'--logger-log-level', 'gossip=trace',
'--logger-log-level', 'storage_service=trace',
'--logger-log-level', 'raft_topology=trace',
'--logger-log-level', 'messaging_service=trace',
'--logger-log-level', 'rpc=trace',
]
@@ -237,6 +238,7 @@ async def test_streaming_is_guarded_by_topology_guard(manager: ManagerClient):
logger.info("Bootstrapping cluster")
cmdline = [
'--logger-log-level', 'storage_service=trace',
'--logger-log-level', 'raft_topology=trace',
]
servers = [await manager.server_add(cmdline=cmdline)]