From 7ce932ce01a95f4bf24e8bbb731a3546ce827670 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Fri, 31 Jan 2025 08:20:44 -0500 Subject: [PATCH] service: query_pager: fix last-position for filtering queries On short-pages, cut short because of a tombstone prefix. When page-results are filtered and the filter drops some rows, the last-position is taken from the page visitor, which does the filtering. This means that last partition and row position will be that of the last row the filter saw. This will not match the last position of the replica, when the replica cut the page due to tombstones. When fetching the next page, this means that all the tombstone suffix of the last page, will be re-fetched. Worse still: the last position of the next page will not match that of the saved reader left on the replica, so the saved reader will be dropped and a new one created from scratch. This wasted work will show up as elevated tail latencies. Fix by always taking the last position from raw query results. Fixes: #22620 Closes scylladb/scylladb#22622 --- service/pager/query_pagers.cc | 42 ++++++++++------------ test/cqlpy/test_tombstone_limit.py | 57 ++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 24 deletions(-) diff --git a/service/pager/query_pagers.cc b/service/pager/query_pagers.cc index 75de0bc598..ed1c73eb51 100644 --- a/service/pager/query_pagers.cc +++ b/service/pager/query_pagers.cc @@ -383,18 +383,14 @@ void query_pager::handle_result( auto view = query::result_view(*results); _last_pos = position_in_partition::for_partition_start(); - uint64_t row_count; + uint64_t replica_row_count, row_count; if constexpr(!std::is_same_v, noop_visitor>) { query_result_visitor v(std::forward(visitor)); view.consume(_cmd->slice, v); - if (_last_pkey) { - update_slice(*_last_pkey); - } - row_count = v.total_rows - v.dropped_rows; - _max = _max - row_count; - _exhausted = (v.total_rows < page_size && !results->is_short_read() && v.dropped_rows == 0) || _max == 0; + replica_row_count = v.total_rows; + // If per partition limit is defined, we need to accumulate rows fetched for last partition key if the key matches if (_cmd->slice.partition_row_limit() < query::max_rows_if_set) { if (_last_pkey && v.last_pkey && _last_pkey->equal(*_query_schema, *v.last_pkey)) { @@ -403,32 +399,30 @@ void query_pager::handle_result( _rows_fetched_for_last_partition = v.last_partition_row_count; } } - const auto& last_pos = results->last_position(); - if (last_pos && !v.dropped_rows) { - _last_pkey = last_pos->partition; - _last_pos = last_pos->position; - } else { - _last_pkey = v.last_pkey; - if (v.last_ckey) { - _last_pos = position_in_partition::for_key(*v.last_ckey); - } - } } else { row_count = results->row_count() ? *results->row_count() : std::get<1>(view.count_partitions_and_rows()); - _max = _max - row_count; - _exhausted = (row_count < page_size && !results->is_short_read()) || _max == 0; + replica_row_count = row_count; + } - if (!_exhausted) { - if (_last_pkey) { - update_slice(*_last_pkey); - } + { + _max = _max - row_count; + _exhausted = (replica_row_count < page_size && !results->is_short_read()) || _max == 0; + + if (_last_pkey) { + update_slice(*_last_pkey); + } + + // The last page can be truly empty -- with unset last-position and no data to calculate it based on. + if (!replica_row_count && !results->is_short_read()) { + _last_pkey = {}; + } else { auto last_pos = results->get_or_calculate_last_position(); _last_pkey = std::move(last_pos.partition); _last_pos = std::move(last_pos.position); } } - qlogger.debug("Fetched {} rows, max_remain={} {}", row_count, _max, _exhausted ? "(exh)" : ""); + qlogger.debug("Fetched {} rows (kept {}), max_remain={} {}", replica_row_count, row_count, _max, _exhausted ? "(exh)" : ""); if (_last_pkey) { qlogger.debug("Last partition key: {}", *_last_pkey); diff --git a/test/cqlpy/test_tombstone_limit.py b/test/cqlpy/test_tombstone_limit.py index 7660d022a7..a49cc73f5c 100644 --- a/test/cqlpy/test_tombstone_limit.py +++ b/test/cqlpy/test_tombstone_limit.py @@ -357,3 +357,60 @@ def test_unpaged_query(cql, table, lowered_tombstone_limit, driver_bug_1): statement = SimpleStatement(f"SELECT * FROM {table} WHERE pk = {pk}", fetch_size=None) rows = list(cql.execute(statement)) assert len(rows) == 4 + + +def test_filtering_query_tombstone_suffix_last_position(cql, test_keyspace, lowered_tombstone_limit): + """ + Check that when filtering drops rows in a short page due to tombstone suffix, + the tombstone-suffix is not re-requested on the next page. + """ + with new_test_table(cql, test_keyspace, 'pk int, ck int, v int, PRIMARY KEY (pk, ck)') as table: + insert_row_id = cql.prepare(f"INSERT INTO {table} (pk, ck, v) VALUES (?, ?, ?)") + delete_row_id = cql.prepare(f"DELETE FROM {table} WHERE pk = ? AND ck = ?") + + pk = 0 + + page1 = [] + for ck in range(0, 10): + row = (pk, ck, ck % 2) + cql.execute(insert_row_id, row) + if row[2] == 0: + page1.append(row) + + for ck in range(10, 25): + cql.execute(delete_row_id, (pk, ck)) + + page2 = [] + for ck in range(25, 30): + row = (pk, ck, ck % 2) + cql.execute(insert_row_id, row) + if row[2] == 0: + page2.append(row) + + statement = SimpleStatement(f"SELECT * FROM {table} WHERE pk = {pk} AND v = 0 ALLOW FILTERING", fetch_size=20) + + res = cql.execute(statement, trace=True) + + def to_list(current_rows): + return list(map(lambda r: tuple(r._asdict().values()), current_rows)) + + assert to_list(res.current_rows) == page1 + assert res.has_more_pages + + res.fetch_next_page() + + assert to_list(res.current_rows)== page2 + assert not res.has_more_pages + + tracing = res.get_all_query_traces(max_wait_sec_per=900) + + assert len(tracing) == 2 + + found_reuse = False + found_drop = False + for event in tracing[1].events: + found_reuse = found_reuse or "Reusing querier" == event.description + found_drop = found_drop or "Dropping querier because" in event.description + + assert found_reuse + assert not found_drop