From eb357a385d75fe1e582d46b4dd6c04b2ddcdd37d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Wed, 12 Sep 2018 12:29:58 +0300 Subject: [PATCH] flat_mutation_reader: make timeout opt-out rather than opt-in MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Currently timeout is opt-in, that is, all methods that even have it default it to `db::no_timeout`. This means that ensuring timeout is used where it should be is completely up to the author and the reviewrs of the code. As humans are notoriously prone to mistakes this has resulted in a very inconsistent usage of timeout, many clients of `flat_mutation_reader` passing the timeout only to some members and only on certain call sites. This is small wonder considering that some core operations like `operator()()` only recently received a timeout parameter and others like `peek()` didn't even have one until this patch. Both of these methods call `fill_buffer()` which potentially talks to the lower layers and is supposed to propagate the timeout. All this makes the `flat_mutation_reader`'s timeout effectively useless. To make order in this chaos make the timeout parameter a mandatory one on all `flat_mutation_reader` methods that need it. This ensures that humans now get a reminder from the compiler when they forget to pass the timeout. Clients can still opt-out from passing a timeout by passing `db::no_timeout` (the previous default value) but this will be now explicit and developers should think before typing it. There were suprisingly few core call sites to fix up. Where a timeout was available nearby I propagated it to be able to pass it to the reader, where I couldn't I passed `db::no_timeout`. Authors of the latter kind of code (view, streaming and repair are some of the notable examples) should maybe consider propagating down a timeout if needed. In the test code (the wast majority of the changes) I just used `db::no_timeout` everywhere. Tests: unit(release, debug) Signed-off-by: Botond Dénes Message-Id: <1edc10802d5eb23de8af28c9f48b8d3be0f1a468.1536744563.git.bdenes@scylladb.com> --- cache_flat_mutation_reader.hh | 2 +- database.cc | 4 +- db/view/view.cc | 10 +- flat_mutation_reader.cc | 10 +- flat_mutation_reader.hh | 44 +++++---- frozen_mutation.cc | 2 +- memtable.cc | 2 +- multishard_writer.cc | 4 +- mutation.cc | 8 +- mutation.hh | 2 +- mutation_partition.cc | 2 +- querier.hh | 4 +- read_context.hh | 8 +- repair/repair.cc | 4 +- row_cache.cc | 14 +-- sstables/compaction.cc | 2 +- sstables/sstables.cc | 2 +- streaming/stream_transfer_task.cc | 2 +- tests/flat_mutation_reader_assertions.hh | 20 ++-- tests/flat_mutation_reader_test.cc | 30 +++--- tests/frozen_mutation_test.cc | 4 +- tests/memtable_snapshot_source.hh | 2 +- tests/memtable_test.cc | 36 +++---- tests/multishard_writer_test.cc | 2 +- tests/mutation_fragment_test.cc | 4 +- tests/mutation_reader_test.cc | 28 +++--- tests/mutation_source_test.cc | 20 ++-- tests/mutation_test.cc | 2 +- tests/perf/perf_fast_forward.cc | 18 ++-- tests/perf/perf_mutation_readers.cc | 6 +- tests/perf/perf_sstable.hh | 2 +- tests/perf_row_cache_update.cc | 4 +- tests/row_cache_alloc_stress.cc | 6 +- tests/row_cache_stress_test.cc | 4 +- tests/row_cache_test.cc | 114 +++++++++++------------ tests/sstable_3_x_test.cc | 4 +- tests/sstable_datafile_test.cc | 86 ++++++++--------- tests/sstable_mutation_test.cc | 30 +++--- tests/sstable_test.cc | 22 ++--- 39 files changed, 287 insertions(+), 283 deletions(-) diff --git a/cache_flat_mutation_reader.hh b/cache_flat_mutation_reader.hh index 0e68da5ca3..7dc09719b4 100644 --- a/cache_flat_mutation_reader.hh +++ b/cache_flat_mutation_reader.hh @@ -361,7 +361,7 @@ future<> cache_flat_mutation_reader::read_from_underlying(db::timeout_clock::tim } }); return make_ready_future<>(); - }); + }, timeout); } inline diff --git a/database.cc b/database.cc index 51d2d03f75..dca99ebfbf 100644 --- a/database.cc +++ b/database.cc @@ -602,7 +602,7 @@ future table::find_partition(schema_ptr s, const dht::decorated_key& key) const { return do_with(dht::partition_range::make_singular(key), [s = std::move(s), this] (auto& range) { return do_with(this->make_reader(s, range), [s] (flat_mutation_reader& reader) { - return read_mutation_from_flat_mutation_reader(reader).then([] (mutation_opt&& mo) -> std::unique_ptr { + return read_mutation_from_flat_mutation_reader(reader, db::no_timeout).then([] (mutation_opt&& mo) -> std::unique_ptr { if (!mo) { return {}; } @@ -739,7 +739,7 @@ table::for_all_partitions(schema_ptr s, Func&& func) const { return do_with(iteration_state(std::move(s), *this, std::move(func)), [] (iteration_state& is) { return do_until([&is] { return is.done(); }, [&is] { - return read_mutation_from_flat_mutation_reader(is.reader).then([&is](mutation_opt&& mo) { + return read_mutation_from_flat_mutation_reader(is.reader, db::no_timeout).then([&is](mutation_opt&& mo) { if (!mo) { is.empty = true; } else { diff --git a/db/view/view.cc b/db/view/view.cc index 1561deec01..0482872bc4 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -634,8 +634,8 @@ private: future on_results(); future advance_all() { - auto existings_f = _existings ? (*_existings)() : make_ready_future>(); - return when_all(_updates(), std::move(existings_f)).then([this] (auto&& fragments) mutable { + auto existings_f = _existings ? (*_existings)(db::no_timeout) : make_ready_future>(); + return when_all(_updates(db::no_timeout), std::move(existings_f)).then([this] (auto&& fragments) mutable { _update = std::move(std::get(std::get<0>(fragments).get())); _existing = std::move(std::get(std::get<1>(fragments).get())); return stop_iteration::no; @@ -643,7 +643,7 @@ private: } future advance_updates() { - return _updates().then([this] (auto&& update) mutable { + return _updates(db::no_timeout).then([this] (auto&& update) mutable { _update = std::move(update); return stop_iteration::no; }); @@ -653,7 +653,7 @@ private: if (!_existings) { return make_ready_future(stop_iteration::no); } - return (*_existings)().then([this] (auto&& existing) mutable { + return (*_existings)(db::no_timeout).then([this] (auto&& existing) mutable { _existing = std::move(existing); return stop_iteration::no; }); @@ -1534,7 +1534,7 @@ void view_builder::execute(build_step& step, exponential_backoff_retry r) { query::max_partitions, view_builder::consumer{*this, step}); consumer.consume_new_partition(step.current_key); // Initialize the state in case we're resuming a partition - auto built = step.reader.consume_in_thread(std::move(consumer)); + auto built = step.reader.consume_in_thread(std::move(consumer), db::no_timeout); _as.check(); diff --git a/flat_mutation_reader.cc b/flat_mutation_reader.cc index d4ae48a5ea..49e49f3cb4 100644 --- a/flat_mutation_reader.cc +++ b/flat_mutation_reader.cc @@ -196,11 +196,11 @@ flat_mutation_reader make_forwardable(flat_mutation_reader m) { position_range _current; mutation_fragment_opt _next; // When resolves, _next is engaged or _end_of_stream is set. - future<> ensure_next() { + future<> ensure_next(db::timeout_clock::time_point timeout) { if (_next) { return make_ready_future<>(); } - return _underlying().then([this] (auto&& mfo) { + return _underlying(timeout).then([this] (auto&& mfo) { _next = std::move(mfo); if (!_next) { _end_of_stream = true; @@ -213,11 +213,11 @@ flat_mutation_reader make_forwardable(flat_mutation_reader m) { position_in_partition(position_in_partition::after_static_row_tag_t()) }) { } virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override { - return repeat([this] { + return repeat([this, timeout] { if (is_buffer_full()) { return make_ready_future(stop_iteration::yes); } - return ensure_next().then([this] { + return ensure_next(timeout).then([this] { if (is_end_of_stream()) { return stop_iteration::yes; } @@ -290,7 +290,7 @@ flat_mutation_reader make_nonforwardable(flat_mutation_reader r, bool single_par } _underlying.next_partition(); _static_row_done = false; - return _underlying.fill_buffer().then([this] { + return _underlying.fill_buffer(timeout).then([this] { _end_of_stream = is_end_end_of_underlying_stream(); }); } diff --git a/flat_mutation_reader.hh b/flat_mutation_reader.hh index 661520c9b6..66201e7db8 100644 --- a/flat_mutation_reader.hh +++ b/flat_mutation_reader.hh @@ -320,7 +320,7 @@ public: flat_mutation_reader(std::unique_ptr impl) noexcept : _impl(std::move(impl)) {} - future operator()(db::timeout_clock::time_point timeout = db::no_timeout) { + future operator()(db::timeout_clock::time_point timeout) { return _impl->operator()(timeout); } @@ -328,7 +328,7 @@ public: GCC6_CONCEPT( requires FlatMutationReaderConsumer() ) - auto consume_pausable(Consumer consumer, db::timeout_clock::time_point timeout = db::no_timeout) { + auto consume_pausable(Consumer consumer, db::timeout_clock::time_point timeout) { return _impl->consume_pausable(std::move(consumer), timeout); } @@ -337,8 +337,8 @@ public: requires FlattenedConsumer() ) auto consume(Consumer consumer, - consume_reversed_partitions reversed = consume_reversed_partitions::no, - db::timeout_clock::time_point timeout = db::no_timeout) { + db::timeout_clock::time_point timeout, + consume_reversed_partitions reversed = consume_reversed_partitions::no) { if (reversed) { return do_with(impl::reverse_partitions(*_impl), [&] (auto& reversed_partition_stream) { return reversed_partition_stream._impl->consume(std::move(consumer), timeout); @@ -351,7 +351,7 @@ public: GCC6_CONCEPT( requires FlattenedConsumer() && PartitionFilter ) - auto consume_in_thread(Consumer consumer, Filter filter, db::timeout_clock::time_point timeout = db::no_timeout) { + auto consume_in_thread(Consumer consumer, Filter filter, db::timeout_clock::time_point timeout) { return _impl->consume_in_thread(std::move(consumer), std::move(filter), timeout); } @@ -359,13 +359,13 @@ public: GCC6_CONCEPT( requires FlattenedConsumer() ) - auto consume_in_thread(Consumer consumer, db::timeout_clock::time_point timeout = db::no_timeout) { + auto consume_in_thread(Consumer consumer, db::timeout_clock::time_point timeout) { return consume_in_thread(std::move(consumer), [] (const dht::decorated_key&) { return true; }, timeout); } void next_partition() { _impl->next_partition(); } - future<> fill_buffer(db::timeout_clock::time_point timeout = db::no_timeout) { return _impl->fill_buffer(timeout); } + future<> fill_buffer(db::timeout_clock::time_point timeout) { return _impl->fill_buffer(timeout); } // Changes the range of partitions to pr. The range can only be moved // forwards. pr.begin() needs to be larger than pr.end() of the previousl @@ -373,7 +373,7 @@ public: // previous fast forward target). // pr needs to be valid until the reader is destroyed or fast_forward_to() // is called again. - future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout = db::no_timeout) { + future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) { return _impl->fast_forward_to(pr, timeout); } // Skips to a later range of rows. @@ -397,7 +397,7 @@ public: // // When forwarding mode is not enabled, fast_forward_to() // cannot be used. - future<> fast_forward_to(position_range cr, db::timeout_clock::time_point timeout = db::no_timeout) { + future<> fast_forward_to(position_range cr, db::timeout_clock::time_point timeout) { return _impl->fast_forward_to(std::move(cr), timeout); } bool is_end_of_stream() const { return _impl->is_end_of_stream(); } @@ -412,15 +412,15 @@ public: // Resolves with a pointer to the next fragment in the stream without consuming it from the stream, // or nullptr if there are no more fragments. // The returned pointer is invalidated by any other non-const call to this object. - future peek() { + future peek(db::timeout_clock::time_point timeout) { if (!is_buffer_empty()) { return make_ready_future(&_impl->_buffer.front()); } if (is_end_of_stream()) { return make_ready_future(nullptr); } - return fill_buffer().then([this] { - return peek(); + return fill_buffer(timeout).then([this, timeout] { + return peek(timeout); }); } // A peek at the next fragment in the buffer. @@ -459,9 +459,13 @@ GCC6_CONCEPT(requires requires(StopCondition stop, ConsumeMutationFragment consu { consume_mf(std::move(mf)) } -> void; { consume_eos() } -> future<>; }) -future<> consume_mutation_fragments_until(flat_mutation_reader& r, StopCondition&& stop, - ConsumeMutationFragment&& consume_mf, ConsumeEndOfStream&& consume_eos) { - return do_until([stop] { return stop(); }, [&r, stop, consume_mf, consume_eos] { +future<> consume_mutation_fragments_until( + flat_mutation_reader& r, + StopCondition&& stop, + ConsumeMutationFragment&& consume_mf, + ConsumeEndOfStream&& consume_eos, + db::timeout_clock::time_point timeout) { + return do_until([stop] { return stop(); }, [&r, stop, consume_mf, consume_eos, timeout] { while (!r.is_buffer_empty()) { consume_mf(r.pop_mutation_fragment()); if (stop()) { @@ -471,7 +475,7 @@ future<> consume_mutation_fragments_until(flat_mutation_reader& r, StopCondition if (r.is_end_of_stream()) { return consume_eos(); } - return r.fill_buffer(); + return r.fill_buffer(timeout); }); } @@ -596,13 +600,13 @@ make_flat_mutation_reader_from_fragments(schema_ptr, std::deque resolves when consumption ends. template inline -future<> consume_partitions(flat_mutation_reader& reader, Consumer consumer) { +future<> consume_partitions(flat_mutation_reader& reader, Consumer consumer, db::timeout_clock::time_point timeout) { static_assert(std::is_same, futurize_t>>::value, "bad Consumer signature"); using futurator = futurize>; - return do_with(std::move(consumer), [&reader] (Consumer& c) -> future<> { - return repeat([&reader, &c] () { - return read_mutation_from_flat_mutation_reader(reader).then([&c] (mutation_opt&& mo) -> future { + return do_with(std::move(consumer), [&reader, timeout] (Consumer& c) -> future<> { + return repeat([&reader, &c, timeout] () { + return read_mutation_from_flat_mutation_reader(reader, timeout).then([&c] (mutation_opt&& mo) -> future { if (!mo) { return make_ready_future(stop_iteration::yes); } diff --git a/frozen_mutation.cc b/frozen_mutation.cc index cc2e29350d..a0f95b18e8 100644 --- a/frozen_mutation.cc +++ b/frozen_mutation.cc @@ -251,7 +251,7 @@ future<> fragment_and_freeze(flat_mutation_reader mr, frozen_mutation_consumer_f fragmenting_mutation_freezer freezer(*mr.schema(), c, fragment_size); return do_with(std::move(mr), std::move(freezer), [] (auto& mr, auto& freezer) { return repeat([&] { - return mr().then([&] (auto mfopt) { + return mr(db::no_timeout).then([&] (auto mfopt) { if (!mfopt) { return make_ready_future(stop_iteration::yes); } diff --git a/memtable.cc b/memtable.cc index ac2a79f1ea..666d584ad2 100644 --- a/memtable.cc +++ b/memtable.cc @@ -623,7 +623,7 @@ memtable::apply(memtable& mt) { return consume_partitions(rd, [self = this->shared_from_this(), &rd] (mutation&& m) { self->apply(m); return stop_iteration::no; - }); + }, db::no_timeout); }); } diff --git a/multishard_writer.cc b/multishard_writer.cc index 829604ccbf..ef28f6edfb 100644 --- a/multishard_writer.cc +++ b/multishard_writer.cc @@ -113,7 +113,7 @@ shard_writer::shard_writer(schema_ptr s, } future<> shard_writer::consume() { - return _reader.peek().then([this] (mutation_fragment* mf_ptr) { + return _reader.peek(db::no_timeout).then([this] (mutation_fragment* mf_ptr) { if (mf_ptr) { return _consumer(std::move(_reader)); } @@ -198,7 +198,7 @@ future<> multishard_writer::wait_pending_consumers() { future<> multishard_writer::distribute_mutation_fragments() { return repeat([this] () mutable { - return _producer().then([this] (mutation_fragment_opt mf_opt) mutable { + return _producer(db::no_timeout).then([this] (mutation_fragment_opt mf_opt) mutable { if (mf_opt) { return handle_mutation_fragment(std::move(*mf_opt)); } else { diff --git a/mutation.cc b/mutation.cc index 73ce091a61..8b89bb536b 100644 --- a/mutation.cc +++ b/mutation.cc @@ -216,13 +216,13 @@ mutation mutation::sliced(const query::clustering_row_ranges& ranges) const { return mutation(schema(), decorated_key(), partition().sliced(*schema(), ranges)); } -future read_mutation_from_flat_mutation_reader(flat_mutation_reader& r) { +future read_mutation_from_flat_mutation_reader(flat_mutation_reader& r, db::timeout_clock::time_point timeout) { if (r.is_buffer_empty()) { if (r.is_end_of_stream()) { return make_ready_future(); } - return r.fill_buffer().then([&r] { - return read_mutation_from_flat_mutation_reader(r); + return r.fill_buffer(timeout).then([&r, timeout] { + return read_mutation_from_flat_mutation_reader(r, timeout); }); } // r.is_buffer_empty() is always false at this point @@ -268,7 +268,7 @@ future read_mutation_from_flat_mutation_reader(flat_mutation_reade return _builder->consume_end_of_stream(); } }; - return r.consume(adapter(r.schema())); + return r.consume(adapter(r.schema()), timeout); } std::ostream& operator<<(std::ostream& os, const mutation& m) { diff --git a/mutation.hh b/mutation.hh index 794afec059..22e45a57d6 100644 --- a/mutation.hh +++ b/mutation.hh @@ -188,4 +188,4 @@ boost::iterator_range::const_iterator> slice( class flat_mutation_reader; // Reads a single partition from a reader. Returns empty optional if there are no more partitions to be read. -future read_mutation_from_flat_mutation_reader(flat_mutation_reader&); +future read_mutation_from_flat_mutation_reader(flat_mutation_reader& reader, db::timeout_clock::time_point timeout); diff --git a/mutation_partition.cc b/mutation_partition.cc index 3e01a955a4..1296ebc323 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -2360,7 +2360,7 @@ future counter_write_query(schema_ptr s, const mutation_source& so auto cwqrb = counter_write_query_result_builder(*s); auto cfq = make_stable_flattened_mutations_consumer>( *s, gc_clock::now(), slice, query::max_rows, query::max_rows, std::move(cwqrb)); - auto f = r_a_r->reader.consume(std::move(cfq), flat_mutation_reader::consume_reversed_partitions::no); + auto f = r_a_r->reader.consume(std::move(cfq), db::no_timeout, flat_mutation_reader::consume_reversed_partitions::no); return f.finally([r_a_r = std::move(r_a_r)] { }); } diff --git a/querier.hh b/querier.hh index cfcdbe4da1..9923ec23f2 100644 --- a/querier.hh +++ b/querier.hh @@ -90,7 +90,7 @@ auto consume_page(flat_mutation_reader& reader, // on it because it stores references to some of it's own members. // Move it to the heap before any consumption begins to avoid // accidents. - return reader.peek().then([=, &reader, consumer = std::make_unique(std::move(consumer)), &slice] ( + return reader.peek(timeout).then([=, &reader, consumer = std::make_unique(std::move(consumer)), &slice] ( mutation_fragment* next_fragment) mutable { const auto next_fragment_kind = next_fragment ? next_fragment->mutation_fragment_kind() : mutation_fragment::kind::partition_end; compaction_state->start_new_page(row_limit, partition_limit, query_time, next_fragment_kind, *consumer); @@ -103,7 +103,7 @@ auto consume_page(flat_mutation_reader& reader, compaction_state, clustering_position_tracker(std::move(consumer), last_ckey)); - return reader.consume(std::move(reader_consumer), is_reversed, timeout).then([last_ckey] (auto&&... results) mutable { + return reader.consume(std::move(reader_consumer), timeout, is_reversed).then([last_ckey] (auto&&... results) mutable { return make_ready_future, std::decay_t...>(std::move(*last_ckey), std::move(results)...); }); }); diff --git a/read_context.hh b/read_context.hh index d79222b707..e13c0a5dd8 100644 --- a/read_context.hh +++ b/read_context.hh @@ -47,7 +47,7 @@ public: : _cache(cache) , _read_context(context) { } - future move_to_next_partition() { + future move_to_next_partition(db::timeout_clock::time_point timeout) { _last_key = std::move(_new_last_key); auto start = population_range_start(); auto phase = _cache.phase_of(start); @@ -75,7 +75,7 @@ public: if (_reader->is_end_of_stream() && _reader->is_buffer_empty()) { return make_ready_future(); } - return (*_reader)().then([this] (auto&& mfopt) { + return (*_reader)(timeout).then([this] (auto&& mfopt) { if (mfopt) { assert(mfopt->is_partition_start()); _new_last_key = mfopt->as_partition_start().key(); @@ -219,8 +219,8 @@ public: } // Gets the next fragment from the underlying reader future get_next_fragment(db::timeout_clock::time_point timeout) { - return ensure_underlying(timeout).then([this] { - return _underlying.underlying()(); + return ensure_underlying(timeout).then([this, timeout] { + return _underlying.underlying()(timeout); }); } }; diff --git a/repair/repair.cc b/repair/repair.cc index 5494df35c0..8dac81cb54 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -595,7 +595,7 @@ future partition_checksum::compute_legacy(flat_mutation_read return do_with(std::move(mr), partition_checksum(), [] (auto& reader, auto& checksum) { return repeat([&reader, &checksum] () { - return read_mutation_from_flat_mutation_reader(reader).then([&checksum] (auto mopt) { + return read_mutation_from_flat_mutation_reader(reader, db::no_timeout).then([&checksum] (auto mopt) { if (!mopt) { return stop_iteration::yes; } @@ -615,7 +615,7 @@ future partition_checksum::compute_legacy(flat_mutation_read future partition_checksum::compute_streamed(flat_mutation_reader m) { return do_with(std::move(m), [] (auto& m) { - return m.consume(partition_hasher(*m.schema())); + return m.consume(partition_hasher(*m.schema()), db::no_timeout); }); } diff --git a/row_cache.cc b/row_cache.cc index 72667c2a11..ac2ba54875 100644 --- a/row_cache.cc +++ b/row_cache.cc @@ -338,10 +338,10 @@ future<> read_context::create_underlying(bool skip_first_fragment, db::timeout_c } else { _sm_range = dht::partition_range::make_singular({dht::ring_position(*_key)}); } - return _underlying.fast_forward_to(std::move(_sm_range), *_underlying_snapshot, _phase, timeout).then([this, skip_first_fragment] { + return _underlying.fast_forward_to(std::move(_sm_range), *_underlying_snapshot, _phase, timeout).then([this, skip_first_fragment, timeout] { _underlying_snapshot = {}; if (skip_first_fragment) { - return _underlying.underlying()().then([](auto &&mf) {}); + return _underlying.underlying()(timeout).then([](auto &&mf) {}); } else { return make_ready_future<>(); } @@ -369,8 +369,8 @@ private: auto src_and_phase = _cache.snapshot_of(_read_context->range().start()->value()); auto phase = src_and_phase.phase; _read_context->enter_partition(_read_context->range().start()->value().as_decorated_key(), src_and_phase.snapshot, phase); - return _read_context->create_underlying(false, timeout).then([this, phase] { - return _read_context->underlying().underlying()().then([this, phase] (auto&& mfopt) { + return _read_context->create_underlying(false, timeout).then([this, phase, timeout] { + return _read_context->underlying().underlying()(timeout).then([this, phase] (auto&& mfopt) { if (!mfopt) { if (phase == _cache.phase_of(_read_context->range().start()->value())) { _cache._read_section(_cache._tracker.region(), [this] { @@ -532,8 +532,8 @@ public: , _read_context(ctx) {} - future operator()() { - return _reader.move_to_next_partition().then([this] (auto&& mfopt) mutable { + future operator()(db::timeout_clock::time_point timeout) { + return _reader.move_to_next_partition(timeout).then([this] (auto&& mfopt) mutable { { if (!mfopt) { this->handle_end_of_stream(); @@ -658,7 +658,7 @@ private: } future read_from_secondary(db::timeout_clock::time_point timeout) { - return _secondary_reader().then([this, timeout] (flat_mutation_reader_opt fropt, mutation_fragment_opt ps) { + return _secondary_reader(timeout).then([this, timeout] (flat_mutation_reader_opt fropt, mutation_fragment_opt ps) { if (fropt) { if (ps) { push_mutation_fragment(std::move(*ps)); diff --git a/sstables/compaction.cc b/sstables/compaction.cc index 9e18b7466f..c0b53aeda9 100644 --- a/sstables/compaction.cc +++ b/sstables/compaction.cc @@ -754,7 +754,7 @@ future compaction::run(std::unique_ptr c) { // leave this block either successfully or exceptionally with the reader object // destroyed. auto r = std::move(reader); - r.consume_in_thread(std::move(cfc), c->filter_func()); + r.consume_in_thread(std::move(cfc), c->filter_func(), db::no_timeout); } catch (...) { delete_sstables_for_interrupted_compaction(c->_info->new_sstables, c->_info->ks, c->_info->cf); c = nullptr; // make sure writers are stopped while running in thread context diff --git a/sstables/sstables.cc b/sstables/sstables.cc index e367378a0f..c25404795b 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -3548,7 +3548,7 @@ future<> sstable::write_components( } return seastar::async([this, mr = std::move(mr), estimated_partitions, schema = std::move(schema), cfg, stats, &pc] () mutable { auto wr = get_writer(*schema, estimated_partitions, cfg, stats, pc); - mr.consume_in_thread(std::move(wr)); + mr.consume_in_thread(std::move(wr), db::no_timeout); }); } diff --git a/streaming/stream_transfer_task.cc b/streaming/stream_transfer_task.cc index 71a3020243..52b80fcb18 100644 --- a/streaming/stream_transfer_task.cc +++ b/streaming/stream_transfer_task.cc @@ -178,7 +178,7 @@ future<> send_mutation_fragments(lw_shared_ptr si) { auto sink_op = [sink, si, got_error_from_peer] () mutable -> future<> { return repeat([sink, si, got_error_from_peer] () mutable { - return si->reader().then([sink, si, s = si->reader.schema(), got_error_from_peer] (mutation_fragment_opt mf) mutable { + return si->reader(db::no_timeout).then([sink, si, s = si->reader.schema(), got_error_from_peer] (mutation_fragment_opt mf) mutable { if (mf && !(*got_error_from_peer)) { frozen_mutation_fragment fmf = freeze(*s, *mf); auto size = fmf.representation().size(); diff --git a/tests/flat_mutation_reader_assertions.hh b/tests/flat_mutation_reader_assertions.hh index d0370a00bb..5daddd62dd 100644 --- a/tests/flat_mutation_reader_assertions.hh +++ b/tests/flat_mutation_reader_assertions.hh @@ -33,7 +33,7 @@ class flat_reader_assertions { dht::partition_range _pr; private: mutation_fragment_opt read_next() { - return _reader().get0(); + return _reader(db::no_timeout).get0(); } public: flat_reader_assertions(flat_mutation_reader reader) @@ -208,11 +208,11 @@ public: const schema& s = *_reader.schema(); range_tombstone_list actual_list(s); position_in_partition::equal_compare eq(s); - while (mutation_fragment* next = _reader.peek().get0()) { + while (mutation_fragment* next = _reader.peek(db::no_timeout).get0()) { if (!next->is_range_tombstone() || !eq(next->position(), mfo->position())) { break; } - actual_list.apply(s, _reader().get0()->as_range_tombstone()); + actual_list.apply(s, _reader(db::no_timeout).get0()->as_range_tombstone()); } actual_list.apply(s, mfo->as_range_tombstone()); { @@ -285,7 +285,7 @@ public: } flat_reader_assertions& produces(const mutation& m, const stdx::optional& ck_ranges = {}) { - auto mo = read_mutation_from_flat_mutation_reader(_reader).get0(); + auto mo = read_mutation_from_flat_mutation_reader(_reader, db::no_timeout).get0(); if (!mo) { BOOST_FAIL(sprint("Expected %s, but got end of stream, at: %s", m, seastar::current_backtrace())); } @@ -310,7 +310,7 @@ public: flat_reader_assertions& produces_eos_or_empty_mutation() { BOOST_TEST_MESSAGE("Expecting eos or empty mutation"); - auto mo = read_mutation_from_flat_mutation_reader(_reader).get0(); + auto mo = read_mutation_from_flat_mutation_reader(_reader, db::no_timeout).get0(); if (mo) { if (!mo->partition().empty()) { BOOST_FAIL(sprint("Mutation is not empty: %s", *mo)); @@ -356,7 +356,7 @@ public: flat_reader_assertions& fast_forward_to(const dht::partition_range& pr) { _pr = pr; - _reader.fast_forward_to(_pr).get(); + _reader.fast_forward_to(_pr, db::no_timeout).get(); return *this; } @@ -366,7 +366,7 @@ public: } flat_reader_assertions& fast_forward_to(position_range pr) { - _reader.fast_forward_to(std::move(pr)).get(); + _reader.fast_forward_to(std::move(pr), db::no_timeout).get(); return *this; } @@ -378,7 +378,7 @@ public: } flat_reader_assertions& produces_compacted(const mutation& m, const stdx::optional& ck_ranges = {}) { - auto mo = read_mutation_from_flat_mutation_reader(_reader).get0(); + auto mo = read_mutation_from_flat_mutation_reader(_reader, db::no_timeout).get0(); BOOST_REQUIRE(bool(mo)); memory::disable_failure_guard dfg; mutation got = *mo; @@ -388,13 +388,13 @@ public: } mutation_assertion next_mutation() { - auto mo = read_mutation_from_flat_mutation_reader(_reader).get0(); + auto mo = read_mutation_from_flat_mutation_reader(_reader, db::no_timeout).get0(); BOOST_REQUIRE(bool(mo)); return mutation_assertion(std::move(*mo)); } future<> fill_buffer() { - return _reader.fill_buffer(); + return _reader.fill_buffer(db::no_timeout); } bool is_buffer_full() const { diff --git a/tests/flat_mutation_reader_test.cc b/tests/flat_mutation_reader_test.cc index 5cf46b4247..65da7c3f4a 100644 --- a/tests/flat_mutation_reader_test.cc +++ b/tests/flat_mutation_reader_test.cc @@ -89,10 +89,10 @@ struct mock_consumer { static size_t count_fragments(mutation m) { auto r = flat_mutation_reader_from_mutations({m}); size_t res = 0; - auto mfopt = r().get0(); + auto mfopt = r(db::no_timeout).get0(); while (bool(mfopt)) { ++res; - mfopt = r().get0(); + mfopt = r(db::no_timeout).get0(); } return res; } @@ -103,7 +103,7 @@ SEASTAR_TEST_CASE(test_flat_mutation_reader_consume_single_partition) { size_t fragments_in_m = count_fragments(m); for (size_t depth = 1; depth <= fragments_in_m + 1; ++depth) { auto r = flat_mutation_reader_from_mutations({m}); - auto result = r.consume(mock_consumer(depth)).get0(); + auto result = r.consume(mock_consumer(depth), db::no_timeout).get0(); BOOST_REQUIRE(result._consume_end_of_stream_called); BOOST_REQUIRE_EQUAL(1, result._consume_new_partition_call_count); BOOST_REQUIRE_EQUAL(1, result._consume_end_of_partition_call_count); @@ -125,24 +125,24 @@ SEASTAR_TEST_CASE(test_flat_mutation_reader_consume_two_partitions) { size_t fragments_in_m2 = count_fragments(m2); for (size_t depth = 1; depth < fragments_in_m1; ++depth) { auto r = flat_mutation_reader_from_mutations({m1, m2}); - auto result = r.consume(mock_consumer(depth)).get0(); + auto result = r.consume(mock_consumer(depth), db::no_timeout).get0(); BOOST_REQUIRE(result._consume_end_of_stream_called); BOOST_REQUIRE_EQUAL(1, result._consume_new_partition_call_count); BOOST_REQUIRE_EQUAL(1, result._consume_end_of_partition_call_count); BOOST_REQUIRE_EQUAL(m1.partition().partition_tombstone() ? 1 : 0, result._consume_tombstone_call_count); auto r2 = flat_mutation_reader_from_mutations({m1, m2}); - auto start = r2().get0(); + auto start = r2(db::no_timeout).get0(); BOOST_REQUIRE(start); BOOST_REQUIRE(start->is_partition_start()); for (auto& mf : result._fragments) { - auto mfopt = r2().get0(); + auto mfopt = r2(db::no_timeout).get0(); BOOST_REQUIRE(mfopt); BOOST_REQUIRE(mf.equal(*m1.schema(), *mfopt)); } } for (size_t depth = fragments_in_m1; depth < fragments_in_m1 + fragments_in_m2 + 1; ++depth) { auto r = flat_mutation_reader_from_mutations({m1, m2}); - auto result = r.consume(mock_consumer(depth)).get0(); + auto result = r.consume(mock_consumer(depth), db::no_timeout).get0(); BOOST_REQUIRE(result._consume_end_of_stream_called); BOOST_REQUIRE_EQUAL(2, result._consume_new_partition_call_count); BOOST_REQUIRE_EQUAL(2, result._consume_end_of_partition_call_count); @@ -155,14 +155,14 @@ SEASTAR_TEST_CASE(test_flat_mutation_reader_consume_two_partitions) { } BOOST_REQUIRE_EQUAL(tombstones_count, result._consume_tombstone_call_count); auto r2 = flat_mutation_reader_from_mutations({m1, m2}); - auto start = r2().get0(); + auto start = r2(db::no_timeout).get0(); BOOST_REQUIRE(start); BOOST_REQUIRE(start->is_partition_start()); for (auto& mf : result._fragments) { - auto mfopt = r2().get0(); + auto mfopt = r2(db::no_timeout).get0(); BOOST_REQUIRE(mfopt); if (mfopt->is_partition_start() || mfopt->is_end_of_partition()) { - mfopt = r2().get0(); + mfopt = r2(db::no_timeout).get0(); } BOOST_REQUIRE(mfopt); BOOST_REQUIRE(mf.equal(*m1.schema(), *mfopt)); @@ -476,10 +476,10 @@ void test_flat_stream(schema_ptr s, std::vector muts, reversed_partiti auto consume_fn = [&] (flat_mutation_reader& fmr, flat_stream_consumer fsc) { if (thread) { assert(bool(!reversed)); - return fmr.consume_in_thread(std::move(fsc)); + return fmr.consume_in_thread(std::move(fsc), db::no_timeout); } else { auto reversed_flag = flat_mutation_reader::consume_reversed_partitions(bool(reversed)); - return fmr.consume(std::move(fsc), reversed_flag).get0(); + return fmr.consume(std::move(fsc), db::no_timeout, reversed_flag).get0(); } }; @@ -519,7 +519,7 @@ void test_flat_stream(schema_ptr s, std::vector muts, reversed_partiti }; BOOST_TEST_MESSAGE("Consume all, filtered"); fmr = flat_mutation_reader_from_mutations(muts); - muts2 = fmr.consume_in_thread(flat_stream_consumer(s, reversed), std::move(filter)); + muts2 = fmr.consume_in_thread(flat_stream_consumer(s, reversed), std::move(filter), db::no_timeout); BOOST_REQUIRE_EQUAL(muts.size() / 2, muts2.size()); for (auto j = size_t(1); j < muts.size(); j += 2) { BOOST_REQUIRE_EQUAL(muts[j], muts2[j / 2]); @@ -608,8 +608,8 @@ SEASTAR_TEST_CASE(test_abandoned_flat_mutation_reader_from_mutation) { return seastar::async([] { for_each_mutation([&] (const mutation& m) { auto rd = flat_mutation_reader_from_mutations({mutation(m)}); - rd().get(); - rd().get(); + rd(db::no_timeout).get(); + rd(db::no_timeout).get(); // We rely on AddressSanitizer telling us if nothing was leaked. }); }); diff --git a/tests/frozen_mutation_test.cc b/tests/frozen_mutation_test.cc index 2a29acc067..4a9b158b47 100644 --- a/tests/frozen_mutation_test.cc +++ b/tests/frozen_mutation_test.cc @@ -109,7 +109,7 @@ SEASTAR_THREAD_TEST_CASE(test_frozen_mutation_fragment) { rd.consume_pausable([&] (mutation_fragment mf) { mfs.emplace_back(std::move(mf)); return stop_iteration::no; - }).get(); + }, db::no_timeout).get(); for (auto&& mf : mfs) { auto refrozen_mf = freeze(s, mf).unfreeze(s); @@ -118,4 +118,4 @@ SEASTAR_THREAD_TEST_CASE(test_frozen_mutation_fragment) { } } }); -} \ No newline at end of file +} diff --git a/tests/memtable_snapshot_source.hh b/tests/memtable_snapshot_source.hh index 48fda84b1f..3cc806791e 100644 --- a/tests/memtable_snapshot_source.hh +++ b/tests/memtable_snapshot_source.hh @@ -78,7 +78,7 @@ private: consume_partitions(rd, [&] (mutation&& m) { new_mt->apply(std::move(m)); return stop_iteration::no; - }).get(); + }, db::no_timeout).get(); _memtables.erase(_memtables.begin(), _memtables.begin() + count); _memtables.push_back(new_mt); } diff --git a/tests/memtable_test.cc b/tests/memtable_test.cc index 64b4923ab4..4978d418f1 100644 --- a/tests/memtable_test.cc +++ b/tests/memtable_test.cc @@ -93,7 +93,7 @@ SEASTAR_TEST_CASE(test_memtable_with_many_versions_conforms_to_mutation_source) // Create reader so that each mutation is in a separate version flat_mutation_reader rd = mt->make_flat_reader(s, dht::partition_range::make_singular(m.decorated_key())); rd.set_max_buffer_size(1); - rd.fill_buffer().get(); + rd.fill_buffer(db::no_timeout).get(); readers.push_back(std::move(rd)); } @@ -238,7 +238,7 @@ SEASTAR_TEST_CASE(test_virtual_dirty_accounting_on_flush) { // Create a reader which will cause many partition versions to be created flat_mutation_reader_opt rd1 = mt->make_flat_reader(s); rd1->set_max_buffer_size(1); - rd1->fill_buffer().get(); + rd1->fill_buffer(db::no_timeout).get(); // Override large cell value with a short one { @@ -258,7 +258,7 @@ SEASTAR_TEST_CASE(test_virtual_dirty_accounting_on_flush) { flush_reader_check.produces_partition(current_ring[1]); virtual_dirty_values.push_back(mgr.virtual_dirty_memory()); - while ((*rd1)().get0()) ; + while ((*rd1)(db::no_timeout).get0()) ; rd1 = {}; logalloc::shard_tracker().full_compaction(); @@ -372,17 +372,17 @@ SEASTAR_TEST_CASE(test_segment_migration_during_flush) { auto rd = mt->make_flush_reader(s, service::get_local_priority_manager().memtable_flush_priority()); for (int i = 0; i < partitions; ++i) { - auto mfopt = rd().get0(); + auto mfopt = rd(db::no_timeout).get0(); BOOST_REQUIRE(bool(mfopt)); BOOST_REQUIRE(mfopt->is_partition_start()); while (!mfopt->is_end_of_partition()) { logalloc::shard_tracker().full_compaction(); - mfopt = rd().get0(); + mfopt = rd(db::no_timeout).get0(); } virtual_dirty_values.push_back(mgr.virtual_dirty_memory()); } - BOOST_REQUIRE(!rd().get0()); + BOOST_REQUIRE(!rd(db::no_timeout).get0()); std::reverse(virtual_dirty_values.begin(), virtual_dirty_values.end()); BOOST_REQUIRE(std::is_sorted(virtual_dirty_values.begin(), virtual_dirty_values.end())); @@ -511,8 +511,8 @@ SEASTAR_TEST_CASE(test_hash_is_cached) { { auto rd = mt->make_flat_reader(s); - rd().get0()->as_partition_start(); - clustering_row row = std::move(rd().get0()->as_mutable_clustering_row()); + rd(db::no_timeout).get0()->as_partition_start(); + clustering_row row = std::move(rd(db::no_timeout).get0()->as_mutable_clustering_row()); BOOST_REQUIRE(!row.cells().cell_hash_for(0)); } @@ -520,15 +520,15 @@ SEASTAR_TEST_CASE(test_hash_is_cached) { auto slice = s->full_slice(); slice.options.set(); auto rd = mt->make_flat_reader(s, query::full_partition_range, slice); - rd().get0()->as_partition_start(); - clustering_row row = std::move(rd().get0()->as_mutable_clustering_row()); + rd(db::no_timeout).get0()->as_partition_start(); + clustering_row row = std::move(rd(db::no_timeout).get0()->as_mutable_clustering_row()); BOOST_REQUIRE(row.cells().cell_hash_for(0)); } { auto rd = mt->make_flat_reader(s); - rd().get0()->as_partition_start(); - clustering_row row = std::move(rd().get0()->as_mutable_clustering_row()); + rd(db::no_timeout).get0()->as_partition_start(); + clustering_row row = std::move(rd(db::no_timeout).get0()->as_mutable_clustering_row()); BOOST_REQUIRE(row.cells().cell_hash_for(0)); } @@ -537,8 +537,8 @@ SEASTAR_TEST_CASE(test_hash_is_cached) { { auto rd = mt->make_flat_reader(s); - rd().get0()->as_partition_start(); - clustering_row row = std::move(rd().get0()->as_mutable_clustering_row()); + rd(db::no_timeout).get0()->as_partition_start(); + clustering_row row = std::move(rd(db::no_timeout).get0()->as_mutable_clustering_row()); BOOST_REQUIRE(!row.cells().cell_hash_for(0)); } @@ -546,15 +546,15 @@ SEASTAR_TEST_CASE(test_hash_is_cached) { auto slice = s->full_slice(); slice.options.set(); auto rd = mt->make_flat_reader(s, query::full_partition_range, slice); - rd().get0()->as_partition_start(); - clustering_row row = std::move(rd().get0()->as_mutable_clustering_row()); + rd(db::no_timeout).get0()->as_partition_start(); + clustering_row row = std::move(rd(db::no_timeout).get0()->as_mutable_clustering_row()); BOOST_REQUIRE(row.cells().cell_hash_for(0)); } { auto rd = mt->make_flat_reader(s); - rd().get0()->as_partition_start(); - clustering_row row = std::move(rd().get0()->as_mutable_clustering_row()); + rd(db::no_timeout).get0()->as_partition_start(); + clustering_row row = std::move(rd(db::no_timeout).get0()->as_mutable_clustering_row()); BOOST_REQUIRE(row.cells().cell_hash_for(0)); } }); diff --git a/tests/multishard_writer_test.cc b/tests/multishard_writer_test.cc index d05786cb87..fd6bc38789 100644 --- a/tests/multishard_writer_test.cc +++ b/tests/multishard_writer_test.cc @@ -66,7 +66,7 @@ SEASTAR_TEST_CASE(test_multishard_writer) { return make_exception_future<>(std::runtime_error("Failed to write")); } return repeat([&shards_after, reader = std::move(reader), error] () mutable { - return reader().then([&shards_after, error] (mutation_fragment_opt mf_opt) mutable { + return reader(db::no_timeout).then([&shards_after, error] (mutation_fragment_opt mf_opt) mutable { if (mf_opt) { if (mf_opt->is_partition_start()) { auto shard = dht::global_partitioner().shard_of(mf_opt->as_partition_start().key().token()); diff --git a/tests/mutation_fragment_test.cc b/tests/mutation_fragment_test.cc index a779ada108..9335c17309 100644 --- a/tests/mutation_fragment_test.cc +++ b/tests/mutation_fragment_test.cc @@ -111,7 +111,7 @@ SEASTAR_TEST_CASE(test_mutation_merger_conforms_to_mutation_source) { muts.push_back(mutation(m.schema(), m.decorated_key())); } auto rd = flat_mutation_reader_from_mutations({m}); - rd.consume(fragment_scatterer{muts}).get(); + rd.consume(fragment_scatterer{muts}, db::no_timeout).get(); for (int i = 0; i < n; ++i) { memtables[i]->apply(std::move(muts[i])); } @@ -396,7 +396,7 @@ SEASTAR_TEST_CASE(test_schema_upgrader_is_equivalent_with_mutation_upgrade) { // upgrade m1 to m2's schema auto reader = transform(flat_mutation_reader_from_mutations({m1}), schema_upgrader(m2.schema())); - auto from_upgrader = read_mutation_from_flat_mutation_reader(reader).get0(); + auto from_upgrader = read_mutation_from_flat_mutation_reader(reader, db::no_timeout).get0(); auto regular = m1; regular.upgrade(m2.schema()); diff --git a/tests/mutation_reader_test.cc b/tests/mutation_reader_test.cc index 287670b3db..bc24a5ff7b 100644 --- a/tests/mutation_reader_test.cc +++ b/tests/mutation_reader_test.cc @@ -1214,11 +1214,11 @@ SEASTAR_TEST_CASE(test_fast_forwarding_combined_reader_is_consistent_with_slicin } result.partition().apply(*s, std::move(mf)); return stop_iteration::no; - }).get(); + }, db::no_timeout).get(); for (auto&& range : ranges) { auto prange = position_range(range); - rd.fast_forward_to(prange).get(); + rd.fast_forward_to(prange, db::no_timeout).get(); rd.consume_pausable([&](mutation_fragment&& mf) { if (!mf.relevant_for_range(*s, prange.start())) { BOOST_FAIL(sprint("Received fragment which is not relevant for range: %s, range: %s", mf, prange)); @@ -1229,14 +1229,14 @@ SEASTAR_TEST_CASE(test_fast_forwarding_combined_reader_is_consistent_with_slicin } result.partition().apply(*s, std::move(mf)); return stop_iteration::no; - }).get(); + }, db::no_timeout).get(); } assert_that(result).is_equal_to(expected, ranges); }; check_next_partition(combined[0]); - rd.fast_forward_to(dht::partition_range::make_singular(keys[2])).get(); + rd.fast_forward_to(dht::partition_range::make_singular(keys[2]), db::no_timeout).get(); check_next_partition(combined[2]); }); } @@ -1281,7 +1281,7 @@ SEASTAR_TEST_CASE(test_combined_reader_slicing_with_overlapping_range_tombstones } result.partition().apply(*s, std::move(mf)); return stop_iteration::no; - }).get(); + }, db::no_timeout).get(); assert_that(result).is_equal_to(m1 + m2, query::clustering_row_ranges({range})); } @@ -1304,9 +1304,9 @@ SEASTAR_TEST_CASE(test_combined_reader_slicing_with_overlapping_range_tombstones BOOST_REQUIRE(!mf.position().has_clustering_key()); result.partition().apply(*s, std::move(mf)); return stop_iteration::no; - }).get(); + }, db::no_timeout).get(); - rd.fast_forward_to(prange).get(); + rd.fast_forward_to(prange, db::no_timeout).get(); position_in_partition last_pos = position_in_partition::before_all_clustered_rows(); auto consume_clustered = [&] (mutation_fragment&& mf) { @@ -1319,9 +1319,9 @@ SEASTAR_TEST_CASE(test_combined_reader_slicing_with_overlapping_range_tombstones return stop_iteration::no; }; - rd.consume_pausable(consume_clustered).get(); - rd.fast_forward_to(position_range(prange.end(), position_in_partition::after_all_clustered_rows())).get(); - rd.consume_pausable(consume_clustered).get(); + rd.consume_pausable(consume_clustered, db::no_timeout).get(); + rd.fast_forward_to(position_range(prange.end(), position_in_partition::after_all_clustered_rows()), db::no_timeout).get(); + rd.consume_pausable(consume_clustered, db::no_timeout).get(); assert_that(result).is_equal_to(m1 + m2); } @@ -1346,7 +1346,7 @@ SEASTAR_TEST_CASE(test_combined_mutation_source_is_a_mutation_source) { mf_m.partition().apply(*s, mf); memtables[source_index++ % memtables.size()]->apply(mf_m); return stop_iteration::no; - }).get(); + }, db::no_timeout).get(); } std::vector sources; @@ -1717,7 +1717,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_destroyed_with_pending auto reader = make_multishard_combining_reader(s.schema(), query::full_partition_range, s.schema()->full_slice(), service::get_local_sstable_query_read_priority(), partitioner, std::move(factory)); - reader.fill_buffer().get(); + reader.fill_buffer(db::no_timeout).get(); BOOST_REQUIRE(reader.is_buffer_full()); BOOST_REQUIRE(smp::submit_to(shard_of_interest, [remote_control = remote_control.get()] { @@ -1868,7 +1868,7 @@ SEASTAR_THREAD_TEST_CASE(test_foreign_reader_destroyed_with_pending_read_ahead) { auto reader = make_foreign_reader(s.schema(), std::move(remote_reader)); - reader.fill_buffer().get(); + reader.fill_buffer(db::no_timeout).get(); BOOST_REQUIRE(!reader.is_buffer_empty()); } @@ -1970,7 +1970,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_destroyed_with_pending { auto reader = make_multishard_combining_reader(s.schema(), query::full_partition_range, s.schema()->full_slice(), service::get_local_sstable_query_read_priority(), partitioner, std::move(factory)); - reader.fill_buffer().get(); + reader.fill_buffer(db::no_timeout).get(); BOOST_REQUIRE(reader.is_buffer_full()); } diff --git a/tests/mutation_source_test.cc b/tests/mutation_source_test.cc index 717fe852d3..d5f58e59ce 100644 --- a/tests/mutation_source_test.cc +++ b/tests/mutation_source_test.cc @@ -117,17 +117,17 @@ static void test_streamed_mutation_forwarding_is_consistent_with_slicing(populat void consume_end_of_stream() { } }; - fwd_reader.consume(consumer(m.schema(), builder)).get0(); + fwd_reader.consume(consumer(m.schema(), builder), db::no_timeout).get0(); BOOST_REQUIRE(bool(builder)); for (auto&& range : ranges) { BOOST_TEST_MESSAGE(sprint("fwd %s", range)); - fwd_reader.fast_forward_to(position_range(range)).get(); - fwd_reader.consume(consumer(m.schema(), builder)).get0(); + fwd_reader.fast_forward_to(position_range(range), db::no_timeout).get(); + fwd_reader.consume(consumer(m.schema(), builder), db::no_timeout).get0(); } mutation_opt fwd_m = builder->consume_end_of_stream(); BOOST_REQUIRE(bool(fwd_m)); - mutation_opt sliced_m = read_mutation_from_flat_mutation_reader(sliced_reader).get0(); + mutation_opt sliced_m = read_mutation_from_flat_mutation_reader(sliced_reader, db::no_timeout).get0(); BOOST_REQUIRE(bool(sliced_m)); assert_that(*sliced_m).is_equal_to(*fwd_m, slice_with_ranges.row_ranges(*m.schema(), m.key())); } @@ -1012,7 +1012,7 @@ void test_slicing_with_overlapping_range_tombstones(populate_fn populate) { } result.partition().apply(*s, std::move(mf)); return stop_iteration::no; - }).get(); + }, db::no_timeout).get(); assert_that(result).is_equal_to(m1 + m2, query::clustering_row_ranges({range})); } @@ -1029,9 +1029,9 @@ void test_slicing_with_overlapping_range_tombstones(populate_fn populate) { BOOST_REQUIRE(!mf.position().has_clustering_key()); result.partition().apply(*s, std::move(mf)); return stop_iteration::no; - }).get(); + }, db::no_timeout).get(); - rd.fast_forward_to(prange).get(); + rd.fast_forward_to(prange, db::no_timeout).get(); position_in_partition last_pos = position_in_partition::before_all_clustered_rows(); auto consume_clustered = [&] (mutation_fragment&& mf) { @@ -1044,9 +1044,9 @@ void test_slicing_with_overlapping_range_tombstones(populate_fn populate) { return stop_iteration::no; }; - rd.consume_pausable(consume_clustered).get(); - rd.fast_forward_to(position_range(prange.end(), position_in_partition::after_all_clustered_rows())).get(); - rd.consume_pausable(consume_clustered).get(); + rd.consume_pausable(consume_clustered, db::no_timeout).get(); + rd.fast_forward_to(position_range(prange.end(), position_in_partition::after_all_clustered_rows()), db::no_timeout).get(); + rd.consume_pausable(consume_clustered, db::no_timeout).get(); assert_that(result).is_equal_to(m1 + m2); } diff --git a/tests/mutation_test.cc b/tests/mutation_test.cc index b09030bfed..8b30f1da6c 100644 --- a/tests/mutation_test.cc +++ b/tests/mutation_test.cc @@ -84,7 +84,7 @@ static atomic_cell make_collection_member(data_type dt, T value) { static mutation_partition get_partition(memtable& mt, const partition_key& key) { auto dk = dht::global_partitioner().decorate_key(*mt.schema(), key); auto reader = mt.make_flat_reader(mt.schema(), dht::partition_range::make_singular(dk)); - auto mo = read_mutation_from_flat_mutation_reader(reader).get0(); + auto mo = read_mutation_from_flat_mutation_reader(reader, db::no_timeout).get0(); BOOST_REQUIRE(bool(mo)); return std::move(mo->partition()); } diff --git a/tests/perf/perf_fast_forward.cc b/tests/perf/perf_fast_forward.cc index 47b4187ae6..da3a49ab26 100644 --- a/tests/perf/perf_fast_forward.cc +++ b/tests/perf/perf_fast_forward.cc @@ -507,7 +507,7 @@ public: static uint64_t consume_all(flat_mutation_reader& rd) { - return rd.consume(counting_consumer()).get0(); + return rd.consume(counting_consumer(), db::no_timeout).get0(); } static @@ -516,13 +516,13 @@ uint64_t consume_all_with_next_partition(flat_mutation_reader& rd) { do { fragments += consume_all(rd); rd.next_partition(); - rd.fill_buffer().get(); + rd.fill_buffer(db::no_timeout).get(); } while(!rd.is_end_of_stream() || !rd.is_buffer_empty()); return fragments; } static void assert_partition_start(flat_mutation_reader& rd) { - auto mfopt = rd().get0(); + auto mfopt = rd(db::no_timeout).get0(); assert(mfopt); assert(mfopt->is_partition_start()); } @@ -546,7 +546,7 @@ static test_result scan_rows_with_stride(column_family& cf, int n_rows, int n_re rd.fast_forward_to(position_range( position_in_partition(position_in_partition::clustering_row_tag_t(), clustering_key::from_singular(*cf.schema(), ck)), position_in_partition(position_in_partition::clustering_row_tag_t(), clustering_key::from_singular(*cf.schema(), ck + n_read)) - )).get(); + ), db::no_timeout).get(); } fragments += consume_all(rd); ck += n_read + n_skip; @@ -585,7 +585,7 @@ static test_result scan_with_stride_partitions(column_family& cf, int n, int n_r dht::partition_range::bound(keys[pk], true), dht::partition_range::bound(keys[std::min(n, pk + n_read) - 1], true) ); - rd.fast_forward_to(pr).get(); + rd.fast_forward_to(pr, db::no_timeout).get(); } fragments += consume_all(rd); pk += n_read + n_skip; @@ -607,7 +607,7 @@ static test_result slice_rows(column_family& cf, int offset = 0, int n_read = 1) rd.fast_forward_to(position_range( position_in_partition::for_key(clustering_key::from_singular(*cf.schema(), offset)), - position_in_partition::for_key(clustering_key::from_singular(*cf.schema(), offset + n_read)))).get(); + position_in_partition::for_key(clustering_key::from_singular(*cf.schema(), offset + n_read))), db::no_timeout).get(); uint64_t fragments = consume_all_with_next_partition(rd); return {before, fragments}; @@ -662,7 +662,7 @@ static test_result slice_rows_single_key(column_family& cf, int offset = 0, int assert_partition_start(rd); rd.fast_forward_to(position_range( position_in_partition::for_key(clustering_key::from_singular(*cf.schema(), offset)), - position_in_partition::for_key(clustering_key::from_singular(*cf.schema(), offset + n_read)))).get(); + position_in_partition::for_key(clustering_key::from_singular(*cf.schema(), offset + n_read))), db::no_timeout).get(); uint64_t fragments = consume_all_with_next_partition(rd); return {before, fragments}; @@ -723,13 +723,13 @@ static test_result test_forwarding_with_restriction(column_family& cf, table_con rd.fast_forward_to(position_range( position_in_partition::for_key(clustering_key::from_singular(*cf.schema(), 1)), - position_in_partition::for_key(clustering_key::from_singular(*cf.schema(), 2)))).get(); + position_in_partition::for_key(clustering_key::from_singular(*cf.schema(), 2))), db::no_timeout).get(); fragments += consume_all(rd); rd.fast_forward_to(position_range( position_in_partition::for_key(clustering_key::from_singular(*cf.schema(), first_key - 2)), - position_in_partition::for_key(clustering_key::from_singular(*cf.schema(), first_key + 2)))).get(); + position_in_partition::for_key(clustering_key::from_singular(*cf.schema(), first_key + 2))), db::no_timeout).get(); fragments += consume_all_with_next_partition(rd); return {before, fragments}; diff --git a/tests/perf/perf_mutation_readers.cc b/tests/perf/perf_mutation_readers.cc index 1feb3258da..c6f8a7831b 100644 --- a/tests/perf/perf_mutation_readers.cc +++ b/tests/perf/perf_mutation_readers.cc @@ -125,7 +125,7 @@ future<> combined::consume_all(flat_mutation_reader mr) const return mr.consume_pausable([] (mutation_fragment mf) { perf_tests::do_not_optimize(mf); return stop_iteration::no; - }).then([] { + }, db::no_timeout).then([] { perf_tests::stop_measuring_time(); }); }); @@ -248,7 +248,7 @@ protected: return mr.consume_pausable([] (mutation_fragment mf) { perf_tests::do_not_optimize(mf); return stop_iteration::no; - }); + }, db::no_timeout); }); } }; @@ -273,4 +273,4 @@ PERF_TEST_F(memtable, many_partitions_many_rows) return consume_all(multi_row_mt().make_flat_reader(schema(), multi_partition_range(25))); } -} \ No newline at end of file +} diff --git a/tests/perf/perf_sstable.hh b/tests/perf/perf_sstable.hh index ab37c9d35e..a491558f66 100644 --- a/tests/perf/perf_sstable.hh +++ b/tests/perf/perf_sstable.hh @@ -204,7 +204,7 @@ public: auto total = make_lw_shared(0); auto done = make_lw_shared(false); return do_until([done] { return *done; }, [this, done, total, &r] { - return read_mutation_from_flat_mutation_reader(r).then([this, done, total] (mutation_opt m) { + return read_mutation_from_flat_mutation_reader(r, db::no_timeout).then([this, done, total] (mutation_opt m) { if (!m) { *done = true; } else { diff --git a/tests/perf_row_cache_update.cc b/tests/perf_row_cache_update.cc index 5d8b9001a3..3bd80b1144 100644 --- a/tests/perf_row_cache_update.cc +++ b/tests/perf_row_cache_update.cc @@ -153,7 +153,7 @@ void run_test(const sstring& name, schema_ptr s, MutationGenerator&& gen) { auto rd = std::make_unique( make_combined_reader(s, cache.make_reader(s), mt->make_flat_reader(s))); rd->set_max_buffer_size(1); - rd->fill_buffer().get(); + rd->fill_buffer(db::no_timeout).get(); scheduling_latency_measurer slm; slm.start(); @@ -164,7 +164,7 @@ void run_test(const sstring& name, schema_ptr s, MutationGenerator&& gen) { rd->set_max_buffer_size(1024*1024); rd->consume_pausable([] (mutation_fragment) { return stop_iteration::no; - }).get(); + }, db::no_timeout).get(); mt = {}; rd = {}; diff --git a/tests/row_cache_alloc_stress.cc b/tests/row_cache_alloc_stress.cc index 2dad22e4e0..afe31970f0 100644 --- a/tests/row_cache_alloc_stress.cc +++ b/tests/row_cache_alloc_stress.cc @@ -185,7 +185,7 @@ int main(int argc, char** argv) { for (auto&& key : keys) { auto range = dht::partition_range::make_singular(key); auto reader = cache.make_reader(s, range); - auto mo = read_mutation_from_flat_mutation_reader(reader).get0(); + auto mo = read_mutation_from_flat_mutation_reader(reader, db::no_timeout).get0(); assert(mo); assert(mo->partition().live_row_count(*s) == row_count + 1 /* one row was already in cache before update()*/); @@ -202,7 +202,7 @@ int main(int argc, char** argv) { for (auto&& key : keys) { auto range = dht::partition_range::make_singular(key); auto reader = cache.make_reader(s, range); - auto mfopt = reader().get0(); + auto mfopt = reader(db::no_timeout).get0(); assert(mfopt); assert(mfopt->is_partition_start()); } @@ -240,7 +240,7 @@ int main(int argc, char** argv) { try { auto reader = cache.make_reader(s, range); - assert(!reader().get0()); + assert(!reader(db::no_timeout).get0()); auto evicted_from_cache = logalloc::segment_size + large_cell_size; new char[evicted_from_cache + logalloc::segment_size]; assert(false); // The test is not invoking the case which it's supposed to test diff --git a/tests/row_cache_stress_test.cc b/tests/row_cache_stress_test.cc index 817a1e798e..7ceb1061c7 100644 --- a/tests/row_cache_stress_test.cc +++ b/tests/row_cache_stress_test.cc @@ -305,7 +305,7 @@ int main(int argc, char** argv) { while (!cancelled) { test_log.trace("{}: starting read", id); auto rd = t.make_single_key_reader(pk, ck_range); - auto row_count = rd->rd.consume(validating_consumer(t, id)).get0(); + auto row_count = rd->rd.consume(validating_consumer(t, id), db::no_timeout).get0(); if (row_count != len) { throw std::runtime_error(sprint("Expected %d fragments, got %d", len, row_count)); } @@ -317,7 +317,7 @@ int main(int argc, char** argv) { while (!cancelled) { test_log.trace("{}: starting read", id); auto rd = t.make_scanning_reader(); - auto row_count = rd->rd.consume(validating_consumer(t, id)).get0(); + auto row_count = rd->rd.consume(validating_consumer(t, id), db::no_timeout).get0(); if (row_count != expected_row_count) { throw std::runtime_error(sprint("Expected %d fragments, got %d", expected_row_count, row_count)); } diff --git a/tests/row_cache_test.cc b/tests/row_cache_test.cc index a59a718980..a765463ee5 100644 --- a/tests/row_cache_test.cc +++ b/tests/row_cache_test.cc @@ -126,7 +126,7 @@ snapshot_source snapshot_source_from_snapshot(mutation_source src) { bool has_key(row_cache& cache, const dht::decorated_key& key) { auto range = dht::partition_range::make_singular(key); auto reader = cache.make_reader(cache.schema(), range); - auto mo = read_mutation_from_flat_mutation_reader(reader).get0(); + auto mo = read_mutation_from_flat_mutation_reader(reader, db::no_timeout).get0(); if (!bool(mo)) { return false; } @@ -673,7 +673,7 @@ SEASTAR_TEST_CASE(test_reading_from_random_partial_partition) { cache.populate(m1); // m1 is supposed to have random continuity and populate() should preserve it auto rd1 = cache.make_reader(gen.schema()); - rd1.fill_buffer().get(); + rd1.fill_buffer(db::no_timeout).get(); // Merge m2 into cache auto mt = make_lw_shared(gen.schema()); @@ -681,7 +681,7 @@ SEASTAR_TEST_CASE(test_reading_from_random_partial_partition) { cache.update([&] { underlying.apply(m2); }, *mt).get(); auto rd2 = cache.make_reader(gen.schema()); - rd2.fill_buffer().get(); + rd2.fill_buffer(db::no_timeout).get(); assert_that(std::move(rd1)).next_mutation().is_equal_to(m1); assert_that(std::move(rd2)).next_mutation().is_equal_to(m1 + m2); @@ -738,7 +738,7 @@ SEASTAR_TEST_CASE(test_eviction) { auto pr = dht::partition_range::make_singular(key); auto rd = cache.make_reader(s, pr); rd.set_max_buffer_size(1); - rd.fill_buffer().get(); + rd.fill_buffer(db::no_timeout).get(); } while (tracker.partitions() > 0) { @@ -807,7 +807,7 @@ SEASTAR_TEST_CASE(test_eviction_after_schema_change) { auto pr = dht::partition_range::make_singular(m.decorated_key()); auto rd = cache.make_reader(s2, pr); rd.set_max_buffer_size(1); - rd.fill_buffer().get(); + rd.fill_buffer(db::no_timeout).get(); } while (tracker.region().evict_some() == memory::reclaiming_result::reclaimed_something) ; @@ -824,9 +824,9 @@ void test_sliced_read_row_presence(flat_mutation_reader reader, schema_ptr s, st { clustering_key::equality ck_eq(*s); - auto mfopt = reader().get0(); + auto mfopt = reader(db::no_timeout).get0(); BOOST_REQUIRE(mfopt->is_partition_start()); - while ((mfopt = reader().get0()) && !mfopt->is_end_of_partition()) { + while ((mfopt = reader(db::no_timeout).get0()) && !mfopt->is_end_of_partition()) { if (mfopt->is_clustering_row()) { BOOST_REQUIRE(!expected.empty()); auto expected_ck = expected.front(); @@ -840,7 +840,7 @@ void test_sliced_read_row_presence(flat_mutation_reader reader, schema_ptr s, st } BOOST_REQUIRE(expected.empty()); BOOST_REQUIRE(mfopt && mfopt->is_end_of_partition()); - BOOST_REQUIRE(!reader().get0()); + BOOST_REQUIRE(!reader(db::no_timeout).get0()); } SEASTAR_TEST_CASE(test_single_partition_update) { @@ -1043,7 +1043,7 @@ SEASTAR_TEST_CASE(test_update_failure) { auto has_only = [&] (const partitions_type& partitions) { auto reader = cache.make_reader(s, query::full_partition_range); for (int i = 0; i < partition_count; i++) { - auto mopt = read_mutation_from_flat_mutation_reader(reader).get0(); + auto mopt = read_mutation_from_flat_mutation_reader(reader, db::no_timeout).get0(); if (!mopt) { break; } @@ -1051,7 +1051,7 @@ SEASTAR_TEST_CASE(test_update_failure) { BOOST_REQUIRE(it != partitions.end()); BOOST_REQUIRE(it->second.equal(*s, mopt->partition())); } - BOOST_REQUIRE(!reader().get0()); + BOOST_REQUIRE(!reader(db::no_timeout).get0()); }; if (failed) { @@ -1236,11 +1236,11 @@ SEASTAR_TEST_CASE(test_cache_population_and_update_race) { auto m0_range = dht::partition_range::make_singular(ring[0].ring_position()); auto rd1 = cache.make_reader(s, m0_range); rd1.set_max_buffer_size(1); - auto rd1_fill_buffer = rd1.fill_buffer(); + auto rd1_fill_buffer = rd1.fill_buffer(db::no_timeout); auto rd2 = cache.make_reader(s); rd2.set_max_buffer_size(1); - auto rd2_fill_buffer = rd2.fill_buffer(); + auto rd2_fill_buffer = rd2.fill_buffer(db::no_timeout); sleep(10ms).get(); @@ -1371,7 +1371,7 @@ SEASTAR_TEST_CASE(test_cache_population_and_clear_race) { auto rd1 = cache.make_reader(s); rd1.set_max_buffer_size(1); - auto rd1_fill_buffer = rd1.fill_buffer(); + auto rd1_fill_buffer = rd1.fill_buffer(db::no_timeout); sleep(10ms).get(); @@ -1431,10 +1431,10 @@ SEASTAR_TEST_CASE(test_mvcc) { cache.populate(m1); auto rd1 = cache.make_reader(s); - rd1.fill_buffer().get(); + rd1.fill_buffer(db::no_timeout).get(); auto rd2 = cache.make_reader(s); - rd2.fill_buffer().get(); + rd2.fill_buffer(db::no_timeout).get(); auto mt1 = make_lw_shared(s); mt1->apply(m2); @@ -1453,19 +1453,19 @@ SEASTAR_TEST_CASE(test_mvcc) { cache.update([&] { underlying.apply(mt1_copy); }, *mt1).get(); auto rd3 = cache.make_reader(s); - rd3.fill_buffer().get(); + rd3.fill_buffer(db::no_timeout).get(); auto rd4 = cache.make_reader(s); - rd4.fill_buffer().get(); + rd4.fill_buffer(db::no_timeout).get(); auto rd5 = cache.make_reader(s); - rd5.fill_buffer().get(); + rd5.fill_buffer(db::no_timeout).get(); assert_that(std::move(rd3)).has_monotonic_positions(); if (with_active_memtable_reader) { assert(mt1_reader_opt); - auto mt1_reader_mutation = read_mutation_from_flat_mutation_reader(*mt1_reader_opt).get0(); + auto mt1_reader_mutation = read_mutation_from_flat_mutation_reader(*mt1_reader_opt, db::no_timeout).get0(); BOOST_REQUIRE(mt1_reader_mutation); assert_that(*mt1_reader_mutation).is_equal_to(m2); } @@ -1884,7 +1884,7 @@ SEASTAR_TEST_CASE(test_tombstone_merging_in_partial_partition) { } static void consume_all(flat_mutation_reader& rd) { - while (auto mfopt = rd().get0()) {} + while (auto mfopt = rd(db::no_timeout).get0()) {} } static void populate_range(row_cache& cache, @@ -1934,7 +1934,7 @@ SEASTAR_TEST_CASE(test_readers_get_all_data_after_eviction) { auto make_reader = [&] (const query::partition_slice& slice) { auto rd = cache.make_reader(s, query::full_partition_range, slice); rd.set_max_buffer_size(1); - rd.fill_buffer().get(); + rd.fill_buffer(db::no_timeout).get(); return assert_that(std::move(rd)); }; @@ -2060,7 +2060,7 @@ SEASTAR_TEST_CASE(test_tombstones_are_not_missed_when_range_is_invalidated) { auto make_reader = [&] (const query::partition_slice& slice) { auto rd = cache.make_reader(s.schema(), pr, slice); rd.set_max_buffer_size(1); - rd.fill_buffer().get(); + rd.fill_buffer(db::no_timeout).get(); return assert_that(std::move(rd)); }; @@ -2165,7 +2165,7 @@ SEASTAR_TEST_CASE(test_exception_safety_of_update_from_memtable) { auto make_reader = [&] (const dht::partition_range& pr) { auto rd = cache.make_reader(s.schema(), pr); rd.set_max_buffer_size(1); - rd.fill_buffer().get(); + rd.fill_buffer(db::no_timeout).get(); return rd; }; @@ -2185,7 +2185,7 @@ SEASTAR_TEST_CASE(test_exception_safety_of_update_from_memtable) { auto pr = dht::partition_range::make_singular(pkeys[2]); snap = mt->make_flat_reader(s.schema(), pr); snap->set_max_buffer_size(1); - snap->fill_buffer().get(); + snap->fill_buffer(db::no_timeout).get(); cache.update([&] { auto mt2 = make_lw_shared(cache.schema()); @@ -2244,9 +2244,9 @@ SEASTAR_TEST_CASE(test_exception_safety_of_reads) { try { injector.fail_after(i++); auto rd = cache.make_reader(s, query::full_partition_range, slice); - auto got_opt = read_mutation_from_flat_mutation_reader(rd).get0(); + auto got_opt = read_mutation_from_flat_mutation_reader(rd, db::no_timeout).get0(); BOOST_REQUIRE(got_opt); - BOOST_REQUIRE(!read_mutation_from_flat_mutation_reader(rd).get0()); + BOOST_REQUIRE(!read_mutation_from_flat_mutation_reader(rd, db::no_timeout).get0()); injector.cancel(); assert_that(*got_opt).is_equal_to(mut, ranges); @@ -2320,9 +2320,9 @@ SEASTAR_TEST_CASE(test_exception_safety_of_transitioning_from_underlying_read_to injector.fail_after(i++); auto rd = cache.make_reader(s.schema(), pr, slice); - auto got_opt = read_mutation_from_flat_mutation_reader(rd).get0(); + auto got_opt = read_mutation_from_flat_mutation_reader(rd, db::no_timeout).get0(); BOOST_REQUIRE(got_opt); - auto mfopt = rd().get0(); + auto mfopt = rd(db::no_timeout).get0(); BOOST_REQUIRE(!mfopt); injector.cancel(); @@ -2396,7 +2396,7 @@ SEASTAR_TEST_CASE(test_concurrent_population_before_latest_version_iterator) { auto make_reader = [&] (const query::partition_slice& slice) { auto rd = cache.make_reader(s.schema(), pr, slice); rd.set_max_buffer_size(1); - rd.fill_buffer().get(); + rd.fill_buffer(db::no_timeout).get(); return assert_that(std::move(rd)); }; @@ -2558,7 +2558,7 @@ SEASTAR_TEST_CASE(test_random_row_population) { auto make_reader = [&] (const query::partition_slice* slice = nullptr) { auto rd = cache.make_reader(s.schema(), pr, slice ? *slice : s.schema()->full_slice()); rd.set_max_buffer_size(1); - rd.fill_buffer().get(); + rd.fill_buffer(db::no_timeout).get(); return std::move(rd); }; @@ -2592,7 +2592,7 @@ SEASTAR_TEST_CASE(test_random_row_population) { while (!readers.empty()) { auto i = readers.begin(); while (i != readers.end()) { - auto mfo = i->reader().get0(); + auto mfo = i->reader(db::no_timeout).get0(); if (!mfo) { auto&& ranges = i->slice->row_ranges(*s.schema(), pk.key()); assert_that(i->result).is_equal_to(m1, ranges); @@ -2673,7 +2673,7 @@ SEASTAR_TEST_CASE(test_continuity_is_populated_when_read_overlaps_with_older_ver auto make_reader = [&] { auto rd = cache.make_reader(s.schema(), pr); rd.set_max_buffer_size(1); - rd.fill_buffer().get(); + rd.fill_buffer(db::no_timeout).get(); return std::move(rd); }; @@ -2801,7 +2801,7 @@ SEASTAR_TEST_CASE(test_continuity_population_with_multicolumn_clustering_key) { auto make_reader = [&] (const query::partition_slice* slice = nullptr) { auto rd = cache.make_reader(s, pr, slice ? *slice : s->full_slice()); rd.set_max_buffer_size(1); - rd.fill_buffer().get(); + rd.fill_buffer(db::no_timeout).get(); return std::move(rd); }; @@ -2908,7 +2908,7 @@ SEASTAR_TEST_CASE(test_concurrent_setting_of_continuity_on_read_upper_bound) { auto make_rd = [&] (const query::partition_slice* slice = nullptr) { auto rd = cache.make_reader(s.schema(), pr, slice ? *slice : s.schema()->full_slice()); rd.set_max_buffer_size(1); - rd.fill_buffer().get(); + rd.fill_buffer(db::no_timeout).get(); return std::move(rd); }; @@ -2972,7 +2972,7 @@ SEASTAR_TEST_CASE(test_tombstone_merging_of_overlapping_tombstones_in_many_versi auto make_reader = [&] { auto rd = cache.make_reader(s.schema()); rd.set_max_buffer_size(1); - rd.fill_buffer().get(); + rd.fill_buffer(db::no_timeout).get(); return std::move(rd); }; @@ -3010,7 +3010,7 @@ SEASTAR_TEST_CASE(test_concurrent_reads_and_eviction) { auto make_reader = [&] (const query::partition_slice& slice) { auto rd = cache.make_reader(s, pr, slice); rd.set_max_buffer_size(3); - rd.fill_buffer().get(); + rd.fill_buffer(db::no_timeout).get(); return std::move(rd); }; @@ -3037,7 +3037,7 @@ SEASTAR_TEST_CASE(test_concurrent_reads_and_eviction) { .build(); auto rd = make_reader(slice); - auto actual_opt = read_mutation_from_flat_mutation_reader(rd).get0(); + auto actual_opt = read_mutation_from_flat_mutation_reader(rd, db::no_timeout).get0(); BOOST_REQUIRE(actual_opt); auto actual = *actual_opt; @@ -3119,12 +3119,12 @@ SEASTAR_TEST_CASE(test_cache_update_and_eviction_preserves_monotonicity_of_memta auto mt_rd1 = mt->make_flat_reader(s); mt_rd1.set_max_buffer_size(1); - mt_rd1.fill_buffer().get(); + mt_rd1.fill_buffer(db::no_timeout).get(); BOOST_REQUIRE(mt_rd1.is_buffer_full()); // If fails, increase n_rows auto mt_rd2 = mt->make_flat_reader(s); mt_rd2.set_max_buffer_size(1); - mt_rd2.fill_buffer().get(); + mt_rd2.fill_buffer(db::no_timeout).get(); apply(cache, underlying, *mt); @@ -3133,13 +3133,13 @@ SEASTAR_TEST_CASE(test_cache_update_and_eviction_preserves_monotonicity_of_memta auto c_rd1 = cache.make_reader(s); c_rd1.set_max_buffer_size(1); - c_rd1.fill_buffer().get(); + c_rd1.fill_buffer(db::no_timeout).get(); apply(cache, underlying, m2); auto c_rd2 = cache.make_reader(s); c_rd2.set_max_buffer_size(1); - c_rd2.fill_buffer().get(); + c_rd2.fill_buffer(db::no_timeout).get(); cache.evict(); @@ -3164,8 +3164,8 @@ SEASTAR_TEST_CASE(test_hash_is_cached) { { auto rd = cache.make_reader(s); - rd().get0()->as_partition_start(); - clustering_row row = std::move(rd().get0()->as_mutable_clustering_row()); + rd(db::no_timeout).get0()->as_partition_start(); + clustering_row row = std::move(rd(db::no_timeout).get0()->as_mutable_clustering_row()); BOOST_REQUIRE(!row.cells().cell_hash_for(0)); } @@ -3173,15 +3173,15 @@ SEASTAR_TEST_CASE(test_hash_is_cached) { auto slice = s->full_slice(); slice.options.set(); auto rd = cache.make_reader(s, query::full_partition_range, slice); - rd().get0()->as_partition_start(); - clustering_row row = std::move(rd().get0()->as_mutable_clustering_row()); + rd(db::no_timeout).get0()->as_partition_start(); + clustering_row row = std::move(rd(db::no_timeout).get0()->as_mutable_clustering_row()); BOOST_REQUIRE(row.cells().cell_hash_for(0)); } { auto rd = cache.make_reader(s); - rd().get0()->as_partition_start(); - clustering_row row = std::move(rd().get0()->as_mutable_clustering_row()); + rd(db::no_timeout).get0()->as_partition_start(); + clustering_row row = std::move(rd(db::no_timeout).get0()->as_mutable_clustering_row()); BOOST_REQUIRE(row.cells().cell_hash_for(0)); } @@ -3191,8 +3191,8 @@ SEASTAR_TEST_CASE(test_hash_is_cached) { { auto rd = cache.make_reader(s); - rd().get0()->as_partition_start(); - clustering_row row = std::move(rd().get0()->as_mutable_clustering_row()); + rd(db::no_timeout).get0()->as_partition_start(); + clustering_row row = std::move(rd(db::no_timeout).get0()->as_mutable_clustering_row()); BOOST_REQUIRE(!row.cells().cell_hash_for(0)); } @@ -3200,15 +3200,15 @@ SEASTAR_TEST_CASE(test_hash_is_cached) { auto slice = s->full_slice(); slice.options.set(); auto rd = cache.make_reader(s, query::full_partition_range, slice); - rd().get0()->as_partition_start(); - clustering_row row = std::move(rd().get0()->as_mutable_clustering_row()); + rd(db::no_timeout).get0()->as_partition_start(); + clustering_row row = std::move(rd(db::no_timeout).get0()->as_mutable_clustering_row()); BOOST_REQUIRE(row.cells().cell_hash_for(0)); } { auto rd = cache.make_reader(s); - rd().get0()->as_partition_start(); - clustering_row row = std::move(rd().get0()->as_mutable_clustering_row()); + rd(db::no_timeout).get0()->as_partition_start(); + clustering_row row = std::move(rd(db::no_timeout).get0()->as_mutable_clustering_row()); BOOST_REQUIRE(row.cells().cell_hash_for(0)); } }); @@ -3234,7 +3234,7 @@ SEASTAR_TEST_CASE(test_random_population_with_many_versions) { auto make_reader = [&] () { auto rd = cache.make_reader(s, query::full_partition_range, s->full_slice()); rd.set_max_buffer_size(1); - rd.fill_buffer().get(); + rd.fill_buffer(db::no_timeout).get(); return assert_that(std::move(rd)); }; @@ -3341,7 +3341,7 @@ SEASTAR_TEST_CASE(test_eviction_after_old_snapshot_touches_overriden_rows_keeps_ auto pr1 = dht::partition_range::make_singular(pk); auto rd1 = cache.make_reader(s, pr1); rd1.set_max_buffer_size(1); - rd1.fill_buffer().get(); + rd1.fill_buffer(db::no_timeout).get(); apply(cache, underlying, m2); @@ -3382,7 +3382,7 @@ SEASTAR_TEST_CASE(test_eviction_after_old_snapshot_touches_overriden_rows_keeps_ auto rd1 = cache.make_reader(s, pr); rd1.set_max_buffer_size(1); - rd1.fill_buffer().get(); + rd1.fill_buffer(db::no_timeout).get(); apply(cache, underlying, m2); @@ -3429,7 +3429,7 @@ SEASTAR_TEST_CASE(test_reading_progress_with_small_buffer_and_invalidation) { while (!rd3.is_end_of_stream()) { tracker.allocator().invalidate_references(); - rd3.fill_buffer().get(); + rd3.fill_buffer(db::no_timeout).get(); while (!rd3.is_buffer_empty()) { result.partition().apply(*s.schema(), rd3.pop_mutation_fragment()); } diff --git a/tests/sstable_3_x_test.cc b/tests/sstable_3_x_test.cc index fe163fd894..71ae4824c4 100644 --- a/tests/sstable_3_x_test.cc +++ b/tests/sstable_3_x_test.cc @@ -2899,7 +2899,7 @@ SEASTAR_THREAD_TEST_CASE(compact_deleted_row) { * ] */ auto reader = compacted_sstable_reader(s, table_name, {1, 2}); - mutation_opt m = read_mutation_from_flat_mutation_reader(reader).get0(); + mutation_opt m = read_mutation_from_flat_mutation_reader(reader, db::no_timeout).get0(); BOOST_REQUIRE(m); BOOST_REQUIRE(m->key().equal(*s, partition_key::from_singular(*s, data_value(sstring("key"))))); BOOST_REQUIRE(!m->partition().partition_tombstone()); @@ -2968,7 +2968,7 @@ SEASTAR_THREAD_TEST_CASE(compact_deleted_cell) { * */ auto reader = compacted_sstable_reader(s, table_name, {1, 2}); - mutation_opt m = read_mutation_from_flat_mutation_reader(reader).get0(); + mutation_opt m = read_mutation_from_flat_mutation_reader(reader, db::no_timeout).get0(); BOOST_REQUIRE(m); BOOST_REQUIRE(m->key().equal(*s, partition_key::from_singular(*s, data_value(sstring("key"))))); BOOST_REQUIRE(!m->partition().partition_tombstone()); diff --git a/tests/sstable_datafile_test.cc b/tests/sstable_datafile_test.cc index 961895de4b..3e00fc5b39 100644 --- a/tests/sstable_datafile_test.cc +++ b/tests/sstable_datafile_test.cc @@ -832,7 +832,7 @@ SEASTAR_TEST_CASE(datafile_generation_11) { return reusable_sst(s, tmpdir_path, 11).then([s, verifier, tomb, &static_set_col] (auto sstp) mutable { return do_with(make_dkey(s, "key1"), [sstp, s, verifier, tomb, &static_set_col] (auto& key) { auto rd = make_lw_shared(sstp->read_row_flat(s, key)); - return read_mutation_from_flat_mutation_reader(*rd).then([sstp, s, verifier, tomb, &static_set_col, rd] (auto mutation) { + return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, verifier, tomb, &static_set_col, rd] (auto mutation) { auto verify_set = [&tomb] (const collection_type_impl::mutation& m) { BOOST_REQUIRE(bool(m.tomb) == true); BOOST_REQUIRE(m.tomb == tomb); @@ -861,7 +861,7 @@ SEASTAR_TEST_CASE(datafile_generation_11) { }).then([sstp, s, verifier] { return do_with(make_dkey(s, "key2"), [sstp, s, verifier] (auto& key) { auto rd = make_lw_shared(sstp->read_row_flat(s, key)); - return read_mutation_from_flat_mutation_reader(*rd).then([sstp, s, verifier, rd] (auto mutation) { + return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, verifier, rd] (auto mutation) { auto m = verifier(mutation); BOOST_REQUIRE(!m.tomb); BOOST_REQUIRE(m.cells.size() == 1); @@ -894,7 +894,7 @@ SEASTAR_TEST_CASE(datafile_generation_12) { return reusable_sst(s, tmpdir_path, 12).then([s, tomb] (auto sstp) mutable { return do_with(make_dkey(s, "key1"), [sstp, s, tomb] (auto& key) { auto rd = make_lw_shared(sstp->read_row_flat(s, key)); - return read_mutation_from_flat_mutation_reader(*rd).then([sstp, s, tomb, rd] (auto mutation) { + return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, tomb, rd] (auto mutation) { auto& mp = mutation->partition(); BOOST_REQUIRE(mp.row_tombstones().size() == 1); for (auto& rt: mp.row_tombstones()) { @@ -930,7 +930,7 @@ static future<> sstable_compression_test(compressor_ptr c, unsigned generation) return reusable_sst(s, tmpdir_path, generation).then([s, tomb] (auto sstp) mutable { return do_with(make_dkey(s, "key1"), [sstp, s, tomb] (auto& key) { auto rd = make_lw_shared(sstp->read_row_flat(s, key)); - return read_mutation_from_flat_mutation_reader(*rd).then([sstp, s, tomb, rd] (auto mutation) { + return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, tomb, rd] (auto mutation) { auto& mp = mutation->partition(); BOOST_REQUIRE(mp.row_tombstones().size() == 1); for (auto& rt: mp.row_tombstones()) { @@ -1144,7 +1144,7 @@ SEASTAR_TEST_CASE(compact) { // nadav - deleted partition return open_sstable(s, tmpdir_path, generation).then([s] (shared_sstable sst) { auto reader = make_lw_shared(sstable_reader(sst, s)); // reader holds sst and s alive. - return read_mutation_from_flat_mutation_reader(*reader).then([reader, s] (mutation_opt m) { + return read_mutation_from_flat_mutation_reader(*reader, db::no_timeout).then([reader, s] (mutation_opt m) { BOOST_REQUIRE(m); BOOST_REQUIRE(m->key().equal(*s, partition_key::from_singular(*s, data_value(sstring("jerry"))))); BOOST_REQUIRE(!m->partition().partition_tombstone()); @@ -1157,7 +1157,7 @@ SEASTAR_TEST_CASE(compact) { auto& cdef2 = *s->get_column_definition("height"); BOOST_REQUIRE(cells.cell_at(cdef1.id).as_atomic_cell(cdef1).value() == bytes({0,0,0,40})); BOOST_REQUIRE(cells.cell_at(cdef2.id).as_atomic_cell(cdef2).value() == bytes({0,0,0,(int8_t)170})); - return read_mutation_from_flat_mutation_reader(*reader); + return read_mutation_from_flat_mutation_reader(*reader, db::no_timeout); }).then([reader, s] (mutation_opt m) { BOOST_REQUIRE(m); BOOST_REQUIRE(m->key().equal(*s, partition_key::from_singular(*s, data_value(sstring("tom"))))); @@ -1171,7 +1171,7 @@ SEASTAR_TEST_CASE(compact) { auto& cdef2 = *s->get_column_definition("height"); BOOST_REQUIRE(cells.cell_at(cdef1.id).as_atomic_cell(cdef1).value() == bytes({0,0,0,20})); BOOST_REQUIRE(cells.cell_at(cdef2.id).as_atomic_cell(cdef2).value() == bytes({0,0,0,(int8_t)180})); - return read_mutation_from_flat_mutation_reader(*reader); + return read_mutation_from_flat_mutation_reader(*reader, db::no_timeout); }).then([reader, s] (mutation_opt m) { BOOST_REQUIRE(m); BOOST_REQUIRE(m->key().equal(*s, partition_key::from_singular(*s, data_value(sstring("john"))))); @@ -1185,14 +1185,14 @@ SEASTAR_TEST_CASE(compact) { auto& cdef2 = *s->get_column_definition("height"); BOOST_REQUIRE(cells.cell_at(cdef1.id).as_atomic_cell(cdef1).value() == bytes({0,0,0,20})); BOOST_REQUIRE(cells.find_cell(cdef2.id) == nullptr); - return read_mutation_from_flat_mutation_reader(*reader); + return read_mutation_from_flat_mutation_reader(*reader, db::no_timeout); }).then([reader, s] (mutation_opt m) { BOOST_REQUIRE(m); BOOST_REQUIRE(m->key().equal(*s, partition_key::from_singular(*s, data_value(sstring("nadav"))))); BOOST_REQUIRE(m->partition().partition_tombstone()); auto &rows = m->partition().clustered_rows(); BOOST_REQUIRE(rows.calculate_size() == 0); - return read_mutation_from_flat_mutation_reader(*reader); + return read_mutation_from_flat_mutation_reader(*reader, db::no_timeout); }).then([reader] (mutation_opt m) { BOOST_REQUIRE(!m); }); @@ -1369,7 +1369,7 @@ static future<> check_compacted_sstables(sstring tmpdir_path, unsigned long gene return do_with(std::move(reader), [generations, s, keys] (flat_mutation_reader& reader) { return do_for_each(*generations, [&reader, keys] (unsigned long generation) mutable { - return read_mutation_from_flat_mutation_reader(reader).then([generation, keys] (mutation_opt m) { + return read_mutation_from_flat_mutation_reader(reader, db::no_timeout).then([generation, keys] (mutation_opt m) { BOOST_REQUIRE(m); keys->push_back(m->key()); }); @@ -1450,7 +1450,7 @@ SEASTAR_TEST_CASE(datafile_generation_37) { return reusable_sst(s, tmpdir_path, 37).then([s, tmpdir_path] (auto sstp) { return do_with(make_dkey(s, "key1"), [sstp, s] (auto& key) { auto rd = make_lw_shared(sstp->read_row_flat(s, key)); - return read_mutation_from_flat_mutation_reader(*rd).then([sstp, s, rd] (auto mutation) { + return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, rd] (auto mutation) { auto& mp = mutation->partition(); auto clustering = clustering_key_prefix::from_exploded(*s, {to_bytes("cl1")}); @@ -1485,7 +1485,7 @@ SEASTAR_TEST_CASE(datafile_generation_38) { return reusable_sst(s, tmpdir_path, 38).then([s] (auto sstp) { return do_with(make_dkey(s, "key1"), [sstp, s] (auto& key) { auto rd = make_lw_shared(sstp->read_row_flat(s, key)); - return read_mutation_from_flat_mutation_reader(*rd).then([sstp, s, rd] (auto mutation) { + return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, rd] (auto mutation) { auto& mp = mutation->partition(); auto clustering = clustering_key_prefix::from_exploded(*s, {to_bytes("cl1"), to_bytes("cl2")}); @@ -1521,7 +1521,7 @@ SEASTAR_TEST_CASE(datafile_generation_39) { return reusable_sst(s, tmpdir_path, 39).then([s] (auto sstp) { return do_with(make_dkey(s, "key1"), [sstp, s] (auto& key) { auto rd = make_lw_shared(sstp->read_row_flat(s, key)); - return read_mutation_from_flat_mutation_reader(*rd).then([sstp, s, rd] (auto mutation) { + return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, rd] (auto mutation) { auto& mp = mutation->partition(); auto& row = mp.clustered_row(*s, clustering_key::make_empty()); match_live_cell(row.cells(), *s, "cl1", data_value(data_value(to_bytes("cl1")))); @@ -1617,7 +1617,7 @@ SEASTAR_TEST_CASE(datafile_generation_41) { return reusable_sst(s, tmpdir_path, 41).then([s, tomb] (auto sstp) mutable { return do_with(make_dkey(s, "key1"), [sstp, s, tomb] (auto& key) { auto rd = make_lw_shared(sstp->read_row_flat(s, key)); - return read_mutation_from_flat_mutation_reader(*rd).then([sstp, s, tomb, rd] (auto mutation) { + return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, tomb, rd] (auto mutation) { auto& mp = mutation->partition(); BOOST_REQUIRE(mp.clustered_rows().calculate_size() == 1); auto& c_row = *(mp.clustered_rows().begin()); @@ -1676,7 +1676,7 @@ SEASTAR_TEST_CASE(datafile_generation_47) { return reusable_sst(s, tmpdir_path, 47).then([s] (auto sstp) mutable { auto reader = make_lw_shared(sstable_reader(sstp, s)); return repeat([reader] { - return (*reader)().then([] (mutation_fragment_opt m) { + return (*reader)(db::no_timeout).then([] (mutation_fragment_opt m) { if (!m) { return make_ready_future(stop_iteration::yes); } @@ -2330,7 +2330,7 @@ SEASTAR_TEST_CASE(tombstone_purge_test) { auto assert_that_produces_dead_cell = [&] (auto& sst, partition_key& key) { auto reader = make_lw_shared(sstable_reader(sst, s)); - read_mutation_from_flat_mutation_reader(*reader).then([reader, s, &key] (mutation_opt m) { + read_mutation_from_flat_mutation_reader(*reader, db::no_timeout).then([reader, s, &key] (mutation_opt m) { BOOST_REQUIRE(m); BOOST_REQUIRE(m->key().equal(*s, key)); auto& rows = m->partition().clustered_rows(); @@ -2340,7 +2340,7 @@ SEASTAR_TEST_CASE(tombstone_purge_test) { BOOST_REQUIRE_EQUAL(cells.size(), 1); auto& cdef = *s->get_column_definition("value"); BOOST_REQUIRE(!cells.cell_at(cdef.id).as_atomic_cell(cdef).is_live()); - return (*reader)(); + return (*reader)(db::no_timeout); }).then([reader, s] (mutation_fragment_opt m) { BOOST_REQUIRE(!m); }).get(); @@ -2505,7 +2505,7 @@ SEASTAR_TEST_CASE(check_multi_schema) { auto f = sst->load(); return f.then([sst, s] { auto reader = make_lw_shared(sstable_reader(sst, s)); - return read_mutation_from_flat_mutation_reader(*reader).then([reader, s] (mutation_opt m) { + return read_mutation_from_flat_mutation_reader(*reader, db::no_timeout).then([reader, s] (mutation_opt m) { BOOST_REQUIRE(m); BOOST_REQUIRE(m->key().equal(*s, partition_key::from_singular(*s, 0))); auto& rows = m->partition().clustered_rows(); @@ -2516,7 +2516,7 @@ SEASTAR_TEST_CASE(check_multi_schema) { BOOST_REQUIRE_EQUAL(cells.size(), 1); auto& cdef = *s->get_column_definition("e"); BOOST_REQUIRE_EQUAL(cells.cell_at(cdef.id).as_atomic_cell(cdef).value(), int32_type->decompose(5)); - return (*reader)(); + return (*reader)(db::no_timeout); }).then([reader, s] (mutation_fragment_opt m) { BOOST_REQUIRE(!m); }); @@ -2564,13 +2564,13 @@ SEASTAR_TEST_CASE(sstable_rewrite) { auto newsst = (*new_tables)[0]; BOOST_REQUIRE(newsst->generation() == 52); auto reader = make_lw_shared(sstable_reader(newsst, s)); - return (*reader)().then([s, reader, key] (mutation_fragment_opt m) { + return (*reader)(db::no_timeout).then([s, reader, key] (mutation_fragment_opt m) { BOOST_REQUIRE(m); BOOST_REQUIRE(m->is_partition_start()); auto pkey = partition_key::from_exploded(*s, {to_bytes(key)}); BOOST_REQUIRE(m->as_partition_start().key().key().equal(*s, pkey)); reader->next_partition(); - return (*reader)(); + return (*reader)(db::no_timeout); }).then([reader] (mutation_fragment_opt m) { BOOST_REQUIRE(!m); }); @@ -2587,7 +2587,7 @@ void test_sliced_read_row_presence(shared_sstable sst, schema_ptr s, const query partition_key::equality pk_eq(*s); clustering_key::equality ck_eq(*s); - auto mfopt = reader().get0(); + auto mfopt = reader(db::no_timeout).get0(); while (mfopt) { BOOST_REQUIRE(mfopt->is_partition_start()); auto it = std::find_if(expected.begin(), expected.end(), [&] (auto&& x) { @@ -2597,7 +2597,7 @@ void test_sliced_read_row_presence(shared_sstable sst, schema_ptr s, const query auto expected_cr = std::move(it->second); expected.erase(it); - mfopt = reader().get0(); + mfopt = reader(db::no_timeout).get0(); BOOST_REQUIRE(mfopt); while (!mfopt->is_end_of_partition()) { if (mfopt->is_clustering_row()) { @@ -2611,12 +2611,12 @@ void test_sliced_read_row_presence(shared_sstable sst, schema_ptr s, const query BOOST_REQUIRE(it != expected_cr.end()); expected_cr.erase(it); } - mfopt = reader().get0(); + mfopt = reader(db::no_timeout).get0(); BOOST_REQUIRE(mfopt); } BOOST_REQUIRE(expected_cr.empty()); - mfopt = reader().get0(); + mfopt = reader(db::no_timeout).get0(); } BOOST_REQUIRE(expected.empty()); } @@ -2812,11 +2812,11 @@ SEASTAR_TEST_CASE(test_counter_read) { sst->load().get(); auto reader = sstable_reader(sst, s); - auto mfopt = reader().get0(); + auto mfopt = reader(db::no_timeout).get0(); BOOST_REQUIRE(mfopt); BOOST_REQUIRE(mfopt->is_partition_start()); - mfopt = reader().get0(); + mfopt = reader(db::no_timeout).get0(); BOOST_REQUIRE(mfopt); BOOST_REQUIRE(mfopt->is_clustering_row()); const clustering_row* cr = &mfopt->as_clustering_row(); @@ -2845,7 +2845,7 @@ SEASTAR_TEST_CASE(test_counter_read) { }); }); - mfopt = reader().get0(); + mfopt = reader(db::no_timeout).get0(); BOOST_REQUIRE(mfopt); BOOST_REQUIRE(mfopt->is_clustering_row()); cr = &mfopt->as_clustering_row(); @@ -2858,11 +2858,11 @@ SEASTAR_TEST_CASE(test_counter_read) { } }); - mfopt = reader().get0(); + mfopt = reader(db::no_timeout).get0(); BOOST_REQUIRE(mfopt); BOOST_REQUIRE(mfopt->is_end_of_partition()); - mfopt = reader().get0(); + mfopt = reader(db::no_timeout).get0(); BOOST_REQUIRE(!mfopt); } }); @@ -4311,32 +4311,32 @@ SEASTAR_TEST_CASE(test_wrong_counter_shard_order) { }; { - auto mfopt = reader().get0(); + auto mfopt = reader(db::no_timeout).get0(); BOOST_REQUIRE(mfopt); BOOST_REQUIRE(mfopt->is_partition_start()); - verify_row(reader().get0(), 28545); - verify_row(reader().get0(), 27967); - verify_row(reader().get0(), 28342); - verify_row(reader().get0(), 28325); - mfopt = reader().get0(); + verify_row(reader(db::no_timeout).get0(), 28545); + verify_row(reader(db::no_timeout).get0(), 27967); + verify_row(reader(db::no_timeout).get0(), 28342); + verify_row(reader(db::no_timeout).get0(), 28325); + mfopt = reader(db::no_timeout).get0(); BOOST_REQUIRE(mfopt); BOOST_REQUIRE(mfopt->is_end_of_partition()); } { - auto mfopt = reader().get0(); + auto mfopt = reader(db::no_timeout).get0(); BOOST_REQUIRE(mfopt); BOOST_REQUIRE(mfopt->is_partition_start()); - verify_row(reader().get0(), 28386); - verify_row(reader().get0(), 28378); - verify_row(reader().get0(), 28129); - verify_row(reader().get0(), 28260); - mfopt = reader().get0(); + verify_row(reader(db::no_timeout).get0(), 28386); + verify_row(reader(db::no_timeout).get0(), 28378); + verify_row(reader(db::no_timeout).get0(), 28129); + verify_row(reader(db::no_timeout).get0(), 28260); + mfopt = reader(db::no_timeout).get0(); BOOST_REQUIRE(mfopt); BOOST_REQUIRE(mfopt->is_end_of_partition()); } - BOOST_REQUIRE(!reader().get0()); + BOOST_REQUIRE(!reader(db::no_timeout).get0()); } }); } diff --git a/tests/sstable_mutation_test.cc b/tests/sstable_mutation_test.cc index 236890e4a1..36b6b57209 100644 --- a/tests/sstable_mutation_test.cc +++ b/tests/sstable_mutation_test.cc @@ -53,7 +53,7 @@ SEASTAR_TEST_CASE(nonexistent_key) { return do_with(make_dkey(uncompressed_schema(), "invalid_key"), [sstp] (auto& key) { auto s = uncompressed_schema(); auto rd = make_lw_shared(sstp->read_row_flat(s, key)); - return (*rd)().then([sstp, s, &key, rd] (auto mutation) { + return (*rd)(db::no_timeout).then([sstp, s, &key, rd] (auto mutation) { BOOST_REQUIRE(!mutation); return make_ready_future<>(); }); @@ -66,7 +66,7 @@ future<> test_no_clustered(bytes&& key, std::unordered_map && return do_with(make_dkey(uncompressed_schema(), std::move(k)), [sstp, map = std::move(map)] (auto& key) { auto s = uncompressed_schema(); auto rd = make_lw_shared(sstp->read_row_flat(s, key)); - return read_mutation_from_flat_mutation_reader(*rd).then([sstp, s, &key, rd, map = std::move(map)] (auto mutation) { + return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, &key, rd, map = std::move(map)] (auto mutation) { BOOST_REQUIRE(mutation); auto& mp = mutation->partition(); for (auto&& e : mp.range(*s, nonwrapping_range())) { @@ -133,7 +133,7 @@ future generate_clustered(bytes&& key) { return do_with(make_dkey(complex_schema(), std::move(k)), [sstp] (auto& key) { auto s = complex_schema(); auto rd = make_lw_shared(sstp->read_row_flat(s, key)); - return read_mutation_from_flat_mutation_reader(*rd).then([sstp, s, &key, rd] (auto mutation) { + return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, &key, rd] (auto mutation) { BOOST_REQUIRE(mutation); return std::move(*mutation); }); @@ -324,7 +324,7 @@ future<> test_range_reads(const dht::token& min, const dht::token& max, std::vec // "mutations", continues to live until after the last // iteration's future completes, so its lifetime is safe. [sstp, mutations = std::move(mutations), &expected, expected_size, count, stop] () mutable { - return (*mutations)().then([&expected, expected_size, count, stop, mutations] (mutation_fragment_opt mfopt) mutable { + return (*mutations)(db::no_timeout).then([&expected, expected_size, count, stop, mutations] (mutation_fragment_opt mfopt) mutable { if (mfopt) { BOOST_REQUIRE(mfopt->is_partition_start()); BOOST_REQUIRE(*count < expected_size); @@ -431,7 +431,7 @@ SEASTAR_TEST_CASE(test_sstable_can_write_and_read_range_tombstone) { write_memtable_to_sstable_for_test(*mt, sst).get(); sst->load().get(); auto mr = sst->read_rows_flat(s); - auto mut = read_mutation_from_flat_mutation_reader(mr).get0(); + auto mut = read_mutation_from_flat_mutation_reader(mr, db::no_timeout).get0(); BOOST_REQUIRE(bool(mut)); auto& rts = mut->partition().row_tombstones(); BOOST_REQUIRE(rts.size() == 1); @@ -450,7 +450,7 @@ SEASTAR_TEST_CASE(compact_storage_sparse_read) { return do_with(make_dkey(compact_sparse_schema(), "first_row"), [sstp] (auto& key) { auto s = compact_sparse_schema(); auto rd = make_lw_shared(sstp->read_row_flat(s, key)); - return read_mutation_from_flat_mutation_reader(*rd).then([sstp, s, &key, rd] (auto mutation) { + return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, &key, rd] (auto mutation) { BOOST_REQUIRE(mutation); auto& mp = mutation->partition(); auto& row = mp.clustered_row(*s, clustering_key::make_empty()); @@ -467,7 +467,7 @@ SEASTAR_TEST_CASE(compact_storage_simple_dense_read) { return do_with(make_dkey(compact_simple_dense_schema(), "first_row"), [sstp] (auto& key) { auto s = compact_simple_dense_schema(); auto rd = make_lw_shared(sstp->read_row_flat(s, key)); - return read_mutation_from_flat_mutation_reader(*rd).then([sstp, s, &key, rd] (auto mutation) { + return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, &key, rd] (auto mutation) { auto& mp = mutation->partition(); auto exploded = exploded_clustering_prefix({"cl1"}); @@ -486,7 +486,7 @@ SEASTAR_TEST_CASE(compact_storage_dense_read) { return do_with(make_dkey(compact_dense_schema(), "first_row"), [sstp] (auto& key) { auto s = compact_dense_schema(); auto rd = make_lw_shared(sstp->read_row_flat(s, key)); - return read_mutation_from_flat_mutation_reader(*rd).then([sstp, s, &key, rd] (auto mutation) { + return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, &key, rd] (auto mutation) { auto& mp = mutation->partition(); auto exploded = exploded_clustering_prefix({"cl1", "cl2"}); @@ -509,7 +509,7 @@ SEASTAR_TEST_CASE(broken_ranges_collection) { auto s = peers_schema(); auto reader = make_lw_shared(sstp->as_mutation_source().make_reader(s, query::full_partition_range)); return repeat([s, reader] { - return read_mutation_from_flat_mutation_reader(*reader).then([s, reader] (mutation_opt mut) { + return read_mutation_from_flat_mutation_reader(*reader, db::no_timeout).then([s, reader] (mutation_opt mut) { auto key_equal = [s, &mut] (sstring ip) { return mut->key().equal(*s, partition_key::from_deeply_exploded(*s, { net::inet_address(ip) })); }; @@ -575,7 +575,7 @@ SEASTAR_TEST_CASE(tombstone_in_tombstone) { auto s = tombstone_overlap_schema(); return do_with(sstp->read_rows_flat(s), [sstp, s] (auto& reader) { return repeat([sstp, s, &reader] { - return read_mutation_from_flat_mutation_reader(reader).then([s] (mutation_opt mut) { + return read_mutation_from_flat_mutation_reader(reader, db::no_timeout).then([s] (mutation_opt mut) { if (!mut) { return stop_iteration::yes; } @@ -638,7 +638,7 @@ SEASTAR_TEST_CASE(range_tombstone_reading) { auto s = tombstone_overlap_schema(); return do_with(sstp->read_rows_flat(s), [sstp, s] (auto& reader) { return repeat([sstp, s, &reader] { - return read_mutation_from_flat_mutation_reader(reader).then([s] (mutation_opt mut) { + return read_mutation_from_flat_mutation_reader(reader, db::no_timeout).then([s] (mutation_opt mut) { if (!mut) { return stop_iteration::yes; } @@ -715,7 +715,7 @@ SEASTAR_TEST_CASE(tombstone_in_tombstone2) { auto s = tombstone_overlap_schema2(); return do_with(sstp->read_rows_flat(s), [sstp, s] (auto& reader) { return repeat([sstp, s, &reader] { - return read_mutation_from_flat_mutation_reader(reader).then([s] (mutation_opt mut) { + return read_mutation_from_flat_mutation_reader(reader, db::no_timeout).then([s] (mutation_opt mut) { if (!mut) { return stop_iteration::yes; } @@ -798,7 +798,7 @@ SEASTAR_TEST_CASE(test_non_compound_table_row_is_not_marked_as_static) { write_memtable_to_sstable_for_test(*mt, sst).get(); sst->load().get(); auto mr = sst->read_rows_flat(s); - auto mut = read_mutation_from_flat_mutation_reader(mr).get0(); + auto mut = read_mutation_from_flat_mutation_reader(mr, db::no_timeout).get0(); BOOST_REQUIRE(bool(mut)); } }); @@ -1320,12 +1320,12 @@ SEASTAR_THREAD_TEST_CASE(test_large_index_pages_do_not_cause_large_allocations) auto pr = dht::partition_range::make_singular(small_keys[0]); auto mt_reader = mt->make_flat_reader(s, pr); - mutation expected = *read_mutation_from_flat_mutation_reader(mt_reader).get0(); + mutation expected = *read_mutation_from_flat_mutation_reader(mt_reader, db::no_timeout).get0(); auto t0 = std::chrono::steady_clock::now(); auto large_allocs_before = memory::stats().large_allocations(); auto sst_reader = sst->as_mutation_source().make_reader(s, pr); - mutation actual = *read_mutation_from_flat_mutation_reader(sst_reader).get0(); + mutation actual = *read_mutation_from_flat_mutation_reader(sst_reader, db::no_timeout).get0(); auto large_allocs_after = memory::stats().large_allocations(); auto duration = std::chrono::steady_clock::now() - t0; diff --git a/tests/sstable_test.cc b/tests/sstable_test.cc index a088d2e8fb..a626963ce7 100644 --- a/tests/sstable_test.cc +++ b/tests/sstable_test.cc @@ -884,7 +884,7 @@ SEASTAR_TEST_CASE(wrong_range) { return do_with(make_dkey(uncompressed_schema(), "todata"), [sstp] (auto& key) { auto s = columns_schema(); auto rd = make_lw_shared(sstp->read_row_flat(s, key)); - return read_mutation_from_flat_mutation_reader(*rd).then([sstp, s, &key, rd] (auto mutation) { + return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, &key, rd] (auto mutation) { return make_ready_future<>(); }); }); @@ -1065,17 +1065,17 @@ static future count_rows(sstable_ptr sstp, schema_ptr s, sstring key, sstri auto ps = make_partition_slice(*s, ck1, ck2); auto dkey = make_dkey(s, key.c_str()); auto rd = sstp->read_row_flat(s, dkey, ps); - auto mfopt = rd().get0(); + auto mfopt = rd(db::no_timeout).get0(); if (!mfopt) { return 0; } int nrows = 0; - mfopt = rd().get0(); + mfopt = rd(db::no_timeout).get0(); while (mfopt) { if (mfopt->is_clustering_row()) { nrows++; } - mfopt = rd().get0(); + mfopt = rd(db::no_timeout).get0(); } return nrows; }); @@ -1086,17 +1086,17 @@ static future count_rows(sstable_ptr sstp, schema_ptr s, sstring key) { return seastar::async([sstp, s, key] () mutable { auto dkey = make_dkey(s, key.c_str()); auto rd = sstp->read_row_flat(s, dkey); - auto mfopt = rd().get0(); + auto mfopt = rd(db::no_timeout).get0(); if (!mfopt) { return 0; } int nrows = 0; - mfopt = rd().get0(); + mfopt = rd(db::no_timeout).get0(); while (mfopt) { if (mfopt->is_clustering_row()) { nrows++; } - mfopt = rd().get0(); + mfopt = rd(db::no_timeout).get0(); } return nrows; }); @@ -1109,17 +1109,17 @@ static future count_rows(sstable_ptr sstp, schema_ptr s, sstring ck1, sstri auto ps = make_partition_slice(*s, ck1, ck2); auto reader = sstp->read_range_rows_flat(s, query::full_partition_range, ps); int nrows = 0; - auto mfopt = reader().get0(); + auto mfopt = reader(db::no_timeout).get0(); while (mfopt) { - mfopt = reader().get0(); + mfopt = reader(db::no_timeout).get0(); BOOST_REQUIRE(mfopt); while (!mfopt->is_end_of_partition()) { if (mfopt->is_clustering_row()) { nrows++; } - mfopt = reader().get0(); + mfopt = reader(db::no_timeout).get0(); } - mfopt = reader().get0(); + mfopt = reader(db::no_timeout).get0(); } return nrows; });