diff --git a/mutation_reader.cc b/mutation_reader.cc
index 10ba2fabdb..1d5e3cb869 100644
--- a/mutation_reader.cc
+++ b/mutation_reader.cc
@@ -925,6 +925,8 @@ class shard_reader : public enable_lw_shared_from_this, public fla
         flat_mutation_reader recreate_reader();
         flat_mutation_reader resume_or_create_reader();
         future<> do_fill_buffer(flat_mutation_reader& reader, db::timeout_clock::time_point timeout);
+        future<> ensure_buffer_contains_all_fragments_for_last_pos(flat_mutation_reader& reader, circular_buffer& buffer,
+                db::timeout_clock::time_point timeout); // Called from fill_buffer() right after the reader's buffer is detached.
 
     public:
         remote_reader(
@@ -1146,6 +1148,32 @@ future<> shard_reader::remote_reader::do_fill_buffer(flat_mutation_reader& reade
     });
 }
 
+future<> shard_reader::remote_reader::ensure_buffer_contains_all_fragments_for_last_pos(flat_mutation_reader& reader, // If `buffer` ends in a range tombstone, append the reader's following fragments that share its key.
+        circular_buffer& buffer, db::timeout_clock::time_point timeout) { // NOTE(review): `circular_buffer` has no template arguments here - possibly lost in extraction; confirm.
+    if (buffer.empty() || !buffer.back().is_range_tombstone()) { // Nothing to do unless the buffer ends in a range tombstone.
+        return make_ready_future<>();
+    }
+    // `stop` is true when the reader's buffer is empty at end of stream, or when its next fragment is non-clustered or has a different key than `buffer.back()`.
+    auto stop = [this, &reader, &buffer] {
+        if (reader.is_buffer_empty()) {
+            return reader.is_end_of_stream();
+        }
+        const auto& next_pos = reader.peek_buffer().position();
+        if (next_pos.region() != partition_region::clustered) {
+            return true;
+        }
+        return !next_pos.key().equal(*_schema, buffer.back().position().key()); // `buffer.back()` is re-read on every check, i.e. it is the most recently appended fragment.
+    };
+    // Until `stop` holds: refill the reader's buffer when it is empty, otherwise move its next fragment onto the end of `buffer`.
+    return do_until(stop, [this, &reader, &buffer, timeout] {
+        if (reader.is_buffer_empty()) {
+            return do_fill_buffer(reader, timeout);
+        }
+        buffer.emplace_back(reader.pop_mutation_fragment());
+        return make_ready_future<>();
+    });
+}
+
 shard_reader::remote_reader::remote_reader(
         schema_ptr schema,
         reader_lifecycle_policy& lifecycle_policy,
@@ -1170,14 +1198,27 @@ future shard_reader::remote_reader::fill_buffe
     if (pending_next_partition) {
         _last_position_in_partition = position_in_partition(position_in_partition::end_of_partition_tag_t{});
     }
-    return do_with(resume_or_create_reader(), [this, pending_next_partition, timeout] (flat_mutation_reader& reader) {
+    return do_with(resume_or_create_reader(), circular_buffer{}, // The result buffer goes through do_with() so both continuations below can capture it by reference.
+            [this, pending_next_partition, timeout] (flat_mutation_reader& reader, circular_buffer& buffer) mutable {
         if (pending_next_partition) {
             reader.next_partition();
         }
 
-        return do_fill_buffer(reader, timeout).then([this, &reader] {
-            auto buffer = reader.detach_buffer();
-            const auto eos = reader.is_end_of_stream();
+        return do_fill_buffer(reader, timeout).then([this, &reader, &buffer, timeout] {
+            buffer = reader.detach_buffer();
+            // When the reader is recreated (after having been evicted) we
+            // recreate it such that it starts reading from *after* the last
+            // seen fragment's position. If the last seen fragment is a range
+            // tombstone it is *not* guaranteed that the next fragments in the
+            // data stream have positions strictly greater than the range
+            // tombstone's. If the reader is evicted and has to be recreated,
+            // these fragments would then be skipped, as the read would continue
+            // after their position.
+            // To avoid this, ensure that the buffer contains *all* fragments for
+            // the last seen position.
+            return ensure_buffer_contains_all_fragments_for_last_pos(reader, buffer, timeout);
+        }).then([this, &reader, &buffer] {
+            const auto eos = reader.is_end_of_stream() && reader.is_buffer_empty(); // The reader may still hold fragments that were not moved into `buffer`.
             update_last_position(buffer);
             _irh = _lifecycle_policy.pause(std::move(reader));
             return fill_buffer_result(std::move(buffer), eos);