streamed_mutation: add reverse_streamed_mutation()

reverse_streamed_mutation() is an inefficient way of reversing
streamed_mutations. First, it collects all mutation_fragments and then
it emits them in the reversed orders (except static row which always is
the first element and it also flips the bounds of range tombstones).

Signed-off-by: Paweł Dziepak <pdziepak@scylladb.com>
This commit is contained in:
Paweł Dziepak
2016-06-14 17:21:52 +01:00
parent f676d1779b
commit 5566d23180
2 changed files with 61 additions and 0 deletions

View File

@@ -19,6 +19,7 @@
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stack>
#include <boost/range/algorithm/heap_algorithm.hpp>
#include "mutation.hh"
@@ -427,3 +428,62 @@ mutation_fragment_opt range_tombstone_stream::get_next()
}
return { };
}
streamed_mutation reverse_streamed_mutation(streamed_mutation sm) {
class reversing_steamed_mutation final : public streamed_mutation::impl {
streamed_mutation_opt _source;
mutation_fragment_opt _static_row;
std::stack<mutation_fragment> _mutation_fragments;
tombstone _current_tombstone;
private:
future<> consume_source() {
return repeat([&] {
return (*_source)().then([&] (mutation_fragment_opt mf) {
if (!mf) {
return stop_iteration::yes;
} else if (mf->is_static_row()) {
_static_row = std::move(mf);
} else if (mf->is_range_tombstone_begin()) {
auto& rtb = mf->as_range_tombstone_begin();
_mutation_fragments.emplace(range_tombstone_end(std::move(rtb.key()), flip_bound_kind(rtb.kind())));
_current_tombstone = rtb.tomb();
} else if (mf->is_range_tombstone_end()) {
auto& rte = mf->as_range_tombstone_end();
_mutation_fragments.emplace(range_tombstone_begin(std::move(rte.key()), flip_bound_kind(rte.kind()), _current_tombstone));
} else {
_mutation_fragments.emplace(std::move(*mf));
}
return stop_iteration::no;
});
}).then([&] {
_source = { };
});
}
public:
explicit reversing_steamed_mutation(streamed_mutation sm)
: streamed_mutation::impl(sm.schema(), sm.decorated_key(), sm.partition_tombstone())
, _source(std::move(sm))
{ }
virtual future<> fill_buffer() override {
if (_source) {
return consume_source().then([this] { return fill_buffer(); });
}
if (_static_row) {
push_mutation_fragment(std::move(*_static_row));
_static_row = { };
}
while (!is_end_of_stream() && !is_buffer_full()) {
if (_mutation_fragments.empty()) {
_end_of_stream = true;
} else {
push_mutation_fragment(std::move(_mutation_fragments.top()));
_mutation_fragments.pop();
}
}
return make_ready_future<>();
}
};
return make_streamed_mutation<reversing_steamed_mutation>(std::move(sm));
};

View File

@@ -545,6 +545,7 @@ streamed_mutation streamed_mutation_from_mutation(mutation);
//Requires all streamed_mutations to have the same schema.
streamed_mutation merge_mutations(std::vector<streamed_mutation>);
streamed_mutation reverse_streamed_mutation(streamed_mutation);
// range_tombstone_stream is a helper object that simplifies producing a stream
// of range tombstones and merging it with a stream of clustering rows.