Merge: Wrap alternator start-stop into controller

Merged patch series by Pavel Emelyanov:

Alternator start and stop code is sitting inside the main()
and it's a big piece of code out there. Havig it all in main
complicates rework of start-stop sequences, it's much more
handy to have it in alternator/.

This set puts the mentioned code into transport- and thrift-
like controller model. While doing it one more call for global
storage service goes away.

* 'br-alternator-clientize' of https://github.com/xemul/scylla:
  alternator: Move start-stop code into controller
  alternator: Move the whole starting code into a sched group
  alternator: Dont capture db, use cfg
  alternator: Controller skeleton
  alternator: Controller basement
  alternator: Drop storage service from executor
This commit is contained in:
Nadav Har'El
2021-06-14 15:44:10 +03:00
8 changed files with 233 additions and 83 deletions

142
alternator/controller.cc Normal file
View File

@@ -0,0 +1,142 @@
/*
* Copyright (C) 2021-present ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include <seastar/net/dns.hh>
#include "controller.hh"
#include "server.hh"
#include "executor.hh"
#include "rmw_operation.hh"
#include "db/config.hh"
#include "cdc/generation_service.hh"
#include "service/memory_limiter.hh"
using namespace seastar;
namespace alternator {
template<typename K, typename V, typename... Args, typename K2, typename V2 = V>
V get_or_default(const std::unordered_map<K, V, Args...>& ss, const K2& key, const V2& def = V()) {
const auto iter = ss.find(key);
if (iter != ss.end()) {
return iter->second;
}
return def;
}
static logging::logger logger("alternator_controller");
controller::controller(sharded<service::storage_proxy>& proxy,
sharded<service::migration_manager>& mm,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<cdc::generation_service>& cdc_gen_svc,
sharded<cql3::query_processor>& qp,
sharded<service::memory_limiter>& memory_limiter,
const db::config& config)
: _proxy(proxy)
, _mm(mm)
, _sys_dist_ks(sys_dist_ks)
, _cdc_gen_svc(cdc_gen_svc)
, _qp(qp)
, _memory_limiter(memory_limiter)
, _config(config)
{
}
future<> controller::start() {
return seastar::async([this] {
auto preferred = _config.listen_interface_prefer_ipv6() ? std::make_optional(net::inet_address::family::INET6) : std::nullopt;
auto family = _config.enable_ipv6_dns_lookup() || preferred ? std::nullopt : std::make_optional(net::inet_address::family::INET);
// Create an smp_service_group to be used for limiting the
// concurrency when forwarding Alternator request between
// shards - if necessary for LWT.
smp_service_group_config c;
c.max_nonlocal_requests = 5000;
_ssg = create_smp_service_group(c).get0();
rmw_operation::set_default_write_isolation(_config.alternator_write_isolation());
executor::set_default_timeout(std::chrono::milliseconds(_config.alternator_timeout_in_ms()));
net::inet_address addr;
try {
addr = net::dns::get_host_by_name(_config.alternator_address(), family).get0().addr_list.front();
} catch (...) {
std::throw_with_nested(std::runtime_error(fmt::format("Unable to resolve alternator_address {}", _config.alternator_address())));
}
auto get_cdc_metadata = [] (cdc::generation_service& svc) { return std::ref(svc.get_cdc_metadata()); };
_executor.start(std::ref(_proxy), std::ref(_mm), std::ref(_sys_dist_ks), sharded_parameter(get_cdc_metadata, std::ref(_cdc_gen_svc)), _ssg.value()).get();
_server.start(std::ref(_executor), std::ref(_qp)).get();
std::optional<uint16_t> alternator_port;
if (_config.alternator_port()) {
alternator_port = _config.alternator_port();
}
std::optional<uint16_t> alternator_https_port;
std::optional<tls::credentials_builder> creds;
if (_config.alternator_https_port()) {
alternator_https_port = _config.alternator_https_port();
creds.emplace();
auto opts = _config.alternator_encryption_options();
if (opts.empty()) {
// Earlier versions mistakenly configured Alternator's
// HTTPS parameters via the "server_encryption_option"
// configuration parameter. We *temporarily* continue
// to allow this, for backward compatibility.
opts = _config.server_encryption_options();
if (!opts.empty()) {
logger.warn("Setting server_encryption_options to configure "
"Alternator's HTTPS encryption is deprecated. Please "
"switch to setting alternator_encryption_options instead.");
}
}
creds->set_dh_level(tls::dh_params::level::MEDIUM);
auto cert = get_or_default(opts, "certificate", db::config::get_conf_sub("scylla.crt").string());
auto key = get_or_default(opts, "keyfile", db::config::get_conf_sub("scylla.key").string());
creds->set_x509_key_file(cert, key, tls::x509_crt_format::PEM).get();
auto prio = get_or_default(opts, "priority_string", sstring());
creds->set_priority_string(db::config::default_tls_priority);
if (!prio.empty()) {
creds->set_priority_string(prio);
}
}
bool alternator_enforce_authorization = _config.alternator_enforce_authorization();
_server.invoke_on_all(
[this, addr, alternator_port, alternator_https_port, creds = std::move(creds), alternator_enforce_authorization] (server& server) mutable {
return server.init(addr, alternator_port, alternator_https_port, creds, alternator_enforce_authorization,
&_memory_limiter.local().get_semaphore(),
_config.max_concurrent_requests_per_shard);
}).then([addr, alternator_port, alternator_https_port] {
logger.info("Alternator server listening on {}, HTTP port {}, HTTPS port {}",
addr, alternator_port ? std::to_string(*alternator_port) : "OFF", alternator_https_port ? std::to_string(*alternator_https_port) : "OFF");
}).get();
});
}
future<> controller::stop() {
return seastar::async([this] {
_server.stop().get();
_executor.stop().get();
destroy_smp_service_group(_ssg.value()).get();
});
}
}

79
alternator/controller.hh Normal file
View File

@@ -0,0 +1,79 @@
/*
* Copyright (C) 2021-present ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <seastar/core/sharded.hh>
#include <seastar/core/smp.hh>
namespace service {
class storage_proxy;
class migration_manager;
class memory_limiter;
}
namespace db {
class system_distributed_keyspace;
class config;
}
namespace cdc {
class generation_service;
}
namespace cql3 {
class query_processor;
}
namespace alternator {
using namespace seastar;
class executor;
class server;
class controller {
sharded<service::storage_proxy>& _proxy;
sharded<service::migration_manager>& _mm;
sharded<db::system_distributed_keyspace>& _sys_dist_ks;
sharded<cdc::generation_service>& _cdc_gen_svc;
sharded<cql3::query_processor>& _qp;
sharded<service::memory_limiter>& _memory_limiter;
const db::config& _config;
sharded<executor> _executor;
sharded<server> _server;
std::optional<smp_service_group> _ssg;
public:
controller(sharded<service::storage_proxy>& proxy,
sharded<service::migration_manager>& mm,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<cdc::generation_service>& cdc_gen_svc,
sharded<cql3::query_processor>& qp,
sharded<service::memory_limiter>& memory_limiter,
const db::config& config);
future<> start();
future<> stop();
};
}

View File

@@ -50,7 +50,6 @@ namespace cql3::selection {
}
namespace service {
class storage_service;
class storage_proxy;
}
@@ -144,7 +143,6 @@ class executor : public peering_sharded_service<executor> {
service::storage_proxy& _proxy;
service::migration_manager& _mm;
db::system_distributed_keyspace& _sdks;
service::storage_service& _ss;
cdc::metadata& _cdc_metadata;
// An smp_service_group to be used for limiting the concurrency when
// forwarding Alternator request between shards - if necessary for LWT.
@@ -158,8 +156,8 @@ public:
static constexpr auto KEYSPACE_NAME_PREFIX = "alternator_";
static constexpr std::string_view INTERNAL_TABLE_PREFIX = ".scylla.alternator.";
executor(service::storage_proxy& proxy, service::migration_manager& mm, db::system_distributed_keyspace& sdks, service::storage_service& ss, cdc::metadata& cdc_metadata, smp_service_group ssg)
: _proxy(proxy), _mm(mm), _sdks(sdks), _ss(ss), _cdc_metadata(cdc_metadata), _ssg(ssg) {}
executor(service::storage_proxy& proxy, service::migration_manager& mm, db::system_distributed_keyspace& sdks, cdc::metadata& cdc_metadata, smp_service_group ssg)
: _proxy(proxy), _mm(mm), _sdks(sdks), _cdc_metadata(cdc_metadata), _ssg(ssg) {}
future<request_return_type> create_table(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request);
future<request_return_type> describe_table(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request);

View File

@@ -1039,6 +1039,7 @@ api = ['api/api.cc',
]
alternator = [
'alternator/controller.cc',
'alternator/server.cc',
'alternator/executor.cc',
'alternator/stats.cc',

79
main.cc
View File

@@ -80,16 +80,15 @@
#include "thrift/controller.hh"
#include "service/memory_limiter.hh"
#include "alternator/server.hh"
#include "redis/service.hh"
#include "cdc/log.hh"
#include "cdc/cdc_extension.hh"
#include "cdc/generation_service.hh"
#include "alternator/tags_extension.hh"
#include "alternator/rmw_operation.hh"
#include "db/paxos_grace_seconds_extension.hh"
#include "service/qos/standard_service_level_distributed_data_accessor.hh"
#include "service/storage_proxy.hh"
#include "alternator/controller.hh"
#include "service/raft/raft_services.hh"
@@ -1380,78 +1379,16 @@ int main(int ac, char** av) {
api::unset_rpc_controller(ctx).get();
});
alternator::controller alternator_ctl(proxy, mm, sys_dist_ks, cdc_generation_service, qp, service_memory_limiter, *cfg);
if (cfg->alternator_port() || cfg->alternator_https_port()) {
alternator::rmw_operation::set_default_write_isolation(cfg->alternator_write_isolation());
alternator::executor::set_default_timeout(std::chrono::milliseconds(cfg->alternator_timeout_in_ms()));
static sharded<alternator::executor> alternator_executor;
static sharded<alternator::server> alternator_server;
net::inet_address addr;
try {
addr = net::dns::get_host_by_name(cfg->alternator_address(), family).get0().addr_list.front();
} catch (...) {
std::throw_with_nested(std::runtime_error(fmt::format("Unable to resolve alternator_address {}", cfg->alternator_address())));
}
// Create an smp_service_group to be used for limiting the
// concurrency when forwarding Alternator request between
// shards - if necessary for LWT.
smp_service_group_config c;
c.max_nonlocal_requests = 5000;
smp_service_group ssg = create_smp_service_group(c).get0();
alternator_executor.start(std::ref(proxy), std::ref(mm), std::ref(sys_dist_ks), std::ref(service::get_storage_service()), sharded_parameter(get_cdc_metadata, std::ref(cdc_generation_service)), ssg).get();
alternator_server.start(std::ref(alternator_executor), std::ref(qp)).get();
std::optional<uint16_t> alternator_port;
if (cfg->alternator_port()) {
alternator_port = cfg->alternator_port();
}
std::optional<uint16_t> alternator_https_port;
std::optional<tls::credentials_builder> creds;
if (cfg->alternator_https_port()) {
alternator_https_port = cfg->alternator_https_port();
creds.emplace();
auto opts = cfg->alternator_encryption_options();
if (opts.empty()) {
// Earlier versions mistakenly configured Alternator's
// HTTPS parameters via the "server_encryption_option"
// configuration parameter. We *temporarily* continue
// to allow this, for backward compatibility.
opts = cfg->server_encryption_options();
if (!opts.empty()) {
startlog.warn("Setting server_encryption_options to configure "
"Alternator's HTTPS encryption is deprecated. Please "
"switch to setting alternator_encryption_options instead.");
}
}
creds->set_dh_level(tls::dh_params::level::MEDIUM);
auto cert = get_or_default(opts, "certificate", db::config::get_conf_sub("scylla.crt").string());
auto key = get_or_default(opts, "keyfile", db::config::get_conf_sub("scylla.key").string());
creds->set_x509_key_file(cert, key, tls::x509_crt_format::PEM).get();
auto prio = get_or_default(opts, "priority_string", sstring());
creds->set_priority_string(db::config::default_tls_priority);
if (!prio.empty()) {
creds->set_priority_string(prio);
}
}
bool alternator_enforce_authorization = cfg->alternator_enforce_authorization();
with_scheduling_group(dbcfg.statement_scheduling_group,
[addr, alternator_port, alternator_https_port, creds = std::move(creds), alternator_enforce_authorization, cfg, &service_memory_limiter, &db] () mutable {
return alternator_server.invoke_on_all(
[addr, alternator_port, alternator_https_port, creds = std::move(creds), alternator_enforce_authorization, cfg, &service_memory_limiter, &db] (alternator::server& server) mutable {
return server.init(addr, alternator_port, alternator_https_port, creds, alternator_enforce_authorization,
&service_memory_limiter.local().get_semaphore(),
db.local().get_config().max_concurrent_requests_per_shard);
}).then([addr, alternator_port, alternator_https_port] {
startlog.info("Alternator server listening on {}, HTTP port {}, HTTPS port {}",
addr, alternator_port ? std::to_string(*alternator_port) : "OFF", alternator_https_port ? std::to_string(*alternator_https_port) : "OFF");
});
with_scheduling_group(dbcfg.statement_scheduling_group, [&alternator_ctl] () mutable {
return alternator_ctl.start();
}).get();
auto stop_alternator = [ssg] {
alternator_server.stop().get();
alternator_executor.stop().get();
destroy_smp_service_group(ssg).get();
};
ss.register_client_shutdown_hook("alternator", std::move(stop_alternator));
ss.register_client_shutdown_hook("alternator", [&alternator_ctl] {
alternator_ctl.stop().get();
});
}
static redis_service redis;

View File

@@ -38,7 +38,6 @@ future<> alternator_test_env::start(std::string_view isolation_level) {
// parameters below are only touched by alternator streams;
// not really interesting for this use case
std::ref(_sdks),
std::ref(_storage_service),
std::ref(_cdc_metadata),
// end-of-streams-parameters
ssg);

View File

@@ -32,7 +32,6 @@
namespace service {
class storage_proxy;
class migration_manager;
class storage_service;
}
namespace cql3 {
class query_processor;
@@ -51,7 +50,6 @@ class query_processor;
class alternator_test_env {
sharded<service::storage_proxy>& _proxy;
sharded<service::migration_manager>& _mm;
sharded<service::storage_service>& _storage_service;
sharded<cql3::query_processor>& _qp;
// Dummy service, only needed for alternator streams
@@ -63,11 +61,9 @@ class alternator_test_env {
public:
alternator_test_env(sharded<service::storage_proxy>& proxy,
sharded<service::migration_manager>& mm,
sharded<service::storage_service>& ss,
sharded<cql3::query_processor>& qp)
: _proxy(proxy)
, _mm(mm)
, _storage_service(ss)
, _qp(qp)
{}

View File

@@ -36,7 +36,6 @@
#include "release.hh"
#include <fstream>
#include "service/storage_proxy.hh"
#include "service/storage_service.hh"
#include "cql3/query_processor.hh"
#include "db/config.hh"
#include "db/extensions.hh"
@@ -366,8 +365,7 @@ static std::vector<perf_result> do_alternator_test(std::string isolation_level,
assert(cfg.frontend == test_config::frontend_type::alternator);
std::cout << "Running test with config: " << cfg << std::endl;
alternator_test_env env(qp.local().proxy().container(),
mm, service::get_storage_service(), qp);
alternator_test_env env(qp.local().proxy().container(), mm, qp);
env.start(isolation_level).get();
auto stop_env = defer([&] {
env.stop().get();