clustering_order_reader_merger: handle empty readers

The merger could return end-of-stream if some (but not all) of the
underlying readers were empty (i.e. not even returning a
`partition_start`). This could happen in places where it was used
(`time_series_sstable_set::create_single_key_sstable_reader`) if we
opened an sstable which did not have the queried partition but passed
all the filters (specifically, the bloom filter returned a false
positive for this sstable).

The commit also extends the random tests for the merger to include empty
readers and adds an explicit test case that catches this bug (in a
limited scope: when we merge a single empty reader).

It also modifies `test_twcs_single_key_reader_filtering` (regression
test for #8432) because the time where the clustering key filter is
invoked changes (some invocations move from the constructor of the
merger to operator()). I checked manually that it still catches the bug
when I reintroduce it.

Fixes #8445.

Closes #8446
This commit is contained in:
Kamil Braun
2021-04-08 17:40:46 +02:00
committed by Nadav Har'El
parent 09f221203f
commit 7ffb0d826b
4 changed files with 141 additions and 56 deletions

View File

@@ -2275,9 +2275,9 @@ position_reader_queue::~position_reader_queue() {}
// are not implemented and throw an error; the reader is only used for single partition queries.
//
// Assumes that:
// - the queue contains at least one reader,
// - there are no static rows,
// - the returned fragments do not contain partition tombstones.
// - the returned fragments do not contain partition tombstones,
// - the merged readers return fragments from the same partition (but some or even all of them may be empty).
class clustering_order_reader_merger {
const schema_ptr _schema;
const reader_permit _permit;
@@ -2389,12 +2389,17 @@ class clustering_order_reader_merger {
if (!mf) {
// The reader returned end-of-stream before returning end-of-partition
// (otherwise we would have removed it in a previous peek). This means that
// we are in forwarding mode and the reader won't return any more fragments in the current range.
// either the reader was empty from the beginning (not even returning a `partition_start`)
// or we are in forwarding mode and the reader won't return any more fragments in the current range.
// If the reader's upper bound is smaller then the end of the current range then it won't
// return any more fragments in later ranges as well (subsequent fast-forward-to ranges
// are non-overlapping and strictly increasing), so we can remove it now.
// Otherwise it may start returning fragments later, so we save it for the moment
// in _halted_readers and will bring it back when we get fast-forwarded.
// Otherwise, if it previously returned a `partition_start`, it may start returning more fragments
// later (after we fast-forward) so we save it for the moment in _halted_readers and will bring it
// back when we get fast-forwarded.
// We also save the reader if it was empty from the beginning (no `partition_start`) since
// it makes the code simpler (to check for this here we would need additional state); it is a bit wasteful
// but completely empty readers should be rare.
if (_cmp(it->upper_bound, _pr_end) < 0) {
_all_readers.erase(it);
} else {
@@ -2524,19 +2529,6 @@ public:
: position_in_partition_view::after_all_clustered_rows())
, _should_emit_partition_end(fwd_sm == streamed_mutation::forwarding::no)
{
// The first call to `_reader_queue::pop` uses `after_all_clustered_rows`
// so we obtain at least one reader; we will return this reader's `partition_start`
// as the first fragment.
auto rs = _reader_queue->pop(position_in_partition_view::after_all_clustered_rows());
for (auto& r: rs) {
_all_readers.push_front(std::move(r));
_unpeeked_readers.push_back(_all_readers.begin());
}
if (rs.empty()) {
// No readers, no partition.
_should_emit_partition_end = false;
}
}
// We assume that operator() is called sequentially and that the caller doesn't use the batch
@@ -2553,8 +2545,22 @@ public:
return peek_readers(timeout).then([this, timeout] { return (*this)(timeout); });
}
auto next_peeked_pos = _peeked_readers.empty() ? _pr_end : _peeked_readers.front()->reader.peek_buffer().position();
// There might be queued readers containing fragments with positions <= next_peeked_pos:
// Before we return a batch of fragments using currently opened readers we must check the queue
// for potential new readers that must be opened. There are three cases which determine how ``far''
// should we look:
// - If there are some peeked readers in the heap, we must check for new readers
// whose `min_position`s are <= the position of the first peeked reader; there is no need
// to check for ``later'' readers (yet).
// - Otherwise, if we already fetched a partition start fragment, we need to look no further
// than the end of the current position range (_pr_end).
// - Otherwise we need to look for any reader (by calling the queue with `after_all_clustered_rows`),
// even for readers whose `min_position`s may be outside the current position range since they
// may be the only readers which have a `partition_start` fragment which we need to return
// before end-of-stream.
auto next_peeked_pos =
_peeked_readers.empty()
? (_partition_start_fetched ? _pr_end : position_in_partition_view::after_all_clustered_rows())
: _peeked_readers.front()->reader.peek_buffer().position();
if (!_reader_queue->empty(next_peeked_pos)) {
auto rs = _reader_queue->pop(next_peeked_pos);
for (auto& r: rs) {
@@ -2568,8 +2574,11 @@ public:
// We are either in forwarding mode and waiting for a fast-forward,
// or we've exhausted all the readers.
if (_should_emit_partition_end) {
// Not forwarding, so all readers must be exhausted. Return the last fragment.
_current_batch.push_back(mutation_fragment(*_schema, _permit, partition_end()));
// Not forwarding, so all readers must be exhausted.
// Return a partition end fragment unless all readers have been empty from the beginning.
if (_partition_start_fetched) {
_current_batch.push_back(mutation_fragment(*_schema, _permit, partition_end()));
}
_should_emit_partition_end = false;
}
return make_ready_future<mutation_fragment_batch>(_current_batch);

View File

@@ -3829,11 +3829,26 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_recreate_before_fast_forward_to)
}
struct mutation_bounds {
mutation m;
std::optional<mutation> m;
position_in_partition lower;
position_in_partition upper;
};
static reader_bounds make_reader_bounds(
schema_ptr s, reader_permit permit, mutation_bounds mb, streamed_mutation::forwarding fwd,
const query::partition_slice* slice = nullptr) {
if (!slice) {
slice = &s->full_slice();
}
return reader_bounds {
.r = mb.m ? flat_mutation_reader_from_mutations(permit, {std::move(*mb.m)}, *slice, fwd)
: make_empty_flat_reader(s, permit),
.lower = std::move(mb.lower),
.upper = std::move(mb.upper)
};
}
struct clustering_order_merger_test_generator {
struct scenario {
std::vector<mutation_bounds> readers_data;
@@ -3843,13 +3858,13 @@ struct clustering_order_merger_test_generator {
schema_ptr _s;
partition_key _pk;
clustering_order_merger_test_generator()
: _s(make_schema()), _pk(partition_key::from_single_value(*_s, int32_type->decompose(0)))
clustering_order_merger_test_generator(std::optional<sstring> pk = std::nullopt)
: _s(make_schema()), _pk(partition_key::from_single_value(*_s, utf8_type->decompose(pk ? *pk : make_local_key(make_schema()))))
{}
static schema_ptr make_schema() {
return schema_builder("ks", "t")
.with_column("pk", int32_type, column_kind::partition_key)
.with_column("pk", utf8_type, column_kind::partition_key)
.with_column("ck", int32_type, column_kind::clustering_key)
.with_column("v", int32_type, column_kind::regular_column)
.build();
@@ -3882,6 +3897,13 @@ struct clustering_order_merger_test_generator {
std::vector<mutation_bounds> readers_data;
auto num_readers = tests::random::get_int(1, 10, engine);
auto num_empty_readers = tests::random::get_int(1, num_readers, engine);
while (num_empty_readers--) {
auto lower = -tests::random::get_int(0, 5, engine);
auto upper = tests::random::get_int(0, 5, engine);
readers_data.push_back(mutation_bounds{std::nullopt, mk_pos_for(lower), mk_pos_for(upper)});
num_readers--;
}
while (num_readers--) {
auto len = tests::random::get_int(0, 15, engine);
auto ks = tests::random::random_subset<int>(100, len, engine);
@@ -3929,16 +3951,17 @@ struct clustering_order_merger_test_generator {
SEASTAR_THREAD_TEST_CASE(test_clustering_order_merger_in_memory) {
clustering_order_merger_test_generator g;
auto make_authority = [] (mutation mut, streamed_mutation::forwarding fwd) {
return flat_mutation_reader_from_mutations(tests::make_permit(), {std::move(mut)}, fwd);
auto make_authority = [s = g._s] (std::optional<mutation> mut, streamed_mutation::forwarding fwd) {
if (mut) {
return flat_mutation_reader_from_mutations(tests::make_permit(), {std::move(*mut)}, fwd);
}
return make_empty_flat_reader(s, tests::make_permit());
};
auto make_tested = [s = g._s] (std::vector<mutation_bounds> ms, streamed_mutation::forwarding fwd) {
auto rs = boost::copy_range<std::vector<reader_bounds>>(std::move(ms)
| boost::adaptors::transformed([fwd] (auto&& mb) {
return reader_bounds{
flat_mutation_reader_from_mutations(tests::make_permit(), {std::move(mb.m)}, fwd),
std::move(mb.lower), std::move(mb.upper)};
| boost::adaptors::transformed([s, fwd] (auto&& mb) {
return make_reader_bounds(s, tests::make_permit(), std::move(mb), fwd);
}));
auto q = std::make_unique<simple_position_reader_queue>(*s, std::move(rs));
return make_clustering_combined_reader(s, tests::make_permit(), fwd, std::move(q));
@@ -3951,7 +3974,15 @@ SEASTAR_THREAD_TEST_CASE(test_clustering_order_merger_in_memory) {
for (int run = 0; run < 1000; ++run) {
auto scenario = g.generate_scenario(engine);
auto merged = std::accumulate(scenario.readers_data.begin(), scenario.readers_data.end(),
mutation(g._s, g._pk), [] (mutation curr, const mutation_bounds& mb) { return std::move(curr) + mb.m; });
std::optional<mutation>{}, [&g] (std::optional<mutation> curr, const mutation_bounds& mb) {
if (mb.m) {
if (!curr) {
curr = mutation(g._s, g._pk);
}
*curr += *mb.m;
}
return curr;
});
{
auto fwd = streamed_mutation::forwarding::no;
@@ -3974,10 +4005,15 @@ SEASTAR_THREAD_TEST_CASE(test_clustering_order_merger_in_memory) {
SEASTAR_THREAD_TEST_CASE(test_clustering_order_merger_sstable_set) {
sstables::test_env::do_with_async([] (sstables::test_env& env) {
storage_service_for_tests ssft;
clustering_order_merger_test_generator g;
auto make_authority = [] (mutation mut, streamed_mutation::forwarding fwd) {
return flat_mutation_reader_from_mutations(tests::make_permit(), {std::move(mut)}, fwd);
auto pkeys = make_local_keys(2, clustering_order_merger_test_generator::make_schema());
clustering_order_merger_test_generator g(pkeys[0]);
auto make_authority = [s = g._s] (std::optional<mutation> mut, streamed_mutation::forwarding fwd) {
if (mut) {
return flat_mutation_reader_from_mutations(tests::make_permit(), {std::move(*mut)}, fwd);
}
return make_empty_flat_reader(s, tests::make_permit());
};
auto make_tested = [s = g._s, pr = dht::partition_range::make_singular(dht::ring_position(g.decorated_pk()))]
@@ -4002,18 +4038,35 @@ SEASTAR_THREAD_TEST_CASE(test_clustering_order_merger_sstable_set) {
auto tmp = tmpdir();
time_series_sstable_set sst_set(g._s);
mutation merged(g._s, g._pk);
std::optional<mutation> merged;
std::unordered_set<int64_t> included_gens;
int64_t gen = 0;
for (auto& mb: scenario.readers_data) {
sst_set.insert(make_sstable_containing([s = g._s, &env, &tmp, gen = ++gen] () {
auto sst_factory = [s = g._s, &env, &tmp, gen = ++gen] () {
return env.make_sstable(std::move(s), tmp.path().string(), gen,
sstables::sstable::version_types::md, sstables::sstable::format_types::big);
}, {mb.m}));
};
if (mb.m) {
sst_set.insert(make_sstable_containing(std::move(sst_factory), {*mb.m}));
} else {
// We want to have an sstable that won't return any fragments when we query it
// for our partition (not even `partition_start`). For that we create an sstable
// with a different partition.
auto pk = partition_key::from_single_value(*g._s, utf8_type->decompose(pkeys[1]));
assert(pk != g._pk);
sst_set.insert(make_sstable_containing(std::move(sst_factory), {mutation(g._s, pk)}));
}
if (dist(engine)) {
included_gens.insert(gen);
merged += mb.m;
if (mb.m) {
if (!merged) {
merged = mutation(g._s, g._pk);
}
*merged += *mb.m;
}
}
}
@@ -4220,9 +4273,7 @@ SEASTAR_THREAD_TEST_CASE(clustering_combined_reader_mutation_source_test) {
for (auto& [k, ms]: good) {
auto rs = boost::copy_range<std::vector<reader_bounds>>(std::move(ms)
| boost::adaptors::transformed([&] (auto&& mb) {
return reader_bounds{
flat_mutation_reader_from_mutations(permit, {std::move(mb.m)}, slice, fwd_sm),
std::move(mb.lower), std::move(mb.upper)};
return make_reader_bounds(s, permit, std::move(mb), fwd_sm, &slice);
}));
std::sort(rs.begin(), rs.end(), [less = position_in_partition::less_compare(*s)]
(const reader_bounds& a, const reader_bounds& b) { return less(a.lower, b.lower); });
@@ -4242,3 +4293,23 @@ SEASTAR_THREAD_TEST_CASE(clustering_combined_reader_mutation_source_test) {
run_mutation_source_tests(std::move(populate));
}
// Regression test for #8445.
SEASTAR_THREAD_TEST_CASE(test_clustering_combining_of_empty_readers) {
auto s = clustering_order_merger_test_generator::make_schema();
std::vector<reader_bounds> rs;
rs.push_back({
.r = make_empty_flat_reader(s, tests::make_permit()),
.lower = position_in_partition::before_all_clustered_rows(),
.upper = position_in_partition::after_all_clustered_rows()
});
auto r = make_clustering_combined_reader(
s, tests::make_permit(), streamed_mutation::forwarding::no,
std::make_unique<simple_position_reader_queue>(*s, std::move(rs)));
auto mf = r(db::no_timeout).get0();
if (mf) {
BOOST_FAIL(format("reader combined of empty readers returned fragment {}", mutation_fragment::printer(*s, *mf)));
}
}

View File

@@ -7144,7 +7144,6 @@ SEASTAR_TEST_CASE(test_twcs_single_key_reader_filtering) {
auto sst1 = make_sstable_containing(sst_gen, {make_row(0, 0)});
auto sst2 = make_sstable_containing(sst_gen, {make_row(0, 1)});
auto sst3 = make_sstable_containing(sst_gen, {make_row(0, 2)});
auto dkey = sst1->get_first_decorated_key();
auto cm = make_lw_shared<compaction_manager>();
@@ -7163,7 +7162,6 @@ SEASTAR_TEST_CASE(test_twcs_single_key_reader_filtering) {
auto set = cs.make_sstable_set(s);
set.insert(std::move(sst1));
set.insert(std::move(sst2));
set.insert(std::move(sst3));
reader_permit permit = tests::make_permit();
utils::estimated_histogram eh;
@@ -7186,10 +7184,11 @@ SEASTAR_TEST_CASE(test_twcs_single_key_reader_filtering) {
// consume all fragments
while (reader(db::no_timeout).get());
// sst1 and sst2 should have been checked by the CK filter before we started reading (when we created the reader).
// sst3 should have been checked by the CK filter during fragment consumption and shouldn't have passed.
// With the bug in #8432, sst3 wouldn't even be checked by the CK filter since it would pass right after checking the PK filter.
BOOST_REQUIRE_EQUAL(cf_stats.sstables_checked_by_clustering_filter - checked_by_ck, 1);
BOOST_REQUIRE_EQUAL(cf_stats.surviving_sstables_after_clustering_filter - surviving_after_ck, 0);
// At least sst2 should be checked by the CK filter during fragment consumption and should pass.
// With the bug in #8432, sst2 wouldn't even be checked by the CK filter since it would pass right after checking the PK filter.
BOOST_REQUIRE_GE(cf_stats.sstables_checked_by_clustering_filter - checked_by_ck, 1);
BOOST_REQUIRE_EQUAL(
cf_stats.surviving_sstables_after_clustering_filter - surviving_after_ck,
cf_stats.sstables_checked_by_clustering_filter - checked_by_ck);
});
}

View File

@@ -2327,11 +2327,15 @@ void for_each_schema_change(std::function<void(schema_ptr, const std::vector<mut
test_mutated_schemas();
}
static void compare_readers(const schema& s, flat_mutation_reader& authority, flat_reader_assertions& tested) {
// Returns true iff the readers were non-empty.
static bool compare_readers(const schema& s, flat_mutation_reader& authority, flat_reader_assertions& tested) {
bool empty = true;
while (auto expected = authority(db::no_timeout).get()) {
tested.produces(s, *expected);
empty = false;
}
tested.produces_end_of_stream();
return !empty;
}
void compare_readers(const schema& s, flat_mutation_reader authority, flat_mutation_reader tested) {
@@ -2339,12 +2343,14 @@ void compare_readers(const schema& s, flat_mutation_reader authority, flat_mutat
compare_readers(s, authority, assertions);
}
// Assumes that the readers return fragments from (at most) a single (and the same) partition.
void compare_readers(const schema& s, flat_mutation_reader authority, flat_mutation_reader tested, const std::vector<position_range>& fwd_ranges) {
auto assertions = assert_that(std::move(tested));
compare_readers(s, authority, assertions);
for (auto& r: fwd_ranges) {
authority.fast_forward_to(r, db::no_timeout).get();
assertions.fast_forward_to(r);
compare_readers(s, authority, assertions);
if (compare_readers(s, authority, assertions)) {
for (auto& r: fwd_ranges) {
authority.fast_forward_to(r, db::no_timeout).get();
assertions.fast_forward_to(r);
compare_readers(s, authority, assertions);
}
}
}