From eba310163d2aef691b13bc3f76a6d5d18d432557 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Wed, 17 Apr 2019 17:12:23 +0300 Subject: [PATCH] multishard_combining_reader: fix handling of non-strictly monotonic positions The shard readers under a multishard reader are paused after every operation executed on them. When paused they can be evicted at any time. When this happens, they will be re-created lazily on the next operation, with a start position such that they continue reading from where the evicted reader left off. This start position is determined from the last fragment seen by the previous reader. When this position is a clustering position, the reader will be recreated such that it reads the clustering range (from the half-read partition): (last-ckey, +inf). This can cause problems if the last fragment seen by the evicted reader was a range tombstone. Range tombstones can share the same clustering position with other range tombstones and potentially one clustering row. This means that when the reader is recreated, it will start from the next clustering position, ignoring any unread fragments that share the same position as the last seen range tombstone. To fix this, ensure that on each fill-buffer call, the buffer contains all fragments for the last position. To this end, when the last fragment in the buffer is a range tombstone (with pos x), we continue reading until we see a fragment with a position y that is greater than x. This way it is ensured that we have seen all fragments for pos x and it is safe to resume the read, starting from after position x. 
--- mutation_reader.cc | 49 ++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 45 insertions(+), 4 deletions(-) diff --git a/mutation_reader.cc b/mutation_reader.cc index 10ba2fabdb..1d5e3cb869 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -925,6 +925,8 @@ class shard_reader : public enable_lw_shared_from_this, public fla flat_mutation_reader recreate_reader(); flat_mutation_reader resume_or_create_reader(); future<> do_fill_buffer(flat_mutation_reader& reader, db::timeout_clock::time_point timeout); + future<> ensure_buffer_contains_all_fragments_for_last_pos(flat_mutation_reader& reader, circular_buffer& buffer, + db::timeout_clock::time_point timeout); public: remote_reader( @@ -1146,6 +1148,32 @@ future<> shard_reader::remote_reader::do_fill_buffer(flat_mutation_reader& reade }); } +future<> shard_reader::remote_reader::ensure_buffer_contains_all_fragments_for_last_pos(flat_mutation_reader& reader, + circular_buffer& buffer, db::timeout_clock::time_point timeout) { + if (buffer.empty() || !buffer.back().is_range_tombstone()) { + return make_ready_future<>(); + } + + auto stop = [this, &reader, &buffer] { + if (reader.is_buffer_empty()) { + return reader.is_end_of_stream(); + } + const auto& next_pos = reader.peek_buffer().position(); + if (next_pos.region() != partition_region::clustered) { + return true; + } + return !next_pos.key().equal(*_schema, buffer.back().position().key()); + }; + + return do_until(stop, [this, &reader, &buffer, timeout] { + if (reader.is_buffer_empty()) { + return do_fill_buffer(reader, timeout); + } + buffer.emplace_back(reader.pop_mutation_fragment()); + return make_ready_future<>(); + }); +} + shard_reader::remote_reader::remote_reader( schema_ptr schema, reader_lifecycle_policy& lifecycle_policy, @@ -1170,14 +1198,27 @@ future shard_reader::remote_reader::fill_buffe if (pending_next_partition) { _last_position_in_partition = position_in_partition(position_in_partition::end_of_partition_tag_t{}); } - return 
do_with(resume_or_create_reader(), [this, pending_next_partition, timeout] (flat_mutation_reader& reader) { + return do_with(resume_or_create_reader(), circular_buffer{}, + [this, pending_next_partition, timeout] (flat_mutation_reader& reader, circular_buffer& buffer) mutable { if (pending_next_partition) { reader.next_partition(); } - return do_fill_buffer(reader, timeout).then([this, &reader] { - auto buffer = reader.detach_buffer(); - const auto eos = reader.is_end_of_stream(); + return do_fill_buffer(reader, timeout).then([this, &reader, &buffer, timeout] { + buffer = reader.detach_buffer(); + // When the reader is recreated (after having been evicted) we + // recreate it such that it starts reading from *after* the last + // seen fragment's position. If the last seen fragment is a range + // tombstone it is *not* guaranteed that the next fragments in the + // data stream have positions strictly greater than the range + // tombstone's. If the reader is evicted and has to be recreated, + // these fragments would be then skipped as the read would continue + // after their position. + // To avoid this ensure that the buffer contains *all* fragments for + // the last seen position. + return ensure_buffer_contains_all_fragments_for_last_pos(reader, buffer, timeout); + }).then([this, &reader, &buffer] { + const auto eos = reader.is_end_of_stream() && reader.is_buffer_empty(); update_last_position(buffer); _irh = _lifecycle_policy.pause(std::move(reader)); return fill_buffer_result(std::move(buffer), eos);