flat_mutation_reader: make timeout opt-out rather than opt-in

Currently timeout is opt-in, that is, all methods that even have it
default it to `db::no_timeout`. This means that ensuring timeout is used
where it should be is completely up to the author and the reviewrs of
the code. As humans are notoriously prone to mistakes this has resulted
in a very inconsistent usage of timeout, many clients of
`flat_mutation_reader` passing the timeout only to some members and only
on certain call sites. This is small wonder considering that some core
operations like `operator()()` only recently received a timeout
parameter and others like `peek()` didn't even have one until this
patch. Both of these methods call `fill_buffer()` which potentially
talks to the lower layers and is supposed to propagate the timeout.
All this makes the `flat_mutation_reader`'s timeout effectively useless.

To make order in this chaos make the timeout parameter a mandatory one
on all `flat_mutation_reader` methods that need it. This ensures that
humans now get a reminder from the compiler when they forget to pass the
timeout. Clients can still opt-out from passing a timeout by passing
`db::no_timeout` (the previous default value) but this will be now
explicit and developers should think before typing it.

There were suprisingly few core call sites to fix up. Where a timeout
was available nearby I propagated it to be able to pass it to the
reader, where I couldn't I passed `db::no_timeout`. Authors of the
latter kind of code (view, streaming and repair are some of the notable
examples) should maybe consider propagating down a timeout if needed.
In the test code (the wast majority of the changes) I just used
`db::no_timeout` everywhere.

Tests: unit(release, debug)

Signed-off-by: Botond Dénes <bdenes@scylladb.com>

Message-Id: <1edc10802d5eb23de8af28c9f48b8d3be0f1a468.1536744563.git.bdenes@scylladb.com>
This commit is contained in:
Botond Dénes
2018-09-12 12:29:58 +03:00
committed by Tomasz Grabiec
parent de05df216f
commit eb357a385d
39 changed files with 287 additions and 283 deletions

View File

@@ -361,7 +361,7 @@ future<> cache_flat_mutation_reader::read_from_underlying(db::timeout_clock::tim
}
});
return make_ready_future<>();
});
}, timeout);
}
inline

View File

@@ -602,7 +602,7 @@ future<table::const_mutation_partition_ptr>
table::find_partition(schema_ptr s, const dht::decorated_key& key) const {
return do_with(dht::partition_range::make_singular(key), [s = std::move(s), this] (auto& range) {
return do_with(this->make_reader(s, range), [s] (flat_mutation_reader& reader) {
return read_mutation_from_flat_mutation_reader(reader).then([] (mutation_opt&& mo) -> std::unique_ptr<const mutation_partition> {
return read_mutation_from_flat_mutation_reader(reader, db::no_timeout).then([] (mutation_opt&& mo) -> std::unique_ptr<const mutation_partition> {
if (!mo) {
return {};
}
@@ -739,7 +739,7 @@ table::for_all_partitions(schema_ptr s, Func&& func) const {
return do_with(iteration_state(std::move(s), *this, std::move(func)), [] (iteration_state& is) {
return do_until([&is] { return is.done(); }, [&is] {
return read_mutation_from_flat_mutation_reader(is.reader).then([&is](mutation_opt&& mo) {
return read_mutation_from_flat_mutation_reader(is.reader, db::no_timeout).then([&is](mutation_opt&& mo) {
if (!mo) {
is.empty = true;
} else {

View File

@@ -634,8 +634,8 @@ private:
future<stop_iteration> on_results();
future<stop_iteration> advance_all() {
auto existings_f = _existings ? (*_existings)() : make_ready_future<optimized_optional<mutation_fragment>>();
return when_all(_updates(), std::move(existings_f)).then([this] (auto&& fragments) mutable {
auto existings_f = _existings ? (*_existings)(db::no_timeout) : make_ready_future<optimized_optional<mutation_fragment>>();
return when_all(_updates(db::no_timeout), std::move(existings_f)).then([this] (auto&& fragments) mutable {
_update = std::move(std::get<mutation_fragment_opt>(std::get<0>(fragments).get()));
_existing = std::move(std::get<mutation_fragment_opt>(std::get<1>(fragments).get()));
return stop_iteration::no;
@@ -643,7 +643,7 @@ private:
}
future<stop_iteration> advance_updates() {
return _updates().then([this] (auto&& update) mutable {
return _updates(db::no_timeout).then([this] (auto&& update) mutable {
_update = std::move(update);
return stop_iteration::no;
});
@@ -653,7 +653,7 @@ private:
if (!_existings) {
return make_ready_future<stop_iteration>(stop_iteration::no);
}
return (*_existings)().then([this] (auto&& existing) mutable {
return (*_existings)(db::no_timeout).then([this] (auto&& existing) mutable {
_existing = std::move(existing);
return stop_iteration::no;
});
@@ -1534,7 +1534,7 @@ void view_builder::execute(build_step& step, exponential_backoff_retry r) {
query::max_partitions,
view_builder::consumer{*this, step});
consumer.consume_new_partition(step.current_key); // Initialize the state in case we're resuming a partition
auto built = step.reader.consume_in_thread(std::move(consumer));
auto built = step.reader.consume_in_thread(std::move(consumer), db::no_timeout);
_as.check();

View File

@@ -196,11 +196,11 @@ flat_mutation_reader make_forwardable(flat_mutation_reader m) {
position_range _current;
mutation_fragment_opt _next;
// When resolves, _next is engaged or _end_of_stream is set.
future<> ensure_next() {
future<> ensure_next(db::timeout_clock::time_point timeout) {
if (_next) {
return make_ready_future<>();
}
return _underlying().then([this] (auto&& mfo) {
return _underlying(timeout).then([this] (auto&& mfo) {
_next = std::move(mfo);
if (!_next) {
_end_of_stream = true;
@@ -213,11 +213,11 @@ flat_mutation_reader make_forwardable(flat_mutation_reader m) {
position_in_partition(position_in_partition::after_static_row_tag_t())
}) { }
virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override {
return repeat([this] {
return repeat([this, timeout] {
if (is_buffer_full()) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
return ensure_next().then([this] {
return ensure_next(timeout).then([this] {
if (is_end_of_stream()) {
return stop_iteration::yes;
}
@@ -290,7 +290,7 @@ flat_mutation_reader make_nonforwardable(flat_mutation_reader r, bool single_par
}
_underlying.next_partition();
_static_row_done = false;
return _underlying.fill_buffer().then([this] {
return _underlying.fill_buffer(timeout).then([this] {
_end_of_stream = is_end_end_of_underlying_stream();
});
}

View File

@@ -320,7 +320,7 @@ public:
flat_mutation_reader(std::unique_ptr<impl> impl) noexcept : _impl(std::move(impl)) {}
future<mutation_fragment_opt> operator()(db::timeout_clock::time_point timeout = db::no_timeout) {
future<mutation_fragment_opt> operator()(db::timeout_clock::time_point timeout) {
return _impl->operator()(timeout);
}
@@ -328,7 +328,7 @@ public:
GCC6_CONCEPT(
requires FlatMutationReaderConsumer<Consumer>()
)
auto consume_pausable(Consumer consumer, db::timeout_clock::time_point timeout = db::no_timeout) {
auto consume_pausable(Consumer consumer, db::timeout_clock::time_point timeout) {
return _impl->consume_pausable(std::move(consumer), timeout);
}
@@ -337,8 +337,8 @@ public:
requires FlattenedConsumer<Consumer>()
)
auto consume(Consumer consumer,
consume_reversed_partitions reversed = consume_reversed_partitions::no,
db::timeout_clock::time_point timeout = db::no_timeout) {
db::timeout_clock::time_point timeout,
consume_reversed_partitions reversed = consume_reversed_partitions::no) {
if (reversed) {
return do_with(impl::reverse_partitions(*_impl), [&] (auto& reversed_partition_stream) {
return reversed_partition_stream._impl->consume(std::move(consumer), timeout);
@@ -351,7 +351,7 @@ public:
GCC6_CONCEPT(
requires FlattenedConsumer<Consumer>() && PartitionFilter<Filter>
)
auto consume_in_thread(Consumer consumer, Filter filter, db::timeout_clock::time_point timeout = db::no_timeout) {
auto consume_in_thread(Consumer consumer, Filter filter, db::timeout_clock::time_point timeout) {
return _impl->consume_in_thread(std::move(consumer), std::move(filter), timeout);
}
@@ -359,13 +359,13 @@ public:
GCC6_CONCEPT(
requires FlattenedConsumer<Consumer>()
)
auto consume_in_thread(Consumer consumer, db::timeout_clock::time_point timeout = db::no_timeout) {
auto consume_in_thread(Consumer consumer, db::timeout_clock::time_point timeout) {
return consume_in_thread(std::move(consumer), [] (const dht::decorated_key&) { return true; }, timeout);
}
void next_partition() { _impl->next_partition(); }
future<> fill_buffer(db::timeout_clock::time_point timeout = db::no_timeout) { return _impl->fill_buffer(timeout); }
future<> fill_buffer(db::timeout_clock::time_point timeout) { return _impl->fill_buffer(timeout); }
// Changes the range of partitions to pr. The range can only be moved
// forwards. pr.begin() needs to be larger than pr.end() of the previousl
@@ -373,7 +373,7 @@ public:
// previous fast forward target).
// pr needs to be valid until the reader is destroyed or fast_forward_to()
// is called again.
future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout = db::no_timeout) {
future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) {
return _impl->fast_forward_to(pr, timeout);
}
// Skips to a later range of rows.
@@ -397,7 +397,7 @@ public:
//
// When forwarding mode is not enabled, fast_forward_to()
// cannot be used.
future<> fast_forward_to(position_range cr, db::timeout_clock::time_point timeout = db::no_timeout) {
future<> fast_forward_to(position_range cr, db::timeout_clock::time_point timeout) {
return _impl->fast_forward_to(std::move(cr), timeout);
}
bool is_end_of_stream() const { return _impl->is_end_of_stream(); }
@@ -412,15 +412,15 @@ public:
// Resolves with a pointer to the next fragment in the stream without consuming it from the stream,
// or nullptr if there are no more fragments.
// The returned pointer is invalidated by any other non-const call to this object.
future<mutation_fragment*> peek() {
future<mutation_fragment*> peek(db::timeout_clock::time_point timeout) {
if (!is_buffer_empty()) {
return make_ready_future<mutation_fragment*>(&_impl->_buffer.front());
}
if (is_end_of_stream()) {
return make_ready_future<mutation_fragment*>(nullptr);
}
return fill_buffer().then([this] {
return peek();
return fill_buffer(timeout).then([this, timeout] {
return peek(timeout);
});
}
// A peek at the next fragment in the buffer.
@@ -459,9 +459,13 @@ GCC6_CONCEPT(requires requires(StopCondition stop, ConsumeMutationFragment consu
{ consume_mf(std::move(mf)) } -> void;
{ consume_eos() } -> future<>;
})
future<> consume_mutation_fragments_until(flat_mutation_reader& r, StopCondition&& stop,
ConsumeMutationFragment&& consume_mf, ConsumeEndOfStream&& consume_eos) {
return do_until([stop] { return stop(); }, [&r, stop, consume_mf, consume_eos] {
future<> consume_mutation_fragments_until(
flat_mutation_reader& r,
StopCondition&& stop,
ConsumeMutationFragment&& consume_mf,
ConsumeEndOfStream&& consume_eos,
db::timeout_clock::time_point timeout) {
return do_until([stop] { return stop(); }, [&r, stop, consume_mf, consume_eos, timeout] {
while (!r.is_buffer_empty()) {
consume_mf(r.pop_mutation_fragment());
if (stop()) {
@@ -471,7 +475,7 @@ future<> consume_mutation_fragments_until(flat_mutation_reader& r, StopCondition
if (r.is_end_of_stream()) {
return consume_eos();
}
return r.fill_buffer();
return r.fill_buffer(timeout);
});
}
@@ -596,13 +600,13 @@ make_flat_mutation_reader_from_fragments(schema_ptr, std::deque<mutation_fragmen
// The returned future<> resolves when consumption ends.
template <typename Consumer>
inline
future<> consume_partitions(flat_mutation_reader& reader, Consumer consumer) {
future<> consume_partitions(flat_mutation_reader& reader, Consumer consumer, db::timeout_clock::time_point timeout) {
static_assert(std::is_same<future<stop_iteration>, futurize_t<std::result_of_t<Consumer(mutation&&)>>>::value, "bad Consumer signature");
using futurator = futurize<std::result_of_t<Consumer(mutation&&)>>;
return do_with(std::move(consumer), [&reader] (Consumer& c) -> future<> {
return repeat([&reader, &c] () {
return read_mutation_from_flat_mutation_reader(reader).then([&c] (mutation_opt&& mo) -> future<stop_iteration> {
return do_with(std::move(consumer), [&reader, timeout] (Consumer& c) -> future<> {
return repeat([&reader, &c, timeout] () {
return read_mutation_from_flat_mutation_reader(reader, timeout).then([&c] (mutation_opt&& mo) -> future<stop_iteration> {
if (!mo) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}

View File

@@ -251,7 +251,7 @@ future<> fragment_and_freeze(flat_mutation_reader mr, frozen_mutation_consumer_f
fragmenting_mutation_freezer freezer(*mr.schema(), c, fragment_size);
return do_with(std::move(mr), std::move(freezer), [] (auto& mr, auto& freezer) {
return repeat([&] {
return mr().then([&] (auto mfopt) {
return mr(db::no_timeout).then([&] (auto mfopt) {
if (!mfopt) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}

View File

@@ -623,7 +623,7 @@ memtable::apply(memtable& mt) {
return consume_partitions(rd, [self = this->shared_from_this(), &rd] (mutation&& m) {
self->apply(m);
return stop_iteration::no;
});
}, db::no_timeout);
});
}

View File

@@ -113,7 +113,7 @@ shard_writer::shard_writer(schema_ptr s,
}
future<> shard_writer::consume() {
return _reader.peek().then([this] (mutation_fragment* mf_ptr) {
return _reader.peek(db::no_timeout).then([this] (mutation_fragment* mf_ptr) {
if (mf_ptr) {
return _consumer(std::move(_reader));
}
@@ -198,7 +198,7 @@ future<> multishard_writer::wait_pending_consumers() {
future<> multishard_writer::distribute_mutation_fragments() {
return repeat([this] () mutable {
return _producer().then([this] (mutation_fragment_opt mf_opt) mutable {
return _producer(db::no_timeout).then([this] (mutation_fragment_opt mf_opt) mutable {
if (mf_opt) {
return handle_mutation_fragment(std::move(*mf_opt));
} else {

View File

@@ -216,13 +216,13 @@ mutation mutation::sliced(const query::clustering_row_ranges& ranges) const {
return mutation(schema(), decorated_key(), partition().sliced(*schema(), ranges));
}
future<mutation_opt> read_mutation_from_flat_mutation_reader(flat_mutation_reader& r) {
future<mutation_opt> read_mutation_from_flat_mutation_reader(flat_mutation_reader& r, db::timeout_clock::time_point timeout) {
if (r.is_buffer_empty()) {
if (r.is_end_of_stream()) {
return make_ready_future<mutation_opt>();
}
return r.fill_buffer().then([&r] {
return read_mutation_from_flat_mutation_reader(r);
return r.fill_buffer(timeout).then([&r, timeout] {
return read_mutation_from_flat_mutation_reader(r, timeout);
});
}
// r.is_buffer_empty() is always false at this point
@@ -268,7 +268,7 @@ future<mutation_opt> read_mutation_from_flat_mutation_reader(flat_mutation_reade
return _builder->consume_end_of_stream();
}
};
return r.consume(adapter(r.schema()));
return r.consume(adapter(r.schema()), timeout);
}
std::ostream& operator<<(std::ostream& os, const mutation& m) {

View File

@@ -188,4 +188,4 @@ boost::iterator_range<std::vector<mutation>::const_iterator> slice(
class flat_mutation_reader;
// Reads a single partition from a reader. Returns empty optional if there are no more partitions to be read.
future<mutation_opt> read_mutation_from_flat_mutation_reader(flat_mutation_reader&);
future<mutation_opt> read_mutation_from_flat_mutation_reader(flat_mutation_reader& reader, db::timeout_clock::time_point timeout);

View File

@@ -2360,7 +2360,7 @@ future<mutation_opt> counter_write_query(schema_ptr s, const mutation_source& so
auto cwqrb = counter_write_query_result_builder(*s);
auto cfq = make_stable_flattened_mutations_consumer<compact_for_query<emit_only_live_rows::yes, counter_write_query_result_builder>>(
*s, gc_clock::now(), slice, query::max_rows, query::max_rows, std::move(cwqrb));
auto f = r_a_r->reader.consume(std::move(cfq), flat_mutation_reader::consume_reversed_partitions::no);
auto f = r_a_r->reader.consume(std::move(cfq), db::no_timeout, flat_mutation_reader::consume_reversed_partitions::no);
return f.finally([r_a_r = std::move(r_a_r)] { });
}

View File

@@ -90,7 +90,7 @@ auto consume_page(flat_mutation_reader& reader,
// on it because it stores references to some of it's own members.
// Move it to the heap before any consumption begins to avoid
// accidents.
return reader.peek().then([=, &reader, consumer = std::make_unique<Consumer>(std::move(consumer)), &slice] (
return reader.peek(timeout).then([=, &reader, consumer = std::make_unique<Consumer>(std::move(consumer)), &slice] (
mutation_fragment* next_fragment) mutable {
const auto next_fragment_kind = next_fragment ? next_fragment->mutation_fragment_kind() : mutation_fragment::kind::partition_end;
compaction_state->start_new_page(row_limit, partition_limit, query_time, next_fragment_kind, *consumer);
@@ -103,7 +103,7 @@ auto consume_page(flat_mutation_reader& reader,
compaction_state,
clustering_position_tracker(std::move(consumer), last_ckey));
return reader.consume(std::move(reader_consumer), is_reversed, timeout).then([last_ckey] (auto&&... results) mutable {
return reader.consume(std::move(reader_consumer), timeout, is_reversed).then([last_ckey] (auto&&... results) mutable {
return make_ready_future<std::optional<clustering_key_prefix>, std::decay_t<decltype(results)>...>(std::move(*last_ckey), std::move(results)...);
});
});

View File

@@ -47,7 +47,7 @@ public:
: _cache(cache)
, _read_context(context)
{ }
future<mutation_fragment_opt> move_to_next_partition() {
future<mutation_fragment_opt> move_to_next_partition(db::timeout_clock::time_point timeout) {
_last_key = std::move(_new_last_key);
auto start = population_range_start();
auto phase = _cache.phase_of(start);
@@ -75,7 +75,7 @@ public:
if (_reader->is_end_of_stream() && _reader->is_buffer_empty()) {
return make_ready_future<mutation_fragment_opt>();
}
return (*_reader)().then([this] (auto&& mfopt) {
return (*_reader)(timeout).then([this] (auto&& mfopt) {
if (mfopt) {
assert(mfopt->is_partition_start());
_new_last_key = mfopt->as_partition_start().key();
@@ -219,8 +219,8 @@ public:
}
// Gets the next fragment from the underlying reader
future<mutation_fragment_opt> get_next_fragment(db::timeout_clock::time_point timeout) {
return ensure_underlying(timeout).then([this] {
return _underlying.underlying()();
return ensure_underlying(timeout).then([this, timeout] {
return _underlying.underlying()(timeout);
});
}
};

View File

@@ -595,7 +595,7 @@ future<partition_checksum> partition_checksum::compute_legacy(flat_mutation_read
return do_with(std::move(mr),
partition_checksum(), [] (auto& reader, auto& checksum) {
return repeat([&reader, &checksum] () {
return read_mutation_from_flat_mutation_reader(reader).then([&checksum] (auto mopt) {
return read_mutation_from_flat_mutation_reader(reader, db::no_timeout).then([&checksum] (auto mopt) {
if (!mopt) {
return stop_iteration::yes;
}
@@ -615,7 +615,7 @@ future<partition_checksum> partition_checksum::compute_legacy(flat_mutation_read
future<partition_checksum> partition_checksum::compute_streamed(flat_mutation_reader m)
{
return do_with(std::move(m), [] (auto& m) {
return m.consume(partition_hasher(*m.schema()));
return m.consume(partition_hasher(*m.schema()), db::no_timeout);
});
}

View File

@@ -338,10 +338,10 @@ future<> read_context::create_underlying(bool skip_first_fragment, db::timeout_c
} else {
_sm_range = dht::partition_range::make_singular({dht::ring_position(*_key)});
}
return _underlying.fast_forward_to(std::move(_sm_range), *_underlying_snapshot, _phase, timeout).then([this, skip_first_fragment] {
return _underlying.fast_forward_to(std::move(_sm_range), *_underlying_snapshot, _phase, timeout).then([this, skip_first_fragment, timeout] {
_underlying_snapshot = {};
if (skip_first_fragment) {
return _underlying.underlying()().then([](auto &&mf) {});
return _underlying.underlying()(timeout).then([](auto &&mf) {});
} else {
return make_ready_future<>();
}
@@ -369,8 +369,8 @@ private:
auto src_and_phase = _cache.snapshot_of(_read_context->range().start()->value());
auto phase = src_and_phase.phase;
_read_context->enter_partition(_read_context->range().start()->value().as_decorated_key(), src_and_phase.snapshot, phase);
return _read_context->create_underlying(false, timeout).then([this, phase] {
return _read_context->underlying().underlying()().then([this, phase] (auto&& mfopt) {
return _read_context->create_underlying(false, timeout).then([this, phase, timeout] {
return _read_context->underlying().underlying()(timeout).then([this, phase] (auto&& mfopt) {
if (!mfopt) {
if (phase == _cache.phase_of(_read_context->range().start()->value())) {
_cache._read_section(_cache._tracker.region(), [this] {
@@ -532,8 +532,8 @@ public:
, _read_context(ctx)
{}
future<flat_mutation_reader_opt, mutation_fragment_opt > operator()() {
return _reader.move_to_next_partition().then([this] (auto&& mfopt) mutable {
future<flat_mutation_reader_opt, mutation_fragment_opt > operator()(db::timeout_clock::time_point timeout) {
return _reader.move_to_next_partition(timeout).then([this] (auto&& mfopt) mutable {
{
if (!mfopt) {
this->handle_end_of_stream();
@@ -658,7 +658,7 @@ private:
}
future<flat_mutation_reader_opt> read_from_secondary(db::timeout_clock::time_point timeout) {
return _secondary_reader().then([this, timeout] (flat_mutation_reader_opt fropt, mutation_fragment_opt ps) {
return _secondary_reader(timeout).then([this, timeout] (flat_mutation_reader_opt fropt, mutation_fragment_opt ps) {
if (fropt) {
if (ps) {
push_mutation_fragment(std::move(*ps));

View File

@@ -754,7 +754,7 @@ future<compaction_info> compaction::run(std::unique_ptr<compaction> c) {
// leave this block either successfully or exceptionally with the reader object
// destroyed.
auto r = std::move(reader);
r.consume_in_thread(std::move(cfc), c->filter_func());
r.consume_in_thread(std::move(cfc), c->filter_func(), db::no_timeout);
} catch (...) {
delete_sstables_for_interrupted_compaction(c->_info->new_sstables, c->_info->ks, c->_info->cf);
c = nullptr; // make sure writers are stopped while running in thread context

View File

@@ -3548,7 +3548,7 @@ future<> sstable::write_components(
}
return seastar::async([this, mr = std::move(mr), estimated_partitions, schema = std::move(schema), cfg, stats, &pc] () mutable {
auto wr = get_writer(*schema, estimated_partitions, cfg, stats, pc);
mr.consume_in_thread(std::move(wr));
mr.consume_in_thread(std::move(wr), db::no_timeout);
});
}

View File

@@ -178,7 +178,7 @@ future<> send_mutation_fragments(lw_shared_ptr<send_info> si) {
auto sink_op = [sink, si, got_error_from_peer] () mutable -> future<> {
return repeat([sink, si, got_error_from_peer] () mutable {
return si->reader().then([sink, si, s = si->reader.schema(), got_error_from_peer] (mutation_fragment_opt mf) mutable {
return si->reader(db::no_timeout).then([sink, si, s = si->reader.schema(), got_error_from_peer] (mutation_fragment_opt mf) mutable {
if (mf && !(*got_error_from_peer)) {
frozen_mutation_fragment fmf = freeze(*s, *mf);
auto size = fmf.representation().size();

View File

@@ -33,7 +33,7 @@ class flat_reader_assertions {
dht::partition_range _pr;
private:
mutation_fragment_opt read_next() {
return _reader().get0();
return _reader(db::no_timeout).get0();
}
public:
flat_reader_assertions(flat_mutation_reader reader)
@@ -208,11 +208,11 @@ public:
const schema& s = *_reader.schema();
range_tombstone_list actual_list(s);
position_in_partition::equal_compare eq(s);
while (mutation_fragment* next = _reader.peek().get0()) {
while (mutation_fragment* next = _reader.peek(db::no_timeout).get0()) {
if (!next->is_range_tombstone() || !eq(next->position(), mfo->position())) {
break;
}
actual_list.apply(s, _reader().get0()->as_range_tombstone());
actual_list.apply(s, _reader(db::no_timeout).get0()->as_range_tombstone());
}
actual_list.apply(s, mfo->as_range_tombstone());
{
@@ -285,7 +285,7 @@ public:
}
flat_reader_assertions& produces(const mutation& m, const stdx::optional<query::clustering_row_ranges>& ck_ranges = {}) {
auto mo = read_mutation_from_flat_mutation_reader(_reader).get0();
auto mo = read_mutation_from_flat_mutation_reader(_reader, db::no_timeout).get0();
if (!mo) {
BOOST_FAIL(sprint("Expected %s, but got end of stream, at: %s", m, seastar::current_backtrace()));
}
@@ -310,7 +310,7 @@ public:
flat_reader_assertions& produces_eos_or_empty_mutation() {
BOOST_TEST_MESSAGE("Expecting eos or empty mutation");
auto mo = read_mutation_from_flat_mutation_reader(_reader).get0();
auto mo = read_mutation_from_flat_mutation_reader(_reader, db::no_timeout).get0();
if (mo) {
if (!mo->partition().empty()) {
BOOST_FAIL(sprint("Mutation is not empty: %s", *mo));
@@ -356,7 +356,7 @@ public:
flat_reader_assertions& fast_forward_to(const dht::partition_range& pr) {
_pr = pr;
_reader.fast_forward_to(_pr).get();
_reader.fast_forward_to(_pr, db::no_timeout).get();
return *this;
}
@@ -366,7 +366,7 @@ public:
}
flat_reader_assertions& fast_forward_to(position_range pr) {
_reader.fast_forward_to(std::move(pr)).get();
_reader.fast_forward_to(std::move(pr), db::no_timeout).get();
return *this;
}
@@ -378,7 +378,7 @@ public:
}
flat_reader_assertions& produces_compacted(const mutation& m, const stdx::optional<query::clustering_row_ranges>& ck_ranges = {}) {
auto mo = read_mutation_from_flat_mutation_reader(_reader).get0();
auto mo = read_mutation_from_flat_mutation_reader(_reader, db::no_timeout).get0();
BOOST_REQUIRE(bool(mo));
memory::disable_failure_guard dfg;
mutation got = *mo;
@@ -388,13 +388,13 @@ public:
}
mutation_assertion next_mutation() {
auto mo = read_mutation_from_flat_mutation_reader(_reader).get0();
auto mo = read_mutation_from_flat_mutation_reader(_reader, db::no_timeout).get0();
BOOST_REQUIRE(bool(mo));
return mutation_assertion(std::move(*mo));
}
future<> fill_buffer() {
return _reader.fill_buffer();
return _reader.fill_buffer(db::no_timeout);
}
bool is_buffer_full() const {

View File

@@ -89,10 +89,10 @@ struct mock_consumer {
static size_t count_fragments(mutation m) {
auto r = flat_mutation_reader_from_mutations({m});
size_t res = 0;
auto mfopt = r().get0();
auto mfopt = r(db::no_timeout).get0();
while (bool(mfopt)) {
++res;
mfopt = r().get0();
mfopt = r(db::no_timeout).get0();
}
return res;
}
@@ -103,7 +103,7 @@ SEASTAR_TEST_CASE(test_flat_mutation_reader_consume_single_partition) {
size_t fragments_in_m = count_fragments(m);
for (size_t depth = 1; depth <= fragments_in_m + 1; ++depth) {
auto r = flat_mutation_reader_from_mutations({m});
auto result = r.consume(mock_consumer(depth)).get0();
auto result = r.consume(mock_consumer(depth), db::no_timeout).get0();
BOOST_REQUIRE(result._consume_end_of_stream_called);
BOOST_REQUIRE_EQUAL(1, result._consume_new_partition_call_count);
BOOST_REQUIRE_EQUAL(1, result._consume_end_of_partition_call_count);
@@ -125,24 +125,24 @@ SEASTAR_TEST_CASE(test_flat_mutation_reader_consume_two_partitions) {
size_t fragments_in_m2 = count_fragments(m2);
for (size_t depth = 1; depth < fragments_in_m1; ++depth) {
auto r = flat_mutation_reader_from_mutations({m1, m2});
auto result = r.consume(mock_consumer(depth)).get0();
auto result = r.consume(mock_consumer(depth), db::no_timeout).get0();
BOOST_REQUIRE(result._consume_end_of_stream_called);
BOOST_REQUIRE_EQUAL(1, result._consume_new_partition_call_count);
BOOST_REQUIRE_EQUAL(1, result._consume_end_of_partition_call_count);
BOOST_REQUIRE_EQUAL(m1.partition().partition_tombstone() ? 1 : 0, result._consume_tombstone_call_count);
auto r2 = flat_mutation_reader_from_mutations({m1, m2});
auto start = r2().get0();
auto start = r2(db::no_timeout).get0();
BOOST_REQUIRE(start);
BOOST_REQUIRE(start->is_partition_start());
for (auto& mf : result._fragments) {
auto mfopt = r2().get0();
auto mfopt = r2(db::no_timeout).get0();
BOOST_REQUIRE(mfopt);
BOOST_REQUIRE(mf.equal(*m1.schema(), *mfopt));
}
}
for (size_t depth = fragments_in_m1; depth < fragments_in_m1 + fragments_in_m2 + 1; ++depth) {
auto r = flat_mutation_reader_from_mutations({m1, m2});
auto result = r.consume(mock_consumer(depth)).get0();
auto result = r.consume(mock_consumer(depth), db::no_timeout).get0();
BOOST_REQUIRE(result._consume_end_of_stream_called);
BOOST_REQUIRE_EQUAL(2, result._consume_new_partition_call_count);
BOOST_REQUIRE_EQUAL(2, result._consume_end_of_partition_call_count);
@@ -155,14 +155,14 @@ SEASTAR_TEST_CASE(test_flat_mutation_reader_consume_two_partitions) {
}
BOOST_REQUIRE_EQUAL(tombstones_count, result._consume_tombstone_call_count);
auto r2 = flat_mutation_reader_from_mutations({m1, m2});
auto start = r2().get0();
auto start = r2(db::no_timeout).get0();
BOOST_REQUIRE(start);
BOOST_REQUIRE(start->is_partition_start());
for (auto& mf : result._fragments) {
auto mfopt = r2().get0();
auto mfopt = r2(db::no_timeout).get0();
BOOST_REQUIRE(mfopt);
if (mfopt->is_partition_start() || mfopt->is_end_of_partition()) {
mfopt = r2().get0();
mfopt = r2(db::no_timeout).get0();
}
BOOST_REQUIRE(mfopt);
BOOST_REQUIRE(mf.equal(*m1.schema(), *mfopt));
@@ -476,10 +476,10 @@ void test_flat_stream(schema_ptr s, std::vector<mutation> muts, reversed_partiti
auto consume_fn = [&] (flat_mutation_reader& fmr, flat_stream_consumer fsc) {
if (thread) {
assert(bool(!reversed));
return fmr.consume_in_thread(std::move(fsc));
return fmr.consume_in_thread(std::move(fsc), db::no_timeout);
} else {
auto reversed_flag = flat_mutation_reader::consume_reversed_partitions(bool(reversed));
return fmr.consume(std::move(fsc), reversed_flag).get0();
return fmr.consume(std::move(fsc), db::no_timeout, reversed_flag).get0();
}
};
@@ -519,7 +519,7 @@ void test_flat_stream(schema_ptr s, std::vector<mutation> muts, reversed_partiti
};
BOOST_TEST_MESSAGE("Consume all, filtered");
fmr = flat_mutation_reader_from_mutations(muts);
muts2 = fmr.consume_in_thread(flat_stream_consumer(s, reversed), std::move(filter));
muts2 = fmr.consume_in_thread(flat_stream_consumer(s, reversed), std::move(filter), db::no_timeout);
BOOST_REQUIRE_EQUAL(muts.size() / 2, muts2.size());
for (auto j = size_t(1); j < muts.size(); j += 2) {
BOOST_REQUIRE_EQUAL(muts[j], muts2[j / 2]);
@@ -608,8 +608,8 @@ SEASTAR_TEST_CASE(test_abandoned_flat_mutation_reader_from_mutation) {
return seastar::async([] {
for_each_mutation([&] (const mutation& m) {
auto rd = flat_mutation_reader_from_mutations({mutation(m)});
rd().get();
rd().get();
rd(db::no_timeout).get();
rd(db::no_timeout).get();
// We rely on AddressSanitizer telling us if nothing was leaked.
});
});

View File

@@ -109,7 +109,7 @@ SEASTAR_THREAD_TEST_CASE(test_frozen_mutation_fragment) {
rd.consume_pausable([&] (mutation_fragment mf) {
mfs.emplace_back(std::move(mf));
return stop_iteration::no;
}).get();
}, db::no_timeout).get();
for (auto&& mf : mfs) {
auto refrozen_mf = freeze(s, mf).unfreeze(s);
@@ -118,4 +118,4 @@ SEASTAR_THREAD_TEST_CASE(test_frozen_mutation_fragment) {
}
}
});
}
}

View File

@@ -78,7 +78,7 @@ private:
consume_partitions(rd, [&] (mutation&& m) {
new_mt->apply(std::move(m));
return stop_iteration::no;
}).get();
}, db::no_timeout).get();
_memtables.erase(_memtables.begin(), _memtables.begin() + count);
_memtables.push_back(new_mt);
}

View File

@@ -93,7 +93,7 @@ SEASTAR_TEST_CASE(test_memtable_with_many_versions_conforms_to_mutation_source)
// Create reader so that each mutation is in a separate version
flat_mutation_reader rd = mt->make_flat_reader(s, dht::partition_range::make_singular(m.decorated_key()));
rd.set_max_buffer_size(1);
rd.fill_buffer().get();
rd.fill_buffer(db::no_timeout).get();
readers.push_back(std::move(rd));
}
@@ -238,7 +238,7 @@ SEASTAR_TEST_CASE(test_virtual_dirty_accounting_on_flush) {
// Create a reader which will cause many partition versions to be created
flat_mutation_reader_opt rd1 = mt->make_flat_reader(s);
rd1->set_max_buffer_size(1);
rd1->fill_buffer().get();
rd1->fill_buffer(db::no_timeout).get();
// Override large cell value with a short one
{
@@ -258,7 +258,7 @@ SEASTAR_TEST_CASE(test_virtual_dirty_accounting_on_flush) {
flush_reader_check.produces_partition(current_ring[1]);
virtual_dirty_values.push_back(mgr.virtual_dirty_memory());
while ((*rd1)().get0()) ;
while ((*rd1)(db::no_timeout).get0()) ;
rd1 = {};
logalloc::shard_tracker().full_compaction();
@@ -372,17 +372,17 @@ SEASTAR_TEST_CASE(test_segment_migration_during_flush) {
auto rd = mt->make_flush_reader(s, service::get_local_priority_manager().memtable_flush_priority());
for (int i = 0; i < partitions; ++i) {
auto mfopt = rd().get0();
auto mfopt = rd(db::no_timeout).get0();
BOOST_REQUIRE(bool(mfopt));
BOOST_REQUIRE(mfopt->is_partition_start());
while (!mfopt->is_end_of_partition()) {
logalloc::shard_tracker().full_compaction();
mfopt = rd().get0();
mfopt = rd(db::no_timeout).get0();
}
virtual_dirty_values.push_back(mgr.virtual_dirty_memory());
}
BOOST_REQUIRE(!rd().get0());
BOOST_REQUIRE(!rd(db::no_timeout).get0());
std::reverse(virtual_dirty_values.begin(), virtual_dirty_values.end());
BOOST_REQUIRE(std::is_sorted(virtual_dirty_values.begin(), virtual_dirty_values.end()));
@@ -511,8 +511,8 @@ SEASTAR_TEST_CASE(test_hash_is_cached) {
{
auto rd = mt->make_flat_reader(s);
rd().get0()->as_partition_start();
clustering_row row = std::move(rd().get0()->as_mutable_clustering_row());
rd(db::no_timeout).get0()->as_partition_start();
clustering_row row = std::move(rd(db::no_timeout).get0()->as_mutable_clustering_row());
BOOST_REQUIRE(!row.cells().cell_hash_for(0));
}
@@ -520,15 +520,15 @@ SEASTAR_TEST_CASE(test_hash_is_cached) {
auto slice = s->full_slice();
slice.options.set<query::partition_slice::option::with_digest>();
auto rd = mt->make_flat_reader(s, query::full_partition_range, slice);
rd().get0()->as_partition_start();
clustering_row row = std::move(rd().get0()->as_mutable_clustering_row());
rd(db::no_timeout).get0()->as_partition_start();
clustering_row row = std::move(rd(db::no_timeout).get0()->as_mutable_clustering_row());
BOOST_REQUIRE(row.cells().cell_hash_for(0));
}
{
auto rd = mt->make_flat_reader(s);
rd().get0()->as_partition_start();
clustering_row row = std::move(rd().get0()->as_mutable_clustering_row());
rd(db::no_timeout).get0()->as_partition_start();
clustering_row row = std::move(rd(db::no_timeout).get0()->as_mutable_clustering_row());
BOOST_REQUIRE(row.cells().cell_hash_for(0));
}
@@ -537,8 +537,8 @@ SEASTAR_TEST_CASE(test_hash_is_cached) {
{
auto rd = mt->make_flat_reader(s);
rd().get0()->as_partition_start();
clustering_row row = std::move(rd().get0()->as_mutable_clustering_row());
rd(db::no_timeout).get0()->as_partition_start();
clustering_row row = std::move(rd(db::no_timeout).get0()->as_mutable_clustering_row());
BOOST_REQUIRE(!row.cells().cell_hash_for(0));
}
@@ -546,15 +546,15 @@ SEASTAR_TEST_CASE(test_hash_is_cached) {
auto slice = s->full_slice();
slice.options.set<query::partition_slice::option::with_digest>();
auto rd = mt->make_flat_reader(s, query::full_partition_range, slice);
rd().get0()->as_partition_start();
clustering_row row = std::move(rd().get0()->as_mutable_clustering_row());
rd(db::no_timeout).get0()->as_partition_start();
clustering_row row = std::move(rd(db::no_timeout).get0()->as_mutable_clustering_row());
BOOST_REQUIRE(row.cells().cell_hash_for(0));
}
{
auto rd = mt->make_flat_reader(s);
rd().get0()->as_partition_start();
clustering_row row = std::move(rd().get0()->as_mutable_clustering_row());
rd(db::no_timeout).get0()->as_partition_start();
clustering_row row = std::move(rd(db::no_timeout).get0()->as_mutable_clustering_row());
BOOST_REQUIRE(row.cells().cell_hash_for(0));
}
});

View File

@@ -66,7 +66,7 @@ SEASTAR_TEST_CASE(test_multishard_writer) {
return make_exception_future<>(std::runtime_error("Failed to write"));
}
return repeat([&shards_after, reader = std::move(reader), error] () mutable {
return reader().then([&shards_after, error] (mutation_fragment_opt mf_opt) mutable {
return reader(db::no_timeout).then([&shards_after, error] (mutation_fragment_opt mf_opt) mutable {
if (mf_opt) {
if (mf_opt->is_partition_start()) {
auto shard = dht::global_partitioner().shard_of(mf_opt->as_partition_start().key().token());

View File

@@ -111,7 +111,7 @@ SEASTAR_TEST_CASE(test_mutation_merger_conforms_to_mutation_source) {
muts.push_back(mutation(m.schema(), m.decorated_key()));
}
auto rd = flat_mutation_reader_from_mutations({m});
rd.consume(fragment_scatterer{muts}).get();
rd.consume(fragment_scatterer{muts}, db::no_timeout).get();
for (int i = 0; i < n; ++i) {
memtables[i]->apply(std::move(muts[i]));
}
@@ -396,7 +396,7 @@ SEASTAR_TEST_CASE(test_schema_upgrader_is_equivalent_with_mutation_upgrade) {
// upgrade m1 to m2's schema
auto reader = transform(flat_mutation_reader_from_mutations({m1}), schema_upgrader(m2.schema()));
auto from_upgrader = read_mutation_from_flat_mutation_reader(reader).get0();
auto from_upgrader = read_mutation_from_flat_mutation_reader(reader, db::no_timeout).get0();
auto regular = m1;
regular.upgrade(m2.schema());

View File

@@ -1214,11 +1214,11 @@ SEASTAR_TEST_CASE(test_fast_forwarding_combined_reader_is_consistent_with_slicin
}
result.partition().apply(*s, std::move(mf));
return stop_iteration::no;
}).get();
}, db::no_timeout).get();
for (auto&& range : ranges) {
auto prange = position_range(range);
rd.fast_forward_to(prange).get();
rd.fast_forward_to(prange, db::no_timeout).get();
rd.consume_pausable([&](mutation_fragment&& mf) {
if (!mf.relevant_for_range(*s, prange.start())) {
BOOST_FAIL(sprint("Received fragment which is not relevant for range: %s, range: %s", mf, prange));
@@ -1229,14 +1229,14 @@ SEASTAR_TEST_CASE(test_fast_forwarding_combined_reader_is_consistent_with_slicin
}
result.partition().apply(*s, std::move(mf));
return stop_iteration::no;
}).get();
}, db::no_timeout).get();
}
assert_that(result).is_equal_to(expected, ranges);
};
check_next_partition(combined[0]);
rd.fast_forward_to(dht::partition_range::make_singular(keys[2])).get();
rd.fast_forward_to(dht::partition_range::make_singular(keys[2]), db::no_timeout).get();
check_next_partition(combined[2]);
});
}
@@ -1281,7 +1281,7 @@ SEASTAR_TEST_CASE(test_combined_reader_slicing_with_overlapping_range_tombstones
}
result.partition().apply(*s, std::move(mf));
return stop_iteration::no;
}).get();
}, db::no_timeout).get();
assert_that(result).is_equal_to(m1 + m2, query::clustering_row_ranges({range}));
}
@@ -1304,9 +1304,9 @@ SEASTAR_TEST_CASE(test_combined_reader_slicing_with_overlapping_range_tombstones
BOOST_REQUIRE(!mf.position().has_clustering_key());
result.partition().apply(*s, std::move(mf));
return stop_iteration::no;
}).get();
}, db::no_timeout).get();
rd.fast_forward_to(prange).get();
rd.fast_forward_to(prange, db::no_timeout).get();
position_in_partition last_pos = position_in_partition::before_all_clustered_rows();
auto consume_clustered = [&] (mutation_fragment&& mf) {
@@ -1319,9 +1319,9 @@ SEASTAR_TEST_CASE(test_combined_reader_slicing_with_overlapping_range_tombstones
return stop_iteration::no;
};
rd.consume_pausable(consume_clustered).get();
rd.fast_forward_to(position_range(prange.end(), position_in_partition::after_all_clustered_rows())).get();
rd.consume_pausable(consume_clustered).get();
rd.consume_pausable(consume_clustered, db::no_timeout).get();
rd.fast_forward_to(position_range(prange.end(), position_in_partition::after_all_clustered_rows()), db::no_timeout).get();
rd.consume_pausable(consume_clustered, db::no_timeout).get();
assert_that(result).is_equal_to(m1 + m2);
}
@@ -1346,7 +1346,7 @@ SEASTAR_TEST_CASE(test_combined_mutation_source_is_a_mutation_source) {
mf_m.partition().apply(*s, mf);
memtables[source_index++ % memtables.size()]->apply(mf_m);
return stop_iteration::no;
}).get();
}, db::no_timeout).get();
}
std::vector<mutation_source> sources;
@@ -1717,7 +1717,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_destroyed_with_pending
auto reader = make_multishard_combining_reader(s.schema(), query::full_partition_range, s.schema()->full_slice(),
service::get_local_sstable_query_read_priority(), partitioner, std::move(factory));
reader.fill_buffer().get();
reader.fill_buffer(db::no_timeout).get();
BOOST_REQUIRE(reader.is_buffer_full());
BOOST_REQUIRE(smp::submit_to(shard_of_interest, [remote_control = remote_control.get()] {
@@ -1868,7 +1868,7 @@ SEASTAR_THREAD_TEST_CASE(test_foreign_reader_destroyed_with_pending_read_ahead)
{
auto reader = make_foreign_reader(s.schema(), std::move(remote_reader));
reader.fill_buffer().get();
reader.fill_buffer(db::no_timeout).get();
BOOST_REQUIRE(!reader.is_buffer_empty());
}
@@ -1970,7 +1970,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_destroyed_with_pending
{
auto reader = make_multishard_combining_reader(s.schema(), query::full_partition_range, s.schema()->full_slice(),
service::get_local_sstable_query_read_priority(), partitioner, std::move(factory));
reader.fill_buffer().get();
reader.fill_buffer(db::no_timeout).get();
BOOST_REQUIRE(reader.is_buffer_full());
}

View File

@@ -117,17 +117,17 @@ static void test_streamed_mutation_forwarding_is_consistent_with_slicing(populat
void consume_end_of_stream() { }
};
fwd_reader.consume(consumer(m.schema(), builder)).get0();
fwd_reader.consume(consumer(m.schema(), builder), db::no_timeout).get0();
BOOST_REQUIRE(bool(builder));
for (auto&& range : ranges) {
BOOST_TEST_MESSAGE(sprint("fwd %s", range));
fwd_reader.fast_forward_to(position_range(range)).get();
fwd_reader.consume(consumer(m.schema(), builder)).get0();
fwd_reader.fast_forward_to(position_range(range), db::no_timeout).get();
fwd_reader.consume(consumer(m.schema(), builder), db::no_timeout).get0();
}
mutation_opt fwd_m = builder->consume_end_of_stream();
BOOST_REQUIRE(bool(fwd_m));
mutation_opt sliced_m = read_mutation_from_flat_mutation_reader(sliced_reader).get0();
mutation_opt sliced_m = read_mutation_from_flat_mutation_reader(sliced_reader, db::no_timeout).get0();
BOOST_REQUIRE(bool(sliced_m));
assert_that(*sliced_m).is_equal_to(*fwd_m, slice_with_ranges.row_ranges(*m.schema(), m.key()));
}
@@ -1012,7 +1012,7 @@ void test_slicing_with_overlapping_range_tombstones(populate_fn populate) {
}
result.partition().apply(*s, std::move(mf));
return stop_iteration::no;
}).get();
}, db::no_timeout).get();
assert_that(result).is_equal_to(m1 + m2, query::clustering_row_ranges({range}));
}
@@ -1029,9 +1029,9 @@ void test_slicing_with_overlapping_range_tombstones(populate_fn populate) {
BOOST_REQUIRE(!mf.position().has_clustering_key());
result.partition().apply(*s, std::move(mf));
return stop_iteration::no;
}).get();
}, db::no_timeout).get();
rd.fast_forward_to(prange).get();
rd.fast_forward_to(prange, db::no_timeout).get();
position_in_partition last_pos = position_in_partition::before_all_clustered_rows();
auto consume_clustered = [&] (mutation_fragment&& mf) {
@@ -1044,9 +1044,9 @@ void test_slicing_with_overlapping_range_tombstones(populate_fn populate) {
return stop_iteration::no;
};
rd.consume_pausable(consume_clustered).get();
rd.fast_forward_to(position_range(prange.end(), position_in_partition::after_all_clustered_rows())).get();
rd.consume_pausable(consume_clustered).get();
rd.consume_pausable(consume_clustered, db::no_timeout).get();
rd.fast_forward_to(position_range(prange.end(), position_in_partition::after_all_clustered_rows()), db::no_timeout).get();
rd.consume_pausable(consume_clustered, db::no_timeout).get();
assert_that(result).is_equal_to(m1 + m2);
}

View File

@@ -84,7 +84,7 @@ static atomic_cell make_collection_member(data_type dt, T value) {
static mutation_partition get_partition(memtable& mt, const partition_key& key) {
auto dk = dht::global_partitioner().decorate_key(*mt.schema(), key);
auto reader = mt.make_flat_reader(mt.schema(), dht::partition_range::make_singular(dk));
auto mo = read_mutation_from_flat_mutation_reader(reader).get0();
auto mo = read_mutation_from_flat_mutation_reader(reader, db::no_timeout).get0();
BOOST_REQUIRE(bool(mo));
return std::move(mo->partition());
}

View File

@@ -507,7 +507,7 @@ public:
static
uint64_t consume_all(flat_mutation_reader& rd) {
return rd.consume(counting_consumer()).get0();
return rd.consume(counting_consumer(), db::no_timeout).get0();
}
static
@@ -516,13 +516,13 @@ uint64_t consume_all_with_next_partition(flat_mutation_reader& rd) {
do {
fragments += consume_all(rd);
rd.next_partition();
rd.fill_buffer().get();
rd.fill_buffer(db::no_timeout).get();
} while(!rd.is_end_of_stream() || !rd.is_buffer_empty());
return fragments;
}
static void assert_partition_start(flat_mutation_reader& rd) {
auto mfopt = rd().get0();
auto mfopt = rd(db::no_timeout).get0();
assert(mfopt);
assert(mfopt->is_partition_start());
}
@@ -546,7 +546,7 @@ static test_result scan_rows_with_stride(column_family& cf, int n_rows, int n_re
rd.fast_forward_to(position_range(
position_in_partition(position_in_partition::clustering_row_tag_t(), clustering_key::from_singular(*cf.schema(), ck)),
position_in_partition(position_in_partition::clustering_row_tag_t(), clustering_key::from_singular(*cf.schema(), ck + n_read))
)).get();
), db::no_timeout).get();
}
fragments += consume_all(rd);
ck += n_read + n_skip;
@@ -585,7 +585,7 @@ static test_result scan_with_stride_partitions(column_family& cf, int n, int n_r
dht::partition_range::bound(keys[pk], true),
dht::partition_range::bound(keys[std::min(n, pk + n_read) - 1], true)
);
rd.fast_forward_to(pr).get();
rd.fast_forward_to(pr, db::no_timeout).get();
}
fragments += consume_all(rd);
pk += n_read + n_skip;
@@ -607,7 +607,7 @@ static test_result slice_rows(column_family& cf, int offset = 0, int n_read = 1)
rd.fast_forward_to(position_range(
position_in_partition::for_key(clustering_key::from_singular(*cf.schema(), offset)),
position_in_partition::for_key(clustering_key::from_singular(*cf.schema(), offset + n_read)))).get();
position_in_partition::for_key(clustering_key::from_singular(*cf.schema(), offset + n_read))), db::no_timeout).get();
uint64_t fragments = consume_all_with_next_partition(rd);
return {before, fragments};
@@ -662,7 +662,7 @@ static test_result slice_rows_single_key(column_family& cf, int offset = 0, int
assert_partition_start(rd);
rd.fast_forward_to(position_range(
position_in_partition::for_key(clustering_key::from_singular(*cf.schema(), offset)),
position_in_partition::for_key(clustering_key::from_singular(*cf.schema(), offset + n_read)))).get();
position_in_partition::for_key(clustering_key::from_singular(*cf.schema(), offset + n_read))), db::no_timeout).get();
uint64_t fragments = consume_all_with_next_partition(rd);
return {before, fragments};
@@ -723,13 +723,13 @@ static test_result test_forwarding_with_restriction(column_family& cf, table_con
rd.fast_forward_to(position_range(
position_in_partition::for_key(clustering_key::from_singular(*cf.schema(), 1)),
position_in_partition::for_key(clustering_key::from_singular(*cf.schema(), 2)))).get();
position_in_partition::for_key(clustering_key::from_singular(*cf.schema(), 2))), db::no_timeout).get();
fragments += consume_all(rd);
rd.fast_forward_to(position_range(
position_in_partition::for_key(clustering_key::from_singular(*cf.schema(), first_key - 2)),
position_in_partition::for_key(clustering_key::from_singular(*cf.schema(), first_key + 2)))).get();
position_in_partition::for_key(clustering_key::from_singular(*cf.schema(), first_key + 2))), db::no_timeout).get();
fragments += consume_all_with_next_partition(rd);
return {before, fragments};

View File

@@ -125,7 +125,7 @@ future<> combined::consume_all(flat_mutation_reader mr) const
return mr.consume_pausable([] (mutation_fragment mf) {
perf_tests::do_not_optimize(mf);
return stop_iteration::no;
}).then([] {
}, db::no_timeout).then([] {
perf_tests::stop_measuring_time();
});
});
@@ -248,7 +248,7 @@ protected:
return mr.consume_pausable([] (mutation_fragment mf) {
perf_tests::do_not_optimize(mf);
return stop_iteration::no;
});
}, db::no_timeout);
});
}
};
@@ -273,4 +273,4 @@ PERF_TEST_F(memtable, many_partitions_many_rows)
return consume_all(multi_row_mt().make_flat_reader(schema(), multi_partition_range(25)));
}
}
}

View File

@@ -204,7 +204,7 @@ public:
auto total = make_lw_shared<size_t>(0);
auto done = make_lw_shared<bool>(false);
return do_until([done] { return *done; }, [this, done, total, &r] {
return read_mutation_from_flat_mutation_reader(r).then([this, done, total] (mutation_opt m) {
return read_mutation_from_flat_mutation_reader(r, db::no_timeout).then([this, done, total] (mutation_opt m) {
if (!m) {
*done = true;
} else {

View File

@@ -153,7 +153,7 @@ void run_test(const sstring& name, schema_ptr s, MutationGenerator&& gen) {
auto rd = std::make_unique<flat_mutation_reader>(
make_combined_reader(s, cache.make_reader(s), mt->make_flat_reader(s)));
rd->set_max_buffer_size(1);
rd->fill_buffer().get();
rd->fill_buffer(db::no_timeout).get();
scheduling_latency_measurer slm;
slm.start();
@@ -164,7 +164,7 @@ void run_test(const sstring& name, schema_ptr s, MutationGenerator&& gen) {
rd->set_max_buffer_size(1024*1024);
rd->consume_pausable([] (mutation_fragment) {
return stop_iteration::no;
}).get();
}, db::no_timeout).get();
mt = {};
rd = {};

View File

@@ -185,7 +185,7 @@ int main(int argc, char** argv) {
for (auto&& key : keys) {
auto range = dht::partition_range::make_singular(key);
auto reader = cache.make_reader(s, range);
auto mo = read_mutation_from_flat_mutation_reader(reader).get0();
auto mo = read_mutation_from_flat_mutation_reader(reader, db::no_timeout).get0();
assert(mo);
assert(mo->partition().live_row_count(*s) ==
row_count + 1 /* one row was already in cache before update()*/);
@@ -202,7 +202,7 @@ int main(int argc, char** argv) {
for (auto&& key : keys) {
auto range = dht::partition_range::make_singular(key);
auto reader = cache.make_reader(s, range);
auto mfopt = reader().get0();
auto mfopt = reader(db::no_timeout).get0();
assert(mfopt);
assert(mfopt->is_partition_start());
}
@@ -240,7 +240,7 @@ int main(int argc, char** argv) {
try {
auto reader = cache.make_reader(s, range);
assert(!reader().get0());
assert(!reader(db::no_timeout).get0());
auto evicted_from_cache = logalloc::segment_size + large_cell_size;
new char[evicted_from_cache + logalloc::segment_size];
assert(false); // The test is not invoking the case which it's supposed to test

View File

@@ -305,7 +305,7 @@ int main(int argc, char** argv) {
while (!cancelled) {
test_log.trace("{}: starting read", id);
auto rd = t.make_single_key_reader(pk, ck_range);
auto row_count = rd->rd.consume(validating_consumer(t, id)).get0();
auto row_count = rd->rd.consume(validating_consumer(t, id), db::no_timeout).get0();
if (row_count != len) {
throw std::runtime_error(sprint("Expected %d fragments, got %d", len, row_count));
}
@@ -317,7 +317,7 @@ int main(int argc, char** argv) {
while (!cancelled) {
test_log.trace("{}: starting read", id);
auto rd = t.make_scanning_reader();
auto row_count = rd->rd.consume(validating_consumer(t, id)).get0();
auto row_count = rd->rd.consume(validating_consumer(t, id), db::no_timeout).get0();
if (row_count != expected_row_count) {
throw std::runtime_error(sprint("Expected %d fragments, got %d", expected_row_count, row_count));
}

View File

@@ -126,7 +126,7 @@ snapshot_source snapshot_source_from_snapshot(mutation_source src) {
bool has_key(row_cache& cache, const dht::decorated_key& key) {
auto range = dht::partition_range::make_singular(key);
auto reader = cache.make_reader(cache.schema(), range);
auto mo = read_mutation_from_flat_mutation_reader(reader).get0();
auto mo = read_mutation_from_flat_mutation_reader(reader, db::no_timeout).get0();
if (!bool(mo)) {
return false;
}
@@ -673,7 +673,7 @@ SEASTAR_TEST_CASE(test_reading_from_random_partial_partition) {
cache.populate(m1); // m1 is supposed to have random continuity and populate() should preserve it
auto rd1 = cache.make_reader(gen.schema());
rd1.fill_buffer().get();
rd1.fill_buffer(db::no_timeout).get();
// Merge m2 into cache
auto mt = make_lw_shared<memtable>(gen.schema());
@@ -681,7 +681,7 @@ SEASTAR_TEST_CASE(test_reading_from_random_partial_partition) {
cache.update([&] { underlying.apply(m2); }, *mt).get();
auto rd2 = cache.make_reader(gen.schema());
rd2.fill_buffer().get();
rd2.fill_buffer(db::no_timeout).get();
assert_that(std::move(rd1)).next_mutation().is_equal_to(m1);
assert_that(std::move(rd2)).next_mutation().is_equal_to(m1 + m2);
@@ -738,7 +738,7 @@ SEASTAR_TEST_CASE(test_eviction) {
auto pr = dht::partition_range::make_singular(key);
auto rd = cache.make_reader(s, pr);
rd.set_max_buffer_size(1);
rd.fill_buffer().get();
rd.fill_buffer(db::no_timeout).get();
}
while (tracker.partitions() > 0) {
@@ -807,7 +807,7 @@ SEASTAR_TEST_CASE(test_eviction_after_schema_change) {
auto pr = dht::partition_range::make_singular(m.decorated_key());
auto rd = cache.make_reader(s2, pr);
rd.set_max_buffer_size(1);
rd.fill_buffer().get();
rd.fill_buffer(db::no_timeout).get();
}
while (tracker.region().evict_some() == memory::reclaiming_result::reclaimed_something) ;
@@ -824,9 +824,9 @@ void test_sliced_read_row_presence(flat_mutation_reader reader, schema_ptr s, st
{
clustering_key::equality ck_eq(*s);
auto mfopt = reader().get0();
auto mfopt = reader(db::no_timeout).get0();
BOOST_REQUIRE(mfopt->is_partition_start());
while ((mfopt = reader().get0()) && !mfopt->is_end_of_partition()) {
while ((mfopt = reader(db::no_timeout).get0()) && !mfopt->is_end_of_partition()) {
if (mfopt->is_clustering_row()) {
BOOST_REQUIRE(!expected.empty());
auto expected_ck = expected.front();
@@ -840,7 +840,7 @@ void test_sliced_read_row_presence(flat_mutation_reader reader, schema_ptr s, st
}
BOOST_REQUIRE(expected.empty());
BOOST_REQUIRE(mfopt && mfopt->is_end_of_partition());
BOOST_REQUIRE(!reader().get0());
BOOST_REQUIRE(!reader(db::no_timeout).get0());
}
SEASTAR_TEST_CASE(test_single_partition_update) {
@@ -1043,7 +1043,7 @@ SEASTAR_TEST_CASE(test_update_failure) {
auto has_only = [&] (const partitions_type& partitions) {
auto reader = cache.make_reader(s, query::full_partition_range);
for (int i = 0; i < partition_count; i++) {
auto mopt = read_mutation_from_flat_mutation_reader(reader).get0();
auto mopt = read_mutation_from_flat_mutation_reader(reader, db::no_timeout).get0();
if (!mopt) {
break;
}
@@ -1051,7 +1051,7 @@ SEASTAR_TEST_CASE(test_update_failure) {
BOOST_REQUIRE(it != partitions.end());
BOOST_REQUIRE(it->second.equal(*s, mopt->partition()));
}
BOOST_REQUIRE(!reader().get0());
BOOST_REQUIRE(!reader(db::no_timeout).get0());
};
if (failed) {
@@ -1236,11 +1236,11 @@ SEASTAR_TEST_CASE(test_cache_population_and_update_race) {
auto m0_range = dht::partition_range::make_singular(ring[0].ring_position());
auto rd1 = cache.make_reader(s, m0_range);
rd1.set_max_buffer_size(1);
auto rd1_fill_buffer = rd1.fill_buffer();
auto rd1_fill_buffer = rd1.fill_buffer(db::no_timeout);
auto rd2 = cache.make_reader(s);
rd2.set_max_buffer_size(1);
auto rd2_fill_buffer = rd2.fill_buffer();
auto rd2_fill_buffer = rd2.fill_buffer(db::no_timeout);
sleep(10ms).get();
@@ -1371,7 +1371,7 @@ SEASTAR_TEST_CASE(test_cache_population_and_clear_race) {
auto rd1 = cache.make_reader(s);
rd1.set_max_buffer_size(1);
auto rd1_fill_buffer = rd1.fill_buffer();
auto rd1_fill_buffer = rd1.fill_buffer(db::no_timeout);
sleep(10ms).get();
@@ -1431,10 +1431,10 @@ SEASTAR_TEST_CASE(test_mvcc) {
cache.populate(m1);
auto rd1 = cache.make_reader(s);
rd1.fill_buffer().get();
rd1.fill_buffer(db::no_timeout).get();
auto rd2 = cache.make_reader(s);
rd2.fill_buffer().get();
rd2.fill_buffer(db::no_timeout).get();
auto mt1 = make_lw_shared<memtable>(s);
mt1->apply(m2);
@@ -1453,19 +1453,19 @@ SEASTAR_TEST_CASE(test_mvcc) {
cache.update([&] { underlying.apply(mt1_copy); }, *mt1).get();
auto rd3 = cache.make_reader(s);
rd3.fill_buffer().get();
rd3.fill_buffer(db::no_timeout).get();
auto rd4 = cache.make_reader(s);
rd4.fill_buffer().get();
rd4.fill_buffer(db::no_timeout).get();
auto rd5 = cache.make_reader(s);
rd5.fill_buffer().get();
rd5.fill_buffer(db::no_timeout).get();
assert_that(std::move(rd3)).has_monotonic_positions();
if (with_active_memtable_reader) {
assert(mt1_reader_opt);
auto mt1_reader_mutation = read_mutation_from_flat_mutation_reader(*mt1_reader_opt).get0();
auto mt1_reader_mutation = read_mutation_from_flat_mutation_reader(*mt1_reader_opt, db::no_timeout).get0();
BOOST_REQUIRE(mt1_reader_mutation);
assert_that(*mt1_reader_mutation).is_equal_to(m2);
}
@@ -1884,7 +1884,7 @@ SEASTAR_TEST_CASE(test_tombstone_merging_in_partial_partition) {
}
static void consume_all(flat_mutation_reader& rd) {
while (auto mfopt = rd().get0()) {}
while (auto mfopt = rd(db::no_timeout).get0()) {}
}
static void populate_range(row_cache& cache,
@@ -1934,7 +1934,7 @@ SEASTAR_TEST_CASE(test_readers_get_all_data_after_eviction) {
auto make_reader = [&] (const query::partition_slice& slice) {
auto rd = cache.make_reader(s, query::full_partition_range, slice);
rd.set_max_buffer_size(1);
rd.fill_buffer().get();
rd.fill_buffer(db::no_timeout).get();
return assert_that(std::move(rd));
};
@@ -2060,7 +2060,7 @@ SEASTAR_TEST_CASE(test_tombstones_are_not_missed_when_range_is_invalidated) {
auto make_reader = [&] (const query::partition_slice& slice) {
auto rd = cache.make_reader(s.schema(), pr, slice);
rd.set_max_buffer_size(1);
rd.fill_buffer().get();
rd.fill_buffer(db::no_timeout).get();
return assert_that(std::move(rd));
};
@@ -2165,7 +2165,7 @@ SEASTAR_TEST_CASE(test_exception_safety_of_update_from_memtable) {
auto make_reader = [&] (const dht::partition_range& pr) {
auto rd = cache.make_reader(s.schema(), pr);
rd.set_max_buffer_size(1);
rd.fill_buffer().get();
rd.fill_buffer(db::no_timeout).get();
return rd;
};
@@ -2185,7 +2185,7 @@ SEASTAR_TEST_CASE(test_exception_safety_of_update_from_memtable) {
auto pr = dht::partition_range::make_singular(pkeys[2]);
snap = mt->make_flat_reader(s.schema(), pr);
snap->set_max_buffer_size(1);
snap->fill_buffer().get();
snap->fill_buffer(db::no_timeout).get();
cache.update([&] {
auto mt2 = make_lw_shared<memtable>(cache.schema());
@@ -2244,9 +2244,9 @@ SEASTAR_TEST_CASE(test_exception_safety_of_reads) {
try {
injector.fail_after(i++);
auto rd = cache.make_reader(s, query::full_partition_range, slice);
auto got_opt = read_mutation_from_flat_mutation_reader(rd).get0();
auto got_opt = read_mutation_from_flat_mutation_reader(rd, db::no_timeout).get0();
BOOST_REQUIRE(got_opt);
BOOST_REQUIRE(!read_mutation_from_flat_mutation_reader(rd).get0());
BOOST_REQUIRE(!read_mutation_from_flat_mutation_reader(rd, db::no_timeout).get0());
injector.cancel();
assert_that(*got_opt).is_equal_to(mut, ranges);
@@ -2320,9 +2320,9 @@ SEASTAR_TEST_CASE(test_exception_safety_of_transitioning_from_underlying_read_to
injector.fail_after(i++);
auto rd = cache.make_reader(s.schema(), pr, slice);
auto got_opt = read_mutation_from_flat_mutation_reader(rd).get0();
auto got_opt = read_mutation_from_flat_mutation_reader(rd, db::no_timeout).get0();
BOOST_REQUIRE(got_opt);
auto mfopt = rd().get0();
auto mfopt = rd(db::no_timeout).get0();
BOOST_REQUIRE(!mfopt);
injector.cancel();
@@ -2396,7 +2396,7 @@ SEASTAR_TEST_CASE(test_concurrent_population_before_latest_version_iterator) {
auto make_reader = [&] (const query::partition_slice& slice) {
auto rd = cache.make_reader(s.schema(), pr, slice);
rd.set_max_buffer_size(1);
rd.fill_buffer().get();
rd.fill_buffer(db::no_timeout).get();
return assert_that(std::move(rd));
};
@@ -2558,7 +2558,7 @@ SEASTAR_TEST_CASE(test_random_row_population) {
auto make_reader = [&] (const query::partition_slice* slice = nullptr) {
auto rd = cache.make_reader(s.schema(), pr, slice ? *slice : s.schema()->full_slice());
rd.set_max_buffer_size(1);
rd.fill_buffer().get();
rd.fill_buffer(db::no_timeout).get();
return std::move(rd);
};
@@ -2592,7 +2592,7 @@ SEASTAR_TEST_CASE(test_random_row_population) {
while (!readers.empty()) {
auto i = readers.begin();
while (i != readers.end()) {
auto mfo = i->reader().get0();
auto mfo = i->reader(db::no_timeout).get0();
if (!mfo) {
auto&& ranges = i->slice->row_ranges(*s.schema(), pk.key());
assert_that(i->result).is_equal_to(m1, ranges);
@@ -2673,7 +2673,7 @@ SEASTAR_TEST_CASE(test_continuity_is_populated_when_read_overlaps_with_older_ver
auto make_reader = [&] {
auto rd = cache.make_reader(s.schema(), pr);
rd.set_max_buffer_size(1);
rd.fill_buffer().get();
rd.fill_buffer(db::no_timeout).get();
return std::move(rd);
};
@@ -2801,7 +2801,7 @@ SEASTAR_TEST_CASE(test_continuity_population_with_multicolumn_clustering_key) {
auto make_reader = [&] (const query::partition_slice* slice = nullptr) {
auto rd = cache.make_reader(s, pr, slice ? *slice : s->full_slice());
rd.set_max_buffer_size(1);
rd.fill_buffer().get();
rd.fill_buffer(db::no_timeout).get();
return std::move(rd);
};
@@ -2908,7 +2908,7 @@ SEASTAR_TEST_CASE(test_concurrent_setting_of_continuity_on_read_upper_bound) {
auto make_rd = [&] (const query::partition_slice* slice = nullptr) {
auto rd = cache.make_reader(s.schema(), pr, slice ? *slice : s.schema()->full_slice());
rd.set_max_buffer_size(1);
rd.fill_buffer().get();
rd.fill_buffer(db::no_timeout).get();
return std::move(rd);
};
@@ -2972,7 +2972,7 @@ SEASTAR_TEST_CASE(test_tombstone_merging_of_overlapping_tombstones_in_many_versi
auto make_reader = [&] {
auto rd = cache.make_reader(s.schema());
rd.set_max_buffer_size(1);
rd.fill_buffer().get();
rd.fill_buffer(db::no_timeout).get();
return std::move(rd);
};
@@ -3010,7 +3010,7 @@ SEASTAR_TEST_CASE(test_concurrent_reads_and_eviction) {
auto make_reader = [&] (const query::partition_slice& slice) {
auto rd = cache.make_reader(s, pr, slice);
rd.set_max_buffer_size(3);
rd.fill_buffer().get();
rd.fill_buffer(db::no_timeout).get();
return std::move(rd);
};
@@ -3037,7 +3037,7 @@ SEASTAR_TEST_CASE(test_concurrent_reads_and_eviction) {
.build();
auto rd = make_reader(slice);
auto actual_opt = read_mutation_from_flat_mutation_reader(rd).get0();
auto actual_opt = read_mutation_from_flat_mutation_reader(rd, db::no_timeout).get0();
BOOST_REQUIRE(actual_opt);
auto actual = *actual_opt;
@@ -3119,12 +3119,12 @@ SEASTAR_TEST_CASE(test_cache_update_and_eviction_preserves_monotonicity_of_memta
auto mt_rd1 = mt->make_flat_reader(s);
mt_rd1.set_max_buffer_size(1);
mt_rd1.fill_buffer().get();
mt_rd1.fill_buffer(db::no_timeout).get();
BOOST_REQUIRE(mt_rd1.is_buffer_full()); // If fails, increase n_rows
auto mt_rd2 = mt->make_flat_reader(s);
mt_rd2.set_max_buffer_size(1);
mt_rd2.fill_buffer().get();
mt_rd2.fill_buffer(db::no_timeout).get();
apply(cache, underlying, *mt);
@@ -3133,13 +3133,13 @@ SEASTAR_TEST_CASE(test_cache_update_and_eviction_preserves_monotonicity_of_memta
auto c_rd1 = cache.make_reader(s);
c_rd1.set_max_buffer_size(1);
c_rd1.fill_buffer().get();
c_rd1.fill_buffer(db::no_timeout).get();
apply(cache, underlying, m2);
auto c_rd2 = cache.make_reader(s);
c_rd2.set_max_buffer_size(1);
c_rd2.fill_buffer().get();
c_rd2.fill_buffer(db::no_timeout).get();
cache.evict();
@@ -3164,8 +3164,8 @@ SEASTAR_TEST_CASE(test_hash_is_cached) {
{
auto rd = cache.make_reader(s);
rd().get0()->as_partition_start();
clustering_row row = std::move(rd().get0()->as_mutable_clustering_row());
rd(db::no_timeout).get0()->as_partition_start();
clustering_row row = std::move(rd(db::no_timeout).get0()->as_mutable_clustering_row());
BOOST_REQUIRE(!row.cells().cell_hash_for(0));
}
@@ -3173,15 +3173,15 @@ SEASTAR_TEST_CASE(test_hash_is_cached) {
auto slice = s->full_slice();
slice.options.set<query::partition_slice::option::with_digest>();
auto rd = cache.make_reader(s, query::full_partition_range, slice);
rd().get0()->as_partition_start();
clustering_row row = std::move(rd().get0()->as_mutable_clustering_row());
rd(db::no_timeout).get0()->as_partition_start();
clustering_row row = std::move(rd(db::no_timeout).get0()->as_mutable_clustering_row());
BOOST_REQUIRE(row.cells().cell_hash_for(0));
}
{
auto rd = cache.make_reader(s);
rd().get0()->as_partition_start();
clustering_row row = std::move(rd().get0()->as_mutable_clustering_row());
rd(db::no_timeout).get0()->as_partition_start();
clustering_row row = std::move(rd(db::no_timeout).get0()->as_mutable_clustering_row());
BOOST_REQUIRE(row.cells().cell_hash_for(0));
}
@@ -3191,8 +3191,8 @@ SEASTAR_TEST_CASE(test_hash_is_cached) {
{
auto rd = cache.make_reader(s);
rd().get0()->as_partition_start();
clustering_row row = std::move(rd().get0()->as_mutable_clustering_row());
rd(db::no_timeout).get0()->as_partition_start();
clustering_row row = std::move(rd(db::no_timeout).get0()->as_mutable_clustering_row());
BOOST_REQUIRE(!row.cells().cell_hash_for(0));
}
@@ -3200,15 +3200,15 @@ SEASTAR_TEST_CASE(test_hash_is_cached) {
auto slice = s->full_slice();
slice.options.set<query::partition_slice::option::with_digest>();
auto rd = cache.make_reader(s, query::full_partition_range, slice);
rd().get0()->as_partition_start();
clustering_row row = std::move(rd().get0()->as_mutable_clustering_row());
rd(db::no_timeout).get0()->as_partition_start();
clustering_row row = std::move(rd(db::no_timeout).get0()->as_mutable_clustering_row());
BOOST_REQUIRE(row.cells().cell_hash_for(0));
}
{
auto rd = cache.make_reader(s);
rd().get0()->as_partition_start();
clustering_row row = std::move(rd().get0()->as_mutable_clustering_row());
rd(db::no_timeout).get0()->as_partition_start();
clustering_row row = std::move(rd(db::no_timeout).get0()->as_mutable_clustering_row());
BOOST_REQUIRE(row.cells().cell_hash_for(0));
}
});
@@ -3234,7 +3234,7 @@ SEASTAR_TEST_CASE(test_random_population_with_many_versions) {
auto make_reader = [&] () {
auto rd = cache.make_reader(s, query::full_partition_range, s->full_slice());
rd.set_max_buffer_size(1);
rd.fill_buffer().get();
rd.fill_buffer(db::no_timeout).get();
return assert_that(std::move(rd));
};
@@ -3341,7 +3341,7 @@ SEASTAR_TEST_CASE(test_eviction_after_old_snapshot_touches_overriden_rows_keeps_
auto pr1 = dht::partition_range::make_singular(pk);
auto rd1 = cache.make_reader(s, pr1);
rd1.set_max_buffer_size(1);
rd1.fill_buffer().get();
rd1.fill_buffer(db::no_timeout).get();
apply(cache, underlying, m2);
@@ -3382,7 +3382,7 @@ SEASTAR_TEST_CASE(test_eviction_after_old_snapshot_touches_overriden_rows_keeps_
auto rd1 = cache.make_reader(s, pr);
rd1.set_max_buffer_size(1);
rd1.fill_buffer().get();
rd1.fill_buffer(db::no_timeout).get();
apply(cache, underlying, m2);
@@ -3429,7 +3429,7 @@ SEASTAR_TEST_CASE(test_reading_progress_with_small_buffer_and_invalidation) {
while (!rd3.is_end_of_stream()) {
tracker.allocator().invalidate_references();
rd3.fill_buffer().get();
rd3.fill_buffer(db::no_timeout).get();
while (!rd3.is_buffer_empty()) {
result.partition().apply(*s.schema(), rd3.pop_mutation_fragment());
}

View File

@@ -2899,7 +2899,7 @@ SEASTAR_THREAD_TEST_CASE(compact_deleted_row) {
* ]
*/
auto reader = compacted_sstable_reader(s, table_name, {1, 2});
mutation_opt m = read_mutation_from_flat_mutation_reader(reader).get0();
mutation_opt m = read_mutation_from_flat_mutation_reader(reader, db::no_timeout).get0();
BOOST_REQUIRE(m);
BOOST_REQUIRE(m->key().equal(*s, partition_key::from_singular(*s, data_value(sstring("key")))));
BOOST_REQUIRE(!m->partition().partition_tombstone());
@@ -2968,7 +2968,7 @@ SEASTAR_THREAD_TEST_CASE(compact_deleted_cell) {
*
*/
auto reader = compacted_sstable_reader(s, table_name, {1, 2});
mutation_opt m = read_mutation_from_flat_mutation_reader(reader).get0();
mutation_opt m = read_mutation_from_flat_mutation_reader(reader, db::no_timeout).get0();
BOOST_REQUIRE(m);
BOOST_REQUIRE(m->key().equal(*s, partition_key::from_singular(*s, data_value(sstring("key")))));
BOOST_REQUIRE(!m->partition().partition_tombstone());

View File

@@ -832,7 +832,7 @@ SEASTAR_TEST_CASE(datafile_generation_11) {
return reusable_sst(s, tmpdir_path, 11).then([s, verifier, tomb, &static_set_col] (auto sstp) mutable {
return do_with(make_dkey(s, "key1"), [sstp, s, verifier, tomb, &static_set_col] (auto& key) {
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, key));
return read_mutation_from_flat_mutation_reader(*rd).then([sstp, s, verifier, tomb, &static_set_col, rd] (auto mutation) {
return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, verifier, tomb, &static_set_col, rd] (auto mutation) {
auto verify_set = [&tomb] (const collection_type_impl::mutation& m) {
BOOST_REQUIRE(bool(m.tomb) == true);
BOOST_REQUIRE(m.tomb == tomb);
@@ -861,7 +861,7 @@ SEASTAR_TEST_CASE(datafile_generation_11) {
}).then([sstp, s, verifier] {
return do_with(make_dkey(s, "key2"), [sstp, s, verifier] (auto& key) {
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, key));
return read_mutation_from_flat_mutation_reader(*rd).then([sstp, s, verifier, rd] (auto mutation) {
return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, verifier, rd] (auto mutation) {
auto m = verifier(mutation);
BOOST_REQUIRE(!m.tomb);
BOOST_REQUIRE(m.cells.size() == 1);
@@ -894,7 +894,7 @@ SEASTAR_TEST_CASE(datafile_generation_12) {
return reusable_sst(s, tmpdir_path, 12).then([s, tomb] (auto sstp) mutable {
return do_with(make_dkey(s, "key1"), [sstp, s, tomb] (auto& key) {
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, key));
return read_mutation_from_flat_mutation_reader(*rd).then([sstp, s, tomb, rd] (auto mutation) {
return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, tomb, rd] (auto mutation) {
auto& mp = mutation->partition();
BOOST_REQUIRE(mp.row_tombstones().size() == 1);
for (auto& rt: mp.row_tombstones()) {
@@ -930,7 +930,7 @@ static future<> sstable_compression_test(compressor_ptr c, unsigned generation)
return reusable_sst(s, tmpdir_path, generation).then([s, tomb] (auto sstp) mutable {
return do_with(make_dkey(s, "key1"), [sstp, s, tomb] (auto& key) {
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, key));
return read_mutation_from_flat_mutation_reader(*rd).then([sstp, s, tomb, rd] (auto mutation) {
return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, tomb, rd] (auto mutation) {
auto& mp = mutation->partition();
BOOST_REQUIRE(mp.row_tombstones().size() == 1);
for (auto& rt: mp.row_tombstones()) {
@@ -1144,7 +1144,7 @@ SEASTAR_TEST_CASE(compact) {
// nadav - deleted partition
return open_sstable(s, tmpdir_path, generation).then([s] (shared_sstable sst) {
auto reader = make_lw_shared(sstable_reader(sst, s)); // reader holds sst and s alive.
return read_mutation_from_flat_mutation_reader(*reader).then([reader, s] (mutation_opt m) {
return read_mutation_from_flat_mutation_reader(*reader, db::no_timeout).then([reader, s] (mutation_opt m) {
BOOST_REQUIRE(m);
BOOST_REQUIRE(m->key().equal(*s, partition_key::from_singular(*s, data_value(sstring("jerry")))));
BOOST_REQUIRE(!m->partition().partition_tombstone());
@@ -1157,7 +1157,7 @@ SEASTAR_TEST_CASE(compact) {
auto& cdef2 = *s->get_column_definition("height");
BOOST_REQUIRE(cells.cell_at(cdef1.id).as_atomic_cell(cdef1).value() == bytes({0,0,0,40}));
BOOST_REQUIRE(cells.cell_at(cdef2.id).as_atomic_cell(cdef2).value() == bytes({0,0,0,(int8_t)170}));
return read_mutation_from_flat_mutation_reader(*reader);
return read_mutation_from_flat_mutation_reader(*reader, db::no_timeout);
}).then([reader, s] (mutation_opt m) {
BOOST_REQUIRE(m);
BOOST_REQUIRE(m->key().equal(*s, partition_key::from_singular(*s, data_value(sstring("tom")))));
@@ -1171,7 +1171,7 @@ SEASTAR_TEST_CASE(compact) {
auto& cdef2 = *s->get_column_definition("height");
BOOST_REQUIRE(cells.cell_at(cdef1.id).as_atomic_cell(cdef1).value() == bytes({0,0,0,20}));
BOOST_REQUIRE(cells.cell_at(cdef2.id).as_atomic_cell(cdef2).value() == bytes({0,0,0,(int8_t)180}));
return read_mutation_from_flat_mutation_reader(*reader);
return read_mutation_from_flat_mutation_reader(*reader, db::no_timeout);
}).then([reader, s] (mutation_opt m) {
BOOST_REQUIRE(m);
BOOST_REQUIRE(m->key().equal(*s, partition_key::from_singular(*s, data_value(sstring("john")))));
@@ -1185,14 +1185,14 @@ SEASTAR_TEST_CASE(compact) {
auto& cdef2 = *s->get_column_definition("height");
BOOST_REQUIRE(cells.cell_at(cdef1.id).as_atomic_cell(cdef1).value() == bytes({0,0,0,20}));
BOOST_REQUIRE(cells.find_cell(cdef2.id) == nullptr);
return read_mutation_from_flat_mutation_reader(*reader);
return read_mutation_from_flat_mutation_reader(*reader, db::no_timeout);
}).then([reader, s] (mutation_opt m) {
BOOST_REQUIRE(m);
BOOST_REQUIRE(m->key().equal(*s, partition_key::from_singular(*s, data_value(sstring("nadav")))));
BOOST_REQUIRE(m->partition().partition_tombstone());
auto &rows = m->partition().clustered_rows();
BOOST_REQUIRE(rows.calculate_size() == 0);
return read_mutation_from_flat_mutation_reader(*reader);
return read_mutation_from_flat_mutation_reader(*reader, db::no_timeout);
}).then([reader] (mutation_opt m) {
BOOST_REQUIRE(!m);
});
@@ -1369,7 +1369,7 @@ static future<> check_compacted_sstables(sstring tmpdir_path, unsigned long gene
return do_with(std::move(reader), [generations, s, keys] (flat_mutation_reader& reader) {
return do_for_each(*generations, [&reader, keys] (unsigned long generation) mutable {
return read_mutation_from_flat_mutation_reader(reader).then([generation, keys] (mutation_opt m) {
return read_mutation_from_flat_mutation_reader(reader, db::no_timeout).then([generation, keys] (mutation_opt m) {
BOOST_REQUIRE(m);
keys->push_back(m->key());
});
@@ -1450,7 +1450,7 @@ SEASTAR_TEST_CASE(datafile_generation_37) {
return reusable_sst(s, tmpdir_path, 37).then([s, tmpdir_path] (auto sstp) {
return do_with(make_dkey(s, "key1"), [sstp, s] (auto& key) {
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, key));
return read_mutation_from_flat_mutation_reader(*rd).then([sstp, s, rd] (auto mutation) {
return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, rd] (auto mutation) {
auto& mp = mutation->partition();
auto clustering = clustering_key_prefix::from_exploded(*s, {to_bytes("cl1")});
@@ -1485,7 +1485,7 @@ SEASTAR_TEST_CASE(datafile_generation_38) {
return reusable_sst(s, tmpdir_path, 38).then([s] (auto sstp) {
return do_with(make_dkey(s, "key1"), [sstp, s] (auto& key) {
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, key));
return read_mutation_from_flat_mutation_reader(*rd).then([sstp, s, rd] (auto mutation) {
return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, rd] (auto mutation) {
auto& mp = mutation->partition();
auto clustering = clustering_key_prefix::from_exploded(*s, {to_bytes("cl1"), to_bytes("cl2")});
@@ -1521,7 +1521,7 @@ SEASTAR_TEST_CASE(datafile_generation_39) {
return reusable_sst(s, tmpdir_path, 39).then([s] (auto sstp) {
return do_with(make_dkey(s, "key1"), [sstp, s] (auto& key) {
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, key));
return read_mutation_from_flat_mutation_reader(*rd).then([sstp, s, rd] (auto mutation) {
return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, rd] (auto mutation) {
auto& mp = mutation->partition();
auto& row = mp.clustered_row(*s, clustering_key::make_empty());
match_live_cell(row.cells(), *s, "cl1", data_value(data_value(to_bytes("cl1"))));
@@ -1617,7 +1617,7 @@ SEASTAR_TEST_CASE(datafile_generation_41) {
return reusable_sst(s, tmpdir_path, 41).then([s, tomb] (auto sstp) mutable {
return do_with(make_dkey(s, "key1"), [sstp, s, tomb] (auto& key) {
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, key));
return read_mutation_from_flat_mutation_reader(*rd).then([sstp, s, tomb, rd] (auto mutation) {
return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, tomb, rd] (auto mutation) {
auto& mp = mutation->partition();
BOOST_REQUIRE(mp.clustered_rows().calculate_size() == 1);
auto& c_row = *(mp.clustered_rows().begin());
@@ -1676,7 +1676,7 @@ SEASTAR_TEST_CASE(datafile_generation_47) {
return reusable_sst(s, tmpdir_path, 47).then([s] (auto sstp) mutable {
auto reader = make_lw_shared(sstable_reader(sstp, s));
return repeat([reader] {
return (*reader)().then([] (mutation_fragment_opt m) {
return (*reader)(db::no_timeout).then([] (mutation_fragment_opt m) {
if (!m) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
@@ -2330,7 +2330,7 @@ SEASTAR_TEST_CASE(tombstone_purge_test) {
auto assert_that_produces_dead_cell = [&] (auto& sst, partition_key& key) {
auto reader = make_lw_shared(sstable_reader(sst, s));
read_mutation_from_flat_mutation_reader(*reader).then([reader, s, &key] (mutation_opt m) {
read_mutation_from_flat_mutation_reader(*reader, db::no_timeout).then([reader, s, &key] (mutation_opt m) {
BOOST_REQUIRE(m);
BOOST_REQUIRE(m->key().equal(*s, key));
auto& rows = m->partition().clustered_rows();
@@ -2340,7 +2340,7 @@ SEASTAR_TEST_CASE(tombstone_purge_test) {
BOOST_REQUIRE_EQUAL(cells.size(), 1);
auto& cdef = *s->get_column_definition("value");
BOOST_REQUIRE(!cells.cell_at(cdef.id).as_atomic_cell(cdef).is_live());
return (*reader)();
return (*reader)(db::no_timeout);
}).then([reader, s] (mutation_fragment_opt m) {
BOOST_REQUIRE(!m);
}).get();
@@ -2505,7 +2505,7 @@ SEASTAR_TEST_CASE(check_multi_schema) {
auto f = sst->load();
return f.then([sst, s] {
auto reader = make_lw_shared(sstable_reader(sst, s));
return read_mutation_from_flat_mutation_reader(*reader).then([reader, s] (mutation_opt m) {
return read_mutation_from_flat_mutation_reader(*reader, db::no_timeout).then([reader, s] (mutation_opt m) {
BOOST_REQUIRE(m);
BOOST_REQUIRE(m->key().equal(*s, partition_key::from_singular(*s, 0)));
auto& rows = m->partition().clustered_rows();
@@ -2516,7 +2516,7 @@ SEASTAR_TEST_CASE(check_multi_schema) {
BOOST_REQUIRE_EQUAL(cells.size(), 1);
auto& cdef = *s->get_column_definition("e");
BOOST_REQUIRE_EQUAL(cells.cell_at(cdef.id).as_atomic_cell(cdef).value(), int32_type->decompose(5));
return (*reader)();
return (*reader)(db::no_timeout);
}).then([reader, s] (mutation_fragment_opt m) {
BOOST_REQUIRE(!m);
});
@@ -2564,13 +2564,13 @@ SEASTAR_TEST_CASE(sstable_rewrite) {
auto newsst = (*new_tables)[0];
BOOST_REQUIRE(newsst->generation() == 52);
auto reader = make_lw_shared(sstable_reader(newsst, s));
return (*reader)().then([s, reader, key] (mutation_fragment_opt m) {
return (*reader)(db::no_timeout).then([s, reader, key] (mutation_fragment_opt m) {
BOOST_REQUIRE(m);
BOOST_REQUIRE(m->is_partition_start());
auto pkey = partition_key::from_exploded(*s, {to_bytes(key)});
BOOST_REQUIRE(m->as_partition_start().key().key().equal(*s, pkey));
reader->next_partition();
return (*reader)();
return (*reader)(db::no_timeout);
}).then([reader] (mutation_fragment_opt m) {
BOOST_REQUIRE(!m);
});
@@ -2587,7 +2587,7 @@ void test_sliced_read_row_presence(shared_sstable sst, schema_ptr s, const query
partition_key::equality pk_eq(*s);
clustering_key::equality ck_eq(*s);
auto mfopt = reader().get0();
auto mfopt = reader(db::no_timeout).get0();
while (mfopt) {
BOOST_REQUIRE(mfopt->is_partition_start());
auto it = std::find_if(expected.begin(), expected.end(), [&] (auto&& x) {
@@ -2597,7 +2597,7 @@ void test_sliced_read_row_presence(shared_sstable sst, schema_ptr s, const query
auto expected_cr = std::move(it->second);
expected.erase(it);
mfopt = reader().get0();
mfopt = reader(db::no_timeout).get0();
BOOST_REQUIRE(mfopt);
while (!mfopt->is_end_of_partition()) {
if (mfopt->is_clustering_row()) {
@@ -2611,12 +2611,12 @@ void test_sliced_read_row_presence(shared_sstable sst, schema_ptr s, const query
BOOST_REQUIRE(it != expected_cr.end());
expected_cr.erase(it);
}
mfopt = reader().get0();
mfopt = reader(db::no_timeout).get0();
BOOST_REQUIRE(mfopt);
}
BOOST_REQUIRE(expected_cr.empty());
mfopt = reader().get0();
mfopt = reader(db::no_timeout).get0();
}
BOOST_REQUIRE(expected.empty());
}
@@ -2812,11 +2812,11 @@ SEASTAR_TEST_CASE(test_counter_read) {
sst->load().get();
auto reader = sstable_reader(sst, s);
auto mfopt = reader().get0();
auto mfopt = reader(db::no_timeout).get0();
BOOST_REQUIRE(mfopt);
BOOST_REQUIRE(mfopt->is_partition_start());
mfopt = reader().get0();
mfopt = reader(db::no_timeout).get0();
BOOST_REQUIRE(mfopt);
BOOST_REQUIRE(mfopt->is_clustering_row());
const clustering_row* cr = &mfopt->as_clustering_row();
@@ -2845,7 +2845,7 @@ SEASTAR_TEST_CASE(test_counter_read) {
});
});
mfopt = reader().get0();
mfopt = reader(db::no_timeout).get0();
BOOST_REQUIRE(mfopt);
BOOST_REQUIRE(mfopt->is_clustering_row());
cr = &mfopt->as_clustering_row();
@@ -2858,11 +2858,11 @@ SEASTAR_TEST_CASE(test_counter_read) {
}
});
mfopt = reader().get0();
mfopt = reader(db::no_timeout).get0();
BOOST_REQUIRE(mfopt);
BOOST_REQUIRE(mfopt->is_end_of_partition());
mfopt = reader().get0();
mfopt = reader(db::no_timeout).get0();
BOOST_REQUIRE(!mfopt);
}
});
@@ -4311,32 +4311,32 @@ SEASTAR_TEST_CASE(test_wrong_counter_shard_order) {
};
{
auto mfopt = reader().get0();
auto mfopt = reader(db::no_timeout).get0();
BOOST_REQUIRE(mfopt);
BOOST_REQUIRE(mfopt->is_partition_start());
verify_row(reader().get0(), 28545);
verify_row(reader().get0(), 27967);
verify_row(reader().get0(), 28342);
verify_row(reader().get0(), 28325);
mfopt = reader().get0();
verify_row(reader(db::no_timeout).get0(), 28545);
verify_row(reader(db::no_timeout).get0(), 27967);
verify_row(reader(db::no_timeout).get0(), 28342);
verify_row(reader(db::no_timeout).get0(), 28325);
mfopt = reader(db::no_timeout).get0();
BOOST_REQUIRE(mfopt);
BOOST_REQUIRE(mfopt->is_end_of_partition());
}
{
auto mfopt = reader().get0();
auto mfopt = reader(db::no_timeout).get0();
BOOST_REQUIRE(mfopt);
BOOST_REQUIRE(mfopt->is_partition_start());
verify_row(reader().get0(), 28386);
verify_row(reader().get0(), 28378);
verify_row(reader().get0(), 28129);
verify_row(reader().get0(), 28260);
mfopt = reader().get0();
verify_row(reader(db::no_timeout).get0(), 28386);
verify_row(reader(db::no_timeout).get0(), 28378);
verify_row(reader(db::no_timeout).get0(), 28129);
verify_row(reader(db::no_timeout).get0(), 28260);
mfopt = reader(db::no_timeout).get0();
BOOST_REQUIRE(mfopt);
BOOST_REQUIRE(mfopt->is_end_of_partition());
}
BOOST_REQUIRE(!reader().get0());
BOOST_REQUIRE(!reader(db::no_timeout).get0());
}
});
}

View File

@@ -53,7 +53,7 @@ SEASTAR_TEST_CASE(nonexistent_key) {
return do_with(make_dkey(uncompressed_schema(), "invalid_key"), [sstp] (auto& key) {
auto s = uncompressed_schema();
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, key));
return (*rd)().then([sstp, s, &key, rd] (auto mutation) {
return (*rd)(db::no_timeout).then([sstp, s, &key, rd] (auto mutation) {
BOOST_REQUIRE(!mutation);
return make_ready_future<>();
});
@@ -66,7 +66,7 @@ future<> test_no_clustered(bytes&& key, std::unordered_map<bytes, data_value> &&
return do_with(make_dkey(uncompressed_schema(), std::move(k)), [sstp, map = std::move(map)] (auto& key) {
auto s = uncompressed_schema();
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, key));
return read_mutation_from_flat_mutation_reader(*rd).then([sstp, s, &key, rd, map = std::move(map)] (auto mutation) {
return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, &key, rd, map = std::move(map)] (auto mutation) {
BOOST_REQUIRE(mutation);
auto& mp = mutation->partition();
for (auto&& e : mp.range(*s, nonwrapping_range<clustering_key_prefix>())) {
@@ -133,7 +133,7 @@ future<mutation> generate_clustered(bytes&& key) {
return do_with(make_dkey(complex_schema(), std::move(k)), [sstp] (auto& key) {
auto s = complex_schema();
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, key));
return read_mutation_from_flat_mutation_reader(*rd).then([sstp, s, &key, rd] (auto mutation) {
return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, &key, rd] (auto mutation) {
BOOST_REQUIRE(mutation);
return std::move(*mutation);
});
@@ -324,7 +324,7 @@ future<> test_range_reads(const dht::token& min, const dht::token& max, std::vec
// "mutations", continues to live until after the last
// iteration's future completes, so its lifetime is safe.
[sstp, mutations = std::move(mutations), &expected, expected_size, count, stop] () mutable {
return (*mutations)().then([&expected, expected_size, count, stop, mutations] (mutation_fragment_opt mfopt) mutable {
return (*mutations)(db::no_timeout).then([&expected, expected_size, count, stop, mutations] (mutation_fragment_opt mfopt) mutable {
if (mfopt) {
BOOST_REQUIRE(mfopt->is_partition_start());
BOOST_REQUIRE(*count < expected_size);
@@ -431,7 +431,7 @@ SEASTAR_TEST_CASE(test_sstable_can_write_and_read_range_tombstone) {
write_memtable_to_sstable_for_test(*mt, sst).get();
sst->load().get();
auto mr = sst->read_rows_flat(s);
auto mut = read_mutation_from_flat_mutation_reader(mr).get0();
auto mut = read_mutation_from_flat_mutation_reader(mr, db::no_timeout).get0();
BOOST_REQUIRE(bool(mut));
auto& rts = mut->partition().row_tombstones();
BOOST_REQUIRE(rts.size() == 1);
@@ -450,7 +450,7 @@ SEASTAR_TEST_CASE(compact_storage_sparse_read) {
return do_with(make_dkey(compact_sparse_schema(), "first_row"), [sstp] (auto& key) {
auto s = compact_sparse_schema();
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, key));
return read_mutation_from_flat_mutation_reader(*rd).then([sstp, s, &key, rd] (auto mutation) {
return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, &key, rd] (auto mutation) {
BOOST_REQUIRE(mutation);
auto& mp = mutation->partition();
auto& row = mp.clustered_row(*s, clustering_key::make_empty());
@@ -467,7 +467,7 @@ SEASTAR_TEST_CASE(compact_storage_simple_dense_read) {
return do_with(make_dkey(compact_simple_dense_schema(), "first_row"), [sstp] (auto& key) {
auto s = compact_simple_dense_schema();
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, key));
return read_mutation_from_flat_mutation_reader(*rd).then([sstp, s, &key, rd] (auto mutation) {
return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, &key, rd] (auto mutation) {
auto& mp = mutation->partition();
auto exploded = exploded_clustering_prefix({"cl1"});
@@ -486,7 +486,7 @@ SEASTAR_TEST_CASE(compact_storage_dense_read) {
return do_with(make_dkey(compact_dense_schema(), "first_row"), [sstp] (auto& key) {
auto s = compact_dense_schema();
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, key));
return read_mutation_from_flat_mutation_reader(*rd).then([sstp, s, &key, rd] (auto mutation) {
return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, &key, rd] (auto mutation) {
auto& mp = mutation->partition();
auto exploded = exploded_clustering_prefix({"cl1", "cl2"});
@@ -509,7 +509,7 @@ SEASTAR_TEST_CASE(broken_ranges_collection) {
auto s = peers_schema();
auto reader = make_lw_shared<flat_mutation_reader>(sstp->as_mutation_source().make_reader(s, query::full_partition_range));
return repeat([s, reader] {
return read_mutation_from_flat_mutation_reader(*reader).then([s, reader] (mutation_opt mut) {
return read_mutation_from_flat_mutation_reader(*reader, db::no_timeout).then([s, reader] (mutation_opt mut) {
auto key_equal = [s, &mut] (sstring ip) {
return mut->key().equal(*s, partition_key::from_deeply_exploded(*s, { net::inet_address(ip) }));
};
@@ -575,7 +575,7 @@ SEASTAR_TEST_CASE(tombstone_in_tombstone) {
auto s = tombstone_overlap_schema();
return do_with(sstp->read_rows_flat(s), [sstp, s] (auto& reader) {
return repeat([sstp, s, &reader] {
return read_mutation_from_flat_mutation_reader(reader).then([s] (mutation_opt mut) {
return read_mutation_from_flat_mutation_reader(reader, db::no_timeout).then([s] (mutation_opt mut) {
if (!mut) {
return stop_iteration::yes;
}
@@ -638,7 +638,7 @@ SEASTAR_TEST_CASE(range_tombstone_reading) {
auto s = tombstone_overlap_schema();
return do_with(sstp->read_rows_flat(s), [sstp, s] (auto& reader) {
return repeat([sstp, s, &reader] {
return read_mutation_from_flat_mutation_reader(reader).then([s] (mutation_opt mut) {
return read_mutation_from_flat_mutation_reader(reader, db::no_timeout).then([s] (mutation_opt mut) {
if (!mut) {
return stop_iteration::yes;
}
@@ -715,7 +715,7 @@ SEASTAR_TEST_CASE(tombstone_in_tombstone2) {
auto s = tombstone_overlap_schema2();
return do_with(sstp->read_rows_flat(s), [sstp, s] (auto& reader) {
return repeat([sstp, s, &reader] {
return read_mutation_from_flat_mutation_reader(reader).then([s] (mutation_opt mut) {
return read_mutation_from_flat_mutation_reader(reader, db::no_timeout).then([s] (mutation_opt mut) {
if (!mut) {
return stop_iteration::yes;
}
@@ -798,7 +798,7 @@ SEASTAR_TEST_CASE(test_non_compound_table_row_is_not_marked_as_static) {
write_memtable_to_sstable_for_test(*mt, sst).get();
sst->load().get();
auto mr = sst->read_rows_flat(s);
auto mut = read_mutation_from_flat_mutation_reader(mr).get0();
auto mut = read_mutation_from_flat_mutation_reader(mr, db::no_timeout).get0();
BOOST_REQUIRE(bool(mut));
}
});
@@ -1320,12 +1320,12 @@ SEASTAR_THREAD_TEST_CASE(test_large_index_pages_do_not_cause_large_allocations)
auto pr = dht::partition_range::make_singular(small_keys[0]);
auto mt_reader = mt->make_flat_reader(s, pr);
mutation expected = *read_mutation_from_flat_mutation_reader(mt_reader).get0();
mutation expected = *read_mutation_from_flat_mutation_reader(mt_reader, db::no_timeout).get0();
auto t0 = std::chrono::steady_clock::now();
auto large_allocs_before = memory::stats().large_allocations();
auto sst_reader = sst->as_mutation_source().make_reader(s, pr);
mutation actual = *read_mutation_from_flat_mutation_reader(sst_reader).get0();
mutation actual = *read_mutation_from_flat_mutation_reader(sst_reader, db::no_timeout).get0();
auto large_allocs_after = memory::stats().large_allocations();
auto duration = std::chrono::steady_clock::now() - t0;

View File

@@ -884,7 +884,7 @@ SEASTAR_TEST_CASE(wrong_range) {
return do_with(make_dkey(uncompressed_schema(), "todata"), [sstp] (auto& key) {
auto s = columns_schema();
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, key));
return read_mutation_from_flat_mutation_reader(*rd).then([sstp, s, &key, rd] (auto mutation) {
return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, &key, rd] (auto mutation) {
return make_ready_future<>();
});
});
@@ -1065,17 +1065,17 @@ static future<int> count_rows(sstable_ptr sstp, schema_ptr s, sstring key, sstri
auto ps = make_partition_slice(*s, ck1, ck2);
auto dkey = make_dkey(s, key.c_str());
auto rd = sstp->read_row_flat(s, dkey, ps);
auto mfopt = rd().get0();
auto mfopt = rd(db::no_timeout).get0();
if (!mfopt) {
return 0;
}
int nrows = 0;
mfopt = rd().get0();
mfopt = rd(db::no_timeout).get0();
while (mfopt) {
if (mfopt->is_clustering_row()) {
nrows++;
}
mfopt = rd().get0();
mfopt = rd(db::no_timeout).get0();
}
return nrows;
});
@@ -1086,17 +1086,17 @@ static future<int> count_rows(sstable_ptr sstp, schema_ptr s, sstring key) {
return seastar::async([sstp, s, key] () mutable {
auto dkey = make_dkey(s, key.c_str());
auto rd = sstp->read_row_flat(s, dkey);
auto mfopt = rd().get0();
auto mfopt = rd(db::no_timeout).get0();
if (!mfopt) {
return 0;
}
int nrows = 0;
mfopt = rd().get0();
mfopt = rd(db::no_timeout).get0();
while (mfopt) {
if (mfopt->is_clustering_row()) {
nrows++;
}
mfopt = rd().get0();
mfopt = rd(db::no_timeout).get0();
}
return nrows;
});
@@ -1109,17 +1109,17 @@ static future<int> count_rows(sstable_ptr sstp, schema_ptr s, sstring ck1, sstri
auto ps = make_partition_slice(*s, ck1, ck2);
auto reader = sstp->read_range_rows_flat(s, query::full_partition_range, ps);
int nrows = 0;
auto mfopt = reader().get0();
auto mfopt = reader(db::no_timeout).get0();
while (mfopt) {
mfopt = reader().get0();
mfopt = reader(db::no_timeout).get0();
BOOST_REQUIRE(mfopt);
while (!mfopt->is_end_of_partition()) {
if (mfopt->is_clustering_row()) {
nrows++;
}
mfopt = reader().get0();
mfopt = reader(db::no_timeout).get0();
}
mfopt = reader().get0();
mfopt = reader(db::no_timeout).get0();
}
return nrows;
});