test/mutation_test: test_compactor_validator_sanity_test

Greatly expand this test to check that the compactor validates the input
stream properly.
The test is renamed (the _sanity_test suffix is removed) to reflect the
expanded scope.
This commit is contained in:
Botond Dénes
2023-05-09 02:34:21 -04:00
parent 18ed94e60b
commit a35f4f6985

View File

@@ -3576,10 +3576,15 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_detach_state) {
}
};
// Check that consumed fragments are forwarded intact to the validator.
SEASTAR_THREAD_TEST_CASE(test_compactor_validator_sanity_test) {
SEASTAR_THREAD_TEST_CASE(test_compactor_validator) {
const auto abort_ie = set_abort_on_internal_error(false);
auto reset_abort_ie = defer([abort_ie] {
set_abort_on_internal_error(abort_ie);
});
simple_schema ss;
auto pk = ss.make_pkey();
auto pks = ss.make_pkeys(2);
auto cks = ss.make_ckeys(3);
auto s = ss.schema();
tests::reader_concurrency_semaphore_wrapper semaphore;
@@ -3587,40 +3592,33 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_validator_sanity_test) {
auto permit = semaphore.make_permit();
const auto expiry_point = gc_clock::now() + std::chrono::days(10);
const auto base_ts = ss.new_timestamp();
const auto row_ts = base_ts + 1;
const auto rtc_tombstone_ts = base_ts + 4;
const auto partition_tombstone_ts = base_ts + 5;
const auto row_ts2 = base_ts + 6;
const auto marker_ts = ss.new_timestamp();
const auto tomb_ts = ss.new_timestamp();
const auto row_ts = ss.new_timestamp();
const auto query_time = gc_clock::now();
const auto max_rows = std::numeric_limits<uint64_t>::max();
const auto max_partitions = std::numeric_limits<uint32_t>::max();
auto make_frags = [&] {
std::deque<mutation_fragment_v2> frags;
frags.emplace_back(*s, permit, partition_start(pk, {}));
frags.emplace_back(*s, permit, ss.make_static_row_v2(permit, "static_row"));
auto make_cr = [&] (const clustering_key& ckey, api::timestamp_type ts) {
const auto& v_def = *s->get_column_definition(to_bytes("v"));
for (uint32_t ck = 0; ck < 2; ++ck) {
auto ckey = ss.make_ckey(ck);
frags.emplace_back(*s, permit, range_tombstone_change(position_in_partition::before_key(ckey), tombstone{tomb_ts, expiry_point}));
auto row = clustering_row(ckey);
row.cells().apply(v_def, atomic_cell::make_live(*v_def.type, row_ts, serialized("v")));
row.marker() = row_marker(marker_ts);
frags.emplace_back(mutation_fragment_v2(*s, permit, std::move(row)));
}
frags.emplace_back(*s, permit, range_tombstone_change(position_in_partition::after_key(*s, ss.make_ckey(10)), tombstone{}));
frags.emplace_back(*s, permit, partition_end{});
return frags;
auto row = clustering_row(ckey);
row.cells().apply(v_def, atomic_cell::make_live(*v_def.type, ts, serialized("v")));
row.marker() = row_marker(ts);
return mutation_fragment_v2(*s, permit, std::move(row));
};
mutation_fragment_v2 ps1(*s, permit, partition_start(pks[0], {}));
mutation_fragment_v2 ps1_tomb(*s, permit, partition_start(pks[0], {partition_tombstone_ts, expiry_point}));
mutation_fragment_v2 ps2(*s, permit, partition_start(pks[1], {}));
mutation_fragment_v2 sr(*s, permit, ss.make_static_row_v2(permit, "static_row"));
mutation_fragment_v2 cr1(*s, permit, make_cr(cks[0], row_ts));
mutation_fragment_v2 cr1_high_ts(*s, permit, make_cr(cks[0], row_ts2));
mutation_fragment_v2 cr2(*s, permit, make_cr(cks[1], row_ts));
mutation_fragment_v2 cr3(*s, permit, make_cr(cks[2], row_ts));
mutation_fragment_v2 rtc1(*s, permit, range_tombstone_change(position_in_partition::before_key(cks[0]), tombstone{rtc_tombstone_ts, expiry_point}));
mutation_fragment_v2 rtc2(*s, permit, range_tombstone_change(position_in_partition::before_key(cks[1]), tombstone{rtc_tombstone_ts, expiry_point}));
mutation_fragment_v2 rtc_end(*s, permit, range_tombstone_change(position_in_partition::after_key(*s, cks[2]), tombstone{}));
mutation_fragment_v2 pe(*s, permit, partition_end());
struct consumer_v2 {
void consume_new_partition(const dht::decorated_key& dk) { }
void consume(const tombstone& t) { }
@@ -3642,11 +3640,88 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_validator_sanity_test) {
void consume_end_of_stream() { }
};
auto compaction_state = make_lw_shared<compact_mutation_state<compact_for_sstables::no>>(*s, query_time, s->full_slice(), max_rows, max_partitions,
mutation_fragment_stream_validation_level::clustering_key);
auto reader = make_flat_mutation_reader_from_fragments(s, permit, make_frags());
auto close_reader = deferred_close(reader);
reader.consume(compact_for_query_v2<consumer_v2>(compaction_state, consumer_v2{})).get();
auto check = [&] (std::initializer_list<std::reference_wrapper<const mutation_fragment_v2>> frag_refs, bool expected_is_valid) {
std::deque<mutation_fragment_v2> frags;
for (const auto& frag_ref : frag_refs) {
frags.emplace_back(*s, permit, frag_ref.get());
}
auto compaction_state = make_lw_shared<compact_mutation_state<compact_for_sstables::no>>(*s, gc_clock::now(), s->full_slice(),
std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(), mutation_fragment_stream_validation_level::clustering_key);
auto reader = make_flat_mutation_reader_from_fragments(s, permit, std::move(frags));
auto close_reader = deferred_close(reader);
bool is_valid = true;
try {
reader.consume(compact_for_query_v2<consumer_v2>(compaction_state, consumer_v2{})).get();
} catch (invalid_mutation_fragment_stream& ex) {
is_valid = false;
}
if (expected_is_valid != is_valid) {
auto msg = fmt::format("expected_is_valid ({}) != is_valid ({}), fragments:\n{}",
expected_is_valid,
is_valid,
fmt::join(frag_refs | boost::adaptors::transformed([&] (std::reference_wrapper<const mutation_fragment_v2> mf) {
return fmt::to_string(mutation_fragment_v2::printer(*s, mf.get()));
}), "\n"));
BOOST_FAIL(msg);
}
};
auto check_valid = [&] (std::initializer_list<std::reference_wrapper<const mutation_fragment_v2>> frag_refs) {
return check(frag_refs, true);
};
auto check_invalid = [&] (std::initializer_list<std::reference_wrapper<const mutation_fragment_v2>> frag_refs) {
return check(frag_refs, false);
};
// Partitions
check_valid({ps1, pe});
check_valid({ps1, pe, ps2, pe});
check_invalid({pe, ps1, pe});
check_invalid({ps2, pe, ps1, pe});
check_invalid({ps1});
check_invalid({ps1, pe, ps2});
// + static row
check_valid({ps1, sr, pe});
check_valid({ps1_tomb, sr, pe});
check_valid({ps1, sr, pe, ps2, sr, pe});
check_invalid({ps1, pe, sr, ps2, pe});
check_invalid({sr, ps1, pe});
// + clustering row
check_valid({ps1, cr1, pe});
check_valid({ps1, sr, cr1, pe});
check_valid({ps1, cr1, cr2, pe});
check_valid({ps1, sr, cr1, cr2, pe});
check_valid({ps1_tomb, cr1, pe});
check_valid({ps1_tomb, cr1, cr2, pe});
check_valid({ps1, cr1, pe, ps2, cr1, pe});
check_invalid({ps1, pe, cr1, ps2, pe});
check_invalid({cr1, ps1, pe});
check_invalid({ps1, cr1, sr, pe});
check_invalid({ps1_tomb, cr1, sr, pe});
check_invalid({ps1_tomb, cr1_high_ts, sr, pe});
check_invalid({ps1, cr2, cr1, pe});
// + range tombstones
check_valid({ps1, rtc1, rtc_end, pe});
check_valid({ps1, rtc1, rtc1, rtc_end, pe});
check_valid({ps1, rtc1, rtc1, cr1, rtc_end, pe});
check_valid({ps1, sr, rtc1, cr1, rtc_end, pe});
check_valid({ps1, rtc1, rtc2, rtc_end, pe});
check_valid({ps1, sr, rtc1, cr1, rtc2, cr2, rtc_end, pe});
check_valid({ps1_tomb, rtc1, cr1, rtc_end, pe});
check_valid({ps1_tomb, rtc1, cr1, rtc2, cr2, rtc_end, pe});
check_valid({ps1, rtc1, cr1, rtc_end, pe, ps2, rtc1, cr1, rtc_end, pe});
check_invalid({ps1, rtc1, pe});
check_invalid({ps1, pe, rtc1, rtc_end, ps2, pe});
check_invalid({rtc1, ps1, pe});
check_invalid({ps1, rtc1, rtc_end, sr, pe});
check_invalid({ps1, sr, cr1, rtc1, rtc_end, pe});
check_invalid({ps1_tomb, cr1, rtc1, rtc_end, pe});
check_invalid({ps1_tomb, cr1_high_ts, rtc1, rtc_end, pe});
check_invalid({ps1, rtc2, rtc1, rtc_end, pe});
check_invalid({ps1_tomb, rtc2, rtc1, rtc_end, pe});
};
SEASTAR_TEST_CASE(test_tracing_format) {