streaming: make_streaming_consumer: coroutinize returned function

To simplify error handling in the next patch.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2021-06-08 08:47:02 +03:00
parent fa29b79c20
commit 42028c324c

View File

@@ -19,6 +19,8 @@
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include <seastar/core/coroutine.hh>
#include "consumer.hh"
#include "mutation_source_metadata.hh"
#include "service/priority_manager.hh"
@@ -36,9 +38,9 @@ std::function<future<> (flat_mutation_reader)> make_streaming_consumer(sstring o
uint64_t estimated_partitions,
stream_reason reason,
sstables::offstrategy offstrategy) {
return [&db, &sys_dist_ks, &vug, estimated_partitions, reason, offstrategy, origin = std::move(origin)] (flat_mutation_reader reader) {
auto& cf = db.local().find_column_family(reader.schema());
return db::view::check_needs_view_update_path(sys_dist_ks.local(), cf, reason).then([cf = cf.shared_from_this(), &vug, estimated_partitions, offstrategy, reason, reader = std::move(reader), origin = std::move(origin)] (bool use_view_update_path) mutable {
return [&db, &sys_dist_ks, &vug, estimated_partitions, reason, offstrategy, origin = std::move(origin)] (flat_mutation_reader reader) -> future<> {
auto cf = db.local().find_column_family(reader.schema()).shared_from_this();
auto use_view_update_path = co_await db::view::check_needs_view_update_path(sys_dist_ks.local(), *cf, reason);
//FIXME: for better estimations this should be transmitted from remote
auto metadata = mutation_source_metadata{};
auto& cs = cf->get_compaction_strategy();
@@ -67,8 +69,7 @@ std::function<future<> (flat_mutation_reader)> make_streaming_consumer(sstring o
return vug.local().register_staging_sstable(sst, std::move(cf));
});
});
return consumer(std::move(reader));
});
co_return co_await consumer(std::move(reader));
};
}