thrift: Update CQL mapping of static CFs
This patch updates the mapping of static CFs so that their CQL representation is a non-compound, non-dense schema with static columns, instead of regular ones. This matches the representation os static CFs in Cassandra 3.x. Signed-off-by: Duarte Nunes <duarte@scylladb.com>
This commit is contained in:
committed by
Calle Wilund
parent
6c8b5fc09d
commit
6260f31e08
@@ -139,7 +139,7 @@ void schema::rebuild() {
|
||||
}
|
||||
|
||||
thrift()._compound = is_compound();
|
||||
thrift()._is_dynamic = clustering_key_size() > 0;
|
||||
thrift()._is_dynamic = static_columns_count() == 0;
|
||||
|
||||
if (is_counter()) {
|
||||
for (auto&& cdef : boost::range::join(static_columns(), regular_columns())) {
|
||||
|
||||
@@ -324,13 +324,7 @@ public:
|
||||
auto cmd = slice_pred_to_read_cmd(*schema, predicate);
|
||||
// KeyRange::count is the number of thrift rows to return, while
|
||||
// SlicePredicte::slice_range::count limits the number of thrift colums.
|
||||
if (schema->thrift().is_dynamic()) {
|
||||
// For dynamic CFs we must limit the number of partitions returned.
|
||||
cmd->partition_limit = range.count;
|
||||
} else {
|
||||
// For static CFs each thrift row maps to a CQL row.
|
||||
cmd->row_limit = range.count;
|
||||
}
|
||||
cmd->partition_limit = range.count;
|
||||
auto f = _query_state.get_client_state().has_schema_access(*schema, auth::permission::SELECT);
|
||||
return f.then([schema, cmd, prange = std::move(prange), consistency_level] () mutable {
|
||||
return service::get_local_storage_proxy().query(
|
||||
@@ -351,6 +345,7 @@ public:
|
||||
auto opts = query_opts(s);
|
||||
std::vector<query::clustering_range> clustering_ranges;
|
||||
std::vector<column_id> regular_columns;
|
||||
std::vector<column_id> static_columns;
|
||||
uint32_t row_limit;
|
||||
uint32_t partition_limit;
|
||||
std::unique_ptr<query::specific_ranges> specific_ranges = nullptr;
|
||||
@@ -370,14 +365,14 @@ public:
|
||||
// we ask for as many partitions as those that are capable of exhausting the limit and later filter out
|
||||
// any excess cells.
|
||||
row_limit = query::max_rows;
|
||||
partition_limit = (column_limit + s.regular_columns_count() - 1) / s.regular_columns_count();
|
||||
partition_limit = (column_limit + s.static_columns_count() - 1) / s.static_columns_count();
|
||||
schema::const_iterator start_col = start_column
|
||||
? s.regular_lower_bound(to_bytes(*start_column))
|
||||
: s.regular_begin();
|
||||
regular_columns = add_columns(start_col, s.regular_end(), false);
|
||||
? s.static_lower_bound(to_bytes(*start_column))
|
||||
: s.static_begin();
|
||||
static_columns = add_columns(start_col, s.static_end(), false);
|
||||
}
|
||||
clustering_ranges.emplace_back(query::clustering_range::make_open_ended_both_sides());
|
||||
auto slice = query::partition_slice(std::move(clustering_ranges), { }, std::move(regular_columns), opts,
|
||||
auto slice = query::partition_slice(std::move(clustering_ranges), std::move(static_columns), std::move(regular_columns), opts,
|
||||
std::move(specific_ranges), cql_serialization_format::internal());
|
||||
return make_lw_shared<query::read_command>(s.id(), s.version(), std::move(slice), row_limit, gc_clock::now(), stdx::nullopt, partition_limit);
|
||||
}
|
||||
@@ -604,6 +599,7 @@ public:
|
||||
auto pk = key_from_thrift(s, to_bytes(request.key));
|
||||
auto dk = dht::global_partitioner().decorate_key(s, pk);
|
||||
std::vector<column_id> regular_columns;
|
||||
std::vector<column_id> static_columns;
|
||||
std::vector<query::clustering_range> clustering_ranges;
|
||||
auto opts = query_opts(s);
|
||||
uint32_t row_limit;
|
||||
@@ -628,9 +624,10 @@ public:
|
||||
return make_range(cslice.start, cslice.finish);
|
||||
}, cmp, [&](auto& range) { return range.is_wrap_around(cmp); }, request.reversed);
|
||||
auto on_range = [&](auto&& range) {
|
||||
auto start = range.start() ? s.regular_lower_bound(range.start()->value()) : s.regular_begin();
|
||||
auto end = range.end() ? s.regular_upper_bound(range.end()->value()) : s.regular_end();
|
||||
regular_columns = add_columns(start, end, request.reversed);
|
||||
auto start = range.start() ? s.static_lower_bound(range.start()->value()) : s.static_begin();
|
||||
auto end = range.end() ? s.static_upper_bound(range.end()->value()) : s.static_end();
|
||||
auto cols = add_columns(start, end, request.reversed);
|
||||
std::move(cols.begin(), cols.end(), std::back_inserter(static_columns));
|
||||
};
|
||||
if (request.reversed) {
|
||||
std::for_each(ranges.rbegin(), ranges.rend(), on_range);
|
||||
@@ -638,7 +635,7 @@ public:
|
||||
std::for_each(ranges.begin(), ranges.end(), on_range);
|
||||
}
|
||||
}
|
||||
auto slice = query::partition_slice(std::move(clustering_ranges), {}, std::move(regular_columns), opts, nullptr);
|
||||
auto slice = query::partition_slice(std::move(clustering_ranges), std::move(static_columns), std::move(regular_columns), opts, nullptr);
|
||||
auto cmd = make_lw_shared<query::read_command>(schema->id(), schema->version(), std::move(slice), row_limit);
|
||||
auto f = _query_state.get_client_state().has_schema_access(*schema, auth::permission::SELECT);
|
||||
return f.then([dk = std::move(dk), cmd, schema, column_limit = request.count, cl = request.consistency_level] {
|
||||
@@ -897,7 +894,7 @@ public:
|
||||
}
|
||||
|
||||
auto s = schema_from_thrift(cf_def, cf_def.keyspace, schema->id());
|
||||
if (schema->thrift().is_dynamic() && s->regular_columns_count() > 1) {
|
||||
if (schema->thrift().is_dynamic() && !s->thrift().is_dynamic()) {
|
||||
fail(unimplemented::cause::MIXED_CF);
|
||||
}
|
||||
return _query_state.get_client_state().has_schema_access(*schema, auth::permission::ALTER).then([this, s = std::move(s)] {
|
||||
@@ -1132,7 +1129,7 @@ private:
|
||||
cf_def.__set_read_repair_chance(s->read_repair_chance());
|
||||
std::vector<ColumnDef> columns;
|
||||
if (!s->thrift().is_dynamic()) {
|
||||
for (auto&& c : s->regular_columns()) {
|
||||
for (auto&& c : s->static_columns()) {
|
||||
ColumnDef c_def;
|
||||
c_def.__set_name(c.name_as_text());
|
||||
c_def.__set_validation_class(c.type->name());
|
||||
@@ -1227,7 +1224,7 @@ private:
|
||||
auto col_name = to_bytes(col_def.name);
|
||||
regular_column_name_type->validate(col_name);
|
||||
builder.with_column(std::move(col_name), db::marshal::type_parser::parse(to_sstring(col_def.validation_class)),
|
||||
column_kind::regular_column);
|
||||
column_kind::static_column);
|
||||
auto index = index_metadata_from_thrift(col_def);
|
||||
if (index) {
|
||||
builder.with_index(index.value());
|
||||
@@ -1398,8 +1395,8 @@ private:
|
||||
return { std::move(start_bound), std::move(end_bound) };
|
||||
}
|
||||
static std::pair<schema::const_iterator, schema::const_iterator> make_column_range(const schema& s, const std::string& start, const std::string& end) {
|
||||
auto start_it = start.empty() ? s.regular_begin() : s.regular_lower_bound(to_bytes(start));
|
||||
auto end_it = end.empty() ? s.regular_end() : s.regular_upper_bound(to_bytes(end));
|
||||
auto start_it = start.empty() ? s.static_begin() : s.static_lower_bound(to_bytes(start));
|
||||
auto end_it = end.empty() ? s.static_end() : s.static_upper_bound(to_bytes(end));
|
||||
if (start_it > end_it) {
|
||||
throw make_exception<InvalidRequestException>("Range finish must come after start in the order of traversal");
|
||||
}
|
||||
@@ -1431,6 +1428,7 @@ private:
|
||||
auto opts = query_opts(s);
|
||||
std::vector<query::clustering_range> clustering_ranges;
|
||||
std::vector<column_id> regular_columns;
|
||||
std::vector<column_id> static_columns;
|
||||
uint32_t per_partition_row_limit = query::max_rows;
|
||||
if (predicate.__isset.column_names) {
|
||||
thrift_validation::validate_column_names(predicate.column_names);
|
||||
@@ -1445,9 +1443,9 @@ private:
|
||||
clustering_ranges.emplace_back(query::clustering_range::make_open_ended_both_sides());
|
||||
auto&& defs = unique_column_names
|
||||
| boost::adaptors::transformed([&s](auto&& name) { return s.get_column_definition(to_bytes(name)); })
|
||||
| boost::adaptors::filtered([](auto* def) { return def; })
|
||||
| boost::adaptors::filtered([](auto* def) { return def && def->is_static(); })
|
||||
| boost::adaptors::indirected;
|
||||
regular_columns = add_columns(defs.begin(), defs.end(), false);
|
||||
static_columns = add_columns(defs.begin(), defs.end(), false);
|
||||
}
|
||||
} else if (predicate.__isset.slice_range) {
|
||||
auto range = predicate.slice_range;
|
||||
@@ -1467,12 +1465,12 @@ private:
|
||||
auto r = make_column_range(s, range.start, range.finish);
|
||||
// For static CFs, the limit is enforced on the result as we do not implement
|
||||
// a cell limit in the database engine.
|
||||
regular_columns = add_columns(r.first, r.second, range.reversed);
|
||||
static_columns = add_columns(r.first, r.second, range.reversed);
|
||||
}
|
||||
} else {
|
||||
throw make_exception<InvalidRequestException>("SlicePredicate column_names and slice_range may not both be null");
|
||||
}
|
||||
auto slice = query::partition_slice(std::move(clustering_ranges), {}, std::move(regular_columns), opts,
|
||||
auto slice = query::partition_slice(std::move(clustering_ranges), std::move(static_columns), std::move(regular_columns), opts,
|
||||
nullptr, cql_serialization_format::internal(), per_partition_row_limit);
|
||||
return make_lw_shared<query::read_command>(s.id(), s.version(), std::move(slice));
|
||||
}
|
||||
@@ -1567,26 +1565,33 @@ private:
|
||||
abort();
|
||||
}
|
||||
void accept_new_row(const clustering_key_prefix& key, const query::result_row_view& static_row, const query::result_row_view& row) {
|
||||
std::cout << "accept new row\n";
|
||||
auto it = row.iterator();
|
||||
auto cell = it.next_atomic_cell();
|
||||
if (cell && _current_cell_limit > 0) {
|
||||
std::cout << "has normal cell?!\n";
|
||||
bytes column_name = composite::serialize_value(key.components(), _s.thrift().has_compound_comparator()).release_bytes();
|
||||
Aggregator::on_column(_current_aggregation, column_name, *cell);
|
||||
_current_cell_limit -= 1;
|
||||
}
|
||||
accept_partition_end(static_row);
|
||||
}
|
||||
void accept_new_row(const query::result_row_view& static_row, const query::result_row_view& row) {
|
||||
auto it = row.iterator();
|
||||
for (auto&& id : _slice.regular_columns) {
|
||||
std::cout << "accept new row static\n";
|
||||
accept_partition_end(static_row);
|
||||
}
|
||||
void accept_partition_end(const query::result_row_view& static_row) {
|
||||
std::cout << "accept row partition end\n";
|
||||
auto it = static_row.iterator();
|
||||
for (auto&& id : _slice.static_columns) {
|
||||
std::cout << "going over the static cols\n";
|
||||
auto cell = it.next_atomic_cell();
|
||||
if (cell && _current_cell_limit > 0) {
|
||||
Aggregator::on_column(_current_aggregation, _s.regular_column_at(id).name(), *cell);
|
||||
Aggregator::on_column(_current_aggregation, _s.static_column_at(id).name(), *cell);
|
||||
_current_cell_limit -= 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
void accept_partition_end(const query::result_row_view& static_row) {
|
||||
}
|
||||
};
|
||||
struct column_or_supercolumn_builder {
|
||||
using type = std::vector<ColumnOrSuperColumn>;
|
||||
@@ -1735,7 +1740,7 @@ private:
|
||||
static void delete_cell(const column_definition& def, api::timestamp_type timestamp, gc_clock::time_point deletion_time, mutation& m_to_apply) {
|
||||
if (def.is_atomic()) {
|
||||
auto dead_cell = atomic_cell::make_dead(timestamp, deletion_time);
|
||||
m_to_apply.set_clustered_cell(clustering_key_prefix::make_empty(), def, std::move(dead_cell));
|
||||
m_to_apply.set_static_cell(def, std::move(dead_cell));
|
||||
}
|
||||
}
|
||||
static void delete_column(const schema& s, const sstring& column_name, api::timestamp_type timestamp, gc_clock::time_point deletion_time, mutation& m_to_apply) {
|
||||
@@ -1772,11 +1777,21 @@ private:
|
||||
throw make_exception<InvalidRequestException>("SlicePredicate column_names and slice_range may not both be null");
|
||||
}
|
||||
}
|
||||
static void add_live_cell(const schema& s, const Column& col, const column_definition& def, mutation& m_to_apply) {
|
||||
thrift_validation::validate_column(col, def);
|
||||
auto cell = atomic_cell::make_live(col.timestamp, to_bytes_view(col.value), maybe_ttl(s, col));
|
||||
m_to_apply.set_static_cell(def, std::move(cell));
|
||||
}
|
||||
static void add_live_cell(const schema& s, const Column& col, const column_definition& def, clustering_key_prefix ckey, mutation& m_to_apply) {
|
||||
thrift_validation::validate_column(col, def);
|
||||
auto cell = atomic_cell::make_live(col.timestamp, to_bytes_view(col.value), maybe_ttl(s, col));
|
||||
m_to_apply.set_clustered_cell(std::move(ckey), def, std::move(cell));
|
||||
}
|
||||
static void add_live_cell(const schema& s, const CounterColumn& col, const column_definition& def, mutation& m_to_apply) {
|
||||
//thrift_validation::validate_column(col, def);
|
||||
auto cell = atomic_cell::make_live_counter_update(api::new_timestamp(), col.value);
|
||||
m_to_apply.set_static_cell(def, std::move(cell));
|
||||
}
|
||||
static void add_live_cell(const schema& s, const CounterColumn& col, const column_definition& def, clustering_key_prefix ckey, mutation& m_to_apply) {
|
||||
//thrift_validation::validate_column(col, def);
|
||||
auto cell = atomic_cell::make_live_counter_update(api::new_timestamp(), col.value);
|
||||
@@ -1790,10 +1805,10 @@ private:
|
||||
} else {
|
||||
auto def = s.get_column_definition(to_bytes(col.name));
|
||||
if (def) {
|
||||
if (def->kind != column_kind::regular_column) {
|
||||
if (def->kind != column_kind::static_column) {
|
||||
throw make_exception<InvalidRequestException>("Column %s is not settable", col.name);
|
||||
}
|
||||
add_live_cell(s, col, *def, clustering_key_prefix::make_empty(s), m_to_apply);
|
||||
add_live_cell(s, col, *def, m_to_apply);
|
||||
} else {
|
||||
fail(unimplemented::cause::MIXED_CF);
|
||||
}
|
||||
@@ -1807,10 +1822,10 @@ private:
|
||||
} else {
|
||||
auto def = s.get_column_definition(to_bytes(col.name));
|
||||
if (def) {
|
||||
if (def->kind != column_kind::regular_column) {
|
||||
if (def->kind != column_kind::static_column) {
|
||||
throw make_exception<InvalidRequestException>("Column %s is not settable", col.name);
|
||||
}
|
||||
add_live_cell(s, col, *def, clustering_key_prefix::make_empty(s), m_to_apply);
|
||||
add_live_cell(s, col, *def, m_to_apply);
|
||||
} else {
|
||||
fail(unimplemented::cause::MIXED_CF);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user