query-result-writer: stop when tombstone-limit is reached

The query result writer now counts tombstones and cuts the page (marking
it as a short one) when the tombstone limit is reached. This is to avoid
timing out on a large span of tombstones, especially prefixes.
In the case of unpaged queries, we fail the read instead, similarly to
how we do with max result size.
If the limit is 0, the previous behaviour is used: tombstones are not
taken into consideration at all.
This commit is contained in:
Botond Dénes
2022-07-15 13:10:37 +03:00
parent 8066dbc635
commit 7730419f5c
6 changed files with 31 additions and 10 deletions

View File

@@ -389,7 +389,7 @@ private:
public:
data_query_result_builder(const schema& s, const query::partition_slice& slice)
: _res_builder(slice, query::result_options::only_result(), query::result_memory_accounter{query::result_memory_limiter::unlimited_result_size})
: _res_builder(slice, query::result_options::only_result(), query::result_memory_accounter{query::result_memory_limiter::unlimited_result_size}, query::max_tombstones)
, _builder(s, _res_builder) { }
void consume_new_partition(const dht::decorated_key& dk) { _builder.consume_new_partition(dk); }

View File

@@ -831,9 +831,9 @@ private:
public:
data_query_result_builder(const schema& s, const query::partition_slice& slice, query::result_options opts,
query::result_memory_accounter&& accounter, const compact_for_query_state_v2& compaction_state)
query::result_memory_accounter&& accounter, const compact_for_query_state_v2& compaction_state, uint64_t tombstone_limit)
: _compaction_state(compaction_state)
, _res_builder(std::make_unique<query::result::builder>(slice, opts, std::move(accounter)))
, _res_builder(std::make_unique<query::result::builder>(slice, opts, std::move(accounter), tombstone_limit))
, _builder(s, *_res_builder) { }
void consume_new_partition(const dht::decorated_key& dk) { _builder.consume_new_partition(dk); }
@@ -880,6 +880,6 @@ future<std::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>>
return do_query_on_all_shards<data_query_result_builder>(db, query_schema, cmd, ranges, std::move(trace_state), timeout,
[table_schema, &cmd, opts] (query::result_memory_accounter&& accounter, const compact_for_query_state_v2& compaction_state) {
return data_query_result_builder(*table_schema, cmd.slice, opts, std::move(accounter), compaction_state);
return data_query_result_builder(*table_schema, cmd.slice, opts, std::move(accounter), compaction_state, cmd.tombstone_limit);
});
}

View File

@@ -2071,9 +2071,11 @@ void query_result_builder::consume_new_partition(const dht::decorated_key& dk) {
// Consumes the partition tombstone: forwards it to the mutation consumer and
// then counts it toward the result builder's tombstone limit.
// This overload returns void, so the stop decision is only recorded in _stop.
// Note that bump_and_check_tombstone_limit() can throw when the slice does
// not allow short reads and the limit has been reached.
void query_result_builder::consume(tombstone t) {
_mutation_consumer->consume(t);
_stop = _rb.bump_and_check_tombstone_limit();
}
stop_iteration query_result_builder::consume(static_row&& sr, tombstone t, bool is_live) {
if (!is_live) {
_stop = _rb.bump_and_check_tombstone_limit();
return _stop;
}
_stop = _mutation_consumer->consume(std::move(sr), t);
@@ -2081,12 +2083,14 @@ stop_iteration query_result_builder::consume(static_row&& sr, tombstone t, bool
}
// Consumes a clustering row.
// A live row is handed to the mutation consumer, whose answer decides whether
// to stop. A non-live row is not forwarded at all: it is only counted toward
// the result builder's tombstone limit, which may request a stop (or throw).
// The decision is remembered in _stop and also returned to the caller.
stop_iteration query_result_builder::consume(clustering_row&& cr, row_tombstone t, bool is_live) {
    if (is_live) {
        _stop = _mutation_consumer->consume(std::move(cr), t);
    } else {
        _stop = _rb.bump_and_check_tombstone_limit();
    }
    return _stop;
}
// Consumes a range tombstone change. The change itself is not forwarded
// anywhere here; it is only counted toward the result builder's tombstone
// limit. The resulting stop decision is remembered in _stop and returned.
stop_iteration query_result_builder::consume(range_tombstone_change&& rtc) {
    return _stop = _rb.bump_and_check_tombstone_limit();
}
@@ -2205,7 +2209,7 @@ future<query::result>
to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::partition_slice& slice, uint64_t max_rows, uint32_t max_partitions,
query::result_options opts) {
// This result was already built with a limit, don't apply another one.
query::result::builder builder(slice, opts, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size });
query::result::builder builder(slice, opts, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }, query::max_tombstones);
auto consumer = compact_for_query_v2<query_result_builder>(*s, gc_clock::time_point::min(), slice, max_rows,
max_partitions, query_result_builder(*s, builder));
auto compaction_state = consumer.get_state();
@@ -2237,7 +2241,7 @@ to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::pa
query::result
query_mutation(mutation&& m, const query::partition_slice& slice, uint64_t row_limit, gc_clock::time_point now, query::result_options opts) {
query::result::builder builder(slice, opts, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size });
query::result::builder builder(slice, opts, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }, query::max_tombstones);
auto consumer = compact_for_query_v2<query_result_builder>(*m.schema(), now, slice, row_limit,
query::max_partitions, query_result_builder(*m.schema(), builder));
auto compaction_state = consumer.get_state();

View File

@@ -112,13 +112,16 @@ class result::builder {
short_read _short_read;
digester _digest;
result_memory_accounter _memory_accounter;
const uint64_t _tombstone_limit = query::max_tombstones;
uint64_t _tombstones = 0;
public:
builder(const partition_slice& slice, result_options options, result_memory_accounter memory_accounter)
builder(const partition_slice& slice, result_options options, result_memory_accounter memory_accounter, uint64_t tombstone_limit)
: _slice(slice)
, _w(ser::writer_of_query_result<bytes_ostream>(_out).start_partitions())
, _request(options.request)
, _digest(digester(options.digest_algo))
, _memory_accounter(std::move(memory_accounter))
, _tombstone_limit(tombstone_limit)
{ }
builder(builder&&) = delete; // _out is captured by reference
@@ -127,6 +130,19 @@ public:
result_memory_accounter& memory_accounter() { return _memory_accounter; }
// Counts one more tombstone seen by the read and reports whether the read has
// to stop because the tombstone limit was reached.
//
// Returns stop_iteration::no while the number of tombstones counted so far is
// below the limit. It also returns stop_iteration::no whenever the limit is 0:
// a limit of 0 means tombstones are not taken into consideration at all, so it
// must never cut the page or fail the read.
//
// Once the limit is reached:
//  * if the slice allows short reads, returns stop_iteration::yes so the
//    caller can cut the page;
//  * otherwise throws std::runtime_error, because the read cannot be
//    interrupted early without failing it.
stop_iteration bump_and_check_tombstone_limit() {
    ++_tombstones;
    // The explicit 0 check matters: with an unsigned limit of 0 the comparison
    // below is never true, so the very first tombstone would otherwise be
    // treated as hitting the limit.
    if (_tombstone_limit == 0 || _tombstones < _tombstone_limit) {
        return stop_iteration::no;
    }
    if (!_slice.options.contains<partition_slice::option::allow_short_read>()) {
        throw std::runtime_error(fmt::format(
                "Tombstones processed by unpaged query exceeds limit of {} (configured via query_tombstone_page_limit)",
                _tombstone_limit));
    }
    return stop_iteration::yes;
}
const partition_slice& slice() const { return _slice; }
uint64_t row_count() const {
@@ -210,6 +226,7 @@ class query_result_builder {
const schema& _schema;
query::result::builder& _rb;
std::optional<mutation_querier> _mutation_consumer;
// We need to remember that we requested stop, to mark the read as short in the end.
stop_iteration _stop;
public:
query_result_builder(const schema& s, query::result::builder& rb) noexcept;

View File

@@ -2024,7 +2024,7 @@ struct query_state {
query::result_memory_accounter memory_accounter)
: schema(std::move(s))
, cmd(cmd)
, builder(cmd.slice, opts, std::move(memory_accounter))
, builder(cmd.slice, opts, std::move(memory_accounter), cmd.tombstone_limit)
, limit(cmd.get_row_limit())
, partition_limit(cmd.partition_limit)
, current_partition_range(ranges.begin())

View File

@@ -550,11 +550,11 @@ SEASTAR_THREAD_TEST_CASE(test_result_size_calculation) {
slice.options.set<query::partition_slice::option::allow_short_read>();
query::result::builder digest_only_builder(slice, query::result_options{query::result_request::only_digest, query::digest_algorithm::xxHash},
l.new_digest_read(query::max_result_size(query::result_memory_limiter::maximum_result_size), query::short_read::yes).get0());
l.new_digest_read(query::max_result_size(query::result_memory_limiter::maximum_result_size), query::short_read::yes).get0(), query::max_tombstones);
data_query(s, semaphore.make_permit(), source, query::full_partition_range, slice, digest_only_builder);
query::result::builder result_and_digest_builder(slice, query::result_options{query::result_request::result_and_digest, query::digest_algorithm::xxHash},
l.new_data_read(query::max_result_size(query::result_memory_limiter::maximum_result_size), query::short_read::yes).get0());
l.new_data_read(query::max_result_size(query::result_memory_limiter::maximum_result_size), query::short_read::yes).get0(), query::max_tombstones);
data_query(s, semaphore.make_permit(), source, query::full_partition_range, slice, result_and_digest_builder);
BOOST_REQUIRE_EQUAL(digest_only_builder.memory_accounter().used_memory(), result_and_digest_builder.memory_accounter().used_memory());