service: use session variable for streaming

Use session that was retrieved at the beginning of the handler for
node operations with streaming to ensure that the session id won't
change in between.
This commit is contained in:
Aleksandra Martyniuk
2025-12-23 19:10:04 +01:00
parent 3fe596d556
commit 2be5ee9f9d

View File

@@ -6271,7 +6271,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
} else {
dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_token_metadata_ptr()->get_my_id(),
locator::endpoint_dc_rack{rs.datacenter, rs.rack}, rs.ring.value().tokens, get_token_metadata_ptr());
co_await bs.bootstrap(streaming::stream_reason::bootstrap, _gossiper, _topology_state_machine._topology.session);
co_await bs.bootstrap(streaming::stream_reason::bootstrap, _gossiper, session);
}
});
co_await task->done();
@@ -6297,7 +6297,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
} else {
dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_token_metadata_ptr()->get_my_id(),
locator::endpoint_dc_rack{rs.datacenter, rs.rack}, rs.ring.value().tokens, get_token_metadata_ptr());
co_await bs.bootstrap(streaming::stream_reason::replace, _gossiper, _topology_state_machine._topology.session, locator::host_id{replaced_id.uuid()});
co_await bs.bootstrap(streaming::stream_reason::replace, _gossiper, session, locator::host_id{replaced_id.uuid()});
}
});
co_await task->done();
@@ -6347,7 +6347,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
auto ops = seastar::make_shared<node_ops_info>(node_ops_id::create_random_id(), as, std::move(ignored_ips));
return _repair.local().removenode_with_repair(get_token_metadata_ptr(), id, ops, session);
} else {
return removenode_with_stream(id, _topology_state_machine._topology.session, as);
return removenode_with_stream(id, session, as);
}
});
co_await task->done();
@@ -6374,7 +6374,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
co_await _repair.local().rebuild_with_repair(std::move(ks_erms), tmptr, std::move(sdc_param), session);
} else {
auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, tmptr, _abort_source,
tmptr->get_my_id(), _snitch.local()->get_location(), "Rebuild", streaming::stream_reason::rebuild, _topology_state_machine._topology.session);
tmptr->get_my_id(), _snitch.local()->get_location(), "Rebuild", streaming::stream_reason::rebuild, session);
streamer->add_source_filter(std::make_unique<dht::range_streamer::failure_detector_source_filter>(_gossiper.get_unreachable_members()));
if (source_dc != "") {
streamer->add_source_filter(std::make_unique<dht::range_streamer::single_datacenter_filter>(source_dc));