cql3: Move method implementations to .cc
This commit is contained in:
@@ -24,11 +24,25 @@
|
||||
|
||||
#include "cql3/selection/selection.hh"
|
||||
#include "cql3/selection/selector_factories.hh"
|
||||
#include "cql3/result_set.hh"
|
||||
|
||||
namespace cql3 {
|
||||
|
||||
namespace selection {
|
||||
|
||||
selection::selection(schema_ptr schema,
|
||||
std::vector<const column_definition*> columns,
|
||||
std::vector<::shared_ptr<column_specification>> metadata_,
|
||||
bool collect_timestamps,
|
||||
bool collect_TTLs)
|
||||
: _schema(std::move(schema))
|
||||
, _columns(std::move(columns))
|
||||
, _metadata(::make_shared<metadata>(std::move(metadata_)))
|
||||
, _collect_timestamps(collect_timestamps)
|
||||
, _collect_TTLs(collect_TTLs)
|
||||
, _contains_static_columns(std::any_of(_columns.begin(), _columns.end(), std::mem_fn(&column_definition::is_static)))
|
||||
{ }
|
||||
|
||||
query::partition_slice::option_set selection::get_query_options() {
|
||||
query::partition_slice::option_set opts;
|
||||
|
||||
@@ -227,6 +241,88 @@ result_set_builder::result_set_builder(selection& s, db_clock::time_point now, s
|
||||
}
|
||||
}
|
||||
|
||||
void result_set_builder::add_empty() {
|
||||
current->emplace_back();
|
||||
if (!_timestamps.empty()) {
|
||||
_timestamps[current->size() - 1] = api::min_timestamp;
|
||||
}
|
||||
if (!_ttls.empty()) {
|
||||
_ttls[current->size() - 1] = -1;
|
||||
}
|
||||
}
|
||||
|
||||
void result_set_builder::add(bytes_opt value) {
|
||||
current->emplace_back(std::move(value));
|
||||
}
|
||||
|
||||
void result_set_builder::add(const column_definition& def, const query::result_atomic_cell_view& c) {
|
||||
current->emplace_back(get_value(def.type, c));
|
||||
if (!_timestamps.empty()) {
|
||||
_timestamps[current->size() - 1] = c.timestamp();
|
||||
}
|
||||
if (!_ttls.empty()) {
|
||||
gc_clock::duration ttl(-1);
|
||||
auto maybe_ttl = c.ttl();
|
||||
if (maybe_ttl) {
|
||||
ttl = *maybe_ttl - to_gc_clock(_now);
|
||||
}
|
||||
_ttls[current->size() - 1] = ttl.count();
|
||||
}
|
||||
}
|
||||
|
||||
void result_set_builder::add(const column_definition& def, collection_mutation::view c) {
|
||||
auto&& ctype = static_cast<collection_type_impl*>(def.type.get());
|
||||
current->emplace_back(ctype->to_value(c, _serialization_format));
|
||||
// timestamps, ttls meaningless for collections
|
||||
}
|
||||
|
||||
void result_set_builder::new_row() {
|
||||
if (current) {
|
||||
_selectors->add_input_row(_serialization_format, *this);
|
||||
if (!_selectors->is_aggregate()) {
|
||||
_result_set->add_row(_selectors->get_output_row(_serialization_format));
|
||||
_selectors->reset();
|
||||
}
|
||||
current->clear();
|
||||
} else {
|
||||
// FIXME: we use optional<> here because we don't have an end_row() signal
|
||||
// instead, !current means that new_row has never been called, so this
|
||||
// call to new_row() does not end a previous row.
|
||||
current.emplace();
|
||||
}
|
||||
}
|
||||
|
||||
std::unique_ptr<result_set> result_set_builder::build() {
|
||||
if (current) {
|
||||
_selectors->add_input_row(_serialization_format, *this);
|
||||
_result_set->add_row(_selectors->get_output_row(_serialization_format));
|
||||
_selectors->reset();
|
||||
current = std::experimental::nullopt;
|
||||
}
|
||||
if (_result_set->empty() && _selectors->is_aggregate()) {
|
||||
_result_set->add_row(_selectors->get_output_row(_serialization_format));
|
||||
}
|
||||
return std::move(_result_set);
|
||||
}
|
||||
|
||||
api::timestamp_type result_set_builder::timestamp_of(size_t idx) {
|
||||
return _timestamps[idx];
|
||||
}
|
||||
|
||||
int32_t result_set_builder::ttl_of(size_t idx) {
|
||||
return _ttls[idx];
|
||||
}
|
||||
|
||||
bytes_opt result_set_builder::get_value(data_type t, query::result_atomic_cell_view c) {
|
||||
if (t->is_counter()) {
|
||||
fail(unimplemented::cause::COUNTERS);
|
||||
#if 0
|
||||
ByteBufferUtil.bytes(CounterContext.instance().total(c.value()))
|
||||
#endif
|
||||
}
|
||||
return {to_bytes(c.value())};
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@@ -29,13 +29,15 @@
|
||||
#include "query-result-reader.hh"
|
||||
#include "cql3/column_specification.hh"
|
||||
#include "exceptions/exceptions.hh"
|
||||
#include "cql3/result_set.hh"
|
||||
#include "cql3/selection/raw_selector.hh"
|
||||
#include "cql3/selection/selector_factories.hh"
|
||||
#include "unimplemented.hh"
|
||||
|
||||
namespace cql3 {
|
||||
|
||||
class result_set;
|
||||
class metadata;
|
||||
|
||||
namespace selection {
|
||||
|
||||
class selectors {
|
||||
@@ -66,15 +68,12 @@ private:
|
||||
const bool _collect_TTLs;
|
||||
const bool _contains_static_columns;
|
||||
protected:
|
||||
selection(schema_ptr schema, std::vector<const column_definition*> columns, std::vector<::shared_ptr<column_specification>> metadata_,
|
||||
bool collect_timestamps, bool collect_TTLs)
|
||||
: _schema(std::move(schema))
|
||||
, _columns(std::move(columns))
|
||||
, _metadata(::make_shared<metadata>(std::move(metadata_)))
|
||||
, _collect_timestamps(collect_timestamps)
|
||||
, _collect_TTLs(collect_TTLs)
|
||||
, _contains_static_columns(std::any_of(_columns.begin(), _columns.end(), std::mem_fn(&column_definition::is_static)))
|
||||
{ }
|
||||
selection(schema_ptr schema,
|
||||
std::vector<const column_definition*> columns,
|
||||
std::vector<::shared_ptr<column_specification>> metadata_,
|
||||
bool collect_timestamps,
|
||||
bool collect_TTLs);
|
||||
|
||||
virtual ~selection() {}
|
||||
public:
|
||||
// Overriden by SimpleSelection when appropriate.
|
||||
@@ -223,88 +222,16 @@ private:
|
||||
serialization_format _serialization_format;
|
||||
public:
|
||||
result_set_builder(selection& s, db_clock::time_point now, serialization_format sf);
|
||||
|
||||
void add_empty() {
|
||||
current->emplace_back();
|
||||
if (!_timestamps.empty()) {
|
||||
_timestamps[current->size() - 1] = api::min_timestamp;
|
||||
}
|
||||
if (!_ttls.empty()) {
|
||||
_ttls[current->size() - 1] = -1;
|
||||
}
|
||||
}
|
||||
|
||||
void add(bytes_opt value) {
|
||||
current->emplace_back(std::move(value));
|
||||
}
|
||||
|
||||
void add(const column_definition& def, const query::result_atomic_cell_view& c) {
|
||||
current->emplace_back(get_value(def.type, c));
|
||||
if (!_timestamps.empty()) {
|
||||
_timestamps[current->size() - 1] = c.timestamp();
|
||||
}
|
||||
if (!_ttls.empty()) {
|
||||
gc_clock::duration ttl(-1);
|
||||
auto maybe_ttl = c.ttl();
|
||||
if (maybe_ttl) {
|
||||
ttl = *maybe_ttl - to_gc_clock(_now);
|
||||
}
|
||||
_ttls[current->size() - 1] = ttl.count();
|
||||
}
|
||||
}
|
||||
|
||||
void add(const column_definition& def, collection_mutation::view c) {
|
||||
auto&& ctype = static_cast<collection_type_impl*>(def.type.get());
|
||||
current->emplace_back(ctype->to_value(c, _serialization_format));
|
||||
// timestamps, ttls meaningless for collections
|
||||
}
|
||||
|
||||
void new_row() {
|
||||
if (current) {
|
||||
_selectors->add_input_row(_serialization_format, *this);
|
||||
if (!_selectors->is_aggregate()) {
|
||||
_result_set->add_row(_selectors->get_output_row(_serialization_format));
|
||||
_selectors->reset();
|
||||
}
|
||||
current->clear();
|
||||
} else {
|
||||
// FIXME: we use optional<> here because we don't have an end_row() signal
|
||||
// instead, !current means that new_row has never been called, so this
|
||||
// call to new_row() does not end a previous row.
|
||||
current.emplace();
|
||||
}
|
||||
}
|
||||
|
||||
std::unique_ptr<result_set> build() {
|
||||
if (current) {
|
||||
_selectors->add_input_row(_serialization_format, *this);
|
||||
_result_set->add_row(_selectors->get_output_row(_serialization_format));
|
||||
_selectors->reset();
|
||||
current = std::experimental::nullopt;
|
||||
}
|
||||
if (_result_set->empty() && _selectors->is_aggregate()) {
|
||||
_result_set->add_row(_selectors->get_output_row(_serialization_format));
|
||||
}
|
||||
return std::move(_result_set);
|
||||
}
|
||||
|
||||
api::timestamp_type timestamp_of(size_t idx) {
|
||||
return _timestamps[idx];
|
||||
}
|
||||
|
||||
int32_t ttl_of(size_t idx) {
|
||||
return _ttls[idx];
|
||||
}
|
||||
void add_empty();
|
||||
void add(bytes_opt value);
|
||||
void add(const column_definition& def, const query::result_atomic_cell_view& c);
|
||||
void add(const column_definition& def, collection_mutation::view c);
|
||||
void new_row();
|
||||
std::unique_ptr<result_set> build();
|
||||
api::timestamp_type timestamp_of(size_t idx);
|
||||
int32_t ttl_of(size_t idx);
|
||||
private:
|
||||
bytes_opt get_value(data_type t, query::result_atomic_cell_view c) {
|
||||
if (t->is_counter()) {
|
||||
fail(unimplemented::cause::COUNTERS);
|
||||
#if 0
|
||||
ByteBufferUtil.bytes(CounterContext.instance().total(c.value()))
|
||||
#endif
|
||||
}
|
||||
return {to_bytes(c.value())};
|
||||
}
|
||||
bytes_opt get_value(data_type t, query::result_atomic_cell_view c);
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -35,6 +35,113 @@ namespace statements {
|
||||
|
||||
const shared_ptr<select_statement::parameters> select_statement::_default_parameters = ::make_shared<select_statement::parameters>();
|
||||
|
||||
select_statement::select_statement(schema_ptr schema,
|
||||
uint32_t bound_terms,
|
||||
::shared_ptr<parameters> parameters,
|
||||
::shared_ptr<selection::selection> selection,
|
||||
::shared_ptr<restrictions::statement_restrictions> restrictions,
|
||||
bool is_reversed,
|
||||
ordering_comparator_type ordering_comparator,
|
||||
::shared_ptr<term> limit)
|
||||
: _schema(schema)
|
||||
, _bound_terms(bound_terms)
|
||||
, _parameters(std::move(parameters))
|
||||
, _selection(std::move(selection))
|
||||
, _restrictions(std::move(restrictions))
|
||||
, _is_reversed(is_reversed)
|
||||
, _limit(std::move(limit))
|
||||
, _ordering_comparator(std::move(ordering_comparator))
|
||||
{
|
||||
_opts = _selection->get_query_options();
|
||||
}
|
||||
|
||||
bool select_statement::uses_function(const sstring& ks_name, const sstring& function_name) const {
|
||||
return _selection->uses_function(ks_name, function_name)
|
||||
|| _restrictions->uses_function(ks_name, function_name)
|
||||
|| (_limit && _limit->uses_function(ks_name, function_name));
|
||||
}
|
||||
|
||||
::shared_ptr<select_statement>
|
||||
select_statement::for_selection(schema_ptr schema, ::shared_ptr<selection::selection> selection) {
|
||||
return ::make_shared<select_statement>(schema,
|
||||
0,
|
||||
_default_parameters,
|
||||
selection,
|
||||
::make_shared<restrictions::statement_restrictions>(schema),
|
||||
false,
|
||||
ordering_comparator_type{},
|
||||
::shared_ptr<term>{});
|
||||
}
|
||||
|
||||
uint32_t select_statement::get_bound_terms() {
|
||||
return _bound_terms;
|
||||
}
|
||||
|
||||
void select_statement::check_access(const service::client_state& state) {
|
||||
warn(unimplemented::cause::PERMISSIONS);
|
||||
#if 0
|
||||
state.hasColumnFamilyAccess(keyspace(), columnFamily(), Permission.SELECT);
|
||||
#endif
|
||||
}
|
||||
|
||||
void select_statement::validate(const service::client_state& state) {
|
||||
// Nothing to do, all validation has been done by raw_statemet::prepare()
|
||||
}
|
||||
|
||||
query::partition_slice
|
||||
select_statement::make_partition_slice(const query_options& options) {
|
||||
std::vector<column_id> static_columns;
|
||||
std::vector<column_id> regular_columns;
|
||||
|
||||
if (_selection->contains_static_columns()) {
|
||||
static_columns.reserve(_selection->get_column_count());
|
||||
}
|
||||
|
||||
regular_columns.reserve(_selection->get_column_count());
|
||||
|
||||
for (auto&& col : _selection->get_columns()) {
|
||||
if (col->is_static()) {
|
||||
static_columns.push_back(col->id);
|
||||
} else if (col->is_regular()) {
|
||||
regular_columns.push_back(col->id);
|
||||
}
|
||||
}
|
||||
|
||||
if (_parameters->is_distinct()) {
|
||||
return query::partition_slice({}, std::move(static_columns), {}, _opts);
|
||||
}
|
||||
|
||||
return query::partition_slice(_restrictions->get_clustering_bounds(options),
|
||||
std::move(static_columns), std::move(regular_columns), _opts);
|
||||
}
|
||||
|
||||
int32_t select_statement::get_limit(const query_options& options) const {
|
||||
if (!_limit) {
|
||||
return std::numeric_limits<int32_t>::max();
|
||||
}
|
||||
|
||||
auto val = _limit->bind_and_get(options);
|
||||
if (!val) {
|
||||
throw exceptions::invalid_request_exception("Invalid null value of limit");
|
||||
}
|
||||
|
||||
try {
|
||||
int32_type->validate(*val);
|
||||
auto l = boost::any_cast<int32_t>(int32_type->deserialize(*val));
|
||||
if (l <= 0) {
|
||||
throw exceptions::invalid_request_exception("LIMIT must be strictly positive");
|
||||
}
|
||||
return l;
|
||||
} catch (const marshal_exception& e) {
|
||||
throw exceptions::invalid_request_exception("Invalid limit value");
|
||||
}
|
||||
}
|
||||
|
||||
bool select_statement::needs_post_query_ordering() const {
|
||||
// We need post-query ordering only for queries with IN on the partition key and an ORDER BY.
|
||||
return _restrictions->key_is_in_relation() && !_parameters->orderings().empty();
|
||||
}
|
||||
|
||||
future<shared_ptr<transport::messages::result_message>>
|
||||
select_statement::execute(service::storage_proxy& proxy, service::query_state& state, const query_options& options) {
|
||||
auto cl = options.get_consistency();
|
||||
@@ -217,5 +324,251 @@ select_statement::process_results(foreign_ptr<lw_shared_ptr<query::result>> resu
|
||||
return ::make_shared<transport::messages::result_message::rows>(std::move(rs));
|
||||
}
|
||||
|
||||
::shared_ptr<parsed_statement::prepared>
|
||||
select_statement::raw_statement::prepare(database& db) {
|
||||
schema_ptr schema = validation::validate_column_family(db, keyspace(), column_family());
|
||||
auto bound_names = get_bound_variables();
|
||||
|
||||
auto selection = _select_clause.empty()
|
||||
? selection::selection::wildcard(schema)
|
||||
: selection::selection::from_selectors(schema, _select_clause);
|
||||
|
||||
auto restrictions = prepare_restrictions(schema, bound_names, selection);
|
||||
|
||||
if (_parameters->is_distinct()) {
|
||||
validate_distinct_selection(schema, selection, restrictions);
|
||||
}
|
||||
|
||||
select_statement::ordering_comparator_type ordering_comparator;
|
||||
bool is_reversed_ = false;
|
||||
|
||||
if (!_parameters->orderings().empty()) {
|
||||
verify_ordering_is_allowed(restrictions);
|
||||
ordering_comparator = get_ordering_comparator(schema, selection, restrictions);
|
||||
is_reversed_ = is_reversed(schema);
|
||||
}
|
||||
|
||||
if (is_reversed_) {
|
||||
restrictions->reverse();
|
||||
}
|
||||
|
||||
check_needs_filtering(restrictions);
|
||||
|
||||
auto stmt = ::make_shared<select_statement>(schema,
|
||||
bound_names->size(),
|
||||
_parameters,
|
||||
std::move(selection),
|
||||
std::move(restrictions),
|
||||
is_reversed_,
|
||||
std::move(ordering_comparator),
|
||||
prepare_limit(bound_names));
|
||||
|
||||
return ::make_shared<parsed_statement::prepared>(std::move(stmt), std::move(*bound_names));
|
||||
}
|
||||
|
||||
::shared_ptr<restrictions::statement_restrictions>
|
||||
select_statement::raw_statement::prepare_restrictions(schema_ptr schema,
|
||||
::shared_ptr<variable_specifications> bound_names,
|
||||
::shared_ptr<selection::selection> selection)
|
||||
{
|
||||
try {
|
||||
return ::make_shared<restrictions::statement_restrictions>(schema, std::move(_where_clause), bound_names,
|
||||
selection->contains_only_static_columns(), selection->contains_a_collection());
|
||||
} catch (const exceptions::unrecognized_entity_exception& e) {
|
||||
if (contains_alias(e.entity)) {
|
||||
throw exceptions::invalid_request_exception(sprint("Aliases aren't allowed in the where clause ('%s')", e.relation->to_string()));
|
||||
}
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
/** Returns a ::shared_ptr<term> for the limit or null if no limit is set */
|
||||
::shared_ptr<term>
|
||||
select_statement::raw_statement::prepare_limit(::shared_ptr<variable_specifications> bound_names) {
|
||||
if (!_limit) {
|
||||
return {};
|
||||
}
|
||||
|
||||
auto prep_limit = _limit->prepare(keyspace(), limit_receiver());
|
||||
prep_limit->collect_marker_specification(bound_names);
|
||||
return prep_limit;
|
||||
}
|
||||
|
||||
void select_statement::raw_statement::verify_ordering_is_allowed(
|
||||
::shared_ptr<restrictions::statement_restrictions> restrictions)
|
||||
{
|
||||
if (restrictions->uses_secondary_indexing()) {
|
||||
throw exceptions::invalid_request_exception("ORDER BY with 2ndary indexes is not supported.");
|
||||
}
|
||||
if (restrictions->is_key_range()) {
|
||||
throw exceptions::invalid_request_exception("ORDER BY is only supported when the partition key is restricted by an EQ or an IN.");
|
||||
}
|
||||
}
|
||||
|
||||
void select_statement::raw_statement::validate_distinct_selection(schema_ptr schema,
|
||||
::shared_ptr<selection::selection> selection,
|
||||
::shared_ptr<restrictions::statement_restrictions> restrictions)
|
||||
{
|
||||
for (auto&& def : selection->get_columns()) {
|
||||
if (!def->is_partition_key() && !def->is_static()) {
|
||||
throw exceptions::invalid_request_exception(sprint(
|
||||
"SELECT DISTINCT queries must only request partition key columns and/or static columns (not %s)",
|
||||
def->name_as_text()));
|
||||
}
|
||||
}
|
||||
|
||||
// If it's a key range, we require that all partition key columns are selected so we don't have to bother
|
||||
// with post-query grouping.
|
||||
if (!restrictions->is_key_range()) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (auto&& def : schema->partition_key_columns()) {
|
||||
if (!selection->has_column(def)) {
|
||||
throw exceptions::invalid_request_exception(sprint(
|
||||
"SELECT DISTINCT queries must request all the partition key columns (missing %s)", def.name_as_text()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void select_statement::raw_statement::handle_unrecognized_ordering_column(
|
||||
::shared_ptr<column_identifier> column)
|
||||
{
|
||||
if (contains_alias(column)) {
|
||||
throw exceptions::invalid_request_exception(sprint("Aliases are not allowed in order by clause ('%s')", *column));
|
||||
}
|
||||
throw exceptions::invalid_request_exception(sprint("Order by on unknown column %s", *column));
|
||||
}
|
||||
|
||||
select_statement::ordering_comparator_type
|
||||
select_statement::raw_statement::get_ordering_comparator(schema_ptr schema,
|
||||
::shared_ptr<selection::selection> selection,
|
||||
::shared_ptr<restrictions::statement_restrictions> restrictions)
|
||||
{
|
||||
if (!restrictions->key_is_in_relation()) {
|
||||
return {};
|
||||
}
|
||||
|
||||
std::vector<std::pair<uint32_t, data_type>> sorters;
|
||||
sorters.reserve(_parameters->orderings().size());
|
||||
|
||||
// If we order post-query (see orderResults), the sorted column needs to be in the ResultSet for sorting,
|
||||
// even if we don't
|
||||
// ultimately ship them to the client (CASSANDRA-4911).
|
||||
for (auto&& e : _parameters->orderings()) {
|
||||
auto&& raw = e.first;
|
||||
::shared_ptr<column_identifier> column = raw->prepare_column_identifier(schema);
|
||||
const column_definition* def = schema->get_column_definition(column->name());
|
||||
if (!def) {
|
||||
handle_unrecognized_ordering_column(column);
|
||||
}
|
||||
auto index = selection->index_of(*def);
|
||||
if (index < 0) {
|
||||
index = selection->add_column_for_ordering(*def);
|
||||
}
|
||||
|
||||
sorters.emplace_back(index, def->type);
|
||||
}
|
||||
|
||||
return [sorters = std::move(sorters)] (const result_row_type& r1, const result_row_type& r2) mutable {
|
||||
for (auto&& e : sorters) {
|
||||
auto& c1 = r1[e.first];
|
||||
auto& c2 = r2[e.first];
|
||||
auto type = e.second;
|
||||
|
||||
if (bool(c1) != bool(c2)) {
|
||||
return bool(c2);
|
||||
}
|
||||
if (c1) {
|
||||
int result = type->compare(*c1, *c2);
|
||||
if (result != 0) {
|
||||
return result < 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
};
|
||||
}
|
||||
|
||||
bool select_statement::raw_statement::is_reversed(schema_ptr schema) {
|
||||
std::experimental::optional<bool> reversed_map[schema->clustering_key_size()];
|
||||
|
||||
uint32_t i = 0;
|
||||
for (auto&& e : _parameters->orderings()) {
|
||||
::shared_ptr<column_identifier> column = e.first->prepare_column_identifier(schema);
|
||||
bool reversed = e.second;
|
||||
|
||||
auto def = schema->get_column_definition(column->name());
|
||||
if (!def) {
|
||||
handle_unrecognized_ordering_column(column);
|
||||
}
|
||||
|
||||
if (!def->is_clustering_key()) {
|
||||
throw exceptions::invalid_request_exception(sprint(
|
||||
"Order by is currently only supported on the clustered columns of the PRIMARY KEY, got %s", *column));
|
||||
}
|
||||
|
||||
if (i != def->component_index()) {
|
||||
throw exceptions::invalid_request_exception(
|
||||
"Order by currently only support the ordering of columns following their declared order in the PRIMARY KEY");
|
||||
}
|
||||
|
||||
reversed_map[i] = std::experimental::make_optional(reversed != def->type->is_reversed());
|
||||
++i;
|
||||
}
|
||||
|
||||
// GCC incorrenctly complains about "*is_reversed_" below
|
||||
#pragma GCC diagnostic push
|
||||
#pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
|
||||
|
||||
// Check that all bool in reversedMap, if set, agrees
|
||||
std::experimental::optional<bool> is_reversed_{};
|
||||
for (auto&& b : reversed_map) {
|
||||
if (b) {
|
||||
if (!is_reversed_) {
|
||||
is_reversed_ = b;
|
||||
} else {
|
||||
if ((*is_reversed_) != *b) {
|
||||
throw exceptions::invalid_request_exception("Unsupported order by relation");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
assert(is_reversed_);
|
||||
return *is_reversed_;
|
||||
|
||||
#pragma GCC diagnostic pop
|
||||
}
|
||||
|
||||
/** If ALLOW FILTERING was not specified, this verifies that it is not needed */
|
||||
void select_statement::raw_statement::check_needs_filtering(
|
||||
::shared_ptr<restrictions::statement_restrictions> restrictions)
|
||||
{
|
||||
// non-key-range non-indexed queries cannot involve filtering underneath
|
||||
if (!_parameters->allow_filtering() && (restrictions->is_key_range() || restrictions->uses_secondary_indexing())) {
|
||||
// We will potentially filter data if either:
|
||||
// - Have more than one IndexExpression
|
||||
// - Have no index expression and the column filter is not the identity
|
||||
if (restrictions->need_filtering()) {
|
||||
throw exceptions::invalid_request_exception(
|
||||
"Cannot execute this query as it might involve data filtering and "
|
||||
"thus may have unpredictable performance. If you want to execute "
|
||||
"this query despite the performance unpredictability, use ALLOW FILTERING");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool select_statement::raw_statement::contains_alias(::shared_ptr<column_identifier> name) {
|
||||
return std::any_of(_select_clause.begin(), _select_clause.end(), [name] (auto raw) {
|
||||
return *name == *raw->alias;
|
||||
});
|
||||
}
|
||||
|
||||
::shared_ptr<column_specification> select_statement::raw_statement::limit_receiver() {
|
||||
return ::make_shared<column_specification>(keyspace(), column_family(), ::make_shared<column_identifier>("[limit]", true),
|
||||
int32_type);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -104,58 +104,19 @@ public:
|
||||
::shared_ptr<restrictions::statement_restrictions> restrictions,
|
||||
bool is_reversed,
|
||||
ordering_comparator_type ordering_comparator,
|
||||
::shared_ptr<term> limit)
|
||||
: _schema(schema)
|
||||
, _bound_terms(bound_terms)
|
||||
, _parameters(std::move(parameters))
|
||||
, _selection(std::move(selection))
|
||||
, _restrictions(std::move(restrictions))
|
||||
, _is_reversed(is_reversed)
|
||||
, _limit(std::move(limit))
|
||||
, _ordering_comparator(std::move(ordering_comparator))
|
||||
{
|
||||
_opts = _selection->get_query_options();
|
||||
}
|
||||
::shared_ptr<term> limit);
|
||||
|
||||
virtual bool uses_function(const sstring& ks_name, const sstring& function_name) const override {
|
||||
return _selection->uses_function(ks_name, function_name)
|
||||
|| _restrictions->uses_function(ks_name, function_name)
|
||||
|| (_limit && _limit->uses_function(ks_name, function_name));
|
||||
}
|
||||
virtual bool uses_function(const sstring& ks_name, const sstring& function_name) const override;
|
||||
|
||||
// Creates a simple select based on the given selection
|
||||
// Note that the results select statement should not be used for actual queries, but only for processing already
|
||||
// queried data through processColumnFamily.
|
||||
static ::shared_ptr<select_statement> for_selection(schema_ptr schema,
|
||||
::shared_ptr<selection::selection> selection) {
|
||||
return ::make_shared<select_statement>(schema,
|
||||
0,
|
||||
_default_parameters,
|
||||
selection,
|
||||
::make_shared<restrictions::statement_restrictions>(schema),
|
||||
false,
|
||||
ordering_comparator_type{},
|
||||
::shared_ptr<term>{});
|
||||
}
|
||||
static ::shared_ptr<select_statement> for_selection(
|
||||
schema_ptr schema, ::shared_ptr<selection::selection> selection);
|
||||
|
||||
::shared_ptr<metadata> get_result_metadata() {
|
||||
return _selection->get_result_metadata();
|
||||
}
|
||||
|
||||
virtual uint32_t get_bound_terms() override {
|
||||
return _bound_terms;
|
||||
}
|
||||
|
||||
virtual void check_access(const service::client_state& state) override {
|
||||
warn(unimplemented::cause::PERMISSIONS);
|
||||
#if 0
|
||||
state.hasColumnFamilyAccess(keyspace(), columnFamily(), Permission.SELECT);
|
||||
#endif
|
||||
}
|
||||
|
||||
virtual void validate(const service::client_state& state) override {
|
||||
// Nothing to do, all validation has been done by raw_statemet::prepare()
|
||||
}
|
||||
virtual uint32_t get_bound_terms() override;
|
||||
virtual void check_access(const service::client_state& state) override;
|
||||
virtual void validate(const service::client_state& state) override;
|
||||
|
||||
virtual future<::shared_ptr<transport::messages::result_message>> execute(service::storage_proxy& proxy,
|
||||
service::query_state& state, const query_options& options) override;
|
||||
@@ -230,31 +191,7 @@ public:
|
||||
}
|
||||
#endif
|
||||
|
||||
query::partition_slice make_partition_slice(const query_options& options) {
|
||||
std::vector<column_id> static_columns;
|
||||
std::vector<column_id> regular_columns;
|
||||
|
||||
if (_selection->contains_static_columns()) {
|
||||
static_columns.reserve(_selection->get_column_count());
|
||||
}
|
||||
|
||||
regular_columns.reserve(_selection->get_column_count());
|
||||
|
||||
for (auto&& col : _selection->get_columns()) {
|
||||
if (col->is_static()) {
|
||||
static_columns.push_back(col->id);
|
||||
} else if (col->is_regular()) {
|
||||
regular_columns.push_back(col->id);
|
||||
}
|
||||
}
|
||||
|
||||
if (_parameters->is_distinct()) {
|
||||
return query::partition_slice({}, std::move(static_columns), {}, _opts);
|
||||
}
|
||||
|
||||
return query::partition_slice(_restrictions->get_clustering_bounds(options),
|
||||
std::move(static_columns), std::move(regular_columns), _opts);
|
||||
}
|
||||
query::partition_slice make_partition_slice(const query_options& options);
|
||||
|
||||
#if 0
|
||||
private SliceQueryFilter sliceFilter(ColumnSlice slice, int limit, int toGroup)
|
||||
@@ -270,27 +207,8 @@ public:
|
||||
#endif
|
||||
|
||||
private:
|
||||
int32_t get_limit(const query_options& options) const {
|
||||
if (!_limit) {
|
||||
return std::numeric_limits<int32_t>::max();
|
||||
}
|
||||
|
||||
auto val = _limit->bind_and_get(options);
|
||||
if (!val) {
|
||||
throw exceptions::invalid_request_exception("Invalid null value of limit");
|
||||
}
|
||||
|
||||
try {
|
||||
int32_type->validate(*val);
|
||||
auto l = boost::any_cast<int32_t>(int32_type->deserialize(*val));
|
||||
if (l <= 0) {
|
||||
throw exceptions::invalid_request_exception("LIMIT must be strictly positive");
|
||||
}
|
||||
return l;
|
||||
} catch (const marshal_exception& e) {
|
||||
throw exceptions::invalid_request_exception("Invalid limit value");
|
||||
}
|
||||
}
|
||||
int32_t get_limit(const query_options& options) const;
|
||||
bool needs_post_query_ordering() const;
|
||||
|
||||
#if 0
|
||||
private int updateLimitForQuery(int limit)
|
||||
@@ -511,12 +429,6 @@ private:
|
||||
}
|
||||
#endif
|
||||
|
||||
private:
|
||||
bool needs_post_query_ordering() const {
|
||||
// We need post-query ordering only for queries with IN on the partition key and an ORDER BY.
|
||||
return _restrictions->key_is_in_relation() && !_parameters->orderings().empty();
|
||||
}
|
||||
|
||||
public:
|
||||
class raw_statement;
|
||||
};
|
||||
@@ -541,236 +453,36 @@ public:
|
||||
, _limit(std::move(limit))
|
||||
{ }
|
||||
|
||||
virtual ::shared_ptr<prepared> prepare(database& db) override {
|
||||
schema_ptr schema = validation::validate_column_family(db, keyspace(), column_family());
|
||||
auto bound_names = get_bound_variables();
|
||||
|
||||
auto selection = _select_clause.empty()
|
||||
? selection::selection::wildcard(schema)
|
||||
: selection::selection::from_selectors(schema, _select_clause);
|
||||
|
||||
auto restrictions = prepare_restrictions(schema, bound_names, selection);
|
||||
|
||||
if (_parameters->is_distinct()) {
|
||||
validate_distinct_selection(schema, selection, restrictions);
|
||||
}
|
||||
|
||||
select_statement::ordering_comparator_type ordering_comparator;
|
||||
bool is_reversed_ = false;
|
||||
|
||||
if (!_parameters->orderings().empty()) {
|
||||
verify_ordering_is_allowed(restrictions);
|
||||
ordering_comparator = get_ordering_comparator(schema, selection, restrictions);
|
||||
is_reversed_ = is_reversed(schema);
|
||||
}
|
||||
|
||||
if (is_reversed_) {
|
||||
restrictions->reverse();
|
||||
}
|
||||
|
||||
check_needs_filtering(restrictions);
|
||||
|
||||
auto stmt = ::make_shared<select_statement>(schema,
|
||||
bound_names->size(),
|
||||
_parameters,
|
||||
std::move(selection),
|
||||
std::move(restrictions),
|
||||
is_reversed_,
|
||||
std::move(ordering_comparator),
|
||||
prepare_limit(bound_names));
|
||||
|
||||
return ::make_shared<parsed_statement::prepared>(std::move(stmt), std::move(*bound_names));
|
||||
}
|
||||
|
||||
virtual ::shared_ptr<prepared> prepare(database& db) override;
|
||||
private:
|
||||
::shared_ptr<restrictions::statement_restrictions> prepare_restrictions(schema_ptr schema,
|
||||
::shared_ptr<variable_specifications> bound_names, ::shared_ptr<selection::selection> selection) {
|
||||
try {
|
||||
return ::make_shared<restrictions::statement_restrictions>(schema, std::move(_where_clause), bound_names,
|
||||
selection->contains_only_static_columns(), selection->contains_a_collection());
|
||||
} catch (const exceptions::unrecognized_entity_exception& e) {
|
||||
if (contains_alias(e.entity)) {
|
||||
throw exceptions::invalid_request_exception(sprint("Aliases aren't allowed in the where clause ('%s')", e.relation->to_string()));
|
||||
}
|
||||
throw;
|
||||
}
|
||||
}
|
||||
::shared_ptr<restrictions::statement_restrictions> prepare_restrictions(
|
||||
schema_ptr schema,
|
||||
::shared_ptr<variable_specifications> bound_names,
|
||||
::shared_ptr<selection::selection> selection);
|
||||
|
||||
/** Returns a ::shared_ptr<term> for the limit or null if no limit is set */
|
||||
::shared_ptr<term> prepare_limit(::shared_ptr<variable_specifications> bound_names) {
|
||||
if (!_limit) {
|
||||
return {};
|
||||
}
|
||||
::shared_ptr<term> prepare_limit(::shared_ptr<variable_specifications> bound_names);
|
||||
|
||||
auto prep_limit = _limit->prepare(keyspace(), limit_receiver());
|
||||
prep_limit->collect_marker_specification(bound_names);
|
||||
return prep_limit;
|
||||
}
|
||||
static void verify_ordering_is_allowed(::shared_ptr<restrictions::statement_restrictions> restrictions);
|
||||
|
||||
static void verify_ordering_is_allowed(::shared_ptr<restrictions::statement_restrictions> restrictions) {
|
||||
if (restrictions->uses_secondary_indexing()) {
|
||||
throw exceptions::invalid_request_exception("ORDER BY with 2ndary indexes is not supported.");
|
||||
}
|
||||
if (restrictions->is_key_range()) {
|
||||
throw exceptions::invalid_request_exception("ORDER BY is only supported when the partition key is restricted by an EQ or an IN.");
|
||||
}
|
||||
}
|
||||
static void validate_distinct_selection(schema_ptr schema,
|
||||
::shared_ptr<selection::selection> selection,
|
||||
::shared_ptr<restrictions::statement_restrictions> restrictions);
|
||||
|
||||
static void validate_distinct_selection(schema_ptr schema, ::shared_ptr<selection::selection> selection,
|
||||
::shared_ptr<restrictions::statement_restrictions> restrictions) {
|
||||
for (auto&& def : selection->get_columns()) {
|
||||
if (!def->is_partition_key() && !def->is_static()) {
|
||||
throw exceptions::invalid_request_exception(sprint(
|
||||
"SELECT DISTINCT queries must only request partition key columns and/or static columns (not %s)",
|
||||
def->name_as_text()));
|
||||
}
|
||||
}
|
||||
void handle_unrecognized_ordering_column(::shared_ptr<column_identifier> column);
|
||||
|
||||
// If it's a key range, we require that all partition key columns are selected so we don't have to bother
|
||||
// with post-query grouping.
|
||||
if (!restrictions->is_key_range()) {
|
||||
return;
|
||||
}
|
||||
select_statement::ordering_comparator_type get_ordering_comparator(schema_ptr schema,
|
||||
::shared_ptr<selection::selection> selection,
|
||||
::shared_ptr<restrictions::statement_restrictions> restrictions);
|
||||
|
||||
for (auto&& def : schema->partition_key_columns()) {
|
||||
if (!selection->has_column(def)) {
|
||||
throw exceptions::invalid_request_exception(sprint(
|
||||
"SELECT DISTINCT queries must request all the partition key columns (missing %s)", def.name_as_text()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void handle_unrecognized_ordering_column(::shared_ptr<column_identifier> column) {
|
||||
if (contains_alias(column)) {
|
||||
throw exceptions::invalid_request_exception(sprint("Aliases are not allowed in order by clause ('%s')", *column));
|
||||
}
|
||||
throw exceptions::invalid_request_exception(sprint("Order by on unknown column %s", *column));
|
||||
}
|
||||
|
||||
select_statement::ordering_comparator_type get_ordering_comparator(schema_ptr schema, ::shared_ptr<selection::selection> selection,
|
||||
::shared_ptr<restrictions::statement_restrictions> restrictions) {
|
||||
if (!restrictions->key_is_in_relation()) {
|
||||
return {};
|
||||
}
|
||||
|
||||
std::vector<std::pair<uint32_t, data_type>> sorters;
|
||||
sorters.reserve(_parameters->orderings().size());
|
||||
|
||||
// If we order post-query (see orderResults), the sorted column needs to be in the ResultSet for sorting,
|
||||
// even if we don't
|
||||
// ultimately ship them to the client (CASSANDRA-4911).
|
||||
for (auto&& e : _parameters->orderings()) {
|
||||
auto&& raw = e.first;
|
||||
::shared_ptr<column_identifier> column = raw->prepare_column_identifier(schema);
|
||||
const column_definition* def = schema->get_column_definition(column->name());
|
||||
if (!def) {
|
||||
handle_unrecognized_ordering_column(column);
|
||||
}
|
||||
auto index = selection->index_of(*def);
|
||||
if (index < 0) {
|
||||
index = selection->add_column_for_ordering(*def);
|
||||
}
|
||||
|
||||
sorters.emplace_back(index, def->type);
|
||||
}
|
||||
|
||||
return [sorters = std::move(sorters)] (const result_row_type& r1, const result_row_type& r2) mutable {
|
||||
for (auto&& e : sorters) {
|
||||
auto& c1 = r1[e.first];
|
||||
auto& c2 = r2[e.first];
|
||||
auto type = e.second;
|
||||
|
||||
if (bool(c1) != bool(c2)) {
|
||||
return bool(c2);
|
||||
}
|
||||
if (c1) {
|
||||
int result = type->compare(*c1, *c2);
|
||||
if (result != 0) {
|
||||
return result < 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
};
|
||||
}
|
||||
|
||||
bool is_reversed(schema_ptr schema) {
|
||||
std::experimental::optional<bool> reversed_map[schema->clustering_key_size()];
|
||||
|
||||
uint32_t i = 0;
|
||||
for (auto&& e : _parameters->orderings()) {
|
||||
::shared_ptr<column_identifier> column = e.first->prepare_column_identifier(schema);
|
||||
bool reversed = e.second;
|
||||
|
||||
auto def = schema->get_column_definition(column->name());
|
||||
if (!def) {
|
||||
handle_unrecognized_ordering_column(column);
|
||||
}
|
||||
|
||||
if (!def->is_clustering_key()) {
|
||||
throw exceptions::invalid_request_exception(sprint(
|
||||
"Order by is currently only supported on the clustered columns of the PRIMARY KEY, got %s", *column));
|
||||
}
|
||||
|
||||
if (i != def->component_index()) {
|
||||
throw exceptions::invalid_request_exception(
|
||||
"Order by currently only support the ordering of columns following their declared order in the PRIMARY KEY");
|
||||
}
|
||||
|
||||
reversed_map[i] = std::experimental::make_optional(reversed != def->type->is_reversed());
|
||||
++i;
|
||||
}
|
||||
|
||||
// GCC incorrenctly complains about "*is_reversed_" below
|
||||
#pragma GCC diagnostic push
|
||||
#pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
|
||||
|
||||
// Check that all bool in reversedMap, if set, agrees
|
||||
std::experimental::optional<bool> is_reversed_{};
|
||||
for (auto&& b : reversed_map) {
|
||||
if (b) {
|
||||
if (!is_reversed_) {
|
||||
is_reversed_ = b;
|
||||
} else {
|
||||
if ((*is_reversed_) != *b) {
|
||||
throw exceptions::invalid_request_exception("Unsupported order by relation");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
assert(is_reversed_);
|
||||
return *is_reversed_;
|
||||
|
||||
#pragma GCC diagnostic pop
|
||||
}
|
||||
bool is_reversed(schema_ptr schema);
|
||||
|
||||
/** If ALLOW FILTERING was not specified, this verifies that it is not needed */
|
||||
void check_needs_filtering(::shared_ptr<restrictions::statement_restrictions> restrictions) {
|
||||
// non-key-range non-indexed queries cannot involve filtering underneath
|
||||
if (!_parameters->allow_filtering() && (restrictions->is_key_range() || restrictions->uses_secondary_indexing())) {
|
||||
// We will potentially filter data if either:
|
||||
// - Have more than one IndexExpression
|
||||
// - Have no index expression and the column filter is not the identity
|
||||
if (restrictions->need_filtering()) {
|
||||
throw exceptions::invalid_request_exception(
|
||||
"Cannot execute this query as it might involve data filtering and "
|
||||
"thus may have unpredictable performance. If you want to execute "
|
||||
"this query despite the performance unpredictability, use ALLOW FILTERING");
|
||||
}
|
||||
}
|
||||
}
|
||||
void check_needs_filtering(::shared_ptr<restrictions::statement_restrictions> restrictions);
|
||||
|
||||
bool contains_alias(::shared_ptr<column_identifier> name) {
|
||||
return std::any_of(_select_clause.begin(), _select_clause.end(), [name] (auto raw) {
|
||||
return *name == *raw->alias;
|
||||
});
|
||||
}
|
||||
bool contains_alias(::shared_ptr<column_identifier> name);
|
||||
|
||||
::shared_ptr<column_specification> limit_receiver() {
|
||||
return ::make_shared<column_specification>(keyspace(), column_family(), ::make_shared<column_identifier>("[limit]", true),
|
||||
int32_type);
|
||||
}
|
||||
::shared_ptr<column_specification> limit_receiver();
|
||||
|
||||
#if 0
|
||||
public:
|
||||
|
||||
Reference in New Issue
Block a user