diff --git a/clustering_bounds_comparator.hh b/clustering_bounds_comparator.hh index 21133b40db..b089c1b17c 100644 --- a/clustering_bounds_comparator.hh +++ b/clustering_bounds_comparator.hh @@ -40,7 +40,10 @@ enum class bound_kind : uint8_t { std::ostream& operator<<(std::ostream& out, const bound_kind k); +// Swaps start <-> end && incl <-> excl bound_kind invert_kind(bound_kind k); +// Swaps start <-> end +bound_kind reverse_kind(bound_kind k); int32_t weight(bound_kind k); class bound_view { diff --git a/database.cc b/database.cc index d1c8df1bd5..4ec45c4ad7 100644 --- a/database.cc +++ b/database.cc @@ -1376,6 +1376,11 @@ compare_atomic_cell_for_merge(atomic_cell_view left, atomic_cell_view right) { future, cache_temperature>> database::query(schema_ptr s, const query::read_command& cmd, query::result_options opts, const dht::partition_range_vector& ranges, tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout) { + const auto reversed = cmd.slice.options.contains(query::partition_slice::option::reversed); + if (reversed) { + s = s->make_reversed(); + } + column_family& cf = find_column_family(cmd.cf_id); auto& semaphore = get_reader_concurrency_semaphore(); auto class_config = query::query_class_config{.semaphore = semaphore, .max_memory_for_unlimited_query = *cmd.max_result_size}; @@ -1429,6 +1434,11 @@ database::query(schema_ptr s, const query::read_command& cmd, query::result_opti future> database::query_mutations(schema_ptr s, const query::read_command& cmd, const dht::partition_range& range, tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout) { + const auto reversed = cmd.slice.options.contains(query::partition_slice::option::reversed); + if (reversed) { + s = s->make_reversed(); + } + const auto short_read_allwoed = query::short_read(cmd.slice.options.contains()); auto accounter = co_await get_result_memory_limiter().new_mutation_read(*cmd.max_result_size, short_read_allwoed); column_family& cf = 
find_column_family(cmd.cf_id); diff --git a/db/view/view.hh b/db/view/view.hh index 9371dad0b7..c6381fa8b8 100644 --- a/db/view/view.hh +++ b/db/view/view.hh @@ -192,8 +192,8 @@ public: , _view_updates(std::move(views_to_update)) , _updates(std::move(updates)) , _existings(std::move(existings)) - , _update_tombstone_tracker(*_schema, false) - , _existing_tombstone_tracker(*_schema, false) + , _update_tombstone_tracker(*_schema) + , _existing_tombstone_tracker(*_schema) , _now(now) { } view_update_builder(view_update_builder&& other) noexcept = default; diff --git a/docs/design-notes/reverse-reads.md b/docs/design-notes/reverse-reads.md new file mode 100644 index 0000000000..33c047be80 --- /dev/null +++ b/docs/design-notes/reverse-reads.md @@ -0,0 +1,123 @@ +# Reverse reads + +A read is called reverse when it reads with reverse clustering order +(compared to that of the schema). Example: + + CREATE TABLE mytable ( + pk int, + ck int, + s int STATIC, + v int, + PRIMARY KEY (pk, ck) + ) WITH + CLUSTERING ORDER BY (ck ASC); + + # Forward read (using table's native order) + SELECT * FROM mytable WHERE pk = 1; + # Explicit forward order + SELECT * FROM mytable WHERE pk = 1 ORDER BY ck ASC; + + # Reverse read + SELECT * FROM mytable WHERE pk = 1 ORDER BY ck DESC; + +If the table's native clustering order is DESC, then a read with ASC +order is considered reverse. + +## Legacy format + +The legacy format is how Scylla handled reverse queries internally. We +are in the process of migrating to the native reverse format, but for +now coordinator-side code still uses the legacy format. + +### Request + +The `query::partition_slice::option::reversed` flag is set.
+Clustering ranges in both `query::partition_slice::_row_ranges` and +`query::specific_ranges::_ranges` +(`query::partition_slice::_specific_ranges`) are half-reversed: they +are ordered in reverse, but when they are compared to other +mutation-fragments, their end bound is used as position, instead of the +start bound as usual. When compared to other clustering ranges the end +bound is used as the start bound and vice-versa. +Example: + +For the clustering keys (ASC order): `ck1`, `ck2`, `ck3`, `ck4`, `ck5`, +`ck6`. +A `_row_ranges` field of a slice might contain this: + + [ck1, ck2], [ck4, ck5] + +The legacy reversed version would look like this: + + [ck4, ck5], [ck1, ck2] + +Note how the ranges themselves are the same (bounds not reversed), it is +just the range vector itself that is reversed. + +### Result + +Results are ordered with the reversed clustering order with the caveat +that range-tombstones are ordered by their end bound, using the native +schema's comparators. For example, given the following partition: + + ps{pk1}, sr{}, cr{ck1}, rt{[ck2, ck4)}, cr{ck2}, cr{ck3}, cr{ck4}, cr{ck5}, pe{} + +The legacy reverse format equivalent of this looks like the following: + + ps{pk1}, sr{}, cr{ck5}, rt{[ck2, ck4)}, cr{ck4}, cr{ck3}, cr{ck2}, cr{ck1}, pe{} + +Note: +* Only clustering elements change; +* Range tombstones' bounds are not reversed; +* Range tombstones can be ordered off-by-one due to native schema + comparators used: `rt{[ck2, ck4)}` should be ordered *after* + `cr{ck4}`. + +Legend: +* ps = partition-start +* sr = static-row +* cr = clustering-row +* rt = range-tombstone +* pe = partition-end + +## Native format + +The native format uses ordering equivalent to that of a table with +reverse clustering order. Using `mytable` as an example, the native +reverse format would be an identical table `my_reverse_table`, which +uses `CLUSTERING ORDER BY (ck DESC);`.
This allows middle layers in a +read pipeline to just use a schema with reversed clustering order and +process the reverse stream as normal. + +### Request + +The `query::partition_slice::option::reversed` flag is set as in the +legacy format. Clustering ranges in both +`query::partition_slice::_row_ranges` and +`query::specific_ranges::_ranges` +(`query::partition_slice::_specific_ranges`) are fully-reversed: they +are ordered in reverse, with their bounds swapped as well. +Example: + +For the clustering keys (ASC order): `ck1`, `ck2`, `ck3`, `ck4`, `ck5`, +`ck6`. +A `_row_ranges` field of a slice might contain this: + + [ck1, ck2], [ck4, ck5] + +The native reversed version would look like this: + + [ck5, ck4], [ck2, ck1] + +In addition to this, the schema is reversed on the replica, at the start +of the read, so all the reverse-capable and intermediate readers in the +stack get a reversed schema to work with. + +### Result + +Results are ordered with the reversed clustering order with +the bounds of range-tombstones swapped.
For example, given the same +partition that was used in the legacy format example, the native reverse +version would look like this: + + ps{pk1}, sr{}, cr{ck5}, cr{ck4}, rt{(ck4, ck2]}, cr{ck3}, cr{ck2}, cr{ck1}, pe{} diff --git a/flat_mutation_reader.cc b/flat_mutation_reader.cc index 4b4e337c1c..5b9bc9d4dd 100644 --- a/flat_mutation_reader.cc +++ b/flat_mutation_reader.cc @@ -95,9 +95,9 @@ void flat_mutation_reader::impl::clear_buffer_to_next_partition() { _buffer_size = compute_buffer_size(*_schema, _buffer); } -flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, query::max_result_size max_size) { +flat_mutation_reader make_reversing_reader(flat_mutation_reader original, query::max_result_size max_size) { class partition_reversing_mutation_reader final : public flat_mutation_reader::impl { - flat_mutation_reader* _source; + flat_mutation_reader _source; range_tombstone_list _range_tombstones; std::stack _mutation_fragments; mutation_fragment_opt _partition_end; @@ -107,13 +107,14 @@ flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, query private: stop_iteration emit_partition() { auto emit_range_tombstone = [&] { - auto it = std::prev(_range_tombstones.end()); + // _range_tombstones uses the reverse schema already, so we can use `begin()` + auto it = _range_tombstones.begin(); push_mutation_fragment(*_schema, _permit, _range_tombstones.pop(it)); }; - position_in_partition::less_compare cmp(*_schema); + position_in_partition::tri_compare cmp(*_schema); while (!_mutation_fragments.empty() && !is_buffer_full()) { auto& mf = _mutation_fragments.top(); - if (!_range_tombstones.empty() && !cmp(_range_tombstones.rbegin()->end_position(), mf.position())) { + if (!_range_tombstones.empty() && cmp(_range_tombstones.begin()->position(), mf.position()) <= 0) { emit_range_tombstone(); } else { _stack_size -= mf.memory_usage(); @@ -131,15 +132,15 @@ flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, query
return stop_iteration::no; } future consume_partition_from_source() { - if (_source->is_buffer_empty()) { - if (_source->is_end_of_stream()) { + if (_source.is_buffer_empty()) { + if (_source.is_end_of_stream()) { _end_of_stream = true; return make_ready_future(stop_iteration::yes); } - return _source->fill_buffer().then([] { return stop_iteration::no; }); + return _source.fill_buffer().then([] { return stop_iteration::no; }); } - while (!_source->is_buffer_empty() && !is_buffer_full()) { - auto mf = _source->pop_mutation_fragment(); + while (!_source.is_buffer_empty() && !is_buffer_full()) { + auto mf = _source.pop_mutation_fragment(); if (mf.is_partition_start() || mf.is_static_row()) { push_mutation_fragment(std::move(mf)); } else if (mf.is_end_of_partition()) { @@ -148,7 +149,9 @@ flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, query return make_ready_future(stop_iteration::yes); } } else if (mf.is_range_tombstone()) { - _range_tombstones.apply(*_schema, std::move(mf.as_range_tombstone())); + auto&& rt = std::move(mf).as_range_tombstone(); + rt.reverse(); + _range_tombstones.apply(*_schema, std::move(rt)); } else { _mutation_fragments.emplace(std::move(mf)); _stack_size += _mutation_fragments.top().memory_usage(); @@ -181,9 +184,9 @@ flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, query return make_ready_future(is_buffer_full()); } public: - explicit partition_reversing_mutation_reader(flat_mutation_reader& mr, query::max_result_size max_size) - : flat_mutation_reader::impl(mr.schema(), mr.permit()) - , _source(&mr) + explicit partition_reversing_mutation_reader(flat_mutation_reader mr, query::max_result_size max_size) + : flat_mutation_reader::impl(mr.schema()->make_reversed(), mr.permit()) + , _source(std::move(mr)) , _range_tombstones(*_schema) , _max_size(max_size) { } @@ -211,13 +214,20 @@ flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, query } _range_tombstones.clear(); 
_partition_end = std::nullopt; - return _source->next_partition(); + return _source.next_partition(); } return make_ready_future<>(); } - virtual future<> fast_forward_to(const dht::partition_range&) override { - return make_exception_future<>(make_backtraced_exception_ptr()); + virtual future<> fast_forward_to(const dht::partition_range& pr) override { + clear_buffer(); + while (!_mutation_fragments.empty()) { + _mutation_fragments.pop(); + } + _stack_size = 0; + _partition_end = std::nullopt; + _end_of_stream = false; + return _source.fast_forward_to(pr); } virtual future<> fast_forward_to(position_range) override { @@ -225,12 +235,11 @@ flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, query } virtual future<> close() noexcept override { - // we don't own _source therefore do not close it - return make_ready_future<>(); + return _source.close(); } }; - return make_flat_mutation_reader(original, max_size); + return make_flat_mutation_reader(std::move(original), max_size); } template diff --git a/flat_mutation_reader.hh b/flat_mutation_reader.hh index 9037513f57..1ec196b39e 100644 --- a/flat_mutation_reader.hh +++ b/flat_mutation_reader.hh @@ -890,17 +890,19 @@ future<> consume_partitions(flat_mutation_reader& reader, Consumer consumer) { flat_mutation_reader make_generating_reader(schema_ptr s, reader_permit permit, std::function ()> get_next_fragment); -/// A reader that emits partitions in reverse. +/// A reader that emits partitions in native reverse order. /// -/// 1. Static row is still emitted first. -/// 2. Range tombstones are ordered by their end position. -/// 3. Clustered rows and range tombstones are emitted in descending order. -/// Because of 2 and 3 the guarantee that a range tombstone is emitted before +/// 1. The reader's schema() method will return a reversed schema (see +/// \ref schema::make_reversed()). +/// 2. Static row is still emitted first. +/// 3. 
Range tombstones' bounds are reversed (see \ref range_tombstone::reverse()). +/// 4. Clustered rows and range tombstones are emitted in descending order. +/// Because of 3 and 4 the guarantee that a range tombstone is emitted before /// any mutation fragment affected by it still holds. /// Ordering of partitions themselves remains unchanged. +/// For more details see docs/design-notes/reverse-reads.md. /// -/// \param original the reader to be reversed, has to be kept alive while the -/// reversing reader is in use. +/// \param original the reader to be reversed /// \param max_size the maximum amount of memory the reader is allowed to use /// for reversing and conversely the maximum size of the results. The /// reverse reader reads entire partitions into memory, before reversing @@ -911,7 +913,7 @@ make_generating_reader(schema_ptr s, reader_permit permit, std::function resolved when the reader is fully consumed, and closed. diff --git a/idl/read_command.idl.hh b/idl/read_command.idl.hh index 4be3008993..d73b2fa3ea 100644 --- a/idl/read_command.idl.hh +++ b/idl/read_command.idl.hh @@ -30,6 +30,12 @@ class specific_ranges { std::vector> ranges(); }; +// COMPATIBILITY NOTE: the partition-slice for reverse queries has two different +// formats: +// * legacy format +// * native format +// The wire format uses the legacy format. See docs/design-notes/reverse-reads.md +// for more details on the formats.
class partition_slice { std::vector> default_row_ranges(); utils::small_vector static_columns; diff --git a/keys.cc b/keys.cc index 46c57beb81..f60c980412 100644 --- a/keys.cc +++ b/keys.cc @@ -27,6 +27,8 @@ #include #include +logging::logger klog("keys"); + std::ostream& operator<<(std::ostream& out, const partition_key& pk) { return out << "pk{" << to_hex(managed_bytes_view(pk.representation())) << "}"; } @@ -126,6 +128,16 @@ bound_kind invert_kind(bound_kind k) { abort(); } +bound_kind reverse_kind(bound_kind k) { + switch (k) { + case bound_kind::excl_start: return bound_kind::excl_end; + case bound_kind::incl_start: return bound_kind::incl_end; + case bound_kind::excl_end: return bound_kind::excl_start; + case bound_kind::incl_end: return bound_kind::incl_start; + } + on_internal_error(klog, format("reverse_kind(): invalid value for `bound_kind`: {}", static_cast>(k))); +} + int32_t weight(bound_kind k) { switch (k) { case bound_kind::excl_end: diff --git a/mutation.cc b/mutation.cc index 72a3bdfdf9..bc713d5eb2 100644 --- a/mutation.cc +++ b/mutation.cc @@ -196,49 +196,13 @@ future read_mutation_from_flat_mutation_reader(flat_mutation_reade }); } // r.is_buffer_empty() is always false at this point - struct adapter { - schema_ptr _s; - std::optional _builder; - adapter(schema_ptr s) : _s(std::move(s)) { } + return r.consume(mutation_rebuilder(r.schema())); +} - void consume_new_partition(const dht::decorated_key& dk) { - assert(!_builder); - _builder = mutation_rebuilder(dk, std::move(_s)); - } - - stop_iteration consume(tombstone t) { - assert(_builder); - return _builder->consume(t); - } - - stop_iteration consume(range_tombstone&& rt) { - assert(_builder); - return _builder->consume(std::move(rt)); - } - - stop_iteration consume(static_row&& sr) { - assert(_builder); - return _builder->consume(std::move(sr)); - } - - stop_iteration consume(clustering_row&& cr) { - assert(_builder); - return _builder->consume(std::move(cr)); - } - - stop_iteration 
consume_end_of_partition() { - assert(_builder); - return stop_iteration::yes; - } - - mutation_opt consume_end_of_stream() { - if (!_builder) { - return mutation_opt(); - } - return _builder->consume_end_of_stream(); - } - }; - return r.consume(adapter(r.schema())); +mutation reverse(mutation mut) { + auto reverse_schema = mut.schema()->make_reversed(); + mutation_rebuilder reverse_rebuilder(reverse_schema); + return *std::move(mut).consume(reverse_rebuilder, consume_in_reverse::yes).result; } std::ostream& operator<<(std::ostream& os, const mutation& m) { diff --git a/mutation.hh b/mutation.hh index f06ba38b77..ef703eb637 100644 --- a/mutation.hh +++ b/mutation.hh @@ -48,6 +48,7 @@ struct mutation_consume_result { enum class consume_in_reverse { no = 0, yes, + legacy_half_reverse, }; class mutation final { @@ -81,8 +82,11 @@ public: : _ptr(std::make_unique(std::move(schema), std::move(key), std::move(mp))) { } mutation(const mutation& m) - : _ptr(std::make_unique(schema_ptr(m.schema()), dht::decorated_key(m.decorated_key()), m.partition())) - { } + { + if (m._ptr) { + _ptr = std::make_unique(schema_ptr(m.schema()), dht::decorated_key(m.decorated_key()), m.partition()); + } + } mutation(mutation&&) = default; mutation& operator=(mutation&& x) = default; mutation& operator=(const mutation& m); @@ -127,6 +131,16 @@ public: // Consumes the mutation's content. // // The mutation is in a moved-from alike state after consumption. + // There are three ways to consume the mutation: + // * consume_in_reverse::no - consume in forward order, as defined by the + // schema. + // * consume_in_reverse::yes - consume in reverse order, as if the schema + // had the opposite clustering order. This effectively reverses the + // mutation's content, according to the native reverse order[1]. + // * consume_in_reverse::legacy_half_reverse - consume rows and range + // tombstones in legacy reverse order[2]. + // + // For definition of [1] and [2] see docs/design-notes/reverse-reads.md.
template auto consume(Consumer& consumer, consume_in_reverse reverse) && -> mutation_consume_result; @@ -151,19 +165,37 @@ private: namespace { template -stop_iteration consume_clustering_fragments(const schema& s, mutation_partition& partition, Consumer& consumer) { +stop_iteration consume_clustering_fragments(schema_ptr s, mutation_partition& partition, Consumer& consumer) { using crs_type = mutation_partition::rows_type; - using crs_iterator_type = std::conditional_t; + using crs_iterator_type = std::conditional_t; using rts_type = range_tombstone_list; - using rts_iterator_type = std::conditional_t; + using rts_iterator_type = std::conditional_t; + + if constexpr (reverse == consume_in_reverse::yes) { + s = s->make_reversed(); + } + + // only used when reverse == consume_in_reverse::yes + range_tombstone_list reversed_range_tombstones(*s); crs_iterator_type crs_it, crs_end; rts_iterator_type rts_it, rts_end; - if constexpr (reverse == consume_in_reverse::yes) { + if constexpr (reverse == consume_in_reverse::legacy_half_reverse) { crs_it = partition.clustered_rows().rbegin(); crs_end = partition.clustered_rows().rend(); rts_it = partition.row_tombstones().rbegin(); rts_end = partition.row_tombstones().rend(); + } else if constexpr (reverse == consume_in_reverse::yes) { + crs_it = partition.clustered_rows().rbegin(); + crs_end = partition.clustered_rows().rend(); + + while (!partition.row_tombstones().empty()) { + auto rt = partition.mutable_row_tombstones().pop_front_and_lock(); + rt.reverse(); + reversed_range_tombstones.apply(*s, std::move(rt)); + } + rts_it = reversed_range_tombstones.begin(); + rts_end = reversed_range_tombstones.end(); } else { crs_it = partition.clustered_rows().begin(); crs_end = partition.clustered_rows().end(); @@ -173,13 +205,13 @@ stop_iteration consume_clustering_fragments(const schema& s, mutation_partition& stop_iteration stop = stop_iteration::no; - position_in_partition::tri_compare cmp(s); + position_in_partition::tri_compare 
cmp(*s); while (!stop && (crs_it != crs_end || rts_it != rts_end)) { bool emit_rt; if (crs_it != crs_end && rts_it != rts_end) { const auto cmp_res = cmp(rts_it->position(), crs_it->position()); - if constexpr (reverse == consume_in_reverse::yes) { + if constexpr (reverse == consume_in_reverse::legacy_half_reverse) { emit_rt = cmp_res > 0; } else { emit_rt = cmp_res < 0; @@ -191,7 +223,11 @@ stop_iteration consume_clustering_fragments(const schema& s, mutation_partition& stop = consumer.consume(std::move(rts_it->tombstone())); ++rts_it; } else { - stop = consumer.consume(clustering_row(std::move(*crs_it))); + // Dummy rows are part of the in-memory representation but should be + // invisible to reads. + if (!crs_it->dummy()) { + stop = consumer.consume(clustering_row(std::move(*crs_it))); + } ++crs_it; } } @@ -217,9 +253,11 @@ auto mutation::consume(Consumer& consumer, consume_in_reverse reverse) && -> mut } if (reverse == consume_in_reverse::yes) { - stop = consume_clustering_fragments(*_ptr->_schema, partition, consumer); + stop = consume_clustering_fragments(_ptr->_schema, partition, consumer); + } else if (reverse == consume_in_reverse::legacy_half_reverse) { + stop = consume_clustering_fragments(_ptr->_schema, partition, consumer); } else { - stop = consume_clustering_fragments(*_ptr->_schema, partition, consumer); + stop = consume_clustering_fragments(_ptr->_schema, partition, consumer); } const auto stop_consuming = consumer.consume_end_of_partition(); @@ -292,3 +330,7 @@ class flat_mutation_reader; // Reads a single partition from a reader. Returns empty optional if there are no more partitions to be read. future read_mutation_from_flat_mutation_reader(flat_mutation_reader& reader); + +// Reverses the mutation as if it was created with a schema with reverse +// clustering order. The resulting mutation will contain a reverse schema too. 
+mutation reverse(mutation mut); diff --git a/mutation_compactor.hh b/mutation_compactor.hh index 42ac32fd5b..d5070ec244 100644 --- a/mutation_compactor.hh +++ b/mutation_compactor.hh @@ -224,7 +224,7 @@ public: , _row_limit(limit) , _partition_limit(partition_limit) , _partition_row_limit(_slice.options.contains(query::partition_slice::option::distinct) ? 1 : slice.partition_row_limit()) - , _range_tombstones(s, _slice.options.contains(query::partition_slice::option::reversed)) + , _range_tombstones(s) , _last_dk({dht::token(), partition_key::make_empty()}) { static_assert(!sstable_compaction(), "This constructor cannot be used for sstable compaction."); @@ -238,7 +238,7 @@ public: , _get_max_purgeable(std::move(get_max_purgeable)) , _can_gc([this] (tombstone t) { return can_gc(t); }) , _slice(s.full_slice()) - , _range_tombstones(s, false) + , _range_tombstones(s) , _last_dk({dht::token(), partition_key::make_empty()}) , _collector(std::make_unique(_schema)) { diff --git a/mutation_partition.cc b/mutation_partition.cc index 8dbd6b14d8..70222f6888 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -1978,8 +1978,7 @@ void reconcilable_result_builder::consume_new_partition(const dht::decorated_key !has_ck_selector(_slice.row_ranges(_schema, dk.key())); _static_row_is_alive = false; _live_rows = 0; - auto is_reversed = _slice.options.contains(query::partition_slice::option::reversed); - _mutation_consumer.emplace(streamed_mutation_freezer(_schema, dk.key(), is_reversed)); + _mutation_consumer.emplace(streamed_mutation_freezer(_schema, dk.key(), _reversed)); } void reconcilable_result_builder::consume(tombstone t) { @@ -2008,6 +2007,10 @@ stop_iteration reconcilable_result_builder::consume(clustering_row&& cr, row_tom stop_iteration reconcilable_result_builder::consume(range_tombstone&& rt) { _memory_accounter.update(rt.memory_usage(_schema)); + if (_reversed) { + // undo reversing done for the native reversed format, coordinator still uses old 
reversing format + rt.reverse(); + } return _mutation_consumer->consume(std::move(rt)); } @@ -2040,7 +2043,7 @@ to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::pa query::result::builder builder(slice, opts, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }); auto consumer = compact_for_query(*s, gc_clock::time_point::min(), slice, max_rows, max_partitions, query_result_builder(*s, builder)); - const auto reverse = slice.options.contains(query::partition_slice::option::reversed) ? consume_in_reverse::yes : consume_in_reverse::no; + const auto reverse = slice.options.contains(query::partition_slice::option::reversed) ? consume_in_reverse::legacy_half_reverse : consume_in_reverse::no; for (const partition& p : r.partitions()) { const auto res = p.mut().unfreeze(s).consume(consumer, reverse); @@ -2059,7 +2062,7 @@ query_mutation(mutation&& m, const query::partition_slice& slice, uint64_t row_l query::result::builder builder(slice, opts, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }); auto consumer = compact_for_query(*m.schema(), now, slice, row_limit, query::max_partitions, query_result_builder(*m.schema(), builder)); - const auto reverse = slice.options.contains(query::partition_slice::option::reversed) ? consume_in_reverse::yes : consume_in_reverse::no; + const auto reverse = slice.options.contains(query::partition_slice::option::reversed) ? 
consume_in_reverse::legacy_half_reverse : consume_in_reverse::no; std::move(m).consume(consumer, reverse); return builder.build(); } diff --git a/mutation_query.hh b/mutation_query.hh index 74ba786585..0e54f8ee39 100644 --- a/mutation_query.hh +++ b/mutation_query.hh @@ -137,6 +137,7 @@ public: class reconcilable_result_builder { const schema& _schema; const query::partition_slice& _slice; + bool _reversed; bool _return_static_content_on_partition_with_no_rows{}; bool _static_row_is_alive{}; @@ -151,7 +152,7 @@ class reconcilable_result_builder { public: reconcilable_result_builder(const schema& s, const query::partition_slice& slice, query::result_memory_accounter&& accounter) noexcept - : _schema(s), _slice(slice) + : _schema(s), _slice(slice), _reversed(_slice.options.contains(query::partition_slice::option::reversed)) , _memory_accounter(std::move(accounter)) { } diff --git a/mutation_rebuilder.hh b/mutation_rebuilder.hh index c893cc26e8..1a17667f5e 100644 --- a/mutation_rebuilder.hh +++ b/mutation_rebuilder.hh @@ -25,38 +25,51 @@ #include "range_tombstone_assembler.hh" class mutation_rebuilder { - mutation _m; + schema_ptr _s; + mutation_opt _m; public: - mutation_rebuilder(dht::decorated_key dk, schema_ptr s) - : _m(std::move(s), std::move(dk)) { + explicit mutation_rebuilder(schema_ptr s) : _s(std::move(s)) { } + + void consume_new_partition(const dht::decorated_key& dk) { + assert(!_m); + _m = mutation(_s, std::move(dk)); } stop_iteration consume(tombstone t) { - _m.partition().apply(t); + assert(_m); + _m->partition().apply(t); return stop_iteration::no; } stop_iteration consume(range_tombstone&& rt) { - _m.partition().apply_row_tombstone(*_m.schema(), std::move(rt)); + assert(_m); + _m->partition().apply_row_tombstone(*_s, std::move(rt)); return stop_iteration::no; } stop_iteration consume(static_row&& sr) { - _m.partition().static_row().apply(*_m.schema(), column_kind::static_column, std::move(sr.cells())); + assert(_m); + 
_m->partition().static_row().apply(*_s, column_kind::static_column, std::move(sr.cells())); return stop_iteration::no; } stop_iteration consume(clustering_row&& cr) { - auto& dr = _m.partition().clustered_row(*_m.schema(), std::move(cr.key())); + assert(_m); + auto& dr = _m->partition().clustered_row(*_s, std::move(cr.key())); dr.apply(cr.tomb()); dr.apply(cr.marker()); - dr.cells().apply(*_m.schema(), column_kind::regular_column, std::move(cr.cells())); + dr.cells().apply(*_s, column_kind::regular_column, std::move(cr.cells())); return stop_iteration::no; } + stop_iteration consume_end_of_partition() { + assert(_m); + return stop_iteration::yes; + } + mutation_opt consume_end_of_stream() { - return mutation_opt(std::move(_m)); + return std::move(_m); } }; @@ -65,10 +78,10 @@ public: // Does not work with streams in streamed_mutation::forwarding::yes mode. class mutation_rebuilder_v2 { schema_ptr _s; - std::optional _builder; + mutation_rebuilder _builder; range_tombstone_assembler _rt_assembler; public: - mutation_rebuilder_v2(schema_ptr s) : _s(std::move(s)) { } + mutation_rebuilder_v2(schema_ptr s) : _s(std::move(s)), _builder(_s) { } public: stop_iteration consume(partition_start mf) { consume_new_partition(mf.key()); @@ -82,47 +95,38 @@ public: } public: void consume_new_partition(const dht::decorated_key& dk) { - assert(!_builder); - _builder = mutation_rebuilder(dk, _s); + _builder.consume_new_partition(dk); } stop_iteration consume(tombstone t) { - assert(_builder); - _builder->consume(t); + _builder.consume(t); return stop_iteration::no; } stop_iteration consume(range_tombstone_change&& rt) { - assert(_builder); if (auto rt_opt = _rt_assembler.consume(*_s, std::move(rt))) { - _builder->consume(std::move(*rt_opt)); + _builder.consume(std::move(*rt_opt)); } return stop_iteration::no; } stop_iteration consume(static_row&& sr) { - assert(_builder); - _builder->consume(std::move(sr)); + _builder.consume(std::move(sr)); return stop_iteration::no; } 
stop_iteration consume(clustering_row&& cr) { - assert(_builder); - _builder->consume(std::move(cr)); + _builder.consume(std::move(cr)); return stop_iteration::no; } stop_iteration consume_end_of_partition() { - assert(_builder); _rt_assembler.on_end_of_stream(); return stop_iteration::yes; } mutation_opt consume_end_of_stream() { - if (!_builder) { - return mutation_opt(); - } _rt_assembler.on_end_of_stream(); - return _builder->consume_end_of_stream(); + return _builder.consume_end_of_stream(); } }; diff --git a/partition_slice_builder.cc b/partition_slice_builder.cc index b1f2730361..e3465c538d 100644 --- a/partition_slice_builder.cc +++ b/partition_slice_builder.cc @@ -25,6 +25,16 @@ #include "partition_slice_builder.hh" +partition_slice_builder::partition_slice_builder(const schema& schema, query::partition_slice slice) + : _regular_columns(std::move(slice.regular_columns)) + , _static_columns(std::move(slice.static_columns)) + , _row_ranges(std::move(slice._row_ranges)) + , _specific_ranges(std::move(slice._specific_ranges)) + , _schema(schema) + , _options(std::move(slice.options)) +{ +} + partition_slice_builder::partition_slice_builder(const schema& schema) : _schema(schema) { @@ -63,7 +73,8 @@ partition_slice_builder::build() { std::move(ranges), std::move(static_columns), std::move(regular_columns), - std::move(_options) + std::move(_options), + std::move(_specific_ranges) }; } @@ -88,6 +99,22 @@ partition_slice_builder::with_ranges(std::vector ranges return *this; } +partition_slice_builder& +partition_slice_builder::mutate_ranges(std::function&)> func) { + if (_row_ranges) { + func(*_row_ranges); + } + return *this; +} + +partition_slice_builder& +partition_slice_builder::mutate_specific_ranges(std::function func) { + if (_specific_ranges) { + func(*_specific_ranges); + } + return *this; +} + partition_slice_builder& partition_slice_builder::with_no_regular_columns() { _regular_columns = query::column_id_vector(); diff --git 
a/partition_slice_builder.hh b/partition_slice_builder.hh index f44cccf410..9fbd296fd3 100644 --- a/partition_slice_builder.hh +++ b/partition_slice_builder.hh @@ -40,10 +40,12 @@ class partition_slice_builder { std::optional _regular_columns; std::optional _static_columns; std::optional> _row_ranges; + std::unique_ptr _specific_ranges; const schema& _schema; query::partition_slice::option_set _options; public: partition_slice_builder(const schema& schema); + partition_slice_builder(const schema& schema, query::partition_slice slice); partition_slice_builder& with_static_column(bytes name); partition_slice_builder& with_no_static_columns(); @@ -51,6 +53,10 @@ public: partition_slice_builder& with_no_regular_columns(); partition_slice_builder& with_range(query::clustering_range range); partition_slice_builder& with_ranges(std::vector); + // noop if no ranges have been set yet + partition_slice_builder& mutate_ranges(std::function&)>); + // noop if no specific ranges have been set yet + partition_slice_builder& mutate_specific_ranges(std::function); partition_slice_builder& without_partition_key_columns(); partition_slice_builder& without_clustering_key_columns(); partition_slice_builder& reversed(); diff --git a/querier.hh b/querier.hh index 0c24221444..fe3f58e404 100644 --- a/querier.hh +++ b/querier.hh @@ -97,7 +97,7 @@ auto consume_page(flat_mutation_reader& reader, auto consume = [&reader, &slice, reader_consumer = std::move(reader_consumer), max_size] () mutable { if (slice.options.contains(query::partition_slice::option::reversed)) { - return with_closeable(make_reversing_reader(reader, max_size), + return with_closeable(make_reversing_reader(make_flat_mutation_reader(reader), max_size), [reader_consumer = std::move(reader_consumer)] (flat_mutation_reader& reversing_reader) mutable { return reversing_reader.consume(std::move(reader_consumer)); }); @@ -128,6 +128,14 @@ protected: std::variant _reader; dht::partition_ranges_view _query_ranges; +protected: + 
schema_ptr underlying_schema() const { + if (is_reversed()) { + return _schema->make_reversed(); + } + return _schema; + } + public: querier_base(reader_permit permit, std::unique_ptr range, std::unique_ptr slice, flat_mutation_reader reader, dht::partition_ranges_view query_ranges) @@ -145,7 +153,7 @@ public: , _permit(std::move(permit)) , _range(std::make_unique(std::move(range))) , _slice(std::make_unique(std::move(slice))) - , _reader(ms.make_reader(_schema, _permit, *_range, *_slice, pc, std::move(trace_ptr), streamed_mutation::forwarding::no, mutation_reader::forwarding::no)) + , _reader(ms.make_reader(underlying_schema(), _permit, *_range, *_slice, pc, std::move(trace_ptr), streamed_mutation::forwarding::no, mutation_reader::forwarding::no)) , _query_ranges(*_range) { } diff --git a/query-request.hh b/query-request.hh index b26db904cb..f741b381e2 100644 --- a/query-request.hh +++ b/query-request.hh @@ -33,6 +33,7 @@ #include "query_class_config.hh" class position_in_partition_view; +class partition_slice_builder; namespace query { @@ -110,6 +111,9 @@ public: const clustering_row_ranges& ranges() const { return _ranges; } + clustering_row_ranges& ranges() { + return _ranges; + } private: friend std::ostream& operator<<(std::ostream& out, const specific_ranges& r); @@ -124,7 +128,15 @@ constexpr auto max_rows_if_set = std::numeric_limits::max(); // Specifies subset of rows, columns and cell attributes to be returned in a query. // Can be accessed across cores. // Schema-dependent. +// +// COMPATIBILITY NOTE: the partition-slice for reverse queries has two different +// format: +// * legacy format +// * native format +// The wire format uses the legacy format. See docs/design-notes/reverse-reads.md +// for more details on the formats. 
class partition_slice { + friend class ::partition_slice_builder; public: enum class option { send_clustering_key, @@ -226,6 +238,13 @@ public: friend std::ostream& operator<<(std::ostream& out, const specific_ranges& ps); }; +// See docs/design-notes/reverse-reads.md +partition_slice legacy_reverse_slice_to_native_reverse_slice(const schema& schema, partition_slice slice); +partition_slice native_reverse_slice_to_legacy_reverse_slice(const schema& schema, partition_slice slice); +// Fully reverse slice (forward to native reverse) +// Also set the reversed bit in `partition_slice::options`. +partition_slice reverse_slice(const schema& schema, partition_slice slice); + constexpr auto max_partitions = std::numeric_limits::max(); // Tagged integers to disambiguate constructor arguments. diff --git a/query.cc b/query.cc index aaa883c868..f0f515a626 100644 --- a/query.cc +++ b/query.cc @@ -29,6 +29,7 @@ #include "mutation_partition_serializer.hh" #include "query-result-reader.hh" #include "query_result_merger.hh" +#include "partition_slice_builder.hh" namespace query { @@ -114,6 +115,41 @@ void trim_clustering_row_ranges_to(const schema& s, clustering_row_ranges& range reversed ? 
position_in_partition_view::after_key(full_key) : position_in_partition_view::before_key(full_key), reversed); } +static void reverse_clustering_ranges_bounds(clustering_row_ranges& ranges) { + for (auto& range : ranges) { + if (!range.is_singular()) { + range = query::clustering_range(range.end(), range.start()); + } + } +} + +partition_slice legacy_reverse_slice_to_native_reverse_slice(const schema& schema, partition_slice slice) { + return partition_slice_builder(schema, std::move(slice)) + .mutate_ranges([] (clustering_row_ranges& ranges) { reverse_clustering_ranges_bounds(ranges); }) + .mutate_specific_ranges([] (specific_ranges& ranges) { reverse_clustering_ranges_bounds(ranges.ranges()); }) + .build(); +} + +partition_slice native_reverse_slice_to_legacy_reverse_slice(const schema& schema, partition_slice slice) { + // They are the same, we give them different names to express intent + return legacy_reverse_slice_to_native_reverse_slice(schema, std::move(slice)); +} + +partition_slice reverse_slice(const schema& schema, partition_slice slice) { + return partition_slice_builder(schema, std::move(slice)) + .mutate_ranges([] (clustering_row_ranges& ranges) { + std::reverse(ranges.begin(), ranges.end()); + reverse_clustering_ranges_bounds(ranges); + }) + .mutate_specific_ranges([] (specific_ranges& sranges) { + auto& ranges = sranges.ranges(); + std::reverse(ranges.begin(), ranges.end()); + reverse_clustering_ranges_bounds(ranges); + }) + .with_option() + .build(); +} + partition_slice::partition_slice(clustering_row_ranges row_ranges, query::column_id_vector static_columns, query::column_id_vector regular_columns, diff --git a/range_tombstone.cc b/range_tombstone.cc index ce6cb83761..ffcf45e5ae 100644 --- a/range_tombstone.cc +++ b/range_tombstone.cc @@ -73,10 +73,6 @@ void range_tombstone_accumulator::update_current_tombstone() { void range_tombstone_accumulator::drop_unneeded_tombstones(const clustering_key_prefix& ck, int w) { auto cmp = [&] (const 
range_tombstone& rt, const clustering_key_prefix& ck, int w) { - if (_reversed) { - auto bv = rt.start_bound(); - return _cmp(ck, w, bv.prefix(), weight(bv.kind())); - } auto bv = rt.end_bound(); return _cmp(bv.prefix(), weight(bv.kind()), ck, w); }; @@ -91,15 +87,11 @@ void range_tombstone_accumulator::drop_unneeded_tombstones(const clustering_key_ } void range_tombstone_accumulator::apply(range_tombstone rt) { - if (_reversed) { - drop_unneeded_tombstones(rt.end, weight(rt.end_kind)); - } else { - drop_unneeded_tombstones(rt.start, weight(rt.start_kind)); - } + drop_unneeded_tombstones(rt.start, weight(rt.start_kind)); _current_tombstone.apply(rt.tomb); auto cmp = [&] (const range_tombstone& rt1, const range_tombstone& rt2) { - return _reversed ? _cmp(rt2.start_bound(), rt1.start_bound()) : _cmp(rt1.end_bound(), rt2.end_bound()); + return _cmp(rt1.end_bound(), rt2.end_bound()); }; _range_tombstones.insert(boost::upper_bound(_range_tombstones, rt, cmp), std::move(rt)); } diff --git a/range_tombstone.hh b/range_tombstone.hh index 6386fcdbcb..6672ead65e 100644 --- a/range_tombstone.hh +++ b/range_tombstone.hh @@ -192,6 +192,15 @@ public: end_kind = new_end.kind(); } + // Swap bounds to reverse range-tombstone -- as if it came from a table with + // reverse native order. See docs/design-notes/reverse-reads.md. 
+ void reverse() { + std::swap(start, end); + std::swap(start_kind, end_kind); + start_kind = reverse_kind(start_kind); + end_kind = reverse_kind(end_kind); + } + size_t external_memory_usage(const schema&) const noexcept { return start.external_memory_usage() + end.external_memory_usage(); } @@ -252,13 +261,12 @@ class range_tombstone_accumulator { tombstone _partition_tombstone; std::deque _range_tombstones; tombstone _current_tombstone; - bool _reversed; private: void update_current_tombstone(); void drop_unneeded_tombstones(const clustering_key_prefix& ck, int w = 0); public: - range_tombstone_accumulator(const schema& s, bool reversed) - : _cmp(s), _reversed(reversed) { } + explicit range_tombstone_accumulator(const schema& s) + : _cmp(s) { } void set_partition_tombstone(tombstone t) { _partition_tombstone = t; diff --git a/schema.cc b/schema.cc index 219cb3392c..3fe92e3790 100644 --- a/schema.cc +++ b/schema.cc @@ -415,10 +415,16 @@ schema::schema(const raw_schema& raw, std::optional raw_view_info } } -schema::schema(const schema& o) +schema::schema(const schema& o, const std::function& transform) : _raw(o._raw) , _offsets(o._offsets) { + // Do the transformation after all the raw fields are initialized, but + // *before* the derived fields are generated (from the raw ones). 
+ if (transform) { + transform(*this); + } + rebuild(); if (o.is_view()) { _view_info = std::make_unique<::view_info>(*this, o.view_info()->raw()); @@ -428,6 +434,23 @@ schema::schema(const schema& o) } } +schema::schema(const schema& o) + : schema(o, {}) +{ +} + +schema::schema(reversed_tag, const schema& o) + : schema(o, [] (schema& s) { + s._raw._version = utils::UUID_gen::negate(s._raw._version); + for (auto& col : s._raw._columns) { + if (col.kind == column_kind::clustering_key) { + col.type = reversed(col.type); + } + } + }) +{ +} + lw_shared_ptr make_shared_schema(std::optional id, std::string_view ks_name, std::string_view cf_name, std::vector partition_key, std::vector clustering_key, std::vector regular_columns, std::vector static_columns, @@ -1568,6 +1591,10 @@ bool schema::equal_columns(const schema& other) const { return boost::equal(all_columns(), other.all_columns()); } +schema_ptr schema::make_reversed() const { + return make_lw_shared(schema::reversed_tag{}, *this); +} + raw_view_info::raw_view_info(utils::UUID base_id, sstring base_name, bool include_all_columns, sstring where_clause) : _base_id(std::move(base_id)) , _base_name(std::move(base_name)) diff --git a/schema.hh b/schema.hh index a96f31304f..9e23df3047 100644 --- a/schema.hh +++ b/schema.hh @@ -584,6 +584,10 @@ public: } }; +class schema; + +using schema_ptr = lw_shared_ptr; + /* * Effectively immutable. * Not safe to access across cores because of shared_ptr's. @@ -695,11 +699,16 @@ public: data_type type; }; private: + struct reversed_tag { }; + lw_shared_ptr make_column_specification(const column_definition& def); void rebuild(); schema(const raw_schema&, std::optional); + schema(const schema&, const std::function&); public: schema(const schema&); + // See \ref make_reversed(). 
+ schema(reversed_tag, const schema&); ~schema(); table_schema_version version() const { return _raw._version; @@ -966,6 +975,17 @@ public: const v3_columns& v3() const { return _v3_columns; } + + // Make a copy of the schema with reversed clustering order. + // + // The reversing is revertible, so that: + // + // s->make_reversed()->make_reversed()->version() == s->version() + // + // But note that: `s != s->make_reversed()->make_reversed()` (they are two + // different C++ objects). + // The schema's version is also reversed using UUID_gen::negate(). + schema_ptr make_reversed() const; }; lw_shared_ptr make_shared_schema(std::optional id, std::string_view ks_name, std::string_view cf_name, @@ -974,8 +994,6 @@ lw_shared_ptr make_shared_schema(std::optional id, st bool operator==(const schema&, const schema&); -using schema_ptr = lw_shared_ptr; - /** * Wrapper for schema_ptr used by functions that expect an engaged view_info field. */ diff --git a/table.cc b/table.cc index 5e784decd3..b3388b0634 100644 --- a/table.cc +++ b/table.cc @@ -2052,7 +2052,10 @@ table::mutation_query(schema_ptr s, std::exception_ptr ex; try { - auto rrb = reconcilable_result_builder(*s, cmd.slice, std::move(accounter)); + // Un-reverse the schema sent to the coordinator, it expects the + // legacy format. + auto result_schema = cmd.slice.options.contains(query::partition_slice::option::reversed) ? 
s->make_reversed() : s; + auto rrb = reconcilable_result_builder(*result_schema, cmd.slice, std::move(accounter)); auto r = co_await q.consume_page(std::move(rrb), cmd.get_row_limit(), cmd.partition_limit, cmd.timestamp, class_config.max_memory_for_unlimited_query); if (!saved_querier || (!q.are_limits_reached() && !r.is_short_read())) { diff --git a/test/boost/UUID_test.cc b/test/boost/UUID_test.cc index d1b3143bea..1b0d12f6da 100644 --- a/test/boost/UUID_test.cc +++ b/test/boost/UUID_test.cc @@ -230,3 +230,20 @@ BOOST_AUTO_TEST_CASE(test_max_time_uuid) { auto unix_timestamp = utils::UUID_gen::unix_timestamp(uuid); BOOST_CHECK(unix_timestamp == millis); } + +BOOST_AUTO_TEST_CASE(test_negate) { + using namespace utils; + + auto original_uuid = UUID_gen::get_time_UUID(); + BOOST_TEST_MESSAGE(fmt::format("original_uuid: {}", original_uuid)); + + auto negated_uuid = UUID_gen::negate(original_uuid); + BOOST_TEST_MESSAGE(fmt::format("negated_uuid: {}", negated_uuid)); + + BOOST_REQUIRE(original_uuid != negated_uuid); + + auto re_negated_uuid = UUID_gen::negate(negated_uuid); + BOOST_TEST_MESSAGE(fmt::format("re_negated_uuid: {}", re_negated_uuid)); + + BOOST_REQUIRE(original_uuid == re_negated_uuid); +} diff --git a/test/boost/flat_mutation_reader_test.cc b/test/boost/flat_mutation_reader_test.cc index 5ee5a94bf5..fd049dd410 100644 --- a/test/boost/flat_mutation_reader_test.cc +++ b/test/boost/flat_mutation_reader_test.cc @@ -41,6 +41,8 @@ #include "test/lib/flat_mutation_reader_assertions.hh" #include "test/lib/log.hh" #include "test/lib/reader_concurrency_semaphore.hh" +#include "test/lib/random_utils.hh" +#include "test/lib/random_schema.hh" #include @@ -476,8 +478,8 @@ using in_thread = seastar::bool_class; struct flat_stream_consumer { schema_ptr _schema; + schema_ptr _reversed_schema; reader_permit _permit; - reversed_partitions _reversed; skip_after_first_fragment _skip_partition; skip_after_first_partition _skip_stream; std::vector _mutations; @@ -485,22 +487,17 
@@ struct flat_stream_consumer { bool _inside_partition = false; private: void verify_order(position_in_partition_view pos) { - position_in_partition::less_compare cmp(*_schema); - if (!_reversed) { - BOOST_REQUIRE(!_previous_position || _previous_position->is_static_row() - || cmp(*_previous_position, pos)); - } else { - BOOST_REQUIRE(!_previous_position || _previous_position->is_static_row() - || cmp(pos, *_previous_position)); - } + const schema& s = _reversed_schema ? *_reversed_schema : *_schema; + position_in_partition::less_compare cmp(s); + BOOST_REQUIRE(!_previous_position || _previous_position->is_static_row() || cmp(*_previous_position, pos)); } public: flat_stream_consumer(schema_ptr s, reader_permit permit, reversed_partitions reversed, skip_after_first_fragment skip_partition = skip_after_first_fragment::no, skip_after_first_partition skip_stream = skip_after_first_partition::no) : _schema(std::move(s)) + , _reversed_schema(reversed ? _schema->make_reversed() : nullptr) , _permit(std::move(permit)) - , _reversed(reversed) , _skip_partition(skip_partition) , _skip_stream(skip_stream) { } @@ -534,10 +531,13 @@ public: } stop_iteration consume(range_tombstone&& rt) { BOOST_REQUIRE(_inside_partition); - auto pos = _reversed ? 
rt.end_position() : rt.position(); + auto pos = rt.position(); verify_order(pos); BOOST_REQUIRE_GE(_mutations.size(), 1); _previous_position.emplace(pos); + if (_reversed_schema) { + rt.reverse(); // undo the reversing + } _mutations.back().partition().apply(*_schema, mutation_fragment(*_schema, _permit, std::move(rt))); return stop_iteration(bool(_skip_partition)); } @@ -564,7 +564,8 @@ void test_flat_stream(schema_ptr s, std::vector muts, reversed_partiti return fmr.consume_in_thread(std::move(fsc)); } else { if (reversed) { - return with_closeable(make_reversing_reader(fmr, query::max_result_size(size_t(1) << 20)), [fsc = std::move(fsc)] (flat_mutation_reader& reverse_reader) mutable { + return with_closeable(make_reversing_reader(make_flat_mutation_reader(fmr), query::max_result_size(size_t(1) << 20)), + [fsc = std::move(fsc)] (flat_mutation_reader& reverse_reader) mutable { return reverse_reader.consume(std::move(fsc)); }).get0(); } @@ -813,11 +814,8 @@ SEASTAR_THREAD_TEST_CASE(test_reverse_reader_memory_limit) { } const uint64_t hard_limit = size_t(1) << 18; - auto reader = flat_mutation_reader_from_mutations(semaphore.make_permit(), {mut}); - // need to close both readers since the reverse_reader - // doesn't own the reader passed to it by ref. 
- auto close_reader = deferred_close(reader); - auto reverse_reader = make_reversing_reader(reader, query::max_result_size(size_t(1) << 10, hard_limit)); + auto reverse_reader = make_reversing_reader(flat_mutation_reader_from_mutations(semaphore.make_permit(), {mut}), + query::max_result_size(size_t(1) << 10, hard_limit)); auto close_reverse_reader = deferred_close(reverse_reader); try { @@ -840,3 +838,94 @@ SEASTAR_THREAD_TEST_CASE(test_reverse_reader_memory_limit) { test_with_partition(true); test_with_partition(false); } + +SEASTAR_THREAD_TEST_CASE(test_reverse_reader_reads_in_native_reverse_order) { + using namespace tests::data_model; + using key_range = nonwrapping_interval; + + std::mt19937 engine(tests::random::get_int()); + + tests::reader_concurrency_semaphore_wrapper semaphore; + auto permit = semaphore.make_permit(); + + auto rnd_schema_spec = tests::make_random_schema_specification( + get_name(), + std::uniform_int_distribution(1, 2), + std::uniform_int_distribution(1, 8)); + auto rnd_schema = tests::random_schema(engine(), *rnd_schema_spec); + + auto forward_schema = rnd_schema.schema(); + auto reverse_schema = forward_schema->make_reversed(); + + auto forward_mt = make_lw_shared(forward_schema); + auto reverse_mt = make_lw_shared(reverse_schema); + + for (size_t pk = 0; pk != 8; ++pk) { + auto mut = rnd_schema.new_mutation(pk); + + if (forward_schema->has_static_columns()) { + rnd_schema.add_static_row(engine, mut); + } + + auto ckeys = rnd_schema.make_ckeys(8); + + for (size_t ck = 0; ck != ckeys.size(); ++ck) { + const auto& ckey = ckeys.at(ck); + if (ck % 4 == 0 && ck + 3 < ckeys.size()) { + const auto& ckey_1 = ckeys.at(ck + 1); + const auto& ckey_2 = ckeys.at(ck + 2); + const auto& ckey_3 = ckeys.at(ck + 3); + rnd_schema.delete_range(engine, mut, key_range::make({ckey, true}, {ckey_2, true})); + rnd_schema.delete_range(engine, mut, key_range::make({ckey, true}, {ckey_3, true})); + rnd_schema.delete_range(engine, mut, key_range::make({ckey_1, 
true}, {ckey_3, true})); + } + rnd_schema.add_row(engine, mut, ckey); + } + + forward_mt->apply(mut.build(forward_schema)); + reverse_mt->apply(mut.build(reverse_schema)); + } + + auto reversed_forward_reader = assert_that(make_reversing_reader(forward_mt->make_flat_reader(forward_schema, permit), query::max_result_size(1 << 20))); + + auto reverse_reader = reverse_mt->make_flat_reader(reverse_schema, permit); + auto deferred_reverse_close = deferred_close(reverse_reader); + + while (auto mf_opt = reverse_reader().get()) { + auto& mf = *mf_opt; + reversed_forward_reader.produces(*forward_schema, mf); + } + reversed_forward_reader.produces_end_of_stream(); +} + +SEASTAR_THREAD_TEST_CASE(test_reverse_reader_is_mutation_source) { + std::list reversed_slices; + auto populate = [&reversed_slices] (schema_ptr s, const std::vector &muts) { + auto reverse_schema = s->make_reversed(); + auto reverse_mt = make_lw_shared(reverse_schema); + for (const auto& mut : muts) { + reverse_mt->apply(reverse(mut)); + } + + return mutation_source([=, &reversed_slices] ( + schema_ptr schema, + reader_permit permit, + const dht::partition_range& range, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_ptr, + streamed_mutation::forwarding fwd_sm, + mutation_reader::forwarding fwd_mr) mutable { + reversed_slices.emplace_back(query::reverse_slice(*schema, slice)); + // We don't want the memtable reader to read in reverse. 
+ reversed_slices.back().options.remove(query::partition_slice::option::reversed); + auto rd = make_reversing_reader(reverse_mt->make_flat_reader(schema->make_reversed(), std::move(permit), range, reversed_slices.back(), pc, + std::move(trace_ptr), streamed_mutation::forwarding::no, fwd_mr), query::max_result_size(1 << 20)); + if (fwd_sm) { + return make_forwardable(std::move(rd)); + } + return rd; + }); + }; + run_mutation_source_tests(populate); +} diff --git a/test/boost/mutation_query_test.cc b/test/boost/mutation_query_test.cc index 545ff8f383..e1e30d313c 100644 --- a/test/boost/mutation_query_test.cc +++ b/test/boost/mutation_query_test.cc @@ -65,7 +65,11 @@ mutation_source make_source(std::vector mutations) { const io_priority_class& pc, tracing::trace_state_ptr, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) { assert(range.is_full()); // slicing not implemented yet for (auto&& m : mutations) { - assert(m.schema() == s); + if (slice.options.contains(query::partition_slice::option::reversed)) { + assert(m.schema()->make_reversed()->version() == s->version()); + } else { + assert(m.schema() == s); + } } return flat_mutation_reader_from_mutations(std::move(permit), mutations, slice, fwd); }); diff --git a/test/boost/mutation_test.cc b/test/boost/mutation_test.cc index 4ec19f0ad2..46e0943c88 100644 --- a/test/boost/mutation_test.cc +++ b/test/boost/mutation_test.cc @@ -2889,7 +2889,7 @@ void check_clustering_row_summaries(const schema& schema, const clustering_row_s } void check_clustering_summaries(const schema& schema, const partition_summary& actual, const partition_summary& expected) { - range_tombstone_accumulator range_tombstones(schema, false); + range_tombstone_accumulator range_tombstones(schema); range_tombstones.set_partition_tombstone(expected.tomb); for (auto [actual_frag, expected_frag] : iterate_over_in_ordered_lockstep(actual.clustering_fragments, expected.clustering_fragments, @@ -3122,3 +3122,69 @@ 
SEASTAR_THREAD_TEST_CASE(test_appending_hash_row_4567) { // These checks are meaningful because legacy hashing is still used for old nodes. BOOST_CHECK_EQUAL(compute_legacy_hash(r1, { 0, 1, 2 }), compute_legacy_hash(r2, { 0, 1, 2 })); } + +SEASTAR_THREAD_TEST_CASE(test_mutation_consume_position_monotonicity) { + std::mt19937 engine(tests::random::get_int()); + + tests::reader_concurrency_semaphore_wrapper semaphore; + auto permit = semaphore.make_permit(); + + auto rnd_schema_spec = tests::make_random_schema_specification( + get_name(), + std::uniform_int_distribution(1, 2), + std::uniform_int_distribution(1, 8)); + auto rnd_schema = tests::random_schema(engine(), *rnd_schema_spec); + + auto forward_schema = rnd_schema.schema(); + auto reverse_schema = forward_schema->make_reversed(); + + const auto muts = tests::generate_random_mutations( + rnd_schema, + tests::default_timestamp_generator(), + tests::no_expiry_expiry_generator(), + std::uniform_int_distribution(1, 1)).get(); + + class validating_consumer { + mutation_fragment_stream_validator _validator; + + public: + explicit validating_consumer(const schema& s) : _validator(s) { } + + void consume_new_partition(const dht::decorated_key&) { + BOOST_REQUIRE(_validator(mutation_fragment::kind::partition_start, position_in_partition_view(position_in_partition_view::partition_start_tag_t{}))); + } + void consume(tombstone) { } + stop_iteration consume(static_row&& sr) { + BOOST_REQUIRE(_validator(mutation_fragment::kind::static_row, sr.position())); + return stop_iteration::no; + } + stop_iteration consume(clustering_row&& cr) { + BOOST_REQUIRE(_validator(mutation_fragment::kind::clustering_row, cr.position())); + return stop_iteration::no; + } + stop_iteration consume(range_tombstone&& rt) { + BOOST_REQUIRE(_validator(mutation_fragment::kind::range_tombstone, rt.position())); + return stop_iteration::no; + } + stop_iteration consume_end_of_partition() { + 
BOOST_REQUIRE(_validator(mutation_fragment::kind::partition_end, position_in_partition_view(position_in_partition_view::end_of_partition_tag_t{}))); + return stop_iteration::no; + } + void consume_end_of_stream() { + BOOST_REQUIRE(_validator.on_end_of_stream()); + } + }; + + BOOST_TEST_MESSAGE("Forward"); + { + auto mut = muts.front(); + validating_consumer consumer(*forward_schema); + std::move(mut).consume(consumer, consume_in_reverse::no); + } + BOOST_TEST_MESSAGE("Reverse"); + { + auto mut = muts.front(); + validating_consumer consumer(*reverse_schema); + std::move(mut).consume(consumer, consume_in_reverse::yes); + } +} diff --git a/test/boost/range_tombstone_list_test.cc b/test/boost/range_tombstone_list_test.cc index 3840451b74..4de8311abd 100644 --- a/test/boost/range_tombstone_list_test.cc +++ b/test/boost/range_tombstone_list_test.cc @@ -884,8 +884,7 @@ BOOST_AUTO_TEST_CASE(test_accumulator) { auto ts1 = 1; auto ts2 = 2; - testlog.info("Forward"); - auto acc = range_tombstone_accumulator(*s, false); + auto acc = range_tombstone_accumulator(*s); acc.apply(rtie(0, 4, ts1)); BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 0 })), tombstone(ts1, gc_now)); acc.apply(rtie(1, 2, ts2)); @@ -904,26 +903,4 @@ BOOST_AUTO_TEST_CASE(test_accumulator) { BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 13 })), tombstone(ts2, gc_now)); BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 14 })), tombstone()); BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 15 })), tombstone()); - - testlog.info("Reversed"); - acc = range_tombstone_accumulator(*s, true); - - BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 15 })), tombstone()); - BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 14 })), tombstone()); - acc.apply(rtie(11, 14, ts2)); - BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 13 })), tombstone(ts2, gc_now)); - BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 12 })), tombstone(ts2, gc_now)); - acc.apply(rtie(10, 12, ts1)); - BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 11 })), 
tombstone(ts2, gc_now)); - BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 10 })), tombstone(ts1, gc_now)); - BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 9 })), tombstone()); - acc.apply(rtie(6, 8, ts2)); - BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 5 })), tombstone()); - BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 4 })), tombstone()); - acc.apply(rtie(0, 4, ts1)); - BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 3 })), tombstone(ts1, gc_now)); - BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 2 })), tombstone(ts1, gc_now)); - acc.apply(rtie(1, 2, ts2)); - BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 1 })), tombstone(ts2, gc_now)); - BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 0 })), tombstone(ts1, gc_now)); } diff --git a/test/boost/schema_change_test.cc b/test/boost/schema_change_test.cc index c697634b2c..782532379f 100644 --- a/test/boost/schema_change_test.cc +++ b/test/boost/schema_change_test.cc @@ -42,6 +42,7 @@ #include "test/lib/log.hh" #include "serializer_impl.hh" #include "cdc/cdc_extension.hh" +#include "utils/UUID_gen.hh" SEASTAR_TEST_CASE(test_new_schema_with_no_structural_change_is_propagated) { return do_with_cql_env([](cql_test_env& e) { @@ -807,3 +808,26 @@ SEASTAR_TEST_CASE(test_schema_tables_use_null_sharder) { }).get(); }, raft_cql_test_config()); } + +SEASTAR_TEST_CASE(test_schema_make_reversed) { + auto schema = schema_builder("tests", get_name()) + .with_column("pk", bytes_type, column_kind::partition_key) + .with_column("ck", bytes_type, column_kind::clustering_key) + .with_column("v1", bytes_type) + .build(); + testlog.info(" schema->version(): {}", schema->version()); + + auto reversed_schema = schema->make_reversed(); + testlog.info(" reversed_schema->version(): {}", reversed_schema->version()); + + BOOST_REQUIRE(schema->version() != reversed_schema->version()); + BOOST_REQUIRE(utils::UUID_gen::negate(schema->version()) == reversed_schema->version()); + + auto re_reversed_schema = reversed_schema->make_reversed(); + 
testlog.info("re_reversed_schema->version(): {}", re_reversed_schema->version()); + + BOOST_REQUIRE(schema->version() == re_reversed_schema->version()); + BOOST_REQUIRE(reversed_schema->version() != re_reversed_schema->version()); + + return make_ready_future<>(); +} diff --git a/test/lib/mutation_source_test.cc b/test/lib/mutation_source_test.cc index 6a031e9629..ff6a165f1a 100644 --- a/test/lib/mutation_source_test.cc +++ b/test/lib/mutation_source_test.cc @@ -406,7 +406,8 @@ static void test_streamed_mutation_forwarding_is_consistent_with_slicing(tests:: void consume_new_partition(const dht::decorated_key& dk) { assert(!_builder); - _builder = mutation_rebuilder(dk, std::move(_s)); + _builder = mutation_rebuilder(std::move(_s)); + _builder->consume_new_partition(dk); } stop_iteration consume(tombstone t) { @@ -842,7 +843,7 @@ static void test_streamed_mutation_forwarding_across_range_tombstones(tests::rea } static void test_range_queries(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) { - testlog.info("Testing range queries"); + testlog.info(__PRETTY_FUNCTION__); auto s = schema_builder("ks", "cf") .with_column("key", bytes_type, column_kind::partition_key) @@ -1196,7 +1197,7 @@ static void test_clustering_slices(tests::reader_concurrency_semaphore_wrapper& } static void test_query_only_static_row(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) { - BOOST_TEST_MESSAGE(__PRETTY_FUNCTION__); + testlog.info(__PRETTY_FUNCTION__); simple_schema s; @@ -1242,7 +1243,7 @@ static void test_query_only_static_row(tests::reader_concurrency_semaphore_wrapp } static void test_query_no_clustering_ranges_no_static_columns(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) { - BOOST_TEST_MESSAGE(__PRETTY_FUNCTION__); + testlog.info(__PRETTY_FUNCTION__); simple_schema s(simple_schema::with_static::no); @@ -1286,6 +1287,8 @@ static void 
test_query_no_clustering_ranges_no_static_columns(tests::reader_conc } void test_streamed_mutation_forwarding_succeeds_with_no_data(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) { + testlog.info(__PRETTY_FUNCTION__); + simple_schema s; auto cks = s.make_ckeys(6); @@ -1323,6 +1326,8 @@ void test_streamed_mutation_forwarding_succeeds_with_no_data(tests::reader_concu static void test_slicing_with_overlapping_range_tombstones(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) { + testlog.info(__PRETTY_FUNCTION__); + simple_schema ss; auto s = ss.schema(); @@ -1420,6 +1425,8 @@ void test_slicing_with_overlapping_range_tombstones(tests::reader_concurrency_se } void test_downgrade_to_v1_clear_buffer(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) { + testlog.info(__PRETTY_FUNCTION__); + simple_schema s; auto pkey = s.make_pkey(); sstring value(256, 'v'); @@ -1444,6 +1451,8 @@ void test_downgrade_to_v1_clear_buffer(tests::reader_concurrency_semaphore_wrapp } void test_range_tombstones_v2(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) { + testlog.info(__PRETTY_FUNCTION__); + simple_schema s; auto pkey = s.make_pkey(); @@ -1634,7 +1643,8 @@ void test_range_tombstones_v2(tests::reader_concurrency_semaphore_wrapper& semap } void test_reader_conversions(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) { - BOOST_TEST_MESSAGE(__PRETTY_FUNCTION__); + testlog.info(__PRETTY_FUNCTION__); + for_each_mutation([&] (const mutation& m) mutable { const auto query_time = gc_clock::now(); @@ -1661,6 +1671,8 @@ void test_reader_conversions(tests::reader_concurrency_semaphore_wrapper& semaph void test_next_partition(tests::reader_concurrency_semaphore_wrapper&, populate_fn_ex); void run_mutation_reader_tests(populate_fn_ex populate, bool with_partition_range_forwarding) { + testlog.info(__PRETTY_FUNCTION__); + 
tests::reader_concurrency_semaphore_wrapper semaphore; test_range_tombstones_v2(semaphore, populate); @@ -1688,6 +1700,8 @@ void run_mutation_reader_tests(populate_fn_ex populate, bool with_partition_rang } void test_next_partition(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) { + testlog.info(__PRETTY_FUNCTION__); + simple_schema s; auto pkeys = s.make_pkeys(4); diff --git a/types.cc b/types.cc index f2e9472006..53299b28d0 100644 --- a/types.cc +++ b/types.cc @@ -3436,3 +3436,11 @@ std::ostream& operator<<(std::ostream& out, const data_value& v) { shared_ptr reversed_type_impl::get_instance(data_type type) { return intern::get_instance(std::move(type)); } + +data_type reversed(data_type type) { + if (type->is_reversed()) { + return type->underlying_type(); + } else { + return reversed_type_impl::get_instance(type); + } +} diff --git a/types.hh b/types.hh index 6615be3907..c4b7826e00 100644 --- a/types.hh +++ b/types.hh @@ -829,6 +829,10 @@ public: }; using reversed_type = shared_ptr; +// Reverse the sort order of the type by wrapping in or stripping reversed_type, +// as needed. +data_type reversed(data_type); + class map_type_impl; using map_type = shared_ptr; diff --git a/utils/UUID_gen.cc b/utils/UUID_gen.cc index d342a95518..ea31f97ebc 100644 --- a/utils/UUID_gen.cc +++ b/utils/UUID_gen.cc @@ -168,6 +168,25 @@ UUID UUID_gen::get_name_UUID(const unsigned char *s, size_t len) { return get_UUID(digest); } +UUID UUID_gen::negate(UUID o) { + auto lsb = o.get_least_significant_bits(); + + const long clock_mask = 0x0000000000003FFFL; + + // We flip the node-and-clock-seq octet of the UUID for time-UUIDs. This + // creates a virtual node with a time which cannot be generated anymore, so + // is safe against collisions. + // For name UUIDs we flip the same octet. Name UUIDs being an md5 hash over + // a buffer, flipping any bit should be safe against collisions. 
+ long clock = (lsb >> 48) & clock_mask; + clock = ~clock & clock_mask; + + lsb &= ~(clock_mask << 48); // zero current clock + lsb |= (clock << 48); // write new clock + + return UUID(o.get_most_significant_bits(), lsb); +} + const thread_local int64_t UUID_gen::spoof_node = make_thread_local_node(make_random_node()); const thread_local int64_t UUID_gen::clock_seq_and_node = make_clock_seq_and_node(); thread_local UUID_gen UUID_gen::_instance; diff --git a/utils/UUID_gen.hh b/utils/UUID_gen.hh index 06ecd6e833..8e5e1f50c1 100644 --- a/utils/UUID_gen.hh +++ b/utils/UUID_gen.hh @@ -397,6 +397,16 @@ public: (0x0fff000000000000UL & msb) >> 48 | 0x0000000000001000L); // sets the version to 1. } + + // Produce an UUID which is derived from this UUID in a reversible manner + // + // Such that: + // + // auto original_uuid = UUID_gen::get_time_UUID(); + // auto negated_uuid = UUID_gen::negate(original_uuid); + // assert(original_uuid != negated_uuid); + // assert(original_uuid == UUID_gen::negate(negated_uuid)); + static UUID negate(UUID); }; // for the curious, here is how I generated START_EPOCH