stream_manager: Remove system_distributed_keyspace and view_update_generator

Now all the code is happy with view_builder and can be shortened

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2024-05-16 13:07:29 +03:00
parent 84ef6a8179
commit 8906126a2c
5 changed files with 3 additions and 13 deletions

View File

@@ -1586,7 +1586,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
debug::the_stream_manager = &stream_manager;
supervisor::notify("starting streaming service");
stream_manager.start(std::ref(*cfg), std::ref(db), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(view_builder), std::ref(messaging), std::ref(mm), std::ref(gossiper), maintenance_scheduling_group).get();
stream_manager.start(std::ref(*cfg), std::ref(db), std::ref(view_builder), std::ref(messaging), std::ref(mm), std::ref(gossiper), maintenance_scheduling_group).get();
auto stop_stream_manager = defer_verbose_shutdown("stream manager", [&stream_manager] {
// FIXME -- keep the instances alive, just call .stop on them
stream_manager.invoke_on_all(&streaming::stream_manager::stop).get();

View File

@@ -24,15 +24,11 @@ extern logging::logger sslog;
stream_manager::stream_manager(db::config& cfg,
sharded<replica::database>& db,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<db::view::view_update_generator>& view_update_generator,
sharded<db::view::view_builder>& view_builder,
sharded<netw::messaging_service>& ms,
sharded<service::migration_manager>& mm,
gms::gossiper& gossiper, scheduling_group sg)
: _db(db)
, _sys_dist_ks(sys_dist_ks)
, _view_update_generator(view_update_generator)
, _view_builder(view_builder)
, _ms(ms)
, _mm(mm)

View File

@@ -27,9 +27,7 @@
namespace db {
class config;
class system_distributed_keyspace;
namespace view {
class view_update_generator;
class view_builder;
}
}
@@ -85,8 +83,6 @@ class stream_manager : public gms::i_endpoint_state_change_subscriber, public en
*/
private:
sharded<replica::database>& _db;
sharded<db::system_distributed_keyspace>& _sys_dist_ks;
sharded<db::view::view_update_generator>& _view_update_generator;
sharded<db::view::view_builder>& _view_builder;
sharded<netw::messaging_service>& _ms;
sharded<service::migration_manager>& _mm;
@@ -108,8 +104,6 @@ private:
public:
stream_manager(db::config& cfg, sharded<replica::database>& db,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<db::view::view_update_generator>& view_update_generator,
sharded<db::view::view_builder>& view_builder,
sharded<netw::messaging_service>& ms,
sharded<service::migration_manager>& mm,

View File

@@ -121,7 +121,7 @@ void stream_manager::init_messaging_service_handler(abort_source& as) {
auto reason = reason_opt ? *reason_opt: stream_reason::unspecified;
service::frozen_topology_guard topo_guard = session.value_or(service::default_session_id);
sslog.trace("Got stream_mutation_fragments from {} reason {}, session {}", from, int(reason), session);
if (!_sys_dist_ks.local_is_initialized() || !_view_update_generator.local_is_initialized()) {
if (!_view_builder.local_is_initialized()) {
return make_exception_future<rpc::sink<int>>(std::runtime_error(format("Node {} is not fully initialized for streaming, try again later",
_db.local().get_token_metadata().get_topology().my_address())));
}

View File

@@ -726,7 +726,7 @@ private:
std::ref(_ms), std::ref(_fd)).get();
auto stop_raft_gr = deferred_stop(_group0_registry);
_stream_manager.start(std::ref(*cfg), std::ref(_db), std::ref(_sys_dist_ks), std::ref(_view_update_generator), std::ref(_view_builder), std::ref(_ms), std::ref(_mm), std::ref(_gossiper), scheduling_groups.streaming_scheduling_group).get();
_stream_manager.start(std::ref(*cfg), std::ref(_db), std::ref(_view_builder), std::ref(_ms), std::ref(_mm), std::ref(_gossiper), scheduling_groups.streaming_scheduling_group).get();
auto stop_streaming = defer([this] { _stream_manager.stop().get(); });
_feature_service.invoke_on_all([] (auto& fs) {