flat_mutation_reader_test: cleanup, seastar::async -> SEASTAR_THREAD_TEST_CASE
This commit is contained in:
@@ -111,193 +111,187 @@ static size_t count_fragments(mutation m) {
|
||||
return res;
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_flat_mutation_reader_consume_single_partition) {
|
||||
return seastar::async([] {
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
for_each_mutation([&] (const mutation& m) {
|
||||
size_t fragments_in_m = count_fragments(m);
|
||||
for (size_t depth = 1; depth <= fragments_in_m + 1; ++depth) {
|
||||
auto r = make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), m);
|
||||
auto close_reader = deferred_close(r);
|
||||
auto result = r.consume(mock_consumer(*m.schema(), semaphore.make_permit(), depth)).get0();
|
||||
BOOST_REQUIRE(result._consume_end_of_stream_called);
|
||||
BOOST_REQUIRE_EQUAL(1, result._consume_new_partition_call_count);
|
||||
BOOST_REQUIRE_EQUAL(1, result._consume_end_of_partition_call_count);
|
||||
BOOST_REQUIRE_EQUAL(m.partition().partition_tombstone() ? 1 : 0, result._consume_tombstone_call_count);
|
||||
auto r2 = assert_that(make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), m));
|
||||
r2.produces_partition_start(m.decorated_key(), m.partition().partition_tombstone());
|
||||
if (result._fragments.empty()) {
|
||||
continue;
|
||||
}
|
||||
for (auto& mf : result._fragments) {
|
||||
r2.produces(*m.schema(), mf);
|
||||
}
|
||||
SEASTAR_THREAD_TEST_CASE(test_flat_mutation_reader_consume_single_partition) {
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
for_each_mutation([&] (const mutation& m) {
|
||||
size_t fragments_in_m = count_fragments(m);
|
||||
for (size_t depth = 1; depth <= fragments_in_m + 1; ++depth) {
|
||||
auto r = make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), m);
|
||||
auto close_reader = deferred_close(r);
|
||||
auto result = r.consume(mock_consumer(*m.schema(), semaphore.make_permit(), depth)).get0();
|
||||
BOOST_REQUIRE(result._consume_end_of_stream_called);
|
||||
BOOST_REQUIRE_EQUAL(1, result._consume_new_partition_call_count);
|
||||
BOOST_REQUIRE_EQUAL(1, result._consume_end_of_partition_call_count);
|
||||
BOOST_REQUIRE_EQUAL(m.partition().partition_tombstone() ? 1 : 0, result._consume_tombstone_call_count);
|
||||
auto r2 = assert_that(make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), m));
|
||||
r2.produces_partition_start(m.decorated_key(), m.partition().partition_tombstone());
|
||||
if (result._fragments.empty()) {
|
||||
continue;
|
||||
}
|
||||
});
|
||||
for (auto& mf : result._fragments) {
|
||||
r2.produces(*m.schema(), mf);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_flat_mutation_reader_consume_two_partitions) {
|
||||
return seastar::async([] {
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
auto test = [&semaphore] (mutation m1, mutation m2) {
|
||||
size_t fragments_in_m1 = count_fragments(m1);
|
||||
size_t fragments_in_m2 = count_fragments(m2);
|
||||
for (size_t depth = 1; depth < fragments_in_m1; ++depth) {
|
||||
auto r = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2});
|
||||
auto close_r = deferred_close(r);
|
||||
auto result = r.consume(mock_consumer(*m1.schema(), semaphore.make_permit(), depth)).get0();
|
||||
BOOST_REQUIRE(result._consume_end_of_stream_called);
|
||||
BOOST_REQUIRE_EQUAL(1, result._consume_new_partition_call_count);
|
||||
BOOST_REQUIRE_EQUAL(1, result._consume_end_of_partition_call_count);
|
||||
BOOST_REQUIRE_EQUAL(m1.partition().partition_tombstone() ? 1 : 0, result._consume_tombstone_call_count);
|
||||
auto r2 = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2});
|
||||
auto close_r2 = deferred_close(r2);
|
||||
auto start = r2().get0();
|
||||
BOOST_REQUIRE(start);
|
||||
BOOST_REQUIRE(start->is_partition_start());
|
||||
for (auto& mf : result._fragments) {
|
||||
auto mfopt = r2().get0();
|
||||
BOOST_REQUIRE(mfopt);
|
||||
BOOST_REQUIRE(mf.equal(*m1.schema(), *mfopt));
|
||||
}
|
||||
SEASTAR_THREAD_TEST_CASE(test_flat_mutation_reader_consume_two_partitions) {
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
auto test = [&semaphore] (mutation m1, mutation m2) {
|
||||
size_t fragments_in_m1 = count_fragments(m1);
|
||||
size_t fragments_in_m2 = count_fragments(m2);
|
||||
for (size_t depth = 1; depth < fragments_in_m1; ++depth) {
|
||||
auto r = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2});
|
||||
auto close_r = deferred_close(r);
|
||||
auto result = r.consume(mock_consumer(*m1.schema(), semaphore.make_permit(), depth)).get0();
|
||||
BOOST_REQUIRE(result._consume_end_of_stream_called);
|
||||
BOOST_REQUIRE_EQUAL(1, result._consume_new_partition_call_count);
|
||||
BOOST_REQUIRE_EQUAL(1, result._consume_end_of_partition_call_count);
|
||||
BOOST_REQUIRE_EQUAL(m1.partition().partition_tombstone() ? 1 : 0, result._consume_tombstone_call_count);
|
||||
auto r2 = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2});
|
||||
auto close_r2 = deferred_close(r2);
|
||||
auto start = r2().get0();
|
||||
BOOST_REQUIRE(start);
|
||||
BOOST_REQUIRE(start->is_partition_start());
|
||||
for (auto& mf : result._fragments) {
|
||||
auto mfopt = r2().get0();
|
||||
BOOST_REQUIRE(mfopt);
|
||||
BOOST_REQUIRE(mf.equal(*m1.schema(), *mfopt));
|
||||
}
|
||||
for (size_t depth = fragments_in_m1; depth < fragments_in_m1 + fragments_in_m2 + 1; ++depth) {
|
||||
auto r = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2});
|
||||
auto close_r = deferred_close(r);
|
||||
auto result = r.consume(mock_consumer(*m1.schema(), semaphore.make_permit(), depth)).get0();
|
||||
BOOST_REQUIRE(result._consume_end_of_stream_called);
|
||||
BOOST_REQUIRE_EQUAL(2, result._consume_new_partition_call_count);
|
||||
BOOST_REQUIRE_EQUAL(2, result._consume_end_of_partition_call_count);
|
||||
size_t tombstones_count = 0;
|
||||
if (m1.partition().partition_tombstone()) {
|
||||
++tombstones_count;
|
||||
}
|
||||
if (m2.partition().partition_tombstone()) {
|
||||
++tombstones_count;
|
||||
}
|
||||
BOOST_REQUIRE_EQUAL(tombstones_count, result._consume_tombstone_call_count);
|
||||
auto r2 = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2});
|
||||
auto close_r2 = deferred_close(r2);
|
||||
auto start = r2().get0();
|
||||
BOOST_REQUIRE(start);
|
||||
BOOST_REQUIRE(start->is_partition_start());
|
||||
for (auto& mf : result._fragments) {
|
||||
auto mfopt = r2().get0();
|
||||
BOOST_REQUIRE(mfopt);
|
||||
if (mfopt->is_partition_start() || mfopt->is_end_of_partition()) {
|
||||
mfopt = r2().get0();
|
||||
}
|
||||
BOOST_REQUIRE(mfopt);
|
||||
BOOST_REQUIRE(mf.equal(*m1.schema(), *mfopt));
|
||||
}
|
||||
}
|
||||
for (size_t depth = fragments_in_m1; depth < fragments_in_m1 + fragments_in_m2 + 1; ++depth) {
|
||||
auto r = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2});
|
||||
auto close_r = deferred_close(r);
|
||||
auto result = r.consume(mock_consumer(*m1.schema(), semaphore.make_permit(), depth)).get0();
|
||||
BOOST_REQUIRE(result._consume_end_of_stream_called);
|
||||
BOOST_REQUIRE_EQUAL(2, result._consume_new_partition_call_count);
|
||||
BOOST_REQUIRE_EQUAL(2, result._consume_end_of_partition_call_count);
|
||||
size_t tombstones_count = 0;
|
||||
if (m1.partition().partition_tombstone()) {
|
||||
++tombstones_count;
|
||||
}
|
||||
};
|
||||
for_each_mutation_pair([&] (auto&& m, auto&& m2, are_equal) {
|
||||
if (m.decorated_key().less_compare(*m.schema(), m2.decorated_key())) {
|
||||
test(m, m2);
|
||||
} else if (m2.decorated_key().less_compare(*m.schema(), m.decorated_key())) {
|
||||
test(m2, m);
|
||||
if (m2.partition().partition_tombstone()) {
|
||||
++tombstones_count;
|
||||
}
|
||||
});
|
||||
BOOST_REQUIRE_EQUAL(tombstones_count, result._consume_tombstone_call_count);
|
||||
auto r2 = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2});
|
||||
auto close_r2 = deferred_close(r2);
|
||||
auto start = r2().get0();
|
||||
BOOST_REQUIRE(start);
|
||||
BOOST_REQUIRE(start->is_partition_start());
|
||||
for (auto& mf : result._fragments) {
|
||||
auto mfopt = r2().get0();
|
||||
BOOST_REQUIRE(mfopt);
|
||||
if (mfopt->is_partition_start() || mfopt->is_end_of_partition()) {
|
||||
mfopt = r2().get0();
|
||||
}
|
||||
BOOST_REQUIRE(mfopt);
|
||||
BOOST_REQUIRE(mf.equal(*m1.schema(), *mfopt));
|
||||
}
|
||||
}
|
||||
};
|
||||
for_each_mutation_pair([&] (auto&& m, auto&& m2, are_equal) {
|
||||
if (m.decorated_key().less_compare(*m.schema(), m2.decorated_key())) {
|
||||
test(m, m2);
|
||||
} else if (m2.decorated_key().less_compare(*m.schema(), m.decorated_key())) {
|
||||
test(m2, m);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_fragmenting_and_freezing) {
|
||||
return seastar::async([] {
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
for_each_mutation([&] (const mutation& m) {
|
||||
std::vector<frozen_mutation> fms;
|
||||
SEASTAR_THREAD_TEST_CASE(test_fragmenting_and_freezing) {
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
for_each_mutation([&] (const mutation& m) {
|
||||
std::vector<frozen_mutation> fms;
|
||||
|
||||
fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), { mutation(m) }), [&] (auto fm, bool frag) {
|
||||
fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), { mutation(m) }), [&] (auto fm, bool frag) {
|
||||
BOOST_REQUIRE(!frag);
|
||||
fms.emplace_back(std::move(fm));
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
}, std::numeric_limits<size_t>::max()).get0();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(fms.size(), 1);
|
||||
|
||||
auto m1 = fms.back().unfreeze(m.schema());
|
||||
BOOST_REQUIRE_EQUAL(m, m1);
|
||||
|
||||
fms.clear();
|
||||
|
||||
std::optional<bool> fragmented;
|
||||
fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), { mutation(m) }), [&] (auto fm, bool frag) {
|
||||
BOOST_REQUIRE(!fragmented || *fragmented == frag);
|
||||
*fragmented = frag;
|
||||
fms.emplace_back(std::move(fm));
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
}, 1).get0();
|
||||
|
||||
auto&& rows = m.partition().non_dummy_rows();
|
||||
auto expected_fragments = std::distance(rows.begin(), rows.end())
|
||||
+ m.partition().row_tombstones().size()
|
||||
+ !m.partition().static_row().empty();
|
||||
BOOST_REQUIRE_EQUAL(fms.size(), std::max(expected_fragments, size_t(1)));
|
||||
BOOST_REQUIRE(expected_fragments < 2 || *fragmented);
|
||||
|
||||
auto m2 = fms.back().unfreeze(m.schema());
|
||||
fms.pop_back();
|
||||
mutation_application_stats app_stats;
|
||||
while (!fms.empty()) {
|
||||
m2.partition().apply(*m.schema(), fms.back().partition(), *m.schema(), app_stats);
|
||||
fms.pop_back();
|
||||
}
|
||||
BOOST_REQUIRE_EQUAL(m, m2);
|
||||
});
|
||||
|
||||
auto test_random_streams = [&semaphore] (random_mutation_generator&& gen) {
|
||||
for (auto i = 0; i < 4; i++) {
|
||||
auto muts = gen(4);
|
||||
auto s = muts[0].schema();
|
||||
|
||||
std::vector<frozen_mutation> frozen;
|
||||
|
||||
// Freeze all
|
||||
fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(gen.schema(), semaphore.make_permit(), muts), [&] (auto fm, bool frag) {
|
||||
BOOST_REQUIRE(!frag);
|
||||
fms.emplace_back(std::move(fm));
|
||||
frozen.emplace_back(fm);
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
}, std::numeric_limits<size_t>::max()).get0();
|
||||
BOOST_REQUIRE_EQUAL(muts.size(), frozen.size());
|
||||
for (auto j = 0u; j < muts.size(); j++) {
|
||||
BOOST_REQUIRE_EQUAL(muts[j], frozen[j].unfreeze(s));
|
||||
}
|
||||
|
||||
BOOST_REQUIRE_EQUAL(fms.size(), 1);
|
||||
// Freeze first
|
||||
frozen.clear();
|
||||
fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(gen.schema(), semaphore.make_permit(), muts), [&] (auto fm, bool frag) {
|
||||
BOOST_REQUIRE(!frag);
|
||||
frozen.emplace_back(fm);
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}, std::numeric_limits<size_t>::max()).get0();
|
||||
BOOST_REQUIRE_EQUAL(frozen.size(), 1);
|
||||
BOOST_REQUIRE_EQUAL(muts[0], frozen[0].unfreeze(s));
|
||||
|
||||
auto m1 = fms.back().unfreeze(m.schema());
|
||||
BOOST_REQUIRE_EQUAL(m, m1);
|
||||
|
||||
fms.clear();
|
||||
|
||||
std::optional<bool> fragmented;
|
||||
fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), { mutation(m) }), [&] (auto fm, bool frag) {
|
||||
BOOST_REQUIRE(!fragmented || *fragmented == frag);
|
||||
*fragmented = frag;
|
||||
fms.emplace_back(std::move(fm));
|
||||
// Fragment and freeze all
|
||||
frozen.clear();
|
||||
fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(gen.schema(), semaphore.make_permit(), muts), [&] (auto fm, bool frag) {
|
||||
frozen.emplace_back(fm);
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
}, 1).get0();
|
||||
|
||||
auto&& rows = m.partition().non_dummy_rows();
|
||||
auto expected_fragments = std::distance(rows.begin(), rows.end())
|
||||
+ m.partition().row_tombstones().size()
|
||||
+ !m.partition().static_row().empty();
|
||||
BOOST_REQUIRE_EQUAL(fms.size(), std::max(expected_fragments, size_t(1)));
|
||||
BOOST_REQUIRE(expected_fragments < 2 || *fragmented);
|
||||
|
||||
auto m2 = fms.back().unfreeze(m.schema());
|
||||
fms.pop_back();
|
||||
mutation_application_stats app_stats;
|
||||
while (!fms.empty()) {
|
||||
m2.partition().apply(*m.schema(), fms.back().partition(), *m.schema(), app_stats);
|
||||
fms.pop_back();
|
||||
}
|
||||
BOOST_REQUIRE_EQUAL(m, m2);
|
||||
});
|
||||
|
||||
auto test_random_streams = [&semaphore] (random_mutation_generator&& gen) {
|
||||
for (auto i = 0; i < 4; i++) {
|
||||
auto muts = gen(4);
|
||||
auto s = muts[0].schema();
|
||||
|
||||
std::vector<frozen_mutation> frozen;
|
||||
|
||||
// Freeze all
|
||||
fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(gen.schema(), semaphore.make_permit(), muts), [&] (auto fm, bool frag) {
|
||||
BOOST_REQUIRE(!frag);
|
||||
frozen.emplace_back(fm);
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
}, std::numeric_limits<size_t>::max()).get0();
|
||||
BOOST_REQUIRE_EQUAL(muts.size(), frozen.size());
|
||||
for (auto j = 0u; j < muts.size(); j++) {
|
||||
BOOST_REQUIRE_EQUAL(muts[j], frozen[j].unfreeze(s));
|
||||
std::vector<mutation> unfrozen;
|
||||
while (!frozen.empty()) {
|
||||
auto m = frozen.front().unfreeze(s);
|
||||
frozen.erase(frozen.begin());
|
||||
if (unfrozen.empty() || !unfrozen.back().decorated_key().equal(*s, m.decorated_key())) {
|
||||
unfrozen.emplace_back(std::move(m));
|
||||
} else {
|
||||
unfrozen.back().apply(std::move(m));
|
||||
}
|
||||
|
||||
// Freeze first
|
||||
frozen.clear();
|
||||
fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(gen.schema(), semaphore.make_permit(), muts), [&] (auto fm, bool frag) {
|
||||
BOOST_REQUIRE(!frag);
|
||||
frozen.emplace_back(fm);
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}, std::numeric_limits<size_t>::max()).get0();
|
||||
BOOST_REQUIRE_EQUAL(frozen.size(), 1);
|
||||
BOOST_REQUIRE_EQUAL(muts[0], frozen[0].unfreeze(s));
|
||||
|
||||
// Fragment and freeze all
|
||||
frozen.clear();
|
||||
fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(gen.schema(), semaphore.make_permit(), muts), [&] (auto fm, bool frag) {
|
||||
frozen.emplace_back(fm);
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
}, 1).get0();
|
||||
std::vector<mutation> unfrozen;
|
||||
while (!frozen.empty()) {
|
||||
auto m = frozen.front().unfreeze(s);
|
||||
frozen.erase(frozen.begin());
|
||||
if (unfrozen.empty() || !unfrozen.back().decorated_key().equal(*s, m.decorated_key())) {
|
||||
unfrozen.emplace_back(std::move(m));
|
||||
} else {
|
||||
unfrozen.back().apply(std::move(m));
|
||||
}
|
||||
}
|
||||
BOOST_REQUIRE_EQUAL(muts, unfrozen);
|
||||
}
|
||||
};
|
||||
BOOST_REQUIRE_EQUAL(muts, unfrozen);
|
||||
}
|
||||
};
|
||||
|
||||
test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::no));
|
||||
test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::yes));
|
||||
});
|
||||
test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::no));
|
||||
test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::yes));
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_flat_mutation_reader_move_buffer_content_to) {
|
||||
@@ -372,111 +366,109 @@ SEASTAR_THREAD_TEST_CASE(test_flat_mutation_reader_move_buffer_content_to) {
|
||||
.is_equal_to(mut_orig);
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_multi_range_reader) {
|
||||
return seastar::async([] {
|
||||
simple_schema s;
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
auto permit = semaphore.make_permit();
|
||||
SEASTAR_THREAD_TEST_CASE(test_multi_range_reader) {
|
||||
simple_schema s;
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
auto permit = semaphore.make_permit();
|
||||
|
||||
auto keys = s.make_pkeys(10);
|
||||
auto ring = s.to_ring_positions(keys);
|
||||
auto keys = s.make_pkeys(10);
|
||||
auto ring = s.to_ring_positions(keys);
|
||||
|
||||
auto crs = boost::copy_range<std::vector<mutation_fragment>>(boost::irange(0, 3) | boost::adaptors::transformed([&] (auto n) {
|
||||
return s.make_row(permit, s.make_ckey(n), "value");
|
||||
}));
|
||||
auto crs = boost::copy_range<std::vector<mutation_fragment>>(boost::irange(0, 3) | boost::adaptors::transformed([&] (auto n) {
|
||||
return s.make_row(permit, s.make_ckey(n), "value");
|
||||
}));
|
||||
|
||||
auto ms = boost::copy_range<std::vector<mutation>>(keys | boost::adaptors::transformed([&] (auto& key) {
|
||||
auto m = mutation(s.schema(), key);
|
||||
for (auto& mf : crs) {
|
||||
m.apply(mf);
|
||||
}
|
||||
return m;
|
||||
}));
|
||||
auto ms = boost::copy_range<std::vector<mutation>>(keys | boost::adaptors::transformed([&] (auto& key) {
|
||||
auto m = mutation(s.schema(), key);
|
||||
for (auto& mf : crs) {
|
||||
m.apply(mf);
|
||||
}
|
||||
return m;
|
||||
}));
|
||||
|
||||
auto source = mutation_source([&] (schema_ptr, reader_permit permit, const dht::partition_range& range) {
|
||||
return make_flat_mutation_reader_from_mutations_v2(s.schema(), std::move(permit), ms, range);
|
||||
});
|
||||
|
||||
const auto empty_ranges = dht::partition_range_vector{};
|
||||
const auto single_ranges = dht::partition_range_vector{
|
||||
dht::partition_range::make(ring[1], ring[2]),
|
||||
};
|
||||
const auto multiple_ranges = dht::partition_range_vector {
|
||||
dht::partition_range::make(ring[1], ring[2]),
|
||||
dht::partition_range::make_singular(ring[4]),
|
||||
dht::partition_range::make(ring[6], ring[8]),
|
||||
};
|
||||
const auto empty_generator = [] { return std::optional<dht::partition_range>{}; };
|
||||
const auto single_generator = [r = std::optional<dht::partition_range>(single_ranges.front())] () mutable {
|
||||
return std::exchange(r, {});
|
||||
};
|
||||
const auto multiple_generator = [it = multiple_ranges.cbegin(), end = multiple_ranges.cend()] () mutable -> std::optional<dht::partition_range> {
|
||||
if (it == end) {
|
||||
return std::nullopt;
|
||||
}
|
||||
return *(it++);
|
||||
};
|
||||
auto fft_range = dht::partition_range::make_starting_with(ring[9]);
|
||||
|
||||
// Generator ranges are single pass, so we need a new range each time they are used.
|
||||
auto run_test = [&] (auto make_empty_ranges, auto make_single_ranges, auto make_multiple_ranges) {
|
||||
testlog.info("empty ranges");
|
||||
assert_that(make_flat_multi_range_reader(s.schema(), semaphore.make_permit(), source, make_empty_ranges(), s.schema()->full_slice()))
|
||||
.produces_end_of_stream()
|
||||
.fast_forward_to(fft_range)
|
||||
.produces(ms[9])
|
||||
.produces_end_of_stream();
|
||||
|
||||
testlog.info("single range");
|
||||
assert_that(make_flat_multi_range_reader(s.schema(), semaphore.make_permit(), source, make_single_ranges(), s.schema()->full_slice()))
|
||||
.produces(ms[1])
|
||||
.produces(ms[2])
|
||||
.produces_end_of_stream()
|
||||
.fast_forward_to(fft_range)
|
||||
.produces(ms[9])
|
||||
.produces_end_of_stream();
|
||||
|
||||
testlog.info("read full partitions and fast forward");
|
||||
assert_that(make_flat_multi_range_reader(s.schema(), semaphore.make_permit(), source, make_multiple_ranges(), s.schema()->full_slice()))
|
||||
.produces(ms[1])
|
||||
.produces(ms[2])
|
||||
.produces(ms[4])
|
||||
.produces(ms[6])
|
||||
.fast_forward_to(fft_range)
|
||||
.produces(ms[9])
|
||||
.produces_end_of_stream();
|
||||
|
||||
testlog.info("read, skip partitions and fast forward");
|
||||
assert_that(make_flat_multi_range_reader(s.schema(), semaphore.make_permit(), source, make_multiple_ranges(), s.schema()->full_slice()))
|
||||
.produces_partition_start(keys[1])
|
||||
.next_partition()
|
||||
.produces_partition_start(keys[2])
|
||||
.produces_row_with_key(crs[0].as_clustering_row().key())
|
||||
.next_partition()
|
||||
.produces(ms[4])
|
||||
.next_partition()
|
||||
.produces_partition_start(keys[6])
|
||||
.produces_row_with_key(crs[0].as_clustering_row().key())
|
||||
.produces_row_with_key(crs[1].as_clustering_row().key())
|
||||
.fast_forward_to(fft_range)
|
||||
.next_partition()
|
||||
.produces_partition_start(keys[9])
|
||||
.next_partition()
|
||||
.produces_end_of_stream();
|
||||
};
|
||||
|
||||
testlog.info("vector version");
|
||||
run_test(
|
||||
[&] { return empty_ranges; },
|
||||
[&] { return single_ranges; },
|
||||
[&] { return multiple_ranges; });
|
||||
|
||||
testlog.info("generator version");
|
||||
run_test(
|
||||
[&] { return empty_generator; },
|
||||
[&] { return single_generator; },
|
||||
[&] { return multiple_generator; });
|
||||
auto source = mutation_source([&] (schema_ptr, reader_permit permit, const dht::partition_range& range) {
|
||||
return make_flat_mutation_reader_from_mutations_v2(s.schema(), std::move(permit), ms, range);
|
||||
});
|
||||
|
||||
const auto empty_ranges = dht::partition_range_vector{};
|
||||
const auto single_ranges = dht::partition_range_vector{
|
||||
dht::partition_range::make(ring[1], ring[2]),
|
||||
};
|
||||
const auto multiple_ranges = dht::partition_range_vector {
|
||||
dht::partition_range::make(ring[1], ring[2]),
|
||||
dht::partition_range::make_singular(ring[4]),
|
||||
dht::partition_range::make(ring[6], ring[8]),
|
||||
};
|
||||
const auto empty_generator = [] { return std::optional<dht::partition_range>{}; };
|
||||
const auto single_generator = [r = std::optional<dht::partition_range>(single_ranges.front())] () mutable {
|
||||
return std::exchange(r, {});
|
||||
};
|
||||
const auto multiple_generator = [it = multiple_ranges.cbegin(), end = multiple_ranges.cend()] () mutable -> std::optional<dht::partition_range> {
|
||||
if (it == end) {
|
||||
return std::nullopt;
|
||||
}
|
||||
return *(it++);
|
||||
};
|
||||
auto fft_range = dht::partition_range::make_starting_with(ring[9]);
|
||||
|
||||
// Generator ranges are single pass, so we need a new range each time they are used.
|
||||
auto run_test = [&] (auto make_empty_ranges, auto make_single_ranges, auto make_multiple_ranges) {
|
||||
testlog.info("empty ranges");
|
||||
assert_that(make_flat_multi_range_reader(s.schema(), semaphore.make_permit(), source, make_empty_ranges(), s.schema()->full_slice()))
|
||||
.produces_end_of_stream()
|
||||
.fast_forward_to(fft_range)
|
||||
.produces(ms[9])
|
||||
.produces_end_of_stream();
|
||||
|
||||
testlog.info("single range");
|
||||
assert_that(make_flat_multi_range_reader(s.schema(), semaphore.make_permit(), source, make_single_ranges(), s.schema()->full_slice()))
|
||||
.produces(ms[1])
|
||||
.produces(ms[2])
|
||||
.produces_end_of_stream()
|
||||
.fast_forward_to(fft_range)
|
||||
.produces(ms[9])
|
||||
.produces_end_of_stream();
|
||||
|
||||
testlog.info("read full partitions and fast forward");
|
||||
assert_that(make_flat_multi_range_reader(s.schema(), semaphore.make_permit(), source, make_multiple_ranges(), s.schema()->full_slice()))
|
||||
.produces(ms[1])
|
||||
.produces(ms[2])
|
||||
.produces(ms[4])
|
||||
.produces(ms[6])
|
||||
.fast_forward_to(fft_range)
|
||||
.produces(ms[9])
|
||||
.produces_end_of_stream();
|
||||
|
||||
testlog.info("read, skip partitions and fast forward");
|
||||
assert_that(make_flat_multi_range_reader(s.schema(), semaphore.make_permit(), source, make_multiple_ranges(), s.schema()->full_slice()))
|
||||
.produces_partition_start(keys[1])
|
||||
.next_partition()
|
||||
.produces_partition_start(keys[2])
|
||||
.produces_row_with_key(crs[0].as_clustering_row().key())
|
||||
.next_partition()
|
||||
.produces(ms[4])
|
||||
.next_partition()
|
||||
.produces_partition_start(keys[6])
|
||||
.produces_row_with_key(crs[0].as_clustering_row().key())
|
||||
.produces_row_with_key(crs[1].as_clustering_row().key())
|
||||
.fast_forward_to(fft_range)
|
||||
.next_partition()
|
||||
.produces_partition_start(keys[9])
|
||||
.next_partition()
|
||||
.produces_end_of_stream();
|
||||
};
|
||||
|
||||
testlog.info("vector version");
|
||||
run_test(
|
||||
[&] { return empty_ranges; },
|
||||
[&] { return single_ranges; },
|
||||
[&] { return multiple_ranges; });
|
||||
|
||||
testlog.info("generator version");
|
||||
run_test(
|
||||
[&] { return empty_generator; },
|
||||
[&] { return single_generator; },
|
||||
[&] { return multiple_generator; });
|
||||
}
|
||||
|
||||
using reversed_partitions = seastar::bool_class<class reversed_partitions_tag>;
|
||||
@@ -649,83 +641,79 @@ void test_flat_stream(schema_ptr s, std::vector<mutation> muts, reversed_partiti
|
||||
}
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_consume_flat) {
|
||||
return seastar::async([] {
|
||||
auto test_random_streams = [&] (random_mutation_generator&& gen) {
|
||||
for (auto i = 0; i < 4; i++) {
|
||||
auto muts = gen(4);
|
||||
test_flat_stream(gen.schema(), muts, reversed_partitions::no, in_thread::no);
|
||||
test_flat_stream(gen.schema(), muts, reversed_partitions::yes, in_thread::no);
|
||||
test_flat_stream(gen.schema(), muts, reversed_partitions::no, in_thread::yes);
|
||||
}
|
||||
};
|
||||
SEASTAR_THREAD_TEST_CASE(test_consume_flat) {
|
||||
auto test_random_streams = [&] (random_mutation_generator&& gen) {
|
||||
for (auto i = 0; i < 4; i++) {
|
||||
auto muts = gen(4);
|
||||
test_flat_stream(gen.schema(), muts, reversed_partitions::no, in_thread::no);
|
||||
test_flat_stream(gen.schema(), muts, reversed_partitions::yes, in_thread::no);
|
||||
test_flat_stream(gen.schema(), muts, reversed_partitions::no, in_thread::yes);
|
||||
}
|
||||
};
|
||||
|
||||
test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::no));
|
||||
test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::yes));
|
||||
});
|
||||
test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::no));
|
||||
test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::yes));
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_make_forwardable) {
|
||||
return seastar::async([] {
|
||||
simple_schema s;
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
auto permit = semaphore.make_permit();
|
||||
SEASTAR_THREAD_TEST_CASE(test_make_forwardable) {
|
||||
simple_schema s;
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
auto permit = semaphore.make_permit();
|
||||
|
||||
auto keys = s.make_pkeys(10);
|
||||
auto keys = s.make_pkeys(10);
|
||||
|
||||
auto crs = boost::copy_range < std::vector <
|
||||
mutation_fragment >> (boost::irange(0, 3) | boost::adaptors::transformed([&](auto n) {
|
||||
return s.make_row(permit, s.make_ckey(n), "value");
|
||||
}));
|
||||
auto crs = boost::copy_range < std::vector <
|
||||
mutation_fragment >> (boost::irange(0, 3) | boost::adaptors::transformed([&](auto n) {
|
||||
return s.make_row(permit, s.make_ckey(n), "value");
|
||||
}));
|
||||
|
||||
auto ms = boost::copy_range < std::vector < mutation >> (keys | boost::adaptors::transformed([&](auto &key) {
|
||||
auto m = mutation(s.schema(), key);
|
||||
for (auto &mf : crs) {
|
||||
m.apply(mf);
|
||||
}
|
||||
return m;
|
||||
}));
|
||||
|
||||
auto make_reader = [&] (auto& range) {
|
||||
return assert_that(
|
||||
make_forwardable(make_flat_mutation_reader_from_mutations_v2(s.schema(), semaphore.make_permit(), ms, range, streamed_mutation::forwarding::no)));
|
||||
};
|
||||
|
||||
auto test = [&] (auto& rd, auto& partition) {
|
||||
rd.produces_partition_start(partition.decorated_key(), partition.partition().partition_tombstone());
|
||||
rd.produces_end_of_stream();
|
||||
rd.fast_forward_to(position_range::all_clustered_rows());
|
||||
for (auto &row : partition.partition().clustered_rows()) {
|
||||
rd.produces_row_with_key(row.key());
|
||||
}
|
||||
rd.produces_end_of_stream();
|
||||
rd.next_partition();
|
||||
};
|
||||
|
||||
auto rd = make_reader(query::full_partition_range);
|
||||
|
||||
for (auto& partition : ms) {
|
||||
test(rd, partition);
|
||||
auto ms = boost::copy_range < std::vector < mutation >> (keys | boost::adaptors::transformed([&](auto &key) {
|
||||
auto m = mutation(s.schema(), key);
|
||||
for (auto &mf : crs) {
|
||||
m.apply(mf);
|
||||
}
|
||||
return m;
|
||||
}));
|
||||
|
||||
auto single_range = dht::partition_range::make_singular(ms[0].decorated_key());
|
||||
auto make_reader = [&] (auto& range) {
|
||||
return assert_that(
|
||||
make_forwardable(make_flat_mutation_reader_from_mutations_v2(s.schema(), semaphore.make_permit(), ms, range, streamed_mutation::forwarding::no)));
|
||||
};
|
||||
|
||||
auto rd2 = make_reader(single_range);
|
||||
|
||||
rd2.produces_partition_start(ms[0].decorated_key(), ms[0].partition().partition_tombstone());
|
||||
rd2.produces_end_of_stream();
|
||||
rd2.fast_forward_to(position_range::all_clustered_rows());
|
||||
rd2.produces_row_with_key(ms[0].partition().clustered_rows().begin()->key());
|
||||
rd2.produces_row_with_key(std::next(ms[0].partition().clustered_rows().begin())->key());
|
||||
|
||||
auto remaining_range = dht::partition_range::make_starting_with({ms[0].decorated_key(), false});
|
||||
|
||||
rd2.fast_forward_to(remaining_range);
|
||||
|
||||
for (auto i = size_t(1); i < ms.size(); ++i) {
|
||||
test(rd2, ms[i]);
|
||||
auto test = [&] (auto& rd, auto& partition) {
|
||||
rd.produces_partition_start(partition.decorated_key(), partition.partition().partition_tombstone());
|
||||
rd.produces_end_of_stream();
|
||||
rd.fast_forward_to(position_range::all_clustered_rows());
|
||||
for (auto &row : partition.partition().clustered_rows()) {
|
||||
rd.produces_row_with_key(row.key());
|
||||
}
|
||||
});
|
||||
rd.produces_end_of_stream();
|
||||
rd.next_partition();
|
||||
};
|
||||
|
||||
auto rd = make_reader(query::full_partition_range);
|
||||
|
||||
for (auto& partition : ms) {
|
||||
test(rd, partition);
|
||||
}
|
||||
|
||||
auto single_range = dht::partition_range::make_singular(ms[0].decorated_key());
|
||||
|
||||
auto rd2 = make_reader(single_range);
|
||||
|
||||
rd2.produces_partition_start(ms[0].decorated_key(), ms[0].partition().partition_tombstone());
|
||||
rd2.produces_end_of_stream();
|
||||
rd2.fast_forward_to(position_range::all_clustered_rows());
|
||||
rd2.produces_row_with_key(ms[0].partition().clustered_rows().begin()->key());
|
||||
rd2.produces_row_with_key(std::next(ms[0].partition().clustered_rows().begin())->key());
|
||||
|
||||
auto remaining_range = dht::partition_range::make_starting_with({ms[0].decorated_key(), false});
|
||||
|
||||
rd2.fast_forward_to(remaining_range);
|
||||
|
||||
for (auto i = size_t(1); i < ms.size(); ++i) {
|
||||
test(rd2, ms[i]);
|
||||
}
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_make_forwardable_next_partition) {
|
||||
@@ -929,16 +917,14 @@ SEASTAR_THREAD_TEST_CASE(test_make_nonforwardable_from_mutations_as_mutation_sou
|
||||
run_mutation_source_tests(populate);
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_abandoned_flat_mutation_reader_from_mutation) {
|
||||
return seastar::async([] {
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
for_each_mutation([&] (const mutation& m) {
|
||||
auto rd = make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), {mutation(m)});
|
||||
auto close_rd = deferred_close(rd);
|
||||
rd().get();
|
||||
rd().get();
|
||||
// We rely on AddressSanitizer telling us if nothing was leaked.
|
||||
});
|
||||
SEASTAR_THREAD_TEST_CASE(test_abandoned_flat_mutation_reader_from_mutation) {
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
for_each_mutation([&] (const mutation& m) {
|
||||
auto rd = make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), {mutation(m)});
|
||||
auto close_rd = deferred_close(rd);
|
||||
rd().get();
|
||||
rd().get();
|
||||
// We rely on AddressSanitizer telling us if nothing was leaked.
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user