diff --git a/partition_snapshot_reader.hh b/partition_snapshot_reader.hh index 6d8e32ba84..d6ec3cb44c 100644 --- a/partition_snapshot_reader.hh +++ b/partition_snapshot_reader.hh @@ -47,6 +47,9 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public bool operator()(const rows_position& a, const rows_position& b) { return _less(b._position->position(), a._position->position()); } + bool operator()(const range_tombstone_list::iterator_range& a, const range_tombstone_list::iterator_range& b) { + return _less(b.front().position(), a.front().position()); + } }; // The part of the reader that accesses LSA memory directly and works @@ -65,6 +68,7 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public partition_snapshot::change_mark _change_mark; std::vector _clustering_rows; + std::vector _range_tombstones; range_tombstone_stream _rt_stream; @@ -77,24 +81,20 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public }); } void maybe_refresh_state(const query::clustering_range& ck_range, - const std::optional& last_row) { + const std::optional& last_row, + const std::optional& last_rts) { auto mark = _snapshot->get_change_mark(); - if (!last_row || mark != _change_mark) { - do_refresh_state(ck_range, last_row); + if (mark != _change_mark) { + do_refresh_state(ck_range, last_row, last_rts); _change_mark = mark; } } void do_refresh_state(const query::clustering_range& ck_range, - const std::optional& last_row) { + const std::optional& last_row, + const std::optional& last_rts) { _clustering_rows.clear(); - - if (!last_row) { - // New range. Collect all relevant range tombstone. - for (auto&& v : _snapshot->versions()) { - _rt_stream.apply(v.partition().row_tombstones(), ck_range); - } - } + _range_tombstones.clear(); rows_entry::tri_compare rows_cmp(_schema); for (auto&& v : _snapshot->versions()) { @@ -110,9 +110,21 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public if (cr != cr_end) { _clustering_rows.emplace_back(rows_position { cr, cr_end }); } + + range_tombstone_list::iterator_range rt_slice = [&] () { + if (last_rts) { + return v.partition().row_tombstones().upper_slice(_schema, *last_rts, bound_view::from_range_end(ck_range)); + } else { + return v.partition().row_tombstones().slice(_schema, ck_range); + } + }(); + if (rt_slice.begin() != rt_slice.end()) { + _range_tombstones.emplace_back(std::move(rt_slice)); + } } boost::range::make_heap(_clustering_rows, _heap_cmp); + boost::range::make_heap(_range_tombstones, _heap_cmp); } // Valid if has_more_rows() const rows_entry& pop_clustering_row() { @@ -127,6 +139,20 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public } return e; } + + const range_tombstone& pop_range_tombstone() { + boost::range::pop_heap(_range_tombstones, _heap_cmp); + auto& current = _range_tombstones.back(); + const range_tombstone& rt = *current.begin(); + current.advance_begin(1); + if (current.begin() == current.end()) { + _range_tombstones.pop_back(); + } else { + boost::range::push_heap(_range_tombstones, _heap_cmp); + } + return rt; + } + // Valid if has_more_rows() const rows_entry& peek_row() const { return *_clustering_rows.front()._position; @@ -134,6 +160,13 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public bool has_more_rows() const { return !_clustering_rows.empty(); } + + const range_tombstone& peek_range_tombstone() const { + return *_range_tombstones.front().begin(); + } + bool has_more_range_tombstones() const { + return !_range_tombstones.empty(); + } public: explicit lsa_partition_reader(const schema& s, reader_permit permit, partition_snapshot_ptr snp, logalloc::region& region, logalloc::allocating_section& read_section, @@ -149,6 +182,7 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public { } void reset_state(const query::clustering_range& ck_range) { + do_refresh_state(ck_range, {}, {}); } template @@ -175,9 +209,10 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public // new range _rt_stream will be populated with all relevant // tombstones. mutation_fragment_opt next_row(const query::clustering_range& ck_range, - const std::optional& last_row) { + const std::optional& last_row, + const std::optional& last_rts) { return in_alloc_section([&] () -> mutation_fragment_opt { - maybe_refresh_state(ck_range, last_row); + maybe_refresh_state(ck_range, last_row, last_rts); position_in_partition::equal_compare rows_eq(_schema); while (has_more_rows()) { @@ -204,8 +239,22 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public }); } - mutation_fragment_opt next_range_rombstone(position_in_partition_view pos) { - return _rt_stream.get_next(std::move(pos)); + mutation_fragment_opt next_range_rombstone(const query::clustering_range& ck_range, + const std::optional& last_row, + const std::optional& last_rts, + position_in_partition_view pos) { + return in_alloc_section([&] () -> mutation_fragment_opt { + maybe_refresh_state(ck_range, last_row, last_rts); + + position_in_partition::less_compare rt_less(_schema); + while (has_more_range_tombstones() && !rt_less(pos, peek_range_tombstone().position())) { + range_tombstone rt = pop_range_tombstone(); + rt.trim_front(_schema, position_in_partition_view::for_range_start(ck_range)); + _rt_stream.apply(std::move(rt)); + } + + return _rt_stream.get_next(std::move(pos)); + }); } }; private: @@ -218,6 +267,7 @@ private: query::clustering_row_ranges::const_iterator _ck_range_end; std::optional _last_entry; + std::optional _last_rts; mutation_fragment_opt _next_row; lsa_partition_reader _reader; @@ -237,19 +287,25 @@ private: mutation_fragment_opt read_next() { if (!_next_row && !_no_more_rows_in_current_range) { - _next_row = _reader.next_row(*_current_ck_range, _last_entry); + _next_row = _reader.next_row(*_current_ck_range, _last_entry, _last_rts); } if (_next_row) { auto pos_view = _next_row->as_clustering_row().position(); - auto mf = _reader.next_range_rombstone(pos_view); + _last_entry = position_in_partition(pos_view); + + auto mf = _reader.next_range_rombstone(*_current_ck_range, _last_entry, _last_rts, pos_view); if (mf) { + _last_rts = mf->as_range_tombstone().position(); return mf; } - _last_entry = position_in_partition(pos_view); return std::exchange(_next_row, {}); } else { _no_more_rows_in_current_range = true; - return _reader.next_range_rombstone(position_in_partition_view::for_range_end(*_current_ck_range)); + auto mf = _reader.next_range_rombstone(*_current_ck_range, _last_entry, _last_rts, position_in_partition_view::for_range_end(*_current_ck_range)); + if (mf) { + _last_rts = mf->as_range_tombstone().position(); + } + return mf; } } @@ -275,6 +331,7 @@ private: emplace_mutation_fragment(std::move(*mfopt)); } else { _last_entry = std::nullopt; + _last_rts = std::nullopt; _current_ck_range = std::next(_current_ck_range); on_new_range(); }