From 2b0bc11f2e5a0d28e8461859186e17bc77a988b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Wed, 4 May 2022 17:19:53 +0300 Subject: [PATCH] service/paging: use position_in_partition instead of clustering_key for last row The former allows for expressing more positions, like a position before/after a clustering key. In practice, this enables the coordinator-side paging logic to handle a query being stopped at a tombstone (which can have such positions). --- alternator/executor.cc | 16 +++++++++------ alternator/serialization.cc | 31 ++++++++++++++++++++++++++++++ alternator/serialization.hh | 6 ++++++ idl/paging_state.idl.hh | 15 +++++++++++++++ service/pager/paging_state.cc | 20 ++++++++++++------- service/pager/paging_state.hh | 30 +++++++++++++++++++++++++++-- service/pager/query_pager.hh | 2 +- service/pager/query_pagers.cc | 29 ++++++++++++++++++---------- test/boost/secondary_index_test.cc | 6 ++++-- 9 files changed, 127 insertions(+), 28 deletions(-) diff --git a/alternator/executor.cc b/alternator/executor.cc index bd5521e8ef..fce4a5551a 100644 --- a/alternator/executor.cc +++ b/alternator/executor.cc @@ -3602,9 +3602,9 @@ static rjson::value encode_paging_state(const schema& schema, const service::pag rjson::add_with_string_name(key_entry, type_to_string(cdef.type), json_key_column_value(*exploded_pk_it, cdef)); ++exploded_pk_it; } - auto ck = paging_state.get_clustering_key(); - if (ck) { - auto exploded_ck = ck->explode(); + auto pos = paging_state.get_position_in_partition(); + if (pos.has_key()) { + auto exploded_ck = pos.key().explode(); auto exploded_ck_it = exploded_ck.begin(); for (const column_definition& cdef : schema.clustering_key_columns()) { rjson::add_with_string_name(last_evaluated_key, std::string_view(cdef.name_as_text()), rjson::empty_object()); @@ -3613,6 +3613,10 @@ static rjson::value encode_paging_state(const schema& schema, const service::pag ++exploded_ck_it; } } + rjson::add_with_string_name(last_evaluated_key, 
scylla_paging_region, rjson::empty_object()); + rjson::add(last_evaluated_key[scylla_paging_region.data()], "S", rjson::from_string(to_string(pos.region()))); + rjson::add_with_string_name(last_evaluated_key, scylla_paging_weight, rjson::empty_object()); + rjson::add(last_evaluated_key[scylla_paging_weight.data()], "N", static_cast(pos.get_bound_weight())); return last_evaluated_key; } @@ -3636,11 +3640,11 @@ static future do_query(service::storage_proxy& pr if (exclusive_start_key) { partition_key pk = pk_from_json(*exclusive_start_key, schema); - std::optional ck; + auto pos = position_in_partition(position_in_partition::partition_start_tag_t()); if (schema->clustering_key_size() > 0) { - ck = ck_from_json(*exclusive_start_key, schema); + pos = pos_from_json(*exclusive_start_key, schema); } - paging_state = make_lw_shared(pk, ck, query::max_partitions, utils::UUID(), service::pager::paging_state::replicas_per_token_range{}, std::nullopt, 0); + paging_state = make_lw_shared(pk, pos, query::max_partitions, utils::UUID(), service::pager::paging_state::replicas_per_token_range{}, std::nullopt, 0); } auto regular_columns = boost::copy_range( diff --git a/alternator/serialization.cc b/alternator/serialization.cc index 4f5d1f482a..b1218b38a0 100644 --- a/alternator/serialization.cc +++ b/alternator/serialization.cc @@ -14,6 +14,7 @@ #include "rapidjson/writer.h" #include "concrete_types.hh" #include "cql3/type_json.hh" +#include "position_in_partition.hh" static logging::logger slogger("alternator-serialization"); @@ -248,6 +249,36 @@ clustering_key ck_from_json(const rjson::value& item, schema_ptr schema) { return clustering_key::from_exploded(raw_ck); } +position_in_partition pos_from_json(const rjson::value& item, schema_ptr schema) { + auto ck = ck_from_json(item, schema); + const auto region_item = rjson::find(item, scylla_paging_region); + const auto weight_item = rjson::find(item, scylla_paging_weight); + if (bool(region_item) != bool(weight_item)) { + throw 
api_error::validation("Malformed value object: region and weight has to be either both missing or both present"); + } + partition_region region; + bound_weight weight; + if (region_item) { + auto region_view = rjson::to_string_view(get_typed_value(*region_item, "S", scylla_paging_region, "key region")); + auto weight_view = rjson::to_string_view(get_typed_value(*weight_item, "N", scylla_paging_weight, "key weight")); + auto region = parse_partition_region(region_view); + if (weight_view == "-1") { + weight = bound_weight::before_all_prefixed; + } else if (weight_view == "0") { + weight = bound_weight::equal; + } else if (weight_view == "1") { + weight = bound_weight::after_all_prefixed; + } else { + throw std::runtime_error(fmt::format("Invalid value for weight: {}", weight_view)); + } + return position_in_partition(region, weight, region == partition_region::clustered ? std::optional(std::move(ck)) : std::nullopt); + } + if (ck.is_empty()) { + return position_in_partition(position_in_partition::partition_start_tag_t()); + } + return position_in_partition::for_key(std::move(ck)); +} + big_decimal unwrap_number(const rjson::value& v, std::string_view diagnostic) { if (!v.IsObject() || v.MemberCount() != 1) { throw api_error::validation(format("{}: invalid number object", diagnostic)); diff --git a/alternator/serialization.hh b/alternator/serialization.hh index f4e6ecdd2a..e39f274da5 100644 --- a/alternator/serialization.hh +++ b/alternator/serialization.hh @@ -17,6 +17,8 @@ #include "utils/rjson.hh" #include "utils/big_decimal.hh" +class position_in_partition; + namespace alternator { enum class alternator_type : int8_t { @@ -33,6 +35,9 @@ struct type_representation { data_type dtype; }; +inline constexpr std::string_view scylla_paging_region(":scylla:paging:region"); +inline constexpr std::string_view scylla_paging_weight(":scylla:paging:weight"); + type_info type_info_from_string(std::string_view type); type_representation represent_type(alternator_type atype); @@ 
-47,6 +52,7 @@ rjson::value json_key_column_value(bytes_view cell, const column_definition& col partition_key pk_from_json(const rjson::value& item, schema_ptr schema); clustering_key ck_from_json(const rjson::value& item, schema_ptr schema); +position_in_partition pos_from_json(const rjson::value& item, schema_ptr schema); // If v encodes a number (i.e., it is a {"N": [...]}, returns an object representing it. Otherwise, // raises ValidationException with diagnostic. diff --git a/idl/paging_state.idl.hh b/idl/paging_state.idl.hh index 51c876468f..c39aa9c45f 100644 --- a/idl/paging_state.idl.hh +++ b/idl/paging_state.idl.hh @@ -6,6 +6,19 @@ enum class read_repair_decision : uint8_t { }; } +enum class bound_weight : int8_t { + before_all_prefixed = -1, + equal = 0, + after_all_prefixed = 1, +} + +enum class partition_region : uint8_t { + partition_start, + static_row, + clustered, + partition_end, +}; + namespace service { namespace pager { class paging_state { @@ -18,6 +31,8 @@ class paging_state { uint32_t get_rows_fetched_for_last_partition_low_bits() [[version 3.1]] = 0; uint32_t get_remaining_high_bits() [[version 4.3]] = 0; uint32_t get_rows_fetched_for_last_partition_high_bits() [[version 4.3]] = 0; + bound_weight get_clustering_key_weight() [[version 5.1]] = bound_weight::equal; + partition_region get_partition_region() [[version 5.1]] = partition_region::clustered; }; } } diff --git a/service/pager/paging_state.cc b/service/pager/paging_state.cc index 55972a0502..e048c022ed 100644 --- a/service/pager/paging_state.cc +++ b/service/pager/paging_state.cc @@ -34,7 +34,9 @@ service::pager::paging_state::paging_state(partition_key pk, std::optional query_read_repair_decision, uint32_t rows_fetched_for_last_partition_low_bits, uint32_t rem_high_bits, - uint32_t rows_fetched_for_last_partition_high_bits) + uint32_t rows_fetched_for_last_partition_high_bits, + bound_weight ck_weight, + partition_region region) : _partition_key(std::move(pk)) , 
_clustering_key(std::move(ck)) , _remaining_low_bits(rem_low_bits) @@ -43,20 +45,24 @@ service::pager::paging_state::paging_state(partition_key pk, , _query_read_repair_decision(query_read_repair_decision) , _rows_fetched_for_last_partition_low_bits(rows_fetched_for_last_partition_low_bits) , _remaining_high_bits(rem_high_bits) - , _rows_fetched_for_last_partition_high_bits(rows_fetched_for_last_partition_high_bits) { -} + , _rows_fetched_for_last_partition_high_bits(rows_fetched_for_last_partition_high_bits) + , _ck_weight(ck_weight) + , _region(region) +{ } service::pager::paging_state::paging_state(partition_key pk, - std::optional ck, + position_in_partition_view pos, uint64_t rem, utils::UUID query_uuid, replicas_per_token_range last_replicas, std::optional query_read_repair_decision, uint64_t rows_fetched_for_last_partition) - : paging_state(std::move(pk), std::move(ck), static_cast(rem), query_uuid, std::move(last_replicas), query_read_repair_decision, + : paging_state(std::move(pk), pos.has_key() ? 
std::optional(pos.key()) : std::nullopt, static_cast(rem), query_uuid, std::move(last_replicas), query_read_repair_decision, static_cast(rows_fetched_for_last_partition), static_cast(rem >> 32), - static_cast(rows_fetched_for_last_partition >> 32)) { -} + static_cast(rows_fetched_for_last_partition >> 32), + pos.get_bound_weight(), + pos.region()) +{ } lw_shared_ptr service::pager::paging_state::deserialize( bytes_opt data) { diff --git a/service/pager/paging_state.hh b/service/pager/paging_state.hh index db06a95804..a862c01205 100644 --- a/service/pager/paging_state.hh +++ b/service/pager/paging_state.hh @@ -17,6 +17,7 @@ #include "utils/UUID.hh" #include "dht/i_partitioner.hh" #include "db/read_repair_decision.hh" +#include "position_in_partition.hh" namespace service { @@ -36,8 +37,11 @@ private: uint32_t _rows_fetched_for_last_partition_low_bits; uint32_t _remaining_high_bits; uint32_t _rows_fetched_for_last_partition_high_bits; + bound_weight _ck_weight = bound_weight::equal; + partition_region _region = partition_region::partition_start; public: + // IDL ctor paging_state(partition_key pk, std::optional ck, uint32_t rem, @@ -46,10 +50,12 @@ public: std::optional query_read_repair_decision, uint32_t rows_fetched_for_last_partition, uint32_t remaining_ext, - uint32_t rows_fetched_for_last_partition_high_bits); + uint32_t rows_fetched_for_last_partition_high_bits, + bound_weight ck_weight, + partition_region region); paging_state(partition_key pk, - std::optional ck, + position_in_partition_view pos, uint64_t rem, utils::UUID reader_recall_uuid, replicas_per_token_range last_replicas, @@ -60,8 +66,11 @@ public: _partition_key = std::move(pk); } + // Sets the position to exactly the given clustering key (weight: equal, region: clustered) void set_clustering_key(clustering_key ck) { _clustering_key = std::move(ck); + _ck_weight = bound_weight::equal; + _region = partition_region::clustered; } void set_remaining(uint64_t remaining) { @@ -77,10 +86,27 @@ public: } /** * Clustering key in last partition. I.e. 
first, next, row + * + * Use \ref get_position_in_partition() instead. */ const std::optional& get_clustering_key() const { return _clustering_key; } + /** + * Weight of last processed position, see \ref get_position_in_partition() + */ + bound_weight get_clustering_key_weight() const { + return _ck_weight; + } + /** + * Partition region of last processed position, see \ref get_position_in_partition() + */ + partition_region get_partition_region() const { + return _region; + } + position_in_partition_view get_position_in_partition() const { + return position_in_partition_view(_region, _ck_weight, _clustering_key ? &*_clustering_key : nullptr); + } /** * Max remaining rows to fetch in total. * I.e. initial row_limit - #rows returned so far. diff --git a/service/pager/query_pager.hh b/service/pager/query_pager.hh index 647947fb1e..e6c19cf860 100644 --- a/service/pager/query_pager.hh +++ b/service/pager/query_pager.hh @@ -62,7 +62,7 @@ protected: uint64_t _per_partition_limit; std::optional _last_pkey; - std::optional _last_ckey; + position_in_partition _last_pos; std::optional _query_uuid; shared_ptr _proxy; diff --git a/service/pager/query_pagers.cc b/service/pager/query_pagers.cc index 7d12497b44..42432a7ff0 100644 --- a/service/pager/query_pagers.cc +++ b/service/pager/query_pagers.cc @@ -49,6 +49,7 @@ query_pager::query_pager(service::storage_proxy& p, schema_ptr s, : _has_clustering_keys(has_clustering_keys(*s, *cmd)) , _max(cmd->get_row_limit()) , _per_partition_limit(cmd->slice.partition_row_limit()) + , _last_pos(position_in_partition::partition_start_tag_t()) , _proxy(p.shared_from_this()) , _schema(std::move(s)) , _selection(selection) @@ -71,7 +72,7 @@ future> query_pager::do if (!_last_pkey && state) { _max = state->get_remaining(); _last_pkey = state->get_partition_key(); - _last_ckey = state->get_clustering_key(); + _last_pos = state->get_position_in_partition(); _query_uuid = state->get_query_uuid(); _last_replicas = state->get_last_replicas(); 
_query_read_repair_decision = state->get_query_read_repair_decision(); @@ -92,7 +93,7 @@ future> query_pager::do auto reversed = _cmd->slice.options.contains(); - qlogger.trace("PKey={}, CKey={}, reversed={}", dpk, _last_ckey, reversed); + qlogger.trace("PKey={}, Pos={}, reversed={}", dpk, _last_pos, reversed); // Note: we're assuming both that the ranges are checked // and "cql-compliant", and that storage_proxy will process @@ -141,7 +142,7 @@ future> query_pager::do // last ck can be empty depending on whether we // deserialized state or not. This case means "last page ended on // something-not-bound-by-clustering" (i.e. a static row, alone) - const bool has_ck = _has_clustering_keys && _last_ckey; + const bool has_ck = _has_clustering_keys && _last_pos.region() == partition_region::clustered; // If we have no clustering keys, it should mean we only have one row // per PK. Thus we can just bypass the last one. @@ -149,8 +150,11 @@ future> query_pager::do if (has_ck) { query::clustering_row_ranges row_ranges = _cmd->slice.default_row_ranges(); - clustering_key_prefix ckp = clustering_key_prefix::from_exploded(*_schema, _last_ckey->explode(*_schema)); - query::trim_clustering_row_ranges_to(*_schema, row_ranges, ckp, reversed); + position_in_partition_view next_pos = _last_pos; + if (_last_pos.has_key()) { + next_pos = reversed ? 
position_in_partition_view::before_key(_last_pos) : position_in_partition_view::after_key(_last_pos); + } + query::trim_clustering_row_ranges_to(*_schema, row_ranges, next_pos, reversed); _cmd->slice.set_range(*_schema, *_last_pkey, row_ranges); } @@ -367,6 +371,7 @@ void query_pager::handle_result( auto view = query::result_view(*results); + _last_pos = position_in_partition(position_in_partition::partition_start_tag_t()); uint64_t row_count; if constexpr(!std::is_same_v, noop_visitor>) { query_result_visitor v(std::forward(visitor)); @@ -388,7 +393,9 @@ void query_pager::handle_result( } } _last_pkey = v.last_pkey; - _last_ckey = v.last_ckey; + if (v.last_ckey) { + _last_pos = position_in_partition::for_key(*v.last_ckey); + } } else { row_count = results->row_count() ? *results->row_count() : std::get<1>(view.count_partitions_and_rows()); _max = _max - row_count; @@ -400,7 +407,9 @@ void query_pager::handle_result( } auto [ last_pkey, last_ckey ] = view.get_last_partition_and_clustering_key(); _last_pkey = std::move(last_pkey); - _last_ckey = std::move(last_ckey); + if (last_ckey) { + _last_pos = position_in_partition::for_key(std::move(*last_ckey)); + } } } @@ -409,13 +418,13 @@ void query_pager::handle_result( if (_last_pkey) { qlogger.debug("Last partition key: {}", *_last_pkey); } - if (_has_clustering_keys && _last_ckey) { - qlogger.debug("Last clustering key: {}", *_last_ckey); + if (_has_clustering_keys && _last_pos.region() == partition_region::clustered) { + qlogger.debug("Last clustering pos: {}", _last_pos); } } lw_shared_ptr query_pager::state() const { - return make_lw_shared(_last_pkey.value_or(partition_key::make_empty()), _last_ckey, _exhausted ? 0 : _max, _cmd->query_uuid, _last_replicas, _query_read_repair_decision, _rows_fetched_for_last_partition); + return make_lw_shared(_last_pkey.value_or(partition_key::make_empty()), _last_pos, _exhausted ? 
0 : _max, _cmd->query_uuid, _last_replicas, _query_read_repair_decision, _rows_fetched_for_last_partition); } } diff --git a/test/boost/secondary_index_test.cc b/test/boost/secondary_index_test.cc index b463ad5be6..cce20c65ef 100644 --- a/test/boost/secondary_index_test.cc +++ b/test/boost/secondary_index_test.cc @@ -547,7 +547,8 @@ SEASTAR_TEST_CASE(test_simple_index_paging) { // which is a valid paging state as well, and should return // no rows. paging_state = make_lw_shared(partition_key::make_empty(), - std::nullopt, paging_state->get_remaining(), paging_state->get_query_uuid(), + position_in_partition_view(position_in_partition_view::partition_start_tag_t()), + paging_state->get_remaining(), paging_state->get_query_uuid(), paging_state->get_last_replicas(), paging_state->get_query_read_repair_decision(), paging_state->get_rows_fetched_for_last_partition()); @@ -561,7 +562,8 @@ SEASTAR_TEST_CASE(test_simple_index_paging) { { // An artificial paging state with an empty key pair is also valid and is expected // not to return rows (since no row matches an empty partition key) - auto paging_state = make_lw_shared(partition_key::make_empty(), std::nullopt, + auto paging_state = make_lw_shared(partition_key::make_empty(), + position_in_partition_view(position_in_partition_view::partition_start_tag_t()), 1, utils::make_random_uuid(), service::pager::paging_state::replicas_per_token_range{}, std::nullopt, 1); auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});