alternator,config: make alternator_timeout_in_ms live-updateable

before this change, alternator_timeout_in_ms is not live-updatable,
as after setting executor's default timeout right before creating
sharded executor instances, they never get updated with this option
anymore.

in this change,

* alternator_timeout_in_ms is marked as live-updateable
* executor::_s_default_timeout is changed to a thread_local variable,
  so it can be updated by a per-shard updateable_value. and
  it is now a updateable_value, so its variable name is updated
  accordingly. this value is set in the ctor of executor, and
  it is disconnected from the corresponding named_value<> option
  in the dtor of executor.
* alternator_timeout_in_ms is passed to the constructor of
  executor via sharded_parameter, so executor::_timeout_in_ms can
  be initialized on per-shard basis
* executor::set_default_timeout() is dropped, as we already pass
  the option to executor in its ctor.

please note, in the ctor of executor, we always update the cached
value of `s_default_timeout` with the value of `_timeout_in_ms`,
and we set the default timeout to 10s in `alternator_test_env`.
this is a design decision to avoid bending the production code for
testing, as in production, we always set the timeout with the value
specified either by the default value of yaml conf file.

Fixes #12232
Signed-off-by: Kefu Chai <kefu.chai@scylladb.com>
This commit is contained in:
Kefu Chai
2023-03-20 15:50:13 +08:00
parent 2d40952737
commit 69c21f490a
5 changed files with 30 additions and 16 deletions

View File

@@ -76,13 +76,16 @@ future<> controller::start_server() {
_ssg = create_smp_service_group(c).get0();
rmw_operation::set_default_write_isolation(_config.alternator_write_isolation());
executor::set_default_timeout(std::chrono::milliseconds(_config.alternator_timeout_in_ms()));
net::inet_address addr = utils::resolve(_config.alternator_address, family).get0();
auto get_cdc_metadata = [] (cdc::generation_service& svc) { return std::ref(svc.get_cdc_metadata()); };
_executor.start(std::ref(_gossiper), std::ref(_proxy), std::ref(_mm), std::ref(_sys_dist_ks), sharded_parameter(get_cdc_metadata, std::ref(_cdc_gen_svc)), _ssg.value()).get();
auto get_timeout_in_ms = [] (const db::config& cfg) -> utils::updateable_value<uint32_t> {
return cfg.alternator_timeout_in_ms;
};
_executor.start(std::ref(_gossiper), std::ref(_proxy), std::ref(_mm), std::ref(_sys_dist_ks),
sharded_parameter(get_cdc_metadata, std::ref(_cdc_gen_svc)), _ssg.value(),
sharded_parameter(get_timeout_in_ms, std::ref(_config))).get();
_server.start(std::ref(_executor), std::ref(_proxy), std::ref(_gossiper), std::ref(_auth_service), std::ref(_sl_controller)).get();
// Note: from this point on, if start_server() throws for any reason,
// it must first call stop_server() to stop the executor and server

View File

@@ -1365,14 +1365,11 @@ mutation put_or_delete_item::build(schema_ptr schema, api::timestamp_type ts) co
// The DynamoDB API doesn't let the client control the server's timeout, so
// we have a global default_timeout() for Alternator requests. The value of
// s_default_timeout is overwritten in alternator::controller::start_server()
// s_default_timeout_ms is overwritten in alternator::controller::start_server()
// based on the "alternator_timeout_in_ms" configuration parameter.
db::timeout_clock::duration executor::s_default_timeout = 10s;
void executor::set_default_timeout(db::timeout_clock::duration timeout) {
s_default_timeout = timeout;
}
thread_local utils::updateable_value<uint32_t> executor::s_default_timeout_in_ms{10'000};
db::timeout_clock::time_point executor::default_timeout() {
return db::timeout_clock::now() + s_default_timeout;
return db::timeout_clock::now() + std::chrono::milliseconds(s_default_timeout_in_ms);
}
static future<std::unique_ptr<rjson::value>> get_previous_item(

View File

@@ -22,6 +22,7 @@
#include "alternator/error.hh"
#include "stats.hh"
#include "utils/rjson.hh"
#include "utils/updateable_value.hh"
namespace db {
class system_distributed_keyspace;
@@ -170,8 +171,10 @@ public:
static constexpr auto KEYSPACE_NAME_PREFIX = "alternator_";
static constexpr std::string_view INTERNAL_TABLE_PREFIX = ".scylla.alternator.";
executor(gms::gossiper& gossiper, service::storage_proxy& proxy, service::migration_manager& mm, db::system_distributed_keyspace& sdks, cdc::metadata& cdc_metadata, smp_service_group ssg)
: _gossiper(gossiper), _proxy(proxy), _mm(mm), _sdks(sdks), _cdc_metadata(cdc_metadata), _ssg(ssg) {}
executor(gms::gossiper& gossiper, service::storage_proxy& proxy, service::migration_manager& mm, db::system_distributed_keyspace& sdks, cdc::metadata& cdc_metadata, smp_service_group ssg, utils::updateable_value<uint32_t> default_timeout_in_ms)
: _gossiper(gossiper), _proxy(proxy), _mm(mm), _sdks(sdks), _cdc_metadata(cdc_metadata), _ssg(ssg) {
s_default_timeout_in_ms = std::move(default_timeout_in_ms);
}
future<request_return_type> create_table(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request);
future<request_return_type> describe_table(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request);
@@ -199,13 +202,16 @@ public:
future<request_return_type> describe_continuous_backups(client_state& client_state, service_permit permit, rjson::value request);
future<> start();
future<> stop() { return make_ready_future<>(); }
future<> stop() {
// disconnect from the value source, but keep the value unchanged.
s_default_timeout_in_ms = utils::updateable_value<uint32_t>{s_default_timeout_in_ms()};
return make_ready_future<>();
}
static sstring table_name(const schema&);
static db::timeout_clock::time_point default_timeout();
static void set_default_timeout(db::timeout_clock::duration timeout);
private:
static db::timeout_clock::duration s_default_timeout;
static thread_local utils::updateable_value<uint32_t> s_default_timeout_in_ms;
public:
static schema_ptr find_table(service::storage_proxy&, const rjson::value& request);

View File

@@ -880,7 +880,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, alternator_enforce_authorization(this, "alternator_enforce_authorization", value_status::Used, false, "Enforce checking the authorization header for every request in Alternator")
, alternator_write_isolation(this, "alternator_write_isolation", value_status::Used, "", "Default write isolation policy for Alternator")
, alternator_streams_time_window_s(this, "alternator_streams_time_window_s", value_status::Used, 10, "CDC query confidence window for alternator streams")
, alternator_timeout_in_ms(this, "alternator_timeout_in_ms", value_status::Used, 10000,
, alternator_timeout_in_ms(this, "alternator_timeout_in_ms", liveness::LiveUpdate, value_status::Used, 10000,
"The server-side timeout for completing Alternator API requests.")
, alternator_ttl_period_in_seconds(this, "alternator_ttl_period_in_seconds", value_status::Used,
60*60*24,

View File

@@ -13,13 +13,20 @@
#include "service/storage_proxy.hh"
using namespace std::chrono_literals;
future<> alternator_test_env::start(std::string_view isolation_level) {
smp_service_group_config c;
c.max_nonlocal_requests = 5000;
smp_service_group ssg = co_await create_smp_service_group(c);
utils::updateable_value<uint32_t> timeout_in_ms;
co_await _sdks.start(std::ref(_qp), std::ref(_mm), std::ref(_proxy));
co_await _cdc_metadata.start();
auto get_timeout_in_ms = sharded_parameter([] {
std::chrono::milliseconds ms = 10s;
return utils::updateable_value<uint32_t>(ms.count());
});
co_await _executor.start(
std::ref(_gossiper),
std::ref(_proxy),
@@ -29,7 +36,8 @@ future<> alternator_test_env::start(std::string_view isolation_level) {
std::ref(_sdks),
std::ref(_cdc_metadata),
// end-of-streams-parameters
ssg);
ssg,
get_timeout_in_ms);
try {
alternator::rmw_operation::set_default_write_isolation(isolation_level);
} catch (const std::runtime_error& e) {