This commit is contained in:
lukasIO
2024-05-13 18:44:11 +02:00
parent f6e9d6a678
commit d32e12e677
6 changed files with 89 additions and 65 deletions

View File

@@ -6,36 +6,45 @@ import {
DisconnectReason,
JoinResponse,
LeaveRequest,
LeaveRequestDesc,
LeaveRequest_Action,
MuteTrackRequest,
MuteTrackRequestDesc,
ParticipantInfo,
Ping,
PingDesc,
ReconnectReason,
ReconnectResponse,
Room,
SessionDescription,
SessionDescriptionDesc,
SignalRequest,
SignalRequestDesc,
SignalResponse,
SignalResponseDesc,
SignalTarget,
SimulateScenario,
SpeakerInfo,
StreamStateUpdate,
SubscribedQualityUpdate,
SubscriptionPermission,
SubscriptionPermissionDesc,
SubscriptionPermissionUpdate,
SubscriptionResponse,
SyncState,
TrackPermission,
TrackPublishedResponse,
TrackUnpublishedResponse,
TrickleRequest,
UpdateLocalAudioTrack,
UpdateParticipantMetadata,
TrickleRequestDesc,
UpdateLocalAudioTrackDesc,
UpdateParticipantMetadataDesc,
UpdateSubscription,
UpdateTrackSettings,
UpdateVideoLayers,
UpdateVideoLayersDesc,
VideoLayer,
create,
fromBinary,
fromJsonString,
protoInt64,
toBinary,
toJsonString,
} from '@livekit/protocol';
import log, { LoggerNames, getLogger } from '../logger';
import { ConnectionError, ConnectionErrorReason } from '../room/errors';
@@ -212,12 +221,7 @@ export class SignalClient {
return res as JoinResponse;
}
async reconnect(
url: string,
token: string,
sid?: string,
reason?: ReconnectReason,
): Promise<ReconnectResponse | undefined> {
async reconnect(url: string, token: string, sid?: string, reason?: ReconnectReason) {
if (!this.options) {
this.log.warn(
'attempted to reconnect without signal options being set, ignoring',
@@ -229,13 +233,14 @@ export class SignalClient {
// clear ping interval and restart it once reconnected
this.clearPingInterval();
const res = await this.connect(url, token, {
...this.options,
reconnect: true,
sid,
reconnectReason: reason,
});
return res;
const { iceServers, clientConfiguration } =
(await this.connect(url, token, {
...this.options,
reconnect: true,
sid,
reconnectReason: reason,
})) ?? {};
return { iceServers, clientConfiguration };
}
private connect(
@@ -319,9 +324,9 @@ export class SignalClient {
let resp: SignalResponse;
if (typeof ev.data === 'string') {
const json = JSON.parse(ev.data);
resp = SignalResponse.fromJson(json, { ignoreUnknownFields: true });
resp = fromJsonString(SignalResponseDesc, json, { ignoreUnknownFields: true });
} else if (ev.data instanceof ArrayBuffer) {
resp = SignalResponse.fromBinary(new Uint8Array(ev.data));
resp = fromBinary(SignalResponseDesc, new Uint8Array(ev.data));
} else {
this.log.error(
`could not decode websocket message: ${typeof ev.data}`,
@@ -487,7 +492,7 @@ export class SignalClient {
this.log.trace('sending ice candidate', { ...this.logContext, candidate });
return this.sendRequest({
case: 'trickle',
value: new TrickleRequest({
value: create(TrickleRequestDesc, {
candidateInit: JSON.stringify(candidate),
target,
}),
@@ -497,7 +502,7 @@ export class SignalClient {
sendMuteTrack(trackSid: string, muted: boolean) {
return this.sendRequest({
case: 'mute',
value: new MuteTrackRequest({
value: create(MuteTrackRequestDesc, {
sid: trackSid,
muted,
}),
@@ -514,7 +519,7 @@ export class SignalClient {
sendUpdateLocalMetadata(metadata: string, name: string) {
return this.sendRequest({
case: 'updateMetadata',
value: new UpdateParticipantMetadata({
value: create(UpdateParticipantMetadataDesc, {
metadata,
name,
}),
@@ -545,7 +550,7 @@ export class SignalClient {
sendUpdateVideoLayers(trackSid: string, layers: VideoLayer[]) {
return this.sendRequest({
case: 'updateLayers',
value: new UpdateVideoLayers({
value: create(UpdateVideoLayersDesc, {
trackSid,
layers,
}),
@@ -555,7 +560,7 @@ export class SignalClient {
sendUpdateSubscriptionPermissions(allParticipants: boolean, trackPermissions: TrackPermission[]) {
return this.sendRequest({
case: 'subscriptionPermission',
value: new SubscriptionPermission({
value: create(SubscriptionPermissionDesc, {
allParticipants,
trackPermissions,
}),
@@ -578,7 +583,7 @@ export class SignalClient {
}),
this.sendRequest({
case: 'pingReq',
value: new Ping({
value: create(PingDesc, {
timestamp: protoInt64.parse(Date.now()),
rtt: protoInt64.parse(this.rtt),
}),
@@ -589,14 +594,14 @@ export class SignalClient {
sendUpdateLocalAudioTrack(trackSid: string, features: AudioTrackFeature[]) {
return this.sendRequest({
case: 'updateAudioTrack',
value: new UpdateLocalAudioTrack({ trackSid, features }),
value: create(UpdateLocalAudioTrackDesc, { trackSid, features }),
});
}
sendLeave() {
return this.sendRequest({
case: 'leave',
value: new LeaveRequest({
value: create(LeaveRequestDesc, {
reason: DisconnectReason.CLIENT_INITIATED,
// server doesn't process this field, keeping it here to indicate the intent of a full disconnect
action: LeaveRequest_Action.DISCONNECT,
@@ -628,13 +633,13 @@ export class SignalClient {
);
return;
}
const req = new SignalRequest({ message });
const req = create(SignalRequestDesc, { message });
try {
if (this.useJSON) {
this.ws.send(req.toJsonString());
this.ws.send(toJsonString(SignalRequestDesc, req));
} else {
this.ws.send(req.toBinary());
this.ws.send(toBinary(SignalRequestDesc, req));
}
} catch (e) {
this.log.error('error sending signal message', { ...this.logContext, error: e });
@@ -826,7 +831,7 @@ function fromProtoSessionDescription(sd: SessionDescription): RTCSessionDescript
export function toProtoSessionDescription(
rsd: RTCSessionDescription | RTCSessionDescriptionInit,
): SessionDescription {
const sd = new SessionDescription({
const sd = create(SessionDescriptionDesc, {
sdp: rsd.sdp!,
type: rsd.type!,
});

View File

@@ -4,7 +4,9 @@ import {
ClientConfiguration,
type ConnectionQualityUpdate,
DataChannelInfo,
DataChannelInfoDesc,
DataPacket,
DataPacketDesc,
DataPacket_Kind,
DisconnectReason,
type JoinResponse,
@@ -21,12 +23,16 @@ import {
type SubscriptionPermissionUpdate,
type SubscriptionResponse,
SyncState,
SyncStateDesc,
TrackInfo,
type TrackPublishedResponse,
TrackUnpublishedResponse,
Transcription,
UpdateSubscription,
UpdateSubscriptionDesc,
UserPacket,
create,
toBinary,
} from '@livekit/protocol';
import { EventEmitter } from 'events';
import type { MediaAttributes } from 'sdp-transform';
@@ -1074,7 +1080,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
/* @internal */
async sendDataPacket(packet: DataPacket, kind: DataPacket_Kind) {
const msg = packet.toBinary();
const msg = toBinary(DataPacketDesc, packet);
// make sure we do have a data connection
await this.ensurePublisherConnected(kind);
@@ -1291,7 +1297,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
});
this.client.sendSyncState(
new SyncState({
create(SyncStateDesc, {
answer: previousAnswer
? toProtoSessionDescription({
sdp: previousAnswer.sdp,
@@ -1304,7 +1310,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
type: previousOffer.type,
})
: undefined,
subscription: new UpdateSubscription({
subscription: create(UpdateSubscriptionDesc, {
trackSids,
subscribe: !autoSubscribe,
participantTracks: [],
@@ -1327,7 +1333,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
const getInfo = (dc: RTCDataChannel | undefined, target: SignalTarget) => {
if (dc?.id !== undefined && dc.id !== null) {
infos.push(
new DataChannelInfo({
create(DataChannelInfoDesc, {
label: dc.label,
id: dc.id,
target,

View File

@@ -3,25 +3,29 @@ import {
DataPacket_Kind,
DisconnectReason,
JoinResponse,
LeaveRequest,
LeaveRequestDesc,
LeaveRequest_Action,
ParticipantInfo,
ParticipantInfoDesc,
ParticipantInfo_State,
ParticipantPermission,
RoomDesc,
Room as RoomModel,
ServerInfo,
SimulateScenario,
SimulateScenarioDesc,
SpeakerInfo,
StreamStateUpdate,
SubscriptionError,
SubscriptionPermissionUpdate,
SubscriptionResponse,
TrackInfo,
TrackInfoDesc,
TrackSource,
TrackType,
Transcription as TranscriptionModel,
TranscriptionSegment as TranscriptionSegmentModel,
UserPacket,
create,
protoInt64,
} from '@livekit/protocol';
import { EventEmitter } from 'events';
@@ -781,7 +785,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
await this.engine.client.handleOnClose('simulate disconnect');
break;
case 'speaker':
req = new SimulateScenario({
req = create(SimulateScenarioDesc, {
scenario: {
case: 'speakerUpdate',
value: 3,
@@ -789,7 +793,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
});
break;
case 'node-failure':
req = new SimulateScenario({
req = create(SimulateScenarioDesc, {
scenario: {
case: 'nodeFailure',
value: true,
@@ -797,7 +801,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
});
break;
case 'server-leave':
req = new SimulateScenario({
req = create(SimulateScenarioDesc, {
scenario: {
case: 'serverLeave',
value: true,
@@ -805,7 +809,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
});
break;
case 'migration':
req = new SimulateScenario({
req = create(SimulateScenarioDesc, {
scenario: {
case: 'migration',
value: true,
@@ -822,7 +826,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
// @ts-expect-error function is private
await this.engine.client.handleOnClose('simulate resume-disconnect');
};
req = new SimulateScenario({
req = create(SimulateScenarioDesc, {
scenario: {
case: 'disconnectSignalOnResume',
value: true,
@@ -834,7 +838,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
// @ts-expect-error function is private
await this.engine.client.handleOnClose('simulate resume-disconnect');
};
req = new SimulateScenario({
req = create(SimulateScenarioDesc, {
scenario: {
case: 'disconnectSignalOnResumeNoMessages',
value: true,
@@ -848,7 +852,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
break;
case 'force-tcp':
case 'force-tls':
req = new SimulateScenario({
req = create(SimulateScenarioDesc, {
scenario: {
case: 'switchCandidateProtocol',
value: scenario === 'force-tls' ? 2 : 1,
@@ -858,7 +862,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
const onLeave = this.engine.client.onLeave;
if (onLeave) {
onLeave(
new LeaveRequest({
create(LeaveRequestDesc, {
reason: DisconnectReason.CLIENT_INITIATED,
action: LeaveRequest_Action.RECONNECT,
}),
@@ -870,7 +874,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
if (arg === undefined || typeof arg !== 'number') {
throw new Error('subscriber-bandwidth requires a number as argument');
}
req = new SimulateScenario({
req = create(SimulateScenarioDesc, {
scenario: {
case: 'subscriberBandwidth',
value: BigInt(arg),
@@ -878,7 +882,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
});
break;
case 'leave-full-reconnect':
req = new SimulateScenario({
req = create(SimulateScenarioDesc, {
scenario: {
case: 'leaveRequestFullReconnect',
value: true,
@@ -1899,7 +1903,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
...options.participants,
};
this.handleDisconnect();
this.roomInfo = new RoomModel({
this.roomInfo = create(RoomDesc, {
sid: 'RM_SIMULATED',
name: 'simulated-room',
emptyTimeout: 0,
@@ -1914,7 +1918,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
});
this.localParticipant.updateInfo(
new ParticipantInfo({
create(ParticipantInfoDesc, {
identity: 'simulated-local',
name: 'local-name',
}),
@@ -1926,7 +1930,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
if (publishOptions.video) {
const camPub = new LocalTrackPublication(
Track.Kind.Video,
new TrackInfo({
create(TrackInfoDesc, {
source: TrackSource.CAMERA,
sid: Math.floor(Math.random() * 10_000).toString(),
type: TrackType.AUDIO,
@@ -1956,7 +1960,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
if (publishOptions.audio) {
const audioPub = new LocalTrackPublication(
Track.Kind.Audio,
new TrackInfo({
create(TrackInfoDesc, {
source: TrackSource.MICROPHONE,
sid: Math.floor(Math.random() * 10_000).toString(),
type: TrackType.AUDIO,
@@ -1978,7 +1982,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
}
for (let i = 0; i < participantOptions.count - 1; i += 1) {
let info: ParticipantInfo = new ParticipantInfo({
let info: ParticipantInfo = create(ParticipantInfoDesc, {
sid: Math.floor(Math.random() * 10_000).toString(),
identity: `simulated-${i}`,
state: ParticipantInfo_State.ACTIVE,
@@ -1993,7 +1997,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
false,
true,
);
const videoTrack = new TrackInfo({
const videoTrack = create(TrackInfoDesc, {
source: TrackSource.CAMERA,
sid: Math.floor(Math.random() * 10_000).toString(),
type: TrackType.AUDIO,
@@ -2003,7 +2007,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
}
if (participantOptions.audio) {
const dummyTrack = getEmptyAudioStreamTrack();
const audioTrack = new TrackInfo({
const audioTrack = create(TrackInfoDesc, {
source: TrackSource.MICROPHONE,
sid: Math.floor(Math.random() * 10_000).toString(),
type: TrackType.AUDIO,

View File

@@ -2,7 +2,10 @@ import {
VideoQuality as ProtoVideoQuality,
SubscribedCodec,
SubscribedQuality,
SubscribedQualityDesc,
VideoLayer,
VideoLayerDesc,
create,
} from '@livekit/protocol';
import type { SignalClient } from '../../api/SignalClient';
import type { StructuredLogger } from '../../logger';
@@ -217,7 +220,7 @@ export default class LocalVideoTrack extends LocalTrack<Track.Kind.Video> {
const qualities: SubscribedQuality[] = [];
for (let q = VideoQuality.LOW; q <= VideoQuality.HIGH; q += 1) {
qualities.push(
new SubscribedQuality({
create(SubscribedQualityDesc, {
quality: q,
enabled: q <= maxQuality,
}),
@@ -556,7 +559,7 @@ export function videoLayersFromEncodings(
// default to a single layer, HQ
if (!encodings) {
return [
new VideoLayer({
create(VideoLayerDesc, {
quality: VideoQuality.HIGH,
width,
height,
@@ -576,7 +579,7 @@ export function videoLayersFromEncodings(
const bitratesRatio = sm.suffix == 'h' ? 2 : 3;
for (let i = 0; i < sm.spatial; i += 1) {
layers.push(
new VideoLayer({
create(VideoLayerDesc, {
quality: VideoQuality.HIGH - i,
width: Math.ceil(width / resRatio ** i),
height: Math.ceil(height / resRatio ** i),
@@ -593,7 +596,7 @@ export function videoLayersFromEncodings(
return encodings.map((encoding) => {
const scale = encoding.scaleResolutionDownBy ?? 1;
let quality = videoQualityForRid(encoding.rid ?? '');
return new VideoLayer({
return create(VideoLayerDesc, {
quality,
width: Math.ceil(width / scale),
height: Math.ceil(height / scale),

View File

@@ -1,4 +1,4 @@
import { TrackPublishedResponse } from '@livekit/protocol';
import { TrackPublishedResponse, TrackPublishedResponseDesc, create } from '@livekit/protocol';
import { cloneDeep } from '../../utils/cloneDeep';
import { isSafari, sleep } from '../utils';
import { Track } from './Track';
@@ -202,7 +202,7 @@ export function getTrackPublicationInfo<T extends TrackPublication>(
tracks.forEach((track: TrackPublication) => {
if (track.track !== undefined) {
infos.push(
new TrackPublishedResponse({
create(TrackPublishedResponseDesc, {
cid: track.track.mediaStreamID,
track: track.trackInfo,
}),

View File

@@ -1,4 +1,10 @@
import { ClientInfo, ClientInfo_SDK, Transcription as TranscriptionModel } from '@livekit/protocol';
import {
ClientInfo,
ClientInfoDesc,
ClientInfo_SDK,
Transcription as TranscriptionModel,
create,
} from '@livekit/protocol';
import { getBrowser } from '../utils/browserParser';
import { protocolVersion, version } from '../version';
import CriticalTimers from './timers';
@@ -271,7 +277,7 @@ export interface ObservableMediaElement extends HTMLMediaElement {
}
export function getClientInfo(): ClientInfo {
const info = new ClientInfo({
const info = create(ClientInfoDesc, {
sdk: ClientInfo_SDK.JS,
protocol: protocolVersion,
version,