flat_mutation_reader: make_reversing_reader(): add memory limit
If the reversing requires more memory than the limit, the read is aborted. All users are updated to get a meaningful limit, from the respective table object, with the exception of tests of course.
This commit is contained in:
@@ -1208,6 +1208,7 @@ database::query_mutations(schema_ptr s, const query::read_command& cmd, const dh
|
||||
cmd.row_limit,
|
||||
cmd.partition_limit,
|
||||
cmd.timestamp,
|
||||
cf.get_config().max_memory_for_unlimited_query,
|
||||
std::move(accounter),
|
||||
std::move(trace_state),
|
||||
timeout,
|
||||
|
||||
@@ -59,12 +59,14 @@ void flat_mutation_reader::impl::clear_buffer_to_next_partition() {
|
||||
_buffer_size = compute_buffer_size(*_schema, _buffer);
|
||||
}
|
||||
|
||||
flat_mutation_reader make_reversing_reader(flat_mutation_reader& original) {
|
||||
flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, size_t max_memory_consumption) {
|
||||
class partition_reversing_mutation_reader final : public flat_mutation_reader::impl {
|
||||
flat_mutation_reader* _source;
|
||||
range_tombstone_list _range_tombstones;
|
||||
std::stack<mutation_fragment> _mutation_fragments;
|
||||
mutation_fragment_opt _partition_end;
|
||||
size_t _stack_size = 0;
|
||||
const size_t _max_stack_size;
|
||||
private:
|
||||
stop_iteration emit_partition() {
|
||||
auto emit_range_tombstone = [&] {
|
||||
@@ -80,6 +82,7 @@ flat_mutation_reader make_reversing_reader(flat_mutation_reader& original) {
|
||||
if (!_range_tombstones.empty() && !cmp(_range_tombstones.tombstones().rbegin()->end_position(), mf.position())) {
|
||||
emit_range_tombstone();
|
||||
} else {
|
||||
_stack_size -= mf.memory_usage(*_schema);
|
||||
push_mutation_fragment(std::move(mf));
|
||||
_mutation_fragments.pop();
|
||||
}
|
||||
@@ -114,15 +117,32 @@ flat_mutation_reader make_reversing_reader(flat_mutation_reader& original) {
|
||||
_range_tombstones.apply(*_schema, std::move(mf.as_range_tombstone()));
|
||||
} else {
|
||||
_mutation_fragments.emplace(std::move(mf));
|
||||
_stack_size += _mutation_fragments.top().memory_usage(*_schema);
|
||||
if (_stack_size >= _max_stack_size) {
|
||||
const partition_key* key = nullptr;
|
||||
auto it = buffer().end();
|
||||
--it;
|
||||
if (it->is_partition_start()) {
|
||||
key = &it->as_partition_start().key().key();
|
||||
} else {
|
||||
--it;
|
||||
key = &it->as_partition_start().key().key();
|
||||
}
|
||||
throw std::runtime_error(fmt::format(
|
||||
"Aborting reverse partition read because partition {} is larger than the maximum safe size of {} for reversible partitions.",
|
||||
key->with_schema(*_schema),
|
||||
_max_stack_size));
|
||||
}
|
||||
}
|
||||
}
|
||||
return make_ready_future<stop_iteration>(is_buffer_full());
|
||||
}
|
||||
public:
|
||||
explicit partition_reversing_mutation_reader(flat_mutation_reader& mr)
|
||||
explicit partition_reversing_mutation_reader(flat_mutation_reader& mr, size_t max_stack_size)
|
||||
: flat_mutation_reader::impl(mr.schema())
|
||||
, _source(&mr)
|
||||
, _range_tombstones(*_schema)
|
||||
, _max_stack_size(max_stack_size)
|
||||
{ }
|
||||
|
||||
virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override {
|
||||
@@ -143,6 +163,7 @@ flat_mutation_reader make_reversing_reader(flat_mutation_reader& original) {
|
||||
clear_buffer_to_next_partition();
|
||||
if (is_buffer_empty() && !is_end_of_stream()) {
|
||||
while (!_mutation_fragments.empty()) {
|
||||
_stack_size -= _mutation_fragments.top().memory_usage(*_schema);
|
||||
_mutation_fragments.pop();
|
||||
}
|
||||
_range_tombstones.clear();
|
||||
@@ -163,7 +184,7 @@ flat_mutation_reader make_reversing_reader(flat_mutation_reader& original) {
|
||||
}
|
||||
};
|
||||
|
||||
return make_flat_mutation_reader<partition_reversing_mutation_reader>(original);
|
||||
return make_flat_mutation_reader<partition_reversing_mutation_reader>(original, max_memory_consumption);
|
||||
}
|
||||
|
||||
template<typename Source>
|
||||
|
||||
@@ -740,10 +740,15 @@ make_generating_reader(schema_ptr s, std::function<future<mutation_fragment_opt>
|
||||
///
|
||||
/// \param original the reader to be reversed, has to be kept alive while the
|
||||
/// reversing reader is in use.
|
||||
/// \param max_memory_consumption the maximum amount of memory the reader is
|
||||
/// allowed to use for reversing. The reverse reader reads entire partitions
|
||||
/// into memory, before reversing them. Since partitions can be larger than
|
||||
/// the available memory, we need to enforce a limit on memory consumption.
|
||||
/// If the read uses more memory then this limit, the read is aborted.
|
||||
///
|
||||
/// FIXME: reversing should be done in the sstable layer, see #1413.
|
||||
flat_mutation_reader
|
||||
make_reversing_reader(flat_mutation_reader& original);
|
||||
make_reversing_reader(flat_mutation_reader& original, size_t max_memory_consumption);
|
||||
|
||||
/// Low level fragment stream validator.
|
||||
///
|
||||
|
||||
@@ -23,6 +23,7 @@
|
||||
#include "service/priority_manager.hh"
|
||||
#include "multishard_mutation_query.hh"
|
||||
#include "database.hh"
|
||||
#include "db/config.hh"
|
||||
|
||||
#include <boost/range/adaptor/reversed.hpp>
|
||||
|
||||
@@ -220,6 +221,10 @@ public:
|
||||
read_context& operator=(read_context&&) = delete;
|
||||
read_context& operator=(const read_context&) = delete;
|
||||
|
||||
distributed<database>& db() {
|
||||
return _db;
|
||||
}
|
||||
|
||||
virtual flat_mutation_reader create_reader(
|
||||
schema_ptr schema,
|
||||
const dht::partition_range& pr,
|
||||
@@ -604,8 +609,9 @@ static future<reconcilable_result> do_query_mutations(
|
||||
return do_with(std::move(reader), std::move(compaction_state), [&, accounter = std::move(accounter), timeout] (
|
||||
flat_mutation_reader& reader, lw_shared_ptr<compact_for_mutation_query_state>& compaction_state) mutable {
|
||||
auto rrb = reconcilable_result_builder(*reader.schema(), cmd.slice, std::move(accounter));
|
||||
auto& table = ctx->db().local().find_column_family(reader.schema());
|
||||
return query::consume_page(reader, compaction_state, cmd.slice, std::move(rrb), cmd.row_limit, cmd.partition_limit, cmd.timestamp,
|
||||
timeout).then([&] (consume_result&& result) mutable {
|
||||
timeout, table.get_config().max_memory_for_unlimited_query).then([&] (consume_result&& result) mutable {
|
||||
return make_ready_future<page_consume_result>(page_consume_result(std::move(result), reader.detach_buffer(), std::move(compaction_state)));
|
||||
});
|
||||
}).then_wrapped([&ctx] (future<page_consume_result>&& result_fut) {
|
||||
|
||||
@@ -2168,6 +2168,7 @@ future<> data_query(
|
||||
uint32_t partition_limit,
|
||||
gc_clock::time_point query_time,
|
||||
query::result::builder& builder,
|
||||
uint64_t max_memory_reverse_query,
|
||||
tracing::trace_state_ptr trace_ptr,
|
||||
db::timeout_clock::time_point timeout,
|
||||
query::querier_cache_context cache_ctx)
|
||||
@@ -2181,9 +2182,10 @@ future<> data_query(
|
||||
? std::move(*querier_opt)
|
||||
: query::data_querier(source, s, range, slice, service::get_local_sstable_query_read_priority(), trace_ptr);
|
||||
|
||||
return do_with(std::move(q), [=, &builder, trace_ptr = std::move(trace_ptr), cache_ctx = std::move(cache_ctx)] (query::data_querier& q) mutable {
|
||||
return do_with(std::move(q), [=, &builder, trace_ptr = std::move(trace_ptr),
|
||||
cache_ctx = std::move(cache_ctx)] (query::data_querier& q) mutable {
|
||||
auto qrb = query_result_builder(*s, builder);
|
||||
return q.consume_page(std::move(qrb), row_limit, partition_limit, query_time, timeout).then(
|
||||
return q.consume_page(std::move(qrb), row_limit, partition_limit, query_time, timeout, max_memory_reverse_query).then(
|
||||
[=, &builder, &q, trace_ptr = std::move(trace_ptr), cache_ctx = std::move(cache_ctx)] () mutable {
|
||||
if (q.are_limits_reached() || builder.is_short_read()) {
|
||||
cache_ctx.insert(std::move(q), std::move(trace_ptr));
|
||||
@@ -2261,6 +2263,7 @@ static do_mutation_query(schema_ptr s,
|
||||
uint32_t row_limit,
|
||||
uint32_t partition_limit,
|
||||
gc_clock::time_point query_time,
|
||||
uint64_t max_memory_reverse_query,
|
||||
query::result_memory_accounter&& accounter,
|
||||
tracing::trace_state_ptr trace_ptr,
|
||||
db::timeout_clock::time_point timeout,
|
||||
@@ -2278,7 +2281,7 @@ static do_mutation_query(schema_ptr s,
|
||||
return do_with(std::move(q), [=, &slice, accounter = std::move(accounter), trace_ptr = std::move(trace_ptr), cache_ctx = std::move(cache_ctx)] (
|
||||
query::mutation_querier& q) mutable {
|
||||
auto rrb = reconcilable_result_builder(*s, slice, std::move(accounter));
|
||||
return q.consume_page(std::move(rrb), row_limit, partition_limit, query_time, timeout).then(
|
||||
return q.consume_page(std::move(rrb), row_limit, partition_limit, query_time, timeout, max_memory_reverse_query).then(
|
||||
[=, &q, trace_ptr = std::move(trace_ptr), cache_ctx = std::move(cache_ctx)] (reconcilable_result r) mutable {
|
||||
if (q.are_limits_reached() || r.is_short_read()) {
|
||||
cache_ctx.insert(std::move(q), std::move(trace_ptr));
|
||||
@@ -2300,13 +2303,14 @@ mutation_query(schema_ptr s,
|
||||
uint32_t row_limit,
|
||||
uint32_t partition_limit,
|
||||
gc_clock::time_point query_time,
|
||||
uint64_t max_memory_reverse_query,
|
||||
query::result_memory_accounter&& accounter,
|
||||
tracing::trace_state_ptr trace_ptr,
|
||||
db::timeout_clock::time_point timeout,
|
||||
query::querier_cache_context cache_ctx)
|
||||
{
|
||||
return do_mutation_query(std::move(s), std::move(source), seastar::cref(range), seastar::cref(slice),
|
||||
row_limit, partition_limit, query_time, std::move(accounter), std::move(trace_ptr), timeout, std::move(cache_ctx));
|
||||
row_limit, partition_limit, query_time, max_memory_reverse_query, std::move(accounter), std::move(trace_ptr), timeout, std::move(cache_ctx));
|
||||
}
|
||||
|
||||
deletable_row::deletable_row(clustering_row&& cr)
|
||||
|
||||
@@ -161,6 +161,7 @@ future<reconcilable_result> mutation_query(
|
||||
uint32_t row_limit,
|
||||
uint32_t partition_limit,
|
||||
gc_clock::time_point query_time,
|
||||
uint64_t max_memory_reverse_query,
|
||||
query::result_memory_accounter&& accounter = { },
|
||||
tracing::trace_state_ptr trace_ptr = nullptr,
|
||||
db::timeout_clock::time_point timeout = db::no_timeout,
|
||||
@@ -175,6 +176,7 @@ future<> data_query(
|
||||
uint32_t partition_limit,
|
||||
gc_clock::time_point query_time,
|
||||
query::result::builder& builder,
|
||||
uint64_t max_memory_reverse_query,
|
||||
tracing::trace_state_ptr trace_ptr = nullptr,
|
||||
db::timeout_clock::time_point timeout = db::no_timeout,
|
||||
query::querier_cache_context cache_ctx = { });
|
||||
@@ -189,6 +191,7 @@ class mutation_query_stage {
|
||||
uint32_t,
|
||||
uint32_t,
|
||||
gc_clock::time_point,
|
||||
uint64_t,
|
||||
query::result_memory_accounter&&,
|
||||
tracing::trace_state_ptr,
|
||||
db::timeout_clock::time_point,
|
||||
|
||||
12
querier.hh
12
querier.hh
@@ -84,7 +84,8 @@ auto consume_page(flat_mutation_reader& reader,
|
||||
uint32_t row_limit,
|
||||
uint32_t partition_limit,
|
||||
gc_clock::time_point query_time,
|
||||
db::timeout_clock::time_point timeout) {
|
||||
db::timeout_clock::time_point timeout,
|
||||
size_t reverse_read_max_memory) {
|
||||
// FIXME: #3158
|
||||
// consumer cannot be moved after consume_new_partition() is called
|
||||
// on it because it stores references to some of it's own members.
|
||||
@@ -100,9 +101,9 @@ auto consume_page(flat_mutation_reader& reader,
|
||||
compaction_state,
|
||||
clustering_position_tracker(std::move(consumer), last_ckey));
|
||||
|
||||
auto consume = [&reader, &slice, reader_consumer = std::move(reader_consumer), timeout] () mutable {
|
||||
auto consume = [&reader, &slice, reader_consumer = std::move(reader_consumer), timeout, reverse_read_max_memory] () mutable {
|
||||
if (slice.options.contains(query::partition_slice::option::reversed)) {
|
||||
return do_with(make_reversing_reader(reader),
|
||||
return do_with(make_reversing_reader(reader, reverse_read_max_memory),
|
||||
[reader_consumer = std::move(reader_consumer), timeout] (flat_mutation_reader& reversing_reader) mutable {
|
||||
return reversing_reader.consume(std::move(reader_consumer), timeout);
|
||||
});
|
||||
@@ -183,9 +184,10 @@ public:
|
||||
uint32_t row_limit,
|
||||
uint32_t partition_limit,
|
||||
gc_clock::time_point query_time,
|
||||
db::timeout_clock::time_point timeout) {
|
||||
db::timeout_clock::time_point timeout,
|
||||
size_t reverse_read_max_memory) {
|
||||
return ::query::consume_page(_reader, _compaction_state, *_slice, std::move(consumer), row_limit, partition_limit, query_time,
|
||||
timeout).then([this] (auto&& results) {
|
||||
timeout, reverse_read_max_memory).then([this] (auto&& results) {
|
||||
_last_ckey = std::get<std::optional<clustering_key>>(std::move(results));
|
||||
constexpr auto size = std::tuple_size<std::decay_t<decltype(results)>>::value;
|
||||
static_assert(size <= 2);
|
||||
|
||||
2
table.cc
2
table.cc
@@ -2397,7 +2397,7 @@ table::query(schema_ptr s,
|
||||
return do_until(std::bind(&query_state::done, &qs), [this, &qs, trace_state = std::move(trace_state), timeout, cache_ctx = std::move(cache_ctx)] {
|
||||
auto&& range = *qs.current_partition_range++;
|
||||
return data_query(qs.schema, as_mutation_source(), range, qs.cmd.slice, qs.remaining_rows(),
|
||||
qs.remaining_partitions(), qs.cmd.timestamp, qs.builder, trace_state, timeout, cache_ctx);
|
||||
qs.remaining_partitions(), qs.cmd.timestamp, qs.builder, _config.max_memory_for_unlimited_query, trace_state, timeout, cache_ctx);
|
||||
}).then([qs_ptr = std::move(qs_ptr), &qs] {
|
||||
return make_ready_future<lw_shared_ptr<query::result>>(
|
||||
make_lw_shared<query::result>(qs.builder.build()));
|
||||
|
||||
@@ -590,7 +590,7 @@ void test_flat_stream(schema_ptr s, std::vector<mutation> muts, reversed_partiti
|
||||
return fmr.consume_in_thread(std::move(fsc), db::no_timeout);
|
||||
} else {
|
||||
if (reversed) {
|
||||
auto reverse_reader = make_reversing_reader(fmr);
|
||||
auto reverse_reader = make_reversing_reader(fmr, size_t(1) << 20);
|
||||
return reverse_reader.consume(std::move(fsc), db::no_timeout).get0();
|
||||
}
|
||||
return fmr.consume(std::move(fsc), db::no_timeout).get0();
|
||||
@@ -787,3 +787,49 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_reader_from_fragments_as_mutation_source)
|
||||
};
|
||||
run_mutation_source_tests(populate);
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_reverse_reader_memory_limit) {
|
||||
simple_schema schema;
|
||||
|
||||
struct phony_consumer {
|
||||
void consume_new_partition(const dht::decorated_key&) { }
|
||||
void consume(tombstone) { }
|
||||
stop_iteration consume(static_row&&) { return stop_iteration::no; }
|
||||
stop_iteration consume(clustering_row&&) { return stop_iteration::no; }
|
||||
stop_iteration consume(range_tombstone&&) { return stop_iteration::no; }
|
||||
stop_iteration consume_end_of_partition() { return stop_iteration::no; }
|
||||
void consume_end_of_stream() { }
|
||||
};
|
||||
|
||||
auto test_with_partition = [&] (bool with_static_row) {
|
||||
BOOST_TEST_MESSAGE(fmt::format("Testing with_static_row={}", with_static_row));
|
||||
auto mut = schema.new_mutation("pk1");
|
||||
const size_t desired_mut_size = 1 * 1024 * 1024;
|
||||
const size_t row_size = 10 * 1024;
|
||||
|
||||
if (with_static_row) {
|
||||
schema.add_static_row(mut, "s1");
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < desired_mut_size / row_size; ++i) {
|
||||
schema.add_row(mut, schema.make_ckey(++i), sstring(row_size, '0'));
|
||||
}
|
||||
|
||||
auto reader = flat_mutation_reader_from_mutations({mut});
|
||||
auto reverse_reader = make_reversing_reader(reader, size_t(1) << 10);
|
||||
|
||||
try {
|
||||
reverse_reader.consume(phony_consumer{}, db::no_timeout).get();
|
||||
BOOST_FAIL("No exception thrown for reversing overly big partition");
|
||||
} catch (const std::runtime_error& e) {
|
||||
BOOST_TEST_MESSAGE(fmt::format("Got exception with message: {}", e.what()));
|
||||
auto str = sstring(e.what());
|
||||
BOOST_REQUIRE_EQUAL(str.find("Aborting reverse partition read because partition pk1"), 0);
|
||||
} catch (...) {
|
||||
throw;
|
||||
}
|
||||
};
|
||||
|
||||
test_with_partition(true);
|
||||
test_with_partition(false);
|
||||
}
|
||||
|
||||
@@ -76,6 +76,7 @@ static query::partition_slice make_full_slice(const schema& s) {
|
||||
}
|
||||
|
||||
static auto inf32 = std::numeric_limits<unsigned>::max();
|
||||
static const uint64_t max_memory_for_reverse_query = 1 << 20;
|
||||
|
||||
query::result_set to_result_set(const reconcilable_result& r, schema_ptr s, const query::partition_slice& slice) {
|
||||
return query::result_set::from_raw_result(s, slice, to_data_query_result(r, s, slice, inf32, inf32));
|
||||
@@ -100,7 +101,7 @@ SEASTAR_TEST_CASE(test_reading_from_single_partition) {
|
||||
auto slice = make_full_slice(*s);
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 2, query::max_partitions, now).get0();
|
||||
query::full_partition_range, slice, 2, query::max_partitions, now, max_memory_for_reverse_query).get0();
|
||||
|
||||
// FIXME: use mutation assertions
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
@@ -123,7 +124,7 @@ SEASTAR_TEST_CASE(test_reading_from_single_partition) {
|
||||
.build();
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, query::max_rows, query::max_partitions, now).get0();
|
||||
query::full_partition_range, slice, query::max_rows, query::max_partitions, now, max_memory_for_reverse_query).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_only(a_row()
|
||||
@@ -159,7 +160,7 @@ SEASTAR_TEST_CASE(test_cells_are_expired_according_to_query_timestamp) {
|
||||
auto slice = make_full_slice(*s);
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 1, query::max_partitions, now).get0();
|
||||
query::full_partition_range, slice, 1, query::max_partitions, now, max_memory_for_reverse_query).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_only(a_row()
|
||||
@@ -173,7 +174,7 @@ SEASTAR_TEST_CASE(test_cells_are_expired_according_to_query_timestamp) {
|
||||
auto slice = make_full_slice(*s);
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 1, query::max_partitions, now + 2s).get0();
|
||||
query::full_partition_range, slice, 1, query::max_partitions, now + 2s, max_memory_for_reverse_query).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_only(a_row()
|
||||
@@ -206,7 +207,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
|
||||
.build();
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 3, query::max_partitions, now).get0();
|
||||
query::full_partition_range, slice, 3, query::max_partitions, now, max_memory_for_reverse_query).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(3)
|
||||
@@ -236,7 +237,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
|
||||
.build();
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 3, query::max_partitions, now).get0();
|
||||
query::full_partition_range, slice, 3, query::max_partitions, now, max_memory_for_reverse_query).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(3)
|
||||
@@ -264,7 +265,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
|
||||
|
||||
{
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 10, query::max_partitions, now).get0();
|
||||
query::full_partition_range, slice, 10, query::max_partitions, now, max_memory_for_reverse_query).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(3)
|
||||
@@ -284,7 +285,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
|
||||
|
||||
{
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 1, query::max_partitions, now).get0();
|
||||
query::full_partition_range, slice, 1, query::max_partitions, now, max_memory_for_reverse_query).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(1)
|
||||
@@ -296,7 +297,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
|
||||
|
||||
{
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 2, query::max_partitions, now).get0();
|
||||
query::full_partition_range, slice, 2, query::max_partitions, now, max_memory_for_reverse_query).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(2)
|
||||
@@ -323,7 +324,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
|
||||
.build();
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 2, query::max_partitions, now).get0();
|
||||
query::full_partition_range, slice, 2, query::max_partitions, now, max_memory_for_reverse_query).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(2)
|
||||
@@ -347,7 +348,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
|
||||
.build();
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 3, query::max_partitions, now).get0();
|
||||
query::full_partition_range, slice, 3, query::max_partitions, now, max_memory_for_reverse_query).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(2)
|
||||
@@ -369,7 +370,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
|
||||
.build();
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 3, query::max_partitions, now).get0();
|
||||
query::full_partition_range, slice, 3, query::max_partitions, now, max_memory_for_reverse_query).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_only(a_row()
|
||||
@@ -395,7 +396,7 @@ SEASTAR_TEST_CASE(test_query_when_partition_tombstone_covers_live_cells) {
|
||||
auto slice = make_full_slice(*s);
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, query::max_rows, query::max_partitions, now).get0();
|
||||
query::full_partition_range, slice, query::max_rows, query::max_partitions, now, max_memory_for_reverse_query).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.is_empty();
|
||||
@@ -445,7 +446,8 @@ SEASTAR_TEST_CASE(test_partitions_with_only_expired_tombstones_are_dropped) {
|
||||
|
||||
auto query_time = now + std::chrono::seconds(1);
|
||||
|
||||
reconcilable_result result = mutation_query(s, src, query::full_partition_range, slice, query::max_rows, query::max_partitions, query_time).get0();
|
||||
reconcilable_result result = mutation_query(s, src, query::full_partition_range, slice, query::max_rows, query::max_partitions, query_time,
|
||||
max_memory_for_reverse_query).get0();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(result.partitions().size(), 2);
|
||||
BOOST_REQUIRE_EQUAL(result.row_count(), 2);
|
||||
@@ -463,24 +465,29 @@ SEASTAR_TEST_CASE(test_result_row_count) {
|
||||
|
||||
auto src = make_source({m1});
|
||||
|
||||
auto r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now).get0(), s, slice, inf32, inf32);
|
||||
auto r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now,
|
||||
max_memory_for_reverse_query).get0(), s, slice, inf32, inf32);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 0);
|
||||
|
||||
m1.set_static_cell("s1", data_value(bytes("S_v1")), 1);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now).get0(), s, slice, inf32, inf32);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now,
|
||||
max_memory_for_reverse_query).get0(), s, slice, inf32, inf32);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 1);
|
||||
|
||||
m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("A")), "v1", data_value(bytes("A_v1")), 1);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now).get0(), s, slice, inf32, inf32);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now,
|
||||
max_memory_for_reverse_query).get0(), s, slice, inf32, inf32);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 1);
|
||||
|
||||
m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("B")), "v1", data_value(bytes("B_v1")), 1);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now).get0(), s, slice, inf32, inf32);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now,
|
||||
max_memory_for_reverse_query).get0(), s, slice, inf32, inf32);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 2);
|
||||
|
||||
mutation m2(s, partition_key::from_single_value(*s, "key2"));
|
||||
m2.set_static_cell("s1", data_value(bytes("S_v1")), 1);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1, m2}), query::full_partition_range, slice, 10000, query::max_partitions, now).get0(), s, slice, inf32, inf32);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1, m2}), query::full_partition_range, slice, 10000, query::max_partitions, now,
|
||||
max_memory_for_reverse_query).get0(), s, slice, inf32, inf32);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 3);
|
||||
});
|
||||
}
|
||||
@@ -503,7 +510,7 @@ SEASTAR_TEST_CASE(test_partition_limit) {
|
||||
|
||||
{
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, query::max_rows, 10, now).get0();
|
||||
query::full_partition_range, slice, query::max_rows, 10, now, max_memory_for_reverse_query).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(2)
|
||||
@@ -519,7 +526,7 @@ SEASTAR_TEST_CASE(test_partition_limit) {
|
||||
|
||||
{
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, query::max_rows, 1, now).get0();
|
||||
query::full_partition_range, slice, query::max_rows, 1, now, max_memory_for_reverse_query).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(1)
|
||||
@@ -541,10 +548,12 @@ SEASTAR_THREAD_TEST_CASE(test_result_size_calculation) {
|
||||
slice.options.set<query::partition_slice::option::allow_short_read>();
|
||||
|
||||
query::result::builder digest_only_builder(slice, query::result_options{query::result_request::only_digest, query::digest_algorithm::xxHash}, l.new_digest_read(query::result_memory_limiter::maximum_result_size).get0());
|
||||
data_query(s, source, query::full_partition_range, slice, std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint32_t>::max(), gc_clock::now(), digest_only_builder).get0();
|
||||
data_query(s, source, query::full_partition_range, slice, std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint32_t>::max(),
|
||||
gc_clock::now(), digest_only_builder, max_memory_for_reverse_query).get0();
|
||||
|
||||
query::result::builder result_and_digest_builder(slice, query::result_options{query::result_request::result_and_digest, query::digest_algorithm::xxHash}, l.new_data_read(query::result_memory_limiter::maximum_result_size).get0());
|
||||
data_query(s, source, query::full_partition_range, slice, std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint32_t>::max(), gc_clock::now(), result_and_digest_builder).get0();
|
||||
data_query(s, source, query::full_partition_range, slice, std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint32_t>::max(),
|
||||
gc_clock::now(), result_and_digest_builder, max_memory_for_reverse_query).get0();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(digest_only_builder.memory_accounter().used_memory(), result_and_digest_builder.memory_accounter().used_memory());
|
||||
}
|
||||
|
||||
@@ -213,7 +213,7 @@ public:
|
||||
|
||||
auto querier = make_querier<Querier>(range);
|
||||
auto [dk, ck] = querier.consume_page(dummy_result_builder{}, row_limit, std::numeric_limits<uint32_t>::max(),
|
||||
gc_clock::now(), db::no_timeout).get0();
|
||||
gc_clock::now(), db::no_timeout, std::numeric_limits<uint64_t>::max()).get0();
|
||||
const auto memory_usage = querier.memory_usage();
|
||||
_cache.insert(cache_key, std::move(querier), nullptr);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user