From 7604980d638ef83cf8f8d8083fa1a82c00572ba4 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 18 Jul 2019 14:05:06 +0200 Subject: [PATCH] database: Add missing partition slicing on streaming reader recreation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit streaming_reader_lifecycle_policy::create_reader() was ignoring the partition_slice passed to it and always creating the reader for the full slice. That's wrong because create_reader() is called when recreating a reader after it's evicted. If the reader stopped in the middle of a partition we need to start from that point. Otherwise, fragments in the mutation stream will appear duplicated or out of order, violating assumptions of the consumers. This was observed to result in repair writing incorrect sstables with duplicated clustering rows, which results in malformed_sstable_exception on read from those sstables. Fixes #4659. In v2: - Added an overload without partition_slice to avoid changing existing users which never slice Tests: - unit (dev) - manual (3 node ccm + repair) Backport: 3.1 Reviewed-by: Botond Dénes Message-Id: <1563451506-8871-1-git-send-email-tgrabiec@scylladb.com> --- database.cc | 4 ++-- database.hh | 5 +++++ table.cc | 4 ++-- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/database.cc b/database.cc index 27be3e8a12..0d2ec4b686 100644 --- a/database.cc +++ b/database.cc @@ -1930,7 +1930,7 @@ flat_mutation_reader make_multishard_streaming_reader(distributed& db, virtual flat_mutation_reader create_reader( schema_ptr schema, const dht::partition_range& range, - const query::partition_slice&, + const query::partition_slice& slice, const io_priority_class& pc, tracing::trace_state_ptr, mutation_reader::forwarding fwd_mr) override { @@ -1941,7 +1941,7 @@ flat_mutation_reader make_multishard_streaming_reader(distributed& db, _contexts[shard].read_operation = make_foreign(std::make_unique(cf.read_in_progress())); _contexts[shard].semaphore 
= &cf.streaming_read_concurrency_semaphore(); - return cf.make_streaming_reader(std::move(schema), *_contexts[shard].range, fwd_mr); + return cf.make_streaming_reader(std::move(schema), *_contexts[shard].range, slice, fwd_mr); } virtual void destroy_reader(shard_id shard, future reader_fut) noexcept override { reader_fut.then([this, zis = shared_from_this(), shard] (stopped_reader&& reader) mutable { diff --git a/database.hh b/database.hh index ffe8d4f9b1..cacaca81ee 100644 --- a/database.hh +++ b/database.hh @@ -682,8 +682,13 @@ public: // Single range overload. flat_mutation_reader make_streaming_reader(schema_ptr schema, const dht::partition_range& range, + const query::partition_slice& slice, mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::no) const; + flat_mutation_reader make_streaming_reader(schema_ptr schema, const dht::partition_range& range) { + return make_streaming_reader(schema, range, schema->full_slice()); + } + sstables::shared_sstable make_streaming_sstable_for_write(std::optional subdir = {}); sstables::shared_sstable make_streaming_staging_sstable() { return make_streaming_sstable_for_write("staging"); diff --git a/table.cc b/table.cc index 71d449288b..f2f33105e6 100644 --- a/table.cc +++ b/table.cc @@ -510,8 +510,8 @@ table::make_streaming_reader(schema_ptr s, return make_flat_multi_range_reader(s, std::move(source), ranges, slice, pc, nullptr, mutation_reader::forwarding::no); } -flat_mutation_reader table::make_streaming_reader(schema_ptr schema, const dht::partition_range& range, mutation_reader::forwarding fwd_mr) const { - const auto& slice = schema->full_slice(); +flat_mutation_reader table::make_streaming_reader(schema_ptr schema, const dht::partition_range& range, + const query::partition_slice& slice, mutation_reader::forwarding fwd_mr) const { const auto& pc = service::get_local_streaming_read_priority(); auto trace_state = tracing::trace_state_ptr(); const auto fwd = streamed_mutation::forwarding::no;