From 1c11d8f4c40067c95f8ddea42b0d41e40d40fe97 Mon Sep 17 00:00:00 2001 From: Juliusz Stasiewicz Date: Fri, 31 Jul 2020 13:02:13 +0200 Subject: [PATCH] transport: Added listener with port-based load balancing The new port is configurable from scylla.yaml and defaults to 19042 (unencrypted, unless client configures encryption options and omits `native_shard_aware_transport_port_ssl`). Two "SUPPORTED" tags are added: "SCYLLA_SHARD_AWARE_PORT" and "SCYLLA_SHARD_AWARE_PORT_SSL". For compatibility, "SCYLLA_SHARDING_ALGORITHM" is still kept. Fixes #5239 --- conf/scylla.yaml | 8 ++++++++ db/config.cc | 4 ++++ db/config.hh | 2 ++ transport/controller.cc | 31 ++++++++++++++++++++++++------- transport/server.cc | 13 +++++++++++-- transport/server.hh | 4 +++- 6 files changed, 52 insertions(+), 10 deletions(-) diff --git a/conf/scylla.yaml b/conf/scylla.yaml index fc3dc91e26..a3235d9e4c 100644 --- a/conf/scylla.yaml +++ b/conf/scylla.yaml @@ -102,6 +102,10 @@ listen_address: localhost # For security reasons, you should not expose this port to the internet. Firewall it if needed. native_transport_port: 9042 +# Like native_transport_port, but clients are forwarded to specific shards, based on the +# client-side port numbers. +native_shard_aware_transport_port: 19042 + # Enabling native transport encryption in client_encryption_options allows you to either use # encryption for the standard port or to use a dedicated, additional port along with the unencrypted # standard native_transport_port. @@ -111,6 +115,10 @@ native_transport_port: 9042 # keeping native_transport_port unencrypted. #native_transport_port_ssl: 9142 +# Like native_transport_port_ssl, but clients are forwarded to specific shards, based on the +# client-side port numbers. +#native_shard_aware_transport_port_ssl: 19142 + # How long the coordinator should wait for read operations to complete read_request_timeout_in_ms: 5000 diff --git a/db/config.cc b/db/config.cc index b3af507875..37ae62ee0f 100644 --- a/db/config.cc +++ b/db/config.cc @@ -525,6 +525,10 @@ db::config::config(std::shared_ptr exts) "for native_transport_port. Setting native_transport_port_ssl to a different value" "from native_transport_port will use encryption for native_transport_port_ssl while" "keeping native_transport_port unencrypted") + , native_shard_aware_transport_port(this, "native_shard_aware_transport_port", value_status::Used, 19042, + "Like native_transport_port, but clients-side port number (modulo smp) is used to route the connection to the specific shard.") + , native_shard_aware_transport_port_ssl(this, "native_shard_aware_transport_port_ssl", value_status::Used, 19142, + "Like native_transport_port_ssl, but clients-side port number (modulo smp) is used to route the connection to the specific shard.") , native_transport_max_threads(this, "native_transport_max_threads", value_status::Invalid, 128, "The maximum number of thread handling requests. The meaning is the same as rpc_max_threads.\n" "Default is different (128 versus unlimited).\n" diff --git a/db/config.hh b/db/config.hh index d85d61ed68..134519a64d 100644 --- a/db/config.hh +++ b/db/config.hh @@ -218,6 +218,8 @@ public: named_value start_native_transport; named_value native_transport_port; named_value native_transport_port_ssl; + named_value native_shard_aware_transport_port; + named_value native_shard_aware_transport_port_ssl; named_value native_transport_max_threads; named_value native_transport_max_frame_size_in_mb; named_value broadcast_rpc_address; diff --git a/transport/controller.cc b/transport/controller.cc index 5a16a99ccc..dd845e3bdd 100644 --- a/transport/controller.cc +++ b/transport/controller.cc @@ -72,18 +72,30 @@ future<> controller::do_start_server() { cql_server_config.get_service_memory_limiter_semaphore = [ss = std::ref(service::get_storage_service())] () -> semaphore& { return ss.get().local()._service_memory_limiter; }; cql_server_config.allow_shard_aware_drivers = cfg.enable_shard_aware_drivers(); cql_server_config.sharding_ignore_msb = cfg.murmur3_partitioner_ignore_msb_bits(); + if (cfg.native_shard_aware_transport_port.is_set()) { + // Needed for "SUPPORTED" message + cql_server_config.shard_aware_transport_port = cfg.native_shard_aware_transport_port(); + } + if (cfg.native_shard_aware_transport_port_ssl.is_set()) { + // Needed for "SUPPORTED" message + cql_server_config.shard_aware_transport_port_ssl = cfg.native_shard_aware_transport_port_ssl(); + } cql_server_config.partitioner_name = cfg.partitioner(); smp_service_group_config cql_server_smp_service_group_config; cql_server_smp_service_group_config.max_nonlocal_requests = 5000; cql_server_config.bounce_request_smp_service_group = create_smp_service_group(cql_server_smp_service_group_config).get0(); - seastar::net::inet_address ip = gms::inet_address::lookup(addr, family, preferred).get0(); + const seastar::net::inet_address ip = gms::inet_address::lookup(addr, family, preferred).get0(); cserver->start(std::ref(cql3::get_query_processor()), std::ref(_auth_service), std::ref(_mnotifier), cql_server_config).get(); struct listen_cfg { socket_address addr; + bool is_shard_aware; std::shared_ptr cred; }; - std::vector configs({ { socket_address{ip, cfg.native_transport_port()} }}); + std::vector configs({{ socket_address{ip, cfg.native_transport_port()}, false }}); + if (cfg.native_shard_aware_transport_port.is_set()) { + configs.push_back(listen_cfg{ socket_address{ip, cfg.native_shard_aware_transport_port()}, true }); + } // main should have made sure values are clean and neatish if (ceo.at("enabled") == "true") { @@ -108,16 +120,21 @@ future<> controller::do_start_server() { logger.info("Enabling encrypted CQL connections between client and server"); if (cfg.native_transport_port_ssl.is_set() && cfg.native_transport_port_ssl() != cfg.native_transport_port()) { - configs.emplace_back(listen_cfg{{ip, cfg.native_transport_port_ssl()}, std::move(cred)}); + configs.emplace_back(listen_cfg{{ip, cfg.native_transport_port_ssl()}, false, cred}); } else { - configs.back().cred = std::move(cred); + configs[0].cred = cred; + } + if (cfg.native_shard_aware_transport_port_ssl.is_set() && cfg.native_shard_aware_transport_port_ssl() != cfg.native_shard_aware_transport_port()) { + configs.emplace_back(listen_cfg{{ip, cfg.native_shard_aware_transport_port_ssl()}, true, std::move(cred)}); + } else if (cfg.native_shard_aware_transport_port.is_set()) { + configs[1].cred = std::move(cred); } } parallel_for_each(configs, [cserver, keepalive](const listen_cfg & cfg) { - return cserver->invoke_on_all(&cql_transport::cql_server::listen, cfg.addr, cfg.cred, keepalive).then([cfg] { - logger.info("Starting listening for CQL clients on {} ({})" - , cfg.addr, cfg.cred ? "encrypted" : "unencrypted" + return cserver->invoke_on_all(&cql_transport::cql_server::listen, cfg.addr, cfg.cred, cfg.is_shard_aware, keepalive).then([cfg] { + logger.info("Starting listening for CQL clients on {} ({}, {})" + , cfg.addr, cfg.cred ? "encrypted" : "unencrypted", cfg.is_shard_aware ? "shard-aware" : "non-shard-aware" ); }); }).get(); diff --git a/transport/server.cc b/transport/server.cc index 45531721a6..66ef0e2f6c 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -211,7 +211,7 @@ future<> cql_server::stop() { } future<> -cql_server::listen(socket_address addr, std::shared_ptr creds, bool keepalive) { +cql_server::listen(socket_address addr, std::shared_ptr creds, bool is_shard_aware, bool keepalive) { auto f = make_ready_future>(nullptr); if (creds) { f = creds->build_reloadable_server_credentials([](const std::unordered_set& files, std::exception_ptr ep) { @@ -222,9 +222,12 @@ cql_server::listen(socket_address addr, std::shared_ptr creds) { + return f.then([this, addr, is_shard_aware, keepalive](shared_ptr creds) { listen_options lo; lo.reuse_address = true; + if (is_shard_aware) { + lo.lba = server_socket::load_balancing_algorithm::port; + } server_socket ss; try { ss = creds @@ -1227,6 +1230,12 @@ std::unique_ptr cql_server::connection::make_supported(int opts.insert({"SCYLLA_SHARD", format("{:d}", this_shard_id())}); opts.insert({"SCYLLA_NR_SHARDS", format("{:d}", smp::count)}); opts.insert({"SCYLLA_SHARDING_ALGORITHM", dht::cpu_sharding_algorithm_name()}); + if (_server._config.shard_aware_transport_port) { + opts.insert({"SCYLLA_SHARD_AWARE_PORT", format("{:d}", *_server._config.shard_aware_transport_port)}); + } + if (_server._config.shard_aware_transport_port_ssl) { + opts.insert({"SCYLLA_SHARD_AWARE_PORT_SSL", format("{:d}", *_server._config.shard_aware_transport_port_ssl)}); + } opts.insert({"SCYLLA_SHARDING_IGNORE_MSB", format("{:d}", _server._config.sharding_ignore_msb)}); opts.insert({"SCYLLA_PARTITIONER", _server._config.partitioner_name}); } diff --git a/transport/server.hh b/transport/server.hh index 66edafe183..9ce9a3dbd0 100644 --- a/transport/server.hh +++ b/transport/server.hh @@ -107,6 +107,8 @@ struct cql_server_config { std::function get_service_memory_limiter_semaphore; sstring partitioner_name; unsigned sharding_ignore_msb; + std::optional shard_aware_transport_port; + std::optional shard_aware_transport_port_ssl; bool allow_shard_aware_drivers = true; smp_service_group bounce_request_smp_service_group = default_smp_service_group(); }; @@ -135,7 +137,7 @@ public: cql_server(distributed& qp, auth::service&, service::migration_notifier& mn, cql_server_config config); - future<> listen(socket_address addr, std::shared_ptr = {}, bool keepalive = false); + future<> listen(socket_address addr, std::shared_ptr = {}, bool is_shard_aware = false, bool keepalive = false); future<> do_accepts(int which, bool keepalive, socket_address server_addr); future<> stop(); public: