diff --git a/atomic_cell.hh b/atomic_cell.hh index 870e9a6e5d..3c19ae8a6e 100644 --- a/atomic_cell.hh +++ b/atomic_cell.hh @@ -380,6 +380,13 @@ public: return make_live(type, timestamp, value, gc_clock::now() + *ttl, *ttl, cm); } } + static atomic_cell make_live(const abstract_type& type, api::timestamp_type timestamp, const managed_bytes_view& value, ttl_opt ttl, collection_member cm = collection_member::no) { + if (!ttl) { + return make_live(type, timestamp, value, cm); + } else { + return make_live(type, timestamp, value, gc_clock::now() + *ttl, *ttl, cm); + } + } static atomic_cell make_live_uninitialized(const abstract_type& type, api::timestamp_type timestamp, size_t size); friend class atomic_cell_or_collection; friend std::ostream& operator<<(std::ostream& os, const atomic_cell& ac); diff --git a/cdc/log.cc b/cdc/log.cc index 1c9fca9f7f..7b21a57f77 100644 --- a/cdc/log.cc +++ b/cdc/log.cc @@ -601,12 +601,12 @@ public: using pointer = const T*; using reference = const T&; private: - bytes_view _v, _next; + managed_bytes_view _v, _next; size_t _rem = 0; T _current; public: - collection_iterator(bytes_view_opt v = {}) - : _v(v.value_or(bytes_view{})) + collection_iterator(managed_bytes_view_opt v = {}) + : _v(v.value_or(managed_bytes_view{})) , _rem(_v.empty() ? 0 : read_collection_size(_v, cql_serialization_format::internal())) { if (_rem != 0) { @@ -649,7 +649,7 @@ private: }; template<> -void collection_iterator>::parse() { +void collection_iterator>::parse() { assert(_rem > 0); _next = _v; auto k = read_collection_value(_next, cql_serialization_format::internal()); @@ -657,14 +657,6 @@ void collection_iterator>::parse() { _current = std::make_pair(k, v); } -template<> -void collection_iterator::parse() { - assert(_rem > 0); - _next = _v; - auto k = read_collection_value(_next, cql_serialization_format::internal()); - _current = k; -} - template<> void collection_iterator::parse() { assert(_rem > 0); @@ -720,12 +712,13 @@ private: }; template<> -bool maybe_back_insert_iterator>, bytes_view>::compare(const bytes_view& t, const value_type& v) { +bool maybe_back_insert_iterator>, managed_bytes_view>::compare( + const managed_bytes_view& t, const value_type& v) { return _type.compare(t, v.first); } template<> -bool maybe_back_insert_iterator, bytes_view>::compare(const bytes_view& t, const value_type& v) { +bool maybe_back_insert_iterator, managed_bytes_view>::compare(const managed_bytes_view& t, const value_type& v) { return _type.compare(t, v); } @@ -734,45 +727,45 @@ auto make_maybe_back_inserter(Container& c, const abstract_type& type, collectio return maybe_back_insert_iterator(c, type, s); } -static size_t collection_size(const bytes_opt& bo) { +static size_t collection_size(const managed_bytes_opt& bo) { if (bo) { - bytes_view bv(*bo); - return read_collection_size(bv, cql_serialization_format::internal()); + managed_bytes_view mbv(*bo); + return read_collection_size(mbv, cql_serialization_format::internal()); } return 0; } template -static void udt_for_each(const bytes_opt& bo, Func&& f) { +static void udt_for_each(const managed_bytes_opt& bo, Func&& f) { if (bo) { - bytes_view bv(*bo); - std::for_each(tuple_deserializing_iterator::start(bv), tuple_deserializing_iterator::finish(bv), std::forward(f)); + managed_bytes_view mbv(*bo); + std::for_each(tuple_deserializing_iterator::start(mbv), tuple_deserializing_iterator::finish(mbv), std::forward(f)); } } -static bytes merge(const collection_type_impl& ctype, const bytes_opt& prev, const bytes_opt& next, const bytes_opt& deleted) { - std::vector> res; +static managed_bytes merge(const collection_type_impl& ctype, const managed_bytes_opt& prev, const managed_bytes_opt& next, const managed_bytes_opt& deleted) { + std::vector> res; res.reserve(collection_size(prev) + collection_size(next)); auto type = ctype.name_comparator(); - auto cmp = [&type = *type](const std::pair& p1, const std::pair& p2) { + auto cmp = [&type = *type](const std::pair& p1, const std::pair& p2) { return type.compare(p1.first, p2.first) < 0; }; - collection_iterator> e, i(prev), j(next); + collection_iterator> e, i(prev), j(next); // note order: set_union, when finding doubles, use value from first1 (j here). So // since this is next, it has prio - std::set_union(j, e, i, e, make_maybe_back_inserter(res, *type, collection_iterator(deleted)), cmp); - return map_type_impl::serialize_partially_deserialized_form(res, cql_serialization_format::internal()); + std::set_union(j, e, i, e, make_maybe_back_inserter(res, *type, collection_iterator(deleted)), cmp); + return map_type_impl::serialize_partially_deserialized_form_fragmented(res, cql_serialization_format::internal()); } -static bytes merge(const set_type_impl& ctype, const bytes_opt& prev, const bytes_opt& next, const bytes_opt& deleted) { - std::vector res; +static managed_bytes merge(const set_type_impl& ctype, const managed_bytes_opt& prev, const managed_bytes_opt& next, const managed_bytes_opt& deleted) { + std::vector res; res.reserve(collection_size(prev) + collection_size(next)); auto type = ctype.name_comparator(); - auto cmp = [&type = *type](bytes_view k1, bytes_view k2) { + auto cmp = [&type = *type](managed_bytes_view k1, managed_bytes_view k2) { return type.compare(k1, k2) < 0; }; - collection_iterator e, i(prev), j(next), d(deleted); + collection_iterator e, i(prev), j(next), d(deleted); std::set_union(j, e, i, e, make_maybe_back_inserter(res, *type, d), cmp); - return set_type_impl::serialize_partially_deserialized_form(res, cql_serialization_format::internal()); + return set_type_impl::serialize_partially_deserialized_form_fragmented(res, cql_serialization_format::internal()); } -static bytes merge(const user_type_impl& type, const bytes_opt& prev, const bytes_opt& next, const bytes_opt& deleted) { +static managed_bytes merge(const user_type_impl& type, const managed_bytes_opt& prev, const managed_bytes_opt& next, const managed_bytes_opt& deleted) { std::vector res(type.size()); udt_for_each(prev, [&res, i = res.begin()](managed_bytes_view_opt k) mutable { *i++ = k; @@ -788,16 +781,16 @@ static bytes merge(const user_type_impl& type, const bytes_opt& prev, const byte auto index = deserialize_field_index(k); res[index] = std::nullopt; }); - return type.build_value(res); + return type.build_value_fragmented(res); } -static bytes merge(const abstract_type& type, const bytes_opt& prev, const bytes_opt& next, const bytes_opt& deleted) { +static managed_bytes merge(const abstract_type& type, const managed_bytes_opt& prev, const managed_bytes_opt& next, const managed_bytes_opt& deleted) { throw std::runtime_error(format("cdc merge: unknown type {}", type.name())); } -using cell_map = std::unordered_map; +using cell_map = std::unordered_map; using row_states_map = std::unordered_map; -static bytes_opt get_col_from_row_state(const cell_map* state, const column_definition& cdef) { +static managed_bytes_opt get_col_from_row_state(const cell_map* state, const column_definition& cdef) { if (state) { if (auto it = state->find(&cdef); it != state->end()) { return it->second; @@ -811,28 +804,28 @@ static cell_map* get_row_state(row_states_map& row_states, const clustering_key& return it == row_states.end() ? nullptr : &it->second; } -static bytes_opt get_preimage_col_value(const column_definition& cdef, const cql3::untyped_result_set_row *pirow) { +static managed_bytes_opt get_preimage_col_value(const column_definition& cdef, const cql3::untyped_result_set_row *pirow) { if (!pirow || !pirow->has(cdef.name_as_text())) { return std::nullopt; } return cdef.is_atomic() - ? pirow->get_blob(cdef.name_as_text()) + ? pirow->get_blob_fragmented(cdef.name_as_text()) : visit(*cdef.type, make_visitor( // flatten set [&] (const set_type_impl& type) { auto v = pirow->get_view(cdef.name_as_text()); auto f = cql_serialization_format::internal(); auto n = read_collection_size(v, f); - std::vector tmp; + std::vector tmp; tmp.reserve(n); while (n--) { - tmp.emplace_back(read_collection_value(v, f).linearize()); // key + tmp.emplace_back(read_collection_value(v, f)); // key read_collection_value(v, f); // value. ignore. } - return set_type_impl::serialize_partially_deserialized_form({tmp.begin(), tmp.end()}, f); + return set_type_impl::serialize_partially_deserialized_form_fragmented({tmp.begin(), tmp.end()}, f); }, - [&] (const abstract_type& o) -> bytes { - return pirow->get_blob(cdef.name_as_text()); + [&] (const abstract_type& o) -> managed_bytes { + return pirow->get_blob_fragmented(cdef.name_as_text()); } )); } @@ -856,7 +849,7 @@ class log_mutation_builder { const column_definition& _ttl_col; // The base mutation's partition key - std::vector _base_pk; + std::vector _base_pk; // The cdc$time value of created rows const bytes _tuuid; @@ -877,7 +870,7 @@ public: : _base_schema(base_schema), _log_schema(*log_mut.schema()), _op_col(*_log_schema.get_column_definition(log_meta_column_name_bytes("operation"))), _ttl_col(*_log_schema.get_column_definition(log_meta_column_name_bytes("ttl"))), - _base_pk(base_pk.explode(_base_schema)), + _base_pk(base_pk.explode_fragmented()), _tuuid(timeuuid_type->decompose(generate_timeuuid(ts))), _ts(ts), _ttl(_base_schema.cdc_options().ttl() @@ -920,7 +913,7 @@ public: // This takes a base schema clustering key prefix and sets these columns // according to the prefix' values for the given log row. void set_clustering_columns(const clustering_key& log_ck, const clustering_key_prefix& base_ckey) { - set_key_columns(log_ck, _base_schema.clustering_key_columns(), base_ckey.explode(_log_schema)); + set_key_columns(log_ck, _base_schema.clustering_key_columns(), base_ckey.explode_fragmented()); } // Sets the `cdc$operation` column for the given row. @@ -942,9 +935,9 @@ public: // Each regular and static column in the base schema has a corresponding column in the log schema with the same name. // Given a reference to such a column from the base schema, this function sets the corresponding column // in the log to the given value for the given row. - void set_value(const clustering_key& log_ck, const column_definition& base_cdef, bytes value) { + void set_value(const clustering_key& log_ck, const column_definition& base_cdef, const managed_bytes_view& value) { auto& log_cdef = *_log_schema.get_column_definition(log_data_column_name_bytes(base_cdef.name())); - _log_mut.set_cell(log_ck, log_cdef, atomic_cell::make_live(*base_cdef.type, _ts, std::move(value), _ttl)); + _log_mut.set_cell(log_ck, log_cdef, atomic_cell::make_live(*base_cdef.type, _ts, value, _ttl)); } // Each regular and static column in the base schema has a corresponding column in the log schema @@ -960,7 +953,7 @@ public: // by prefixing the original name with ``cdc$deleted_elements_''. // Given a reference to such a column from the base schema, this function sets the corresponding column // in the log to the given set of keys for the given row. - void set_deleted_elements(const clustering_key& log_ck, const column_definition& base_cdef, bytes deleted_elements) { + void set_deleted_elements(const clustering_key& log_ck, const column_definition& base_cdef, const managed_bytes& deleted_elements) { auto& log_cdef = *_log_schema.get_column_definition(log_data_column_deleted_elements_name_bytes(base_cdef.name())); _log_mut.set_cell(log_ck, log_cdef, atomic_cell::make_live(*log_cdef.type, _ts, deleted_elements, _ttl)); } @@ -971,27 +964,21 @@ public: } } private: - void set_key_columns(const clustering_key& log_ck, schema::const_iterator_range_type columns, const std::vector& key) { + void set_key_columns(const clustering_key& log_ck, schema::const_iterator_range_type columns, const std::vector& key) { size_t pos = 0; for (auto& column : columns) { if (pos >= key.size()) { break; } auto& cdef = *_log_schema.get_column_definition(log_data_column_name_bytes(column.name())); - _log_mut.set_cell(log_ck, cdef, atomic_cell::make_live(*column.type, _ts, bytes_view(key[pos]), _ttl)); + _log_mut.set_cell(log_ck, cdef, atomic_cell::make_live(*column.type, _ts, managed_bytes_view(key[pos]), _ttl)); ++pos; } } }; -static bytes get_bytes(const atomic_cell_view& acv) { - return to_bytes(acv.value()); -} - -static bytes_view get_bytes_view(const atomic_cell_view& acv, std::forward_list& buf) { - return acv.value().is_fragmented() - ? bytes_view{buf.emplace_front(to_bytes(acv.value()))} - : acv.value().current_fragment(); +static managed_bytes get_managed_bytes(const atomic_cell_view& acv) { + return managed_bytes(acv.value()); } static ttl_opt get_ttl(const atomic_cell_view& acv) { @@ -1052,7 +1039,7 @@ struct process_row_visitor { _generate_delta_values(generate_delta_values) {} - void update_row_state(const column_definition& cdef, bytes_opt value) { + void update_row_state(const column_definition& cdef, managed_bytes_opt value) { if (!_row_state) { // static row always has a valid state, so this must be a clustering row missing assert(_base_ck); @@ -1064,7 +1051,7 @@ struct process_row_visitor { void live_atomic_cell(const column_definition& cdef, const atomic_cell_view& cell) { _ttl_column = get_ttl(cell); - bytes value = get_bytes(cell); + managed_bytes value = get_managed_bytes(cell); // delta if (_generate_delta_values) { @@ -1094,7 +1081,7 @@ struct process_row_visitor { // See `set_visitor`, `udt_visitior`, and `map_or_list_visitor` below. struct collection_visitor { bool _is_column_delete = false; - std::vector _deleted_keys; + std::vector _deleted_keys; ttl_opt& _ttl_column; @@ -1113,26 +1100,26 @@ struct process_row_visitor { // cdc$deleted_col, cdc$deleted_elements_col, col - using result_t = std::tuple, bytes_opt>; + using result_t = std::tuple, managed_bytes_opt>; auto result = visit(*cdef.type, make_visitor( [&] (const set_type_impl&) -> result_t { _touched_parts.set(); struct set_visitor : public collection_visitor { - std::vector _added_keys; + std::vector _added_keys; set_visitor(ttl_opt& ttl_column) : collection_visitor(ttl_column) {} void live_collection_cell(bytes_view key, const atomic_cell_view& cell) { this->_ttl_column = get_ttl(cell); - _added_keys.push_back(key); + _added_keys.emplace_back(key); } } v(_ttl_column); visit_collection(v); - bytes_opt added_keys = v._added_keys.empty() ? std::nullopt : - std::optional{set_type_impl::serialize_partially_deserialized_form(v._added_keys, cql_serialization_format::internal())}; + managed_bytes_opt added_keys = v._added_keys.empty() ? std::nullopt : + std::optional{set_type_impl::serialize_partially_deserialized_form_fragmented(v._added_keys, cql_serialization_format::internal())}; return { v._is_column_delete, @@ -1144,25 +1131,23 @@ struct process_row_visitor { _touched_parts.set(); struct udt_visitor : public collection_visitor { - std::vector _added_cells; - std::forward_list& _buf; + std::vector _added_cells; - udt_visitor(ttl_opt& ttl_column, size_t num_keys, std::forward_list& buf) - : collection_visitor(ttl_column), _added_cells(num_keys), _buf(buf) {} + udt_visitor(ttl_opt& ttl_column, size_t num_keys) + : collection_visitor(ttl_column), _added_cells(num_keys) {} void live_collection_cell(bytes_view key, const atomic_cell_view& cell) { this->_ttl_column = get_ttl(cell); - _added_cells[deserialize_field_index(key)].emplace(get_bytes_view(cell, _buf)); + _added_cells[deserialize_field_index(key)].emplace(cell.value()); } }; - std::forward_list buf; - udt_visitor v(_ttl_column, type.size(), buf); + udt_visitor v(_ttl_column, type.size()); visit_collection(v); - bytes_opt added_cells = v._added_cells.empty() ? std::nullopt : - std::optional{type.build_value(v._added_cells)}; + managed_bytes_opt added_cells = v._added_cells.empty() ? std::nullopt : + std::optional{type.build_value_fragmented(v._added_cells)}; return { v._is_column_delete, @@ -1174,25 +1159,23 @@ struct process_row_visitor { _touched_parts.set(type.is_list() ? stats::part_type::LIST : stats::part_type::MAP); struct map_or_list_visitor : public collection_visitor { - std::vector> _added_cells; - std::forward_list& _buf; + std::vector> _added_cells; - map_or_list_visitor(ttl_opt& ttl_column, std::forward_list& buf) - : collection_visitor(ttl_column), _buf(buf) {} + map_or_list_visitor(ttl_opt& ttl_column) + : collection_visitor(ttl_column) {} void live_collection_cell(bytes_view key, const atomic_cell_view& cell) { this->_ttl_column = get_ttl(cell); - _added_cells.emplace_back(key, get_bytes_view(cell, _buf)); + _added_cells.emplace_back(key, cell.value()); } }; - std::forward_list buf; - map_or_list_visitor v(_ttl_column, buf); + map_or_list_visitor v(_ttl_column); visit_collection(v); - bytes_opt added_cells = v._added_cells.empty() ? std::nullopt : - std::optional{map_type_impl::serialize_partially_deserialized_form(v._added_cells, cql_serialization_format::internal())}; + managed_bytes_opt added_cells = v._added_cells.empty() ? std::nullopt : + std::optional{map_type_impl::serialize_partially_deserialized_form_fragmented(v._added_cells, cql_serialization_format::internal())}; return { v._is_column_delete, @@ -1210,9 +1193,9 @@ struct process_row_visitor { // FIXME: we're doing redundant work: first we serialize the set of deleted keys into a blob, // then we deserialize again when merging images below - bytes_opt deleted_elements = std::nullopt; + managed_bytes_opt deleted_elements = std::nullopt; if (!deleted_keys.empty()) { - deleted_elements = set_type_impl::serialize_partially_deserialized_form(deleted_keys, cql_serialization_format::internal()); + deleted_elements = set_type_impl::serialize_partially_deserialized_form_fragmented(deleted_keys, cql_serialization_format::internal()); } // delta @@ -1233,11 +1216,11 @@ struct process_row_visitor { // images if (_enable_updating_state) { // A column delete overwrites any data we gathered until now. - bytes_opt prev = is_column_delete ? std::nullopt : get_col_from_row_state(_row_state, cdef); + managed_bytes_opt prev = is_column_delete ? std::nullopt : get_col_from_row_state(_row_state, cdef); - bytes_opt next; + managed_bytes_opt next; if (added_cells || (deleted_elements && prev)) { - next = visit(*cdef.type, [&] (const auto& type) -> bytes { + next = visit(*cdef.type, [&] (const auto& type) -> managed_bytes { return merge(type, prev, added_cells, deleted_elements); }); } @@ -1519,7 +1502,7 @@ public: auto process_cell = [&, this] (const column_definition& cdef) { if (auto current = get_col_from_row_state(row_state, cdef)) { - _builder->set_value(image_ck, cdef, std::move(*current)); + _builder->set_value(image_ck, cdef, *current); } }; @@ -1675,7 +1658,7 @@ public: const auto& row = preimage_set->front(); for (auto& c : _schema->static_columns()) { if (auto maybe_cell_view = get_preimage_col_value(c, &row)) { - _static_row_state[&c] = *maybe_cell_view; + _static_row_state[&c] = std::move(*maybe_cell_view); } } } @@ -1687,7 +1670,7 @@ public: // clustering rows for (const auto& row : *preimage_set) { // Construct the clustering key for this row - std::vector ck_parts; + std::vector ck_parts; ck_parts.reserve(_schema->clustering_key_size()); for (auto& c : _schema->clustering_key_columns()) { auto v = row.get_view_opt(c.name_as_text()); @@ -1700,7 +1683,7 @@ public: // as there will be no clustering row data to load into the state. return; } - ck_parts.emplace_back(v->linearize()); + ck_parts.emplace_back(managed_bytes(*v)); } auto ck = clustering_key::from_exploded(std::move(ck_parts)); @@ -1708,7 +1691,7 @@ public: cell_map cells; for (auto& c : _schema->regular_columns()) { if (auto maybe_cell_view = get_preimage_col_value(c, &row)) { - cells[&c] = *maybe_cell_view; + cells[&c] = std::move(*maybe_cell_view); } } diff --git a/cql3/untyped_result_set.hh b/cql3/untyped_result_set.hh index fc647bec16..f3605aad4f 100644 --- a/cql3/untyped_result_set.hh +++ b/cql3/untyped_result_set.hh @@ -96,6 +96,9 @@ public: bytes get_blob(std::string_view name) const { return get_view(name).linearize(); } + managed_bytes get_blob_fragmented(std::string_view name) const { + return managed_bytes(get_view(name)); + } template T get_as(std::string_view name) const { return value_cast(data_type_for()->deserialize(get_view(name))); diff --git a/keys.hh b/keys.hh index e451663073..5b3e03a41f 100644 --- a/keys.hh +++ b/keys.hh @@ -252,6 +252,14 @@ public: return result; } + std::vector explode_fragmented() const { + std::vector result; + for (managed_bytes_view c : components()) { + result.emplace_back(managed_bytes(c)); + } + return result; + } + struct tri_compare { typename TopLevel::compound _t; tri_compare(const schema& s) : _t(get_compound_type(s)) {} diff --git a/types.cc b/types.cc index 31aea34082..85ee94a4f7 100644 --- a/types.cc +++ b/types.cc @@ -1191,8 +1191,24 @@ map_type_impl::serialize_partially_deserialized_form( write_collection_value(out, sf, e.second); } return b; +} +managed_bytes +map_type_impl::serialize_partially_deserialized_form_fragmented( + const std::vector>& v, cql_serialization_format sf) { + size_t len = collection_value_len(sf) * v.size() * 2 + collection_size_len(sf); + for (auto&& e : v) { + len += e.first.size() + e.second.size(); + } + managed_bytes b(managed_bytes::initialized_later(), len); + managed_bytes_mutable_view out = b; + write_collection_size(out, v.size(), sf); + for (auto&& e : v) { + write_collection_value(out, sf, e.first); + write_collection_value(out, sf, e.second); + } + return b; } static std::optional update_user_type_aux( @@ -1322,6 +1338,12 @@ set_type_impl::serialize_partially_deserialized_form( return pack(v.begin(), v.end(), v.size(), sf); } +managed_bytes +set_type_impl::serialize_partially_deserialized_form_fragmented( + const std::vector& v, cql_serialization_format sf) { + return pack_fragmented(v.begin(), v.end(), v.size(), sf); +} + template std::vector partially_deserialize_listlike(View in, cql_serialization_format sf) { auto nr = read_collection_size(in, sf); diff --git a/types/map.hh b/types/map.hh index 63eed6bd07..6e8a0784f6 100644 --- a/types/map.hh +++ b/types/map.hh @@ -59,6 +59,8 @@ public: template data_value deserialize(View v, cql_serialization_format sf) const; static bytes serialize_partially_deserialized_form(const std::vector>& v, cql_serialization_format sf); + static managed_bytes serialize_partially_deserialized_form_fragmented(const std::vector>& v, + cql_serialization_format sf); }; data_value make_map_value(data_type tuple_type, map_type_impl::native_type value); diff --git a/types/set.hh b/types/set.hh index 0cca25357e..a41dbc9853 100644 --- a/types/set.hh +++ b/types/set.hh @@ -51,6 +51,8 @@ public: template data_value deserialize(View v, cql_serialization_format sf) const; static bytes serialize_partially_deserialized_form( const std::vector& v, cql_serialization_format sf); + static managed_bytes serialize_partially_deserialized_form_fragmented( + const std::vector& v, cql_serialization_format sf); }; data_value make_set_value(data_type tuple_type, set_type_impl::native_type value);