Merge 'Move some compaction manager API handlers from storage_service.cc to tasks.cc' from Pavel Emelyanov
There's a bunch of /storage_service/... endpoints that start compaction manager tasks and wait for it. Most of them have async peer in /tasks/... that start the very same task, but return to the caller with the task ID. This patch moves those handlers' code from storage_service.cc to tasks.cc, next to the corresponding async peers, to keep handlers that need compaction_manager in one place. That's preparation for more future changes. Later all those endpoints will stop using database from http_context and will capture the compaction_manager they need from main, like it was done in #20962 for /compaction_manager/... endpoints. Even "more later", the former and the latter blocks of endpoints will be registered and unregistered together, e.g. like database endpoints were collected in one reg/unreg sequence by #25674. Part of http_context dependencies cleanup effort, no need to backport. Closes scylladb/scylladb#26140 * https://github.com/scylladb/scylladb: api: Move /storage_service/compact to tasks.cc api: Move /storage_service/keyspace_upgrade_sstables to tasks.cc api: Move /storage_service/keyspace_offstrategy_compaction to tasks.cc api: Move /storage_service/keyspace_cleanup to tasks.cc api: Move /storage_service/keyspace_compaction to tasks.cc
This commit is contained in:
@@ -729,68 +729,6 @@ rest_cdc_streams_check_and_repair(sharded<service::storage_service>& ss, std::un
|
||||
});
|
||||
}
|
||||
|
||||
static
|
||||
future<json::json_return_type>
|
||||
rest_force_compaction(http_context& ctx, std::unique_ptr<http::request> req) {
|
||||
auto& db = ctx.db;
|
||||
auto flush = validate_bool_x(req->get_query_param("flush_memtables"), true);
|
||||
auto consider_only_existing_data = validate_bool_x(req->get_query_param("consider_only_existing_data"), false);
|
||||
apilog.info("force_compaction: flush={} consider_only_existing_data={}", flush, consider_only_existing_data);
|
||||
|
||||
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
|
||||
std::optional<flush_mode> fmopt;
|
||||
if (!flush && !consider_only_existing_data) {
|
||||
fmopt = flush_mode::skip;
|
||||
}
|
||||
auto task = co_await compaction_module.make_and_start_task<global_major_compaction_task_impl>({}, db, fmopt, consider_only_existing_data);
|
||||
co_await task->done();
|
||||
co_return json_void();
|
||||
}
|
||||
|
||||
static
|
||||
future<json::json_return_type>
|
||||
rest_force_keyspace_compaction(http_context& ctx, std::unique_ptr<http::request> req) {
|
||||
auto& db = ctx.db;
|
||||
auto [ keyspace, table_infos ] = parse_table_infos(ctx, *req, "cf");
|
||||
auto flush = validate_bool_x(req->get_query_param("flush_memtables"), true);
|
||||
auto consider_only_existing_data = validate_bool_x(req->get_query_param("consider_only_existing_data"), false);
|
||||
apilog.info("force_keyspace_compaction: keyspace={} tables={}, flush={} consider_only_existing_data={}", keyspace, table_infos, flush, consider_only_existing_data);
|
||||
|
||||
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
|
||||
std::optional<flush_mode> fmopt;
|
||||
if (!flush && !consider_only_existing_data) {
|
||||
fmopt = flush_mode::skip;
|
||||
}
|
||||
auto task = co_await compaction_module.make_and_start_task<major_keyspace_compaction_task_impl>({}, std::move(keyspace), tasks::task_id::create_null_id(), db, table_infos, fmopt, consider_only_existing_data);
|
||||
co_await task->done();
|
||||
co_return json_void();
|
||||
}
|
||||
|
||||
static
|
||||
future<json::json_return_type>
|
||||
rest_force_keyspace_cleanup(http_context& ctx, sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
|
||||
auto& db = ctx.db;
|
||||
auto [keyspace, table_infos] = parse_table_infos(ctx, *req);
|
||||
const auto& rs = db.local().find_keyspace(keyspace).get_replication_strategy();
|
||||
if (rs.is_local() || !rs.is_vnode_based()) {
|
||||
auto reason = rs.is_local() ? "require" : "support";
|
||||
apilog.info("Keyspace {} does not {} cleanup", keyspace, reason);
|
||||
co_return json::json_return_type(0);
|
||||
}
|
||||
apilog.info("force_keyspace_cleanup: keyspace={} tables={}", keyspace, table_infos);
|
||||
if (!co_await ss.local().is_cleanup_allowed(keyspace)) {
|
||||
auto msg = "Can not perform cleanup operation when topology changes";
|
||||
apilog.warn("force_keyspace_cleanup: keyspace={} tables={}: {}", keyspace, table_infos, msg);
|
||||
co_await coroutine::return_exception(std::runtime_error(msg));
|
||||
}
|
||||
|
||||
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
|
||||
auto task = co_await compaction_module.make_and_start_task<cleanup_keyspace_compaction_task_impl>(
|
||||
{}, std::move(keyspace), db, table_infos, flush_mode::all_tables, tasks::is_user_task::yes);
|
||||
co_await task->done();
|
||||
co_return json::json_return_type(0);
|
||||
}
|
||||
|
||||
static
|
||||
future<json::json_return_type>
|
||||
rest_cleanup_all(http_context& ctx, sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
|
||||
@@ -813,33 +751,6 @@ rest_cleanup_all(http_context& ctx, sharded<service::storage_service>& ss, std::
|
||||
co_return json::json_return_type(0);
|
||||
}
|
||||
|
||||
static
|
||||
future<json::json_return_type>
|
||||
rest_perform_keyspace_offstrategy_compaction(http_context& ctx, std::unique_ptr<http::request> req) {
|
||||
auto [keyspace, table_infos] = parse_table_infos(ctx, *req);
|
||||
apilog.info("perform_keyspace_offstrategy_compaction: keyspace={} tables={}", keyspace, table_infos);
|
||||
bool res = false;
|
||||
auto& compaction_module = ctx.db.local().get_compaction_manager().get_task_manager_module();
|
||||
auto task = co_await compaction_module.make_and_start_task<offstrategy_keyspace_compaction_task_impl>({}, std::move(keyspace), ctx.db, table_infos, &res);
|
||||
co_await task->done();
|
||||
co_return json::json_return_type(res);
|
||||
}
|
||||
|
||||
static
|
||||
future<json::json_return_type>
|
||||
rest_upgrade_sstables(http_context& ctx, std::unique_ptr<http::request> req) {
|
||||
auto& db = ctx.db;
|
||||
auto [keyspace, table_infos] = parse_table_infos(ctx, *req);
|
||||
bool exclude_current_version = req_param<bool>(*req, "exclude_current_version", false);
|
||||
|
||||
apilog.info("upgrade_sstables: keyspace={} tables={} exclude_current_version={}", keyspace, table_infos, exclude_current_version);
|
||||
|
||||
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
|
||||
auto task = co_await compaction_module.make_and_start_task<upgrade_sstables_compaction_task_impl>({}, std::move(keyspace), db, table_infos, exclude_current_version);
|
||||
co_await task->done();
|
||||
co_return json::json_return_type(0);
|
||||
}
|
||||
|
||||
static
|
||||
future<json::json_return_type>
|
||||
rest_force_flush(http_context& ctx, std::unique_ptr<http::request> req) {
|
||||
@@ -1821,12 +1732,7 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
|
||||
ss::get_current_generation_number.set(r, rest_bind(rest_get_current_generation_number, ss));
|
||||
ss::get_natural_endpoints.set(r, rest_bind(rest_get_natural_endpoints, ctx, ss));
|
||||
ss::cdc_streams_check_and_repair.set(r, rest_bind(rest_cdc_streams_check_and_repair, ss));
|
||||
ss::force_compaction.set(r, rest_bind(rest_force_compaction, ctx));
|
||||
ss::force_keyspace_compaction.set(r, rest_bind(rest_force_keyspace_compaction, ctx));
|
||||
ss::force_keyspace_cleanup.set(r, rest_bind(rest_force_keyspace_cleanup, ctx, ss));
|
||||
ss::cleanup_all.set(r, rest_bind(rest_cleanup_all, ctx, ss));
|
||||
ss::perform_keyspace_offstrategy_compaction.set(r, rest_bind(rest_perform_keyspace_offstrategy_compaction, ctx));
|
||||
ss::upgrade_sstables.set(r, rest_bind(rest_upgrade_sstables, ctx));
|
||||
ss::force_flush.set(r, rest_bind(rest_force_flush, ctx));
|
||||
ss::force_keyspace_flush.set(r, rest_bind(rest_force_keyspace_flush, ctx));
|
||||
ss::decommission.set(r, rest_bind(rest_decommission, ss));
|
||||
@@ -1903,12 +1809,7 @@ void unset_storage_service(http_context& ctx, routes& r) {
|
||||
ss::get_current_generation_number.unset(r);
|
||||
ss::get_natural_endpoints.unset(r);
|
||||
ss::cdc_streams_check_and_repair.unset(r);
|
||||
ss::force_compaction.unset(r);
|
||||
ss::force_keyspace_compaction.unset(r);
|
||||
ss::force_keyspace_cleanup.unset(r);
|
||||
ss::cleanup_all.unset(r);
|
||||
ss::perform_keyspace_offstrategy_compaction.unset(r);
|
||||
ss::upgrade_sstables.unset(r);
|
||||
ss::force_flush.unset(r);
|
||||
ss::force_keyspace_flush.unset(r);
|
||||
ss::decommission.unset(r);
|
||||
|
||||
84
api/tasks.cc
84
api/tasks.cc
@@ -12,6 +12,7 @@
|
||||
#include "api/api.hh"
|
||||
#include "api/storage_service.hh"
|
||||
#include "api/api-doc/tasks.json.hh"
|
||||
#include "api/api-doc/storage_service.json.hh"
|
||||
#include "compaction/compaction_manager.hh"
|
||||
#include "compaction/task_manager_module.hh"
|
||||
#include "service/storage_service.hh"
|
||||
@@ -25,6 +26,7 @@ extern logging::logger apilog;
|
||||
namespace api {
|
||||
|
||||
namespace t = httpd::tasks_json;
|
||||
namespace ss = httpd::storage_service_json;
|
||||
using namespace json;
|
||||
|
||||
using ks_cf_func = std::function<future<json::json_return_type>(http_context&, std::unique_ptr<http::request>, sstring, std::vector<table_info>)>;
|
||||
@@ -53,6 +55,23 @@ void set_tasks_compaction_module(http_context& ctx, routes& r, sharded<service::
|
||||
co_return json::json_return_type(task->get_status().id.to_sstring());
|
||||
});
|
||||
|
||||
ss::force_keyspace_compaction.set(r, [&ctx](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
|
||||
auto& db = ctx.db;
|
||||
auto [ keyspace, table_infos ] = parse_table_infos(ctx, *req, "cf");
|
||||
auto flush = validate_bool_x(req->get_query_param("flush_memtables"), true);
|
||||
auto consider_only_existing_data = validate_bool_x(req->get_query_param("consider_only_existing_data"), false);
|
||||
apilog.info("force_keyspace_compaction: keyspace={} tables={}, flush={} consider_only_existing_data={}", keyspace, table_infos, flush, consider_only_existing_data);
|
||||
|
||||
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
|
||||
std::optional<flush_mode> fmopt;
|
||||
if (!flush && !consider_only_existing_data) {
|
||||
fmopt = flush_mode::skip;
|
||||
}
|
||||
auto task = co_await compaction_module.make_and_start_task<major_keyspace_compaction_task_impl>({}, std::move(keyspace), tasks::task_id::create_null_id(), db, table_infos, fmopt, consider_only_existing_data);
|
||||
co_await task->done();
|
||||
co_return json_void();
|
||||
});
|
||||
|
||||
t::force_keyspace_cleanup_async.set(r, [&ctx, &ss](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
|
||||
auto& db = ctx.db;
|
||||
auto [keyspace, table_infos] = parse_table_infos(ctx, *req);
|
||||
@@ -69,6 +88,29 @@ void set_tasks_compaction_module(http_context& ctx, routes& r, sharded<service::
|
||||
co_return json::json_return_type(task->get_status().id.to_sstring());
|
||||
});
|
||||
|
||||
ss::force_keyspace_cleanup.set(r, [&ctx, &ss](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
|
||||
auto& db = ctx.db;
|
||||
auto [keyspace, table_infos] = parse_table_infos(ctx, *req);
|
||||
const auto& rs = db.local().find_keyspace(keyspace).get_replication_strategy();
|
||||
if (rs.is_local() || !rs.is_vnode_based()) {
|
||||
auto reason = rs.is_local() ? "require" : "support";
|
||||
apilog.info("Keyspace {} does not {} cleanup", keyspace, reason);
|
||||
co_return json::json_return_type(0);
|
||||
}
|
||||
apilog.info("force_keyspace_cleanup: keyspace={} tables={}", keyspace, table_infos);
|
||||
if (!co_await ss.local().is_cleanup_allowed(keyspace)) {
|
||||
auto msg = "Can not perform cleanup operation when topology changes";
|
||||
apilog.warn("force_keyspace_cleanup: keyspace={} tables={}: {}", keyspace, table_infos, msg);
|
||||
co_await coroutine::return_exception(std::runtime_error(msg));
|
||||
}
|
||||
|
||||
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
|
||||
auto task = co_await compaction_module.make_and_start_task<cleanup_keyspace_compaction_task_impl>(
|
||||
{}, std::move(keyspace), db, table_infos, flush_mode::all_tables, tasks::is_user_task::yes);
|
||||
co_await task->done();
|
||||
co_return json::json_return_type(0);
|
||||
});
|
||||
|
||||
t::perform_keyspace_offstrategy_compaction_async.set(r, wrap_ks_cf(ctx, [] (http_context& ctx, std::unique_ptr<http::request> req, sstring keyspace, std::vector<table_info> table_infos) -> future<json::json_return_type> {
|
||||
apilog.info("perform_keyspace_offstrategy_compaction: keyspace={} tables={}", keyspace, table_infos);
|
||||
auto& compaction_module = ctx.db.local().get_compaction_manager().get_task_manager_module();
|
||||
@@ -77,6 +119,15 @@ void set_tasks_compaction_module(http_context& ctx, routes& r, sharded<service::
|
||||
co_return json::json_return_type(task->get_status().id.to_sstring());
|
||||
}));
|
||||
|
||||
ss::perform_keyspace_offstrategy_compaction.set(r, wrap_ks_cf(ctx, [] (http_context& ctx, std::unique_ptr<http::request> req, sstring keyspace, std::vector<table_info> table_infos) -> future<json::json_return_type> {
|
||||
apilog.info("perform_keyspace_offstrategy_compaction: keyspace={} tables={}", keyspace, table_infos);
|
||||
bool res = false;
|
||||
auto& compaction_module = ctx.db.local().get_compaction_manager().get_task_manager_module();
|
||||
auto task = co_await compaction_module.make_and_start_task<offstrategy_keyspace_compaction_task_impl>({}, std::move(keyspace), ctx.db, table_infos, &res);
|
||||
co_await task->done();
|
||||
co_return json::json_return_type(res);
|
||||
}));
|
||||
|
||||
t::upgrade_sstables_async.set(r, wrap_ks_cf(ctx, [] (http_context& ctx, std::unique_ptr<http::request> req, sstring keyspace, std::vector<table_info> table_infos) -> future<json::json_return_type> {
|
||||
auto& db = ctx.db;
|
||||
bool exclude_current_version = req_param<bool>(*req, "exclude_current_version", false);
|
||||
@@ -89,6 +140,18 @@ void set_tasks_compaction_module(http_context& ctx, routes& r, sharded<service::
|
||||
co_return json::json_return_type(task->get_status().id.to_sstring());
|
||||
}));
|
||||
|
||||
ss::upgrade_sstables.set(r, wrap_ks_cf(ctx, [] (http_context& ctx, std::unique_ptr<http::request> req, sstring keyspace, std::vector<table_info> table_infos) -> future<json::json_return_type> {
|
||||
auto& db = ctx.db;
|
||||
bool exclude_current_version = req_param<bool>(*req, "exclude_current_version", false);
|
||||
|
||||
apilog.info("upgrade_sstables: keyspace={} tables={} exclude_current_version={}", keyspace, table_infos, exclude_current_version);
|
||||
|
||||
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
|
||||
auto task = co_await compaction_module.make_and_start_task<upgrade_sstables_compaction_task_impl>({}, std::move(keyspace), db, table_infos, exclude_current_version);
|
||||
co_await task->done();
|
||||
co_return json::json_return_type(0);
|
||||
}));
|
||||
|
||||
t::scrub_async.set(r, [&ctx, &snap_ctl] (std::unique_ptr<http::request> req) -> future<json::json_return_type> {
|
||||
auto& db = ctx.db;
|
||||
auto info = parse_scrub_options(ctx, std::move(req));
|
||||
@@ -102,14 +165,35 @@ void set_tasks_compaction_module(http_context& ctx, routes& r, sharded<service::
|
||||
|
||||
co_return json::json_return_type(task->get_status().id.to_sstring());
|
||||
});
|
||||
|
||||
ss::force_compaction.set(r, [&ctx] (std::unique_ptr<http::request> req) -> future<json::json_return_type> {
|
||||
auto& db = ctx.db;
|
||||
auto flush = validate_bool_x(req->get_query_param("flush_memtables"), true);
|
||||
auto consider_only_existing_data = validate_bool_x(req->get_query_param("consider_only_existing_data"), false);
|
||||
apilog.info("force_compaction: flush={} consider_only_existing_data={}", flush, consider_only_existing_data);
|
||||
|
||||
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
|
||||
std::optional<flush_mode> fmopt;
|
||||
if (!flush && !consider_only_existing_data) {
|
||||
fmopt = flush_mode::skip;
|
||||
}
|
||||
auto task = co_await compaction_module.make_and_start_task<global_major_compaction_task_impl>({}, db, fmopt, consider_only_existing_data);
|
||||
co_await task->done();
|
||||
co_return json_void();
|
||||
});
|
||||
}
|
||||
|
||||
void unset_tasks_compaction_module(http_context& ctx, httpd::routes& r) {
|
||||
t::force_keyspace_compaction_async.unset(r);
|
||||
ss::force_keyspace_compaction.unset(r);
|
||||
t::force_keyspace_cleanup_async.unset(r);
|
||||
ss::force_keyspace_cleanup.unset(r);
|
||||
t::perform_keyspace_offstrategy_compaction_async.unset(r);
|
||||
ss::perform_keyspace_offstrategy_compaction.unset(r);
|
||||
t::upgrade_sstables_async.unset(r);
|
||||
ss::upgrade_sstables.unset(r);
|
||||
t::scrub_async.unset(r);
|
||||
ss::force_compaction.unset(r);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user