diff --git a/db/view/view.cc b/db/view/view.cc index 122d956e6c..b27dde3002 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -3353,19 +3353,67 @@ future<> view_builder::register_staging_sstable(sstables::shared_sstable sst, lw return _vug.register_staging_sstable(std::move(sst), std::move(table)); } -future check_needs_view_update_path(view_builder& vb, locator::token_metadata_ptr tmptr, const replica::table& t, streaming::stream_reason reason) { +future check_needs_view_update_path(view_builder& vb, locator::token_metadata_ptr tmptr, const replica::table& t, streaming::stream_reason reason) { if (is_internal_keyspace(t.schema()->ks_name())) { - return make_ready_future(false); + co_return sstable_destination_decision::normal_directory; } - if (reason == streaming::stream_reason::repair && !t.views().empty()) { - return make_ready_future(true); - } - return do_with(std::move(tmptr), t.views(), [&vb] (locator::token_metadata_ptr& tmptr, auto& views) { - return map_reduce(views, + + if (vb.get_db().find_keyspace(t.schema()->ks_name()).uses_tablets()) { + // views are managed by view building coordinator + if (t.views().empty()) { + co_return sstable_destination_decision::normal_directory; + } + + auto build_status_map = co_await vb.get_sys_ks().get_view_build_status_map(); + auto views_names = t.views() + | std::views::transform([] (const view_ptr& v) { return std::make_pair(v->ks_name(), v->cf_name()); }) + | std::ranges::to(); + + bool any_started_building = false; + bool any_started = false; + bool all_success = true; + for (auto& [view_name, statuses]: build_status_map) { + if (!views_names.contains(view_name)) { + continue; + } + + any_started_building = true; + any_started = any_started || std::ranges::any_of(statuses | std::views::values, [] (const build_status& status) { + return status == build_status::STARTED; + }); + all_success = all_success && std::ranges::all_of(statuses | std::views::values, [] (const build_status& status) { + return status == build_status::SUCCESS; + }); + } + + if (!any_started_building) { + // If all of the views didn't start building yet (and none of them is finished building) + // the sstable can go to normal directory and no view update will be lost + co_return sstable_destination_decision::normal_directory; + } else if (any_started) { + // If any of the views started building and didn't finished yet, we need to use view building + // coordinator to schedule staging sstables processing to control if replica is not in a pending state. + co_return sstable_destination_decision::staging_managed_by_vbc; + } else if (all_success) { + // If all of the views were built, the sstable can be registered to view update generator directly (in case of `stream_reason::repair`). + co_return reason == streaming::stream_reason::repair ? sstable_destination_decision::staging_directly_to_generator : sstable_destination_decision::normal_directory; + } + // This should be unreachable, reaching this point would mean that, + // any of the views started building, none of the status is `STARTED` and not all statuses are `SUCCESS`. + // Since there are only 2 statuses: STARTED and SUCCESS, this is unreachable. + on_internal_error(vlogger, fmt::format("check_needs_view_update_path() reached unreachable branch. any_started_building: {} | any_started: {} | all_success: {}", any_started_building, any_started, all_success)); + } else { + // views are managed by view builder + if (reason == streaming::stream_reason::repair && !t.views().empty()) { + co_return sstable_destination_decision::staging_directly_to_generator; + } + + auto any_view_build_ongoing = co_await map_reduce(t.views(), [&] (const view_ptr& view) { return vb.check_view_build_ongoing(*tmptr, view->ks_name(), view->cf_name()); }, false, std::logical_or()); - }); + co_return any_view_build_ongoing ? sstable_destination_decision::staging_directly_to_generator : sstable_destination_decision::normal_directory; + } } void view_updating_consumer::do_flush_buffer() { diff --git a/db/view/view_builder.hh b/db/view/view_builder.hh index a04d233fa5..f2444e8c17 100644 --- a/db/view/view_builder.hh +++ b/db/view/view_builder.hh @@ -195,6 +195,7 @@ public: static constexpr size_t batch_memory_max = 1024*1024; replica::database& get_db() noexcept { return _db; } + db::system_keyspace& get_sys_ks() noexcept { return _sys_ks; } public: view_builder(replica::database&, db::system_keyspace&, db::system_distributed_keyspace&, service::migration_notifier&, view_update_generator& vug, diff --git a/db/view/view_update_checks.hh b/db/view/view_update_checks.hh index 8b3031e3b6..cd5503720b 100644 --- a/db/view/view_update_checks.hh +++ b/db/view/view_update_checks.hh @@ -20,7 +20,13 @@ class table; namespace db::view { class view_builder; -future check_needs_view_update_path(view_builder& vb, locator::token_metadata_ptr tmptr, const replica::table& t, +enum class sstable_destination_decision { + normal_directory, // use normal sstable directory + staging_directly_to_generator, // use staging directory and create view building task for the sstable + staging_managed_by_vbc // use staging directory and register the sstable to view update generator +}; + +future check_needs_view_update_path(view_builder& vb, locator::token_metadata_ptr tmptr, const replica::table& t, streaming::stream_reason reason); } diff --git a/main.cc b/main.cc index d66d0f52b9..27c85fd1ce 100644 --- a/main.cc +++ b/main.cc @@ -1767,7 +1767,7 @@ sharded token_metadata; checkpoint(stop_signal, "starting repair service"); auto max_memory_repair = memory::stats().total_memory() * 0.1; - repair.start(std::ref(tsm), std::ref(gossiper), std::ref(messaging), std::ref(db), std::ref(proxy), std::ref(bm), std::ref(sys_ks), std::ref(view_builder), std::ref(task_manager), std::ref(mm), max_memory_repair).get(); + repair.start(std::ref(tsm), std::ref(gossiper), std::ref(messaging), std::ref(db), std::ref(proxy), std::ref(bm), std::ref(sys_ks), std::ref(view_builder), std::ref(view_building_worker), std::ref(task_manager), std::ref(mm), max_memory_repair).get(); auto stop_repair_service = defer_verbose_shutdown("repair service", [&repair] { repair.stop().get(); }); @@ -1781,7 +1781,7 @@ sharded token_metadata; debug::the_stream_manager = &stream_manager; checkpoint(stop_signal, "starting streaming service"); - stream_manager.start(std::ref(*cfg), std::ref(db), std::ref(view_builder), std::ref(messaging), std::ref(mm), std::ref(gossiper), maintenance_scheduling_group).get(); + stream_manager.start(std::ref(*cfg), std::ref(db), std::ref(view_builder), std::ref(view_building_worker), std::ref(messaging), std::ref(mm), std::ref(gossiper), maintenance_scheduling_group).get(); auto stop_stream_manager = defer_verbose_shutdown("stream manager", [&stream_manager] { // FIXME -- keep the instances alive, just call .stop on them stream_manager.invoke_on_all(&streaming::stream_manager::stop).get(); @@ -2136,7 +2136,7 @@ sharded token_metadata; }); checkpoint(stop_signal, "starting sstables loader"); - sst_loader.start(std::ref(db), std::ref(messaging), std::ref(view_builder), std::ref(task_manager), std::ref(sstm), maintenance_scheduling_group).get(); + sst_loader.start(std::ref(db), std::ref(messaging), std::ref(view_builder), std::ref(view_building_worker), std::ref(task_manager), std::ref(sstm), maintenance_scheduling_group).get(); auto stop_sst_loader = defer_verbose_shutdown("sstables loader", [&sst_loader] { sst_loader.stop().get(); }); diff --git a/repair/row_level.cc b/repair/row_level.cc index 62698d0be9..a10cbd9dc5 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -10,6 +10,7 @@ #include #include #include "dht/auto_refreshing_sharder.hh" +#include "db/view/view_building_worker.hh" #include "gms/endpoint_state.hh" #include "repair/repair.hh" #include "message/messaging_service.hh" @@ -419,6 +420,7 @@ class repair_writer_impl : public repair_writer::impl { mutation_fragment_queue _mq; sharded& _db; db::view::view_builder& _view_builder; + sharded& _view_building_worker; streaming::stream_reason _reason; mutation_reader _queue_reader; service::frozen_topology_guard _topo_guard; @@ -429,6 +431,7 @@ public: reader_permit permit, sharded& db, db::view::view_builder& view_builder, + sharded& view_building_worker, streaming::stream_reason reason, mutation_fragment_queue queue, mutation_reader queue_reader, @@ -439,6 +442,7 @@ public: , _mq(std::move(queue)) , _db(db) , _view_builder(view_builder) + , _view_building_worker(view_building_worker) , _reason(reason) , _queue_reader(std::move(queue_reader)) , _topo_guard(topo_guard) @@ -546,7 +550,7 @@ void repair_writer_impl::create_writer(lw_shared_ptr w) { rlogger.debug("repair_writer: keyspace={}, table={}, estimated_partitions={}", w->schema()->ks_name(), w->schema()->cf_name(), w->get_estimated_partitions()); auto sharder = get_sharder_helper(t, *(w->schema()), _topo_guard); _writer_done = mutation_writer::distribute_reader_and_consume_on_shards(_schema, sharder.sharder, std::move(_queue_reader), - streaming::make_streaming_consumer(sstables::repair_origin, _db, _view_builder, w->get_estimated_partitions(), _reason, is_offstrategy_supported(_reason), + streaming::make_streaming_consumer(sstables::repair_origin, _db, _view_builder, _view_building_worker, w->get_estimated_partitions(), _reason, is_offstrategy_supported(_reason), _topo_guard, _repaired_at, w->get_sstable_list_to_mark_as_repaired()), t.stream_in_progress()).then([w] (uint64_t partitions) { rlogger.debug("repair_writer: keyspace={}, table={}, managed to write partitions={} to sstable", @@ -567,10 +571,11 @@ lw_shared_ptr make_repair_writer( streaming::stream_reason reason, sharded& db, db::view::view_builder& view_builder, + sharded& view_building_worker, service::frozen_topology_guard topo_guard) { auto [queue_reader, queue_handle] = make_queue_reader(schema, permit); auto queue = make_mutation_fragment_queue(schema, permit, std::move(queue_handle)); - auto i = std::make_unique(schema, repaired_at, permit, db, view_builder, reason, std::move(queue), std::move(queue_reader), topo_guard); + auto i = std::make_unique(schema, repaired_at, permit, db, view_builder, view_building_worker, reason, std::move(queue), std::move(queue_reader), topo_guard); return make_lw_shared(schema, permit, std::move(i)); } @@ -877,7 +882,7 @@ public: , _same_sharding_config(is_same_sharding_config(cf)) , _nr_peer_nodes(nr_peer_nodes) , _repaired_at(repaired_at) - , _repair_writer(make_repair_writer(_schema, _repaired_at, _permit, _reason, _db, rs.get_view_builder(), topo_guard)) + , _repair_writer(make_repair_writer(_schema, _repaired_at, _permit, _reason, _db, rs.get_view_builder(), rs.get_view_building_worker(), topo_guard)) , _gate(format("repair_meta[{}]", repair_meta_id)) , _sink_source_for_get_full_row_hashes(_repair_meta_id, _nr_peer_nodes, [&rs] (uint32_t repair_meta_id, std::optional dst_cpu_id_opt, locator::host_id addr) { @@ -3505,6 +3510,7 @@ repair_service::repair_service(sharded& tsm, sharded& bm, sharded& sys_ks, db::view::view_builder& vb, + sharded& vbw, tasks::task_manager& tm, service::migration_manager& mm, size_t max_repair_memory) @@ -3516,6 +3522,7 @@ repair_service::repair_service(sharded& tsm, , _bm(bm) , _sys_ks(sys_ks) , _view_builder(vb) + , _view_building_worker(vbw) , _repair_module(seastar::make_shared(tm, *this, max_repair_memory)) , _mm(mm) , _node_ops_metrics(_repair_module) diff --git a/repair/row_level.hh b/repair/row_level.hh index be7976385c..ccab8b1b2f 100644 --- a/repair/row_level.hh +++ b/repair/row_level.hh @@ -38,6 +38,10 @@ class system_keyspace; class system_distributed_keyspace; class batchlog_manager; +namespace view { +class view_building_worker; +} + } namespace gms { @@ -103,6 +107,7 @@ class repair_service : public seastar::peering_sharded_service { sharded& _bm; sharded& _sys_ks; db::view::view_builder& _view_builder; + sharded& _view_building_worker; shared_ptr _repair_module; service::migration_manager& _mm; node_ops_metrics _node_ops_metrics; @@ -143,6 +148,7 @@ public: sharded& bm, sharded& sys_ks, db::view::view_builder& vb, + sharded& vbw, tasks::task_manager& tm, service::migration_manager& mm, size_t max_repair_memory); ~repair_service(); @@ -200,6 +206,7 @@ public: sharded& get_db() noexcept { return _db; } service::migration_manager& get_migration_manager() noexcept { return _mm; } db::view::view_builder& get_view_builder() noexcept { return _view_builder; } + sharded& get_view_building_worker() noexcept { return _view_building_worker; } gms::gossiper& get_gossiper() noexcept { return _gossiper.local(); } size_t max_repair_memory() const { return _max_repair_memory; } seastar::semaphore& memory_sem() { return _memory_sem; } diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index 3dd5fe3e5d..bf77dcc183 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -6,6 +6,8 @@ * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */ +#include "db/view/view_building_worker.hh" +#include "sstables/shared_sstable.hh" #include "utils/assert.hh" #include #include @@ -123,15 +125,15 @@ distributed_loader::reshape(sharded& dir, sharded distributed_loader::make_sstables_available(sstables::sstable_directory& dir, sharded& db, - sharded& vb, bool needs_view_update, sstring ks, sstring cf) { + sharded& vb, sharded& vbw, db::view::sstable_destination_decision needs_view_update, sstring ks, sstring cf) { auto& table = db.local().find_column_family(ks, cf); auto new_sstables = std::vector(); co_await dir.do_for_each_sstable([&table, needs_view_update, &new_sstables] (sstables::shared_sstable sst) -> future<> { auto gen = table.calculate_generation_for_new_table(); - dblog.trace("Loading {} into {}, new generation {}", sst->get_filename(), needs_view_update ? "staging" : "base", gen); - co_await sst->pick_up_from_upload(!needs_view_update ? sstables::sstable_state::normal : sstables::sstable_state::staging, gen); + dblog.trace("Loading {} into {}, new generation {}", sst->get_filename(), needs_view_update == db::view::sstable_destination_decision::normal_directory ? "base" : "staging", gen); + co_await sst->pick_up_from_upload(needs_view_update == db::view::sstable_destination_decision::normal_directory ? sstables::sstable_state::normal : sstables::sstable_state::staging, gen); // When loading an imported sst, set level to 0 because it may overlap with existing ssts on higher levels. sst->set_sstable_level(0); new_sstables.push_back(std::move(sst)); @@ -147,23 +149,27 @@ distributed_loader::make_sstables_available(sstables::sstable_directory& dir, sh abort(); }); - co_await coroutine::parallel_for_each(new_sstables, [&vb, &table] (sstables::shared_sstable sst) -> future<> { - if (sst->requires_view_building()) { - co_await vb.local().register_staging_sstable(sst, table.shared_from_this()); - } - }); + if (needs_view_update == db::view::sstable_destination_decision::staging_managed_by_vbc) { + co_await vbw.local().register_staging_sstable_tasks(new_sstables, table.shared_from_this()); + } else if (needs_view_update == db::view::sstable_destination_decision::staging_directly_to_generator) { + co_await coroutine::parallel_for_each(new_sstables, [&vb, &table] (sstables::shared_sstable sst) -> future<> { + if (sst->requires_view_building()) { + co_await vb.local().register_staging_sstable(sst, table.shared_from_this()); + } + }); + } co_return new_sstables.size(); } future<> -distributed_loader::process_upload_dir(distributed& db, sharded& vb, sstring ks, sstring cf, bool skip_cleanup, bool skip_reshape) { +distributed_loader::process_upload_dir(distributed& db, sharded& vb, sharded& vbw, sstring ks, sstring cf, bool skip_cleanup, bool skip_reshape) { const auto& rs = db.local().find_keyspace(ks).get_replication_strategy(); if (rs.is_per_table()) { on_internal_error(dblog, "process_upload_dir is not supported with tablets"); } - return seastar::async([&db, &vb, ks = std::move(ks), cf = std::move(cf), skip_cleanup, skip_reshape] { + return seastar::async([&db, &vb, &vbw, ks = std::move(ks), cf = std::move(cf), skip_cleanup, skip_reshape] { auto global_table = get_table_on_all_shards(db, ks, cf).get(); sharded directory; @@ -208,10 +214,10 @@ distributed_loader::process_upload_dir(distributed& db, shard } // Move to staging directory to avoid clashes with future uploads. Unique generation number ensures no collisions. - const bool use_view_update_path = db::view::check_needs_view_update_path(vb.local(), erm->get_token_metadata_ptr(), *global_table, streaming::stream_reason::repair).get(); + const auto use_view_update_path = db::view::check_needs_view_update_path(vb.local(), erm->get_token_metadata_ptr(), *global_table, streaming::stream_reason::repair).get(); - size_t loaded = directory.map_reduce0([&db, ks, cf, use_view_update_path, &vb] (sstables::sstable_directory& dir) { - return make_sstables_available(dir, db, vb, use_view_update_path, ks, cf); + size_t loaded = directory.map_reduce0([&db, ks, cf, use_view_update_path, &vb, &vbw] (sstables::sstable_directory& dir) { + return make_sstables_available(dir, db, vb, vbw, use_view_update_path, ks, cf); }, size_t(0), std::plus()).get(); dblog.info("Loaded {} SSTables", loaded); diff --git a/replica/distributed_loader.hh b/replica/distributed_loader.hh index 274e57d0bf..76e565ea72 100644 --- a/replica/distributed_loader.hh +++ b/replica/distributed_loader.hh @@ -33,7 +33,9 @@ namespace db { class config; class system_keyspace; namespace view { +enum class sstable_destination_decision; class view_builder; +class view_building_worker; } } @@ -72,8 +74,8 @@ class distributed_loader { static future<> process_sstable_dir(sharded& dir, sstables::sstable_directory::process_flags flags); static future<> lock_table(global_table_ptr&, sharded& dir); static future make_sstables_available(sstables::sstable_directory& dir, - sharded& db, sharded& vb, - bool needs_view_update, sstring ks, sstring cf); + sharded& db, sharded& vb, sharded& vbw, + db::view::sstable_destination_decision needs_view_update, sstring ks, sstring cf); static future<> populate_keyspace(distributed& db, sharded& sys_ks, keyspace& ks, sstring ks_name); static future>>> get_sstables_from(distributed& db, sstring ks, sstring cf, sstables::sstable_open_config cfg, @@ -91,7 +93,7 @@ public: get_sstables_from_upload_dir(distributed& db, sstring ks, sstring cf, sstables::sstable_open_config cfg); static future>>> get_sstables_from_object_store(distributed& db, sstring ks, sstring cf, std::vector sstables, sstring endpoint, sstring bucket, sstring prefix, sstables::sstable_open_config cfg, std::function = {}); - static future<> process_upload_dir(distributed& db, sharded& vb, sstring ks_name, sstring cf_name, bool skip_cleanup, bool skip_reshape); + static future<> process_upload_dir(distributed& db, sharded& vb, sharded& vbw, sstring ks_name, sstring cf_name, bool skip_cleanup, bool skip_reshape); }; } diff --git a/sstables_loader.cc b/sstables_loader.cc index 0d5a1b4e76..2d05da36b9 100644 --- a/sstables_loader.cc +++ b/sstables_loader.cc @@ -582,7 +582,7 @@ future<> sstables_loader::load_new_sstables(sstring ks_name, sstring cf_name, co_await loader.load_and_stream(ks_name, cf_name, table_id, std::move(sstables_on_shards[this_shard_id()]), primary_replica_only, true, scope, {}); }); } else { - co_await replica::distributed_loader::process_upload_dir(_db, _view_builder, ks_name, cf_name, skip_cleanup, skip_reshape); + co_await replica::distributed_loader::process_upload_dir(_db, _view_builder, _view_building_worker, ks_name, cf_name, skip_cleanup, skip_reshape); } } catch (...) { llog.warn("Done loading new SSTables for keyspace={}, table={}, load_and_stream={}, primary_replica_only={}, status=failed: {}", @@ -751,12 +751,14 @@ future<> sstables_loader::download_task_impl::run() { sstables_loader::sstables_loader(sharded& db, netw::messaging_service& messaging, sharded& vb, + sharded& vbw, tasks::task_manager& tm, sstables::storage_manager& sstm, seastar::scheduling_group sg) : _db(db) , _messaging(messaging) , _view_builder(vb) + , _view_building_worker(vbw) , _task_manager_module(make_shared(tm)) , _storage_manager(sstm) , _sched_group(std::move(sg)) diff --git a/sstables_loader.hh b/sstables_loader.hh index 3da68bc8bf..21513e1fd9 100644 --- a/sstables_loader.hh +++ b/sstables_loader.hh @@ -26,6 +26,7 @@ namespace netw { class messaging_service; } namespace db { namespace view { class view_builder; +class view_building_worker; } } @@ -67,6 +68,7 @@ private: sharded& _db; netw::messaging_service& _messaging; sharded& _view_builder; + sharded& _view_building_worker; shared_ptr _task_manager_module; sstables::storage_manager& _storage_manager; seastar::scheduling_group _sched_group; @@ -88,6 +90,7 @@ public: sstables_loader(sharded& db, netw::messaging_service& messaging, sharded& vb, + sharded& vbw, tasks::task_manager& tm, sstables::storage_manager& sstm, seastar::scheduling_group sg); diff --git a/streaming/consumer.cc b/streaming/consumer.cc index 6fa576f240..accca82170 100644 --- a/streaming/consumer.cc +++ b/streaming/consumer.cc @@ -8,8 +8,10 @@ #include #include +#include #include "consumer.hh" +#include "db/view/view_building_worker.hh" #include "replica/database.hh" #include "mutation/mutation_source_metadata.hh" #include "db/view/view_builder.hh" @@ -22,13 +24,14 @@ namespace streaming { mutation_reader_consumer make_streaming_consumer(sstring origin, sharded& db, db::view::view_builder& vb, + sharded& vbw, uint64_t estimated_partitions, stream_reason reason, sstables::offstrategy offstrategy, service::frozen_topology_guard frozen_guard, std::optional repaired_at, lw_shared_ptr sstable_list_to_mark_as_repaired) { - return [&db, &vb = vb.container(), estimated_partitions, reason, offstrategy, origin = std::move(origin), frozen_guard, repaired_at, sstable_list_to_mark_as_repaired] (mutation_reader reader) -> future<> { + return [&db, &vb = vb.container(), &vbw, estimated_partitions, reason, offstrategy, origin = std::move(origin), frozen_guard, repaired_at, sstable_list_to_mark_as_repaired] (mutation_reader reader) -> future<> { std::exception_ptr ex; try { if (current_scheduling_group() != db.local().get_streaming_scheduling_group()) { @@ -38,7 +41,7 @@ mutation_reader_consumer make_streaming_consumer(sstring origin, auto cf = db.local().find_column_family(reader.schema()).shared_from_this(); auto guard = service::topology_guard(frozen_guard); - bool use_view_update_path = co_await with_scheduling_group(db.local().get_gossip_scheduling_group(), [&] { + auto use_view_update_path = co_await with_scheduling_group(db.local().get_gossip_scheduling_group(), [&] { return db::view::check_needs_view_update_path(vb.local(), db.local().get_token_metadata_ptr(), *cf, reason); }); //FIXME: for better estimations this should be transmitted from remote @@ -48,11 +51,11 @@ mutation_reader_consumer make_streaming_consumer(sstring origin, // means partition estimation shouldn't be adjusted. const auto adjusted_estimated_partitions = (offstrategy) ? estimated_partitions : cs.adjust_partition_estimate(metadata, estimated_partitions, cf->schema()); mutation_reader_consumer consumer = - [cf = std::move(cf), adjusted_estimated_partitions, use_view_update_path, &vb, origin = std::move(origin), + [cf = std::move(cf), adjusted_estimated_partitions, use_view_update_path, &vb, &vbw, origin = std::move(origin), offstrategy, repaired_at, sstable_list_to_mark_as_repaired, frozen_guard] (mutation_reader reader) { sstables::shared_sstable sst; try { - sst = use_view_update_path ? cf->make_streaming_staging_sstable() : cf->make_streaming_sstable_for_write(); + sst = use_view_update_path == db::view::sstable_destination_decision::normal_directory ? cf->make_streaming_sstable_for_write() : cf->make_streaming_staging_sstable(); } catch (...) { return current_exception_as_future().finally([reader = std::move(reader)] () mutable { return reader.close(); @@ -77,11 +80,13 @@ mutation_reader_consumer make_streaming_consumer(sstring origin, cf->enable_off_strategy_trigger(); } co_await cf->add_sstable_and_update_cache(sst, offstrategy); - }).then([cf, s, sst, use_view_update_path, &vb]() mutable -> future<> { - if (!use_view_update_path) { - return make_ready_future<>(); + }).then([cf, s, sst, use_view_update_path, &vb, &vbw]() mutable -> future<> { + if (use_view_update_path == db::view::sstable_destination_decision::staging_managed_by_vbc) { + return vbw.local().register_staging_sstable_tasks({sst}, std::move(cf)); + } else if (use_view_update_path == db::view::sstable_destination_decision::staging_directly_to_generator) { + return vb.local().register_staging_sstable(sst, std::move(cf)); } - return vb.local().register_staging_sstable(sst, std::move(cf)); + return make_ready_future<>(); }); }; if (!offstrategy) { diff --git a/streaming/consumer.hh b/streaming/consumer.hh index 214719f5f1..2680286119 100644 --- a/streaming/consumer.hh +++ b/streaming/consumer.hh @@ -20,6 +20,7 @@ class database; namespace db { namespace view { class view_builder; +class view_building_worker; } } @@ -28,6 +29,7 @@ namespace streaming { mutation_reader_consumer make_streaming_consumer(sstring origin, sharded& db, db::view::view_builder& vb, + sharded& vbw, uint64_t estimated_partitions, stream_reason reason, sstables::offstrategy offstrategy, diff --git a/streaming/stream_manager.cc b/streaming/stream_manager.cc index d4377586d1..968cdcc05b 100644 --- a/streaming/stream_manager.cc +++ b/streaming/stream_manager.cc @@ -9,6 +9,7 @@ */ #include +#include "db/view/view_building_worker.hh" #include "gms/gossiper.hh" #include "streaming/stream_manager.hh" #include "streaming/stream_result_future.hh" @@ -27,11 +28,13 @@ extern logging::logger sslog; stream_manager::stream_manager(db::config& cfg, sharded& db, db::view::view_builder& view_builder, + sharded& view_building_worker, sharded& ms, sharded& mm, gms::gossiper& gossiper, scheduling_group sg) : _db(db) , _view_builder(view_builder) + , _view_building_worker(view_building_worker) , _ms(ms) , _mm(mm) , _gossiper(gossiper) diff --git a/streaming/stream_manager.hh b/streaming/stream_manager.hh index 99d0afb442..e51d11ca2d 100644 --- a/streaming/stream_manager.hh +++ b/streaming/stream_manager.hh @@ -29,6 +29,7 @@ namespace db { class config; namespace view { class view_builder; +class view_building_worker; } } @@ -84,6 +85,7 @@ class stream_manager : public gms::i_endpoint_state_change_subscriber, public en private: sharded& _db; db::view::view_builder& _view_builder; + sharded& _view_building_worker; sharded& _ms; sharded& _mm; gms::gossiper& _gossiper; @@ -105,6 +107,7 @@ private: public: stream_manager(db::config& cfg, sharded& db, db::view::view_builder& view_builder, + sharded& view_building_worker, sharded& ms, sharded& mm, gms::gossiper& gossiper, scheduling_group sg); diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 072d8d5cca..653e4ed26a 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -87,7 +87,7 @@ public: mutation_reader_consumer stream_manager::make_streaming_consumer(uint64_t estimated_partitions, stream_reason reason, service::frozen_topology_guard topo_guard) { - return streaming::make_streaming_consumer("streaming", _db, _view_builder, estimated_partitions, reason, is_offstrategy_supported(reason), topo_guard); + return streaming::make_streaming_consumer("streaming", _db, _view_builder, _view_building_worker, estimated_partitions, reason, is_offstrategy_supported(reason), topo_guard); } void stream_manager::init_messaging_service_handler(abort_source& as) { diff --git a/test/boost/cql_query_test.cc b/test/boost/cql_query_test.cc index df2fa0f4ec..faaee1d3cd 100644 --- a/test/boost/cql_query_test.cc +++ b/test/boost/cql_query_test.cc @@ -6084,7 +6084,7 @@ SEASTAR_TEST_CASE(test_sstable_load_mixed_generation_type) { // Load sstables with mixed generation types copy_directory("test/resource/sstables/mixed_generation_type", upload_dir); - replica::distributed_loader::process_upload_dir(e.db(), e.view_builder(), "ks", "test", false, false).get(); + replica::distributed_loader::process_upload_dir(e.db(), e.view_builder(), e.view_building_worker(), "ks", "test", false, false).get(); // Verify the expected data is present assert_that(e.execute_cql("SELECT * FROM ks.test").get()).is_rows() diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index cd42209de9..4500933617 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -11,6 +11,7 @@ #include #include #include "gms/generation-number.hh" +#include "db/view/view_building_worker.hh" #include "replica/database_fwd.hh" #include "test/lib/cql_test_env.hh" #include "cdc/generation_service.hh" @@ -141,6 +142,7 @@ private: sharded _qp; sharded _auth_service; sharded _view_builder; + sharded _view_building_worker; sharded _view_update_generator; sharded _mnotifier; sharded _sl_controller; @@ -363,6 +365,10 @@ public: return _view_update_generator.local(); } + virtual distributed& view_building_worker() override { + return _view_building_worker; + } + virtual service::migration_notifier& local_mnotifier() override { return _mnotifier.local(); } @@ -922,7 +928,7 @@ private: _view_builder.stop().get(); }); - _stream_manager.start(std::ref(*cfg), std::ref(_db), std::ref(_view_builder), std::ref(_ms), std::ref(_mm), std::ref(_gossiper), scheduling_groups.streaming_scheduling_group).get(); + _stream_manager.start(std::ref(*cfg), std::ref(_db), std::ref(_view_builder), std::ref(_view_building_worker), std::ref(_ms), std::ref(_mm), std::ref(_gossiper), scheduling_groups.streaming_scheduling_group).get(); auto stop_streaming = defer_verbose_shutdown("stream manager", [this] { _stream_manager.stop().get(); }); _ss.start(std::ref(abort_sources), std::ref(_db), @@ -1070,6 +1076,11 @@ private: group0_service.setup_group0_if_exist(_sys_ks.local(), _ss.local(), _qp.local(), _mm.local()).get(); + _view_building_worker.start(std::ref(_db), std::ref(_sys_ks), std::ref(_mnotifier), std::ref(group0_client), std::ref(_view_update_generator), std::ref(_ms), std::ref(_view_building_state_machine)).get(); + auto stop_view_building_worker = defer_verbose_shutdown("view building worker", [this] { + _view_building_worker.stop().get(); + }); + const auto generation_number = gms::generation_type(_sys_ks.local().increment_and_get_generation().get()); try { diff --git a/test/lib/cql_test_env.hh b/test/lib/cql_test_env.hh index db9f48572b..226c9a8bd5 100644 --- a/test/lib/cql_test_env.hh +++ b/test/lib/cql_test_env.hh @@ -16,6 +16,7 @@ #include #include +#include "db/view/view_building_worker.hh" #include "db/view/view_update_generator.hh" #include "service/qos/service_level_controller.hh" #include "replica/database.hh" @@ -160,6 +161,8 @@ public: virtual distributed& view_builder() = 0; virtual db::view::view_builder& local_view_builder() = 0; + virtual distributed& view_building_worker() = 0; + virtual db::view::view_update_generator& local_view_update_generator() = 0; virtual service::migration_notifier& local_mnotifier() = 0;