Revert "Merge 'compaction: abort compaction tasks' from Aleksandra Martyniuk"

This reverts commit 11cafd2fc8, reversing
changes made to 2bae14f743.

Reverting because this series causes frequent CI failures, and the
proposed quickfix causes other failures of its own.

Fixes: #16113
This commit is contained in:
Botond Dénes
2023-11-22 12:51:04 +02:00
parent 48340380dd
commit 0ae1335daa
7 changed files with 3 additions and 134 deletions

View File

@@ -22,7 +22,6 @@
#include "sstables/exceptions.hh"
#include "sstables/sstable_directory.hh"
#include "locator/abstract_replication_strategy.hh"
#include "utils/error_injection.hh"
#include "utils/fb_utilities.hh"
#include "utils/UUID_gen.hh"
#include "db/system_keyspace.hh"
@@ -489,10 +488,6 @@ public:
virtual future<tasks::task_manager::task::progress> get_progress() const override {
return compaction_task_impl::get_progress(_compaction_data, _progress_monitor);
}
virtual future<> abort() noexcept override {
return compaction_task_executor::abort(_as);
}
protected:
virtual future<> run() override {
return perform();
@@ -514,10 +509,6 @@ public:
virtual future<tasks::task_manager::task::progress> get_progress() const override {
return compaction_task_impl::get_progress(_compaction_data, _progress_monitor);
}
virtual future<> abort() noexcept override {
return compaction_task_executor::abort(_as);
}
protected:
virtual future<> run() override {
return perform();
@@ -630,10 +621,6 @@ public:
virtual future<tasks::task_manager::task::progress> get_progress() const override {
return compaction_task_impl::get_progress(_compaction_data, _progress_monitor);
}
virtual future<> abort() noexcept override {
return compaction_task_executor::abort(_as);
}
protected:
virtual future<> run() override {
return perform();
@@ -857,14 +844,6 @@ void compaction_task_executor::finish_compaction(state finish_state) noexcept {
_compaction_state.compaction_done.signal();
}
future<> compaction_task_executor::abort(abort_source& as) noexcept {
if (!as.abort_requested()) {
as.request_abort();
stop_compaction("user requested abort");
}
return make_ready_future();
}
void compaction_task_executor::stop_compaction(sstring reason) noexcept {
_compaction_data.stop(std::move(reason));
}
@@ -1179,19 +1158,12 @@ public:
: compaction_task_executor(mgr, do_throw_if_stopping, &t, sstables::compaction_type::Compaction, "Compaction")
, regular_compaction_task_impl(mgr._task_manager_module, tasks::task_id::create_random_id(), mgr._task_manager_module->new_sequence_number(), t.schema()->ks_name(), t.schema()->cf_name(), "", tasks::task_id::create_null_id())
{}
virtual future<> abort() noexcept override {
return compaction_task_executor::abort(_as);
}
protected:
virtual future<> run() override {
return perform();
}
virtual future<compaction_manager::compaction_stats_opt> do_run() override {
co_await utils::get_local_injector().inject_with_handler("compaction_regular_compaction_task_executor_do_run",
[] (auto& handler) { return handler.wait_for_message(db::timeout_clock::now() + 10s); });
co_await coroutine::switch_to(_cm.compaction_sg());
for (;;) {
@@ -1347,10 +1319,6 @@ public:
virtual future<tasks::task_manager::task::progress> get_progress() const override {
return compaction_task_impl::get_progress(_compaction_data, _progress_monitor);
}
virtual future<> abort() noexcept override {
return compaction_task_executor::abort(_as);
}
protected:
virtual future<> run() override {
return perform();
@@ -1699,10 +1667,6 @@ public:
virtual future<tasks::task_manager::task::progress> get_progress() const override {
return compaction_task_impl::get_progress(_compaction_data, _progress_monitor);
}
virtual future<> abort() noexcept override {
return compaction_task_executor::abort(_as);
}
protected:
virtual future<> run() override {
return perform();

View File

@@ -588,8 +588,6 @@ public:
return _compaction_data.abort.abort_requested();
}
future<> abort(abort_source& as) noexcept;
void stop_compaction(sstring reason) noexcept;
sstables::compaction_stopped_exception make_compaction_stopped_exception() const;

View File

@@ -13,11 +13,8 @@
#include "replica/database.hh"
#include "sstables/sstables.hh"
#include "sstables/sstable_directory.hh"
#include "utils/error_injection.hh"
#include "utils/pretty_printers.hh"
using namespace std::chrono_literals;
namespace replica {
// Helper structure for resharding.
@@ -268,14 +265,7 @@ future<tasks::task_manager::task::progress> compaction_task_impl::get_progress(c
};
}
tasks::is_abortable compaction_task_impl::is_abortable() const noexcept {
return tasks::is_abortable{!_parent_id};
}
future<> major_keyspace_compaction_task_impl::run() {
co_await utils::get_local_injector().inject_with_handler("compaction_major_keyspace_compaction_task_impl_run",
[] (auto& handler) { return handler.wait_for_message(db::timeout_clock::now() + 10s); });
co_await _db.invoke_on_all([&] (replica::database& db) -> future<> {
tasks::task_info parent_info{_status.id, _status.shard};
auto& module = db.get_compaction_manager().get_task_manager_module();

View File

@@ -39,7 +39,6 @@ public:
}
virtual std::string type() const override = 0;
virtual tasks::is_abortable is_abortable() const noexcept override;
protected:
virtual future<> run() override = 0;

View File

@@ -124,12 +124,7 @@ void task_manager::task::impl::run_to_completion() {
if (f.failed()) {
finish_failed(f.get_exception());
} else {
try {
_as.check();
finish();
} catch (...) {
finish_failed(std::current_exception());
}
finish();
}
});
}

View File

@@ -3,4 +3,3 @@ pool_size: 1
skip_in_release:
- test_task_manager
- test_compaction_task

View File

@@ -1,11 +1,10 @@
import sys
import threading
# Use the util.py library from ../cql-pytest:
sys.path.insert(1, sys.path[0] + '/../cql-pytest')
from util import new_test_table, new_test_keyspace
from rest_util import set_tmp_task_ttl, scylla_inject_error
from task_manager_utils import wait_for_task, list_tasks, check_child_parent_relationship, drain_module_tasks, abort_task
from rest_util import set_tmp_task_ttl
from task_manager_utils import wait_for_task, list_tasks, check_child_parent_relationship, drain_module_tasks
module_name = "compaction"
long_time = 1000000000
@@ -89,78 +88,3 @@ def test_regular_compaction_task(cql, this_dc, rest_api):
failed = [status["task_id"] for status in statuses if status["state"] != "done"]
assert not failed, f"Regular compaction tasks with ids = {failed} failed"
drain_module_tasks(rest_api, module_name)
def test_running_compaction_task_abort(cql, this_dc, rest_api):
drain_module_tasks(rest_api, module_name)
with set_tmp_task_ttl(rest_api, long_time):
with new_test_keyspace(cql, f"WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', '{this_dc}' : 1 }}") as keyspace:
schema = 'p int, v text, primary key (p)'
with new_test_table(cql, keyspace, schema) as t0:
stmt = cql.prepare(f"INSERT INTO {t0} (p, v) VALUES (?, ?)")
cql.execute(stmt, [0, 'hello'])
cql.execute(stmt, [1, 'world'])
injection = "compaction_regular_compaction_task_executor_do_run"
with scylla_inject_error(rest_api, injection, True):
[_, table] = t0.split(".")
resp = rest_api.send("POST", f"column_family/autocompaction/{keyspace}:{table}")
resp.raise_for_status()
tasks = list_tasks(rest_api, module_name, False, keyspace, table)
assert tasks, "Compaction task was not created"
for task in tasks:
abort_task(rest_api, task["task_id"])
resp = rest_api.send("POST", f"v2/error_injection/injection/{injection}/message")
resp.raise_for_status()
for task in tasks:
status = wait_for_task(rest_api, task["task_id"])
assert status["state"] == "failed", "Task finished successfully"
assert "abort requested" in status["error"], "Task wasn't aborted by user"
drain_module_tasks(rest_api, module_name)
def run_major_compaction(rest_api, keyspace):
resp = rest_api.send("POST", f"storage_service/keyspace_compaction/{keyspace}")
resp.raise_for_status()
def test_not_created_compaction_task_abort(cql, this_dc, rest_api):
drain_module_tasks(rest_api, module_name)
with set_tmp_task_ttl(rest_api, long_time):
with new_test_keyspace(cql, f"WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', '{this_dc}' : 1 }}") as keyspace:
schema = 'p int, v text, primary key (p)'
with new_test_table(cql, keyspace, schema) as t0:
stmt = cql.prepare(f"INSERT INTO {t0} (p, v) VALUES (?, ?)")
cql.execute(stmt, [0, 'hello'])
cql.execute(stmt, [1, 'world'])
injection = "compaction_major_keyspace_compaction_task_impl_run"
with scylla_inject_error(rest_api, injection, True):
[_, table] = t0.split(".")
# FIXME: Replace with asynchronous compaction api call as soon as it is available.
x = threading.Thread(target=run_major_compaction, args=(rest_api, keyspace,))
x.start()
tasks = []
while not tasks:
tasks = [task for task in list_tasks(rest_api, module_name) if task["type"] == "major compaction"]
assert len(tasks) == 1, "More than one top level task was created"
task = tasks[0]
abort_task(rest_api, task["task_id"])
resp = rest_api.send("POST", f"v2/error_injection/injection/{injection}/message")
resp.raise_for_status()
status = wait_for_task(rest_api, task["task_id"])
assert status["state"] == "failed", "Task finished successfully despite abort"
assert "abort" in status["error"], "Task wasn't aborted by user"
if "children_ids" in status:
children = [wait_for_task(rest_api, child_id) for child_id in status["children_ids"]]
assert all(child["state"] == "failed" for child in children), "Some child tasks finished successfully despite abort"
assert all("abort requested" in child["error"] for child in children), "Some child tasks weren't aborted by user"
assert all("children" not in child for child in children), "Some child tasks spawned new tasks even though they were aborted"
x.join()
drain_module_tasks(rest_api, module_name)