service: query_pager: fix last-position for filtering queries

This concerns short pages, i.e. pages cut short because of a tombstone prefix.
When page results are filtered and the filter drops some rows, the
last position is taken from the page visitor, which does the filtering.
This means that the last partition and row position will be those of the
last row the filter saw. This will not match the last position of the
replica when the replica cut the page due to tombstones.
When fetching the next page, this means that the entire tombstone suffix
of the last page will be re-fetched. Worse still: the last position of the
next page will not match that of the saved reader left on the replica, so
the saved reader will be dropped and a new one created from scratch.
This wasted work will show up as elevated tail latencies.
Fix by always taking the last position from raw query results.

Fixes: #22620

Closes scylladb/scylladb#22622

(cherry picked from commit 7ce932ce01)

Closes scylladb/scylladb#22719
This commit is contained in:
Botond Dénes
2025-01-31 08:20:44 -05:00
parent e79ee2ddb0
commit 1998733228
2 changed files with 75 additions and 24 deletions

View File

@@ -383,18 +383,14 @@ void query_pager::handle_result(
auto view = query::result_view(*results);
_last_pos = position_in_partition::for_partition_start();
uint64_t row_count;
uint64_t replica_row_count, row_count;
if constexpr(!std::is_same_v<std::decay_t<Visitor>, noop_visitor>) {
query_result_visitor<Visitor> v(std::forward<Visitor>(visitor));
view.consume(_cmd->slice, v);
if (_last_pkey) {
update_slice(*_last_pkey);
}
row_count = v.total_rows - v.dropped_rows;
_max = _max - row_count;
_exhausted = (v.total_rows < page_size && !results->is_short_read() && v.dropped_rows == 0) || _max == 0;
replica_row_count = v.total_rows;
// If per partition limit is defined, we need to accumulate rows fetched for last partition key if the key matches
if (_cmd->slice.partition_row_limit() < query::max_rows_if_set) {
if (_last_pkey && v.last_pkey && _last_pkey->equal(*_query_schema, *v.last_pkey)) {
@@ -403,32 +399,30 @@ void query_pager::handle_result(
_rows_fetched_for_last_partition = v.last_partition_row_count;
}
}
const auto& last_pos = results->last_position();
if (last_pos && !v.dropped_rows) {
_last_pkey = last_pos->partition;
_last_pos = last_pos->position;
} else {
_last_pkey = v.last_pkey;
if (v.last_ckey) {
_last_pos = position_in_partition::for_key(*v.last_ckey);
}
}
} else {
row_count = results->row_count() ? *results->row_count() : std::get<1>(view.count_partitions_and_rows());
_max = _max - row_count;
_exhausted = (row_count < page_size && !results->is_short_read()) || _max == 0;
replica_row_count = row_count;
}
if (!_exhausted) {
if (_last_pkey) {
update_slice(*_last_pkey);
}
{
_max = _max - row_count;
_exhausted = (replica_row_count < page_size && !results->is_short_read()) || _max == 0;
if (_last_pkey) {
update_slice(*_last_pkey);
}
// The last page can be truly empty -- with unset last-position and no data to calculate it based on.
if (!replica_row_count && !results->is_short_read()) {
_last_pkey = {};
} else {
auto last_pos = results->get_or_calculate_last_position();
_last_pkey = std::move(last_pos.partition);
_last_pos = std::move(last_pos.position);
}
}
qlogger.debug("Fetched {} rows, max_remain={} {}", row_count, _max, _exhausted ? "(exh)" : "");
qlogger.debug("Fetched {} rows (kept {}), max_remain={} {}", replica_row_count, row_count, _max, _exhausted ? "(exh)" : "");
if (_last_pkey) {
qlogger.debug("Last partition key: {}", *_last_pkey);

View File

@@ -357,3 +357,60 @@ def test_unpaged_query(cql, table, lowered_tombstone_limit, driver_bug_1):
statement = SimpleStatement(f"SELECT * FROM {table} WHERE pk = {pk}", fetch_size=None)
rows = list(cql.execute(statement))
assert len(rows) == 4
def test_filtering_query_tombstone_suffix_last_position(cql, test_keyspace, lowered_tombstone_limit):
    """
    A filtering query whose first page is short because of a tombstone suffix,
    and whose filter drops rows, must not re-request that tombstone suffix
    when the next page is fetched.

    Verified through the trace of the second page: it has to contain a
    "Reusing querier" event and no "Dropping querier because" event.
    """
    with new_test_table(cql, test_keyspace, 'pk int, ck int, v int, PRIMARY KEY (pk, ck)') as table:
        insert_row_id = cql.prepare(f"INSERT INTO {table} (pk, ck, v) VALUES (?, ?, ?)")
        delete_row_id = cql.prepare(f"DELETE FROM {table} WHERE pk = ? AND ck = ?")
        pk = 0

        def insert_rows(clustering_keys):
            # Write one row per clustering key with v = ck % 2 and hand back
            # only those rows the `v = 0` filter of the query below will keep.
            kept = []
            for ck in clustering_keys:
                row = (pk, ck, ck % 2)
                cql.execute(insert_row_id, row)
                if row[2] == 0:
                    kept.append(row)
            return kept

        def to_list(current_rows):
            # Driver rows -> plain tuples, so they compare equal to the expected rows.
            return [tuple(r._asdict().values()) for r in current_rows]

        # Layout of the partition: live rows, then a run of row deletions, then live rows again.
        page1 = insert_rows(range(0, 10))
        for ck in range(10, 25):
            cql.execute(delete_row_id, (pk, ck))
        page2 = insert_rows(range(25, 30))

        statement = SimpleStatement(f"SELECT * FROM {table} WHERE pk = {pk} AND v = 0 ALLOW FILTERING", fetch_size=20)
        res = cql.execute(statement, trace=True)

        # First page: the surviving rows written before the deletions.
        assert to_list(res.current_rows) == page1
        assert res.has_more_pages

        res.fetch_next_page()

        # Second (and final) page: the surviving rows written after the deletions.
        assert to_list(res.current_rows) == page2
        assert not res.has_more_pages

        tracing = res.get_all_query_traces(max_wait_sec_per=900)
        assert len(tracing) == 2

        # Only the trace of the second page request is inspected.
        descriptions = [event.description for event in tracing[1].events]
        found_reuse = any("Reusing querier" == description for description in descriptions)
        found_drop = any("Dropping querier because" in description for description in descriptions)
        assert found_reuse
        assert not found_drop