From d1d53f1b840ef233ee6a52e9cf1e4a8ed17edff3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Fri, 15 Jul 2022 10:29:41 +0300 Subject: [PATCH] query: add tombstone-limit to read-command Propagate the tombstone-limit from coordinator to replicas, to make sure all is using the same limit. --- alternator/auth.cc | 3 ++- alternator/executor.cc | 12 ++++++++---- alternator/streams.cc | 2 +- alternator/ttl.cc | 2 +- cdc/log.cc | 3 ++- cql3/statements/cas_request.cc | 3 ++- cql3/statements/modification_statement.cc | 2 +- .../prune_materialized_view_statement.cc | 3 ++- cql3/statements/select_statement.cc | 4 ++++ db/schema_tables.cc | 4 ++-- db/system_keyspace.cc | 6 +++--- db/view/view.cc | 3 ++- idl/read_command.idl.hh | 1 + query-request.hh | 8 +++++++- redis/query_utils.cc | 6 ++++-- service/storage_proxy.cc | 2 +- test/boost/database_test.cc | 14 ++++++++------ test/boost/multishard_mutation_query_test.cc | 4 +++- test/boost/per_partition_rate_limit_test.cc | 2 +- test/boost/querier_cache_test.cc | 2 ++ thrift/handler.cc | 7 ++++--- 21 files changed, 61 insertions(+), 32 deletions(-) diff --git a/alternator/auth.cc b/alternator/auth.cc index 195f8ed2ef..d2b6790329 100644 --- a/alternator/auth.cc +++ b/alternator/auth.cc @@ -133,7 +133,8 @@ future get_key_from_roles(service::storage_proxy& proxy, std::strin } auto selection = cql3::selection::selection::for_columns(schema, {salted_hash_col}); auto partition_slice = query::partition_slice(std::move(bounds), {}, query::column_id_vector{salted_hash_col->id}, selection->get_query_options()); - auto command = ::make_lw_shared(schema->id(), schema->version(), partition_slice, proxy.get_max_result_size(partition_slice)); + auto command = ::make_lw_shared(schema->id(), schema->version(), partition_slice, + proxy.get_max_result_size(partition_slice), query::tombstone_limit(proxy.get_tombstone_limit())); auto cl = auth::password_authenticator::consistency_for_user(username); service::client_state client_state{service::client_state::internal_tag()}; diff --git a/alternator/executor.cc b/alternator/executor.cc index 36ca2c5043..3a8f10f88b 100644 --- a/alternator/executor.cc +++ b/alternator/executor.cc @@ -1359,7 +1359,8 @@ static lw_shared_ptr previous_item_read_command(service::st auto regular_columns = boost::copy_range( schema->regular_columns() | boost::adaptors::transformed([] (const column_definition& cdef) { return cdef.id; })); auto partition_slice = query::partition_slice(std::move(bounds), {}, std::move(regular_columns), selection->get_query_options()); - return ::make_lw_shared(schema->id(), schema->version(), partition_slice, proxy.get_max_result_size(partition_slice)); + return ::make_lw_shared(schema->id(), schema->version(), partition_slice, proxy.get_max_result_size(partition_slice), + query::tombstone_limit(proxy.get_tombstone_limit())); } static dht::partition_range_vector to_partition_ranges(const schema& schema, const partition_key& pk) { @@ -3063,7 +3064,8 @@ future executor::get_item(client_state& client_st auto selection = cql3::selection::selection::wildcard(schema); auto partition_slice = query::partition_slice(std::move(bounds), {}, std::move(regular_columns), selection->get_query_options()); - auto command = ::make_lw_shared(schema->id(), schema->version(), partition_slice, _proxy.get_max_result_size(partition_slice)); + auto command = ::make_lw_shared(schema->id(), schema->version(), partition_slice, _proxy.get_max_result_size(partition_slice), + query::tombstone_limit(_proxy.get_tombstone_limit())); std::unordered_set used_attribute_names; auto attrs_to_get = calculate_attrs_to_get(request, used_attribute_names); @@ -3217,7 +3219,8 @@ future executor::batch_get_item(client_state& cli rs.schema->regular_columns() | boost::adaptors::transformed([] (const column_definition& cdef) { return cdef.id; })); auto selection = cql3::selection::selection::wildcard(rs.schema); auto partition_slice = query::partition_slice(std::move(bounds), {}, std::move(regular_columns), selection->get_query_options()); - auto command = ::make_lw_shared(rs.schema->id(), rs.schema->version(), partition_slice, _proxy.get_max_result_size(partition_slice)); + auto command = ::make_lw_shared(rs.schema->id(), rs.schema->version(), partition_slice, _proxy.get_max_result_size(partition_slice), + query::tombstone_limit(_proxy.get_tombstone_limit())); command->allow_limit = db::allow_per_partition_rate_limit::yes; future> f = _proxy.query(rs.schema, std::move(command), std::move(partition_ranges), rs.cl, service::storage_proxy::coordinator_query_options(executor::default_timeout(), permit, client_state, trace_state)).then( @@ -3629,7 +3632,8 @@ static future do_query(service::storage_proxy& pr query::partition_slice::option_set opts = selection->get_query_options(); opts.add(custom_opts); auto partition_slice = query::partition_slice(std::move(ck_bounds), std::move(static_columns), std::move(regular_columns), opts); - auto command = ::make_lw_shared(schema->id(), schema->version(), partition_slice, proxy.get_max_result_size(partition_slice)); + auto command = ::make_lw_shared(schema->id(), schema->version(), partition_slice, proxy.get_max_result_size(partition_slice), + query::tombstone_limit(proxy.get_tombstone_limit())); auto query_state_ptr = std::make_unique(client_state, trace_state, std::move(permit)); diff --git a/alternator/streams.cc b/alternator/streams.cc index 5d5afc9ea8..081052cb29 100644 --- a/alternator/streams.cc +++ b/alternator/streams.cc @@ -879,7 +879,7 @@ future executor::get_records(client_state& client ++mul; } auto command = ::make_lw_shared(schema->id(), schema->version(), partition_slice, _proxy.get_max_result_size(partition_slice), - query::row_limit(limit * mul)); + query::tombstone_limit(_proxy.get_tombstone_limit()), query::row_limit(limit * mul)); return _proxy.query(schema, std::move(command), std::move(partition_ranges), cl, service::storage_proxy::coordinator_query_options(default_timeout(), std::move(permit), client_state)).then( [this, schema, partition_slice = std::move(partition_slice), selection = std::move(selection), start_time = std::move(start_time), limit, key_names = std::move(key_names), attr_names = std::move(attr_names), type, iter, high_ts] (service::storage_proxy::coordinator_query_result qr) mutable { diff --git a/alternator/ttl.cc b/alternator/ttl.cc index 1d4e84aec8..9d7afb905c 100644 --- a/alternator/ttl.cc +++ b/alternator/ttl.cc @@ -512,7 +512,7 @@ struct scan_ranges_context { opts.set(); std::vector ck_bounds{query::clustering_range::make_open_ended_both_sides()}; auto partition_slice = query::partition_slice(std::move(ck_bounds), {}, std::move(regular_columns), opts); - command = ::make_lw_shared(s->id(), s->version(), partition_slice, proxy.get_max_result_size(partition_slice)); + command = ::make_lw_shared(s->id(), s->version(), partition_slice, proxy.get_max_result_size(partition_slice), query::tombstone_limit(proxy.get_tombstone_limit())); executor::client_state client_state{executor::client_state::internal_tag()}; tracing::trace_state_ptr trace_state; // NOTICE: empty_service_permit is used because the TTL service has fixed parallelism diff --git a/cdc/log.cc b/cdc/log.cc index 11832f6845..7935681de8 100644 --- a/cdc/log.cc +++ b/cdc/log.cc @@ -1655,7 +1655,8 @@ public: auto partition_slice = query::partition_slice(std::move(bounds), std::move(static_columns), std::move(regular_columns), std::move(opts)); const auto max_result_size = _ctx._proxy.get_max_result_size(partition_slice); - auto command = ::make_lw_shared(_schema->id(), _schema->version(), partition_slice, query::max_result_size(max_result_size), query::row_limit(row_limit)); + const auto tombstone_limit = query::tombstone_limit(_ctx._proxy.get_tombstone_limit()); + auto command = ::make_lw_shared(_schema->id(), _schema->version(), partition_slice, query::max_result_size(max_result_size), tombstone_limit, query::row_limit(row_limit)); const auto select_cl = adjust_cl(write_cl); diff --git a/cql3/statements/cas_request.cc b/cql3/statements/cas_request.cc index a25d90a1cd..e439ead574 100644 --- a/cql3/statements/cas_request.cc +++ b/cql3/statements/cas_request.cc @@ -91,7 +91,8 @@ lw_shared_ptr cas_request::read_command(query_processor& qp options.set(query::partition_slice::option::always_return_static_content); query::partition_slice ps(std::move(ranges), *_schema, columns_to_read, options); ps.set_partition_row_limit(max_rows); - return make_lw_shared(_schema->id(), _schema->version(), std::move(ps), qp.proxy().get_max_result_size(ps)); + return make_lw_shared(_schema->id(), _schema->version(), std::move(ps), qp.proxy().get_max_result_size(ps), + query::tombstone_limit(qp.proxy().get_tombstone_limit())); } bool cas_request::applies_to() const { diff --git a/cql3/statements/modification_statement.cc b/cql3/statements/modification_statement.cc index fa33600e33..5901474be5 100644 --- a/cql3/statements/modification_statement.cc +++ b/cql3/statements/modification_statement.cc @@ -211,7 +211,7 @@ modification_statement::read_command(query_processor& qp, query::clustering_row_ } query::partition_slice ps(std::move(ranges), *s, columns_to_read(), update_parameters::options); const auto max_result_size = qp.proxy().get_max_result_size(ps); - return make_lw_shared(s->id(), s->version(), std::move(ps), query::max_result_size(max_result_size)); + return make_lw_shared(s->id(), s->version(), std::move(ps), query::max_result_size(max_result_size), query::tombstone_limit::max); } std::vector diff --git a/cql3/statements/prune_materialized_view_statement.cc b/cql3/statements/prune_materialized_view_statement.cc index f80bcc9326..f705b54f4a 100644 --- a/cql3/statements/prune_materialized_view_statement.cc +++ b/cql3/statements/prune_materialized_view_statement.cc @@ -32,7 +32,8 @@ static future<> delete_ghost_rows(dht::partition_range_vector partition_ranges, auto selection = cql3::selection::selection::for_columns(view, key_columns); query::partition_slice partition_slice(std::move(clustering_bounds), {}, {}, selection->get_query_options()); - auto command = ::make_lw_shared(view->id(), view->version(), partition_slice, proxy.get_max_result_size(partition_slice)); + auto command = ::make_lw_shared(view->id(), view->version(), partition_slice, proxy.get_max_result_size(partition_slice), + query::tombstone_limit(proxy.get_tombstone_limit())); tracing::trace(state.get_trace_state(), "Deleting ghost rows from partition ranges {}", partition_ranges); diff --git a/cql3/statements/select_statement.cc b/cql3/statements/select_statement.cc index aab8db8207..b97c047c55 100644 --- a/cql3/statements/select_statement.cc +++ b/cql3/statements/select_statement.cc @@ -353,6 +353,7 @@ select_statement::do_execute(query_processor& qp, _schema->version(), std::move(slice), max_result_size, + query::tombstone_limit(qp.proxy().get_tombstone_limit()), query::row_limit(limit), query::partition_limit(query::max_partitions), now, @@ -524,6 +525,7 @@ indexed_table_select_statement::prepare_command_for_base_query(query_processor& _schema->version(), std::move(slice), qp.proxy().get_max_result_size(slice), + query::tombstone_limit(qp.proxy().get_tombstone_limit()), query::row_limit(get_limit(options)), query::partition_limit(query::max_partitions), now, @@ -1237,6 +1239,7 @@ indexed_table_select_statement::read_posting_list(query_processor& qp, _view_schema->version(), partition_slice, qp.proxy().get_max_result_size(partition_slice), + query::tombstone_limit(qp.proxy().get_tombstone_limit()), query::row_limit(limit), query::partition_limit(query::max_partitions), now, @@ -1481,6 +1484,7 @@ parallelized_select_statement::do_execute( _schema->version(), std::move(slice), qp.proxy().get_max_result_size(slice), + query::tombstone_limit(qp.proxy().get_tombstone_limit()), query::row_limit(query::max_rows), query::partition_limit(query::max_partitions), now, diff --git a/db/schema_tables.cc b/db/schema_tables.cc index 490b0e742d..17a07273ac 100644 --- a/db/schema_tables.cc +++ b/db/schema_tables.cc @@ -869,7 +869,7 @@ read_schema_partition_for_table(distributed& proxy, sche .with_range(std::move(clustering_range)) .build(); auto cmd = make_lw_shared(schema->id(), schema->version(), std::move(slice), proxy.local().get_max_result_size(slice), - query::row_limit(query::max_rows)); + query::tombstone_limit::max, query::row_limit(query::max_rows)); co_return co_await query_partition_mutation(proxy.local(), std::move(schema), std::move(cmd), std::move(keyspace_key)); } @@ -878,7 +878,7 @@ read_keyspace_mutation(distributed& proxy, const sstring schema_ptr s = keyspaces(); auto key = partition_key::from_singular(*s, keyspace_name); auto slice = s->full_slice(); - auto cmd = make_lw_shared(s->id(), s->version(), std::move(slice), proxy.local().get_max_result_size(slice)); + auto cmd = make_lw_shared(s->id(), s->version(), std::move(slice), proxy.local().get_max_result_size(slice), query::tombstone_limit::max); co_return co_await query_partition_mutation(proxy.local(), std::move(s), std::move(cmd), std::move(key)); } diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 04886ac126..da9ea151a7 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -2779,7 +2779,7 @@ system_keyspace::query_mutations(distributed& proxy, con replica::database& db = proxy.local().get_db().local(); schema_ptr schema = db.find_schema(ks_name, cf_name); auto slice = partition_slice_builder(*schema).build(); - auto cmd = make_lw_shared(schema->id(), schema->version(), std::move(slice), proxy.local().get_max_result_size(slice)); + auto cmd = make_lw_shared(schema->id(), schema->version(), std::move(slice), proxy.local().get_max_result_size(slice), query::tombstone_limit::max); return proxy.local().query_mutations_locally(std::move(schema), std::move(cmd), query::full_partition_range, db::no_timeout) .then([] (rpc::tuple>, cache_temperature> rr_ht) { return std::get<0>(std::move(rr_ht)); }); } @@ -2789,7 +2789,7 @@ system_keyspace::query(distributed& proxy, const sstring replica::database& db = proxy.local().get_db().local(); schema_ptr schema = db.find_schema(ks_name, cf_name); auto slice = partition_slice_builder(*schema).build(); - auto cmd = make_lw_shared(schema->id(), schema->version(), std::move(slice), proxy.local().get_max_result_size(slice)); + auto cmd = make_lw_shared(schema->id(), schema->version(), std::move(slice), proxy.local().get_max_result_size(slice), query::tombstone_limit::max); return proxy.local().query(schema, cmd, {query::full_partition_range}, db::consistency_level::ONE, {db::no_timeout, empty_service_permit(), service::client_state::for_internal_calls(), nullptr}).then([schema, cmd] (auto&& qr) { return make_lw_shared(query::result_set::from_raw_result(schema, cmd->slice, *qr.query_result)); @@ -2804,7 +2804,7 @@ system_keyspace::query(distributed& proxy, const sstring auto slice = partition_slice_builder(*schema) .with_range(std::move(row_range)) .build(); - auto cmd = make_lw_shared(schema->id(), schema->version(), std::move(slice), proxy.local().get_max_result_size(slice)); + auto cmd = make_lw_shared(schema->id(), schema->version(), std::move(slice), proxy.local().get_max_result_size(slice), query::tombstone_limit::max); return proxy.local().query(schema, cmd, {dht::partition_range::make_singular(key)}, db::consistency_level::ONE, {db::no_timeout, empty_service_permit(), service::client_state::for_internal_calls(), nullptr}).then([schema, cmd] (auto&& qr) { diff --git a/db/view/view.cc b/db/view/view.cc index b17a00f2f1..cc1a832597 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -2265,7 +2265,8 @@ void delete_ghost_rows_visitor::accept_new_row(const clustering_key& ck, const q std::vector bounds{query::clustering_range::make_singular(base_ck)}; query::partition_slice partition_slice(std::move(bounds), {}, {}, selection->get_query_options()); - auto command = ::make_lw_shared(_base_schema->id(), _base_schema->version(), partition_slice, _proxy.get_max_result_size(partition_slice)); + auto command = ::make_lw_shared(_base_schema->id(), _base_schema->version(), partition_slice, + _proxy.get_max_result_size(partition_slice), query::tombstone_limit(_proxy.get_tombstone_limit())); auto timeout = db::timeout_clock::now() + _timeout_duration; service::storage_proxy::coordinator_query_options opts{timeout, _state.get_permit(), _state.get_client_state(), _state.get_trace_state()}; auto base_qr = _proxy.query(_base_schema, command, std::move(partition_ranges), db::consistency_level::ALL, opts).get0(); diff --git a/idl/read_command.idl.hh b/idl/read_command.idl.hh index 24687df5b6..5b263fb643 100644 --- a/idl/read_command.idl.hh +++ b/idl/read_command.idl.hh @@ -58,6 +58,7 @@ class read_command { query::is_first_page is_first_page [[version 2.2]] = query::is_first_page::no; std::optional max_result_size [[version 4.3]] = std::nullopt; uint32_t row_limit_high_bits [[version 4.3]] = 0; + uint64_t tombstone_limit [[version 5.2]] = query::max_tombstones; }; } diff --git a/query-request.hh b/query-request.hh index 3b751d6d7d..b8474377df 100644 --- a/query-request.hh +++ b/query-request.hh @@ -306,6 +306,8 @@ public: // the remote doesn't send it. std::optional max_result_size; uint32_t row_limit_high_bits; + // Cut the page after processing this many tombstones (even if the page is empty). + uint64_t tombstone_limit; api::timestamp_type read_timestamp; // not serialized db::allow_per_partition_rate_limit allow_limit; // not serialized public: @@ -320,7 +322,8 @@ public: query_id query_uuid, query::is_first_page is_first_page, std::optional max_result_size, - uint32_t row_limit_high_bits) + uint32_t row_limit_high_bits, + uint64_t tombstone_limit) : cf_id(std::move(cf_id)) , schema_version(std::move(schema_version)) , slice(std::move(slice)) @@ -332,6 +335,7 @@ public: , is_first_page(is_first_page) , max_result_size(max_result_size) , row_limit_high_bits(row_limit_high_bits) + , tombstone_limit(tombstone_limit) , read_timestamp(api::new_timestamp()) , allow_limit(db::allow_per_partition_rate_limit::no) { } @@ -340,6 +344,7 @@ public: table_schema_version schema_version, partition_slice slice, query::max_result_size max_result_size, + query::tombstone_limit tombstone_limit, query::row_limit row_limit = query::row_limit::max, query::partition_limit partition_limit = query::partition_limit::max, gc_clock::time_point now = gc_clock::now(), @@ -359,6 +364,7 @@ public: , is_first_page(is_first_page) , max_result_size(max_result_size) , row_limit_high_bits(static_cast(static_cast(row_limit) >> 32)) + , tombstone_limit(static_cast(tombstone_limit)) , read_timestamp(rt) , allow_limit(allow_limit) { } diff --git a/redis/query_utils.cc b/redis/query_utils.cc index 8e7297a84c..9bd5947621 100644 --- a/redis/query_utils.cc +++ b/redis/query_utils.cc @@ -70,7 +70,8 @@ future> read_strings(service::storage_proxy& proxy future> query_strings(service::storage_proxy& proxy, const redis_options& options, const bytes& key, service_permit permit, schema_ptr schema, query::partition_slice ps) { const auto max_result_size = proxy.get_max_result_size(ps); - query::read_command cmd(schema->id(), schema->version(), ps, max_result_size, query::row_limit(1), query::partition_limit(1), gc_clock::now(), std::nullopt, query_id::create_null_id(), query::is_first_page::no); + const auto max_tombstones = proxy.get_tombstone_limit(); + query::read_command cmd(schema->id(), schema->version(), ps, max_result_size, max_tombstones, query::row_limit(1), query::partition_limit(1), gc_clock::now(), std::nullopt, query_id::create_null_id(), query::is_first_page::no); auto pkey = partition_key::from_single_value(*schema, key); auto partition_range = dht::partition_range::make_singular(dht::decorate_key(*schema, std::move(pkey))); dht::partition_range_vector partition_ranges; @@ -146,7 +147,8 @@ future>> read_hashes(service::storage_proxy future>> query_hashes(service::storage_proxy& proxy, const redis_options& options, const bytes& key, service_permit permit, schema_ptr schema, query::partition_slice ps) { const auto max_result_size = proxy.get_max_result_size(ps); - query::read_command cmd(schema->id(), schema->version(), ps, max_result_size, query::row_limit::max, query::partition_limit(1), gc_clock::now(), std::nullopt, query_id::create_null_id(), query::is_first_page::no); + const auto max_tombstones = proxy.get_tombstone_limit(); + query::read_command cmd(schema->id(), schema->version(), ps, max_result_size, max_tombstones, query::row_limit::max, query::partition_limit(1), gc_clock::now(), std::nullopt, query_id::create_null_id(), query::is_first_page::no); auto pkey = partition_key::from_single_value(*schema, key); auto partition_range = dht::partition_range::make_singular(dht::decorate_key(*schema, std::move(pkey))); dht::partition_range_vector partition_ranges; diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 0e88450f4b..d512894b88 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -5526,7 +5526,7 @@ static lw_shared_ptr read_nothing_read_command(schema_ptr s // storage_proxy::query() returns immediately - without any networking. auto partition_slice = query::partition_slice({}, {}, {}, query::partition_slice::option_set()); return ::make_lw_shared(schema->id(), schema->version(), partition_slice, - query::max_result_size(query::result_memory_limiter::unlimited_result_size)); + query::max_result_size(query::result_memory_limiter::unlimited_result_size), query::tombstone_limit::max); } static read_timeout_exception write_timeout_to_read(schema_ptr s, mutation_write_timeout_exception& ex) { diff --git a/test/boost/database_test.cc b/test/boost/database_test.cc index 35464d1886..fca8d8b152 100644 --- a/test/boost/database_test.cc +++ b/test/boost/database_test.cc @@ -92,7 +92,8 @@ SEASTAR_TEST_CASE(test_safety_after_truncate) { auto assert_query_result = [&] (const std::vector& expected_sizes) { auto max_size = std::numeric_limits::max(); - auto cmd = query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(), query::max_result_size(max_size), query::row_limit(1000)); + auto cmd = query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(), query::max_result_size(max_size), + query::tombstone_limit::max, query::row_limit(1000)); e.db().invoke_on_all([&] (replica::database& db) -> future<> { auto shard = this_shard_id(); auto s = db.find_schema(uuid); @@ -192,7 +193,8 @@ SEASTAR_TEST_CASE(test_querying_with_limits) { auto max_size = std::numeric_limits::max(); { - auto cmd = query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(), query::max_result_size(max_size), query::row_limit(3)); + auto cmd = query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(), query::max_result_size(max_size), + query::tombstone_limit::max, query::row_limit(3)); e.db().invoke_on_all([&] (replica::database& db) -> future<> { auto shard = this_shard_id(); auto s = db.find_schema(uuid); @@ -204,7 +206,7 @@ SEASTAR_TEST_CASE(test_querying_with_limits) { { auto cmd = query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(), query::max_result_size(max_size), - query::row_limit(query::max_rows), query::partition_limit(5)); + query::tombstone_limit::max, query::row_limit(query::max_rows), query::partition_limit(5)); e.db().invoke_on_all([&] (replica::database& db) -> future<> { auto shard = this_shard_id(); auto s = db.find_schema(uuid); @@ -216,7 +218,7 @@ SEASTAR_TEST_CASE(test_querying_with_limits) { { auto cmd = query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(), query::max_result_size(max_size), - query::row_limit(query::max_rows), query::partition_limit(3)); + query::tombstone_limit::max, query::row_limit(query::max_rows), query::partition_limit(3)); e.db().invoke_on_all([&] (replica::database& db) -> future<> { auto shard = this_shard_id(); auto s = db.find_schema(uuid); @@ -885,7 +887,7 @@ SEASTAR_THREAD_TEST_CASE(read_max_size) { } else { slice.options.remove(); } - query::read_command cmd(s->id(), s->version(), slice, query::max_result_size(max_size)); + query::read_command cmd(s->id(), s->version(), slice, query::max_result_size(max_size), query::tombstone_limit::max); try { auto size = query_method(s, cmd).get0(); // Just to ensure we are not interpreting empty results as success. @@ -963,7 +965,7 @@ SEASTAR_THREAD_TEST_CASE(unpaged_mutation_read_global_limit) { testlog.info("checking: query_method={}", query_method_name); auto slice = s->full_slice(); slice.options.remove(); - query::read_command cmd(s->id(), s->version(), slice, query::max_result_size(max_size)); + query::read_command cmd(s->id(), s->version(), slice, query::max_result_size(max_size), query::tombstone_limit::max); try { auto size = query_method(s, cmd).get0(); // Just to ensure we are not interpreting empty results as success. diff --git a/test/boost/multishard_mutation_query_test.cc b/test/boost/multishard_mutation_query_test.cc index 671be6ccf2..334a582284 100644 --- a/test/boost/multishard_mutation_query_test.cc +++ b/test/boost/multishard_mutation_query_test.cc @@ -77,6 +77,7 @@ SEASTAR_THREAD_TEST_CASE(test_abandoned_read) { s->version(), s->full_slice(), query::max_result_size(query::result_memory_limiter::unlimited_result_size), + query::tombstone_limit::max, query::row_limit(7), query::partition_limit::max, gc_clock::now(), @@ -106,7 +107,7 @@ static std::vector read_all_partitions_one_by_one(distributedid(), s->version(), slice, - query::max_result_size(query::result_memory_limiter::unlimited_result_size)); + query::max_result_size(query::result_memory_limiter::unlimited_result_size), query::tombstone_limit::max); const auto range = dht::partition_range::make_singular(pkey); return make_foreign(std::make_unique( std::get<0>(db.query_mutations(std::move(s), cmd, range, nullptr, db::no_timeout).get0()))); @@ -137,6 +138,7 @@ read_partitions_with_generic_paged_scan(distributed& db, sche s->version(), slice, query::max_result_size(max_size), + query::tombstone_limit::max, query::row_limit(page_size), query::partition_limit::max, gc_clock::now(), diff --git a/test/boost/per_partition_rate_limit_test.cc b/test/boost/per_partition_rate_limit_test.cc index 0c52e310f3..1b0fed75e4 100644 --- a/test/boost/per_partition_rate_limit_test.cc +++ b/test/boost/per_partition_rate_limit_test.cc @@ -57,7 +57,7 @@ SEASTAR_TEST_CASE(test_internal_operation_filtering) { auto partition_slice = query::partition_slice( {query::clustering_range::make_open_ended_both_sides()}, {}, {}, std::move(opts)); - auto cmd = make_lw_shared(sptr->id(), sptr->version(), partition_slice, query::max_result_size(1), query::row_limit(1)); + auto cmd = make_lw_shared(sptr->id(), sptr->version(), partition_slice, query::max_result_size(1), query::tombstone_limit::max, query::row_limit(1)); cmd->allow_limit = allow_limit; // Rejection is probabilistic, so try many times diff --git a/test/boost/querier_cache_test.cc b/test/boost/querier_cache_test.cc index a9ae119a63..9da5956d74 100644 --- a/test/boost/querier_cache_test.cc +++ b/test/boost/querier_cache_test.cc @@ -659,6 +659,7 @@ SEASTAR_THREAD_TEST_CASE(test_resources_based_cache_eviction) { s->version(), slice, query::max_result_size(1024 * 1024), + query::tombstone_limit::max, query::row_limit(1), query::partition_limit(1), gc_clock::now(), @@ -688,6 +689,7 @@ SEASTAR_THREAD_TEST_CASE(test_resources_based_cache_eviction) { s->version(), slice, query::max_result_size(1024 * 1024), + query::tombstone_limit::max, query::row_limit(1), query::partition_limit(1), gc_clock::now(), diff --git a/thrift/handler.cc b/thrift/handler.cc index cfbbcb7e44..a3ace77077 100644 --- a/thrift/handler.cc +++ b/thrift/handler.cc @@ -406,7 +406,7 @@ public: auto slice = query::partition_slice(std::move(clustering_ranges), { }, std::move(regular_columns), opts, std::move(specific_ranges), cql_serialization_format::internal()); auto cmd = make_lw_shared(s.id(), s.version(), std::move(slice), proxy.get_max_result_size(slice), - query::row_limit(row_limit), query::partition_limit(partition_limit)); + query::tombstone_limit(proxy.get_tombstone_limit()), query::row_limit(row_limit), query::partition_limit(partition_limit)); cmd->allow_limit = db::allow_per_partition_rate_limit::yes; return cmd; } @@ -693,7 +693,7 @@ public: auto slice = query::partition_slice(std::move(clustering_ranges), {}, std::move(regular_columns), opts, nullptr); auto& proxy = _proxy.local(); auto cmd = make_lw_shared(schema->id(), schema->version(), std::move(slice), proxy.get_max_result_size(slice), - query::row_limit(row_limit)); + query::tombstone_limit(proxy.get_tombstone_limit()), query::row_limit(row_limit)); cmd->allow_limit = db::allow_per_partition_rate_limit::yes; auto f = _query_state.get_client_state().has_schema_access(_db, *schema, auth::permission::SELECT); return f.then([this, &proxy, dk = std::move(dk), cmd, schema, column_limit = request.count, cl = request.consistency_level, permit = std::move(permit)] () mutable { @@ -1614,7 +1614,8 @@ private: } auto slice = query::partition_slice(std::move(clustering_ranges), {}, std::move(regular_columns), opts, nullptr, cql_serialization_format::internal(), per_partition_row_limit); - auto cmd = make_lw_shared(s.id(), s.version(), std::move(slice), proxy.get_max_result_size(slice)); + auto cmd = make_lw_shared(s.id(), s.version(), std::move(slice), proxy.get_max_result_size(slice), + query::tombstone_limit(proxy.get_tombstone_limit())); cmd->allow_limit = db::allow_per_partition_rate_limit::yes; return cmd; }