streaming: Maintain class bandwidth

Same as was done in b112a983 for compaction manager

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2022-07-16 12:21:59 +03:00
parent a246b6d3eb
commit 96d6be7daf
2 changed files with 28 additions and 0 deletions

View File

@@ -9,6 +9,7 @@
*/
#include <seastar/core/distributed.hh>
#include "service/priority_manager.hh"
#include "gms/gossiper.hh"
#include "streaming/stream_manager.hh"
#include "streaming/stream_result_future.hh"
@@ -35,9 +36,15 @@ stream_manager::stream_manager(db::config& cfg,
, _ms(ms)
, _mm(mm)
, _gossiper(gossiper)
, _io_throughput_mbs(cfg.stream_io_throughput_mb_per_sec)
{
namespace sm = seastar::metrics;
if (this_shard_id() == 0) {
_io_throughput_option_observer.emplace(_io_throughput_mbs.observe(_io_throughput_updater.make_observer()));
(void)_io_throughput_updater.trigger_later();
}
_metrics.add_group("streaming", {
sm::make_counter("total_incoming_bytes", [this] { return _total_incoming_bytes; },
sm::description("Total number of bytes received on this shard.")),
@@ -56,6 +63,20 @@ future<> stream_manager::start() {
future<> stream_manager::stop() {
co_await _gossiper.unregister_(shared_from_this());
co_await uninit_messaging_service_handler();
co_await _io_throughput_updater.join();
}
future<> stream_manager::update_io_throughput(uint32_t value_mbs) {
uint64_t bps = ((uint64_t)(value_mbs != 0 ? value_mbs : std::numeric_limits<uint32_t>::max())) << 20;
return service::get_local_streaming_priority().update_bandwidth(bps).then_wrapped([value_mbs] (auto f) {
if (f.failed()) {
sslog.warn("Couldn't update streaming bandwidth: {}", f.get_exception());
} else if (value_mbs != 0) {
sslog.info("Set streaming bandwidth to {}MB/s", value_mbs);
} else {
sslog.info("Set unlimited streaming bandwidth");
}
});
}
void stream_manager::register_sending(shared_ptr<stream_result_future> result) {

View File

@@ -13,6 +13,8 @@
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/distributed.hh>
#include "utils/UUID.hh"
#include "utils/updateable_value.hh"
#include "utils/serialized_action.hh"
#include "gms/i_endpoint_state_change_subscriber.hh"
#include "gms/inet_address.hh"
#include "gms/endpoint_state.hh"
@@ -99,6 +101,10 @@ private:
semaphore _mutation_send_limiter{256};
seastar::metrics::metric_groups _metrics;
utils::updateable_value<uint32_t> _io_throughput_mbs;
serialized_action _io_throughput_updater = serialized_action([this] { return update_io_throughput(_io_throughput_mbs()); });
std::optional<utils::observer<uint32_t>> _io_throughput_option_observer;
public:
stream_manager(db::config& cfg, sharded<replica::database>& db,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
@@ -181,6 +187,7 @@ private:
void init_messaging_service_handler();
future<> uninit_messaging_service_handler();
future<> update_io_throughput(uint32_t value_mbs);
};
} // namespace streaming