redis,thrift,transport: make timeout_config live-updateable
* timeout_config
- add `updated_timeout_config` which represents an always-updated
options backed by `utils::updateable_value<>`. this class is
used by servers which need to access the latest timeout related
options. the existing `timeout_config` is more like a snapshot
of the `updated_timeout_config`. it is used in the use case where
we don't need to most updated options or we update the options
manually on demand.
* redis, thrift, transport: s/timeout_config/updated_timeout_config/
when appropriate. use the improved version of timeout_config where
we need to have the access to the most-updated version of the timeout
options.
Fixes #10172
Signed-off-by: Kefu Chai <kefu.chai@scylladb.com>
This commit is contained in:
@@ -49,7 +49,7 @@ future<> controller::listen(seastar::sharded<auth::service>& auth_service, db::c
|
||||
return utils::resolve(cfg.rpc_address, family, preferred).then([this, server, &cfg, keepalive, ceo = std::move(ceo), &auth_service] (seastar::net::inet_address ip) {
|
||||
auto get_config = sharded_parameter([&] {
|
||||
return redis_transport::redis_server_config {
|
||||
._timeout_config = make_timeout_config(cfg),
|
||||
._timeout_config = updateable_timeout_config(cfg),
|
||||
._max_request_size = memory::stats().total_memory() / 10,
|
||||
._read_consistency_level = make_consistency_level(cfg.redis_read_consistency_level()),
|
||||
._write_consistency_level = make_consistency_level(cfg.redis_write_consistency_level()),
|
||||
|
||||
@@ -27,13 +27,13 @@ class redis_options {
|
||||
sstring _ks_name;
|
||||
const db::consistency_level _read_consistency;
|
||||
const db::consistency_level _write_consistency;
|
||||
const timeout_config& _timeout_config;
|
||||
const updateable_timeout_config& _timeout_config;
|
||||
service::client_state _client_state;
|
||||
size_t _total_redis_db_count;
|
||||
public:
|
||||
explicit redis_options(const db::consistency_level rcl,
|
||||
const db::consistency_level wcl,
|
||||
const timeout_config& tc,
|
||||
const updateable_timeout_config& tc,
|
||||
auth::service& auth,
|
||||
const socket_address addr,
|
||||
size_t total_redis_db_count)
|
||||
@@ -41,14 +41,14 @@ public:
|
||||
,_read_consistency(rcl)
|
||||
,_write_consistency(wcl)
|
||||
,_timeout_config(tc)
|
||||
,_client_state(service::client_state::external_tag{}, auth, nullptr, tc, addr)
|
||||
,_client_state(service::client_state::external_tag{}, auth, nullptr, tc.current_values(), addr)
|
||||
,_total_redis_db_count(total_redis_db_count)
|
||||
{
|
||||
}
|
||||
explicit redis_options(const sstring& ks_name,
|
||||
const db::consistency_level rcl,
|
||||
const db::consistency_level wcl,
|
||||
const timeout_config& tc,
|
||||
const updateable_timeout_config& tc,
|
||||
auth::service& auth,
|
||||
const socket_address addr,
|
||||
size_t total_redis_db_count)
|
||||
@@ -56,7 +56,7 @@ public:
|
||||
,_read_consistency(rcl)
|
||||
,_write_consistency(wcl)
|
||||
,_timeout_config(tc)
|
||||
,_client_state(service::client_state::external_tag{}, auth, nullptr, tc, addr)
|
||||
,_client_state(service::client_state::external_tag{}, auth, nullptr, tc.current_values(), addr)
|
||||
,_total_redis_db_count(total_redis_db_count)
|
||||
{
|
||||
}
|
||||
@@ -64,9 +64,8 @@ public:
|
||||
const db::consistency_level get_read_consistency_level() const { return _read_consistency; }
|
||||
const db::consistency_level get_write_consistency_level() const { return _write_consistency; }
|
||||
|
||||
const timeout_config& get_timeout_config() const { return _timeout_config; }
|
||||
const db::timeout_clock::duration get_read_timeout() const { return _timeout_config.read_timeout; }
|
||||
const db::timeout_clock::duration get_write_timeout() const { return _timeout_config.write_timeout; }
|
||||
const db::timeout_clock::duration get_read_timeout() const { return std::chrono::milliseconds(_timeout_config.read_timeout_in_ms); }
|
||||
const db::timeout_clock::duration get_write_timeout() const { return std::chrono::milliseconds(_timeout_config.write_timeout_in_ms); }
|
||||
const sstring& get_keyspace_name() const { return _ks_name; }
|
||||
service::client_state& get_client_state() { return _client_state; }
|
||||
|
||||
|
||||
@@ -46,7 +46,7 @@ class storage_proxy;
|
||||
namespace redis_transport {
|
||||
|
||||
struct redis_server_config {
|
||||
::timeout_config _timeout_config;
|
||||
::updateable_timeout_config _timeout_config;
|
||||
size_t _max_request_size;
|
||||
db::consistency_level _read_consistency_level;
|
||||
db::consistency_level _write_consistency_level;
|
||||
|
||||
@@ -67,14 +67,13 @@ future<> thrift_controller::do_start_server() {
|
||||
auto family = cfg.enable_ipv6_dns_lookup() || preferred ? std::nullopt : std::make_optional(net::inet_address::family::INET);
|
||||
auto keepalive = cfg.rpc_keepalive();
|
||||
|
||||
<<<<<<< HEAD
|
||||
auto ip = utils::resolve(cfg.rpc_address, family, preferred).get();
|
||||
auto port = cfg.rpc_port();
|
||||
_addr.emplace(ip, port);
|
||||
|
||||
auto tsc = sharded_parameter([&cfg] {
|
||||
return thrift_server_config {
|
||||
.timeout_config = make_timeout_config(cfg),
|
||||
.timeout_config = updateable_timeout_config(cfg),
|
||||
.max_request_size = cfg.thrift_max_message_length_in_mb() * (uint64_t(1) << 20),
|
||||
};
|
||||
});
|
||||
|
||||
@@ -2055,7 +2055,7 @@ class handler_factory : public CassandraCobSvIfFactory {
|
||||
sharded<service::storage_service>& _ss;
|
||||
sharded<service::storage_proxy>& _proxy;
|
||||
auth::service& _auth_service;
|
||||
const timeout_config& _timeout_config;
|
||||
const updateable_timeout_config& _timeout_config;
|
||||
service_permit& _current_permit;
|
||||
public:
|
||||
explicit handler_factory(data_dictionary::database db,
|
||||
@@ -2063,12 +2063,12 @@ public:
|
||||
sharded<service::storage_service>& ss,
|
||||
sharded<service::storage_proxy>& proxy,
|
||||
auth::service& auth_service,
|
||||
const ::timeout_config& timeout_config,
|
||||
const ::updateable_timeout_config& timeout_config,
|
||||
service_permit& current_permit)
|
||||
: _db(db), _query_processor(qp), _ss(ss), _proxy(proxy), _auth_service(auth_service), _timeout_config(timeout_config), _current_permit(current_permit) {}
|
||||
typedef CassandraCobSvIf Handler;
|
||||
virtual CassandraCobSvIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) {
|
||||
return new thrift_handler(_db, _query_processor, _ss, _proxy, _auth_service, _timeout_config, _current_permit);
|
||||
return new thrift_handler(_db, _query_processor, _ss, _proxy, _auth_service, _timeout_config.current_values(), _current_permit);
|
||||
}
|
||||
virtual void releaseHandler(CassandraCobSvIf* handler) {
|
||||
delete handler;
|
||||
@@ -2078,6 +2078,6 @@ public:
|
||||
std::unique_ptr<CassandraCobSvIfFactory>
|
||||
create_handler_factory(data_dictionary::database db, distributed<cql3::query_processor>& qp,
|
||||
sharded<service::storage_service>& ss, sharded<service::storage_proxy>& proxy,
|
||||
auth::service& auth_service, const ::timeout_config& timeout_config, service_permit& current_permit) {
|
||||
auth::service& auth_service, const ::updateable_timeout_config& timeout_config, service_permit& current_permit) {
|
||||
return std::make_unique<handler_factory>(db, qp, ss, proxy, auth_service, timeout_config, current_permit);
|
||||
}
|
||||
|
||||
@@ -22,6 +22,6 @@ namespace data_dictionary {
|
||||
class database;
|
||||
}
|
||||
|
||||
std::unique_ptr<::cassandra::CassandraCobSvIfFactory> create_handler_factory(data_dictionary::database db, distributed<cql3::query_processor>& qp, sharded<service::storage_service>& ss, sharded<service::storage_proxy>& proxy, auth::service&, const timeout_config&, service_permit& current_permit);
|
||||
std::unique_ptr<::cassandra::CassandraCobSvIfFactory> create_handler_factory(data_dictionary::database db, distributed<cql3::query_processor>& qp, sharded<service::storage_service>& ss, sharded<service::storage_proxy>& proxy, auth::service&, const updateable_timeout_config&, service_permit& current_permit);
|
||||
|
||||
#endif /* APPS_SEASTAR_THRIFT_HANDLER_HH_ */
|
||||
|
||||
@@ -69,7 +69,7 @@ class database;
|
||||
}
|
||||
|
||||
struct thrift_server_config {
|
||||
::timeout_config timeout_config;
|
||||
::updateable_timeout_config timeout_config;
|
||||
uint64_t max_request_size;
|
||||
std::function<semaphore& ()> get_service_memory_limiter_semaphore;
|
||||
};
|
||||
|
||||
@@ -9,17 +9,34 @@
|
||||
|
||||
#include "timeout_config.hh"
|
||||
#include "db/config.hh"
|
||||
#include "db/timeout_clock.hh"
|
||||
#include <chrono>
|
||||
#include <seastar/core/future.hh>
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
timeout_config make_timeout_config(const db::config& cfg) {
|
||||
timeout_config tc;
|
||||
tc.read_timeout = cfg.read_request_timeout_in_ms() * 1ms;
|
||||
tc.write_timeout = cfg.write_request_timeout_in_ms() * 1ms;
|
||||
tc.range_read_timeout = cfg.range_request_timeout_in_ms() * 1ms;
|
||||
tc.counter_write_timeout = cfg.counter_write_request_timeout_in_ms() * 1ms;
|
||||
tc.truncate_timeout = cfg.truncate_request_timeout_in_ms() * 1ms;
|
||||
tc.cas_timeout = cfg.cas_contention_timeout_in_ms() * 1ms;
|
||||
tc.other_timeout = cfg.request_timeout_in_ms() * 1ms;
|
||||
return tc;
|
||||
updateable_timeout_config::updateable_timeout_config(const db::config& cfg)
|
||||
: read_timeout_in_ms(cfg.read_request_timeout_in_ms)
|
||||
, write_timeout_in_ms(cfg.write_request_timeout_in_ms)
|
||||
, range_read_timeout_in_ms(cfg.range_request_timeout_in_ms)
|
||||
, counter_write_timeout_in_ms(cfg.counter_write_request_timeout_in_ms)
|
||||
, truncate_timeout_in_ms(cfg.counter_write_request_timeout_in_ms)
|
||||
, cas_timeout_in_ms(cfg.cas_contention_timeout_in_ms)
|
||||
, other_timeout_in_ms(cfg.request_timeout_in_ms)
|
||||
{}
|
||||
|
||||
timeout_config updateable_timeout_config::current_values() const {
|
||||
return {
|
||||
std::chrono::milliseconds(read_timeout_in_ms),
|
||||
std::chrono::milliseconds(write_timeout_in_ms),
|
||||
std::chrono::milliseconds(range_read_timeout_in_ms),
|
||||
std::chrono::milliseconds(counter_write_timeout_in_ms),
|
||||
std::chrono::milliseconds(truncate_timeout_in_ms),
|
||||
std::chrono::milliseconds(cas_timeout_in_ms),
|
||||
std::chrono::milliseconds(cas_timeout_in_ms),
|
||||
};
|
||||
}
|
||||
|
||||
timeout_config make_timeout_config(const db::config& cfg) {
|
||||
return updateable_timeout_config(cfg).current_values();
|
||||
}
|
||||
|
||||
@@ -10,20 +10,50 @@
|
||||
#pragma once
|
||||
|
||||
#include "db/timeout_clock.hh"
|
||||
#include "utils/updateable_value.hh"
|
||||
#include <chrono>
|
||||
|
||||
namespace db { class config; }
|
||||
|
||||
class updateable_timeout_config;
|
||||
|
||||
/// timeout_config represents a snapshot of the options stored in it when
|
||||
/// an instance of this class is created. so far this class is only used by
|
||||
/// client_state and thrift_handler. so either these classes are obliged to
|
||||
/// update it by themselves, or they are fine with using the maybe-updated
|
||||
/// options in the lifecycle of a client / connection even if some of these
|
||||
/// options are changed whtn the client / connection is still alive.
|
||||
struct timeout_config {
|
||||
db::timeout_clock::duration read_timeout;
|
||||
db::timeout_clock::duration write_timeout;
|
||||
db::timeout_clock::duration range_read_timeout;
|
||||
db::timeout_clock::duration counter_write_timeout;
|
||||
db::timeout_clock::duration truncate_timeout;
|
||||
db::timeout_clock::duration cas_timeout;
|
||||
db::timeout_clock::duration other_timeout;
|
||||
using duration_t = db::timeout_clock::duration;
|
||||
|
||||
duration_t read_timeout;
|
||||
duration_t write_timeout;
|
||||
duration_t range_read_timeout;
|
||||
duration_t counter_write_timeout;
|
||||
duration_t truncate_timeout;
|
||||
duration_t cas_timeout;
|
||||
duration_t other_timeout;
|
||||
};
|
||||
|
||||
struct updateable_timeout_config {
|
||||
using timeout_option_t = utils::updateable_value<uint32_t>;
|
||||
|
||||
timeout_option_t read_timeout_in_ms;
|
||||
timeout_option_t write_timeout_in_ms;
|
||||
timeout_option_t range_read_timeout_in_ms;
|
||||
timeout_option_t counter_write_timeout_in_ms;
|
||||
timeout_option_t truncate_timeout_in_ms;
|
||||
timeout_option_t cas_timeout_in_ms;
|
||||
timeout_option_t other_timeout_in_ms;
|
||||
|
||||
explicit updateable_timeout_config(const db::config& cfg);
|
||||
|
||||
timeout_config current_values() const;
|
||||
};
|
||||
|
||||
|
||||
using timeout_config_selector = db::timeout_clock::duration (timeout_config::*);
|
||||
|
||||
extern const timeout_config infinite_timeout_config;
|
||||
|
||||
namespace db { class config; }
|
||||
timeout_config make_timeout_config(const db::config& cfg);
|
||||
|
||||
@@ -90,7 +90,7 @@ future<> controller::do_start_server() {
|
||||
shard_aware_transport_port_ssl = cfg.native_shard_aware_transport_port_ssl();
|
||||
}
|
||||
return cql_server_config {
|
||||
.timeout_config = make_timeout_config(cfg),
|
||||
.timeout_config = updateable_timeout_config(cfg),
|
||||
.max_request_size = _mem_limiter.local().total_memory(),
|
||||
.partitioner_name = cfg.partitioner(),
|
||||
.sharding_ignore_msb = cfg.murmur3_partitioner_ignore_msb_bits(),
|
||||
|
||||
@@ -101,7 +101,7 @@ struct cql_query_state {
|
||||
};
|
||||
|
||||
struct cql_server_config {
|
||||
::timeout_config timeout_config;
|
||||
updateable_timeout_config timeout_config;
|
||||
size_t max_request_size;
|
||||
sstring partitioner_name;
|
||||
unsigned sharding_ignore_msb;
|
||||
@@ -280,7 +280,7 @@ private:
|
||||
future<> advertise_new_connection(shared_ptr<generic_server::connection> conn) override;
|
||||
future<> unadvertise_connection(shared_ptr<generic_server::connection> conn) override;
|
||||
|
||||
const ::timeout_config& timeout_config() const { return _config.timeout_config; }
|
||||
::timeout_config timeout_config() const { return _config.timeout_config.current_values(); }
|
||||
};
|
||||
|
||||
class cql_server::event_notifier : public service::migration_listener,
|
||||
|
||||
Reference in New Issue
Block a user