test/topology: Move helpers to get tablet replicas to pylib
These are very useful and will be used across different test files soon Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
61
test/pylib/tablets.py
Normal file
61
test/pylib/tablets.py
Normal file
@@ -0,0 +1,61 @@
|
||||
#
|
||||
# Copyright (C) 2024-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
#
|
||||
|
||||
from test.pylib.util import read_barrier
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.internal_types import ServerInfo, HostID
|
||||
from typing import NamedTuple
|
||||
|
||||
class TabletReplicas(NamedTuple):
|
||||
last_token: int
|
||||
replicas: list[tuple[HostID, int]]
|
||||
|
||||
async def get_all_tablet_replicas(manager: ManagerClient, server: ServerInfo, keyspace_name: str, table_name: str) -> list[TabletReplicas]:
|
||||
"""
|
||||
Retrieves the tablet distribution for a given table.
|
||||
This call is guaranteed to see all prior changes applied to group0 tables.
|
||||
|
||||
:param server: server to query. Can be any live node.
|
||||
"""
|
||||
|
||||
host = manager.get_cql().cluster.metadata.get_host(server.ip_addr)
|
||||
|
||||
# read_barrier is needed to ensure that local tablet metadata on the queried node
|
||||
# reflects the finalized tablet movement.
|
||||
await read_barrier(manager.get_cql(), host)
|
||||
|
||||
table_id = await manager.get_table_id(keyspace_name, table_name)
|
||||
rows = await manager.get_cql().run_async(f"SELECT last_token, replicas FROM system.tablets where "
|
||||
f"table_id = {table_id}", host=host)
|
||||
return [TabletReplicas(
|
||||
last_token=x.last_token,
|
||||
replicas=[(HostID(str(host)), shard) for (host, shard) in x.replicas]
|
||||
) for x in rows]
|
||||
|
||||
async def get_tablet_replicas(manager: ManagerClient, server: ServerInfo, keyspace_name: str, table_name: str, token: int) -> list[tuple[HostID, int]]:
|
||||
"""
|
||||
Gets tablet replicas of the tablet which owns a given token of a given table.
|
||||
This call is guaranteed to see all prior changes applied to group0 tables.
|
||||
|
||||
:param server: server to query. Can be any live node.
|
||||
"""
|
||||
rows = await get_all_tablet_replicas(manager, server, keyspace_name, table_name)
|
||||
for row in rows:
|
||||
if row.last_token >= token:
|
||||
return row.replicas
|
||||
return []
|
||||
|
||||
|
||||
async def get_tablet_replica(manager: ManagerClient, server: ServerInfo, keyspace_name: str, table_name: str, token: int) -> tuple[HostID, int]:
|
||||
"""
|
||||
Get the first replica of the tablet which owns a given token of a given table.
|
||||
This call is guaranteed to see all prior changes applied to group0 tables.
|
||||
|
||||
:param server: server to query. Can be any live node.
|
||||
"""
|
||||
replicas = await get_tablet_replicas(manager, server, keyspace_name, table_name, token)
|
||||
return replicas[0]
|
||||
|
||||
@@ -10,16 +10,15 @@ from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.rest_client import inject_error_one_shot, HTTPError
|
||||
from test.pylib.rest_client import inject_error
|
||||
from test.pylib.util import wait_for_cql_and_get_hosts, read_barrier
|
||||
from test.pylib.tablets import get_tablet_replica, get_all_tablet_replicas
|
||||
from test.topology.conftest import skip_mode
|
||||
from test.topology.util import reconnect_driver
|
||||
from test.pylib.internal_types import HostID
|
||||
|
||||
import pytest
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
import random
|
||||
from typing import NamedTuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -33,56 +32,6 @@ async def inject_error_on(manager, error_name, servers):
|
||||
errs = [manager.api.enable_injection(s.ip_addr, error_name, False) for s in servers]
|
||||
await asyncio.gather(*errs)
|
||||
|
||||
class TabletReplicas(NamedTuple):
|
||||
last_token: int
|
||||
replicas: list[tuple[HostID, int]]
|
||||
|
||||
async def get_all_tablet_replicas(manager: ManagerClient, server: ServerInfo, keyspace_name: str, table_name: str) -> list[TabletReplicas]:
|
||||
"""
|
||||
Retrieves the tablet distribution for a given table.
|
||||
This call is guaranteed to see all prior changes applied to group0 tables.
|
||||
|
||||
:param server: server to query. Can be any live node.
|
||||
"""
|
||||
|
||||
host = manager.get_cql().cluster.metadata.get_host(server.ip_addr)
|
||||
|
||||
# read_barrier is needed to ensure that local tablet metadata on the queried node
|
||||
# reflects the finalized tablet movement.
|
||||
await read_barrier(manager.get_cql(), host)
|
||||
|
||||
table_id = await manager.get_table_id(keyspace_name, table_name)
|
||||
rows = await manager.get_cql().run_async(f"SELECT last_token, replicas FROM system.tablets where "
|
||||
f"table_id = {table_id}", host=host)
|
||||
return [TabletReplicas(
|
||||
last_token=x.last_token,
|
||||
replicas=[(HostID(str(host)), shard) for (host, shard) in x.replicas]
|
||||
) for x in rows]
|
||||
|
||||
async def get_tablet_replicas(manager: ManagerClient, server: ServerInfo, keyspace_name: str, table_name: str, token: int) -> list[tuple[HostID, int]]:
|
||||
"""
|
||||
Gets tablet replicas of the tablet which owns a given token of a given table.
|
||||
This call is guaranteed to see all prior changes applied to group0 tables.
|
||||
|
||||
:param server: server to query. Can be any live node.
|
||||
"""
|
||||
rows = await get_all_tablet_replicas(manager, server, keyspace_name, table_name)
|
||||
for row in rows:
|
||||
if row.last_token >= token:
|
||||
return row.replicas
|
||||
return []
|
||||
|
||||
|
||||
async def get_tablet_replica(manager: ManagerClient, server: ServerInfo, keyspace_name: str, table_name: str, token: int) -> tuple[HostID, int]:
|
||||
"""
|
||||
Get the first replica of the tablet which owns a given token of a given table.
|
||||
This call is guaranteed to see all prior changes applied to group0 tables.
|
||||
|
||||
:param server: server to query. Can be any live node.
|
||||
"""
|
||||
replicas = await get_tablet_replicas(manager, server, keyspace_name, table_name, token)
|
||||
return replicas[0]
|
||||
|
||||
async def repair_on_node(manager: ManagerClient, server: ServerInfo, servers: list[ServerInfo]):
|
||||
node = server.ip_addr
|
||||
await manager.servers_see_each_other(servers)
|
||||
|
||||
Reference in New Issue
Block a user