streaming: Relax streaming::make_streamig_consumer() view builder arg
Two callers of it -- repair and stream-manager -- both have non-sharded reference and can just use it as argument. The helper in question gets sharded<> one by itself. Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
@@ -62,7 +62,6 @@
|
||||
#include "repair/reader.hh"
|
||||
#include "compaction/compaction_manager.hh"
|
||||
#include "utils/xx_hasher.hh"
|
||||
#include "db/view/view_builder.hh"
|
||||
|
||||
extern logging::logger rlogger;
|
||||
|
||||
@@ -495,7 +494,7 @@ void repair_writer_impl::create_writer(lw_shared_ptr<repair_writer> w) {
|
||||
auto erm = t.get_effective_replication_map();
|
||||
auto& sharder = erm->get_sharder(*(w->schema()));
|
||||
_writer_done = mutation_writer::distribute_reader_and_consume_on_shards(_schema, sharder, std::move(_queue_reader),
|
||||
streaming::make_streaming_consumer(sstables::repair_origin, _db, _view_builder.container(), w->get_estimated_partitions(), _reason, is_offstrategy_supported(_reason), topo_guard),
|
||||
streaming::make_streaming_consumer(sstables::repair_origin, _db, _view_builder, w->get_estimated_partitions(), _reason, is_offstrategy_supported(_reason), topo_guard),
|
||||
t.stream_in_progress()).then([w, erm] (uint64_t partitions) {
|
||||
rlogger.debug("repair_writer: keyspace={}, table={}, managed to write partitions={} to sstable",
|
||||
w->schema()->ks_name(), w->schema()->cf_name(), partitions);
|
||||
|
||||
@@ -20,12 +20,12 @@ namespace streaming {
|
||||
|
||||
reader_consumer_v2 make_streaming_consumer(sstring origin,
|
||||
sharded<replica::database>& db,
|
||||
sharded<db::view::view_builder>& vb,
|
||||
db::view::view_builder& vb,
|
||||
uint64_t estimated_partitions,
|
||||
stream_reason reason,
|
||||
sstables::offstrategy offstrategy,
|
||||
service::frozen_topology_guard frozen_guard) {
|
||||
return [&db, &vb, estimated_partitions, reason, offstrategy, origin = std::move(origin), frozen_guard] (mutation_reader reader) -> future<> {
|
||||
return [&db, &vb = vb.container(), estimated_partitions, reason, offstrategy, origin = std::move(origin), frozen_guard] (mutation_reader reader) -> future<> {
|
||||
std::exception_ptr ex;
|
||||
try {
|
||||
if (current_scheduling_group() != db.local().get_streaming_scheduling_group()) {
|
||||
|
||||
@@ -26,7 +26,7 @@ namespace streaming {
|
||||
|
||||
reader_consumer_v2 make_streaming_consumer(sstring origin,
|
||||
sharded<replica::database>& db,
|
||||
sharded<db::view::view_builder>& vb,
|
||||
db::view::view_builder& vb,
|
||||
uint64_t estimated_partitions,
|
||||
stream_reason reason,
|
||||
sstables::offstrategy offstrategy,
|
||||
|
||||
@@ -86,7 +86,7 @@ public:
|
||||
|
||||
reader_consumer_v2
|
||||
stream_manager::make_streaming_consumer(uint64_t estimated_partitions, stream_reason reason, service::frozen_topology_guard topo_guard) {
|
||||
return streaming::make_streaming_consumer("streaming", _db, _view_builder.container(), estimated_partitions, reason, is_offstrategy_supported(reason), topo_guard);
|
||||
return streaming::make_streaming_consumer("streaming", _db, _view_builder, estimated_partitions, reason, is_offstrategy_supported(reason), topo_guard);
|
||||
}
|
||||
|
||||
void stream_manager::init_messaging_service_handler(abort_source& as) {
|
||||
|
||||
Reference in New Issue
Block a user