sstables_set::incremental_selector: use ring_position instead of token

Currently `sstable_set::incremental_selector` works in terms of tokens.
Sstables can be selected with tokens and internally the token-space is
partitioned (in `partitioned_sstable_set`, used for LCS) with tokens as
well. This is problematic for severeal reasons.
The sub-range sstables cover from the token-space is defined in terms of
decorated keys. It is even possible that multiple sstables cover
multiple non-overlapping sub-ranges of a single token. The current
system is unable to model this and will at best result in selecting
unnecessary sstables.
The usage of token for providing the next position where the
intersecting sstables change [1] causes further problems. Attempting to
walk over the token-space by repeatedly calling `select()` with the
`next_position` returned from the previous call will quite possibly lead
to an infinite loop as a token cannot express inclusiveness/exclusiveness
and thus the incremental selector will not be able to make progress when
the upper and lower bounds of two neighbouring intervals share the same
token with different inclusiveness e.g. [t1, t2](t2, t3].

To solve these problems update incremental_selector to work in terms of
ring position. This makes it possible to partition the token-space
amoing sstables at decorated key granularity. It also makes it possible
for select() to return a next_position that is guaranteed to make
progress.

partitioned_sstable_set now builds the internal interval map using the
decorated key of the sstables, not just the tokens.
incremental_selector::select() now uses `dht::ring_position_view` as
both the selector and the next_position. ring_position_view can express
positions between keys so it can also include information about
inclusiveness/exclusiveness of the next interval guaranteeing forward
progress.

[1] `sstable_set::incremental_selector::selection::next_position`
This commit is contained in:
Botond Dénes
2018-07-03 11:59:42 +03:00
parent bf2645c616
commit a8e795a16e
10 changed files with 144 additions and 98 deletions

View File

@@ -36,8 +36,8 @@ public:
compatible_ring_position_view(const schema& s, dht::ring_position_view rpv)
: _schema(&s), _rpv(rpv) {
}
const dht::token& token() const {
return _rpv->token();
const dht::ring_position_view& position() const {
return *_rpv;
}
friend int tri_compare(const compatible_ring_position_view& x, const compatible_ring_position_view& y) {
return dht::ring_position_tri_compare(*x._schema, *x._rpv, *y._rpv);

View File

@@ -237,7 +237,7 @@ partition_presence_checker
table::make_partition_presence_checker(lw_shared_ptr<sstables::sstable_set> sstables) {
auto sel = make_lw_shared(sstables->make_incremental_selector());
return [this, sstables = std::move(sstables), sel = std::move(sel)] (const dht::decorated_key& key) {
auto& sst = sel->select(key.token()).sstables;
auto& sst = sel->select(key).sstables;
if (sst.empty()) {
return partition_presence_checker_result::definitely_doesnt_exist;
}
@@ -453,7 +453,7 @@ public:
const dht::partition_range& pr,
tracing::trace_state_ptr trace_state,
sstable_reader_factory_type fn)
: reader_selector(s, pr.start() ? pr.start()->value() : dht::ring_position::min())
: reader_selector(s, pr.start() ? pr.start()->value() : dht::ring_position_view::min())
, _pr(&pr)
, _sstables(std::move(sstables))
, _trace_state(std::move(trace_state))
@@ -475,7 +475,7 @@ public:
virtual std::vector<flat_mutation_reader> create_new_readers(const dht::token* const t) override {
dblog.trace("incremental_reader_selector {}: {}({})", this, __FUNCTION__, seastar::lazy_deref(t));
const auto& position = (t ? *t : _selector_position.token());
const auto position = (t ? dht::ring_position_view::ending_at(*t) : _selector_position);
// we only pass _selector_position's token to _selector::select() when T is nullptr
// because it means gap between sstables, and the lower bound of the first interval
// after the gap is guaranteed to be inclusive.
@@ -485,8 +485,8 @@ public:
// For the lower bound of the token range the _selector
// might not return any sstables, in this case try again
// with next_token unless it's maximum token.
if (!selection.next_position.is_max()
&& position == (_pr->start() ? _pr->start()->value().token() : dht::minimum_token())) {
auto start = _pr->start() ? dht::ring_position_view::ending_at(_pr->start()->value().token()) : dht::ring_position_view::min();
if (!selection.next_position.is_max() && dht::ring_position_tri_compare(*_s, position, start) == 0) {
dblog.trace("incremental_reader_selector {}: no sstables intersect with the lower bound, retrying", this);
_selector_position = std::move(selection.next_position);
return create_new_readers(nullptr);
@@ -510,8 +510,7 @@ public:
virtual std::vector<flat_mutation_reader> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override {
_pr = &pr;
dht::ring_position_comparator cmp(*_s);
if (cmp(dht::ring_position_view::for_range_start(*_pr), _selector_position) >= 0) {
if (dht::ring_position_tri_compare(*_s, dht::ring_position_view::for_range_start(*_pr), _selector_position) >= 0) {
return create_new_readers(&_pr->start()->value().token());
}
@@ -1566,7 +1565,7 @@ future<std::unordered_set<sstring>> table::get_sstables_by_partition_key(const s
[this] (std::unordered_set<sstring>& filenames, lw_shared_ptr<sstables::sstable_set::incremental_selector>& sel, partition_key& pk) {
return do_with(dht::decorated_key(dht::global_partitioner().decorate_key(*_schema, pk)),
[this, &filenames, &sel, &pk](dht::decorated_key& dk) mutable {
auto sst = sel->select(dk.token()).sstables;
auto sst = sel->select(dk).sstables;
auto hk = sstables::sstable::make_hashed_key(*_schema, dk.key());
return do_for_each(sst, [this, &filenames, &dk, hk = std::move(hk)] (std::vector<sstables::shared_sstable>::const_iterator::reference s) mutable {

View File

@@ -620,6 +620,11 @@ public:
const dht::token& token() const { return *_token; }
const partition_key* key() const { return _key; }
// Only when key() == nullptr
token_bound get_token_bound() const { return token_bound(_weight); }
// Only when key() != nullptr
after_key is_after_key() const { return after_key(_weight == 1); }
friend std::ostream& operator<<(std::ostream&, ring_position_view);
};

View File

@@ -234,7 +234,7 @@ class list_reader_selector : public reader_selector {
public:
explicit list_reader_selector(schema_ptr s, std::vector<flat_mutation_reader> readers)
: reader_selector(s, dht::ring_position::min())
: reader_selector(s, dht::ring_position_view::min())
, _readers(std::move(readers)) {
}
@@ -245,7 +245,7 @@ public:
list_reader_selector& operator=(list_reader_selector&&) = default;
virtual std::vector<flat_mutation_reader> create_new_readers(const dht::token* const) override {
_selector_position = dht::ring_position::max();
_selector_position = dht::ring_position_view::max();
return std::exchange(_readers, {});
}

View File

@@ -50,9 +50,9 @@ namespace mutation_reader {
class reader_selector {
protected:
schema_ptr _s;
dht::ring_position _selector_position;
dht::ring_position_view _selector_position;
public:
reader_selector(schema_ptr s, dht::ring_position rp) noexcept : _s(std::move(s)), _selector_position(std::move(rp)) {}
reader_selector(schema_ptr s, dht::ring_position_view rpv) noexcept : _s(std::move(s)), _selector_position(std::move(rpv)) {}
virtual ~reader_selector() = default;
// Call only if has_new_readers() returned true.

View File

@@ -75,7 +75,7 @@ static api::timestamp_type get_max_purgeable_timestamp(const column_family& cf,
const std::unordered_set<shared_sstable>& compacting_set, const dht::decorated_key& dk) {
auto timestamp = api::max_timestamp;
stdx::optional<utils::hashed_key> hk;
for (auto&& sst : boost::range::join(selector.select(dk.token()).sstables, cf.compacted_undeleted_sstables())) {
for (auto&& sst : boost::range::join(selector.select(dk).sstables, cf.compacted_undeleted_sstables())) {
if (compacting_set.count(sst)) {
continue;
}

View File

@@ -69,7 +69,7 @@ extern logging::logger clogger;
class incremental_selector_impl {
public:
virtual ~incremental_selector_impl() {}
virtual std::tuple<dht::token_range, std::vector<shared_sstable>, dht::ring_position> select(const dht::token& token) = 0;
virtual std::tuple<dht::partition_range, std::vector<shared_sstable>, dht::ring_position_view> select(const dht::ring_position_view&) = 0;
};
class sstable_set_impl {
@@ -82,13 +82,15 @@ public:
virtual std::unique_ptr<incremental_selector_impl> make_incremental_selector() const = 0;
};
sstable_set::sstable_set(std::unique_ptr<sstable_set_impl> impl, lw_shared_ptr<sstable_list> all)
sstable_set::sstable_set(std::unique_ptr<sstable_set_impl> impl, schema_ptr s, lw_shared_ptr<sstable_list> all)
: _impl(std::move(impl))
, _schema(std::move(s))
, _all(std::move(all)) {
}
sstable_set::sstable_set(const sstable_set& x)
: _impl(x._impl->clone())
, _schema(x._schema)
, _all(make_lw_shared(sstable_list(*x._all))) {
}
@@ -130,8 +132,9 @@ sstable_set::erase(shared_sstable sst) {
sstable_set::~sstable_set() = default;
sstable_set::incremental_selector::incremental_selector(std::unique_ptr<incremental_selector_impl> impl)
: _impl(std::move(impl)) {
sstable_set::incremental_selector::incremental_selector(std::unique_ptr<incremental_selector_impl> impl, const schema& s)
: _impl(std::move(impl))
, _cmp(s) {
}
sstable_set::incremental_selector::~incremental_selector() = default;
@@ -139,16 +142,17 @@ sstable_set::incremental_selector::~incremental_selector() = default;
sstable_set::incremental_selector::incremental_selector(sstable_set::incremental_selector&&) noexcept = default;
sstable_set::incremental_selector::selection
sstable_set::incremental_selector::select(const dht::token& t) const {
if (!_current_token_range || !_current_token_range->contains(t, dht::token_comparator())) {
std::tie(_current_token_range, _current_sstables, _current_next_position) = _impl->select(t);
sstable_set::incremental_selector::select(const dht::ring_position_view& pos) const {
if (!_current_range_view || !_current_range_view->contains(pos, _cmp)) {
std::tie(_current_range, _current_sstables, _current_next_position) = _impl->select(pos);
_current_range_view = _current_range->transform([] (const dht::ring_position& rp) { return dht::ring_position_view(rp); });
}
return {_current_sstables, _current_next_position};
}
sstable_set::incremental_selector
sstable_set::make_incremental_selector() const {
return incremental_selector(_impl->make_incremental_selector());
return incremental_selector(_impl->make_incremental_selector(), *_schema);
}
// default sstable_set, not specialized for anything
@@ -178,8 +182,8 @@ public:
incremental_selector(const std::vector<shared_sstable>& sstables)
: _sstables(sstables) {
}
virtual std::tuple<dht::token_range, std::vector<shared_sstable>, dht::ring_position> select(const dht::token& token) override {
return std::make_tuple(dht::token_range::make_open_ended_both_sides(), _sstables, dht::ring_position::max());
virtual std::tuple<dht::partition_range, std::vector<shared_sstable>, dht::ring_position_view> select(const dht::ring_position_view&) override {
return std::make_tuple(dht::partition_range::make_open_ended_both_sides(), _sstables, dht::ring_position_view::max());
}
};
@@ -209,8 +213,8 @@ private:
}
static interval_type make_interval(const schema& s, const sstable& sst) {
return interval_type::closed(
compatible_ring_position_view(s, dht::ring_position_view::starting_at(sst.get_first_decorated_key().token())),
compatible_ring_position_view(s, dht::ring_position_view::ending_at(sst.get_last_decorated_key().token())));
compatible_ring_position_view(s, sst.get_first_decorated_key()),
compatible_ring_position_view(s, sst.get_last_decorated_key()));
}
interval_type make_interval(const sstable& sst) const {
return make_interval(*_schema, sst);
@@ -234,6 +238,29 @@ private:
}
}
public:
static dht::ring_position to_ring_position(const compatible_ring_position_view& crpv) {
// Ring position views, representing bounds of sstable intervals are
// guaranteed to have key() != nullptr;
const auto& pos = crpv.position();
return dht::ring_position(pos.token(), *pos.key());
}
static dht::partition_range to_partition_range(const interval_type& i) {
return dht::partition_range::make(
{to_ring_position(i.lower()), boost::icl::is_left_closed(i.bounds())},
{to_ring_position(i.upper()), boost::icl::is_right_closed(i.bounds())});
}
static dht::partition_range to_partition_range(const dht::ring_position_view& pos, const interval_type& i) {
auto lower_bound = [&] {
if (pos.key()) {
return dht::partition_range::bound(dht::ring_position(pos.token(), *pos.key()),
pos.is_after_key() == dht::ring_position_view::after_key::no);
} else {
return dht::partition_range::bound(dht::ring_position(pos.token(), pos.get_token_bound()), true);
}
}();
auto upper_bound = dht::partition_range::bound(to_ring_position(i.lower()), !boost::icl::is_left_closed(i.bounds()));
return dht::partition_range::make(std::move(lower_bound), std::move(upper_bound));
}
explicit partitioned_sstable_set(schema_ptr schema)
: _schema(std::move(schema)) {
}
@@ -275,53 +302,50 @@ class partitioned_sstable_set::incremental_selector : public incremental_selecto
const std::vector<shared_sstable>& _unleveled_sstables;
map_iterator _it;
const map_iterator _end;
// Only to back the dht::ring_position_view returned from select().
dht::ring_position _next_position;
private:
static dht::token_range to_token_range(const interval_type& i) {
return dht::token_range::make({i.lower().token(), boost::icl::is_left_closed(i.bounds())},
{i.upper().token(), boost::icl::is_right_closed(i.bounds())});
dht::ring_position_view next_position(map_iterator it) {
if (it == _end) {
_next_position = dht::ring_position::max();
return dht::ring_position_view::max();
} else {
_next_position = partitioned_sstable_set::to_ring_position(it->first.lower());
return dht::ring_position_view(_next_position, dht::ring_position_view::after_key(!boost::icl::is_left_closed(it->first.bounds())));
}
}
static bool is_before_interval(const compatible_ring_position_view& crpv, const interval_type& interval) {
if (boost::icl::is_left_closed(interval.bounds())) {
return crpv < interval.lower();
} else {
return crpv <= interval.lower();
}
}
public:
incremental_selector(schema_ptr schema, const std::vector<shared_sstable>& unleveled_sstables, const interval_map_type& leveled_sstables)
: _schema(std::move(schema))
, _unleveled_sstables(unleveled_sstables)
, _it(leveled_sstables.begin())
, _end(leveled_sstables.end()) {
, _end(leveled_sstables.end())
, _next_position(dht::ring_position::min()) {
}
virtual std::tuple<dht::token_range, std::vector<shared_sstable>, dht::ring_position> select(const dht::token& token) override {
auto pr = dht::partition_range::make(dht::ring_position::starting_at(token), dht::ring_position::ending_at(token));
auto interval = make_interval(*_schema, std::move(pr));
virtual std::tuple<dht::partition_range, std::vector<shared_sstable>, dht::ring_position_view> select(const dht::ring_position_view& pos) override {
auto crpv = compatible_ring_position_view(*_schema, pos);
auto ssts = _unleveled_sstables;
using namespace dht;
auto inclusiveness = [] (auto& interval) {
return boost::icl::is_left_closed(interval.bounds()) ? ring_position::token_bound::start : ring_position::token_bound::end;
};
const auto next_pos = [&] {
const auto next = std::next(_it);
auto& interval = next->first;
return next == _end ? ring_position::max() : ring_position(interval.lower().token(), inclusiveness(interval));
};
const auto current_pos = [&] {
auto& interval = _it->first;
return _it == _end ? ring_position::max() : ring_position(interval.lower().token(), inclusiveness(interval));
};
while (_it != _end) {
if (boost::icl::contains(_it->first, interval)) {
if (boost::icl::contains(_it->first, crpv)) {
ssts.insert(ssts.end(), _it->second.begin(), _it->second.end());
return std::make_tuple(to_token_range(_it->first), std::move(ssts), next_pos());
return std::make_tuple(partitioned_sstable_set::to_partition_range(_it->first), std::move(ssts), next_position(std::next(_it)));
}
// we don't want to skip current interval if token lies before it.
if (boost::icl::lower_less(interval, _it->first)) {
return std::make_tuple(dht::token_range::make({token, true}, {_it->first.lower().token(), false}),
std::move(ssts),
current_pos());
// We don't want to skip current interval if pos lies before it.
if (is_before_interval(crpv, _it->first)) {
return std::make_tuple(partitioned_sstable_set::to_partition_range(pos, _it->first), std::move(ssts), next_position(_it));
}
_it++;
}
return std::make_tuple(dht::token_range::make_open_ended_both_sides(), std::move(ssts), ring_position::max());
return std::make_tuple(partition_range::make_open_ended_both_sides(), std::move(ssts), ring_position_view::max());
}
};
@@ -661,7 +685,8 @@ bool compaction_strategy::use_clustering_key_filter() const {
sstable_set
compaction_strategy::make_sstable_set(schema_ptr schema) const {
return sstable_set(
_compaction_strategy_impl->make_sstable_set(std::move(schema)),
_compaction_strategy_impl->make_sstable_set(schema),
schema,
make_lw_shared<sstable_list>());
}

View File

@@ -33,12 +33,13 @@ class incremental_selector_impl;
class sstable_set {
std::unique_ptr<sstable_set_impl> _impl;
schema_ptr _schema;
// used to support column_family::get_sstable(), which wants to return an sstable_list
// that has a reference somewhere
lw_shared_ptr<sstable_list> _all;
public:
~sstable_set();
sstable_set(std::unique_ptr<sstable_set_impl> impl, lw_shared_ptr<sstable_list> all);
sstable_set(std::unique_ptr<sstable_set_impl> impl, schema_ptr s, lw_shared_ptr<sstable_list> all);
sstable_set(const sstable_set&);
sstable_set(sstable_set&&) noexcept;
sstable_set& operator=(const sstable_set&);
@@ -48,31 +49,40 @@ public:
void insert(shared_sstable sst);
void erase(shared_sstable sst);
// Used to incrementally select sstables from sstable set using tokens.
// Used to incrementally select sstables from sstable set using ring-position.
// sstable set must be alive and cannot be modified while incremental
// selector is used.
class incremental_selector {
std::unique_ptr<incremental_selector_impl> _impl;
mutable stdx::optional<dht::token_range> _current_token_range;
dht::ring_position_comparator _cmp;
mutable std::optional<dht::partition_range> _current_range;
mutable std::optional<nonwrapping_range<dht::ring_position_view>> _current_range_view;
mutable std::vector<shared_sstable> _current_sstables;
mutable dht::ring_position _current_next_position = dht::ring_position::min();
mutable dht::ring_position_view _current_next_position = dht::ring_position_view::min();
public:
~incremental_selector();
incremental_selector(std::unique_ptr<incremental_selector_impl> impl);
incremental_selector(std::unique_ptr<incremental_selector_impl> impl, const schema& s);
incremental_selector(incremental_selector&&) noexcept;
struct selection {
const std::vector<shared_sstable>& sstables;
dht::ring_position next_position;
dht::ring_position_view next_position;
};
// Return the sstables that intersect with t and the best next
// token (inclusive) to call select() with so that the least
// amount of sstables will be returned (without skipping any).
// NOTE: selection.sstables is a reference to an internal cache
// and can be invalidated by another call to select().
// If you need it long-term copy it!
selection select(const dht::token& t) const;
// Return the sstables that intersect with `pos` and the next
// position where the intersecting sstables change.
// To walk through the token range incrementally call `select()`
// with `dht::ring_position_view::min()` and then pass back the
// returned `next_position` on each next call until
// `next_position` becomes `dht::ring_position::max()`.
//
// Successive calls to `select()' have to pass weakly monotonic
// positions (incrementability).
//
// NOTE: both `selection.sstables` and `selection.next_position`
// are only guaranteed to be valid until the next call to
// `select()`.
selection select(const dht::ring_position_view& pos) const;
};
incremental_selector make_incremental_selector() const;
};

View File

@@ -541,17 +541,17 @@ static mutation make_mutation_with_key(simple_schema& s, dht::decorated_key dk)
}
class dummy_incremental_selector : public reader_selector {
// To back _selector_position.
dht::ring_position _position;
std::vector<std::vector<mutation>> _readers_mutations;
streamed_mutation::forwarding _fwd;
dht::partition_range _pr;
const dht::token& position() const {
return _readers_mutations.back().front().token();
}
flat_mutation_reader pop_reader() {
auto muts = std::move(_readers_mutations.back());
_readers_mutations.pop_back();
_selector_position = _readers_mutations.empty() ? dht::ring_position::max() : dht::ring_position::starting_at(position());
_position = _readers_mutations.empty() ? dht::ring_position::max() : _readers_mutations.back().front().decorated_key();
_selector_position = _position;
return flat_mutation_reader_from_mutations(std::move(muts), _pr, _fwd);
}
public:
@@ -563,13 +563,13 @@ public:
std::vector<std::vector<mutation>> reader_mutations,
dht::partition_range pr = query::full_partition_range,
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no)
: reader_selector(s, dht::ring_position::min())
: reader_selector(s, dht::ring_position_view::min())
, _position(dht::ring_position::min())
, _readers_mutations(std::move(reader_mutations))
, _fwd(fwd)
, _pr(std::move(pr)) {
// So we can pop the next reader off the back
boost::reverse(_readers_mutations);
_selector_position = dht::ring_position::starting_at(position());
}
virtual std::vector<flat_mutation_reader> create_new_readers(const dht::token* const t) override {
if (_readers_mutations.empty()) {
@@ -583,7 +583,8 @@ public:
return readers;
}
while (!_readers_mutations.empty() && *t >= _selector_position.token()) {
auto pos = dht::ring_position_view::ending_at(*t);
while (!_readers_mutations.empty() && dht::ring_position_tri_compare(*_s, _selector_position, pos) <= 0) {
readers.emplace_back(pop_reader());
}
return readers;

View File

@@ -3886,12 +3886,18 @@ SEASTAR_TEST_CASE(sstable_set_incremental_selector) {
{{"p1", utf8_type}}, {}, {}, {}, utf8_type));
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options());
auto key_and_token_pair = token_generation_for_current_shard(8);
auto decorated_keys = boost::copy_range<std::vector<dht::decorated_key>>(
key_and_token_pair | boost::adaptors::transformed([&s] (const std::pair<sstring, dht::token>& key_and_token) {
auto value = bytes(reinterpret_cast<const signed char*>(key_and_token.first.data()), key_and_token.first.size());
auto pk = sstables::key::from_bytes(value).to_partition_key(*s);
return dht::global_partitioner().decorate_key(*s, std::move(pk));
}));
auto check = [] (sstable_set::incremental_selector& selector, const dht::token& token, std::unordered_set<int64_t> expected_gens) {
auto sstables = selector.select(token).sstables;
BOOST_REQUIRE(sstables.size() == expected_gens.size());
auto check = [] (sstable_set::incremental_selector& selector, const dht::decorated_key& key, std::unordered_set<int64_t> expected_gens) {
auto sstables = selector.select(key).sstables;
BOOST_REQUIRE_EQUAL(sstables.size(), expected_gens.size());
for (auto& sst : sstables) {
BOOST_REQUIRE(expected_gens.count(sst->generation()) == 1);
BOOST_REQUIRE_EQUAL(expected_gens.count(sst->generation()), 1);
}
};
@@ -3904,14 +3910,14 @@ SEASTAR_TEST_CASE(sstable_set_incremental_selector) {
set.insert(sstable_for_overlapping_test(s, 5, key_and_token_pair[4].first, key_and_token_pair[5].first, 1));
sstable_set::incremental_selector sel = set.make_incremental_selector();
check(sel, key_and_token_pair[0].second, {1, 2});
check(sel, key_and_token_pair[1].second, {1, 2});
check(sel, key_and_token_pair[2].second, {});
check(sel, key_and_token_pair[3].second, {3});
check(sel, key_and_token_pair[4].second, {3, 4, 5});
check(sel, key_and_token_pair[5].second, {5});
check(sel, key_and_token_pair[6].second, {});
check(sel, key_and_token_pair[7].second, {});
check(sel, decorated_keys[0], {1, 2});
check(sel, decorated_keys[1], {1, 2});
check(sel, decorated_keys[2], {});
check(sel, decorated_keys[3], {3});
check(sel, decorated_keys[4], {3, 4, 5});
check(sel, decorated_keys[5], {5});
check(sel, decorated_keys[6], {});
check(sel, decorated_keys[7], {});
}
{
@@ -3924,14 +3930,14 @@ SEASTAR_TEST_CASE(sstable_set_incremental_selector) {
set.insert(sstable_for_overlapping_test(s, 5, key_and_token_pair[4].first, key_and_token_pair[5].first, 1));
sstable_set::incremental_selector sel = set.make_incremental_selector();
check(sel, key_and_token_pair[0].second, {0, 1, 2});
check(sel, key_and_token_pair[1].second, {0, 1, 2});
check(sel, key_and_token_pair[2].second, {0});
check(sel, key_and_token_pair[3].second, {0, 3});
check(sel, key_and_token_pair[4].second, {0, 3, 4, 5});
check(sel, key_and_token_pair[5].second, {0, 5});
check(sel, key_and_token_pair[6].second, {0});
check(sel, key_and_token_pair[7].second, {0});
check(sel, decorated_keys[0], {0, 1, 2});
check(sel, decorated_keys[1], {0, 1, 2});
check(sel, decorated_keys[2], {0});
check(sel, decorated_keys[3], {0, 3});
check(sel, decorated_keys[4], {0, 3, 4, 5});
check(sel, decorated_keys[5], {0, 5});
check(sel, decorated_keys[6], {0});
check(sel, decorated_keys[7], {0});
}
return make_ready_future<>();