Merge 'Fix view-builder vs (repair and streaming) initialization order' from Pavel Emelyanov

Both repair and streaming depend on the view builder, but since the builder is started too late, both keep a sharded<> reference to it and apply `if (view_builder.local_is_initialized())` safety checks.

However, the view builder can do its sharded start much earlier; there is currently nothing that prevents it from doing so. This PR moves the view builder start up, together with some of its dependencies, and relaxes the way repair and streaming use their view-builder references; in particular, it removes those ugly initialization checks.

refs: scylladb/scylladb#2737

Closes scylladb/scylladb#22676

* github.com:scylladb/scylladb:
  streaming: Relax streaming::make_streaming_consumer() view builder arg
  streaming: Keep non-sharded view_builder dependency reference
  streaming: Remove view_builder.local_is_initialized() checks
  repair: Keep non-sharded view_builder dependency reference
  repair: Remove view_builder.local_is_initialized() checks
  main: Start sharded<view_builder> earlier
  test/cql_env: Move stream manager start lower
This commit is contained in:
Piotr Dulikowski
2025-02-17 10:03:28 +01:00
9 changed files with 45 additions and 55 deletions

34
main.cc
View File

@@ -1699,6 +1699,23 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
co_await utils::announce_dict_to_shards(compressor_tracker, std::move(dict));
};
sys_dist_ks.start(std::ref(qp), std::ref(mm), std::ref(proxy)).get();
auto stop_sdks = defer_verbose_shutdown("system distributed keyspace", [] {
sys_dist_ks.invoke_on_all(&db::system_distributed_keyspace::stop).get();
});
supervisor::notify("starting view update generator");
view_update_generator.start(std::ref(db), std::ref(proxy), std::ref(stop_signal.as_sharded_abort_source())).get();
auto stop_view_update_generator = defer_verbose_shutdown("view update generator", [] {
view_update_generator.stop().get();
});
supervisor::notify("starting the view builder");
view_builder.start(std::ref(db), std::ref(sys_ks), std::ref(sys_dist_ks), std::ref(mm_notifier), std::ref(view_update_generator), std::ref(group0_client), std::ref(qp)).get();
auto stop_view_builder = defer_verbose_shutdown("view builder", [cfg] {
view_builder.stop().get();
});
supervisor::notify("starting repair service");
auto max_memory_repair = memory::stats().total_memory() * 0.1;
repair.start(std::ref(tsm), std::ref(gossiper), std::ref(messaging), std::ref(db), std::ref(proxy), std::ref(bm), std::ref(sys_ks), std::ref(view_builder), std::ref(task_manager), std::ref(mm), max_memory_repair).get();
@@ -1817,23 +1834,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
supervisor::notify("loading non-system sstables");
replica::distributed_loader::init_non_system_keyspaces(db, proxy, sys_ks).get();
sys_dist_ks.start(std::ref(qp), std::ref(mm), std::ref(proxy)).get();
auto stop_sdks = defer_verbose_shutdown("system distributed keyspace", [] {
sys_dist_ks.invoke_on_all(&db::system_distributed_keyspace::stop).get();
});
supervisor::notify("starting view update generator");
view_update_generator.start(std::ref(db), std::ref(proxy), std::ref(stop_signal.as_sharded_abort_source())).get();
auto stop_view_update_generator = defer_verbose_shutdown("view update generator", [] {
view_update_generator.stop().get();
});
supervisor::notify("starting the view builder");
view_builder.start(std::ref(db), std::ref(sys_ks), std::ref(sys_dist_ks), std::ref(mm_notifier), std::ref(view_update_generator), std::ref(group0_client), std::ref(qp)).get();
auto stop_view_builder = defer_verbose_shutdown("view builder", [cfg] {
view_builder.stop().get();
});
supervisor::notify("starting commit log");
auto cl = db.local().commitlog();

View File

@@ -401,7 +401,7 @@ class repair_writer_impl : public repair_writer::impl {
std::optional<future<>> _writer_done;
mutation_fragment_queue _mq;
sharded<replica::database>& _db;
sharded<db::view::view_builder>& _view_builder;
db::view::view_builder& _view_builder;
streaming::stream_reason _reason;
mutation_reader _queue_reader;
public:
@@ -409,7 +409,7 @@ public:
schema_ptr schema,
reader_permit permit,
sharded<replica::database>& db,
sharded<db::view::view_builder>& view_builder,
db::view::view_builder& view_builder,
streaming::stream_reason reason,
mutation_fragment_queue queue,
mutation_reader queue_reader)
@@ -511,7 +511,7 @@ lw_shared_ptr<repair_writer> make_repair_writer(
reader_permit permit,
streaming::stream_reason reason,
sharded<replica::database>& db,
sharded<db::view::view_builder>& view_builder) {
db::view::view_builder& view_builder) {
auto [queue_reader, queue_handle] = make_queue_reader_v2(schema, permit);
auto queue = make_mutation_fragment_queue(schema, permit, std::move(queue_handle));
auto i = std::make_unique<repair_writer_impl>(schema, permit, db, view_builder, reason, std::move(queue), std::move(queue_reader));
@@ -2519,10 +2519,6 @@ future<> repair_service::init_ms_handlers() {
auto from_id = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
return container().invoke_on(shard, [from_id, src_cpu_id, repair_meta_id, ks_name, cf_name,
range, algo, max_row_buf_size, seed, remote_shard, remote_shard_count, remote_ignore_msb, schema_version, reason, compaction_time, this] (repair_service& local_repair) mutable {
if (!local_repair._view_builder.local_is_initialized()) {
return make_exception_future<repair_row_level_start_response>(std::runtime_error(format("Node {} is not fully initialized for repair, try again later",
local_repair.my_host_id())));
}
streaming::stream_reason r = reason ? *reason : streaming::stream_reason::repair;
const gc_clock::time_point ct = compaction_time ? *compaction_time : gc_clock::now();
return repair_meta::repair_row_level_start_handler(local_repair, from_id, src_cpu_id, repair_meta_id, std::move(ks_name),
@@ -2676,9 +2672,6 @@ public:
, _start_time(start_time)
, _is_tablet(_shard_task.db.local().find_column_family(_table_id).uses_tablets())
{
if (!_shard_task.rs.get_view_builder().local_is_initialized()) {
throw std::runtime_error(format("Node {} is not fully initialized for repair, try again later", shard_task.rs.my_host_id()));
}
repair_neighbors r_neighbors = _shard_task.get_repair_neighbors(_range);
auto& map = r_neighbors.shard_map;
for (auto& n : _all_live_peer_nodes) {
@@ -3252,7 +3245,7 @@ repair_service::repair_service(sharded<service::topology_state_machine>& tsm,
sharded<service::storage_proxy>& sp,
sharded<db::batchlog_manager>& bm,
sharded<db::system_keyspace>& sys_ks,
sharded<db::view::view_builder>& vb,
db::view::view_builder& vb,
tasks::task_manager& tm,
service::migration_manager& mm,
size_t max_repair_memory)

View File

@@ -95,7 +95,7 @@ class repair_service : public seastar::peering_sharded_service<repair_service> {
sharded<service::storage_proxy>& _sp;
sharded<db::batchlog_manager>& _bm;
sharded<db::system_keyspace>& _sys_ks;
sharded<db::view::view_builder>& _view_builder;
db::view::view_builder& _view_builder;
shared_ptr<repair::task_manager_module> _repair_module;
service::migration_manager& _mm;
node_ops_metrics _node_ops_metrics;
@@ -132,7 +132,7 @@ public:
sharded<service::storage_proxy>& sp,
sharded<db::batchlog_manager>& bm,
sharded<db::system_keyspace>& sys_ks,
sharded<db::view::view_builder>& vb,
db::view::view_builder& vb,
tasks::task_manager& tm,
service::migration_manager& mm, size_t max_repair_memory);
~repair_service();
@@ -190,7 +190,7 @@ public:
netw::messaging_service& get_messaging() noexcept { return _messaging; }
sharded<replica::database>& get_db() noexcept { return _db; }
service::migration_manager& get_migration_manager() noexcept { return _mm; }
sharded<db::view::view_builder>& get_view_builder() noexcept { return _view_builder; }
db::view::view_builder& get_view_builder() noexcept { return _view_builder; }
gms::gossiper& get_gossiper() noexcept { return _gossiper.local(); }
size_t max_repair_memory() const { return _max_repair_memory; }
seastar::semaphore& memory_sem() { return _memory_sem; }

View File

@@ -20,12 +20,12 @@ namespace streaming {
reader_consumer_v2 make_streaming_consumer(sstring origin,
sharded<replica::database>& db,
sharded<db::view::view_builder>& vb,
db::view::view_builder& vb,
uint64_t estimated_partitions,
stream_reason reason,
sstables::offstrategy offstrategy,
service::frozen_topology_guard frozen_guard) {
return [&db, &vb, estimated_partitions, reason, offstrategy, origin = std::move(origin), frozen_guard] (mutation_reader reader) -> future<> {
return [&db, &vb = vb.container(), estimated_partitions, reason, offstrategy, origin = std::move(origin), frozen_guard] (mutation_reader reader) -> future<> {
std::exception_ptr ex;
try {
if (current_scheduling_group() != db.local().get_streaming_scheduling_group()) {

View File

@@ -26,7 +26,7 @@ namespace streaming {
reader_consumer_v2 make_streaming_consumer(sstring origin,
sharded<replica::database>& db,
sharded<db::view::view_builder>& vb,
db::view::view_builder& vb,
uint64_t estimated_partitions,
stream_reason reason,
sstables::offstrategy offstrategy,

View File

@@ -25,7 +25,7 @@ extern logging::logger sslog;
stream_manager::stream_manager(db::config& cfg,
sharded<replica::database>& db,
sharded<db::view::view_builder>& view_builder,
db::view::view_builder& view_builder,
sharded<netw::messaging_service>& ms,
sharded<service::migration_manager>& mm,
gms::gossiper& gossiper, scheduling_group sg)

View File

@@ -83,7 +83,7 @@ class stream_manager : public gms::i_endpoint_state_change_subscriber, public en
*/
private:
sharded<replica::database>& _db;
sharded<db::view::view_builder>& _view_builder;
db::view::view_builder& _view_builder;
sharded<netw::messaging_service>& _ms;
sharded<service::migration_manager>& _mm;
gms::gossiper& _gossiper;
@@ -104,7 +104,7 @@ private:
public:
stream_manager(db::config& cfg, sharded<replica::database>& db,
sharded<db::view::view_builder>& view_builder,
db::view::view_builder& view_builder,
sharded<netw::messaging_service>& ms,
sharded<service::migration_manager>& mm,
gms::gossiper& gossiper, scheduling_group sg);

View File

@@ -127,10 +127,6 @@ void stream_manager::init_messaging_service_handler(abort_source& as) {
auto reason = reason_opt ? *reason_opt: stream_reason::unspecified;
service::frozen_topology_guard topo_guard = session.value_or(service::default_session_id);
sslog.trace("Got stream_mutation_fragments from {} reason {}, session {}", from, int(reason), session);
if (!_view_builder.local_is_initialized()) {
return make_exception_future<rpc::sink<int>>(std::runtime_error(format("Node {} is not fully initialized for streaming, try again later",
_db.local().get_token_metadata().get_topology().my_address())));
}
return _mm.local().get_schema_for_write(schema_id, src, cpu_id, _ms.local(), as).then([this, from, estimated_partitions, plan_id, cf_id, source, reason, topo_guard, &as] (schema_ptr s) mutable {
auto permit = _db.local().get_reader_concurrency_semaphore().make_tracking_only_permit(s, "stream-session", db::no_timeout, {});
struct stream_mutation_fragments_cmd_status {

View File

@@ -844,9 +844,6 @@ private:
std::ref(_ms), std::ref(_fd)).get();
auto stop_raft_gr = deferred_stop(_group0_registry);
_stream_manager.start(std::ref(*cfg), std::ref(_db), std::ref(_view_builder), std::ref(_ms), std::ref(_mm), std::ref(_gossiper), scheduling_groups.streaming_scheduling_group).get();
auto stop_streaming = defer_verbose_shutdown("stream manager", [this] { _stream_manager.stop().get(); });
_feature_service.invoke_on_all([] (auto& fs) {
return fs.enable(fs.supported_feature_set());
}).get();
@@ -876,6 +873,18 @@ private:
auto compression_dict_updated_callback = [] { return make_ready_future<>(); };
_sys_dist_ks.start(std::ref(_qp), std::ref(_mm), std::ref(_proxy)).get();
_view_update_generator.start(std::ref(_db), std::ref(_proxy), std::ref(abort_sources)).get();
auto stop_view_update_generator = defer_verbose_shutdown("view update generator", [this] {
_view_update_generator.stop().get();
});
_view_builder.start(std::ref(_db), std::ref(_sys_ks), std::ref(_sys_dist_ks), std::ref(_mnotifier), std::ref(_view_update_generator), std::ref(group0_client), std::ref(_qp)).get();
auto stop_view_builder = defer_verbose_shutdown("view builder", [this] {
_view_builder.stop().get();
});
_ss.start(std::ref(abort_sources), std::ref(_db),
std::ref(_gossiper),
std::ref(_sys_ks),
@@ -963,18 +972,7 @@ private:
_group0_registry.invoke_on_all(&service::raft_group_registry::drain_on_shutdown).get();
});
_view_update_generator.start(std::ref(_db), std::ref(_proxy), std::ref(abort_sources)).get();
_view_update_generator.invoke_on_all(&db::view::view_update_generator::start).get();
auto stop_view_update_generator = defer_verbose_shutdown("view update generator", [this] {
_view_update_generator.stop().get();
});
_sys_dist_ks.start(std::ref(_qp), std::ref(_mm), std::ref(_proxy)).get();
_view_builder.start(std::ref(_db), std::ref(_sys_ks), std::ref(_sys_dist_ks), std::ref(_mnotifier), std::ref(_view_update_generator), std::ref(group0_client), std::ref(_qp)).get();
auto stop_view_builder = defer_verbose_shutdown("view builder", [this] {
_view_builder.stop().get();
});
if (cfg_in.need_remote_proxy) {
_proxy.invoke_on_all(&service::storage_proxy::start_remote, std::ref(_ms), std::ref(_gossiper), std::ref(_mm), std::ref(_sys_ks), std::ref(group0_client), std::ref(_topology_state_machine)).get();
@@ -985,6 +983,9 @@ private:
}
});
_stream_manager.start(std::ref(*cfg), std::ref(_db), std::ref(_view_builder), std::ref(_ms), std::ref(_mm), std::ref(_gossiper), scheduling_groups.streaming_scheduling_group).get();
auto stop_streaming = defer_verbose_shutdown("stream manager", [this] { _stream_manager.stop().get(); });
_sl_controller.invoke_on_all([this, &group0_client] (qos::service_level_controller& service) {
qos::service_level_controller::service_level_distributed_data_accessor_ptr service_level_data_accessor =
::static_pointer_cast<qos::service_level_controller::service_level_distributed_data_accessor>(