mutation_readers: pass tombstone_gc_state to compating_reader

To be passed further done to `compact_mutation_state` in
a following patch.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2022-09-06 21:41:03 +03:00
parent 572d534d0d
commit 7e4612d3aa
6 changed files with 19 additions and 7 deletions

View File

@@ -684,7 +684,8 @@ private:
reader.consume_in_thread(std::move(cfc));
});
});
return consumer(make_compacting_reader(make_sstable_reader(), compaction_time, max_purgeable_func()));
const auto& gc_state = _table_s.get_tombstone_gc_state();
return consumer(make_compacting_reader(make_sstable_reader(), compaction_time, max_purgeable_func(), gc_state));
}
future<> consume() {

View File

@@ -16,6 +16,8 @@ namespace dht {
class decorated_key;
}
class tombstone_gc_state;
/// Creates a compacting reader.
///
/// The compaction is done with a \ref mutation_compactor, using compaction-type
@@ -33,4 +35,5 @@ class decorated_key;
/// if the source reader supports it
flat_mutation_reader_v2 make_compacting_reader(flat_mutation_reader_v2 source, gc_clock::time_point compaction_time,
std::function<api::timestamp_type(const dht::decorated_key&)> get_max_purgeable,
const tombstone_gc_state& gc_state,
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no);

View File

@@ -28,6 +28,7 @@
#include "readers/queue.hh"
#include "readers/reversing_v2.hh"
#include "readers/upgrading_consumer.hh"
#include "tombstone_gc.hh"
#include <seastar/core/coroutine.hh>
#include <stack>
@@ -1456,6 +1457,8 @@ private:
public:
compacting_reader(flat_mutation_reader_v2 source, gc_clock::time_point compaction_time,
std::function<api::timestamp_type(const dht::decorated_key&)> get_max_purgeable,
// FIXME: pass to _compactor
const tombstone_gc_state& gc_state,
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no)
: impl(source.schema(), source.permit())
, _reader(std::move(source))
@@ -1541,6 +1544,7 @@ public:
} // anonymous namespace
flat_mutation_reader_v2 make_compacting_reader(flat_mutation_reader_v2 source, gc_clock::time_point compaction_time,
std::function<api::timestamp_type(const dht::decorated_key&)> get_max_purgeable, streamed_mutation::forwarding fwd) {
return make_flat_mutation_reader_v2<compacting_reader>(std::move(source), compaction_time, get_max_purgeable, fwd);
std::function<api::timestamp_type(const dht::decorated_key&)> get_max_purgeable,
const tombstone_gc_state& gc_state, streamed_mutation::forwarding fwd) {
return make_flat_mutation_reader_v2<compacting_reader>(std::move(source), compaction_time, get_max_purgeable, gc_state, fwd);
}

View File

@@ -1284,6 +1284,7 @@ table::sstables_as_snapshot_source() {
std::move(reader),
gc_clock::now(),
[](const dht::decorated_key&) { return api::min_timestamp; },
_compaction_manager.get_tombstone_gc_state(),
fwd);
}, [this, sst_set] {
return make_partition_presence_checker(sst_set);

View File

@@ -911,7 +911,8 @@ SEASTAR_THREAD_TEST_CASE(test_reverse_reader_reads_in_native_reverse_order) {
auto compacted = [] (flat_mutation_reader_v2 rd) {
return make_compacting_reader(std::move(rd),
gc_clock::time_point::max(),
[] (const dht::decorated_key&) { return api::max_timestamp; });
[] (const dht::decorated_key&) { return api::max_timestamp; },
tombstone_gc_state(nullptr));
};
auto reversed_forward_reader = assert_that(compacted(

View File

@@ -2585,7 +2585,8 @@ SEASTAR_THREAD_TEST_CASE(test_compacting_reader_as_mutation_source) {
source = make_forwardable(std::move(source));
}
auto mr = make_compacting_reader(std::move(source), query_time,
[] (const dht::decorated_key&) { return api::min_timestamp; }, fwd_sm);
[] (const dht::decorated_key&) { return api::min_timestamp; },
tombstone_gc_state(nullptr), fwd_sm);
if (single_fragment_buffer) {
mr.set_max_buffer_size(1);
}
@@ -2640,7 +2641,8 @@ SEASTAR_THREAD_TEST_CASE(test_compacting_reader_next_partition) {
auto mr = make_compacting_reader(make_flat_mutation_reader_from_fragments(ss.schema(), permit, std::move(mfs)),
gc_clock::now(),
[] (const dht::decorated_key&) { return api::min_timestamp; });
[] (const dht::decorated_key&) { return api::min_timestamp; },
tombstone_gc_state(nullptr));
mr.set_max_buffer_size(buffer_size);
return mr;
@@ -2685,7 +2687,7 @@ SEASTAR_THREAD_TEST_CASE(test_compacting_reader_is_consistent_with_compaction) {
.produces_range_tombstone_change({position_in_partition::for_range_end(r), {}})
.produces_partition_end();
assert_that(make_compacting_reader(read_m(), gc_clock::time_point::min(), [] (const dht::decorated_key&) { return api::min_timestamp; }))
assert_that(make_compacting_reader(read_m(), gc_clock::time_point::min(), [] (const dht::decorated_key&) { return api::min_timestamp; }, tombstone_gc_state(nullptr)))
.exact()
.produces_partition_start(m.decorated_key(), p_tomb)
.produces_partition_end();