cql3: statements: introduce parallelized_select_statement

Detect whether a statement is a count(*) query in prepare time. If so,
instantiate a new `select_statement` subclass -
`parallelized_select_statement`. This subclass has a different execution
logic, that enables it to distribute count(*) queries across a cluster.
Also, a new counter was added - `select_parallelized` that counts the
number of parallelized aggregation SELECT query executions.
This commit is contained in:
Michał Sala
2021-11-07 18:31:44 +01:00
parent 66a93d3000
commit f344bd0aaa
3 changed files with 204 additions and 0 deletions

View File

@@ -403,6 +403,11 @@ query_processor::query_processor(service::storage_proxy& proxy, service::forward
_cql_stats.select_partition_range_scan_no_bypass_cache,
sm::description("Counts the number of SELECT query executions requiring partition range scan without BYPASS CACHE option.")),
sm::make_derive(
"select_parallelized",
_cql_stats.select_parallelized,
sm::description("Counts the number of parallelized aggregation SELECT query executions.")),
sm::make_derive(
"authorized_prepared_statements_cache_evictions",
[] { return authorized_prepared_statements_cache::shard_stats().authorized_prepared_statements_cache_evictions; },

View File

@@ -12,6 +12,7 @@
*/
#include "cql3/statements/select_statement.hh"
#include "cql3/expr/expression.hh"
#include "cql3/statements/raw/select_statement.hh"
#include "cql3/query_processor.hh"
#include "transport/messages/result_message.hh"
@@ -38,6 +39,7 @@
#include "data_dictionary/data_dictionary.hh"
#include "test/lib/select_statement_utils.hh"
#include <boost/algorithm/cxx11/any_of.hpp>
#include "gms/feature_service.hh"
bool is_internal_keyspace(std::string_view name);
@@ -1289,6 +1291,175 @@ indexed_table_select_statement::find_index_clustering_rows(query_processor& qp,
});
}
class parallelized_select_statement : public select_statement {
public:
static ::shared_ptr<cql3::statements::select_statement> prepare(
schema_ptr schema,
uint32_t bound_terms,
lw_shared_ptr<const parameters> parameters,
::shared_ptr<selection::selection> selection,
::shared_ptr<restrictions::statement_restrictions> restrictions,
::shared_ptr<std::vector<size_t>> group_by_cell_indices,
bool is_reversed,
ordering_comparator_type ordering_comparator,
std::optional<expr::expression> limit,
std::optional<expr::expression> per_partition_limit,
cql_stats& stats,
std::unique_ptr<cql3::attributes> attrs
);
parallelized_select_statement(
schema_ptr schema,
uint32_t bound_terms,
lw_shared_ptr<const parameters> parameters,
::shared_ptr<selection::selection> selection,
::shared_ptr<restrictions::statement_restrictions> restrictions,
::shared_ptr<std::vector<size_t>> group_by_cell_indices,
bool is_reversed,
ordering_comparator_type ordering_comparator,
std::optional<expr::expression> limit,
std::optional<expr::expression> per_partition_limit,
cql_stats& stats,
std::unique_ptr<cql3::attributes> attrs
);
private:
virtual future<::shared_ptr<cql_transport::messages::result_message>> do_execute(
query_processor& qp,
service::query_state& state,
const query_options& options
) const override;
};
::shared_ptr<cql3::statements::select_statement> parallelized_select_statement::prepare(
schema_ptr schema,
uint32_t bound_terms,
lw_shared_ptr<const select_statement::parameters> parameters,
::shared_ptr<selection::selection> selection,
::shared_ptr<restrictions::statement_restrictions> restrictions,
::shared_ptr<std::vector<size_t>> group_by_cell_indices,
bool is_reversed,
parallelized_select_statement::ordering_comparator_type ordering_comparator,
std::optional<expr::expression> limit,
std::optional<expr::expression> per_partition_limit,
cql_stats& stats,
std::unique_ptr<cql3::attributes> attrs
) {
return ::make_shared<cql3::statements::parallelized_select_statement>(
schema,
bound_terms,
std::move(parameters),
std::move(selection),
std::move(restrictions),
std::move(group_by_cell_indices),
is_reversed,
std::move(ordering_comparator),
std::move(limit),
std::move(per_partition_limit),
stats,
std::move(attrs)
);
}
parallelized_select_statement::parallelized_select_statement(
schema_ptr schema,
uint32_t bound_terms,
lw_shared_ptr<const parallelized_select_statement::parameters> parameters,
::shared_ptr<selection::selection> selection,
::shared_ptr<restrictions::statement_restrictions> restrictions,
::shared_ptr<std::vector<size_t>> group_by_cell_indices,
bool is_reversed,
parallelized_select_statement::ordering_comparator_type ordering_comparator,
std::optional<expr::expression> limit,
std::optional<expr::expression> per_partition_limit,
cql_stats& stats,
std::unique_ptr<cql3::attributes> attrs
) : select_statement(
schema,
bound_terms,
std::move(parameters),
std::move(selection),
std::move(restrictions),
std::move(group_by_cell_indices),
is_reversed,
std::move(ordering_comparator),
std::move(limit),
std::move(per_partition_limit),
stats,
std::move(attrs)
) {
}
future<::shared_ptr<cql_transport::messages::result_message>>
parallelized_select_statement::do_execute(
query_processor& qp,
service::query_state& state,
const query_options& options
) const {
tracing::add_table_name(state.get_trace_state(), keyspace(), column_family());
auto cl = options.get_consistency();
validate_for_read(cl);
auto now = gc_clock::now();
const source_selector src_sel = state.get_client_state().is_internal()
? source_selector::INTERNAL : source_selector::USER;
++_stats.query_cnt(src_sel, _ks_sel, cond_selector::NO_CONDITIONS, statement_type::SELECT);
_stats.select_bypass_caches += _parameters->bypass_cache();
_stats.select_allow_filtering += _parameters->allow_filtering();
_stats.select_partition_range_scan += _range_scan;
_stats.select_partition_range_scan_no_bypass_cache += _range_scan_no_bypass_cache;
_stats.select_parallelized += 1;
auto slice = make_partition_slice(options);
auto command = ::make_lw_shared<query::read_command>(
_schema->id(),
_schema->version(),
std::move(slice),
qp.proxy().get_max_result_size(slice),
query::row_limit(query::max_rows),
query::partition_limit(query::max_partitions),
now,
tracing::make_trace_info(state.get_trace_state()),
utils::UUID(),
query::is_first_page::no,
options.get_timestamp(state)
);
auto key_ranges = _restrictions->get_partition_key_ranges(options);
if (db::is_serial_consistency(options.get_consistency())) {
throw exceptions::invalid_request_exception(
"SERIAL/LOCAL_SERIAL consistency may only be requested for one partition at a time"
);
}
command->slice.options.set<query::partition_slice::option::allow_short_read>();
auto timeout_duration = get_timeout(state.get_client_state(), options);
auto timeout = db::timeout_clock::now() + timeout_duration;
query::forward_request req = {
.reduction_types = {query::forward_request::reduction_type::count},
.cmd = *command,
.pr = std::move(key_ranges),
.cl = options.get_consistency(),
.timeout = timeout,
};
// dispatch execution of this statement to other nodes
return qp.forwarder().dispatch(req).then([this] (query::forward_result res) {
auto meta = make_shared<metadata>(*_selection->get_result_metadata());
auto rs = std::make_unique<result_set>(std::move(meta));
rs->add_column_value(*res.query_results[0]);
update_stats_rows_read(rs->size());
return shared_ptr<cql_transport::messages::result_message>(
make_shared<cql_transport::messages::result_message::rows>(result(std::move(rs)))
);
});
}
namespace raw {
static void validate_attrs(const cql3::attributes::raw& attrs) {
@@ -1396,6 +1567,18 @@ std::unique_ptr<prepared_statement> select_statement::prepare(data_dictionary::d
::shared_ptr<cql3::statements::select_statement> stmt;
auto prepared_attrs = _attrs->prepare(db, keyspace(), column_family());
prepared_attrs->fill_prepare_context(ctx);
// Used to determine if an execution of this statement can be parallelized
// using `forward_service`.
auto can_be_forwarded = [&] {
return selection->is_aggregate() // Aggregation only
&& selection->is_count() // Only count(*) selection is supported.
&& !restrictions->need_filtering() // No filtering
&& group_by_cell_indices->empty() // No GROUP BY
// All potential intermediate coordinators must support forwarding
&& db.features().cluster_supports_parallelized_aggregation();
};
if (restrictions->uses_secondary_indexing()) {
stmt = indexed_table_select_statement::prepare(
db,
@@ -1411,6 +1594,21 @@ std::unique_ptr<prepared_statement> select_statement::prepare(data_dictionary::d
prepare_limit(db, ctx, _per_partition_limit),
stats,
std::move(prepared_attrs));
} else if (can_be_forwarded()) {
stmt = parallelized_select_statement::prepare(
schema,
ctx.bound_variables_size(),
_parameters,
std::move(selection),
std::move(restrictions),
std::move(group_by_cell_indices),
is_reversed_,
std::move(ordering_comparator),
prepare_limit(db, ctx, _limit),
prepare_limit(db, ctx, _per_partition_limit),
stats,
std::move(prepared_attrs)
);
} else {
stmt = ::make_shared<cql3::statements::primary_key_select_statement>(
schema,

View File

@@ -76,6 +76,7 @@ struct cql_stats {
int64_t select_allow_filtering = 0;
int64_t select_partition_range_scan = 0;
int64_t select_partition_range_scan_no_bypass_cache = 0;
int64_t select_parallelized = 0;
private:
uint64_t _unpaged_select_queries[(size_t)ks_selector::SIZE] = {0ul};