range: Add nonwrapping_range class

This patch introduces the nonwrapping_range class. This class is
intended to be used by code that requires non wrapping ranges.
Internally, it uses a wrapping_range. Users are responsible for
ensuring the bounds are correct when creating a nonwrapping_range.

The path proposed here is to incrementally replace usages of
wrapping_range/range by nonwrapping_range, pushing usages of wrapping
ranges as further to the edges as possible.

Signed-off-by: Duarte Nunes <duarte@scylladb.com>
This commit is contained in:
Duarte Nunes
2016-08-09 17:11:52 +00:00
parent 2bb428973a
commit bb16e194bc
3 changed files with 203 additions and 143 deletions

235
range.hh
View File

@@ -25,6 +25,7 @@
#include <iostream>
#include <boost/range/algorithm/copy.hpp>
#include <boost/range/adaptor/sliced.hpp>
#include <boost/range/adaptor/transformed.hpp>
template<typename T>
class range_bound {
@@ -47,7 +48,11 @@ public:
}
};
template<typename T>
class nonwrapping_range;
// A range which can have inclusive, exclusive or open-ended bounds on each end.
// The end bound can be smaller than the start bound.
template<typename T>
class wrapping_range {
template <typename U>
@@ -360,53 +365,10 @@ public:
return (_start == other._start) && (_end == other._end) && (_singular == other._singular);
}
// Takes a vector of possibly overlapping ranges and returns a vector containing
// a set of non-overlapping ranges covering the same values.
template<typename Comparator>
static std::vector<wrapping_range> deoverlap(std::vector<wrapping_range> ranges, Comparator&& cmp) {
auto size = ranges.size();
if (size <= 1) {
return ranges;
}
for (auto it = ranges.begin(); it != ranges.end(); ++it) {
if ((*it).is_wrap_around(cmp)) {
auto&& unwrapped = it->unwrap();
it = ranges.erase(it);
it = ranges.emplace(it, std::move(unwrapped.first));
it = ranges.emplace(it, std::move(unwrapped.second));
}
}
std::sort(ranges.begin(), ranges.end(), [&](auto&& r1, auto&& r2) {
return wrapping_range::less_than(r1.start_bound(), r2.start_bound(), cmp);
});
std::vector<wrapping_range> deoverlapped_ranges;
deoverlapped_ranges.reserve(size);
auto&& current = ranges[0];
for (auto&& r : ranges | boost::adaptors::sliced(1, ranges.size())) {
bool includes_end = greater_than_or_equal(r.end_bound(), current.start_bound(), cmp)
&& greater_than_or_equal(current.end_bound(), r.end_bound(), cmp);
if (includes_end) {
continue; // last.start <= r.start <= r.end <= last.end
}
bool includes_start = greater_than_or_equal(current.end_bound(), r.start_bound(), cmp);
if (includes_start) {
current = wrapping_range(std::move(current.start()), std::move(r.end()));
} else {
deoverlapped_ranges.emplace_back(std::move(current));
current = std::move(r);
}
}
deoverlapped_ranges.emplace_back(std::move(current));
return deoverlapped_ranges;
}
template<typename U>
friend std::ostream& operator<<(std::ostream& out, const wrapping_range<U>& r);
private:
friend class nonwrapping_range<T>;
};
template<typename U>
@@ -440,6 +402,180 @@ std::ostream& operator<<(std::ostream& out, const wrapping_range<U>& r) {
return out;
}
// A range which can have inclusive, exclusive or open-ended bounds on each end.
// The end bound can never be smaller than the start bound.
template<typename T>
class nonwrapping_range {
template <typename U>
using optional = std::experimental::optional<U>;
public:
using bound = range_bound<T>;
private:
wrapping_range<T> _range;
public:
nonwrapping_range(T value)
: _range(std::move(value))
{ }
nonwrapping_range() : nonwrapping_range({}, {}) { }
// Can only be called if start <= end. IDL ctor.
nonwrapping_range(optional<bound> start, optional<bound> end, bool singular = false)
: _range(std::move(start), std::move(end), singular)
{ }
// Can only be called if !r.is_wrap_around().
explicit nonwrapping_range(wrapping_range<T>&& r)
: _range(std::move(r))
{ }
operator wrapping_range<T>() const & {
return _range;
}
operator wrapping_range<T>() && {
return std::move(_range);
}
// the point is before the range.
// Comparator must define a total ordering on T.
template<typename Comparator>
bool before(const T& point, Comparator&& cmp) const {
return _range.before(point, std::forward<Comparator>(cmp));
}
// the point is after the range.
// Comparator must define a total ordering on T.
template<typename Comparator>
bool after(const T& point, Comparator&& cmp) const {
return _range.after(point, std::forward<Comparator>(cmp));
}
// check if two ranges overlap.
// Comparator must define a total ordering on T.
template<typename Comparator>
bool overlaps(const nonwrapping_range& other, Comparator&& cmp) const {
// if both this and other have an open start, the two ranges will overlap.
if (!start() && !other.start()) {
return true;
}
return wrapping_range<T>::greater_than_or_equal(_range.end_bound(), other._range.start_bound(), cmp)
&& wrapping_range<T>::greater_than_or_equal(other._range.end_bound(), _range.start_bound(), cmp);
}
static nonwrapping_range make_open_ended_both_sides() {
return {{}, {}};
}
static nonwrapping_range make_singular(T value) {
return {std::move(value)};
}
static nonwrapping_range make_starting_with(bound b) {
return {{std::move(b)}, {}};
}
static nonwrapping_range make_ending_with(bound b) {
return {{}, {std::move(b)}};
}
bool is_singular() const {
return _range.is_singular();
}
bool is_full() const {
return _range.is_full();
}
const optional<bound>& start() const {
return _range.start();
}
const optional<bound>& end() const {
return _range.end();
}
// the point is inside the range
// Comparator must define a total ordering on T.
template<typename Comparator>
bool contains(const T& point, Comparator&& cmp) const {
return !before(point, cmp) && !after(point, cmp);
}
// Returns true iff all values contained by other are also contained by this.
// Comparator must define a total ordering on T.
template<typename Comparator>
bool contains(const nonwrapping_range& other, Comparator&& cmp) const {
return wrapping_range<T>::less_than_or_equal(_range.start_bound(), other._range.start_bound(), cmp)
&& wrapping_range<T>::greater_than_or_equal(_range.end_bound(), other._range.end_bound(), cmp);
}
// Returns ranges which cover all values covered by this range but not covered by the other range.
// Ranges are not overlapping and ordered.
// Comparator must define a total ordering on T.
template<typename Comparator>
std::vector<nonwrapping_range> subtract(const nonwrapping_range& other, Comparator&& cmp) const {
auto subtracted = _range.subtract(other._range, std::forward<Comparator>(cmp));
return boost::copy_range<std::vector<nonwrapping_range>>(subtracted | boost::adaptors::transformed([](auto&& r) {
return nonwrapping_range(std::move(r));
}));
}
// split range in two around a split_point. split_point has to be inside the range
// split_point will belong to first range
// Comparator must define a total ordering on T.
template<typename Comparator>
std::pair<nonwrapping_range<T>, nonwrapping_range<T>> split(const T& split_point, Comparator&& cmp) const {
assert(contains(split_point, std::forward<Comparator>(cmp)));
nonwrapping_range left(start(), bound(split_point));
nonwrapping_range right(bound(split_point, false), end());
return std::make_pair(std::move(left), std::move(right));
}
// Create a sub-range including values greater than the split_point. split_point has to be inside the range
// Comparator must define a total ordering on T.
template<typename Comparator>
nonwrapping_range<T> split_after(const T& split_point, Comparator&& cmp) const {
assert(contains(split_point, std::forward<Comparator>(cmp)));
return nonwrapping_range(bound(split_point, false), end());
}
// Transforms this range into a new range of a different value type
// Supplied transformer should transform value of type T (the old type) into value of type U (the new type).
template<typename Transformer, typename U = typename std::result_of<Transformer(T)>::type>
nonwrapping_range<U> transform(Transformer&& transformer) && {
return nonwrapping_range<U>(std::move(_range).transform(std::forward<Transformer>(transformer)));
}
template<typename Comparator>
bool equal(const nonwrapping_range& other, Comparator&& cmp) const {
return _range.equal(other._range, std::forward<Comparator>(cmp));
}
bool operator==(const nonwrapping_range& other) const {
return _range == other._range;
}
// Takes a vector of possibly overlapping ranges and returns a vector containing
// a set of non-overlapping ranges covering the same values.
template<typename Comparator>
static std::vector<nonwrapping_range> deoverlap(std::vector<nonwrapping_range> ranges, Comparator&& cmp) {
auto size = ranges.size();
if (size <= 1) {
return ranges;
}
std::sort(ranges.begin(), ranges.end(), [&](auto&& r1, auto&& r2) {
return wrapping_range<T>::less_than(r1._range.start_bound(), r2._range.start_bound(), cmp);
});
std::vector<nonwrapping_range> deoverlapped_ranges;
deoverlapped_ranges.reserve(size);
auto&& current = ranges[0];
for (auto&& r : ranges | boost::adaptors::sliced(1, ranges.size())) {
bool includes_end = wrapping_range<T>::greater_than_or_equal(r._range.end_bound(), current._range.start_bound(), cmp)
&& wrapping_range<T>::greater_than_or_equal(current._range.end_bound(), r._range.end_bound(), cmp);
if (includes_end) {
continue; // last.start <= r.start <= r.end <= last.end
}
bool includes_start = wrapping_range<T>::greater_than_or_equal(current._range.end_bound(), r._range.start_bound(), cmp);
if (includes_start) {
current = nonwrapping_range(std::move(current.start()), std::move(r.end()));
} else {
deoverlapped_ranges.emplace_back(std::move(current));
current = std::move(r);
}
}
deoverlapped_ranges.emplace_back(std::move(current));
return deoverlapped_ranges;
}
template<typename U>
friend std::ostream& operator<<(std::ostream& out, const nonwrapping_range<U>& r);
};
template<typename U>
std::ostream& operator<<(std::ostream& out, const nonwrapping_range<U>& r) {
return out << r._range;
}
template<typename T>
using range = wrapping_range<T>;
@@ -459,4 +595,13 @@ struct hash<wrapping_range<T>> {
}
};
template<typename T>
struct hash<nonwrapping_range<T>> {
using argument_type = nonwrapping_range<T>;
using result_type = decltype(std::hash<T>()(std::declval<T>()));
result_type operator()(argument_type const& s) const {
return hash<wrapping_range<T>>()(s);
}
};
}

View File

@@ -505,96 +505,3 @@ BOOST_AUTO_TEST_CASE(test_range_interval_map) {
BOOST_REQUIRE(search_item("8") == true);
BOOST_REQUIRE(search_item("9") == false);
}
BOOST_AUTO_TEST_CASE(range_deoverlap_tests) {
using ranges = std::vector<range<unsigned>>;
{
ranges rs = { range<unsigned>::make(1, 4), range<unsigned>::make(2, 5) };
auto deoverlapped = range<unsigned>::deoverlap(std::move(rs), unsigned_comparator());
BOOST_REQUIRE_EQUAL(range<unsigned>::make(1, 5), deoverlapped[0]);
BOOST_REQUIRE_EQUAL(1, deoverlapped.size());
}
{
ranges rs = { range<unsigned>::make(1, 4), range<unsigned>::make(4, 5) };
auto deoverlapped = range<unsigned>::deoverlap(std::move(rs), unsigned_comparator());
BOOST_REQUIRE_EQUAL(range<unsigned>::make(1, 5), deoverlapped[0]);
BOOST_REQUIRE_EQUAL(1, deoverlapped.size());
}
{
ranges rs = { range<unsigned>::make(2, 4), range<unsigned>::make(1, 3) };
auto deoverlapped = range<unsigned>::deoverlap(std::move(rs), unsigned_comparator());
BOOST_REQUIRE_EQUAL(range<unsigned>::make(1, 4), deoverlapped[0]);
BOOST_REQUIRE_EQUAL(1, deoverlapped.size());
}
{
ranges rs = { range<unsigned>::make(1, 4), range<unsigned>::make(0, 5), range<unsigned>::make(7, 12), range<unsigned>::make(8, 10) };
auto deoverlapped = range<unsigned>::deoverlap(std::move(rs), unsigned_comparator());
BOOST_REQUIRE_EQUAL(range<unsigned>::make(0, 5), deoverlapped[0]);
BOOST_REQUIRE_EQUAL(range<unsigned>::make(7, 12), deoverlapped[1]);
BOOST_REQUIRE_EQUAL(2, deoverlapped.size());
}
{
ranges rs = { range<unsigned>::make(1, 4), range<unsigned>({2}, { }) };
auto deoverlapped = range<unsigned>::deoverlap(std::move(rs), unsigned_comparator());
BOOST_REQUIRE_EQUAL(range<unsigned>({1}, { }), deoverlapped[0]);
BOOST_REQUIRE_EQUAL(1, deoverlapped.size());
}
{
ranges rs = { range<unsigned>({ }, {4}), range<unsigned>::make(3, 5) };
auto deoverlapped = range<unsigned>::deoverlap(std::move(rs), unsigned_comparator());
BOOST_REQUIRE_EQUAL(range<unsigned>({ }, {5}), deoverlapped[0]);
BOOST_REQUIRE_EQUAL(1, deoverlapped.size());
}
{
ranges rs = { range<unsigned>({14}, { }), range<unsigned>({2}, { }) };
auto deoverlapped = range<unsigned>::deoverlap(std::move(rs), unsigned_comparator());
BOOST_REQUIRE_EQUAL(range<unsigned>({2}, { }), deoverlapped[0]);
BOOST_REQUIRE_EQUAL(1, deoverlapped.size());
}
{
ranges rs = { range<unsigned>({2}, { }), range<unsigned>({12}, { }) };
auto deoverlapped = range<unsigned>::deoverlap(std::move(rs), unsigned_comparator());
BOOST_REQUIRE_EQUAL(range<unsigned>({2}, { }), deoverlapped[0]);
BOOST_REQUIRE_EQUAL(1, deoverlapped.size());
}
{
ranges rs = { range<unsigned>::make(14, 4), range<unsigned>::make(2, 4) };
auto deoverlapped = range<unsigned>::deoverlap(std::move(rs), unsigned_comparator());
BOOST_REQUIRE_EQUAL(range<unsigned>({ }, {4}), deoverlapped[0]);
BOOST_REQUIRE_EQUAL(range<unsigned>({14}, { }), deoverlapped[1]);
BOOST_REQUIRE_EQUAL(2, deoverlapped.size());
}
{
ranges rs = { range<unsigned>({3}, {{4, false}}), range<unsigned>({{4, false}}, {5}) };
auto deoverlapped = range<unsigned>::deoverlap(std::move(rs), unsigned_comparator());
BOOST_REQUIRE_EQUAL(range<unsigned>({3}, {{4, false}}), deoverlapped[0]);
BOOST_REQUIRE_EQUAL(range<unsigned>({{4, false}}, {5}), deoverlapped[1]);
BOOST_REQUIRE_EQUAL(2, deoverlapped.size());
}
{
ranges rs = { range<unsigned>({3}, {{4, false}}), range<unsigned>::make(4, 5) };
auto deoverlapped = range<unsigned>::deoverlap(std::move(rs), unsigned_comparator());
BOOST_REQUIRE_EQUAL(range<unsigned>({3}, {{4, false}}), deoverlapped[0]);
BOOST_REQUIRE_EQUAL(range<unsigned>::make(4, 5), deoverlapped[1]);
BOOST_REQUIRE_EQUAL(2, deoverlapped.size());
}
{
ranges rs = { range<unsigned>::make(3, 4), range<unsigned>({{4, false}}, {5}) };
auto deoverlapped = range<unsigned>::deoverlap(std::move(rs), unsigned_comparator());
BOOST_REQUIRE_EQUAL(range<unsigned>::make(3, 4), deoverlapped[0]);
BOOST_REQUIRE_EQUAL(range<unsigned>({{4, false}}, {5}), deoverlapped[1]);
BOOST_REQUIRE_EQUAL(2, deoverlapped.size());
}
}

View File

@@ -1591,10 +1591,10 @@ private:
template<typename RangeType, typename Comparator>
static std::vector<range<RangeType>> make_non_overlapping_ranges(
std::vector<ColumnSlice> column_slices,
const std::function<range<RangeType>(ColumnSlice&&)> mapper,
const Comparator&& cmp,
const std::function<wrapping_range<RangeType>(ColumnSlice&&)> mapper,
Comparator&& cmp,
bool reversed) {
std::vector<range<RangeType>> ranges;
std::vector<nonwrapping_range<RangeType>> ranges;
std::transform(column_slices.begin(), column_slices.end(), std::back_inserter(ranges), [&](auto&& cslice) {
auto range = mapper(std::move(cslice));
if (!reversed && range.is_wrap_around(cmp)) {
@@ -1603,10 +1603,18 @@ private:
throw make_exception<InvalidRequestException>("Reversed column slice had start %s less than finish %s", cslice.start, cslice.finish);
} else if (reversed) {
range.reverse();
if (range.is_wrap_around(cmp)) {
// If a wrap around range is still wrapping after reverse, then it's (a, a). This is equivalent
// to an open ended range.
range = wrapping_range<RangeType>::make_open_ended_both_sides();
}
}
return range;
return nonwrapping_range<RangeType>(std::move(range));
});
return range<RangeType>::deoverlap(std::move(ranges), cmp);
auto deoverlapped = nonwrapping_range<RangeType>::deoverlap(std::move(ranges), std::forward<Comparator>(cmp));
std::vector<wrapping_range<RangeType>> ret;
std::move(deoverlapped.begin(), deoverlapped.end(), std::back_inserter(ret));
return ret;
}
static range_tombstone make_range_tombstone(const schema& s, const SliceRange& range, tombstone tomb) {
using bound = query::clustering_range::bound;