Merge 'Put streaming sched group onto stream manager' from Pavel Emelyanov
The manager is in charge of updating IO bandwidth on the respective prio class. Nowadays it uses global priority-manager, but unifying sched classes effort will require it to use non-global streaming sched group. After the patch the sched class field is unused, but it's a preparation towards huge (really huge) "switch to seastar API level 7" patch ref: #13963 Closes #13997 * github.com:scylladb/scylladb: stream_manager: Add streaming sched group copy cql_test_env: Move sched groups initialization up
This commit is contained in:
2
main.cc
2
main.cc
@@ -1461,7 +1461,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
|
||||
debug::the_stream_manager = &stream_manager;
|
||||
supervisor::notify("starting streaming service");
|
||||
stream_manager.start(std::ref(*cfg), std::ref(db), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(messaging), std::ref(mm), std::ref(gossiper)).get();
|
||||
stream_manager.start(std::ref(*cfg), std::ref(db), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(messaging), std::ref(mm), std::ref(gossiper), maintenance_scheduling_group).get();
|
||||
auto stop_stream_manager = defer_verbose_shutdown("stream manager", [&stream_manager] {
|
||||
// FIXME -- keep the instances alive, just call .stop on them
|
||||
stream_manager.invoke_on_all(&streaming::stream_manager::stop).get();
|
||||
|
||||
@@ -29,13 +29,14 @@ stream_manager::stream_manager(db::config& cfg,
|
||||
sharded<db::view::view_update_generator>& view_update_generator,
|
||||
sharded<netw::messaging_service>& ms,
|
||||
sharded<service::migration_manager>& mm,
|
||||
gms::gossiper& gossiper)
|
||||
gms::gossiper& gossiper, scheduling_group sg)
|
||||
: _db(db)
|
||||
, _sys_dist_ks(sys_dist_ks)
|
||||
, _view_update_generator(view_update_generator)
|
||||
, _ms(ms)
|
||||
, _mm(mm)
|
||||
, _gossiper(gossiper)
|
||||
, _streaming_group(std::move(sg))
|
||||
, _io_throughput_mbs(cfg.stream_io_throughput_mb_per_sec)
|
||||
{
|
||||
namespace sm = seastar::metrics;
|
||||
|
||||
@@ -93,6 +93,7 @@ private:
|
||||
seastar::metrics::metric_groups _metrics;
|
||||
std::unordered_map<streaming::stream_reason, float> _finished_percentage;
|
||||
|
||||
scheduling_group _streaming_group;
|
||||
utils::updateable_value<uint32_t> _io_throughput_mbs;
|
||||
serialized_action _io_throughput_updater = serialized_action([this] { return update_io_throughput(_io_throughput_mbs()); });
|
||||
std::optional<utils::observer<uint32_t>> _io_throughput_option_observer;
|
||||
@@ -103,7 +104,7 @@ public:
|
||||
sharded<db::view::view_update_generator>& view_update_generator,
|
||||
sharded<netw::messaging_service>& ms,
|
||||
sharded<service::migration_manager>& mm,
|
||||
gms::gossiper& gossiper);
|
||||
gms::gossiper& gossiper, scheduling_group sg);
|
||||
|
||||
future<> start(abort_source& as);
|
||||
future<> stop();
|
||||
|
||||
@@ -554,6 +554,8 @@ public:
|
||||
cfg->max_memory_for_unlimited_query_hard_limit.set(uint64_t(query::result_memory_limiter::unlimited_result_size));
|
||||
}
|
||||
|
||||
auto scheduling_groups = get_scheduling_groups().get();
|
||||
|
||||
sharded<cql3::query_processor> qp;
|
||||
sharded<gms::feature_service> feature_service;
|
||||
sharded<netw::messaging_service> ms;
|
||||
@@ -691,7 +693,7 @@ public:
|
||||
std::ref(raft_address_map), std::ref(ms), std::ref(gossiper), std::ref(fd)).get();
|
||||
auto stop_raft_gr = deferred_stop(raft_gr);
|
||||
|
||||
stream_manager.start(std::ref(*cfg), std::ref(db), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(ms), std::ref(mm), std::ref(gossiper)).get();
|
||||
stream_manager.start(std::ref(*cfg), std::ref(db), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(ms), std::ref(mm), std::ref(gossiper), scheduling_groups.streaming_scheduling_group).get();
|
||||
auto stop_streaming = defer([&stream_manager] { stream_manager.stop().get(); });
|
||||
|
||||
sharded<sstables::directory_semaphore> sst_dir_semaphore;
|
||||
@@ -707,7 +709,6 @@ public:
|
||||
dbcfg.available_memory = memory::stats().total_memory();
|
||||
}
|
||||
|
||||
auto scheduling_groups = get_scheduling_groups().get();
|
||||
dbcfg.compaction_scheduling_group = scheduling_groups.compaction_scheduling_group;
|
||||
dbcfg.memory_compaction_scheduling_group = scheduling_groups.memory_compaction_scheduling_group;
|
||||
dbcfg.streaming_scheduling_group = scheduling_groups.streaming_scheduling_group;
|
||||
|
||||
Reference in New Issue
Block a user