transport: Added listener with port-based load balancing
The new port is configurable from scylla.yaml and defaults to 19042 (unencrypted, unless client configures encryption options and omits `native_shard_aware_transport_port_ssl`). Two "SUPPORTED" tags are added: "SCYLLA_SHARD_AWARE_PORT" and "SCYLLA_SHARD_AWARE_PORT_SSL". For compatibility, "SCYLLA_SHARDING_ALGORITHM" is still kept. Fixes #5239
This commit is contained in:
@@ -102,6 +102,10 @@ listen_address: localhost
|
||||
# For security reasons, you should not expose this port to the internet. Firewall it if needed.
|
||||
native_transport_port: 9042
|
||||
|
||||
# Like native_transport_port, but clients are forwarded to specific shards, based on the
|
||||
# client-side port numbers.
|
||||
native_shard_aware_transport_port: 19042
|
||||
|
||||
# Enabling native transport encryption in client_encryption_options allows you to either use
|
||||
# encryption for the standard port or to use a dedicated, additional port along with the unencrypted
|
||||
# standard native_transport_port.
|
||||
@@ -111,6 +115,10 @@ native_transport_port: 9042
|
||||
# keeping native_transport_port unencrypted.
|
||||
#native_transport_port_ssl: 9142
|
||||
|
||||
# Like native_transport_port_ssl, but clients are forwarded to specific shards, based on the
|
||||
# client-side port numbers.
|
||||
#native_shard_aware_transport_port_ssl: 19142
|
||||
|
||||
# How long the coordinator should wait for read operations to complete
|
||||
read_request_timeout_in_ms: 5000
|
||||
|
||||
|
||||
@@ -525,6 +525,10 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
"for native_transport_port. Setting native_transport_port_ssl to a different value"
|
||||
"from native_transport_port will use encryption for native_transport_port_ssl while"
|
||||
"keeping native_transport_port unencrypted")
|
||||
, native_shard_aware_transport_port(this, "native_shard_aware_transport_port", value_status::Used, 19042,
|
||||
"Like native_transport_port, but clients-side port number (modulo smp) is used to route the connection to the specific shard.")
|
||||
, native_shard_aware_transport_port_ssl(this, "native_shard_aware_transport_port_ssl", value_status::Used, 19142,
|
||||
"Like native_transport_port_ssl, but clients-side port number (modulo smp) is used to route the connection to the specific shard.")
|
||||
, native_transport_max_threads(this, "native_transport_max_threads", value_status::Invalid, 128,
|
||||
"The maximum number of thread handling requests. The meaning is the same as rpc_max_threads.\n"
|
||||
"Default is different (128 versus unlimited).\n"
|
||||
|
||||
@@ -218,6 +218,8 @@ public:
|
||||
named_value<bool> start_native_transport;
|
||||
named_value<uint16_t> native_transport_port;
|
||||
named_value<uint16_t> native_transport_port_ssl;
|
||||
named_value<uint16_t> native_shard_aware_transport_port;
|
||||
named_value<uint16_t> native_shard_aware_transport_port_ssl;
|
||||
named_value<uint32_t> native_transport_max_threads;
|
||||
named_value<uint32_t> native_transport_max_frame_size_in_mb;
|
||||
named_value<sstring> broadcast_rpc_address;
|
||||
|
||||
@@ -72,18 +72,30 @@ future<> controller::do_start_server() {
|
||||
cql_server_config.get_service_memory_limiter_semaphore = [ss = std::ref(service::get_storage_service())] () -> semaphore& { return ss.get().local()._service_memory_limiter; };
|
||||
cql_server_config.allow_shard_aware_drivers = cfg.enable_shard_aware_drivers();
|
||||
cql_server_config.sharding_ignore_msb = cfg.murmur3_partitioner_ignore_msb_bits();
|
||||
if (cfg.native_shard_aware_transport_port.is_set()) {
|
||||
// Needed for "SUPPORTED" message
|
||||
cql_server_config.shard_aware_transport_port = cfg.native_shard_aware_transport_port();
|
||||
}
|
||||
if (cfg.native_shard_aware_transport_port_ssl.is_set()) {
|
||||
// Needed for "SUPPORTED" message
|
||||
cql_server_config.shard_aware_transport_port_ssl = cfg.native_shard_aware_transport_port_ssl();
|
||||
}
|
||||
cql_server_config.partitioner_name = cfg.partitioner();
|
||||
smp_service_group_config cql_server_smp_service_group_config;
|
||||
cql_server_smp_service_group_config.max_nonlocal_requests = 5000;
|
||||
cql_server_config.bounce_request_smp_service_group = create_smp_service_group(cql_server_smp_service_group_config).get0();
|
||||
seastar::net::inet_address ip = gms::inet_address::lookup(addr, family, preferred).get0();
|
||||
const seastar::net::inet_address ip = gms::inet_address::lookup(addr, family, preferred).get0();
|
||||
cserver->start(std::ref(cql3::get_query_processor()), std::ref(_auth_service), std::ref(_mnotifier), cql_server_config).get();
|
||||
struct listen_cfg {
|
||||
socket_address addr;
|
||||
bool is_shard_aware;
|
||||
std::shared_ptr<seastar::tls::credentials_builder> cred;
|
||||
};
|
||||
|
||||
std::vector<listen_cfg> configs({ { socket_address{ip, cfg.native_transport_port()} }});
|
||||
std::vector<listen_cfg> configs({{ socket_address{ip, cfg.native_transport_port()}, false }});
|
||||
if (cfg.native_shard_aware_transport_port.is_set()) {
|
||||
configs.push_back(listen_cfg{ socket_address{ip, cfg.native_shard_aware_transport_port()}, true });
|
||||
}
|
||||
|
||||
// main should have made sure values are clean and neatish
|
||||
if (ceo.at("enabled") == "true") {
|
||||
@@ -108,16 +120,21 @@ future<> controller::do_start_server() {
|
||||
logger.info("Enabling encrypted CQL connections between client and server");
|
||||
|
||||
if (cfg.native_transport_port_ssl.is_set() && cfg.native_transport_port_ssl() != cfg.native_transport_port()) {
|
||||
configs.emplace_back(listen_cfg{{ip, cfg.native_transport_port_ssl()}, std::move(cred)});
|
||||
configs.emplace_back(listen_cfg{{ip, cfg.native_transport_port_ssl()}, false, cred});
|
||||
} else {
|
||||
configs.back().cred = std::move(cred);
|
||||
configs[0].cred = cred;
|
||||
}
|
||||
if (cfg.native_shard_aware_transport_port_ssl.is_set() && cfg.native_shard_aware_transport_port_ssl() != cfg.native_shard_aware_transport_port()) {
|
||||
configs.emplace_back(listen_cfg{{ip, cfg.native_shard_aware_transport_port_ssl()}, true, std::move(cred)});
|
||||
} else if (cfg.native_shard_aware_transport_port.is_set()) {
|
||||
configs[1].cred = std::move(cred);
|
||||
}
|
||||
}
|
||||
|
||||
parallel_for_each(configs, [cserver, keepalive](const listen_cfg & cfg) {
|
||||
return cserver->invoke_on_all(&cql_transport::cql_server::listen, cfg.addr, cfg.cred, keepalive).then([cfg] {
|
||||
logger.info("Starting listening for CQL clients on {} ({})"
|
||||
, cfg.addr, cfg.cred ? "encrypted" : "unencrypted"
|
||||
return cserver->invoke_on_all(&cql_transport::cql_server::listen, cfg.addr, cfg.cred, cfg.is_shard_aware, keepalive).then([cfg] {
|
||||
logger.info("Starting listening for CQL clients on {} ({}, {})"
|
||||
, cfg.addr, cfg.cred ? "encrypted" : "unencrypted", cfg.is_shard_aware ? "shard-aware" : "non-shard-aware"
|
||||
);
|
||||
});
|
||||
}).get();
|
||||
|
||||
@@ -211,7 +211,7 @@ future<> cql_server::stop() {
|
||||
}
|
||||
|
||||
future<>
|
||||
cql_server::listen(socket_address addr, std::shared_ptr<seastar::tls::credentials_builder> creds, bool keepalive) {
|
||||
cql_server::listen(socket_address addr, std::shared_ptr<seastar::tls::credentials_builder> creds, bool is_shard_aware, bool keepalive) {
|
||||
auto f = make_ready_future<shared_ptr<seastar::tls::server_credentials>>(nullptr);
|
||||
if (creds) {
|
||||
f = creds->build_reloadable_server_credentials([](const std::unordered_set<sstring>& files, std::exception_ptr ep) {
|
||||
@@ -222,9 +222,12 @@ cql_server::listen(socket_address addr, std::shared_ptr<seastar::tls::credential
|
||||
}
|
||||
});
|
||||
}
|
||||
return f.then([this, addr, keepalive](shared_ptr<seastar::tls::server_credentials> creds) {
|
||||
return f.then([this, addr, is_shard_aware, keepalive](shared_ptr<seastar::tls::server_credentials> creds) {
|
||||
listen_options lo;
|
||||
lo.reuse_address = true;
|
||||
if (is_shard_aware) {
|
||||
lo.lba = server_socket::load_balancing_algorithm::port;
|
||||
}
|
||||
server_socket ss;
|
||||
try {
|
||||
ss = creds
|
||||
@@ -1227,6 +1230,12 @@ std::unique_ptr<cql_server::response> cql_server::connection::make_supported(int
|
||||
opts.insert({"SCYLLA_SHARD", format("{:d}", this_shard_id())});
|
||||
opts.insert({"SCYLLA_NR_SHARDS", format("{:d}", smp::count)});
|
||||
opts.insert({"SCYLLA_SHARDING_ALGORITHM", dht::cpu_sharding_algorithm_name()});
|
||||
if (_server._config.shard_aware_transport_port) {
|
||||
opts.insert({"SCYLLA_SHARD_AWARE_PORT", format("{:d}", *_server._config.shard_aware_transport_port)});
|
||||
}
|
||||
if (_server._config.shard_aware_transport_port_ssl) {
|
||||
opts.insert({"SCYLLA_SHARD_AWARE_PORT_SSL", format("{:d}", *_server._config.shard_aware_transport_port_ssl)});
|
||||
}
|
||||
opts.insert({"SCYLLA_SHARDING_IGNORE_MSB", format("{:d}", _server._config.sharding_ignore_msb)});
|
||||
opts.insert({"SCYLLA_PARTITIONER", _server._config.partitioner_name});
|
||||
}
|
||||
|
||||
@@ -107,6 +107,8 @@ struct cql_server_config {
|
||||
std::function<semaphore& ()> get_service_memory_limiter_semaphore;
|
||||
sstring partitioner_name;
|
||||
unsigned sharding_ignore_msb;
|
||||
std::optional<uint16_t> shard_aware_transport_port;
|
||||
std::optional<uint16_t> shard_aware_transport_port_ssl;
|
||||
bool allow_shard_aware_drivers = true;
|
||||
smp_service_group bounce_request_smp_service_group = default_smp_service_group();
|
||||
};
|
||||
@@ -135,7 +137,7 @@ public:
|
||||
cql_server(distributed<cql3::query_processor>& qp, auth::service&,
|
||||
service::migration_notifier& mn,
|
||||
cql_server_config config);
|
||||
future<> listen(socket_address addr, std::shared_ptr<seastar::tls::credentials_builder> = {}, bool keepalive = false);
|
||||
future<> listen(socket_address addr, std::shared_ptr<seastar::tls::credentials_builder> = {}, bool is_shard_aware = false, bool keepalive = false);
|
||||
future<> do_accepts(int which, bool keepalive, socket_address server_addr);
|
||||
future<> stop();
|
||||
public:
|
||||
|
||||
Reference in New Issue
Block a user