diff --git a/configure.py b/configure.py index a84f9cd683..45c7b4c7f4 100755 --- a/configure.py +++ b/configure.py @@ -938,6 +938,7 @@ scylla_core = (['database.cc', 'streaming/stream_result_future.cc', 'streaming/stream_session_state.cc', 'streaming/stream_reason.cc', + 'streaming/consumer.cc', 'clocks-impl.cc', 'partition_slice_builder.cc', 'init.cc', diff --git a/repair/row_level.cc b/repair/row_level.cc index ebb3a37092..5bdc1c3384 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -23,7 +23,6 @@ #include "message/messaging_service.hh" #include "sstables/sstables.hh" #include "sstables/sstables_manager.hh" -#include "sstables/sstable_set.hh" #include "mutation_fragment.hh" #include "mutation_writer/multishard_writer.hh" #include "dht/i_partitioner.hh" @@ -33,7 +32,6 @@ #include "utils/UUID.hh" #include "utils/hash.hh" #include "service/priority_manager.hh" -#include "db/view/view_update_checks.hh" #include "database.hh" #include #include @@ -44,13 +42,13 @@ #include #include #include -#include "../db/view/view_update_generator.hh" #include "gms/i_endpoint_state_change_subscriber.hh" #include "gms/gossiper.hh" #include "repair/row_level.hh" #include "mutation_source_metadata.hh" #include "utils/stall_free.hh" #include "service/migration_manager.hh" +#include "streaming/consumer.hh" extern logging::logger rlogger; @@ -586,41 +584,7 @@ public: _mq = std::move(queue_handle); auto writer = shared_from_this(); _writer_done = mutation_writer::distribute_reader_and_consume_on_shards(_schema, std::move(queue_reader), - [&db, &sys_dist_ks, &view_update_gen, reason = this->_reason, estimated_partitions = this->_estimated_partitions] (flat_mutation_reader reader) { - auto& t = db.local().find_column_family(reader.schema()); - return db::view::check_needs_view_update_path(sys_dist_ks.local(), t, reason).then([t = t.shared_from_this(), estimated_partitions, reader = std::move(reader), reason, &view_update_gen] (bool use_view_update_path) mutable { - //FIXME: for 
better estimations this should be transmitted from remote - auto metadata = mutation_source_metadata{}; - auto& cs = t->get_compaction_strategy(); - const auto adjusted_estimated_partitions = cs.adjust_partition_estimate(metadata, estimated_partitions); - sstables::offstrategy offstrategy = is_offstrategy_supported(reason); - bool auto_offstrategy_trigger = offstrategy && (reason == streaming::stream_reason::repair); - auto consumer = cs.make_interposer_consumer(metadata, - [t = std::move(t), &view_update_gen, use_view_update_path, adjusted_estimated_partitions, offstrategy, auto_offstrategy_trigger] (flat_mutation_reader reader) { - sstables::shared_sstable sst = use_view_update_path ? t->make_streaming_staging_sstable() : t->make_streaming_sstable_for_write(); - schema_ptr s = reader.schema(); - auto& pc = service::get_local_streaming_priority(); - return sst->write_components(std::move(reader), adjusted_estimated_partitions, s, - t->get_sstables_manager().configure_writer("repair"), - encoding_stats{}, pc).then([sst] { - return sst->open_data(); - }).then([t, sst, offstrategy, auto_offstrategy_trigger] { - if (auto_offstrategy_trigger) { - rlogger.debug("Enabled automatic off-strategy trigger for table {}.{}", - t->schema()->ks_name(), t->schema()->cf_name()); - t->enable_off_strategy_trigger(); - } - return t->add_sstable_and_update_cache(sst, offstrategy); - }).then([t, s, sst, use_view_update_path, &view_update_gen]() mutable -> future<> { - if (!use_view_update_path) { - return make_ready_future<>(); - } - return view_update_gen.local().register_staging_sstable(sst, std::move(t)); - }); - }); - return consumer(std::move(reader)); - }); - }, + streaming::make_streaming_consumer("repair", db, sys_dist_ks, view_update_gen, _estimated_partitions, _reason, is_offstrategy_supported(_reason)), t.stream_in_progress()).then([writer] (uint64_t partitions) { rlogger.debug("repair_writer: keyspace={}, table={}, managed to write partitions={} to sstable", 
writer->_schema->ks_name(), writer->_schema->cf_name(), partitions);
diff --git a/streaming/consumer.cc b/streaming/consumer.cc
new file mode 100644
index 0000000000..21dc216e84
--- /dev/null
+++ b/streaming/consumer.cc
@@ -0,0 +1,80 @@
+/*
+ * Copyright (C) 2021 ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "consumer.hh"
+#include "mutation_source_metadata.hh"
+#include "service/priority_manager.hh"
+#include "db/view/view_update_generator.hh"
+#include "db/view/view_update_checks.hh"
+#include "sstables/sstables.hh"
+#include "sstables/sstables_manager.hh"
+
+namespace streaming {
+
+std::function<future<> (flat_mutation_reader)> make_streaming_consumer(sstring origin,
+        sharded<database>& db,
+        sharded<db::system_distributed_keyspace>& sys_dist_ks,
+        sharded<db::view::view_update_generator>& vug,
+        uint64_t estimated_partitions,
+        stream_reason reason,
+        sstables::offstrategy offstrategy) {
+    // This closure is not `mutable` and, wrapped in a std::function, can be
+    // called repeatedly, so `origin` is copied (never moved) out of it into
+    // each continuation chain below.
+    return [&db, &sys_dist_ks, &vug, estimated_partitions, reason, offstrategy, origin = std::move(origin)] (flat_mutation_reader reader) {
+        auto& cf = db.local().find_column_family(reader.schema());
+        return db::view::check_needs_view_update_path(sys_dist_ks.local(), cf, reason).then([cf = cf.shared_from_this(), &vug, estimated_partitions, offstrategy, reason, reader = std::move(reader), origin] (bool use_view_update_path) mutable {
+            //FIXME: for better estimations this should be transmitted from remote
+            auto metadata = mutation_source_metadata{};
+            auto& cs = cf->get_compaction_strategy();
+            const auto adjusted_estimated_partitions = cs.adjust_partition_estimate(metadata, estimated_partitions);
+            auto consumer = cs.make_interposer_consumer(metadata,
+                    [cf = std::move(cf), adjusted_estimated_partitions, use_view_update_path, &vug, origin = std::move(origin), offstrategy, reason] (flat_mutation_reader reader) {
+                sstables::shared_sstable sst = use_view_update_path ? cf->make_streaming_staging_sstable() : cf->make_streaming_sstable_for_write();
+                schema_ptr s = reader.schema();
+                auto& pc = service::get_local_streaming_priority();
+
+                return sst->write_components(std::move(reader), adjusted_estimated_partitions, s,
+                        cf->get_sstables_manager().configure_writer(origin),
+                        encoding_stats{}, pc).then([sst] {
+                    return sst->open_data();
+                }).then([cf, sst, offstrategy, reason] {
+                    // Only repair arms the automatic off-strategy trigger.
+                    if (offstrategy && (reason == stream_reason::repair)) {
+                        sstables::sstlog.debug("Enabled automatic off-strategy trigger for table {}.{}",
+                                cf->schema()->ks_name(), cf->schema()->cf_name());
+                        cf->enable_off_strategy_trigger();
+                    }
+                    return cf->add_sstable_and_update_cache(sst, offstrategy);
+                }).then([cf, sst, use_view_update_path, &vug]() mutable -> future<> {
+                    if (!use_view_update_path) {
+                        return make_ready_future<>();
+                    }
+                    // Staging sstables are handed to the view update generator.
+                    return vug.local().register_staging_sstable(sst, std::move(cf));
+                });
+            });
+            return consumer(std::move(reader));
+        });
+    };
+}
+
+}
diff --git a/streaming/consumer.hh b/streaming/consumer.hh
new file mode 100644
index 0000000000..8cc2723d3f
--- /dev/null
+++ b/streaming/consumer.hh
@@ -0,0 +1,62 @@
+/*
+ * Copyright (C) 2021 ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include "sstables/sstable_set.hh"
+#include "streaming/stream_reason.hh"
+
+class database;
+namespace db {
+class system_distributed_keyspace;
+namespace view {
+class view_update_generator;
+}
+}
+
+namespace streaming {
+
+/// Builds the consumer that repair and streaming pass to
+/// mutation_writer::distribute_reader_and_consume_on_shards().
+///
+/// Each invocation writes the given reader into sstable(s) of the table
+/// matching reader.schema(), going through the compaction strategy's
+/// interposer consumer, then opens them and adds them to the table. When
+/// check_needs_view_update_path() says so, staging sstables are written
+/// instead and registered with the view update generator.
+///
+/// \param origin passed to the sstables manager's configure_writer()
+/// \param db, sys_dist_ks, vug captured by reference; they must outlive the
+///        returned function and every future it returns
+/// \param estimated_partitions estimate fed to the sstable writer after
+///        the compaction strategy's adjust_partition_estimate()
+/// \param reason used for the view-update check; together with offstrategy,
+///        stream_reason::repair also enables the off-strategy trigger
+/// \param offstrategy forwarded to add_sstable_and_update_cache()
+std::function<future<>(flat_mutation_reader)> make_streaming_consumer(sstring origin,
+        sharded<database>& db,
+        sharded<db::system_distributed_keyspace>& sys_dist_ks,
+        sharded<db::view::view_update_generator>& vug,
+        uint64_t estimated_partitions,
+        stream_reason reason,
+        sstables::offstrategy offstrategy);
+
+}
diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc
index 5562894bb9..5bbd06a0bd 100644
--- a/streaming/stream_session.cc
+++ b/streaming/stream_session.cc
@@ -68,6 +68,7 @@
 #include "../db/view/view_update_generator.hh"
 #include "mutation_source_metadata.hh"
 #include "streaming/stream_mutation_fragments_cmd.hh"
+#include "consumer.hh"
 
 namespace streaming {
 
@@ -181,35 +182,7 @@ void stream_session::init_messaging_service_handler(netw::messaging_service& ms,
                 //FIXME: discarded future.
(void)mutation_writer::distribute_reader_and_consume_on_shards(s, make_generating_reader(s, permit, std::move(get_next_mutation_fragment)), - [plan_id, estimated_partitions, reason] (flat_mutation_reader reader) { - auto& cf = get_local_db().find_column_family(reader.schema()); - return db::view::check_needs_view_update_path(_sys_dist_ks->local(), cf, reason).then([cf = cf.shared_from_this(), estimated_partitions, reader = std::move(reader)] (bool use_view_update_path) mutable { - //FIXME: for better estimations this should be transmitted from remote - auto metadata = mutation_source_metadata{}; - auto& cs = cf->get_compaction_strategy(); - const auto adjusted_estimated_partitions = cs.adjust_partition_estimate(metadata, estimated_partitions); - auto consumer = cf->get_compaction_strategy().make_interposer_consumer(metadata, - [cf = std::move(cf), adjusted_estimated_partitions, use_view_update_path] (flat_mutation_reader reader) { - sstables::shared_sstable sst = use_view_update_path ? cf->make_streaming_staging_sstable() : cf->make_streaming_sstable_for_write(); - schema_ptr s = reader.schema(); - auto& pc = service::get_local_streaming_priority(); - - return sst->write_components(std::move(reader), adjusted_estimated_partitions, s, - cf->get_sstables_manager().configure_writer("streaming"), - encoding_stats{}, pc).then([sst] { - return sst->open_data(); - }).then([cf, sst] { - return cf->add_sstable_and_update_cache(sst); - }).then([cf, s, sst, use_view_update_path]() mutable -> future<> { - if (!use_view_update_path) { - return make_ready_future<>(); - } - return _view_update_generator->local().register_staging_sstable(sst, std::move(cf)); - }); - }); - return consumer(std::move(reader)); - }); - }, + make_streaming_consumer("streaming", *_db, *_sys_dist_ks, *_view_update_generator, estimated_partitions, reason, sstables::offstrategy::no), cf.stream_in_progress() ).then_wrapped([s, plan_id, from, sink, estimated_partitions] (future f) mutable { int32_t status = 
0;