query: add tombstone-limit to read-command
Propagate the tombstone-limit from coordinator to replicas, to make sure all is using the same limit.
This commit is contained in:
@@ -133,7 +133,8 @@ future<std::string> get_key_from_roles(service::storage_proxy& proxy, std::strin
|
||||
}
|
||||
auto selection = cql3::selection::selection::for_columns(schema, {salted_hash_col});
|
||||
auto partition_slice = query::partition_slice(std::move(bounds), {}, query::column_id_vector{salted_hash_col->id}, selection->get_query_options());
|
||||
auto command = ::make_lw_shared<query::read_command>(schema->id(), schema->version(), partition_slice, proxy.get_max_result_size(partition_slice));
|
||||
auto command = ::make_lw_shared<query::read_command>(schema->id(), schema->version(), partition_slice,
|
||||
proxy.get_max_result_size(partition_slice), query::tombstone_limit(proxy.get_tombstone_limit()));
|
||||
auto cl = auth::password_authenticator::consistency_for_user(username);
|
||||
|
||||
service::client_state client_state{service::client_state::internal_tag()};
|
||||
|
||||
@@ -1359,7 +1359,8 @@ static lw_shared_ptr<query::read_command> previous_item_read_command(service::st
|
||||
auto regular_columns = boost::copy_range<query::column_id_vector>(
|
||||
schema->regular_columns() | boost::adaptors::transformed([] (const column_definition& cdef) { return cdef.id; }));
|
||||
auto partition_slice = query::partition_slice(std::move(bounds), {}, std::move(regular_columns), selection->get_query_options());
|
||||
return ::make_lw_shared<query::read_command>(schema->id(), schema->version(), partition_slice, proxy.get_max_result_size(partition_slice));
|
||||
return ::make_lw_shared<query::read_command>(schema->id(), schema->version(), partition_slice, proxy.get_max_result_size(partition_slice),
|
||||
query::tombstone_limit(proxy.get_tombstone_limit()));
|
||||
}
|
||||
|
||||
static dht::partition_range_vector to_partition_ranges(const schema& schema, const partition_key& pk) {
|
||||
@@ -3063,7 +3064,8 @@ future<executor::request_return_type> executor::get_item(client_state& client_st
|
||||
auto selection = cql3::selection::selection::wildcard(schema);
|
||||
|
||||
auto partition_slice = query::partition_slice(std::move(bounds), {}, std::move(regular_columns), selection->get_query_options());
|
||||
auto command = ::make_lw_shared<query::read_command>(schema->id(), schema->version(), partition_slice, _proxy.get_max_result_size(partition_slice));
|
||||
auto command = ::make_lw_shared<query::read_command>(schema->id(), schema->version(), partition_slice, _proxy.get_max_result_size(partition_slice),
|
||||
query::tombstone_limit(_proxy.get_tombstone_limit()));
|
||||
|
||||
std::unordered_set<std::string> used_attribute_names;
|
||||
auto attrs_to_get = calculate_attrs_to_get(request, used_attribute_names);
|
||||
@@ -3217,7 +3219,8 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
|
||||
rs.schema->regular_columns() | boost::adaptors::transformed([] (const column_definition& cdef) { return cdef.id; }));
|
||||
auto selection = cql3::selection::selection::wildcard(rs.schema);
|
||||
auto partition_slice = query::partition_slice(std::move(bounds), {}, std::move(regular_columns), selection->get_query_options());
|
||||
auto command = ::make_lw_shared<query::read_command>(rs.schema->id(), rs.schema->version(), partition_slice, _proxy.get_max_result_size(partition_slice));
|
||||
auto command = ::make_lw_shared<query::read_command>(rs.schema->id(), rs.schema->version(), partition_slice, _proxy.get_max_result_size(partition_slice),
|
||||
query::tombstone_limit(_proxy.get_tombstone_limit()));
|
||||
command->allow_limit = db::allow_per_partition_rate_limit::yes;
|
||||
future<std::vector<rjson::value>> f = _proxy.query(rs.schema, std::move(command), std::move(partition_ranges), rs.cl,
|
||||
service::storage_proxy::coordinator_query_options(executor::default_timeout(), permit, client_state, trace_state)).then(
|
||||
@@ -3629,7 +3632,8 @@ static future<executor::request_return_type> do_query(service::storage_proxy& pr
|
||||
query::partition_slice::option_set opts = selection->get_query_options();
|
||||
opts.add(custom_opts);
|
||||
auto partition_slice = query::partition_slice(std::move(ck_bounds), std::move(static_columns), std::move(regular_columns), opts);
|
||||
auto command = ::make_lw_shared<query::read_command>(schema->id(), schema->version(), partition_slice, proxy.get_max_result_size(partition_slice));
|
||||
auto command = ::make_lw_shared<query::read_command>(schema->id(), schema->version(), partition_slice, proxy.get_max_result_size(partition_slice),
|
||||
query::tombstone_limit(proxy.get_tombstone_limit()));
|
||||
|
||||
auto query_state_ptr = std::make_unique<service::query_state>(client_state, trace_state, std::move(permit));
|
||||
|
||||
|
||||
@@ -879,7 +879,7 @@ future<executor::request_return_type> executor::get_records(client_state& client
|
||||
++mul;
|
||||
}
|
||||
auto command = ::make_lw_shared<query::read_command>(schema->id(), schema->version(), partition_slice, _proxy.get_max_result_size(partition_slice),
|
||||
query::row_limit(limit * mul));
|
||||
query::tombstone_limit(_proxy.get_tombstone_limit()), query::row_limit(limit * mul));
|
||||
|
||||
return _proxy.query(schema, std::move(command), std::move(partition_ranges), cl, service::storage_proxy::coordinator_query_options(default_timeout(), std::move(permit), client_state)).then(
|
||||
[this, schema, partition_slice = std::move(partition_slice), selection = std::move(selection), start_time = std::move(start_time), limit, key_names = std::move(key_names), attr_names = std::move(attr_names), type, iter, high_ts] (service::storage_proxy::coordinator_query_result qr) mutable {
|
||||
|
||||
@@ -512,7 +512,7 @@ struct scan_ranges_context {
|
||||
opts.set<query::partition_slice::option::bypass_cache>();
|
||||
std::vector<query::clustering_range> ck_bounds{query::clustering_range::make_open_ended_both_sides()};
|
||||
auto partition_slice = query::partition_slice(std::move(ck_bounds), {}, std::move(regular_columns), opts);
|
||||
command = ::make_lw_shared<query::read_command>(s->id(), s->version(), partition_slice, proxy.get_max_result_size(partition_slice));
|
||||
command = ::make_lw_shared<query::read_command>(s->id(), s->version(), partition_slice, proxy.get_max_result_size(partition_slice), query::tombstone_limit(proxy.get_tombstone_limit()));
|
||||
executor::client_state client_state{executor::client_state::internal_tag()};
|
||||
tracing::trace_state_ptr trace_state;
|
||||
// NOTICE: empty_service_permit is used because the TTL service has fixed parallelism
|
||||
|
||||
@@ -1655,7 +1655,8 @@ public:
|
||||
|
||||
auto partition_slice = query::partition_slice(std::move(bounds), std::move(static_columns), std::move(regular_columns), std::move(opts));
|
||||
const auto max_result_size = _ctx._proxy.get_max_result_size(partition_slice);
|
||||
auto command = ::make_lw_shared<query::read_command>(_schema->id(), _schema->version(), partition_slice, query::max_result_size(max_result_size), query::row_limit(row_limit));
|
||||
const auto tombstone_limit = query::tombstone_limit(_ctx._proxy.get_tombstone_limit());
|
||||
auto command = ::make_lw_shared<query::read_command>(_schema->id(), _schema->version(), partition_slice, query::max_result_size(max_result_size), tombstone_limit, query::row_limit(row_limit));
|
||||
|
||||
const auto select_cl = adjust_cl(write_cl);
|
||||
|
||||
|
||||
@@ -91,7 +91,8 @@ lw_shared_ptr<query::read_command> cas_request::read_command(query_processor& qp
|
||||
options.set(query::partition_slice::option::always_return_static_content);
|
||||
query::partition_slice ps(std::move(ranges), *_schema, columns_to_read, options);
|
||||
ps.set_partition_row_limit(max_rows);
|
||||
return make_lw_shared<query::read_command>(_schema->id(), _schema->version(), std::move(ps), qp.proxy().get_max_result_size(ps));
|
||||
return make_lw_shared<query::read_command>(_schema->id(), _schema->version(), std::move(ps), qp.proxy().get_max_result_size(ps),
|
||||
query::tombstone_limit(qp.proxy().get_tombstone_limit()));
|
||||
}
|
||||
|
||||
bool cas_request::applies_to() const {
|
||||
|
||||
@@ -211,7 +211,7 @@ modification_statement::read_command(query_processor& qp, query::clustering_row_
|
||||
}
|
||||
query::partition_slice ps(std::move(ranges), *s, columns_to_read(), update_parameters::options);
|
||||
const auto max_result_size = qp.proxy().get_max_result_size(ps);
|
||||
return make_lw_shared<query::read_command>(s->id(), s->version(), std::move(ps), query::max_result_size(max_result_size));
|
||||
return make_lw_shared<query::read_command>(s->id(), s->version(), std::move(ps), query::max_result_size(max_result_size), query::tombstone_limit::max);
|
||||
}
|
||||
|
||||
std::vector<query::clustering_range>
|
||||
|
||||
@@ -32,7 +32,8 @@ static future<> delete_ghost_rows(dht::partition_range_vector partition_ranges,
|
||||
auto selection = cql3::selection::selection::for_columns(view, key_columns);
|
||||
|
||||
query::partition_slice partition_slice(std::move(clustering_bounds), {}, {}, selection->get_query_options());
|
||||
auto command = ::make_lw_shared<query::read_command>(view->id(), view->version(), partition_slice, proxy.get_max_result_size(partition_slice));
|
||||
auto command = ::make_lw_shared<query::read_command>(view->id(), view->version(), partition_slice, proxy.get_max_result_size(partition_slice),
|
||||
query::tombstone_limit(proxy.get_tombstone_limit()));
|
||||
|
||||
tracing::trace(state.get_trace_state(), "Deleting ghost rows from partition ranges {}", partition_ranges);
|
||||
|
||||
|
||||
@@ -353,6 +353,7 @@ select_statement::do_execute(query_processor& qp,
|
||||
_schema->version(),
|
||||
std::move(slice),
|
||||
max_result_size,
|
||||
query::tombstone_limit(qp.proxy().get_tombstone_limit()),
|
||||
query::row_limit(limit),
|
||||
query::partition_limit(query::max_partitions),
|
||||
now,
|
||||
@@ -524,6 +525,7 @@ indexed_table_select_statement::prepare_command_for_base_query(query_processor&
|
||||
_schema->version(),
|
||||
std::move(slice),
|
||||
qp.proxy().get_max_result_size(slice),
|
||||
query::tombstone_limit(qp.proxy().get_tombstone_limit()),
|
||||
query::row_limit(get_limit(options)),
|
||||
query::partition_limit(query::max_partitions),
|
||||
now,
|
||||
@@ -1237,6 +1239,7 @@ indexed_table_select_statement::read_posting_list(query_processor& qp,
|
||||
_view_schema->version(),
|
||||
partition_slice,
|
||||
qp.proxy().get_max_result_size(partition_slice),
|
||||
query::tombstone_limit(qp.proxy().get_tombstone_limit()),
|
||||
query::row_limit(limit),
|
||||
query::partition_limit(query::max_partitions),
|
||||
now,
|
||||
@@ -1481,6 +1484,7 @@ parallelized_select_statement::do_execute(
|
||||
_schema->version(),
|
||||
std::move(slice),
|
||||
qp.proxy().get_max_result_size(slice),
|
||||
query::tombstone_limit(qp.proxy().get_tombstone_limit()),
|
||||
query::row_limit(query::max_rows),
|
||||
query::partition_limit(query::max_partitions),
|
||||
now,
|
||||
|
||||
@@ -869,7 +869,7 @@ read_schema_partition_for_table(distributed<service::storage_proxy>& proxy, sche
|
||||
.with_range(std::move(clustering_range))
|
||||
.build();
|
||||
auto cmd = make_lw_shared<query::read_command>(schema->id(), schema->version(), std::move(slice), proxy.local().get_max_result_size(slice),
|
||||
query::row_limit(query::max_rows));
|
||||
query::tombstone_limit::max, query::row_limit(query::max_rows));
|
||||
co_return co_await query_partition_mutation(proxy.local(), std::move(schema), std::move(cmd), std::move(keyspace_key));
|
||||
}
|
||||
|
||||
@@ -878,7 +878,7 @@ read_keyspace_mutation(distributed<service::storage_proxy>& proxy, const sstring
|
||||
schema_ptr s = keyspaces();
|
||||
auto key = partition_key::from_singular(*s, keyspace_name);
|
||||
auto slice = s->full_slice();
|
||||
auto cmd = make_lw_shared<query::read_command>(s->id(), s->version(), std::move(slice), proxy.local().get_max_result_size(slice));
|
||||
auto cmd = make_lw_shared<query::read_command>(s->id(), s->version(), std::move(slice), proxy.local().get_max_result_size(slice), query::tombstone_limit::max);
|
||||
co_return co_await query_partition_mutation(proxy.local(), std::move(s), std::move(cmd), std::move(key));
|
||||
}
|
||||
|
||||
|
||||
@@ -2779,7 +2779,7 @@ system_keyspace::query_mutations(distributed<service::storage_proxy>& proxy, con
|
||||
replica::database& db = proxy.local().get_db().local();
|
||||
schema_ptr schema = db.find_schema(ks_name, cf_name);
|
||||
auto slice = partition_slice_builder(*schema).build();
|
||||
auto cmd = make_lw_shared<query::read_command>(schema->id(), schema->version(), std::move(slice), proxy.local().get_max_result_size(slice));
|
||||
auto cmd = make_lw_shared<query::read_command>(schema->id(), schema->version(), std::move(slice), proxy.local().get_max_result_size(slice), query::tombstone_limit::max);
|
||||
return proxy.local().query_mutations_locally(std::move(schema), std::move(cmd), query::full_partition_range, db::no_timeout)
|
||||
.then([] (rpc::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature> rr_ht) { return std::get<0>(std::move(rr_ht)); });
|
||||
}
|
||||
@@ -2789,7 +2789,7 @@ system_keyspace::query(distributed<service::storage_proxy>& proxy, const sstring
|
||||
replica::database& db = proxy.local().get_db().local();
|
||||
schema_ptr schema = db.find_schema(ks_name, cf_name);
|
||||
auto slice = partition_slice_builder(*schema).build();
|
||||
auto cmd = make_lw_shared<query::read_command>(schema->id(), schema->version(), std::move(slice), proxy.local().get_max_result_size(slice));
|
||||
auto cmd = make_lw_shared<query::read_command>(schema->id(), schema->version(), std::move(slice), proxy.local().get_max_result_size(slice), query::tombstone_limit::max);
|
||||
return proxy.local().query(schema, cmd, {query::full_partition_range}, db::consistency_level::ONE,
|
||||
{db::no_timeout, empty_service_permit(), service::client_state::for_internal_calls(), nullptr}).then([schema, cmd] (auto&& qr) {
|
||||
return make_lw_shared<query::result_set>(query::result_set::from_raw_result(schema, cmd->slice, *qr.query_result));
|
||||
@@ -2804,7 +2804,7 @@ system_keyspace::query(distributed<service::storage_proxy>& proxy, const sstring
|
||||
auto slice = partition_slice_builder(*schema)
|
||||
.with_range(std::move(row_range))
|
||||
.build();
|
||||
auto cmd = make_lw_shared<query::read_command>(schema->id(), schema->version(), std::move(slice), proxy.local().get_max_result_size(slice));
|
||||
auto cmd = make_lw_shared<query::read_command>(schema->id(), schema->version(), std::move(slice), proxy.local().get_max_result_size(slice), query::tombstone_limit::max);
|
||||
|
||||
return proxy.local().query(schema, cmd, {dht::partition_range::make_singular(key)}, db::consistency_level::ONE,
|
||||
{db::no_timeout, empty_service_permit(), service::client_state::for_internal_calls(), nullptr}).then([schema, cmd] (auto&& qr) {
|
||||
|
||||
@@ -2265,7 +2265,8 @@ void delete_ghost_rows_visitor::accept_new_row(const clustering_key& ck, const q
|
||||
|
||||
std::vector<query::clustering_range> bounds{query::clustering_range::make_singular(base_ck)};
|
||||
query::partition_slice partition_slice(std::move(bounds), {}, {}, selection->get_query_options());
|
||||
auto command = ::make_lw_shared<query::read_command>(_base_schema->id(), _base_schema->version(), partition_slice, _proxy.get_max_result_size(partition_slice));
|
||||
auto command = ::make_lw_shared<query::read_command>(_base_schema->id(), _base_schema->version(), partition_slice,
|
||||
_proxy.get_max_result_size(partition_slice), query::tombstone_limit(_proxy.get_tombstone_limit()));
|
||||
auto timeout = db::timeout_clock::now() + _timeout_duration;
|
||||
service::storage_proxy::coordinator_query_options opts{timeout, _state.get_permit(), _state.get_client_state(), _state.get_trace_state()};
|
||||
auto base_qr = _proxy.query(_base_schema, command, std::move(partition_ranges), db::consistency_level::ALL, opts).get0();
|
||||
|
||||
@@ -58,6 +58,7 @@ class read_command {
|
||||
query::is_first_page is_first_page [[version 2.2]] = query::is_first_page::no;
|
||||
std::optional<query::max_result_size> max_result_size [[version 4.3]] = std::nullopt;
|
||||
uint32_t row_limit_high_bits [[version 4.3]] = 0;
|
||||
uint64_t tombstone_limit [[version 5.2]] = query::max_tombstones;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -306,6 +306,8 @@ public:
|
||||
// the remote doesn't send it.
|
||||
std::optional<query::max_result_size> max_result_size;
|
||||
uint32_t row_limit_high_bits;
|
||||
// Cut the page after processing this many tombstones (even if the page is empty).
|
||||
uint64_t tombstone_limit;
|
||||
api::timestamp_type read_timestamp; // not serialized
|
||||
db::allow_per_partition_rate_limit allow_limit; // not serialized
|
||||
public:
|
||||
@@ -320,7 +322,8 @@ public:
|
||||
query_id query_uuid,
|
||||
query::is_first_page is_first_page,
|
||||
std::optional<query::max_result_size> max_result_size,
|
||||
uint32_t row_limit_high_bits)
|
||||
uint32_t row_limit_high_bits,
|
||||
uint64_t tombstone_limit)
|
||||
: cf_id(std::move(cf_id))
|
||||
, schema_version(std::move(schema_version))
|
||||
, slice(std::move(slice))
|
||||
@@ -332,6 +335,7 @@ public:
|
||||
, is_first_page(is_first_page)
|
||||
, max_result_size(max_result_size)
|
||||
, row_limit_high_bits(row_limit_high_bits)
|
||||
, tombstone_limit(tombstone_limit)
|
||||
, read_timestamp(api::new_timestamp())
|
||||
, allow_limit(db::allow_per_partition_rate_limit::no)
|
||||
{ }
|
||||
@@ -340,6 +344,7 @@ public:
|
||||
table_schema_version schema_version,
|
||||
partition_slice slice,
|
||||
query::max_result_size max_result_size,
|
||||
query::tombstone_limit tombstone_limit,
|
||||
query::row_limit row_limit = query::row_limit::max,
|
||||
query::partition_limit partition_limit = query::partition_limit::max,
|
||||
gc_clock::time_point now = gc_clock::now(),
|
||||
@@ -359,6 +364,7 @@ public:
|
||||
, is_first_page(is_first_page)
|
||||
, max_result_size(max_result_size)
|
||||
, row_limit_high_bits(static_cast<uint32_t>(static_cast<uint64_t>(row_limit) >> 32))
|
||||
, tombstone_limit(static_cast<uint64_t>(tombstone_limit))
|
||||
, read_timestamp(rt)
|
||||
, allow_limit(allow_limit)
|
||||
{ }
|
||||
|
||||
@@ -70,7 +70,8 @@ future<lw_shared_ptr<strings_result>> read_strings(service::storage_proxy& proxy
|
||||
|
||||
future<lw_shared_ptr<strings_result>> query_strings(service::storage_proxy& proxy, const redis_options& options, const bytes& key, service_permit permit, schema_ptr schema, query::partition_slice ps) {
|
||||
const auto max_result_size = proxy.get_max_result_size(ps);
|
||||
query::read_command cmd(schema->id(), schema->version(), ps, max_result_size, query::row_limit(1), query::partition_limit(1), gc_clock::now(), std::nullopt, query_id::create_null_id(), query::is_first_page::no);
|
||||
const auto max_tombstones = proxy.get_tombstone_limit();
|
||||
query::read_command cmd(schema->id(), schema->version(), ps, max_result_size, max_tombstones, query::row_limit(1), query::partition_limit(1), gc_clock::now(), std::nullopt, query_id::create_null_id(), query::is_first_page::no);
|
||||
auto pkey = partition_key::from_single_value(*schema, key);
|
||||
auto partition_range = dht::partition_range::make_singular(dht::decorate_key(*schema, std::move(pkey)));
|
||||
dht::partition_range_vector partition_ranges;
|
||||
@@ -146,7 +147,8 @@ future<lw_shared_ptr<std::map<bytes, bytes>>> read_hashes(service::storage_proxy
|
||||
|
||||
future<lw_shared_ptr<std::map<bytes, bytes>>> query_hashes(service::storage_proxy& proxy, const redis_options& options, const bytes& key, service_permit permit, schema_ptr schema, query::partition_slice ps) {
|
||||
const auto max_result_size = proxy.get_max_result_size(ps);
|
||||
query::read_command cmd(schema->id(), schema->version(), ps, max_result_size, query::row_limit::max, query::partition_limit(1), gc_clock::now(), std::nullopt, query_id::create_null_id(), query::is_first_page::no);
|
||||
const auto max_tombstones = proxy.get_tombstone_limit();
|
||||
query::read_command cmd(schema->id(), schema->version(), ps, max_result_size, max_tombstones, query::row_limit::max, query::partition_limit(1), gc_clock::now(), std::nullopt, query_id::create_null_id(), query::is_first_page::no);
|
||||
auto pkey = partition_key::from_single_value(*schema, key);
|
||||
auto partition_range = dht::partition_range::make_singular(dht::decorate_key(*schema, std::move(pkey)));
|
||||
dht::partition_range_vector partition_ranges;
|
||||
|
||||
@@ -5526,7 +5526,7 @@ static lw_shared_ptr<query::read_command> read_nothing_read_command(schema_ptr s
|
||||
// storage_proxy::query() returns immediately - without any networking.
|
||||
auto partition_slice = query::partition_slice({}, {}, {}, query::partition_slice::option_set());
|
||||
return ::make_lw_shared<query::read_command>(schema->id(), schema->version(), partition_slice,
|
||||
query::max_result_size(query::result_memory_limiter::unlimited_result_size));
|
||||
query::max_result_size(query::result_memory_limiter::unlimited_result_size), query::tombstone_limit::max);
|
||||
}
|
||||
|
||||
static read_timeout_exception write_timeout_to_read(schema_ptr s, mutation_write_timeout_exception& ex) {
|
||||
|
||||
@@ -92,7 +92,8 @@ SEASTAR_TEST_CASE(test_safety_after_truncate) {
|
||||
|
||||
auto assert_query_result = [&] (const std::vector<size_t>& expected_sizes) {
|
||||
auto max_size = std::numeric_limits<size_t>::max();
|
||||
auto cmd = query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(), query::max_result_size(max_size), query::row_limit(1000));
|
||||
auto cmd = query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(), query::max_result_size(max_size),
|
||||
query::tombstone_limit::max, query::row_limit(1000));
|
||||
e.db().invoke_on_all([&] (replica::database& db) -> future<> {
|
||||
auto shard = this_shard_id();
|
||||
auto s = db.find_schema(uuid);
|
||||
@@ -192,7 +193,8 @@ SEASTAR_TEST_CASE(test_querying_with_limits) {
|
||||
|
||||
auto max_size = std::numeric_limits<size_t>::max();
|
||||
{
|
||||
auto cmd = query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(), query::max_result_size(max_size), query::row_limit(3));
|
||||
auto cmd = query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(), query::max_result_size(max_size),
|
||||
query::tombstone_limit::max, query::row_limit(3));
|
||||
e.db().invoke_on_all([&] (replica::database& db) -> future<> {
|
||||
auto shard = this_shard_id();
|
||||
auto s = db.find_schema(uuid);
|
||||
@@ -204,7 +206,7 @@ SEASTAR_TEST_CASE(test_querying_with_limits) {
|
||||
|
||||
{
|
||||
auto cmd = query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(), query::max_result_size(max_size),
|
||||
query::row_limit(query::max_rows), query::partition_limit(5));
|
||||
query::tombstone_limit::max, query::row_limit(query::max_rows), query::partition_limit(5));
|
||||
e.db().invoke_on_all([&] (replica::database& db) -> future<> {
|
||||
auto shard = this_shard_id();
|
||||
auto s = db.find_schema(uuid);
|
||||
@@ -216,7 +218,7 @@ SEASTAR_TEST_CASE(test_querying_with_limits) {
|
||||
|
||||
{
|
||||
auto cmd = query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(), query::max_result_size(max_size),
|
||||
query::row_limit(query::max_rows), query::partition_limit(3));
|
||||
query::tombstone_limit::max, query::row_limit(query::max_rows), query::partition_limit(3));
|
||||
e.db().invoke_on_all([&] (replica::database& db) -> future<> {
|
||||
auto shard = this_shard_id();
|
||||
auto s = db.find_schema(uuid);
|
||||
@@ -885,7 +887,7 @@ SEASTAR_THREAD_TEST_CASE(read_max_size) {
|
||||
} else {
|
||||
slice.options.remove<query::partition_slice::option::allow_short_read>();
|
||||
}
|
||||
query::read_command cmd(s->id(), s->version(), slice, query::max_result_size(max_size));
|
||||
query::read_command cmd(s->id(), s->version(), slice, query::max_result_size(max_size), query::tombstone_limit::max);
|
||||
try {
|
||||
auto size = query_method(s, cmd).get0();
|
||||
// Just to ensure we are not interpreting empty results as success.
|
||||
@@ -963,7 +965,7 @@ SEASTAR_THREAD_TEST_CASE(unpaged_mutation_read_global_limit) {
|
||||
testlog.info("checking: query_method={}", query_method_name);
|
||||
auto slice = s->full_slice();
|
||||
slice.options.remove<query::partition_slice::option::allow_short_read>();
|
||||
query::read_command cmd(s->id(), s->version(), slice, query::max_result_size(max_size));
|
||||
query::read_command cmd(s->id(), s->version(), slice, query::max_result_size(max_size), query::tombstone_limit::max);
|
||||
try {
|
||||
auto size = query_method(s, cmd).get0();
|
||||
// Just to ensure we are not interpreting empty results as success.
|
||||
|
||||
@@ -77,6 +77,7 @@ SEASTAR_THREAD_TEST_CASE(test_abandoned_read) {
|
||||
s->version(),
|
||||
s->full_slice(),
|
||||
query::max_result_size(query::result_memory_limiter::unlimited_result_size),
|
||||
query::tombstone_limit::max,
|
||||
query::row_limit(7),
|
||||
query::partition_limit::max,
|
||||
gc_clock::now(),
|
||||
@@ -106,7 +107,7 @@ static std::vector<mutation> read_all_partitions_one_by_one(distributed<replica:
|
||||
const auto res = db.invoke_on(sharder.shard_of(pkey.token()), [gs = global_schema_ptr(s), &pkey, &slice] (replica::database& db) {
|
||||
return async([s = gs.get(), &pkey, &slice, &db] () mutable {
|
||||
const auto cmd = query::read_command(s->id(), s->version(), slice,
|
||||
query::max_result_size(query::result_memory_limiter::unlimited_result_size));
|
||||
query::max_result_size(query::result_memory_limiter::unlimited_result_size), query::tombstone_limit::max);
|
||||
const auto range = dht::partition_range::make_singular(pkey);
|
||||
return make_foreign(std::make_unique<reconcilable_result>(
|
||||
std::get<0>(db.query_mutations(std::move(s), cmd, range, nullptr, db::no_timeout).get0())));
|
||||
@@ -137,6 +138,7 @@ read_partitions_with_generic_paged_scan(distributed<replica::database>& db, sche
|
||||
s->version(),
|
||||
slice,
|
||||
query::max_result_size(max_size),
|
||||
query::tombstone_limit::max,
|
||||
query::row_limit(page_size),
|
||||
query::partition_limit::max,
|
||||
gc_clock::now(),
|
||||
|
||||
@@ -57,7 +57,7 @@ SEASTAR_TEST_CASE(test_internal_operation_filtering) {
|
||||
auto partition_slice = query::partition_slice(
|
||||
{query::clustering_range::make_open_ended_both_sides()}, {}, {}, std::move(opts));
|
||||
|
||||
auto cmd = make_lw_shared<query::read_command>(sptr->id(), sptr->version(), partition_slice, query::max_result_size(1), query::row_limit(1));
|
||||
auto cmd = make_lw_shared<query::read_command>(sptr->id(), sptr->version(), partition_slice, query::max_result_size(1), query::tombstone_limit::max, query::row_limit(1));
|
||||
cmd->allow_limit = allow_limit;
|
||||
|
||||
// Rejection is probabilistic, so try many times
|
||||
|
||||
@@ -659,6 +659,7 @@ SEASTAR_THREAD_TEST_CASE(test_resources_based_cache_eviction) {
|
||||
s->version(),
|
||||
slice,
|
||||
query::max_result_size(1024 * 1024),
|
||||
query::tombstone_limit::max,
|
||||
query::row_limit(1),
|
||||
query::partition_limit(1),
|
||||
gc_clock::now(),
|
||||
@@ -688,6 +689,7 @@ SEASTAR_THREAD_TEST_CASE(test_resources_based_cache_eviction) {
|
||||
s->version(),
|
||||
slice,
|
||||
query::max_result_size(1024 * 1024),
|
||||
query::tombstone_limit::max,
|
||||
query::row_limit(1),
|
||||
query::partition_limit(1),
|
||||
gc_clock::now(),
|
||||
|
||||
@@ -406,7 +406,7 @@ public:
|
||||
auto slice = query::partition_slice(std::move(clustering_ranges), { }, std::move(regular_columns), opts,
|
||||
std::move(specific_ranges), cql_serialization_format::internal());
|
||||
auto cmd = make_lw_shared<query::read_command>(s.id(), s.version(), std::move(slice), proxy.get_max_result_size(slice),
|
||||
query::row_limit(row_limit), query::partition_limit(partition_limit));
|
||||
query::tombstone_limit(proxy.get_tombstone_limit()), query::row_limit(row_limit), query::partition_limit(partition_limit));
|
||||
cmd->allow_limit = db::allow_per_partition_rate_limit::yes;
|
||||
return cmd;
|
||||
}
|
||||
@@ -693,7 +693,7 @@ public:
|
||||
auto slice = query::partition_slice(std::move(clustering_ranges), {}, std::move(regular_columns), opts, nullptr);
|
||||
auto& proxy = _proxy.local();
|
||||
auto cmd = make_lw_shared<query::read_command>(schema->id(), schema->version(), std::move(slice), proxy.get_max_result_size(slice),
|
||||
query::row_limit(row_limit));
|
||||
query::tombstone_limit(proxy.get_tombstone_limit()), query::row_limit(row_limit));
|
||||
cmd->allow_limit = db::allow_per_partition_rate_limit::yes;
|
||||
auto f = _query_state.get_client_state().has_schema_access(_db, *schema, auth::permission::SELECT);
|
||||
return f.then([this, &proxy, dk = std::move(dk), cmd, schema, column_limit = request.count, cl = request.consistency_level, permit = std::move(permit)] () mutable {
|
||||
@@ -1614,7 +1614,8 @@ private:
|
||||
}
|
||||
auto slice = query::partition_slice(std::move(clustering_ranges), {}, std::move(regular_columns), opts,
|
||||
nullptr, cql_serialization_format::internal(), per_partition_row_limit);
|
||||
auto cmd = make_lw_shared<query::read_command>(s.id(), s.version(), std::move(slice), proxy.get_max_result_size(slice));
|
||||
auto cmd = make_lw_shared<query::read_command>(s.id(), s.version(), std::move(slice), proxy.get_max_result_size(slice),
|
||||
query::tombstone_limit(proxy.get_tombstone_limit()));
|
||||
cmd->allow_limit = db::allow_per_partition_rate_limit::yes;
|
||||
return cmd;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user