treewide: switch to native reversed format for reverse reads

We define the native reverse format as a reversed mutation fragment
stream that is identical to one that would be emitted by a table with
the same schema but with reversed clustering order. The main difference
to the current format is how range tombstones are handled: instead of
looking at their start or end bound depending on the order, we always
use them as-usual and the reversing reader swaps their bounds to
facilitate this. This allows us to treat reversed streams completely
transparently: just pass along them a reversed schema and all the
reader, compacting and result building code is happily ignorant about
the fact that it is a reversed stream.
This commit is contained in:
Botond Dénes
2021-08-24 16:12:49 +03:00
parent 0af5a8add0
commit 502a45ad58
12 changed files with 74 additions and 28 deletions

View File

@@ -1376,6 +1376,11 @@ compare_atomic_cell_for_merge(atomic_cell_view left, atomic_cell_view right) {
future<std::tuple<lw_shared_ptr<query::result>, cache_temperature>>
database::query(schema_ptr s, const query::read_command& cmd, query::result_options opts, const dht::partition_range_vector& ranges,
tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout) {
const auto reversed = cmd.slice.options.contains(query::partition_slice::option::reversed);
if (reversed) {
s = s->make_reversed();
}
column_family& cf = find_column_family(cmd.cf_id);
auto& semaphore = get_reader_concurrency_semaphore();
auto class_config = query::query_class_config{.semaphore = semaphore, .max_memory_for_unlimited_query = *cmd.max_result_size};
@@ -1429,6 +1434,11 @@ database::query(schema_ptr s, const query::read_command& cmd, query::result_opti
future<std::tuple<reconcilable_result, cache_temperature>>
database::query_mutations(schema_ptr s, const query::read_command& cmd, const dht::partition_range& range,
tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout) {
const auto reversed = cmd.slice.options.contains(query::partition_slice::option::reversed);
if (reversed) {
s = s->make_reversed();
}
const auto short_read_allwoed = query::short_read(cmd.slice.options.contains<query::partition_slice::option::allow_short_read>());
auto accounter = co_await get_result_memory_limiter().new_mutation_read(*cmd.max_result_size, short_read_allwoed);
column_family& cf = find_column_family(cmd.cf_id);

View File

@@ -107,13 +107,14 @@ flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, query
private:
stop_iteration emit_partition() {
auto emit_range_tombstone = [&] {
auto it = std::prev(_range_tombstones.end());
// _range_tombstones uses the reverse schema already, so we can use `begin()`
auto it = _range_tombstones.begin();
push_mutation_fragment(*_schema, _permit, _range_tombstones.pop(it));
};
position_in_partition::less_compare cmp(*_schema);
position_in_partition::tri_compare cmp(*_schema);
while (!_mutation_fragments.empty() && !is_buffer_full()) {
auto& mf = _mutation_fragments.top();
if (!_range_tombstones.empty() && !cmp(_range_tombstones.rbegin()->end_position(), mf.position())) {
if (!_range_tombstones.empty() && cmp(_range_tombstones.begin()->position(), mf.position()) <= 0) {
emit_range_tombstone();
} else {
_stack_size -= mf.memory_usage();
@@ -148,7 +149,9 @@ flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, query
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
} else if (mf.is_range_tombstone()) {
_range_tombstones.apply(*_schema, std::move(mf.as_range_tombstone()));
auto&& rt = std::move(mf).as_range_tombstone();
rt.reverse();
_range_tombstones.apply(*_schema, std::move(rt));
} else {
_mutation_fragments.emplace(std::move(mf));
_stack_size += _mutation_fragments.top().memory_usage();
@@ -182,7 +185,7 @@ flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, query
}
public:
explicit partition_reversing_mutation_reader(flat_mutation_reader& mr, query::max_result_size max_size)
: flat_mutation_reader::impl(mr.schema(), mr.permit())
: flat_mutation_reader::impl(mr.schema()->make_reversed(), mr.permit())
, _source(&mr)
, _range_tombstones(*_schema)
, _max_size(max_size)

View File

@@ -890,14 +890,17 @@ future<> consume_partitions(flat_mutation_reader& reader, Consumer consumer) {
flat_mutation_reader
make_generating_reader(schema_ptr s, reader_permit permit, std::function<future<mutation_fragment_opt> ()> get_next_fragment);
/// A reader that emits partitions in reverse.
/// A reader that emits partitions in native reverse order.
///
/// 1. Static row is still emitted first.
/// 2. Range tombstones are ordered by their end position.
/// 3. Clustered rows and range tombstones are emitted in descending order.
/// Because of 2 and 3 the guarantee that a range tombstone is emitted before
/// 1. The reader's schema() method will return a reversed schema (see
/// \ref schema::make_reversed()).
/// 2. Static row is still emitted first.
/// 3. Range tombstones' bounds are reversed (see \ref range_tombstone::reverse()).
/// 4. Clustered rows and range tombstones are emitted in descending order.
/// Because of 3 and 4 the guarantee that a range tombstone is emitted before
/// any mutation fragment affected by it still holds.
/// Ordering of partitions themselves remains unchanged.
/// For more details see docs/design-notes/reverse-reads.md.
///
/// \param original the reader to be reversed, has to be kept alive while the
/// reversing reader is in use.

View File

@@ -30,6 +30,12 @@ class specific_ranges {
std::vector<nonwrapping_range<clustering_key_prefix>> ranges();
};
// COMPATIBILITY NOTE: the partition-slice for reverse queries has two different
// format:
// * legacy format
// * native format
// The wire format uses the legacy format. See docs/design-notes/reverse-reads.md
// for more details on the formats.
class partition_slice {
std::vector<nonwrapping_range<clustering_key_prefix>> default_row_ranges();
utils::small_vector<uint32_t, 8> static_columns;

View File

@@ -224,7 +224,7 @@ public:
, _row_limit(limit)
, _partition_limit(partition_limit)
, _partition_row_limit(_slice.options.contains(query::partition_slice::option::distinct) ? 1 : slice.partition_row_limit())
, _range_tombstones(s, _slice.options.contains(query::partition_slice::option::reversed))
, _range_tombstones(s, false)
, _last_dk({dht::token(), partition_key::make_empty()})
{
static_assert(!sstable_compaction(), "This constructor cannot be used for sstable compaction.");

View File

@@ -1978,8 +1978,7 @@ void reconcilable_result_builder::consume_new_partition(const dht::decorated_key
!has_ck_selector(_slice.row_ranges(_schema, dk.key()));
_static_row_is_alive = false;
_live_rows = 0;
auto is_reversed = _slice.options.contains(query::partition_slice::option::reversed);
_mutation_consumer.emplace(streamed_mutation_freezer(_schema, dk.key(), is_reversed));
_mutation_consumer.emplace(streamed_mutation_freezer(_schema, dk.key(), _reversed));
}
void reconcilable_result_builder::consume(tombstone t) {
@@ -2008,6 +2007,10 @@ stop_iteration reconcilable_result_builder::consume(clustering_row&& cr, row_tom
stop_iteration reconcilable_result_builder::consume(range_tombstone&& rt) {
_memory_accounter.update(rt.memory_usage(_schema));
if (_reversed) {
// undo reversing done for the native reversed format, coordinator still uses old reversing format
rt.reverse();
}
return _mutation_consumer->consume(std::move(rt));
}

View File

@@ -137,6 +137,7 @@ public:
class reconcilable_result_builder {
const schema& _schema;
const query::partition_slice& _slice;
bool _reversed;
bool _return_static_content_on_partition_with_no_rows{};
bool _static_row_is_alive{};
@@ -151,7 +152,7 @@ class reconcilable_result_builder {
public:
reconcilable_result_builder(const schema& s, const query::partition_slice& slice,
query::result_memory_accounter&& accounter) noexcept
: _schema(s), _slice(slice)
: _schema(s), _slice(slice), _reversed(_slice.options.contains(query::partition_slice::option::reversed))
, _memory_accounter(std::move(accounter))
{ }

View File

@@ -128,6 +128,14 @@ protected:
std::variant<flat_mutation_reader, reader_concurrency_semaphore::inactive_read_handle> _reader;
dht::partition_ranges_view _query_ranges;
protected:
schema_ptr underlying_schema() const {
if (is_reversed()) {
return _schema->make_reversed();
}
return _schema;
}
public:
querier_base(reader_permit permit, std::unique_ptr<const dht::partition_range> range,
std::unique_ptr<const query::partition_slice> slice, flat_mutation_reader reader, dht::partition_ranges_view query_ranges)
@@ -145,7 +153,7 @@ public:
, _permit(std::move(permit))
, _range(std::make_unique<const dht::partition_range>(std::move(range)))
, _slice(std::make_unique<const query::partition_slice>(std::move(slice)))
, _reader(ms.make_reader(_schema, _permit, *_range, *_slice, pc, std::move(trace_ptr), streamed_mutation::forwarding::no, mutation_reader::forwarding::no))
, _reader(ms.make_reader(underlying_schema(), _permit, *_range, *_slice, pc, std::move(trace_ptr), streamed_mutation::forwarding::no, mutation_reader::forwarding::no))
, _query_ranges(*_range)
{ }

View File

@@ -128,6 +128,13 @@ constexpr auto max_rows_if_set = std::numeric_limits<uint32_t>::max();
// Specifies subset of rows, columns and cell attributes to be returned in a query.
// Can be accessed across cores.
// Schema-dependent.
//
// COMPATIBILITY NOTE: the partition-slice for reverse queries has two different
// format:
// * legacy format
// * native format
// The wire format uses the legacy format. See docs/design-notes/reverse-reads.md
// for more details on the formats.
class partition_slice {
friend class ::partition_slice_builder;
public:

View File

@@ -2052,7 +2052,10 @@ table::mutation_query(schema_ptr s,
std::exception_ptr ex;
try {
auto rrb = reconcilable_result_builder(*s, cmd.slice, std::move(accounter));
// Un-reverse the schema sent to the coordinator, it expects the
// legacy format.
auto result_schema = cmd.slice.options.contains(query::partition_slice::option::reversed) ? s->make_reversed() : s;
auto rrb = reconcilable_result_builder(*result_schema, cmd.slice, std::move(accounter));
auto r = co_await q.consume_page(std::move(rrb), cmd.get_row_limit(), cmd.partition_limit, cmd.timestamp, class_config.max_memory_for_unlimited_query);
if (!saved_querier || (!q.are_limits_reached() && !r.is_short_read())) {

View File

@@ -476,8 +476,8 @@ using in_thread = seastar::bool_class<class in_thread_tag>;
struct flat_stream_consumer {
schema_ptr _schema;
schema_ptr _reversed_schema;
reader_permit _permit;
reversed_partitions _reversed;
skip_after_first_fragment _skip_partition;
skip_after_first_partition _skip_stream;
std::vector<mutation> _mutations;
@@ -485,22 +485,17 @@ struct flat_stream_consumer {
bool _inside_partition = false;
private:
void verify_order(position_in_partition_view pos) {
position_in_partition::less_compare cmp(*_schema);
if (!_reversed) {
BOOST_REQUIRE(!_previous_position || _previous_position->is_static_row()
|| cmp(*_previous_position, pos));
} else {
BOOST_REQUIRE(!_previous_position || _previous_position->is_static_row()
|| cmp(pos, *_previous_position));
}
const schema& s = _reversed_schema ? *_reversed_schema : *_schema;
position_in_partition::less_compare cmp(s);
BOOST_REQUIRE(!_previous_position || _previous_position->is_static_row() || cmp(*_previous_position, pos));
}
public:
flat_stream_consumer(schema_ptr s, reader_permit permit, reversed_partitions reversed,
skip_after_first_fragment skip_partition = skip_after_first_fragment::no,
skip_after_first_partition skip_stream = skip_after_first_partition::no)
: _schema(std::move(s))
, _reversed_schema(reversed ? _schema->make_reversed() : nullptr)
, _permit(std::move(permit))
, _reversed(reversed)
, _skip_partition(skip_partition)
, _skip_stream(skip_stream)
{ }
@@ -534,10 +529,13 @@ public:
}
stop_iteration consume(range_tombstone&& rt) {
BOOST_REQUIRE(_inside_partition);
auto pos = _reversed ? rt.end_position() : rt.position();
auto pos = rt.position();
verify_order(pos);
BOOST_REQUIRE_GE(_mutations.size(), 1);
_previous_position.emplace(pos);
if (_reversed_schema) {
rt.reverse(); // undo the reversing
}
_mutations.back().partition().apply(*_schema, mutation_fragment(*_schema, _permit, std::move(rt)));
return stop_iteration(bool(_skip_partition));
}

View File

@@ -65,7 +65,11 @@ mutation_source make_source(std::vector<mutation> mutations) {
const io_priority_class& pc, tracing::trace_state_ptr, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) {
assert(range.is_full()); // slicing not implemented yet
for (auto&& m : mutations) {
assert(m.schema() == s);
if (slice.options.contains(query::partition_slice::option::reversed)) {
assert(m.schema()->make_reversed()->version() == s->version());
} else {
assert(m.schema() == s);
}
}
return flat_mutation_reader_from_mutations(std::move(permit), mutations, slice, fwd);
});