From 9996b7ca1867437a61c87f4f634daec1bbab5b9c Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Fri, 21 May 2021 00:49:19 +0200 Subject: [PATCH] flat_mutation_reader: Introduce adaptors between v1 and v2 of mutation fragment stream The transition to v2 will be incremental. To support that, we need adaptors between v1 and v2 which will be inserted at places which are boundaries of conversion. The v1 -> v2 converter needs to accumulate range tombstones, so has unbounded worst case memory footprint. The v2 -> v1 converter trims range tombstones around clustering rows, so generates more fragments than necessary. Because of that, adpators are a temporary solution and we should not release with them on the produciton code paths. --- flat_mutation_reader.cc | 188 ++++++++++++++++++++++++++++ flat_mutation_reader_v2.hh | 6 + range_tombstone_assembler.hh | 91 ++++++++++++++ range_tombstone_change_generator.hh | 135 ++++++++++++++++++++ 4 files changed, 420 insertions(+) create mode 100644 range_tombstone_assembler.hh create mode 100644 range_tombstone_change_generator.hh diff --git a/flat_mutation_reader.cc b/flat_mutation_reader.cc index ea2b9819f1..0fdd4d662c 100644 --- a/flat_mutation_reader.cc +++ b/flat_mutation_reader.cc @@ -21,6 +21,8 @@ #include "flat_mutation_reader.hh" #include "flat_mutation_reader_v2.hh" +#include "range_tombstone_assembler.hh" +#include "range_tombstone_change_generator.hh" #include "mutation_fragment_stream_validator.hh" #include "mutation_reader.hh" #include "seastar/util/reference_wrapper.hh" @@ -1273,3 +1275,189 @@ void flat_mutation_reader_v2::on_close_error(std::unique_ptr i, std::excep on_internal_error_noexcept(fmr_logger, format("Failed to close {} [{}]: permit {}: {}", typeid(*ip).name(), fmt::ptr(ip), ip->_permit.description(), ep)); } + +flat_mutation_reader downgrade_to_v1(flat_mutation_reader_v2 r) { + class transforming_reader : public flat_mutation_reader::impl { + flat_mutation_reader_v2 _reader; + struct consumer { + transforming_reader* _owner; + stop_iteration operator()(mutation_fragment_v2&& mf) { + std::move(mf).consume(*_owner); + return stop_iteration(_owner->is_buffer_full()); + } + }; + range_tombstone_assembler _rt_assembler; + public: + void consume(static_row mf) { + push_mutation_fragment(*_schema, _permit, std::move(mf)); + } + void consume(clustering_row mf) { + if (_rt_assembler.needs_flush()) { + if (auto rt_opt = _rt_assembler.flush(*_schema, position_in_partition::after_key(mf.position()))) { + push_mutation_fragment(*_schema, _permit, std::move(*rt_opt)); + } + } + push_mutation_fragment(*_schema, _permit, std::move(mf)); + } + void consume(range_tombstone_change mf) { + if (auto rt_opt = _rt_assembler.consume(*_schema, std::move(mf))) { + push_mutation_fragment(*_schema, _permit, std::move(*rt_opt)); + } + } + void consume(partition_start mf) { + _rt_assembler.reset(); + push_mutation_fragment(*_schema, _permit, std::move(mf)); + } + void consume(partition_end mf) { + _rt_assembler.on_end_of_stream(); + push_mutation_fragment(*_schema, _permit, std::move(mf)); + } + transforming_reader(flat_mutation_reader_v2&& r) + : impl(r.schema(), r.permit()) + , _reader(std::move(r)) + {} + virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override { + if (_end_of_stream) { + return make_ready_future<>(); + } + return _reader.consume_pausable(consumer{this}, timeout).then([this] { + if (_reader.is_end_of_stream() && _reader.is_buffer_empty()) { + _rt_assembler.on_end_of_stream(); + _end_of_stream = true; + } + }); + } + virtual future<> next_partition() override { + clear_buffer_to_next_partition(); + if (is_buffer_empty()) { + _end_of_stream = false; + return _reader.next_partition(); + } + return make_ready_future<>(); + } + virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override { + clear_buffer(); + _end_of_stream = false; + return _reader.fast_forward_to(pr, timeout); + } + virtual future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) override { + clear_buffer(); + _end_of_stream = false; + return _reader.fast_forward_to(std::move(pr), timeout); + } + virtual future<> close() noexcept override { + return _reader.close(); + } + }; + return make_flat_mutation_reader(std::move(r)); +} + +flat_mutation_reader_v2 upgrade_to_v2(flat_mutation_reader r) { + class transforming_reader : public flat_mutation_reader_v2::impl { + flat_mutation_reader _reader; + struct consumer { + transforming_reader* _owner; + stop_iteration operator()(mutation_fragment&& mf) { + std::move(mf).consume(*_owner); + return stop_iteration(_owner->is_buffer_full()); + } + }; + range_tombstone_change_generator _rt_gen; + tombstone _current_rt; + std::optional _pr; + public: + void flush_tombstones(position_in_partition_view pos) { + _rt_gen.flush(pos, [&] (range_tombstone_change rt) { + _current_rt = rt.tombstone(); + push_mutation_fragment(*_schema, _permit, std::move(rt)); + }); + } + void consume(static_row mf) { + push_mutation_fragment(*_schema, _permit, std::move(mf)); + } + void consume(clustering_row mf) { + flush_tombstones(mf.position()); + push_mutation_fragment(*_schema, _permit, std::move(mf)); + } + void consume(range_tombstone rt) { + if (_pr) { + if (!rt.trim_front(*_schema, _pr->start())) { + return; + } + } + flush_tombstones(rt.position()); + _rt_gen.consume(std::move(rt)); + } + void consume(partition_start mf) { + _rt_gen.reset(); + _current_rt = {}; + _pr = std::nullopt; + push_mutation_fragment(*_schema, _permit, std::move(mf)); + } + void consume(partition_end mf) { + flush_tombstones(position_in_partition::after_all_clustered_rows()); + if (_current_rt) { + assert(!_pr); + push_mutation_fragment(*_schema, _permit, range_tombstone_change( + position_in_partition::after_all_clustered_rows(), {})); + } + push_mutation_fragment(*_schema, _permit, std::move(mf)); + } + transforming_reader(flat_mutation_reader&& r) + : impl(r.schema(), r.permit()) + , _reader(std::move(r)) + , _rt_gen(*_schema) + {} + virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override { + if (_end_of_stream) { + return make_ready_future<>(); + } + return _reader.consume_pausable(consumer{this}, timeout).then([this] { + if (_reader.is_end_of_stream() && _reader.is_buffer_empty()) { + if (_pr) { + // If !_pr we should flush on partition_end + flush_tombstones(_pr->end()); + if (_current_rt) { + push_mutation_fragment(*_schema, _permit, range_tombstone_change(_pr->end(), {})); + } + } + _end_of_stream = true; + } + }); + } + virtual future<> next_partition() override { + clear_buffer_to_next_partition(); + if (is_buffer_empty()) { + _end_of_stream = false; + return _reader.next_partition(); + } + return make_ready_future<>(); + } + virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override { + clear_buffer(); + _end_of_stream = false; + return _reader.fast_forward_to(pr, timeout); + } + virtual future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) override { + clear_buffer(); + // r is used to trim range tombstones and range_tombstone:s can be trimmed only to positions + // which are !is_clustering_row(). Replace with equivalent ranges. + // Long-term we should guarantee this on position_range. + if (pr.start().is_clustering_row()) { + pr.set_start(position_in_partition::before_key(pr.start().key())); + } + if (pr.end().is_clustering_row()) { + pr.set_end(position_in_partition::before_key(pr.end().key())); + } + _rt_gen.trim(pr.start()); + _current_rt = {}; + _pr = pr; + _end_of_stream = false; + return _reader.fast_forward_to(std::move(pr), timeout); + } + virtual future<> close() noexcept override { + return _reader.close(); + } + }; + return make_flat_mutation_reader_v2(std::move(r)); +} diff --git a/flat_mutation_reader_v2.hh b/flat_mutation_reader_v2.hh index 2cc98ef3aa..d6a2b38d48 100644 --- a/flat_mutation_reader_v2.hh +++ b/flat_mutation_reader_v2.hh @@ -749,3 +749,9 @@ flat_mutation_reader_v2 transform(flat_mutation_reader_v2 r, T t) { }; return make_flat_mutation_reader_v2(std::move(r), std::move(t)); } + +// Adapts a v2 reader to v1 reader +flat_mutation_reader downgrade_to_v1(flat_mutation_reader_v2); + +// Adapts a v1 reader to v2 reader +flat_mutation_reader_v2 upgrade_to_v2(flat_mutation_reader); diff --git a/range_tombstone_assembler.hh b/range_tombstone_assembler.hh new file mode 100644 index 0000000000..7f153d7a52 --- /dev/null +++ b/range_tombstone_assembler.hh @@ -0,0 +1,91 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include +#include + +#include "mutation_fragment_v2.hh" + +/// Converts a stream of range_tombstone_change fragments to an equivalent stream of range_tombstone objects. +/// The input fragments must be ordered by their position(). +/// The produced range_tombstone objects are non-overlapping and ordered by their position(). +/// +/// on_end_of_stream() must be called after consuming all fragments to produce the final fragment. +/// +/// Example usage: +/// +/// range_tombstone_assembler rta; +/// if (auto rt_opt = rta.consume(range_tombstone_change(...))) { +/// produce(*rt_opt); +/// } +/// if (auto rt_opt = rta.consume(range_tombstone_change(...))) { +/// produce(*rt_opt); +/// } +/// if (auto rt_opt = rta.flush(position_in_partition(...)) { +/// produce(*rt_opt); +/// } +/// rta.on_end_of_stream(); +/// +class range_tombstone_assembler { + std::optional _prev_rt; +private: + bool has_active_tombstone() const { + return _prev_rt && _prev_rt->tombstone(); + } +public: + void reset() { + _prev_rt = std::nullopt; + } + + std::optional consume(const schema& s, range_tombstone_change&& rt) { + std::optional rt_opt; + auto less = position_in_partition::less_compare(s); + if (has_active_tombstone() && less(_prev_rt->position(), rt.position())) { + rt_opt = range_tombstone(_prev_rt->position(), rt.position(), _prev_rt->tombstone()); + } + _prev_rt = std::move(rt); + return rt_opt; + } + + void on_end_of_stream() { + if (has_active_tombstone()) { + throw std::logic_error(format("Stream ends with an active range tombstone: {}", *_prev_rt)); + } + } + + // Returns true if and only if flush() may return something. + // Returns false if flush() won't return anything for sure. + bool needs_flush() const { + return has_active_tombstone(); + } + + std::optional flush(const schema& s, position_in_partition_view pos) { + auto less = position_in_partition::less_compare(s); + if (has_active_tombstone() && less(_prev_rt->position(), pos)) { + position_in_partition start = _prev_rt->position(); + _prev_rt->set_position(position_in_partition(pos)); + return range_tombstone(std::move(start), pos, _prev_rt->tombstone()); + } + return std::nullopt; + } +}; diff --git a/range_tombstone_change_generator.hh b/range_tombstone_change_generator.hh new file mode 100644 index 0000000000..792297e986 --- /dev/null +++ b/range_tombstone_change_generator.hh @@ -0,0 +1,135 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include "mutation_fragment_v2.hh" +#include "range_tombstone_list.hh" + +template +concept RangeTombstoneChangeConsumer = std::invocable; + +/// Generates range_tombstone_change fragments for a stream of range_tombstone fragments. +/// +/// The input range_tombstones passed to consume() may be overlapping, but must be weakly ordered by position(). +/// It's ok to pass consecutive range_tombstone objects with the same position. +/// +/// Generated range_tombstone_change fragments will have strictly monotonic positions. +/// +/// Example usage: +/// +/// consume(range_tombstone(1, +inf, t)); +/// flush(2, consumer); +/// consume(range_tombstone(2, +inf, t)); +/// flush(3, consumer); +/// consume(range_tombstone(4, +inf, t)); +/// consume(range_tombstone(4, 7, t)); +/// flush(5, consumer); +/// flush(6, consumer); +/// +class range_tombstone_change_generator { + range_tombstone_list _range_tombstones; + // All range_tombstone_change fragments with positions < than this have been emitted. + position_in_partition _lower_bound = position_in_partition::before_all_clustered_rows(); + const schema& _schema; +public: + range_tombstone_change_generator(const schema& s) + : _range_tombstones(s) + , _schema(s) + { } + + // Discards deletion information for positions < lower_bound. + // After this, the lowest position of emitted range_tombstone_change will be before_key(lower_bound). + void trim(const position_in_partition& lower_bound) { + position_in_partition::less_compare less(_schema); + + if (lower_bound.is_clustering_row()) { + _lower_bound = position_in_partition::before_key(lower_bound.key()); + } else { + _lower_bound = lower_bound; + } + + while (!_range_tombstones.empty() && !less(lower_bound, _range_tombstones.begin()->end_position())) { + _range_tombstones.pop_as(_range_tombstones.begin()); + } + + if (!_range_tombstones.empty() && less(_range_tombstones.begin()->position(), _lower_bound)) { + // _range_tombstones.begin()->end_position() < lower_bound is guaranteed by previous loop. + _range_tombstones.begin()->set_start(_lower_bound); + } + } + + // Emits range_tombstone_change fragments with positions smaller than upper_bound + // for accumulated range tombstones. + // After this, only range_tombstones with positions >= upper_bound may be added, + // which guarantees that they won't affect the output of this flush. + // FIXME: respect preemption + template + void flush(position_in_partition_view upper_bound, C consumer) { + position_in_partition::less_compare less(_schema); + std::optional prev; + + while (!_range_tombstones.empty() && less(_range_tombstones.begin()->end_position(), upper_bound)) { + auto rt = _range_tombstones.pop_as(_range_tombstones.begin()); + + if (prev && less(prev->end_position(), rt.position())) { // [1] + // previous range tombstone not adjacent, emit gap. + consumer(range_tombstone_change(prev->end_position(), tombstone())); + } + + // Check if start of rt was already emitted, emit if not. + if (!less(rt.position(), _lower_bound)) { + consumer(range_tombstone_change(rt.position(), rt.tomb)); + } + + // Delay emitting end bound in case it's adjacent with the next tombstone. See [1] and [2] + prev = std::move(rt); + } + + // If previous range tombstone not adjacent with current, emit gap. + // It cannot get adjacent later because prev->end_position() < upper_bound, + // so nothing == prev->end_position() can be added after this invocation. + if (prev && (_range_tombstones.empty() + || less(prev->end_position(), _range_tombstones.begin()->position()))) { + consumer(range_tombstone_change(prev->end_position(), tombstone())); // [2] + } + + // Emit the fragment for start bound of a range_tombstone which is overlapping with upper_bound, + // unless no such fragment or already emitted. + if (!_range_tombstones.empty() + && less(_range_tombstones.begin()->position(), upper_bound) + && (!less(_range_tombstones.begin()->position(), _lower_bound))) { + consumer(range_tombstone_change( + _range_tombstones.begin()->position(), _range_tombstones.begin()->tomb)); + } + + _lower_bound = upper_bound; + } + + void consume(range_tombstone rt) { + _range_tombstones.apply(_schema, std::move(rt)); + } + + void reset() { + _range_tombstones.clear(); + _lower_bound = position_in_partition::before_all_clustered_rows(); + } +};