readers: reader-from-fragment: don't modify stream when created without range

The fragment reader currently unconditionally forwards its buffer to the
passed-in partition range. Even if this range is
`query::full_partition_range`, this will involve dropping any fragments
up to the first partitions tart. This causes problems for test users who
intentionally create invalid fragment streams, that don't start with a
partition-start.
Refactor the reader to not do any modifications on the stream, when
neither slice, nor partition-range was passed by the user.
This commit is contained in:
Botond Dénes
2023-05-09 03:56:18 -04:00
parent bfaac5192a
commit 93dd16fccc

View File

@@ -1015,22 +1015,17 @@ make_flat_mutation_reader_from_mutations_v2(schema_ptr s, reader_permit permit,
return make_flat_mutation_reader_from_mutations_v2(s, std::move(permit), std::move(mutations), pr, s->full_slice(), fwd);
}
flat_mutation_reader_v2
make_flat_mutation_reader_from_fragments(schema_ptr schema, reader_permit permit, std::deque<mutation_fragment_v2> fragments) {
return make_flat_mutation_reader_from_fragments(std::move(schema), std::move(permit), std::move(fragments), query::full_partition_range);
}
flat_mutation_reader_v2
make_flat_mutation_reader_from_fragments(schema_ptr schema, reader_permit permit, std::deque<mutation_fragment_v2> fragments, const dht::partition_range& pr) {
static flat_mutation_reader_v2
make_flat_mutation_reader_from_fragments(schema_ptr schema, reader_permit permit, std::deque<mutation_fragment_v2> fragments, const dht::partition_range* pr) {
class reader : public flat_mutation_reader_v2::impl {
std::deque<mutation_fragment_v2> _fragments;
const dht::partition_range* _pr;
const dht::partition_range* _pr = nullptr;
dht::ring_position_comparator _cmp;
private:
bool end_of_range() const {
return _fragments.empty() ||
(_fragments.front().is_partition_start() && _pr->after(_fragments.front().as_partition_start().key(), _cmp));
(_pr && _fragments.front().is_partition_start() && _pr->after(_fragments.front().as_partition_start().key(), _cmp));
}
void do_fast_forward_to(const dht::partition_range& pr) {
@@ -1043,12 +1038,13 @@ make_flat_mutation_reader_from_fragments(schema_ptr schema, reader_permit permit
}
public:
reader(schema_ptr schema, reader_permit permit, std::deque<mutation_fragment_v2> fragments, const dht::partition_range& pr)
reader(schema_ptr schema, reader_permit permit, std::deque<mutation_fragment_v2> fragments, const dht::partition_range* pr)
: flat_mutation_reader_v2::impl(std::move(schema), std::move(permit))
, _fragments(std::move(fragments))
, _pr(&pr)
, _cmp(*_schema) {
do_fast_forward_to(*_pr);
if (pr) {
do_fast_forward_to(*pr);
}
}
virtual future<> fill_buffer() override {
while (!(_end_of_stream = end_of_range()) && !is_buffer_full()) {
@@ -1080,6 +1076,16 @@ make_flat_mutation_reader_from_fragments(schema_ptr schema, reader_permit permit
return make_flat_mutation_reader_v2<reader>(std::move(schema), std::move(permit), std::move(fragments), pr);
}
flat_mutation_reader_v2
make_flat_mutation_reader_from_fragments(schema_ptr schema, reader_permit permit, std::deque<mutation_fragment_v2> fragments, const dht::partition_range& pr) {
return make_flat_mutation_reader_from_fragments(std::move(schema), std::move(permit), std::move(fragments), &pr);
}
flat_mutation_reader_v2
make_flat_mutation_reader_from_fragments(schema_ptr schema, reader_permit permit, std::deque<mutation_fragment_v2> fragments) {
return make_flat_mutation_reader_from_fragments(std::move(schema), std::move(permit), std::move(fragments), nullptr);
}
std::deque<mutation_fragment_v2> reverse_fragments(const schema& schema, reader_permit permit, std::deque<mutation_fragment_v2> fragments) {
std::deque<mutation_fragment_v2> reversed_fragments;