From 69c21f490aa4c6b58fa7f174b70158cd782ab30b Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Mon, 20 Mar 2023 15:50:13 +0800 Subject: [PATCH] alternator,config: make alternator_timeout_in_ms live-updateable before this change, alternator_timeout_in_ms is not live-updatable, as after setting executor's default timeout right before creating sharded executor instances, they never get updated with this option anymore. in this change, * alternator_timeout_in_ms is marked as live-updateable * executor::_s_default_timeout is changed to a thread_local variable, so it can be updated by a per-shard updateable_value. and it is now a updateable_value, so its variable name is updated accordingly. this value is set in the ctor of executor, and it is disconnected from the corresponding named_value<> option in the dtor of executor. * alternator_timeout_in_ms is passed to the constructor of executor via sharded_parameter, so executor::_timeout_in_ms can be initialized on per-shard basis * executor::set_default_timeout() is dropped, as we already pass the option to executor in its ctor. please note, in the ctor of executor, we always update the cached value of `s_default_timeout` with the value of `_timeout_in_ms`, and we set the default timeout to 10s in `alternator_test_env`. this is a design decision to avoid bending the production code for testing, as in production, we always set the timeout with the value specified either by the default value of yaml conf file. Fixes #12232 Signed-off-by: Kefu Chai --- alternator/controller.cc | 9 ++++++--- alternator/executor.cc | 9 +++------ alternator/executor.hh | 16 +++++++++++----- db/config.cc | 2 +- test/lib/alternator_test_env.cc | 10 +++++++++- 5 files changed, 30 insertions(+), 16 deletions(-) diff --git a/alternator/controller.cc b/alternator/controller.cc index 97f4ffed5d..f3280df7e7 100644 --- a/alternator/controller.cc +++ b/alternator/controller.cc @@ -76,13 +76,16 @@ future<> controller::start_server() { _ssg = create_smp_service_group(c).get0(); rmw_operation::set_default_write_isolation(_config.alternator_write_isolation()); - executor::set_default_timeout(std::chrono::milliseconds(_config.alternator_timeout_in_ms())); net::inet_address addr = utils::resolve(_config.alternator_address, family).get0(); auto get_cdc_metadata = [] (cdc::generation_service& svc) { return std::ref(svc.get_cdc_metadata()); }; - - _executor.start(std::ref(_gossiper), std::ref(_proxy), std::ref(_mm), std::ref(_sys_dist_ks), sharded_parameter(get_cdc_metadata, std::ref(_cdc_gen_svc)), _ssg.value()).get(); + auto get_timeout_in_ms = [] (const db::config& cfg) -> utils::updateable_value { + return cfg.alternator_timeout_in_ms; + }; + _executor.start(std::ref(_gossiper), std::ref(_proxy), std::ref(_mm), std::ref(_sys_dist_ks), + sharded_parameter(get_cdc_metadata, std::ref(_cdc_gen_svc)), _ssg.value(), + sharded_parameter(get_timeout_in_ms, std::ref(_config))).get(); _server.start(std::ref(_executor), std::ref(_proxy), std::ref(_gossiper), std::ref(_auth_service), std::ref(_sl_controller)).get(); // Note: from this point on, if start_server() throws for any reason, // it must first call stop_server() to stop the executor and server diff --git a/alternator/executor.cc b/alternator/executor.cc index 842925b28a..03d46e6f94 100644 --- a/alternator/executor.cc +++ b/alternator/executor.cc @@ -1365,14 +1365,11 @@ mutation put_or_delete_item::build(schema_ptr schema, api::timestamp_type ts) co // The DynamoDB API doesn't let the client control the server's timeout, so // we have a global default_timeout() for Alternator requests. The value of -// s_default_timeout is overwritten in alternator::controller::start_server() +// s_default_timeout_ms is overwritten in alternator::controller::start_server() // based on the "alternator_timeout_in_ms" configuration parameter. -db::timeout_clock::duration executor::s_default_timeout = 10s; -void executor::set_default_timeout(db::timeout_clock::duration timeout) { - s_default_timeout = timeout; -} +thread_local utils::updateable_value executor::s_default_timeout_in_ms{10'000}; db::timeout_clock::time_point executor::default_timeout() { - return db::timeout_clock::now() + s_default_timeout; + return db::timeout_clock::now() + std::chrono::milliseconds(s_default_timeout_in_ms); } static future> get_previous_item( diff --git a/alternator/executor.hh b/alternator/executor.hh index b4c588ba0d..f53a4b43c8 100644 --- a/alternator/executor.hh +++ b/alternator/executor.hh @@ -22,6 +22,7 @@ #include "alternator/error.hh" #include "stats.hh" #include "utils/rjson.hh" +#include "utils/updateable_value.hh" namespace db { class system_distributed_keyspace; @@ -170,8 +171,10 @@ public: static constexpr auto KEYSPACE_NAME_PREFIX = "alternator_"; static constexpr std::string_view INTERNAL_TABLE_PREFIX = ".scylla.alternator."; - executor(gms::gossiper& gossiper, service::storage_proxy& proxy, service::migration_manager& mm, db::system_distributed_keyspace& sdks, cdc::metadata& cdc_metadata, smp_service_group ssg) - : _gossiper(gossiper), _proxy(proxy), _mm(mm), _sdks(sdks), _cdc_metadata(cdc_metadata), _ssg(ssg) {} + executor(gms::gossiper& gossiper, service::storage_proxy& proxy, service::migration_manager& mm, db::system_distributed_keyspace& sdks, cdc::metadata& cdc_metadata, smp_service_group ssg, utils::updateable_value default_timeout_in_ms) + : _gossiper(gossiper), _proxy(proxy), _mm(mm), _sdks(sdks), _cdc_metadata(cdc_metadata), _ssg(ssg) { + s_default_timeout_in_ms = std::move(default_timeout_in_ms); + } future create_table(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request); future describe_table(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request); @@ -199,13 +202,16 @@ public: future describe_continuous_backups(client_state& client_state, service_permit permit, rjson::value request); future<> start(); - future<> stop() { return make_ready_future<>(); } + future<> stop() { + // disconnect from the value source, but keep the value unchanged. + s_default_timeout_in_ms = utils::updateable_value{s_default_timeout_in_ms()}; + return make_ready_future<>(); + } static sstring table_name(const schema&); static db::timeout_clock::time_point default_timeout(); - static void set_default_timeout(db::timeout_clock::duration timeout); private: - static db::timeout_clock::duration s_default_timeout; + static thread_local utils::updateable_value s_default_timeout_in_ms; public: static schema_ptr find_table(service::storage_proxy&, const rjson::value& request); diff --git a/db/config.cc b/db/config.cc index 3b87b5f8f6..f0f4a325b7 100644 --- a/db/config.cc +++ b/db/config.cc @@ -880,7 +880,7 @@ db::config::config(std::shared_ptr exts) , alternator_enforce_authorization(this, "alternator_enforce_authorization", value_status::Used, false, "Enforce checking the authorization header for every request in Alternator") , alternator_write_isolation(this, "alternator_write_isolation", value_status::Used, "", "Default write isolation policy for Alternator") , alternator_streams_time_window_s(this, "alternator_streams_time_window_s", value_status::Used, 10, "CDC query confidence window for alternator streams") - , alternator_timeout_in_ms(this, "alternator_timeout_in_ms", value_status::Used, 10000, + , alternator_timeout_in_ms(this, "alternator_timeout_in_ms", liveness::LiveUpdate, value_status::Used, 10000, "The server-side timeout for completing Alternator API requests.") , alternator_ttl_period_in_seconds(this, "alternator_ttl_period_in_seconds", value_status::Used, 60*60*24, diff --git a/test/lib/alternator_test_env.cc b/test/lib/alternator_test_env.cc index 3938300c7b..9a3a64f741 100644 --- a/test/lib/alternator_test_env.cc +++ b/test/lib/alternator_test_env.cc @@ -13,13 +13,20 @@ #include "service/storage_proxy.hh" +using namespace std::chrono_literals; + future<> alternator_test_env::start(std::string_view isolation_level) { smp_service_group_config c; c.max_nonlocal_requests = 5000; smp_service_group ssg = co_await create_smp_service_group(c); + utils::updateable_value timeout_in_ms; co_await _sdks.start(std::ref(_qp), std::ref(_mm), std::ref(_proxy)); co_await _cdc_metadata.start(); + auto get_timeout_in_ms = sharded_parameter([] { + std::chrono::milliseconds ms = 10s; + return utils::updateable_value(ms.count()); + }); co_await _executor.start( std::ref(_gossiper), std::ref(_proxy), @@ -29,7 +36,8 @@ future<> alternator_test_env::start(std::string_view isolation_level) { std::ref(_sdks), std::ref(_cdc_metadata), // end-of-streams-parameters - ssg); + ssg, + get_timeout_in_ms); try { alternator::rmw_operation::set_default_write_isolation(isolation_level); } catch (const std::runtime_error& e) {