database: Add missing partition slicing on streaming reader recreation

streaming_reader_lifecycle_policy::create_reader() was ignoring the
partition_slice passed to it and always creating the reader for the
full slice.

That's wrong because create_reader() is called when recreating a
reader after it's evicted. If the reader stopped in the middle of
partition we need to start from that point. Otherwise, fragments in
the mutation stream will appear duplicated or out of ordre, violating
assumptions of the consumers.

This was observed to result in repair writing incorrect sstables with
duplicated clustering rows, which results in
malformed_sstable_exception on read from those sstables.

Fixes #4659.

In v2:

  - Added an overload without partition_slice to avoid changing existing users which never slice

Tests:

  - unit (dev)
  - manual (3 node ccm + repair)

Backport: 3.1
Reviewd-by: Botond Dénes <bdenes@scylladb.com>
Message-Id: <1563451506-8871-1-git-send-email-tgrabiec@scylladb.com>
This commit is contained in:
Tomasz Grabiec
2019-07-18 14:05:06 +02:00
committed by Avi Kivity
parent 64a4c0ede2
commit 7604980d63
3 changed files with 9 additions and 4 deletions

View File

@@ -1930,7 +1930,7 @@ flat_mutation_reader make_multishard_streaming_reader(distributed<database>& db,
virtual flat_mutation_reader create_reader(
schema_ptr schema,
const dht::partition_range& range,
const query::partition_slice&,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr,
mutation_reader::forwarding fwd_mr) override {
@@ -1941,7 +1941,7 @@ flat_mutation_reader make_multishard_streaming_reader(distributed<database>& db,
_contexts[shard].read_operation = make_foreign(std::make_unique<utils::phased_barrier::operation>(cf.read_in_progress()));
_contexts[shard].semaphore = &cf.streaming_read_concurrency_semaphore();
return cf.make_streaming_reader(std::move(schema), *_contexts[shard].range, fwd_mr);
return cf.make_streaming_reader(std::move(schema), *_contexts[shard].range, slice, fwd_mr);
}
virtual void destroy_reader(shard_id shard, future<stopped_reader> reader_fut) noexcept override {
reader_fut.then([this, zis = shared_from_this(), shard] (stopped_reader&& reader) mutable {

View File

@@ -682,8 +682,13 @@ public:
// Single range overload.
flat_mutation_reader make_streaming_reader(schema_ptr schema, const dht::partition_range& range,
const query::partition_slice& slice,
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::no) const;
flat_mutation_reader make_streaming_reader(schema_ptr schema, const dht::partition_range& range) {
return make_streaming_reader(schema, range, schema->full_slice());
}
sstables::shared_sstable make_streaming_sstable_for_write(std::optional<sstring> subdir = {});
sstables::shared_sstable make_streaming_staging_sstable() {
return make_streaming_sstable_for_write("staging");

View File

@@ -510,8 +510,8 @@ table::make_streaming_reader(schema_ptr s,
return make_flat_multi_range_reader(s, std::move(source), ranges, slice, pc, nullptr, mutation_reader::forwarding::no);
}
flat_mutation_reader table::make_streaming_reader(schema_ptr schema, const dht::partition_range& range, mutation_reader::forwarding fwd_mr) const {
const auto& slice = schema->full_slice();
flat_mutation_reader table::make_streaming_reader(schema_ptr schema, const dht::partition_range& range,
const query::partition_slice& slice, mutation_reader::forwarding fwd_mr) const {
const auto& pc = service::get_local_streaming_read_priority();
auto trace_state = tracing::trace_state_ptr();
const auto fwd = streamed_mutation::forwarding::no;