|
|
|
|
@@ -601,12 +601,12 @@ public:
|
|
|
|
|
using pointer = const T*;
|
|
|
|
|
using reference = const T&;
|
|
|
|
|
private:
|
|
|
|
|
bytes_view _v, _next;
|
|
|
|
|
managed_bytes_view _v, _next;
|
|
|
|
|
size_t _rem = 0;
|
|
|
|
|
T _current;
|
|
|
|
|
public:
|
|
|
|
|
collection_iterator(bytes_view_opt v = {})
|
|
|
|
|
: _v(v.value_or(bytes_view{}))
|
|
|
|
|
collection_iterator(managed_bytes_view_opt v = {})
|
|
|
|
|
: _v(v.value_or(managed_bytes_view{}))
|
|
|
|
|
, _rem(_v.empty() ? 0 : read_collection_size(_v, cql_serialization_format::internal()))
|
|
|
|
|
{
|
|
|
|
|
if (_rem != 0) {
|
|
|
|
|
@@ -649,7 +649,7 @@ private:
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
template<>
|
|
|
|
|
void collection_iterator<std::pair<bytes_view, bytes_view>>::parse() {
|
|
|
|
|
void collection_iterator<std::pair<managed_bytes_view, managed_bytes_view>>::parse() {
|
|
|
|
|
assert(_rem > 0);
|
|
|
|
|
_next = _v;
|
|
|
|
|
auto k = read_collection_value(_next, cql_serialization_format::internal());
|
|
|
|
|
@@ -657,14 +657,6 @@ void collection_iterator<std::pair<bytes_view, bytes_view>>::parse() {
|
|
|
|
|
_current = std::make_pair(k, v);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
template<>
|
|
|
|
|
void collection_iterator<bytes_view>::parse() {
|
|
|
|
|
assert(_rem > 0);
|
|
|
|
|
_next = _v;
|
|
|
|
|
auto k = read_collection_value(_next, cql_serialization_format::internal());
|
|
|
|
|
_current = k;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
template<>
|
|
|
|
|
void collection_iterator<managed_bytes_view>::parse() {
|
|
|
|
|
assert(_rem > 0);
|
|
|
|
|
@@ -720,12 +712,13 @@ private:
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
template<>
|
|
|
|
|
bool maybe_back_insert_iterator<std::vector<std::pair<bytes_view, bytes_view>>, bytes_view>::compare(const bytes_view& t, const value_type& v) {
|
|
|
|
|
bool maybe_back_insert_iterator<std::vector<std::pair<managed_bytes_view, managed_bytes_view>>, managed_bytes_view>::compare(
|
|
|
|
|
const managed_bytes_view& t, const value_type& v) {
|
|
|
|
|
return _type.compare(t, v.first);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
template<>
|
|
|
|
|
bool maybe_back_insert_iterator<std::vector<bytes_view>, bytes_view>::compare(const bytes_view& t, const value_type& v) {
|
|
|
|
|
bool maybe_back_insert_iterator<std::vector<managed_bytes_view>, managed_bytes_view>::compare(const managed_bytes_view& t, const value_type& v) {
|
|
|
|
|
return _type.compare(t, v);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -734,45 +727,45 @@ auto make_maybe_back_inserter(Container& c, const abstract_type& type, collectio
|
|
|
|
|
return maybe_back_insert_iterator<Container, T>(c, type, s);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static size_t collection_size(const bytes_opt& bo) {
|
|
|
|
|
static size_t collection_size(const managed_bytes_opt& bo) {
|
|
|
|
|
if (bo) {
|
|
|
|
|
bytes_view bv(*bo);
|
|
|
|
|
return read_collection_size(bv, cql_serialization_format::internal());
|
|
|
|
|
managed_bytes_view mbv(*bo);
|
|
|
|
|
return read_collection_size(mbv, cql_serialization_format::internal());
|
|
|
|
|
}
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
template<typename Func>
|
|
|
|
|
static void udt_for_each(const bytes_opt& bo, Func&& f) {
|
|
|
|
|
static void udt_for_each(const managed_bytes_opt& bo, Func&& f) {
|
|
|
|
|
if (bo) {
|
|
|
|
|
bytes_view bv(*bo);
|
|
|
|
|
std::for_each(tuple_deserializing_iterator::start(bv), tuple_deserializing_iterator::finish(bv), std::forward<Func>(f));
|
|
|
|
|
managed_bytes_view mbv(*bo);
|
|
|
|
|
std::for_each(tuple_deserializing_iterator::start(mbv), tuple_deserializing_iterator::finish(mbv), std::forward<Func>(f));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
static bytes merge(const collection_type_impl& ctype, const bytes_opt& prev, const bytes_opt& next, const bytes_opt& deleted) {
|
|
|
|
|
std::vector<std::pair<bytes_view, bytes_view>> res;
|
|
|
|
|
static managed_bytes merge(const collection_type_impl& ctype, const managed_bytes_opt& prev, const managed_bytes_opt& next, const managed_bytes_opt& deleted) {
|
|
|
|
|
std::vector<std::pair<managed_bytes_view, managed_bytes_view>> res;
|
|
|
|
|
res.reserve(collection_size(prev) + collection_size(next));
|
|
|
|
|
auto type = ctype.name_comparator();
|
|
|
|
|
auto cmp = [&type = *type](const std::pair<bytes_view, bytes_view>& p1, const std::pair<bytes_view, bytes_view>& p2) {
|
|
|
|
|
auto cmp = [&type = *type](const std::pair<managed_bytes_view, managed_bytes_view>& p1, const std::pair<managed_bytes_view, managed_bytes_view>& p2) {
|
|
|
|
|
return type.compare(p1.first, p2.first) < 0;
|
|
|
|
|
};
|
|
|
|
|
collection_iterator<std::pair<bytes_view, bytes_view>> e, i(prev), j(next);
|
|
|
|
|
collection_iterator<std::pair<managed_bytes_view, managed_bytes_view>> e, i(prev), j(next);
|
|
|
|
|
// note order: set_union, when finding doubles, use value from first1 (j here). So
|
|
|
|
|
// since this is next, it has prio
|
|
|
|
|
std::set_union(j, e, i, e, make_maybe_back_inserter(res, *type, collection_iterator<bytes_view>(deleted)), cmp);
|
|
|
|
|
return map_type_impl::serialize_partially_deserialized_form(res, cql_serialization_format::internal());
|
|
|
|
|
std::set_union(j, e, i, e, make_maybe_back_inserter(res, *type, collection_iterator<managed_bytes_view>(deleted)), cmp);
|
|
|
|
|
return map_type_impl::serialize_partially_deserialized_form_fragmented(res, cql_serialization_format::internal());
|
|
|
|
|
}
|
|
|
|
|
static bytes merge(const set_type_impl& ctype, const bytes_opt& prev, const bytes_opt& next, const bytes_opt& deleted) {
|
|
|
|
|
std::vector<bytes_view> res;
|
|
|
|
|
static managed_bytes merge(const set_type_impl& ctype, const managed_bytes_opt& prev, const managed_bytes_opt& next, const managed_bytes_opt& deleted) {
|
|
|
|
|
std::vector<managed_bytes_view> res;
|
|
|
|
|
res.reserve(collection_size(prev) + collection_size(next));
|
|
|
|
|
auto type = ctype.name_comparator();
|
|
|
|
|
auto cmp = [&type = *type](bytes_view k1, bytes_view k2) {
|
|
|
|
|
auto cmp = [&type = *type](managed_bytes_view k1, managed_bytes_view k2) {
|
|
|
|
|
return type.compare(k1, k2) < 0;
|
|
|
|
|
};
|
|
|
|
|
collection_iterator<bytes_view> e, i(prev), j(next), d(deleted);
|
|
|
|
|
collection_iterator<managed_bytes_view> e, i(prev), j(next), d(deleted);
|
|
|
|
|
std::set_union(j, e, i, e, make_maybe_back_inserter(res, *type, d), cmp);
|
|
|
|
|
return set_type_impl::serialize_partially_deserialized_form(res, cql_serialization_format::internal());
|
|
|
|
|
return set_type_impl::serialize_partially_deserialized_form_fragmented(res, cql_serialization_format::internal());
|
|
|
|
|
}
|
|
|
|
|
static bytes merge(const user_type_impl& type, const bytes_opt& prev, const bytes_opt& next, const bytes_opt& deleted) {
|
|
|
|
|
static managed_bytes merge(const user_type_impl& type, const managed_bytes_opt& prev, const managed_bytes_opt& next, const managed_bytes_opt& deleted) {
|
|
|
|
|
std::vector<managed_bytes_view_opt> res(type.size());
|
|
|
|
|
udt_for_each(prev, [&res, i = res.begin()](managed_bytes_view_opt k) mutable {
|
|
|
|
|
*i++ = k;
|
|
|
|
|
@@ -788,16 +781,16 @@ static bytes merge(const user_type_impl& type, const bytes_opt& prev, const byte
|
|
|
|
|
auto index = deserialize_field_index(k);
|
|
|
|
|
res[index] = std::nullopt;
|
|
|
|
|
});
|
|
|
|
|
return type.build_value(res);
|
|
|
|
|
return type.build_value_fragmented(res);
|
|
|
|
|
}
|
|
|
|
|
static bytes merge(const abstract_type& type, const bytes_opt& prev, const bytes_opt& next, const bytes_opt& deleted) {
|
|
|
|
|
static managed_bytes merge(const abstract_type& type, const managed_bytes_opt& prev, const managed_bytes_opt& next, const managed_bytes_opt& deleted) {
|
|
|
|
|
throw std::runtime_error(format("cdc merge: unknown type {}", type.name()));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
using cell_map = std::unordered_map<const column_definition*, bytes_opt>;
|
|
|
|
|
using cell_map = std::unordered_map<const column_definition*, managed_bytes_opt>;
|
|
|
|
|
using row_states_map = std::unordered_map<clustering_key, cell_map, clustering_key::hashing, clustering_key::equality>;
|
|
|
|
|
|
|
|
|
|
static bytes_opt get_col_from_row_state(const cell_map* state, const column_definition& cdef) {
|
|
|
|
|
static managed_bytes_opt get_col_from_row_state(const cell_map* state, const column_definition& cdef) {
|
|
|
|
|
if (state) {
|
|
|
|
|
if (auto it = state->find(&cdef); it != state->end()) {
|
|
|
|
|
return it->second;
|
|
|
|
|
@@ -811,28 +804,28 @@ static cell_map* get_row_state(row_states_map& row_states, const clustering_key&
|
|
|
|
|
return it == row_states.end() ? nullptr : &it->second;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static bytes_opt get_preimage_col_value(const column_definition& cdef, const cql3::untyped_result_set_row *pirow) {
|
|
|
|
|
static managed_bytes_opt get_preimage_col_value(const column_definition& cdef, const cql3::untyped_result_set_row *pirow) {
|
|
|
|
|
if (!pirow || !pirow->has(cdef.name_as_text())) {
|
|
|
|
|
return std::nullopt;
|
|
|
|
|
}
|
|
|
|
|
return cdef.is_atomic()
|
|
|
|
|
? pirow->get_blob(cdef.name_as_text())
|
|
|
|
|
? pirow->get_blob_fragmented(cdef.name_as_text())
|
|
|
|
|
: visit(*cdef.type, make_visitor(
|
|
|
|
|
// flatten set
|
|
|
|
|
[&] (const set_type_impl& type) {
|
|
|
|
|
auto v = pirow->get_view(cdef.name_as_text());
|
|
|
|
|
auto f = cql_serialization_format::internal();
|
|
|
|
|
auto n = read_collection_size(v, f);
|
|
|
|
|
std::vector<bytes> tmp;
|
|
|
|
|
std::vector<managed_bytes> tmp;
|
|
|
|
|
tmp.reserve(n);
|
|
|
|
|
while (n--) {
|
|
|
|
|
tmp.emplace_back(read_collection_value(v, f).linearize()); // key
|
|
|
|
|
tmp.emplace_back(read_collection_value(v, f)); // key
|
|
|
|
|
read_collection_value(v, f); // value. ignore.
|
|
|
|
|
}
|
|
|
|
|
return set_type_impl::serialize_partially_deserialized_form({tmp.begin(), tmp.end()}, f);
|
|
|
|
|
return set_type_impl::serialize_partially_deserialized_form_fragmented({tmp.begin(), tmp.end()}, f);
|
|
|
|
|
},
|
|
|
|
|
[&] (const abstract_type& o) -> bytes {
|
|
|
|
|
return pirow->get_blob(cdef.name_as_text());
|
|
|
|
|
[&] (const abstract_type& o) -> managed_bytes {
|
|
|
|
|
return pirow->get_blob_fragmented(cdef.name_as_text());
|
|
|
|
|
}
|
|
|
|
|
));
|
|
|
|
|
}
|
|
|
|
|
@@ -856,7 +849,7 @@ class log_mutation_builder {
|
|
|
|
|
const column_definition& _ttl_col;
|
|
|
|
|
|
|
|
|
|
// The base mutation's partition key
|
|
|
|
|
std::vector<bytes> _base_pk;
|
|
|
|
|
std::vector<managed_bytes> _base_pk;
|
|
|
|
|
|
|
|
|
|
// The cdc$time value of created rows
|
|
|
|
|
const bytes _tuuid;
|
|
|
|
|
@@ -877,7 +870,7 @@ public:
|
|
|
|
|
: _base_schema(base_schema), _log_schema(*log_mut.schema()),
|
|
|
|
|
_op_col(*_log_schema.get_column_definition(log_meta_column_name_bytes("operation"))),
|
|
|
|
|
_ttl_col(*_log_schema.get_column_definition(log_meta_column_name_bytes("ttl"))),
|
|
|
|
|
_base_pk(base_pk.explode(_base_schema)),
|
|
|
|
|
_base_pk(base_pk.explode_fragmented()),
|
|
|
|
|
_tuuid(timeuuid_type->decompose(generate_timeuuid(ts))),
|
|
|
|
|
_ts(ts),
|
|
|
|
|
_ttl(_base_schema.cdc_options().ttl()
|
|
|
|
|
@@ -920,7 +913,7 @@ public:
|
|
|
|
|
// This takes a base schema clustering key prefix and sets these columns
|
|
|
|
|
// according to the prefix' values for the given log row.
|
|
|
|
|
void set_clustering_columns(const clustering_key& log_ck, const clustering_key_prefix& base_ckey) {
|
|
|
|
|
set_key_columns(log_ck, _base_schema.clustering_key_columns(), base_ckey.explode(_log_schema));
|
|
|
|
|
set_key_columns(log_ck, _base_schema.clustering_key_columns(), base_ckey.explode_fragmented());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Sets the `cdc$operation` column for the given row.
|
|
|
|
|
@@ -942,9 +935,9 @@ public:
|
|
|
|
|
// Each regular and static column in the base schema has a corresponding column in the log schema with the same name.
|
|
|
|
|
// Given a reference to such a column from the base schema, this function sets the corresponding column
|
|
|
|
|
// in the log to the given value for the given row.
|
|
|
|
|
void set_value(const clustering_key& log_ck, const column_definition& base_cdef, bytes value) {
|
|
|
|
|
void set_value(const clustering_key& log_ck, const column_definition& base_cdef, const managed_bytes_view& value) {
|
|
|
|
|
auto& log_cdef = *_log_schema.get_column_definition(log_data_column_name_bytes(base_cdef.name()));
|
|
|
|
|
_log_mut.set_cell(log_ck, log_cdef, atomic_cell::make_live(*base_cdef.type, _ts, std::move(value), _ttl));
|
|
|
|
|
_log_mut.set_cell(log_ck, log_cdef, atomic_cell::make_live(*base_cdef.type, _ts, value, _ttl));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Each regular and static column in the base schema has a corresponding column in the log schema
|
|
|
|
|
@@ -960,7 +953,7 @@ public:
|
|
|
|
|
// by prefixing the original name with ``cdc$deleted_elements_''.
|
|
|
|
|
// Given a reference to such a column from the base schema, this function sets the corresponding column
|
|
|
|
|
// in the log to the given set of keys for the given row.
|
|
|
|
|
void set_deleted_elements(const clustering_key& log_ck, const column_definition& base_cdef, bytes deleted_elements) {
|
|
|
|
|
void set_deleted_elements(const clustering_key& log_ck, const column_definition& base_cdef, const managed_bytes& deleted_elements) {
|
|
|
|
|
auto& log_cdef = *_log_schema.get_column_definition(log_data_column_deleted_elements_name_bytes(base_cdef.name()));
|
|
|
|
|
_log_mut.set_cell(log_ck, log_cdef, atomic_cell::make_live(*log_cdef.type, _ts, deleted_elements, _ttl));
|
|
|
|
|
}
|
|
|
|
|
@@ -971,27 +964,21 @@ public:
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
private:
|
|
|
|
|
void set_key_columns(const clustering_key& log_ck, schema::const_iterator_range_type columns, const std::vector<bytes>& key) {
|
|
|
|
|
void set_key_columns(const clustering_key& log_ck, schema::const_iterator_range_type columns, const std::vector<managed_bytes>& key) {
|
|
|
|
|
size_t pos = 0;
|
|
|
|
|
for (auto& column : columns) {
|
|
|
|
|
if (pos >= key.size()) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
auto& cdef = *_log_schema.get_column_definition(log_data_column_name_bytes(column.name()));
|
|
|
|
|
_log_mut.set_cell(log_ck, cdef, atomic_cell::make_live(*column.type, _ts, bytes_view(key[pos]), _ttl));
|
|
|
|
|
_log_mut.set_cell(log_ck, cdef, atomic_cell::make_live(*column.type, _ts, managed_bytes_view(key[pos]), _ttl));
|
|
|
|
|
++pos;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
static bytes get_bytes(const atomic_cell_view& acv) {
|
|
|
|
|
return to_bytes(acv.value());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static bytes_view get_bytes_view(const atomic_cell_view& acv, std::forward_list<bytes>& buf) {
|
|
|
|
|
return acv.value().is_fragmented()
|
|
|
|
|
? bytes_view{buf.emplace_front(to_bytes(acv.value()))}
|
|
|
|
|
: acv.value().current_fragment();
|
|
|
|
|
static managed_bytes get_managed_bytes(const atomic_cell_view& acv) {
|
|
|
|
|
return managed_bytes(acv.value());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static ttl_opt get_ttl(const atomic_cell_view& acv) {
|
|
|
|
|
@@ -1052,7 +1039,7 @@ struct process_row_visitor {
|
|
|
|
|
_generate_delta_values(generate_delta_values)
|
|
|
|
|
{}
|
|
|
|
|
|
|
|
|
|
void update_row_state(const column_definition& cdef, bytes_opt value) {
|
|
|
|
|
void update_row_state(const column_definition& cdef, managed_bytes_opt value) {
|
|
|
|
|
if (!_row_state) {
|
|
|
|
|
// static row always has a valid state, so this must be a clustering row missing
|
|
|
|
|
assert(_base_ck);
|
|
|
|
|
@@ -1064,7 +1051,7 @@ struct process_row_visitor {
|
|
|
|
|
|
|
|
|
|
void live_atomic_cell(const column_definition& cdef, const atomic_cell_view& cell) {
|
|
|
|
|
_ttl_column = get_ttl(cell);
|
|
|
|
|
bytes value = get_bytes(cell);
|
|
|
|
|
managed_bytes value = get_managed_bytes(cell);
|
|
|
|
|
|
|
|
|
|
// delta
|
|
|
|
|
if (_generate_delta_values) {
|
|
|
|
|
@@ -1094,7 +1081,7 @@ struct process_row_visitor {
|
|
|
|
|
// See `set_visitor`, `udt_visitior`, and `map_or_list_visitor` below.
|
|
|
|
|
struct collection_visitor {
|
|
|
|
|
bool _is_column_delete = false;
|
|
|
|
|
std::vector<bytes_view> _deleted_keys;
|
|
|
|
|
std::vector<managed_bytes_view> _deleted_keys;
|
|
|
|
|
|
|
|
|
|
ttl_opt& _ttl_column;
|
|
|
|
|
|
|
|
|
|
@@ -1113,26 +1100,26 @@ struct process_row_visitor {
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// cdc$deleted_col, cdc$deleted_elements_col, col
|
|
|
|
|
using result_t = std::tuple<bool, std::vector<bytes_view>, bytes_opt>;
|
|
|
|
|
using result_t = std::tuple<bool, std::vector<managed_bytes_view>, managed_bytes_opt>;
|
|
|
|
|
auto result = visit(*cdef.type, make_visitor(
|
|
|
|
|
[&] (const set_type_impl&) -> result_t {
|
|
|
|
|
_touched_parts.set<stats::part_type::SET>();
|
|
|
|
|
|
|
|
|
|
struct set_visitor : public collection_visitor {
|
|
|
|
|
std::vector<bytes_view> _added_keys;
|
|
|
|
|
std::vector<managed_bytes_view> _added_keys;
|
|
|
|
|
|
|
|
|
|
set_visitor(ttl_opt& ttl_column) : collection_visitor(ttl_column) {}
|
|
|
|
|
|
|
|
|
|
void live_collection_cell(bytes_view key, const atomic_cell_view& cell) {
|
|
|
|
|
this->_ttl_column = get_ttl(cell);
|
|
|
|
|
_added_keys.push_back(key);
|
|
|
|
|
_added_keys.emplace_back(key);
|
|
|
|
|
}
|
|
|
|
|
} v(_ttl_column);
|
|
|
|
|
|
|
|
|
|
visit_collection(v);
|
|
|
|
|
|
|
|
|
|
bytes_opt added_keys = v._added_keys.empty() ? std::nullopt :
|
|
|
|
|
std::optional{set_type_impl::serialize_partially_deserialized_form(v._added_keys, cql_serialization_format::internal())};
|
|
|
|
|
managed_bytes_opt added_keys = v._added_keys.empty() ? std::nullopt :
|
|
|
|
|
std::optional{set_type_impl::serialize_partially_deserialized_form_fragmented(v._added_keys, cql_serialization_format::internal())};
|
|
|
|
|
|
|
|
|
|
return {
|
|
|
|
|
v._is_column_delete,
|
|
|
|
|
@@ -1144,25 +1131,23 @@ struct process_row_visitor {
|
|
|
|
|
_touched_parts.set<stats::part_type::UDT>();
|
|
|
|
|
|
|
|
|
|
struct udt_visitor : public collection_visitor {
|
|
|
|
|
std::vector<bytes_view_opt> _added_cells;
|
|
|
|
|
std::forward_list<bytes>& _buf;
|
|
|
|
|
std::vector<managed_bytes_view_opt> _added_cells;
|
|
|
|
|
|
|
|
|
|
udt_visitor(ttl_opt& ttl_column, size_t num_keys, std::forward_list<bytes>& buf)
|
|
|
|
|
: collection_visitor(ttl_column), _added_cells(num_keys), _buf(buf) {}
|
|
|
|
|
udt_visitor(ttl_opt& ttl_column, size_t num_keys)
|
|
|
|
|
: collection_visitor(ttl_column), _added_cells(num_keys) {}
|
|
|
|
|
|
|
|
|
|
void live_collection_cell(bytes_view key, const atomic_cell_view& cell) {
|
|
|
|
|
this->_ttl_column = get_ttl(cell);
|
|
|
|
|
_added_cells[deserialize_field_index(key)].emplace(get_bytes_view(cell, _buf));
|
|
|
|
|
_added_cells[deserialize_field_index(key)].emplace(cell.value());
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
std::forward_list<bytes> buf;
|
|
|
|
|
udt_visitor v(_ttl_column, type.size(), buf);
|
|
|
|
|
udt_visitor v(_ttl_column, type.size());
|
|
|
|
|
|
|
|
|
|
visit_collection(v);
|
|
|
|
|
|
|
|
|
|
bytes_opt added_cells = v._added_cells.empty() ? std::nullopt :
|
|
|
|
|
std::optional{type.build_value(v._added_cells)};
|
|
|
|
|
managed_bytes_opt added_cells = v._added_cells.empty() ? std::nullopt :
|
|
|
|
|
std::optional{type.build_value_fragmented(v._added_cells)};
|
|
|
|
|
|
|
|
|
|
return {
|
|
|
|
|
v._is_column_delete,
|
|
|
|
|
@@ -1174,25 +1159,23 @@ struct process_row_visitor {
|
|
|
|
|
_touched_parts.set(type.is_list() ? stats::part_type::LIST : stats::part_type::MAP);
|
|
|
|
|
|
|
|
|
|
struct map_or_list_visitor : public collection_visitor {
|
|
|
|
|
std::vector<std::pair<bytes_view, bytes_view>> _added_cells;
|
|
|
|
|
std::forward_list<bytes>& _buf;
|
|
|
|
|
std::vector<std::pair<managed_bytes_view, managed_bytes_view>> _added_cells;
|
|
|
|
|
|
|
|
|
|
map_or_list_visitor(ttl_opt& ttl_column, std::forward_list<bytes>& buf)
|
|
|
|
|
: collection_visitor(ttl_column), _buf(buf) {}
|
|
|
|
|
map_or_list_visitor(ttl_opt& ttl_column)
|
|
|
|
|
: collection_visitor(ttl_column) {}
|
|
|
|
|
|
|
|
|
|
void live_collection_cell(bytes_view key, const atomic_cell_view& cell) {
|
|
|
|
|
this->_ttl_column = get_ttl(cell);
|
|
|
|
|
_added_cells.emplace_back(key, get_bytes_view(cell, _buf));
|
|
|
|
|
_added_cells.emplace_back(key, cell.value());
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
std::forward_list<bytes> buf;
|
|
|
|
|
map_or_list_visitor v(_ttl_column, buf);
|
|
|
|
|
map_or_list_visitor v(_ttl_column);
|
|
|
|
|
|
|
|
|
|
visit_collection(v);
|
|
|
|
|
|
|
|
|
|
bytes_opt added_cells = v._added_cells.empty() ? std::nullopt :
|
|
|
|
|
std::optional{map_type_impl::serialize_partially_deserialized_form(v._added_cells, cql_serialization_format::internal())};
|
|
|
|
|
managed_bytes_opt added_cells = v._added_cells.empty() ? std::nullopt :
|
|
|
|
|
std::optional{map_type_impl::serialize_partially_deserialized_form_fragmented(v._added_cells, cql_serialization_format::internal())};
|
|
|
|
|
|
|
|
|
|
return {
|
|
|
|
|
v._is_column_delete,
|
|
|
|
|
@@ -1210,9 +1193,9 @@ struct process_row_visitor {
|
|
|
|
|
|
|
|
|
|
// FIXME: we're doing redundant work: first we serialize the set of deleted keys into a blob,
|
|
|
|
|
// then we deserialize again when merging images below
|
|
|
|
|
bytes_opt deleted_elements = std::nullopt;
|
|
|
|
|
managed_bytes_opt deleted_elements = std::nullopt;
|
|
|
|
|
if (!deleted_keys.empty()) {
|
|
|
|
|
deleted_elements = set_type_impl::serialize_partially_deserialized_form(deleted_keys, cql_serialization_format::internal());
|
|
|
|
|
deleted_elements = set_type_impl::serialize_partially_deserialized_form_fragmented(deleted_keys, cql_serialization_format::internal());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// delta
|
|
|
|
|
@@ -1233,11 +1216,11 @@ struct process_row_visitor {
|
|
|
|
|
// images
|
|
|
|
|
if (_enable_updating_state) {
|
|
|
|
|
// A column delete overwrites any data we gathered until now.
|
|
|
|
|
bytes_opt prev = is_column_delete ? std::nullopt : get_col_from_row_state(_row_state, cdef);
|
|
|
|
|
managed_bytes_opt prev = is_column_delete ? std::nullopt : get_col_from_row_state(_row_state, cdef);
|
|
|
|
|
|
|
|
|
|
bytes_opt next;
|
|
|
|
|
managed_bytes_opt next;
|
|
|
|
|
if (added_cells || (deleted_elements && prev)) {
|
|
|
|
|
next = visit(*cdef.type, [&] (const auto& type) -> bytes {
|
|
|
|
|
next = visit(*cdef.type, [&] (const auto& type) -> managed_bytes {
|
|
|
|
|
return merge(type, prev, added_cells, deleted_elements);
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
@@ -1519,7 +1502,7 @@ public:
|
|
|
|
|
|
|
|
|
|
auto process_cell = [&, this] (const column_definition& cdef) {
|
|
|
|
|
if (auto current = get_col_from_row_state(row_state, cdef)) {
|
|
|
|
|
_builder->set_value(image_ck, cdef, std::move(*current));
|
|
|
|
|
_builder->set_value(image_ck, cdef, *current);
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
@@ -1675,7 +1658,7 @@ public:
|
|
|
|
|
const auto& row = preimage_set->front();
|
|
|
|
|
for (auto& c : _schema->static_columns()) {
|
|
|
|
|
if (auto maybe_cell_view = get_preimage_col_value(c, &row)) {
|
|
|
|
|
_static_row_state[&c] = *maybe_cell_view;
|
|
|
|
|
_static_row_state[&c] = std::move(*maybe_cell_view);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@@ -1687,7 +1670,7 @@ public:
|
|
|
|
|
// clustering rows
|
|
|
|
|
for (const auto& row : *preimage_set) {
|
|
|
|
|
// Construct the clustering key for this row
|
|
|
|
|
std::vector<bytes> ck_parts;
|
|
|
|
|
std::vector<managed_bytes> ck_parts;
|
|
|
|
|
ck_parts.reserve(_schema->clustering_key_size());
|
|
|
|
|
for (auto& c : _schema->clustering_key_columns()) {
|
|
|
|
|
auto v = row.get_view_opt(c.name_as_text());
|
|
|
|
|
@@ -1700,7 +1683,7 @@ public:
|
|
|
|
|
// as there will be no clustering row data to load into the state.
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
ck_parts.emplace_back(v->linearize());
|
|
|
|
|
ck_parts.emplace_back(managed_bytes(*v));
|
|
|
|
|
}
|
|
|
|
|
auto ck = clustering_key::from_exploded(std::move(ck_parts));
|
|
|
|
|
|
|
|
|
|
@@ -1708,7 +1691,7 @@ public:
|
|
|
|
|
cell_map cells;
|
|
|
|
|
for (auto& c : _schema->regular_columns()) {
|
|
|
|
|
if (auto maybe_cell_view = get_preimage_col_value(c, &row)) {
|
|
|
|
|
cells[&c] = *maybe_cell_view;
|
|
|
|
|
cells[&c] = std::move(*maybe_cell_view);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|