diff --git a/position_in_partition.hh b/position_in_partition.hh index 5065249b97..5b81c6b960 100644 --- a/position_in_partition.hh +++ b/position_in_partition.hh @@ -160,6 +160,10 @@ public: return {partition_region::clustered, bound_weight::after_all_prefixed, &ck}; } + static position_in_partition_view before_key(const clustering_key& ck) { + return {partition_region::clustered, bound_weight::before_all_prefixed, &ck}; + } + partition_region region() const { return _type; } bound_weight get_bound_weight() const { return _bound_weight; } bool is_partition_start() const { return _type == partition_region::partition_start; } diff --git a/query.cc b/query.cc index aa72d0ad67..b4fc999cf5 100644 --- a/query.cc +++ b/query.cc @@ -99,8 +99,14 @@ void trim_clustering_row_ranges_to(const schema& s, clustering_row_ranges& range } void trim_clustering_row_ranges_to(const schema& s, clustering_row_ranges& ranges, const clustering_key& key, bool reversed) { + if (key.is_full(s)) { + return trim_clustering_row_ranges_to(s, ranges, + reversed ? position_in_partition_view::before_key(key) : position_in_partition_view::after_key(key), reversed); + } + auto full_key = key; + clustering_key::make_full(s, full_key); return trim_clustering_row_ranges_to(s, ranges, - position_in_partition_view(key, reversed ? bound_weight::before_all_prefixed : bound_weight::after_all_prefixed), reversed); + reversed ? position_in_partition_view::after_key(full_key) : position_in_partition_view::before_key(full_key), reversed); } partition_slice::partition_slice(clustering_row_ranges row_ranges, diff --git a/tests/mutation_reader_test.cc b/tests/mutation_reader_test.cc index 06f3859919..d57277dd93 100644 --- a/tests/mutation_reader_test.cc +++ b/tests/mutation_reader_test.cc @@ -1457,18 +1457,24 @@ SEASTAR_THREAD_TEST_CASE(test_foreign_reader_as_mutation_source) { } SEASTAR_TEST_CASE(test_trim_clustering_row_ranges_to) { + struct null { }; + struct missing { }; struct key { int c0; - std::optional c1; + std::variant c1; - key(int c0, std::optional c1 = {}) : c0(c0), c1(c1) { } + key(int c0, int c1) : c0(c0), c1(c1) { } + key(int c0, null) : c0(c0), c1(null{}) { } + key(int c0) : c0(c0), c1(missing{}) { } clustering_key to_clustering_key(const schema& s) const { std::vector v; v.push_back(int32_type->decompose(data_value(c0))); - if (c1) { - v.push_back(int32_type->decompose(data_value(*c1))); - } + std::visit(make_visitor( + [&v] (int c1) { v.push_back(int32_type->decompose(data_value(c1))); }, + [&v] (null c1) { v.push_back(bytes{}); }, + [] (missing) { }), + c1); return clustering_key::from_exploded(s, std::move(v)); } }; @@ -1476,12 +1482,14 @@ SEASTAR_TEST_CASE(test_trim_clustering_row_ranges_to) { key value; incl(int c0, int c1) : value(c0, c1) { } + incl(int c0, null) : value(c0, null{}) { } incl(int c0) : value(c0) { } }; struct excl { key value; excl(int c0, int c1) : value(c0, c1) { } + excl(int c0, null) : value(c0, null{}) { } excl(int c0) : value(c0) { } }; struct bound { @@ -1555,6 +1563,10 @@ SEASTAR_TEST_CASE(test_trim_clustering_row_ranges_to) { // 8) Equal to end(range with excl end) // 9) After range // 10) Full range + // 11) Prefix key is before range + // 12) Prefix key is equal to prefix start of range + // 13) Prefix key intersects with range + // 14) Prefix key is after range // (1) check( @@ -1640,6 +1652,30 @@ SEASTAR_TEST_CASE(test_trim_clustering_row_ranges_to) { {7, 9}, { {excl{7, 9}, inf{}} }); + // (11) + check( + { {incl{10, 10}, excl{10, 30}} }, + {10}, + { {incl{10, 10}, excl{10, 30}} }); + + // (12) + check( + { {incl{10}, excl{10, 30}} }, + {10}, + { {incl{10, null{}}, excl{10, 30}} }); + + // (13) + check( + { {incl{9, 10}, excl{10, 30}} }, + {10}, + { {incl{10, null{}}, excl{10, 30}} }); + + // (14) + check( + { {incl{9, 10}, excl{10, 30}} }, + {11}, + { }); + // In reversed now // (1) @@ -1726,6 +1762,30 @@ SEASTAR_TEST_CASE(test_trim_clustering_row_ranges_to) { {7, 9}, { {inf{}, excl{7, 9}} }); + // (11) + check_reversed( + { {incl{10, 10}, excl{10, 30}} }, + {11}, + { {incl{10, 10}, excl{10, 30}} }); + + // (12) + check_reversed( + { {excl{9, 39}, incl{10}} }, + {10}, + { {excl{9, 39}, incl{10, null{}}} }); + + // (13) + check_reversed( + { {incl{9, 10}, incl{10, 30}} }, + {10}, + { {incl{9, 10}, incl{10, null{}}} }); + + // (14) + check_reversed( + { {incl{9, 10}, excl{10, 30}} }, + {9}, + { }); + return make_ready_future<>(); }