Merge 'Move timeouts to client state' from Piotr Sarna
This series is extracted from #7913 as it may prove useful to other series as well, and #7913 might take a while until its merged, given that it also depends on other unmerged pull requests. The idea of this series is to move timeouts to the client state, which will allow changing them independently for each session - e.g. by setting per-service-level timeouts and initializing the values from attached service levels (see #7867). Closes #8140 * github.com:scylladb/scylla: treewide: remove timeout config from query options cql3: use timeout config from client state instead of query options cql3: use timeout config from client state instead of query options cql3: use timeout config from client state instead of query options service: add timeout config to client state
This commit is contained in:
@@ -129,8 +129,7 @@ future<std::string> get_key_from_roles(cql3::query_processor& qp, std::string us
|
||||
auth::meta::roles_table::qualified_name, auth::meta::roles_table::role_col_name);
|
||||
|
||||
auto cl = auth::password_authenticator::consistency_for_user(username);
|
||||
auto& timeout = auth::internal_distributed_timeout_config();
|
||||
return qp.execute_internal(query, cl, timeout, {sstring(username)}, true).then_wrapped([username = std::move(username)] (future<::shared_ptr<cql3::untyped_result_set>> f) {
|
||||
return qp.execute_internal(query, cl, auth::internal_distributed_query_state(), {sstring(username)}, true).then_wrapped([username = std::move(username)] (future<::shared_ptr<cql3::untyped_result_set>> f) {
|
||||
auto res = f.get0();
|
||||
auto salted_hash = std::optional<sstring>();
|
||||
if (res->empty()) {
|
||||
|
||||
@@ -3214,7 +3214,7 @@ static future<executor::request_return_type> do_query(service::storage_proxy& pr
|
||||
auto query_state_ptr = std::make_unique<service::query_state>(client_state, trace_state, std::move(permit));
|
||||
|
||||
command->slice.options.set<query::partition_slice::option::allow_short_read>();
|
||||
auto query_options = std::make_unique<cql3::query_options>(cl, infinite_timeout_config, std::vector<cql3::raw_value>{});
|
||||
auto query_options = std::make_unique<cql3::query_options>(cl, std::vector<cql3::raw_value>{});
|
||||
query_options = std::make_unique<cql3::query_options>(std::move(query_options), std::move(paging_state));
|
||||
auto p = service::pager::query_pagers::pager(schema, selection, *query_state_ptr, *query_options, command, std::move(partition_ranges), nullptr);
|
||||
|
||||
|
||||
@@ -108,7 +108,7 @@ future<> wait_for_schema_agreement(::service::migration_manager& mm, const datab
|
||||
});
|
||||
}
|
||||
|
||||
const timeout_config& internal_distributed_timeout_config() noexcept {
|
||||
::service::query_state& internal_distributed_query_state() noexcept {
|
||||
#ifdef DEBUG
|
||||
// Give the much slower debug tests more headroom for completing auth queries.
|
||||
static const auto t = 30s;
|
||||
@@ -116,7 +116,9 @@ const timeout_config& internal_distributed_timeout_config() noexcept {
|
||||
static const auto t = 5s;
|
||||
#endif
|
||||
static const timeout_config tc{t, t, t, t, t, t, t};
|
||||
return tc;
|
||||
static thread_local ::service::client_state cs(::service::client_state::internal_tag{}, tc);
|
||||
static thread_local ::service::query_state qs(cs, empty_service_permit());
|
||||
return qs;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -35,6 +35,7 @@
|
||||
#include "log.hh"
|
||||
#include "seastarx.hh"
|
||||
#include "utils/exponential_backoff_retry.hh"
|
||||
#include "service/query_state.hh"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
@@ -87,6 +88,6 @@ future<> wait_for_schema_agreement(::service::migration_manager&, const database
|
||||
///
|
||||
/// Time-outs for internal, non-local CQL queries.
|
||||
///
|
||||
const timeout_config& internal_distributed_timeout_config() noexcept;
|
||||
::service::query_state& internal_distributed_query_state() noexcept;
|
||||
|
||||
}
|
||||
|
||||
@@ -103,7 +103,6 @@ future<bool> default_authorizer::any_granted() const {
|
||||
return _qp.execute_internal(
|
||||
query,
|
||||
db::consistency_level::LOCAL_ONE,
|
||||
infinite_timeout_config,
|
||||
{},
|
||||
true).then([this](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
return !results->empty();
|
||||
@@ -116,8 +115,7 @@ future<> default_authorizer::migrate_legacy_metadata() const {
|
||||
|
||||
return _qp.execute_internal(
|
||||
query,
|
||||
db::consistency_level::LOCAL_ONE,
|
||||
infinite_timeout_config).then([this](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
db::consistency_level::LOCAL_ONE).then([this](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
return do_for_each(*results, [this](const cql3::untyped_result_set_row& row) {
|
||||
return do_with(
|
||||
row.get_as<sstring>("username"),
|
||||
@@ -197,7 +195,6 @@ default_authorizer::authorize(const role_or_anonymous& maybe_role, const resourc
|
||||
return _qp.execute_internal(
|
||||
query,
|
||||
db::consistency_level::LOCAL_ONE,
|
||||
infinite_timeout_config,
|
||||
{*maybe_role.name, r.name()}).then([](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
if (results->empty()) {
|
||||
return permissions::NONE;
|
||||
@@ -226,7 +223,7 @@ default_authorizer::modify(
|
||||
return _qp.execute_internal(
|
||||
query,
|
||||
db::consistency_level::ONE,
|
||||
internal_distributed_timeout_config(),
|
||||
internal_distributed_query_state(),
|
||||
{permissions::to_strings(set), sstring(role_name), resource.name()}).discard_result();
|
||||
});
|
||||
}
|
||||
@@ -251,7 +248,7 @@ future<std::vector<permission_details>> default_authorizer::list_all() const {
|
||||
return _qp.execute_internal(
|
||||
query,
|
||||
db::consistency_level::ONE,
|
||||
internal_distributed_timeout_config(),
|
||||
internal_distributed_query_state(),
|
||||
{},
|
||||
true).then([](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
std::vector<permission_details> all_details;
|
||||
@@ -278,7 +275,7 @@ future<> default_authorizer::revoke_all(std::string_view role_name) const {
|
||||
return _qp.execute_internal(
|
||||
query,
|
||||
db::consistency_level::ONE,
|
||||
internal_distributed_timeout_config(),
|
||||
internal_distributed_query_state(),
|
||||
{sstring(role_name)}).discard_result().handle_exception([role_name](auto ep) {
|
||||
try {
|
||||
std::rethrow_exception(ep);
|
||||
@@ -298,7 +295,6 @@ future<> default_authorizer::revoke_all(const resource& resource) const {
|
||||
return _qp.execute_internal(
|
||||
query,
|
||||
db::consistency_level::LOCAL_ONE,
|
||||
infinite_timeout_config,
|
||||
{resource.name()}).then_wrapped([this, resource](future<::shared_ptr<cql3::untyped_result_set>> f) {
|
||||
try {
|
||||
auto res = f.get0();
|
||||
@@ -315,7 +311,6 @@ future<> default_authorizer::revoke_all(const resource& resource) const {
|
||||
return _qp.execute_internal(
|
||||
query,
|
||||
db::consistency_level::LOCAL_ONE,
|
||||
infinite_timeout_config,
|
||||
{r.get_as<sstring>(ROLE_NAME), resource.name()}).discard_result().handle_exception(
|
||||
[resource](auto ep) {
|
||||
try {
|
||||
|
||||
@@ -114,7 +114,7 @@ future<> password_authenticator::migrate_legacy_metadata() const {
|
||||
return _qp.execute_internal(
|
||||
query,
|
||||
db::consistency_level::QUORUM,
|
||||
internal_distributed_timeout_config()).then([this](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
internal_distributed_query_state()).then([this](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
return do_for_each(*results, [this](const cql3::untyped_result_set_row& row) {
|
||||
auto username = row.get_as<sstring>("username");
|
||||
auto salted_hash = row.get_as<sstring>(SALTED_HASH);
|
||||
@@ -122,7 +122,7 @@ future<> password_authenticator::migrate_legacy_metadata() const {
|
||||
return _qp.execute_internal(
|
||||
update_row_query(),
|
||||
consistency_for_user(username),
|
||||
internal_distributed_timeout_config(),
|
||||
internal_distributed_query_state(),
|
||||
{std::move(salted_hash), username}).discard_result();
|
||||
}).finally([results] {});
|
||||
}).then([] {
|
||||
@@ -139,7 +139,7 @@ future<> password_authenticator::create_default_if_missing() const {
|
||||
return _qp.execute_internal(
|
||||
update_row_query(),
|
||||
db::consistency_level::QUORUM,
|
||||
internal_distributed_timeout_config(),
|
||||
internal_distributed_query_state(),
|
||||
{passwords::hash(DEFAULT_USER_PASSWORD, rng_for_salt), DEFAULT_USER_NAME}).then([](auto&&) {
|
||||
plogger.info("Created default superuser authentication record.");
|
||||
});
|
||||
@@ -236,7 +236,7 @@ future<authenticated_user> password_authenticator::authenticate(
|
||||
return _qp.execute_internal(
|
||||
query,
|
||||
consistency_for_user(username),
|
||||
internal_distributed_timeout_config(),
|
||||
internal_distributed_query_state(),
|
||||
{username},
|
||||
true);
|
||||
}).then_wrapped([=](future<::shared_ptr<cql3::untyped_result_set>> f) {
|
||||
@@ -270,7 +270,7 @@ future<> password_authenticator::create(std::string_view role_name, const authen
|
||||
return _qp.execute_internal(
|
||||
update_row_query(),
|
||||
consistency_for_user(role_name),
|
||||
internal_distributed_timeout_config(),
|
||||
internal_distributed_query_state(),
|
||||
{passwords::hash(*options.password, rng_for_salt), sstring(role_name)}).discard_result();
|
||||
}
|
||||
|
||||
@@ -287,7 +287,7 @@ future<> password_authenticator::alter(std::string_view role_name, const authent
|
||||
return _qp.execute_internal(
|
||||
query,
|
||||
consistency_for_user(role_name),
|
||||
internal_distributed_timeout_config(),
|
||||
internal_distributed_query_state(),
|
||||
{passwords::hash(*options.password, rng_for_salt), sstring(role_name)}).discard_result();
|
||||
}
|
||||
|
||||
@@ -299,7 +299,7 @@ future<> password_authenticator::drop(std::string_view name) const {
|
||||
|
||||
return _qp.execute_internal(
|
||||
query, consistency_for_user(name),
|
||||
internal_distributed_timeout_config(),
|
||||
internal_distributed_query_state(),
|
||||
{sstring(name)}).discard_result();
|
||||
}
|
||||
|
||||
|
||||
@@ -68,14 +68,13 @@ future<bool> default_role_row_satisfies(
|
||||
return qp.execute_internal(
|
||||
query,
|
||||
db::consistency_level::ONE,
|
||||
infinite_timeout_config,
|
||||
{meta::DEFAULT_SUPERUSER_NAME},
|
||||
true).then([&qp, &p](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
if (results->empty()) {
|
||||
return qp.execute_internal(
|
||||
query,
|
||||
db::consistency_level::QUORUM,
|
||||
internal_distributed_timeout_config(),
|
||||
internal_distributed_query_state(),
|
||||
{meta::DEFAULT_SUPERUSER_NAME},
|
||||
true).then([&p](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
if (results->empty()) {
|
||||
@@ -100,7 +99,7 @@ future<bool> any_nondefault_role_row_satisfies(
|
||||
return qp.execute_internal(
|
||||
query,
|
||||
db::consistency_level::QUORUM,
|
||||
internal_distributed_timeout_config()).then([&p](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
internal_distributed_query_state()).then([&p](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
if (results->empty()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -210,7 +210,6 @@ future<bool> service::has_existing_legacy_users() const {
|
||||
return _qp.execute_internal(
|
||||
default_user_query,
|
||||
db::consistency_level::ONE,
|
||||
infinite_timeout_config,
|
||||
{meta::DEFAULT_SUPERUSER_NAME},
|
||||
true).then([this](auto results) {
|
||||
if (!results->empty()) {
|
||||
@@ -220,7 +219,6 @@ future<bool> service::has_existing_legacy_users() const {
|
||||
return _qp.execute_internal(
|
||||
default_user_query,
|
||||
db::consistency_level::QUORUM,
|
||||
infinite_timeout_config,
|
||||
{meta::DEFAULT_SUPERUSER_NAME},
|
||||
true).then([this](auto results) {
|
||||
if (!results->empty()) {
|
||||
@@ -229,8 +227,7 @@ future<bool> service::has_existing_legacy_users() const {
|
||||
|
||||
return _qp.execute_internal(
|
||||
all_users_query,
|
||||
db::consistency_level::QUORUM,
|
||||
infinite_timeout_config).then([](auto results) {
|
||||
db::consistency_level::QUORUM).then([](auto results) {
|
||||
return make_ready_future<bool>(!results->empty());
|
||||
});
|
||||
});
|
||||
|
||||
@@ -86,7 +86,7 @@ static future<std::optional<record>> find_record(cql3::query_processor& qp, std:
|
||||
return qp.execute_internal(
|
||||
query,
|
||||
consistency_for_role(role_name),
|
||||
internal_distributed_timeout_config(),
|
||||
internal_distributed_query_state(),
|
||||
{sstring(role_name)},
|
||||
true).then([](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
if (results->empty()) {
|
||||
@@ -165,7 +165,7 @@ future<> standard_role_manager::create_default_role_if_missing() const {
|
||||
return _qp.execute_internal(
|
||||
query,
|
||||
db::consistency_level::QUORUM,
|
||||
internal_distributed_timeout_config(),
|
||||
internal_distributed_query_state(),
|
||||
{meta::DEFAULT_SUPERUSER_NAME}).then([](auto&&) {
|
||||
log.info("Created default superuser role '{}'.", meta::DEFAULT_SUPERUSER_NAME);
|
||||
return make_ready_future<>();
|
||||
@@ -192,7 +192,7 @@ future<> standard_role_manager::migrate_legacy_metadata() const {
|
||||
return _qp.execute_internal(
|
||||
query,
|
||||
db::consistency_level::QUORUM,
|
||||
internal_distributed_timeout_config()).then([this](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
internal_distributed_query_state()).then([this](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
return do_for_each(*results, [this](const cql3::untyped_result_set_row& row) {
|
||||
role_config config;
|
||||
config.is_superuser = row.get_or<bool>("super", false);
|
||||
@@ -253,7 +253,7 @@ future<> standard_role_manager::create_or_replace(std::string_view role_name, co
|
||||
return _qp.execute_internal(
|
||||
query,
|
||||
consistency_for_role(role_name),
|
||||
internal_distributed_timeout_config(),
|
||||
internal_distributed_query_state(),
|
||||
{sstring(role_name), c.is_superuser, c.can_login},
|
||||
true).discard_result();
|
||||
}
|
||||
@@ -296,7 +296,7 @@ standard_role_manager::alter(std::string_view role_name, const role_config_updat
|
||||
build_column_assignments(u),
|
||||
meta::roles_table::role_col_name),
|
||||
consistency_for_role(role_name),
|
||||
internal_distributed_timeout_config(),
|
||||
internal_distributed_query_state(),
|
||||
{sstring(role_name)}).discard_result();
|
||||
});
|
||||
}
|
||||
@@ -315,7 +315,7 @@ future<> standard_role_manager::drop(std::string_view role_name) const {
|
||||
return _qp.execute_internal(
|
||||
query,
|
||||
consistency_for_role(role_name),
|
||||
internal_distributed_timeout_config(),
|
||||
internal_distributed_query_state(),
|
||||
{sstring(role_name)}).then([this, role_name](::shared_ptr<cql3::untyped_result_set> members) {
|
||||
return parallel_for_each(
|
||||
members->begin(),
|
||||
@@ -354,7 +354,7 @@ future<> standard_role_manager::drop(std::string_view role_name) const {
|
||||
return _qp.execute_internal(
|
||||
query,
|
||||
consistency_for_role(role_name),
|
||||
internal_distributed_timeout_config(),
|
||||
internal_distributed_query_state(),
|
||||
{sstring(role_name)}).discard_result();
|
||||
};
|
||||
|
||||
@@ -381,7 +381,7 @@ standard_role_manager::modify_membership(
|
||||
return _qp.execute_internal(
|
||||
query,
|
||||
consistency_for_role(grantee_name),
|
||||
internal_distributed_timeout_config(),
|
||||
internal_distributed_query_state(),
|
||||
{role_set{sstring(role_name)}, sstring(grantee_name)}).discard_result();
|
||||
};
|
||||
|
||||
@@ -392,7 +392,7 @@ standard_role_manager::modify_membership(
|
||||
format("INSERT INTO {} (role, member) VALUES (?, ?)",
|
||||
meta::role_members_table::qualified_name),
|
||||
consistency_for_role(role_name),
|
||||
internal_distributed_timeout_config(),
|
||||
internal_distributed_query_state(),
|
||||
{sstring(role_name), sstring(grantee_name)}).discard_result();
|
||||
|
||||
case membership_change::remove:
|
||||
@@ -400,7 +400,7 @@ standard_role_manager::modify_membership(
|
||||
format("DELETE FROM {} WHERE role = ? AND member = ?",
|
||||
meta::role_members_table::qualified_name),
|
||||
consistency_for_role(role_name),
|
||||
internal_distributed_timeout_config(),
|
||||
internal_distributed_query_state(),
|
||||
{sstring(role_name), sstring(grantee_name)}).discard_result();
|
||||
}
|
||||
|
||||
@@ -503,7 +503,7 @@ future<role_set> standard_role_manager::query_all() const {
|
||||
return _qp.execute_internal(
|
||||
query,
|
||||
db::consistency_level::QUORUM,
|
||||
internal_distributed_timeout_config()).then([](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
internal_distributed_query_state()).then([](::shared_ptr<cql3::untyped_result_set> results) {
|
||||
role_set roles;
|
||||
|
||||
std::transform(
|
||||
|
||||
@@ -52,12 +52,11 @@ thread_local const query_options::specific_options query_options::specific_optio
|
||||
-1, {}, db::consistency_level::SERIAL, api::missing_timestamp};
|
||||
|
||||
thread_local query_options query_options::DEFAULT{default_cql_config,
|
||||
db::consistency_level::ONE, infinite_timeout_config, std::nullopt,
|
||||
db::consistency_level::ONE, std::nullopt,
|
||||
std::vector<cql3::raw_value_view>(), false, query_options::specific_options::DEFAULT, cql_serialization_format::latest()};
|
||||
|
||||
query_options::query_options(const cql_config& cfg,
|
||||
db::consistency_level consistency,
|
||||
const ::timeout_config& timeout_config,
|
||||
std::optional<std::vector<sstring_view>> names,
|
||||
std::vector<cql3::raw_value> values,
|
||||
std::vector<cql3::raw_value_view> value_views,
|
||||
@@ -66,7 +65,6 @@ query_options::query_options(const cql_config& cfg,
|
||||
cql_serialization_format sf)
|
||||
: _cql_config(cfg)
|
||||
, _consistency(consistency)
|
||||
, _timeout_config(timeout_config)
|
||||
, _names(std::move(names))
|
||||
, _values(std::move(values))
|
||||
, _value_views(value_views)
|
||||
@@ -78,7 +76,6 @@ query_options::query_options(const cql_config& cfg,
|
||||
|
||||
query_options::query_options(const cql_config& cfg,
|
||||
db::consistency_level consistency,
|
||||
const ::timeout_config& timeout_config,
|
||||
std::optional<std::vector<sstring_view>> names,
|
||||
std::vector<cql3::raw_value> values,
|
||||
bool skip_metadata,
|
||||
@@ -86,7 +83,6 @@ query_options::query_options(const cql_config& cfg,
|
||||
cql_serialization_format sf)
|
||||
: _cql_config(cfg)
|
||||
, _consistency(consistency)
|
||||
, _timeout_config(timeout_config)
|
||||
, _names(std::move(names))
|
||||
, _values(std::move(values))
|
||||
, _value_views()
|
||||
@@ -99,7 +95,6 @@ query_options::query_options(const cql_config& cfg,
|
||||
|
||||
query_options::query_options(const cql_config& cfg,
|
||||
db::consistency_level consistency,
|
||||
const ::timeout_config& timeout_config,
|
||||
std::optional<std::vector<sstring_view>> names,
|
||||
std::vector<cql3::raw_value_view> value_views,
|
||||
bool skip_metadata,
|
||||
@@ -107,7 +102,6 @@ query_options::query_options(const cql_config& cfg,
|
||||
cql_serialization_format sf)
|
||||
: _cql_config(cfg)
|
||||
, _consistency(consistency)
|
||||
, _timeout_config(timeout_config)
|
||||
, _names(std::move(names))
|
||||
, _values()
|
||||
, _value_views(std::move(value_views))
|
||||
@@ -117,12 +111,11 @@ query_options::query_options(const cql_config& cfg,
|
||||
{
|
||||
}
|
||||
|
||||
query_options::query_options(db::consistency_level cl, const ::timeout_config& timeout_config, std::vector<cql3::raw_value> values,
|
||||
query_options::query_options(db::consistency_level cl, std::vector<cql3::raw_value> values,
|
||||
specific_options options)
|
||||
: query_options(
|
||||
default_cql_config,
|
||||
cl,
|
||||
timeout_config,
|
||||
{},
|
||||
std::move(values),
|
||||
false,
|
||||
@@ -135,7 +128,6 @@ query_options::query_options(db::consistency_level cl, const ::timeout_config& t
|
||||
query_options::query_options(std::unique_ptr<query_options> qo, lw_shared_ptr<service::pager::paging_state> paging_state)
|
||||
: query_options(qo->_cql_config,
|
||||
qo->_consistency,
|
||||
qo->get_timeout_config(),
|
||||
std::move(qo->_names),
|
||||
std::move(qo->_values),
|
||||
std::move(qo->_value_views),
|
||||
@@ -148,7 +140,6 @@ query_options::query_options(std::unique_ptr<query_options> qo, lw_shared_ptr<se
|
||||
query_options::query_options(std::unique_ptr<query_options> qo, lw_shared_ptr<service::pager::paging_state> paging_state, int32_t page_size)
|
||||
: query_options(qo->_cql_config,
|
||||
qo->_consistency,
|
||||
qo->get_timeout_config(),
|
||||
std::move(qo->_names),
|
||||
std::move(qo->_values),
|
||||
std::move(qo->_value_views),
|
||||
@@ -160,7 +151,7 @@ query_options::query_options(std::unique_ptr<query_options> qo, lw_shared_ptr<se
|
||||
|
||||
query_options::query_options(std::vector<cql3::raw_value> values)
|
||||
: query_options(
|
||||
db::consistency_level::ONE, infinite_timeout_config, std::move(values))
|
||||
db::consistency_level::ONE, std::move(values))
|
||||
{}
|
||||
|
||||
void query_options::prepare(const std::vector<lw_shared_ptr<column_specification>>& specs)
|
||||
|
||||
@@ -51,7 +51,6 @@
|
||||
#include "cql3/column_identifier.hh"
|
||||
#include "cql3/values.hh"
|
||||
#include "cql_serialization_format.hh"
|
||||
#include "timeout_config.hh"
|
||||
|
||||
namespace cql3 {
|
||||
|
||||
@@ -75,7 +74,6 @@ public:
|
||||
private:
|
||||
const cql_config& _cql_config;
|
||||
const db::consistency_level _consistency;
|
||||
const timeout_config& _timeout_config;
|
||||
const std::optional<std::vector<sstring_view>> _names;
|
||||
std::vector<cql3::raw_value> _values;
|
||||
std::vector<cql3::raw_value_view> _value_views;
|
||||
@@ -118,7 +116,6 @@ public:
|
||||
|
||||
explicit query_options(const cql_config& cfg,
|
||||
db::consistency_level consistency,
|
||||
const timeout_config& timeouts,
|
||||
std::optional<std::vector<sstring_view>> names,
|
||||
std::vector<cql3::raw_value> values,
|
||||
bool skip_metadata,
|
||||
@@ -126,7 +123,6 @@ public:
|
||||
cql_serialization_format sf);
|
||||
explicit query_options(const cql_config& cfg,
|
||||
db::consistency_level consistency,
|
||||
const timeout_config& timeouts,
|
||||
std::optional<std::vector<sstring_view>> names,
|
||||
std::vector<cql3::raw_value> values,
|
||||
std::vector<cql3::raw_value_view> value_views,
|
||||
@@ -135,7 +131,6 @@ public:
|
||||
cql_serialization_format sf);
|
||||
explicit query_options(const cql_config& cfg,
|
||||
db::consistency_level consistency,
|
||||
const timeout_config& timeouts,
|
||||
std::optional<std::vector<sstring_view>> names,
|
||||
std::vector<cql3::raw_value_view> value_views,
|
||||
bool skip_metadata,
|
||||
@@ -167,13 +162,10 @@ public:
|
||||
|
||||
// forInternalUse
|
||||
explicit query_options(std::vector<cql3::raw_value> values);
|
||||
explicit query_options(db::consistency_level, const timeout_config& timeouts,
|
||||
std::vector<cql3::raw_value> values, specific_options options = specific_options::DEFAULT);
|
||||
explicit query_options(db::consistency_level, std::vector<cql3::raw_value> values, specific_options options = specific_options::DEFAULT);
|
||||
explicit query_options(std::unique_ptr<query_options>, lw_shared_ptr<service::pager::paging_state> paging_state);
|
||||
explicit query_options(std::unique_ptr<query_options>, lw_shared_ptr<service::pager::paging_state> paging_state, int32_t page_size);
|
||||
|
||||
const timeout_config& get_timeout_config() const { return _timeout_config; }
|
||||
|
||||
db::consistency_level get_consistency() const {
|
||||
return _consistency;
|
||||
}
|
||||
@@ -300,7 +292,7 @@ query_options::query_options(query_options&& o, std::vector<OneMutationDataRange
|
||||
std::vector<query_options> tmp;
|
||||
tmp.reserve(values_ranges.size());
|
||||
std::transform(values_ranges.begin(), values_ranges.end(), std::back_inserter(tmp), [this](auto& values_range) {
|
||||
return query_options(_cql_config, _consistency, _timeout_config, {}, std::move(values_range), _skip_metadata, _options, _cql_serialization_format);
|
||||
return query_options(_cql_config, _consistency, {}, std::move(values_range), _skip_metadata, _options, _cql_serialization_format);
|
||||
});
|
||||
_batch_options = std::move(tmp);
|
||||
}
|
||||
|
||||
@@ -619,7 +619,6 @@ query_options query_processor::make_internal_options(
|
||||
const statements::prepared_statement::checked_weak_ptr& p,
|
||||
const std::initializer_list<data_value>& values,
|
||||
db::consistency_level cl,
|
||||
const timeout_config& timeout_config,
|
||||
int32_t page_size) const {
|
||||
if (p->bound_names.size() != values.size()) {
|
||||
throw std::invalid_argument(
|
||||
@@ -643,11 +642,10 @@ query_options query_processor::make_internal_options(
|
||||
api::timestamp_type ts = api::missing_timestamp;
|
||||
return query_options(
|
||||
cl,
|
||||
timeout_config,
|
||||
bound_values,
|
||||
cql3::query_options::specific_options{page_size, std::move(paging_state), serial_consistency, ts});
|
||||
}
|
||||
return query_options(cl, timeout_config, bound_values);
|
||||
return query_options(cl, bound_values);
|
||||
}
|
||||
|
||||
statements::prepared_statement::checked_weak_ptr query_processor::prepare_internal(const sstring& query_string) {
|
||||
@@ -671,11 +669,10 @@ struct internal_query_state {
|
||||
::shared_ptr<internal_query_state> query_processor::create_paged_state(
|
||||
const sstring& query_string,
|
||||
db::consistency_level cl,
|
||||
const timeout_config& timeout_config,
|
||||
const std::initializer_list<data_value>& values,
|
||||
int32_t page_size) {
|
||||
auto p = prepare_internal(query_string);
|
||||
auto opts = make_internal_options(p, values, cl, timeout_config, page_size);
|
||||
auto opts = make_internal_options(p, values, cl, page_size);
|
||||
::shared_ptr<internal_query_state> res = ::make_shared<internal_query_state>(
|
||||
internal_query_state{
|
||||
query_string,
|
||||
@@ -793,7 +790,16 @@ future<::shared_ptr<untyped_result_set>>
|
||||
query_processor::execute_internal(
|
||||
const sstring& query_string,
|
||||
db::consistency_level cl,
|
||||
const timeout_config& timeout_config,
|
||||
const std::initializer_list<data_value>& values,
|
||||
bool cache) {
|
||||
return execute_internal(query_string, cl, *_internal_state, values, cache);
|
||||
}
|
||||
|
||||
future<::shared_ptr<untyped_result_set>>
|
||||
query_processor::execute_internal(
|
||||
const sstring& query_string,
|
||||
db::consistency_level cl,
|
||||
service::query_state& query_state,
|
||||
const std::initializer_list<data_value>& values,
|
||||
bool cache) {
|
||||
|
||||
@@ -801,13 +807,13 @@ query_processor::execute_internal(
|
||||
log.trace("execute_internal: {}\"{}\" ({})", cache ? "(cached) " : "", query_string, ::join(", ", values));
|
||||
}
|
||||
if (cache) {
|
||||
return execute_with_params(prepare_internal(query_string), cl, timeout_config, values);
|
||||
return execute_with_params(prepare_internal(query_string), cl, query_state, values);
|
||||
} else {
|
||||
auto p = parse_statement(query_string)->prepare(_db, _cql_stats);
|
||||
p->statement->raw_cql_statement = query_string;
|
||||
p->statement->validate(_proxy, *_internal_state);
|
||||
auto checked_weak_ptr = p->checked_weak_from_this();
|
||||
return execute_with_params(std::move(checked_weak_ptr), cl, timeout_config, values).finally([p = std::move(p)] {});
|
||||
return execute_with_params(std::move(checked_weak_ptr), cl, query_state, values).finally([p = std::move(p)] {});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -815,11 +821,11 @@ future<::shared_ptr<untyped_result_set>>
|
||||
query_processor::execute_with_params(
|
||||
statements::prepared_statement::checked_weak_ptr p,
|
||||
db::consistency_level cl,
|
||||
const timeout_config& timeout_config,
|
||||
service::query_state& query_state,
|
||||
const std::initializer_list<data_value>& values) {
|
||||
auto opts = make_internal_options(p, values, cl, timeout_config);
|
||||
return do_with(std::move(opts), [this, p = std::move(p)](auto & opts) {
|
||||
return p->statement->execute(_proxy, *_internal_state, opts).then([](auto msg) {
|
||||
auto opts = make_internal_options(p, values, cl);
|
||||
return do_with(std::move(opts), [this, &query_state, p = std::move(p)](auto & opts) {
|
||||
return p->statement->execute(_proxy, query_state, opts).then([](auto msg) {
|
||||
return make_ready_future<::shared_ptr<untyped_result_set>>(::make_shared<untyped_result_set>(msg));
|
||||
});
|
||||
});
|
||||
@@ -942,17 +948,16 @@ bool query_processor::migration_subscriber::should_invalidate(
|
||||
future<> query_processor::query_internal(
|
||||
const sstring& query_string,
|
||||
db::consistency_level cl,
|
||||
const timeout_config& timeout_config,
|
||||
const std::initializer_list<data_value>& values,
|
||||
int32_t page_size,
|
||||
noncopyable_function<future<stop_iteration>(const cql3::untyped_result_set_row&)>&& f) {
|
||||
return for_each_cql_result(create_paged_state(query_string, cl, timeout_config, values, page_size), std::move(f));
|
||||
return for_each_cql_result(create_paged_state(query_string, cl, values, page_size), std::move(f));
|
||||
}
|
||||
|
||||
future<> query_processor::query_internal(
|
||||
const sstring& query_string,
|
||||
noncopyable_function<future<stop_iteration>(const cql3::untyped_result_set_row&)>&& f) {
|
||||
return query_internal(query_string, db::consistency_level::ONE, infinite_timeout_config, {}, 1000, std::move(f));
|
||||
return query_internal(query_string, db::consistency_level::ONE, {}, 1000, std::move(f));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -215,8 +215,7 @@ public:
|
||||
// creating namespaces, etc) is explicitly forbidden via this interface.
|
||||
future<::shared_ptr<untyped_result_set>>
|
||||
execute_internal(const sstring& query_string, const std::initializer_list<data_value>& values = { }) {
|
||||
return execute_internal(query_string, db::consistency_level::ONE,
|
||||
infinite_timeout_config, values, true);
|
||||
return execute_internal(query_string, db::consistency_level::ONE, values, true);
|
||||
}
|
||||
|
||||
statements::prepared_statement::checked_weak_ptr prepare_internal(const sstring& query);
|
||||
@@ -234,7 +233,6 @@ public:
|
||||
return query_internal(
|
||||
"SELECT * from system.compaction_history",
|
||||
db::consistency_level::ONE,
|
||||
infinite_timeout_config,
|
||||
{},
|
||||
[&history] (const cql3::untyped_result_set::row& row) mutable {
|
||||
....
|
||||
@@ -246,7 +244,6 @@ public:
|
||||
*
|
||||
* query_string - the cql string, can contain placeholders
|
||||
* cl - consistency level of the query
|
||||
* timeout_config - timeout configuration
|
||||
* values - values to be substituted for the placeholders in the query
|
||||
* page_size - maximum page size
|
||||
* f - a function to be run on each row of the query result,
|
||||
@@ -255,7 +252,6 @@ public:
|
||||
future<> query_internal(
|
||||
const sstring& query_string,
|
||||
db::consistency_level cl,
|
||||
const timeout_config& timeout_config,
|
||||
const std::initializer_list<data_value>& values,
|
||||
int32_t page_size,
|
||||
noncopyable_function<future<stop_iteration>(const cql3::untyped_result_set_row&)>&& f);
|
||||
@@ -282,14 +278,19 @@ public:
|
||||
future<::shared_ptr<untyped_result_set>> execute_internal(
|
||||
const sstring& query_string,
|
||||
db::consistency_level,
|
||||
const timeout_config& timeout_config,
|
||||
const std::initializer_list<data_value>& = { },
|
||||
bool cache = false);
|
||||
future<::shared_ptr<untyped_result_set>> execute_internal(
|
||||
const sstring& query_string,
|
||||
db::consistency_level,
|
||||
service::query_state& query_state,
|
||||
const std::initializer_list<data_value>& = { },
|
||||
bool cache = false);
|
||||
|
||||
future<::shared_ptr<untyped_result_set>> execute_with_params(
|
||||
statements::prepared_statement::checked_weak_ptr p,
|
||||
db::consistency_level,
|
||||
const timeout_config& timeout_config,
|
||||
service::query_state& query_state,
|
||||
const std::initializer_list<data_value>& = { });
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message::prepared>>
|
||||
@@ -318,7 +319,6 @@ private:
|
||||
const statements::prepared_statement::checked_weak_ptr& p,
|
||||
const std::initializer_list<data_value>&,
|
||||
db::consistency_level,
|
||||
const timeout_config& timeout_config,
|
||||
int32_t page_size = -1) const;
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
@@ -332,7 +332,6 @@ private:
|
||||
::shared_ptr<internal_query_state> create_paged_state(
|
||||
const sstring& query_string,
|
||||
db::consistency_level,
|
||||
const timeout_config&,
|
||||
const std::initializer_list<data_value>&,
|
||||
int32_t page_size);
|
||||
|
||||
|
||||
@@ -59,8 +59,8 @@ timeout_for_type(batch_statement::type t) {
|
||||
: &timeout_config::write_timeout;
|
||||
}
|
||||
|
||||
db::timeout_clock::duration batch_statement::get_timeout(const query_options& options) const {
|
||||
return _attrs->is_timeout_set() ? _attrs->get_timeout(options) : options.get_timeout_config().*get_timeout_config_selector();
|
||||
db::timeout_clock::duration batch_statement::get_timeout(const service::client_state& state, const query_options& options) const {
|
||||
return _attrs->is_timeout_set() ? _attrs->get_timeout(options) : state.get_timeout_config().*get_timeout_config_selector();
|
||||
}
|
||||
|
||||
batch_statement::batch_statement(int bound_terms, type type_,
|
||||
@@ -290,7 +290,7 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::do_
|
||||
++_stats.batches;
|
||||
_stats.statements_in_batches += _statements.size();
|
||||
|
||||
auto timeout = db::timeout_clock::now() + get_timeout(options);
|
||||
auto timeout = db::timeout_clock::now() + get_timeout(query_state.get_client_state(), options);
|
||||
return get_mutations(storage, options, timeout, local, now, query_state).then([this, &storage, &options, timeout, tr_state = query_state.get_trace_state(),
|
||||
permit = query_state.get_permit()] (std::vector<mutation> ms) mutable {
|
||||
return execute_without_conditions(storage, std::move(ms), options.get_consistency(), timeout, std::move(tr_state), std::move(permit));
|
||||
@@ -347,7 +347,7 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::exe
|
||||
schema_ptr schema;
|
||||
|
||||
db::timeout_clock::time_point now = db::timeout_clock::now();
|
||||
const timeout_config& cfg = options.get_timeout_config();
|
||||
const timeout_config& cfg = qs.get_client_state().get_timeout_config();
|
||||
auto batch_timeout = now + cfg.write_timeout; // Statement timeout.
|
||||
auto cas_timeout = now + cfg.cas_timeout; // Ballot contention timeout.
|
||||
auto read_timeout = now + cfg.read_timeout; // Query timeout.
|
||||
|
||||
@@ -171,7 +171,7 @@ private:
|
||||
const query_options& options,
|
||||
service::query_state& state) const;
|
||||
|
||||
db::timeout_clock::duration get_timeout(const query_options& options) const;
|
||||
db::timeout_clock::duration get_timeout(const service::client_state& state, const query_options& options) const;
|
||||
public:
|
||||
// FIXME: no cql_statement::to_string() yet
|
||||
#if 0
|
||||
|
||||
@@ -74,8 +74,8 @@ modification_statement_timeout(const schema& s) {
|
||||
}
|
||||
}
|
||||
|
||||
db::timeout_clock::duration modification_statement::get_timeout(const query_options& options) const {
|
||||
return attrs->is_timeout_set() ? attrs->get_timeout(options) : options.get_timeout_config().*get_timeout_config_selector();
|
||||
db::timeout_clock::duration modification_statement::get_timeout(const service::client_state& state, const query_options& options) const {
|
||||
return attrs->is_timeout_set() ? attrs->get_timeout(options) : state.get_timeout_config().*get_timeout_config_selector();
|
||||
}
|
||||
|
||||
modification_statement::modification_statement(statement_type type_, uint32_t bound_terms, schema_ptr schema_, std::unique_ptr<attributes> attrs_, cql_stats& stats_)
|
||||
@@ -291,7 +291,7 @@ modification_statement::do_execute(service::storage_proxy& proxy, service::query
|
||||
future<>
|
||||
modification_statement::execute_without_condition(service::storage_proxy& proxy, service::query_state& qs, const query_options& options) const {
|
||||
auto cl = options.get_consistency();
|
||||
auto timeout = db::timeout_clock::now() + get_timeout(options);
|
||||
auto timeout = db::timeout_clock::now() + get_timeout(qs.get_client_state(), options);
|
||||
return get_mutations(proxy, options, timeout, false, options.get_timestamp(qs), qs).then([this, cl, timeout, &proxy, &qs] (auto mutations) {
|
||||
if (mutations.empty()) {
|
||||
return now();
|
||||
@@ -307,7 +307,7 @@ modification_statement::execute_with_condition(service::storage_proxy& proxy, se
|
||||
auto cl_for_learn = options.get_consistency();
|
||||
auto cl_for_paxos = options.check_serial_consistency();
|
||||
db::timeout_clock::time_point now = db::timeout_clock::now();
|
||||
const timeout_config& cfg = options.get_timeout_config();
|
||||
const timeout_config& cfg = qs.get_client_state().get_timeout_config();
|
||||
|
||||
auto statement_timeout = now + cfg.write_timeout; // All CAS networking operations run with write timeout.
|
||||
auto cas_timeout = now + cfg.cas_timeout; // When to give up due to contention.
|
||||
|
||||
@@ -299,7 +299,7 @@ protected:
|
||||
*/
|
||||
virtual void validate_where_clause_for_conditions() const;
|
||||
|
||||
db::timeout_clock::duration get_timeout(const query_options& options) const;
|
||||
db::timeout_clock::duration get_timeout(const service::client_state& state, const query_options& options) const;
|
||||
|
||||
friend class raw::modification_statement;
|
||||
};
|
||||
|
||||
@@ -161,8 +161,8 @@ select_statement::select_statement(schema_ptr schema,
|
||||
_opts.set_if<query::partition_slice::option::reversed>(_is_reversed);
|
||||
}
|
||||
|
||||
db::timeout_clock::duration select_statement::get_timeout(const query_options& options) const {
|
||||
return _attrs->is_timeout_set() ? _attrs->get_timeout(options) : options.get_timeout_config().*get_timeout_config_selector();
|
||||
db::timeout_clock::duration select_statement::get_timeout(const service::client_state& state, const query_options& options) const {
|
||||
return _attrs->is_timeout_set() ? _attrs->get_timeout(options) : state.get_timeout_config().*get_timeout_config_selector();
|
||||
}
|
||||
|
||||
::shared_ptr<const cql3::metadata> select_statement::get_result_metadata() const {
|
||||
@@ -371,7 +371,7 @@ select_statement::do_execute(service::storage_proxy& proxy,
|
||||
}
|
||||
|
||||
command->slice.options.set<query::partition_slice::option::allow_short_read>();
|
||||
auto timeout_duration = get_timeout(options);
|
||||
auto timeout_duration = get_timeout(state.get_client_state(), options);
|
||||
auto timeout = db::timeout_clock::now() + timeout_duration;
|
||||
auto p = service::pager::query_pagers::pager(_schema, _selection,
|
||||
state, options, command, std::move(key_ranges), restrictions_need_filtering ? _restrictions : nullptr);
|
||||
@@ -518,7 +518,7 @@ indexed_table_select_statement::do_execute_base_query(
|
||||
lw_shared_ptr<const service::pager::paging_state> paging_state) const {
|
||||
using value_type = std::tuple<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>>;
|
||||
auto cmd = prepare_command_for_base_query(proxy, options, state, now, bool(paging_state));
|
||||
auto timeout = db::timeout_clock::now() + get_timeout(options);
|
||||
auto timeout = db::timeout_clock::now() + get_timeout(state.get_client_state(), options);
|
||||
uint32_t queried_ranges_count = partition_ranges.size();
|
||||
service::query_ranges_to_vnodes_generator ranges_to_vnodes(proxy.get_token_metadata_ptr(), _schema, std::move(partition_ranges));
|
||||
|
||||
@@ -612,7 +612,7 @@ indexed_table_select_statement::do_execute_base_query(
|
||||
lw_shared_ptr<const service::pager::paging_state> paging_state) const {
|
||||
using value_type = std::tuple<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>>;
|
||||
auto cmd = prepare_command_for_base_query(proxy, options, state, now, bool(paging_state));
|
||||
auto timeout = db::timeout_clock::now() + get_timeout(options);
|
||||
auto timeout = db::timeout_clock::now() + get_timeout(state.get_client_state(), options);
|
||||
|
||||
struct base_query_state {
|
||||
query::result_merger merger;
|
||||
@@ -694,7 +694,7 @@ select_statement::execute(service::storage_proxy& proxy,
|
||||
// is specified we need to get "limit" rows from each partition since there
|
||||
// is no way to tell which of these rows belong to the query result before
|
||||
// doing post-query ordering.
|
||||
auto timeout = db::timeout_clock::now() + get_timeout(options);
|
||||
auto timeout = db::timeout_clock::now() + get_timeout(state.get_client_state(), options);
|
||||
if (needs_post_query_ordering() && _limit) {
|
||||
return do_with(std::forward<dht::partition_range_vector>(partition_ranges), [this, &proxy, &state, &options, cmd, timeout](auto& prs) {
|
||||
assert(cmd->partition_limit == query::max_partitions);
|
||||
@@ -1259,7 +1259,7 @@ indexed_table_select_statement::find_index_partition_ranges(service::storage_pro
|
||||
{
|
||||
using value_type = std::tuple<dht::partition_range_vector, lw_shared_ptr<const service::pager::paging_state>>;
|
||||
auto now = gc_clock::now();
|
||||
auto timeout = db::timeout_clock::now() + get_timeout(options);
|
||||
auto timeout = db::timeout_clock::now() + get_timeout(state.get_client_state(), options);
|
||||
return read_posting_list(proxy, options, get_limit(options), state, now, timeout, false).then(
|
||||
[this, now, &options] (::shared_ptr<cql_transport::messages::result_message::rows> rows) {
|
||||
auto rs = cql3::untyped_result_set(rows);
|
||||
@@ -1300,7 +1300,7 @@ indexed_table_select_statement::find_index_clustering_rows(service::storage_prox
|
||||
{
|
||||
using value_type = std::tuple<std::vector<indexed_table_select_statement::primary_key>, lw_shared_ptr<const service::pager::paging_state>>;
|
||||
auto now = gc_clock::now();
|
||||
auto timeout = db::timeout_clock::now() + get_timeout(options);
|
||||
auto timeout = db::timeout_clock::now() + get_timeout(state.get_client_state(), options);
|
||||
return read_posting_list(proxy, options, get_limit(options), state, now, timeout, true).then(
|
||||
[this, now, &options] (::shared_ptr<cql_transport::messages::result_message::rows> rows) {
|
||||
|
||||
|
||||
@@ -147,7 +147,7 @@ public:
|
||||
|
||||
bool has_group_by() const { return _group_by_cell_indices && !_group_by_cell_indices->empty(); }
|
||||
|
||||
db::timeout_clock::duration get_timeout(const query_options& options) const;
|
||||
db::timeout_clock::duration get_timeout(const service::client_state& state, const query_options& options) const;
|
||||
|
||||
protected:
|
||||
uint64_t do_get_limit(const query_options& options, ::shared_ptr<term> limit, uint64_t default_limit) const;
|
||||
|
||||
@@ -55,10 +55,18 @@ struct query_context {
|
||||
// let the `storage_proxy` time out the query down the call chain
|
||||
db::timeout_clock::duration::zero();
|
||||
|
||||
return do_with(timeout_config{d, d, d, d, d, d, d}, [this, req = std::move(req), &args...] (auto& tcfg) {
|
||||
struct timeout_context {
|
||||
std::unique_ptr<service::client_state> client_state;
|
||||
service::query_state query_state;
|
||||
timeout_context(db::timeout_clock::duration d)
|
||||
: client_state(std::make_unique<service::client_state>(service::client_state::internal_tag{}, timeout_config{d, d, d, d, d, d, d}))
|
||||
, query_state(*client_state, empty_service_permit())
|
||||
{}
|
||||
};
|
||||
return do_with(timeout_context(d), [this, req = std::move(req), &args...] (auto& tctx) {
|
||||
return _qp.local().execute_internal(req,
|
||||
cql3::query_options::DEFAULT.get_consistency(),
|
||||
tcfg,
|
||||
tctx.query_state,
|
||||
{ data_value(std::forward<Args>(args))... },
|
||||
true);
|
||||
});
|
||||
|
||||
@@ -3131,7 +3131,6 @@ future<column_mapping> get_column_mapping(utils::UUID table_id, table_schema_ver
|
||||
auto cm_fut = qctx->qp().execute_internal(
|
||||
GET_COLUMN_MAPPING_QUERY,
|
||||
db::consistency_level::LOCAL_ONE,
|
||||
infinite_timeout_config,
|
||||
{table_id, version}
|
||||
);
|
||||
return cm_fut.then([version] (shared_ptr<cql3::untyped_result_set> results) {
|
||||
@@ -3174,7 +3173,6 @@ future<bool> column_mapping_exists(utils::UUID table_id, table_schema_version ve
|
||||
return qctx->qp().execute_internal(
|
||||
GET_COLUMN_MAPPING_QUERY,
|
||||
db::consistency_level::LOCAL_ONE,
|
||||
infinite_timeout_config,
|
||||
{table_id, version}
|
||||
).then([] (shared_ptr<cql3::untyped_result_set> results) {
|
||||
return !results->empty();
|
||||
@@ -3188,7 +3186,6 @@ future<> drop_column_mapping(utils::UUID table_id, table_schema_version version)
|
||||
return qctx->qp().execute_internal(
|
||||
DEL_COLUMN_MAPPING_QUERY,
|
||||
db::consistency_level::LOCAL_ONE,
|
||||
infinite_timeout_config,
|
||||
{table_id, version}).discard_result();
|
||||
}
|
||||
|
||||
|
||||
@@ -181,17 +181,20 @@ future<> system_distributed_keyspace::stop() {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
static timeout_config get_timeout_config(db::timeout_clock::duration t) {
|
||||
return timeout_config{ t, t, t, t, t, t, t };
|
||||
}
|
||||
|
||||
static const timeout_config internal_distributed_timeout_config = get_timeout_config(std::chrono::seconds(10));
|
||||
static service::query_state& internal_distributed_query_state() {
|
||||
using namespace std::chrono_literals;
|
||||
const auto t = 10s;
|
||||
static timeout_config tc{ t, t, t, t, t, t, t };
|
||||
static thread_local service::client_state cs(service::client_state::internal_tag{}, tc);
|
||||
static thread_local service::query_state qs(cs, empty_service_permit());
|
||||
return qs;
|
||||
};
|
||||
|
||||
future<std::unordered_map<utils::UUID, sstring>> system_distributed_keyspace::view_status(sstring ks_name, sstring view_name) const {
|
||||
return _qp.execute_internal(
|
||||
format("SELECT host_id, status FROM {}.{} WHERE keyspace_name = ? AND view_name = ?", NAME, VIEW_BUILD_STATUS),
|
||||
db::consistency_level::ONE,
|
||||
internal_distributed_timeout_config,
|
||||
internal_distributed_query_state(),
|
||||
{ std::move(ks_name), std::move(view_name) },
|
||||
false).then([this] (::shared_ptr<cql3::untyped_result_set> cql_result) {
|
||||
return boost::copy_range<std::unordered_map<utils::UUID, sstring>>(*cql_result
|
||||
@@ -208,7 +211,7 @@ future<> system_distributed_keyspace::start_view_build(sstring ks_name, sstring
|
||||
return _qp.execute_internal(
|
||||
format("INSERT INTO {}.{} (keyspace_name, view_name, host_id, status) VALUES (?, ?, ?, ?)", NAME, VIEW_BUILD_STATUS),
|
||||
db::consistency_level::ONE,
|
||||
internal_distributed_timeout_config,
|
||||
internal_distributed_query_state(),
|
||||
{ std::move(ks_name), std::move(view_name), std::move(host_id), "STARTED" },
|
||||
false).discard_result();
|
||||
});
|
||||
@@ -219,7 +222,7 @@ future<> system_distributed_keyspace::finish_view_build(sstring ks_name, sstring
|
||||
return _qp.execute_internal(
|
||||
format("UPDATE {}.{} SET status = ? WHERE keyspace_name = ? AND view_name = ? AND host_id = ?", NAME, VIEW_BUILD_STATUS),
|
||||
db::consistency_level::ONE,
|
||||
internal_distributed_timeout_config,
|
||||
internal_distributed_query_state(),
|
||||
{ "SUCCESS", std::move(ks_name), std::move(view_name), std::move(host_id) },
|
||||
false).discard_result();
|
||||
});
|
||||
@@ -229,7 +232,7 @@ future<> system_distributed_keyspace::remove_view(sstring ks_name, sstring view_
|
||||
return _qp.execute_internal(
|
||||
format("DELETE FROM {}.{} WHERE keyspace_name = ? AND view_name = ?", NAME, VIEW_BUILD_STATUS),
|
||||
db::consistency_level::ONE,
|
||||
internal_distributed_timeout_config,
|
||||
internal_distributed_query_state(),
|
||||
{ std::move(ks_name), std::move(view_name) },
|
||||
false).discard_result();
|
||||
}
|
||||
@@ -307,7 +310,7 @@ system_distributed_keyspace::insert_cdc_topology_description(
|
||||
return _qp.execute_internal(
|
||||
format("INSERT INTO {}.{} (time, description) VALUES (?,?)", NAME, CDC_TOPOLOGY_DESCRIPTION),
|
||||
quorum_if_many(ctx.num_token_owners),
|
||||
internal_distributed_timeout_config,
|
||||
internal_distributed_query_state(),
|
||||
{ time, make_list_value(cdc_generation_description_type, prepare_cdc_generation_description(description)) },
|
||||
false).discard_result();
|
||||
}
|
||||
@@ -319,7 +322,7 @@ system_distributed_keyspace::read_cdc_topology_description(
|
||||
return _qp.execute_internal(
|
||||
format("SELECT description FROM {}.{} WHERE time = ?", NAME, CDC_TOPOLOGY_DESCRIPTION),
|
||||
quorum_if_many(ctx.num_token_owners),
|
||||
internal_distributed_timeout_config,
|
||||
internal_distributed_query_state(),
|
||||
{ time },
|
||||
false
|
||||
).then([] (::shared_ptr<cql3::untyped_result_set> cql_result) -> std::optional<cdc::topology_description> {
|
||||
@@ -347,7 +350,7 @@ system_distributed_keyspace::expire_cdc_topology_description(
|
||||
return _qp.execute_internal(
|
||||
format("UPDATE {}.{} SET expired = ? WHERE time = ?", NAME, CDC_TOPOLOGY_DESCRIPTION),
|
||||
quorum_if_many(ctx.num_token_owners),
|
||||
internal_distributed_timeout_config,
|
||||
internal_distributed_query_state(),
|
||||
{ expiration_time, streams_ts },
|
||||
false).discard_result();
|
||||
}
|
||||
@@ -413,7 +416,7 @@ system_distributed_keyspace::create_cdc_desc(
|
||||
co_await _qp.execute_internal(
|
||||
format("INSERT INTO {}.{} (key, time) VALUES (?, ?)", NAME, CDC_TIMESTAMPS),
|
||||
quorum_if_many(ctx.num_token_owners),
|
||||
internal_distributed_timeout_config,
|
||||
internal_distributed_query_state(),
|
||||
{ CDC_TIMESTAMPS_KEY, time },
|
||||
false).discard_result();
|
||||
}
|
||||
@@ -426,7 +429,7 @@ system_distributed_keyspace::expire_cdc_desc(
|
||||
return _qp.execute_internal(
|
||||
format("UPDATE {}.{} SET expired = ? WHERE time = ?", NAME, CDC_TIMESTAMPS),
|
||||
quorum_if_many(ctx.num_token_owners),
|
||||
internal_distributed_timeout_config,
|
||||
internal_distributed_query_state(),
|
||||
{ expiration_time, streams_ts },
|
||||
false).discard_result();
|
||||
}
|
||||
@@ -471,7 +474,7 @@ system_distributed_keyspace::cdc_desc_exists(
|
||||
co_return co_await _qp.execute_internal(
|
||||
format("SELECT time FROM {}.{} WHERE key = ? AND time = ?", NAME, CDC_TIMESTAMPS),
|
||||
quorum_if_many(ctx.num_token_owners),
|
||||
internal_distributed_timeout_config,
|
||||
internal_distributed_query_state(),
|
||||
{ CDC_TIMESTAMPS_KEY, streams_ts },
|
||||
false
|
||||
).then([] (::shared_ptr<cql3::untyped_result_set> cql_result) -> bool {
|
||||
@@ -484,7 +487,7 @@ system_distributed_keyspace::cdc_get_versioned_streams(db_clock::time_point not_
|
||||
auto timestamps_cql = co_await _qp.execute_internal(
|
||||
format("SELECT time FROM {}.{} WHERE key = ?", NAME, CDC_TIMESTAMPS),
|
||||
quorum_if_many(ctx.num_token_owners),
|
||||
internal_distributed_timeout_config,
|
||||
internal_distributed_query_state(),
|
||||
{ CDC_TIMESTAMPS_KEY },
|
||||
false);
|
||||
|
||||
@@ -506,7 +509,7 @@ system_distributed_keyspace::cdc_get_versioned_streams(db_clock::time_point not_
|
||||
auto streams_cql = co_await _qp.execute_internal(
|
||||
format("SELECT streams FROM {}.{} WHERE time = ?", NAME, CDC_DESC_V2),
|
||||
quorum_if_many(ctx.num_token_owners),
|
||||
internal_distributed_timeout_config,
|
||||
internal_distributed_query_state(),
|
||||
{ ts },
|
||||
false);
|
||||
|
||||
@@ -526,11 +529,10 @@ future<std::vector<db_clock::time_point>>
|
||||
system_distributed_keyspace::get_cdc_desc_v1_timestamps(context ctx) {
|
||||
std::vector<db_clock::time_point> res;
|
||||
co_await _qp.query_internal(
|
||||
format("SELECT time FROM {}.{}", NAME, CDC_DESC_V1),
|
||||
quorum_if_many(ctx.num_token_owners),
|
||||
// This is a long and expensive scan (mostly due to #8061).
|
||||
// Give it a bit more time than usual.
|
||||
get_timeout_config(std::chrono::seconds(60)),
|
||||
format("SELECT time FROM {}.{} USING TIMEOUT 60s", NAME, CDC_DESC_V1),
|
||||
quorum_if_many(ctx.num_token_owners),
|
||||
{},
|
||||
1000,
|
||||
[&] (const cql3::untyped_result_set_row& r) {
|
||||
|
||||
@@ -60,7 +60,7 @@ public:
|
||||
,_read_consistency(rcl)
|
||||
,_write_consistency(wcl)
|
||||
,_timeout_config(tc)
|
||||
,_client_state(service::client_state::external_tag{}, auth, addr)
|
||||
,_client_state(service::client_state::external_tag{}, auth, tc, addr)
|
||||
,_total_redis_db_count(total_redis_db_count)
|
||||
{
|
||||
}
|
||||
@@ -75,7 +75,7 @@ public:
|
||||
,_read_consistency(rcl)
|
||||
,_write_consistency(wcl)
|
||||
,_timeout_config(tc)
|
||||
,_client_state(service::client_state::external_tag{}, auth, addr)
|
||||
,_client_state(service::client_state::external_tag{}, auth, tc, addr)
|
||||
,_total_redis_db_count(total_redis_db_count)
|
||||
{
|
||||
}
|
||||
|
||||
@@ -44,6 +44,7 @@
|
||||
#include "auth/service.hh"
|
||||
#include "exceptions/exceptions.hh"
|
||||
#include "unimplemented.hh"
|
||||
#include "timeout_config.hh"
|
||||
#include "timestamp.hh"
|
||||
#include "db_clock.hh"
|
||||
#include "database_fwd.hh"
|
||||
@@ -88,10 +89,17 @@ public:
|
||||
};
|
||||
private:
|
||||
client_state(const client_state* cs, seastar::sharded<auth::service>* auth_service)
|
||||
: _keyspace(cs->_keyspace), _user(cs->_user), _auth_state(cs->_auth_state),
|
||||
_is_internal(cs->_is_internal), _is_thrift(cs->_is_thrift), _remote_address(cs->_remote_address),
|
||||
_auth_service(auth_service ? &auth_service->local() : nullptr),
|
||||
_enabled_protocol_extensions(cs->_enabled_protocol_extensions) {}
|
||||
: _keyspace(cs->_keyspace)
|
||||
, _user(cs->_user)
|
||||
, _auth_state(cs->_auth_state)
|
||||
, _is_internal(cs->_is_internal)
|
||||
, _is_thrift(cs->_is_thrift)
|
||||
, _remote_address(cs->_remote_address)
|
||||
, _auth_service(auth_service ? &auth_service->local() : nullptr)
|
||||
, _default_timeout_config(cs->_default_timeout_config)
|
||||
, _timeout_config(cs->_timeout_config)
|
||||
, _enabled_protocol_extensions(cs->_enabled_protocol_extensions)
|
||||
{}
|
||||
friend client_state_for_another_shard;
|
||||
private:
|
||||
sstring _keyspace;
|
||||
@@ -136,6 +144,10 @@ private:
|
||||
// Only populated for external client state.
|
||||
auth::service* _auth_service{nullptr};
|
||||
|
||||
// For restoring default values in the timeout config
|
||||
timeout_config _default_timeout_config;
|
||||
timeout_config _timeout_config;
|
||||
|
||||
public:
|
||||
struct internal_tag {};
|
||||
struct external_tag {};
|
||||
@@ -162,11 +174,13 @@ public:
|
||||
_driver_version = std::move(driver_version);
|
||||
}
|
||||
|
||||
client_state(external_tag, auth::service& auth_service, const socket_address& remote_address = socket_address(), bool thrift = false)
|
||||
client_state(external_tag, auth::service& auth_service, timeout_config timeout_config, const socket_address& remote_address = socket_address(), bool thrift = false)
|
||||
: _is_internal(false)
|
||||
, _is_thrift(thrift)
|
||||
, _remote_address(remote_address)
|
||||
, _auth_service(&auth_service) {
|
||||
, _auth_service(&auth_service)
|
||||
, _default_timeout_config(timeout_config)
|
||||
, _timeout_config(timeout_config) {
|
||||
if (!auth_service.underlying_authenticator().require_authentication()) {
|
||||
_user = auth::authenticated_user();
|
||||
}
|
||||
@@ -180,10 +194,19 @@ public:
|
||||
return _remote_address.port();
|
||||
}
|
||||
|
||||
client_state(internal_tag)
|
||||
const timeout_config& get_timeout_config() const {
|
||||
return _timeout_config;
|
||||
}
|
||||
|
||||
client_state(internal_tag) : client_state(internal_tag{}, infinite_timeout_config)
|
||||
{}
|
||||
|
||||
client_state(internal_tag, const timeout_config& config)
|
||||
: _keyspace("system")
|
||||
, _is_internal(true)
|
||||
, _is_thrift(false)
|
||||
, _default_timeout_config(config)
|
||||
, _timeout_config(config)
|
||||
{}
|
||||
|
||||
client_state(const client_state&) = delete;
|
||||
|
||||
@@ -184,7 +184,6 @@ future<> raft_sys_table_storage::do_store_log_entries(const std::vector<raft::lo
|
||||
cql3::query_options(
|
||||
cql3::default_cql_config,
|
||||
db::consistency_level::ONE,
|
||||
infinite_timeout_config,
|
||||
std::nullopt,
|
||||
std::vector<cql3::raw_value>{},
|
||||
false,
|
||||
|
||||
@@ -173,7 +173,7 @@ SEASTAR_TEST_CASE(test_insert_large_collection_values) {
|
||||
BOOST_REQUIRE_THROW(e.execute_cql(format("INSERT INTO tbl (pk, m) VALUES ('Golding', {{'{}': 'value'}});", long_value)).get(), std::exception);
|
||||
|
||||
auto make_query_options = [] (cql_protocol_version_type version) {
|
||||
return std::make_unique<cql3::query_options>(cql3::default_cql_config, db::consistency_level::ONE, infinite_timeout_config, std::nullopt,
|
||||
return std::make_unique<cql3::query_options>(cql3::default_cql_config, db::consistency_level::ONE, std::nullopt,
|
||||
std::vector<cql3::raw_value_view>(), false,
|
||||
cql3::query_options::specific_options::DEFAULT, cql_serialization_format{version});
|
||||
};
|
||||
|
||||
@@ -385,7 +385,6 @@ SEASTAR_TEST_CASE(test_list_append_limit) {
|
||||
// append sequence, which will be exceeded it in this
|
||||
// test.
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE,
|
||||
infinite_timeout_config,
|
||||
std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()});
|
||||
auto cql = fmt::format("UPDATE t SET l = l + [{}] WHERE pk = 0;", value_list);
|
||||
@@ -3037,7 +3036,7 @@ SEASTAR_TEST_CASE(test_empty_partition_range_scan) {
|
||||
e.execute_cql("create table empty_partition_range_scan.tb (a int, b int, c int, val int, PRIMARY KEY ((a,b),c) );").get();
|
||||
|
||||
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()});
|
||||
auto res = e.execute_cql("select * from empty_partition_range_scan.tb where token (a,b) > 1 and token(a,b) <= 1;", std::move(qo)).get0();
|
||||
assert_that(res).is_rows().is_empty();
|
||||
@@ -4432,7 +4431,6 @@ static std::unique_ptr<cql3::query_options> q_serial_opts(
|
||||
const auto& so = cql3::query_options::specific_options::DEFAULT;
|
||||
auto qo = std::make_unique<cql3::query_options>(
|
||||
cl,
|
||||
infinite_timeout_config,
|
||||
values,
|
||||
// Ensure (optional) serial consistency is always specified.
|
||||
cql3::query_options::specific_options{
|
||||
@@ -4647,7 +4645,7 @@ SEASTAR_THREAD_TEST_CASE(test_query_limit) {
|
||||
const auto select_query = format("SELECT * FROM test WHERE pk = {} ORDER BY ck {};", pk, is_reversed ? "DESC" : "ASC");
|
||||
|
||||
int32_t page_size = is_paged ? 10000 : -1;
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{page_size, nullptr, {}, api::new_timestamp()});
|
||||
|
||||
const auto* expected_rows = is_reversed ? &reversed_rows : &normal_rows;
|
||||
|
||||
@@ -831,14 +831,14 @@ SEASTAR_TEST_CASE(test_allow_filtering_limit) {
|
||||
{ int32_type->decompose(6), boolean_type->decompose(false)},
|
||||
});
|
||||
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()});
|
||||
msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=true LIMIT 1 ALLOW FILTERING;", std::move(qo)).get0();
|
||||
assert_that(msg).is_rows().with_rows({
|
||||
{ int32_type->decompose(3), boolean_type->decompose(true)},
|
||||
});
|
||||
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()});
|
||||
msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 5 ALLOW FILTERING;", std::move(qo)).get0();
|
||||
assert_that(msg).is_rows().with_rows({
|
||||
@@ -849,7 +849,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_limit) {
|
||||
{ int32_type->decompose(6), boolean_type->decompose(false)},
|
||||
});
|
||||
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()});
|
||||
msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 2 ALLOW FILTERING;", std::move(qo)).get0();
|
||||
assert_that(msg).is_rows().with_rows({
|
||||
@@ -857,7 +857,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_limit) {
|
||||
{ int32_type->decompose(2), boolean_type->decompose(false)}
|
||||
});
|
||||
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()});
|
||||
msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 3 ALLOW FILTERING;", std::move(qo)).get0();
|
||||
assert_that(msg).is_rows().with_rows({
|
||||
@@ -866,7 +866,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_limit) {
|
||||
{ int32_type->decompose(4), boolean_type->decompose(false)}
|
||||
});
|
||||
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()});
|
||||
msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 3 ALLOW FILTERING;", std::move(qo)).get0();
|
||||
auto paging_state = extract_paging_state(msg);
|
||||
@@ -877,7 +877,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_limit) {
|
||||
// Some pages might be empty and in such case we should continue querying
|
||||
size_t rows_fetched = 0;
|
||||
while (rows_fetched == 0) {
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
|
||||
msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 3 ALLOW FILTERING;", std::move(qo)).get0();
|
||||
rows_fetched = count_rows_fetched(msg);
|
||||
@@ -889,7 +889,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_limit) {
|
||||
|
||||
rows_fetched = 0;
|
||||
while (rows_fetched == 0) {
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
|
||||
msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 3 ALLOW FILTERING;", std::move(qo)).get0();
|
||||
rows_fetched = count_rows_fetched(msg);
|
||||
@@ -905,7 +905,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_limit) {
|
||||
rows_fetched = 0;
|
||||
uint64_t remaining = 1;
|
||||
while (remaining > 0) {
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
|
||||
msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 3 ALLOW FILTERING;", std::move(qo)).get0();
|
||||
rows_fetched += count_rows_fetched(msg);
|
||||
@@ -964,7 +964,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_per_partition_limit) {
|
||||
{ int32_type->decompose(1), boolean_type->decompose(false)},
|
||||
});
|
||||
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()});
|
||||
msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=true PER PARTITION LIMIT 1 ALLOW FILTERING;", std::move(qo)).get0();
|
||||
assert_that(msg).is_rows().with_rows({
|
||||
@@ -972,7 +972,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_per_partition_limit) {
|
||||
{ int32_type->decompose(3), boolean_type->decompose(true)},
|
||||
});
|
||||
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{3, nullptr, {}, api::new_timestamp()});
|
||||
msg = e.execute_cql("SELECT c, liked FROM timeline PER PARTITION LIMIT 1;", std::move(qo)).get0();
|
||||
assert_that(msg).is_rows().with_rows({
|
||||
@@ -983,7 +983,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_per_partition_limit) {
|
||||
// Some pages might be empty and in such case we should continue querying
|
||||
size_t rows_fetched = 0;
|
||||
while (rows_fetched == 0) {
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
|
||||
msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false PER PARTITION LIMIT 1 ALLOW FILTERING;", std::move(qo)).get0();
|
||||
rows_fetched = count_rows_fetched(msg);
|
||||
@@ -1001,7 +1001,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_per_partition_limit) {
|
||||
rows_fetched = 0;
|
||||
uint64_t remaining = 1;
|
||||
while (remaining > 0) {
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{pg, paging_state, {}, api::new_timestamp()});
|
||||
sstring query = allow_filtering ?
|
||||
fmt::format("SELECT c, liked FROM timeline WHERE liked=false PER PARTITION LIMIT {} ALLOW FILTERING;", ppl) :
|
||||
|
||||
@@ -36,7 +36,7 @@ SEASTAR_TEST_CASE(test_index_with_paging) {
|
||||
}
|
||||
|
||||
eventually([&] {
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{4321, nullptr, {}, api::new_timestamp()});
|
||||
auto res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0();
|
||||
assert_that(res).is_rows().with_size(4321);
|
||||
|
||||
@@ -63,7 +63,7 @@ SEASTAR_TEST_CASE(test_use_high_bits_of_remaining_rows_in_paging_state) {
|
||||
e.execute_prepared(id, {cql3_pk, cql3_ck}).get();
|
||||
}
|
||||
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{5, nullptr, {}, api::new_timestamp()});
|
||||
auto msg = e.execute_cql("select * from test;", std::move(qo)).get0();
|
||||
auto paging_state = extract_paging_state(msg);
|
||||
@@ -75,7 +75,7 @@ SEASTAR_TEST_CASE(test_use_high_bits_of_remaining_rows_in_paging_state) {
|
||||
paging_state->set_remaining(test_remaining);
|
||||
|
||||
while (has_more_pages(msg)) {
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{5, paging_state, {}, api::new_timestamp()});
|
||||
msg = e.execute_cql("SELECT * FROM test;", std::move(qo)).get0();
|
||||
rows_fetched = count_rows_fetched(msg);
|
||||
@@ -101,7 +101,7 @@ SEASTAR_TEST_CASE(test_use_high_bits_of_remaining_rows_in_paging_state_filtering
|
||||
e.execute_prepared(id, {cql3_pk, cql3_ck}).get();
|
||||
}
|
||||
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{5, nullptr, {}, api::new_timestamp()});
|
||||
auto msg = e.execute_cql("select * from test where ck > 10;", std::move(qo)).get0();
|
||||
auto paging_state = extract_paging_state(msg);
|
||||
@@ -113,7 +113,7 @@ SEASTAR_TEST_CASE(test_use_high_bits_of_remaining_rows_in_paging_state_filtering
|
||||
paging_state->set_remaining(test_remaining);
|
||||
|
||||
while (has_more_pages(msg)) {
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{5, paging_state, {}, api::new_timestamp()});
|
||||
msg = e.execute_cql("SELECT * FROM test where ck > 10;", std::move(qo)).get0();
|
||||
rows_fetched = count_rows_fetched(msg);
|
||||
|
||||
@@ -209,8 +209,7 @@ std::unordered_map<sstring, uint64_t> get_query_metrics() {
|
||||
|
||||
/// Creates query_options with cl, infinite timeout, and no named values.
|
||||
auto make_options(clevel cl) {
|
||||
return std::make_unique<cql3::query_options>(
|
||||
cl, infinite_timeout_config, std::vector<cql3::raw_value>());
|
||||
return std::make_unique<cql3::query_options>(cl, std::vector<cql3::raw_value>());
|
||||
}
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
@@ -45,7 +45,7 @@ std::unique_ptr<cql3::query_options> to_options(
|
||||
static auto& d = cql3::query_options::DEFAULT;
|
||||
return std::make_unique<cql3::query_options>(
|
||||
cfg,
|
||||
d.get_consistency(), d.get_timeout_config(), std::move(names), std::move(values), d.skip_metadata(),
|
||||
d.get_consistency(), std::move(names), std::move(values), d.skip_metadata(),
|
||||
d.get_specific_options(), d.get_cql_serialization_format());
|
||||
}
|
||||
|
||||
|
||||
@@ -427,7 +427,7 @@ SEASTAR_TEST_CASE(test_index_on_pk_ck_with_paging) {
|
||||
}
|
||||
|
||||
eventually([&] {
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{101, nullptr, {}, api::new_timestamp()});
|
||||
auto res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0();
|
||||
assert_that(res).is_rows().with_size(101);
|
||||
@@ -439,7 +439,7 @@ SEASTAR_TEST_CASE(test_index_on_pk_ck_with_paging) {
|
||||
});
|
||||
|
||||
eventually([&] {
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()});
|
||||
auto res = e.execute_cql("SELECT * FROM tab WHERE pk2 = 1", std::move(qo)).get0();
|
||||
assert_that(res).is_rows().with_rows({{
|
||||
@@ -449,7 +449,7 @@ SEASTAR_TEST_CASE(test_index_on_pk_ck_with_paging) {
|
||||
});
|
||||
|
||||
eventually([&] {
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()});
|
||||
auto res = e.execute_cql("SELECT * FROM tab WHERE ck2 = 'world8'", std::move(qo)).get0();
|
||||
assert_that(res).is_rows().with_rows({{
|
||||
@@ -485,7 +485,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
|
||||
};
|
||||
|
||||
eventually([&] {
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()});
|
||||
auto res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0();
|
||||
auto paging_state = extract_paging_state(res);
|
||||
@@ -495,7 +495,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
|
||||
{int32_type->decompose(1)}, {int32_type->decompose(1)}, {int32_type->decompose(1)},
|
||||
}});
|
||||
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
|
||||
res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0();
|
||||
expect_more_pages(res, true);
|
||||
@@ -505,7 +505,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
|
||||
{int32_type->decompose(1)}, {int32_type->decompose(2)}, {int32_type->decompose(1)},
|
||||
}});
|
||||
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
|
||||
res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0();
|
||||
paging_state = extract_paging_state(res);
|
||||
@@ -520,7 +520,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
|
||||
try {
|
||||
expect_more_pages(res, false);
|
||||
} catch (...) {
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
|
||||
res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0();
|
||||
assert_that(res).is_rows().with_size(0);
|
||||
@@ -530,7 +530,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
|
||||
});
|
||||
|
||||
eventually([&] {
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()});
|
||||
auto res = e.execute_cql("SELECT * FROM tab WHERE c = 2", std::move(qo)).get0();
|
||||
auto paging_state = extract_paging_state(res);
|
||||
@@ -539,7 +539,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
|
||||
{int32_type->decompose(1)}, {int32_type->decompose(2)}, {int32_type->decompose(1)},
|
||||
}});
|
||||
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
|
||||
res = e.execute_cql("SELECT * FROM tab WHERE c = 2", std::move(qo)).get0();
|
||||
|
||||
@@ -549,7 +549,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
|
||||
});
|
||||
|
||||
{
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()});
|
||||
auto res = e.execute_cql("SELECT * FROM tab WHERE c = 2", std::move(qo)).get0();
|
||||
auto paging_state = extract_paging_state(res);
|
||||
@@ -566,7 +566,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
|
||||
paging_state->get_last_replicas(), paging_state->get_query_read_repair_decision(),
|
||||
paging_state->get_rows_fetched_for_last_partition());
|
||||
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
|
||||
res = e.execute_cql("SELECT * FROM tab WHERE c = 2", std::move(qo)).get0();
|
||||
|
||||
@@ -578,7 +578,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
|
||||
// not to return rows (since no row matches an empty partition key)
|
||||
auto paging_state = make_lw_shared<service::pager::paging_state>(partition_key::make_empty(), std::nullopt,
|
||||
1, utils::make_random_uuid(), service::pager::paging_state::replicas_per_token_range{}, std::nullopt, 1);
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
|
||||
auto res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0();
|
||||
|
||||
@@ -817,7 +817,7 @@ SEASTAR_TEST_CASE(test_local_index_paging) {
|
||||
};
|
||||
|
||||
eventually([&] {
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()});
|
||||
auto res = e.execute_cql("SELECT * FROM tab WHERE p = 1 and v = 1", std::move(qo)).get0();
|
||||
auto paging_state = extract_paging_state(res);
|
||||
@@ -826,7 +826,7 @@ SEASTAR_TEST_CASE(test_local_index_paging) {
|
||||
{int32_type->decompose(1)}, {int32_type->decompose(1)}, {int32_type->decompose(1)}, {int32_type->decompose(1)},
|
||||
}});
|
||||
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
|
||||
res = e.execute_cql("SELECT * FROM tab WHERE p = 1 and v = 1", std::move(qo)).get0();
|
||||
|
||||
@@ -836,7 +836,7 @@ SEASTAR_TEST_CASE(test_local_index_paging) {
|
||||
});
|
||||
|
||||
eventually([&] {
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()});
|
||||
auto res = e.execute_cql("SELECT * FROM tab WHERE p = 1 and c2 = 2", std::move(qo)).get0();
|
||||
auto paging_state = extract_paging_state(res);
|
||||
@@ -845,7 +845,7 @@ SEASTAR_TEST_CASE(test_local_index_paging) {
|
||||
{int32_type->decompose(1)}, {int32_type->decompose(1)}, {int32_type->decompose(2)}, {int32_type->decompose(1)},
|
||||
}});
|
||||
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
|
||||
res = e.execute_cql("SELECT * FROM tab WHERE p = 1 and c2 = 2", std::move(qo)).get0();
|
||||
|
||||
@@ -1173,7 +1173,7 @@ SEASTAR_TEST_CASE(test_indexing_paging_and_aggregation) {
|
||||
}
|
||||
|
||||
eventually([&] {
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{2, nullptr, {}, api::new_timestamp()});
|
||||
auto msg = cquery_nofail(e, "SELECT sum(id) FROM fpa WHERE v = 0;", std::move(qo));
|
||||
// Even though we set up paging, we still expect a single result from an aggregation function.
|
||||
@@ -1188,7 +1188,7 @@ SEASTAR_TEST_CASE(test_indexing_paging_and_aggregation) {
|
||||
{ int32_type->decompose(row_count * row_count / 4 + row_count / 2)},
|
||||
});
|
||||
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{3, nullptr, {}, api::new_timestamp()});
|
||||
msg = cquery_nofail(e, "SELECT avg(id) FROM fpa WHERE v = 1;", std::move(qo));
|
||||
assert_that(msg).is_rows().with_rows({
|
||||
@@ -1206,7 +1206,7 @@ SEASTAR_TEST_CASE(test_indexing_paging_and_aggregation) {
|
||||
cquery_nofail(e, format("INSERT INTO fpa2 (id, c1, c2) VALUES ({}, {}, {})", i + 1, i + 1, i % 2).c_str());
|
||||
}
|
||||
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{2, nullptr, {}, api::new_timestamp()});
|
||||
auto msg = cquery_nofail(e, "SELECT sum(id) FROM fpa2 WHERE c2 = 0;", std::move(qo));
|
||||
// Even though we set up paging, we still expect a single result from an aggregation function
|
||||
@@ -1214,7 +1214,7 @@ SEASTAR_TEST_CASE(test_indexing_paging_and_aggregation) {
|
||||
{ int32_type->decompose(row_count * row_count / 4)},
|
||||
});
|
||||
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{3, nullptr, {}, api::new_timestamp()});
|
||||
msg = cquery_nofail(e, "SELECT avg(id) FROM fpa2 WHERE c2 = 1;", std::move(qo));
|
||||
assert_that(msg).is_rows().with_rows({
|
||||
|
||||
@@ -128,7 +128,7 @@ private:
|
||||
service::client_state client_state;
|
||||
|
||||
core_local_state(auth::service& auth_service)
|
||||
: client_state(service::client_state::external_tag{}, auth_service)
|
||||
: client_state(service::client_state::external_tag{}, auth_service, infinite_timeout_config)
|
||||
{
|
||||
client_state.set_login(auth::authenticated_user(testing_superuser));
|
||||
}
|
||||
@@ -194,7 +194,7 @@ public:
|
||||
db::consistency_level cl = db::consistency_level::ONE) override {
|
||||
|
||||
const auto& so = cql3::query_options::specific_options::DEFAULT;
|
||||
auto options = std::make_unique<cql3::query_options>(cl, infinite_timeout_config,
|
||||
auto options = std::make_unique<cql3::query_options>(cl,
|
||||
std::move(values), cql3::query_options::specific_options{
|
||||
so.page_size,
|
||||
so.state,
|
||||
@@ -231,7 +231,7 @@ public:
|
||||
throw std::runtime_error(format("get_stmt_mutations: not a modification statement: {}", text));
|
||||
}
|
||||
auto& qo = cql3::query_options::DEFAULT;
|
||||
auto timeout = db::timeout_clock::now() + qo.get_timeout_config().write_timeout;
|
||||
auto timeout = db::timeout_clock::now() + qs->get_client_state().get_timeout_config().write_timeout;
|
||||
|
||||
return modif_stmt->get_mutations(local_qp().proxy(), qo, timeout, false, qo.get_timestamp(*qs), *qs)
|
||||
.finally([qs, modif_stmt = std::move(modif_stmt)] {});
|
||||
|
||||
@@ -229,7 +229,7 @@ SEASTAR_TEST_CASE(scan_enormous_table_test) {
|
||||
std::unique_ptr<cql3::query_options> qo;
|
||||
uint64_t fetched_rows_log_counter = 1e7;
|
||||
do {
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
|
||||
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
|
||||
cql3::query_options::specific_options{10000, paging_state, {}, api::new_timestamp()});
|
||||
msg = e.execute_cql("select * from enormous_table;", std::move(qo)).get0();
|
||||
rows_fetched += count_rows_fetched(msg);
|
||||
|
||||
@@ -105,7 +105,6 @@ std::unique_ptr<cql3::query_options> repl_options() {
|
||||
const auto& so = cql3::query_options::specific_options::DEFAULT;
|
||||
auto qo = std::make_unique<cql3::query_options>(
|
||||
db::consistency_level::ONE,
|
||||
infinite_timeout_config,
|
||||
std::vector<cql3::raw_value>{},
|
||||
// Ensure (optional) serial consistency is always specified.
|
||||
cql3::query_options::specific_options{
|
||||
|
||||
@@ -201,9 +201,9 @@ enum class query_order { no, yes };
|
||||
class thrift_handler : public CassandraCobSvIf {
|
||||
distributed<database>& _db;
|
||||
distributed<cql3::query_processor>& _query_processor;
|
||||
::timeout_config _timeout_config;
|
||||
service::client_state _client_state;
|
||||
service::query_state _query_state;
|
||||
::timeout_config _timeout_config;
|
||||
private:
|
||||
template <typename Cob, typename Func>
|
||||
void
|
||||
@@ -220,9 +220,9 @@ public:
|
||||
explicit thrift_handler(distributed<database>& db, distributed<cql3::query_processor>& qp, auth::service& auth_service, ::timeout_config timeout_config)
|
||||
: _db(db)
|
||||
, _query_processor(qp)
|
||||
, _client_state(service::client_state::external_tag{}, auth_service, socket_address(), true)
|
||||
, _query_state(_client_state, /*FIXME: pass real permit*/empty_service_permit())
|
||||
, _timeout_config(timeout_config)
|
||||
, _client_state(service::client_state::external_tag{}, auth_service, _timeout_config, socket_address(), true)
|
||||
, _query_state(_client_state, /*FIXME: pass real permit*/empty_service_permit())
|
||||
{ }
|
||||
|
||||
const sstring& current_keyspace() const {
|
||||
@@ -976,7 +976,7 @@ public:
|
||||
throw make_exception<InvalidRequestException>("Compressed query strings are not supported");
|
||||
}
|
||||
auto& qp = _query_processor.local();
|
||||
auto opts = std::make_unique<cql3::query_options>(qp.get_cql_config(), cl_from_thrift(consistency), _timeout_config, std::nullopt, std::vector<cql3::raw_value_view>(),
|
||||
auto opts = std::make_unique<cql3::query_options>(qp.get_cql_config(), cl_from_thrift(consistency), std::nullopt, std::vector<cql3::raw_value_view>(),
|
||||
false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
|
||||
auto f = qp.execute_direct(query, _query_state, *opts);
|
||||
return f.then([cob = std::move(cob), opts = std::move(opts)](auto&& ret) {
|
||||
@@ -1056,7 +1056,7 @@ public:
|
||||
return cql3::raw_value::make_value(to_bytes(s));
|
||||
});
|
||||
auto& qp = _query_processor.local();
|
||||
auto opts = std::make_unique<cql3::query_options>(qp.get_cql_config(), cl_from_thrift(consistency), _timeout_config, std::nullopt, std::move(bytes_values),
|
||||
auto opts = std::make_unique<cql3::query_options>(qp.get_cql_config(), cl_from_thrift(consistency), std::nullopt, std::move(bytes_values),
|
||||
false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
|
||||
auto f = qp.execute_prepared(std::move(prepared), std::move(cache_key), _query_state, *opts, needs_authorization);
|
||||
return f.then([cob = std::move(cob), opts = std::move(opts)](auto&& ret) {
|
||||
|
||||
@@ -64,9 +64,13 @@ const sstring trace_keyspace_helper::EVENTS("events");
|
||||
const sstring trace_keyspace_helper::NODE_SLOW_QUERY_LOG("node_slow_log");
|
||||
const sstring trace_keyspace_helper::NODE_SLOW_QUERY_LOG_TIME_IDX("node_slow_log_time_idx");
|
||||
|
||||
timeout_config tracing_db_timeout_config {
|
||||
5s, 5s, 5s, 5s, 5s, 5s, 5s,
|
||||
};
|
||||
static service::client_state& tracing_client_state() {
|
||||
static timeout_config tracing_db_timeout_config {
|
||||
5s, 5s, 5s, 5s, 5s, 5s, 5s,
|
||||
};
|
||||
static thread_local service::client_state s(service::client_state::internal_tag{}, tracing_db_timeout_config);
|
||||
return s;
|
||||
}
|
||||
|
||||
struct trace_keyspace_backend_sesssion_state final : public backend_session_state_base {
|
||||
int64_t last_nanos = 0;
|
||||
@@ -76,7 +80,7 @@ struct trace_keyspace_backend_sesssion_state final : public backend_session_stat
|
||||
|
||||
trace_keyspace_helper::trace_keyspace_helper(tracing& tr)
|
||||
: i_tracing_backend_helper(tr)
|
||||
, _dummy_query_state(service::client_state::for_internal_calls(), empty_service_permit())
|
||||
, _dummy_query_state(tracing_client_state(), empty_service_permit())
|
||||
, _sessions(KEYSPACE_NAME, SESSIONS,
|
||||
sprint("CREATE TABLE IF NOT EXISTS %s.%s ("
|
||||
"session_id uuid,"
|
||||
@@ -314,7 +318,7 @@ cql3::query_options trace_keyspace_helper::make_session_mutation_data(const one_
|
||||
};
|
||||
|
||||
return cql3::query_options(cql3::default_cql_config,
|
||||
db::consistency_level::ANY, tracing_db_timeout_config, std::move(names), std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
|
||||
db::consistency_level::ANY, std::move(names), std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
|
||||
}
|
||||
|
||||
cql3::query_options trace_keyspace_helper::make_session_time_idx_mutation_data(const one_session_records& session_records) {
|
||||
@@ -332,7 +336,7 @@ cql3::query_options trace_keyspace_helper::make_session_time_idx_mutation_data(c
|
||||
};
|
||||
|
||||
return cql3::query_options(cql3::default_cql_config,
|
||||
db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
|
||||
db::consistency_level::ANY, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
|
||||
}
|
||||
|
||||
cql3::query_options trace_keyspace_helper::make_slow_query_mutation_data(const one_session_records& session_records, const utils::UUID& start_time_id) {
|
||||
@@ -375,7 +379,7 @@ cql3::query_options trace_keyspace_helper::make_slow_query_mutation_data(const o
|
||||
});
|
||||
|
||||
return cql3::query_options(cql3::default_cql_config,
|
||||
db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
|
||||
db::consistency_level::ANY, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
|
||||
}
|
||||
|
||||
cql3::query_options trace_keyspace_helper::make_slow_query_time_idx_mutation_data(const one_session_records& session_records, const utils::UUID& start_time_id) {
|
||||
@@ -396,7 +400,7 @@ cql3::query_options trace_keyspace_helper::make_slow_query_time_idx_mutation_dat
|
||||
});
|
||||
|
||||
return cql3::query_options(cql3::default_cql_config,
|
||||
db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
|
||||
db::consistency_level::ANY, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
|
||||
}
|
||||
|
||||
std::vector<cql3::raw_value> trace_keyspace_helper::make_event_mutation_data(one_session_records& session_records, const event_record& record) {
|
||||
@@ -432,7 +436,7 @@ future<> trace_keyspace_helper::apply_events_mutation(cql3::query_processor& qp,
|
||||
std::for_each(events_records.begin(), events_records.end(), [&values, all_records = records, this] (event_record& one_event_record) { values.emplace_back(make_event_mutation_data(*all_records, one_event_record)); });
|
||||
|
||||
return do_with(
|
||||
cql3::query_options::make_batch_options(cql3::query_options(cql3::default_cql_config, db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::vector<cql3::raw_value>{}, false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()), std::move(values)),
|
||||
cql3::query_options::make_batch_options(cql3::query_options(cql3::default_cql_config, db::consistency_level::ANY, std::nullopt, std::vector<cql3::raw_value>{}, false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()), std::move(values)),
|
||||
cql3::statements::batch_statement(cql3::statements::batch_statement::type::UNLOGGED, std::move(modifications), cql3::attributes::none(), qp.get_cql_stats()),
|
||||
[this] (auto& batch_options, auto& batch) {
|
||||
return batch.execute(service::get_storage_proxy().local(), _dummy_query_state, batch_options).then([] (shared_ptr<cql_transport::messages::result_message> res) { return now(); });
|
||||
|
||||
@@ -219,10 +219,10 @@ private:
|
||||
options_flag::NAMES_FOR_VALUES
|
||||
>;
|
||||
public:
|
||||
std::unique_ptr<cql3::query_options> read_options(uint8_t version, cql_serialization_format cql_ser_format, const timeout_config& timeouts, const cql3::cql_config& cql_config) {
|
||||
std::unique_ptr<cql3::query_options> read_options(uint8_t version, cql_serialization_format cql_ser_format, const cql3::cql_config& cql_config) {
|
||||
auto consistency = read_consistency();
|
||||
if (version == 1) {
|
||||
return std::make_unique<cql3::query_options>(cql_config, consistency, timeouts, std::nullopt, std::vector<cql3::raw_value_view>{},
|
||||
return std::make_unique<cql3::query_options>(cql_config, consistency, std::nullopt, std::vector<cql3::raw_value_view>{},
|
||||
false, cql3::query_options::specific_options::DEFAULT, cql_ser_format);
|
||||
}
|
||||
|
||||
@@ -270,11 +270,11 @@ public:
|
||||
if (!names.empty()) {
|
||||
onames = std::move(names);
|
||||
}
|
||||
options = std::make_unique<cql3::query_options>(cql_config, consistency, timeouts, std::move(onames), std::move(values), skip_metadata,
|
||||
options = std::make_unique<cql3::query_options>(cql_config, consistency, std::move(onames), std::move(values), skip_metadata,
|
||||
cql3::query_options::specific_options{page_size, std::move(paging_state), serial_consistency, ts},
|
||||
cql_ser_format);
|
||||
} else {
|
||||
options = std::make_unique<cql3::query_options>(cql_config, consistency, timeouts, std::nullopt, std::move(values), skip_metadata,
|
||||
options = std::make_unique<cql3::query_options>(cql_config, consistency, std::nullopt, std::move(values), skip_metadata,
|
||||
cql3::query_options::specific_options::DEFAULT, cql_ser_format);
|
||||
}
|
||||
|
||||
|
||||
@@ -608,7 +608,7 @@ cql_server::connection::connection(cql_server& server, socket_address server_add
|
||||
, _fd(std::move(fd))
|
||||
, _read_buf(_fd.input())
|
||||
, _write_buf(_fd.output())
|
||||
, _client_state(service::client_state::external_tag{}, server._auth_service, addr)
|
||||
, _client_state(service::client_state::external_tag{}, server._auth_service, server.timeout_config(), addr)
|
||||
{
|
||||
++_server._total_connections;
|
||||
++_server._current_connections;
|
||||
@@ -943,7 +943,7 @@ cql_server::connection::process_on_shard(unsigned shard, uint16_t stream, fragme
|
||||
(bytes_ostream& linearization_buffer, service::client_state& client_state) mutable {
|
||||
request_reader in(is, linearization_buffer);
|
||||
return process_fn(client_state, server._query_processor, in, stream, _version, _cql_serialization_format,
|
||||
server.timeout_config(), /* FIXME */empty_service_permit(), std::move(trace_state), false).then([] (auto msg) {
|
||||
/* FIXME */empty_service_permit(), std::move(trace_state), false).then([] (auto msg) {
|
||||
// result here has to be foreign ptr
|
||||
return std::get<foreign_ptr<std::unique_ptr<cql_server::response>>>(std::move(msg));
|
||||
});
|
||||
@@ -958,7 +958,7 @@ cql_server::connection::process(uint16_t stream, request_reader in, service::cli
|
||||
fragmented_temporary_buffer::istream is = in.get_stream();
|
||||
|
||||
return process_fn(client_state, _server._query_processor, in, stream,
|
||||
_version, _cql_serialization_format, _server.timeout_config(), permit, trace_state, true)
|
||||
_version, _cql_serialization_format, permit, trace_state, true)
|
||||
.then([stream, &client_state, this, is, permit, process_fn, trace_state]
|
||||
(std::variant<foreign_ptr<std::unique_ptr<cql_server::response>>, unsigned> msg) mutable {
|
||||
unsigned* shard = std::get_if<unsigned>(&msg);
|
||||
@@ -972,12 +972,11 @@ cql_server::connection::process(uint16_t stream, request_reader in, service::cli
|
||||
static future<std::variant<foreign_ptr<std::unique_ptr<cql_server::response>>, unsigned>>
|
||||
process_query_internal(service::client_state& client_state, distributed<cql3::query_processor>& qp, request_reader in,
|
||||
uint16_t stream, cql_protocol_version_type version, cql_serialization_format serialization_format,
|
||||
const ::timeout_config& timeout_config, service_permit permit, tracing::trace_state_ptr trace_state,
|
||||
bool init_trace) {
|
||||
service_permit permit, tracing::trace_state_ptr trace_state, bool init_trace) {
|
||||
auto query = in.read_long_string_view();
|
||||
auto q_state = std::make_unique<cql_query_state>(client_state, trace_state, std::move(permit));
|
||||
auto& query_state = q_state->query_state;
|
||||
q_state->options = in.read_options(version, serialization_format, timeout_config, qp.local().get_cql_config());
|
||||
q_state->options = in.read_options(version, serialization_format, qp.local().get_cql_config());
|
||||
auto& options = *q_state->options;
|
||||
auto skip_metadata = options.skip_metadata();
|
||||
|
||||
@@ -1040,8 +1039,7 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_pr
|
||||
static future<std::variant<foreign_ptr<std::unique_ptr<cql_server::response>>, unsigned>>
|
||||
process_execute_internal(service::client_state& client_state, distributed<cql3::query_processor>& qp, request_reader in,
|
||||
uint16_t stream, cql_protocol_version_type version, cql_serialization_format serialization_format,
|
||||
const ::timeout_config& timeout_config, service_permit permit,
|
||||
tracing::trace_state_ptr trace_state, bool init_trace) {
|
||||
service_permit permit, tracing::trace_state_ptr trace_state, bool init_trace) {
|
||||
cql3::prepared_cache_key_type cache_key(in.read_short_bytes());
|
||||
auto& id = cql3::prepared_cache_key_type::cql_id(cache_key);
|
||||
bool needs_authorization = false;
|
||||
@@ -1064,10 +1062,10 @@ process_execute_internal(service::client_state& client_state, distributed<cql3::
|
||||
std::vector<cql3::raw_value_view> values;
|
||||
in.read_value_view_list(version, values);
|
||||
auto consistency = in.read_consistency();
|
||||
q_state->options = std::make_unique<cql3::query_options>(qp.local().get_cql_config(), consistency, timeout_config, std::nullopt, values, false,
|
||||
q_state->options = std::make_unique<cql3::query_options>(qp.local().get_cql_config(), consistency, std::nullopt, values, false,
|
||||
cql3::query_options::specific_options::DEFAULT, serialization_format);
|
||||
} else {
|
||||
q_state->options = in.read_options(version, serialization_format, timeout_config, qp.local().get_cql_config());
|
||||
q_state->options = in.read_options(version, serialization_format, qp.local().get_cql_config());
|
||||
}
|
||||
auto& options = *q_state->options;
|
||||
auto skip_metadata = options.skip_metadata();
|
||||
@@ -1120,8 +1118,7 @@ future<foreign_ptr<std::unique_ptr<cql_server::response>>> cql_server::connectio
|
||||
static future<std::variant<foreign_ptr<std::unique_ptr<cql_server::response>>, unsigned>>
|
||||
process_batch_internal(service::client_state& client_state, distributed<cql3::query_processor>& qp, request_reader in,
|
||||
uint16_t stream, cql_protocol_version_type version, cql_serialization_format serialization_format,
|
||||
const ::timeout_config& timeout_config, service_permit permit,
|
||||
tracing::trace_state_ptr trace_state, bool init_trace) {
|
||||
service_permit permit, tracing::trace_state_ptr trace_state, bool init_trace) {
|
||||
if (version == 1) {
|
||||
throw exceptions::protocol_exception("BATCH messages are not support in version 1 of the protocol");
|
||||
}
|
||||
@@ -1210,7 +1207,7 @@ process_batch_internal(service::client_state& client_state, distributed<cql3::qu
|
||||
auto& query_state = q_state->query_state;
|
||||
// #563. CQL v2 encodes query_options in v1 format for batch requests.
|
||||
q_state->options = std::make_unique<cql3::query_options>(cql3::query_options::make_batch_options(std::move(*in.read_options(version < 3 ? 1 : version, serialization_format,
|
||||
timeout_config, qp.local().get_cql_config())), std::move(values)));
|
||||
qp.local().get_cql_config())), std::move(values)));
|
||||
auto& options = *q_state->options;
|
||||
|
||||
if (init_trace) {
|
||||
|
||||
Reference in New Issue
Block a user