api: implement client_routes endpoints
Ref: scylladb/scylla-enterprise#5699
This commit is contained in:
@@ -69,6 +69,7 @@ target_sources(api
|
||||
PRIVATE
|
||||
api.cc
|
||||
cache_service.cc
|
||||
client_routes.cc
|
||||
collectd.cc
|
||||
column_family.cc
|
||||
commitlog.cc
|
||||
|
||||
13
api/api.cc
13
api/api.cc
@@ -37,6 +37,7 @@
|
||||
#include "raft.hh"
|
||||
#include "gms/gossip_address_map.hh"
|
||||
#include "service_levels.hh"
|
||||
#include "client_routes.hh"
|
||||
|
||||
logging::logger apilog("api");
|
||||
|
||||
@@ -67,9 +68,11 @@ future<> set_server_init(http_context& ctx) {
|
||||
rb02->set_api_doc(r);
|
||||
rb02->register_api_file(r, "swagger20_header");
|
||||
rb02->register_api_file(r, "metrics");
|
||||
rb02->register_api_file(r, "client_routes");
|
||||
rb->register_function(r, "system",
|
||||
"The system related API");
|
||||
rb02->add_definitions_file(r, "metrics");
|
||||
rb02->add_definitions_file(r, "client_routes");
|
||||
set_system(ctx, r);
|
||||
rb->register_function(r, "error_injection",
|
||||
"The error injection API");
|
||||
@@ -129,6 +132,16 @@ future<> unset_server_storage_service(http_context& ctx) {
|
||||
return ctx.http_server.set_routes([&ctx] (routes& r) { unset_storage_service(ctx, r); });
|
||||
}
|
||||
|
||||
future<> set_server_client_routes(http_context& ctx, sharded<service::client_routes_service>& cr) {
|
||||
return ctx.http_server.set_routes([&ctx, &cr] (routes& r) {
|
||||
set_client_routes(ctx, r, cr);
|
||||
});
|
||||
}
|
||||
|
||||
future<> unset_server_client_routes(http_context& ctx) {
|
||||
return ctx.http_server.set_routes([&ctx] (routes& r) { unset_client_routes(ctx, r); });
|
||||
}
|
||||
|
||||
future<> set_load_meter(http_context& ctx, service::load_meter& lm) {
|
||||
return ctx.http_server.set_routes([&ctx, &lm] (routes& r) { set_load_meter(ctx, r, lm); });
|
||||
}
|
||||
|
||||
@@ -29,6 +29,7 @@ class storage_proxy;
|
||||
class storage_service;
|
||||
class raft_group0_client;
|
||||
class raft_group_registry;
|
||||
class client_routes_service;
|
||||
|
||||
} // namespace service
|
||||
|
||||
@@ -99,6 +100,8 @@ future<> set_server_snitch(http_context& ctx, sharded<locator::snitch_ptr>& snit
|
||||
future<> unset_server_snitch(http_context& ctx);
|
||||
future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss, service::raft_group0_client&);
|
||||
future<> unset_server_storage_service(http_context& ctx);
|
||||
future<> set_server_client_routes(http_context& ctx, sharded<service::client_routes_service>& cr);
|
||||
future<> unset_server_client_routes(http_context& ctx);
|
||||
future<> set_server_sstables_loader(http_context& ctx, sharded<sstables_loader>& sst_loader);
|
||||
future<> unset_server_sstables_loader(http_context& ctx);
|
||||
future<> set_server_view_builder(http_context& ctx, sharded<db::view::view_builder>& vb, sharded<gms::gossiper>& g);
|
||||
|
||||
178
api/client_routes.cc
Normal file
178
api/client_routes.cc
Normal file
@@ -0,0 +1,178 @@
|
||||
/*
|
||||
* Copyright (C) 2025-present ScyllaDB
|
||||
*
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include <seastar/http/short_streams.hh>
|
||||
|
||||
#include "client_routes.hh"
|
||||
#include "api/api.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "service/client_routes.hh"
|
||||
#include "utils/rjson.hh"
|
||||
|
||||
|
||||
#include "api/api-doc/client_routes.json.hh"
|
||||
|
||||
using namespace seastar::httpd;
|
||||
using namespace std::chrono_literals;
|
||||
using namespace json;
|
||||
|
||||
extern logging::logger apilog;
|
||||
|
||||
namespace api {
|
||||
|
||||
static void validate_client_routes_endpoint(sharded<service::client_routes_service>& cr, sstring endpoint_name) {
|
||||
if (!cr.local().get_feature_service().client_routes) {
|
||||
apilog.warn("{}: called before the cluster feature was enabled", endpoint_name);
|
||||
throw std::runtime_error(fmt::format("{} requires all nodes to support the CLIENT_ROUTES cluster feature", endpoint_name));
|
||||
}
|
||||
}
|
||||
|
||||
static sstring parse_string(const char* name, rapidjson::Value const& v) {
|
||||
const auto it = v.FindMember(name);
|
||||
if (it == v.MemberEnd()) {
|
||||
throw bad_param_exception(fmt::format("Missing '{}'", name));
|
||||
}
|
||||
if (!it->value.IsString()) {
|
||||
throw bad_param_exception(fmt::format("'{}' must be a string", name));
|
||||
}
|
||||
return {it->value.GetString(), it->value.GetStringLength()};
|
||||
}
|
||||
|
||||
static std::optional<uint32_t> parse_port(const char* name, rapidjson::Value const& v) {
|
||||
const auto it = v.FindMember(name);
|
||||
if (it == v.MemberEnd()) {
|
||||
return std::nullopt;
|
||||
}
|
||||
if (!it->value.IsInt()) {
|
||||
throw bad_param_exception(fmt::format("'{}' must be an integer", name));
|
||||
}
|
||||
auto port = it->value.GetInt();
|
||||
if (port < 1 || port > 65535) {
|
||||
throw bad_param_exception(fmt::format("'{}' value={} is outside the allowed port range", name, port));
|
||||
}
|
||||
return port;
|
||||
}
|
||||
|
||||
static std::vector<service::client_routes_service::client_route_entry> parse_set_client_array(const rapidjson::Document& root) {
|
||||
if (!root.IsArray()) {
|
||||
throw bad_param_exception("Body must be a JSON array");
|
||||
}
|
||||
|
||||
std::vector<service::client_routes_service::client_route_entry> v;
|
||||
v.reserve(root.GetArray().Size());
|
||||
for (const auto& element : root.GetArray()) {
|
||||
if (!element.IsObject()) { throw bad_param_exception("Each element must be object"); }
|
||||
|
||||
const auto port = parse_port("port", element);
|
||||
const auto tls_port = parse_port("tls_port", element);
|
||||
const auto alternator_port = parse_port("alternator_port", element);
|
||||
const auto alternator_https_port = parse_port("alternator_https_port", element);
|
||||
|
||||
if (!port.has_value() && !tls_port.has_value() && !alternator_port.has_value() && !alternator_https_port.has_value()) {
|
||||
throw bad_param_exception("At least one port field ('port', 'tls_port', 'alternator_port', 'alternator_https_port') must be specified");
|
||||
}
|
||||
|
||||
v.emplace_back(
|
||||
parse_string("connection_id", element),
|
||||
utils::UUID{parse_string("host_id", element)},
|
||||
parse_string("address", element),
|
||||
port,
|
||||
tls_port,
|
||||
alternator_port,
|
||||
alternator_https_port
|
||||
);
|
||||
}
|
||||
|
||||
return v;
|
||||
}
|
||||
|
||||
static
|
||||
future<json::json_return_type>
|
||||
rest_set_client_routes(http_context& ctx, sharded<service::client_routes_service>& cr, std::unique_ptr<http::request> req) {
|
||||
validate_client_routes_endpoint(cr, "rest_set_client_routes");
|
||||
|
||||
rapidjson::Document root;
|
||||
auto content = co_await util::read_entire_stream_contiguous(*req->content_stream);
|
||||
root.Parse(content.c_str());
|
||||
const auto route_entries = parse_set_client_array(root);
|
||||
|
||||
co_await cr.local().set_client_routes(route_entries);
|
||||
co_return seastar::json::json_void();
|
||||
}
|
||||
|
||||
static std::vector<service::client_routes_service::client_route_key> parse_delete_client_array(const rapidjson::Document& root) {
|
||||
if (!root.IsArray()) {
|
||||
throw bad_param_exception("Body must be a JSON array");
|
||||
}
|
||||
|
||||
std::vector<service::client_routes_service::client_route_key> v;
|
||||
v.reserve(root.GetArray().Size());
|
||||
for (const auto& element : root.GetArray()) {
|
||||
v.emplace_back(
|
||||
parse_string("connection_id", element),
|
||||
utils::UUID{parse_string("host_id", element)}
|
||||
);
|
||||
}
|
||||
|
||||
return v;
|
||||
}
|
||||
|
||||
static
|
||||
future<json::json_return_type>
|
||||
rest_delete_client_routes(http_context& ctx, sharded<service::client_routes_service>& cr, std::unique_ptr<http::request> req) {
|
||||
validate_client_routes_endpoint(cr, "delete_client_routes");
|
||||
|
||||
rapidjson::Document root;
|
||||
auto content = co_await util::read_entire_stream_contiguous(*req->content_stream);
|
||||
root.Parse(content.c_str());
|
||||
|
||||
const auto route_keys = parse_delete_client_array(root);
|
||||
co_await cr.local().delete_client_routes(route_keys);
|
||||
co_return seastar::json::json_void();
|
||||
}
|
||||
|
||||
static
|
||||
future<json::json_return_type>
|
||||
rest_get_client_routes(http_context& ctx, sharded<service::client_routes_service>& cr, std::unique_ptr<http::request> req) {
|
||||
validate_client_routes_endpoint(cr, "get_client_routes");
|
||||
|
||||
co_return co_await cr.invoke_on(0, [] (service::client_routes_service& cr) -> future<json::json_return_type> {
|
||||
co_return json::json_return_type(stream_range_as_array(co_await cr.get_client_routes(), [](const service::client_routes_service::client_route_entry & entry) {
|
||||
seastar::httpd::client_routes_json::client_routes_entry obj;
|
||||
obj.connection_id = entry.connection_id;
|
||||
obj.host_id = fmt::to_string(entry.host_id);
|
||||
obj.address = entry.address;
|
||||
if (entry.port.has_value()) { obj.port = entry.port.value(); }
|
||||
if (entry.tls_port.has_value()) { obj.tls_port = entry.tls_port.value(); }
|
||||
if (entry.alternator_port.has_value()) { obj.alternator_port = entry.alternator_port.value(); }
|
||||
if (entry.alternator_https_port.has_value()) { obj.alternator_https_port = entry.alternator_https_port.value(); }
|
||||
return obj;
|
||||
}));
|
||||
});
|
||||
}
|
||||
|
||||
void set_client_routes(http_context& ctx, routes& r, sharded<service::client_routes_service>& cr) {
|
||||
seastar::httpd::client_routes_json::set_client_routes.set(r, [&ctx, &cr] (std::unique_ptr<seastar::http::request> req) {
|
||||
return rest_set_client_routes(ctx, cr, std::move(req));
|
||||
});
|
||||
seastar::httpd::client_routes_json::delete_client_routes.set(r, [&ctx, &cr] (std::unique_ptr<seastar::http::request> req) {
|
||||
return rest_delete_client_routes(ctx, cr, std::move(req));
|
||||
});
|
||||
seastar::httpd::client_routes_json::get_client_routes.set(r, [&ctx, &cr] (std::unique_ptr<seastar::http::request> req) {
|
||||
return rest_get_client_routes(ctx, cr, std::move(req));
|
||||
});
|
||||
}
|
||||
|
||||
void unset_client_routes(http_context& ctx, routes& r) {
|
||||
seastar::httpd::client_routes_json::set_client_routes.unset(r);
|
||||
seastar::httpd::client_routes_json::delete_client_routes.unset(r);
|
||||
seastar::httpd::client_routes_json::get_client_routes.unset(r);
|
||||
}
|
||||
|
||||
}
|
||||
20
api/client_routes.hh
Normal file
20
api/client_routes.hh
Normal file
@@ -0,0 +1,20 @@
|
||||
/*
|
||||
* Copyright (C) 2025-present ScyllaDB
|
||||
*
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
#include <seastar/core/sharded.hh>
|
||||
#include <seastar/json/json_elements.hh>
|
||||
#include "api/api_init.hh"
|
||||
|
||||
namespace api {
|
||||
|
||||
void set_client_routes(http_context& ctx, httpd::routes& r, sharded<service::client_routes_service>& cr);
|
||||
void unset_client_routes(http_context& ctx, httpd::routes& r);
|
||||
|
||||
}
|
||||
@@ -1320,6 +1320,7 @@ api = ['api/api.cc',
|
||||
Json2Code('api/api-doc/cache_service.json'),
|
||||
'api/cache_service.cc',
|
||||
Json2Code('api/api-doc/client_routes.json'),
|
||||
'api/client_routes.cc',
|
||||
Json2Code('api/api-doc/collectd.json'),
|
||||
'api/collectd.cc',
|
||||
Json2Code('api/api-doc/endpoint_snitch_info.json'),
|
||||
|
||||
7
main.cc
7
main.cc
@@ -1798,7 +1798,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
|
||||
checkpoint(stop_signal, "initializing client routes service");
|
||||
static sharded<service::client_routes_service> client_routes;
|
||||
client_routes.start(std::ref(feature_service), std::ref(qp)).get();
|
||||
client_routes.start(std::ref(stop_signal.as_sharded_abort_source()), std::ref(feature_service), std::ref(group0_client), std::ref(qp)).get();
|
||||
auto stop_client_routes = defer_verbose_shutdown("client_routes", [&] {
|
||||
client_routes.stop().get();
|
||||
});
|
||||
@@ -2199,6 +2199,11 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
});
|
||||
}).get();
|
||||
|
||||
api::set_server_client_routes(ctx, client_routes).get();
|
||||
auto stop_cr_api = defer_verbose_shutdown("client routes API", [&ctx] {
|
||||
api::unset_server_client_routes(ctx).get();
|
||||
});
|
||||
|
||||
checkpoint(stop_signal, "join cluster");
|
||||
// Allow abort during join_cluster since bootstrap or replace
|
||||
// can take a long time.
|
||||
|
||||
@@ -70,3 +70,61 @@ future<std::vector<service::client_routes_service::client_route_entry>> service:
|
||||
}
|
||||
co_return result;
|
||||
}
|
||||
|
||||
seastar::future<> service::client_routes_service::set_client_routes_inner(const std::vector<service::client_routes_service::client_route_entry>& route_entries) {
|
||||
auto guard = co_await _group0_client.start_operation(_abort_source, service::raft_timeout{});
|
||||
utils::chunked_vector<canonical_mutation> cmuts;
|
||||
|
||||
for (auto& entry : route_entries) {
|
||||
auto mut = co_await make_update_client_route_mutation(guard.write_timestamp(), entry);
|
||||
cmuts.emplace_back(std::move(mut));
|
||||
}
|
||||
auto cmd = _group0_client.prepare_command(service::write_mutations{std::move(cmuts)}, guard, "insert client routes");
|
||||
co_await _group0_client.add_entry(std::move(cmd), std::move(guard), _abort_source);
|
||||
}
|
||||
|
||||
seastar::future<> service::client_routes_service::delete_client_routes_inner(const std::vector<service::client_routes_service::client_route_key>& route_keys) {
|
||||
auto guard = co_await _group0_client.start_operation(_abort_source, service::raft_timeout{});
|
||||
utils::chunked_vector<canonical_mutation> cmuts;
|
||||
|
||||
for (const auto& route_key : route_keys) {
|
||||
auto mut = co_await make_remove_client_route_mutation(guard.write_timestamp(), route_key);
|
||||
cmuts.emplace_back(std::move(mut));
|
||||
}
|
||||
|
||||
auto cmd = _group0_client.prepare_command(service::write_mutations{std::move(cmuts)}, guard, "delete client routes");
|
||||
co_await _group0_client.add_entry(std::move(cmd), std::move(guard), _abort_source);
|
||||
}
|
||||
|
||||
seastar::future<> service::client_routes_service::set_client_routes(const std::vector<service::client_routes_service::client_route_entry>& route_entries) {
|
||||
return container().invoke_on(0, [route_entries = std::move(route_entries)] (service::client_routes_service& cr) -> future<> {
|
||||
return cr.with_retry([&] {
|
||||
return cr.set_client_routes_inner(route_entries);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
seastar::future<> service::client_routes_service::delete_client_routes(const std::vector<service::client_routes_service::client_route_key>& route_keys) {
|
||||
return container().invoke_on(0, [route_keys = std::move(route_keys)] (service::client_routes_service& cr) -> future<> {
|
||||
return cr.with_retry([&] {
|
||||
return cr.delete_client_routes_inner(route_keys);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
template <typename Func>
|
||||
seastar::future<> service::client_routes_service::with_retry(Func&& func) const {
|
||||
int retries = 10;
|
||||
while (true) {
|
||||
try {
|
||||
co_await func();
|
||||
} catch (const ::service::group0_concurrent_modification&) {
|
||||
crlogger.warn("Failed to set client routes due to guard conflict, retries={}", retries);
|
||||
if (retries--) {
|
||||
continue;
|
||||
}
|
||||
throw;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,19 +8,26 @@
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
#include <seastar/core/abort_source.hh>
|
||||
#include <seastar/core/sharded.hh>
|
||||
|
||||
#include "gms/feature_service.hh"
|
||||
#include "mutation/mutation.hh"
|
||||
#include <seastar/core/sharded.hh>
|
||||
#include "service/raft/raft_group0_client.hh"
|
||||
|
||||
namespace service {
|
||||
|
||||
class client_routes_service : public seastar::peering_sharded_service<client_routes_service> {
|
||||
public:
|
||||
client_routes_service(
|
||||
abort_source& abort_source,
|
||||
gms::feature_service& feature_service,
|
||||
service::raft_group0_client& group0_client,
|
||||
cql3::query_processor& qp
|
||||
)
|
||||
: _feature_service(feature_service)
|
||||
: _abort_source(abort_source)
|
||||
, _feature_service(feature_service)
|
||||
, _group0_client(group0_client)
|
||||
, _qp(qp) { }
|
||||
|
||||
struct client_route_key {
|
||||
@@ -55,9 +62,18 @@ public:
|
||||
future<mutation> make_remove_client_route_mutation(api::timestamp_type ts, const service::client_routes_service::client_route_key& key);
|
||||
future<mutation> make_update_client_route_mutation(api::timestamp_type ts, const client_route_entry& entry);
|
||||
future<std::vector<client_route_entry>> get_client_routes() const;
|
||||
seastar::future<> set_client_routes(const std::vector<service::client_routes_service::client_route_entry>& route_entries);
|
||||
seastar::future<> delete_client_routes(const std::vector<service::client_routes_service::client_route_key>& route_keys);
|
||||
|
||||
private:
|
||||
seastar::future<> set_client_routes_inner(const std::vector<service::client_routes_service::client_route_entry>& route_entries);
|
||||
seastar::future<> delete_client_routes_inner(const std::vector<service::client_routes_service::client_route_key>& route_keys);
|
||||
template <typename Func>
|
||||
seastar::future<> with_retry(Func&& func) const;
|
||||
|
||||
abort_source& _abort_source;
|
||||
gms::feature_service& _feature_service;
|
||||
service::raft_group0_client& _group0_client;
|
||||
cql3::query_processor& _qp;
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user