diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index 7447ed8804..1e100cb477 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -1803,15 +1803,27 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { break; } case node_state::rebuilding: { - node = co_await exec_direct_command( - std::move(node), raft_topology_cmd::command::stream_ranges); + bool retake = false; + auto id = node.id; topology_mutation_builder builder(node.guard.write_timestamp()); - builder.del_session(); topology_request_tracking_mutation_builder rtbuilder(node.rs->request_id); - builder.with_node(node.id) + try { + node = co_await exec_direct_command(std::move(node), raft_topology_cmd::command::stream_ranges); + rtbuilder.done(); + } catch (term_changed_error&) { + throw; + } catch (...) { + rtlogger.error("send_raft_topology_cmd(stream_ranges) failed with exception" + " (node state is rebuilding): {}", std::current_exception()); + rtbuilder.done("streaming failed"); + retake = true; + } + if (retake) { + node = retake_node(co_await start_operation(), id); + } + builder.del_session().with_node(node.id) .set("node_state", node_state::normal) .del("rebuild_option"); - rtbuilder.done(); co_await update_topology_state(take_guard(std::move(node)), {builder.build(), rtbuilder.build()}, "rebuilding completed"); } break; diff --git a/test/pylib/manager_client.py b/test/pylib/manager_client.py index 56e7522ee3..16c54c2fa0 100644 --- a/test/pylib/manager_client.py +++ b/test/pylib/manager_client.py @@ -326,10 +326,12 @@ class ManagerClient(): timeout=ScyllaServer.TOPOLOGY_TIMEOUT) self._driver_update() - async def rebuild_node(self, server_id: ServerNum) -> None: + async def rebuild_node(self, server_id: ServerNum, + expected_error: str | None = None) -> None: """Tell a node to rebuild with Scylla REST API""" logger.debug("ManagerClient rebuild %s", server_id) - await self.client.put_json(f"/cluster/rebuild-node/{server_id}", 
+ data = {"expected_error": expected_error} + await self.client.put_json(f"/cluster/rebuild-node/{server_id}", data, timeout=ScyllaServer.TOPOLOGY_TIMEOUT) self._driver_update() diff --git a/test/pylib/scylla_cluster.py b/test/pylib/scylla_cluster.py index 20779e17c4..c7721bd603 100644 --- a/test/pylib/scylla_cluster.py +++ b/test/pylib/scylla_cluster.py @@ -1386,16 +1386,32 @@ class ScyllaClusterManager: async def _cluster_rebuild_node(self, request) -> None: """Run rebuild node on Scylla REST API for a specified server""" assert self.cluster + data = await request.json() server_id = ServerNum(int(request.match_info["server_id"])) self.logger.info("_cluster_rebuild_node %s", server_id) assert server_id in self.cluster.running, "Can't rebuild not running node" server = self.cluster.running[server_id] + expected_error = data["expected_error"] try: await self.cluster.api.rebuild_node(server.ip_addr, timeout=ScyllaServer.TOPOLOGY_TIMEOUT) except (RuntimeError, HTTPError) as exc: - raise RuntimeError( - f"rebuild failed (server: {server}), check log at {server.log_filename}," - f" error: \"{exc}\"") + if expected_error: + if expected_error not in str(exc): + raise RuntimeError( + f"rebuild failed (server: {server}) but did not contain expected error" + f" (\"{expected_error}\"), check log file at {server.log_filename}, error: \"{exc}\"") + else: + return + else: + raise RuntimeError( + f"rebuild failed (server: {server}), check log at {server.log_filename}," + f" error: \"{exc}\"") + else: + if expected_error: + raise RuntimeError( + f"rebuild succeeded when it should have failed (server: {server}," + f" expected_error: \"{expected_error}\"), check log file at {server.log_filename}") + await self.cluster.server_stop(server_id, gracefully=True) async def _server_get_config(self, request: aiohttp.web.Request) -> dict[str, object]: diff --git a/test/topology/test_topology_failure_recovery.py b/test/topology/test_topology_failure_recovery.py index a1b62065f8..c83019c358 
100644 --- a/test/topology/test_topology_failure_recovery.py +++ b/test/topology/test_topology_failure_recovery.py @@ -63,6 +63,11 @@ async def test_topology_streaming_failure(request, manager: ManagerClient): matches = [await log.grep("raft_topology - rollback.*after bootstrapping failure, moving transition state to left token ring", from_mark=mark) for log, mark in zip(logs, marks)] assert sum(len(x) for x in matches) == 1 + # rebuild failure + marks = [await log.mark() for log in logs] + servers = await manager.running_servers() + await manager.api.enable_injection(servers[1].ip_addr, 'stream_ranges_fail', one_shot=True) + await manager.rebuild_node(servers[1].server_id, expected_error="rebuild failed:") # replace failure marks = [await log.mark() for log in logs] servers = await manager.running_servers()