column_computation.hh, schema.cc: collection_column_computation

This type of column computation will be used for creating updates to
materialized views that are indexes over collections.

This type features an additional function, compute_values_with_action,
which, depending on an (optional) old row and the new row (the update to
the base table), returns multiple bytes_with_action: a vector of pairs
(computed value, some action), where the action signifies whether a row
with a specific key needs to be deleted or created.
This commit is contained in:
Michał Radwański
2022-05-17 15:55:35 +02:00
committed by Nadav Har'El
parent 2babee2cdc
commit ebc4ad4713
3 changed files with 234 additions and 2 deletions

View File

@@ -13,6 +13,12 @@
// Forward declarations: the declarations in this header only name these types
// (by reference, pointer, or as template arguments), so their full definitions
// are not needed here.
class schema;
class partition_key;
class clustering_row;
// Element type of collection_column_computation::collection_kv.
struct atomic_cell_view;
// Passed to the callback of collection_column_computation::operate_on_collection_entries.
struct tombstone;
namespace db::view {
// Element type returned by collection_column_computation::compute_values_with_action.
struct bytes_with_action;
}
class column_computation;
// Owning handle to a polymorphic column_computation.
using column_computation_ptr = std::unique_ptr<column_computation>;
@@ -77,3 +83,39 @@ public:
virtual bytes serialize() const override;
virtual bytes compute_value(const schema& schema, const partition_key& key) const override;
};
/**
 * Column computation used when generating updates for materialized views that
 * are indexes over a collection column.
 *
 * Besides the regular column_computation interface it offers
 * compute_values_with_action(), which yields one (computed value, action)
 * pair per affected collection entry.
 */
class collection_column_computation final : public column_computation {
    // Which part of every collection entry becomes the computed value.
    enum class kind {
        keys,
        values,
        entries,
    };

    const bytes _collection_name;
    const kind _kind;

    // Private on purpose: instances are obtained through the for_*() factories.
    collection_column_computation(const bytes& collection_name, kind computation_kind)
        : _collection_name(collection_name)
        , _kind(computation_kind)
    {}

    // A single collection entry: (serialized key, cell).
    using collection_kv = std::pair<bytes_view, atomic_cell_view>;

    // Invokes old_and_new_row_func for the entries of the collection column found
    // in `existing` and/or `update`, together with a tombstone.
    void operate_on_collection_entries(
            std::invocable<collection_kv*, collection_kv*, tombstone> auto&& old_and_new_row_func,
            const schema& schema,
            const partition_key& key,
            const clustering_row& update,
            const std::optional<clustering_row>& existing) const;

public:
    // Computation over the keys of the collection named collection_name.
    static collection_column_computation for_keys(const bytes& collection_name) {
        return collection_column_computation(collection_name, kind::keys);
    }

    // Computation over the values of the collection named collection_name.
    static collection_column_computation for_values(const bytes& collection_name) {
        return collection_column_computation(collection_name, kind::values);
    }

    // Computation over the entries of the collection named collection_name.
    static collection_column_computation for_entries(const bytes& collection_name) {
        return collection_column_computation(collection_name, kind::entries);
    }

    // Builds the computation matching a serialized type name.
    static column_computation_ptr for_target_type(std::string_view type, const bytes& collection_name);

    virtual bytes serialize() const override;
    virtual bytes compute_value(const schema& schema, const partition_key& key) const override;

    // Returns a heap-allocated copy of this computation.
    virtual column_computation_ptr clone() const override {
        return std::make_unique<collection_column_computation>(*this);
    }

    // Returns the (computed value, action) pairs resulting from applying `row`
    // on top of the optional `existing` row.
    std::vector<db::view::bytes_with_action> compute_values_with_action(
            const schema& schema,
            const partition_key& key,
            const clustering_row& row,
            const std::optional<clustering_row>& existing) const;
};

View File

@@ -123,6 +123,33 @@ bool matches_view_filter(const schema& base, const view_info& view, const partit
bool clustering_prefix_matches(const schema& base, const partition_key& key, const clustering_key_prefix& ck);
/**
* The liveness of rows resulting from an update to a table with an index over collection column
* depends on the liveness of the collection cells. bytes_with_action is used to return this information
* from the function computing the result for a particular computed column.
*/
struct bytes_with_action {
struct no_action {};
struct shadowable_tombstone_tag {
api::timestamp_type ts;
shadowable_tombstone into_shadowable_tombstone(gc_clock::time_point now) const {
return shadowable_tombstone{ts, now};
}
};
using action = std::variant<no_action, row_marker, shadowable_tombstone_tag>;
bytes _view;
action _action = no_action{};
bytes_with_action(bytes view)
: _view(std::move(view))
{}
bytes_with_action(bytes view, action action)
: _view(std::move(view))
, _action(action)
{}
};
class view_updates final {
view_ptr _view;
const view_info& _view_info;

167
schema.cc
View File

@@ -8,6 +8,7 @@
#include <seastar/core/on_internal_error.hh>
#include <map>
#include <utility>
#include "timestamp.hh"
#include "utils/UUID_gen.hh"
#include "cql3/column_identifier.hh"
#include "cql3/util.hh"
@@ -1691,12 +1692,24 @@ column_computation_ptr column_computation::deserialize(bytes_view raw) {
if (!type_json || !type_json->IsString()) {
throw std::runtime_error(format("Type {} is not convertible to string", *type_json));
}
if (rjson::to_string_view(*type_json) == "token") {
const std::string_view type = rjson::to_string_view(*type_json);
if (type == "token") {
return std::make_unique<legacy_token_column_computation>();
}
if (rjson::to_string_view(*type_json) == "token_v2") {
if (type == "token_v2") {
return std::make_unique<token_column_computation>();
}
if (type.starts_with("collection_")) {
const rjson::value* collection_name = rjson::find(parsed, "collection_name");
if (collection_name && collection_name->IsString()) {
auto collection = rjson::to_string_view(*collection_name);
auto collection_as_bytes = bytes(collection.begin(), collection.end());
if (auto collection = collection_column_computation::for_target_type(type, collection_as_bytes)) {
return collection->clone();
}
}
}
throw std::runtime_error(format("Incorrect column computation type {} found when parsing {}", *type_json, parsed));
}
@@ -1721,6 +1734,156 @@ bytes token_column_computation::compute_value(const schema& schema, const partit
return long_type->decompose(long_value);
}
// Serializes this computation as a JSON object with two string members:
// "type" ("collection_keys", "collection_values" or "collection_entries",
// depending on the computation kind) and "collection_name".
bytes collection_column_computation::serialize() const {
    // Textual name of the computation kind; stays null if no case matches.
    const char* const type_name = [this] () -> const char* {
        switch (_kind) {
        case kind::keys:
            return "collection_keys";
        case kind::values:
            return "collection_values";
        case kind::entries:
            return "collection_entries";
        }
        return nullptr;
    }();
    const sstring collection_name(_collection_name.begin(), _collection_name.end());

    rjson::value serialized = rjson::empty_object();
    rjson::add(serialized, "type", rjson::from_string(type_name));
    rjson::add(serialized, "collection_name", rjson::from_string(collection_name));
    return to_bytes(rjson::print(serialized));
}
// Creates the computation whose serialized type name equals `type`
// ("collection_keys", "collection_values" or "collection_entries") for the
// collection column `collection_name`. Returns a null pointer for any other
// type name.
column_computation_ptr collection_column_computation::for_target_type(std::string_view type, const bytes& collection_name) {
    column_computation_ptr result;
    if (type == "collection_keys") {
        result = for_keys(collection_name).clone();
    } else if (type == "collection_values") {
        result = for_values(collection_name).clone();
    } else if (type == "collection_entries") {
        result = for_entries(collection_name).clone();
    }
    return result;
}
/**
 * Walks, in key order, the entries of the collection column `_collection_name`
 * found in `update` and (if given) in `existing`, and invokes
 * `old_and_new_row_func(existing_entry, update_entry, tomb)` once per distinct key.
 *
 * - existing_entry is null when the key is present only in the update;
 * - update_entry is null when the key is present only in the existing row;
 * - tomb is the update row's tombstone with the update collection's own
 *   tombstone (if the update carries the collection) applied on top.
 *
 * The `key` parameter is not used by this function.
 */
void collection_column_computation::operate_on_collection_entries(
        std::invocable<collection_kv*, collection_kv*, tombstone> auto&& old_and_new_row_func, const schema& schema,
        const partition_key& key, const clustering_row& update, const std::optional<clustering_row>& existing) const {
    // NOTE(review): cdef is dereferenced without a null check; this assumes the
    // schema always contains a column named _collection_name - confirm against callers.
    const column_definition* cdef = schema.get_column_definition(_collection_name);

    decltype(collection_mutation_view_description::cells) update_cells, existing_cells;

    const auto* update_cell = update.cells().find_cell(cdef->id);
    tombstone update_tombstone = update.tomb().tomb();
    if (update_cell) {
        collection_mutation_view update_col_view = update_cell->as_collection_mutation();
        update_col_view.with_deserialized(*(cdef->type), [&update_cells, &update_tombstone] (collection_mutation_view_description descr) {
            update_tombstone.apply(descr.tomb);
            // descr is a by-value parameter, so its cells can be moved out rather than copied.
            update_cells = std::move(descr.cells);
        });
    }
    if (existing) {
        const auto* existing_cell = existing->cells().find_cell(cdef->id);
        if (existing_cell) {
            collection_mutation_view existing_col_view = existing_cell->as_collection_mutation();
            existing_col_view.with_deserialized(*(cdef->type), [&existing_cells] (collection_mutation_view_description descr) {
                existing_cells = std::move(descr.cells);
            });
        }
    }

    auto compare = [](const collection_kv& p1, const collection_kv& p2) {
        return p1.first <=> p2.first;
    };

    // Both collections are assumed to be sorted by the keys.
    auto existing_it = existing_cells.begin();
    auto update_it = update_cells.begin();

    auto is_existing_end = [&] {
        return existing_it == existing_cells.end();
    };
    auto is_update_end = [&] {
        return update_it == update_cells.end();
    };

    // Merge-style traversal: at every step handle the smaller key, or both
    // entries at once when the keys compare equal.
    while (!(is_existing_end() && is_update_end())) {
        // An exhausted side is treated as holding a key greater than any other,
        // so that only the remaining side is consumed.
        std::strong_ordering cmp = [&] {
            if (is_existing_end()) {
                return std::strong_ordering::greater;
            } else if (is_update_end()) {
                return std::strong_ordering::less;
            }
            return compare(*existing_it, *update_it);
        }();

        collection_kv* existing_ptr = (!is_existing_end() && cmp <= 0) ? &*existing_it : nullptr;
        collection_kv* update_ptr = (!is_update_end() && cmp >= 0) ? &*update_it : nullptr;

        old_and_new_row_func(existing_ptr, update_ptr, update_tombstone);

        // Advance exactly the side(s) that were just handed to the callback.
        if (cmp <= 0) {
            ++existing_it;
        }
        if (cmp >= 0) {
            ++update_it;
        }
    }
}
// A collection computation has no single value derivable from the partition
// key alone, so this overload always throws std::runtime_error.
bytes collection_column_computation::compute_value(const schema&, const partition_key&) const {
    const auto message = fmt::format("{}: not supported", __PRETTY_FUNCTION__);
    throw std::runtime_error(message);
}
/**
 * Computes the values derived from the indexed collection column for an update
 * applied on top of an optional existing row, each paired with an action:
 *
 * - a row_marker for every live entry of the update;
 * - a shadowable_tombstone_tag for every live existing entry that the update
 *   changes or that has no counterpart in the update.
 *
 * Entries whose existing and updated cells compare equal under
 * compare_atomic_cell_for_merge() produce nothing.
 *
 * Throws std::runtime_error if the computation kind is not one of
 * keys / values / entries.
 */
std::vector<db::view::bytes_with_action> collection_column_computation::compute_values_with_action(const schema& schema, const partition_key& key, const clustering_row& update, const std::optional<clustering_row>& existing) const {
    using collection_kv = std::pair<bytes_view, atomic_cell_view>;

    // Turns one collection entry into the computed value: the key, the
    // linearized cell value, or a tuple of both, depending on the kind.
    auto serialize_cell = [_kind = _kind](const collection_kv& kv) -> bytes {
        using kind = collection_column_computation::kind;
        auto& [key, value] = kv;
        switch (_kind) {
        case kind::keys:
            return bytes(key);
        case kind::values:
            return value.value().linearize();
        case kind::entries: {
            bytes_opt elements[] = {bytes(key), value.value().linearize()};
            return tuple_type_impl::build_value(elements);
        }
        }
        // Reaching this point means _kind holds a value outside the enum;
        // fail loudly instead of flowing off the end of a non-void lambda.
        throw std::runtime_error("collection_column_computation: unexpected computation kind");
    };

    std::vector<db::view::bytes_with_action> ret;

    // Marker carrying the cell's timestamp, plus TTL and expiry when the cell has them.
    auto compute_row_marker = [] (auto&& cell) -> row_marker {
        return cell.is_live_and_has_ttl() ? row_marker(cell.timestamp(), cell.ttl(), cell.expiry()) : row_marker(cell.timestamp());
    };

    auto fn = [&ret, &compute_row_marker, &serialize_cell] (collection_kv* existing, collection_kv* update, tombstone tomb) {
        // Timestamp of the operation: the tombstone's, unless an updated cell overrides it below.
        api::timestamp_type operation_ts = tomb.timestamp;
        if (existing && update && compare_atomic_cell_for_merge(existing->second, update->second) == 0) {
            // Nothing changed for this entry.
            return;
        }
        if (update) {
            operation_ts = update->second.timestamp();
            if (update->second.is_live()) {
                row_marker rm = compute_row_marker(update->second);
                ret.push_back({serialize_cell(*update), {rm}});
            }
        }
        // The tombstone for the old entry is stamped one tick before the operation.
        // NOTE(review): presumably so that a marker written by the same operation
        // is not shadowed by it; also, if tomb.timestamp can be the minimum
        // representable value this decrement overflows - confirm.
        operation_ts -= 1;
        if (existing && existing->second.is_live()) {
            db::view::bytes_with_action::shadowable_tombstone_tag tag{operation_ts};
            ret.push_back({serialize_cell(*existing), {tag}});
        }
    };

    operate_on_collection_entries(fn, schema, key, update, existing);
    return ret;
}
bool operator==(const raw_view_info& x, const raw_view_info& y) {
return x._base_id == y._base_id
&& x._base_name == y._base_name