api: storage_service: break out set_storage_service lambdas into free functions

This was originally an attempt to reduce the compile time of this
translation unit, but apparently it doesn't work. Still, it has
the effect of converting stack traces that say "set_storage_service"
and refer to some lambda to stack traces that refer to the operation
being performed, so it's a net positive.

To faciliate the change, we introduce new functions rest_bind(),
similar to (and in fact wrapping) std::bind_front(), that capture
references like the lambdas did originally. We can't use
std::bind_front directly since the call to
seastar::httpd::path_description::set() cannot be disambiguated
after the function is obscured by the template returned by
std::bind_front. The new function rest_bind() has constraints
to understand which overload is in use.

Closes scylladb/scylladb#22526
This commit is contained in:
Avi Kivity
2025-01-28 00:31:21 +02:00
committed by Botond Dénes
parent edd56a2c1c
commit f25636884a

View File

@@ -557,8 +557,9 @@ static future<json::json_return_type> describe_ring_as_json_for_table(const shar
co_return json::json_return_type(stream_range_as_array(co_await ss.local().describe_ring_for_table(keyspace, table), token_range_endpoints_to_json));
}
void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_service>& ss, service::raft_group0_client& group0_client) {
ss::get_token_endpoint.set(r, [&ctx, &ss] (std::unique_ptr<http::request> req) -> future<json::json_return_type> {
static
future<json::json_return_type>
rest_get_token_endpoint(http_context& ctx, sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
const auto keyspace_name = req->get_query_param("keyspace");
const auto table_name = req->get_query_param("cf");
@@ -581,9 +582,11 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
val.value = fmt::to_string(i.second);
return val;
}));
});
}
ss::toppartitions_generic.set(r, [&ctx] (std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_toppartitions_generic(http_context& ctx, std::unique_ptr<http::request> req) {
bool filters_provided = false;
std::unordered_set<std::tuple<sstring, sstring>, utils::tuple_hash> table_filters {};
@@ -631,20 +634,29 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
return seastar::do_with(db::toppartitions_query(ctx.db, std::move(table_filters), std::move(keyspace_filters), duration.value, list_size, capacity), [&ctx] (db::toppartitions_query& q) {
return run_toppartitions_query(q, ctx);
});
});
}
ss::get_release_version.set(r, [&ss](const_req req) {
static
json::json_return_type
rest_get_release_version(sharded<service::storage_service>& ss, const_req& req) {
return ss.local().get_release_version();
});
}
ss::get_scylla_release_version.set(r, [](const_req req) {
static
json::json_return_type
rest_get_scylla_release_version(sharded<service::storage_service>& ss, const_req& req) {
return scylla_version();
});
ss::get_schema_version.set(r, [&ss](const_req req) {
return ss.local().get_schema_version();
});
}
ss::get_range_to_endpoint_map.set(r, [&ctx, &ss](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
static
json::json_return_type
rest_get_schema_version(sharded<service::storage_service>& ss, const_req& req) {
return ss.local().get_schema_version();
}
static
future<json::json_return_type>
rest_get_range_to_endpoint_map(http_context& ctx, sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
auto keyspace = validate_keyspace(ctx, req);
auto table = req->get_query_param("cf");
@@ -680,17 +692,21 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
}
return m;
});
});
}
ss::get_pending_range_to_endpoint_map.set(r, [&ctx](std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_get_pending_range_to_endpoint_map(http_context& ctx, std::unique_ptr<http::request> req) {
//TBD
unimplemented();
auto keyspace = validate_keyspace(ctx, req);
std::vector<ss::maplist_mapper> res;
return make_ready_future<json::json_return_type>(res);
});
}
ss::describe_ring.set(r, [&ctx, &ss](std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_describe_ring(http_context& ctx, sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
if (!req->param.exists("keyspace")) {
throw bad_param_exception("The keyspace param is not provided");
}
@@ -701,34 +717,44 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
return describe_ring_as_json_for_table(ss, keyspace, table);
}
return describe_ring_as_json(ss, validate_keyspace(ctx, req));
});
}
ss::get_load.set(r, [&ctx](std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_get_load(http_context& ctx, std::unique_ptr<http::request> req) {
return get_cf_stats(ctx, &replica::column_family_stats::live_disk_space_used);
});
}
ss::get_current_generation_number.set(r, [&ss](std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_get_current_generation_number(sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
auto ep = ss.local().get_token_metadata().get_topology().my_address();
return ss.local().gossiper().get_current_generation_number(ep).then([](gms::generation_type res) {
return make_ready_future<json::json_return_type>(res.value());
});
});
}
ss::get_natural_endpoints.set(r, [&ctx, &ss](const_req req) {
static
json::json_return_type
rest_get_natural_endpoints(http_context& ctx, sharded<service::storage_service>& ss, const_req req) {
auto keyspace = validate_keyspace(ctx, req);
return container_to_vec(ss.local().get_natural_endpoints(keyspace, req.get_query_param("cf"),
req.get_query_param("key")));
});
}
ss::cdc_streams_check_and_repair.set(r, [&ss] (std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_cdc_streams_check_and_repair(sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
return ss.invoke_on(0, [] (service::storage_service& ss) {
return ss.check_and_repair_cdc_streams();
}).then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});
}
ss::force_compaction.set(r, [&ctx](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
static
future<json::json_return_type>
rest_force_compaction(http_context& ctx, std::unique_ptr<http::request> req) {
auto& db = ctx.db;
auto params = req_params({
std::pair("flush_memtables", mandatory::no),
@@ -753,9 +779,11 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
}
co_return json_void();
});
}
ss::force_keyspace_compaction.set(r, [&ctx](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
static
future<json::json_return_type>
rest_force_keyspace_compaction(http_context& ctx, std::unique_ptr<http::request> req) {
auto& db = ctx.db;
auto params = req_params({
std::pair("keyspace", mandatory::yes),
@@ -784,9 +812,11 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
}
co_return json_void();
});
}
ss::force_keyspace_cleanup.set(r, [&ctx, &ss](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
static
future<json::json_return_type>
rest_force_keyspace_cleanup(http_context& ctx, sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
auto& db = ctx.db;
auto keyspace = validate_keyspace(ctx, req);
auto table_infos = parse_table_infos(keyspace, ctx, req->query_parameters, "cf");
@@ -814,9 +844,11 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
}
co_return json::json_return_type(0);
});
}
ss::cleanup_all.set(r, [&ctx, &ss](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
static
future<json::json_return_type>
rest_cleanup_all(http_context& ctx, sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
apilog.info("cleanup_all");
auto done = co_await ss.invoke_on(0, [] (service::storage_service& ss) -> future<bool> {
if (!ss.is_topology_coordinator_enabled()) {
@@ -839,9 +871,11 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
throw;
}
co_return json::json_return_type(0);
});
}
ss::perform_keyspace_offstrategy_compaction.set(r, wrap_ks_cf(ctx, [] (http_context& ctx, std::unique_ptr<http::request> req, sstring keyspace, std::vector<table_info> table_infos) -> future<json::json_return_type> {
static
future<json::json_return_type>
rest_perform_keyspace_offstrategy_compaction(http_context& ctx, std::unique_ptr<http::request> req, sstring keyspace, std::vector<table_info> table_infos) {
apilog.info("perform_keyspace_offstrategy_compaction: keyspace={} tables={}", keyspace, table_infos);
bool res = false;
auto& compaction_module = ctx.db.local().get_compaction_manager().get_task_manager_module();
@@ -854,9 +888,11 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
}
co_return json::json_return_type(res);
}));
}
ss::upgrade_sstables.set(r, wrap_ks_cf(ctx, [] (http_context& ctx, std::unique_ptr<http::request> req, sstring keyspace, std::vector<table_info> table_infos) -> future<json::json_return_type> {
static
future<json::json_return_type>
rest_upgrade_sstables(http_context& ctx, std::unique_ptr<http::request> req, sstring keyspace, std::vector<table_info> table_infos) {
auto& db = ctx.db;
bool exclude_current_version = req_param<bool>(*req, "exclude_current_version", false);
@@ -872,17 +908,21 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
}
co_return json::json_return_type(0);
}));
}
ss::force_flush.set(r, [&ctx](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
static
future<json::json_return_type>
rest_force_flush(http_context& ctx, std::unique_ptr<http::request> req) {
apilog.info("flush all tables");
co_await ctx.db.invoke_on_all([] (replica::database& db) {
return db.flush_all_tables();
});
co_return json_void();
});
}
ss::force_keyspace_flush.set(r, [&ctx](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
static
future<json::json_return_type>
rest_force_keyspace_flush(http_context& ctx, std::unique_ptr<http::request> req) {
auto keyspace = validate_keyspace(ctx, req);
auto column_families = parse_tables(keyspace, ctx, req->query_parameters, "cf");
apilog.info("perform_keyspace_flush: keyspace={} tables={}", keyspace, column_families);
@@ -893,24 +933,29 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
co_await replica::database::flush_tables_on_all_shards(db, keyspace, std::move(column_families));
}
co_return json_void();
});
}
ss::decommission.set(r, [&ss](std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_decommission(sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
apilog.info("decommission");
return ss.local().decommission().then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});
}
ss::move.set(r, [&ss] (std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_move(sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
auto new_token = req->get_query_param("new_token");
return ss.local().move(new_token).then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});
}
ss::remove_node.set(r, [&ss](std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_remove_node(sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
auto host_id = validate_host_id(req->get_query_param("host_id"));
std::vector<sstring> ignore_nodes_strs = utils::split_comma_separated_list(req->get_query_param("ignore_nodes"));
apilog.info("remove_node: host_id={} ignore_nodes={}", host_id, ignore_nodes_strs);
@@ -930,29 +975,37 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
return ss.local().removenode(host_id, std::move(ignore_nodes)).then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});
}
ss::get_removal_status.set(r, [&ss](std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_get_removal_status(sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
return ss.local().get_removal_status().then([] (auto status) {
return make_ready_future<json::json_return_type>(status);
});
});
}
ss::force_remove_completion.set(r, [&ss](std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_force_remove_completion(sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
return ss.local().force_remove_completion().then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});
}
ss::set_logging_level.set(r, [](std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_set_logging_level(std::unique_ptr<http::request> req) {
//TBD
unimplemented();
auto class_qualifier = req->get_query_param("class_qualifier");
auto level = req->get_query_param("level");
return make_ready_future<json::json_return_type>(json_void());
});
}
ss::get_logging_levels.set(r, [](std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_get_logging_levels(std::unique_ptr<http::request> req) {
std::vector<ss::mapper> res;
for (auto i : logging::logger_registry().get_all_logger_names()) {
ss::mapper log;
@@ -961,37 +1014,47 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
res.push_back(log);
}
return make_ready_future<json::json_return_type>(res);
});
}
ss::get_operation_mode.set(r, [&ss](std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_get_operation_mode(sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
return ss.local().get_operation_mode().then([] (auto mode) {
return make_ready_future<json::json_return_type>(format("{}", mode));
});
});
}
ss::is_starting.set(r, [&ss](std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_is_starting(sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
return ss.local().get_operation_mode().then([] (auto mode) {
return make_ready_future<json::json_return_type>(mode <= service::storage_service::mode::STARTING);
});
});
}
ss::get_drain_progress.set(r, [&ctx](std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_get_drain_progress(http_context& ctx, std::unique_ptr<http::request> req) {
return ctx.db.map_reduce(adder<replica::database::drain_progress>(), [] (auto& db) {
return db.get_drain_progress();
}).then([] (auto&& progress) {
auto progress_str = format("Drained {}/{} ColumnFamilies", progress.remaining_cfs, progress.total_cfs);
return make_ready_future<json::json_return_type>(std::move(progress_str));
});
});
}
ss::drain.set(r, [&ss](std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_drain(sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
apilog.info("drain");
return ss.local().drain().then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});
}
ss::get_keyspaces.set(r, [&ctx](const_req req) {
static
json::json_return_type
rest_get_keyspaces(http_context& ctx, const_req req) {
auto type = req.get_query_param("type");
auto replication = req.get_query_param("replication");
std::vector<sstring> keyspaces;
@@ -1009,36 +1072,46 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
return keyspaces | std::views::filter([&ctx, want_tablets] (const sstring& ks) {
return ctx.db.local().find_keyspace(ks).get_replication_strategy().uses_tablets() == want_tablets;
}) | std::ranges::to<std::vector>();
});
}
ss::stop_gossiping.set(r, [&ss](std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_stop_gossiping(sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
apilog.info("stop_gossiping");
return ss.local().stop_gossiping().then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});
}
ss::start_gossiping.set(r, [&ss](std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_start_gossiping(sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
apilog.info("start_gossiping");
return ss.local().start_gossiping().then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});
}
ss::is_gossip_running.set(r, [&ss](std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_is_gossip_running(sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
return ss.local().is_gossip_running().then([] (bool running){
return make_ready_future<json::json_return_type>(running);
});
});
}
ss::stop_daemon.set(r, [](std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_stop_daemon(std::unique_ptr<http::request> req) {
//TBD
unimplemented();
return make_ready_future<json::json_return_type>(json_void());
});
}
ss::is_initialized.set(r, [&ss](std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_is_initialized(sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
return ss.local().get_operation_mode().then([&ss] (auto mode) {
bool is_initialized = mode >= service::storage_service::mode::STARTING && mode != service::storage_service::mode::MAINTENANCE;
if (mode == service::storage_service::mode::NORMAL) {
@@ -1046,19 +1119,25 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
}
return make_ready_future<json::json_return_type>(is_initialized);
});
});
}
ss::join_ring.set(r, [](std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_join_ring(std::unique_ptr<http::request> req) {
return make_ready_future<json::json_return_type>(json_void());
});
}
ss::is_joined.set(r, [&ss] (std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_is_joined(sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
return ss.local().get_operation_mode().then([] (auto mode) {
return make_ready_future<json::json_return_type>(mode >= service::storage_service::mode::JOINING && mode != service::storage_service::mode::MAINTENANCE);
});
});
}
ss::is_incremental_backups_enabled.set(r, [&ctx](std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_is_incremental_backups_enabled(http_context& ctx, std::unique_ptr<http::request> req) {
// If this is issued in parallel with an ongoing change, we may see values not agreeing.
// Reissuing is asking for trouble, so we will just return true upon seeing any true value.
return ctx.db.map_reduce(adder<bool>(), [] (replica::database& db) {
@@ -1072,9 +1151,11 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
}).then([] (bool val) {
return make_ready_future<json::json_return_type>(val);
});
});
}
ss::set_incremental_backups_enabled.set(r, [&ctx](std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_set_incremental_backups_enabled(http_context& ctx, std::unique_ptr<http::request> req) {
auto val_str = req->get_query_param("value");
bool value = (val_str == "True") || (val_str == "true") || (val_str == "1");
return ctx.db.invoke_on_all([value] (replica::database& db) {
@@ -1092,9 +1173,11 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
}).then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});
}
ss::rebuild.set(r, [&ss](std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_rebuild(sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
utils::optional_param source_dc;
if (auto source_dc_str = req->get_query_param("source_dc"); !source_dc_str.empty()) {
source_dc.emplace(std::move(source_dc_str)).set_user_provided();
@@ -1109,43 +1192,55 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
return ss.local().rebuild(std::move(source_dc)).then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});
}
ss::bulk_load.set(r, [](std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_bulk_load(std::unique_ptr<http::request> req) {
//TBD
unimplemented();
auto path = req->get_path_param("path");
return make_ready_future<json::json_return_type>(json_void());
});
}
ss::bulk_load_async.set(r, [](std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_bulk_load_async(std::unique_ptr<http::request> req) {
//TBD
unimplemented();
auto path = req->get_path_param("path");
return make_ready_future<json::json_return_type>(json_void());
});
}
ss::reschedule_failed_deletions.set(r, [](std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_reschedule_failed_deletions(std::unique_ptr<http::request> req) {
//TBD
unimplemented();
return make_ready_future<json::json_return_type>(json_void());
});
}
ss::sample_key_range.set(r, [](std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_sample_key_range(std::unique_ptr<http::request> req) {
//TBD
unimplemented();
std::vector<sstring> res;
return make_ready_future<json::json_return_type>(res);
});
}
ss::reset_local_schema.set(r, [&ss](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
static
future<json::json_return_type>
rest_reset_local_schema(sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
// FIXME: We should truncate schema tables if more than one node in the cluster.
apilog.info("reset_local_schema");
co_await ss.local().reload_schema();
co_return json_void();
});
}
ss::set_trace_probability.set(r, [](std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_set_trace_probability(std::unique_ptr<http::request> req) {
auto probability = req->get_query_param("probability");
apilog.info("set_trace_probability: probability={}", probability);
return futurize_invoke([probability] {
@@ -1165,22 +1260,28 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
throw httpd::bad_param_exception(format("Bad format in a probability value: \"{}\"", probability.c_str()));
}
});
});
}
ss::get_trace_probability.set(r, [](std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_get_trace_probability(std::unique_ptr<http::request> req) {
return make_ready_future<json::json_return_type>(tracing::tracing::get_local_tracing_instance().get_trace_probability());
});
}
ss::get_slow_query_info.set(r, [](const_req req) {
static
json::json_return_type
rest_get_slow_query_info(const_req req) {
ss::slow_query_info res;
res.enable = tracing::tracing::get_local_tracing_instance().slow_query_tracing_enabled();
res.ttl = tracing::tracing::get_local_tracing_instance().slow_query_record_ttl().count() ;
res.threshold = tracing::tracing::get_local_tracing_instance().slow_query_threshold().count();
res.fast = tracing::tracing::get_local_tracing_instance().ignore_trace_events_enabled();
return res;
});
}
ss::set_slow_query.set(r, [](std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_set_slow_query(std::unique_ptr<http::request> req) {
auto enable = req->get_query_param("enable");
auto ttl = req->get_query_param("ttl");
auto threshold = req->get_query_param("threshold");
@@ -1206,90 +1307,120 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
} catch (...) {
throw httpd::bad_param_exception(format("Bad format value: "));
}
});
}
ss::deliver_hints.set(r, [](std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_deliver_hints(std::unique_ptr<http::request> req) {
//TBD
unimplemented();
auto host = req->get_query_param("host");
return make_ready_future<json::json_return_type>(json_void());
});
}
ss::get_cluster_name.set(r, [&ss](const_req req) {
static
json::json_return_type
rest_get_cluster_name(sharded<service::storage_service>& ss, const_req req) {
return ss.local().gossiper().get_cluster_name();
});
}
ss::get_partitioner_name.set(r, [&ss](const_req req) {
static
json::json_return_type
rest_get_partitioner_name(sharded<service::storage_service>& ss, const_req req) {
return ss.local().gossiper().get_partitioner_name();
});
}
ss::get_tombstone_warn_threshold.set(r, [](std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_get_tombstone_warn_threshold(std::unique_ptr<http::request> req) {
//TBD
unimplemented();
return make_ready_future<json::json_return_type>(0);
});
}
ss::set_tombstone_warn_threshold.set(r, [](std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_set_tombstone_warn_threshold(std::unique_ptr<http::request> req) {
//TBD
unimplemented();
auto debug_threshold = req->get_query_param("debug_threshold");
return make_ready_future<json::json_return_type>(json_void());
});
}
ss::get_tombstone_failure_threshold.set(r, [](std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_get_tombstone_failure_threshold(std::unique_ptr<http::request> req) {
//TBD
unimplemented();
return make_ready_future<json::json_return_type>(0);
});
}
ss::set_tombstone_failure_threshold.set(r, [](std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_set_tombstone_failure_threshold(std::unique_ptr<http::request> req) {
//TBD
unimplemented();
auto debug_threshold = req->get_query_param("debug_threshold");
return make_ready_future<json::json_return_type>(json_void());
});
}
ss::get_batch_size_failure_threshold.set(r, [](std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_get_batch_size_failure_threshold(std::unique_ptr<http::request> req) {
//TBD
unimplemented();
return make_ready_future<json::json_return_type>(0);
});
}
ss::set_batch_size_failure_threshold.set(r, [](std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_set_batch_size_failure_threshold(std::unique_ptr<http::request> req) {
//TBD
unimplemented();
auto threshold = req->get_query_param("threshold");
return make_ready_future<json::json_return_type>(json_void());
});
}
ss::set_hinted_handoff_throttle_in_kb.set(r, [](std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_set_hinted_handoff_throttle_in_kb(std::unique_ptr<http::request> req) {
//TBD
unimplemented();
auto debug_threshold = req->get_query_param("throttle");
return make_ready_future<json::json_return_type>(json_void());
});
}
ss::get_metrics_load.set(r, [&ctx](std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_get_metrics_load(http_context& ctx, std::unique_ptr<http::request> req) {
return get_cf_stats(ctx, &replica::column_family_stats::live_disk_space_used);
});
}
ss::get_exceptions.set(r, [&ss](const_req req) {
static
json::json_return_type
rest_get_exceptions(sharded<service::storage_service>& ss, const_req req) {
return ss.local().get_exception_count();
});
}
ss::get_total_hints_in_progress.set(r, [](std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_get_total_hints_in_progress(std::unique_ptr<http::request> req) {
//TBD
unimplemented();
return make_ready_future<json::json_return_type>(0);
});
}
ss::get_total_hints.set(r, [](std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_get_total_hints(std::unique_ptr<http::request> req) {
//TBD
unimplemented();
return make_ready_future<json::json_return_type>(0);
});
}
ss::get_ownership.set(r, [&ctx, &ss] (std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_get_ownership(http_context& ctx, sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
if (any_of_keyspaces_use_tablets(ctx)) {
throw httpd::bad_param_exception("storage_service/ownership cannot be used when a keyspace uses tablets");
}
@@ -1298,9 +1429,11 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
std::vector<storage_service_json::mapper> res;
return make_ready_future<json::json_return_type>(map_to_key_value(ownership, res));
});
});
}
ss::get_effective_ownership.set(r, [&ctx, &ss] (std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_get_effective_ownership(http_context& ctx, sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
auto keyspace_name = req->get_path_param("keyspace") == "null" ? "" : validate_keyspace(ctx, req);
auto table_name = req->get_query_param("cf");
@@ -1316,9 +1449,11 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
std::vector<storage_service_json::mapper> res;
return make_ready_future<json::json_return_type>(map_to_key_value(ownership, res));
});
});
}
ss::sstable_info.set(r, [&ctx] (std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_sstable_info(http_context& ctx, std::unique_ptr<http::request> req) {
auto ks = api::req_param<sstring>(*req, "keyspace", {}).value;
auto cf = api::req_param<sstring>(*req, "cf", {}).value;
@@ -1442,18 +1577,20 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
return make_ready_future<json::json_return_type>(stream_object(dst));
});
});
});
}
ss::reload_raft_topology_state.set(r,
[&ss, &group0_client] (std::unique_ptr<http::request> req) -> future<json::json_return_type> {
static
future<json::json_return_type>
rest_reload_raft_topology_state(sharded<service::storage_service>& ss, service::raft_group0_client& group0_client, std::unique_ptr<http::request> req) {
co_await ss.invoke_on(0, [&group0_client] (service::storage_service& ss) -> future<> {
return ss.reload_raft_topology_state(group0_client);
});
co_return json_void();
});
}
ss::upgrade_to_raft_topology.set(r,
[&ss] (std::unique_ptr<http::request> req) -> future<json::json_return_type> {
static
future<json::json_return_type>
rest_upgrade_to_raft_topology(sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
apilog.info("Requested to schedule upgrade to raft topology");
try {
co_await ss.invoke_on(0, [] (auto& ss) {
@@ -1465,17 +1602,20 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
std::rethrow_exception(std::move(ex));
}
co_return json_void();
});
}
ss::raft_topology_upgrade_status.set(r,
[&ss] (std::unique_ptr<http::request> req) -> future<json::json_return_type> {
static
future<json::json_return_type>
rest_raft_topology_upgrade_status(sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
const auto ustate = co_await ss.invoke_on(0, [] (auto& ss) {
return ss.get_topology_upgrade_state();
});
co_return sstring(format("{}", ustate));
});
}
ss::move_tablet.set(r, [&ctx, &ss] (std::unique_ptr<http::request> req) -> future<json_return_type> {
static
future<json::json_return_type>
rest_move_tablet(http_context& ctx, sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
auto src_host_id = validate_host_id(req->get_query_param("src_host"));
shard_id src_shard_id = validate_int(req->get_query_param("src_shard"));
auto dst_host_id = validate_host_id(req->get_query_param("dst_host"));
@@ -1494,9 +1634,11 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
force);
co_return json_void();
});
}
ss::add_tablet_replica.set(r, [&ctx, &ss] (std::unique_ptr<http::request> req) -> future<json_return_type> {
static
future<json::json_return_type>
rest_add_tablet_replica(http_context& ctx, sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
auto dst_host_id = validate_host_id(req->get_query_param("dst_host"));
shard_id dst_shard_id = validate_int(req->get_query_param("dst_shard"));
auto token = dht::token::from_int64(validate_int(req->get_query_param("token")));
@@ -1511,9 +1653,11 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
force);
co_return json_void();
});
}
ss::del_tablet_replica.set(r, [&ctx, &ss] (std::unique_ptr<http::request> req) -> future<json_return_type> {
static
future<json::json_return_type>
rest_del_tablet_replica(http_context& ctx, sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
auto dst_host_id = validate_host_id(req->get_query_param("host"));
shard_id dst_shard_id = validate_int(req->get_query_param("shard"));
auto token = dht::token::from_int64(validate_int(req->get_query_param("token")));
@@ -1528,9 +1672,11 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
force);
co_return json_void();
});
}
ss::repair_tablet.set(r, [&ctx, &ss] (std::unique_ptr<http::request> req) -> future<json_return_type> {
static
future<json::json_return_type>
rest_repair_tablet(http_context& ctx, sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
auto tokens_param = split(req->get_query_param("tokens"), ",");
utils::chunked_vector<dht::token> tokens;
bool all_tokens = tokens_param.size() == 1 && tokens_param.front() == "all";
@@ -1554,20 +1700,26 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
auto res = co_await ss.local().add_repair_tablet_request(table_id, tokens_variant);
co_return json::json_return_type(res);
});
}
ss::tablet_balancing_enable.set(r, [&ss] (std::unique_ptr<http::request> req) -> future<json_return_type> {
static
future<json::json_return_type>
rest_tablet_balancing_enable(sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
auto enabled = validate_bool(req->get_query_param("enabled"));
co_await ss.local().set_tablet_balancing_enabled(enabled);
co_return json_void();
});
}
ss::quiesce_topology.set(r, [&ss] (std::unique_ptr<http::request> req) -> future<json_return_type> {
static
future<json::json_return_type>
rest_quiesce_topology(sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
co_await ss.local().await_topology_quiesced();
co_return json_void();
});
}
sp::get_schema_versions.set(r, [&ss](std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_get_schema_versions(sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
return ss.local().describe_schema_versions().then([] (auto result) {
std::vector<sp::mapper_list> res;
for (auto e : result) {
@@ -1578,7 +1730,114 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
}
return make_ready_future<json::json_return_type>(std::move(res));
});
});
}
// Disambiguate between a function that returns a future and a function that returns a plain value, also
// add std::ref() as a courtesy. Also handles ks_cf_func signatures.
template <typename FuncType, typename... BindArgs>
requires std::invocable<FuncType, BindArgs&..., const_req>
&& std::same_as<seastar::json::json_return_type, std::invoke_result_t<FuncType, BindArgs&..., const_req&>>
static
seastar::httpd::json_request_function
rest_bind(FuncType func, BindArgs&... args) {
return std::bind_front(func, std::ref(args)...);
}
template <typename FuncType, typename... BindArgs>
requires std::invocable<FuncType, BindArgs&..., std::unique_ptr<seastar::http::request>>
&& std::same_as<future<seastar::json::json_return_type>, std::invoke_result_t<FuncType, BindArgs&..., std::unique_ptr<seastar::http::request>>>
static
seastar::httpd::future_json_function
rest_bind(FuncType func, BindArgs&... args) {
return std::bind_front(func, std::ref(args)...);
}
static
seastar::httpd::future_json_function
rest_bind(ks_cf_func func, http_context& ctx) {
return wrap_ks_cf(ctx, func);
}
void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_service>& ss, service::raft_group0_client& group0_client) {
ss::get_token_endpoint.set(r, rest_bind(rest_get_token_endpoint, ctx, ss));
ss::toppartitions_generic.set(r, rest_bind(rest_toppartitions_generic, ctx));
ss::get_release_version.set(r, rest_bind(rest_get_release_version, ss));
ss::get_scylla_release_version.set(r, rest_bind(rest_get_scylla_release_version, ss));
ss::get_schema_version.set(r, rest_bind(rest_get_schema_version, ss));
ss::get_range_to_endpoint_map.set(r, rest_bind(rest_get_range_to_endpoint_map, ctx, ss));
ss::get_pending_range_to_endpoint_map.set(r, rest_bind(rest_get_pending_range_to_endpoint_map, ctx));
ss::describe_ring.set(r, rest_bind(rest_describe_ring, ctx, ss));
ss::get_load.set(r, rest_bind(rest_get_load, ctx));
ss::get_current_generation_number.set(r, rest_bind(rest_get_current_generation_number, ss));
ss::get_natural_endpoints.set(r, rest_bind(rest_get_natural_endpoints, ctx, ss));
ss::cdc_streams_check_and_repair.set(r, rest_bind(rest_cdc_streams_check_and_repair, ss));
ss::force_compaction.set(r, rest_bind(rest_force_compaction, ctx));
ss::force_keyspace_compaction.set(r, rest_bind(rest_force_keyspace_compaction, ctx));
ss::force_keyspace_cleanup.set(r, rest_bind(rest_force_keyspace_cleanup, ctx, ss));
ss::cleanup_all.set(r, rest_bind(rest_cleanup_all, ctx, ss));
ss::perform_keyspace_offstrategy_compaction.set(r, rest_bind(rest_perform_keyspace_offstrategy_compaction, ctx));
ss::upgrade_sstables.set(r, rest_bind(rest_upgrade_sstables, ctx));
ss::force_flush.set(r, rest_bind(rest_force_flush, ctx));
ss::force_keyspace_flush.set(r, rest_bind(rest_force_keyspace_flush, ctx));
ss::decommission.set(r, rest_bind(rest_decommission, ss));
ss::move.set(r, rest_bind(rest_move, ss));
ss::remove_node.set(r, rest_bind(rest_remove_node, ss));
ss::get_removal_status.set(r, rest_bind(rest_get_removal_status, ss));
ss::force_remove_completion.set(r, rest_bind(rest_force_remove_completion, ss));
ss::set_logging_level.set(r, rest_bind(rest_set_logging_level));
ss::get_logging_levels.set(r, rest_bind(rest_get_logging_levels));
ss::get_operation_mode.set(r, rest_bind(rest_get_operation_mode, ss));
ss::is_starting.set(r, rest_bind(rest_is_starting, ss));
ss::get_drain_progress.set(r, rest_bind(rest_get_drain_progress, ctx));
ss::drain.set(r, rest_bind(rest_drain, ss));
ss::get_keyspaces.set(r, rest_bind(rest_get_keyspaces, ctx));
ss::stop_gossiping.set(r, rest_bind(rest_stop_gossiping, ss));
ss::start_gossiping.set(r, rest_bind(rest_start_gossiping, ss));
ss::is_gossip_running.set(r, rest_bind(rest_is_gossip_running, ss));
ss::stop_daemon.set(r, rest_bind(rest_stop_daemon));
ss::is_initialized.set(r, rest_bind(rest_is_initialized, ss));
ss::join_ring.set(r, rest_bind(rest_join_ring));
ss::is_joined.set(r, rest_bind(rest_is_joined, ss));
ss::is_incremental_backups_enabled.set(r, rest_bind(rest_is_incremental_backups_enabled, ctx));
ss::set_incremental_backups_enabled.set(r, rest_bind(rest_set_incremental_backups_enabled, ctx));
ss::rebuild.set(r, rest_bind(rest_rebuild, ss));
ss::bulk_load.set(r, rest_bind(rest_bulk_load));
ss::bulk_load_async.set(r, rest_bind(rest_bulk_load_async));
ss::reschedule_failed_deletions.set(r, rest_bind(rest_reschedule_failed_deletions));
ss::sample_key_range.set(r, rest_bind(rest_sample_key_range));
ss::reset_local_schema.set(r, rest_bind(rest_reset_local_schema, ss));
ss::set_trace_probability.set(r, rest_bind(rest_set_trace_probability));
ss::get_trace_probability.set(r, rest_bind(rest_get_trace_probability));
ss::get_slow_query_info.set(r, rest_bind(rest_get_slow_query_info));
ss::set_slow_query.set(r, rest_bind(rest_set_slow_query));
ss::deliver_hints.set(r, rest_bind(rest_deliver_hints));
ss::get_cluster_name.set(r, rest_bind(rest_get_cluster_name, ss));
ss::get_partitioner_name.set(r, rest_bind(rest_get_partitioner_name, ss));
ss::get_tombstone_warn_threshold.set(r, rest_bind(rest_get_tombstone_warn_threshold));
ss::set_tombstone_warn_threshold.set(r, rest_bind(rest_set_tombstone_warn_threshold));
ss::get_tombstone_failure_threshold.set(r, rest_bind(rest_get_tombstone_failure_threshold));
ss::set_tombstone_failure_threshold.set(r, rest_bind(rest_set_tombstone_failure_threshold));
ss::get_batch_size_failure_threshold.set(r, rest_bind(rest_get_batch_size_failure_threshold));
ss::set_batch_size_failure_threshold.set(r, rest_bind(rest_set_batch_size_failure_threshold));
ss::set_hinted_handoff_throttle_in_kb.set(r, rest_bind(rest_set_hinted_handoff_throttle_in_kb));
ss::get_metrics_load.set(r, rest_bind(rest_get_metrics_load, ctx));
ss::get_exceptions.set(r, rest_bind(rest_get_exceptions, ss));
ss::get_total_hints_in_progress.set(r, rest_bind(rest_get_total_hints_in_progress));
ss::get_total_hints.set(r, rest_bind(rest_get_total_hints));
ss::get_ownership.set(r, rest_bind(rest_get_ownership, ctx, ss));
ss::get_effective_ownership.set(r, rest_bind(rest_get_effective_ownership, ctx, ss));
ss::sstable_info.set(r, rest_bind(rest_sstable_info, ctx));
ss::reload_raft_topology_state.set(r, rest_bind(rest_reload_raft_topology_state, ss, group0_client));
ss::upgrade_to_raft_topology.set(r, rest_bind(rest_upgrade_to_raft_topology, ss));
ss::raft_topology_upgrade_status.set(r, rest_bind(rest_raft_topology_upgrade_status, ss));
ss::move_tablet.set(r, rest_bind(rest_move_tablet, ctx, ss));
ss::add_tablet_replica.set(r, rest_bind(rest_add_tablet_replica, ctx, ss));
ss::del_tablet_replica.set(r, rest_bind(rest_del_tablet_replica, ctx, ss));
ss::repair_tablet.set(r, rest_bind(rest_repair_tablet, ctx, ss));
ss::tablet_balancing_enable.set(r, rest_bind(rest_tablet_balancing_enable, ss));
ss::quiesce_topology.set(r, rest_bind(rest_quiesce_topology, ss));
sp::get_schema_versions.set(r, rest_bind(rest_get_schema_versions, ss));
}
void unset_storage_service(http_context& ctx, routes& r) {