stream_session: Keep stream_manager reference

The manager is needed to get messaging service and database from.
Actually, the database can be pushed though arguments in all the
places, so effectively session only needs the messaging. However,
the stream-task's need the manager badly and there's no other
place to get it from other than the session.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2021-11-19 12:23:12 +03:00
parent f2ae080c63
commit db33607eb2
5 changed files with 11 additions and 8 deletions

View File

@@ -80,10 +80,10 @@ public:
std::set<inet_address> get_peers() const;
public:
shared_ptr<stream_session> get_or_create_session(inet_address peer) {
shared_ptr<stream_session> get_or_create_session(stream_manager& mgr, inet_address peer) {
auto& session = _peer_sessions[peer];
if (!session) {
session = make_shared<stream_session>(peer);
session = make_shared<stream_session>(mgr, peer);
}
return session;
}

View File

@@ -367,7 +367,7 @@ shared_ptr<stream_session> stream_manager::get_session(utils::UUID plan_id, gms:
sslog.debug("{}", err.c_str());
throw std::runtime_error(err);
}
return coordinator->get_or_create_session(from);
return coordinator->get_or_create_session(*this, from);
}
} // namespace streaming

View File

@@ -50,7 +50,7 @@ stream_plan& stream_plan::request_ranges(inet_address from, sstring keyspace, dh
stream_plan& stream_plan::request_ranges(inet_address from, sstring keyspace, dht::token_range_vector ranges, std::vector<sstring> column_families) {
_range_added = true;
auto session = _coordinator->get_or_create_session(from);
auto session = _coordinator->get_or_create_session(_mgr, from);
session->add_stream_request(keyspace, std::move(ranges), std::move(column_families));
session->set_reason(_reason);
return *this;
@@ -62,7 +62,7 @@ stream_plan& stream_plan::transfer_ranges(inet_address to, sstring keyspace, dht
stream_plan& stream_plan::transfer_ranges(inet_address to, sstring keyspace, dht::token_range_vector ranges, std::vector<sstring> column_families) {
_range_added = true;
auto session = _coordinator->get_or_create_session(to);
auto session = _coordinator->get_or_create_session(_mgr, to);
session->add_transfer_ranges(keyspace, std::move(ranges), std::move(column_families));
session->set_reason(_reason);
return *this;

View File

@@ -219,8 +219,10 @@ future<> stream_manager::uninit_messaging_service_handler() {
ms.unregister_complete_message()).discard_result();
}
stream_session::stream_session(inet_address peer_)
: peer(peer_) {
stream_session::stream_session(stream_manager& mgr, inet_address peer_)
: peer(peer_)
, _mgr(mgr)
{
//this.metrics = StreamingMetrics.get(connecting);
}

View File

@@ -160,6 +160,7 @@ public:
inet_address peer;
unsigned dst_cpu_id = 0;
private:
stream_manager& _mgr;
// should not be null when session is started
shared_ptr<stream_result_future> _stream_result;
@@ -217,7 +218,7 @@ public:
* @param connecting Actual connecting address
* @param factory is used for establishing connection
*/
stream_session(inet_address peer_);
stream_session(stream_manager& mgr, inet_address peer_);
~stream_session();
UUID plan_id() const;