flat_mutation_reader: Introduce adaptors between v1 and v2 of mutation fragment stream
The transition to v2 will be incremental. To support that, we need adaptors between v1 and v2 which will be inserted at places which are boundaries of conversion. The v1 -> v2 converter needs to accumulate range tombstones, so has unbounded worst case memory footprint. The v2 -> v1 converter trims range tombstones around clustering rows, so generates more fragments than necessary. Because of that, adpators are a temporary solution and we should not release with them on the produciton code paths.
This commit is contained in:
@@ -21,6 +21,8 @@
|
||||
|
||||
#include "flat_mutation_reader.hh"
|
||||
#include "flat_mutation_reader_v2.hh"
|
||||
#include "range_tombstone_assembler.hh"
|
||||
#include "range_tombstone_change_generator.hh"
|
||||
#include "mutation_fragment_stream_validator.hh"
|
||||
#include "mutation_reader.hh"
|
||||
#include "seastar/util/reference_wrapper.hh"
|
||||
@@ -1273,3 +1275,189 @@ void flat_mutation_reader_v2::on_close_error(std::unique_ptr<impl> i, std::excep
|
||||
on_internal_error_noexcept(fmr_logger,
|
||||
format("Failed to close {} [{}]: permit {}: {}", typeid(*ip).name(), fmt::ptr(ip), ip->_permit.description(), ep));
|
||||
}
|
||||
|
||||
flat_mutation_reader downgrade_to_v1(flat_mutation_reader_v2 r) {
|
||||
class transforming_reader : public flat_mutation_reader::impl {
|
||||
flat_mutation_reader_v2 _reader;
|
||||
struct consumer {
|
||||
transforming_reader* _owner;
|
||||
stop_iteration operator()(mutation_fragment_v2&& mf) {
|
||||
std::move(mf).consume(*_owner);
|
||||
return stop_iteration(_owner->is_buffer_full());
|
||||
}
|
||||
};
|
||||
range_tombstone_assembler _rt_assembler;
|
||||
public:
|
||||
void consume(static_row mf) {
|
||||
push_mutation_fragment(*_schema, _permit, std::move(mf));
|
||||
}
|
||||
void consume(clustering_row mf) {
|
||||
if (_rt_assembler.needs_flush()) {
|
||||
if (auto rt_opt = _rt_assembler.flush(*_schema, position_in_partition::after_key(mf.position()))) {
|
||||
push_mutation_fragment(*_schema, _permit, std::move(*rt_opt));
|
||||
}
|
||||
}
|
||||
push_mutation_fragment(*_schema, _permit, std::move(mf));
|
||||
}
|
||||
void consume(range_tombstone_change mf) {
|
||||
if (auto rt_opt = _rt_assembler.consume(*_schema, std::move(mf))) {
|
||||
push_mutation_fragment(*_schema, _permit, std::move(*rt_opt));
|
||||
}
|
||||
}
|
||||
void consume(partition_start mf) {
|
||||
_rt_assembler.reset();
|
||||
push_mutation_fragment(*_schema, _permit, std::move(mf));
|
||||
}
|
||||
void consume(partition_end mf) {
|
||||
_rt_assembler.on_end_of_stream();
|
||||
push_mutation_fragment(*_schema, _permit, std::move(mf));
|
||||
}
|
||||
transforming_reader(flat_mutation_reader_v2&& r)
|
||||
: impl(r.schema(), r.permit())
|
||||
, _reader(std::move(r))
|
||||
{}
|
||||
virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override {
|
||||
if (_end_of_stream) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return _reader.consume_pausable(consumer{this}, timeout).then([this] {
|
||||
if (_reader.is_end_of_stream() && _reader.is_buffer_empty()) {
|
||||
_rt_assembler.on_end_of_stream();
|
||||
_end_of_stream = true;
|
||||
}
|
||||
});
|
||||
}
|
||||
virtual future<> next_partition() override {
|
||||
clear_buffer_to_next_partition();
|
||||
if (is_buffer_empty()) {
|
||||
_end_of_stream = false;
|
||||
return _reader.next_partition();
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override {
|
||||
clear_buffer();
|
||||
_end_of_stream = false;
|
||||
return _reader.fast_forward_to(pr, timeout);
|
||||
}
|
||||
virtual future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) override {
|
||||
clear_buffer();
|
||||
_end_of_stream = false;
|
||||
return _reader.fast_forward_to(std::move(pr), timeout);
|
||||
}
|
||||
virtual future<> close() noexcept override {
|
||||
return _reader.close();
|
||||
}
|
||||
};
|
||||
return make_flat_mutation_reader<transforming_reader>(std::move(r));
|
||||
}
|
||||
|
||||
flat_mutation_reader_v2 upgrade_to_v2(flat_mutation_reader r) {
|
||||
class transforming_reader : public flat_mutation_reader_v2::impl {
|
||||
flat_mutation_reader _reader;
|
||||
struct consumer {
|
||||
transforming_reader* _owner;
|
||||
stop_iteration operator()(mutation_fragment&& mf) {
|
||||
std::move(mf).consume(*_owner);
|
||||
return stop_iteration(_owner->is_buffer_full());
|
||||
}
|
||||
};
|
||||
range_tombstone_change_generator _rt_gen;
|
||||
tombstone _current_rt;
|
||||
std::optional<position_range> _pr;
|
||||
public:
|
||||
void flush_tombstones(position_in_partition_view pos) {
|
||||
_rt_gen.flush(pos, [&] (range_tombstone_change rt) {
|
||||
_current_rt = rt.tombstone();
|
||||
push_mutation_fragment(*_schema, _permit, std::move(rt));
|
||||
});
|
||||
}
|
||||
void consume(static_row mf) {
|
||||
push_mutation_fragment(*_schema, _permit, std::move(mf));
|
||||
}
|
||||
void consume(clustering_row mf) {
|
||||
flush_tombstones(mf.position());
|
||||
push_mutation_fragment(*_schema, _permit, std::move(mf));
|
||||
}
|
||||
void consume(range_tombstone rt) {
|
||||
if (_pr) {
|
||||
if (!rt.trim_front(*_schema, _pr->start())) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
flush_tombstones(rt.position());
|
||||
_rt_gen.consume(std::move(rt));
|
||||
}
|
||||
void consume(partition_start mf) {
|
||||
_rt_gen.reset();
|
||||
_current_rt = {};
|
||||
_pr = std::nullopt;
|
||||
push_mutation_fragment(*_schema, _permit, std::move(mf));
|
||||
}
|
||||
void consume(partition_end mf) {
|
||||
flush_tombstones(position_in_partition::after_all_clustered_rows());
|
||||
if (_current_rt) {
|
||||
assert(!_pr);
|
||||
push_mutation_fragment(*_schema, _permit, range_tombstone_change(
|
||||
position_in_partition::after_all_clustered_rows(), {}));
|
||||
}
|
||||
push_mutation_fragment(*_schema, _permit, std::move(mf));
|
||||
}
|
||||
transforming_reader(flat_mutation_reader&& r)
|
||||
: impl(r.schema(), r.permit())
|
||||
, _reader(std::move(r))
|
||||
, _rt_gen(*_schema)
|
||||
{}
|
||||
virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override {
|
||||
if (_end_of_stream) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return _reader.consume_pausable(consumer{this}, timeout).then([this] {
|
||||
if (_reader.is_end_of_stream() && _reader.is_buffer_empty()) {
|
||||
if (_pr) {
|
||||
// If !_pr we should flush on partition_end
|
||||
flush_tombstones(_pr->end());
|
||||
if (_current_rt) {
|
||||
push_mutation_fragment(*_schema, _permit, range_tombstone_change(_pr->end(), {}));
|
||||
}
|
||||
}
|
||||
_end_of_stream = true;
|
||||
}
|
||||
});
|
||||
}
|
||||
virtual future<> next_partition() override {
|
||||
clear_buffer_to_next_partition();
|
||||
if (is_buffer_empty()) {
|
||||
_end_of_stream = false;
|
||||
return _reader.next_partition();
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override {
|
||||
clear_buffer();
|
||||
_end_of_stream = false;
|
||||
return _reader.fast_forward_to(pr, timeout);
|
||||
}
|
||||
virtual future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) override {
|
||||
clear_buffer();
|
||||
// r is used to trim range tombstones and range_tombstone:s can be trimmed only to positions
|
||||
// which are !is_clustering_row(). Replace with equivalent ranges.
|
||||
// Long-term we should guarantee this on position_range.
|
||||
if (pr.start().is_clustering_row()) {
|
||||
pr.set_start(position_in_partition::before_key(pr.start().key()));
|
||||
}
|
||||
if (pr.end().is_clustering_row()) {
|
||||
pr.set_end(position_in_partition::before_key(pr.end().key()));
|
||||
}
|
||||
_rt_gen.trim(pr.start());
|
||||
_current_rt = {};
|
||||
_pr = pr;
|
||||
_end_of_stream = false;
|
||||
return _reader.fast_forward_to(std::move(pr), timeout);
|
||||
}
|
||||
virtual future<> close() noexcept override {
|
||||
return _reader.close();
|
||||
}
|
||||
};
|
||||
return make_flat_mutation_reader_v2<transforming_reader>(std::move(r));
|
||||
}
|
||||
|
||||
@@ -749,3 +749,9 @@ flat_mutation_reader_v2 transform(flat_mutation_reader_v2 r, T t) {
|
||||
};
|
||||
return make_flat_mutation_reader_v2<transforming_reader>(std::move(r), std::move(t));
|
||||
}
|
||||
|
||||
// Adapts a v2 reader to v1 reader
|
||||
flat_mutation_reader downgrade_to_v1(flat_mutation_reader_v2);
|
||||
|
||||
// Adapts a v1 reader to v2 reader
|
||||
flat_mutation_reader_v2 upgrade_to_v2(flat_mutation_reader);
|
||||
|
||||
91
range_tombstone_assembler.hh
Normal file
91
range_tombstone_assembler.hh
Normal file
@@ -0,0 +1,91 @@
|
||||
/*
|
||||
* Copyright (C) 2021 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <exception>
|
||||
#include <seastar/core/print.hh>
|
||||
|
||||
#include "mutation_fragment_v2.hh"
|
||||
|
||||
/// Converts a stream of range_tombstone_change fragments to an equivalent stream of range_tombstone objects.
|
||||
/// The input fragments must be ordered by their position().
|
||||
/// The produced range_tombstone objects are non-overlapping and ordered by their position().
|
||||
///
|
||||
/// on_end_of_stream() must be called after consuming all fragments to produce the final fragment.
|
||||
///
|
||||
/// Example usage:
|
||||
///
|
||||
/// range_tombstone_assembler rta;
|
||||
/// if (auto rt_opt = rta.consume(range_tombstone_change(...))) {
|
||||
/// produce(*rt_opt);
|
||||
/// }
|
||||
/// if (auto rt_opt = rta.consume(range_tombstone_change(...))) {
|
||||
/// produce(*rt_opt);
|
||||
/// }
|
||||
/// if (auto rt_opt = rta.flush(position_in_partition(...)) {
|
||||
/// produce(*rt_opt);
|
||||
/// }
|
||||
/// rta.on_end_of_stream();
|
||||
///
|
||||
class range_tombstone_assembler {
|
||||
std::optional<range_tombstone_change> _prev_rt;
|
||||
private:
|
||||
bool has_active_tombstone() const {
|
||||
return _prev_rt && _prev_rt->tombstone();
|
||||
}
|
||||
public:
|
||||
void reset() {
|
||||
_prev_rt = std::nullopt;
|
||||
}
|
||||
|
||||
std::optional<range_tombstone> consume(const schema& s, range_tombstone_change&& rt) {
|
||||
std::optional<range_tombstone> rt_opt;
|
||||
auto less = position_in_partition::less_compare(s);
|
||||
if (has_active_tombstone() && less(_prev_rt->position(), rt.position())) {
|
||||
rt_opt = range_tombstone(_prev_rt->position(), rt.position(), _prev_rt->tombstone());
|
||||
}
|
||||
_prev_rt = std::move(rt);
|
||||
return rt_opt;
|
||||
}
|
||||
|
||||
void on_end_of_stream() {
|
||||
if (has_active_tombstone()) {
|
||||
throw std::logic_error(format("Stream ends with an active range tombstone: {}", *_prev_rt));
|
||||
}
|
||||
}
|
||||
|
||||
// Returns true if and only if flush() may return something.
|
||||
// Returns false if flush() won't return anything for sure.
|
||||
bool needs_flush() const {
|
||||
return has_active_tombstone();
|
||||
}
|
||||
|
||||
std::optional<range_tombstone> flush(const schema& s, position_in_partition_view pos) {
|
||||
auto less = position_in_partition::less_compare(s);
|
||||
if (has_active_tombstone() && less(_prev_rt->position(), pos)) {
|
||||
position_in_partition start = _prev_rt->position();
|
||||
_prev_rt->set_position(position_in_partition(pos));
|
||||
return range_tombstone(std::move(start), pos, _prev_rt->tombstone());
|
||||
}
|
||||
return std::nullopt;
|
||||
}
|
||||
};
|
||||
135
range_tombstone_change_generator.hh
Normal file
135
range_tombstone_change_generator.hh
Normal file
@@ -0,0 +1,135 @@
|
||||
/*
|
||||
* Copyright (C) 2021 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "mutation_fragment_v2.hh"
|
||||
#include "range_tombstone_list.hh"
|
||||
|
||||
template<typename T>
|
||||
concept RangeTombstoneChangeConsumer = std::invocable<T, range_tombstone_change>;
|
||||
|
||||
/// Generates range_tombstone_change fragments for a stream of range_tombstone fragments.
|
||||
///
|
||||
/// The input range_tombstones passed to consume() may be overlapping, but must be weakly ordered by position().
|
||||
/// It's ok to pass consecutive range_tombstone objects with the same position.
|
||||
///
|
||||
/// Generated range_tombstone_change fragments will have strictly monotonic positions.
|
||||
///
|
||||
/// Example usage:
|
||||
///
|
||||
/// consume(range_tombstone(1, +inf, t));
|
||||
/// flush(2, consumer);
|
||||
/// consume(range_tombstone(2, +inf, t));
|
||||
/// flush(3, consumer);
|
||||
/// consume(range_tombstone(4, +inf, t));
|
||||
/// consume(range_tombstone(4, 7, t));
|
||||
/// flush(5, consumer);
|
||||
/// flush(6, consumer);
|
||||
///
|
||||
class range_tombstone_change_generator {
|
||||
range_tombstone_list _range_tombstones;
|
||||
// All range_tombstone_change fragments with positions < than this have been emitted.
|
||||
position_in_partition _lower_bound = position_in_partition::before_all_clustered_rows();
|
||||
const schema& _schema;
|
||||
public:
|
||||
range_tombstone_change_generator(const schema& s)
|
||||
: _range_tombstones(s)
|
||||
, _schema(s)
|
||||
{ }
|
||||
|
||||
// Discards deletion information for positions < lower_bound.
|
||||
// After this, the lowest position of emitted range_tombstone_change will be before_key(lower_bound).
|
||||
void trim(const position_in_partition& lower_bound) {
|
||||
position_in_partition::less_compare less(_schema);
|
||||
|
||||
if (lower_bound.is_clustering_row()) {
|
||||
_lower_bound = position_in_partition::before_key(lower_bound.key());
|
||||
} else {
|
||||
_lower_bound = lower_bound;
|
||||
}
|
||||
|
||||
while (!_range_tombstones.empty() && !less(lower_bound, _range_tombstones.begin()->end_position())) {
|
||||
_range_tombstones.pop_as<range_tombstone>(_range_tombstones.begin());
|
||||
}
|
||||
|
||||
if (!_range_tombstones.empty() && less(_range_tombstones.begin()->position(), _lower_bound)) {
|
||||
// _range_tombstones.begin()->end_position() < lower_bound is guaranteed by previous loop.
|
||||
_range_tombstones.begin()->set_start(_lower_bound);
|
||||
}
|
||||
}
|
||||
|
||||
// Emits range_tombstone_change fragments with positions smaller than upper_bound
|
||||
// for accumulated range tombstones.
|
||||
// After this, only range_tombstones with positions >= upper_bound may be added,
|
||||
// which guarantees that they won't affect the output of this flush.
|
||||
// FIXME: respect preemption
|
||||
template<RangeTombstoneChangeConsumer C>
|
||||
void flush(position_in_partition_view upper_bound, C consumer) {
|
||||
position_in_partition::less_compare less(_schema);
|
||||
std::optional<range_tombstone> prev;
|
||||
|
||||
while (!_range_tombstones.empty() && less(_range_tombstones.begin()->end_position(), upper_bound)) {
|
||||
auto rt = _range_tombstones.pop_as<range_tombstone>(_range_tombstones.begin());
|
||||
|
||||
if (prev && less(prev->end_position(), rt.position())) { // [1]
|
||||
// previous range tombstone not adjacent, emit gap.
|
||||
consumer(range_tombstone_change(prev->end_position(), tombstone()));
|
||||
}
|
||||
|
||||
// Check if start of rt was already emitted, emit if not.
|
||||
if (!less(rt.position(), _lower_bound)) {
|
||||
consumer(range_tombstone_change(rt.position(), rt.tomb));
|
||||
}
|
||||
|
||||
// Delay emitting end bound in case it's adjacent with the next tombstone. See [1] and [2]
|
||||
prev = std::move(rt);
|
||||
}
|
||||
|
||||
// If previous range tombstone not adjacent with current, emit gap.
|
||||
// It cannot get adjacent later because prev->end_position() < upper_bound,
|
||||
// so nothing == prev->end_position() can be added after this invocation.
|
||||
if (prev && (_range_tombstones.empty()
|
||||
|| less(prev->end_position(), _range_tombstones.begin()->position()))) {
|
||||
consumer(range_tombstone_change(prev->end_position(), tombstone())); // [2]
|
||||
}
|
||||
|
||||
// Emit the fragment for start bound of a range_tombstone which is overlapping with upper_bound,
|
||||
// unless no such fragment or already emitted.
|
||||
if (!_range_tombstones.empty()
|
||||
&& less(_range_tombstones.begin()->position(), upper_bound)
|
||||
&& (!less(_range_tombstones.begin()->position(), _lower_bound))) {
|
||||
consumer(range_tombstone_change(
|
||||
_range_tombstones.begin()->position(), _range_tombstones.begin()->tomb));
|
||||
}
|
||||
|
||||
_lower_bound = upper_bound;
|
||||
}
|
||||
|
||||
void consume(range_tombstone rt) {
|
||||
_range_tombstones.apply(_schema, std::move(rt));
|
||||
}
|
||||
|
||||
void reset() {
|
||||
_range_tombstones.clear();
|
||||
_lower_bound = position_in_partition::before_all_clustered_rows();
|
||||
}
|
||||
};
|
||||
Reference in New Issue
Block a user