Revert "Merge 'compaction: abort compaction tasks' from Aleksandra Martyniuk"
This reverts commit2860d43309, reversing changes made toa3621dbd3e. Reverting because rest_api.test_compaction_task started failing after this was merged. Fixes: #16005
This commit is contained in:
committed by
Kamil Braun
parent
c5956957f3
commit
1cccc86813
@@ -22,7 +22,6 @@
|
||||
#include "sstables/exceptions.hh"
|
||||
#include "sstables/sstable_directory.hh"
|
||||
#include "locator/abstract_replication_strategy.hh"
|
||||
#include "utils/error_injection.hh"
|
||||
#include "utils/fb_utilities.hh"
|
||||
#include "utils/UUID_gen.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
@@ -489,10 +488,6 @@ public:
|
||||
virtual future<tasks::task_manager::task::progress> get_progress() const override {
|
||||
return compaction_task_impl::get_progress(_compaction_data, _progress_monitor);
|
||||
}
|
||||
|
||||
virtual future<> abort() noexcept override {
|
||||
return compaction_task_executor::abort(_as);
|
||||
}
|
||||
protected:
|
||||
virtual future<> run() override {
|
||||
return perform();
|
||||
@@ -514,10 +509,6 @@ public:
|
||||
virtual future<tasks::task_manager::task::progress> get_progress() const override {
|
||||
return compaction_task_impl::get_progress(_compaction_data, _progress_monitor);
|
||||
}
|
||||
|
||||
virtual future<> abort() noexcept override {
|
||||
return compaction_task_executor::abort(_as);
|
||||
}
|
||||
protected:
|
||||
virtual future<> run() override {
|
||||
return perform();
|
||||
@@ -630,10 +621,6 @@ public:
|
||||
virtual future<tasks::task_manager::task::progress> get_progress() const override {
|
||||
return compaction_task_impl::get_progress(_compaction_data, _progress_monitor);
|
||||
}
|
||||
|
||||
virtual future<> abort() noexcept override {
|
||||
return compaction_task_executor::abort(_as);
|
||||
}
|
||||
protected:
|
||||
virtual future<> run() override {
|
||||
return perform();
|
||||
@@ -857,14 +844,6 @@ void compaction_task_executor::finish_compaction(state finish_state) noexcept {
|
||||
_compaction_state.compaction_done.signal();
|
||||
}
|
||||
|
||||
future<> compaction_task_executor::abort(abort_source& as) noexcept {
|
||||
if (!as.abort_requested()) {
|
||||
as.request_abort();
|
||||
stop_compaction("user requested abort");
|
||||
}
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
void compaction_task_executor::stop_compaction(sstring reason) noexcept {
|
||||
_compaction_data.stop(std::move(reason));
|
||||
}
|
||||
@@ -1179,19 +1158,12 @@ public:
|
||||
: compaction_task_executor(mgr, do_throw_if_stopping, &t, sstables::compaction_type::Compaction, "Compaction")
|
||||
, regular_compaction_task_impl(mgr._task_manager_module, tasks::task_id::create_random_id(), mgr._task_manager_module->new_sequence_number(), t.schema()->ks_name(), t.schema()->cf_name(), "", tasks::task_id::create_null_id())
|
||||
{}
|
||||
|
||||
virtual future<> abort() noexcept override {
|
||||
return compaction_task_executor::abort(_as);
|
||||
}
|
||||
protected:
|
||||
virtual future<> run() override {
|
||||
return perform();
|
||||
}
|
||||
|
||||
virtual future<compaction_manager::compaction_stats_opt> do_run() override {
|
||||
co_await utils::get_local_injector().inject_with_handler("compaction_regular_compaction_task_executor_do_run",
|
||||
[] (auto& handler) { return handler.wait_for_message(db::timeout_clock::now() + 10s); });
|
||||
|
||||
co_await coroutine::switch_to(_cm.compaction_sg());
|
||||
|
||||
for (;;) {
|
||||
@@ -1347,10 +1319,6 @@ public:
|
||||
virtual future<tasks::task_manager::task::progress> get_progress() const override {
|
||||
return compaction_task_impl::get_progress(_compaction_data, _progress_monitor);
|
||||
}
|
||||
|
||||
virtual future<> abort() noexcept override {
|
||||
return compaction_task_executor::abort(_as);
|
||||
}
|
||||
protected:
|
||||
virtual future<> run() override {
|
||||
return perform();
|
||||
@@ -1699,10 +1667,6 @@ public:
|
||||
virtual future<tasks::task_manager::task::progress> get_progress() const override {
|
||||
return compaction_task_impl::get_progress(_compaction_data, _progress_monitor);
|
||||
}
|
||||
|
||||
virtual future<> abort() noexcept override {
|
||||
return compaction_task_executor::abort(_as);
|
||||
}
|
||||
protected:
|
||||
virtual future<> run() override {
|
||||
return perform();
|
||||
|
||||
@@ -586,8 +586,6 @@ public:
|
||||
return _compaction_data.abort.abort_requested();
|
||||
}
|
||||
|
||||
future<> abort(abort_source& as) noexcept;
|
||||
|
||||
void stop_compaction(sstring reason) noexcept;
|
||||
|
||||
sstables::compaction_stopped_exception make_compaction_stopped_exception() const;
|
||||
|
||||
@@ -13,11 +13,8 @@
|
||||
#include "replica/database.hh"
|
||||
#include "sstables/sstables.hh"
|
||||
#include "sstables/sstable_directory.hh"
|
||||
#include "utils/error_injection.hh"
|
||||
#include "utils/pretty_printers.hh"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
namespace replica {
|
||||
|
||||
// Helper structure for resharding.
|
||||
@@ -268,14 +265,7 @@ future<tasks::task_manager::task::progress> compaction_task_impl::get_progress(c
|
||||
};
|
||||
}
|
||||
|
||||
tasks::is_abortable compaction_task_impl::is_abortable() const noexcept {
|
||||
return tasks::is_abortable{!_parent_id};
|
||||
}
|
||||
|
||||
future<> major_keyspace_compaction_task_impl::run() {
|
||||
co_await utils::get_local_injector().inject_with_handler("compaction_major_keyspace_compaction_task_impl_run",
|
||||
[] (auto& handler) { return handler.wait_for_message(db::timeout_clock::now() + 10s); });
|
||||
|
||||
co_await _db.invoke_on_all([&] (replica::database& db) -> future<> {
|
||||
tasks::task_info parent_info{_status.id, _status.shard};
|
||||
auto& module = db.get_compaction_manager().get_task_manager_module();
|
||||
|
||||
@@ -39,7 +39,6 @@ public:
|
||||
}
|
||||
|
||||
virtual std::string type() const override = 0;
|
||||
virtual tasks::is_abortable is_abortable() const noexcept override;
|
||||
protected:
|
||||
virtual future<> run() override = 0;
|
||||
|
||||
|
||||
@@ -124,12 +124,7 @@ void task_manager::task::impl::run_to_completion() {
|
||||
if (f.failed()) {
|
||||
finish_failed(f.get_exception());
|
||||
} else {
|
||||
try {
|
||||
_as.check();
|
||||
finish();
|
||||
} catch (...) {
|
||||
finish_failed(std::current_exception());
|
||||
}
|
||||
finish();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -3,4 +3,3 @@ pool_size: 1
|
||||
|
||||
skip_in_release:
|
||||
- test_task_manager
|
||||
- test_compaction_task
|
||||
|
||||
@@ -1,11 +1,10 @@
|
||||
import sys
|
||||
import threading
|
||||
|
||||
# Use the util.py library from ../cql-pytest:
|
||||
sys.path.insert(1, sys.path[0] + '/../cql-pytest')
|
||||
from util import new_test_table, new_test_keyspace
|
||||
from rest_util import set_tmp_task_ttl, scylla_inject_error
|
||||
from task_manager_utils import wait_for_task, list_tasks, check_child_parent_relationship, drain_module_tasks, abort_task
|
||||
from rest_util import set_tmp_task_ttl
|
||||
from task_manager_utils import wait_for_task, list_tasks, check_child_parent_relationship, drain_module_tasks
|
||||
|
||||
module_name = "compaction"
|
||||
long_time = 1000000000
|
||||
@@ -89,78 +88,3 @@ def test_regular_compaction_task(cql, this_dc, rest_api):
|
||||
failed = [status["task_id"] for status in statuses if status["state"] != "done"]
|
||||
assert not failed, f"Regular compaction tasks with ids = {failed} failed"
|
||||
drain_module_tasks(rest_api, module_name)
|
||||
|
||||
def test_running_compaction_task_abort(cql, this_dc, rest_api):
|
||||
drain_module_tasks(rest_api, module_name)
|
||||
with set_tmp_task_ttl(rest_api, long_time):
|
||||
with new_test_keyspace(cql, f"WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', '{this_dc}' : 1 }}") as keyspace:
|
||||
schema = 'p int, v text, primary key (p)'
|
||||
with new_test_table(cql, keyspace, schema) as t0:
|
||||
stmt = cql.prepare(f"INSERT INTO {t0} (p, v) VALUES (?, ?)")
|
||||
cql.execute(stmt, [0, 'hello'])
|
||||
cql.execute(stmt, [1, 'world'])
|
||||
|
||||
injection = "compaction_regular_compaction_task_executor_do_run"
|
||||
with scylla_inject_error(rest_api, injection, True):
|
||||
[_, table] = t0.split(".")
|
||||
resp = rest_api.send("POST", f"column_family/autocompaction/{keyspace}:{table}")
|
||||
resp.raise_for_status()
|
||||
|
||||
tasks = list_tasks(rest_api, module_name, False, keyspace, table)
|
||||
assert tasks, "Compaction task was not created"
|
||||
|
||||
for task in tasks:
|
||||
abort_task(rest_api, task["task_id"])
|
||||
|
||||
resp = rest_api.send("POST", f"v2/error_injection/injection/{injection}/message")
|
||||
resp.raise_for_status()
|
||||
|
||||
for task in tasks:
|
||||
status = wait_for_task(rest_api, task["task_id"])
|
||||
assert status["state"] == "failed", "Task finished successfully"
|
||||
assert "abort requested" in status["error"], "Task wasn't aborted by user"
|
||||
drain_module_tasks(rest_api, module_name)
|
||||
|
||||
def run_major_compaction(rest_api, keyspace):
|
||||
resp = rest_api.send("POST", f"storage_service/keyspace_compaction/{keyspace}")
|
||||
resp.raise_for_status()
|
||||
|
||||
def test_not_created_compaction_task_abort(cql, this_dc, rest_api):
|
||||
drain_module_tasks(rest_api, module_name)
|
||||
with set_tmp_task_ttl(rest_api, long_time):
|
||||
with new_test_keyspace(cql, f"WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', '{this_dc}' : 1 }}") as keyspace:
|
||||
schema = 'p int, v text, primary key (p)'
|
||||
with new_test_table(cql, keyspace, schema) as t0:
|
||||
stmt = cql.prepare(f"INSERT INTO {t0} (p, v) VALUES (?, ?)")
|
||||
cql.execute(stmt, [0, 'hello'])
|
||||
cql.execute(stmt, [1, 'world'])
|
||||
|
||||
injection = "compaction_major_keyspace_compaction_task_impl_run"
|
||||
with scylla_inject_error(rest_api, injection, True):
|
||||
[_, table] = t0.split(".")
|
||||
# FIXME: Replace with asynchronous compaction api call as soon as it is available.
|
||||
x = threading.Thread(target=run_major_compaction, args=(rest_api, keyspace,))
|
||||
x.start()
|
||||
|
||||
tasks = []
|
||||
while not tasks:
|
||||
tasks = list_tasks(rest_api, module_name)
|
||||
assert len(tasks) == 1, "More than one top level task was created"
|
||||
task = tasks[0]
|
||||
|
||||
abort_task(rest_api, task["task_id"])
|
||||
|
||||
resp = rest_api.send("POST", f"v2/error_injection/injection/{injection}/message")
|
||||
resp.raise_for_status()
|
||||
|
||||
status = wait_for_task(rest_api, task["task_id"])
|
||||
assert status["state"] == "failed", "Task finished successfully despite abort"
|
||||
assert "abort" in status["error"], "Task wasn't aborted by user"
|
||||
|
||||
if "children_ids" in status:
|
||||
children = [wait_for_task(rest_api, child_id) for child_id in status["children_ids"]]
|
||||
assert all(child["state"] == "failed" for child in children), "Some child tasks finished successfully despite abort"
|
||||
assert all("abort requested" in child["error"] for child in children), "Some child tasks weren't aborted by user"
|
||||
assert all("children" not in child for child in children), "Some child tasks spawned new tasks even though they were aborted"
|
||||
x.join()
|
||||
drain_module_tasks(rest_api, module_name)
|
||||
|
||||
Reference in New Issue
Block a user