transport: Untie transport and database
Both controller and server only need database to get config from. Since controller creation only happens in main() code which has the config itself, we may remove database mentioning from transport. Previous attempt was not to carry the config down to the server level, but it stepped on an updateable_value landmine -- the u._v. isn't copyable cross-shard (despite the docs) and to properly initialize server's max_concurrent_requests we need the config's named_value member itself. The db::config that flies through the stack is const reference, but its named_values do not get copied along the way -- the updateable value accepts both references and const references to subscribe on. tests: start-stop in debug mode Signed-off-by: Pavel Emelyanov <xemul@scylladb.com> Message-Id: <20210607135656.18522-1-xemul@scylladb.com>
This commit is contained in:
committed by
Avi Kivity
parent
9bfb2754eb
commit
990db016e9
2
main.cc
2
main.cc
@@ -1327,7 +1327,7 @@ int main(int ac, char** av) {
|
||||
db.revert_initial_system_read_concurrency_boost();
|
||||
}).get();
|
||||
|
||||
cql_transport::controller cql_server_ctl(db, auth_service, mm_notifier, gossiper.local(), qp, service_memory_limiter, sl_controller);
|
||||
cql_transport::controller cql_server_ctl(auth_service, mm_notifier, gossiper.local(), qp, service_memory_limiter, sl_controller, *cfg);
|
||||
|
||||
ss.register_client_shutdown_hook("native transport", [&cql_server_ctl] {
|
||||
cql_server_ctl.stop().get();
|
||||
|
||||
@@ -22,7 +22,6 @@
|
||||
#include "transport/controller.hh"
|
||||
#include "transport/server.hh"
|
||||
#include "service/memory_limiter.hh"
|
||||
#include "database.hh"
|
||||
#include "db/config.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
#include "log.hh"
|
||||
@@ -33,16 +32,18 @@ namespace cql_transport {
|
||||
|
||||
static logging::logger logger("cql_server_controller");
|
||||
|
||||
controller::controller(distributed<database>& db, sharded<auth::service>& auth, sharded<service::migration_notifier>& mn, gms::gossiper& gossiper, sharded<cql3::query_processor>& qp, sharded<service::memory_limiter>& ml,
|
||||
sharded<qos::service_level_controller>& sl_controller)
|
||||
controller::controller(sharded<auth::service>& auth, sharded<service::migration_notifier>& mn,
|
||||
gms::gossiper& gossiper, sharded<cql3::query_processor>& qp, sharded<service::memory_limiter>& ml,
|
||||
sharded<qos::service_level_controller>& sl_controller, const db::config& cfg)
|
||||
: _ops_sem(1)
|
||||
, _db(db)
|
||||
, _auth_service(auth)
|
||||
, _mnotifier(mn)
|
||||
, _gossiper(gossiper)
|
||||
, _qp(qp)
|
||||
, _mem_limiter(ml)
|
||||
, _sl_controller(sl_controller) {
|
||||
, _sl_controller(sl_controller)
|
||||
, _config(cfg)
|
||||
{
|
||||
}
|
||||
|
||||
future<> controller::start_server() {
|
||||
@@ -63,7 +64,7 @@ future<> controller::do_start_server() {
|
||||
return seastar::async([this] {
|
||||
auto cserver = std::make_unique<distributed<cql_transport::cql_server>>();
|
||||
|
||||
auto& cfg = _db.local().get_config();
|
||||
auto& cfg = _config;
|
||||
auto addr = cfg.rpc_address();
|
||||
auto preferred = cfg.rpc_interface_prefer_ipv6() ? std::make_optional(net::inet_address::family::INET6) : std::nullopt;
|
||||
auto family = cfg.enable_ipv6_dns_lookup() || preferred ? std::nullopt : std::make_optional(net::inet_address::family::INET);
|
||||
@@ -148,7 +149,7 @@ future<> controller::do_start_server() {
|
||||
}
|
||||
}
|
||||
|
||||
cserver->start(std::ref(_qp), std::ref(_auth_service), std::ref(_mnotifier), std::ref(_db), std::ref(_mem_limiter), cql_server_config, std::ref(_sl_controller)).get();
|
||||
cserver->start(std::ref(_qp), std::ref(_auth_service), std::ref(_mnotifier), std::ref(_mem_limiter), cql_server_config, std::ref(cfg), std::ref(_sl_controller)).get();
|
||||
auto on_error = defer([&cserver] { cserver->stop().get(); });
|
||||
|
||||
parallel_for_each(configs, [&cserver, keepalive](const listen_cfg & cfg) {
|
||||
|
||||
@@ -28,7 +28,6 @@
|
||||
using namespace seastar;
|
||||
|
||||
namespace cql_transport { class cql_server; }
|
||||
class database;
|
||||
namespace auth { class service; }
|
||||
namespace service {
|
||||
class migration_notifier;
|
||||
@@ -37,6 +36,7 @@ namespace service {
|
||||
namespace gms { class gossiper; }
|
||||
namespace cql3 { class query_processor; }
|
||||
namespace qos { class service_level_controller; }
|
||||
namespace db { class config; }
|
||||
|
||||
namespace cql_transport {
|
||||
|
||||
@@ -45,20 +45,22 @@ class controller {
|
||||
semaphore _ops_sem; /* protects start/stop operations on _server */
|
||||
bool _stopped = false;
|
||||
|
||||
distributed<database>& _db;
|
||||
sharded<auth::service>& _auth_service;
|
||||
sharded<service::migration_notifier>& _mnotifier;
|
||||
gms::gossiper& _gossiper;
|
||||
sharded<cql3::query_processor>& _qp;
|
||||
sharded<service::memory_limiter>& _mem_limiter;
|
||||
sharded<qos::service_level_controller>& _sl_controller;
|
||||
const db::config& _config;
|
||||
|
||||
future<> set_cql_ready(bool ready);
|
||||
future<> do_start_server();
|
||||
future<> do_stop_server();
|
||||
|
||||
public:
|
||||
controller(distributed<database>&, sharded<auth::service>&, sharded<service::migration_notifier>&, gms::gossiper&, sharded<cql3::query_processor>&, sharded<service::memory_limiter>&, sharded<qos::service_level_controller>&);
|
||||
controller(sharded<auth::service>&, sharded<service::migration_notifier>&, gms::gossiper&,
|
||||
sharded<cql3::query_processor>&, sharded<service::memory_limiter>&,
|
||||
sharded<qos::service_level_controller>&, const db::config& cfg);
|
||||
future<> start_server();
|
||||
future<> stop_server();
|
||||
future<> stop();
|
||||
|
||||
@@ -38,7 +38,6 @@
|
||||
#include "service/memory_limiter.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "db/consistency_level_type.hh"
|
||||
#include "database.hh"
|
||||
#include "db/write_type.hh"
|
||||
#include <seastar/core/future-util.hh>
|
||||
#include <seastar/core/seastar.hh>
|
||||
@@ -153,12 +152,12 @@ event::event_type parse_event_type(const sstring& value)
|
||||
}
|
||||
|
||||
cql_server::cql_server(distributed<cql3::query_processor>& qp, auth::service& auth_service,
|
||||
service::migration_notifier& mn, database& db, service::memory_limiter& ml, cql_server_config config, qos::service_level_controller& sl_controller)
|
||||
service::migration_notifier& mn, service::memory_limiter& ml, cql_server_config config, const db::config& db_cfg, qos::service_level_controller& sl_controller)
|
||||
: server("CQLServer", clogger)
|
||||
, _query_processor(qp)
|
||||
, _config(config)
|
||||
, _max_request_size(config.max_request_size)
|
||||
, _max_concurrent_requests(db.get_config().max_concurrent_requests_per_shard)
|
||||
, _max_concurrent_requests(db_cfg.max_concurrent_requests_per_shard)
|
||||
, _memory_available(ml.get_semaphore())
|
||||
, _notifier(std::make_unique<event_notifier>(mn))
|
||||
, _auth_service(auth_service)
|
||||
|
||||
@@ -54,7 +54,6 @@ namespace service {
|
||||
class memory_limiter;
|
||||
}
|
||||
|
||||
class database;
|
||||
enum class client_type;
|
||||
struct client_data;
|
||||
|
||||
@@ -166,8 +165,9 @@ private:
|
||||
qos::service_level_controller& _sl_controller;
|
||||
public:
|
||||
cql_server(distributed<cql3::query_processor>& qp, auth::service&,
|
||||
service::migration_notifier& mn, database& db, service::memory_limiter& ml,
|
||||
service::migration_notifier& mn, service::memory_limiter& ml,
|
||||
cql_server_config config,
|
||||
const db::config& db_cfg,
|
||||
qos::service_level_controller& sl_controller);
|
||||
public:
|
||||
using response = cql_transport::response;
|
||||
|
||||
Reference in New Issue
Block a user