Merge 'sstables: remove large allocations when parsing cells' from Wojciech Mitros

sstable cells are parsed into temporary_buffers, which causes large contiguous allocations for some cells.
This is fixed by storing fragments of the cell value in a fragmented_temporary_buffer instead.
To achieve this, this patch also adds new methods to fragmented_temporary_buffer (size(), ostream& operator<<()) and adds methods to the underlying parser (primitive_consumer) for parsing byte strings into fragmented buffers.

Fixes #7457
Fixes #6376

Closes #8182

* github.com:scylladb/scylla:
  primitive_consumer: keep fragments of parsed buffer in a small_vector
  sstables: add parsing of cell values into fragmented buffers
  sstables: add non-contiguous parsing of byte strings to the primitive_consumer
  utils: add ostream operator<<() for fragmented_temporary_buffer::view
  compound_type: extend serialize_value for all FragmentedView types
This commit is contained in:
Avi Kivity
2021-04-22 12:55:56 +03:00
committed by Piotr Sarna
12 changed files with 206 additions and 94 deletions

View File

@@ -76,14 +76,18 @@ private:
template<typename RangeOfSerializedComponents, FragmentedMutableView Out>
static void serialize_value(RangeOfSerializedComponents&& values, Out out) {
for (auto&& val : values) {
assert(val.size() <= std::numeric_limits<size_type>::max());
write<size_type>(out, size_type(val.size()));
using val_type = std::remove_cvref_t<decltype(val)>;
if constexpr (FragmentedView<val_type>) {
assert(val.size_bytes() <= std::numeric_limits<size_type>::max());
write<size_type>(out, size_type(val.size_bytes()));
write_fragmented(out, val);
} else if constexpr (std::same_as<val_type, managed_bytes>) {
assert(val.size() <= std::numeric_limits<size_type>::max());
write<size_type>(out, size_type(val.size()));
write_fragmented(out, managed_bytes_view(val));
} else {
assert(val.size() <= std::numeric_limits<size_type>::max());
write<size_type>(out, size_type(val.size()));
write_fragmented(out, single_fragmented_view(val));
}
}
@@ -92,7 +96,12 @@ private:
static size_t serialized_size(RangeOfSerializedComponents&& values) {
size_t len = 0;
for (auto&& val : values) {
len += sizeof(size_type) + val.size();
using val_type = std::remove_cvref_t<decltype(val)>;
if constexpr (FragmentedView<val_type>) {
len += sizeof(size_type) + val.size_bytes();
} else {
len += sizeof(size_type) + val.size();
}
}
return len;
}

View File

@@ -31,6 +31,9 @@
#include <seastar/net/byteorder.hh>
#include "bytes.hh"
#include "reader_permit.hh"
#include "utils/fragmented_temporary_buffer.hh"
#include "utils/overloaded_functor.hh"
#include "utils/small_vector.hh"
#include <variant>
@@ -84,11 +87,14 @@ private:
READING_U16,
READING_U32,
READING_U64,
READING_BYTES_CONTIGUOUS,
READING_BYTES,
READING_U16_BYTES,
READING_UNSIGNED_VINT,
READING_UNSIGNED_VINT_LENGTH_BYTES_CONTIGUOUS,
READING_UNSIGNED_VINT_LENGTH_BYTES,
READING_UNSIGNED_VINT_WITH_LEN,
READING_UNSIGNED_VINT_LENGTH_BYTES_WITH_LEN_CONTIGUOUS,
READING_UNSIGNED_VINT_LENGTH_BYTES_WITH_LEN,
READING_SIGNED_VINT,
READING_SIGNED_VINT_WITH_LEN,
@@ -114,9 +120,10 @@ private:
} _read_int;
// state for READING_BYTES prestate
temporary_buffer<char> _read_bytes;
temporary_buffer<char>* _read_bytes_where; // which temporary_buffer to set, _key or _val?
size_t _read_bytes_len = 0;
utils::small_vector<temporary_buffer<char>, 1> _read_bytes;
temporary_buffer<char>* _read_bytes_where_contiguous; // which buffer to set, _key, _val, _cell_path or _pk?
fragmented_temporary_buffer* _read_bytes_where;
inline read_status read_partial_int(temporary_buffer<char>& data, prestate next_state) {
std::copy(data.begin(), data.end(), _read_int.bytes);
_pos = data.size();
@@ -138,8 +145,10 @@ private:
data.trim_front(len);
return read_status::ready;
} else {
_read_bytes = make_tracked_temporary_buffer(temporary_buffer<char>(len), _permit);
std::copy(data.begin(), data.end(), _read_bytes.get_write());
_read_bytes.clear();
_read_bytes.push_back(make_tracked_temporary_buffer(temporary_buffer<char>(len), _permit));
std::copy(data.begin(), data.end(), _read_bytes.front().get_write());
_read_bytes_len = len;
_pos = data.size();
data.trim(0);
_prestate = ReadingVintWithLen;
@@ -150,13 +159,13 @@ private:
template <typename VintType, typename T>
inline read_status read_vint_with_len(temporary_buffer<char>& data, T& dest) {
static_assert(std::is_same_v<T, typename VintType::value_type>, "Destination type mismatch");
const auto n = std::min(_read_bytes.size() - _pos, data.size());
std::copy_n(data.begin(), n, _read_bytes.get_write() + _pos);
const auto n = std::min(_read_bytes_len - _pos, data.size());
std::copy_n(data.begin(), n, _read_bytes.front().get_write() + _pos);
data.trim_front(n);
_pos += n;
if (_pos == _read_bytes.size()) {
if (_pos == _read_bytes_len) {
dest = VintType::deserialize(
bytes_view(reinterpret_cast<bytes::value_type*>(_read_bytes.get_write()), _read_bytes.size()));
bytes_view(reinterpret_cast<bytes::value_type*>(_read_bytes.front().get_write()), _read_bytes_len));
_prestate = prestate::NONE;
return read_status::ready;
}
@@ -203,15 +212,36 @@ public:
return read_partial_int(data, prestate::READING_U64);
}
}
inline read_status read_bytes(temporary_buffer<char>& data, uint32_t len, temporary_buffer<char>& where) {
if (data.size() >= len) {
inline read_status read_bytes_contiguous(temporary_buffer<char>& data, uint32_t len, temporary_buffer<char>& where) {
if (data.size() >= len) {
where = data.share(0, len);
data.trim_front(len);
return read_status::ready;
} else {
// copy what we have so far, read the rest later
_read_bytes = make_tracked_temporary_buffer(temporary_buffer<char>(len), _permit);
std::copy(data.begin(), data.end(),_read_bytes.get_write());
_read_bytes.clear();
_read_bytes.push_back(make_tracked_temporary_buffer(temporary_buffer<char>(len), _permit));
std::copy(data.begin(), data.end(),_read_bytes.front().get_write());
_read_bytes_len = len;
_read_bytes_where_contiguous = &where;
_pos = data.size();
data.trim(0);
_prestate = prestate::READING_BYTES_CONTIGUOUS;
return read_status::waiting;
}
}
inline read_status read_bytes(temporary_buffer<char>& data, uint32_t len, fragmented_temporary_buffer& where) {
if (data.size() >= len) {
std::vector<temporary_buffer<char>> fragments;
fragments.push_back(data.share(0,len));
where = fragmented_temporary_buffer(std::move(fragments), len);
data.trim_front(len);
return read_status::ready;
} else {
// copy what we have so far, read the rest later
_read_bytes.clear();
_read_bytes.push_back(data.share());
_read_bytes_len = len;
_read_bytes_where = &where;
_pos = data.size();
data.trim(0);
@@ -223,10 +253,10 @@ public:
if (data.size() >= sizeof(uint16_t)) {
_u16 = consume_be<uint16_t>(data);
} else {
_read_bytes_where = &where;
_read_bytes_where_contiguous = &where;
return read_partial_int(data, prestate::READING_U16_BYTES);
}
return read_bytes(data, uint32_t{_u16}, where);
return read_bytes_contiguous(data, uint32_t{_u16}, where);
}
inline read_status read_unsigned_vint(temporary_buffer<char>& data) {
return read_vint<
@@ -240,7 +270,32 @@ public:
prestate::READING_SIGNED_VINT,
prestate::READING_SIGNED_VINT_WITH_LEN>(data, _i64);
}
inline read_status read_unsigned_vint_length_bytes(temporary_buffer<char>& data, temporary_buffer<char>& where) {
inline read_status read_unsigned_vint_length_bytes_contiguous(temporary_buffer<char>& data, temporary_buffer<char>& where) {
if (data.empty()) {
_prestate = prestate::READING_UNSIGNED_VINT_LENGTH_BYTES_CONTIGUOUS;
_read_bytes_where_contiguous = &where;
return read_status::waiting;
} else {
const vint_size_type len = unsigned_vint::serialized_size_from_first_byte(*data.begin());
if (data.size() >= len) {
_u64 = unsigned_vint::deserialize(
bytes_view(reinterpret_cast<bytes::value_type*>(data.get_write()), data.size()));
data.trim_front(len);
return read_bytes_contiguous(data, static_cast<uint32_t>(_u64), where);
} else {
_read_bytes.clear();
_read_bytes.push_back(make_tracked_temporary_buffer(temporary_buffer<char>(len), _permit));
std::copy(data.begin(), data.end(),_read_bytes.front().get_write());
_read_bytes_len = len;
_pos = data.size();
data.trim(0);
_read_bytes_where_contiguous = &where;
_prestate = prestate::READING_UNSIGNED_VINT_LENGTH_BYTES_WITH_LEN_CONTIGUOUS;
return read_status::waiting;
}
}
}
inline read_status read_unsigned_vint_length_bytes(temporary_buffer<char>& data, fragmented_temporary_buffer& where) {
if (data.empty()) {
_prestate = prestate::READING_UNSIGNED_VINT_LENGTH_BYTES;
_read_bytes_where = &where;
@@ -253,8 +308,10 @@ public:
data.trim_front(len);
return read_bytes(data, static_cast<uint32_t>(_u64), where);
} else {
_read_bytes = make_tracked_temporary_buffer(temporary_buffer<char>(len), _permit);
std::copy(data.begin(), data.end(), _read_bytes.get_write());
_read_bytes.clear();
_read_bytes.push_back(make_tracked_temporary_buffer(temporary_buffer<char>(len), _permit));
std::copy(data.begin(), data.end(),_read_bytes.front().get_write());
_read_bytes_len = len;
_pos = data.size();
data.trim(0);
_read_bytes_where = &where;
@@ -301,6 +358,12 @@ public:
return read_status::ready;
}
break;
case prestate::READING_UNSIGNED_VINT_LENGTH_BYTES_CONTIGUOUS:
if (read_unsigned_vint_length_bytes_contiguous(data, *_read_bytes_where_contiguous) == read_status::ready) {
_prestate = prestate::NONE;
return read_status::ready;
}
break;
case prestate::READING_UNSIGNED_VINT_LENGTH_BYTES:
if (read_unsigned_vint_length_bytes(data, *_read_bytes_where) == read_status::ready) {
_prestate = prestate::NONE;
@@ -311,14 +374,29 @@ public:
return read_vint_with_len<unsigned_vint>(data, _u64);
case prestate::READING_SIGNED_VINT_WITH_LEN:
return read_vint_with_len<signed_vint>(data, _i64);
case prestate::READING_UNSIGNED_VINT_LENGTH_BYTES_WITH_LEN: {
const auto n = std::min(_read_bytes.size() - _pos, data.size());
std::copy_n(data.begin(), n, _read_bytes.get_write() + _pos);
case prestate::READING_UNSIGNED_VINT_LENGTH_BYTES_WITH_LEN_CONTIGUOUS: {
const auto n = std::min(_read_bytes_len - _pos, data.size());
std::copy_n(data.begin(), n, _read_bytes.front().get_write() + _pos);
data.trim_front(n);
_pos += n;
if (_pos == _read_bytes.size()) {
if (_pos == _read_bytes_len) {
_u64 = unsigned_vint::deserialize(
bytes_view(reinterpret_cast<bytes::value_type*>(_read_bytes.get_write()), _read_bytes.size()));
bytes_view(reinterpret_cast<bytes::value_type*>(_read_bytes.front().get_write()), _read_bytes_len));
if (read_bytes_contiguous(data, _u64, *_read_bytes_where_contiguous) == read_status::ready) {
_prestate = prestate::NONE;
return read_status::ready;
}
}
break;
}
case prestate::READING_UNSIGNED_VINT_LENGTH_BYTES_WITH_LEN: {
const auto n = std::min(_read_bytes_len - _pos, data.size());
std::copy_n(data.begin(), n, _read_bytes.front().get_write() + _pos);
data.trim_front(n);
_pos += n;
if (_pos == _read_bytes_len) {
_u64 = unsigned_vint::deserialize(
bytes_view(reinterpret_cast<bytes::value_type*>(_read_bytes.front().get_write()), _read_bytes_len));
if (read_bytes(data, _u64, *_read_bytes_where) == read_status::ready) {
_prestate = prestate::NONE;
return read_status::ready;
@@ -326,14 +404,26 @@ public:
}
break;
}
case prestate::READING_BYTES: {
auto n = std::min(_read_bytes.size() - _pos, data.size());
std::copy(data.begin(), data.begin() + n,
_read_bytes.get_write() + _pos);
case prestate::READING_BYTES_CONTIGUOUS: {
auto n = std::min(_read_bytes_len - _pos, data.size());
std::copy(data.begin(), data.begin() + n, _read_bytes.front().get_write() + _pos);
data.trim_front(n);
_pos += n;
if (_pos == _read_bytes.size()) {
*_read_bytes_where = std::move(_read_bytes);
if (_pos == _read_bytes_len) {
*_read_bytes_where_contiguous = std::move(_read_bytes.front());
_prestate = prestate::NONE;
return read_status::ready;
}
break;
}
case prestate::READING_BYTES: {
auto n = std::min(_read_bytes_len - _pos, data.size());
_read_bytes.push_back(data.share(0, n));
data.trim_front(n);
_pos += n;
if (_pos == _read_bytes_len) {
std::vector<temporary_buffer<char>> fragments(std::make_move_iterator(_read_bytes.begin()), std::make_move_iterator(_read_bytes.end()));
*_read_bytes_where = fragmented_temporary_buffer(std::move(fragments), _read_bytes_len);
_prestate = prestate::NONE;
return read_status::ready;
}
@@ -357,7 +447,7 @@ public:
if (process_int(data, sizeof(uint16_t))) {
_u16 = net::ntoh(_read_int.uint16);
_prestate = prestate::NONE;
return read_bytes(data, _u16, *_read_bytes_where);
return read_bytes_contiguous(data, _u16, *_read_bytes_where_contiguous);
}
break;
case prestate::READING_U32:

View File

@@ -160,7 +160,7 @@ public:
}
case state::KEY_BYTES:
sstlog.trace("{}: pos {} state {}", fmt::ptr(this), current_pos(), state::KEY_BYTES);
if (this->read_bytes(data, this->_u16, _key) != continuous_data_consumer::read_status::ready) {
if (this->read_bytes_contiguous(data, this->_u16, _key) != continuous_data_consumer::read_status::ready) {
_state = state::POSITION;
break;
}

View File

@@ -467,7 +467,7 @@ public:
return ret;
}
virtual proceed consume_counter_cell(bytes_view col_name, bytes_view value, int64_t timestamp) override {
virtual proceed consume_counter_cell(bytes_view col_name, fragmented_temporary_buffer::view value, int64_t timestamp) override {
return do_consume_cell(col_name, timestamp, 0, 0, [&] (auto&& col) {
auto ac = make_counter_cell(timestamp, value);
@@ -483,7 +483,7 @@ public:
});
}
virtual proceed consume_cell(bytes_view col_name, bytes_view value, int64_t timestamp, int64_t ttl, int64_t expiration) override {
virtual proceed consume_cell(bytes_view col_name, fragmented_temporary_buffer::view value, int64_t timestamp, int64_t ttl, int64_t expiration) override {
return do_consume_cell(col_name, timestamp, ttl, expiration, [&] (auto&& col) {
bool is_multi_cell = col.collection_extra_data.size();
if (is_multi_cell != col.cdef->is_multi_cell()) {

View File

@@ -83,13 +83,13 @@ public:
// (in seconds) originally set for this cell, and "expiration" is the
// absolute time (in seconds since the UNIX epoch) when this cell will
// expire. Typical cells, not set to expire, will get expiration = 0.
virtual proceed consume_cell(bytes_view col_name, bytes_view value,
virtual proceed consume_cell(bytes_view col_name, fragmented_temporary_buffer::view value,
int64_t timestamp,
int64_t ttl, int64_t expiration) = 0;
// Consume one counter cell. Column name and value are serialized, and need
// to be deserialized according to the schema.
virtual proceed consume_counter_cell(bytes_view col_name, bytes_view value,
virtual proceed consume_counter_cell(bytes_view col_name, fragmented_temporary_buffer::view value,
int64_t timestamp) = 0;
// Consume a deleted cell (i.e., a cell tombstone).
@@ -162,6 +162,7 @@ private:
temporary_buffer<char> _key;
temporary_buffer<char> _val;
fragmented_temporary_buffer _val_fragmented;
// state for reading a cell
bool _deleted;
@@ -334,7 +335,7 @@ private:
break;
}
case state::CELL_VALUE_BYTES:
if (read_bytes(data, _u32, _val) != read_status::ready) {
if (read_bytes(data, _u32, _val_fragmented) != read_status::ready) {
_state = state::CELL_VALUE_BYTES_2;
break;
}
@@ -342,24 +343,28 @@ private:
{
row_consumer::proceed ret;
if (_deleted) {
if (_val.size() != 4) {
if (_val_fragmented.size_bytes() != 4) {
throw malformed_sstable_exception("deleted cell expects local_deletion_time value");
}
_val = temporary_buffer<char>(4);
auto v = fragmented_temporary_buffer::view(_val_fragmented);
read_fragmented(v, 4, reinterpret_cast<bytes::value_type*>(_val.get_write()));
deletion_time del;
del.local_deletion_time = consume_be<uint32_t>(_val);
del.marked_for_delete_at = _u64;
ret = _consumer.consume_deleted_cell(to_bytes_view(_key), del);
_val.release();
} else if (_counter) {
ret = _consumer.consume_counter_cell(to_bytes_view(_key),
to_bytes_view(_val), _u64);
fragmented_temporary_buffer::view(_val_fragmented), _u64);
} else {
ret = _consumer.consume_cell(to_bytes_view(_key),
to_bytes_view(_val), _u64, _ttl, _expiration);
fragmented_temporary_buffer::view(_val_fragmented), _u64, _ttl, _expiration);
}
// after calling the consume function, we can release the
// buffers we held for it.
_key.release();
_val.release();
_val_fragmented.remove_prefix(_val_fragmented.size_bytes());
_state = state::ATOM_START;
if (ret == row_consumer::proceed::no) {
return row_consumer::proceed::no;

View File

@@ -44,10 +44,10 @@ class clustering_parser {
bool _parsing_start_key;
boost::iterator_range<column_values_fixed_lengths::const_iterator> ck_range;
std::vector<temporary_buffer<char>> clustering_key_values;
std::vector<fragmented_temporary_buffer> clustering_key_values;
bound_kind_m kind{};
temporary_buffer<char> column_value;
fragmented_temporary_buffer column_value;
uint64_t ck_blocks_header = 0;
uint32_t ck_blocks_header_offset = 0;
std::optional<position_in_partition> _pos;
@@ -91,7 +91,7 @@ class clustering_parser {
position_in_partition make_position() {
auto key = clustering_key_prefix::from_range(clustering_key_values | boost::adaptors::transformed(
[] (const temporary_buffer<char>& b) { return to_bytes_view(b); }));
[] (const fragmented_temporary_buffer& b) { return fragmented_temporary_buffer::view(b); }));
if (kind == bound_kind_m::clustering) {
return position_in_partition::for_key(std::move(key));

View File

@@ -68,7 +68,7 @@ public:
// proceed consuming more data.
virtual proceed consume_partition_end() = 0;
virtual row_processing_result consume_row_start(const std::vector<temporary_buffer<char>>& ecp) = 0;
virtual row_processing_result consume_row_start(const std::vector<fragmented_temporary_buffer>& ecp) = 0;
virtual proceed consume_row_marker_and_tombstone(
const sstables::liveness_info& info, tombstone tomb, tombstone shadowable_tomb) = 0;
@@ -77,7 +77,7 @@ public:
virtual proceed consume_column(const sstables::column_translation::column_info& column_info,
bytes_view cell_path,
bytes_view value,
fragmented_temporary_buffer::view value,
api::timestamp_type timestamp,
gc_clock::duration ttl,
gc_clock::time_point local_deletion_time,
@@ -89,13 +89,13 @@ public:
virtual proceed consume_complex_column_end(const sstables::column_translation::column_info& column_info) = 0;
virtual proceed consume_counter_column(const sstables::column_translation::column_info& column_info,
bytes_view value, api::timestamp_type timestamp) = 0;
fragmented_temporary_buffer::view value, api::timestamp_type timestamp) = 0;
virtual proceed consume_range_tombstone(const std::vector<temporary_buffer<char>>& ecp,
virtual proceed consume_range_tombstone(const std::vector<fragmented_temporary_buffer>& ecp,
bound_kind kind,
tombstone tomb) = 0;
virtual proceed consume_range_tombstone(const std::vector<temporary_buffer<char>>& ecp,
virtual proceed consume_range_tombstone(const std::vector<fragmented_temporary_buffer>& ecp,
sstables::bound_kind_m,
tombstone end_tombstone,
tombstone start_tombstone) = 0;
@@ -203,7 +203,7 @@ private:
liveness_info _liveness;
bool _is_first_unfiltered = true;
std::vector<temporary_buffer<char>> _row_key;
std::vector<fragmented_temporary_buffer> _row_key;
struct row_schema {
using column_range = boost::iterator_range<std::vector<column_translation::column_info>::const_iterator>;
@@ -234,7 +234,7 @@ private:
gc_clock::time_point _column_local_deletion_time;
gc_clock::duration _column_ttl;
uint32_t _column_value_length;
temporary_buffer<char> _column_value;
fragmented_temporary_buffer _column_value;
temporary_buffer<char> _cell_path;
uint64_t _ck_blocks_header;
uint32_t _ck_blocks_header_offset;
@@ -671,7 +671,7 @@ private:
case state::COLUMN_CELL_PATH:
column_cell_path_label:
if (!is_column_simple()) {
if (read_unsigned_vint_length_bytes(data, _cell_path) != read_status::ready) {
if (read_unsigned_vint_length_bytes_contiguous(data, _cell_path) != read_status::ready) {
_state = state::COLUMN_VALUE;
break;
}
@@ -681,7 +681,7 @@ private:
case state::COLUMN_VALUE:
{
if (!_column_flags.has_value()) {
_column_value = temporary_buffer<char>(0);
_column_value = fragmented_temporary_buffer();
_state = state::COLUMN_END;
goto column_end_label;
}
@@ -701,14 +701,14 @@ private:
_state = state::NEXT_COLUMN;
if (is_column_counter() && !_column_flags.is_deleted()) {
if (_consumer.consume_counter_column(get_column_info(),
to_bytes_view(_column_value),
fragmented_temporary_buffer::view(_column_value),
_column_timestamp) == consumer_m::proceed::no) {
return consumer_m::proceed::no;
}
} else {
if (_consumer.consume_column(get_column_info(),
to_bytes_view(_cell_path),
to_bytes_view(_column_value),
fragmented_temporary_buffer::view(_column_value),
_column_timestamp,
_column_ttl,
_column_local_deletion_time,
@@ -991,7 +991,7 @@ class mp_row_consumer_m : public consumer_m {
}
};
inline friend std::ostream& operator<<(std::ostream& o, const sstables::mx::mp_row_consumer_m::range_tombstone_start& rt_start) {
inline friend std::ostream& operator<<(std::ostream& o, const mp_row_consumer_m::range_tombstone_start& rt_start) {
o << "{ clustering: " << rt_start.ck
<< ", kind: " << rt_start.kind
<< ", tombstone: " << rt_start.tomb << " }";
@@ -1224,9 +1224,9 @@ public:
return proceed(!_reader->is_buffer_full() && !need_preempt());
}
virtual consumer_m::row_processing_result consume_row_start(const std::vector<temporary_buffer<char>>& ecp) override {
virtual consumer_m::row_processing_result consume_row_start(const std::vector<fragmented_temporary_buffer>& ecp) override {
auto key = clustering_key_prefix::from_range(ecp | boost::adaptors::transformed(
[] (const temporary_buffer<char>& b) { return to_bytes_view(b); }));
[] (const fragmented_temporary_buffer& b) { return fragmented_temporary_buffer::view(b); }));
sstlog.trace("mp_row_consumer_m {}: consume_row_start({})", fmt::ptr(this), key);
@@ -1306,14 +1306,14 @@ public:
virtual proceed consume_column(const column_translation::column_info& column_info,
bytes_view cell_path,
bytes_view value,
fragmented_temporary_buffer::view value,
api::timestamp_type timestamp,
gc_clock::duration ttl,
gc_clock::time_point local_deletion_time,
bool is_deleted) override {
const std::optional<column_id>& column_id = column_info.id;
sstlog.trace("mp_row_consumer_m {}: consume_column(id={}, path={}, value={}, ts={}, ttl={}, del_time={}, deleted={})", fmt::ptr(this),
column_id, fmt_hex(cell_path), fmt_hex(value), timestamp, ttl.count(), local_deletion_time.time_since_epoch().count(), is_deleted);
column_id, fmt_hex(cell_path), value, timestamp, ttl.count(), local_deletion_time.time_since_epoch().count(), is_deleted);
check_column_missing_in_current_schema(column_info, timestamp);
if (!column_id) {
return proceed::yes;
@@ -1388,10 +1388,10 @@ public:
}
virtual proceed consume_counter_column(const column_translation::column_info& column_info,
bytes_view value,
fragmented_temporary_buffer::view value,
api::timestamp_type timestamp) override {
const std::optional<column_id>& column_id = column_info.id;
sstlog.trace("mp_row_consumer_m {}: consume_counter_column({}, {}, {})", fmt::ptr(this), column_id, fmt_hex(value), timestamp);
sstlog.trace("mp_row_consumer_m {}: consume_counter_column({}, {}, {})", fmt::ptr(this), column_id, value, timestamp);
check_column_missing_in_current_schema(column_info, timestamp);
if (!column_id) {
return proceed::yes;
@@ -1406,11 +1406,11 @@ public:
return proceed::yes;
}
virtual proceed consume_range_tombstone(const std::vector<temporary_buffer<char>>& ecp,
virtual proceed consume_range_tombstone(const std::vector<fragmented_temporary_buffer>& ecp,
bound_kind kind,
tombstone tomb) override {
auto ck = clustering_key_prefix::from_range(ecp | boost::adaptors::transformed(
[] (const temporary_buffer<char>& b) { return to_bytes_view(b); }));
[] (const fragmented_temporary_buffer& b) { return fragmented_temporary_buffer::view(b); }));
if (kind == bound_kind::incl_start || kind == bound_kind::excl_start) {
consume_range_tombstone_start(std::move(ck), kind, std::move(tomb));
return proceed(!_reader->is_buffer_full() && !need_preempt());
@@ -1419,13 +1419,13 @@ public:
}
}
virtual proceed consume_range_tombstone(const std::vector<temporary_buffer<char>>& ecp,
virtual proceed consume_range_tombstone(const std::vector<fragmented_temporary_buffer>& ecp,
sstables::bound_kind_m kind,
tombstone end_tombstone,
tombstone start_tombstone) override {
auto result = proceed::yes;
auto ck = clustering_key_prefix::from_range(ecp | boost::adaptors::transformed(
[] (const temporary_buffer<char>& b) { return to_bytes_view(b); }));
[] (const fragmented_temporary_buffer& b) { return fragmented_temporary_buffer::view(b); }));
switch (kind) {
case bound_kind_m::incl_end_excl_start:
result = consume_range_tombstone_end(ck, bound_kind::incl_end, std::move(end_tombstone));

View File

@@ -104,7 +104,7 @@ private:
return;
}
case state_k_l::START_NAME_BYTES:
if (this->read_bytes(data, this->_u16, ctx.start) != continuous_data_consumer::read_status::ready) {
if (this->read_bytes_contiguous(data, this->_u16, ctx.start) != continuous_data_consumer::read_status::ready) {
ctx.state = state_k_l::END_NAME_LENGTH;
return;
}
@@ -114,7 +114,7 @@ private:
return;
}
case state_k_l::END_NAME_BYTES:
if (this->read_bytes(data, this->_u16, ctx.end) != continuous_data_consumer::read_status::ready) {
if (this->read_bytes_contiguous(data, this->_u16, ctx.end) != continuous_data_consumer::read_status::ready) {
ctx.state = state_k_l::OFFSET;
return;
}

View File

@@ -23,13 +23,14 @@
#include "column_translation.hh"
#include "concrete_types.hh"
#include "utils/fragment_range.hh"
namespace sstables {
atomic_cell make_counter_cell(api::timestamp_type timestamp, bytes_view value) {
atomic_cell make_counter_cell(api::timestamp_type timestamp, fragmented_temporary_buffer::view cell_value) {
static constexpr size_t shard_size = 32;
if (value.empty()) {
if (cell_value.empty()) {
// This will never happen in a correct MC sstable but
// we had a bug #4363 that caused empty counters
// to be incorrectly stored inside sstables.
@@ -37,27 +38,27 @@ atomic_cell make_counter_cell(api::timestamp_type timestamp, bytes_view value) {
return ccb.build(timestamp);
}
data_input in(value);
auto cell_value_size = cell_value.size_bytes();
auto header_size = in.read<int16_t>();
auto header_size = read_simple<int16_t>(cell_value);
for (auto i = 0; i < header_size; i++) {
auto idx = in.read<int16_t>();
auto idx = read_simple<int16_t>(cell_value);
if (idx >= 0) {
throw marshal_exception("encountered a local shard in a counter cell");
}
}
auto header_length = (size_t(header_size) + 1) * sizeof(int16_t);
auto shard_count = (value.size() - header_length) / shard_size;
auto shard_count = (cell_value_size - header_length) / shard_size;
if (shard_count != size_t(header_size)) {
throw marshal_exception("encountered remote shards in a counter cell");
}
counter_cell_builder ccb(shard_count);
for (auto i = 0u; i < shard_count; i++) {
auto id_hi = in.read<int64_t>();
auto id_lo = in.read<int64_t>();
auto clock = in.read<int64_t>();
auto value = in.read<int64_t>();
auto id_hi = read_simple<int64_t>(cell_value);
auto id_lo = read_simple<int64_t>(cell_value);
auto clock = read_simple<int64_t>(cell_value);
auto value = read_simple<int64_t>(cell_value);
ccb.add_maybe_unsorted_shard(counter_shard(counter_id(utils::UUID(id_hi, id_lo)), value, clock));
}
ccb.sort_and_remove_duplicates();

View File

@@ -83,7 +83,7 @@ public:
inline atomic_cell make_atomic_cell(const abstract_type& type,
api::timestamp_type timestamp,
bytes_view value,
fragmented_temporary_buffer::view value,
gc_clock::duration ttl,
gc_clock::time_point expiration,
atomic_cell::collection_member cm) {
@@ -94,7 +94,7 @@ inline atomic_cell make_atomic_cell(const abstract_type& type,
}
}
atomic_cell make_counter_cell(api::timestamp_type timestamp, bytes_view value);
atomic_cell make_counter_cell(api::timestamp_type timestamp, fragmented_temporary_buffer::view value);
position_in_partition_view get_slice_upper_bound(const schema& s, const query::partition_slice& slice, dht::ring_position_view key);

View File

@@ -350,7 +350,7 @@ public:
return proceed::yes;
}
virtual proceed consume_cell(bytes_view col_name, bytes_view value,
virtual proceed consume_cell(bytes_view col_name, fragmented_temporary_buffer::view value,
int64_t timestamp, int64_t ttl, int64_t expiration) override {
BOOST_REQUIRE(ttl == 0);
BOOST_REQUIRE(expiration == 0);
@@ -358,7 +358,7 @@ public:
case 0:
// The silly "cql marker" column
BOOST_REQUIRE(col_name.size() == 3 && col_name[0] == 0 && col_name[1] == 0 && col_name[2] == 0);
BOOST_REQUIRE(value.size() == 0);
BOOST_REQUIRE(value.size_bytes() == 0);
BOOST_REQUIRE(timestamp == desired_timestamp);
break;
case 1:
@@ -366,7 +366,7 @@ public:
col_name[1] == 4 && col_name[2] == 'c' &&
col_name[3] == 'o' && col_name[4] == 'l' &&
col_name[5] == '1' && col_name[6] == '\0');
BOOST_REQUIRE(value == as_bytes("daughter"));
BOOST_REQUIRE(linearized(value) == as_bytes("daughter"));
BOOST_REQUIRE(timestamp == desired_timestamp);
break;
case 2:
@@ -374,8 +374,7 @@ public:
col_name[1] == 4 && col_name[2] == 'c' &&
col_name[3] == 'o' && col_name[4] == 'l' &&
col_name[5] == '2' && col_name[6] == '\0');
BOOST_REQUIRE(value.size() == 4 && value[0] == 0 &&
value[1] == 0 && value[2] == 0 && value[3] == 3);
BOOST_REQUIRE(linearized(value) == bytes({0, 0, 0, 3}));
BOOST_REQUIRE(timestamp == desired_timestamp);
break;
}
@@ -383,7 +382,7 @@ public:
return proceed::yes;
}
virtual proceed consume_counter_cell(bytes_view col_name, bytes_view value, int64_t timestamp) override {
virtual proceed consume_counter_cell(bytes_view col_name, fragmented_temporary_buffer::view value, int64_t timestamp) override {
BOOST_FAIL("counter cell wasn't expected");
abort(); // BOOST_FAIL is not marked as [[noreturn]].
}
@@ -470,12 +469,12 @@ public:
count_row_start++;
return proceed::yes;
}
virtual proceed consume_cell(bytes_view col_name, bytes_view value,
virtual proceed consume_cell(bytes_view col_name, fragmented_temporary_buffer::view value,
int64_t timestamp, int64_t ttl, int64_t expiration) override {
count_cell++;
return proceed::yes;
}
virtual proceed consume_counter_cell(bytes_view col_name, bytes_view value, int64_t timestamp) override {
virtual proceed consume_counter_cell(bytes_view col_name, fragmented_temporary_buffer::view value, int64_t timestamp) override {
count_cell++;
return proceed::yes;
}
@@ -625,13 +624,13 @@ public:
return proceed::yes;
}
virtual proceed consume_cell(bytes_view col_name, bytes_view value,
virtual proceed consume_cell(bytes_view col_name, fragmented_temporary_buffer::view value,
int64_t timestamp, int64_t ttl, int64_t expiration) override {
switch (count_cell) {
case 0:
// The silly "cql row marker" cell
BOOST_REQUIRE(col_name.size() == 3 && col_name[0] == 0 && col_name[1] == 0 && col_name[2] == 0);
BOOST_REQUIRE(value.size() == 0);
BOOST_REQUIRE(value.size_bytes() == 0);
BOOST_REQUIRE(timestamp == desired_timestamp);
BOOST_REQUIRE(ttl == 3600);
BOOST_REQUIRE(expiration == 1430154618);
@@ -641,8 +640,7 @@ public:
col_name[1] == 3 && col_name[2] == 'a' &&
col_name[3] == 'g' && col_name[4] == 'e' &&
col_name[5] == '\0');
BOOST_REQUIRE(value.size() == 4 && value[0] == 0 && value[1] == 0
&& value[2] == 0 && value[3] == 40);
BOOST_REQUIRE(linearized(value) == bytes({0, 0, 0, 40}));
BOOST_REQUIRE(timestamp == desired_timestamp);
BOOST_REQUIRE(ttl == 3600);
BOOST_REQUIRE(expiration == 1430154618);

View File

@@ -489,3 +489,12 @@ public:
});
}
};
// Logging-only helper: writes the hex form of each fragment of the view,
// one after another, into the given stream and returns that stream.
inline std::ostream& operator<<(std::ostream& out, const fragmented_temporary_buffer::view& v) {
    for (bytes_view fragment : fragment_range(v)) {
        // Convert one fragment at a time.
        auto hex = to_hex(fragment);
        out << hex;
    }
    return out;
}