Merge 'abort failed rebuild instead of retrying it forever' from Gleb
Add error handling to rebuild instead of retrying it until succeeds. * 'gleb/rebuild-fail-v2' of github.com:scylladb/scylla-dev: test: add test for rebuild failure test: add expected_error to rebuild_node operation topology_coordinator: Propagate rebuild failure to the initiator
This commit is contained in:
@@ -1803,15 +1803,27 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
break;
|
||||
}
|
||||
case node_state::rebuilding: {
|
||||
node = co_await exec_direct_command(
|
||||
std::move(node), raft_topology_cmd::command::stream_ranges);
|
||||
bool retake = false;
|
||||
auto id = node.id;
|
||||
topology_mutation_builder builder(node.guard.write_timestamp());
|
||||
builder.del_session();
|
||||
topology_request_tracking_mutation_builder rtbuilder(node.rs->request_id);
|
||||
builder.with_node(node.id)
|
||||
try {
|
||||
node = co_await exec_direct_command(std::move(node), raft_topology_cmd::command::stream_ranges);
|
||||
rtbuilder.done();
|
||||
} catch (term_changed_error&) {
|
||||
throw;
|
||||
} catch (...) {
|
||||
rtlogger.error("send_raft_topology_cmd(stream_ranges) failed with exception"
|
||||
" (node state is rebuilding): {}", std::current_exception());
|
||||
rtbuilder.done("streaming failed");
|
||||
retake = true;
|
||||
}
|
||||
if (retake) {
|
||||
node = retake_node(co_await start_operation(), id);
|
||||
}
|
||||
builder.del_session().with_node(node.id)
|
||||
.set("node_state", node_state::normal)
|
||||
.del("rebuild_option");
|
||||
rtbuilder.done();
|
||||
co_await update_topology_state(take_guard(std::move(node)), {builder.build(), rtbuilder.build()}, "rebuilding completed");
|
||||
}
|
||||
break;
|
||||
|
||||
@@ -326,10 +326,12 @@ class ManagerClient():
|
||||
timeout=ScyllaServer.TOPOLOGY_TIMEOUT)
|
||||
self._driver_update()
|
||||
|
||||
async def rebuild_node(self, server_id: ServerNum) -> None:
|
||||
async def rebuild_node(self, server_id: ServerNum,
|
||||
expected_error: str | None = None) -> None:
|
||||
"""Tell a node to rebuild with Scylla REST API"""
|
||||
logger.debug("ManagerClient rebuild %s", server_id)
|
||||
await self.client.put_json(f"/cluster/rebuild-node/{server_id}",
|
||||
data = {"expected_error": expected_error}
|
||||
await self.client.put_json(f"/cluster/rebuild-node/{server_id}", data,
|
||||
timeout=ScyllaServer.TOPOLOGY_TIMEOUT)
|
||||
self._driver_update()
|
||||
|
||||
|
||||
@@ -1386,16 +1386,32 @@ class ScyllaClusterManager:
|
||||
async def _cluster_rebuild_node(self, request) -> None:
|
||||
"""Run rebuild node on Scylla REST API for a specified server"""
|
||||
assert self.cluster
|
||||
data = await request.json()
|
||||
server_id = ServerNum(int(request.match_info["server_id"]))
|
||||
self.logger.info("_cluster_rebuild_node %s", server_id)
|
||||
assert server_id in self.cluster.running, "Can't rebuild not running node"
|
||||
server = self.cluster.running[server_id]
|
||||
expected_error = data["expected_error"]
|
||||
try:
|
||||
await self.cluster.api.rebuild_node(server.ip_addr, timeout=ScyllaServer.TOPOLOGY_TIMEOUT)
|
||||
except (RuntimeError, HTTPError) as exc:
|
||||
raise RuntimeError(
|
||||
f"rebuild failed (server: {server}), check log at {server.log_filename},"
|
||||
f" error: \"{exc}\"")
|
||||
if expected_error:
|
||||
if expected_error not in str(exc):
|
||||
raise RuntimeError(
|
||||
f"rebuild failed (server: {server}) but did not contain expected error"
|
||||
f"(\"{expected_error}\", check log file at {server.log_filename}, error: \"{exc}\"")
|
||||
else:
|
||||
return
|
||||
else:
|
||||
raise RuntimeError(
|
||||
f"rebuild failed (server: {server}), check log at {server.log_filename},"
|
||||
f" error: \"{exc}\"")
|
||||
else:
|
||||
if expected_error:
|
||||
raise RuntimeError(
|
||||
f"rebuild succeeded when it should have failed (server: {server},"
|
||||
f" expected_error: \"{expected_error}\"), check log file at {server.log_filename}")
|
||||
|
||||
await self.cluster.server_stop(server_id, gracefully=True)
|
||||
|
||||
async def _server_get_config(self, request: aiohttp.web.Request) -> dict[str, object]:
|
||||
|
||||
@@ -63,6 +63,11 @@ async def test_topology_streaming_failure(request, manager: ManagerClient):
|
||||
matches = [await log.grep("raft_topology - rollback.*after bootstrapping failure, moving transition state to left token ring",
|
||||
from_mark=mark) for log, mark in zip(logs, marks)]
|
||||
assert sum(len(x) for x in matches) == 1
|
||||
# rebuild failure
|
||||
marks = [await log.mark() for log in logs]
|
||||
servers = await manager.running_servers()
|
||||
await manager.api.enable_injection(servers[1].ip_addr, 'stream_ranges_fail', one_shot=True)
|
||||
await manager.rebuild_node(servers[1].server_id, expected_error="rebuild failed:")
|
||||
# replace failure
|
||||
marks = [await log.mark() for log in logs]
|
||||
servers = await manager.running_servers()
|
||||
|
||||
Reference in New Issue
Block a user