diff --git a/service/pager/query_pagers.cc b/service/pager/query_pagers.cc index 75de0bc598..ed1c73eb51 100644 --- a/service/pager/query_pagers.cc +++ b/service/pager/query_pagers.cc @@ -383,18 +383,14 @@ void query_pager::handle_result( auto view = query::result_view(*results); _last_pos = position_in_partition::for_partition_start(); - uint64_t row_count; + uint64_t replica_row_count, row_count; if constexpr(!std::is_same_v<std::decay_t<Visitor>, noop_visitor>) { query_result_visitor<Visitor> v(std::forward<Visitor>(visitor)); view.consume(_cmd->slice, v); - if (_last_pkey) { - update_slice(*_last_pkey); - } - row_count = v.total_rows - v.dropped_rows; - _max = _max - row_count; - _exhausted = (v.total_rows < page_size && !results->is_short_read() && v.dropped_rows == 0) || _max == 0; + replica_row_count = v.total_rows; + // If per partition limit is defined, we need to accumulate rows fetched for last partition key if the key matches if (_cmd->slice.partition_row_limit() < query::max_rows_if_set) { if (_last_pkey && v.last_pkey && _last_pkey->equal(*_query_schema, *v.last_pkey)) { @@ -403,32 +399,30 @@ void query_pager::handle_result( _rows_fetched_for_last_partition = v.last_partition_row_count; } } - const auto& last_pos = results->last_position(); - if (last_pos && !v.dropped_rows) { - _last_pkey = last_pos->partition; - _last_pos = last_pos->position; - } else { - _last_pkey = v.last_pkey; - if (v.last_ckey) { - _last_pos = position_in_partition::for_key(*v.last_ckey); - } - } } else { row_count = results->row_count() ? 
*results->row_count() : std::get<1>(view.count_partitions_and_rows()); - _max = _max - row_count; - _exhausted = (row_count < page_size && !results->is_short_read()) || _max == 0; + replica_row_count = row_count; + } - if (!_exhausted) { - if (_last_pkey) { - update_slice(*_last_pkey); - } + { + _max = _max - row_count; + _exhausted = (replica_row_count < page_size && !results->is_short_read()) || _max == 0; + + if (_last_pkey) { + update_slice(*_last_pkey); + } + + // The last page can be truly empty -- with unset last-position and no data to calculate it based on. + if (!replica_row_count && !results->is_short_read()) { + _last_pkey = {}; + } else { auto last_pos = results->get_or_calculate_last_position(); _last_pkey = std::move(last_pos.partition); _last_pos = std::move(last_pos.position); } } - qlogger.debug("Fetched {} rows, max_remain={} {}", row_count, _max, _exhausted ? "(exh)" : ""); + qlogger.debug("Fetched {} rows (kept {}), max_remain={} {}", replica_row_count, row_count, _max, _exhausted ? "(exh)" : ""); if (_last_pkey) { qlogger.debug("Last partition key: {}", *_last_pkey); diff --git a/test/cqlpy/test_tombstone_limit.py b/test/cqlpy/test_tombstone_limit.py index 7660d022a7..a49cc73f5c 100644 --- a/test/cqlpy/test_tombstone_limit.py +++ b/test/cqlpy/test_tombstone_limit.py @@ -357,3 +357,60 @@ def test_unpaged_query(cql, table, lowered_tombstone_limit, driver_bug_1): statement = SimpleStatement(f"SELECT * FROM {table} WHERE pk = {pk}", fetch_size=None) rows = list(cql.execute(statement)) assert len(rows) == 4 + + +def test_filtering_query_tombstone_suffix_last_position(cql, test_keyspace, lowered_tombstone_limit): + """ + Check that when filtering drops rows in a short page due to tombstone suffix, + the tombstone-suffix is not re-requested on the next page. 
+ """ + with new_test_table(cql, test_keyspace, 'pk int, ck int, v int, PRIMARY KEY (pk, ck)') as table: + insert_row_id = cql.prepare(f"INSERT INTO {table} (pk, ck, v) VALUES (?, ?, ?)") + delete_row_id = cql.prepare(f"DELETE FROM {table} WHERE pk = ? AND ck = ?") + + pk = 0 + + page1 = [] + for ck in range(0, 10): + row = (pk, ck, ck % 2) + cql.execute(insert_row_id, row) + if row[2] == 0: + page1.append(row) + + for ck in range(10, 25): + cql.execute(delete_row_id, (pk, ck)) + + page2 = [] + for ck in range(25, 30): + row = (pk, ck, ck % 2) + cql.execute(insert_row_id, row) + if row[2] == 0: + page2.append(row) + + statement = SimpleStatement(f"SELECT * FROM {table} WHERE pk = {pk} AND v = 0 ALLOW FILTERING", fetch_size=20) + + res = cql.execute(statement, trace=True) + + def to_list(current_rows): + return list(map(lambda r: tuple(r._asdict().values()), current_rows)) + + assert to_list(res.current_rows) == page1 + assert res.has_more_pages + + res.fetch_next_page() + + assert to_list(res.current_rows)== page2 + assert not res.has_more_pages + + tracing = res.get_all_query_traces(max_wait_sec_per=900) + + assert len(tracing) == 2 + + found_reuse = False + found_drop = False + for event in tracing[1].events: + found_reuse = found_reuse or "Reusing querier" == event.description + found_drop = found_drop or "Dropping querier because" in event.description + + assert found_reuse + assert not found_drop