service/paging: use position_in_partition instead of clustering_key for last row

The former allows for expressing more positions, like a position
before/after a clustering key. This practically enables the coordinator
side paging logic, for a query to be stopped at a tombstone (which can
have said positions).
This commit is contained in:
Botond Dénes
2022-05-04 17:19:53 +03:00
parent adabe3b5a3
commit 2b0bc11f2e
9 changed files with 127 additions and 28 deletions

View File

@@ -3602,9 +3602,9 @@ static rjson::value encode_paging_state(const schema& schema, const service::pag
rjson::add_with_string_name(key_entry, type_to_string(cdef.type), json_key_column_value(*exploded_pk_it, cdef));
++exploded_pk_it;
}
auto ck = paging_state.get_clustering_key();
if (ck) {
auto exploded_ck = ck->explode();
auto pos = paging_state.get_position_in_partition();
if (pos.has_key()) {
auto exploded_ck = pos.key().explode();
auto exploded_ck_it = exploded_ck.begin();
for (const column_definition& cdef : schema.clustering_key_columns()) {
rjson::add_with_string_name(last_evaluated_key, std::string_view(cdef.name_as_text()), rjson::empty_object());
@@ -3613,6 +3613,10 @@ static rjson::value encode_paging_state(const schema& schema, const service::pag
++exploded_ck_it;
}
}
rjson::add_with_string_name(last_evaluated_key, scylla_paging_region, rjson::empty_object());
rjson::add(last_evaluated_key[scylla_paging_region.data()], "S", rjson::from_string(to_string(pos.region())));
rjson::add_with_string_name(last_evaluated_key, scylla_paging_weight, rjson::empty_object());
rjson::add(last_evaluated_key[scylla_paging_weight.data()], "N", static_cast<int>(pos.get_bound_weight()));
return last_evaluated_key;
}
@@ -3636,11 +3640,11 @@ static future<executor::request_return_type> do_query(service::storage_proxy& pr
if (exclusive_start_key) {
partition_key pk = pk_from_json(*exclusive_start_key, schema);
std::optional<clustering_key> ck;
auto pos = position_in_partition(position_in_partition::partition_start_tag_t());
if (schema->clustering_key_size() > 0) {
ck = ck_from_json(*exclusive_start_key, schema);
pos = pos_from_json(*exclusive_start_key, schema);
}
paging_state = make_lw_shared<service::pager::paging_state>(pk, ck, query::max_partitions, utils::UUID(), service::pager::paging_state::replicas_per_token_range{}, std::nullopt, 0);
paging_state = make_lw_shared<service::pager::paging_state>(pk, pos, query::max_partitions, utils::UUID(), service::pager::paging_state::replicas_per_token_range{}, std::nullopt, 0);
}
auto regular_columns = boost::copy_range<query::column_id_vector>(

View File

@@ -14,6 +14,7 @@
#include "rapidjson/writer.h"
#include "concrete_types.hh"
#include "cql3/type_json.hh"
#include "position_in_partition.hh"
static logging::logger slogger("alternator-serialization");
@@ -248,6 +249,36 @@ clustering_key ck_from_json(const rjson::value& item, schema_ptr schema) {
return clustering_key::from_exploded(raw_ck);
}
position_in_partition pos_from_json(const rjson::value& item, schema_ptr schema) {
auto ck = ck_from_json(item, schema);
const auto region_item = rjson::find(item, scylla_paging_region);
const auto weight_item = rjson::find(item, scylla_paging_weight);
if (bool(region_item) != bool(weight_item)) {
throw api_error::validation("Malformed value object: region and weight has to be either both missing or both present");
}
partition_region region;
bound_weight weight;
if (region_item) {
auto region_view = rjson::to_string_view(get_typed_value(*region_item, "S", scylla_paging_region, "key region"));
auto weight_view = rjson::to_string_view(get_typed_value(*weight_item, "N", scylla_paging_weight, "key weight"));
auto region = parse_partition_region(region_view);
if (weight_view == "-1") {
weight = bound_weight::before_all_prefixed;
} else if (weight_view == "0") {
weight = bound_weight::equal;
} else if (weight_view == "1") {
weight = bound_weight::after_all_prefixed;
} else {
throw std::runtime_error(fmt::format("Invalid value for weight: {}", weight_view));
}
return position_in_partition(region, weight, region == partition_region::clustered ? std::optional(std::move(ck)) : std::nullopt);
}
if (ck.is_empty()) {
return position_in_partition(position_in_partition::partition_start_tag_t());
}
return position_in_partition::for_key(std::move(ck));
}
big_decimal unwrap_number(const rjson::value& v, std::string_view diagnostic) {
if (!v.IsObject() || v.MemberCount() != 1) {
throw api_error::validation(format("{}: invalid number object", diagnostic));

View File

@@ -17,6 +17,8 @@
#include "utils/rjson.hh"
#include "utils/big_decimal.hh"
class position_in_partition;
namespace alternator {
enum class alternator_type : int8_t {
@@ -33,6 +35,9 @@ struct type_representation {
data_type dtype;
};
inline constexpr std::string_view scylla_paging_region(":scylla:paging:region");
inline constexpr std::string_view scylla_paging_weight(":scylla:paging:weight");
type_info type_info_from_string(std::string_view type);
type_representation represent_type(alternator_type atype);
@@ -47,6 +52,7 @@ rjson::value json_key_column_value(bytes_view cell, const column_definition& col
partition_key pk_from_json(const rjson::value& item, schema_ptr schema);
clustering_key ck_from_json(const rjson::value& item, schema_ptr schema);
position_in_partition pos_from_json(const rjson::value& item, schema_ptr schema);
// If v encodes a number (i.e., it is a {"N": [...]}, returns an object representing it. Otherwise,
// raises ValidationException with diagnostic.

View File

@@ -6,6 +6,19 @@ enum class read_repair_decision : uint8_t {
};
}
enum class bound_weight : int8_t {
before_all_prefixed = -1,
equal = 0,
after_all_prefixed = 1,
}
enum class partition_region : uint8_t {
partition_start,
static_row,
clustered,
partition_end,
};
namespace service {
namespace pager {
class paging_state {
@@ -18,6 +31,8 @@ class paging_state {
uint32_t get_rows_fetched_for_last_partition_low_bits() [[version 3.1]] = 0;
uint32_t get_remaining_high_bits() [[version 4.3]] = 0;
uint32_t get_rows_fetched_for_last_partition_high_bits() [[version 4.3]] = 0;
bound_weight get_clustering_key_weight() [[version 5.1]] = bound_weight::equal;
partition_region get_partition_region() [[version 5.1]] = partition_region::clustered;
};
}
}

View File

@@ -34,7 +34,9 @@ service::pager::paging_state::paging_state(partition_key pk,
std::optional<db::read_repair_decision> query_read_repair_decision,
uint32_t rows_fetched_for_last_partition_low_bits,
uint32_t rem_high_bits,
uint32_t rows_fetched_for_last_partition_high_bits)
uint32_t rows_fetched_for_last_partition_high_bits,
bound_weight ck_weight,
partition_region region)
: _partition_key(std::move(pk))
, _clustering_key(std::move(ck))
, _remaining_low_bits(rem_low_bits)
@@ -43,20 +45,24 @@ service::pager::paging_state::paging_state(partition_key pk,
, _query_read_repair_decision(query_read_repair_decision)
, _rows_fetched_for_last_partition_low_bits(rows_fetched_for_last_partition_low_bits)
, _remaining_high_bits(rem_high_bits)
, _rows_fetched_for_last_partition_high_bits(rows_fetched_for_last_partition_high_bits) {
}
, _rows_fetched_for_last_partition_high_bits(rows_fetched_for_last_partition_high_bits)
, _ck_weight(ck_weight)
, _region(region)
{ }
service::pager::paging_state::paging_state(partition_key pk,
std::optional<clustering_key> ck,
position_in_partition_view pos,
uint64_t rem,
utils::UUID query_uuid,
replicas_per_token_range last_replicas,
std::optional<db::read_repair_decision> query_read_repair_decision,
uint64_t rows_fetched_for_last_partition)
: paging_state(std::move(pk), std::move(ck), static_cast<uint32_t>(rem), query_uuid, std::move(last_replicas), query_read_repair_decision,
: paging_state(std::move(pk), pos.has_key() ? std::optional(pos.key()) : std::nullopt, static_cast<uint32_t>(rem), query_uuid, std::move(last_replicas), query_read_repair_decision,
static_cast<uint32_t>(rows_fetched_for_last_partition), static_cast<uint32_t>(rem >> 32),
static_cast<uint32_t>(rows_fetched_for_last_partition >> 32)) {
}
static_cast<uint32_t>(rows_fetched_for_last_partition >> 32),
pos.get_bound_weight(),
pos.region())
{ }
lw_shared_ptr<service::pager::paging_state> service::pager::paging_state::deserialize(
bytes_opt data) {

View File

@@ -17,6 +17,7 @@
#include "utils/UUID.hh"
#include "dht/i_partitioner.hh"
#include "db/read_repair_decision.hh"
#include "position_in_partition.hh"
namespace service {
@@ -36,8 +37,11 @@ private:
uint32_t _rows_fetched_for_last_partition_low_bits;
uint32_t _remaining_high_bits;
uint32_t _rows_fetched_for_last_partition_high_bits;
bound_weight _ck_weight = bound_weight::equal;
partition_region _region = partition_region::partition_start;
public:
// IDL ctor
paging_state(partition_key pk,
std::optional<clustering_key> ck,
uint32_t rem,
@@ -46,10 +50,12 @@ public:
std::optional<db::read_repair_decision> query_read_repair_decision,
uint32_t rows_fetched_for_last_partition,
uint32_t remaining_ext,
uint32_t rows_fetched_for_last_partition_high_bits);
uint32_t rows_fetched_for_last_partition_high_bits,
bound_weight ck_weight,
partition_region region);
paging_state(partition_key pk,
std::optional<clustering_key> ck,
position_in_partition_view pos,
uint64_t rem,
utils::UUID reader_recall_uuid,
replicas_per_token_range last_replicas,
@@ -60,8 +66,11 @@ public:
_partition_key = std::move(pk);
}
// sets position to at the given clustering key
void set_clustering_key(clustering_key ck) {
_clustering_key = std::move(ck);
_ck_weight = bound_weight::equal;
_region = partition_region::clustered;
}
void set_remaining(uint64_t remaining) {
@@ -77,10 +86,27 @@ public:
}
/**
* Clustering key in last partition. I.e. first, next, row
*
* Use \ref get_position_in_partition() instead.
*/
const std::optional<clustering_key>& get_clustering_key() const {
return _clustering_key;
}
/**
* Weight of last processed position, see \ref get_position_in_partition()
*/
bound_weight get_clustering_key_weight() const {
return _ck_weight;
}
/**
* Partition region of last processed position, see \ref get_position_in_partition()
*/
partition_region get_partition_region() const {
return _region;
}
position_in_partition_view get_position_in_partition() const {
return position_in_partition_view(_region, _ck_weight, _clustering_key ? &*_clustering_key : nullptr);
}
/**
* Max remaining rows to fetch in total.
* I.e. initial row_limit - #rows returned so far.

View File

@@ -62,7 +62,7 @@ protected:
uint64_t _per_partition_limit;
std::optional<partition_key> _last_pkey;
std::optional<clustering_key> _last_ckey;
position_in_partition _last_pos;
std::optional<utils::UUID> _query_uuid;
shared_ptr<service::storage_proxy> _proxy;

View File

@@ -49,6 +49,7 @@ query_pager::query_pager(service::storage_proxy& p, schema_ptr s,
: _has_clustering_keys(has_clustering_keys(*s, *cmd))
, _max(cmd->get_row_limit())
, _per_partition_limit(cmd->slice.partition_row_limit())
, _last_pos(position_in_partition::partition_start_tag_t())
, _proxy(p.shared_from_this())
, _schema(std::move(s))
, _selection(selection)
@@ -71,7 +72,7 @@ future<result<service::storage_proxy::coordinator_query_result>> query_pager::do
if (!_last_pkey && state) {
_max = state->get_remaining();
_last_pkey = state->get_partition_key();
_last_ckey = state->get_clustering_key();
_last_pos = state->get_position_in_partition();
_query_uuid = state->get_query_uuid();
_last_replicas = state->get_last_replicas();
_query_read_repair_decision = state->get_query_read_repair_decision();
@@ -92,7 +93,7 @@ future<result<service::storage_proxy::coordinator_query_result>> query_pager::do
auto reversed = _cmd->slice.options.contains<query::partition_slice::option::reversed>();
qlogger.trace("PKey={}, CKey={}, reversed={}", dpk, _last_ckey, reversed);
qlogger.trace("PKey={}, Pos={}, reversed={}", dpk, _last_pos, reversed);
// Note: we're assuming both that the ranges are checked
// and "cql-compliant", and that storage_proxy will process
@@ -141,7 +142,7 @@ future<result<service::storage_proxy::coordinator_query_result>> query_pager::do
// last ck can be empty depending on whether we
// deserialized state or not. This case means "last page ended on
// something-not-bound-by-clustering" (i.e. a static row, alone)
const bool has_ck = _has_clustering_keys && _last_ckey;
const bool has_ck = _has_clustering_keys && _last_pos.region() == partition_region::clustered;
// If we have no clustering keys, it should mean we only have one row
// per PK. Thus we can just bypass the last one.
@@ -149,8 +150,11 @@ future<result<service::storage_proxy::coordinator_query_result>> query_pager::do
if (has_ck) {
query::clustering_row_ranges row_ranges = _cmd->slice.default_row_ranges();
clustering_key_prefix ckp = clustering_key_prefix::from_exploded(*_schema, _last_ckey->explode(*_schema));
query::trim_clustering_row_ranges_to(*_schema, row_ranges, ckp, reversed);
position_in_partition_view next_pos = _last_pos;
if (_last_pos.has_key()) {
next_pos = reversed ? position_in_partition_view::before_key(_last_pos) : position_in_partition_view::after_key(_last_pos);
}
query::trim_clustering_row_ranges_to(*_schema, row_ranges, next_pos, reversed);
_cmd->slice.set_range(*_schema, *_last_pkey, row_ranges);
}
@@ -367,6 +371,7 @@ void query_pager::handle_result(
auto view = query::result_view(*results);
_last_pos = position_in_partition(position_in_partition::partition_start_tag_t());
uint64_t row_count;
if constexpr(!std::is_same_v<std::decay_t<Visitor>, noop_visitor>) {
query_result_visitor<Visitor> v(std::forward<Visitor>(visitor));
@@ -388,7 +393,9 @@ void query_pager::handle_result(
}
}
_last_pkey = v.last_pkey;
_last_ckey = v.last_ckey;
if (v.last_ckey) {
_last_pos = position_in_partition::for_key(*v.last_ckey);
}
} else {
row_count = results->row_count() ? *results->row_count() : std::get<1>(view.count_partitions_and_rows());
_max = _max - row_count;
@@ -400,7 +407,9 @@ void query_pager::handle_result(
}
auto [ last_pkey, last_ckey ] = view.get_last_partition_and_clustering_key();
_last_pkey = std::move(last_pkey);
_last_ckey = std::move(last_ckey);
if (last_ckey) {
_last_pos = position_in_partition::for_key(std::move(*last_ckey));
}
}
}
@@ -409,13 +418,13 @@ void query_pager::handle_result(
if (_last_pkey) {
qlogger.debug("Last partition key: {}", *_last_pkey);
}
if (_has_clustering_keys && _last_ckey) {
qlogger.debug("Last clustering key: {}", *_last_ckey);
if (_has_clustering_keys && _last_pos.region() == partition_region::clustered) {
qlogger.debug("Last clustering pos: {}", _last_pos);
}
}
lw_shared_ptr<const paging_state> query_pager::state() const {
return make_lw_shared<paging_state>(_last_pkey.value_or(partition_key::make_empty()), _last_ckey, _exhausted ? 0 : _max, _cmd->query_uuid, _last_replicas, _query_read_repair_decision, _rows_fetched_for_last_partition);
return make_lw_shared<paging_state>(_last_pkey.value_or(partition_key::make_empty()), _last_pos, _exhausted ? 0 : _max, _cmd->query_uuid, _last_replicas, _query_read_repair_decision, _rows_fetched_for_last_partition);
}
}

View File

@@ -547,7 +547,8 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
// which is a valid paging state as well, and should return
// no rows.
paging_state = make_lw_shared<service::pager::paging_state>(partition_key::make_empty(),
std::nullopt, paging_state->get_remaining(), paging_state->get_query_uuid(),
position_in_partition_view(position_in_partition_view::partition_start_tag_t()),
paging_state->get_remaining(), paging_state->get_query_uuid(),
paging_state->get_last_replicas(), paging_state->get_query_read_repair_decision(),
paging_state->get_rows_fetched_for_last_partition());
@@ -561,7 +562,8 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
{
// An artificial paging state with an empty key pair is also valid and is expected
// not to return rows (since no row matches an empty partition key)
auto paging_state = make_lw_shared<service::pager::paging_state>(partition_key::make_empty(), std::nullopt,
auto paging_state = make_lw_shared<service::pager::paging_state>(partition_key::make_empty(),
position_in_partition_view(position_in_partition_view::partition_start_tag_t()),
1, utils::make_random_uuid(), service::pager::paging_state::replicas_per_token_range{}, std::nullopt, 1);
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});