diff --git a/test/pylib/tablets.py b/test/pylib/tablets.py new file mode 100644 index 0000000000..b2fec1fbe5 --- /dev/null +++ b/test/pylib/tablets.py @@ -0,0 +1,61 @@ +# +# Copyright (C) 2024-present ScyllaDB +# +# SPDX-License-Identifier: AGPL-3.0-or-later +# + +from test.pylib.util import read_barrier +from test.pylib.manager_client import ManagerClient +from test.pylib.internal_types import ServerInfo, HostID +from typing import NamedTuple + +class TabletReplicas(NamedTuple): + last_token: int + replicas: list[tuple[HostID, int]] + +async def get_all_tablet_replicas(manager: ManagerClient, server: ServerInfo, keyspace_name: str, table_name: str) -> list[TabletReplicas]: + """ + Retrieves the tablet distribution for a given table. + This call is guaranteed to see all prior changes applied to group0 tables. + + :param server: server to query. Can be any live node. + """ + + host = manager.get_cql().cluster.metadata.get_host(server.ip_addr) + + # read_barrier is needed to ensure that local tablet metadata on the queried node + # reflects the finalized tablet movement. + await read_barrier(manager.get_cql(), host) + + table_id = await manager.get_table_id(keyspace_name, table_name) + rows = await manager.get_cql().run_async(f"SELECT last_token, replicas FROM system.tablets where " + f"table_id = {table_id}", host=host) + return [TabletReplicas( + last_token=x.last_token, + replicas=[(HostID(str(host)), shard) for (host, shard) in x.replicas] + ) for x in rows] + +async def get_tablet_replicas(manager: ManagerClient, server: ServerInfo, keyspace_name: str, table_name: str, token: int) -> list[tuple[HostID, int]]: + """ + Gets tablet replicas of the tablet which owns a given token of a given table. + This call is guaranteed to see all prior changes applied to group0 tables. + + :param server: server to query. Can be any live node. + """ + rows = await get_all_tablet_replicas(manager, server, keyspace_name, table_name) + for row in rows: + if row.last_token >= token: + return row.replicas + return [] + + +async def get_tablet_replica(manager: ManagerClient, server: ServerInfo, keyspace_name: str, table_name: str, token: int) -> tuple[HostID, int]: + """ + Get the first replica of the tablet which owns a given token of a given table. + This call is guaranteed to see all prior changes applied to group0 tables. + + :param server: server to query. Can be any live node. + """ + replicas = await get_tablet_replicas(manager, server, keyspace_name, table_name, token) + return replicas[0] + diff --git a/test/topology_experimental_raft/test_tablets.py b/test/topology_experimental_raft/test_tablets.py index 6a1163d7f9..b8986fa0aa 100644 --- a/test/topology_experimental_raft/test_tablets.py +++ b/test/topology_experimental_raft/test_tablets.py @@ -10,16 +10,15 @@ from test.pylib.manager_client import ManagerClient from test.pylib.rest_client import inject_error_one_shot, HTTPError from test.pylib.rest_client import inject_error from test.pylib.util import wait_for_cql_and_get_hosts, read_barrier +from test.pylib.tablets import get_tablet_replica, get_all_tablet_replicas from test.topology.conftest import skip_mode from test.topology.util import reconnect_driver -from test.pylib.internal_types import HostID import pytest import asyncio import logging import time import random -from typing import NamedTuple logger = logging.getLogger(__name__) @@ -33,56 +32,6 @@ async def inject_error_on(manager, error_name, servers): errs = [manager.api.enable_injection(s.ip_addr, error_name, False) for s in servers] await asyncio.gather(*errs) -class TabletReplicas(NamedTuple): - last_token: int - replicas: list[tuple[HostID, int]] - -async def get_all_tablet_replicas(manager: ManagerClient, server: ServerInfo, keyspace_name: str, table_name: str) -> list[TabletReplicas]: - """ - Retrieves the tablet distribution for a given table. - This call is guaranteed to see all prior changes applied to group0 tables. - - :param server: server to query. Can be any live node. - """ - - host = manager.get_cql().cluster.metadata.get_host(server.ip_addr) - - # read_barrier is needed to ensure that local tablet metadata on the queried node - # reflects the finalized tablet movement. - await read_barrier(manager.get_cql(), host) - - table_id = await manager.get_table_id(keyspace_name, table_name) - rows = await manager.get_cql().run_async(f"SELECT last_token, replicas FROM system.tablets where " - f"table_id = {table_id}", host=host) - return [TabletReplicas( - last_token=x.last_token, - replicas=[(HostID(str(host)), shard) for (host, shard) in x.replicas] - ) for x in rows] - -async def get_tablet_replicas(manager: ManagerClient, server: ServerInfo, keyspace_name: str, table_name: str, token: int) -> list[tuple[HostID, int]]: - """ - Gets tablet replicas of the tablet which owns a given token of a given table. - This call is guaranteed to see all prior changes applied to group0 tables. - - :param server: server to query. Can be any live node. - """ - rows = await get_all_tablet_replicas(manager, server, keyspace_name, table_name) - for row in rows: - if row.last_token >= token: - return row.replicas - return [] - - -async def get_tablet_replica(manager: ManagerClient, server: ServerInfo, keyspace_name: str, table_name: str, token: int) -> tuple[HostID, int]: - """ - Get the first replica of the tablet which owns a given token of a given table. - This call is guaranteed to see all prior changes applied to group0 tables. - - :param server: server to query. Can be any live node. - """ - replicas = await get_tablet_replicas(manager, server, keyspace_name, table_name, token) - return replicas[0] - async def repair_on_node(manager: ManagerClient, server: ServerInfo, servers: list[ServerInfo]): node = server.ip_addr await manager.servers_see_each_other(servers)