frozen_mutation: fragment_and_freeze(): convert to v2

This commit is contained in:
Botond Dénes
2022-03-18 13:42:03 +02:00
parent 7f3986ed1a
commit 2e634883d9
3 changed files with 23 additions and 15 deletions

View File

@@ -26,7 +26,7 @@
#include "idl/uuid.dist.impl.hh"
#include "idl/keys.dist.impl.hh"
#include "idl/mutation.dist.impl.hh"
#include "readers/flat_mutation_reader.hh"
#include "readers/flat_mutation_reader_v2.hh"
#include "converting_mutation_partition_applier.hh"
#include "mutation_partition_view.hh"
@@ -190,6 +190,8 @@ class fragmenting_mutation_freezer {
bool _fragmented = false;
size_t _dirty_size = 0;
size_t _fragment_size;
range_tombstone_change _current_rtc;
private:
future<stop_iteration> flush() {
bytes_ostream out;
@@ -219,7 +221,7 @@ private:
}
public:
fragmenting_mutation_freezer(const schema& s, frozen_mutation_consumer_fn c, size_t fragment_size)
: _schema(s), _rts(s), _consumer(c), _fragment_size(fragment_size) { }
: _schema(s), _rts(s), _consumer(c), _fragment_size(fragment_size), _current_rtc(position_in_partition::before_all_clustered_rows(), {}) { }
future<stop_iteration> consume(partition_start&& ps) {
_key = std::move(ps.key().key());
@@ -241,10 +243,16 @@ public:
return maybe_flush();
}
future<stop_iteration> consume(range_tombstone&& rt) {
_dirty_size += rt.memory_usage(_schema);
_rts.apply(_schema, std::move(rt));
return maybe_flush();
future<stop_iteration> consume(range_tombstone_change&& rtc) {
auto ret = make_ready_future<stop_iteration>(stop_iteration::no);
if (_current_rtc.tombstone()) {
auto rt = range_tombstone(_current_rtc.position(), rtc.position(), _current_rtc.tombstone());
_dirty_size += rt.memory_usage(_schema);
_rts.apply(_schema, std::move(rt));
ret = maybe_flush();
}
_current_rtc = std::move(rtc);
return ret;
}
future<stop_iteration> consume(partition_end&&) {
@@ -255,12 +263,12 @@ public:
}
};
future<> fragment_and_freeze(flat_mutation_reader mr, frozen_mutation_consumer_fn c, size_t fragment_size)
future<> fragment_and_freeze(flat_mutation_reader_v2 mr, frozen_mutation_consumer_fn c, size_t fragment_size)
{
std::exception_ptr ex;
try {
fragmenting_mutation_freezer freezer(*mr.schema(), c, fragment_size);
mutation_fragment_opt mfopt;
mutation_fragment_v2_opt mfopt;
while ((mfopt = co_await mr()) && (co_await std::move(*mfopt).consume(freezer) == stop_iteration::no));
} catch (...) {
ex = std::current_exception();

View File

@@ -14,7 +14,7 @@
class mutation;
class mutation_partition_view;
class flat_mutation_reader;
class flat_mutation_reader_v2;
namespace ser {
class mutation_view;
@@ -103,7 +103,7 @@ public:
static constexpr size_t default_frozen_fragment_size = 128 * 1024;
using frozen_mutation_consumer_fn = std::function<future<stop_iteration>(frozen_mutation, bool)>;
future<> fragment_and_freeze(flat_mutation_reader mr, frozen_mutation_consumer_fn c,
future<> fragment_and_freeze(flat_mutation_reader_v2 mr, frozen_mutation_consumer_fn c,
size_t fragment_size = default_frozen_fragment_size);
class reader_permit;

View File

@@ -206,7 +206,7 @@ SEASTAR_TEST_CASE(test_fragmenting_and_freezing) {
for_each_mutation([&] (const mutation& m) {
std::vector<frozen_mutation> fms;
fragment_and_freeze(make_flat_mutation_reader_from_mutations(m.schema(), semaphore.make_permit(), { mutation(m) }), [&] (auto fm, bool frag) {
fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), { mutation(m) }), [&] (auto fm, bool frag) {
BOOST_REQUIRE(!frag);
fms.emplace_back(std::move(fm));
return make_ready_future<stop_iteration>(stop_iteration::no);
@@ -220,7 +220,7 @@ SEASTAR_TEST_CASE(test_fragmenting_and_freezing) {
fms.clear();
std::optional<bool> fragmented;
fragment_and_freeze(make_flat_mutation_reader_from_mutations(m.schema(), semaphore.make_permit(), { mutation(m) }), [&] (auto fm, bool frag) {
fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), { mutation(m) }), [&] (auto fm, bool frag) {
BOOST_REQUIRE(!fragmented || *fragmented == frag);
*fragmented = frag;
fms.emplace_back(std::move(fm));
@@ -252,7 +252,7 @@ SEASTAR_TEST_CASE(test_fragmenting_and_freezing) {
std::vector<frozen_mutation> frozen;
// Freeze all
fragment_and_freeze(make_flat_mutation_reader_from_mutations(gen.schema(), semaphore.make_permit(), muts), [&] (auto fm, bool frag) {
fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(gen.schema(), semaphore.make_permit(), muts), [&] (auto fm, bool frag) {
BOOST_REQUIRE(!frag);
frozen.emplace_back(fm);
return make_ready_future<stop_iteration>(stop_iteration::no);
@@ -264,7 +264,7 @@ SEASTAR_TEST_CASE(test_fragmenting_and_freezing) {
// Freeze first
frozen.clear();
fragment_and_freeze(make_flat_mutation_reader_from_mutations(gen.schema(), semaphore.make_permit(), muts), [&] (auto fm, bool frag) {
fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(gen.schema(), semaphore.make_permit(), muts), [&] (auto fm, bool frag) {
BOOST_REQUIRE(!frag);
frozen.emplace_back(fm);
return make_ready_future<stop_iteration>(stop_iteration::yes);
@@ -274,7 +274,7 @@ SEASTAR_TEST_CASE(test_fragmenting_and_freezing) {
// Fragment and freeze all
frozen.clear();
fragment_and_freeze(make_flat_mutation_reader_from_mutations(gen.schema(), semaphore.make_permit(), muts), [&] (auto fm, bool frag) {
fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(gen.schema(), semaphore.make_permit(), muts), [&] (auto fm, bool frag) {
frozen.emplace_back(fm);
return make_ready_future<stop_iteration>(stop_iteration::no);
}, 1).get0();