diff --git a/mutation_query.cc b/mutation_query.cc index 7ae48a3d59..7fb5c7bad5 100644 --- a/mutation_query.cc +++ b/mutation_query.cc @@ -63,69 +63,91 @@ to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::pa return builder.build(); } + +querying_reader::querying_reader(schema_ptr s, + const mutation_source& source, + const query::partition_range& range, + const query::partition_slice& slice, + uint32_t row_limit, + gc_clock::time_point query_time, + std::function consumer) + : _schema(std::move(s)) + , _range(range) + , _slice(slice) + , _requested_limit(row_limit) + , _query_time(query_time) + , _limit(row_limit) + , _source(source) + , _consumer(std::move(consumer)) +{ } + +future<> querying_reader::read() { + _reader = _source(_schema, _range, service::get_local_sstable_query_read_priority()); + return consume(*_reader, [this](mutation&& m) { + // FIXME: Make data sources respect row_ranges so that we don't have to filter them out here. + auto is_distinct = _slice.options.contains(query::partition_slice::option::distinct); + auto is_reversed = _slice.options.contains(query::partition_slice::option::reversed); + auto limit = !is_distinct ? _limit : 1; + auto rows_left = m.partition().compact_for_query(*m.schema(), _query_time, + _slice.row_ranges(*m.schema(), m.key()), + is_reversed, limit); + _limit -= rows_left; + + if (rows_left || !m.partition().empty()) { + // NOTE: We must return all columns, regardless of what's in + // partition_slice, for the results to be reconcilable with tombstones. + // That's because row's presence depends on existence of any + // column in a row (See mutation_partition::query). We could + // optimize this case and only send cell timestamps, without data, + // for the cells which are not queried for (TODO). + _consumer(rows_left, std::move(m)); + } + + return _limit ? stop_iteration::no : stop_iteration::yes; + }); +} + +class reconcilable_result_builder { + querying_reader _reader; + std::vector _result; + uint32_t _total = 0; +public: + reconcilable_result_builder(schema_ptr s, + const mutation_source& source, + const query::partition_range& range, + const query::partition_slice& slice, + uint32_t row_limit, + gc_clock::time_point query_time) + : _reader(std::move(s), source, range, slice, row_limit, query_time, [this] (uint32_t live_rows, mutation&& m) { + _result.emplace_back(partition{live_rows, freeze(m)}); + _total += live_rows; + }) + { } + + reconcilable_result_builder(reconcilable_result_builder&&) = delete; // this captured + + future build() { + return _reader.read().then([this] { + return make_ready_future(reconcilable_result(_total, std::move(_result))); + }); + } +}; + future mutation_query(schema_ptr s, - const mutation_source& source, - const query::partition_range& range, - const query::partition_slice& slice, - uint32_t row_limit, - gc_clock::time_point query_time) + const mutation_source& source, + const query::partition_range& range, + const query::partition_slice& slice, + uint32_t row_limit, + gc_clock::time_point query_time) { - struct query_state { - const query::partition_range& range; - const query::partition_slice& slice; - uint32_t requested_limit; - gc_clock::time_point query_time; - uint32_t limit; - mutation_reader reader; - std::vector result; - - query_state( - const query::partition_range& range, - const query::partition_slice& slice, - uint32_t requested_limit, - gc_clock::time_point query_time - ) - : range(range) - , slice(slice) - , requested_limit(requested_limit) - , query_time(query_time) - , limit(requested_limit) - { } - }; - if (row_limit == 0) { return make_ready_future(reconcilable_result()); } - return do_with(query_state(range, slice, row_limit, query_time), - [&source, s = std::move(s)] (query_state& state) -> future { - state.reader = source(std::move(s), state.range, service::get_local_sstable_query_read_priority()); - return consume(state.reader, [&state] (mutation&& m) { - // FIXME: Make data sources respect row_ranges so that we don't have to filter them out here. - auto is_distinct = state.slice.options.contains(query::partition_slice::option::distinct); - auto is_reversed = state.slice.options.contains(query::partition_slice::option::reversed); - auto limit = !is_distinct ? state.limit : 1; - auto rows_left = m.partition().compact_for_query(*m.schema(), state.query_time, state.slice.row_ranges(*m.schema(), m.key()), - is_reversed, limit); - state.limit -= rows_left; - - if (rows_left || !m.partition().empty()) { - // NOTE: We must return all columns, regardless of what's in - // partition_slice, for the results to be reconcilable with tombstones. - // That's because row's presence depends on existence of any - // column in a row (See mutation_partition::query). We could - // optimize this case and only send cell timestamps, without data, - // for the cells which are not queried for (TODO). - state.result.emplace_back(partition{rows_left, freeze(m)}); - } - - return state.limit ? stop_iteration::no : stop_iteration::yes; - }).then([&state] { - return make_ready_future( - reconcilable_result(state.requested_limit - state.limit, std::move(state.result))); - }); - }); + auto b_ptr = std::make_unique(std::move(s), source, range, slice, row_limit, query_time); + auto& b = *b_ptr; + return b.build().finally([keep = std::move(b_ptr)] {}); } std::ostream& operator<<(std::ostream& out, const reconcilable_result::printer& pr) { diff --git a/mutation_query.hh b/mutation_query.hh index 76ce8dbc14..14401b62cc 100644 --- a/mutation_query.hh +++ b/mutation_query.hh @@ -114,3 +114,26 @@ future mutation_query( const query::partition_slice& slice, uint32_t row_limit, gc_clock::time_point query_time); + + +class querying_reader { + schema_ptr _schema; + const query::partition_range& _range; + const query::partition_slice& _slice; + uint32_t _requested_limit; + gc_clock::time_point _query_time; + uint32_t _limit; + const mutation_source& _source; + std::function _consumer; + std::experimental::optional _reader; +public: + querying_reader(schema_ptr s, + const mutation_source& source, + const query::partition_range& range, + const query::partition_slice& slice, + uint32_t row_limit, + gc_clock::time_point query_time, + std::function consumer); + + future<> read(); +};