query: add trim_clustering_row_ranges_to()
This algorithm was already duplicated in two places (service/pager/query_pagers.cc and mutation_reader.cc). Soon it will be used in a third place. Instead of triplicating, move it into a function that everybody can use.
This commit is contained in:
@@ -1118,25 +1118,10 @@ void multishard_combining_reader::shard_reader::adjust_partition_slice() {
|
||||
|
||||
const auto& schema = *_parent._schema;
|
||||
_slice_override->clear_range(schema, _last_pkey->key());
|
||||
|
||||
auto& last_ckey = _last_position_in_partition->key();
|
||||
|
||||
auto cmp = bound_view::compare(schema);
|
||||
auto eq = clustering_key_prefix::equality(schema);
|
||||
|
||||
auto ranges = _slice_override->default_row_ranges();
|
||||
auto it = ranges.begin();
|
||||
while (it != ranges.end()) {
|
||||
auto range = bound_view::from_range(*it);
|
||||
if (cmp(range.second, last_ckey) || eq(range.second.prefix(), last_ckey)) {
|
||||
it = ranges.erase(it);
|
||||
} else {
|
||||
if (cmp(range.first, last_ckey)) {
|
||||
assert(cmp(last_ckey, range.second));
|
||||
*it = query::clustering_range(query::clustering_range::bound{last_ckey, false}, it->end());
|
||||
}
|
||||
++it;
|
||||
}
|
||||
}
|
||||
query::trim_clustering_row_ranges_to(schema, ranges, last_ckey);
|
||||
|
||||
_slice_override->clear_ranges();
|
||||
_slice_override->set_range(schema, _last_pkey->key(), std::move(ranges));
|
||||
|
||||
@@ -56,6 +56,15 @@ bool is_single_row(const schema& s, const query::clustering_range& range) {
|
||||
|
||||
typedef std::vector<clustering_range> clustering_row_ranges;
|
||||
|
||||
/// Trim the clustering ranges.
|
||||
///
|
||||
/// Equivalent of intersecting each range with [key, +inf), or (-inf, key] if
|
||||
/// reversed == true. Ranges that do not intersect are dropped. Ranges that
|
||||
/// partially overlap are trimmed.
|
||||
/// Result: each range will overlap fully with [key, +inf), or (-int, key] if
|
||||
/// reversed is true.
|
||||
void trim_clustering_row_ranges_to(const schema& s, clustering_row_ranges& ranges, const clustering_key& key, bool reversed = false);
|
||||
|
||||
class specific_ranges {
|
||||
public:
|
||||
specific_ranges(partition_key pk, clustering_row_ranges ranges)
|
||||
|
||||
28
query.cc
28
query.cc
@@ -71,6 +71,34 @@ std::ostream& operator<<(std::ostream& out, const specific_ranges& s) {
|
||||
return out << "{" << s._pk << " : " << join(", ", s._ranges) << "}";
|
||||
}
|
||||
|
||||
void trim_clustering_row_ranges_to(const schema& s, clustering_row_ranges& ranges, const clustering_key& key, bool reversed) {
|
||||
auto cmp = [reversed, bv_cmp = bound_view::compare(s)] (const auto& a, const auto& b) {
|
||||
return reversed ? bv_cmp(b, a) : bv_cmp(a, b);
|
||||
};
|
||||
auto start_bound = [reversed] (const auto& range) -> const bound_view& {
|
||||
return reversed ? range.second : range.first;
|
||||
};
|
||||
auto end_bound = [reversed] (const auto& range) -> const bound_view& {
|
||||
return reversed ? range.first : range.second;
|
||||
};
|
||||
clustering_key_prefix::equality eq(s);
|
||||
|
||||
auto it = ranges.begin();
|
||||
while (it != ranges.end()) {
|
||||
auto range = bound_view::from_range(*it);
|
||||
if (cmp(end_bound(range), key) || eq(end_bound(range).prefix(), key)) {
|
||||
it = ranges.erase(it);
|
||||
continue;
|
||||
} else if (cmp(start_bound(range), key)) {
|
||||
assert(cmp(key, end_bound(range)));
|
||||
auto r = reversed ? clustering_range(it->start(), clustering_range::bound { key, false })
|
||||
: clustering_range(clustering_range::bound { key, false }, it->end());
|
||||
*it = std::move(r);
|
||||
}
|
||||
++it;
|
||||
}
|
||||
}
|
||||
|
||||
partition_slice::partition_slice(clustering_row_ranges row_ranges,
|
||||
query::column_id_vector static_columns,
|
||||
query::column_id_vector regular_columns,
|
||||
|
||||
@@ -147,43 +147,6 @@ static bool has_clustering_keys(const schema& s, const query::read_command& cmd)
|
||||
qlogger.trace("Result ranges {}", ranges);
|
||||
};
|
||||
|
||||
// Because of #1446 we don't have a comparator to use with
|
||||
// range<clustering_key_prefix> which would produce correct results.
|
||||
// This means we cannot reuse the same logic for dealing with
|
||||
// partition and clustering keys.
|
||||
auto modify_ck_ranges = [reversed] (const schema& s, auto& ranges, auto& lo) {
|
||||
typedef typename std::remove_reference_t<decltype(ranges)>::value_type range_type;
|
||||
typedef typename range_type::bound bound_type;
|
||||
|
||||
auto cmp = [reversed, bv_cmp = bound_view::compare(s)] (const auto& a, const auto& b) {
|
||||
return reversed ? bv_cmp(b, a) : bv_cmp(a, b);
|
||||
};
|
||||
auto start_bound = [reversed] (const auto& range) -> const bound_view& {
|
||||
return reversed ? range.second : range.first;
|
||||
};
|
||||
auto end_bound = [reversed] (const auto& range) -> const bound_view& {
|
||||
return reversed ? range.first : range.second;
|
||||
};
|
||||
clustering_key_prefix::equality eq(s);
|
||||
|
||||
auto it = ranges.begin();
|
||||
while (it != ranges.end()) {
|
||||
auto range = bound_view::from_range(*it);
|
||||
if (cmp(end_bound(range), lo) || eq(end_bound(range).prefix(), lo)) {
|
||||
qlogger.trace("Remove ck range {}", *it);
|
||||
it = ranges.erase(it);
|
||||
continue;
|
||||
} else if (cmp(start_bound(range), lo)) {
|
||||
assert(cmp(lo, end_bound(range)));
|
||||
auto r = reversed ? range_type(it->start(), bound_type { lo, false })
|
||||
: range_type(bound_type { lo, false }, it->end());
|
||||
qlogger.trace("Modify ck range {} -> {}", *it, r);
|
||||
*it = std::move(r);
|
||||
}
|
||||
++it;
|
||||
}
|
||||
};
|
||||
|
||||
// last ck can be empty depending on whether we
|
||||
// deserialized state or not. This case means "last page ended on
|
||||
// something-not-bound-by-clustering" (i.e. a static row, alone)
|
||||
@@ -196,7 +159,7 @@ static bool has_clustering_keys(const schema& s, const query::read_command& cmd)
|
||||
if (has_ck) {
|
||||
query::clustering_row_ranges row_ranges = _cmd->slice.default_row_ranges();
|
||||
clustering_key_prefix ckp = clustering_key_prefix::from_exploded(*_schema, _last_ckey->explode(*_schema));
|
||||
modify_ck_ranges(*_schema, row_ranges, ckp);
|
||||
query::trim_clustering_row_ranges_to(*_schema, row_ranges, ckp, reversed);
|
||||
|
||||
_cmd->slice.set_range(*_schema, *_last_pkey, row_ranges);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user