From 0cc00b5d17158036c57415a312bf0e20d6ac7dde Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Mon, 6 Sep 2021 12:29:48 +0300 Subject: [PATCH 01/23] docs: design-notes: add reverse-reads.md Explaining how reverse reads work, in particular the difference between the legacy and native formats. --- docs/design-notes/reverse-reads.md | 123 +++++++++++++++++++++++++++++ 1 file changed, 123 insertions(+) create mode 100644 docs/design-notes/reverse-reads.md diff --git a/docs/design-notes/reverse-reads.md b/docs/design-notes/reverse-reads.md new file mode 100644 index 0000000000..33c047be80 --- /dev/null +++ b/docs/design-notes/reverse-reads.md @@ -0,0 +1,123 @@ +# Reverse reads + +A read is called reverse when it reads with reverse clustering order +(compared to that of the schema). Example: + + CREATE TABLE mytable ( + pk int, + ck int, + s int STATIC, + v int, + PRIMARY KEY (pk, ck) + ) WITH + CLUSTERING ORDER BY (ck ASC); + + # Forward read (using table's native order) + SELECT * FROM mytable WHERE pk = 1; + # Explicit forward order + SELECT * FROM mytable WHERE pk = 1 ORDER BY ck ASC; + + # Reverse read + SELECT * FROM mytable WHERE pk = 1 ORDER BY ck DESC; + +If the table's native clustering order is DESC, then a read with ASC +order is considered reverse. + +## Legacy format + +The legacy format is how scylla handled reverse queries internally. We +are in the process of migrating to the native reverse format, but for +now coordinator-side code still uses the legacy format. + +### Request + +The `query::partition_slice::options::reversed` flag is set. +Clustering ranges in both `query::partition_slice::_row_ranges` and +`query::specific_ranges::_ranges` +(`query::partition_slice::_specific_ranges`) are half-reversed: they +are ordered in reverse, but when they are compared to other +mutation-fragments, their end bound is used as position, instead of the +start bound as usual. When compared to other clustering ranges the end +bound is used as the start bound and vice-versa. +Example: + +For the clustering keys (ASC order): `ck1`, `ck2`, `ck3`, `ck4`, `ck5`, +`ck6`. +A `_row_ranges` field of a slice might contain this: + + [ck1, ck2], [ck4, ck5] + +The legacy reversed version would look like this: + + [ck4, ck5], [ck1, ck2] + +Note how the ranges themselves are the same (bounds not reversed), it is +just the range vector itself that is reversed. + +### Result + +Results are ordered with the reversed clustering order with the caveat +that range-tombstones are ordered by their end bound, using the native +schema's comparators. For example given the following partition: + + ps{pk1}, sr{}, cr{ck1}, rt{[ck2, ck4)}, cr{ck2}, cr{ck3}, cr{ck4}, ck{ck5}, pe{} + +The legacy reverse format equivalent of this looks like the following: + + ps{pk1}, sr{}, cr{ck5}, rt{[ck2, ck4)}, cr{ck4}, cr{ck3}, cr{ck2}, ck{ck1}, pe{} + +Note: +* Only clustering elements change; +* Range tombstone's bounds are not reversed; +* Range tombstones can be ordered off-by-one due to native schema + comparators used: `rt{[ck2, ck4)}` should be ordered *after* + `cr{ck4}`. + +Legend: +* ps = partitions-tart +* sr = static-row +* cr = clustering-row +* rt = range-tombstone +* pe = partition-end + +## Native format + +The native format uses ordering equivalent to that of a table with +reverse clustering format. Using `mytable` as an example, the native +reverse format would be an identical table `my_reverse_table`, which +uses `CLUSTERING ORDER BY (ck DESC);`. This allows middle layers in a +read pipeline to just use a schema with reversed clustering order and +process the reverse stream as normal. + +### Request + +The `query::partition_slice::options::reversed` flag is set as in the +legacy format. Clustering ranges in both +`query::partition_slice::_row_ranges` and +`query::specific_ranges::_ranges` +(`query::partition_slice::_specific_ranges`) are fully-reversed: they +are ordered in reverse, their bound being swapped as well. +Example: + +For the clustering keys (ASC order): `ck1`, `ck2`, `ck3`, `ck4`, `ck5`, +`ck6`. +A `_row_ranges` field of a slice might contain this: + + [ck1, ck2], [ck4, ck5] + +The native reversed version would look like this: + + [ck5, ck4], [ck2, ck1] + +In addition to this, the schema is reversed on the replica, at the start +of the read, so all the reverse-capable and intermediate readers in the +stack get a reversed schema to work with. + +### Result + +Results are ordered with the reversed clustering order with +the bounds of range-tombstones swapped. For example, given the same +partition that was used in the legacy format example, the native reverse +version would look like this: + + ps{pk1}, sr{}, cr{ck5}, cr{ck4}, rt{(ck4, ck2]}, cr{ck3}, cr{ck2}, ck{ck1}, pe{} From 183ac6981afc2396dd9fccbebc3d3350138c435c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 5 Aug 2021 09:34:28 +0300 Subject: [PATCH 02/23] types: add reversed(data_type) Reversing the sort order of a type. --- types.cc | 8 ++++++++ types.hh | 4 ++++ 2 files changed, 12 insertions(+) diff --git a/types.cc b/types.cc index f2e9472006..53299b28d0 100644 --- a/types.cc +++ b/types.cc @@ -3436,3 +3436,11 @@ std::ostream& operator<<(std::ostream& out, const data_value& v) { shared_ptr reversed_type_impl::get_instance(data_type type) { return intern::get_instance(std::move(type)); } + +data_type reversed(data_type type) { + if (type->is_reversed()) { + return type->underlying_type(); + } else { + return reversed_type_impl::get_instance(type); + } +} diff --git a/types.hh b/types.hh index 6615be3907..c4b7826e00 100644 --- a/types.hh +++ b/types.hh @@ -829,6 +829,10 @@ public: }; using reversed_type = shared_ptr; +// Reverse the sort order of the type by wrapping in or stripping reversed_type, +// as needed. +data_type reversed(data_type); + class map_type_impl; using map_type = shared_ptr; From 65913f4cfaec8e0c38a808bbde0ac522204f5ced Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Mon, 23 Aug 2021 17:57:28 +0300 Subject: [PATCH 03/23] utils: UUID_gen: introduce negate() --- test/boost/UUID_test.cc | 17 +++++++++++++++++ utils/UUID_gen.cc | 19 +++++++++++++++++++ utils/UUID_gen.hh | 10 ++++++++++ 3 files changed, 46 insertions(+) diff --git a/test/boost/UUID_test.cc b/test/boost/UUID_test.cc index d1b3143bea..1b0d12f6da 100644 --- a/test/boost/UUID_test.cc +++ b/test/boost/UUID_test.cc @@ -230,3 +230,20 @@ BOOST_AUTO_TEST_CASE(test_max_time_uuid) { auto unix_timestamp = utils::UUID_gen::unix_timestamp(uuid); BOOST_CHECK(unix_timestamp == millis); } + +BOOST_AUTO_TEST_CASE(test_negate) { + using namespace utils; + + auto original_uuid = UUID_gen::get_time_UUID(); + BOOST_TEST_MESSAGE(fmt::format("original_uuid: {}", original_uuid)); + + auto negated_uuid = UUID_gen::negate(original_uuid); + BOOST_TEST_MESSAGE(fmt::format("negated_uuid: {}", negated_uuid)); + + BOOST_REQUIRE(original_uuid != negated_uuid); + + auto re_negated_uuid = UUID_gen::negate(negated_uuid); + BOOST_TEST_MESSAGE(fmt::format("re_negated_uuid: {}", re_negated_uuid)); + + BOOST_REQUIRE(original_uuid == re_negated_uuid); +} diff --git a/utils/UUID_gen.cc b/utils/UUID_gen.cc index d342a95518..ea31f97ebc 100644 --- a/utils/UUID_gen.cc +++ b/utils/UUID_gen.cc @@ -168,6 +168,25 @@ UUID UUID_gen::get_name_UUID(const unsigned char *s, size_t len) { return get_UUID(digest); } +UUID UUID_gen::negate(UUID o) { + auto lsb = o.get_least_significant_bits(); + + const long clock_mask = 0x0000000000003FFFL; + + // We flip the node-and-clock-seq octet of the UUID for time-UUIDs. This + // creates a virtual node with a time which cannot be generated anymore, so + // is safe against collisions. + // For name UUIDs we flip the same octet. Name UUIDs being an md5 hash over + // a buffer, flipping any bit should be safe against collisions. + long clock = (lsb >> 48) & clock_mask; + clock = ~clock & clock_mask; + + lsb &= ~(clock_mask << 48); // zero current clock + lsb |= (clock << 48); // write new clock + + return UUID(o.get_most_significant_bits(), lsb); +} + const thread_local int64_t UUID_gen::spoof_node = make_thread_local_node(make_random_node()); const thread_local int64_t UUID_gen::clock_seq_and_node = make_clock_seq_and_node(); thread_local UUID_gen UUID_gen::_instance; diff --git a/utils/UUID_gen.hh b/utils/UUID_gen.hh index 06ecd6e833..8e5e1f50c1 100644 --- a/utils/UUID_gen.hh +++ b/utils/UUID_gen.hh @@ -397,6 +397,16 @@ public: (0x0fff000000000000UL & msb) >> 48 | 0x0000000000001000L); // sets the version to 1. } + + // Produce an UUID which is derived from this UUID in a reversible manner + // + // Such that: + // + // auto original_uuid = UUID_gen::get_time_UUID(); + // auto negated_uuid = UUID_gen::negate(original_uuid); + // assert(original_uuid != negated_uuid); + // assert(original_uuid == UUID_gen::negate(negated_uuid)); + static UUID negate(UUID); }; // for the curious, here is how I generated START_EPOCH From 9a9b58e67b423a1cdf9cabbb3c4b2a2cfc008d56 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Mon, 9 Aug 2021 17:25:11 +0300 Subject: [PATCH 04/23] schema: add a transforming copy constructor Taking a transform functor, which is executed after the raw schema is copied, but before the derivate fields are computed (rebuild()). --- schema.cc | 13 ++++++++++++- schema.hh | 1 + 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/schema.cc b/schema.cc index 219cb3392c..9452b000f7 100644 --- a/schema.cc +++ b/schema.cc @@ -415,10 +415,16 @@ schema::schema(const raw_schema& raw, std::optional raw_view_info } } -schema::schema(const schema& o) +schema::schema(const schema& o, const std::function& transform) : _raw(o._raw) , _offsets(o._offsets) { + // Do the transformation after all the raw fields are initialized, but + // *before* the derived fields are generated (from the raw ones). + if (transform) { + transform(*this); + } + rebuild(); if (o.is_view()) { _view_info = std::make_unique<::view_info>(*this, o.view_info()->raw()); @@ -428,6 +434,11 @@ schema::schema(const schema& o) } } +schema::schema(const schema& o) + : schema(o, {}) +{ +} + lw_shared_ptr make_shared_schema(std::optional id, std::string_view ks_name, std::string_view cf_name, std::vector partition_key, std::vector clustering_key, std::vector regular_columns, std::vector static_columns, diff --git a/schema.hh b/schema.hh index a96f31304f..f0edaf2713 100644 --- a/schema.hh +++ b/schema.hh @@ -698,6 +698,7 @@ private: lw_shared_ptr make_column_specification(const column_definition& def); void rebuild(); schema(const raw_schema&, std::optional); + schema(const schema&, const std::function&); public: schema(const schema&); ~schema(); From f200c8104a7e81ee8dbfafa171228e7bce17b164 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Mon, 9 Aug 2021 13:06:28 +0300 Subject: [PATCH 05/23] schema: introduce make_reversed() `make_revered()` creates a schema identical to the schema instance it is called on, with clustering order reversed. To distinguish the reverse schema from the original one, the node-id part of its version UUID is bit-flipped. This ensures that reversing a schema twice will result in the identical schema to the original one (although a different C++ object). This reversed schema will be used in reversed reads, so intermediate layers can be ignorant of the fact that the read happens in reverse. --- schema.cc | 16 ++++++++++++++++ schema.hh | 21 +++++++++++++++++++-- test/boost/schema_change_test.cc | 24 ++++++++++++++++++++++++ 3 files changed, 59 insertions(+), 2 deletions(-) diff --git a/schema.cc b/schema.cc index 9452b000f7..3fe92e3790 100644 --- a/schema.cc +++ b/schema.cc @@ -439,6 +439,18 @@ schema::schema(const schema& o) { } +schema::schema(reversed_tag, const schema& o) + : schema(o, [] (schema& s) { + s._raw._version = utils::UUID_gen::negate(s._raw._version); + for (auto& col : s._raw._columns) { + if (col.kind == column_kind::clustering_key) { + col.type = reversed(col.type); + } + } + }) +{ +} + lw_shared_ptr make_shared_schema(std::optional id, std::string_view ks_name, std::string_view cf_name, std::vector partition_key, std::vector clustering_key, std::vector regular_columns, std::vector static_columns, @@ -1579,6 +1591,10 @@ bool schema::equal_columns(const schema& other) const { return boost::equal(all_columns(), other.all_columns()); } +schema_ptr schema::make_reversed() const { + return make_lw_shared(schema::reversed_tag{}, *this); +} + raw_view_info::raw_view_info(utils::UUID base_id, sstring base_name, bool include_all_columns, sstring where_clause) : _base_id(std::move(base_id)) , _base_name(std::move(base_name)) diff --git a/schema.hh b/schema.hh index f0edaf2713..9e23df3047 100644 --- a/schema.hh +++ b/schema.hh @@ -584,6 +584,10 @@ public: } }; +class schema; + +using schema_ptr = lw_shared_ptr; + /* * Effectively immutable. * Not safe to access across cores because of shared_ptr's. @@ -695,12 +699,16 @@ public: data_type type; }; private: + struct reversed_tag { }; + lw_shared_ptr make_column_specification(const column_definition& def); void rebuild(); schema(const raw_schema&, std::optional); schema(const schema&, const std::function&); public: schema(const schema&); + // See \ref make_reversed(). + schema(reversed_tag, const schema&); ~schema(); table_schema_version version() const { return _raw._version; @@ -967,6 +975,17 @@ public: const v3_columns& v3() const { return _v3_columns; } + + // Make a copy of the schema with reversed clustering order. + // + // The reversing is revertible, so that: + // + // s->make_reversed()->make_reversed()->version() == s->version() + // + // But note that: `s != s->make_reversed()->make_reversed()` (they are two + // different C++ objects). + // The schema's version is also reversed using UUID_gen::negate(). + schema_ptr make_reversed() const; }; lw_shared_ptr make_shared_schema(std::optional id, std::string_view ks_name, std::string_view cf_name, @@ -975,8 +994,6 @@ lw_shared_ptr make_shared_schema(std::optional id, st bool operator==(const schema&, const schema&); -using schema_ptr = lw_shared_ptr; - /** * Wrapper for schema_ptr used by functions that expect an engaged view_info field. */ diff --git a/test/boost/schema_change_test.cc b/test/boost/schema_change_test.cc index c697634b2c..782532379f 100644 --- a/test/boost/schema_change_test.cc +++ b/test/boost/schema_change_test.cc @@ -42,6 +42,7 @@ #include "test/lib/log.hh" #include "serializer_impl.hh" #include "cdc/cdc_extension.hh" +#include "utils/UUID_gen.hh" SEASTAR_TEST_CASE(test_new_schema_with_no_structural_change_is_propagated) { return do_with_cql_env([](cql_test_env& e) { @@ -807,3 +808,26 @@ SEASTAR_TEST_CASE(test_schema_tables_use_null_sharder) { }).get(); }, raft_cql_test_config()); } + +SEASTAR_TEST_CASE(test_schema_make_reversed) { + auto schema = schema_builder("tests", get_name()) + .with_column("pk", bytes_type, column_kind::partition_key) + .with_column("ck", bytes_type, column_kind::clustering_key) + .with_column("v1", bytes_type) + .build(); + testlog.info(" schema->version(): {}", schema->version()); + + auto reversed_schema = schema->make_reversed(); + testlog.info(" reversed_schema->version(): {}", reversed_schema->version()); + + BOOST_REQUIRE(schema->version() != reversed_schema->version()); + BOOST_REQUIRE(utils::UUID_gen::negate(schema->version()) == reversed_schema->version()); + + auto re_reversed_schema = reversed_schema->make_reversed(); + testlog.info("re_reversed_schema->version(): {}", re_reversed_schema->version()); + + BOOST_REQUIRE(schema->version() == re_reversed_schema->version()); + BOOST_REQUIRE(reversed_schema->version() != re_reversed_schema->version()); + + return make_ready_future<>(); +} From d0351eaaed6621642a572a144915a0706b94e3a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 5 Aug 2021 09:10:40 +0300 Subject: [PATCH 06/23] clustering_bounds_comparator: add reverse_kind() Hiding the tricky reversing of a bound_kind. --- clustering_bounds_comparator.hh | 3 +++ keys.cc | 12 ++++++++++++ 2 files changed, 15 insertions(+) diff --git a/clustering_bounds_comparator.hh b/clustering_bounds_comparator.hh index 21133b40db..b089c1b17c 100644 --- a/clustering_bounds_comparator.hh +++ b/clustering_bounds_comparator.hh @@ -40,7 +40,10 @@ enum class bound_kind : uint8_t { std::ostream& operator<<(std::ostream& out, const bound_kind k); +// Swaps start <-> end && incl <-> excl bound_kind invert_kind(bound_kind k); +// Swaps start <-> end +bound_kind reverse_kind(bound_kind k); int32_t weight(bound_kind k); class bound_view { diff --git a/keys.cc b/keys.cc index 46c57beb81..f60c980412 100644 --- a/keys.cc +++ b/keys.cc @@ -27,6 +27,8 @@ #include #include +logging::logger klog("keys"); + std::ostream& operator<<(std::ostream& out, const partition_key& pk) { return out << "pk{" << to_hex(managed_bytes_view(pk.representation())) << "}"; } @@ -126,6 +128,16 @@ bound_kind invert_kind(bound_kind k) { abort(); } +bound_kind reverse_kind(bound_kind k) { + switch (k) { + case bound_kind::excl_start: return bound_kind::excl_end; + case bound_kind::incl_start: return bound_kind::incl_end; + case bound_kind::excl_end: return bound_kind::excl_start; + case bound_kind::incl_end: return bound_kind::incl_start; + } + on_internal_error(klog, format("reverse_kind(): invalid value for `bound_kind`: {}", static_cast>(k))); +} + int32_t weight(bound_kind k) { switch (k) { case bound_kind::excl_end: From 30f6f676b82baa60a6b6ffede5303e01a27a998e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Mon, 2 Aug 2021 14:19:23 +0300 Subject: [PATCH 07/23] range_tombstone: add reverse() Reversing the range-tombstone, as-if it was emitted from a table with reversed clustering order. --- range_tombstone.hh | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/range_tombstone.hh b/range_tombstone.hh index 6386fcdbcb..e82c3528bc 100644 --- a/range_tombstone.hh +++ b/range_tombstone.hh @@ -192,6 +192,15 @@ public: end_kind = new_end.kind(); } + // Swap bounds to reverse range-tombstone -- as if it came from a table with + // reverse native order. See docs/design-notes/reverse-reads.md. + void reverse() { + std::swap(start, end); + std::swap(start_kind, end_kind); + start_kind = reverse_kind(start_kind); + end_kind = reverse_kind(end_kind); + } + size_t external_memory_usage(const schema&) const noexcept { return start.external_memory_usage() + end.external_memory_usage(); } From 34abbe82fe21d21acc5298f2d4dd6b63fcfda583 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Fri, 3 Sep 2021 15:47:16 +0300 Subject: [PATCH 08/23] query: specific_ranges: add non-const ranges accessor --- query-request.hh | 3 +++ 1 file changed, 3 insertions(+) diff --git a/query-request.hh b/query-request.hh index b26db904cb..c6ea133263 100644 --- a/query-request.hh +++ b/query-request.hh @@ -110,6 +110,9 @@ public: const clustering_row_ranges& ranges() const { return _ranges; } + clustering_row_ranges& ranges() { + return _ranges; + } private: friend std::ostream& operator<<(std::ostream& out, const specific_ranges& r); From a2eb0f7d7ea98a507c478a6318ecb3dd333406e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 9 Sep 2021 14:06:14 +0300 Subject: [PATCH 09/23] partition_slice_builder: add constructor with slice Intended to be used to modify an existing slice. We want to move the slice into the direction where the schema is at: make it completely immutable, all mutations happening through the slice builder class. --- partition_slice_builder.cc | 9 +++++++++ partition_slice_builder.hh | 1 + query-request.hh | 2 ++ 3 files changed, 12 insertions(+) diff --git a/partition_slice_builder.cc b/partition_slice_builder.cc index b1f2730361..46d90af63a 100644 --- a/partition_slice_builder.cc +++ b/partition_slice_builder.cc @@ -25,6 +25,15 @@ #include "partition_slice_builder.hh" +partition_slice_builder::partition_slice_builder(const schema& schema, query::partition_slice slice) + : _regular_columns(std::move(slice.regular_columns)) + , _static_columns(std::move(slice.static_columns)) + , _row_ranges(std::move(slice._row_ranges)) + , _schema(schema) + , _options(std::move(slice.options)) +{ +} + partition_slice_builder::partition_slice_builder(const schema& schema) : _schema(schema) { diff --git a/partition_slice_builder.hh b/partition_slice_builder.hh index f44cccf410..3cb5d3a740 100644 --- a/partition_slice_builder.hh +++ b/partition_slice_builder.hh @@ -44,6 +44,7 @@ class partition_slice_builder { query::partition_slice::option_set _options; public: partition_slice_builder(const schema& schema); + partition_slice_builder(const schema& schema, query::partition_slice slice); partition_slice_builder& with_static_column(bytes name); partition_slice_builder& with_no_static_columns(); diff --git a/query-request.hh b/query-request.hh index c6ea133263..0c88a3977c 100644 --- a/query-request.hh +++ b/query-request.hh @@ -33,6 +33,7 @@ #include "query_class_config.hh" class position_in_partition_view; +class partition_slice_builder; namespace query { @@ -128,6 +129,7 @@ constexpr auto max_rows_if_set = std::numeric_limits::max(); // Can be accessed across cores. // Schema-dependent. class partition_slice { + friend class ::partition_slice_builder; public: enum class option { send_clustering_key, From 4fc39721a2f592b91038a7bb33d3dadbb6741ea6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 9 Sep 2021 14:08:04 +0300 Subject: [PATCH 10/23] partition_slice_builder: add range mutating methods --- partition_slice_builder.cc | 20 +++++++++++++++++++- partition_slice_builder.hh | 5 +++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/partition_slice_builder.cc b/partition_slice_builder.cc index 46d90af63a..e3465c538d 100644 --- a/partition_slice_builder.cc +++ b/partition_slice_builder.cc @@ -29,6 +29,7 @@ partition_slice_builder::partition_slice_builder(const schema& schema, query::pa : _regular_columns(std::move(slice.regular_columns)) , _static_columns(std::move(slice.static_columns)) , _row_ranges(std::move(slice._row_ranges)) + , _specific_ranges(std::move(slice._specific_ranges)) , _schema(schema) , _options(std::move(slice.options)) { @@ -72,7 +73,8 @@ partition_slice_builder::build() { std::move(ranges), std::move(static_columns), std::move(regular_columns), - std::move(_options) + std::move(_options), + std::move(_specific_ranges) }; } @@ -97,6 +99,22 @@ partition_slice_builder::with_ranges(std::vector ranges return *this; } +partition_slice_builder& +partition_slice_builder::mutate_ranges(std::function&)> func) { + if (_row_ranges) { + func(*_row_ranges); + } + return *this; +} + +partition_slice_builder& +partition_slice_builder::mutate_specific_ranges(std::function func) { + if (_specific_ranges) { + func(*_specific_ranges); + } + return *this; +} + partition_slice_builder& partition_slice_builder::with_no_regular_columns() { _regular_columns = query::column_id_vector(); diff --git a/partition_slice_builder.hh b/partition_slice_builder.hh index 3cb5d3a740..9fbd296fd3 100644 --- a/partition_slice_builder.hh +++ b/partition_slice_builder.hh @@ -40,6 +40,7 @@ class partition_slice_builder { std::optional _regular_columns; std::optional _static_columns; std::optional> _row_ranges; + std::unique_ptr _specific_ranges; const schema& _schema; query::partition_slice::option_set _options; public: @@ -52,6 +53,10 @@ public: partition_slice_builder& with_no_regular_columns(); partition_slice_builder& with_range(query::clustering_range range); partition_slice_builder& with_ranges(std::vector); + // noop if no ranges have been set yet + partition_slice_builder& mutate_ranges(std::function&)>); + // noop if no specific ranges have been set yet + partition_slice_builder& mutate_specific_ranges(std::function); partition_slice_builder& without_partition_key_columns(); partition_slice_builder& without_clustering_key_columns(); partition_slice_builder& reversed(); From 5d33d76cfd8621b1125008ca6af160ded87c3fea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Mon, 23 Aug 2021 17:03:02 +0300 Subject: [PATCH 11/23] query: add slice reversing functions --- query-request.hh | 7 +++++++ query.cc | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+) diff --git a/query-request.hh b/query-request.hh index 0c88a3977c..59bfeee4d2 100644 --- a/query-request.hh +++ b/query-request.hh @@ -231,6 +231,13 @@ public: friend std::ostream& operator<<(std::ostream& out, const specific_ranges& ps); }; +// See docs/design-notes/reverse-reads.md +partition_slice legacy_reverse_slice_to_native_reverse_slice(const schema& schema, partition_slice slice); +partition_slice native_reverse_slice_to_legacy_reverse_slice(const schema& schema, partition_slice slice); +// Fully reverse slice (forward to native reverse) +// Also set the reversed bit in `partition_slice::options`. +partition_slice reverse_slice(const schema& schema, partition_slice slice); + constexpr auto max_partitions = std::numeric_limits::max(); // Tagged integers to disambiguate constructor arguments. diff --git a/query.cc b/query.cc index aaa883c868..f0f515a626 100644 --- a/query.cc +++ b/query.cc @@ -29,6 +29,7 @@ #include "mutation_partition_serializer.hh" #include "query-result-reader.hh" #include "query_result_merger.hh" +#include "partition_slice_builder.hh" namespace query { @@ -114,6 +115,41 @@ void trim_clustering_row_ranges_to(const schema& s, clustering_row_ranges& range reversed ? position_in_partition_view::after_key(full_key) : position_in_partition_view::before_key(full_key), reversed); } +static void reverse_clustering_ranges_bounds(clustering_row_ranges& ranges) { + for (auto& range : ranges) { + if (!range.is_singular()) { + range = query::clustering_range(range.end(), range.start()); + } + } +} + +partition_slice legacy_reverse_slice_to_native_reverse_slice(const schema& schema, partition_slice slice) { + return partition_slice_builder(schema, std::move(slice)) + .mutate_ranges([] (clustering_row_ranges& ranges) { reverse_clustering_ranges_bounds(ranges); }) + .mutate_specific_ranges([] (specific_ranges& ranges) { reverse_clustering_ranges_bounds(ranges.ranges()); }) + .build(); +} + +partition_slice native_reverse_slice_to_legacy_reverse_slice(const schema& schema, partition_slice slice) { + // They are the same, we give them different names to express intent + return legacy_reverse_slice_to_native_reverse_slice(schema, std::move(slice)); +} + +partition_slice reverse_slice(const schema& schema, partition_slice slice) { + return partition_slice_builder(schema, std::move(slice)) + .mutate_ranges([] (clustering_row_ranges& ranges) { + std::reverse(ranges.begin(), ranges.end()); + reverse_clustering_ranges_bounds(ranges); + }) + .mutate_specific_ranges([] (specific_ranges& sranges) { + auto& ranges = sranges.ranges(); + std::reverse(ranges.begin(), ranges.end()); + reverse_clustering_ranges_bounds(ranges); + }) + .with_option() + .build(); +} + partition_slice::partition_slice(clustering_row_ranges row_ranges, query::column_id_vector static_columns, query::column_id_vector regular_columns, From 38ef80d4d2ec960fcb2d6b44726f265dee1f3800 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Tue, 24 Aug 2021 17:16:02 +0300 Subject: [PATCH 12/23] mutation: consume(): don't include dummy rows --- mutation.hh | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/mutation.hh b/mutation.hh index f06ba38b77..33f2d9eb59 100644 --- a/mutation.hh +++ b/mutation.hh @@ -191,7 +191,11 @@ stop_iteration consume_clustering_fragments(const schema& s, mutation_partition& stop = consumer.consume(std::move(rts_it->tombstone())); ++rts_it; } else { - stop = consumer.consume(clustering_row(std::move(*crs_it))); + // Dummy rows are part of the in-memory representation but should be + // invisible to reads. + if (!crs_it->dummy()) { + stop = consumer.consume(clustering_row(std::move(*crs_it))); + } ++crs_it; } } From 0af5a8add04aca06d977413df1c58ee4bf578f30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Tue, 24 Aug 2021 17:29:54 +0300 Subject: [PATCH 13/23] mutation: consume(): add native reverse order The existing consume_in_reverse::yes is renamed to consume_in_reverse::legacy_half_reverse and consume_in_reverse::yes now means native reverse order. This is because we expect the legacy order to die out at one point and when that happens we can just remove that ugly third option and will be left with yes and no as before. --- mutation.hh | 47 +++++++++++++++++++++++++++++++++++-------- mutation_partition.cc | 4 ++-- 2 files changed, 41 insertions(+), 10 deletions(-) diff --git a/mutation.hh b/mutation.hh index 33f2d9eb59..a214460fdb 100644 --- a/mutation.hh +++ b/mutation.hh @@ -48,6 +48,7 @@ struct mutation_consume_result { enum class consume_in_reverse { no = 0, yes, + legacy_half_reverse, }; class mutation final { @@ -127,6 +128,16 @@ public: // Consumes the mutation's content. // // The mutation is in a moved-from alike state after consumption. + // There are tree ways to consume the mutation: + // * consume_in_reverse::no - consume in forward order, as defined by the + // schema. + // * consume_in_reverse::yes - consume in reverse order, as if the schema + // had the opposite clustering order. This effectively reverses the + // mutation's content, according to the native reverse order[1]. + // * consume_in_reverse::legacy_half_reverse - consume rows and range + // tombstones in legacy reverse order[2]. + // + // For definition of [1] and [2] see docs/design-notes/reverse-reads.md. template auto consume(Consumer& consumer, consume_in_reverse reverse) && -> mutation_consume_result; @@ -151,19 +162,37 @@ private: namespace { template -stop_iteration consume_clustering_fragments(const schema& s, mutation_partition& partition, Consumer& consumer) { +stop_iteration consume_clustering_fragments(schema_ptr s, mutation_partition& partition, Consumer& consumer) { using crs_type = mutation_partition::rows_type; - using crs_iterator_type = std::conditional_t; + using crs_iterator_type = std::conditional_t; using rts_type = range_tombstone_list; - using rts_iterator_type = std::conditional_t; + using rts_iterator_type = std::conditional_t; + + if constexpr (reverse == consume_in_reverse::yes) { + s = s->make_reversed(); + } + + // only used when reverse == consume_in_reverse::yes + range_tombstone_list reversed_range_tombstones(*s); crs_iterator_type crs_it, crs_end; rts_iterator_type rts_it, rts_end; - if constexpr (reverse == consume_in_reverse::yes) { + if constexpr (reverse == consume_in_reverse::legacy_half_reverse) { crs_it = partition.clustered_rows().rbegin(); crs_end = partition.clustered_rows().rend(); rts_it = partition.row_tombstones().rbegin(); rts_end = partition.row_tombstones().rend(); + } else if constexpr (reverse == consume_in_reverse::yes) { + crs_it = partition.clustered_rows().rbegin(); + crs_end = partition.clustered_rows().rend(); + + while (!partition.row_tombstones().empty()) { + auto rt = partition.mutable_row_tombstones().pop_front_and_lock(); + rt.reverse(); + reversed_range_tombstones.apply(*s, std::move(rt)); + } + rts_it = reversed_range_tombstones.begin(); + rts_end = reversed_range_tombstones.end(); } else { crs_it = partition.clustered_rows().begin(); crs_end = partition.clustered_rows().end(); @@ -173,13 +202,13 @@ stop_iteration consume_clustering_fragments(const schema& s, mutation_partition& stop_iteration stop = stop_iteration::no; - position_in_partition::tri_compare cmp(s); + position_in_partition::tri_compare cmp(*s); while (!stop && (crs_it != crs_end || rts_it != rts_end)) { bool emit_rt; if (crs_it != crs_end && rts_it != rts_end) { const auto cmp_res = cmp(rts_it->position(), crs_it->position()); - if constexpr (reverse == consume_in_reverse::yes) { + if constexpr (reverse == consume_in_reverse::legacy_half_reverse) { emit_rt = cmp_res > 0; } else { emit_rt = cmp_res < 0; @@ -221,9 +250,11 @@ auto mutation::consume(Consumer& consumer, consume_in_reverse reverse) && -> mut } if (reverse == consume_in_reverse::yes) { - stop = consume_clustering_fragments(*_ptr->_schema, partition, consumer); + stop = consume_clustering_fragments(_ptr->_schema, partition, consumer); + } else if (reverse == consume_in_reverse::legacy_half_reverse) { + stop = consume_clustering_fragments(_ptr->_schema, partition, consumer); } else { - stop = consume_clustering_fragments(*_ptr->_schema, partition, consumer); + stop = consume_clustering_fragments(_ptr->_schema, partition, consumer); } const auto stop_consuming = consumer.consume_end_of_partition(); diff --git a/mutation_partition.cc b/mutation_partition.cc index 8dbd6b14d8..3e7a40af9d 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -2040,7 +2040,7 @@ to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::pa query::result::builder builder(slice, opts, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }); auto consumer = compact_for_query(*s, gc_clock::time_point::min(), slice, max_rows, max_partitions, query_result_builder(*s, builder)); - const auto reverse = slice.options.contains(query::partition_slice::option::reversed) ? consume_in_reverse::yes : consume_in_reverse::no; + const auto reverse = slice.options.contains(query::partition_slice::option::reversed) ? consume_in_reverse::legacy_half_reverse : consume_in_reverse::no; for (const partition& p : r.partitions()) { const auto res = p.mut().unfreeze(s).consume(consumer, reverse); @@ -2059,7 +2059,7 @@ query_mutation(mutation&& m, const query::partition_slice& slice, uint64_t row_l query::result::builder builder(slice, opts, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }); auto consumer = compact_for_query(*m.schema(), now, slice, row_limit, query::max_partitions, query_result_builder(*m.schema(), builder)); - const auto reverse = slice.options.contains(query::partition_slice::option::reversed) ? consume_in_reverse::yes : consume_in_reverse::no; + const auto reverse = slice.options.contains(query::partition_slice::option::reversed) ? consume_in_reverse::legacy_half_reverse : consume_in_reverse::no; std::move(m).consume(consumer, reverse); return builder.build(); } From 502a45ad58f5f72da4f58e808bdecc128e15dbcb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Tue, 24 Aug 2021 16:12:49 +0300 Subject: [PATCH 14/23] treewide: switch to native reversed format for reverse reads We define the native reverse format as a reversed mutation fragment stream that is identical to one that would be emitted by a table with the same schema but with reversed clustering order. The main difference to the current format is how range tombstones are handled: instead of looking at their start or end bound depending on the order, we always use them as-usual and the reversing reader swaps their bounds to facilitate this. This allows us to treat reversed streams completely transparently: just pass along them a reversed schema and all the reader, compacting and result building code is happily ignorant about the fact that it is a reversed stream. --- database.cc | 10 ++++++++++ flat_mutation_reader.cc | 13 ++++++++----- flat_mutation_reader.hh | 13 ++++++++----- idl/read_command.idl.hh | 6 ++++++ mutation_compactor.hh | 2 +- mutation_partition.cc | 7 +++++-- mutation_query.hh | 3 ++- querier.hh | 10 +++++++++- query-request.hh | 7 +++++++ table.cc | 5 ++++- test/boost/flat_mutation_reader_test.cc | 20 +++++++++----------- test/boost/mutation_query_test.cc | 6 +++++- 12 files changed, 74 insertions(+), 28 deletions(-) diff --git a/database.cc b/database.cc index d1c8df1bd5..4ec45c4ad7 100644 --- a/database.cc +++ b/database.cc @@ -1376,6 +1376,11 @@ compare_atomic_cell_for_merge(atomic_cell_view left, atomic_cell_view right) { future, cache_temperature>> database::query(schema_ptr s, const query::read_command& cmd, query::result_options opts, const dht::partition_range_vector& ranges, tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout) { + const auto reversed = cmd.slice.options.contains(query::partition_slice::option::reversed); + if (reversed) { + s = s->make_reversed(); + } + column_family& cf = find_column_family(cmd.cf_id); auto& semaphore = get_reader_concurrency_semaphore(); auto class_config = query::query_class_config{.semaphore = semaphore, .max_memory_for_unlimited_query = *cmd.max_result_size}; @@ -1429,6 +1434,11 @@ database::query(schema_ptr s, const query::read_command& cmd, query::result_opti future> database::query_mutations(schema_ptr s, const query::read_command& cmd, const dht::partition_range& range, tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout) { + const auto reversed = cmd.slice.options.contains(query::partition_slice::option::reversed); + if (reversed) { + s = s->make_reversed(); + } + const auto short_read_allwoed = query::short_read(cmd.slice.options.contains()); auto accounter = co_await get_result_memory_limiter().new_mutation_read(*cmd.max_result_size, short_read_allwoed); column_family& cf = find_column_family(cmd.cf_id); diff --git a/flat_mutation_reader.cc b/flat_mutation_reader.cc index 4b4e337c1c..3fe733392e 100644 --- a/flat_mutation_reader.cc +++ b/flat_mutation_reader.cc @@ -107,13 +107,14 @@ flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, query private: stop_iteration emit_partition() { auto emit_range_tombstone = [&] { - auto it = std::prev(_range_tombstones.end()); + // _range_tombstones uses the reverse schema already, so we can use `begin()` + auto it = _range_tombstones.begin(); push_mutation_fragment(*_schema, _permit, _range_tombstones.pop(it)); }; - position_in_partition::less_compare cmp(*_schema); + position_in_partition::tri_compare cmp(*_schema); while (!_mutation_fragments.empty() && !is_buffer_full()) { auto& mf = _mutation_fragments.top(); - if (!_range_tombstones.empty() && !cmp(_range_tombstones.rbegin()->end_position(), mf.position())) { + if (!_range_tombstones.empty() && cmp(_range_tombstones.begin()->position(), mf.position()) <= 0) { emit_range_tombstone(); } else { _stack_size -= mf.memory_usage(); @@ -148,7 +149,9 @@ flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, query return make_ready_future(stop_iteration::yes); } } else if (mf.is_range_tombstone()) { - _range_tombstones.apply(*_schema, std::move(mf.as_range_tombstone())); + auto&& rt = std::move(mf).as_range_tombstone(); + rt.reverse(); + _range_tombstones.apply(*_schema, std::move(rt)); } else { _mutation_fragments.emplace(std::move(mf)); _stack_size += _mutation_fragments.top().memory_usage(); @@ -182,7 +185,7 @@ flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, query } public: explicit partition_reversing_mutation_reader(flat_mutation_reader& mr, query::max_result_size max_size) - : flat_mutation_reader::impl(mr.schema(), mr.permit()) + : flat_mutation_reader::impl(mr.schema()->make_reversed(), mr.permit()) , _source(&mr) , _range_tombstones(*_schema) , _max_size(max_size) diff --git a/flat_mutation_reader.hh b/flat_mutation_reader.hh index 9037513f57..6d0f272db2 100644 --- a/flat_mutation_reader.hh +++ b/flat_mutation_reader.hh @@ -890,14 +890,17 @@ future<> consume_partitions(flat_mutation_reader& reader, Consumer consumer) { flat_mutation_reader make_generating_reader(schema_ptr s, reader_permit permit, std::function ()> get_next_fragment); -/// A reader that emits partitions in reverse. +/// A reader that emits partitions in native reverse order. /// -/// 1. Static row is still emitted first. -/// 2. Range tombstones are ordered by their end position. -/// 3. Clustered rows and range tombstones are emitted in descending order. -/// Because of 2 and 3 the guarantee that a range tombstone is emitted before +/// 1. The reader's schema() method will return a reversed schema (see +/// \ref schema::make_reversed()). +/// 2. Static row is still emitted first. +/// 3. Range tombstones' bounds are reversed (see \ref range_tombstone::reverse()). +/// 4. Clustered rows and range tombstones are emitted in descending order. +/// Because of 3 and 4 the guarantee that a range tombstone is emitted before /// any mutation fragment affected by it still holds. /// Ordering of partitions themselves remains unchanged. +/// For more details see docs/design-notes/reverse-reads.md. /// /// \param original the reader to be reversed, has to be kept alive while the /// reversing reader is in use. diff --git a/idl/read_command.idl.hh b/idl/read_command.idl.hh index 4be3008993..d73b2fa3ea 100644 --- a/idl/read_command.idl.hh +++ b/idl/read_command.idl.hh @@ -30,6 +30,12 @@ class specific_ranges { std::vector> ranges(); }; +// COMPATIBILITY NOTE: the partition-slice for reverse queries has two different +// format: +// * legacy format +// * native format +// The wire format uses the legacy format. See docs/design-notes/reverse-reads.md +// for more details on the formats. class partition_slice { std::vector> default_row_ranges(); utils::small_vector static_columns; diff --git a/mutation_compactor.hh b/mutation_compactor.hh index 42ac32fd5b..9d0c92bb01 100644 --- a/mutation_compactor.hh +++ b/mutation_compactor.hh @@ -224,7 +224,7 @@ public: , _row_limit(limit) , _partition_limit(partition_limit) , _partition_row_limit(_slice.options.contains(query::partition_slice::option::distinct) ? 1 : slice.partition_row_limit()) - , _range_tombstones(s, _slice.options.contains(query::partition_slice::option::reversed)) + , _range_tombstones(s, false) , _last_dk({dht::token(), partition_key::make_empty()}) { static_assert(!sstable_compaction(), "This constructor cannot be used for sstable compaction."); diff --git a/mutation_partition.cc b/mutation_partition.cc index 3e7a40af9d..70222f6888 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -1978,8 +1978,7 @@ void reconcilable_result_builder::consume_new_partition(const dht::decorated_key !has_ck_selector(_slice.row_ranges(_schema, dk.key())); _static_row_is_alive = false; _live_rows = 0; - auto is_reversed = _slice.options.contains(query::partition_slice::option::reversed); - _mutation_consumer.emplace(streamed_mutation_freezer(_schema, dk.key(), is_reversed)); + _mutation_consumer.emplace(streamed_mutation_freezer(_schema, dk.key(), _reversed)); } void reconcilable_result_builder::consume(tombstone t) { @@ -2008,6 +2007,10 @@ stop_iteration reconcilable_result_builder::consume(clustering_row&& cr, row_tom stop_iteration reconcilable_result_builder::consume(range_tombstone&& rt) { _memory_accounter.update(rt.memory_usage(_schema)); + if (_reversed) { + // undo reversing done for the native reversed format, coordinator still uses old reversing format + rt.reverse(); + } return _mutation_consumer->consume(std::move(rt)); } diff --git a/mutation_query.hh b/mutation_query.hh index 74ba786585..0e54f8ee39 100644 --- a/mutation_query.hh +++ b/mutation_query.hh @@ -137,6 +137,7 @@ public: class reconcilable_result_builder { const schema& _schema; const query::partition_slice& _slice; + bool _reversed; bool _return_static_content_on_partition_with_no_rows{}; bool _static_row_is_alive{}; @@ -151,7 +152,7 @@ class reconcilable_result_builder { public: reconcilable_result_builder(const schema& s, const query::partition_slice& slice, query::result_memory_accounter&& accounter) noexcept - : _schema(s), _slice(slice) + : _schema(s), _slice(slice), _reversed(_slice.options.contains(query::partition_slice::option::reversed)) , _memory_accounter(std::move(accounter)) { } diff --git a/querier.hh b/querier.hh index 0c24221444..9605f2a46a 100644 --- a/querier.hh +++ b/querier.hh @@ -128,6 +128,14 @@ protected: std::variant _reader; dht::partition_ranges_view _query_ranges; +protected: + schema_ptr underlying_schema() const { + if (is_reversed()) { + return _schema->make_reversed(); + } + return _schema; + } + public: querier_base(reader_permit permit, std::unique_ptr range, std::unique_ptr slice, flat_mutation_reader reader, dht::partition_ranges_view query_ranges) @@ -145,7 +153,7 @@ public: , _permit(std::move(permit)) , _range(std::make_unique(std::move(range))) , _slice(std::make_unique(std::move(slice))) - , _reader(ms.make_reader(_schema, _permit, *_range, *_slice, pc, std::move(trace_ptr), streamed_mutation::forwarding::no, mutation_reader::forwarding::no)) + , _reader(ms.make_reader(underlying_schema(), _permit, *_range, *_slice, pc, std::move(trace_ptr), streamed_mutation::forwarding::no, mutation_reader::forwarding::no)) , _query_ranges(*_range) { } diff --git a/query-request.hh b/query-request.hh index 59bfeee4d2..f741b381e2 100644 --- a/query-request.hh +++ b/query-request.hh @@ -128,6 +128,13 @@ constexpr auto max_rows_if_set = std::numeric_limits::max(); // Specifies subset of rows, columns and cell attributes to be returned in a query. // Can be accessed across cores. // Schema-dependent. +// +// COMPATIBILITY NOTE: the partition-slice for reverse queries has two different +// format: +// * legacy format +// * native format +// The wire format uses the legacy format. See docs/design-notes/reverse-reads.md +// for more details on the formats. class partition_slice { friend class ::partition_slice_builder; public: diff --git a/table.cc b/table.cc index 5e784decd3..b3388b0634 100644 --- a/table.cc +++ b/table.cc @@ -2052,7 +2052,10 @@ table::mutation_query(schema_ptr s, std::exception_ptr ex; try { - auto rrb = reconcilable_result_builder(*s, cmd.slice, std::move(accounter)); + // Un-reverse the schema sent to the coordinator, it expects the + // legacy format. + auto result_schema = cmd.slice.options.contains(query::partition_slice::option::reversed) ? s->make_reversed() : s; + auto rrb = reconcilable_result_builder(*result_schema, cmd.slice, std::move(accounter)); auto r = co_await q.consume_page(std::move(rrb), cmd.get_row_limit(), cmd.partition_limit, cmd.timestamp, class_config.max_memory_for_unlimited_query); if (!saved_querier || (!q.are_limits_reached() && !r.is_short_read())) { diff --git a/test/boost/flat_mutation_reader_test.cc b/test/boost/flat_mutation_reader_test.cc index 5ee5a94bf5..394e66860d 100644 --- a/test/boost/flat_mutation_reader_test.cc +++ b/test/boost/flat_mutation_reader_test.cc @@ -476,8 +476,8 @@ using in_thread = seastar::bool_class; struct flat_stream_consumer { schema_ptr _schema; + schema_ptr _reversed_schema; reader_permit _permit; - reversed_partitions _reversed; skip_after_first_fragment _skip_partition; skip_after_first_partition _skip_stream; std::vector _mutations; @@ -485,22 +485,17 @@ struct flat_stream_consumer { bool _inside_partition = false; private: void verify_order(position_in_partition_view pos) { - position_in_partition::less_compare cmp(*_schema); - if (!_reversed) { - BOOST_REQUIRE(!_previous_position || _previous_position->is_static_row() - || cmp(*_previous_position, pos)); - } else { - BOOST_REQUIRE(!_previous_position || _previous_position->is_static_row() - || cmp(pos, *_previous_position)); - } + const schema& s = _reversed_schema ? *_reversed_schema : *_schema; + position_in_partition::less_compare cmp(s); + BOOST_REQUIRE(!_previous_position || _previous_position->is_static_row() || cmp(*_previous_position, pos)); } public: flat_stream_consumer(schema_ptr s, reader_permit permit, reversed_partitions reversed, skip_after_first_fragment skip_partition = skip_after_first_fragment::no, skip_after_first_partition skip_stream = skip_after_first_partition::no) : _schema(std::move(s)) + , _reversed_schema(reversed ? _schema->make_reversed() : nullptr) , _permit(std::move(permit)) - , _reversed(reversed) , _skip_partition(skip_partition) , _skip_stream(skip_stream) { } @@ -534,10 +529,13 @@ public: } stop_iteration consume(range_tombstone&& rt) { BOOST_REQUIRE(_inside_partition); - auto pos = _reversed ? rt.end_position() : rt.position(); + auto pos = rt.position(); verify_order(pos); BOOST_REQUIRE_GE(_mutations.size(), 1); _previous_position.emplace(pos); + if (_reversed_schema) { + rt.reverse(); // undo the reversing + } _mutations.back().partition().apply(*_schema, mutation_fragment(*_schema, _permit, std::move(rt))); return stop_iteration(bool(_skip_partition)); } diff --git a/test/boost/mutation_query_test.cc b/test/boost/mutation_query_test.cc index 545ff8f383..e1e30d313c 100644 --- a/test/boost/mutation_query_test.cc +++ b/test/boost/mutation_query_test.cc @@ -65,7 +65,11 @@ mutation_source make_source(std::vector mutations) { const io_priority_class& pc, tracing::trace_state_ptr, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) { assert(range.is_full()); // slicing not implemented yet for (auto&& m : mutations) { - assert(m.schema() == s); + if (slice.options.contains(query::partition_slice::option::reversed)) { + assert(m.schema()->make_reversed()->version() == s->version()); + } else { + assert(m.schema() == s); + } } return flat_mutation_reader_from_mutations(std::move(permit), mutations, slice, fwd); }); From 16b9d19e50b93b4c984666cc3f286d2ccd97338a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Wed, 18 Aug 2021 16:04:29 +0300 Subject: [PATCH 15/23] mutation: make copy constructor compatible with mutation_opt Currently `_data` is assumed to be engaged by the copy constructor which is not necessarily the case with `mutation_opt` objects (which is an `optimized_optional`). Fix this by only copying `_data` if non-null. --- mutation.hh | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/mutation.hh b/mutation.hh index a214460fdb..2c16bf314a 100644 --- a/mutation.hh +++ b/mutation.hh @@ -82,8 +82,11 @@ public: : _ptr(std::make_unique(std::move(schema), std::move(key), std::move(mp))) { } mutation(const mutation& m) - : _ptr(std::make_unique(schema_ptr(m.schema()), dht::decorated_key(m.decorated_key()), m.partition())) - { } + { + if (m._ptr) { + _ptr = std::make_unique(schema_ptr(m.schema()), dht::decorated_key(m.decorated_key()), m.partition()); + } + } mutation(mutation&&) = default; mutation& operator=(mutation&& x) = default; mutation& operator=(const mutation& m); From 74a22a706b5d17386a2a836911c7e06814f71457 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Wed, 18 Aug 2021 16:07:03 +0300 Subject: [PATCH 16/23] mutation_rebuilder: make it standalone Not requiring a wrapper object to become usable. --- mutation.cc | 44 +------------------------ mutation_rebuilder.hh | 56 +++++++++++++++++--------------- test/lib/mutation_source_test.cc | 3 +- 3 files changed, 33 insertions(+), 70 deletions(-) diff --git a/mutation.cc b/mutation.cc index 72a3bdfdf9..7032c2ddf7 100644 --- a/mutation.cc +++ b/mutation.cc @@ -196,49 +196,7 @@ future read_mutation_from_flat_mutation_reader(flat_mutation_reade }); } // r.is_buffer_empty() is always false at this point - struct adapter { - schema_ptr _s; - std::optional _builder; - adapter(schema_ptr s) : _s(std::move(s)) { } - - void consume_new_partition(const dht::decorated_key& dk) { - assert(!_builder); - _builder = mutation_rebuilder(dk, std::move(_s)); - } - - stop_iteration consume(tombstone t) { - assert(_builder); - return _builder->consume(t); - } - - stop_iteration consume(range_tombstone&& rt) { - assert(_builder); - return _builder->consume(std::move(rt)); - } - - stop_iteration consume(static_row&& sr) { - assert(_builder); - return _builder->consume(std::move(sr)); - } - - stop_iteration consume(clustering_row&& cr) { - assert(_builder); - return _builder->consume(std::move(cr)); - } - - stop_iteration consume_end_of_partition() { - assert(_builder); - return stop_iteration::yes; - } - - mutation_opt consume_end_of_stream() { - if (!_builder) { - return mutation_opt(); - } - return _builder->consume_end_of_stream(); - } - }; - return r.consume(adapter(r.schema())); + return r.consume(mutation_rebuilder(r.schema())); } std::ostream& operator<<(std::ostream& os, const mutation& m) { diff --git a/mutation_rebuilder.hh b/mutation_rebuilder.hh index c893cc26e8..1a17667f5e 100644 --- a/mutation_rebuilder.hh +++ b/mutation_rebuilder.hh @@ -25,38 +25,51 @@ #include "range_tombstone_assembler.hh" class mutation_rebuilder { - mutation _m; + schema_ptr _s; + mutation_opt _m; public: - mutation_rebuilder(dht::decorated_key dk, schema_ptr s) - : _m(std::move(s), std::move(dk)) { + explicit mutation_rebuilder(schema_ptr s) : _s(std::move(s)) { } + + void consume_new_partition(const dht::decorated_key& dk) { + assert(!_m); + _m = mutation(_s, std::move(dk)); } stop_iteration consume(tombstone t) { - _m.partition().apply(t); + assert(_m); + _m->partition().apply(t); return stop_iteration::no; } stop_iteration consume(range_tombstone&& rt) { - _m.partition().apply_row_tombstone(*_m.schema(), std::move(rt)); + assert(_m); + _m->partition().apply_row_tombstone(*_s, std::move(rt)); return stop_iteration::no; } stop_iteration consume(static_row&& sr) { - _m.partition().static_row().apply(*_m.schema(), column_kind::static_column, std::move(sr.cells())); + assert(_m); + _m->partition().static_row().apply(*_s, column_kind::static_column, std::move(sr.cells())); return stop_iteration::no; } stop_iteration consume(clustering_row&& cr) { - auto& dr = _m.partition().clustered_row(*_m.schema(), std::move(cr.key())); + assert(_m); + auto& dr = _m->partition().clustered_row(*_s, std::move(cr.key())); dr.apply(cr.tomb()); dr.apply(cr.marker()); - dr.cells().apply(*_m.schema(), column_kind::regular_column, std::move(cr.cells())); + dr.cells().apply(*_s, column_kind::regular_column, std::move(cr.cells())); return stop_iteration::no; } + stop_iteration consume_end_of_partition() { + assert(_m); + return stop_iteration::yes; + } + mutation_opt consume_end_of_stream() { - return mutation_opt(std::move(_m)); + return std::move(_m); } }; @@ -65,10 +78,10 @@ public: // Does not work with streams in streamed_mutation::forwarding::yes mode. class mutation_rebuilder_v2 { schema_ptr _s; - std::optional _builder; + mutation_rebuilder _builder; range_tombstone_assembler _rt_assembler; public: - mutation_rebuilder_v2(schema_ptr s) : _s(std::move(s)) { } + mutation_rebuilder_v2(schema_ptr s) : _s(std::move(s)), _builder(_s) { } public: stop_iteration consume(partition_start mf) { consume_new_partition(mf.key()); @@ -82,47 +95,38 @@ public: } public: void consume_new_partition(const dht::decorated_key& dk) { - assert(!_builder); - _builder = mutation_rebuilder(dk, _s); + _builder.consume_new_partition(dk); } stop_iteration consume(tombstone t) { - assert(_builder); - _builder->consume(t); + _builder.consume(t); return stop_iteration::no; } stop_iteration consume(range_tombstone_change&& rt) { - assert(_builder); if (auto rt_opt = _rt_assembler.consume(*_s, std::move(rt))) { - _builder->consume(std::move(*rt_opt)); + _builder.consume(std::move(*rt_opt)); } return stop_iteration::no; } stop_iteration consume(static_row&& sr) { - assert(_builder); - _builder->consume(std::move(sr)); + _builder.consume(std::move(sr)); return stop_iteration::no; } stop_iteration consume(clustering_row&& cr) { - assert(_builder); - _builder->consume(std::move(cr)); + _builder.consume(std::move(cr)); return stop_iteration::no; } stop_iteration consume_end_of_partition() { - assert(_builder); _rt_assembler.on_end_of_stream(); return stop_iteration::yes; } mutation_opt consume_end_of_stream() { - if (!_builder) { - return mutation_opt(); - } _rt_assembler.on_end_of_stream(); - return _builder->consume_end_of_stream(); + return _builder.consume_end_of_stream(); } }; diff --git a/test/lib/mutation_source_test.cc b/test/lib/mutation_source_test.cc index 6a031e9629..d475f8902e 100644 --- a/test/lib/mutation_source_test.cc +++ b/test/lib/mutation_source_test.cc @@ -406,7 +406,8 @@ static void test_streamed_mutation_forwarding_is_consistent_with_slicing(tests:: void consume_new_partition(const dht::decorated_key& dk) { assert(!_builder); - _builder = mutation_rebuilder(dk, std::move(_s)); + _builder = mutation_rebuilder(std::move(_s)); + _builder->consume_new_partition(dk); } stop_iteration consume(tombstone t) { From 1d6896c14f93c1315e4ad64864b9118230752166 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Wed, 18 Aug 2021 16:08:35 +0300 Subject: [PATCH 17/23] mutation: introduce reverse() Which reverses the mutation as if it was created with a schema with reversed clustering order. --- mutation.cc | 6 ++++++ mutation.hh | 4 ++++ 2 files changed, 10 insertions(+) diff --git a/mutation.cc b/mutation.cc index 7032c2ddf7..bc713d5eb2 100644 --- a/mutation.cc +++ b/mutation.cc @@ -199,6 +199,12 @@ future read_mutation_from_flat_mutation_reader(flat_mutation_reade return r.consume(mutation_rebuilder(r.schema())); } +mutation reverse(mutation mut) { + auto reverse_schema = mut.schema()->make_reversed(); + mutation_rebuilder reverse_rebuilder(reverse_schema); + return *std::move(mut).consume(reverse_rebuilder, consume_in_reverse::yes).result; +} + std::ostream& operator<<(std::ostream& os, const mutation& m) { const ::schema& s = *m.schema(); const auto& dk = m.decorated_key(); diff --git a/mutation.hh b/mutation.hh index 2c16bf314a..ef703eb637 100644 --- a/mutation.hh +++ b/mutation.hh @@ -330,3 +330,7 @@ class flat_mutation_reader; // Reads a single partition from a reader. Returns empty optional if there are no more partitions to be read. future read_mutation_from_flat_mutation_reader(flat_mutation_reader& reader); + +// Reverses the mutation as if it was created with a schema with reverse +// clustering order. The resulting mutation will contain a reverse schema too. +mutation reverse(mutation mut); From c71a281e6b3cb5301241ac02d0edb48aedf59785 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 19 Aug 2021 14:08:18 +0300 Subject: [PATCH 18/23] test/lib/mutation_source_test: add consistent log to all methods Most test methods log their own name either via testlog.info() or BOOST_TEST_MESSAGE() so failures can be more easily located. Not all do however. This commit fixes this and also converts all those using BOOST_TEST_MESSAGE() for this to testlog.info(), for consistency. --- test/lib/mutation_source_test.cc | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/test/lib/mutation_source_test.cc b/test/lib/mutation_source_test.cc index d475f8902e..ff6a165f1a 100644 --- a/test/lib/mutation_source_test.cc +++ b/test/lib/mutation_source_test.cc @@ -843,7 +843,7 @@ static void test_streamed_mutation_forwarding_across_range_tombstones(tests::rea } static void test_range_queries(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) { - testlog.info("Testing range queries"); + testlog.info(__PRETTY_FUNCTION__); auto s = schema_builder("ks", "cf") .with_column("key", bytes_type, column_kind::partition_key) @@ -1197,7 +1197,7 @@ static void test_clustering_slices(tests::reader_concurrency_semaphore_wrapper& } static void test_query_only_static_row(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) { - BOOST_TEST_MESSAGE(__PRETTY_FUNCTION__); + testlog.info(__PRETTY_FUNCTION__); simple_schema s; @@ -1243,7 +1243,7 @@ static void test_query_only_static_row(tests::reader_concurrency_semaphore_wrapp } static void test_query_no_clustering_ranges_no_static_columns(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) { - BOOST_TEST_MESSAGE(__PRETTY_FUNCTION__); + testlog.info(__PRETTY_FUNCTION__); simple_schema s(simple_schema::with_static::no); @@ -1287,6 +1287,8 @@ static void test_query_no_clustering_ranges_no_static_columns(tests::reader_conc } void test_streamed_mutation_forwarding_succeeds_with_no_data(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) { + testlog.info(__PRETTY_FUNCTION__); + simple_schema s; auto cks = s.make_ckeys(6); @@ -1324,6 +1326,8 @@ void test_streamed_mutation_forwarding_succeeds_with_no_data(tests::reader_concu static void test_slicing_with_overlapping_range_tombstones(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) { + testlog.info(__PRETTY_FUNCTION__); + simple_schema ss; auto s = ss.schema(); @@ -1421,6 +1425,8 @@ void test_slicing_with_overlapping_range_tombstones(tests::reader_concurrency_se } void test_downgrade_to_v1_clear_buffer(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) { + testlog.info(__PRETTY_FUNCTION__); + simple_schema s; auto pkey = s.make_pkey(); sstring value(256, 'v'); @@ -1445,6 +1451,8 @@ void test_downgrade_to_v1_clear_buffer(tests::reader_concurrency_semaphore_wrapp } void test_range_tombstones_v2(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) { + testlog.info(__PRETTY_FUNCTION__); + simple_schema s; auto pkey = s.make_pkey(); @@ -1635,7 +1643,8 @@ void test_range_tombstones_v2(tests::reader_concurrency_semaphore_wrapper& semap } void test_reader_conversions(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) { - BOOST_TEST_MESSAGE(__PRETTY_FUNCTION__); + testlog.info(__PRETTY_FUNCTION__); + for_each_mutation([&] (const mutation& m) mutable { const auto query_time = gc_clock::now(); @@ -1662,6 +1671,8 @@ void test_reader_conversions(tests::reader_concurrency_semaphore_wrapper& semaph void test_next_partition(tests::reader_concurrency_semaphore_wrapper&, populate_fn_ex); void run_mutation_reader_tests(populate_fn_ex populate, bool with_partition_range_forwarding) { + testlog.info(__PRETTY_FUNCTION__); + tests::reader_concurrency_semaphore_wrapper semaphore; test_range_tombstones_v2(semaphore, populate); @@ -1689,6 +1700,8 @@ void run_mutation_reader_tests(populate_fn_ex populate, bool with_partition_rang } void test_next_partition(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) { + testlog.info(__PRETTY_FUNCTION__); + simple_schema s; auto pkeys = s.make_pkeys(4); From 350440b418b63f0822a582a1457a6f3ab36edb71 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 19 Aug 2021 14:08:05 +0300 Subject: [PATCH 19/23] flat_mutation_reader: make_reversing_reader(): take ownership of the reader Makes for much simpler client code. --- flat_mutation_reader.cc | 25 ++++++++++++------------- flat_mutation_reader.hh | 5 ++--- querier.hh | 2 +- test/boost/flat_mutation_reader_test.cc | 10 ++++------ 4 files changed, 19 insertions(+), 23 deletions(-) diff --git a/flat_mutation_reader.cc b/flat_mutation_reader.cc index 3fe733392e..e519663c91 100644 --- a/flat_mutation_reader.cc +++ b/flat_mutation_reader.cc @@ -95,9 +95,9 @@ void flat_mutation_reader::impl::clear_buffer_to_next_partition() { _buffer_size = compute_buffer_size(*_schema, _buffer); } -flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, query::max_result_size max_size) { +flat_mutation_reader make_reversing_reader(flat_mutation_reader original, query::max_result_size max_size) { class partition_reversing_mutation_reader final : public flat_mutation_reader::impl { - flat_mutation_reader* _source; + flat_mutation_reader _source; range_tombstone_list _range_tombstones; std::stack _mutation_fragments; mutation_fragment_opt _partition_end; @@ -132,15 +132,15 @@ flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, query return stop_iteration::no; } future consume_partition_from_source() { - if (_source->is_buffer_empty()) { - if (_source->is_end_of_stream()) { + if (_source.is_buffer_empty()) { + if (_source.is_end_of_stream()) { _end_of_stream = true; return make_ready_future(stop_iteration::yes); } - return _source->fill_buffer().then([] { return stop_iteration::no; }); + return _source.fill_buffer().then([] { return stop_iteration::no; }); } - while (!_source->is_buffer_empty() && !is_buffer_full()) { - auto mf = _source->pop_mutation_fragment(); + while (!_source.is_buffer_empty() && !is_buffer_full()) { + auto mf = _source.pop_mutation_fragment(); if (mf.is_partition_start() || mf.is_static_row()) { push_mutation_fragment(std::move(mf)); } else if (mf.is_end_of_partition()) { @@ -184,9 +184,9 @@ flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, query return make_ready_future(is_buffer_full()); } public: - explicit partition_reversing_mutation_reader(flat_mutation_reader& mr, query::max_result_size max_size) + explicit partition_reversing_mutation_reader(flat_mutation_reader mr, query::max_result_size max_size) : flat_mutation_reader::impl(mr.schema()->make_reversed(), mr.permit()) - , _source(&mr) + , _source(std::move(mr)) , _range_tombstones(*_schema) , _max_size(max_size) { } @@ -214,7 +214,7 @@ flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, query } _range_tombstones.clear(); _partition_end = std::nullopt; - return _source->next_partition(); + return _source.next_partition(); } return make_ready_future<>(); } @@ -228,12 +228,11 @@ flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, query } virtual future<> close() noexcept override { - // we don't own _source therefore do not close it - return make_ready_future<>(); + return _source.close(); } }; - return make_flat_mutation_reader(original, max_size); + return make_flat_mutation_reader(std::move(original), max_size); } template diff --git a/flat_mutation_reader.hh b/flat_mutation_reader.hh index 6d0f272db2..1ec196b39e 100644 --- a/flat_mutation_reader.hh +++ b/flat_mutation_reader.hh @@ -902,8 +902,7 @@ make_generating_reader(schema_ptr s, reader_permit permit, std::function resolved when the reader is fully consumed, and closed. diff --git a/querier.hh b/querier.hh index 9605f2a46a..fe3f58e404 100644 --- a/querier.hh +++ b/querier.hh @@ -97,7 +97,7 @@ auto consume_page(flat_mutation_reader& reader, auto consume = [&reader, &slice, reader_consumer = std::move(reader_consumer), max_size] () mutable { if (slice.options.contains(query::partition_slice::option::reversed)) { - return with_closeable(make_reversing_reader(reader, max_size), + return with_closeable(make_reversing_reader(make_flat_mutation_reader(reader), max_size), [reader_consumer = std::move(reader_consumer)] (flat_mutation_reader& reversing_reader) mutable { return reversing_reader.consume(std::move(reader_consumer)); }); diff --git a/test/boost/flat_mutation_reader_test.cc b/test/boost/flat_mutation_reader_test.cc index 394e66860d..9320877d6f 100644 --- a/test/boost/flat_mutation_reader_test.cc +++ b/test/boost/flat_mutation_reader_test.cc @@ -562,7 +562,8 @@ void test_flat_stream(schema_ptr s, std::vector muts, reversed_partiti return fmr.consume_in_thread(std::move(fsc)); } else { if (reversed) { - return with_closeable(make_reversing_reader(fmr, query::max_result_size(size_t(1) << 20)), [fsc = std::move(fsc)] (flat_mutation_reader& reverse_reader) mutable { + return with_closeable(make_reversing_reader(make_flat_mutation_reader(fmr), query::max_result_size(size_t(1) << 20)), + [fsc = std::move(fsc)] (flat_mutation_reader& reverse_reader) mutable { return reverse_reader.consume(std::move(fsc)); }).get0(); } @@ -811,11 +812,8 @@ SEASTAR_THREAD_TEST_CASE(test_reverse_reader_memory_limit) { } const uint64_t hard_limit = size_t(1) << 18; - auto reader = flat_mutation_reader_from_mutations(semaphore.make_permit(), {mut}); - // need to close both readers since the reverse_reader - // doesn't own the reader passed to it by ref. - auto close_reader = deferred_close(reader); - auto reverse_reader = make_reversing_reader(reader, query::max_result_size(size_t(1) << 10, hard_limit)); + auto reverse_reader = make_reversing_reader(flat_mutation_reader_from_mutations(semaphore.make_permit(), {mut}), + query::max_result_size(size_t(1) << 10, hard_limit)); auto close_reverse_reader = deferred_close(reverse_reader); try { From bf38e204af67422e9f4fbb0dea9ac0a607b0aac3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Mon, 23 Aug 2021 15:42:41 +0300 Subject: [PATCH 20/23] flat_mutation_reader: make_reversing_reader(): implement fast_forward_to(partition_range) --- flat_mutation_reader.cc | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/flat_mutation_reader.cc b/flat_mutation_reader.cc index e519663c91..5b9bc9d4dd 100644 --- a/flat_mutation_reader.cc +++ b/flat_mutation_reader.cc @@ -219,8 +219,15 @@ flat_mutation_reader make_reversing_reader(flat_mutation_reader original, query: return make_ready_future<>(); } - virtual future<> fast_forward_to(const dht::partition_range&) override { - return make_exception_future<>(make_backtraced_exception_ptr()); + virtual future<> fast_forward_to(const dht::partition_range& pr) override { + clear_buffer(); + while (!_mutation_fragments.empty()) { + _mutation_fragments.pop(); + } + _stack_size = 0; + _partition_end = std::nullopt; + _end_of_stream = false; + return _source.fast_forward_to(pr); } virtual future<> fast_forward_to(position_range) override { From 3cc882f6a88d9061bd87de6e921871d93c371d69 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Mon, 2 Aug 2021 16:17:54 +0300 Subject: [PATCH 21/23] test/boost/flat_mutation_reader_test: more reversed reader tests Check that the reverse reader emits a stream identical to that emitted by a reader reading in native order from a table with reversed clustering order. --- test/boost/flat_mutation_reader_test.cc | 93 +++++++++++++++++++++++++ 1 file changed, 93 insertions(+) diff --git a/test/boost/flat_mutation_reader_test.cc b/test/boost/flat_mutation_reader_test.cc index 9320877d6f..fd049dd410 100644 --- a/test/boost/flat_mutation_reader_test.cc +++ b/test/boost/flat_mutation_reader_test.cc @@ -41,6 +41,8 @@ #include "test/lib/flat_mutation_reader_assertions.hh" #include "test/lib/log.hh" #include "test/lib/reader_concurrency_semaphore.hh" +#include "test/lib/random_utils.hh" +#include "test/lib/random_schema.hh" #include @@ -836,3 +838,94 @@ SEASTAR_THREAD_TEST_CASE(test_reverse_reader_memory_limit) { test_with_partition(true); test_with_partition(false); } + +SEASTAR_THREAD_TEST_CASE(test_reverse_reader_reads_in_native_reverse_order) { + using namespace tests::data_model; + using key_range = nonwrapping_interval; + + std::mt19937 engine(tests::random::get_int()); + + tests::reader_concurrency_semaphore_wrapper semaphore; + auto permit = semaphore.make_permit(); + + auto rnd_schema_spec = tests::make_random_schema_specification( + get_name(), + std::uniform_int_distribution(1, 2), + std::uniform_int_distribution(1, 8)); + auto rnd_schema = tests::random_schema(engine(), *rnd_schema_spec); + + auto forward_schema = rnd_schema.schema(); + auto reverse_schema = forward_schema->make_reversed(); + + auto forward_mt = make_lw_shared(forward_schema); + auto reverse_mt = make_lw_shared(reverse_schema); + + for (size_t pk = 0; pk != 8; ++pk) { + auto mut = rnd_schema.new_mutation(pk); + + if (forward_schema->has_static_columns()) { + rnd_schema.add_static_row(engine, mut); + } + + auto ckeys = rnd_schema.make_ckeys(8); + + for (size_t ck = 0; ck != ckeys.size(); ++ck) { + const auto& ckey = ckeys.at(ck); + if (ck % 4 == 0 && ck + 3 < ckeys.size()) { + const auto& ckey_1 = ckeys.at(ck + 1); + const auto& ckey_2 = ckeys.at(ck + 2); + const auto& ckey_3 = ckeys.at(ck + 3); + rnd_schema.delete_range(engine, mut, key_range::make({ckey, true}, {ckey_2, true})); + rnd_schema.delete_range(engine, mut, key_range::make({ckey, true}, {ckey_3, true})); + rnd_schema.delete_range(engine, mut, key_range::make({ckey_1, true}, {ckey_3, true})); + } + rnd_schema.add_row(engine, mut, ckey); + } + + forward_mt->apply(mut.build(forward_schema)); + reverse_mt->apply(mut.build(reverse_schema)); + } + + auto reversed_forward_reader = assert_that(make_reversing_reader(forward_mt->make_flat_reader(forward_schema, permit), query::max_result_size(1 << 20))); + + auto reverse_reader = reverse_mt->make_flat_reader(reverse_schema, permit); + auto deferred_reverse_close = deferred_close(reverse_reader); + + while (auto mf_opt = reverse_reader().get()) { + auto& mf = *mf_opt; + reversed_forward_reader.produces(*forward_schema, mf); + } + reversed_forward_reader.produces_end_of_stream(); +} + +SEASTAR_THREAD_TEST_CASE(test_reverse_reader_is_mutation_source) { + std::list reversed_slices; + auto populate = [&reversed_slices] (schema_ptr s, const std::vector &muts) { + auto reverse_schema = s->make_reversed(); + auto reverse_mt = make_lw_shared(reverse_schema); + for (const auto& mut : muts) { + reverse_mt->apply(reverse(mut)); + } + + return mutation_source([=, &reversed_slices] ( + schema_ptr schema, + reader_permit permit, + const dht::partition_range& range, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_ptr, + streamed_mutation::forwarding fwd_sm, + mutation_reader::forwarding fwd_mr) mutable { + reversed_slices.emplace_back(query::reverse_slice(*schema, slice)); + // We don't want the memtable reader to read in reverse. + reversed_slices.back().options.remove(query::partition_slice::option::reversed); + auto rd = make_reversing_reader(reverse_mt->make_flat_reader(schema->make_reversed(), std::move(permit), range, reversed_slices.back(), pc, + std::move(trace_ptr), streamed_mutation::forwarding::no, fwd_mr), query::max_result_size(1 << 20)); + if (fwd_sm) { + return make_forwardable(std::move(rd)); + } + return rd; + }); + }; + run_mutation_source_tests(populate); +} From f07805c3efc9cca126b6f86715d218c5ef753d92 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Wed, 18 Aug 2021 16:09:46 +0300 Subject: [PATCH 22/23] test/boost/mutation_test: add test for mutation::consume() monotonicity In both forward and reverse modes. --- test/boost/mutation_test.cc | 66 +++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/test/boost/mutation_test.cc b/test/boost/mutation_test.cc index 4ec19f0ad2..a0af66d4e2 100644 --- a/test/boost/mutation_test.cc +++ b/test/boost/mutation_test.cc @@ -3122,3 +3122,69 @@ SEASTAR_THREAD_TEST_CASE(test_appending_hash_row_4567) { // These checks are meaningful because legacy hashing is still used for old nodes. BOOST_CHECK_EQUAL(compute_legacy_hash(r1, { 0, 1, 2 }), compute_legacy_hash(r2, { 0, 1, 2 })); } + +SEASTAR_THREAD_TEST_CASE(test_mutation_consume_position_monotonicity) { + std::mt19937 engine(tests::random::get_int()); + + tests::reader_concurrency_semaphore_wrapper semaphore; + auto permit = semaphore.make_permit(); + + auto rnd_schema_spec = tests::make_random_schema_specification( + get_name(), + std::uniform_int_distribution(1, 2), + std::uniform_int_distribution(1, 8)); + auto rnd_schema = tests::random_schema(engine(), *rnd_schema_spec); + + auto forward_schema = rnd_schema.schema(); + auto reverse_schema = forward_schema->make_reversed(); + + const auto muts = tests::generate_random_mutations( + rnd_schema, + tests::default_timestamp_generator(), + tests::no_expiry_expiry_generator(), + std::uniform_int_distribution(1, 1)).get(); + + class validating_consumer { + mutation_fragment_stream_validator _validator; + + public: + explicit validating_consumer(const schema& s) : _validator(s) { } + + void consume_new_partition(const dht::decorated_key&) { + BOOST_REQUIRE(_validator(mutation_fragment::kind::partition_start, position_in_partition_view(position_in_partition_view::partition_start_tag_t{}))); + } + void consume(tombstone) { } + stop_iteration consume(static_row&& sr) { + BOOST_REQUIRE(_validator(mutation_fragment::kind::static_row, sr.position())); + return stop_iteration::no; + } + stop_iteration consume(clustering_row&& cr) { + BOOST_REQUIRE(_validator(mutation_fragment::kind::clustering_row, cr.position())); + return stop_iteration::no; + } + stop_iteration consume(range_tombstone&& rt) { + BOOST_REQUIRE(_validator(mutation_fragment::kind::range_tombstone, rt.position())); + return stop_iteration::no; + } + stop_iteration consume_end_of_partition() { + BOOST_REQUIRE(_validator(mutation_fragment::kind::partition_end, position_in_partition_view(position_in_partition_view::end_of_partition_tag_t{}))); + return stop_iteration::no; + } + void consume_end_of_stream() { + BOOST_REQUIRE(_validator.on_end_of_stream()); + } + }; + + BOOST_TEST_MESSAGE("Forward"); + { + auto mut = muts.front(); + validating_consumer consumer(*forward_schema); + std::move(mut).consume(consumer, consume_in_reverse::no); + } + BOOST_TEST_MESSAGE("Reverse"); + { + auto mut = muts.front(); + validating_consumer consumer(*reverse_schema); + std::move(mut).consume(consumer, consume_in_reverse::yes); + } +} From f02632aeb0d9560f4bc1e112d43f878424203d88 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 5 Aug 2021 12:27:00 +0300 Subject: [PATCH 23/23] range_tombstone_accumulator: drop _reversed flag --- db/view/view.hh | 4 ++-- mutation_compactor.hh | 4 ++-- range_tombstone.cc | 12 ++---------- range_tombstone.hh | 5 ++--- test/boost/mutation_test.cc | 2 +- test/boost/range_tombstone_list_test.cc | 25 +------------------------ 6 files changed, 10 insertions(+), 42 deletions(-) diff --git a/db/view/view.hh b/db/view/view.hh index 9371dad0b7..c6381fa8b8 100644 --- a/db/view/view.hh +++ b/db/view/view.hh @@ -192,8 +192,8 @@ public: , _view_updates(std::move(views_to_update)) , _updates(std::move(updates)) , _existings(std::move(existings)) - , _update_tombstone_tracker(*_schema, false) - , _existing_tombstone_tracker(*_schema, false) + , _update_tombstone_tracker(*_schema) + , _existing_tombstone_tracker(*_schema) , _now(now) { } view_update_builder(view_update_builder&& other) noexcept = default; diff --git a/mutation_compactor.hh b/mutation_compactor.hh index 9d0c92bb01..d5070ec244 100644 --- a/mutation_compactor.hh +++ b/mutation_compactor.hh @@ -224,7 +224,7 @@ public: , _row_limit(limit) , _partition_limit(partition_limit) , _partition_row_limit(_slice.options.contains(query::partition_slice::option::distinct) ? 1 : slice.partition_row_limit()) - , _range_tombstones(s, false) + , _range_tombstones(s) , _last_dk({dht::token(), partition_key::make_empty()}) { static_assert(!sstable_compaction(), "This constructor cannot be used for sstable compaction."); @@ -238,7 +238,7 @@ public: , _get_max_purgeable(std::move(get_max_purgeable)) , _can_gc([this] (tombstone t) { return can_gc(t); }) , _slice(s.full_slice()) - , _range_tombstones(s, false) + , _range_tombstones(s) , _last_dk({dht::token(), partition_key::make_empty()}) , _collector(std::make_unique(_schema)) { diff --git a/range_tombstone.cc b/range_tombstone.cc index ce6cb83761..ffcf45e5ae 100644 --- a/range_tombstone.cc +++ b/range_tombstone.cc @@ -73,10 +73,6 @@ void range_tombstone_accumulator::update_current_tombstone() { void range_tombstone_accumulator::drop_unneeded_tombstones(const clustering_key_prefix& ck, int w) { auto cmp = [&] (const range_tombstone& rt, const clustering_key_prefix& ck, int w) { - if (_reversed) { - auto bv = rt.start_bound(); - return _cmp(ck, w, bv.prefix(), weight(bv.kind())); - } auto bv = rt.end_bound(); return _cmp(bv.prefix(), weight(bv.kind()), ck, w); }; @@ -91,15 +87,11 @@ void range_tombstone_accumulator::drop_unneeded_tombstones(const clustering_key_ } void range_tombstone_accumulator::apply(range_tombstone rt) { - if (_reversed) { - drop_unneeded_tombstones(rt.end, weight(rt.end_kind)); - } else { - drop_unneeded_tombstones(rt.start, weight(rt.start_kind)); - } + drop_unneeded_tombstones(rt.start, weight(rt.start_kind)); _current_tombstone.apply(rt.tomb); auto cmp = [&] (const range_tombstone& rt1, const range_tombstone& rt2) { - return _reversed ? _cmp(rt2.start_bound(), rt1.start_bound()) : _cmp(rt1.end_bound(), rt2.end_bound()); + return _cmp(rt1.end_bound(), rt2.end_bound()); }; _range_tombstones.insert(boost::upper_bound(_range_tombstones, rt, cmp), std::move(rt)); } diff --git a/range_tombstone.hh b/range_tombstone.hh index e82c3528bc..6672ead65e 100644 --- a/range_tombstone.hh +++ b/range_tombstone.hh @@ -261,13 +261,12 @@ class range_tombstone_accumulator { tombstone _partition_tombstone; std::deque _range_tombstones; tombstone _current_tombstone; - bool _reversed; private: void update_current_tombstone(); void drop_unneeded_tombstones(const clustering_key_prefix& ck, int w = 0); public: - range_tombstone_accumulator(const schema& s, bool reversed) - : _cmp(s), _reversed(reversed) { } + explicit range_tombstone_accumulator(const schema& s) + : _cmp(s) { } void set_partition_tombstone(tombstone t) { _partition_tombstone = t; diff --git a/test/boost/mutation_test.cc b/test/boost/mutation_test.cc index a0af66d4e2..46e0943c88 100644 --- a/test/boost/mutation_test.cc +++ b/test/boost/mutation_test.cc @@ -2889,7 +2889,7 @@ void check_clustering_row_summaries(const schema& schema, const clustering_row_s } void check_clustering_summaries(const schema& schema, const partition_summary& actual, const partition_summary& expected) { - range_tombstone_accumulator range_tombstones(schema, false); + range_tombstone_accumulator range_tombstones(schema); range_tombstones.set_partition_tombstone(expected.tomb); for (auto [actual_frag, expected_frag] : iterate_over_in_ordered_lockstep(actual.clustering_fragments, expected.clustering_fragments, diff --git a/test/boost/range_tombstone_list_test.cc b/test/boost/range_tombstone_list_test.cc index 3840451b74..4de8311abd 100644 --- a/test/boost/range_tombstone_list_test.cc +++ b/test/boost/range_tombstone_list_test.cc @@ -884,8 +884,7 @@ BOOST_AUTO_TEST_CASE(test_accumulator) { auto ts1 = 1; auto ts2 = 2; - testlog.info("Forward"); - auto acc = range_tombstone_accumulator(*s, false); + auto acc = range_tombstone_accumulator(*s); acc.apply(rtie(0, 4, ts1)); BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 0 })), tombstone(ts1, gc_now)); acc.apply(rtie(1, 2, ts2)); @@ -904,26 +903,4 @@ BOOST_AUTO_TEST_CASE(test_accumulator) { BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 13 })), tombstone(ts2, gc_now)); BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 14 })), tombstone()); BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 15 })), tombstone()); - - testlog.info("Reversed"); - acc = range_tombstone_accumulator(*s, true); - - BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 15 })), tombstone()); - BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 14 })), tombstone()); - acc.apply(rtie(11, 14, ts2)); - BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 13 })), tombstone(ts2, gc_now)); - BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 12 })), tombstone(ts2, gc_now)); - acc.apply(rtie(10, 12, ts1)); - BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 11 })), tombstone(ts2, gc_now)); - BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 10 })), tombstone(ts1, gc_now)); - BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 9 })), tombstone()); - acc.apply(rtie(6, 8, ts2)); - BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 5 })), tombstone()); - BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 4 })), tombstone()); - acc.apply(rtie(0, 4, ts1)); - BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 3 })), tombstone(ts1, gc_now)); - BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 2 })), tombstone(ts1, gc_now)); - acc.apply(rtie(1, 2, ts2)); - BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 1 })), tombstone(ts2, gc_now)); - BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 0 })), tombstone(ts1, gc_now)); }