test/pylib: pool: replace steal with put(is_dirty=True)
The pool usage was kind of awkward previously: if the user of a pool
decided that a previously borrowed object should no longer be used,
it was their responsibility to destroy the object (releasing associated
resources and so on) and then call `steal()` on the pool to free space
for a new object.
Change the interface. Now the `Pool` constructor takes a `destroy`
function in addition to the `build` function. The user calls the
function `put` to return both objects that are still usable and those
that aren't. For the latter, they set `is_dirty=True`. The pool will
'destroy' the object with the provided function, which could mean e.g.
releasing associated resources.
For example, instead of:
```
if self.cluster.is_dirty:
    self.cluster.stop()
    self.cluster.release_ips()
    self.clusters.steal()
else:
    self.clusters.put(self.cluster)
```
we can now use:
```
self.clusters.put(self.cluster, is_dirty=self.cluster.is_dirty)
```
(assuming that `self.clusters` is a pool constructed with a `destroy`
function that stops the cluster and releases its IPs.)
Also extend the interface of the context manager obtained by
`instance()` - the user must now pass a flag `dirty_on_exception`. If
the context manager exits due to an exception and that flag was `True`,
the object will be considered dirty. The dirty flag can also be set
manually on the context manager. For example:
```
async with (cm := pool.instance(dirty_on_exception=True)) as server:
    cm.dirty = await run_test(test, server)
    # It will also be considered dirty if run_test throws an exception
```
This commit is contained in:
20
test.py
20
test.py
@@ -343,7 +343,16 @@ class PythonTestSuite(TestSuite):
|
||||
pool_size = cfg.get("pool_size", 2)
|
||||
|
||||
self.create_cluster = self.get_cluster_factory(cluster_size)
|
||||
self.clusters = Pool(pool_size, self.create_cluster)
|
||||
async def recycle_cluster(cluster: ScyllaCluster) -> None:
|
||||
"""When a dirty cluster is returned to the cluster pool,
|
||||
stop it and release the used IPs. We don't necessarily uninstall() it yet,
|
||||
which would delete the log file and directory - we might want to preserve
|
||||
these if it came from a failed test.
|
||||
"""
|
||||
await cluster.stop()
|
||||
await cluster.release_ips()
|
||||
|
||||
self.clusters = Pool(pool_size, self.create_cluster, recycle_cluster)
|
||||
|
||||
def get_cluster_factory(self, cluster_size: int) -> Callable[..., Awaitable]:
|
||||
def create_server(create_cfg: ScyllaCluster.CreateServerParams):
|
||||
@@ -686,7 +695,8 @@ class CQLApprovalTest(Test):
|
||||
if self.server_log is not None:
|
||||
logger.info("Server log:\n%s", self.server_log)
|
||||
|
||||
async with self.suite.clusters.instance(logger) as cluster:
|
||||
# TODO: consider dirty_on_exception=True
|
||||
async with self.suite.clusters.instance(False, logger) as cluster:
|
||||
try:
|
||||
cluster.before_test(self.uname)
|
||||
logger.info("Leasing Scylla cluster %s for test %s", cluster, self.uname)
|
||||
@@ -864,11 +874,9 @@ class PythonTest(Test):
|
||||
print("Test {} post-check failed: {}".format(self.name, str(e)))
|
||||
print("Server log of the first server:\n{}".format(self.server_log))
|
||||
logger.info(f"Discarding cluster after failed test %s...", self.name)
|
||||
await self.suite.clusters.steal()
|
||||
await cluster.stop()
|
||||
await cluster.release_ips()
|
||||
await self.suite.clusters.put(cluster, is_dirty=True)
|
||||
else:
|
||||
await self.suite.clusters.put(cluster)
|
||||
await self.suite.clusters.put(cluster, is_dirty=False)
|
||||
logger.info("Test %s %s", self.uname, "succeeded" if self.success else "failed ")
|
||||
return self
|
||||
|
||||
|
||||
@@ -72,7 +72,11 @@ class HostRegistry:
|
||||
self.next_host_id += 1
|
||||
return Host(self.subnet.format(self.next_host_id))
|
||||
|
||||
self.pool = Pool[Host](254, create_host)
|
||||
async def destroy_host(h: Host) -> None:
|
||||
# Doesn't matter, we never return hosts to the pool as 'dirty'.
|
||||
pass
|
||||
|
||||
self.pool = Pool[Host](254, create_host, destroy_host)
|
||||
|
||||
async def cleanup() -> None:
|
||||
if self.lock_filename:
|
||||
@@ -85,5 +89,5 @@ class HostRegistry:
|
||||
return await self.pool.get()
|
||||
|
||||
async def release_host(self, host: Host) -> None:
|
||||
return await self.pool.put(host)
|
||||
return await self.pool.put(host, is_dirty=False)
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import asyncio
|
||||
from typing import Generic, Callable, Awaitable, TypeVar, AsyncContextManager, Final
|
||||
from typing import Generic, Callable, Awaitable, TypeVar, AsyncContextManager, Final, Optional
|
||||
|
||||
T = TypeVar('T')
|
||||
|
||||
@@ -10,12 +10,15 @@ class Pool(Generic[T]):
|
||||
on demand, so that if you use less, you don't create anything upfront.
|
||||
If there is no object in the pool and all N objects are in use, you want
|
||||
to wait until one of the objects is returned to the pool. Expects a
|
||||
builder async function to build a new object.
|
||||
builder async function to build a new object and a destruction async
|
||||
function to clean up after a 'dirty' object (see below).
|
||||
|
||||
Usage example:
|
||||
async def start_server():
|
||||
return Server()
|
||||
pool = Pool(4, start_server)
|
||||
async def destroy_server(server):
|
||||
await server.free_resources()
|
||||
pool = Pool(4, start_server, destroy_server)
|
||||
|
||||
server = await pool.get()
|
||||
try:
|
||||
@@ -24,25 +27,33 @@ class Pool(Generic[T]):
|
||||
await pool.put(server)
|
||||
|
||||
Alternatively:
|
||||
async with pool.instance() as server:
|
||||
async with pool.instance(dirty_on_exception=False) as server:
|
||||
await run_test(test, server)
|
||||
|
||||
|
||||
If the object is considered no longer usable by other users of the pool
|
||||
you can 'steal' it, which frees up space in the pool.
|
||||
you can pass `is_dirty=True` flag to `put`, which will cause the object
|
||||
to be 'destroyed' (by calling the provided `destroy` function on it) and
|
||||
will free up space in the pool.
|
||||
server = await pool.get()
|
||||
dirty = True
|
||||
try:
|
||||
dirty = await run_test(test, server)
|
||||
finally:
|
||||
if dirty:
|
||||
await pool.steal()
|
||||
else:
|
||||
await pool.put(server)
|
||||
await pool.put(server, is_dirty=dirty)
|
||||
|
||||
Alternatively:
|
||||
async with (cm := pool.instance(dirty_on_exception=True)) as server:
|
||||
cm.dirty = await run_test(test, server)
|
||||
# It will also be considered dirty if run_test throws an exception
|
||||
"""
|
||||
def __init__(self, max_size: int, build: Callable[..., Awaitable[T]]):
|
||||
def __init__(self, max_size: int,
|
||||
build: Callable[..., Awaitable[T]],
|
||||
destroy: Callable[[T], Awaitable[None]]):
|
||||
assert(max_size >= 0)
|
||||
self.max_size: Final[int] = max_size
|
||||
self.build: Final[Callable[..., Awaitable[T]]] = build
|
||||
self.destroy: Final[Callable[[T], Awaitable]] = destroy
|
||||
self.cond: Final[asyncio.Condition] = asyncio.Condition()
|
||||
self.pool: list[T] = []
|
||||
self.total: int = 0 # len(self.pool) + leased objects
|
||||
@@ -73,24 +84,27 @@ class Pool(Generic[T]):
|
||||
raise
|
||||
return obj
|
||||
|
||||
async def steal(self) -> None:
|
||||
"""Take ownership of a previously borrowed object.
|
||||
Frees up space in the pool.
|
||||
async def put(self, obj: T, is_dirty: bool):
|
||||
"""Return a previously borrowed object to the pool
|
||||
if it's not dirty, otherwise destroy the object
|
||||
and free up space in the pool.
|
||||
"""
|
||||
if is_dirty:
|
||||
await self.destroy(obj)
|
||||
|
||||
async with self.cond:
|
||||
self.total -= 1
|
||||
if is_dirty:
|
||||
self.total -= 1
|
||||
else:
|
||||
self.pool.append(obj)
|
||||
self.cond.notify()
|
||||
|
||||
async def put(self, obj: T):
|
||||
"""Return a previously borrowed object to the pool."""
|
||||
async with self.cond:
|
||||
self.pool.append(obj)
|
||||
self.cond.notify()
|
||||
|
||||
def instance(self, *args, **kwargs) -> AsyncContextManager[T]:
|
||||
def instance(self, dirty_on_exception: bool, *args, **kwargs) -> AsyncContextManager[T]:
|
||||
class Instance:
|
||||
def __init__(self, pool):
|
||||
def __init__(self, pool: Pool[T], dirty_on_exception: bool):
|
||||
self.pool = pool
|
||||
self.dirty = False
|
||||
self.dirty_on_exception = dirty_on_exception
|
||||
|
||||
async def __aenter__(self):
|
||||
self.obj = await self.pool.get(*args, **kwargs)
|
||||
@@ -98,7 +112,8 @@ class Pool(Generic[T]):
|
||||
|
||||
async def __aexit__(self, exc_type, exc, obj):
|
||||
if self.obj:
|
||||
await self.pool.put(self.obj)
|
||||
self.dirty |= self.dirty_on_exception and exc is not None
|
||||
await self.pool.put(self.obj, is_dirty=self.dirty)
|
||||
self.obj = None
|
||||
|
||||
return Instance(self)
|
||||
return Instance(self, dirty_on_exception)
|
||||
|
||||
@@ -868,9 +868,7 @@ class ScyllaClusterManager:
|
||||
self.logger.info("Setting up %s", self.current_test_case_full_name)
|
||||
if self.cluster.is_dirty:
|
||||
self.logger.info(f"Current cluster %s is dirty after last test, stopping...", self.cluster.name)
|
||||
await self.clusters.steal()
|
||||
await self.cluster.stop()
|
||||
await self.cluster.release_ips()
|
||||
await self.clusters.put(self.cluster, is_dirty=True)
|
||||
self.logger.info(f"Waiting for new cluster for test %s...", self.current_test_case_full_name)
|
||||
await self._get_cluster()
|
||||
self.cluster.setLogger(self.logger)
|
||||
@@ -888,13 +886,11 @@ class ScyllaClusterManager:
|
||||
del self.site
|
||||
if not self.cluster.is_dirty:
|
||||
self.logger.info("Returning Scylla cluster %s for test %s", self.cluster, self.test_uname)
|
||||
await self.clusters.put(self.cluster)
|
||||
await self.clusters.put(self.cluster, is_dirty=False)
|
||||
else:
|
||||
self.logger.info("ScyllaManager: Scylla cluster %s is dirty after %s, stopping it",
|
||||
self.cluster, self.test_uname)
|
||||
await self.cluster.stop()
|
||||
await self.cluster.release_ips()
|
||||
await self.clusters.steal()
|
||||
await self.clusters.put(self.cluster, is_dirty=True)
|
||||
del self.cluster
|
||||
if os.path.exists(self.manager_dir):
|
||||
shutil.rmtree(self.manager_dir)
|
||||
|
||||
Reference in New Issue
Block a user