Merge 'service: tasks: return successful status if a table was dropped' from Aleksandra Martyniuk
tablet_virtual_task::wait throws if the table that a tablet operation
was working on has been dropped.
Treat the tablet operation as successful if its table was dropped.
Fixes: https://scylladb.atlassian.net/browse/SCYLLADB-494
Needs backport to all live releases
Closes scylladb/scylladb#28933
* github.com:scylladb/scylladb:
test: add test_tablet_repair_wait_with_table_drop
service: tasks: return successful status if a table was dropped
(cherry picked from commit 1e41db5948)
Closes scylladb/scylladb#29361
This commit is contained in:
@@ -140,14 +140,19 @@ future<std::optional<tasks::task_status>> tablet_virtual_task::wait(tasks::task_
|
||||
auto task_type = hint.get_task_type();
|
||||
auto tablet_id_opt = tablet_id_provided(task_type) ? std::make_optional(hint.get_tablet_id()) : std::nullopt;
|
||||
|
||||
size_t tablet_count = _ss.get_token_metadata().tablets().get_tablet_map(table).tablet_count();
|
||||
const auto& tablets = _ss.get_token_metadata().tablets();
|
||||
size_t tablet_count = tablets.has_tablet_map(table) ? tablets.get_tablet_map(table).tablet_count() : 0;
|
||||
auto res = co_await get_status_helper(id, std::move(hint));
|
||||
if (!res) {
|
||||
co_return std::nullopt;
|
||||
}
|
||||
|
||||
tasks::tmlogger.info("tablet_virtual_task: wait until tablet operation is finished");
|
||||
co_await utils::get_local_injector().inject("tablet_virtual_task_wait", utils::wait_for_message(60s));
|
||||
co_await _ss._topology_state_machine.event.wait([&] {
|
||||
if (!_ss.get_token_metadata().tablets().has_tablet_map(table)) {
|
||||
return true;
|
||||
}
|
||||
auto& tmap = _ss.get_token_metadata().tablets().get_tablet_map(table);
|
||||
if (is_resize_task(task_type)) { // Resize task.
|
||||
return tmap.resize_task_info().tablet_task_id.uuid() != id.uuid();
|
||||
@@ -161,6 +166,10 @@ future<std::optional<tasks::task_status>> tablet_virtual_task::wait(tasks::task_
|
||||
});
|
||||
|
||||
res->status.state = tasks::task_manager::task_state::done; // Failed repair task is retried.
|
||||
if (!_ss.get_token_metadata().tablets().has_tablet_map(table)) {
|
||||
res->status.end_time = db_clock::now();
|
||||
co_return res->status;
|
||||
}
|
||||
if (is_migration_task(task_type)) {
|
||||
auto& replicas = _ss.get_token_metadata().tablets().get_tablet_map(table).get_tablet_info(tablet_id_opt.value()).replicas;
|
||||
auto migration_failed = std::all_of(replicas.begin(), replicas.end(), [&] (const auto& replica) { return res->pending_replica.has_value() && replica != res->pending_replica.value(); });
|
||||
@@ -243,7 +252,15 @@ future<std::optional<status_helper>> tablet_virtual_task::get_status_helper(task
|
||||
status_helper res;
|
||||
auto table = hint.get_table_id();
|
||||
auto task_type = hint.get_task_type();
|
||||
auto schema = _ss._db.local().get_tables_metadata().get_table(table).schema();
|
||||
auto table_ptr = _ss._db.local().get_tables_metadata().get_table_if_exists(table);
|
||||
if (!table_ptr) {
|
||||
co_return tasks::task_status {
|
||||
.task_id = id,
|
||||
.kind = tasks::task_kind::cluster,
|
||||
.is_abortable = co_await is_abortable(std::move(hint)),
|
||||
};
|
||||
}
|
||||
auto schema = table_ptr->schema();
|
||||
res.status = {
|
||||
.task_id = id,
|
||||
.kind = tasks::task_kind::cluster,
|
||||
|
||||
@@ -96,6 +96,50 @@ async def test_tablet_repair_task(manager: ManagerClient):
|
||||
|
||||
await asyncio.gather(repair_task(), check_and_abort_repair_task(manager, tm, servers, module_name, ks))
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_tablet_repair_wait_with_table_drop(manager: ManagerClient):
|
||||
module_name = "tablets"
|
||||
tm = TaskManagerClient(manager.api)
|
||||
injection = "tablet_virtual_task_wait"
|
||||
|
||||
cmdline = [
|
||||
'--logger-log-level', 'debug_error_injection=debug',
|
||||
]
|
||||
servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager, cmdline=cmdline)
|
||||
assert module_name in await tm.list_modules(servers[0].ip_addr), "tablets module wasn't registered"
|
||||
|
||||
token = -1
|
||||
await enable_injection(manager, servers, "repair_tablet_fail_on_rpc_call")
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, await_completion=False)
|
||||
|
||||
repair_tasks = await wait_tasks_created(tm, servers[0], module_name, 1, "user_repair", keyspace=ks)
|
||||
|
||||
task = repair_tasks[0]
|
||||
assert task.scope == "table"
|
||||
assert task.keyspace == ks
|
||||
assert task.table == "test"
|
||||
assert task.state in ["created", "running"]
|
||||
|
||||
log = await manager.server_open_log(servers[0].server_id)
|
||||
mark = await log.mark()
|
||||
|
||||
await enable_injection(manager, [servers[0]], injection)
|
||||
|
||||
async def wait_for_task():
|
||||
status_wait = await tm.wait_for_task(servers[0].ip_addr, task.task_id)
|
||||
assert status_wait.state == "done"
|
||||
|
||||
async def drop_table():
|
||||
await log.wait_for(f'"{injection}"', from_mark=mark)
|
||||
await disable_injection(manager, servers, "repair_tablet_fail_on_rpc_call")
|
||||
await manager.get_cql().run_async(f"DROP TABLE {ks}.test")
|
||||
await manager.api.message_injection(servers[0].ip_addr, injection)
|
||||
|
||||
await asyncio.gather(wait_for_task(), drop_table())
|
||||
|
||||
await disable_injection(manager, servers, injection)
|
||||
|
||||
async def check_repair_task_list(tm: TaskManagerClient, servers: list[ServerInfo], module_name: str, keyspace: str):
|
||||
def get_task_with_id(repair_tasks, task_id):
|
||||
tasks_with_id1 = [task for task in repair_tasks if task.task_id == task_id]
|
||||
|
||||
Reference in New Issue
Block a user