From 7ffb0d826b1237ee5786db154567d339bc230ff6 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Thu, 8 Apr 2021 17:40:46 +0200 Subject: [PATCH] clustering_order_reader_merger: handle empty readers The merger could return end-of-stream if some (but not all) of the underlying readers were empty (i.e. not even returning a `partition_start`). This could happen in places where the merger is used (`time_series_sstable_set::create_single_key_sstable_reader`) if we opened an sstable which did not have the queried partition but passed all the filters (specifically, the bloom filter returned a false positive for this sstable). The commit also extends the random tests for the merger to include empty readers and adds an explicit test case that catches this bug (in a limited scope: when we merge a single empty reader). It also modifies `test_twcs_single_key_reader_filtering` (regression test for #8432) because the point at which the clustering key filter is invoked changes (some invocations move from the constructor of the merger to operator()). I checked manually that it still catches the bug when I reintroduce it. Fixes #8445. Closes #8446 --- mutation_reader.cc | 53 +++++++------ test/boost/mutation_reader_test.cc | 113 ++++++++++++++++++++++------ test/boost/sstable_datafile_test.cc | 13 ++-- test/lib/mutation_source_test.cc | 18 +++-- 4 files changed, 141 insertions(+), 56 deletions(-) diff --git a/mutation_reader.cc b/mutation_reader.cc index 8a4481d323..c4f330d2e2 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -2275,9 +2275,9 @@ position_reader_queue::~position_reader_queue() {} // are not implemented and throw an error; the reader is only used for single partition queries. // // Assumes that: -// - the queue contains at least one reader, // - there are no static rows, -// - the returned fragments do not contain partition tombstones. 
+// - the returned fragments do not contain partition tombstones, +// - the merged readers return fragments from the same partition (but some or even all of them may be empty). class clustering_order_reader_merger { const schema_ptr _schema; const reader_permit _permit; @@ -2389,12 +2389,17 @@ class clustering_order_reader_merger { if (!mf) { // The reader returned end-of-stream before returning end-of-partition // (otherwise we would have removed it in a previous peek). This means that - // we are in forwarding mode and the reader won't return any more fragments in the current range. + // either the reader was empty from the beginning (not even returning a `partition_start`) + // or we are in forwarding mode and the reader won't return any more fragments in the current range. // If the reader's upper bound is smaller then the end of the current range then it won't // return any more fragments in later ranges as well (subsequent fast-forward-to ranges // are non-overlapping and strictly increasing), so we can remove it now. - // Otherwise it may start returning fragments later, so we save it for the moment - // in _halted_readers and will bring it back when we get fast-forwarded. + // Otherwise, if it previously returned a `partition_start`, it may start returning more fragments + // later (after we fast-forward) so we save it for the moment in _halted_readers and will bring it + // back when we get fast-forwarded. + // We also save the reader if it was empty from the beginning (no `partition_start`) since + // it makes the code simpler (to check for this here we would need additional state); it is a bit wasteful + // but completely empty readers should be rare. 
if (_cmp(it->upper_bound, _pr_end) < 0) { _all_readers.erase(it); } else { @@ -2524,19 +2529,6 @@ public: : position_in_partition_view::after_all_clustered_rows()) , _should_emit_partition_end(fwd_sm == streamed_mutation::forwarding::no) { - // The first call to `_reader_queue::pop` uses `after_all_clustered_rows` - // so we obtain at least one reader; we will return this reader's `partition_start` - // as the first fragment. - auto rs = _reader_queue->pop(position_in_partition_view::after_all_clustered_rows()); - for (auto& r: rs) { - _all_readers.push_front(std::move(r)); - _unpeeked_readers.push_back(_all_readers.begin()); - } - - if (rs.empty()) { - // No readers, no partition. - _should_emit_partition_end = false; - } } // We assume that operator() is called sequentially and that the caller doesn't use the batch @@ -2553,8 +2545,22 @@ public: return peek_readers(timeout).then([this, timeout] { return (*this)(timeout); }); } - auto next_peeked_pos = _peeked_readers.empty() ? _pr_end : _peeked_readers.front()->reader.peek_buffer().position(); - // There might be queued readers containing fragments with positions <= next_peeked_pos: + // Before we return a batch of fragments using currently opened readers we must check the queue + // for potential new readers that must be opened. There are three cases which determine how ``far'' + // should we look: + // - If there are some peeked readers in the heap, we must check for new readers + // whose `min_position`s are <= the position of the first peeked reader; there is no need + // to check for ``later'' readers (yet). + // - Otherwise, if we already fetched a partition start fragment, we need to look no further + // than the end of the current position range (_pr_end). 
+ // - Otherwise we need to look for any reader (by calling the queue with `after_all_clustered_rows`), + // even for readers whose `min_position`s may be outside the current position range since they + // may be the only readers which have a `partition_start` fragment which we need to return + // before end-of-stream. + auto next_peeked_pos = + _peeked_readers.empty() + ? (_partition_start_fetched ? _pr_end : position_in_partition_view::after_all_clustered_rows()) + : _peeked_readers.front()->reader.peek_buffer().position(); if (!_reader_queue->empty(next_peeked_pos)) { auto rs = _reader_queue->pop(next_peeked_pos); for (auto& r: rs) { @@ -2568,8 +2574,11 @@ public: // We are either in forwarding mode and waiting for a fast-forward, // or we've exhausted all the readers. if (_should_emit_partition_end) { - // Not forwarding, so all readers must be exhausted. Return the last fragment. - _current_batch.push_back(mutation_fragment(*_schema, _permit, partition_end())); + // Not forwarding, so all readers must be exhausted. + // Return a partition end fragment unless all readers have been empty from the beginning. 
+ if (_partition_start_fetched) { + _current_batch.push_back(mutation_fragment(*_schema, _permit, partition_end())); + } _should_emit_partition_end = false; } return make_ready_future(_current_batch); diff --git a/test/boost/mutation_reader_test.cc b/test/boost/mutation_reader_test.cc index 6a0d52c7b2..61adff296e 100644 --- a/test/boost/mutation_reader_test.cc +++ b/test/boost/mutation_reader_test.cc @@ -3829,11 +3829,26 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_recreate_before_fast_forward_to) } struct mutation_bounds { - mutation m; + std::optional m; position_in_partition lower; position_in_partition upper; }; +static reader_bounds make_reader_bounds( + schema_ptr s, reader_permit permit, mutation_bounds mb, streamed_mutation::forwarding fwd, + const query::partition_slice* slice = nullptr) { + if (!slice) { + slice = &s->full_slice(); + } + + return reader_bounds { + .r = mb.m ? flat_mutation_reader_from_mutations(permit, {std::move(*mb.m)}, *slice, fwd) + : make_empty_flat_reader(s, permit), + .lower = std::move(mb.lower), + .upper = std::move(mb.upper) + }; +} + struct clustering_order_merger_test_generator { struct scenario { std::vector readers_data; @@ -3843,13 +3858,13 @@ struct clustering_order_merger_test_generator { schema_ptr _s; partition_key _pk; - clustering_order_merger_test_generator() - : _s(make_schema()), _pk(partition_key::from_single_value(*_s, int32_type->decompose(0))) + clustering_order_merger_test_generator(std::optional pk = std::nullopt) + : _s(make_schema()), _pk(partition_key::from_single_value(*_s, utf8_type->decompose(pk ? 
*pk : make_local_key(make_schema())))) {} static schema_ptr make_schema() { return schema_builder("ks", "t") - .with_column("pk", int32_type, column_kind::partition_key) + .with_column("pk", utf8_type, column_kind::partition_key) .with_column("ck", int32_type, column_kind::clustering_key) .with_column("v", int32_type, column_kind::regular_column) .build(); @@ -3882,6 +3897,13 @@ struct clustering_order_merger_test_generator { std::vector readers_data; auto num_readers = tests::random::get_int(1, 10, engine); + auto num_empty_readers = tests::random::get_int(1, num_readers, engine); + while (num_empty_readers--) { + auto lower = -tests::random::get_int(0, 5, engine); + auto upper = tests::random::get_int(0, 5, engine); + readers_data.push_back(mutation_bounds{std::nullopt, mk_pos_for(lower), mk_pos_for(upper)}); + num_readers--; + } while (num_readers--) { auto len = tests::random::get_int(0, 15, engine); auto ks = tests::random::random_subset(100, len, engine); @@ -3929,16 +3951,17 @@ struct clustering_order_merger_test_generator { SEASTAR_THREAD_TEST_CASE(test_clustering_order_merger_in_memory) { clustering_order_merger_test_generator g; - auto make_authority = [] (mutation mut, streamed_mutation::forwarding fwd) { - return flat_mutation_reader_from_mutations(tests::make_permit(), {std::move(mut)}, fwd); + auto make_authority = [s = g._s] (std::optional mut, streamed_mutation::forwarding fwd) { + if (mut) { + return flat_mutation_reader_from_mutations(tests::make_permit(), {std::move(*mut)}, fwd); + } + return make_empty_flat_reader(s, tests::make_permit()); }; auto make_tested = [s = g._s] (std::vector ms, streamed_mutation::forwarding fwd) { auto rs = boost::copy_range>(std::move(ms) - | boost::adaptors::transformed([fwd] (auto&& mb) { - return reader_bounds{ - flat_mutation_reader_from_mutations(tests::make_permit(), {std::move(mb.m)}, fwd), - std::move(mb.lower), std::move(mb.upper)}; + | boost::adaptors::transformed([s, fwd] (auto&& mb) { + return 
make_reader_bounds(s, tests::make_permit(), std::move(mb), fwd); })); auto q = std::make_unique(*s, std::move(rs)); return make_clustering_combined_reader(s, tests::make_permit(), fwd, std::move(q)); @@ -3951,7 +3974,15 @@ SEASTAR_THREAD_TEST_CASE(test_clustering_order_merger_in_memory) { for (int run = 0; run < 1000; ++run) { auto scenario = g.generate_scenario(engine); auto merged = std::accumulate(scenario.readers_data.begin(), scenario.readers_data.end(), - mutation(g._s, g._pk), [] (mutation curr, const mutation_bounds& mb) { return std::move(curr) + mb.m; }); + std::optional{}, [&g] (std::optional curr, const mutation_bounds& mb) { + if (mb.m) { + if (!curr) { + curr = mutation(g._s, g._pk); + } + *curr += *mb.m; + } + return curr; + }); { auto fwd = streamed_mutation::forwarding::no; @@ -3974,10 +4005,15 @@ SEASTAR_THREAD_TEST_CASE(test_clustering_order_merger_in_memory) { SEASTAR_THREAD_TEST_CASE(test_clustering_order_merger_sstable_set) { sstables::test_env::do_with_async([] (sstables::test_env& env) { storage_service_for_tests ssft; - clustering_order_merger_test_generator g; - auto make_authority = [] (mutation mut, streamed_mutation::forwarding fwd) { - return flat_mutation_reader_from_mutations(tests::make_permit(), {std::move(mut)}, fwd); + auto pkeys = make_local_keys(2, clustering_order_merger_test_generator::make_schema()); + clustering_order_merger_test_generator g(pkeys[0]); + + auto make_authority = [s = g._s] (std::optional mut, streamed_mutation::forwarding fwd) { + if (mut) { + return flat_mutation_reader_from_mutations(tests::make_permit(), {std::move(*mut)}, fwd); + } + return make_empty_flat_reader(s, tests::make_permit()); }; auto make_tested = [s = g._s, pr = dht::partition_range::make_singular(dht::ring_position(g.decorated_pk()))] @@ -4002,18 +4038,35 @@ SEASTAR_THREAD_TEST_CASE(test_clustering_order_merger_sstable_set) { auto tmp = tmpdir(); time_series_sstable_set sst_set(g._s); - mutation merged(g._s, g._pk); + std::optional merged; 
std::unordered_set included_gens; int64_t gen = 0; for (auto& mb: scenario.readers_data) { - sst_set.insert(make_sstable_containing([s = g._s, &env, &tmp, gen = ++gen] () { + auto sst_factory = [s = g._s, &env, &tmp, gen = ++gen] () { return env.make_sstable(std::move(s), tmp.path().string(), gen, sstables::sstable::version_types::md, sstables::sstable::format_types::big); - }, {mb.m})); + }; + + if (mb.m) { + sst_set.insert(make_sstable_containing(std::move(sst_factory), {*mb.m})); + } else { + // We want to have an sstable that won't return any fragments when we query it + // for our partition (not even `partition_start`). For that we create an sstable + // with a different partition. + auto pk = partition_key::from_single_value(*g._s, utf8_type->decompose(pkeys[1])); + assert(pk != g._pk); + + sst_set.insert(make_sstable_containing(std::move(sst_factory), {mutation(g._s, pk)})); + } if (dist(engine)) { included_gens.insert(gen); - merged += mb.m; + if (mb.m) { + if (!merged) { + merged = mutation(g._s, g._pk); + } + *merged += *mb.m; + } } } @@ -4220,9 +4273,7 @@ SEASTAR_THREAD_TEST_CASE(clustering_combined_reader_mutation_source_test) { for (auto& [k, ms]: good) { auto rs = boost::copy_range>(std::move(ms) | boost::adaptors::transformed([&] (auto&& mb) { - return reader_bounds{ - flat_mutation_reader_from_mutations(permit, {std::move(mb.m)}, slice, fwd_sm), - std::move(mb.lower), std::move(mb.upper)}; + return make_reader_bounds(s, permit, std::move(mb), fwd_sm, &slice); })); std::sort(rs.begin(), rs.end(), [less = position_in_partition::less_compare(*s)] (const reader_bounds& a, const reader_bounds& b) { return less(a.lower, b.lower); }); @@ -4242,3 +4293,23 @@ SEASTAR_THREAD_TEST_CASE(clustering_combined_reader_mutation_source_test) { run_mutation_source_tests(std::move(populate)); } + +// Regression test for #8445. 
+SEASTAR_THREAD_TEST_CASE(test_clustering_combining_of_empty_readers) { + auto s = clustering_order_merger_test_generator::make_schema(); + + std::vector rs; + rs.push_back({ + .r = make_empty_flat_reader(s, tests::make_permit()), + .lower = position_in_partition::before_all_clustered_rows(), + .upper = position_in_partition::after_all_clustered_rows() + }); + auto r = make_clustering_combined_reader( + s, tests::make_permit(), streamed_mutation::forwarding::no, + std::make_unique(*s, std::move(rs))); + + auto mf = r(db::no_timeout).get0(); + if (mf) { + BOOST_FAIL(format("reader combined of empty readers returned fragment {}", mutation_fragment::printer(*s, *mf))); + } +} diff --git a/test/boost/sstable_datafile_test.cc b/test/boost/sstable_datafile_test.cc index 5b2c71f8d2..b5133c4f02 100644 --- a/test/boost/sstable_datafile_test.cc +++ b/test/boost/sstable_datafile_test.cc @@ -7144,7 +7144,6 @@ SEASTAR_TEST_CASE(test_twcs_single_key_reader_filtering) { auto sst1 = make_sstable_containing(sst_gen, {make_row(0, 0)}); auto sst2 = make_sstable_containing(sst_gen, {make_row(0, 1)}); - auto sst3 = make_sstable_containing(sst_gen, {make_row(0, 2)}); auto dkey = sst1->get_first_decorated_key(); auto cm = make_lw_shared(); @@ -7163,7 +7162,6 @@ SEASTAR_TEST_CASE(test_twcs_single_key_reader_filtering) { auto set = cs.make_sstable_set(s); set.insert(std::move(sst1)); set.insert(std::move(sst2)); - set.insert(std::move(sst3)); reader_permit permit = tests::make_permit(); utils::estimated_histogram eh; @@ -7186,10 +7184,11 @@ SEASTAR_TEST_CASE(test_twcs_single_key_reader_filtering) { // consume all fragments while (reader(db::no_timeout).get()); - // sst1 and sst2 should have been checked by the CK filter before we started reading (when we created the reader). - // sst3 should have been checked by the CK filter during fragment consumption and shouldn't have passed. 
- // With the bug in #8432, sst3 wouldn't even be checked by the CK filter since it would pass right after checking the PK filter. - BOOST_REQUIRE_EQUAL(cf_stats.sstables_checked_by_clustering_filter - checked_by_ck, 1); - BOOST_REQUIRE_EQUAL(cf_stats.surviving_sstables_after_clustering_filter - surviving_after_ck, 0); + // At least sst2 should be checked by the CK filter during fragment consumption and should pass. + // With the bug in #8432, sst2 wouldn't even be checked by the CK filter since it would pass right after checking the PK filter. + BOOST_REQUIRE_GE(cf_stats.sstables_checked_by_clustering_filter - checked_by_ck, 1); + BOOST_REQUIRE_EQUAL( + cf_stats.surviving_sstables_after_clustering_filter - surviving_after_ck, + cf_stats.sstables_checked_by_clustering_filter - checked_by_ck); }); } diff --git a/test/lib/mutation_source_test.cc b/test/lib/mutation_source_test.cc index 16fec48568..d849bf4dbc 100644 --- a/test/lib/mutation_source_test.cc +++ b/test/lib/mutation_source_test.cc @@ -2327,11 +2327,15 @@ void for_each_schema_change(std::function& fwd_ranges) { auto assertions = assert_that(std::move(tested)); - compare_readers(s, authority, assertions); - for (auto& r: fwd_ranges) { - authority.fast_forward_to(r, db::no_timeout).get(); - assertions.fast_forward_to(r); - compare_readers(s, authority, assertions); + if (compare_readers(s, authority, assertions)) { + for (auto& r: fwd_ranges) { + authority.fast_forward_to(r, db::no_timeout).get(); + assertions.fast_forward_to(r); + compare_readers(s, authority, assertions); + } } }