view: check for computed columns in view

Currently, having a 'computed' column in view update generation
indicates that token value needs to be generated and assigned to it.
This commit is contained in:
Piotr Sarna
2019-03-13 13:06:17 +01:00
parent a0e02df36a
commit 6a6871aa0e
2 changed files with 26 additions and 17 deletions

View File

@@ -83,11 +83,18 @@ view_info::view_info(const schema& schema, const raw_view_info& raw_view_info)
cql3::statements::select_statement& view_info::select_statement() const {
if (!_select_statement) {
shared_ptr<cql3::statements::raw::select_statement> raw;
if (is_index()) {
// Token column is the first clustering column
auto token_column_it = boost::range::find_if(_schema.all_columns(), std::mem_fn(&column_definition::is_clustering_key));
auto real_columns = _schema.all_columns() | boost::adaptors::filtered([this, token_column_it](const column_definition& cdef) {
return std::addressof(cdef) != std::addressof(*token_column_it);
// FIXME(sarna): legacy code, should be removed after "computed_columns" feature is guaranteed
// to be available on every node. Then, we won't need to check if this view is backing a secondary index.
const column_definition* legacy_token_column = nullptr;
if (service::get_local_storage_service().db().local().find_column_family(base_id()).get_index_manager().is_index(_schema)) {
if (!_schema.clustering_key_columns().empty()) {
legacy_token_column = &_schema.clustering_key_columns().front();
}
}
if (legacy_token_column || boost::algorithm::any_of(_schema.all_columns(), std::mem_fn(&column_definition::is_computed))) {
auto real_columns = _schema.all_columns() | boost::adaptors::filtered([this, legacy_token_column] (const column_definition& cdef) {
return &cdef != legacy_token_column && !cdef.is_computed();
});
schema::columns_type columns = boost::copy_range<schema::columns_type>(std::move(real_columns));
raw = cql3::util::build_select_statement(base_name(), where_clause(), include_all_columns(), columns);
@@ -142,12 +149,6 @@ void view_info::initialize_base_dependent_fields(const schema& base) {
}
}
bool view_info::is_index() const {
//TODO(sarna): result of this call can be cached instead of calling index_manager::is_index every time
column_family& base_cf = service::get_local_storage_service().db().local().find_column_family(base_id());
return base_cf.get_index_manager().is_index(view_ptr(_schema.shared_from_this()));
}
namespace db {
namespace view {
@@ -306,11 +307,21 @@ deletable_row& view_updates::get_view_row(const partition_key& base_key, const c
auto get_value = boost::adaptors::transformed([&, this] (const column_definition& cdef) -> bytes_view {
auto* base_col = _base->get_column_definition(cdef.name());
if (!base_col) {
if (!_view_info.is_index()) {
throw std::logic_error(format("Column {} doesn't exist in base and this view is not backing a secondary index", cdef.name_as_text()));
bytes_opt computed_value;
if (!cdef.is_computed()) {
//FIXME(sarna): this legacy code is here for backward compatibility and should be removed
// once "computed_columns feature" is supported by every node
if (!service::get_local_storage_service().db().local().find_column_family(_base->id()).get_index_manager().is_index(*_base)) {
throw std::logic_error(format("Column {} doesn't exist in base and this view is not backing a secondary index", cdef.name_as_text()));
}
computed_value = token_column_computation().compute_value(*_base, base_key, update);
} else {
computed_value = cdef.get_computation().compute_value(*_base, base_key, update);
}
auto& partitioner = dht::global_partitioner();
return linearized_values.emplace_back(partitioner.token_to_bytes(token_for(base_key)));
if (!computed_value) {
throw std::logic_error(format("No value computed for primary key column {}", cdef.name()));
}
return linearized_values.emplace_back(*computed_value);
}
switch (base_col->kind) {
case column_kind::partition_key:

View File

@@ -58,8 +58,6 @@ public:
return _raw.where_clause();
}
bool is_index() const;
cql3::statements::select_statement& select_statement() const;
const query::partition_slice& partition_slice() const;
const dht::partition_range_vector& partition_ranges() const;