Merge 'Introduce native reversed format' from Botond Dénes

We define the native reverse format as a reversed mutation fragment
stream that is identical to one that would be emitted by a table with
the same schema but with reversed clustering order. The main difference
to the current format is how range tombstones are handled: instead of
looking at their start or end bound depending on the order, we always
use them as-usual and the reversing reader swaps their bounds to
facilitate this. This allows us to treat reversed streams completely
transparently: just pass along them a reversed schema and all the
reader, compacting and result building code is happily ignorant about
the fact that it is a reversed stream.

This series is the first step towards implementing efficient reverse
reads. It allows us to remove all the special casing we have in various
places for reverse reads and thus treating reverse streams transparently
in all the middle layers. The only layers that have to know about the
actual reversing are mutation sources proper. The plan is that when
reading in reverse we create a reversed schema in the top layer then
pass this down as the schema for the read. There are two layers that
will need to act on this reversed schema:
* The layer sitting on top of the first layer which still can't handle
  reversed streams, this layer will create a reversed reader to handle
  the transition.
* The mutation source proper: which will obtain the underlying schema
  and will emit the data in reverse order.

Once all the mutation sources are able to handle reverse reads, we can
get rid of the reverse reader entirely.

Refs: #1413

Tests: unit(dev)

TODO:
* v2
* more testing

Also on: https://github.com/denesb/scylla.git reverse-reads/v3

Changelog

v3:
* Drop the entire schema transformation mechanism;
* Drop reversing from `schema_builder()`;
* Don't keep any information about whether the schema is reversed or not
  in the schema itself, instead make reversing deterministic w.r.t.
  schema version, such that:
  `s.version() == s.make_reversed().make_reversed().version()`;
* Re-reverse range tombstones in `streaming_mutation_freezer`, so
  `reconcilable_results` sent to the coordinator during read repair
  still use the old reverse format;

v2:
* Add `data_type reversed(data_type)`;
* Add `bound_kind reverse_kind(bound_kind)`;
* Make new API safer to use:
    - `schema::underlying_type()`: return this when unengaged;
    - `schema::make_transformed()`: noop when applying the same
      transformation again;
* Generalize reversed into transformation. Add support to transferring
  to remote nodes and shards by way of making `schema_tables` aware of
  the transformation;
* Use reverse schema everywhere in reverse reader;

Closes #9184

* github.com:scylladb/scylla:
  range_tombstone_accumulator: drop _reversed flag
  test/boost/mutation_test: add test for mutation::consume() monotonicity
  test/boost/flat_mutation_reader_test: more reversed reader tests
  flat_mutation_reader: make_reversing_reader(): implement fast_forward_to(partition_range)
  flat_mutation_reader: make_reversing_reader(): take ownership of the reader
  test/lib/mutation_source_test: add consistent log to all methods
  mutation: introduce reverse()
  mutation_rebuilder: make it standalone
  mutation: make copy constructor compatible with mutation_opt
  treewide: switch to native reversed format for reverse reads
  mutation: consume(): add native reverse order
  mutation: consume(): don't include dummy rows
  query: add slice reversing functions
  partition_slice_builder: add range mutating methods
  partition_slice_builder: add constructor with slice
  query: specific_ranges: add non-const ranges accessor
  range_tombstone: add reverse()
  clustering_bounds_comparator: add reverse_kind()
  schema: introduce make_reversed()
  schema: add a transforming copy constructor
  utils: UUID_gen: introduce negate()
  types: add reversed(data_type)
  docs: design-notes: add reverse-reads.md
This commit is contained in:
Avi Kivity
2021-09-09 15:50:22 +03:00
35 changed files with 739 additions and 184 deletions

View File

@@ -40,7 +40,10 @@ enum class bound_kind : uint8_t {
std::ostream& operator<<(std::ostream& out, const bound_kind k);
// Swaps start <-> end && incl <-> excl
bound_kind invert_kind(bound_kind k);
// Swaps start <-> end
bound_kind reverse_kind(bound_kind k);
int32_t weight(bound_kind k);
class bound_view {

View File

@@ -1376,6 +1376,11 @@ compare_atomic_cell_for_merge(atomic_cell_view left, atomic_cell_view right) {
future<std::tuple<lw_shared_ptr<query::result>, cache_temperature>>
database::query(schema_ptr s, const query::read_command& cmd, query::result_options opts, const dht::partition_range_vector& ranges,
tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout) {
const auto reversed = cmd.slice.options.contains(query::partition_slice::option::reversed);
if (reversed) {
s = s->make_reversed();
}
column_family& cf = find_column_family(cmd.cf_id);
auto& semaphore = get_reader_concurrency_semaphore();
auto class_config = query::query_class_config{.semaphore = semaphore, .max_memory_for_unlimited_query = *cmd.max_result_size};
@@ -1429,6 +1434,11 @@ database::query(schema_ptr s, const query::read_command& cmd, query::result_opti
future<std::tuple<reconcilable_result, cache_temperature>>
database::query_mutations(schema_ptr s, const query::read_command& cmd, const dht::partition_range& range,
tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout) {
const auto reversed = cmd.slice.options.contains(query::partition_slice::option::reversed);
if (reversed) {
s = s->make_reversed();
}
const auto short_read_allwoed = query::short_read(cmd.slice.options.contains<query::partition_slice::option::allow_short_read>());
auto accounter = co_await get_result_memory_limiter().new_mutation_read(*cmd.max_result_size, short_read_allwoed);
column_family& cf = find_column_family(cmd.cf_id);

View File

@@ -192,8 +192,8 @@ public:
, _view_updates(std::move(views_to_update))
, _updates(std::move(updates))
, _existings(std::move(existings))
, _update_tombstone_tracker(*_schema, false)
, _existing_tombstone_tracker(*_schema, false)
, _update_tombstone_tracker(*_schema)
, _existing_tombstone_tracker(*_schema)
, _now(now) {
}
view_update_builder(view_update_builder&& other) noexcept = default;

View File

@@ -0,0 +1,123 @@
# Reverse reads
A read is called reverse when it reads with reverse clustering order
(compared to that of the schema). Example:
CREATE TABLE mytable (
pk int,
ck int,
s int STATIC,
v int,
PRIMARY KEY (pk, ck)
) WITH
CLUSTERING ORDER BY (ck ASC);
# Forward read (using table's native order)
SELECT * FROM mytable WHERE pk = 1;
# Explicit forward order
SELECT * FROM mytable WHERE pk = 1 ORDER BY ck ASC;
# Reverse read
SELECT * FROM mytable WHERE pk = 1 ORDER BY ck DESC;
If the table's native clustering order is DESC, then a read with ASC
order is considered reverse.
## Legacy format
The legacy format is how scylla handled reverse queries internally. We
are in the process of migrating to the native reverse format, but for
now coordinator-side code still uses the legacy format.
### Request
The `query::partition_slice::options::reversed` flag is set.
Clustering ranges in both `query::partition_slice::_row_ranges` and
`query::specific_ranges::_ranges`
(`query::partition_slice::_specific_ranges`) are half-reversed: they
are ordered in reverse, but when they are compared to other
mutation-fragments, their end bound is used as position, instead of the
start bound as usual. When compared to other clustering ranges the end
bound is used as the start bound and vice-versa.
Example:
For the clustering keys (ASC order): `ck1`, `ck2`, `ck3`, `ck4`, `ck5`,
`ck6`.
A `_row_ranges` field of a slice might contain this:
[ck1, ck2], [ck4, ck5]
The legacy reversed version would look like this:
[ck4, ck5], [ck1, ck2]
Note how the ranges themselves are the same (bounds not reversed), it is
just the range vector itself that is reversed.
### Result
Results are ordered with the reversed clustering order with the caveat
that range-tombstones are ordered by their end bound, using the native
schema's comparators. For example given the following partition:
ps{pk1}, sr{}, cr{ck1}, rt{[ck2, ck4)}, cr{ck2}, cr{ck3}, cr{ck4}, ck{ck5}, pe{}
The legacy reverse format equivalent of this looks like the following:
ps{pk1}, sr{}, cr{ck5}, rt{[ck2, ck4)}, cr{ck4}, cr{ck3}, cr{ck2}, ck{ck1}, pe{}
Note:
* Only clustering elements change;
* Range tombstone's bounds are not reversed;
* Range tombstones can be ordered off-by-one due to native schema
comparators used: `rt{[ck2, ck4)}` should be ordered *after*
`cr{ck4}`.
Legend:
* ps = partitions-tart
* sr = static-row
* cr = clustering-row
* rt = range-tombstone
* pe = partition-end
## Native format
The native format uses ordering equivalent to that of a table with
reverse clustering format. Using `mytable` as an example, the native
reverse format would be an identical table `my_reverse_table`, which
uses `CLUSTERING ORDER BY (ck DESC);`. This allows middle layers in a
read pipeline to just use a schema with reversed clustering order and
process the reverse stream as normal.
### Request
The `query::partition_slice::options::reversed` flag is set as in the
legacy format. Clustering ranges in both
`query::partition_slice::_row_ranges` and
`query::specific_ranges::_ranges`
(`query::partition_slice::_specific_ranges`) are fully-reversed: they
are ordered in reverse, their bound being swapped as well.
Example:
For the clustering keys (ASC order): `ck1`, `ck2`, `ck3`, `ck4`, `ck5`,
`ck6`.
A `_row_ranges` field of a slice might contain this:
[ck1, ck2], [ck4, ck5]
The native reversed version would look like this:
[ck5, ck4], [ck2, ck1]
In addition to this, the schema is reversed on the replica, at the start
of the read, so all the reverse-capable and intermediate readers in the
stack get a reversed schema to work with.
### Result
Results are ordered with the reversed clustering order with
the bounds of range-tombstones swapped. For example, given the same
partition that was used in the legacy format example, the native reverse
version would look like this:
ps{pk1}, sr{}, cr{ck5}, cr{ck4}, rt{(ck4, ck2]}, cr{ck3}, cr{ck2}, ck{ck1}, pe{}

View File

@@ -95,9 +95,9 @@ void flat_mutation_reader::impl::clear_buffer_to_next_partition() {
_buffer_size = compute_buffer_size(*_schema, _buffer);
}
flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, query::max_result_size max_size) {
flat_mutation_reader make_reversing_reader(flat_mutation_reader original, query::max_result_size max_size) {
class partition_reversing_mutation_reader final : public flat_mutation_reader::impl {
flat_mutation_reader* _source;
flat_mutation_reader _source;
range_tombstone_list _range_tombstones;
std::stack<mutation_fragment> _mutation_fragments;
mutation_fragment_opt _partition_end;
@@ -107,13 +107,14 @@ flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, query
private:
stop_iteration emit_partition() {
auto emit_range_tombstone = [&] {
auto it = std::prev(_range_tombstones.end());
// _range_tombstones uses the reverse schema already, so we can use `begin()`
auto it = _range_tombstones.begin();
push_mutation_fragment(*_schema, _permit, _range_tombstones.pop(it));
};
position_in_partition::less_compare cmp(*_schema);
position_in_partition::tri_compare cmp(*_schema);
while (!_mutation_fragments.empty() && !is_buffer_full()) {
auto& mf = _mutation_fragments.top();
if (!_range_tombstones.empty() && !cmp(_range_tombstones.rbegin()->end_position(), mf.position())) {
if (!_range_tombstones.empty() && cmp(_range_tombstones.begin()->position(), mf.position()) <= 0) {
emit_range_tombstone();
} else {
_stack_size -= mf.memory_usage();
@@ -131,15 +132,15 @@ flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, query
return stop_iteration::no;
}
future<stop_iteration> consume_partition_from_source() {
if (_source->is_buffer_empty()) {
if (_source->is_end_of_stream()) {
if (_source.is_buffer_empty()) {
if (_source.is_end_of_stream()) {
_end_of_stream = true;
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
return _source->fill_buffer().then([] { return stop_iteration::no; });
return _source.fill_buffer().then([] { return stop_iteration::no; });
}
while (!_source->is_buffer_empty() && !is_buffer_full()) {
auto mf = _source->pop_mutation_fragment();
while (!_source.is_buffer_empty() && !is_buffer_full()) {
auto mf = _source.pop_mutation_fragment();
if (mf.is_partition_start() || mf.is_static_row()) {
push_mutation_fragment(std::move(mf));
} else if (mf.is_end_of_partition()) {
@@ -148,7 +149,9 @@ flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, query
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
} else if (mf.is_range_tombstone()) {
_range_tombstones.apply(*_schema, std::move(mf.as_range_tombstone()));
auto&& rt = std::move(mf).as_range_tombstone();
rt.reverse();
_range_tombstones.apply(*_schema, std::move(rt));
} else {
_mutation_fragments.emplace(std::move(mf));
_stack_size += _mutation_fragments.top().memory_usage();
@@ -181,9 +184,9 @@ flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, query
return make_ready_future<stop_iteration>(is_buffer_full());
}
public:
explicit partition_reversing_mutation_reader(flat_mutation_reader& mr, query::max_result_size max_size)
: flat_mutation_reader::impl(mr.schema(), mr.permit())
, _source(&mr)
explicit partition_reversing_mutation_reader(flat_mutation_reader mr, query::max_result_size max_size)
: flat_mutation_reader::impl(mr.schema()->make_reversed(), mr.permit())
, _source(std::move(mr))
, _range_tombstones(*_schema)
, _max_size(max_size)
{ }
@@ -211,13 +214,20 @@ flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, query
}
_range_tombstones.clear();
_partition_end = std::nullopt;
return _source->next_partition();
return _source.next_partition();
}
return make_ready_future<>();
}
virtual future<> fast_forward_to(const dht::partition_range&) override {
return make_exception_future<>(make_backtraced_exception_ptr<std::bad_function_call>());
virtual future<> fast_forward_to(const dht::partition_range& pr) override {
clear_buffer();
while (!_mutation_fragments.empty()) {
_mutation_fragments.pop();
}
_stack_size = 0;
_partition_end = std::nullopt;
_end_of_stream = false;
return _source.fast_forward_to(pr);
}
virtual future<> fast_forward_to(position_range) override {
@@ -225,12 +235,11 @@ flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, query
}
virtual future<> close() noexcept override {
// we don't own _source therefore do not close it
return make_ready_future<>();
return _source.close();
}
};
return make_flat_mutation_reader<partition_reversing_mutation_reader>(original, max_size);
return make_flat_mutation_reader<partition_reversing_mutation_reader>(std::move(original), max_size);
}
template<typename Source>

View File

@@ -890,17 +890,19 @@ future<> consume_partitions(flat_mutation_reader& reader, Consumer consumer) {
flat_mutation_reader
make_generating_reader(schema_ptr s, reader_permit permit, std::function<future<mutation_fragment_opt> ()> get_next_fragment);
/// A reader that emits partitions in reverse.
/// A reader that emits partitions in native reverse order.
///
/// 1. Static row is still emitted first.
/// 2. Range tombstones are ordered by their end position.
/// 3. Clustered rows and range tombstones are emitted in descending order.
/// Because of 2 and 3 the guarantee that a range tombstone is emitted before
/// 1. The reader's schema() method will return a reversed schema (see
/// \ref schema::make_reversed()).
/// 2. Static row is still emitted first.
/// 3. Range tombstones' bounds are reversed (see \ref range_tombstone::reverse()).
/// 4. Clustered rows and range tombstones are emitted in descending order.
/// Because of 3 and 4 the guarantee that a range tombstone is emitted before
/// any mutation fragment affected by it still holds.
/// Ordering of partitions themselves remains unchanged.
/// For more details see docs/design-notes/reverse-reads.md.
///
/// \param original the reader to be reversed, has to be kept alive while the
/// reversing reader is in use.
/// \param original the reader to be reversed
/// \param max_size the maximum amount of memory the reader is allowed to use
/// for reversing and conversely the maximum size of the results. The
/// reverse reader reads entire partitions into memory, before reversing
@@ -911,7 +913,7 @@ make_generating_reader(schema_ptr s, reader_permit permit, std::function<future<
///
/// FIXME: reversing should be done in the sstable layer, see #1413.
flat_mutation_reader
make_reversing_reader(flat_mutation_reader& original, query::max_result_size max_size);
make_reversing_reader(flat_mutation_reader original, query::max_result_size max_size);
/// A cosumer function that is passed a flat_mutation_reader to be consumed from
/// and returns a future<> resolved when the reader is fully consumed, and closed.

View File

@@ -30,6 +30,12 @@ class specific_ranges {
std::vector<nonwrapping_range<clustering_key_prefix>> ranges();
};
// COMPATIBILITY NOTE: the partition-slice for reverse queries has two different
// format:
// * legacy format
// * native format
// The wire format uses the legacy format. See docs/design-notes/reverse-reads.md
// for more details on the formats.
class partition_slice {
std::vector<nonwrapping_range<clustering_key_prefix>> default_row_ranges();
utils::small_vector<uint32_t, 8> static_columns;

12
keys.cc
View File

@@ -27,6 +27,8 @@
#include <boost/algorithm/string.hpp>
#include <boost/any.hpp>
logging::logger klog("keys");
std::ostream& operator<<(std::ostream& out, const partition_key& pk) {
return out << "pk{" << to_hex(managed_bytes_view(pk.representation())) << "}";
}
@@ -126,6 +128,16 @@ bound_kind invert_kind(bound_kind k) {
abort();
}
bound_kind reverse_kind(bound_kind k) {
switch (k) {
case bound_kind::excl_start: return bound_kind::excl_end;
case bound_kind::incl_start: return bound_kind::incl_end;
case bound_kind::excl_end: return bound_kind::excl_start;
case bound_kind::incl_end: return bound_kind::incl_start;
}
on_internal_error(klog, format("reverse_kind(): invalid value for `bound_kind`: {}", static_cast<std::underlying_type_t<bound_kind>>(k)));
}
int32_t weight(bound_kind k) {
switch (k) {
case bound_kind::excl_end:

View File

@@ -196,49 +196,13 @@ future<mutation_opt> read_mutation_from_flat_mutation_reader(flat_mutation_reade
});
}
// r.is_buffer_empty() is always false at this point
struct adapter {
schema_ptr _s;
std::optional<mutation_rebuilder> _builder;
adapter(schema_ptr s) : _s(std::move(s)) { }
return r.consume(mutation_rebuilder(r.schema()));
}
void consume_new_partition(const dht::decorated_key& dk) {
assert(!_builder);
_builder = mutation_rebuilder(dk, std::move(_s));
}
stop_iteration consume(tombstone t) {
assert(_builder);
return _builder->consume(t);
}
stop_iteration consume(range_tombstone&& rt) {
assert(_builder);
return _builder->consume(std::move(rt));
}
stop_iteration consume(static_row&& sr) {
assert(_builder);
return _builder->consume(std::move(sr));
}
stop_iteration consume(clustering_row&& cr) {
assert(_builder);
return _builder->consume(std::move(cr));
}
stop_iteration consume_end_of_partition() {
assert(_builder);
return stop_iteration::yes;
}
mutation_opt consume_end_of_stream() {
if (!_builder) {
return mutation_opt();
}
return _builder->consume_end_of_stream();
}
};
return r.consume(adapter(r.schema()));
mutation reverse(mutation mut) {
auto reverse_schema = mut.schema()->make_reversed();
mutation_rebuilder reverse_rebuilder(reverse_schema);
return *std::move(mut).consume(reverse_rebuilder, consume_in_reverse::yes).result;
}
std::ostream& operator<<(std::ostream& os, const mutation& m) {

View File

@@ -48,6 +48,7 @@ struct mutation_consume_result<void> {
enum class consume_in_reverse {
no = 0,
yes,
legacy_half_reverse,
};
class mutation final {
@@ -81,8 +82,11 @@ public:
: _ptr(std::make_unique<data>(std::move(schema), std::move(key), std::move(mp)))
{ }
mutation(const mutation& m)
: _ptr(std::make_unique<data>(schema_ptr(m.schema()), dht::decorated_key(m.decorated_key()), m.partition()))
{ }
{
if (m._ptr) {
_ptr = std::make_unique<data>(schema_ptr(m.schema()), dht::decorated_key(m.decorated_key()), m.partition());
}
}
mutation(mutation&&) = default;
mutation& operator=(mutation&& x) = default;
mutation& operator=(const mutation& m);
@@ -127,6 +131,16 @@ public:
// Consumes the mutation's content.
//
// The mutation is in a moved-from alike state after consumption.
// There are tree ways to consume the mutation:
// * consume_in_reverse::no - consume in forward order, as defined by the
// schema.
// * consume_in_reverse::yes - consume in reverse order, as if the schema
// had the opposite clustering order. This effectively reverses the
// mutation's content, according to the native reverse order[1].
// * consume_in_reverse::legacy_half_reverse - consume rows and range
// tombstones in legacy reverse order[2].
//
// For definition of [1] and [2] see docs/design-notes/reverse-reads.md.
template<FlattenedConsumer Consumer>
auto consume(Consumer& consumer, consume_in_reverse reverse) && -> mutation_consume_result<decltype(consumer.consume_end_of_stream())>;
@@ -151,19 +165,37 @@ private:
namespace {
template<consume_in_reverse reverse, FlattenedConsumer Consumer>
stop_iteration consume_clustering_fragments(const schema& s, mutation_partition& partition, Consumer& consumer) {
stop_iteration consume_clustering_fragments(schema_ptr s, mutation_partition& partition, Consumer& consumer) {
using crs_type = mutation_partition::rows_type;
using crs_iterator_type = std::conditional_t<reverse == consume_in_reverse::yes, crs_type::reverse_iterator, crs_type::iterator>;
using crs_iterator_type = std::conditional_t<reverse == consume_in_reverse::legacy_half_reverse || reverse == consume_in_reverse::yes, crs_type::reverse_iterator, crs_type::iterator>;
using rts_type = range_tombstone_list;
using rts_iterator_type = std::conditional_t<reverse == consume_in_reverse::yes, rts_type::reverse_iterator, rts_type::iterator>;
using rts_iterator_type = std::conditional_t<reverse == consume_in_reverse::legacy_half_reverse, rts_type::reverse_iterator, rts_type::iterator>;
if constexpr (reverse == consume_in_reverse::yes) {
s = s->make_reversed();
}
// only used when reverse == consume_in_reverse::yes
range_tombstone_list reversed_range_tombstones(*s);
crs_iterator_type crs_it, crs_end;
rts_iterator_type rts_it, rts_end;
if constexpr (reverse == consume_in_reverse::yes) {
if constexpr (reverse == consume_in_reverse::legacy_half_reverse) {
crs_it = partition.clustered_rows().rbegin();
crs_end = partition.clustered_rows().rend();
rts_it = partition.row_tombstones().rbegin();
rts_end = partition.row_tombstones().rend();
} else if constexpr (reverse == consume_in_reverse::yes) {
crs_it = partition.clustered_rows().rbegin();
crs_end = partition.clustered_rows().rend();
while (!partition.row_tombstones().empty()) {
auto rt = partition.mutable_row_tombstones().pop_front_and_lock();
rt.reverse();
reversed_range_tombstones.apply(*s, std::move(rt));
}
rts_it = reversed_range_tombstones.begin();
rts_end = reversed_range_tombstones.end();
} else {
crs_it = partition.clustered_rows().begin();
crs_end = partition.clustered_rows().end();
@@ -173,13 +205,13 @@ stop_iteration consume_clustering_fragments(const schema& s, mutation_partition&
stop_iteration stop = stop_iteration::no;
position_in_partition::tri_compare cmp(s);
position_in_partition::tri_compare cmp(*s);
while (!stop && (crs_it != crs_end || rts_it != rts_end)) {
bool emit_rt;
if (crs_it != crs_end && rts_it != rts_end) {
const auto cmp_res = cmp(rts_it->position(), crs_it->position());
if constexpr (reverse == consume_in_reverse::yes) {
if constexpr (reverse == consume_in_reverse::legacy_half_reverse) {
emit_rt = cmp_res > 0;
} else {
emit_rt = cmp_res < 0;
@@ -191,7 +223,11 @@ stop_iteration consume_clustering_fragments(const schema& s, mutation_partition&
stop = consumer.consume(std::move(rts_it->tombstone()));
++rts_it;
} else {
stop = consumer.consume(clustering_row(std::move(*crs_it)));
// Dummy rows are part of the in-memory representation but should be
// invisible to reads.
if (!crs_it->dummy()) {
stop = consumer.consume(clustering_row(std::move(*crs_it)));
}
++crs_it;
}
}
@@ -217,9 +253,11 @@ auto mutation::consume(Consumer& consumer, consume_in_reverse reverse) && -> mut
}
if (reverse == consume_in_reverse::yes) {
stop = consume_clustering_fragments<consume_in_reverse::yes>(*_ptr->_schema, partition, consumer);
stop = consume_clustering_fragments<consume_in_reverse::yes>(_ptr->_schema, partition, consumer);
} else if (reverse == consume_in_reverse::legacy_half_reverse) {
stop = consume_clustering_fragments<consume_in_reverse::legacy_half_reverse>(_ptr->_schema, partition, consumer);
} else {
stop = consume_clustering_fragments<consume_in_reverse::no>(*_ptr->_schema, partition, consumer);
stop = consume_clustering_fragments<consume_in_reverse::no>(_ptr->_schema, partition, consumer);
}
const auto stop_consuming = consumer.consume_end_of_partition();
@@ -292,3 +330,7 @@ class flat_mutation_reader;
// Reads a single partition from a reader. Returns empty optional if there are no more partitions to be read.
future<mutation_opt> read_mutation_from_flat_mutation_reader(flat_mutation_reader& reader);
// Reverses the mutation as if it was created with a schema with reverse
// clustering order. The resulting mutation will contain a reverse schema too.
mutation reverse(mutation mut);

View File

@@ -224,7 +224,7 @@ public:
, _row_limit(limit)
, _partition_limit(partition_limit)
, _partition_row_limit(_slice.options.contains(query::partition_slice::option::distinct) ? 1 : slice.partition_row_limit())
, _range_tombstones(s, _slice.options.contains(query::partition_slice::option::reversed))
, _range_tombstones(s)
, _last_dk({dht::token(), partition_key::make_empty()})
{
static_assert(!sstable_compaction(), "This constructor cannot be used for sstable compaction.");
@@ -238,7 +238,7 @@ public:
, _get_max_purgeable(std::move(get_max_purgeable))
, _can_gc([this] (tombstone t) { return can_gc(t); })
, _slice(s.full_slice())
, _range_tombstones(s, false)
, _range_tombstones(s)
, _last_dk({dht::token(), partition_key::make_empty()})
, _collector(std::make_unique<mutation_compactor_garbage_collector>(_schema))
{

View File

@@ -1978,8 +1978,7 @@ void reconcilable_result_builder::consume_new_partition(const dht::decorated_key
!has_ck_selector(_slice.row_ranges(_schema, dk.key()));
_static_row_is_alive = false;
_live_rows = 0;
auto is_reversed = _slice.options.contains(query::partition_slice::option::reversed);
_mutation_consumer.emplace(streamed_mutation_freezer(_schema, dk.key(), is_reversed));
_mutation_consumer.emplace(streamed_mutation_freezer(_schema, dk.key(), _reversed));
}
void reconcilable_result_builder::consume(tombstone t) {
@@ -2008,6 +2007,10 @@ stop_iteration reconcilable_result_builder::consume(clustering_row&& cr, row_tom
stop_iteration reconcilable_result_builder::consume(range_tombstone&& rt) {
_memory_accounter.update(rt.memory_usage(_schema));
if (_reversed) {
// undo reversing done for the native reversed format, coordinator still uses old reversing format
rt.reverse();
}
return _mutation_consumer->consume(std::move(rt));
}
@@ -2040,7 +2043,7 @@ to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::pa
query::result::builder builder(slice, opts, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size });
auto consumer = compact_for_query<emit_only_live_rows::yes, query_result_builder>(*s, gc_clock::time_point::min(), slice, max_rows,
max_partitions, query_result_builder(*s, builder));
const auto reverse = slice.options.contains(query::partition_slice::option::reversed) ? consume_in_reverse::yes : consume_in_reverse::no;
const auto reverse = slice.options.contains(query::partition_slice::option::reversed) ? consume_in_reverse::legacy_half_reverse : consume_in_reverse::no;
for (const partition& p : r.partitions()) {
const auto res = p.mut().unfreeze(s).consume(consumer, reverse);
@@ -2059,7 +2062,7 @@ query_mutation(mutation&& m, const query::partition_slice& slice, uint64_t row_l
query::result::builder builder(slice, opts, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size });
auto consumer = compact_for_query<emit_only_live_rows::yes, query_result_builder>(*m.schema(), now, slice, row_limit,
query::max_partitions, query_result_builder(*m.schema(), builder));
const auto reverse = slice.options.contains(query::partition_slice::option::reversed) ? consume_in_reverse::yes : consume_in_reverse::no;
const auto reverse = slice.options.contains(query::partition_slice::option::reversed) ? consume_in_reverse::legacy_half_reverse : consume_in_reverse::no;
std::move(m).consume(consumer, reverse);
return builder.build();
}

View File

@@ -137,6 +137,7 @@ public:
class reconcilable_result_builder {
const schema& _schema;
const query::partition_slice& _slice;
bool _reversed;
bool _return_static_content_on_partition_with_no_rows{};
bool _static_row_is_alive{};
@@ -151,7 +152,7 @@ class reconcilable_result_builder {
public:
reconcilable_result_builder(const schema& s, const query::partition_slice& slice,
query::result_memory_accounter&& accounter) noexcept
: _schema(s), _slice(slice)
: _schema(s), _slice(slice), _reversed(_slice.options.contains(query::partition_slice::option::reversed))
, _memory_accounter(std::move(accounter))
{ }

View File

@@ -25,38 +25,51 @@
#include "range_tombstone_assembler.hh"
class mutation_rebuilder {
mutation _m;
schema_ptr _s;
mutation_opt _m;
public:
mutation_rebuilder(dht::decorated_key dk, schema_ptr s)
: _m(std::move(s), std::move(dk)) {
explicit mutation_rebuilder(schema_ptr s) : _s(std::move(s)) { }
void consume_new_partition(const dht::decorated_key& dk) {
assert(!_m);
_m = mutation(_s, std::move(dk));
}
stop_iteration consume(tombstone t) {
_m.partition().apply(t);
assert(_m);
_m->partition().apply(t);
return stop_iteration::no;
}
stop_iteration consume(range_tombstone&& rt) {
_m.partition().apply_row_tombstone(*_m.schema(), std::move(rt));
assert(_m);
_m->partition().apply_row_tombstone(*_s, std::move(rt));
return stop_iteration::no;
}
stop_iteration consume(static_row&& sr) {
_m.partition().static_row().apply(*_m.schema(), column_kind::static_column, std::move(sr.cells()));
assert(_m);
_m->partition().static_row().apply(*_s, column_kind::static_column, std::move(sr.cells()));
return stop_iteration::no;
}
stop_iteration consume(clustering_row&& cr) {
auto& dr = _m.partition().clustered_row(*_m.schema(), std::move(cr.key()));
assert(_m);
auto& dr = _m->partition().clustered_row(*_s, std::move(cr.key()));
dr.apply(cr.tomb());
dr.apply(cr.marker());
dr.cells().apply(*_m.schema(), column_kind::regular_column, std::move(cr.cells()));
dr.cells().apply(*_s, column_kind::regular_column, std::move(cr.cells()));
return stop_iteration::no;
}
stop_iteration consume_end_of_partition() {
assert(_m);
return stop_iteration::yes;
}
mutation_opt consume_end_of_stream() {
return mutation_opt(std::move(_m));
return std::move(_m);
}
};
@@ -65,10 +78,10 @@ public:
// Does not work with streams in streamed_mutation::forwarding::yes mode.
class mutation_rebuilder_v2 {
schema_ptr _s;
std::optional<mutation_rebuilder> _builder;
mutation_rebuilder _builder;
range_tombstone_assembler _rt_assembler;
public:
mutation_rebuilder_v2(schema_ptr s) : _s(std::move(s)) { }
mutation_rebuilder_v2(schema_ptr s) : _s(std::move(s)), _builder(_s) { }
public:
stop_iteration consume(partition_start mf) {
consume_new_partition(mf.key());
@@ -82,47 +95,38 @@ public:
}
public:
void consume_new_partition(const dht::decorated_key& dk) {
assert(!_builder);
_builder = mutation_rebuilder(dk, _s);
_builder.consume_new_partition(dk);
}
stop_iteration consume(tombstone t) {
assert(_builder);
_builder->consume(t);
_builder.consume(t);
return stop_iteration::no;
}
stop_iteration consume(range_tombstone_change&& rt) {
assert(_builder);
if (auto rt_opt = _rt_assembler.consume(*_s, std::move(rt))) {
_builder->consume(std::move(*rt_opt));
_builder.consume(std::move(*rt_opt));
}
return stop_iteration::no;
}
stop_iteration consume(static_row&& sr) {
assert(_builder);
_builder->consume(std::move(sr));
_builder.consume(std::move(sr));
return stop_iteration::no;
}
stop_iteration consume(clustering_row&& cr) {
assert(_builder);
_builder->consume(std::move(cr));
_builder.consume(std::move(cr));
return stop_iteration::no;
}
stop_iteration consume_end_of_partition() {
assert(_builder);
_rt_assembler.on_end_of_stream();
return stop_iteration::yes;
}
mutation_opt consume_end_of_stream() {
if (!_builder) {
return mutation_opt();
}
_rt_assembler.on_end_of_stream();
return _builder->consume_end_of_stream();
return _builder.consume_end_of_stream();
}
};

View File

@@ -25,6 +25,16 @@
#include "partition_slice_builder.hh"
partition_slice_builder::partition_slice_builder(const schema& schema, query::partition_slice slice)
: _regular_columns(std::move(slice.regular_columns))
, _static_columns(std::move(slice.static_columns))
, _row_ranges(std::move(slice._row_ranges))
, _specific_ranges(std::move(slice._specific_ranges))
, _schema(schema)
, _options(std::move(slice.options))
{
}
partition_slice_builder::partition_slice_builder(const schema& schema)
: _schema(schema)
{
@@ -63,7 +73,8 @@ partition_slice_builder::build() {
std::move(ranges),
std::move(static_columns),
std::move(regular_columns),
std::move(_options)
std::move(_options),
std::move(_specific_ranges)
};
}
@@ -88,6 +99,22 @@ partition_slice_builder::with_ranges(std::vector<query::clustering_range> ranges
return *this;
}
partition_slice_builder&
partition_slice_builder::mutate_ranges(std::function<void(std::vector<query::clustering_range>&)> func) {
if (_row_ranges) {
func(*_row_ranges);
}
return *this;
}
partition_slice_builder&
partition_slice_builder::mutate_specific_ranges(std::function<void(query::specific_ranges&)> func) {
if (_specific_ranges) {
func(*_specific_ranges);
}
return *this;
}
partition_slice_builder&
partition_slice_builder::with_no_regular_columns() {
_regular_columns = query::column_id_vector();

View File

@@ -40,10 +40,12 @@ class partition_slice_builder {
std::optional<query::column_id_vector> _regular_columns;
std::optional<query::column_id_vector> _static_columns;
std::optional<std::vector<query::clustering_range>> _row_ranges;
std::unique_ptr<query::specific_ranges> _specific_ranges;
const schema& _schema;
query::partition_slice::option_set _options;
public:
partition_slice_builder(const schema& schema);
partition_slice_builder(const schema& schema, query::partition_slice slice);
partition_slice_builder& with_static_column(bytes name);
partition_slice_builder& with_no_static_columns();
@@ -51,6 +53,10 @@ public:
partition_slice_builder& with_no_regular_columns();
partition_slice_builder& with_range(query::clustering_range range);
partition_slice_builder& with_ranges(std::vector<query::clustering_range>);
// noop if no ranges have been set yet
partition_slice_builder& mutate_ranges(std::function<void(std::vector<query::clustering_range>&)>);
// noop if no specific ranges have been set yet
partition_slice_builder& mutate_specific_ranges(std::function<void(query::specific_ranges&)>);
partition_slice_builder& without_partition_key_columns();
partition_slice_builder& without_clustering_key_columns();
partition_slice_builder& reversed();

View File

@@ -97,7 +97,7 @@ auto consume_page(flat_mutation_reader& reader,
auto consume = [&reader, &slice, reader_consumer = std::move(reader_consumer), max_size] () mutable {
if (slice.options.contains(query::partition_slice::option::reversed)) {
return with_closeable(make_reversing_reader(reader, max_size),
return with_closeable(make_reversing_reader(make_flat_mutation_reader<delegating_reader>(reader), max_size),
[reader_consumer = std::move(reader_consumer)] (flat_mutation_reader& reversing_reader) mutable {
return reversing_reader.consume(std::move(reader_consumer));
});
@@ -128,6 +128,14 @@ protected:
std::variant<flat_mutation_reader, reader_concurrency_semaphore::inactive_read_handle> _reader;
dht::partition_ranges_view _query_ranges;
protected:
schema_ptr underlying_schema() const {
if (is_reversed()) {
return _schema->make_reversed();
}
return _schema;
}
public:
querier_base(reader_permit permit, std::unique_ptr<const dht::partition_range> range,
std::unique_ptr<const query::partition_slice> slice, flat_mutation_reader reader, dht::partition_ranges_view query_ranges)
@@ -145,7 +153,7 @@ public:
, _permit(std::move(permit))
, _range(std::make_unique<const dht::partition_range>(std::move(range)))
, _slice(std::make_unique<const query::partition_slice>(std::move(slice)))
, _reader(ms.make_reader(_schema, _permit, *_range, *_slice, pc, std::move(trace_ptr), streamed_mutation::forwarding::no, mutation_reader::forwarding::no))
, _reader(ms.make_reader(underlying_schema(), _permit, *_range, *_slice, pc, std::move(trace_ptr), streamed_mutation::forwarding::no, mutation_reader::forwarding::no))
, _query_ranges(*_range)
{ }

View File

@@ -33,6 +33,7 @@
#include "query_class_config.hh"
class position_in_partition_view;
class partition_slice_builder;
namespace query {
@@ -110,6 +111,9 @@ public:
const clustering_row_ranges& ranges() const {
return _ranges;
}
clustering_row_ranges& ranges() {
return _ranges;
}
private:
friend std::ostream& operator<<(std::ostream& out, const specific_ranges& r);
@@ -124,7 +128,15 @@ constexpr auto max_rows_if_set = std::numeric_limits<uint32_t>::max();
// Specifies subset of rows, columns and cell attributes to be returned in a query.
// Can be accessed across cores.
// Schema-dependent.
//
// COMPATIBILITY NOTE: the partition-slice for reverse queries has two different
// format:
// * legacy format
// * native format
// The wire format uses the legacy format. See docs/design-notes/reverse-reads.md
// for more details on the formats.
class partition_slice {
friend class ::partition_slice_builder;
public:
enum class option {
send_clustering_key,
@@ -226,6 +238,13 @@ public:
friend std::ostream& operator<<(std::ostream& out, const specific_ranges& ps);
};
// See docs/design-notes/reverse-reads.md
partition_slice legacy_reverse_slice_to_native_reverse_slice(const schema& schema, partition_slice slice);
partition_slice native_reverse_slice_to_legacy_reverse_slice(const schema& schema, partition_slice slice);
// Fully reverse slice (forward to native reverse)
// Also set the reversed bit in `partition_slice::options`.
partition_slice reverse_slice(const schema& schema, partition_slice slice);
constexpr auto max_partitions = std::numeric_limits<uint32_t>::max();
// Tagged integers to disambiguate constructor arguments.

View File

@@ -29,6 +29,7 @@
#include "mutation_partition_serializer.hh"
#include "query-result-reader.hh"
#include "query_result_merger.hh"
#include "partition_slice_builder.hh"
namespace query {
@@ -114,6 +115,41 @@ void trim_clustering_row_ranges_to(const schema& s, clustering_row_ranges& range
reversed ? position_in_partition_view::after_key(full_key) : position_in_partition_view::before_key(full_key), reversed);
}
static void reverse_clustering_ranges_bounds(clustering_row_ranges& ranges) {
for (auto& range : ranges) {
if (!range.is_singular()) {
range = query::clustering_range(range.end(), range.start());
}
}
}
partition_slice legacy_reverse_slice_to_native_reverse_slice(const schema& schema, partition_slice slice) {
return partition_slice_builder(schema, std::move(slice))
.mutate_ranges([] (clustering_row_ranges& ranges) { reverse_clustering_ranges_bounds(ranges); })
.mutate_specific_ranges([] (specific_ranges& ranges) { reverse_clustering_ranges_bounds(ranges.ranges()); })
.build();
}
partition_slice native_reverse_slice_to_legacy_reverse_slice(const schema& schema, partition_slice slice) {
// They are the same, we give them different names to express intent
return legacy_reverse_slice_to_native_reverse_slice(schema, std::move(slice));
}
partition_slice reverse_slice(const schema& schema, partition_slice slice) {
return partition_slice_builder(schema, std::move(slice))
.mutate_ranges([] (clustering_row_ranges& ranges) {
std::reverse(ranges.begin(), ranges.end());
reverse_clustering_ranges_bounds(ranges);
})
.mutate_specific_ranges([] (specific_ranges& sranges) {
auto& ranges = sranges.ranges();
std::reverse(ranges.begin(), ranges.end());
reverse_clustering_ranges_bounds(ranges);
})
.with_option<partition_slice::option::reversed>()
.build();
}
partition_slice::partition_slice(clustering_row_ranges row_ranges,
query::column_id_vector static_columns,
query::column_id_vector regular_columns,

View File

@@ -73,10 +73,6 @@ void range_tombstone_accumulator::update_current_tombstone() {
void range_tombstone_accumulator::drop_unneeded_tombstones(const clustering_key_prefix& ck, int w) {
auto cmp = [&] (const range_tombstone& rt, const clustering_key_prefix& ck, int w) {
if (_reversed) {
auto bv = rt.start_bound();
return _cmp(ck, w, bv.prefix(), weight(bv.kind()));
}
auto bv = rt.end_bound();
return _cmp(bv.prefix(), weight(bv.kind()), ck, w);
};
@@ -91,15 +87,11 @@ void range_tombstone_accumulator::drop_unneeded_tombstones(const clustering_key_
}
void range_tombstone_accumulator::apply(range_tombstone rt) {
if (_reversed) {
drop_unneeded_tombstones(rt.end, weight(rt.end_kind));
} else {
drop_unneeded_tombstones(rt.start, weight(rt.start_kind));
}
drop_unneeded_tombstones(rt.start, weight(rt.start_kind));
_current_tombstone.apply(rt.tomb);
auto cmp = [&] (const range_tombstone& rt1, const range_tombstone& rt2) {
return _reversed ? _cmp(rt2.start_bound(), rt1.start_bound()) : _cmp(rt1.end_bound(), rt2.end_bound());
return _cmp(rt1.end_bound(), rt2.end_bound());
};
_range_tombstones.insert(boost::upper_bound(_range_tombstones, rt, cmp), std::move(rt));
}

View File

@@ -192,6 +192,15 @@ public:
end_kind = new_end.kind();
}
// Swap bounds to reverse range-tombstone -- as if it came from a table with
// reverse native order. See docs/design-notes/reverse-reads.md.
void reverse() {
std::swap(start, end);
std::swap(start_kind, end_kind);
start_kind = reverse_kind(start_kind);
end_kind = reverse_kind(end_kind);
}
size_t external_memory_usage(const schema&) const noexcept {
return start.external_memory_usage() + end.external_memory_usage();
}
@@ -252,13 +261,12 @@ class range_tombstone_accumulator {
tombstone _partition_tombstone;
std::deque<range_tombstone> _range_tombstones;
tombstone _current_tombstone;
bool _reversed;
private:
void update_current_tombstone();
void drop_unneeded_tombstones(const clustering_key_prefix& ck, int w = 0);
public:
range_tombstone_accumulator(const schema& s, bool reversed)
: _cmp(s), _reversed(reversed) { }
explicit range_tombstone_accumulator(const schema& s)
: _cmp(s) { }
void set_partition_tombstone(tombstone t) {
_partition_tombstone = t;

View File

@@ -415,10 +415,16 @@ schema::schema(const raw_schema& raw, std::optional<raw_view_info> raw_view_info
}
}
schema::schema(const schema& o)
schema::schema(const schema& o, const std::function<void(schema&)>& transform)
: _raw(o._raw)
, _offsets(o._offsets)
{
// Do the transformation after all the raw fields are initialized, but
// *before* the derived fields are generated (from the raw ones).
if (transform) {
transform(*this);
}
rebuild();
if (o.is_view()) {
_view_info = std::make_unique<::view_info>(*this, o.view_info()->raw());
@@ -428,6 +434,23 @@ schema::schema(const schema& o)
}
}
schema::schema(const schema& o)
: schema(o, {})
{
}
schema::schema(reversed_tag, const schema& o)
: schema(o, [] (schema& s) {
s._raw._version = utils::UUID_gen::negate(s._raw._version);
for (auto& col : s._raw._columns) {
if (col.kind == column_kind::clustering_key) {
col.type = reversed(col.type);
}
}
})
{
}
lw_shared_ptr<const schema> make_shared_schema(std::optional<utils::UUID> id, std::string_view ks_name,
std::string_view cf_name, std::vector<schema::column> partition_key, std::vector<schema::column> clustering_key,
std::vector<schema::column> regular_columns, std::vector<schema::column> static_columns,
@@ -1568,6 +1591,10 @@ bool schema::equal_columns(const schema& other) const {
return boost::equal(all_columns(), other.all_columns());
}
schema_ptr schema::make_reversed() const {
return make_lw_shared<schema>(schema::reversed_tag{}, *this);
}
raw_view_info::raw_view_info(utils::UUID base_id, sstring base_name, bool include_all_columns, sstring where_clause)
: _base_id(std::move(base_id))
, _base_name(std::move(base_name))

View File

@@ -584,6 +584,10 @@ public:
}
};
class schema;
using schema_ptr = lw_shared_ptr<const schema>;
/*
* Effectively immutable.
* Not safe to access across cores because of shared_ptr's.
@@ -695,11 +699,16 @@ public:
data_type type;
};
private:
struct reversed_tag { };
lw_shared_ptr<cql3::column_specification> make_column_specification(const column_definition& def);
void rebuild();
schema(const raw_schema&, std::optional<raw_view_info>);
schema(const schema&, const std::function<void(schema&)>&);
public:
schema(const schema&);
// See \ref make_reversed().
schema(reversed_tag, const schema&);
~schema();
table_schema_version version() const {
return _raw._version;
@@ -966,6 +975,17 @@ public:
const v3_columns& v3() const {
return _v3_columns;
}
// Make a copy of the schema with reversed clustering order.
//
// The reversing is revertible, so that:
//
// s->make_reversed()->make_reversed()->version() == s->version()
//
// But note that: `s != s->make_reversed()->make_reversed()` (they are two
// different C++ objects).
// The schema's version is also reversed using UUID_gen::negate().
schema_ptr make_reversed() const;
};
lw_shared_ptr<const schema> make_shared_schema(std::optional<utils::UUID> id, std::string_view ks_name, std::string_view cf_name,
@@ -974,8 +994,6 @@ lw_shared_ptr<const schema> make_shared_schema(std::optional<utils::UUID> id, st
bool operator==(const schema&, const schema&);
using schema_ptr = lw_shared_ptr<const schema>;
/**
* Wrapper for schema_ptr used by functions that expect an engaged view_info field.
*/

View File

@@ -2052,7 +2052,10 @@ table::mutation_query(schema_ptr s,
std::exception_ptr ex;
try {
auto rrb = reconcilable_result_builder(*s, cmd.slice, std::move(accounter));
// Un-reverse the schema sent to the coordinator, it expects the
// legacy format.
auto result_schema = cmd.slice.options.contains(query::partition_slice::option::reversed) ? s->make_reversed() : s;
auto rrb = reconcilable_result_builder(*result_schema, cmd.slice, std::move(accounter));
auto r = co_await q.consume_page(std::move(rrb), cmd.get_row_limit(), cmd.partition_limit, cmd.timestamp, class_config.max_memory_for_unlimited_query);
if (!saved_querier || (!q.are_limits_reached() && !r.is_short_read())) {

View File

@@ -230,3 +230,20 @@ BOOST_AUTO_TEST_CASE(test_max_time_uuid) {
auto unix_timestamp = utils::UUID_gen::unix_timestamp(uuid);
BOOST_CHECK(unix_timestamp == millis);
}
BOOST_AUTO_TEST_CASE(test_negate) {
using namespace utils;
auto original_uuid = UUID_gen::get_time_UUID();
BOOST_TEST_MESSAGE(fmt::format("original_uuid: {}", original_uuid));
auto negated_uuid = UUID_gen::negate(original_uuid);
BOOST_TEST_MESSAGE(fmt::format("negated_uuid: {}", negated_uuid));
BOOST_REQUIRE(original_uuid != negated_uuid);
auto re_negated_uuid = UUID_gen::negate(negated_uuid);
BOOST_TEST_MESSAGE(fmt::format("re_negated_uuid: {}", re_negated_uuid));
BOOST_REQUIRE(original_uuid == re_negated_uuid);
}

View File

@@ -41,6 +41,8 @@
#include "test/lib/flat_mutation_reader_assertions.hh"
#include "test/lib/log.hh"
#include "test/lib/reader_concurrency_semaphore.hh"
#include "test/lib/random_utils.hh"
#include "test/lib/random_schema.hh"
#include <boost/range/adaptor/map.hpp>
@@ -476,8 +478,8 @@ using in_thread = seastar::bool_class<class in_thread_tag>;
struct flat_stream_consumer {
schema_ptr _schema;
schema_ptr _reversed_schema;
reader_permit _permit;
reversed_partitions _reversed;
skip_after_first_fragment _skip_partition;
skip_after_first_partition _skip_stream;
std::vector<mutation> _mutations;
@@ -485,22 +487,17 @@ struct flat_stream_consumer {
bool _inside_partition = false;
private:
void verify_order(position_in_partition_view pos) {
position_in_partition::less_compare cmp(*_schema);
if (!_reversed) {
BOOST_REQUIRE(!_previous_position || _previous_position->is_static_row()
|| cmp(*_previous_position, pos));
} else {
BOOST_REQUIRE(!_previous_position || _previous_position->is_static_row()
|| cmp(pos, *_previous_position));
}
const schema& s = _reversed_schema ? *_reversed_schema : *_schema;
position_in_partition::less_compare cmp(s);
BOOST_REQUIRE(!_previous_position || _previous_position->is_static_row() || cmp(*_previous_position, pos));
}
public:
flat_stream_consumer(schema_ptr s, reader_permit permit, reversed_partitions reversed,
skip_after_first_fragment skip_partition = skip_after_first_fragment::no,
skip_after_first_partition skip_stream = skip_after_first_partition::no)
: _schema(std::move(s))
, _reversed_schema(reversed ? _schema->make_reversed() : nullptr)
, _permit(std::move(permit))
, _reversed(reversed)
, _skip_partition(skip_partition)
, _skip_stream(skip_stream)
{ }
@@ -534,10 +531,13 @@ public:
}
stop_iteration consume(range_tombstone&& rt) {
BOOST_REQUIRE(_inside_partition);
auto pos = _reversed ? rt.end_position() : rt.position();
auto pos = rt.position();
verify_order(pos);
BOOST_REQUIRE_GE(_mutations.size(), 1);
_previous_position.emplace(pos);
if (_reversed_schema) {
rt.reverse(); // undo the reversing
}
_mutations.back().partition().apply(*_schema, mutation_fragment(*_schema, _permit, std::move(rt)));
return stop_iteration(bool(_skip_partition));
}
@@ -564,7 +564,8 @@ void test_flat_stream(schema_ptr s, std::vector<mutation> muts, reversed_partiti
return fmr.consume_in_thread(std::move(fsc));
} else {
if (reversed) {
return with_closeable(make_reversing_reader(fmr, query::max_result_size(size_t(1) << 20)), [fsc = std::move(fsc)] (flat_mutation_reader& reverse_reader) mutable {
return with_closeable(make_reversing_reader(make_flat_mutation_reader<delegating_reader>(fmr), query::max_result_size(size_t(1) << 20)),
[fsc = std::move(fsc)] (flat_mutation_reader& reverse_reader) mutable {
return reverse_reader.consume(std::move(fsc));
}).get0();
}
@@ -813,11 +814,8 @@ SEASTAR_THREAD_TEST_CASE(test_reverse_reader_memory_limit) {
}
const uint64_t hard_limit = size_t(1) << 18;
auto reader = flat_mutation_reader_from_mutations(semaphore.make_permit(), {mut});
// need to close both readers since the reverse_reader
// doesn't own the reader passed to it by ref.
auto close_reader = deferred_close(reader);
auto reverse_reader = make_reversing_reader(reader, query::max_result_size(size_t(1) << 10, hard_limit));
auto reverse_reader = make_reversing_reader(flat_mutation_reader_from_mutations(semaphore.make_permit(), {mut}),
query::max_result_size(size_t(1) << 10, hard_limit));
auto close_reverse_reader = deferred_close(reverse_reader);
try {
@@ -840,3 +838,94 @@ SEASTAR_THREAD_TEST_CASE(test_reverse_reader_memory_limit) {
test_with_partition(true);
test_with_partition(false);
}
SEASTAR_THREAD_TEST_CASE(test_reverse_reader_reads_in_native_reverse_order) {
using namespace tests::data_model;
using key_range = nonwrapping_interval<mutation_description::key>;
std::mt19937 engine(tests::random::get_int<uint32_t>());
tests::reader_concurrency_semaphore_wrapper semaphore;
auto permit = semaphore.make_permit();
auto rnd_schema_spec = tests::make_random_schema_specification(
get_name(),
std::uniform_int_distribution<size_t>(1, 2),
std::uniform_int_distribution<size_t>(1, 8));
auto rnd_schema = tests::random_schema(engine(), *rnd_schema_spec);
auto forward_schema = rnd_schema.schema();
auto reverse_schema = forward_schema->make_reversed();
auto forward_mt = make_lw_shared<memtable>(forward_schema);
auto reverse_mt = make_lw_shared<memtable>(reverse_schema);
for (size_t pk = 0; pk != 8; ++pk) {
auto mut = rnd_schema.new_mutation(pk);
if (forward_schema->has_static_columns()) {
rnd_schema.add_static_row(engine, mut);
}
auto ckeys = rnd_schema.make_ckeys(8);
for (size_t ck = 0; ck != ckeys.size(); ++ck) {
const auto& ckey = ckeys.at(ck);
if (ck % 4 == 0 && ck + 3 < ckeys.size()) {
const auto& ckey_1 = ckeys.at(ck + 1);
const auto& ckey_2 = ckeys.at(ck + 2);
const auto& ckey_3 = ckeys.at(ck + 3);
rnd_schema.delete_range(engine, mut, key_range::make({ckey, true}, {ckey_2, true}));
rnd_schema.delete_range(engine, mut, key_range::make({ckey, true}, {ckey_3, true}));
rnd_schema.delete_range(engine, mut, key_range::make({ckey_1, true}, {ckey_3, true}));
}
rnd_schema.add_row(engine, mut, ckey);
}
forward_mt->apply(mut.build(forward_schema));
reverse_mt->apply(mut.build(reverse_schema));
}
auto reversed_forward_reader = assert_that(make_reversing_reader(forward_mt->make_flat_reader(forward_schema, permit), query::max_result_size(1 << 20)));
auto reverse_reader = reverse_mt->make_flat_reader(reverse_schema, permit);
auto deferred_reverse_close = deferred_close(reverse_reader);
while (auto mf_opt = reverse_reader().get()) {
auto& mf = *mf_opt;
reversed_forward_reader.produces(*forward_schema, mf);
}
reversed_forward_reader.produces_end_of_stream();
}
SEASTAR_THREAD_TEST_CASE(test_reverse_reader_is_mutation_source) {
std::list<query::partition_slice> reversed_slices;
auto populate = [&reversed_slices] (schema_ptr s, const std::vector<mutation> &muts) {
auto reverse_schema = s->make_reversed();
auto reverse_mt = make_lw_shared<memtable>(reverse_schema);
for (const auto& mut : muts) {
reverse_mt->apply(reverse(mut));
}
return mutation_source([=, &reversed_slices] (
schema_ptr schema,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_ptr,
streamed_mutation::forwarding fwd_sm,
mutation_reader::forwarding fwd_mr) mutable {
reversed_slices.emplace_back(query::reverse_slice(*schema, slice));
// We don't want the memtable reader to read in reverse.
reversed_slices.back().options.remove(query::partition_slice::option::reversed);
auto rd = make_reversing_reader(reverse_mt->make_flat_reader(schema->make_reversed(), std::move(permit), range, reversed_slices.back(), pc,
std::move(trace_ptr), streamed_mutation::forwarding::no, fwd_mr), query::max_result_size(1 << 20));
if (fwd_sm) {
return make_forwardable(std::move(rd));
}
return rd;
});
};
run_mutation_source_tests(populate);
}

View File

@@ -65,7 +65,11 @@ mutation_source make_source(std::vector<mutation> mutations) {
const io_priority_class& pc, tracing::trace_state_ptr, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) {
assert(range.is_full()); // slicing not implemented yet
for (auto&& m : mutations) {
assert(m.schema() == s);
if (slice.options.contains(query::partition_slice::option::reversed)) {
assert(m.schema()->make_reversed()->version() == s->version());
} else {
assert(m.schema() == s);
}
}
return flat_mutation_reader_from_mutations(std::move(permit), mutations, slice, fwd);
});

View File

@@ -2889,7 +2889,7 @@ void check_clustering_row_summaries(const schema& schema, const clustering_row_s
}
void check_clustering_summaries(const schema& schema, const partition_summary& actual, const partition_summary& expected) {
range_tombstone_accumulator range_tombstones(schema, false);
range_tombstone_accumulator range_tombstones(schema);
range_tombstones.set_partition_tombstone(expected.tomb);
for (auto [actual_frag, expected_frag] : iterate_over_in_ordered_lockstep(actual.clustering_fragments, expected.clustering_fragments,
@@ -3122,3 +3122,69 @@ SEASTAR_THREAD_TEST_CASE(test_appending_hash_row_4567) {
// These checks are meaningful because legacy hashing is still used for old nodes.
BOOST_CHECK_EQUAL(compute_legacy_hash(r1, { 0, 1, 2 }), compute_legacy_hash(r2, { 0, 1, 2 }));
}
SEASTAR_THREAD_TEST_CASE(test_mutation_consume_position_monotonicity) {
std::mt19937 engine(tests::random::get_int<uint32_t>());
tests::reader_concurrency_semaphore_wrapper semaphore;
auto permit = semaphore.make_permit();
auto rnd_schema_spec = tests::make_random_schema_specification(
get_name(),
std::uniform_int_distribution<size_t>(1, 2),
std::uniform_int_distribution<size_t>(1, 8));
auto rnd_schema = tests::random_schema(engine(), *rnd_schema_spec);
auto forward_schema = rnd_schema.schema();
auto reverse_schema = forward_schema->make_reversed();
const auto muts = tests::generate_random_mutations(
rnd_schema,
tests::default_timestamp_generator(),
tests::no_expiry_expiry_generator(),
std::uniform_int_distribution<size_t>(1, 1)).get();
class validating_consumer {
mutation_fragment_stream_validator _validator;
public:
explicit validating_consumer(const schema& s) : _validator(s) { }
void consume_new_partition(const dht::decorated_key&) {
BOOST_REQUIRE(_validator(mutation_fragment::kind::partition_start, position_in_partition_view(position_in_partition_view::partition_start_tag_t{})));
}
void consume(tombstone) { }
stop_iteration consume(static_row&& sr) {
BOOST_REQUIRE(_validator(mutation_fragment::kind::static_row, sr.position()));
return stop_iteration::no;
}
stop_iteration consume(clustering_row&& cr) {
BOOST_REQUIRE(_validator(mutation_fragment::kind::clustering_row, cr.position()));
return stop_iteration::no;
}
stop_iteration consume(range_tombstone&& rt) {
BOOST_REQUIRE(_validator(mutation_fragment::kind::range_tombstone, rt.position()));
return stop_iteration::no;
}
stop_iteration consume_end_of_partition() {
BOOST_REQUIRE(_validator(mutation_fragment::kind::partition_end, position_in_partition_view(position_in_partition_view::end_of_partition_tag_t{})));
return stop_iteration::no;
}
void consume_end_of_stream() {
BOOST_REQUIRE(_validator.on_end_of_stream());
}
};
BOOST_TEST_MESSAGE("Forward");
{
auto mut = muts.front();
validating_consumer consumer(*forward_schema);
std::move(mut).consume(consumer, consume_in_reverse::no);
}
BOOST_TEST_MESSAGE("Reverse");
{
auto mut = muts.front();
validating_consumer consumer(*reverse_schema);
std::move(mut).consume(consumer, consume_in_reverse::yes);
}
}

View File

@@ -884,8 +884,7 @@ BOOST_AUTO_TEST_CASE(test_accumulator) {
auto ts1 = 1;
auto ts2 = 2;
testlog.info("Forward");
auto acc = range_tombstone_accumulator(*s, false);
auto acc = range_tombstone_accumulator(*s);
acc.apply(rtie(0, 4, ts1));
BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 0 })), tombstone(ts1, gc_now));
acc.apply(rtie(1, 2, ts2));
@@ -904,26 +903,4 @@ BOOST_AUTO_TEST_CASE(test_accumulator) {
BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 13 })), tombstone(ts2, gc_now));
BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 14 })), tombstone());
BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 15 })), tombstone());
testlog.info("Reversed");
acc = range_tombstone_accumulator(*s, true);
BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 15 })), tombstone());
BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 14 })), tombstone());
acc.apply(rtie(11, 14, ts2));
BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 13 })), tombstone(ts2, gc_now));
BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 12 })), tombstone(ts2, gc_now));
acc.apply(rtie(10, 12, ts1));
BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 11 })), tombstone(ts2, gc_now));
BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 10 })), tombstone(ts1, gc_now));
BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 9 })), tombstone());
acc.apply(rtie(6, 8, ts2));
BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 5 })), tombstone());
BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 4 })), tombstone());
acc.apply(rtie(0, 4, ts1));
BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 3 })), tombstone(ts1, gc_now));
BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 2 })), tombstone(ts1, gc_now));
acc.apply(rtie(1, 2, ts2));
BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 1 })), tombstone(ts2, gc_now));
BOOST_REQUIRE_EQUAL(acc.tombstone_for_row(key({ 0 })), tombstone(ts1, gc_now));
}

View File

@@ -42,6 +42,7 @@
#include "test/lib/log.hh"
#include "serializer_impl.hh"
#include "cdc/cdc_extension.hh"
#include "utils/UUID_gen.hh"
SEASTAR_TEST_CASE(test_new_schema_with_no_structural_change_is_propagated) {
return do_with_cql_env([](cql_test_env& e) {
@@ -807,3 +808,26 @@ SEASTAR_TEST_CASE(test_schema_tables_use_null_sharder) {
}).get();
}, raft_cql_test_config());
}
SEASTAR_TEST_CASE(test_schema_make_reversed) {
auto schema = schema_builder("tests", get_name())
.with_column("pk", bytes_type, column_kind::partition_key)
.with_column("ck", bytes_type, column_kind::clustering_key)
.with_column("v1", bytes_type)
.build();
testlog.info(" schema->version(): {}", schema->version());
auto reversed_schema = schema->make_reversed();
testlog.info(" reversed_schema->version(): {}", reversed_schema->version());
BOOST_REQUIRE(schema->version() != reversed_schema->version());
BOOST_REQUIRE(utils::UUID_gen::negate(schema->version()) == reversed_schema->version());
auto re_reversed_schema = reversed_schema->make_reversed();
testlog.info("re_reversed_schema->version(): {}", re_reversed_schema->version());
BOOST_REQUIRE(schema->version() == re_reversed_schema->version());
BOOST_REQUIRE(reversed_schema->version() != re_reversed_schema->version());
return make_ready_future<>();
}

View File

@@ -406,7 +406,8 @@ static void test_streamed_mutation_forwarding_is_consistent_with_slicing(tests::
void consume_new_partition(const dht::decorated_key& dk) {
assert(!_builder);
_builder = mutation_rebuilder(dk, std::move(_s));
_builder = mutation_rebuilder(std::move(_s));
_builder->consume_new_partition(dk);
}
stop_iteration consume(tombstone t) {
@@ -842,7 +843,7 @@ static void test_streamed_mutation_forwarding_across_range_tombstones(tests::rea
}
static void test_range_queries(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) {
testlog.info("Testing range queries");
testlog.info(__PRETTY_FUNCTION__);
auto s = schema_builder("ks", "cf")
.with_column("key", bytes_type, column_kind::partition_key)
@@ -1196,7 +1197,7 @@ static void test_clustering_slices(tests::reader_concurrency_semaphore_wrapper&
}
static void test_query_only_static_row(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) {
BOOST_TEST_MESSAGE(__PRETTY_FUNCTION__);
testlog.info(__PRETTY_FUNCTION__);
simple_schema s;
@@ -1242,7 +1243,7 @@ static void test_query_only_static_row(tests::reader_concurrency_semaphore_wrapp
}
static void test_query_no_clustering_ranges_no_static_columns(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) {
BOOST_TEST_MESSAGE(__PRETTY_FUNCTION__);
testlog.info(__PRETTY_FUNCTION__);
simple_schema s(simple_schema::with_static::no);
@@ -1286,6 +1287,8 @@ static void test_query_no_clustering_ranges_no_static_columns(tests::reader_conc
}
void test_streamed_mutation_forwarding_succeeds_with_no_data(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) {
testlog.info(__PRETTY_FUNCTION__);
simple_schema s;
auto cks = s.make_ckeys(6);
@@ -1323,6 +1326,8 @@ void test_streamed_mutation_forwarding_succeeds_with_no_data(tests::reader_concu
static
void test_slicing_with_overlapping_range_tombstones(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) {
testlog.info(__PRETTY_FUNCTION__);
simple_schema ss;
auto s = ss.schema();
@@ -1420,6 +1425,8 @@ void test_slicing_with_overlapping_range_tombstones(tests::reader_concurrency_se
}
void test_downgrade_to_v1_clear_buffer(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) {
testlog.info(__PRETTY_FUNCTION__);
simple_schema s;
auto pkey = s.make_pkey();
sstring value(256, 'v');
@@ -1444,6 +1451,8 @@ void test_downgrade_to_v1_clear_buffer(tests::reader_concurrency_semaphore_wrapp
}
void test_range_tombstones_v2(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) {
testlog.info(__PRETTY_FUNCTION__);
simple_schema s;
auto pkey = s.make_pkey();
@@ -1634,7 +1643,8 @@ void test_range_tombstones_v2(tests::reader_concurrency_semaphore_wrapper& semap
}
void test_reader_conversions(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) {
BOOST_TEST_MESSAGE(__PRETTY_FUNCTION__);
testlog.info(__PRETTY_FUNCTION__);
for_each_mutation([&] (const mutation& m) mutable {
const auto query_time = gc_clock::now();
@@ -1661,6 +1671,8 @@ void test_reader_conversions(tests::reader_concurrency_semaphore_wrapper& semaph
void test_next_partition(tests::reader_concurrency_semaphore_wrapper&, populate_fn_ex);
void run_mutation_reader_tests(populate_fn_ex populate, bool with_partition_range_forwarding) {
testlog.info(__PRETTY_FUNCTION__);
tests::reader_concurrency_semaphore_wrapper semaphore;
test_range_tombstones_v2(semaphore, populate);
@@ -1688,6 +1700,8 @@ void run_mutation_reader_tests(populate_fn_ex populate, bool with_partition_rang
}
void test_next_partition(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) {
testlog.info(__PRETTY_FUNCTION__);
simple_schema s;
auto pkeys = s.make_pkeys(4);

View File

@@ -3436,3 +3436,11 @@ std::ostream& operator<<(std::ostream& out, const data_value& v) {
shared_ptr<const reversed_type_impl> reversed_type_impl::get_instance(data_type type) {
return intern::get_instance(std::move(type));
}
data_type reversed(data_type type) {
if (type->is_reversed()) {
return type->underlying_type();
} else {
return reversed_type_impl::get_instance(type);
}
}

View File

@@ -829,6 +829,10 @@ public:
};
using reversed_type = shared_ptr<const reversed_type_impl>;
// Reverse the sort order of the type by wrapping in or stripping reversed_type,
// as needed.
data_type reversed(data_type);
class map_type_impl;
using map_type = shared_ptr<const map_type_impl>;

View File

@@ -168,6 +168,25 @@ UUID UUID_gen::get_name_UUID(const unsigned char *s, size_t len) {
return get_UUID(digest);
}
UUID UUID_gen::negate(UUID o) {
auto lsb = o.get_least_significant_bits();
const long clock_mask = 0x0000000000003FFFL;
// We flip the node-and-clock-seq octet of the UUID for time-UUIDs. This
// creates a virtual node with a time which cannot be generated anymore, so
// is safe against collisions.
// For name UUIDs we flip the same octet. Name UUIDs being an md5 hash over
// a buffer, flipping any bit should be safe against collisions.
long clock = (lsb >> 48) & clock_mask;
clock = ~clock & clock_mask;
lsb &= ~(clock_mask << 48); // zero current clock
lsb |= (clock << 48); // write new clock
return UUID(o.get_most_significant_bits(), lsb);
}
const thread_local int64_t UUID_gen::spoof_node = make_thread_local_node(make_random_node());
const thread_local int64_t UUID_gen::clock_seq_and_node = make_clock_seq_and_node();
thread_local UUID_gen UUID_gen::_instance;

View File

@@ -397,6 +397,16 @@ public:
(0x0fff000000000000UL & msb) >> 48 |
0x0000000000001000L); // sets the version to 1.
}
// Produce an UUID which is derived from this UUID in a reversible manner
//
// Such that:
//
// auto original_uuid = UUID_gen::get_time_UUID();
// auto negated_uuid = UUID_gen::negate(original_uuid);
// assert(original_uuid != negated_uuid);
// assert(original_uuid == UUID_gen::negate(negated_uuid));
static UUID negate(UUID);
};
// for the curious, here is how I generated START_EPOCH