Files
server-sdk-go/room.go
He Chen fe7af253c4 TEL-522: JoinWithToken accepts a context to be cancelled or timed out early (#886)
* TEL-522: JoinWithToken accepts a context to be cancelled or timed out early

* add JoinWithContextAndToken
2026-04-21 20:16:03 -07:00

1327 lines
40 KiB
Go

// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package lksdk
import (
"cmp"
"context"
"fmt"
"maps"
"reflect"
"slices"
"strings"
"sync"
"time"
"github.com/pion/interceptor"
"github.com/pion/rtcp"
"github.com/pion/webrtc/v4"
"golang.org/x/mod/semver"
"google.golang.org/protobuf/proto"
protoLogger "github.com/livekit/protocol/logger"
protosignalling "github.com/livekit/protocol/signalling"
"github.com/livekit/server-sdk-go/v2/signalling"
"github.com/livekit/mediatransportutil/pkg/pacer"
"github.com/livekit/protocol/auth"
protoCodecs "github.com/livekit/protocol/codecs"
"github.com/livekit/protocol/livekit"
dtlsElliptic "github.com/pion/dtls/v3/pkg/crypto/elliptic"
)
var (
_ engineHandler = (*Room)(nil)
)
// -----------------------------------------------
type SimulateScenario int
const (
SimulateSignalReconnect SimulateScenario = iota
SimulateForceTCP
SimulateForceTLS
SimulateSpeakerUpdate
SimulateMigration
SimulateServerLeave
SimulateNodeFailure
)
type ConnectionState string
const (
ConnectionStateConnected ConnectionState = "connected"
ConnectionStateReconnecting ConnectionState = "reconnecting"
ConnectionStateDisconnected ConnectionState = "disconnected"
)
// -----------------------------------------------
const (
SimulateSpeakerUpdateInterval = 5
)
type (
TrackPubCallback func(track Track, pub TrackPublication, participant *RemoteParticipant)
PubCallback func(pub TrackPublication, participant *RemoteParticipant)
)
type ParticipantKind int
const (
ParticipantStandard = ParticipantKind(livekit.ParticipantInfo_STANDARD)
ParticipantIngress = ParticipantKind(livekit.ParticipantInfo_INGRESS)
ParticipantEgress = ParticipantKind(livekit.ParticipantInfo_EGRESS)
ParticipantSIP = ParticipantKind(livekit.ParticipantInfo_SIP)
ParticipantAgent = ParticipantKind(livekit.ParticipantInfo_AGENT)
ParticipantConnector = ParticipantKind(livekit.ParticipantInfo_CONNECTOR)
)
type ConnectInfo struct {
APIKey string
APISecret string
RoomName string
ParticipantName string
ParticipantIdentity string
ParticipantKind ParticipantKind
ParticipantMetadata string
ParticipantAttributes map[string]string
}
type ConnectOption func(*signalling.ConnectParams)
func WithConnectTimeout(timeout time.Duration) ConnectOption {
return func(p *signalling.ConnectParams) {
p.ConnectTimeout = timeout
}
}
// WithAutoSubscribe sets whether the participant should automatically subscribe to tracks.
// Default is true.
func WithAutoSubscribe(val bool) ConnectOption {
return func(p *signalling.ConnectParams) {
p.AutoSubscribe = val
}
}
// WithRetransmitBufferSize sets the retransmit buffer size to respond to NACK requests.
// Must be one of: 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768.
func WithRetransmitBufferSize(val uint16) ConnectOption {
return func(p *signalling.ConnectParams) {
p.RetransmitBufferSize = val
}
}
// WithPacer enables the use of a pacer on this connection
// A pacer helps to smooth out video packet rate to avoid overwhelming downstream. Learn more at: https://chromium.googlesource.com/external/webrtc/+/master/modules/pacing/g3doc/index.md
func WithPacer(pacer pacer.Factory) ConnectOption {
return func(p *signalling.ConnectParams) {
p.Pacer = pacer
}
}
// WithInterceptors sets custom RTP interceptors for the connection.
func WithInterceptors(interceptors []interceptor.Factory) ConnectOption {
return func(p *signalling.ConnectParams) {
p.Interceptors = interceptors
}
}
// WithIncludeDefaultInterceptors sets whether to register default interceptors
// along with custom interceptors.
func WithIncludeDefaultInterceptors(include bool) ConnectOption {
return func(p *signalling.ConnectParams) {
p.IncludeDefaultInterceptors = include
}
}
// WithICETransportPolicy sets the ICE transport policy (UDP, Relay, etc.).
func WithICETransportPolicy(iceTransportPolicy webrtc.ICETransportPolicy) ConnectOption {
return func(p *signalling.ConnectParams) {
p.ICETransportPolicy = iceTransportPolicy
}
}
// WithDisableRegionDiscovery disables automatic region discovery for LiveKit Cloud.
func WithDisableRegionDiscovery() ConnectOption {
return func(p *signalling.ConnectParams) {
p.DisableRegionDiscovery = true
}
}
// WithMetadata sets custom metadata for the participant.
func WithMetadata(metadata string) ConnectOption {
return func(p *signalling.ConnectParams) {
p.Metadata = metadata
}
}
// WithExtraAttributes sets additional key-value attributes for the participant.
// Empty string values will be ignored.
func WithExtraAttributes(attrs map[string]string) ConnectOption {
return func(p *signalling.ConnectParams) {
if len(attrs) != 0 && p.Attributes == nil {
p.Attributes = make(map[string]string, len(attrs))
}
for k, v := range attrs {
if v == "" {
continue
}
p.Attributes[k] = v
}
}
}
func WithCodecs(codecs []livekit.Codec) ConnectOption {
return func(p *signalling.ConnectParams) {
pCodecs := make([]webrtc.RTPCodecParameters, 0, len(codecs))
for i := range codecs {
pCodecs = append(pCodecs, protoCodecs.ToWebrtcCodecParameters(&codecs[i]))
}
p.Codecs = pCodecs
}
}
// WithDTLSEllipticCurves configures the DTLS elliptic curves used for key exchange.
// Use this on FIPS 140-enabled systems to specify NIST-approved curves (e.g. P-256, P-384)
// instead of the default X25519.
func WithDTLSEllipticCurves(curves ...dtlsElliptic.Curve) ConnectOption {
return func(p *signalling.ConnectParams) {
p.DTLSEllipticCurves = curves
}
}
func WithLogger(l protoLogger.Logger) ConnectOption {
return func(p *signalling.ConnectParams) {
p.Logger = l
}
}
type PLIWriter func(webrtc.SSRC)
type Room struct {
log protoLogger.Logger
useSinglePeerConnection bool
engine *RTCEngine
sid string
name string
LocalParticipant *LocalParticipant
callback *RoomCallback
connectionState ConnectionState
sidReady chan struct{}
remoteParticipants map[livekit.ParticipantIdentity]*RemoteParticipant
sidToIdentity map[livekit.ParticipantID]livekit.ParticipantIdentity
sidDefers map[livekit.ParticipantID]map[livekit.TrackID]func(p *RemoteParticipant)
metadata string
activeRecording bool
activeSpeakers []Participant
serverInfo *livekit.ServerInfo
regionURLProvider *regionURLProvider
sifTrailer []byte
byteStreamHandlers *sync.Map
byteStreamReaders *sync.Map
textStreamHandlers *sync.Map
textStreamReaders *sync.Map
rpcHandlers *sync.Map
lock sync.RWMutex
}
// NewRoom can be used to update callbacks before calling Join
func NewRoom(callback *RoomCallback) *Room {
r := &Room{
log: logger,
useSinglePeerConnection: semver.Compare("v"+Version, "v3.0.0") >= 0,
remoteParticipants: make(map[livekit.ParticipantIdentity]*RemoteParticipant),
sidToIdentity: make(map[livekit.ParticipantID]livekit.ParticipantIdentity),
sidDefers: make(map[livekit.ParticipantID]map[livekit.TrackID]func(*RemoteParticipant)),
callback: NewRoomCallback(),
sidReady: make(chan struct{}),
connectionState: ConnectionStateDisconnected,
regionURLProvider: newRegionURLProvider(),
byteStreamHandlers: &sync.Map{},
byteStreamReaders: &sync.Map{},
textStreamHandlers: &sync.Map{},
textStreamReaders: &sync.Map{},
rpcHandlers: &sync.Map{},
}
r.callback.Merge(callback)
r.engine = NewRTCEngine(r.useSinglePeerConnection, r, r.getLocalParticipantSID)
r.LocalParticipant = newLocalParticipant(r.engine, r.callback, r.serverInfo, r.log)
return r
}
// ConnectToRoom creates and joins the room
func ConnectToRoom(url string, info ConnectInfo, callback *RoomCallback, opts ...ConnectOption) (*Room, error) {
room := NewRoom(callback)
err := room.Join(url, info, opts...)
if err != nil {
return nil, err
}
return room, nil
}
// ConnectToRoomWithToken creates and joins the room
func ConnectToRoomWithToken(url, token string, callback *RoomCallback, opts ...ConnectOption) (*Room, error) {
room := NewRoom(callback)
err := room.JoinWithToken(url, token, opts...)
if err != nil {
return nil, err
}
return room, nil
}
// SetLogger overrides default logger.
func (r *Room) SetLogger(l protoLogger.Logger) {
r.log = l
r.engine.SetLogger(l)
r.LocalParticipant.SetLogger(l)
r.lock.RLock()
for _, rp := range r.remoteParticipants {
rp.SetLogger(l)
}
r.lock.RUnlock()
}
func (r *Room) Name() string {
r.lock.RLock()
defer r.lock.RUnlock()
return r.name
}
// SID returns the unique session ID of the room.
// This will block until session ID is available, which could take up to 2s after joining the room.
func (r *Room) SID() string {
<-r.sidReady
r.lock.RLock()
defer r.lock.RUnlock()
return r.sid
}
// PrepareConnection - with LiveKit Cloud, determine the best edge data center for the current client to connect to
func (r *Room) PrepareConnection(url, token string) error {
cloudHostname, _ := parseCloudURL(url)
if cloudHostname == "" {
return nil
}
return r.regionURLProvider.RefreshRegionSettings(cloudHostname, token)
}
// Join - joins the room with default permissions
func (r *Room) Join(url string, info ConnectInfo, opts ...ConnectOption) error {
return r.JoinWithContext(context.Background(), url, info, opts...)
}
// JoinWithContext - like Join, but accepts a context for cancellation/deadline.
func (r *Room) JoinWithContext(ctx context.Context, url string, info ConnectInfo, opts ...ConnectOption) error {
var params signalling.ConnectParams
for _, opt := range opts {
opt(&params)
}
// generate token
at := auth.NewAccessToken(info.APIKey, info.APISecret)
grant := &auth.VideoGrant{
RoomJoin: true,
Room: info.RoomName,
}
at.SetVideoGrant(grant).
SetIdentity(info.ParticipantIdentity).
SetMetadata(info.ParticipantMetadata).
SetAttributes(info.ParticipantAttributes).
SetName(info.ParticipantName).
SetKind(livekit.ParticipantInfo_Kind(info.ParticipantKind))
token, err := at.ToJWT()
if err != nil {
return err
}
return r.JoinWithContextAndToken(ctx, url, token, opts...)
}
// JoinWithToken - customize participant options by generating your own token
func (r *Room) JoinWithToken(url, token string, opts ...ConnectOption) error {
return r.JoinWithContextAndToken(context.Background(), url, token, opts...)
}
// JoinWithContextAndToken - like JoinWithToken, but accepts a context for cancellation/deadline.
func (r *Room) JoinWithContextAndToken(ctx context.Context, url, token string, opts ...ConnectOption) error {
params := &signalling.ConnectParams{
AutoSubscribe: true,
ConnectTimeout: 3 * time.Second,
}
for _, opt := range opts {
opt(params)
}
if params.Logger != nil {
r.SetLogger(params.Logger)
}
isSuccess := false
cloudHostname, _ := parseCloudURL(url)
if !params.DisableRegionDiscovery && cloudHostname != "" {
if err := r.regionURLProvider.RefreshRegionSettings(cloudHostname, token); err != nil {
r.log.Errorw("failed to get best url", err)
} else {
for tries := uint(0); !isSuccess; tries++ {
if ctx.Err() != nil {
return ctx.Err()
}
bestURL, err := r.regionURLProvider.PopBestURL(cloudHostname, token)
if err != nil {
r.log.Errorw("failed to get best url", err)
break
}
r.log.Debugw("RTC engine joining room", "url", bestURL, "connectTimeout", params.ConnectTimeout)
callCtx, cancelCallCtx := context.WithTimeout(ctx, params.ConnectTimeout)
isSuccess, err = r.engine.JoinContext(callCtx, bestURL, token, params)
cancelCallCtx()
if err != nil {
// try the next URL with exponential backoff
d := time.Duration(1<<min(tries, 6)) * 100 * time.Millisecond // max 6.4 seconds
r.log.Errorw(
"failed to join room", err,
"retrying in", d,
"url", bestURL,
)
select {
case <-time.After(d):
case <-ctx.Done():
return ctx.Err()
}
continue
}
}
}
}
if !isSuccess {
if _, err := r.engine.JoinContext(ctx, url, token, params); err != nil {
return err
}
}
return nil
}
// Disconnect leaves the room, indicating the client initiated the disconnect.
func (r *Room) Disconnect() {
r.DisconnectWithReason(livekit.DisconnectReason_CLIENT_INITIATED)
}
// DisconnectWithReason leaves the room with a specific disconnect reason.
func (r *Room) DisconnectWithReason(reason livekit.DisconnectReason) {
_ = r.engine.SendLeaveWithReason(reason)
r.cleanup()
}
// ConnectionState returns the current connection state of the room.
func (r *Room) ConnectionState() ConnectionState {
r.lock.RLock()
defer r.lock.RUnlock()
return r.connectionState
}
func (r *Room) setConnectionState(cs ConnectionState) {
r.lock.Lock()
r.connectionState = cs
r.lock.Unlock()
}
func (r *Room) deferParticipantUpdate(sid livekit.ParticipantID, trackID livekit.TrackID, fnc func(p *RemoteParticipant)) {
r.lock.Lock()
defer r.lock.Unlock()
if r.sidDefers[sid] == nil {
r.sidDefers[sid] = make(map[livekit.TrackID]func(p *RemoteParticipant))
}
r.sidDefers[sid][trackID] = fnc
}
func (r *Room) runParticipantDefers(sid livekit.ParticipantID, p *RemoteParticipant) {
r.lock.Lock()
fncs := r.sidDefers[sid]
delete(r.sidDefers, sid)
r.lock.Unlock()
if len(fncs) != 0 {
r.log.Infow(
"running deferred updates for participant",
"participant", p.Identity(),
"participantID", sid,
"numUpdates", len(fncs),
)
for _, fnc := range fncs {
fnc(p)
}
}
}
func (r *Room) clearParticipantDefers(sid livekit.ParticipantID, pi *livekit.ParticipantInfo) {
r.lock.Lock()
defer r.lock.Unlock()
for trackID := range r.sidDefers[sid] {
found := false
for _, ti := range pi.Tracks {
if livekit.TrackID(ti.GetSid()) == trackID {
found = true
break
}
}
if !found {
r.log.Infow(
"deleting deferred update for participant",
"participant", pi.Identity,
"participantID", sid,
"trackID", trackID,
)
delete(r.sidDefers[sid], trackID)
if len(r.sidDefers[sid]) == 0 {
delete(r.sidDefers, sid)
}
}
}
}
// GetParticipantByIdentity returns a remote participant by their identity.
// Returns nil if not found.
// Note: this represents the current view from the local participant's perspective
func (r *Room) GetParticipantByIdentity(identity string) *RemoteParticipant {
r.lock.RLock()
defer r.lock.RUnlock()
return r.remoteParticipants[livekit.ParticipantIdentity(identity)]
}
// GetParticipantBySID returns a remote participant by their session ID.
// Returns nil if not found.
// Note: this represents the current view from the local participant's perspective
func (r *Room) GetParticipantBySID(sid string) *RemoteParticipant {
r.lock.RLock()
defer r.lock.RUnlock()
if identity, ok := r.sidToIdentity[livekit.ParticipantID(sid)]; ok {
return r.remoteParticipants[identity]
}
return nil
}
// GetRemoteParticipants returns all remote participants in the room as seen by the local participant
// Note: this does not represent the exact state from the server's view. To get all participants that
// exists on the server, use [RoomServiceClient.ListParticipants] instead.
func (r *Room) GetRemoteParticipants() []*RemoteParticipant {
r.lock.RLock()
defer r.lock.RUnlock()
var participants []*RemoteParticipant
for _, rp := range r.remoteParticipants {
participants = append(participants, rp)
}
return participants
}
// ActiveSpeakers returns a list of currently active speakers.
// Speakers are ordered by audio level (loudest first).
func (r *Room) ActiveSpeakers() []Participant {
r.lock.RLock()
defer r.lock.RUnlock()
return r.activeSpeakers
}
func (r *Room) Metadata() string {
r.lock.RLock()
defer r.lock.RUnlock()
return r.metadata
}
// IsRecording returns true if the room is currently being recorded.
func (r *Room) IsRecording() bool {
r.lock.RLock()
defer r.lock.RUnlock()
return r.activeRecording
}
// ServerInfo returns information about the LiveKit server.
func (r *Room) ServerInfo() *livekit.ServerInfo {
r.lock.RLock()
defer r.lock.RUnlock()
return proto.Clone(r.serverInfo).(*livekit.ServerInfo)
}
// SifTrailer returns the SIF (Server Injected Frames) trailer data used by E2EE
func (r *Room) SifTrailer() []byte {
r.lock.RLock()
defer r.lock.RUnlock()
trailer := make([]byte, len(r.sifTrailer))
copy(trailer, r.sifTrailer)
return trailer
}
func (r *Room) addRemoteParticipant(pi *livekit.ParticipantInfo, updateExisting bool) *RemoteParticipant {
r.lock.Lock()
defer r.lock.Unlock()
rp, ok := r.remoteParticipants[livekit.ParticipantIdentity(pi.Identity)]
if ok {
if updateExisting {
rp.updateInfo(pi)
r.sidToIdentity[livekit.ParticipantID(pi.Sid)] = livekit.ParticipantIdentity(pi.Identity)
}
return rp
}
rp = newRemoteParticipant(pi, r.callback, r.engine, func(ssrc webrtc.SSRC) {
pli := []rtcp.Packet{
&rtcp.PictureLossIndication{SenderSSRC: uint32(ssrc), MediaSSRC: uint32(ssrc)},
}
if subscriber, ok := r.engine.Subscriber(); ok {
_ = subscriber.pc.WriteRTCP(pli)
}
}, r.log.WithValues("participant", pi.Identity))
r.remoteParticipants[livekit.ParticipantIdentity(pi.Identity)] = rp
r.sidToIdentity[livekit.ParticipantID(pi.Sid)] = livekit.ParticipantIdentity(pi.Identity)
return rp
}
func (r *Room) sendSyncState() {
var previousOffer *webrtc.SessionDescription
var previousAnswer *webrtc.SessionDescription
if r.useSinglePeerConnection {
publisher, ok := r.engine.Publisher()
if ok {
previousOffer = publisher.pc.RemoteDescription()
previousAnswer = publisher.pc.LocalDescription()
}
} else {
subscriber, ok := r.engine.Subscriber()
if ok {
previousOffer = subscriber.pc.RemoteDescription()
previousAnswer = subscriber.pc.LocalDescription()
}
}
if previousOffer == nil || previousAnswer == nil {
return
}
var trackSids []string
var trackSidsDisabled []string
sendUnsub := r.engine.connParams.AutoSubscribe
for _, rp := range r.GetRemoteParticipants() {
for _, t := range rp.TrackPublications() {
if t.IsSubscribed() != sendUnsub {
trackSids = append(trackSids, t.SID())
}
if rpub, ok := t.(*RemoteTrackPublication); ok {
if !rpub.IsEnabled() {
trackSidsDisabled = append(trackSidsDisabled, t.SID())
}
}
}
}
var publishedTracks []*livekit.TrackPublishedResponse
for _, t := range r.LocalParticipant.TrackPublications() {
if t.Track() != nil {
publishedTracks = append(publishedTracks, &livekit.TrackPublishedResponse{
Cid: t.Track().ID(),
Track: t.TrackInfo(),
})
}
}
var dataChannels []*livekit.DataChannelInfo
getDCinfo := func(dc *webrtc.DataChannel, target livekit.SignalTarget) {
if dc != nil && dc.ID() != nil {
dataChannels = append(dataChannels, &livekit.DataChannelInfo{
Label: dc.Label(),
Id: uint32(*dc.ID()),
Target: target,
})
}
}
getDCinfo(r.engine.GetDataChannel(livekit.DataPacket_RELIABLE), livekit.SignalTarget_PUBLISHER)
getDCinfo(r.engine.GetDataChannel(livekit.DataPacket_LOSSY), livekit.SignalTarget_PUBLISHER)
getDCinfo(r.engine.GetDataChannelSub(livekit.DataPacket_RELIABLE), livekit.SignalTarget_SUBSCRIBER)
getDCinfo(r.engine.GetDataChannelSub(livekit.DataPacket_LOSSY), livekit.SignalTarget_SUBSCRIBER)
r.engine.SendSyncState(&livekit.SyncState{
Offer: protosignalling.ToProtoSessionDescription(*previousOffer, 0, nil),
Answer: protosignalling.ToProtoSessionDescription(*previousAnswer, 0, nil),
Subscription: &livekit.UpdateSubscription{
TrackSids: trackSids,
Subscribe: !sendUnsub,
},
PublishTracks: publishedTracks,
DataChannels: dataChannels,
TrackSidsDisabled: trackSidsDisabled,
// MIGRATION-TODO DatachannelReceiveStates
})
}
func (r *Room) cleanup() {
r.setConnectionState(ConnectionStateDisconnected)
r.engine.Close()
r.LocalParticipant.closeTracks()
r.setSid("", true)
r.byteStreamHandlers.Clear()
r.byteStreamReaders.Range(func(key, value any) bool {
if reader, ok := value.(*ByteStreamReader); ok {
reader.close()
}
return true
})
r.byteStreamReaders.Clear()
r.textStreamHandlers.Clear()
r.textStreamReaders.Range(func(key, value any) bool {
if reader, ok := value.(*TextStreamReader); ok {
reader.close()
}
return true
})
r.textStreamReaders.Clear()
r.rpcHandlers.Clear()
r.LocalParticipant.cleanup()
}
func (r *Room) setSid(sid string, allowEmpty bool) {
r.lock.Lock()
if sid != "" || allowEmpty {
select {
case <-r.sidReady:
// already closed
default:
r.sid = sid
close(r.sidReady)
}
}
r.lock.Unlock()
}
// Simulate triggers various test scenarios for debugging and testing purposes.
// This is primarily used for development and testing.
func (r *Room) Simulate(scenario SimulateScenario) {
r.engine.Simulate(scenario)
}
func (r *Room) getLocalParticipantSID() string {
if r.LocalParticipant != nil {
return r.LocalParticipant.SID()
}
return ""
}
// Establishes the participant as a receiver for calls of the specified RPC method.
// Will overwrite any existing callback for the same method.
//
// - @param method - The name of the indicated RPC method
// - @param handler - Will be invoked when an RPC request for this method is received
// - @returns A promise that resolves when the method is successfully registered
//
// Example:
//
// room.LocalParticipant?.registerRpcMethod(
// "greet",
// func (data: RpcInvocationData) => {
// fmt.Println("Received greeting from ", data.callerIdentity, "with payload ", data.payload)
// return "Hello, " + data.callerIdentity + "!";
// }
// );
//
// The handler should return either a string or an error.
// If unable to respond within `responseTimeout`, the request will result in an error on the caller's side.
//
// You may throw errors of type `RpcError` with a string `message` in the handler,
// and they will be received on the caller's side with the message intact.
// Other errors thrown in your handler will not be transmitted as-is, and will instead arrive to the caller as `1500` ("Application Error").
func (r *Room) RegisterRpcMethod(method string, handler RpcHandlerFunc) error {
if _, loaded := r.rpcHandlers.LoadOrStore(method, handler); loaded {
return fmt.Errorf("rpc handler already registered for method: %s, unregisterRpcMethod before trying to register again", method)
}
return nil
}
// UnregisterRpcMethod unregisters a previously registered RPC method.
func (r *Room) UnregisterRpcMethod(method string) {
r.rpcHandlers.Delete(method)
}
// RegisterTextStreamHandler registers a handler for incoming text streams on a specific topic.
// The handler will be called when a text stream is received for the given topic.
// It returns an error if a handler is already registered for this topic.
func (r *Room) RegisterTextStreamHandler(topic string, handler TextStreamHandler) error {
if _, loaded := r.textStreamHandlers.LoadOrStore(topic, handler); loaded {
return fmt.Errorf("text stream handler already registered for topic: %s", topic)
}
return nil
}
// UnregisterTextStreamHandler removes a previously registered text stream handler.
func (r *Room) UnregisterTextStreamHandler(topic string) {
r.textStreamHandlers.Delete(topic)
}
// RegisterByteStreamHandler registers a handler for incoming byte streams on a specific topic.
// The handler will be called when a byte stream is received for the given topic.
// It returns an error if a handler is already registered for this topic.
func (r *Room) RegisterByteStreamHandler(topic string, handler ByteStreamHandler) error {
if _, loaded := r.byteStreamHandlers.LoadOrStore(topic, handler); loaded {
return fmt.Errorf("byte stream handler already registered for topic: %s", topic)
}
return nil
}
// UnregisterByteStreamHandler removes a previously registered byte stream handler.
func (r *Room) UnregisterByteStreamHandler(topic string) {
r.byteStreamHandlers.Delete(topic)
}
// engineHandler implementation
func (r *Room) OnMediaTrack(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
// ensure we have the participant
participantID, streamID := unpackStreamID(track.StreamID())
trackID := track.ID()
if strings.HasPrefix(streamID, "TR_") {
// backwards compatibility
trackID = streamID
}
update := func(p *RemoteParticipant) {
p.addSubscribedMediaTrack(track, trackID, receiver)
}
rp := r.GetParticipantBySID(participantID)
if rp == nil {
r.log.Infow(
"could not find participant, deferring track update",
"participantID", participantID,
"trackID", trackID,
"streamID", streamID,
)
r.deferParticipantUpdate(livekit.ParticipantID(participantID), livekit.TrackID(trackID), update)
return
}
update(rp)
r.runParticipantDefers(livekit.ParticipantID(participantID), rp)
}
func (r *Room) OnRoomJoined(
room *livekit.Room,
participant *livekit.ParticipantInfo,
otherParticipants []*livekit.ParticipantInfo,
serverInfo *livekit.ServerInfo,
sifTrailer []byte,
) {
r.lock.Lock()
r.name = room.Name
r.metadata = room.Metadata
r.activeRecording = room.ActiveRecording
r.serverInfo = serverInfo
r.connectionState = ConnectionStateConnected
r.sifTrailer = make([]byte, len(sifTrailer))
copy(r.sifTrailer, sifTrailer)
r.lock.Unlock()
r.setSid(room.Sid, false)
r.LocalParticipant.updateInfo(participant)
r.LocalParticipant.updateSubscriptionPermission()
for _, pi := range otherParticipants {
r.addRemoteParticipant(pi, true)
r.clearParticipantDefers(livekit.ParticipantID(pi.Sid), pi)
// no need to run participant defers here, since we are connected for the first time
}
}
func (r *Room) OnDisconnected(reason DisconnectionReason) {
r.callback.OnDisconnected()
r.callback.OnDisconnectedWithReason(reason)
r.cleanup()
}
func (r *Room) OnRestarting() {
r.setConnectionState(ConnectionStateReconnecting)
r.callback.OnReconnecting()
for _, rp := range r.GetRemoteParticipants() {
r.OnParticipantDisconnect(rp, livekit.DisconnectReason_UNKNOWN_REASON)
}
}
func (r *Room) OnRestarted(
room *livekit.Room,
participant *livekit.ParticipantInfo,
otherParticipants []*livekit.ParticipantInfo,
) {
r.OnRoomUpdate(room)
r.LocalParticipant.updateInfo(participant)
r.LocalParticipant.updateSubscriptionPermission()
r.OnParticipantUpdate(otherParticipants)
r.LocalParticipant.republishTracks()
r.setConnectionState(ConnectionStateConnected)
r.callback.OnReconnected()
}
func (r *Room) OnResuming() {
r.setConnectionState(ConnectionStateReconnecting)
r.callback.OnReconnecting()
}
func (r *Room) OnResumed() {
r.setConnectionState(ConnectionStateConnected)
r.callback.OnReconnected()
r.sendSyncState()
r.LocalParticipant.updateSubscriptionPermission()
}
func (r *Room) OnDataPacket(identity string, dataPacket DataPacket) {
if identity == r.LocalParticipant.Identity() {
// if sent by itself, do not handle data
return
}
p := r.GetParticipantByIdentity(identity)
params := DataReceiveParams{
SenderIdentity: identity,
Sender: p,
}
switch msg := dataPacket.(type) {
case *UserDataPacket: // compatibility
params.Topic = msg.Topic
if p != nil {
p.Callback.OnDataReceived(msg.Payload, params)
}
r.callback.OnDataReceived(msg.Payload, params)
}
if p != nil {
p.Callback.OnDataPacket(dataPacket, params)
}
r.callback.OnDataPacket(dataPacket, params)
}
func (r *Room) OnParticipantUpdate(participants []*livekit.ParticipantInfo) {
for _, pi := range participants {
if pi.Sid == r.LocalParticipant.SID() || pi.Identity == r.LocalParticipant.Identity() {
r.LocalParticipant.updateInfo(pi)
continue
}
rp := r.GetParticipantByIdentity(pi.Identity)
isNew := rp == nil
if pi.State == livekit.ParticipantInfo_DISCONNECTED {
r.OnParticipantDisconnect(rp, pi.GetDisconnectReason())
} else if isNew {
rp = r.addRemoteParticipant(pi, true)
r.clearParticipantDefers(livekit.ParticipantID(pi.Sid), pi)
r.runParticipantDefers(livekit.ParticipantID(pi.Sid), rp)
go r.callback.OnParticipantConnected(rp)
} else {
oldSid := livekit.ParticipantID(rp.SID())
rp.updateInfo(pi)
newSid := livekit.ParticipantID(rp.SID())
if oldSid != newSid {
r.log.Infow(
"participant sid update",
"participant", rp.Identity(),
"sid-old", oldSid,
"sid-new", newSid,
)
r.lock.Lock()
delete(r.sidDefers, oldSid)
delete(r.sidToIdentity, oldSid)
r.sidToIdentity[newSid] = livekit.ParticipantIdentity(rp.Identity())
r.lock.Unlock()
}
r.clearParticipantDefers(livekit.ParticipantID(pi.Sid), pi)
r.runParticipantDefers(newSid, rp)
}
}
}
func (r *Room) OnParticipantDisconnect(rp *RemoteParticipant, reason livekit.DisconnectReason) {
if rp == nil {
return
}
r.lock.Lock()
delete(r.remoteParticipants, livekit.ParticipantIdentity(rp.Identity()))
delete(r.sidToIdentity, livekit.ParticipantID(rp.SID()))
delete(r.sidDefers, livekit.ParticipantID(rp.SID()))
r.lock.Unlock()
rp.unpublishAllTracks()
r.LocalParticipant.handleParticipantDisconnected(rp.Identity())
rp.info.DisconnectReason = reason
go r.callback.OnParticipantDisconnected(rp)
}
func (r *Room) OnSpeakersChanged(speakerUpdates []*livekit.SpeakerInfo) {
speakerMap := make(map[string]Participant)
for _, p := range r.ActiveSpeakers() {
speakerMap[p.SID()] = p
}
for _, info := range speakerUpdates {
var participant Participant
if info.Sid == r.LocalParticipant.SID() {
participant = r.LocalParticipant
} else {
participant = r.GetParticipantBySID(info.Sid)
}
if reflect.ValueOf(participant).IsNil() {
continue
}
participant.setAudioLevel(info.Level)
participant.setIsSpeaking(info.Active)
if info.Active {
speakerMap[info.Sid] = participant
} else {
delete(speakerMap, info.Sid)
}
}
activeSpeakers := slices.SortedFunc(maps.Values(speakerMap), func(p1, p2 Participant) int {
return cmp.Compare(p2.AudioLevel(), p1.AudioLevel())
})
r.lock.Lock()
r.activeSpeakers = activeSpeakers
r.lock.Unlock()
go r.callback.OnActiveSpeakersChanged(activeSpeakers)
}
func (r *Room) OnConnectionQuality(updates []*livekit.ConnectionQualityInfo) {
for _, update := range updates {
if update.ParticipantSid == r.LocalParticipant.SID() {
r.LocalParticipant.setConnectionQualityInfo(update)
} else {
p := r.GetParticipantBySID(update.ParticipantSid)
if p != nil {
p.setConnectionQualityInfo(update)
} else {
r.log.Debugw("could not find participant", "sid", update.ParticipantSid,
"localParticipant", r.LocalParticipant.SID())
}
}
}
}
func (r *Room) OnRoomUpdate(room *livekit.Room) {
metadataChanged := false
recordingChanged := false
r.lock.Lock()
if r.metadata != room.Metadata {
metadataChanged = true
r.metadata = room.Metadata
}
if r.activeRecording != room.ActiveRecording {
recordingChanged = true
r.activeRecording = room.ActiveRecording
}
r.lock.Unlock()
r.setSid(room.Sid, false)
if metadataChanged {
go r.callback.OnRoomMetadataChanged(room.Metadata)
}
if recordingChanged {
go r.callback.OnRecordingStatusChanged(room.ActiveRecording)
}
}
func (r *Room) OnRoomMoved(moved *livekit.RoomMovedResponse) {
r.log.Infow("room moved", "newRoom", moved.Room.Name)
r.OnRoomUpdate(moved.Room)
for _, rp := range r.GetRemoteParticipants() {
r.OnParticipantDisconnect(rp, livekit.DisconnectReason_MIGRATION)
}
go r.callback.OnRoomMoved(moved.Room.Name, moved.Token)
infos := make([]*livekit.ParticipantInfo, 0, len(moved.OtherParticipants)+1)
infos = append(infos, moved.Participant)
infos = append(infos, moved.OtherParticipants...)
r.OnParticipantUpdate(infos)
}
func (r *Room) OnTrackRemoteMuted(msg *livekit.MuteTrackRequest) {
for _, pub := range r.LocalParticipant.TrackPublications() {
if pub.SID() == msg.Sid {
localPub := pub.(*LocalTrackPublication)
// TODO: pause sending data because it'll be dropped by SFU
localPub.setMuted(msg.Muted, true)
}
}
}
func (r *Room) OnLocalTrackUnpublished(msg *livekit.TrackUnpublishedResponse) {
err := r.LocalParticipant.UnpublishTrack(msg.TrackSid)
if err != nil {
r.log.Errorw("could not unpublish track", err, "trackID", msg.TrackSid)
}
}
func (r *Room) OnTranscription(transcription *livekit.Transcription) {
var (
p Participant
publication TrackPublication
)
if transcription.TranscribedParticipantIdentity == r.LocalParticipant.Identity() {
p = r.LocalParticipant
publication = r.LocalParticipant.getPublication(transcription.TrackId)
} else {
rp := r.GetParticipantByIdentity(transcription.TranscribedParticipantIdentity)
if rp == nil {
r.log.Debugw("recieved transcription for unknown participant", "participant", transcription.TranscribedParticipantIdentity)
return
}
publication = rp.getPublication(transcription.TrackId)
p = rp
}
transcriptionSegments := ExtractTranscriptionSegments(transcription)
r.callback.OnTranscriptionReceived(transcriptionSegments, p, publication)
}
func (r *Room) OnLocalTrackSubscribed(trackSubscribed *livekit.TrackSubscribed) {
trackPublication := r.LocalParticipant.getLocalPublication(trackSubscribed.TrackSid)
if trackPublication == nil {
r.log.Debugw("recieved track subscribed for unknown track", "trackID", trackSubscribed.TrackSid)
return
}
r.callback.OnLocalTrackSubscribed(trackPublication, r.LocalParticipant)
}
func (r *Room) OnSubscribedQualityUpdate(subscribedQualityUpdate *livekit.SubscribedQualityUpdate) {
r.LocalParticipant.handleSubscribedQualityUpdate(subscribedQualityUpdate)
}
func (r *Room) OnSubscribedAudioCodecUpdate(subscribedAudioCodecUpdate *livekit.SubscribedAudioCodecUpdate) {
r.LocalParticipant.handleSubscribedAudioCodecUpdate(subscribedAudioCodecUpdate)
}
func (r *Room) OnMediaSectionsRequirement(mediaSectionsRequirement *livekit.MediaSectionsRequirement) {
addTransceivers := func(transport *PCTransport, kind webrtc.RTPCodecType, count uint32) {
for i := uint32(0); i < count; i++ {
if _, err := transport.PeerConnection().AddTransceiverFromKind(
kind,
webrtc.RTPTransceiverInit{
Direction: webrtc.RTPTransceiverDirectionRecvonly,
},
); err != nil {
r.log.Warnw(
"could not add transceiver", err,
"room", r.name,
"roomID", r.sid,
"participant", r.LocalParticipant.Identity(),
"participantID", r.LocalParticipant.SID(),
"kind", kind,
)
} else {
r.log.Debugw(
"added transceiver of kind",
"room", r.name,
"roomID", r.sid,
"participant", r.LocalParticipant.Identity(),
"participantID", r.LocalParticipant.SID(),
"kind", kind,
)
}
}
}
publisher, ok := r.engine.Publisher()
if !ok {
r.log.Warnw("no publisher peer connection", ErrNoPeerConnection)
return
}
addTransceivers(publisher, webrtc.RTPCodecTypeAudio, mediaSectionsRequirement.NumAudios)
addTransceivers(publisher, webrtc.RTPCodecTypeVideo, mediaSectionsRequirement.NumVideos)
publisher.Negotiate()
}
func (r *Room) OnStreamHeader(streamHeader *livekit.DataStream_Header, participantIdentity string) {
switch header := streamHeader.ContentHeader.(type) {
case *livekit.DataStream_Header_TextHeader:
streamHandlerCallback, ok := r.textStreamHandlers.Load(streamHeader.Topic)
if !ok {
r.log.Debugw("ignoring incoming text stream due to no handler for topic", "topic", streamHeader.Topic)
return
}
info := TextStreamInfo{
baseStreamInfo: &baseStreamInfo{
Id: streamHeader.StreamId,
MimeType: streamHeader.MimeType,
Size: streamHeader.TotalLength,
Topic: streamHeader.Topic,
Timestamp: streamHeader.Timestamp,
Attributes: streamHeader.Attributes,
},
}
textStreamReader := NewTextStreamReader(info, streamHeader.TotalLength)
r.textStreamReaders.Store(streamHeader.StreamId, textStreamReader)
go streamHandlerCallback.(TextStreamHandler)(textStreamReader, participantIdentity)
case *livekit.DataStream_Header_ByteHeader:
streamHandlerCallback, ok := r.byteStreamHandlers.Load(streamHeader.Topic)
if !ok {
r.log.Debugw("ignoring incoming byte stream due to no handler for topic", "topic", streamHeader.Topic)
return
}
info := ByteStreamInfo{
baseStreamInfo: &baseStreamInfo{
Id: streamHeader.StreamId,
MimeType: streamHeader.MimeType,
Size: streamHeader.TotalLength,
Topic: streamHeader.Topic,
Timestamp: streamHeader.Timestamp,
Attributes: streamHeader.Attributes,
},
Name: &header.ByteHeader.Name,
}
byteStreamReader := NewByteStreamReader(info, streamHeader.TotalLength)
r.byteStreamReaders.Store(streamHeader.StreamId, byteStreamReader)
go streamHandlerCallback.(ByteStreamHandler)(byteStreamReader, participantIdentity)
}
}
func (r *Room) OnStreamChunk(streamChunk *livekit.DataStream_Chunk) {
streamId := streamChunk.StreamId
byteStreamReader, ok := r.byteStreamReaders.Load(streamId)
if ok {
if len(streamChunk.Content) > 0 {
byteStreamReader.(*ByteStreamReader).enqueue(streamChunk)
}
}
textStreamReader, ok := r.textStreamReaders.Load(streamId)
if ok {
if len(streamChunk.Content) > 0 {
textStreamReader.(*TextStreamReader).enqueue(streamChunk)
}
}
}
func (r *Room) OnStreamTrailer(streamTrailer *livekit.DataStream_Trailer) {
streamId := streamTrailer.StreamId
byteStreamReader, ok := r.byteStreamReaders.Load(streamId)
if ok {
reader := byteStreamReader.(*ByteStreamReader)
for k, v := range streamTrailer.Attributes {
reader.Info.Attributes[k] = v
}
reader.close()
r.byteStreamReaders.Delete(streamId)
}
textStreamReader, ok := r.textStreamReaders.Load(streamId)
if ok {
reader := textStreamReader.(*TextStreamReader)
for k, v := range streamTrailer.Attributes {
reader.Info.Attributes[k] = v
}
reader.close()
r.textStreamReaders.Delete(streamId)
}
}
func (r *Room) OnRpcRequest(callerIdentity, requestId, method, payload string, responseTimeout time.Duration, version uint32) {
r.engine.publishRpcAck(callerIdentity, requestId)
if version != 1 {
r.engine.publishRpcResponse(callerIdentity, requestId, nil, rpcErrorFromBuiltInCodes(RpcUnsupportedVersion, nil))
return
}
handler, ok := r.rpcHandlers.Load(method)
if !ok {
r.engine.publishRpcResponse(callerIdentity, requestId, nil, rpcErrorFromBuiltInCodes(RpcUnsupportedMethod, nil))
return
}
response, err := handler.(RpcHandlerFunc)(RpcInvocationData{
RequestID: requestId,
CallerIdentity: callerIdentity,
Payload: payload,
ResponseTimeout: responseTimeout,
})
if err != nil {
if _, ok := err.(*RpcError); ok {
r.engine.publishRpcResponse(callerIdentity, requestId, nil, err.(*RpcError))
} else {
r.log.Warnw("unexpected error returned by RPC handler for method, using application error instead", err, "method", method)
r.engine.publishRpcResponse(callerIdentity, requestId, nil, rpcErrorFromBuiltInCodes(RpcApplicationError, nil))
}
return
}
if byteLength(response) > MaxDataBytes {
r.engine.publishRpcResponse(callerIdentity, requestId, nil, rpcErrorFromBuiltInCodes(RpcResponsePayloadTooLarge, nil))
return
}
r.engine.publishRpcResponse(callerIdentity, requestId, &response, nil)
}
func (r *Room) OnRpcAck(requestId string) {
r.LocalParticipant.HandleIncomingRpcAck(requestId)
}
func (r *Room) OnRpcResponse(requestId string, payload *string, error *RpcError) {
r.LocalParticipant.HandleIncomingRpcResponse(requestId, payload, error)
}
// ---------------------------------------------------------
func unpackStreamID(packed string) (participantId string, trackId string) {
parts := strings.Split(packed, "|")
if len(parts) > 1 {
return parts[0], packed[len(parts[0])+1:]
}
return packed, ""
}