The stream_manager will bookkeep the streaming bandwidth option, to subscribe on its changes it needs the config reference. It would be better if it was stream_manager::config, but currently subscription on db::config::<stuff> updates is not very shard-friendly, so we need to carry the config reference itself around. Similar trouble is there for compaction_manager. The option is passed through its own config, but the config is created on each shard by database code. Stream manager config would be created once by main code on shard 0. Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
348 lines
11 KiB
C++
348 lines
11 KiB
C++
/*
|
|
*
|
|
* Modified by ScyllaDB
|
|
* Copyright (C) 2015-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0)
|
|
*/
|
|
|
|
#include <seastar/core/distributed.hh>
|
|
#include "gms/gossiper.hh"
|
|
#include "streaming/stream_manager.hh"
|
|
#include "streaming/stream_result_future.hh"
|
|
#include "log.hh"
|
|
#include "streaming/stream_session_state.hh"
|
|
#include <seastar/core/metrics.hh>
|
|
#include <seastar/core/coroutine.hh>
|
|
#include "db/config.hh"
|
|
|
|
namespace streaming {
|
|
|
|
extern logging::logger sslog;
|
|
|
|
stream_manager::stream_manager(db::config& cfg,
|
|
sharded<replica::database>& db,
|
|
sharded<db::system_distributed_keyspace>& sys_dist_ks,
|
|
sharded<db::view::view_update_generator>& view_update_generator,
|
|
sharded<netw::messaging_service>& ms,
|
|
sharded<service::migration_manager>& mm,
|
|
gms::gossiper& gossiper)
|
|
: _db(db)
|
|
, _sys_dist_ks(sys_dist_ks)
|
|
, _view_update_generator(view_update_generator)
|
|
, _ms(ms)
|
|
, _mm(mm)
|
|
, _gossiper(gossiper)
|
|
{
|
|
namespace sm = seastar::metrics;
|
|
|
|
_metrics.add_group("streaming", {
|
|
sm::make_counter("total_incoming_bytes", [this] { return _total_incoming_bytes; },
|
|
sm::description("Total number of bytes received on this shard.")),
|
|
|
|
sm::make_counter("total_outgoing_bytes", [this] { return _total_outgoing_bytes; },
|
|
sm::description("Total number of bytes sent on this shard.")),
|
|
});
|
|
}
|
|
|
|
future<> stream_manager::start() {
|
|
_gossiper.register_(shared_from_this());
|
|
init_messaging_service_handler();
|
|
return make_ready_future<>();
|
|
}
|
|
|
|
future<> stream_manager::stop() {
|
|
co_await _gossiper.unregister_(shared_from_this());
|
|
co_await uninit_messaging_service_handler();
|
|
}
|
|
|
|
void stream_manager::register_sending(shared_ptr<stream_result_future> result) {
|
|
#if 0
|
|
result.addEventListener(notifier);
|
|
// Make sure we remove the stream on completion (whether successful or not)
|
|
result.addListener(new Runnable()
|
|
{
|
|
public void run()
|
|
{
|
|
initiatedStreams.remove(result.planId);
|
|
}
|
|
}, MoreExecutors.sameThreadExecutor());
|
|
#endif
|
|
_initiated_streams[result->plan_id] = std::move(result);
|
|
}
|
|
|
|
void stream_manager::register_receiving(shared_ptr<stream_result_future> result) {
|
|
#if 0
|
|
result->add_event_listener(notifier);
|
|
// Make sure we remove the stream on completion (whether successful or not)
|
|
result.addListener(new Runnable()
|
|
{
|
|
public void run()
|
|
{
|
|
receivingStreams.remove(result.planId);
|
|
}
|
|
}, MoreExecutors.sameThreadExecutor());
|
|
#endif
|
|
_receiving_streams[result->plan_id] = std::move(result);
|
|
}
|
|
|
|
shared_ptr<stream_result_future> stream_manager::get_sending_stream(UUID plan_id) const {
|
|
auto it = _initiated_streams.find(plan_id);
|
|
if (it != _initiated_streams.end()) {
|
|
return it->second;
|
|
}
|
|
return {};
|
|
}
|
|
|
|
shared_ptr<stream_result_future> stream_manager::get_receiving_stream(UUID plan_id) const {
|
|
auto it = _receiving_streams.find(plan_id);
|
|
if (it != _receiving_streams.end()) {
|
|
return it->second;
|
|
}
|
|
return {};
|
|
}
|
|
|
|
void stream_manager::remove_stream(UUID plan_id) {
|
|
sslog.debug("stream_manager: removing plan_id={}", plan_id);
|
|
_initiated_streams.erase(plan_id);
|
|
_receiving_streams.erase(plan_id);
|
|
// FIXME: Do not ignore the future
|
|
(void)remove_progress_on_all_shards(plan_id).handle_exception([plan_id] (auto ep) {
|
|
sslog.info("stream_manager: Fail to remove progress for plan_id={}: {}", plan_id, ep);
|
|
});
|
|
}
|
|
|
|
void stream_manager::show_streams() const {
|
|
for (auto const& x : _initiated_streams) {
|
|
sslog.debug("stream_manager:initiated_stream: plan_id={}", x.first);
|
|
}
|
|
for (auto const& x : _receiving_streams) {
|
|
sslog.debug("stream_manager:receiving_stream: plan_id={}", x.first);
|
|
}
|
|
}
|
|
|
|
std::vector<shared_ptr<stream_result_future>> stream_manager::get_all_streams() const {
|
|
std::vector<shared_ptr<stream_result_future>> result;
|
|
for (auto& x : _initiated_streams) {
|
|
result.push_back(x.second);
|
|
}
|
|
for (auto& x : _receiving_streams) {
|
|
result.push_back(x.second);
|
|
}
|
|
return result;
|
|
}
|
|
|
|
void stream_manager::update_progress(UUID cf_id, gms::inet_address peer, progress_info::direction dir, size_t fm_size) {
|
|
auto& sbytes = _stream_bytes[cf_id];
|
|
if (dir == progress_info::direction::OUT) {
|
|
sbytes[peer].bytes_sent += fm_size;
|
|
_total_outgoing_bytes += fm_size;
|
|
} else {
|
|
sbytes[peer].bytes_received += fm_size;
|
|
_total_incoming_bytes += fm_size;
|
|
}
|
|
}
|
|
|
|
future<> stream_manager::update_all_progress_info() {
|
|
return seastar::async([this] {
|
|
for (auto sr: get_all_streams()) {
|
|
for (auto session : sr->get_coordinator()->get_all_stream_sessions()) {
|
|
session->update_progress().get();
|
|
}
|
|
}
|
|
});
|
|
}
|
|
|
|
void stream_manager::remove_progress(UUID plan_id) {
|
|
_stream_bytes.erase(plan_id);
|
|
}
|
|
|
|
stream_bytes stream_manager::get_progress(UUID plan_id, gms::inet_address peer) const {
|
|
auto it = _stream_bytes.find(plan_id);
|
|
if (it == _stream_bytes.end()) {
|
|
return stream_bytes();
|
|
}
|
|
auto const& sbytes = it->second;
|
|
auto i = sbytes.find(peer);
|
|
if (i == sbytes.end()) {
|
|
return stream_bytes();
|
|
}
|
|
return i->second;
|
|
}
|
|
|
|
stream_bytes stream_manager::get_progress(UUID plan_id) const {
|
|
auto it = _stream_bytes.find(plan_id);
|
|
if (it == _stream_bytes.end()) {
|
|
return stream_bytes();
|
|
}
|
|
stream_bytes ret;
|
|
for (auto const& x : it->second) {
|
|
ret += x.second;
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
future<> stream_manager::remove_progress_on_all_shards(UUID plan_id) {
|
|
return container().invoke_on_all([plan_id] (auto& sm) {
|
|
sm.remove_progress(plan_id);
|
|
});
|
|
}
|
|
|
|
future<stream_bytes> stream_manager::get_progress_on_all_shards(UUID plan_id, gms::inet_address peer) const {
|
|
return container().map_reduce0(
|
|
[plan_id, peer] (auto& sm) {
|
|
return sm.get_progress(plan_id, peer);
|
|
},
|
|
stream_bytes(),
|
|
std::plus<stream_bytes>()
|
|
);
|
|
}
|
|
|
|
future<stream_bytes> stream_manager::get_progress_on_all_shards(UUID plan_id) const {
|
|
return container().map_reduce0(
|
|
[plan_id] (auto& sm) {
|
|
return sm.get_progress(plan_id);
|
|
},
|
|
stream_bytes(),
|
|
std::plus<stream_bytes>()
|
|
);
|
|
}
|
|
|
|
future<stream_bytes> stream_manager::get_progress_on_all_shards(gms::inet_address peer) const {
|
|
return container().map_reduce0(
|
|
[peer] (auto& sm) {
|
|
stream_bytes ret;
|
|
for (auto& sbytes : sm._stream_bytes) {
|
|
if (sbytes.second.contains(peer)) {
|
|
ret += sbytes.second.at(peer);
|
|
}
|
|
}
|
|
return ret;
|
|
},
|
|
stream_bytes(),
|
|
std::plus<stream_bytes>()
|
|
);
|
|
}
|
|
|
|
future<stream_bytes> stream_manager::get_progress_on_all_shards() const {
|
|
return container().map_reduce0(
|
|
[] (auto& sm) {
|
|
stream_bytes ret;
|
|
for (auto& sbytes : sm._stream_bytes) {
|
|
for (auto& sb : sbytes.second) {
|
|
ret += sb.second;
|
|
}
|
|
}
|
|
return ret;
|
|
},
|
|
stream_bytes(),
|
|
std::plus<stream_bytes>()
|
|
);
|
|
}
|
|
|
|
stream_bytes stream_manager::get_progress_on_local_shard() const {
|
|
stream_bytes ret;
|
|
for (auto const& sbytes : _stream_bytes) {
|
|
for (auto const& sb : sbytes.second) {
|
|
ret += sb.second;
|
|
}
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
bool stream_manager::has_peer(inet_address endpoint) const {
|
|
for (auto sr : get_all_streams()) {
|
|
for (auto session : sr->get_coordinator()->get_all_stream_sessions()) {
|
|
if (session->peer == endpoint) {
|
|
return true;
|
|
}
|
|
}
|
|
}
|
|
return false;
|
|
}
|
|
|
|
void stream_manager::fail_sessions(inet_address endpoint) {
|
|
for (auto sr : get_all_streams()) {
|
|
for (auto session : sr->get_coordinator()->get_all_stream_sessions()) {
|
|
if (session->peer == endpoint) {
|
|
session->close_session(stream_session_state::FAILED);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
void stream_manager::fail_all_sessions() {
|
|
for (auto sr : get_all_streams()) {
|
|
for (auto session : sr->get_coordinator()->get_all_stream_sessions()) {
|
|
session->close_session(stream_session_state::FAILED);
|
|
}
|
|
}
|
|
}
|
|
|
|
future<> stream_manager::on_remove(inet_address endpoint) {
|
|
if (has_peer(endpoint)) {
|
|
sslog.info("stream_manager: Close all stream_session with peer = {} in on_remove", endpoint);
|
|
//FIXME: discarded future.
|
|
(void)container().invoke_on_all([endpoint] (auto& sm) {
|
|
sm.fail_sessions(endpoint);
|
|
}).handle_exception([endpoint] (auto ep) {
|
|
sslog.warn("stream_manager: Fail to close sessions peer = {} in on_remove", endpoint);
|
|
});
|
|
}
|
|
return make_ready_future();
|
|
}
|
|
|
|
future<> stream_manager::on_restart(inet_address endpoint, endpoint_state ep_state) {
|
|
if (has_peer(endpoint)) {
|
|
sslog.info("stream_manager: Close all stream_session with peer = {} in on_restart", endpoint);
|
|
//FIXME: discarded future.
|
|
(void)container().invoke_on_all([endpoint] (auto& sm) {
|
|
sm.fail_sessions(endpoint);
|
|
}).handle_exception([endpoint] (auto ep) {
|
|
sslog.warn("stream_manager: Fail to close sessions peer = {} in on_restart", endpoint);
|
|
});
|
|
}
|
|
return make_ready_future();
|
|
}
|
|
|
|
future<> stream_manager::on_dead(inet_address endpoint, endpoint_state ep_state) {
|
|
if (has_peer(endpoint)) {
|
|
sslog.info("stream_manager: Close all stream_session with peer = {} in on_dead", endpoint);
|
|
//FIXME: discarded future.
|
|
(void)container().invoke_on_all([endpoint] (auto& sm) {
|
|
sm.fail_sessions(endpoint);
|
|
}).handle_exception([endpoint] (auto ep) {
|
|
sslog.warn("stream_manager: Fail to close sessions peer = {} in on_dead", endpoint);
|
|
});
|
|
}
|
|
return make_ready_future();
|
|
}
|
|
|
|
shared_ptr<stream_session> stream_manager::get_session(utils::UUID plan_id, gms::inet_address from, const char* verb, std::optional<utils::UUID> cf_id) {
|
|
if (cf_id) {
|
|
sslog.debug("[Stream #{}] GOT {} from {}: cf_id={}", plan_id, verb, from, *cf_id);
|
|
} else {
|
|
sslog.debug("[Stream #{}] GOT {} from {}", plan_id, verb, from);
|
|
}
|
|
auto sr = get_sending_stream(plan_id);
|
|
if (!sr) {
|
|
sr = get_receiving_stream(plan_id);
|
|
}
|
|
if (!sr) {
|
|
auto err = format("[Stream #{}] GOT {} from {}: Can not find stream_manager", plan_id, verb, from);
|
|
sslog.debug("{}", err.c_str());
|
|
throw std::runtime_error(err);
|
|
}
|
|
auto coordinator = sr->get_coordinator();
|
|
if (!coordinator) {
|
|
auto err = format("[Stream #{}] GOT {} from {}: Can not find coordinator", plan_id, verb, from);
|
|
sslog.debug("{}", err.c_str());
|
|
throw std::runtime_error(err);
|
|
}
|
|
return coordinator->get_or_create_session(*this, from);
|
|
}
|
|
|
|
} // namespace streaming
|