From 2970567b3a4d0b1c83692c115ffb703a8c49f33a Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Tue, 4 Feb 2025 11:06:23 +0300 Subject: [PATCH] streaming: Relax streaming::make_streamig_consumer() view builder arg Two callers of it -- repair and stream-manager -- both have non-sharded reference and can just use it as argument. The helper in question gets sharded<> one by itself. Signed-off-by: Pavel Emelyanov --- repair/row_level.cc | 3 +-- streaming/consumer.cc | 4 ++-- streaming/consumer.hh | 2 +- streaming/stream_session.cc | 2 +- 4 files changed, 5 insertions(+), 6 deletions(-) diff --git a/repair/row_level.cc b/repair/row_level.cc index fdaec103c2..7ddc5649b4 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -62,7 +62,6 @@ #include "repair/reader.hh" #include "compaction/compaction_manager.hh" #include "utils/xx_hasher.hh" -#include "db/view/view_builder.hh" extern logging::logger rlogger; @@ -495,7 +494,7 @@ void repair_writer_impl::create_writer(lw_shared_ptr w) { auto erm = t.get_effective_replication_map(); auto& sharder = erm->get_sharder(*(w->schema())); _writer_done = mutation_writer::distribute_reader_and_consume_on_shards(_schema, sharder, std::move(_queue_reader), - streaming::make_streaming_consumer(sstables::repair_origin, _db, _view_builder.container(), w->get_estimated_partitions(), _reason, is_offstrategy_supported(_reason), topo_guard), + streaming::make_streaming_consumer(sstables::repair_origin, _db, _view_builder, w->get_estimated_partitions(), _reason, is_offstrategy_supported(_reason), topo_guard), t.stream_in_progress()).then([w, erm] (uint64_t partitions) { rlogger.debug("repair_writer: keyspace={}, table={}, managed to write partitions={} to sstable", w->schema()->ks_name(), w->schema()->cf_name(), partitions); diff --git a/streaming/consumer.cc b/streaming/consumer.cc index f49e4dfac0..4ec2c4736e 100644 --- a/streaming/consumer.cc +++ b/streaming/consumer.cc @@ -20,12 +20,12 @@ namespace streaming { reader_consumer_v2 make_streaming_consumer(sstring origin, sharded& db, - sharded& vb, + db::view::view_builder& vb, uint64_t estimated_partitions, stream_reason reason, sstables::offstrategy offstrategy, service::frozen_topology_guard frozen_guard) { - return [&db, &vb, estimated_partitions, reason, offstrategy, origin = std::move(origin), frozen_guard] (mutation_reader reader) -> future<> { + return [&db, &vb = vb.container(), estimated_partitions, reason, offstrategy, origin = std::move(origin), frozen_guard] (mutation_reader reader) -> future<> { std::exception_ptr ex; try { if (current_scheduling_group() != db.local().get_streaming_scheduling_group()) { diff --git a/streaming/consumer.hh b/streaming/consumer.hh index dabdf3aad1..40a61eae24 100644 --- a/streaming/consumer.hh +++ b/streaming/consumer.hh @@ -26,7 +26,7 @@ namespace streaming { reader_consumer_v2 make_streaming_consumer(sstring origin, sharded& db, - sharded& vb, + db::view::view_builder& vb, uint64_t estimated_partitions, stream_reason reason, sstables::offstrategy offstrategy, diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index ab96da5315..abd3c3d332 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -86,7 +86,7 @@ public: reader_consumer_v2 stream_manager::make_streaming_consumer(uint64_t estimated_partitions, stream_reason reason, service::frozen_topology_guard topo_guard) { - return streaming::make_streaming_consumer("streaming", _db, _view_builder.container(), estimated_partitions, reason, is_offstrategy_supported(reason), topo_guard); + return streaming::make_streaming_consumer("streaming", _db, _view_builder, estimated_partitions, reason, is_offstrategy_supported(reason), topo_guard); } void stream_manager::init_messaging_service_handler(abort_source& as) {