service: query_pager: fix last-position for filtering queries
On short-pages, cut short because of a tombstone prefix. When page-results are filtered and the filter drops some rows, the last-position is taken from the page visitor, which does the filtering. This means that last partition and row position will be that of the last row the filter saw. This will not match the last position of the replica, when the replica cut the page due to tombstones. When fetching the next page, this means that all the tombstone suffix of the last page, will be re-fetched. Worse still: the last position of the next page will not match that of the saved reader left on the replica, so the saved reader will be dropped and a new one created from scratch. This wasted work will show up as elevated tail latencies. Fix by always taking the last position from raw query results. Fixes: #22620 Closes scylladb/scylladb#22622
This commit is contained in:
@@ -383,18 +383,14 @@ void query_pager::handle_result(
|
||||
auto view = query::result_view(*results);
|
||||
|
||||
_last_pos = position_in_partition::for_partition_start();
|
||||
uint64_t row_count;
|
||||
uint64_t replica_row_count, row_count;
|
||||
if constexpr(!std::is_same_v<std::decay_t<Visitor>, noop_visitor>) {
|
||||
query_result_visitor<Visitor> v(std::forward<Visitor>(visitor));
|
||||
view.consume(_cmd->slice, v);
|
||||
|
||||
if (_last_pkey) {
|
||||
update_slice(*_last_pkey);
|
||||
}
|
||||
|
||||
row_count = v.total_rows - v.dropped_rows;
|
||||
_max = _max - row_count;
|
||||
_exhausted = (v.total_rows < page_size && !results->is_short_read() && v.dropped_rows == 0) || _max == 0;
|
||||
replica_row_count = v.total_rows;
|
||||
|
||||
// If per partition limit is defined, we need to accumulate rows fetched for last partition key if the key matches
|
||||
if (_cmd->slice.partition_row_limit() < query::max_rows_if_set) {
|
||||
if (_last_pkey && v.last_pkey && _last_pkey->equal(*_query_schema, *v.last_pkey)) {
|
||||
@@ -403,32 +399,30 @@ void query_pager::handle_result(
|
||||
_rows_fetched_for_last_partition = v.last_partition_row_count;
|
||||
}
|
||||
}
|
||||
const auto& last_pos = results->last_position();
|
||||
if (last_pos && !v.dropped_rows) {
|
||||
_last_pkey = last_pos->partition;
|
||||
_last_pos = last_pos->position;
|
||||
} else {
|
||||
_last_pkey = v.last_pkey;
|
||||
if (v.last_ckey) {
|
||||
_last_pos = position_in_partition::for_key(*v.last_ckey);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
row_count = results->row_count() ? *results->row_count() : std::get<1>(view.count_partitions_and_rows());
|
||||
_max = _max - row_count;
|
||||
_exhausted = (row_count < page_size && !results->is_short_read()) || _max == 0;
|
||||
replica_row_count = row_count;
|
||||
}
|
||||
|
||||
if (!_exhausted) {
|
||||
if (_last_pkey) {
|
||||
update_slice(*_last_pkey);
|
||||
}
|
||||
{
|
||||
_max = _max - row_count;
|
||||
_exhausted = (replica_row_count < page_size && !results->is_short_read()) || _max == 0;
|
||||
|
||||
if (_last_pkey) {
|
||||
update_slice(*_last_pkey);
|
||||
}
|
||||
|
||||
// The last page can be truly empty -- with unset last-position and no data to calculate it based on.
|
||||
if (!replica_row_count && !results->is_short_read()) {
|
||||
_last_pkey = {};
|
||||
} else {
|
||||
auto last_pos = results->get_or_calculate_last_position();
|
||||
_last_pkey = std::move(last_pos.partition);
|
||||
_last_pos = std::move(last_pos.position);
|
||||
}
|
||||
}
|
||||
|
||||
qlogger.debug("Fetched {} rows, max_remain={} {}", row_count, _max, _exhausted ? "(exh)" : "");
|
||||
qlogger.debug("Fetched {} rows (kept {}), max_remain={} {}", replica_row_count, row_count, _max, _exhausted ? "(exh)" : "");
|
||||
|
||||
if (_last_pkey) {
|
||||
qlogger.debug("Last partition key: {}", *_last_pkey);
|
||||
|
||||
@@ -357,3 +357,60 @@ def test_unpaged_query(cql, table, lowered_tombstone_limit, driver_bug_1):
|
||||
statement = SimpleStatement(f"SELECT * FROM {table} WHERE pk = {pk}", fetch_size=None)
|
||||
rows = list(cql.execute(statement))
|
||||
assert len(rows) == 4
|
||||
|
||||
|
||||
def test_filtering_query_tombstone_suffix_last_position(cql, test_keyspace, lowered_tombstone_limit):
|
||||
"""
|
||||
Check that when filtering drops rows in a short page due to tombstone suffix,
|
||||
the tombstone-suffix is not re-requested on the next page.
|
||||
"""
|
||||
with new_test_table(cql, test_keyspace, 'pk int, ck int, v int, PRIMARY KEY (pk, ck)') as table:
|
||||
insert_row_id = cql.prepare(f"INSERT INTO {table} (pk, ck, v) VALUES (?, ?, ?)")
|
||||
delete_row_id = cql.prepare(f"DELETE FROM {table} WHERE pk = ? AND ck = ?")
|
||||
|
||||
pk = 0
|
||||
|
||||
page1 = []
|
||||
for ck in range(0, 10):
|
||||
row = (pk, ck, ck % 2)
|
||||
cql.execute(insert_row_id, row)
|
||||
if row[2] == 0:
|
||||
page1.append(row)
|
||||
|
||||
for ck in range(10, 25):
|
||||
cql.execute(delete_row_id, (pk, ck))
|
||||
|
||||
page2 = []
|
||||
for ck in range(25, 30):
|
||||
row = (pk, ck, ck % 2)
|
||||
cql.execute(insert_row_id, row)
|
||||
if row[2] == 0:
|
||||
page2.append(row)
|
||||
|
||||
statement = SimpleStatement(f"SELECT * FROM {table} WHERE pk = {pk} AND v = 0 ALLOW FILTERING", fetch_size=20)
|
||||
|
||||
res = cql.execute(statement, trace=True)
|
||||
|
||||
def to_list(current_rows):
|
||||
return list(map(lambda r: tuple(r._asdict().values()), current_rows))
|
||||
|
||||
assert to_list(res.current_rows) == page1
|
||||
assert res.has_more_pages
|
||||
|
||||
res.fetch_next_page()
|
||||
|
||||
assert to_list(res.current_rows)== page2
|
||||
assert not res.has_more_pages
|
||||
|
||||
tracing = res.get_all_query_traces(max_wait_sec_per=900)
|
||||
|
||||
assert len(tracing) == 2
|
||||
|
||||
found_reuse = False
|
||||
found_drop = False
|
||||
for event in tracing[1].events:
|
||||
found_reuse = found_reuse or "Reusing querier" == event.description
|
||||
found_drop = found_drop or "Dropping querier because" in event.description
|
||||
|
||||
assert found_reuse
|
||||
assert not found_drop
|
||||
|
||||
Reference in New Issue
Block a user