treewide: move reversing to the mutation sources

Push down reversing to the mutation-sources proper, instead of doing it
on the querier level. This will allow us to test reverse reads on the
mutation source level.
The `max_size` parameter of `consume_page()` is now unused but is not
removed in this patch, it will be removed in a follow-up to reduce
churn.
This commit is contained in:
Botond Dénes
2021-09-23 15:43:59 +03:00
parent c7619de929
commit 41facb3270
11 changed files with 153 additions and 111 deletions

View File

@@ -110,12 +110,20 @@ mutation_source streaming_virtual_table::as_mutation_source() {
return mutation_source([this] (schema_ptr s,
reader_permit permit,
const dht::partition_range& pr,
const query::partition_slice& slice,
const query::partition_slice& query_slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) {
std::unique_ptr<query::partition_slice> unreversed_slice;
const auto reversed = query_slice.options.contains(query::partition_slice::option::reversed);
if (reversed) {
s = s->make_reversed();
unreversed_slice = std::make_unique<query::partition_slice>(query::half_reverse_slice(*s, query_slice));
}
const auto& slice = reversed ? *unreversed_slice : query_slice;
// We cannot pass the partition_range directly to execute()
// because it is not guaranteed to be alive until execute() resolves.
// It is only guaranteed to be alive as long as the returned reader is alive.
@@ -172,6 +180,10 @@ mutation_source streaming_virtual_table::as_mutation_source() {
});
}
if (reversed) {
rd = make_reversing_reader(std::move(rd), permit.max_result_size(), std::move(unreversed_slice));
}
if (fwd == streamed_mutation::forwarding::yes) {
rd = make_forwardable(std::move(rd));
}

View File

@@ -671,7 +671,7 @@ partition_snapshot_ptr memtable_entry::snapshot(memtable& mtbl) {
}
flat_mutation_reader
memtable::make_flat_reader(schema_ptr s,
memtable::do_make_flat_reader(schema_ptr s,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
@@ -702,15 +702,41 @@ memtable::make_flat_reader(schema_ptr s,
rd.upgrade_schema(s);
return rd;
} else {
auto res = make_flat_mutation_reader<scanning_reader>(std::move(s), shared_from_this(), std::move(permit), range, slice, pc, fwd_mr);
if (fwd == streamed_mutation::forwarding::yes) {
return make_forwardable(std::move(res));
} else {
return res;
}
return make_flat_mutation_reader<scanning_reader>(std::move(s), shared_from_this(), std::move(permit), range, slice, pc, fwd_mr);
}
}
flat_mutation_reader
memtable::make_flat_reader(schema_ptr s, reader_permit permit, const dht::partition_range& range, const query::partition_slice& query_slice,
const io_priority_class& pc, tracing::trace_state_ptr trace_state_ptr, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) {
// When the memtable is flushed while a scanning read is ongoing, an sstable
// reader is created to replace the memtable reader mid-air. This sstable
// reader will get the memtable's slice. We don't want this sstable reader
// to read in reverse as we are reversing the stream on top of the memtable
// reader. Unreverse the slice here so when it is passed to the sstable
// reader it doesn't try to read in reverse. This is not required technically
// for single partition reads, but we do it anyway to keep things simple.
std::unique_ptr<query::partition_slice> unreversed_slice;
const auto reversed = query_slice.options.contains(query::partition_slice::option::reversed);
auto fwd_sm = fwd;
if (reversed) {
fwd_sm = streamed_mutation::forwarding::no;
s = s->make_reversed();
unreversed_slice = std::make_unique<query::partition_slice>(query::half_reverse_slice(*s, query_slice));
}
const auto& slice = reversed ? *unreversed_slice : query_slice;
auto rd = do_make_flat_reader(std::move(s), permit, range, slice, pc, std::move(trace_state_ptr), fwd_sm, fwd_mr);
if (reversed) {
rd = make_reversing_reader(std::move(rd), permit.max_result_size(), std::move(unreversed_slice));
}
if (fwd && (reversed || !query::is_single_partition(range) || fwd_mr)) {
rd = make_forwardable(std::move(rd));
}
return rd;
}
flat_mutation_reader
memtable::make_flush_reader(schema_ptr s, reader_permit permit, const io_priority_class& pc) {
if (group()) {

View File

@@ -177,6 +177,8 @@ private:
void remove_flushed_memory(uint64_t);
void clear() noexcept;
uint64_t dirty_size() const;
flat_mutation_reader do_make_flat_reader(schema_ptr, reader_permit permit, const dht::partition_range& range, const query::partition_slice& slice,
const io_priority_class& pc, tracing::trace_state_ptr trace_state_ptr, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr);
public:
explicit memtable(schema_ptr schema, dirty_memory_manager&, table_stats& table_stats, memtable_list *memtable_list = nullptr,
seastar::scheduling_group compaction_scheduling_group = seastar::current_scheduling_group());

View File

@@ -816,24 +816,6 @@ future<std::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_tempera
});
}
namespace {
future<std::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>> query_data_on_all_shards_in_reverse(
distributed<database>& db,
schema_ptr s,
const query::read_command& cmd,
const dht::partition_range_vector& ranges,
query::result_options opts,
tracing::trace_state_ptr trace_state,
db::timeout_clock::time_point timeout) {
auto [res, ct] = co_await query_mutations_on_all_shards(db, s, cmd, ranges, std::move(trace_state), timeout);
co_return std::tuple(
make_foreign(make_lw_shared<query::result>(to_data_query_result(*res, s, cmd.slice, cmd.get_row_limit(), cmd.partition_limit, opts))),
ct);
}
} // anonymous namespace
future<std::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>> query_data_on_all_shards(
distributed<database>& db,
schema_ptr s,
@@ -843,12 +825,7 @@ future<std::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>>
tracing::trace_state_ptr trace_state,
db::timeout_clock::time_point timeout) {
if (cmd.slice.options.contains(query::partition_slice::option::reversed)) {
// FIXME: #1413
// It is not worth it to add support for the current inefficient way of
// doing reverse queries to the multishard reader, so just use the
// reconcilable result result format and reverse individual partitions
// when converting to the final query::result.
return query_data_on_all_shards_in_reverse(db, std::move(s), cmd, ranges, opts, std::move(trace_state), timeout);
s = s->make_reversed();
}
return do_query_on_all_shards<data_query_result_builder>(db, s, cmd, ranges, std::move(trace_state), timeout,
[s, &cmd, opts] (query::result_memory_accounter&& accounter) {

View File

@@ -157,6 +157,9 @@ partition_presence_checker make_default_partition_presence_checker() {
// independent mutation_reader.
// The reader returns mutations having all the same schema, the one passed
// when invoking the source.
// When reading in reverse, a reverse schema has to be passed (compared to the
// table's schema), and a half-reverse (legacy) slice.
// See docs/design-notes/reverse-reads.md for more details.
class mutation_source {
using partition_range = const dht::partition_range&;
using io_priority = const io_priority_class&;

View File

@@ -95,17 +95,7 @@ auto consume_page(flat_mutation_reader& reader,
compaction_state,
clustering_position_tracker(std::move(consumer), last_ckey));
auto consume = [&reader, &slice, reader_consumer = std::move(reader_consumer), max_size] () mutable {
if (slice.options.contains(query::partition_slice::option::reversed)) {
return with_closeable(make_reversing_reader(make_flat_mutation_reader<delegating_reader>(reader), max_size),
[reader_consumer = std::move(reader_consumer)] (flat_mutation_reader& reversing_reader) mutable {
return reversing_reader.consume(std::move(reader_consumer));
});
}
return reader.consume(std::move(reader_consumer));
};
return consume().then([last_ckey] (auto&&... results) mutable {
return reader.consume(std::move(reader_consumer)).then([last_ckey] (auto&&... results) mutable {
static_assert(sizeof...(results) <= 1);
return make_ready_future<std::tuple<std::optional<clustering_key_prefix>, std::decay_t<decltype(results)>...>>(std::tuple(std::move(*last_ckey), std::move(results)...));
});
@@ -128,14 +118,6 @@ protected:
std::variant<flat_mutation_reader, reader_concurrency_semaphore::inactive_read_handle> _reader;
dht::partition_ranges_view _query_ranges;
protected:
schema_ptr underlying_schema() const {
if (is_reversed()) {
return _schema->make_reversed();
}
return _schema;
}
public:
querier_base(reader_permit permit, std::unique_ptr<const dht::partition_range> range,
std::unique_ptr<const query::partition_slice> slice, flat_mutation_reader reader, dht::partition_ranges_view query_ranges)
@@ -153,7 +135,7 @@ public:
, _permit(std::move(permit))
, _range(std::make_unique<const dht::partition_range>(std::move(range)))
, _slice(std::make_unique<const query::partition_slice>(std::move(slice)))
, _reader(ms.make_reader(underlying_schema(), _permit, *_range, *_slice, pc, std::move(trace_ptr), streamed_mutation::forwarding::no, mutation_reader::forwarding::no))
, _reader(ms.make_reader(_schema, _permit, *_range, *_slice, pc, std::move(trace_ptr), streamed_mutation::forwarding::no, mutation_reader::forwarding::no))
, _query_ranges(*_range)
{ }

View File

@@ -725,7 +725,7 @@ row_cache::make_scanning_reader(const dht::partition_range& range, std::unique_p
}
flat_mutation_reader
row_cache::make_reader(schema_ptr s,
row_cache::do_make_reader(schema_ptr s,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
@@ -741,7 +741,7 @@ row_cache::make_reader(schema_ptr s,
if (query::is_single_partition(range) && !fwd_mr) {
tracing::trace(trace_state, "Querying cache for range {} and slice {}",
range, seastar::value_of([&slice] { return slice.get_all_ranges(); }));
auto mr = _read_section(_tracker.region(), [&] {
return _read_section(_tracker.region(), [&] {
dht::ring_position_comparator cmp(*_schema);
auto&& pos = range.start()->value();
partitions_type::bound_hint hint;
@@ -759,24 +759,41 @@ row_cache::make_reader(schema_ptr s,
return make_flat_mutation_reader<single_partition_populating_reader>(*this, make_context());
}
});
if (fwd == streamed_mutation::forwarding::yes) {
return make_forwardable(std::move(mr));
} else {
return mr;
}
}
tracing::trace(trace_state, "Scanning cache for range {} and slice {}",
range, seastar::value_of([&slice] { return slice.get_all_ranges(); }));
auto mr = make_scanning_reader(range, make_context());
if (fwd == streamed_mutation::forwarding::yes) {
return make_forwardable(std::move(mr));
} else {
return mr;
}
return make_scanning_reader(range, make_context());
}
flat_mutation_reader
row_cache::make_reader(schema_ptr s, reader_permit permit, const dht::partition_range& range, const query::partition_slice& query_slice,
const io_priority_class& pc, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) {
// We want to do the reversing on top of the cache reader so we have to
// un-reverse the slice so that underlying mutation sources don't try to
// reverse themselves. Once the cache supports reading in reverse itself,
// we can pass on the reverse slice.
std::unique_ptr<query::partition_slice> unreversed_slice;
const auto reversed = query_slice.options.contains(query::partition_slice::option::reversed);
if (reversed) {
s = s->make_reversed();
unreversed_slice = std::make_unique<query::partition_slice>(query::half_reverse_slice(*s, query_slice));
}
const auto& slice = reversed ? *unreversed_slice : query_slice;
auto rd = do_make_reader(std::move(s), permit, range, slice, pc, std::move(trace_state), streamed_mutation::forwarding::no, fwd_mr);
if (reversed) {
rd = make_reversing_reader(std::move(rd), permit.max_result_size(), std::move(unreversed_slice));
}
if (fwd == streamed_mutation::forwarding::yes) {
rd = make_forwardable(std::move(rd));
}
return rd;
}
row_cache::~row_cache() {
with_allocator(_tracker.allocator(), [this] {

View File

@@ -359,6 +359,9 @@ private:
//
// internal_updater is only kept alive until its invocation returns.
future<> do_update(external_updater eu, internal_updater iu) noexcept;
flat_mutation_reader do_make_reader(schema_ptr, reader_permit permit, const dht::partition_range&, const query::partition_slice&,
const io_priority_class&, tracing::trace_state_ptr, streamed_mutation::forwarding, mutation_reader::forwarding);
public:
~row_cache();
row_cache(schema_ptr, snapshot_source, cache_tracker&, is_continuous = is_continuous::no);

View File

@@ -875,12 +875,13 @@ time_series_sstable_set::create_single_key_sstable_reader(
return false;
};
auto reversed = slice.options.contains(query::partition_slice::option::reversed);
// Note that `sstable_position_reader_queue` always includes a reader which emits a `partition_start` fragment,
// guaranteeing that the reader we return emits it as well; this helps us avoid the problem from #3552.
return make_clustering_combined_reader(
schema, permit, fwd_sm,
make_position_reader_queue(
std::move(create_reader), std::move(filter), *pos.key(), schema, permit, fwd_sm, false));
std::move(create_reader), std::move(filter), *pos.key(), schema, permit, fwd_sm, reversed));
}
compound_sstable_set::compound_sstable_set(schema_ptr schema, std::vector<lw_shared_ptr<sstable_set>> sets)

View File

@@ -2139,10 +2139,27 @@ sstable::make_reader_v1(
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr,
read_monitor& mon) {
if (_version >= version_types::mc) {
return downgrade_to_v1(mx::make_reader(shared_from_this(), std::move(schema), std::move(permit), range, slice, pc, std::move(trace_state), fwd, fwd_mr, mon));
const auto reversed = slice.options.contains(query::partition_slice::option::reversed);
auto fwd_sm = fwd;
if (reversed) {
fwd_sm = streamed_mutation::forwarding::no;
schema = schema->make_reversed();
}
return kl::make_reader(shared_from_this(), std::move(schema), std::move(permit), range, slice, pc, std::move(trace_state), fwd, fwd_mr, mon);
flat_mutation_reader rd(nullptr);
if (_version >= version_types::mc) {
rd = downgrade_to_v1(mx::make_reader(shared_from_this(), std::move(schema), permit, range, slice, pc, std::move(trace_state), fwd_sm, fwd_mr, mon));
} else {
rd = kl::make_reader(shared_from_this(), std::move(schema), permit, range, slice, pc, std::move(trace_state), fwd_sm, fwd_mr, mon);
}
if (reversed) {
rd = make_reversing_reader(std::move(rd), permit.max_result_size());
if (fwd) {
rd = make_forwardable(std::move(rd));
}
}
return rd;
}
flat_mutation_reader_v2

View File

@@ -94,7 +94,8 @@ static reconcilable_result mutation_query(schema_ptr s, reader_permit permit, co
auto querier = query::mutation_querier(source, s, std::move(permit), range, slice, service::get_local_sstable_query_read_priority(), {});
auto close_querier = deferred_close(querier);
auto rrb = reconcilable_result_builder(*s, slice, make_accounter());
auto table_schema = slice.options.contains(query::partition_slice::option::reversed) ? s->make_reversed() : s;
auto rrb = reconcilable_result_builder(*table_schema, slice, make_accounter());
return querier.consume_page(std::move(rrb), row_limit, partition_limit, query_time,
query::max_result_size(std::numeric_limits<uint64_t>::max())).get();
}
@@ -200,28 +201,29 @@ SEASTAR_TEST_CASE(test_cells_are_expired_according_to_query_timestamp) {
SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
return seastar::async([] {
auto s = make_schema();
auto table_schema = make_schema();
auto query_schema = table_schema->make_reversed();
tests::reader_concurrency_semaphore_wrapper semaphore;
auto now = gc_clock::now();
mutation m1(s, partition_key::from_single_value(*s, "key1"));
mutation m1(table_schema, partition_key::from_single_value(*table_schema, "key1"));
m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("A")), "v1", data_value(bytes("A_v1")), 1);
m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("B")), "v1", data_value(bytes("B_v1")), 1);
m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("C")), "v1", data_value(bytes("C_v1")), 1);
m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("D")), "v1", data_value(bytes("D_v1")), 1);
m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("E")), "v1", data_value(bytes("E_v1")), 1);
m1.set_clustered_cell(clustering_key::from_single_value(*table_schema, bytes("A")), "v1", data_value(bytes("A_v1")), 1);
m1.set_clustered_cell(clustering_key::from_single_value(*table_schema, bytes("B")), "v1", data_value(bytes("B_v1")), 1);
m1.set_clustered_cell(clustering_key::from_single_value(*table_schema, bytes("C")), "v1", data_value(bytes("C_v1")), 1);
m1.set_clustered_cell(clustering_key::from_single_value(*table_schema, bytes("D")), "v1", data_value(bytes("D_v1")), 1);
m1.set_clustered_cell(clustering_key::from_single_value(*table_schema, bytes("E")), "v1", data_value(bytes("E_v1")), 1);
auto src = make_source({m1});
{
auto slice = partition_slice_builder(*s)
auto slice = partition_slice_builder(*query_schema)
.reversed()
.build();
reconcilable_result result = mutation_query(s, semaphore.make_permit(), src, query::full_partition_range, slice, 3, query::max_partitions, now);
reconcilable_result result = mutation_query(query_schema, semaphore.make_permit(), src, query::full_partition_range, slice, 3, query::max_partitions, now);
assert_that(to_result_set(result, s, slice))
assert_that(to_result_set(result, table_schema, slice))
.has_size(3)
.has(a_row()
.with_column("pk", data_value(bytes("key1")))
@@ -238,19 +240,19 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
}
{
auto slice = partition_slice_builder(*s)
auto slice = partition_slice_builder(*query_schema)
.with_range(query::clustering_range::make_singular(
clustering_key_prefix::from_single_value(*s, bytes("E"))))
clustering_key_prefix::from_single_value(*query_schema, bytes("E"))))
.with_range(query::clustering_range::make_singular(
clustering_key_prefix::from_single_value(*s, bytes("D"))))
clustering_key_prefix::from_single_value(*query_schema, bytes("D"))))
.with_range(query::clustering_range::make_singular(
clustering_key_prefix::from_single_value(*s, bytes("C"))))
clustering_key_prefix::from_single_value(*query_schema, bytes("C"))))
.reversed()
.build();
reconcilable_result result = mutation_query(s, semaphore.make_permit(), src, query::full_partition_range, slice, 3, query::max_partitions, now);
reconcilable_result result = mutation_query(query_schema, semaphore.make_permit(), src, query::full_partition_range, slice, 3, query::max_partitions, now);
assert_that(to_result_set(result, s, slice))
assert_that(to_result_set(result, table_schema, slice))
.has_size(3)
.has(a_row()
.with_column("pk", data_value(bytes("key1")))
@@ -267,17 +269,17 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
}
{
auto slice = partition_slice_builder(*s)
auto slice = partition_slice_builder(*query_schema)
.with_range(query::clustering_range(
{clustering_key_prefix::from_single_value(*s, bytes("C"))},
{clustering_key_prefix::from_single_value(*s, bytes("E"))}))
{clustering_key_prefix::from_single_value(*query_schema, bytes("C"))},
{clustering_key_prefix::from_single_value(*query_schema, bytes("E"))}))
.reversed()
.build();
{
reconcilable_result result = mutation_query(s, semaphore.make_permit(), src, query::full_partition_range, slice, 10, query::max_partitions, now);
reconcilable_result result = mutation_query(query_schema, semaphore.make_permit(), src, query::full_partition_range, slice, 10, query::max_partitions, now);
assert_that(to_result_set(result, s, slice))
assert_that(to_result_set(result, table_schema, slice))
.has_size(3)
.has(a_row()
.with_column("pk", data_value(bytes("key1")))
@@ -294,9 +296,9 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
}
{
reconcilable_result result = mutation_query(s, semaphore.make_permit(), src, query::full_partition_range, slice, 1, query::max_partitions, now);
reconcilable_result result = mutation_query(query_schema, semaphore.make_permit(), src, query::full_partition_range, slice, 1, query::max_partitions, now);
assert_that(to_result_set(result, s, slice))
assert_that(to_result_set(result, table_schema, slice))
.has_size(1)
.has(a_row()
.with_column("pk", data_value(bytes("key1")))
@@ -305,9 +307,9 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
}
{
reconcilable_result result = mutation_query(s, semaphore.make_permit(), src, query::full_partition_range, slice, 2, query::max_partitions, now);
reconcilable_result result = mutation_query(query_schema, semaphore.make_permit(), src, query::full_partition_range, slice, 2, query::max_partitions, now);
assert_that(to_result_set(result, s, slice))
assert_that(to_result_set(result, table_schema, slice))
.has_size(2)
.has(a_row()
.with_column("pk", data_value(bytes("key1")))
@@ -321,19 +323,19 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
}
{
auto slice = partition_slice_builder(*s)
auto slice = partition_slice_builder(*query_schema)
.with_range(query::clustering_range::make_singular(
clustering_key_prefix::from_single_value(*s, bytes("E"))))
clustering_key_prefix::from_single_value(*query_schema, bytes("E"))))
.with_range(query::clustering_range::make_singular(
clustering_key_prefix::from_single_value(*s, bytes("D"))))
clustering_key_prefix::from_single_value(*query_schema, bytes("D"))))
.with_range(query::clustering_range::make_singular(
clustering_key_prefix::from_single_value(*s, bytes("C"))))
clustering_key_prefix::from_single_value(*query_schema, bytes("C"))))
.reversed()
.build();
reconcilable_result result = mutation_query(s, semaphore.make_permit(), src, query::full_partition_range, slice, 2, query::max_partitions, now);
reconcilable_result result = mutation_query(query_schema, semaphore.make_permit(), src, query::full_partition_range, slice, 2, query::max_partitions, now);
assert_that(to_result_set(result, s, slice))
assert_that(to_result_set(result, table_schema, slice))
.has_size(2)
.has(a_row()
.with_column("pk", data_value(bytes("key1")))
@@ -346,17 +348,17 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
}
{
auto slice = partition_slice_builder(*s)
auto slice = partition_slice_builder(*query_schema)
.with_range(query::clustering_range::make_singular(
clustering_key_prefix::from_single_value(*s, bytes("E"))))
clustering_key_prefix::from_single_value(*query_schema, bytes("E"))))
.with_range(query::clustering_range::make_singular(
clustering_key_prefix::from_single_value(*s, bytes("C"))))
clustering_key_prefix::from_single_value(*query_schema, bytes("C"))))
.reversed()
.build();
reconcilable_result result = mutation_query(s, semaphore.make_permit(), src, query::full_partition_range, slice, 3, query::max_partitions, now);
reconcilable_result result = mutation_query(query_schema, semaphore.make_permit(), src, query::full_partition_range, slice, 3, query::max_partitions, now);
assert_that(to_result_set(result, s, slice))
assert_that(to_result_set(result, table_schema, slice))
.has_size(2)
.has(a_row()
.with_column("pk", data_value(bytes("key1")))
@@ -369,15 +371,15 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
}
{
auto slice = partition_slice_builder(*s)
auto slice = partition_slice_builder(*query_schema)
.with_range(query::clustering_range::make_singular(
clustering_key_prefix::from_single_value(*s, bytes("B"))))
clustering_key_prefix::from_single_value(*query_schema, bytes("B"))))
.reversed()
.build();
reconcilable_result result = mutation_query(s, semaphore.make_permit(), src, query::full_partition_range, slice, 3, query::max_partitions, now);
reconcilable_result result = mutation_query(query_schema, semaphore.make_permit(), src, query::full_partition_range, slice, 3, query::max_partitions, now);
assert_that(to_result_set(result, s, slice))
assert_that(to_result_set(result, table_schema, slice))
.has_only(a_row()
.with_column("pk", data_value(bytes("key1")))
.with_column("ck", data_value(bytes("B")))