Merge "Fix quadratic behavior in memtable/row_cache with lots of range tombstones" from Tomasz
" This series fixes two issues which cause very poor efficiency of reads when there is a lot of range tombstones per live row in a partition. The first issue is in the row_cache reader. Before the patch, all range tombstones up to the next row were copied into a vector, and then put into the buffer until it's full. This would get quadratic if there is much more range tombstones than fit in a buffer. The fix is to avoid the accumulation of all tombstones in the vector and invoke the callback instead, which stops the iteration as soon as the buffer is full. Fixes #2581. The second, similar issue was in the memtable reader. Tests: - unit (dev) - perf_row_cache_update (release) " * tag 'no-quadratic-rt-in-reads-v1' of github.com:tgrabiec/scylla: test: perf_row_cache_update: Uncomment test case for lots of range tombstones row_cache: Consume range tombstones incrementally partition_snapshot_reader: Avoid quadratic behavior with lots of range tombstones tests: mvcc: Relax monotonicity check range_tombstone_stream: Introduce peek_next()
This commit is contained in:
@@ -531,16 +531,16 @@ void cache_flat_mutation_reader::copy_from_cache_to_buffer() {
|
||||
_next_row.touch();
|
||||
position_in_partition_view next_lower_bound = _next_row.dummy() ? _next_row.position() : position_in_partition_view::after_key(_next_row.key());
|
||||
auto upper_bound = _next_row_in_range ? next_lower_bound : _upper_bound;
|
||||
for (auto &&rts : _snp->range_tombstones(_lower_bound, upper_bound)) {
|
||||
if (_snp->range_tombstones(_lower_bound, upper_bound, [&] (range_tombstone rts) {
|
||||
position_in_partition::less_compare less(*_schema);
|
||||
// Avoid emitting overlapping range tombstones for performance reasons.
|
||||
if (less(upper_bound, rts.end_position())) {
|
||||
rts.set_end(*_schema, upper_bound);
|
||||
}
|
||||
add_range_tombstone_to_buffer(std::move(rts));
|
||||
if (_lower_bound_changed && is_buffer_full()) {
|
||||
return;
|
||||
}
|
||||
return stop_iteration(_lower_bound_changed && is_buffer_full());
|
||||
}) == stop_iteration::no) {
|
||||
return;
|
||||
}
|
||||
// We add the row to the buffer even when it's full.
|
||||
// This simplifies the code. For more info see #3139.
|
||||
|
||||
@@ -392,6 +392,11 @@ mutation_fragment_opt range_tombstone_stream::get_next()
|
||||
return { };
|
||||
}
|
||||
|
||||
const range_tombstone& range_tombstone_stream::peek_next() const
|
||||
{
|
||||
return *_list.begin();
|
||||
}
|
||||
|
||||
void range_tombstone_stream::forward_to(position_in_partition_view pos) {
|
||||
_list.erase_where([this, &pos] (const range_tombstone& rt) {
|
||||
return !_cmp(pos, rt.end_position());
|
||||
|
||||
@@ -595,6 +595,8 @@ public:
|
||||
// Returns next fragment with position before upper_bound or disengaged optional if no such fragments are left.
|
||||
mutation_fragment_opt get_next(position_in_partition_view upper_bound);
|
||||
mutation_fragment_opt get_next();
|
||||
// Precondition: !empty()
|
||||
const range_tombstone& peek_next() const;
|
||||
// Forgets all tombstones which are not relevant for any range starting at given position.
|
||||
void forward_to(position_in_partition_view);
|
||||
|
||||
|
||||
@@ -245,11 +245,16 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public
|
||||
const std::optional<position_in_partition>& last_row,
|
||||
const std::optional<position_in_partition>& last_rts,
|
||||
position_in_partition_view pos) {
|
||||
if (!_rt_stream.empty()) {
|
||||
return _rt_stream.get_next(std::move(pos));
|
||||
}
|
||||
return in_alloc_section([&] () -> mutation_fragment_opt {
|
||||
maybe_refresh_state(ck_range, last_row, last_rts);
|
||||
|
||||
position_in_partition::less_compare rt_less(_schema);
|
||||
while (has_more_range_tombstones() && !rt_less(pos, peek_range_tombstone().position())) {
|
||||
while (has_more_range_tombstones()
|
||||
&& !rt_less(pos, peek_range_tombstone().position())
|
||||
&& (_rt_stream.empty() || !rt_less(_rt_stream.peek_next().position(), peek_range_tombstone().position()))) {
|
||||
range_tombstone rt = pop_range_tombstone();
|
||||
if (rt.trim(_schema,
|
||||
position_in_partition_view::for_range_start(ck_range),
|
||||
|
||||
@@ -541,21 +541,64 @@ partition_snapshot_ptr partition_entry::read(logalloc::region& r,
|
||||
}
|
||||
|
||||
partition_snapshot::range_tombstone_result
|
||||
partition_snapshot::range_tombstones(position_in_partition_view start, position_in_partition_view end)
|
||||
partition_snapshot::range_tombstones(position_in_partition_view start, position_in_partition_view end) {
|
||||
range_tombstone_result rts;
|
||||
range_tombstones(start, end, [&] (range_tombstone rt) {
|
||||
rts.emplace_back(std::move(rt));
|
||||
return stop_iteration::no;
|
||||
});
|
||||
return rts;
|
||||
}
|
||||
|
||||
stop_iteration
|
||||
partition_snapshot::range_tombstones(position_in_partition_view start, position_in_partition_view end,
|
||||
std::function<stop_iteration(range_tombstone)> callback)
|
||||
{
|
||||
partition_version* v = &*version();
|
||||
if (!v->next()) {
|
||||
return boost::copy_range<range_tombstone_result>(
|
||||
v->partition().row_tombstones().slice(*_schema, start, end));
|
||||
}
|
||||
range_tombstone_list list(*_schema);
|
||||
while (v) {
|
||||
|
||||
if (!v->next()) { // Optimization for single-version snapshots
|
||||
for (auto&& rt : v->partition().row_tombstones().slice(*_schema, start, end)) {
|
||||
list.apply(*_schema, rt);
|
||||
if (callback(rt) == stop_iteration::yes) {
|
||||
return stop_iteration::no;
|
||||
}
|
||||
}
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
|
||||
std::vector<range_tombstone_list::iterator_range> streams; // contains only non-empty ranges
|
||||
position_in_partition::less_compare less(*_schema);
|
||||
|
||||
// Sorts ranges by first range_tombstone's starting position
|
||||
// in descending order.
|
||||
auto stream_less = [&] (range_tombstone_list::iterator_range left, range_tombstone_list::iterator_range right) {
|
||||
return less(right.begin()->position(), left.begin()->position());
|
||||
};
|
||||
|
||||
while (v) {
|
||||
auto&& range = v->partition().row_tombstones().slice(*_schema, start, end);
|
||||
if (!range.empty()) {
|
||||
streams.emplace_back(std::move(range));
|
||||
}
|
||||
v = v->next();
|
||||
}
|
||||
return boost::copy_range<range_tombstone_result>(list.slice(*_schema, start, end));
|
||||
|
||||
std::make_heap(streams.begin(), streams.end(), stream_less);
|
||||
|
||||
while (!streams.empty()) {
|
||||
std::pop_heap(streams.begin(), streams.end(), stream_less);
|
||||
range_tombstone_list::iterator_range& stream = streams.back();
|
||||
if (callback(*stream.begin()) == stop_iteration::yes) {
|
||||
return stop_iteration::no;
|
||||
}
|
||||
stream.advance_begin(1);
|
||||
if (!stream.empty()) {
|
||||
std::push_heap(streams.begin(), streams.end(), stream_less);
|
||||
} else {
|
||||
streams.pop_back();
|
||||
}
|
||||
}
|
||||
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
|
||||
partition_snapshot::range_tombstone_result
|
||||
|
||||
@@ -411,6 +411,11 @@ public:
|
||||
|
||||
// Returns range tombstones overlapping with [start, end)
|
||||
range_tombstone_result range_tombstones(position_in_partition_view start, position_in_partition_view end);
|
||||
// Invokes the callback for every range tombstones overlapping with [start, end) until
|
||||
// the callback returns stop_iteration::yes or all tombstones are exhausted.
|
||||
// Returns stop_iteration::yes if all range tombstones in the range were consumed.
|
||||
stop_iteration range_tombstones(position_in_partition_view start, position_in_partition_view end,
|
||||
std::function<stop_iteration(range_tombstone)> callback);
|
||||
// Returns all range tombstones
|
||||
range_tombstone_result range_tombstones();
|
||||
};
|
||||
|
||||
@@ -64,7 +64,7 @@ void check_tombstone_slice(const schema& s, const utils::chunked_vector<range_to
|
||||
if (!less(position_in_partition::for_range_start(range), rt.end_position())) {
|
||||
BOOST_FAIL(format("Range tombstone out of range: {}, range: {}", rt, range));
|
||||
}
|
||||
if (!less(prev_pos, rt.position())) {
|
||||
if (less(rt.position(), prev_pos)) {
|
||||
BOOST_FAIL(format("Range tombstone breaks position monotonicity: {}, list: {}", rt, list));
|
||||
}
|
||||
prev_pos = position_in_partition(rt.position());
|
||||
|
||||
@@ -239,9 +239,7 @@ int main(int argc, char** argv) {
|
||||
test_small_partitions();
|
||||
test_partition_with_few_small_rows();
|
||||
test_partition_with_lots_of_small_rows();
|
||||
// Takes a huge amount of time due to https://github.com/scylladb/scylla/issues/2581#issuecomment-398030186,
|
||||
// disable until fixed.
|
||||
// test_partition_with_lots_of_range_tombstones();
|
||||
test_partition_with_lots_of_range_tombstones();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user