partition_snapshot_reader: Emit range tombstones on demand
Currently the reader collects all range tombstones from the given range and places them into a stream. When filling the buffer with fragments, the range tombstones are extracted from the stream one by one. This is memory-consuming: the reader's memory usage shouldn't depend on the number of entries in the partition range. The patch implements a heap-based cursor for range tombstones, much like the one already used for rows. The heap contains range_tombstone_list::iterator_range objects; tombstones are popped from the heap when needed, applied to the stream, and then emitted from it into the buffer. refresh_state() is called on each new range to set up the iterators, and again whenever LSA reports that references were invalidated, to refresh the iterators. To let refresh_state() revalidate the iterators, the position at which the last range tombstone was emitted is maintained. Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
@@ -47,6 +47,9 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public
|
||||
bool operator()(const rows_position& a, const rows_position& b) {
|
||||
return _less(b._position->position(), a._position->position());
|
||||
}
|
||||
// Compares two range tombstone slices by the position of their first
// tombstone. The operands are handed to _less in swapped order, mirroring
// the rows_position overload of this comparator.
// NOTE(review): presumably this inversion makes the heap built with this
// comparator surface the slice with the smallest front position first --
// confirm against the definition of _less.
// Both slices must be non-empty, since front() is dereferenced.
bool operator()(const range_tombstone_list::iterator_range& a, const range_tombstone_list::iterator_range& b) {
    const auto& lhs_head = a.front();
    const auto& rhs_head = b.front();
    return _less(rhs_head.position(), lhs_head.position());
}
|
||||
};
|
||||
|
||||
// The part of the reader that accesses LSA memory directly and works
|
||||
@@ -65,6 +68,7 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public
|
||||
|
||||
partition_snapshot::change_mark _change_mark;
|
||||
std::vector<rows_position> _clustering_rows;
|
||||
std::vector<range_tombstone_list::iterator_range> _range_tombstones;
|
||||
|
||||
range_tombstone_stream _rt_stream;
|
||||
|
||||
@@ -77,24 +81,20 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public
|
||||
});
|
||||
}
|
||||
void maybe_refresh_state(const query::clustering_range& ck_range,
|
||||
const std::optional<position_in_partition>& last_row) {
|
||||
const std::optional<position_in_partition>& last_row,
|
||||
const std::optional<position_in_partition>& last_rts) {
|
||||
auto mark = _snapshot->get_change_mark();
|
||||
if (!last_row || mark != _change_mark) {
|
||||
do_refresh_state(ck_range, last_row);
|
||||
if (mark != _change_mark) {
|
||||
do_refresh_state(ck_range, last_row, last_rts);
|
||||
_change_mark = mark;
|
||||
}
|
||||
}
|
||||
|
||||
void do_refresh_state(const query::clustering_range& ck_range,
|
||||
const std::optional<position_in_partition>& last_row) {
|
||||
const std::optional<position_in_partition>& last_row,
|
||||
const std::optional<position_in_partition>& last_rts) {
|
||||
_clustering_rows.clear();
|
||||
|
||||
if (!last_row) {
|
||||
// New range. Collect all relevant range tombstone.
|
||||
for (auto&& v : _snapshot->versions()) {
|
||||
_rt_stream.apply(v.partition().row_tombstones(), ck_range);
|
||||
}
|
||||
}
|
||||
_range_tombstones.clear();
|
||||
|
||||
rows_entry::tri_compare rows_cmp(_schema);
|
||||
for (auto&& v : _snapshot->versions()) {
|
||||
@@ -110,9 +110,21 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public
|
||||
if (cr != cr_end) {
|
||||
_clustering_rows.emplace_back(rows_position { cr, cr_end });
|
||||
}
|
||||
|
||||
range_tombstone_list::iterator_range rt_slice = [&] () {
|
||||
if (last_rts) {
|
||||
return v.partition().row_tombstones().upper_slice(_schema, *last_rts, bound_view::from_range_end(ck_range));
|
||||
} else {
|
||||
return v.partition().row_tombstones().slice(_schema, ck_range);
|
||||
}
|
||||
}();
|
||||
if (rt_slice.begin() != rt_slice.end()) {
|
||||
_range_tombstones.emplace_back(std::move(rt_slice));
|
||||
}
|
||||
}
|
||||
|
||||
boost::range::make_heap(_clustering_rows, _heap_cmp);
|
||||
boost::range::make_heap(_range_tombstones, _heap_cmp);
|
||||
}
|
||||
// Valid if has_more_rows()
|
||||
const rows_entry& pop_clustering_row() {
|
||||
@@ -127,6 +139,20 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public
|
||||
}
|
||||
return e;
|
||||
}
|
||||
|
||||
// Takes the top slice off the _range_tombstones heap, consumes its first
// tombstone and returns a reference to that tombstone. If the slice still
// has tombstones left it is pushed back into the heap; otherwise it is
// dropped from the vector.
// Must not be called when _range_tombstones is empty (back() is used
// unconditionally); check has_more_range_tombstones() first.
const range_tombstone& pop_range_tombstone() {
    // Moves the top slice to the back of the vector.
    boost::range::pop_heap(_range_tombstones, _heap_cmp);
    auto& slice = _range_tombstones.back();

    // Bind the tombstone before the slice is advanced past it.
    const range_tombstone& head = *slice.begin();
    slice.advance_begin(1);

    const bool slice_exhausted = (slice.begin() == slice.end());
    if (!slice_exhausted) {
        // Re-insert the shortened slice under its new front position.
        boost::range::push_heap(_range_tombstones, _heap_cmp);
    } else {
        _range_tombstones.pop_back();
    }
    return head;
}
|
||||
|
||||
// Valid if has_more_rows()
|
||||
const rows_entry& peek_row() const {
|
||||
return *_clustering_rows.front()._position;
|
||||
@@ -134,6 +160,13 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public
|
||||
bool has_more_rows() const {
|
||||
return !_clustering_rows.empty();
|
||||
}
|
||||
|
||||
// Returns the first tombstone of the slice currently at the front of
// _range_tombstones, without consuming it.
// Valid if has_more_range_tombstones()
const range_tombstone& peek_range_tombstone() const {
    const auto& top_slice = _range_tombstones.front();
    return *top_slice.begin();
}
|
||||
bool has_more_range_tombstones() const {
|
||||
return !_range_tombstones.empty();
|
||||
}
|
||||
public:
|
||||
explicit lsa_partition_reader(const schema& s, reader_permit permit, partition_snapshot_ptr snp,
|
||||
logalloc::region& region, logalloc::allocating_section& read_section,
|
||||
@@ -149,6 +182,7 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public
|
||||
{ }
|
||||
|
||||
// Rebuilds the reader state for ck_range from scratch: do_refresh_state()
// is invoked with no last row position and no last range tombstone
// position.
void reset_state(const query::clustering_range& ck_range) {
    do_refresh_state(ck_range, std::nullopt, std::nullopt);
}
|
||||
|
||||
template<typename Function>
|
||||
@@ -175,9 +209,10 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public
|
||||
// new range _rt_stream will be populated with all relevant
|
||||
// tombstones.
|
||||
mutation_fragment_opt next_row(const query::clustering_range& ck_range,
|
||||
const std::optional<position_in_partition>& last_row) {
|
||||
const std::optional<position_in_partition>& last_row,
|
||||
const std::optional<position_in_partition>& last_rts) {
|
||||
return in_alloc_section([&] () -> mutation_fragment_opt {
|
||||
maybe_refresh_state(ck_range, last_row);
|
||||
maybe_refresh_state(ck_range, last_row, last_rts);
|
||||
|
||||
position_in_partition::equal_compare rows_eq(_schema);
|
||||
while (has_more_rows()) {
|
||||
@@ -204,8 +239,22 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public
|
||||
});
|
||||
}
|
||||
|
||||
mutation_fragment_opt next_range_rombstone(position_in_partition_view pos) {
|
||||
return _rt_stream.get_next(std::move(pos));
|
||||
// Produces the next range tombstone fragment that _rt_stream yields for
// position pos, or a disengaged mutation_fragment_opt if get_next() has
// nothing to return.
//
// ck_range  - clustering range currently being read; popped tombstones are
//             trimmed to its start.
// last_row  - forwarded to maybe_refresh_state().
// last_rts  - forwarded to maybe_refresh_state().
// pos       - tombstones whose position is not after pos are moved from
//             the heap into _rt_stream before the stream is queried.
//
// The whole body runs inside in_alloc_section().
mutation_fragment_opt next_range_rombstone(const query::clustering_range& ck_range,
        const std::optional<position_in_partition>& last_row,
        const std::optional<position_in_partition>& last_rts,
        position_in_partition_view pos) {
    return in_alloc_section([&] () -> mutation_fragment_opt {
        maybe_refresh_state(ck_range, last_row, last_rts);

        position_in_partition::less_compare is_before(_schema);
        for (;;) {
            if (!has_more_range_tombstones()) {
                break;
            }
            // Stop at the first tombstone positioned strictly after pos.
            if (is_before(pos, peek_range_tombstone().position())) {
                break;
            }
            // Work on a copy so trimming does not touch the stored tombstone.
            range_tombstone pending = pop_range_tombstone();
            pending.trim_front(_schema, position_in_partition_view::for_range_start(ck_range));
            _rt_stream.apply(std::move(pending));
        }

        return _rt_stream.get_next(std::move(pos));
    });
}
|
||||
};
|
||||
private:
|
||||
@@ -218,6 +267,7 @@ private:
|
||||
query::clustering_row_ranges::const_iterator _ck_range_end;
|
||||
|
||||
std::optional<position_in_partition> _last_entry;
|
||||
std::optional<position_in_partition> _last_rts;
|
||||
mutation_fragment_opt _next_row;
|
||||
|
||||
lsa_partition_reader _reader;
|
||||
@@ -237,19 +287,25 @@ private:
|
||||
|
||||
mutation_fragment_opt read_next() {
|
||||
if (!_next_row && !_no_more_rows_in_current_range) {
|
||||
_next_row = _reader.next_row(*_current_ck_range, _last_entry);
|
||||
_next_row = _reader.next_row(*_current_ck_range, _last_entry, _last_rts);
|
||||
}
|
||||
if (_next_row) {
|
||||
auto pos_view = _next_row->as_clustering_row().position();
|
||||
auto mf = _reader.next_range_rombstone(pos_view);
|
||||
_last_entry = position_in_partition(pos_view);
|
||||
|
||||
auto mf = _reader.next_range_rombstone(*_current_ck_range, _last_entry, _last_rts, pos_view);
|
||||
if (mf) {
|
||||
_last_rts = mf->as_range_tombstone().position();
|
||||
return mf;
|
||||
}
|
||||
_last_entry = position_in_partition(pos_view);
|
||||
return std::exchange(_next_row, {});
|
||||
} else {
|
||||
_no_more_rows_in_current_range = true;
|
||||
return _reader.next_range_rombstone(position_in_partition_view::for_range_end(*_current_ck_range));
|
||||
auto mf = _reader.next_range_rombstone(*_current_ck_range, _last_entry, _last_rts, position_in_partition_view::for_range_end(*_current_ck_range));
|
||||
if (mf) {
|
||||
_last_rts = mf->as_range_tombstone().position();
|
||||
}
|
||||
return mf;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -275,6 +331,7 @@ private:
|
||||
emplace_mutation_fragment(std::move(*mfopt));
|
||||
} else {
|
||||
_last_entry = std::nullopt;
|
||||
_last_rts = std::nullopt;
|
||||
_current_ck_range = std::next(_current_ck_range);
|
||||
on_new_range();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user