This commit is contained in:
@@ -1,5 +0,0 @@
|
||||
---
|
||||
"livekit-client": patch
|
||||
---
|
||||
|
||||
Typesafe error propagation in signal connection path
|
||||
@@ -2,7 +2,6 @@
|
||||
import js from '@eslint/js';
|
||||
import { configs, plugins, rules } from 'eslint-config-airbnb-extended';
|
||||
import { rules as prettierConfigRules } from 'eslint-config-prettier';
|
||||
import neverthrowMustUse from 'eslint-plugin-neverthrow-must-use';
|
||||
import prettierPlugin from 'eslint-plugin-prettier';
|
||||
|
||||
const strictness = 'off';
|
||||
@@ -32,15 +31,6 @@ const typescriptConfig = [
|
||||
rules.typescript.typescriptEslintStrict,
|
||||
];
|
||||
|
||||
const neverthrowConfig = [
|
||||
{
|
||||
name: 'neverthrow-must-use',
|
||||
plugins: {
|
||||
'neverthrow-must-use': neverthrowMustUse,
|
||||
},
|
||||
},
|
||||
];
|
||||
|
||||
const prettierConfig = [
|
||||
// Prettier Plugin
|
||||
{
|
||||
@@ -66,7 +56,6 @@ export default [
|
||||
...typescriptConfig,
|
||||
// Prettier Config
|
||||
...prettierConfig,
|
||||
...neverthrowConfig,
|
||||
{
|
||||
languageOptions: {
|
||||
parserOptions: {
|
||||
@@ -169,7 +158,6 @@ export default [
|
||||
'one-var': strictness,
|
||||
'no-multi-assign': strictness,
|
||||
'new-cap': strictness,
|
||||
'require-yield': strictness,
|
||||
|
||||
radix: strictness,
|
||||
eqeqeq: strictness,
|
||||
|
||||
@@ -60,7 +60,6 @@
|
||||
"events": "^3.3.0",
|
||||
"jose": "^6.1.0",
|
||||
"loglevel": "^1.9.2",
|
||||
"neverthrow": "^8.2.0",
|
||||
"sdp-transform": "^2.15.0",
|
||||
"ts-debounce": "^4.0.0",
|
||||
"tslib": "2.8.1",
|
||||
@@ -97,7 +96,6 @@
|
||||
"eslint-config-prettier": "10.1.8",
|
||||
"eslint-plugin-compat": "^6.0.2",
|
||||
"eslint-plugin-import-x": "^4.16.1",
|
||||
"eslint-plugin-neverthrow-must-use": "^0.1.2",
|
||||
"eslint-plugin-prettier": "^5.5.4",
|
||||
"gh-pages": "6.3.0",
|
||||
"happy-dom": "^17.2.0",
|
||||
|
||||
34
pnpm-lock.yaml
generated
34
pnpm-lock.yaml
generated
@@ -26,9 +26,6 @@ importers:
|
||||
loglevel:
|
||||
specifier: ^1.9.2
|
||||
version: 1.9.2
|
||||
neverthrow:
|
||||
specifier: ^8.2.0
|
||||
version: 8.2.0
|
||||
sdp-transform:
|
||||
specifier: ^2.15.0
|
||||
version: 2.15.0
|
||||
@@ -123,9 +120,6 @@ importers:
|
||||
eslint-plugin-import-x:
|
||||
specifier: ^4.16.1
|
||||
version: 4.16.1(@typescript-eslint/utils@8.47.0(eslint@9.39.1(jiti@2.4.2))(typescript@5.8.3))(eslint-import-resolver-node@0.3.9)(eslint@9.39.1(jiti@2.4.2))
|
||||
eslint-plugin-neverthrow-must-use:
|
||||
specifier: ^0.1.2
|
||||
version: 0.1.2(@typescript-eslint/parser@7.18.0(eslint@9.39.1(jiti@2.4.2))(typescript@5.8.3))(eslint@9.39.1(jiti@2.4.2))
|
||||
eslint-plugin-prettier:
|
||||
specifier: ^5.5.4
|
||||
version: 5.5.4(@types/eslint@8.44.7)(eslint-config-prettier@10.1.8(eslint@9.39.1(jiti@2.4.2)))(eslint@9.39.1(jiti@2.4.2))(prettier@3.6.2)
|
||||
@@ -2276,13 +2270,6 @@ packages:
|
||||
peerDependencies:
|
||||
eslint: '>=8.23.0'
|
||||
|
||||
eslint-plugin-neverthrow-must-use@0.1.2:
|
||||
resolution: {integrity: sha512-Wt/u1wjnH8rWtbc8zqTK5yOcB79zVlCCWXi6ChJNer5ACkqldNQ6/+RKVUErACbv0Oex9aqaKYoTd0OqLe4o3Q==}
|
||||
engines: {node: '>=16'}
|
||||
peerDependencies:
|
||||
'@typescript-eslint/parser': ^8.0.0
|
||||
eslint: ^9.0.0
|
||||
|
||||
eslint-plugin-prettier@5.5.4:
|
||||
resolution: {integrity: sha512-swNtI95SToIz05YINMA6Ox5R057IMAmWZ26GqPxusAp1TZzj+IdY9tXNWWD3vkF/wEqydCONcwjTFpxybBqZsg==}
|
||||
engines: {node: ^14.18.0 || >=16.0.0}
|
||||
@@ -2995,10 +2982,6 @@ packages:
|
||||
neo-async@2.6.2:
|
||||
resolution: {integrity: sha512-Yd3UES5mWCSqR+qNT93S3UoYUkqAZ9lLg8a7g9rimsWmYGK8cVToA4/sF3RrshdyV3sAGMXVUmpMYOw+dLpOuw==}
|
||||
|
||||
neverthrow@8.2.0:
|
||||
resolution: {integrity: sha512-kOCT/1MCPAxY5iUV3wytNFUMUolzuwd/VF/1KCx7kf6CutrOsTie+84zTGTpgQycjvfLdBBdvBvFLqFD2c0wkQ==}
|
||||
engines: {node: '>=18'}
|
||||
|
||||
node-fetch@2.7.0:
|
||||
resolution: {integrity: sha512-c4FRfUm/dbcWZ7U+1Wq0AwCyFL+3nt2bEw05wfxSz+DWpWsitgmSgYmy2dQdWyKC1694ELPqMs/YzUSNozLt8A==}
|
||||
engines: {node: 4.x || >=6.0.0}
|
||||
@@ -3641,8 +3624,8 @@ packages:
|
||||
engines: {node: '>=14.17'}
|
||||
hasBin: true
|
||||
|
||||
typescript@6.0.0-dev.20251121:
|
||||
resolution: {integrity: sha512-TrGhGS4hOAKgwizhMuH/3pbTNNBMCpxRA7ia8Lrv4HRMOAOzI5lWhP5uoKRDmmaF3pUVe90MBYjSieM498zUqQ==}
|
||||
typescript@6.0.0-dev.20251120:
|
||||
resolution: {integrity: sha512-dkvZw2/09r7JIltGCeubJXLYE7+NapbKj68BtGtm47TiwjyKxTDTG2nWZu8Gpopzi0ub9bNVn0rEgh5CgOlE4w==}
|
||||
engines: {node: '>=14.17'}
|
||||
hasBin: true
|
||||
|
||||
@@ -6040,7 +6023,7 @@ snapshots:
|
||||
dependencies:
|
||||
semver: 7.6.0
|
||||
shelljs: 0.8.5
|
||||
typescript: 6.0.0-dev.20251121
|
||||
typescript: 6.0.0-dev.20251120
|
||||
|
||||
dunder-proto@1.0.1:
|
||||
dependencies:
|
||||
@@ -6348,11 +6331,6 @@ snapshots:
|
||||
- typescript
|
||||
optional: true
|
||||
|
||||
eslint-plugin-neverthrow-must-use@0.1.2(@typescript-eslint/parser@7.18.0(eslint@9.39.1(jiti@2.4.2))(typescript@5.8.3))(eslint@9.39.1(jiti@2.4.2)):
|
||||
dependencies:
|
||||
'@typescript-eslint/parser': 7.18.0(eslint@9.39.1(jiti@2.4.2))(typescript@5.8.3)
|
||||
eslint: 9.39.1(jiti@2.4.2)
|
||||
|
||||
eslint-plugin-prettier@5.5.4(@types/eslint@8.44.7)(eslint-config-prettier@10.1.8(eslint@9.39.1(jiti@2.4.2)))(eslint@9.39.1(jiti@2.4.2))(prettier@3.6.2):
|
||||
dependencies:
|
||||
eslint: 9.39.1(jiti@2.4.2)
|
||||
@@ -7110,10 +7088,6 @@ snapshots:
|
||||
|
||||
neo-async@2.6.2: {}
|
||||
|
||||
neverthrow@8.2.0:
|
||||
optionalDependencies:
|
||||
'@rollup/rollup-linux-x64-gnu': 4.53.2
|
||||
|
||||
node-fetch@2.7.0:
|
||||
dependencies:
|
||||
whatwg-url: 5.0.0
|
||||
@@ -7815,7 +7789,7 @@ snapshots:
|
||||
|
||||
typescript@5.8.3: {}
|
||||
|
||||
typescript@6.0.0-dev.20251121: {}
|
||||
typescript@6.0.0-dev.20251120: {}
|
||||
|
||||
uc.micro@2.1.0: {}
|
||||
|
||||
|
||||
@@ -5,12 +5,11 @@ import {
|
||||
SignalRequest,
|
||||
SignalResponse,
|
||||
} from '@livekit/protocol';
|
||||
import { Result, ResultAsync } from 'neverthrow';
|
||||
import { beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
import { ConnectionError, ConnectionErrorReason } from '../room/errors';
|
||||
import { SignalClient, SignalConnectionState, type ValidationType } from './SignalClient';
|
||||
import { SignalClient, SignalConnectionState } from './SignalClient';
|
||||
import type { WebSocketCloseInfo, WebSocketConnection } from './WebSocketStream';
|
||||
import { WebSocketError, WebSocketStream } from './WebSocketStream';
|
||||
import { WebSocketStream } from './WebSocketStream';
|
||||
|
||||
// Mock the WebSocketStream
|
||||
vi.mock('./WebSocketStream');
|
||||
@@ -58,27 +57,16 @@ function createMockConnection(readable: ReadableStream<ArrayBuffer>): WebSocketC
|
||||
|
||||
interface MockWebSocketStreamOptions {
|
||||
connection?: WebSocketConnection;
|
||||
opened?: ResultAsync<WebSocketConnection<ArrayBuffer>, WebSocketError>;
|
||||
closed?: ResultAsync<WebSocketCloseInfo, WebSocketError>;
|
||||
opened?: Promise<WebSocketConnection>;
|
||||
closed?: Promise<WebSocketCloseInfo>;
|
||||
readyState?: number;
|
||||
}
|
||||
|
||||
function mockWebSocketStream(options: MockWebSocketStreamOptions = {}) {
|
||||
const {
|
||||
connection,
|
||||
// eslint-disable-next-line neverthrow-must-use/must-use-result
|
||||
opened = connection
|
||||
? ResultAsync.fromPromise(Promise.resolve(connection), (error) => ({
|
||||
type: 'connection' as const,
|
||||
error: error as Event,
|
||||
}))
|
||||
: // eslint-disable-next-line neverthrow-must-use/must-use-result
|
||||
ResultAsync.fromPromise(new Promise(() => {}), (error) => ({
|
||||
type: 'connection' as const,
|
||||
error: error as Event,
|
||||
})),
|
||||
// eslint-disable-next-line neverthrow-must-use/must-use-result
|
||||
closed = ResultAsync.fromPromise(new Promise(() => {}), (error) => error as WebSocketError),
|
||||
opened = connection ? Promise.resolve(connection) : new Promise(() => {}),
|
||||
closed = new Promise(() => {}),
|
||||
readyState = 1,
|
||||
} = options;
|
||||
|
||||
@@ -121,7 +109,7 @@ describe('SignalClient.connect', () => {
|
||||
|
||||
const result = await signalClient.join('wss://test.livekit.io', 'test-token', defaultOptions);
|
||||
|
||||
expect(result._unsafeUnwrap()).toEqual(joinResponse);
|
||||
expect(result).toEqual(joinResponse);
|
||||
expect(signalClient.currentState).toBe(SignalConnectionState.CONNECTED);
|
||||
});
|
||||
});
|
||||
@@ -150,7 +138,7 @@ describe('SignalClient.connect', () => {
|
||||
|
||||
const result = await signalClient.reconnect('wss://test.livekit.io', 'test-token', 'sid-123');
|
||||
|
||||
expect(result._unsafeUnwrap()).toEqual(reconnectResponse);
|
||||
expect(result).toEqual(reconnectResponse);
|
||||
expect(signalClient.currentState).toBe(SignalConnectionState.CONNECTED);
|
||||
});
|
||||
|
||||
@@ -175,7 +163,7 @@ describe('SignalClient.connect', () => {
|
||||
const result = await signalClient.reconnect('wss://test.livekit.io', 'test-token', 'sid-123');
|
||||
|
||||
// This is an edge case: reconnect resolves with undefined when non-reconnect message is received
|
||||
expect(result._unsafeUnwrap()).toBeUndefined();
|
||||
expect(result).toBeUndefined();
|
||||
expect(signalClient.currentState).toBe(SignalConnectionState.CONNECTED);
|
||||
}, 1000);
|
||||
});
|
||||
@@ -189,14 +177,9 @@ describe('SignalClient.connect', () => {
|
||||
websocketTimeout: 100,
|
||||
};
|
||||
|
||||
const result = await signalClient.join(
|
||||
'wss://test.livekit.io',
|
||||
'test-token',
|
||||
shortTimeoutOptions,
|
||||
);
|
||||
|
||||
expect(result.isErr()).toBe(true);
|
||||
expect(result._unsafeUnwrapErr()).toBeInstanceOf(ConnectionError);
|
||||
await expect(
|
||||
signalClient.join('wss://test.livekit.io', 'test-token', shortTimeoutOptions),
|
||||
).rejects.toThrow(ConnectionError);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -208,35 +191,23 @@ describe('SignalClient.connect', () => {
|
||||
// Simulate abort
|
||||
setTimeout(() => abortController.abort(new Error('User aborted connection')), 50);
|
||||
|
||||
// eslint-disable-next-line neverthrow-must-use/must-use-result
|
||||
const opened = ResultAsync.fromPromise(new Promise(() => {}), (error) => ({
|
||||
type: 'connection' as const,
|
||||
error: error as Event,
|
||||
}));
|
||||
// eslint-disable-next-line neverthrow-must-use/must-use-result
|
||||
const closed = ResultAsync.fromPromise(
|
||||
new Promise(() => {}),
|
||||
(error) => error as WebSocketError,
|
||||
);
|
||||
|
||||
return {
|
||||
url: 'wss://test.livekit.io',
|
||||
opened,
|
||||
closed,
|
||||
opened: new Promise(() => {}), // Never resolves
|
||||
closed: new Promise(() => {}),
|
||||
close: vi.fn(),
|
||||
readyState: 0,
|
||||
} as any;
|
||||
});
|
||||
|
||||
const result = await signalClient.join(
|
||||
'wss://test.livekit.io',
|
||||
'test-token',
|
||||
defaultOptions,
|
||||
abortController.signal,
|
||||
);
|
||||
|
||||
expect(result.isErr()).toBe(true);
|
||||
expect(result._unsafeUnwrapErr().message).toBe('AbortSignal invoked');
|
||||
await expect(
|
||||
signalClient.join(
|
||||
'wss://test.livekit.io',
|
||||
'test-token',
|
||||
defaultOptions,
|
||||
abortController.signal,
|
||||
),
|
||||
).rejects.toThrow('User aborted connection');
|
||||
});
|
||||
|
||||
it('should send leave request before closing when AbortSignal is triggered during connection', async () => {
|
||||
@@ -278,21 +249,10 @@ describe('SignalClient.connect', () => {
|
||||
};
|
||||
|
||||
vi.mocked(WebSocketStream).mockImplementation(() => {
|
||||
// eslint-disable-next-line neverthrow-must-use/must-use-result
|
||||
const opened = ResultAsync.fromPromise(Promise.resolve(mockConnection), (error) => ({
|
||||
type: 'connection' as const,
|
||||
error: error as Event,
|
||||
}));
|
||||
// eslint-disable-next-line neverthrow-must-use/must-use-result
|
||||
const closed = ResultAsync.fromPromise(
|
||||
new Promise(() => {}),
|
||||
(error) => error as WebSocketError,
|
||||
);
|
||||
|
||||
return {
|
||||
url: 'wss://test.livekit.io',
|
||||
opened,
|
||||
closed,
|
||||
opened: Promise.resolve(mockConnection),
|
||||
closed: new Promise(() => {}),
|
||||
close: vi.fn(),
|
||||
readyState: 1,
|
||||
} as any;
|
||||
@@ -310,12 +270,10 @@ describe('SignalClient.connect', () => {
|
||||
await streamWriterReadyPromise;
|
||||
|
||||
// Now abort the connection (after WS opens, before join response)
|
||||
abortController.abort();
|
||||
abortController.abort(new Error('User aborted connection'));
|
||||
|
||||
// joinPromise should return Err result
|
||||
const result = await joinPromise;
|
||||
expect(result.isErr()).toBe(true);
|
||||
expect(result._unsafeUnwrapErr().message).toBe('AbortSignal invoked');
|
||||
// joinPromise should reject
|
||||
await expect(joinPromise).rejects.toThrow('User aborted connection');
|
||||
|
||||
// Verify that a leave request was sent before closing
|
||||
const leaveRequestSent = writtenMessages.some((data) => {
|
||||
@@ -338,13 +296,8 @@ describe('SignalClient.connect', () => {
|
||||
|
||||
describe('Failure Case - WebSocket Connection Errors', () => {
|
||||
it('should reject with NotAllowed error for 4xx HTTP status', async () => {
|
||||
// eslint-disable-next-line neverthrow-must-use/must-use-result
|
||||
const opened = ResultAsync.fromPromise(
|
||||
Promise.reject(ConnectionError.websocket('Connection failed')),
|
||||
(error) => error as WebSocketError,
|
||||
);
|
||||
mockWebSocketStream({
|
||||
opened,
|
||||
opened: Promise.reject(new Error('Connection failed')),
|
||||
readyState: 3,
|
||||
});
|
||||
|
||||
@@ -354,85 +307,54 @@ describe('SignalClient.connect', () => {
|
||||
text: async () => 'Forbidden',
|
||||
});
|
||||
|
||||
const result = await signalClient.join('wss://test.livekit.io', 'test-token', defaultOptions);
|
||||
|
||||
expect(result.isErr()).toBe(true);
|
||||
const error = result._unsafeUnwrapErr();
|
||||
expect(error).toBeInstanceOf(ConnectionError);
|
||||
expect(error.message).toBe('Forbidden');
|
||||
expect((error as ConnectionError).reason).toBe(ConnectionErrorReason.NotAllowed);
|
||||
expect((error as ConnectionError).status).toBe(403);
|
||||
await expect(
|
||||
signalClient.join('wss://test.livekit.io', 'test-token', defaultOptions),
|
||||
).rejects.toMatchObject({
|
||||
message: 'Forbidden',
|
||||
reason: ConnectionErrorReason.NotAllowed,
|
||||
status: 403,
|
||||
});
|
||||
});
|
||||
|
||||
it('should reject with ServerUnreachable when fetch fails', async () => {
|
||||
// eslint-disable-next-line neverthrow-must-use/must-use-result
|
||||
const opened = ResultAsync.fromPromise(
|
||||
Promise.reject(ConnectionError.websocket('Connection failed')),
|
||||
(error) => error as WebSocketError,
|
||||
);
|
||||
mockWebSocketStream({
|
||||
opened,
|
||||
opened: Promise.reject(new Error('Connection failed')),
|
||||
readyState: 3,
|
||||
});
|
||||
|
||||
// Mock fetch to throw (network error)
|
||||
(global.fetch as any).mockRejectedValueOnce(new Error('Network error'));
|
||||
|
||||
const result = await signalClient.join('wss://test.livekit.io', 'test-token', defaultOptions);
|
||||
|
||||
expect(result.isErr()).toBe(true);
|
||||
const error = result._unsafeUnwrapErr();
|
||||
expect(error).toBeInstanceOf(ConnectionError);
|
||||
expect((error as ConnectionError).reason).toBe(ConnectionErrorReason.ServerUnreachable);
|
||||
await expect(
|
||||
signalClient.join('wss://test.livekit.io', 'test-token', defaultOptions),
|
||||
).rejects.toMatchObject({
|
||||
reason: ConnectionErrorReason.ServerUnreachable,
|
||||
});
|
||||
});
|
||||
|
||||
it('should handle WebsocketError from WebSocket rejection as unreachable if server is not reachable', async () => {
|
||||
const customError = ConnectionError.websocket('Custom error');
|
||||
|
||||
// eslint-disable-next-line neverthrow-must-use/must-use-result
|
||||
const opened = ResultAsync.fromPromise(
|
||||
Promise.reject(customError),
|
||||
(error) => error as WebSocketError,
|
||||
it('should handle ConnectionError from WebSocket rejection', async () => {
|
||||
const customError = new ConnectionError(
|
||||
'Custom error',
|
||||
ConnectionErrorReason.InternalError,
|
||||
500,
|
||||
);
|
||||
|
||||
mockWebSocketStream({
|
||||
opened,
|
||||
opened: Promise.reject(customError),
|
||||
readyState: 3,
|
||||
});
|
||||
|
||||
(global.fetch as any).mockRejectedValueOnce(new Error('Network error'));
|
||||
|
||||
const result = await signalClient.join('wss://test.livekit.io', 'test-token', defaultOptions);
|
||||
|
||||
expect(result.isErr()).toBe(true);
|
||||
const error = result._unsafeUnwrapErr();
|
||||
expect(error).toBeInstanceOf(ConnectionError);
|
||||
expect((error as ConnectionError).reason).toBe(ConnectionErrorReason.ServerUnreachable);
|
||||
});
|
||||
it('should handle WebsocketError from WebSocket rejection as websocket error if server is reachable', async () => {
|
||||
const customError = ConnectionError.websocket('Custom error');
|
||||
|
||||
// eslint-disable-next-line neverthrow-must-use/must-use-result
|
||||
const opened = ResultAsync.fromPromise(
|
||||
Promise.reject(customError),
|
||||
(error) => error as WebSocketError,
|
||||
);
|
||||
mockWebSocketStream({
|
||||
opened,
|
||||
readyState: 3,
|
||||
});
|
||||
|
||||
// Mock fetch to return 200
|
||||
// Mock fetch to return 500
|
||||
(global.fetch as any).mockResolvedValueOnce({
|
||||
status: 200,
|
||||
text: async () => 'testplaceholder',
|
||||
status: 500,
|
||||
text: async () => 'Internal Server Error',
|
||||
});
|
||||
|
||||
const result = await signalClient.join('wss://test.livekit.io', 'test-token', defaultOptions);
|
||||
|
||||
expect(result.isErr()).toBe(true);
|
||||
const error = result._unsafeUnwrapErr();
|
||||
expect(error).toBeInstanceOf(ConnectionError);
|
||||
expect((error as ConnectionError).reason).toBe(ConnectionErrorReason.WebSocket);
|
||||
await expect(
|
||||
signalClient.join('wss://test.livekit.io', 'test-token', defaultOptions),
|
||||
).rejects.toMatchObject({
|
||||
reason: ConnectionErrorReason.InternalError,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -448,13 +370,12 @@ describe('SignalClient.connect', () => {
|
||||
|
||||
mockWebSocketStream({ connection: mockConnection });
|
||||
|
||||
const result = await signalClient.join('wss://test.livekit.io', 'test-token', defaultOptions);
|
||||
|
||||
expect(result.isErr()).toBe(true);
|
||||
const error = result._unsafeUnwrapErr();
|
||||
expect(error).toBeInstanceOf(ConnectionError);
|
||||
expect(error.message).toBe('no message received as first message');
|
||||
expect((error as ConnectionError).reason).toBe(ConnectionErrorReason.InternalError);
|
||||
await expect(
|
||||
signalClient.join('wss://test.livekit.io', 'test-token', defaultOptions),
|
||||
).rejects.toMatchObject({
|
||||
message: 'no message received as first message',
|
||||
reason: ConnectionErrorReason.InternalError,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -469,14 +390,16 @@ describe('SignalClient.connect', () => {
|
||||
|
||||
mockWebSocketStream({ connection: mockConnection });
|
||||
|
||||
const result = await signalClient.join('wss://test.livekit.io', 'test-token', defaultOptions);
|
||||
|
||||
expect(result.isErr()).toBe(true);
|
||||
const error = result._unsafeUnwrapErr();
|
||||
expect(error).toBeInstanceOf(ConnectionError);
|
||||
expect(error.message).toBe('Received leave request while trying to (re)connect');
|
||||
expect((error as ConnectionError).reason).toBe(ConnectionErrorReason.LeaveRequest);
|
||||
expect((error as ConnectionError).context).toBe(1);
|
||||
await expect(
|
||||
signalClient.join('wss://test.livekit.io', 'test-token', defaultOptions),
|
||||
).rejects.toMatchObject(
|
||||
new ConnectionError(
|
||||
'Received leave request while trying to (re)connect',
|
||||
ConnectionErrorReason.LeaveRequest,
|
||||
undefined,
|
||||
1,
|
||||
),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -492,41 +415,43 @@ describe('SignalClient.connect', () => {
|
||||
|
||||
mockWebSocketStream({ connection: mockConnection });
|
||||
|
||||
const result = await signalClient.join('wss://test.livekit.io', 'test-token', defaultOptions);
|
||||
|
||||
expect(result.isErr()).toBe(true);
|
||||
const error = result._unsafeUnwrapErr();
|
||||
expect(error).toBeInstanceOf(ConnectionError);
|
||||
expect(error.message).toBe('did not receive join response, got reconnect instead');
|
||||
expect((error as ConnectionError).reason).toBe(ConnectionErrorReason.InternalError);
|
||||
await expect(
|
||||
signalClient.join('wss://test.livekit.io', 'test-token', defaultOptions),
|
||||
).rejects.toMatchObject({
|
||||
message: 'did not receive join response, got reconnect instead',
|
||||
reason: ConnectionErrorReason.InternalError,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('Failure Case - WebSocket Closed During Connection', () => {
|
||||
it('should reject when WebSocket closes during connection attempt', async () => {
|
||||
mockWebSocketStream({ readyState: 3 }); // CLOSED
|
||||
|
||||
const shortTimeoutOptions = {
|
||||
...defaultOptions,
|
||||
websocketTimeout: 100,
|
||||
};
|
||||
|
||||
// Mock fetch to return 200
|
||||
(global.fetch as any).mockResolvedValueOnce({
|
||||
status: 200,
|
||||
text: async () => 'testplaceholder',
|
||||
let closedResolve: (value: WebSocketCloseInfo) => void;
|
||||
const closedPromise = new Promise<WebSocketCloseInfo>((resolve) => {
|
||||
closedResolve = resolve;
|
||||
});
|
||||
|
||||
const result = await signalClient.join(
|
||||
'wss://test.livekit.io',
|
||||
'test-token',
|
||||
shortTimeoutOptions,
|
||||
);
|
||||
vi.mocked(WebSocketStream).mockImplementation(() => {
|
||||
// Simulate close during connection
|
||||
queueMicrotask(() => {
|
||||
closedResolve({ closeCode: 1006, reason: 'Connection lost' });
|
||||
});
|
||||
|
||||
expect(result.isErr()).toBe(true);
|
||||
const error = result._unsafeUnwrapErr();
|
||||
expect(error).toBeInstanceOf(ConnectionError);
|
||||
expect(error.reason).toBe(ConnectionErrorReason.WebSocket);
|
||||
return {
|
||||
url: 'wss://test.livekit.io',
|
||||
opened: new Promise(() => {}), // Never resolves
|
||||
closed: closedPromise,
|
||||
close: vi.fn(),
|
||||
readyState: 2, // CLOSING
|
||||
} as any;
|
||||
});
|
||||
|
||||
await expect(
|
||||
signalClient.join('wss://test.livekit.io', 'test-token', defaultOptions),
|
||||
).rejects.toMatchObject({
|
||||
message: 'Websocket got closed during a (re)connection attempt: Connection lost',
|
||||
reason: ConnectionErrorReason.InternalError,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -662,13 +587,11 @@ describe('SignalClient.validateFirstMessage', () => {
|
||||
const joinResponse = createJoinResponse();
|
||||
const signalResponse = createSignalResponse('join', joinResponse);
|
||||
|
||||
const validateMethod = (signalClient as any).validateFirstMessage as (
|
||||
msg: any,
|
||||
isReconnect: boolean,
|
||||
) => Result<ValidationType, ConnectionError>;
|
||||
const validateMethod = (signalClient as any).validateFirstMessage;
|
||||
if (validateMethod) {
|
||||
const result = validateMethod.call(signalClient, signalResponse, false);
|
||||
expect(result._unsafeUnwrap().response).toEqual(joinResponse);
|
||||
expect(result.isValid).toBe(true);
|
||||
expect(result.response).toEqual(joinResponse);
|
||||
}
|
||||
});
|
||||
|
||||
@@ -688,13 +611,11 @@ describe('SignalClient.validateFirstMessage', () => {
|
||||
const reconnectResponse = new ReconnectResponse({ iceServers: [] });
|
||||
const signalResponse = createSignalResponse('reconnect', reconnectResponse);
|
||||
|
||||
const validateMethod = (signalClient as any).validateFirstMessage as (
|
||||
msg: any,
|
||||
isReconnect: boolean,
|
||||
) => Result<ValidationType, ConnectionError>;
|
||||
const validateMethod = (signalClient as any).validateFirstMessage;
|
||||
if (validateMethod) {
|
||||
const result = validateMethod.call(signalClient, signalResponse, true);
|
||||
expect(result._unsafeUnwrap().response).toEqual(reconnectResponse);
|
||||
expect(result.isValid).toBe(true);
|
||||
expect(result.response).toEqual(reconnectResponse);
|
||||
}
|
||||
});
|
||||
|
||||
@@ -713,14 +634,12 @@ describe('SignalClient.validateFirstMessage', () => {
|
||||
|
||||
const updateSignalResponse = createSignalResponse('update', { participants: [] });
|
||||
|
||||
const validateMethod = (signalClient as any).validateFirstMessage as (
|
||||
msg: any,
|
||||
isReconnect: boolean,
|
||||
) => Result<ValidationType, ConnectionError>;
|
||||
const validateMethod = (signalClient as any).validateFirstMessage;
|
||||
if (validateMethod) {
|
||||
const result = validateMethod.call(signalClient, updateSignalResponse, true);
|
||||
expect(result._unsafeUnwrap().response).toBeUndefined();
|
||||
expect(result._unsafeUnwrap().shouldProcessFirstMessage).toBe(true);
|
||||
expect(result.isValid).toBe(true);
|
||||
expect(result.response).toBeUndefined();
|
||||
expect(result.shouldProcessFirstMessage).toBe(true);
|
||||
}
|
||||
});
|
||||
|
||||
@@ -731,14 +650,12 @@ describe('SignalClient.validateFirstMessage', () => {
|
||||
const leaveRequest = new LeaveRequest({ reason: 1 });
|
||||
const signalResponse = createSignalResponse('leave', leaveRequest);
|
||||
|
||||
const validateMethod = (signalClient as any).validateFirstMessage as (
|
||||
msg: any,
|
||||
isReconnect: boolean,
|
||||
) => Result<ValidationType, ConnectionError>;
|
||||
const validateMethod = (signalClient as any).validateFirstMessage;
|
||||
if (validateMethod) {
|
||||
const result = validateMethod.call(signalClient, signalResponse, false);
|
||||
expect(result._unsafeUnwrapErr()).toBeInstanceOf(ConnectionError);
|
||||
expect(result._unsafeUnwrapErr().reason).toBe(ConnectionErrorReason.LeaveRequest);
|
||||
expect(result.isValid).toBe(false);
|
||||
expect(result.error).toBeInstanceOf(ConnectionError);
|
||||
expect(result.error?.reason).toBe(ConnectionErrorReason.LeaveRequest);
|
||||
}
|
||||
});
|
||||
|
||||
@@ -746,14 +663,12 @@ describe('SignalClient.validateFirstMessage', () => {
|
||||
const reconnectResponse = new ReconnectResponse({ iceServers: [] });
|
||||
const signalResponse = createSignalResponse('reconnect', reconnectResponse);
|
||||
|
||||
const validateMethod = (signalClient as any).validateFirstMessage as (
|
||||
msg: any,
|
||||
isReconnect: boolean,
|
||||
) => Result<ValidationType, ConnectionError>;
|
||||
const validateMethod = (signalClient as any).validateFirstMessage;
|
||||
if (validateMethod) {
|
||||
const result = validateMethod.call(signalClient, signalResponse, false);
|
||||
expect(result._unsafeUnwrapErr()).toBeInstanceOf(ConnectionError);
|
||||
expect(result._unsafeUnwrapErr().reason).toBe(ConnectionErrorReason.InternalError);
|
||||
expect(result.isValid).toBe(false);
|
||||
expect(result.error).toBeInstanceOf(ConnectionError);
|
||||
expect(result.error?.reason).toBe(ConnectionErrorReason.InternalError);
|
||||
}
|
||||
});
|
||||
});
|
||||
@@ -777,17 +692,19 @@ describe('SignalClient.handleConnectionError', () => {
|
||||
const error = new Error('Connection failed');
|
||||
const result = await handleMethod.call(signalClient, error, 'wss://test.livekit.io/validate');
|
||||
|
||||
expect(result.isErr()).toBe(true);
|
||||
const err = result._unsafeUnwrapErr();
|
||||
expect(err).toBeInstanceOf(ConnectionError);
|
||||
expect(err.reason).toBe(ConnectionErrorReason.NotAllowed);
|
||||
expect(err.status).toBe(403);
|
||||
expect(err.message).toBe('Forbidden');
|
||||
expect(result).toBeInstanceOf(ConnectionError);
|
||||
expect(result.reason).toBe(ConnectionErrorReason.NotAllowed);
|
||||
expect(result.status).toBe(403);
|
||||
expect(result.message).toBe('Forbidden');
|
||||
}
|
||||
});
|
||||
|
||||
it('should return ConnectionError as-is if it is already a ConnectionError', async () => {
|
||||
const connectionError = ConnectionError.internal('Custom error');
|
||||
const connectionError = new ConnectionError(
|
||||
'Custom error',
|
||||
ConnectionErrorReason.InternalError,
|
||||
500,
|
||||
);
|
||||
|
||||
(global.fetch as any).mockResolvedValueOnce({
|
||||
status: 500,
|
||||
@@ -802,10 +719,8 @@ describe('SignalClient.handleConnectionError', () => {
|
||||
'wss://test.livekit.io/validate',
|
||||
);
|
||||
|
||||
expect(result.isErr()).toBe(true);
|
||||
const err = result._unsafeUnwrapErr();
|
||||
expect(err).toBe(connectionError);
|
||||
expect(err.reason).toBe(ConnectionErrorReason.InternalError);
|
||||
expect(result).toBe(connectionError);
|
||||
expect(result.reason).toBe(ConnectionErrorReason.InternalError);
|
||||
}
|
||||
});
|
||||
|
||||
@@ -820,11 +735,9 @@ describe('SignalClient.handleConnectionError', () => {
|
||||
const error = new Error('Connection failed');
|
||||
const result = await handleMethod.call(signalClient, error, 'wss://test.livekit.io/validate');
|
||||
|
||||
expect(result.isErr()).toBe(true);
|
||||
const err = result._unsafeUnwrapErr();
|
||||
expect(err).toBeInstanceOf(ConnectionError);
|
||||
expect(err.reason).toBe(ConnectionErrorReason.InternalError);
|
||||
expect(err.status).toBe(500);
|
||||
expect(result).toBeInstanceOf(ConnectionError);
|
||||
expect(result.reason).toBe(ConnectionErrorReason.InternalError);
|
||||
expect(result.status).toBe(500);
|
||||
}
|
||||
});
|
||||
|
||||
@@ -836,15 +749,13 @@ describe('SignalClient.handleConnectionError', () => {
|
||||
const error = new Error('Connection failed');
|
||||
const result = await handleMethod.call(signalClient, error, 'wss://test.livekit.io/validate');
|
||||
|
||||
expect(result.isErr()).toBe(true);
|
||||
const err = result._unsafeUnwrapErr();
|
||||
expect(err).toBeInstanceOf(ConnectionError);
|
||||
expect(err.reason).toBe(ConnectionErrorReason.ServerUnreachable);
|
||||
expect(result).toBeInstanceOf(ConnectionError);
|
||||
expect(result.reason).toBe(ConnectionErrorReason.ServerUnreachable);
|
||||
}
|
||||
});
|
||||
|
||||
it('should handle fetch throwing ConnectionError', async () => {
|
||||
const fetchError = ConnectionError.serverUnreachable('Fetch failed');
|
||||
const fetchError = new ConnectionError('Fetch failed', ConnectionErrorReason.ServerUnreachable);
|
||||
(global.fetch as any).mockRejectedValueOnce(fetchError);
|
||||
|
||||
const handleMethod = (signalClient as any).handleConnectionError;
|
||||
@@ -852,8 +763,7 @@ describe('SignalClient.handleConnectionError', () => {
|
||||
const error = new Error('Connection failed');
|
||||
const result = await handleMethod.call(signalClient, error, 'wss://test.livekit.io/validate');
|
||||
|
||||
expect(result.isErr()).toBe(true);
|
||||
expect(result._unsafeUnwrapErr()).toBe(fetchError);
|
||||
expect(result).toBe(fetchError);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
@@ -44,7 +44,6 @@ import {
|
||||
WrappedJoinRequest,
|
||||
protoInt64,
|
||||
} from '@livekit/protocol';
|
||||
import { Result, ResultAsync, err, errAsync, ok, okAsync, safeTry } from 'neverthrow';
|
||||
import log, { LoggerNames, getLogger } from '../logger';
|
||||
import { ConnectionError, ConnectionErrorReason } from '../room/errors';
|
||||
import CriticalTimers from '../room/timers';
|
||||
@@ -55,15 +54,14 @@ import { type WebSocketConnection, WebSocketStream } from './WebSocketStream';
|
||||
import {
|
||||
createRtcUrl,
|
||||
createValidateUrl,
|
||||
getAbortReasonAsString,
|
||||
parseSignalResponse,
|
||||
raceResults,
|
||||
withAbort,
|
||||
withMutex,
|
||||
withTimeout,
|
||||
} from './utils';
|
||||
|
||||
// internal options
|
||||
interface ConnectOpts extends SignalOptions {
|
||||
/** internal */
|
||||
reconnect?: boolean;
|
||||
/** internal */
|
||||
reconnectReason?: number;
|
||||
/** internal */
|
||||
@@ -87,6 +85,7 @@ type SignalKind = NonNullable<SignalMessage>['case'];
|
||||
const passThroughQueueSignals: Array<SignalKind> = [
|
||||
'syncState',
|
||||
'trickle',
|
||||
'offer',
|
||||
'answer',
|
||||
'simulate',
|
||||
'leave',
|
||||
@@ -242,174 +241,229 @@ export class SignalClient {
|
||||
return this.loggerContextCb?.() ?? {};
|
||||
}
|
||||
|
||||
async join(url: string, token: string, opts: SignalOptions, abortSignal?: AbortSignal) {
|
||||
async join(
|
||||
url: string,
|
||||
token: string,
|
||||
opts: SignalOptions,
|
||||
abortSignal?: AbortSignal,
|
||||
): Promise<JoinResponse> {
|
||||
// during a full reconnect, we'd want to start the sequence even if currently
|
||||
// connected
|
||||
this.state = SignalConnectionState.CONNECTING;
|
||||
this.options = opts;
|
||||
return this.connect(url, token, false, opts, abortSignal);
|
||||
const res = await this.connect(url, token, opts, abortSignal);
|
||||
return res as JoinResponse;
|
||||
}
|
||||
|
||||
reconnect(url: string, token: string, sid?: string, reason?: ReconnectReason) {
|
||||
async reconnect(
|
||||
url: string,
|
||||
token: string,
|
||||
sid?: string,
|
||||
reason?: ReconnectReason,
|
||||
): Promise<ReconnectResponse | undefined> {
|
||||
if (!this.options) {
|
||||
return errAsync(
|
||||
ConnectionError.internal('attempted to reconnect without signal options being set'),
|
||||
this.log.warn(
|
||||
'attempted to reconnect without signal options being set, ignoring',
|
||||
this.logContext,
|
||||
);
|
||||
return;
|
||||
}
|
||||
this.state = SignalConnectionState.RECONNECTING;
|
||||
// clear ping interval and restart it once reconnected
|
||||
this.clearPingInterval();
|
||||
|
||||
return this.connect(url, token, true, {
|
||||
const res = (await this.connect(url, token, {
|
||||
...this.options,
|
||||
reconnect: true,
|
||||
sid,
|
||||
reconnectReason: reason,
|
||||
});
|
||||
})) as ReconnectResponse | undefined;
|
||||
return res;
|
||||
}
|
||||
|
||||
private connect<
|
||||
T extends boolean,
|
||||
U extends T extends false ? JoinResponse : ReconnectResponse | undefined,
|
||||
>(url: string, token: string, isReconnect: T, opts: ConnectOpts, abortSignal?: AbortSignal) {
|
||||
const self = this;
|
||||
private async connect(
|
||||
url: string,
|
||||
token: string,
|
||||
opts: ConnectOpts,
|
||||
abortSignal?: AbortSignal,
|
||||
): Promise<JoinResponse | ReconnectResponse | undefined> {
|
||||
const unlock = await this.connectionLock.lock();
|
||||
|
||||
return withMutex(
|
||||
safeTry<U, ConnectionError>(async function* () {
|
||||
self.connectOptions = opts;
|
||||
this.connectOptions = opts;
|
||||
const clientInfo = getClientInfo();
|
||||
const params = opts.singlePeerConnection
|
||||
? createJoinRequestConnectionParams(token, clientInfo, opts)
|
||||
: createConnectionParams(token, clientInfo, opts);
|
||||
const rtcUrl = createRtcUrl(url, params);
|
||||
const validateUrl = createValidateUrl(rtcUrl);
|
||||
|
||||
const clientInfo = getClientInfo();
|
||||
const params = opts.singlePeerConnection
|
||||
? createJoinRequestConnectionParams(token, clientInfo, opts, isReconnect)
|
||||
: createConnectionParams(token, clientInfo, opts, isReconnect);
|
||||
const rtcUrl = createRtcUrl(url, params);
|
||||
const validateUrl = createValidateUrl(rtcUrl);
|
||||
return new Promise<JoinResponse | ReconnectResponse | undefined>(async (resolve, reject) => {
|
||||
try {
|
||||
let alreadyAborted = false;
|
||||
const abortHandler = async (eventOrError: Event | Error) => {
|
||||
if (alreadyAborted) {
|
||||
return;
|
||||
}
|
||||
alreadyAborted = true;
|
||||
const target = eventOrError instanceof Event ? eventOrError.currentTarget : eventOrError;
|
||||
const reason = getAbortReasonAsString(target, 'Abort handler called');
|
||||
// send leave if we have an active stream writer (connection is open)
|
||||
if (this.streamWriter && !this.isDisconnected) {
|
||||
this.sendLeave()
|
||||
.then(() => this.close(reason))
|
||||
.catch((e) => {
|
||||
this.log.error(e);
|
||||
this.close();
|
||||
});
|
||||
} else {
|
||||
this.close();
|
||||
}
|
||||
cleanupAbortHandlers();
|
||||
reject(target instanceof AbortSignal ? target.reason : target);
|
||||
};
|
||||
|
||||
abortSignal?.addEventListener('abort', abortHandler);
|
||||
|
||||
const cleanupAbortHandlers = () => {
|
||||
clearTimeout(wsTimeout);
|
||||
abortSignal?.removeEventListener('abort', abortHandler);
|
||||
};
|
||||
|
||||
const wsTimeout = setTimeout(() => {
|
||||
abortHandler(
|
||||
new ConnectionError(
|
||||
'room connection has timed out (signal)',
|
||||
ConnectionErrorReason.ServerUnreachable,
|
||||
),
|
||||
);
|
||||
}, opts.websocketTimeout);
|
||||
|
||||
const handleSignalConnected = (
|
||||
connection: WebSocketConnection,
|
||||
firstMessage?: SignalResponse,
|
||||
) => {
|
||||
this.handleSignalConnected(connection, wsTimeout, firstMessage);
|
||||
};
|
||||
|
||||
const redactedUrl = new URL(rtcUrl);
|
||||
if (redactedUrl.searchParams.has('access_token')) {
|
||||
redactedUrl.searchParams.set('access_token', '<redacted>');
|
||||
}
|
||||
self.log.debug(`connecting to ${redactedUrl}`, {
|
||||
reconnect: isReconnect,
|
||||
this.log.debug(`connecting to ${redactedUrl}`, {
|
||||
reconnect: opts.reconnect,
|
||||
reconnectReason: opts.reconnectReason,
|
||||
...self.logContext,
|
||||
...this.logContext,
|
||||
});
|
||||
|
||||
if (self.ws) {
|
||||
await self.close(
|
||||
false,
|
||||
opts?.reconnectReason ? ReconnectReason[opts.reconnectReason] : undefined,
|
||||
);
|
||||
if (this.ws) {
|
||||
await this.close(false);
|
||||
}
|
||||
this.ws = new WebSocketStream<ArrayBuffer>(rtcUrl);
|
||||
|
||||
const ws = new WebSocketStream<ArrayBuffer>(rtcUrl);
|
||||
self.ws = ws;
|
||||
|
||||
const wsConnectionResult = withTimeout(ws.opened, opts.websocketTimeout).mapErr(
|
||||
async (error) => {
|
||||
// retrieve info about what error was causing the connection failure and enhance the returned error
|
||||
if (self.state !== SignalConnectionState.CONNECTED) {
|
||||
self.state = SignalConnectionState.DISCONNECTED;
|
||||
const connectionError = await withAbort(
|
||||
withTimeout(self.fetchErrorInfo(error.message, validateUrl), 3_000),
|
||||
abortSignal,
|
||||
);
|
||||
|
||||
const closeReason = `${error.reason}: ${error.message}`;
|
||||
|
||||
self.close(undefined, closeReason);
|
||||
if (connectionError.isErr()) {
|
||||
return connectionError.error;
|
||||
try {
|
||||
this.ws.closed
|
||||
.then((closeInfo) => {
|
||||
if (this.isEstablishingConnection) {
|
||||
reject(
|
||||
new ConnectionError(
|
||||
`Websocket got closed during a (re)connection attempt: ${closeInfo.reason}`,
|
||||
ConnectionErrorReason.InternalError,
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
return error;
|
||||
},
|
||||
);
|
||||
|
||||
const wsConnection = yield* withAbort(
|
||||
wsConnectionResult.andTee((connection) => {
|
||||
self.streamWriter = connection.writable.getWriter();
|
||||
}),
|
||||
abortSignal,
|
||||
).orTee((error) => {
|
||||
self.close(undefined, error.message);
|
||||
});
|
||||
|
||||
const firstMessageOrClose = raceResults([
|
||||
self.processInitialSignalMessage(wsConnection),
|
||||
// Return the close promise as error if it resolves first
|
||||
ws.closed
|
||||
.orTee((error) => {
|
||||
self.handleWSError(error);
|
||||
})
|
||||
.andThen((closeInfo) => {
|
||||
if (
|
||||
// we only log the warning here if the current ws connection is still the same, we don't care about closing of older ws connections that have been replaced
|
||||
ws === self.ws
|
||||
) {
|
||||
self.log.warn(`websocket closed`, {
|
||||
...self.logContext,
|
||||
if (closeInfo.closeCode !== 1000) {
|
||||
this.log.warn(`websocket closed`, {
|
||||
...this.logContext,
|
||||
reason: closeInfo.reason,
|
||||
code: closeInfo.closeCode,
|
||||
wasClean: closeInfo.closeCode === 1000,
|
||||
state: self.state,
|
||||
state: this.state,
|
||||
});
|
||||
if (self.state == SignalConnectionState.CONNECTED) {
|
||||
self.handleOnClose(closeInfo.reason ?? 'Websocket closed unexpectedly');
|
||||
} else {
|
||||
self.log.warn(
|
||||
`ws closed unexpectedly in state ${SignalConnectionState[self.state]}`,
|
||||
);
|
||||
if (this.state === SignalConnectionState.CONNECTED) {
|
||||
this.handleOnClose(closeInfo.reason ?? 'Unexpected WS error');
|
||||
}
|
||||
}
|
||||
|
||||
return err(
|
||||
ConnectionError.internal(
|
||||
closeInfo.reason ?? 'Websocket closed during (re)connection attempt',
|
||||
),
|
||||
);
|
||||
}),
|
||||
]);
|
||||
|
||||
const firstSignalResponse = yield* await withAbort(
|
||||
withTimeout(firstMessageOrClose, 5_000),
|
||||
abortSignal,
|
||||
).orTee((error) => {
|
||||
self.log.warn('signal connection failed', error);
|
||||
if (error.reason === ConnectionErrorReason.Cancelled) {
|
||||
self
|
||||
.sendLeave()
|
||||
.then(() => self.close())
|
||||
.catch((e) => {
|
||||
self.log.error(e);
|
||||
self.close();
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
const validation = yield* self.validateFirstMessage(firstSignalResponse, isReconnect);
|
||||
|
||||
// Handle join response - set up ping configuration
|
||||
if (firstSignalResponse.message?.case === 'join') {
|
||||
self.pingTimeoutDuration = firstSignalResponse.message.value.pingTimeout;
|
||||
self.pingIntervalDuration = firstSignalResponse.message.value.pingInterval;
|
||||
if (self.pingTimeoutDuration && self.pingTimeoutDuration > 0) {
|
||||
self.log.debug('ping config', {
|
||||
...self.logContext,
|
||||
timeout: self.pingTimeoutDuration,
|
||||
interval: self.pingIntervalDuration,
|
||||
return;
|
||||
})
|
||||
.catch((reason) => {
|
||||
if (this.isEstablishingConnection) {
|
||||
reject(
|
||||
new ConnectionError(
|
||||
`Websocket error during a (re)connection attempt: ${reason}`,
|
||||
ConnectionErrorReason.InternalError,
|
||||
),
|
||||
);
|
||||
}
|
||||
});
|
||||
const connection = await this.ws.opened.catch(async (reason: unknown) => {
|
||||
if (this.state !== SignalConnectionState.CONNECTED) {
|
||||
this.state = SignalConnectionState.DISCONNECTED;
|
||||
clearTimeout(wsTimeout);
|
||||
const error = await this.handleConnectionError(reason, validateUrl);
|
||||
reject(error);
|
||||
return;
|
||||
}
|
||||
// other errors, handle
|
||||
this.handleWSError(reason);
|
||||
reject(reason);
|
||||
return;
|
||||
});
|
||||
clearTimeout(wsTimeout);
|
||||
if (!connection) {
|
||||
return;
|
||||
}
|
||||
const signalReader = connection.readable.getReader();
|
||||
this.streamWriter = connection.writable.getWriter();
|
||||
const firstMessage = await signalReader.read();
|
||||
signalReader.releaseLock();
|
||||
if (!firstMessage.value) {
|
||||
throw new ConnectionError(
|
||||
'no message received as first message',
|
||||
ConnectionErrorReason.InternalError,
|
||||
);
|
||||
}
|
||||
|
||||
const firstSignalResponse = parseSignalResponse(firstMessage.value);
|
||||
|
||||
// Validate the first message
|
||||
const validation = this.validateFirstMessage(
|
||||
firstSignalResponse,
|
||||
opts.reconnect ?? false,
|
||||
);
|
||||
|
||||
if (!validation.isValid) {
|
||||
reject(validation.error);
|
||||
return;
|
||||
}
|
||||
|
||||
// Handle join response - set up ping configuration
|
||||
if (firstSignalResponse.message?.case === 'join') {
|
||||
this.pingTimeoutDuration = firstSignalResponse.message.value.pingTimeout;
|
||||
this.pingIntervalDuration = firstSignalResponse.message.value.pingInterval;
|
||||
|
||||
if (this.pingTimeoutDuration && this.pingTimeoutDuration > 0) {
|
||||
this.log.debug('ping config', {
|
||||
...this.logContext,
|
||||
timeout: this.pingTimeoutDuration,
|
||||
interval: this.pingIntervalDuration,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Handle successful connection
|
||||
const firstMessageToProcess = validation.shouldProcessFirstMessage
|
||||
? firstSignalResponse
|
||||
: undefined;
|
||||
handleSignalConnected(connection, firstMessageToProcess);
|
||||
resolve(validation.response);
|
||||
} catch (e) {
|
||||
reject(e);
|
||||
} finally {
|
||||
cleanupAbortHandlers();
|
||||
}
|
||||
|
||||
self.handleSignalConnected(
|
||||
wsConnection,
|
||||
validation.shouldProcessFirstMessage ? firstSignalResponse : undefined,
|
||||
);
|
||||
|
||||
return ok(validation.response as U);
|
||||
}),
|
||||
this.connectionLock,
|
||||
);
|
||||
} finally {
|
||||
unlock();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async startReadingLoop(
|
||||
@@ -458,31 +512,24 @@ export class SignalClient {
|
||||
return;
|
||||
}
|
||||
const unlock = await this.closingLock.lock();
|
||||
|
||||
try {
|
||||
this.clearPingInterval();
|
||||
if (updateState) {
|
||||
this.state = SignalConnectionState.DISCONNECTING;
|
||||
}
|
||||
if (this.ws) {
|
||||
const ws = this.ws;
|
||||
this.ws = undefined;
|
||||
this.streamWriter = undefined;
|
||||
ws.close({ closeCode: 1000, reason });
|
||||
this.ws.close({ closeCode: 1000, reason });
|
||||
|
||||
// calling `ws.close()` only starts the closing handshake (CLOSING state), prefer to wait until state is actually CLOSED
|
||||
const closePromise = ws.closed.match(
|
||||
(closeInfo) => closeInfo,
|
||||
(error) => error,
|
||||
);
|
||||
|
||||
const closePromise = this.ws.closed;
|
||||
this.ws = undefined;
|
||||
this.streamWriter = undefined;
|
||||
await Promise.race([closePromise, sleep(MAX_WS_CLOSE_TIME)]);
|
||||
this.log.info('closed websocket', { reason });
|
||||
}
|
||||
} catch (e) {
|
||||
this.log.debug('websocket error while closing', { ...this.logContext, error: e });
|
||||
} finally {
|
||||
if (updateState && this.state === SignalConnectionState.DISCONNECTING) {
|
||||
if (updateState) {
|
||||
this.state = SignalConnectionState.DISCONNECTED;
|
||||
}
|
||||
unlock();
|
||||
@@ -491,13 +538,8 @@ export class SignalClient {
|
||||
|
||||
// initial offer after joining
|
||||
sendOffer(offer: RTCSessionDescriptionInit, offerId: number) {
|
||||
this.log.debug('sending offer', {
|
||||
...this.logContext,
|
||||
offerSdp: offer.sdp,
|
||||
state: SignalConnectionState[this.state],
|
||||
wsState: this.ws?.readyState,
|
||||
});
|
||||
return this.sendRequest({
|
||||
this.log.debug('sending offer', { ...this.logContext, offerSdp: offer.sdp });
|
||||
this.sendRequest({
|
||||
case: 'offer',
|
||||
value: toProtoSessionDescription(offer, offerId),
|
||||
});
|
||||
@@ -805,13 +847,13 @@ export class SignalClient {
|
||||
if (this.state === SignalConnectionState.DISCONNECTED) return;
|
||||
const onCloseCallback = this.onClose;
|
||||
await this.close(undefined, reason);
|
||||
this.log.debug(`websocket connection closing: ${reason}`, { ...this.logContext, reason });
|
||||
this.log.debug(`websocket connection closed: ${reason}`, { ...this.logContext, reason });
|
||||
if (onCloseCallback) {
|
||||
onCloseCallback(reason);
|
||||
}
|
||||
}
|
||||
|
||||
private handleWSError(error: ReturnType<typeof ConnectionError.websocket>) {
|
||||
private handleWSError(error: unknown) {
|
||||
this.log.error('websocket error', { ...this.logContext, error });
|
||||
}
|
||||
|
||||
@@ -873,30 +915,17 @@ export class SignalClient {
|
||||
* @param firstMessage Optional first message to process
|
||||
* @internal
|
||||
*/
|
||||
private handleSignalConnected(connection: WebSocketConnection, firstMessage?: SignalResponse) {
|
||||
private handleSignalConnected(
|
||||
connection: WebSocketConnection,
|
||||
timeoutHandle: ReturnType<typeof setTimeout>,
|
||||
firstMessage?: SignalResponse,
|
||||
) {
|
||||
this.state = SignalConnectionState.CONNECTED;
|
||||
clearTimeout(timeoutHandle);
|
||||
this.startPingInterval();
|
||||
this.startReadingLoop(connection.readable.getReader(), firstMessage);
|
||||
}
|
||||
|
||||
private processInitialSignalMessage(
|
||||
connection: WebSocketConnection,
|
||||
): ResultAsync<SignalResponse, ConnectionError> {
|
||||
// TODO: If inferring from the return type this could be more granular here than ConnectionError
|
||||
return safeTry<SignalResponse, ConnectionError>(async function* () {
|
||||
const signalReader = connection.readable.getReader();
|
||||
|
||||
const firstMessage = await signalReader.read().finally(() => signalReader.releaseLock());
|
||||
if (!firstMessage.value) {
|
||||
return err(ConnectionError.internal('no message received as first message'));
|
||||
}
|
||||
|
||||
const firstSignalResponse = parseSignalResponse(firstMessage.value);
|
||||
|
||||
return okAsync(firstSignalResponse);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates the first message received from the signal server
|
||||
* @param firstSignalResponse The first signal response received
|
||||
@@ -907,54 +936,63 @@ export class SignalClient {
|
||||
private validateFirstMessage(
|
||||
firstSignalResponse: SignalResponse,
|
||||
isReconnect: boolean,
|
||||
): Result<
|
||||
ValidationType,
|
||||
// TODO, this should probably not be a ConnectionError?
|
||||
ConnectionError
|
||||
> {
|
||||
if (isReconnect === false && firstSignalResponse.message?.case === 'join') {
|
||||
return ok({
|
||||
): {
|
||||
isValid: boolean;
|
||||
response?: JoinResponse | ReconnectResponse;
|
||||
error?: ConnectionError;
|
||||
shouldProcessFirstMessage?: boolean;
|
||||
} {
|
||||
if (firstSignalResponse.message?.case === 'join') {
|
||||
return {
|
||||
isValid: true,
|
||||
response: firstSignalResponse.message.value,
|
||||
shouldProcessFirstMessage: false,
|
||||
});
|
||||
};
|
||||
} else if (
|
||||
isReconnect === true &&
|
||||
this.state === SignalConnectionState.RECONNECTING &&
|
||||
firstSignalResponse.message?.case !== 'leave'
|
||||
) {
|
||||
if (firstSignalResponse.message?.case === 'reconnect') {
|
||||
return ok({
|
||||
return {
|
||||
isValid: true,
|
||||
response: firstSignalResponse.message.value,
|
||||
shouldProcessFirstMessage: false,
|
||||
});
|
||||
};
|
||||
} else {
|
||||
// in reconnecting, any message received means signal reconnected and we still need to process it
|
||||
this.log.debug(
|
||||
'declaring signal reconnected without reconnect response received',
|
||||
this.logContext,
|
||||
);
|
||||
return ok({
|
||||
return {
|
||||
isValid: true,
|
||||
response: undefined,
|
||||
shouldProcessFirstMessage: true,
|
||||
});
|
||||
};
|
||||
}
|
||||
} else if (this.isEstablishingConnection && firstSignalResponse.message?.case === 'leave') {
|
||||
return err(
|
||||
ConnectionError.leaveRequest(
|
||||
return {
|
||||
isValid: false,
|
||||
error: new ConnectionError(
|
||||
'Received leave request while trying to (re)connect',
|
||||
ConnectionErrorReason.LeaveRequest,
|
||||
undefined,
|
||||
firstSignalResponse.message.value.reason,
|
||||
),
|
||||
);
|
||||
};
|
||||
} else if (!isReconnect) {
|
||||
// non-reconnect case, should receive join response first
|
||||
|
||||
return err(
|
||||
ConnectionError.internal(
|
||||
return {
|
||||
isValid: false,
|
||||
error: new ConnectionError(
|
||||
`did not receive join response, got ${firstSignalResponse.message?.case} instead`,
|
||||
ConnectionErrorReason.InternalError,
|
||||
),
|
||||
);
|
||||
};
|
||||
}
|
||||
return err(ConnectionError.internal('Unexpected first message'));
|
||||
|
||||
return {
|
||||
isValid: false,
|
||||
error: new ConnectionError('Unexpected first message', ConnectionErrorReason.InternalError),
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -964,38 +1002,31 @@ export class SignalClient {
|
||||
* @returns A ConnectionError with appropriate reason and status
|
||||
* @internal
|
||||
*/
|
||||
private async fetchErrorInfo(
|
||||
private async handleConnectionError(
|
||||
reason: unknown,
|
||||
validateUrl: string,
|
||||
): Promise<Result<never, ConnectionError>> {
|
||||
): Promise<ConnectionError> {
|
||||
try {
|
||||
const resp = await fetch(validateUrl);
|
||||
|
||||
if (resp.status.toFixed(0).startsWith('4')) {
|
||||
const msg = await resp.text();
|
||||
return err(ConnectionError.notAllowed(msg, resp.status));
|
||||
return new ConnectionError(msg, ConnectionErrorReason.NotAllowed, resp.status);
|
||||
} else if (reason instanceof ConnectionError) {
|
||||
return err(reason);
|
||||
return reason;
|
||||
} else {
|
||||
return err(
|
||||
ConnectionError.websocket(
|
||||
`Encountered unknown websocket error during connection: ${reason}`,
|
||||
resp?.status,
|
||||
resp?.statusText,
|
||||
),
|
||||
return new ConnectionError(
|
||||
`Encountered unknown websocket error during connection: ${reason}`,
|
||||
ConnectionErrorReason.InternalError,
|
||||
resp.status,
|
||||
);
|
||||
}
|
||||
} catch (e) {
|
||||
if (!(e instanceof ConnectionError)) {
|
||||
console.warn('received unexpected error', e);
|
||||
}
|
||||
return err(
|
||||
e instanceof ConnectionError
|
||||
? e
|
||||
: ConnectionError.serverUnreachable(
|
||||
e instanceof Error ? `${e.name}: ${e.message}` : 'server was not reachable',
|
||||
),
|
||||
);
|
||||
return e instanceof ConnectionError
|
||||
? e
|
||||
: new ConnectionError(
|
||||
e instanceof Error ? e.message : 'server was not reachable',
|
||||
ConnectionErrorReason.ServerUnreachable,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1034,13 +1065,12 @@ function createConnectionParams(
|
||||
token: string,
|
||||
info: ClientInfo,
|
||||
opts: ConnectOpts,
|
||||
isReconnect: boolean,
|
||||
): URLSearchParams {
|
||||
const params = new URLSearchParams();
|
||||
params.set('access_token', token);
|
||||
|
||||
// opts
|
||||
if (isReconnect) {
|
||||
if (opts.reconnect) {
|
||||
params.set('reconnect', '1');
|
||||
if (opts.sid) {
|
||||
params.set('sid', opts.sid);
|
||||
@@ -1090,7 +1120,6 @@ function createJoinRequestConnectionParams(
|
||||
token: string,
|
||||
info: ClientInfo,
|
||||
opts: ConnectOpts,
|
||||
isReconnect: boolean,
|
||||
): URLSearchParams {
|
||||
const params = new URLSearchParams();
|
||||
params.set('access_token', token);
|
||||
@@ -1101,7 +1130,7 @@ function createJoinRequestConnectionParams(
|
||||
autoSubscribe: !!opts.autoSubscribe,
|
||||
adaptiveStream: !!opts.adaptiveStream,
|
||||
}),
|
||||
reconnect: isReconnect,
|
||||
reconnect: !!opts.reconnect,
|
||||
participantSid: opts.sid ? opts.sid : undefined,
|
||||
});
|
||||
if (opts.reconnectReason) {
|
||||
@@ -1114,8 +1143,3 @@ function createJoinRequestConnectionParams(
|
||||
|
||||
return params;
|
||||
}
|
||||
|
||||
export type ValidationType =
|
||||
| { response: JoinResponse; shouldProcessFirstMessage: false }
|
||||
| { response: ReconnectResponse; shouldProcessFirstMessage: false }
|
||||
| { response: undefined; shouldProcessFirstMessage: true };
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
/* eslint-disable @typescript-eslint/no-unused-vars */
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
import { ConnectionErrorReason } from '../room/errors';
|
||||
import { WebSocketStream } from './WebSocketStream';
|
||||
|
||||
// Mock WebSocket
|
||||
@@ -123,16 +122,6 @@ vi.mock('../room/utils', () => ({
|
||||
sleep: vi.fn((duration: number) => new Promise((resolve) => setTimeout(resolve, duration))),
|
||||
}));
|
||||
|
||||
// Helper function to unwrap Result from opened promise
|
||||
async function getConnectionOrFail(wsStream: WebSocketStream) {
|
||||
const result = await wsStream.opened;
|
||||
expect(result.isOk()).toBe(true);
|
||||
if (!result.isOk()) {
|
||||
throw new Error('Failed to open connection');
|
||||
}
|
||||
return result.value;
|
||||
}
|
||||
|
||||
describe('WebSocketStream', () => {
|
||||
let mockWebSocket: MockWebSocket;
|
||||
let originalWebSocket: typeof WebSocket;
|
||||
@@ -185,7 +174,7 @@ describe('WebSocketStream', () => {
|
||||
new WebSocketStream('wss://test.example.com', {
|
||||
signal: abortController.signal,
|
||||
});
|
||||
}).toThrow('Aborted before WS was initialized');
|
||||
}).toThrow('This operation was aborted');
|
||||
});
|
||||
|
||||
it('should close when abort signal is triggered', () => {
|
||||
@@ -212,29 +201,21 @@ describe('WebSocketStream', () => {
|
||||
const removeEventListenerSpy = vi.spyOn(mockWebSocket, 'removeEventListener');
|
||||
|
||||
mockWebSocket.triggerOpen();
|
||||
const result = await wsStream.opened;
|
||||
const connection = await wsStream.opened;
|
||||
|
||||
expect(result.isOk()).toBe(true);
|
||||
if (result.isOk()) {
|
||||
const connection = result.value;
|
||||
expect(connection.readable).toBeInstanceOf(ReadableStream);
|
||||
expect(connection.writable).toBeInstanceOf(WritableStream);
|
||||
expect(connection.protocol).toBe('test-protocol');
|
||||
expect(connection.extensions).toBe('test-extension');
|
||||
}
|
||||
expect(connection.readable).toBeInstanceOf(ReadableStream);
|
||||
expect(connection.writable).toBeInstanceOf(WritableStream);
|
||||
expect(connection.protocol).toBe('test-protocol');
|
||||
expect(connection.extensions).toBe('test-extension');
|
||||
expect(removeEventListenerSpy).toHaveBeenCalledWith('error', expect.any(Function));
|
||||
});
|
||||
|
||||
it('should return error Result when WebSocket errors before opening', async () => {
|
||||
it('should reject when WebSocket errors before opening', async () => {
|
||||
const wsStream = new WebSocketStream('wss://test.example.com');
|
||||
|
||||
mockWebSocket.triggerError();
|
||||
|
||||
const result = await wsStream.opened;
|
||||
expect(result.isErr()).toBe(true);
|
||||
if (result.isErr()) {
|
||||
expect(result.error.reason).toBe(ConnectionErrorReason.WebSocket);
|
||||
}
|
||||
await expect(wsStream.opened).rejects.toThrow();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -246,13 +227,10 @@ describe('WebSocketStream', () => {
|
||||
mockWebSocket.triggerOpen();
|
||||
mockWebSocket.triggerClose(1001, 'Going away');
|
||||
|
||||
const result = await wsStream.closed;
|
||||
const closeInfo = await wsStream.closed;
|
||||
|
||||
expect(result.isOk()).toBe(true);
|
||||
if (result.isOk()) {
|
||||
expect(result.value.closeCode).toBe(1001);
|
||||
expect(result.value.reason).toBe('Going away');
|
||||
}
|
||||
expect(closeInfo.closeCode).toBe(1001);
|
||||
expect(closeInfo.reason).toBe('Going away');
|
||||
expect(removeEventListenerSpy).toHaveBeenCalledWith('error', expect.any(Function));
|
||||
});
|
||||
|
||||
@@ -263,16 +241,13 @@ describe('WebSocketStream', () => {
|
||||
mockWebSocket.triggerError();
|
||||
mockWebSocket.triggerClose(1006, 'Connection failed');
|
||||
|
||||
const result = await wsStream.closed;
|
||||
const closeInfo = await wsStream.closed;
|
||||
|
||||
expect(result.isOk()).toBe(true);
|
||||
if (result.isOk()) {
|
||||
expect(result.value.closeCode).toBe(1006);
|
||||
expect(result.value.reason).toBe('Connection failed');
|
||||
}
|
||||
expect(closeInfo.closeCode).toBe(1006);
|
||||
expect(closeInfo.reason).toBe('Connection failed');
|
||||
});
|
||||
|
||||
it('should return error Result when error occurs without timely close event', async () => {
|
||||
it('should reject when error occurs without timely close event', async () => {
|
||||
const { sleep } = await import('../room/utils');
|
||||
vi.mocked(sleep).mockResolvedValue(undefined);
|
||||
|
||||
@@ -281,14 +256,9 @@ describe('WebSocketStream', () => {
|
||||
mockWebSocket.triggerOpen();
|
||||
mockWebSocket.triggerError();
|
||||
|
||||
const result = await wsStream.closed;
|
||||
expect(result.isErr()).toBe(true);
|
||||
if (result.isErr()) {
|
||||
expect(result.error.reason).toBe(ConnectionErrorReason.WebSocket);
|
||||
expect(result.error.message).toBe(
|
||||
'Encountered unspecified websocket error without a timely close event',
|
||||
);
|
||||
}
|
||||
await expect(wsStream.closed).rejects.toThrow(
|
||||
'Encountered unspecified websocket error without a timely close event',
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -297,11 +267,8 @@ describe('WebSocketStream', () => {
|
||||
const wsStream = new WebSocketStream<ArrayBuffer | string>('wss://test.example.com');
|
||||
|
||||
mockWebSocket.triggerOpen();
|
||||
const result = await wsStream.opened;
|
||||
expect(result.isOk()).toBe(true);
|
||||
if (!result.isOk()) return;
|
||||
const connection = await wsStream.opened;
|
||||
|
||||
const connection = result.value;
|
||||
const reader = connection.readable.getReader();
|
||||
|
||||
const message1 = new ArrayBuffer(8);
|
||||
@@ -325,22 +292,23 @@ describe('WebSocketStream', () => {
|
||||
const wsStream = new WebSocketStream('wss://test.example.com');
|
||||
|
||||
mockWebSocket.triggerOpen();
|
||||
const connection = await getConnectionOrFail(wsStream);
|
||||
const connection = await wsStream.opened;
|
||||
|
||||
const reader = connection.readable.getReader();
|
||||
|
||||
mockWebSocket.triggerError();
|
||||
|
||||
const closedResult = await wsStream.closed;
|
||||
await expect(reader.read()).rejects.toBeDefined();
|
||||
expect(closedResult.isErr()).toBe(true);
|
||||
await Promise.all([
|
||||
expect(reader.read()).rejects.toBeDefined(),
|
||||
expect(wsStream.closed).rejects.toBeDefined(),
|
||||
]);
|
||||
});
|
||||
|
||||
it('should close WebSocket with custom close info when cancelled', async () => {
|
||||
const wsStream = new WebSocketStream('wss://test.example.com');
|
||||
|
||||
mockWebSocket.triggerOpen();
|
||||
const connection = await getConnectionOrFail(wsStream);
|
||||
const connection = await wsStream.opened;
|
||||
|
||||
const reader = connection.readable.getReader();
|
||||
const closeSpy = vi.spyOn(mockWebSocket, 'close');
|
||||
@@ -354,7 +322,7 @@ describe('WebSocketStream', () => {
|
||||
const wsStream = new WebSocketStream('wss://test.example.com');
|
||||
|
||||
mockWebSocket.triggerOpen();
|
||||
const connection = await getConnectionOrFail(wsStream);
|
||||
const connection = await wsStream.opened;
|
||||
|
||||
const reader1 = connection.readable.getReader();
|
||||
|
||||
@@ -369,7 +337,7 @@ describe('WebSocketStream', () => {
|
||||
const wsStream = new WebSocketStream<ArrayBuffer | string>('wss://test.example.com');
|
||||
|
||||
mockWebSocket.triggerOpen();
|
||||
const connection = await getConnectionOrFail(wsStream);
|
||||
const connection = await wsStream.opened;
|
||||
|
||||
const writer = connection.writable.getWriter();
|
||||
const sendSpy = vi.spyOn(mockWebSocket, 'send');
|
||||
@@ -394,7 +362,7 @@ describe('WebSocketStream', () => {
|
||||
const wsStream = new WebSocketStream('wss://test.example.com');
|
||||
|
||||
mockWebSocket.triggerOpen();
|
||||
const connection = await getConnectionOrFail(wsStream);
|
||||
const connection = await wsStream.opened;
|
||||
|
||||
const writer = connection.writable.getWriter();
|
||||
const closeSpy = vi.spyOn(mockWebSocket, 'close');
|
||||
@@ -408,7 +376,7 @@ describe('WebSocketStream', () => {
|
||||
const wsStream = new WebSocketStream('wss://test.example.com');
|
||||
|
||||
mockWebSocket.triggerOpen();
|
||||
const connection = await getConnectionOrFail(wsStream);
|
||||
const connection = await wsStream.opened;
|
||||
|
||||
const writer = connection.writable.getWriter();
|
||||
|
||||
@@ -450,7 +418,7 @@ describe('WebSocketStream', () => {
|
||||
});
|
||||
|
||||
mockWebSocket.triggerOpen();
|
||||
await getConnectionOrFail(wsStream);
|
||||
await wsStream.opened;
|
||||
|
||||
const closeSpy = vi.spyOn(mockWebSocket, 'close');
|
||||
|
||||
@@ -465,7 +433,7 @@ describe('WebSocketStream', () => {
|
||||
const wsStream = new WebSocketStream('wss://test.example.com');
|
||||
|
||||
mockWebSocket.triggerOpen();
|
||||
const connection = await getConnectionOrFail(wsStream);
|
||||
const connection = await wsStream.opened;
|
||||
|
||||
const reader = connection.readable.getReader();
|
||||
|
||||
@@ -499,7 +467,7 @@ describe('WebSocketStream', () => {
|
||||
const wsStream = new WebSocketStream('wss://test.example.com');
|
||||
|
||||
mockWebSocket.triggerOpen();
|
||||
const connection = await getConnectionOrFail(wsStream);
|
||||
const connection = await wsStream.opened;
|
||||
|
||||
const reader = connection.readable.getReader();
|
||||
const writer = connection.writable.getWriter();
|
||||
@@ -525,7 +493,7 @@ describe('WebSocketStream', () => {
|
||||
const wsStream = new WebSocketStream('wss://test.example.com');
|
||||
|
||||
mockWebSocket.triggerOpen();
|
||||
const connection = await getConnectionOrFail(wsStream);
|
||||
const connection = await wsStream.opened;
|
||||
|
||||
const sourceData = [new ArrayBuffer(8), new ArrayBuffer(16), new ArrayBuffer(32)];
|
||||
let dataIndex = 0;
|
||||
@@ -556,7 +524,7 @@ describe('WebSocketStream', () => {
|
||||
const wsStream = new WebSocketStream('wss://test.example.com');
|
||||
|
||||
mockWebSocket.triggerOpen();
|
||||
const connection = await getConnectionOrFail(wsStream);
|
||||
const connection = await wsStream.opened;
|
||||
|
||||
const msg1 = new ArrayBuffer(8);
|
||||
const msg2 = new ArrayBuffer(16);
|
||||
@@ -584,7 +552,7 @@ describe('WebSocketStream', () => {
|
||||
const wsStream = new WebSocketStream('wss://test.example.com');
|
||||
|
||||
mockWebSocket.triggerOpen();
|
||||
const connection = await getConnectionOrFail(wsStream);
|
||||
const connection = await wsStream.opened;
|
||||
|
||||
const reader = connection.readable.getReader();
|
||||
|
||||
@@ -594,16 +562,17 @@ describe('WebSocketStream', () => {
|
||||
// Trigger error while read is pending
|
||||
mockWebSocket.triggerError();
|
||||
|
||||
const closedResult = await wsStream.closed;
|
||||
await expect(readPromise).rejects.toBeDefined();
|
||||
expect(closedResult.isErr()).toBe(true);
|
||||
await Promise.all([
|
||||
expect(readPromise).rejects.toBeDefined(),
|
||||
expect(wsStream.closed).rejects.toBeDefined(),
|
||||
]);
|
||||
});
|
||||
|
||||
it('should support zero-length and empty messages', async () => {
|
||||
const wsStream = new WebSocketStream<ArrayBuffer | string>('wss://test.example.com');
|
||||
|
||||
mockWebSocket.triggerOpen();
|
||||
const connection = await getConnectionOrFail(wsStream);
|
||||
const connection = await wsStream.opened;
|
||||
|
||||
const reader = connection.readable.getReader();
|
||||
const writer = connection.writable.getWriter();
|
||||
@@ -630,7 +599,7 @@ describe('WebSocketStream', () => {
|
||||
const wsStream = new WebSocketStream('wss://test.example.com');
|
||||
|
||||
mockWebSocket.triggerOpen();
|
||||
const connection = await getConnectionOrFail(wsStream);
|
||||
const connection = await wsStream.opened;
|
||||
|
||||
const reader = connection.readable.getReader();
|
||||
|
||||
|
||||
@@ -1,6 +1,4 @@
|
||||
// https://github.com/CarterLi/websocketstream-polyfill
|
||||
import { ResultAsync } from 'neverthrow';
|
||||
import { ConnectionError } from '../room/errors';
|
||||
import { sleep } from '../room/utils';
|
||||
|
||||
export interface WebSocketConnection<T extends ArrayBuffer | string = ArrayBuffer | string> {
|
||||
@@ -20,8 +18,6 @@ export interface WebSocketStreamOptions {
|
||||
signal?: AbortSignal;
|
||||
}
|
||||
|
||||
export type WebSocketError = ReturnType<typeof ConnectionError.websocket>;
|
||||
|
||||
/**
|
||||
* [WebSocket](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket) with [Streams API](https://developer.mozilla.org/en-US/docs/Web/API/Streams_API)
|
||||
*
|
||||
@@ -30,11 +26,11 @@ export type WebSocketError = ReturnType<typeof ConnectionError.websocket>;
|
||||
export class WebSocketStream<T extends ArrayBuffer | string = ArrayBuffer | string> {
|
||||
readonly url: string;
|
||||
|
||||
readonly opened: ResultAsync<WebSocketConnection<T>, WebSocketError>;
|
||||
readonly opened: Promise<WebSocketConnection<T>>;
|
||||
|
||||
readonly closed: ResultAsync<WebSocketCloseInfo, WebSocketError>;
|
||||
readonly closed: Promise<WebSocketCloseInfo>;
|
||||
|
||||
readonly close!: (closeInfo?: WebSocketCloseInfo) => void;
|
||||
readonly close: (closeInfo?: WebSocketCloseInfo) => void;
|
||||
|
||||
get readyState(): number {
|
||||
return this.ws.readyState;
|
||||
@@ -44,120 +40,77 @@ export class WebSocketStream<T extends ArrayBuffer | string = ArrayBuffer | stri
|
||||
|
||||
constructor(url: string, options: WebSocketStreamOptions = {}) {
|
||||
if (options.signal?.aborted) {
|
||||
throw ConnectionError.cancelled('Aborted before WS was initialized');
|
||||
throw new DOMException('This operation was aborted', 'AbortError');
|
||||
}
|
||||
|
||||
this.url = url;
|
||||
|
||||
const ws = new WebSocket(url, options.protocols ?? []);
|
||||
ws.binaryType = 'arraybuffer';
|
||||
this.ws = ws;
|
||||
this.url = url;
|
||||
|
||||
const closeWithInfo = ({ closeCode: code, reason }: WebSocketCloseInfo = {}) =>
|
||||
ws.close(code, reason);
|
||||
|
||||
// eslint-disable-next-line neverthrow-must-use/must-use-result
|
||||
this.opened = ResultAsync.fromPromise<WebSocketConnection<T>, WebSocketError>(
|
||||
new Promise((resolve, r) => {
|
||||
const reject = (err: WebSocketError) => r(err);
|
||||
const errorHandler = (e: Event) => {
|
||||
console.error(e);
|
||||
reject(
|
||||
ConnectionError.websocket('Encountered websocket error while establishing connection'),
|
||||
);
|
||||
ws.removeEventListener('open', openHandler);
|
||||
};
|
||||
this.opened = new Promise((resolve, reject) => {
|
||||
ws.onopen = () => {
|
||||
resolve({
|
||||
readable: new ReadableStream<T>({
|
||||
start(controller) {
|
||||
ws.onmessage = ({ data }) => controller.enqueue(data);
|
||||
ws.onerror = (e) => controller.error(e);
|
||||
},
|
||||
cancel: closeWithInfo,
|
||||
}),
|
||||
writable: new WritableStream<T>({
|
||||
write(chunk) {
|
||||
ws.send(chunk);
|
||||
},
|
||||
abort() {
|
||||
ws.close();
|
||||
},
|
||||
close: closeWithInfo,
|
||||
}),
|
||||
protocol: ws.protocol,
|
||||
extensions: ws.extensions,
|
||||
});
|
||||
ws.removeEventListener('error', reject);
|
||||
};
|
||||
ws.addEventListener('error', reject);
|
||||
});
|
||||
|
||||
const onCloseDuringOpen = (ev: CloseEvent) => {
|
||||
reject(
|
||||
ConnectionError.websocket(
|
||||
`WS closed during connection establishment: ${ev.reason}`,
|
||||
ev.code,
|
||||
ev.reason,
|
||||
),
|
||||
);
|
||||
};
|
||||
|
||||
const openHandler = () => {
|
||||
resolve({
|
||||
readable: new ReadableStream<T>({
|
||||
start(controller) {
|
||||
ws.onmessage = ({ data }) => controller.enqueue(data);
|
||||
ws.onerror = (e) => controller.error(e);
|
||||
this.closed = new Promise<WebSocketCloseInfo>((resolve, reject) => {
|
||||
const rejectHandler = async () => {
|
||||
const closePromise = new Promise<CloseEvent>((res) => {
|
||||
if (ws.readyState === WebSocket.CLOSED) return;
|
||||
else {
|
||||
ws.addEventListener(
|
||||
'close',
|
||||
(closeEv: CloseEvent) => {
|
||||
res(closeEv);
|
||||
},
|
||||
cancel: closeWithInfo,
|
||||
}),
|
||||
writable: new WritableStream<T>({
|
||||
write(chunk) {
|
||||
ws.send(chunk);
|
||||
},
|
||||
abort() {
|
||||
ws.close();
|
||||
},
|
||||
close: closeWithInfo,
|
||||
}),
|
||||
protocol: ws.protocol,
|
||||
extensions: ws.extensions,
|
||||
});
|
||||
ws.removeEventListener('error', errorHandler);
|
||||
ws.removeEventListener('close', onCloseDuringOpen);
|
||||
};
|
||||
|
||||
console.log('websocket setup registering event listeners');
|
||||
|
||||
ws.addEventListener('open', openHandler, { once: true });
|
||||
ws.addEventListener('error', errorHandler, { once: true });
|
||||
ws.addEventListener('close', onCloseDuringOpen, { once: true });
|
||||
}),
|
||||
(error) => error as WebSocketError,
|
||||
);
|
||||
|
||||
// eslint-disable-next-line neverthrow-must-use/must-use-result
|
||||
this.closed = ResultAsync.fromPromise<WebSocketCloseInfo, WebSocketError>(
|
||||
new Promise<WebSocketCloseInfo>((resolve, r) => {
|
||||
const reject = (err: WebSocketError) => r(err);
|
||||
const errorHandler = async () => {
|
||||
const closePromise = new Promise<CloseEvent>((res) => {
|
||||
if (ws.readyState === WebSocket.CLOSED) return;
|
||||
else {
|
||||
ws.addEventListener(
|
||||
'close',
|
||||
(closeEv: CloseEvent) => {
|
||||
res(closeEv);
|
||||
},
|
||||
{ once: true },
|
||||
);
|
||||
}
|
||||
});
|
||||
const reason = await Promise.race([sleep(250), closePromise]);
|
||||
if (!reason) {
|
||||
reject(
|
||||
ConnectionError.websocket(
|
||||
'Encountered unspecified websocket error without a timely close event',
|
||||
),
|
||||
{ once: true },
|
||||
);
|
||||
} else {
|
||||
// if we can infer the close reason from the close event then resolve with ok, we don't need to throw
|
||||
resolve({ closeCode: reason.code, reason: reason.reason });
|
||||
}
|
||||
};
|
||||
|
||||
if (ws.readyState === WebSocket.CLOSED) {
|
||||
reject(ConnectionError.websocket('Websocket already closed at initialization time'));
|
||||
return;
|
||||
});
|
||||
const reason = await Promise.race([sleep(250), closePromise]);
|
||||
if (!reason) {
|
||||
reject(new Error('Encountered unspecified websocket error without a timely close event'));
|
||||
} else {
|
||||
// if we can infer the close reason from the close event then resolve the promise, we don't need to throw
|
||||
resolve(reason);
|
||||
}
|
||||
};
|
||||
ws.onclose = ({ code, reason }) => {
|
||||
resolve({ closeCode: code, reason });
|
||||
ws.removeEventListener('error', rejectHandler);
|
||||
};
|
||||
|
||||
ws.onclose = ({ code, reason }) => {
|
||||
resolve({ closeCode: code, reason });
|
||||
ws.removeEventListener('error', errorHandler);
|
||||
};
|
||||
|
||||
ws.addEventListener('error', errorHandler);
|
||||
}),
|
||||
(error) => error as WebSocketError,
|
||||
);
|
||||
ws.addEventListener('error', rejectHandler);
|
||||
});
|
||||
|
||||
if (options.signal) {
|
||||
options.signal.onabort = () => ws.close(undefined, 'AbortSignal triggered');
|
||||
options.signal.onabort = () => ws.close();
|
||||
}
|
||||
|
||||
this.close = closeWithInfo;
|
||||
|
||||
128
src/api/utils.ts
128
src/api/utils.ts
@@ -1,7 +1,4 @@
|
||||
import { SignalResponse } from '@livekit/protocol';
|
||||
import { Result, ResultAsync, errAsync } from 'neverthrow';
|
||||
import type { Mutex } from '@livekit/mutex';
|
||||
import { ConnectionError } from '../room/errors';
|
||||
import { toHttpUrl, toWebsocketUrl } from '../room/utils';
|
||||
|
||||
export function createRtcUrl(url: string, searchParams: URLSearchParams) {
|
||||
@@ -52,128 +49,3 @@ export function getAbortReasonAsString(
|
||||
return 'toString' in reason ? reason.toString() : defaultMessage;
|
||||
}
|
||||
}
|
||||
|
||||
export function withTimeout<T, E extends Error>(
|
||||
ra: ResultAsyncLike<T, E>,
|
||||
ms: number,
|
||||
): ResultAsync<T, E | ReturnType<typeof ConnectionError.timeout>> {
|
||||
const timeout = ResultAsync.fromPromise(
|
||||
new Promise<never>((_, reject) =>
|
||||
setTimeout(() => {
|
||||
reject(ConnectionError.timeout('Timeout'));
|
||||
}, ms),
|
||||
),
|
||||
(e) => e as ReturnType<typeof ConnectionError.timeout>,
|
||||
);
|
||||
|
||||
return raceResults([ra, timeout]);
|
||||
}
|
||||
|
||||
export function withAbort<T, E extends Error>(
|
||||
ra: ResultAsyncLike<T, E>,
|
||||
signal: AbortSignal | undefined,
|
||||
): ResultAsync<T, E | ReturnType<typeof ConnectionError.cancelled>> {
|
||||
if (signal?.aborted) {
|
||||
return errAsync(ConnectionError.cancelled('AbortSignal invoked'));
|
||||
}
|
||||
|
||||
const abortResult = ResultAsync.fromPromise(
|
||||
new Promise<never>((_, reject) => {
|
||||
const onAbortHandler = () => {
|
||||
reject(ConnectionError.cancelled('AbortSignal invoked'));
|
||||
};
|
||||
signal?.addEventListener('abort', onAbortHandler, { once: true });
|
||||
}),
|
||||
(e) => e as ReturnType<typeof ConnectionError.cancelled>,
|
||||
);
|
||||
|
||||
return raceResults([ra, abortResult]);
|
||||
}
|
||||
|
||||
export function withMutex<T, E extends Error>(
|
||||
fn: ResultAsyncLike<T, E>,
|
||||
mutex: Mutex,
|
||||
): ResultAsync<T, E> {
|
||||
return ResultAsync.fromSafePromise(mutex.lock()).andThen((unlock) => withFinally(fn, unlock));
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes a callback after a ResultAsync completes, regardless of success or failure.
|
||||
* Similar to Promise.finally() but for ResultAsync.
|
||||
*
|
||||
* @param ra - The ResultAsync to execute
|
||||
* @param onFinally - Callback to run after completion (receives no arguments)
|
||||
* @returns A new ResultAsync with the same result, but runs onFinally first
|
||||
*
|
||||
* @example
|
||||
* ```ts
|
||||
* withFinally(
|
||||
* someOperation(),
|
||||
* () => cleanup()
|
||||
* )
|
||||
* ```
|
||||
*/
|
||||
export function withFinally<T, E extends Error>(
|
||||
ra: ResultAsyncLike<T, E>,
|
||||
onFinally: () => void | Promise<void>,
|
||||
): ResultAsync<T, E> {
|
||||
return ResultAsync.fromPromise(
|
||||
(async () => {
|
||||
try {
|
||||
const result = await ra;
|
||||
return result.match(
|
||||
(value) => value,
|
||||
(error) => {
|
||||
throw error as Error;
|
||||
},
|
||||
);
|
||||
} catch (error) {
|
||||
throw error as Error;
|
||||
} finally {
|
||||
await onFinally();
|
||||
}
|
||||
})(),
|
||||
(e) => e as E,
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Races multiple ResultAsync operations and returns whichever completes first.
|
||||
* If all fail, returns the error from the first one to reject.
|
||||
* API-compatible with Promise.race, supporting heterogeneous types.
|
||||
*
|
||||
* @param values - Array of ResultAsync operations to race (can have different types)
|
||||
* @returns A new ResultAsync with the result of whichever completes first
|
||||
*
|
||||
* @example
|
||||
* ```ts
|
||||
* // Race a connection attempt against a timeout
|
||||
* raceResults([
|
||||
* connectToServer(), // ResultAsync<Connection, ConnectionError>
|
||||
* delay(5000).andThen(() => errAsync(new TimeoutError())) // ResultAsync<never, TimeoutError>
|
||||
* ]) // ResultAsync<Connection, ConnectionError | TimeoutError>
|
||||
* ```
|
||||
*/
|
||||
export function raceResults<T extends readonly ResultAsyncLike<any, any>[]>(
|
||||
values: T,
|
||||
): ResultAsync<
|
||||
T[number] extends ResultAsync<infer V, any> ? V : never,
|
||||
T[number] extends ResultAsync<any, infer E> ? E : never
|
||||
> {
|
||||
type Value = T[number] extends ResultAsync<infer V, any> ? V : never;
|
||||
type Err = T[number] extends ResultAsync<any, infer E> ? E : never;
|
||||
|
||||
const settledPromises = values.map(
|
||||
(ra): PromiseLike<Value> =>
|
||||
ra.then((res) =>
|
||||
res.match(
|
||||
(v) => Promise.resolve(v),
|
||||
(err) => Promise.reject(err),
|
||||
),
|
||||
),
|
||||
);
|
||||
|
||||
return ResultAsync.fromPromise(Promise.race(settledPromises), (e) => e as Err);
|
||||
}
|
||||
|
||||
export type ResultAsyncLike<T, E> = ResultAsync<T, E> | Promise<Result<T, E>>;
|
||||
|
||||
@@ -8,7 +8,7 @@ export class TURNCheck extends Checker {
|
||||
|
||||
async perform(): Promise<void> {
|
||||
const signalClient = new SignalClient();
|
||||
const joinResult = await signalClient.join(this.url, this.token, {
|
||||
const joinRes = await signalClient.join(this.url, this.token, {
|
||||
autoSubscribe: true,
|
||||
maxRetries: 0,
|
||||
e2eeEnabled: false,
|
||||
@@ -16,9 +16,6 @@ export class TURNCheck extends Checker {
|
||||
singlePeerConnection: false,
|
||||
});
|
||||
|
||||
// TODO fix unsafe usage
|
||||
const joinRes = joinResult._unsafeUnwrap();
|
||||
|
||||
let hasTLS = false;
|
||||
let hasTURN = false;
|
||||
let hasSTUN = false;
|
||||
|
||||
@@ -13,15 +13,13 @@ export class WebSocketCheck extends Checker {
|
||||
}
|
||||
|
||||
let signalClient = new SignalClient();
|
||||
const joinRes = (
|
||||
await signalClient.join(this.url, this.token, {
|
||||
autoSubscribe: true,
|
||||
maxRetries: 0,
|
||||
e2eeEnabled: false,
|
||||
websocketTimeout: 15_000,
|
||||
singlePeerConnection: false,
|
||||
})
|
||||
)._unsafeUnwrap();
|
||||
const joinRes = await signalClient.join(this.url, this.token, {
|
||||
autoSubscribe: true,
|
||||
maxRetries: 0,
|
||||
e2eeEnabled: false,
|
||||
websocketTimeout: 15_000,
|
||||
singlePeerConnection: false,
|
||||
});
|
||||
this.appendMessage(`Connected to server, version ${joinRes.serverVersion}.`);
|
||||
if (joinRes.serverInfo?.edition === ServerInfo_Edition.Cloud && joinRes.serverInfo?.region) {
|
||||
this.appendMessage(`LiveKit Cloud: ${joinRes.serverInfo?.region}`);
|
||||
|
||||
@@ -227,7 +227,7 @@ export class E2EEManager
|
||||
};
|
||||
|
||||
private onWorkerError = (ev: ErrorEvent) => {
|
||||
log.error('e2ee worker encountered an error:', { error: ev });
|
||||
log.error('e2ee worker encountered an error:', { error: ev.error });
|
||||
this.emit(EncryptionEvent.EncryptionError, ev.error, undefined);
|
||||
};
|
||||
|
||||
|
||||
@@ -67,7 +67,7 @@ export default class PCTransport extends EventEmitter {
|
||||
|
||||
remoteNackMids: string[] = [];
|
||||
|
||||
onOffer?: (offer: RTCSessionDescriptionInit, offerId: number) => Promise<void>;
|
||||
onOffer?: (offer: RTCSessionDescriptionInit, offerId: number) => void;
|
||||
|
||||
onIceCandidate?: (candidate: RTCIceCandidate) => void;
|
||||
|
||||
@@ -352,7 +352,7 @@ export default class PCTransport extends EventEmitter {
|
||||
return;
|
||||
}
|
||||
await this.setMungedSDP(offer, write(sdpParsed));
|
||||
await this.onOffer(offer, this.latestOfferId);
|
||||
this.onOffer(offer, this.latestOfferId);
|
||||
} finally {
|
||||
unlock();
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@ import { SignalTarget } from '@livekit/protocol';
|
||||
import log, { LoggerNames, getLogger } from '../logger';
|
||||
import PCTransport, { PCEvents } from './PCTransport';
|
||||
import { roomConnectOptionDefaults } from './defaults';
|
||||
import { ConnectionError } from './errors';
|
||||
import { ConnectionError, ConnectionErrorReason } from './errors';
|
||||
import CriticalTimers from './timers';
|
||||
import type { LoggerOptions } from './types';
|
||||
import { sleep } from './utils';
|
||||
@@ -49,7 +49,7 @@ export class PCTransportManager {
|
||||
|
||||
public onTrack?: (ev: RTCTrackEvent) => void;
|
||||
|
||||
public onPublisherOffer?: (offer: RTCSessionDescriptionInit, offerId: number) => Promise<void>;
|
||||
public onPublisherOffer?: (offer: RTCSessionDescriptionInit, offerId: number) => void;
|
||||
|
||||
private isPublisherConnectionRequired: boolean;
|
||||
|
||||
@@ -99,8 +99,8 @@ export class PCTransportManager {
|
||||
this.onTrack?.(ev);
|
||||
};
|
||||
|
||||
this.publisher.onOffer = async (offer, offerId) => {
|
||||
return this.onPublisherOffer?.(offer, offerId);
|
||||
this.publisher.onOffer = (offer, offerId) => {
|
||||
this.onPublisherOffer?.(offer, offerId);
|
||||
};
|
||||
|
||||
this.state = PCTransportState.NEW;
|
||||
@@ -345,7 +345,12 @@ export class PCTransportManager {
|
||||
this.log.warn('abort transport connection', this.logContext);
|
||||
CriticalTimers.clearTimeout(connectTimeout);
|
||||
|
||||
reject(ConnectionError.cancelled('room connection has been cancelled'));
|
||||
reject(
|
||||
new ConnectionError(
|
||||
'room connection has been cancelled',
|
||||
ConnectionErrorReason.Cancelled,
|
||||
),
|
||||
);
|
||||
};
|
||||
if (abortController?.signal.aborted) {
|
||||
abortHandler();
|
||||
@@ -354,13 +359,23 @@ export class PCTransportManager {
|
||||
|
||||
const connectTimeout = CriticalTimers.setTimeout(() => {
|
||||
abortController?.signal.removeEventListener('abort', abortHandler);
|
||||
reject(ConnectionError.internal('could not establish pc connection'));
|
||||
reject(
|
||||
new ConnectionError(
|
||||
'could not establish pc connection',
|
||||
ConnectionErrorReason.InternalError,
|
||||
),
|
||||
);
|
||||
}, timeout);
|
||||
|
||||
while (this.state !== PCTransportState.CONNECTED) {
|
||||
await sleep(50); // FIXME we shouldn't rely on `sleep` in the connection paths, as it invokes `setTimeout` which can be drastically throttled by browser implementations
|
||||
if (abortController?.signal.aborted) {
|
||||
reject(ConnectionError.cancelled('room connection has been cancelled'));
|
||||
reject(
|
||||
new ConnectionError(
|
||||
'room connection has been cancelled',
|
||||
ConnectionErrorReason.Cancelled,
|
||||
),
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -39,7 +39,6 @@ import {
|
||||
type UserPacket,
|
||||
} from '@livekit/protocol';
|
||||
import { EventEmitter } from 'events';
|
||||
import { type Result, err, errAsync, ok, safeTry } from 'neverthrow';
|
||||
import type { MediaAttributes } from 'sdp-transform';
|
||||
import type TypedEventEmitter from 'typed-emitter';
|
||||
import type { SignalOptions } from '../api/SignalClient';
|
||||
@@ -63,7 +62,6 @@ import {
|
||||
ConnectionError,
|
||||
ConnectionErrorReason,
|
||||
NegotiationError,
|
||||
SimulatedError,
|
||||
TrackInvalidError,
|
||||
UnexpectedConnectionState,
|
||||
} from './errors';
|
||||
@@ -266,20 +264,38 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
|
||||
token: string,
|
||||
opts: SignalOptions,
|
||||
abortSignal?: AbortSignal,
|
||||
): Promise<Result<JoinResponse, ConnectionError>> {
|
||||
): Promise<JoinResponse> {
|
||||
this.url = url;
|
||||
this.token = token;
|
||||
this.signalOpts = opts;
|
||||
this.maxJoinAttempts = opts.maxRetries;
|
||||
this.joinAttempts += 1;
|
||||
try {
|
||||
this.joinAttempts += 1;
|
||||
|
||||
this.setupSignalClientCallbacks();
|
||||
const joinResult = await this.client.join(url, token, opts, abortSignal);
|
||||
this.setupSignalClientCallbacks();
|
||||
const joinResponse = await this.client.join(url, token, opts, abortSignal);
|
||||
this._isClosed = false;
|
||||
this.latestJoinResponse = joinResponse;
|
||||
|
||||
if (joinResult.isErr()) {
|
||||
const error = joinResult.error;
|
||||
if (error instanceof ConnectionError) {
|
||||
if (error.reason === ConnectionErrorReason.ServerUnreachable) {
|
||||
this.subscriberPrimary = joinResponse.subscriberPrimary;
|
||||
if (!this.pcManager) {
|
||||
await this.configure(joinResponse);
|
||||
}
|
||||
|
||||
// create offer
|
||||
if (!this.subscriberPrimary || joinResponse.fastPublish) {
|
||||
this.negotiate().catch((err) => {
|
||||
log.error(err, this.logContext);
|
||||
});
|
||||
}
|
||||
|
||||
this.registerOnLineListener();
|
||||
this.clientConfiguration = joinResponse.clientConfiguration;
|
||||
this.emit(EngineEvent.SignalConnected, joinResponse);
|
||||
return joinResponse;
|
||||
} catch (e) {
|
||||
if (e instanceof ConnectionError) {
|
||||
if (e.reason === ConnectionErrorReason.ServerUnreachable) {
|
||||
this.log.warn(
|
||||
`Couldn't connect to server, attempt ${this.joinAttempts} of ${this.maxJoinAttempts}`,
|
||||
this.logContext,
|
||||
@@ -289,30 +305,8 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
|
||||
}
|
||||
}
|
||||
}
|
||||
return err(error);
|
||||
throw e;
|
||||
}
|
||||
|
||||
const joinResponse = joinResult.value;
|
||||
|
||||
this._isClosed = false;
|
||||
this.latestJoinResponse = joinResponse;
|
||||
|
||||
this.subscriberPrimary = joinResponse.subscriberPrimary;
|
||||
if (!this.pcManager) {
|
||||
await this.configure(joinResponse);
|
||||
}
|
||||
|
||||
// create offer
|
||||
if (!this.subscriberPrimary || joinResponse.fastPublish) {
|
||||
this.negotiate().catch((error) => {
|
||||
log.error(error, this.logContext);
|
||||
});
|
||||
}
|
||||
|
||||
this.registerOnLineListener();
|
||||
this.clientConfiguration = joinResponse.clientConfiguration;
|
||||
this.emit(EngineEvent.SignalConnected, joinResponse);
|
||||
return ok(joinResponse);
|
||||
}
|
||||
|
||||
async close() {
|
||||
@@ -387,7 +381,10 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
|
||||
const publicationTimeout = setTimeout(() => {
|
||||
delete this.pendingTrackResolvers[req.cid];
|
||||
reject(
|
||||
ConnectionError.timeout('publication of local track timed out, no response from server'),
|
||||
new ConnectionError(
|
||||
'publication of local track timed out, no response from server',
|
||||
ConnectionErrorReason.Timeout,
|
||||
),
|
||||
);
|
||||
}, 10_000);
|
||||
this.pendingTrackResolvers[req.cid] = {
|
||||
@@ -471,7 +468,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
|
||||
};
|
||||
|
||||
this.pcManager.onPublisherOffer = (offer, offerId) => {
|
||||
return this.client.sendOffer(offer, offerId);
|
||||
this.client.sendOffer(offer, offerId);
|
||||
};
|
||||
|
||||
this.pcManager.onDataChannel = this.handleDataChannel;
|
||||
@@ -1024,26 +1021,23 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
|
||||
this.fullReconnectOnNext = true;
|
||||
}
|
||||
|
||||
let result: Result<void, Error>;
|
||||
this.attemptingReconnect = true;
|
||||
if (this.fullReconnectOnNext) {
|
||||
result = await this.restartConnection();
|
||||
} else {
|
||||
result = await this.resumeConnection(reason);
|
||||
}
|
||||
this.clearPendingReconnect();
|
||||
this.fullReconnectOnNext = false;
|
||||
if (result.isErr()) {
|
||||
const error = result.error;
|
||||
try {
|
||||
this.attemptingReconnect = true;
|
||||
if (this.fullReconnectOnNext) {
|
||||
await this.restartConnection();
|
||||
} else {
|
||||
await this.resumeConnection(reason);
|
||||
}
|
||||
this.clearPendingReconnect();
|
||||
this.fullReconnectOnNext = false;
|
||||
} catch (e) {
|
||||
this.reconnectAttempts += 1;
|
||||
let recoverable = true;
|
||||
// TODO this needs proper handling to define which errors are actually unexpected and non recoverable
|
||||
// Currently all connection related errors are ConnectionErrors
|
||||
if (error instanceof UnexpectedConnectionState) {
|
||||
this.log.debug('received unrecoverable error', { ...this.logContext, error });
|
||||
if (e instanceof UnexpectedConnectionState) {
|
||||
this.log.debug('received unrecoverable error', { ...this.logContext, error: e });
|
||||
// unrecoverable
|
||||
recoverable = false;
|
||||
} else if (!(error instanceof SignalReconnectError)) {
|
||||
} else if (!(e instanceof SignalReconnectError)) {
|
||||
// cannot resume
|
||||
this.fullReconnectOnNext = true;
|
||||
}
|
||||
@@ -1060,8 +1054,9 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
|
||||
this.emit(EngineEvent.Disconnected);
|
||||
await this.close();
|
||||
}
|
||||
} finally {
|
||||
this.attemptingReconnect = false;
|
||||
}
|
||||
this.attemptingReconnect = false;
|
||||
}
|
||||
|
||||
private getNextRetryDelay(context: ReconnectContext) {
|
||||
@@ -1075,114 +1070,108 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
|
||||
return null;
|
||||
}
|
||||
|
||||
private async restartConnection(
|
||||
regionUrl?: string,
|
||||
): Promise<Result<void, UnexpectedConnectionState | SignalReconnectError>> {
|
||||
const self = this;
|
||||
const restartResultAsync = safeTry(async function* () {
|
||||
if (!self.url || !self.token) {
|
||||
private async restartConnection(regionUrl?: string) {
|
||||
try {
|
||||
if (!this.url || !this.token) {
|
||||
// permanent failure, don't attempt reconnection
|
||||
return err(new UnexpectedConnectionState('could not reconnect, url or token not saved'));
|
||||
throw new UnexpectedConnectionState('could not reconnect, url or token not saved');
|
||||
}
|
||||
|
||||
self.log.info(`reconnecting, attempt: ${self.reconnectAttempts}`, self.logContext);
|
||||
self.emit(EngineEvent.Restarting);
|
||||
this.log.info(`reconnecting, attempt: ${this.reconnectAttempts}`, this.logContext);
|
||||
this.emit(EngineEvent.Restarting);
|
||||
|
||||
if (!self.client.isDisconnected) {
|
||||
await self.client.sendLeave();
|
||||
if (!this.client.isDisconnected) {
|
||||
await this.client.sendLeave();
|
||||
}
|
||||
await self.cleanupPeerConnections();
|
||||
await self.cleanupClient();
|
||||
await this.cleanupPeerConnections();
|
||||
await this.cleanupClient();
|
||||
|
||||
if (!self.signalOpts) {
|
||||
self.log.warn(
|
||||
'attempted connection restart, without signal options present',
|
||||
self.logContext,
|
||||
);
|
||||
return err(new SignalReconnectError());
|
||||
}
|
||||
// in case a regionUrl is passed, the region URL takes precedence
|
||||
const joinResult = await self.join(regionUrl ?? self.url, self.token, self.signalOpts);
|
||||
if (joinResult.isErr()) {
|
||||
const error = joinResult.error;
|
||||
if (error instanceof ConnectionError && error.reason === ConnectionErrorReason.NotAllowed) {
|
||||
return err(new UnexpectedConnectionState('could not reconnect, token might be expired'));
|
||||
let joinResponse: JoinResponse;
|
||||
try {
|
||||
if (!this.signalOpts) {
|
||||
this.log.warn(
|
||||
'attempted connection restart, without signal options present',
|
||||
this.logContext,
|
||||
);
|
||||
throw new SignalReconnectError();
|
||||
}
|
||||
return err(new SignalReconnectError());
|
||||
// in case a regionUrl is passed, the region URL takes precedence
|
||||
joinResponse = await this.join(regionUrl ?? this.url, this.token, this.signalOpts);
|
||||
} catch (e) {
|
||||
if (e instanceof ConnectionError && e.reason === ConnectionErrorReason.NotAllowed) {
|
||||
throw new UnexpectedConnectionState('could not reconnect, token might be expired');
|
||||
}
|
||||
throw new SignalReconnectError();
|
||||
}
|
||||
|
||||
if (self.shouldFailNext) {
|
||||
self.shouldFailNext = false;
|
||||
return err(new SimulatedError());
|
||||
if (this.shouldFailNext) {
|
||||
this.shouldFailNext = false;
|
||||
throw new Error('simulated failure');
|
||||
}
|
||||
|
||||
self.client.setReconnected();
|
||||
self.emit(EngineEvent.SignalRestarted, joinResult.value);
|
||||
this.client.setReconnected();
|
||||
this.emit(EngineEvent.SignalRestarted, joinResponse);
|
||||
|
||||
await self.waitForPCReconnected();
|
||||
await this.waitForPCReconnected();
|
||||
|
||||
// re-check signal connection state before setting engine as resumed
|
||||
if (self.client.currentState !== SignalConnectionState.CONNECTED) {
|
||||
return err(new SignalReconnectError('Signal connection got severed during reconnect'));
|
||||
if (this.client.currentState !== SignalConnectionState.CONNECTED) {
|
||||
throw new SignalReconnectError('Signal connection got severed during reconnect');
|
||||
}
|
||||
|
||||
self.regionUrlProvider?.resetAttempts();
|
||||
this.regionUrlProvider?.resetAttempts();
|
||||
// reconnect success
|
||||
self.emit(EngineEvent.Restarted);
|
||||
return ok();
|
||||
});
|
||||
|
||||
const restartResult = await restartResultAsync;
|
||||
|
||||
if (restartResult.isErr()) {
|
||||
this.emit(EngineEvent.Restarted);
|
||||
} catch (error) {
|
||||
const nextRegionUrl = await this.regionUrlProvider?.getNextBestRegionUrl();
|
||||
if (nextRegionUrl) {
|
||||
this.log.info('retrying signal connection');
|
||||
return this.restartConnection(nextRegionUrl);
|
||||
await this.restartConnection(nextRegionUrl);
|
||||
return;
|
||||
} else {
|
||||
// no more regions to try (or we're not on cloud)
|
||||
this.regionUrlProvider?.resetAttempts();
|
||||
return err(restartResult.error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
return ok(restartResult.value);
|
||||
}
|
||||
|
||||
private async resumeConnection(
|
||||
reason?: ReconnectReason,
|
||||
): Promise<Result<void, SignalReconnectError>> {
|
||||
private async resumeConnection(reason?: ReconnectReason): Promise<void> {
|
||||
if (!this.url || !this.token) {
|
||||
// permanent failure, don't attempt reconnection
|
||||
return errAsync(new UnexpectedConnectionState('could not reconnect, url or token not saved'));
|
||||
throw new UnexpectedConnectionState('could not reconnect, url or token not saved');
|
||||
}
|
||||
// trigger publisher reconnect
|
||||
if (!this.pcManager) {
|
||||
return errAsync(new UnexpectedConnectionState('publisher and subscriber connections unset'));
|
||||
throw new UnexpectedConnectionState('publisher and subscriber connections unset');
|
||||
}
|
||||
|
||||
this.log.info(`resuming signal connection, attempt ${this.reconnectAttempts}`, this.logContext);
|
||||
this.emit(EngineEvent.Resuming);
|
||||
this.setupSignalClientCallbacks();
|
||||
|
||||
const reconnectResult = await this.client.reconnect(
|
||||
this.url,
|
||||
this.token,
|
||||
this.participantSid,
|
||||
reason,
|
||||
);
|
||||
|
||||
if (reconnectResult.isErr()) {
|
||||
return err(reconnectResult.error);
|
||||
let res: ReconnectResponse | undefined;
|
||||
try {
|
||||
this.setupSignalClientCallbacks();
|
||||
res = await this.client.reconnect(this.url, this.token, this.participantSid, reason);
|
||||
} catch (error) {
|
||||
let message = '';
|
||||
if (error instanceof Error) {
|
||||
message = error.message;
|
||||
this.log.error(error.message, { ...this.logContext, error });
|
||||
}
|
||||
if (error instanceof ConnectionError && error.reason === ConnectionErrorReason.NotAllowed) {
|
||||
throw new UnexpectedConnectionState('could not reconnect, token might be expired');
|
||||
}
|
||||
if (error instanceof ConnectionError && error.reason === ConnectionErrorReason.LeaveRequest) {
|
||||
throw error;
|
||||
}
|
||||
throw new SignalReconnectError(message);
|
||||
}
|
||||
|
||||
this.emit(EngineEvent.SignalResumed);
|
||||
|
||||
const reconnectResponse = reconnectResult.value;
|
||||
if (reconnectResponse) {
|
||||
const rtcConfig = this.makeRTCConfiguration(reconnectResponse);
|
||||
if (res) {
|
||||
const rtcConfig = this.makeRTCConfiguration(res);
|
||||
this.pcManager.updateConfiguration(rtcConfig);
|
||||
if (this.latestJoinResponse) {
|
||||
this.latestJoinResponse.serverInfo = reconnectResponse.serverInfo;
|
||||
this.latestJoinResponse.serverInfo = res.serverInfo;
|
||||
}
|
||||
} else {
|
||||
this.log.warn('Did not receive reconnect response', this.logContext);
|
||||
@@ -1190,7 +1179,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
|
||||
|
||||
if (this.shouldFailNext) {
|
||||
this.shouldFailNext = false;
|
||||
return err(new SimulatedError());
|
||||
throw new Error('simulated failure');
|
||||
}
|
||||
|
||||
await this.pcManager.triggerIceRestart();
|
||||
@@ -1199,7 +1188,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
|
||||
|
||||
// re-check signal connection state before setting engine as resumed
|
||||
if (this.client.currentState !== SignalConnectionState.CONNECTED) {
|
||||
return err(new SignalReconnectError('Signal connection got severed during reconnect'));
|
||||
throw new SignalReconnectError('Signal connection got severed during reconnect');
|
||||
}
|
||||
|
||||
this.client.setReconnected();
|
||||
@@ -1210,14 +1199,12 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
|
||||
this.createDataChannels();
|
||||
}
|
||||
|
||||
if (reconnectResponse?.lastMessageSeq) {
|
||||
this.resendReliableMessagesForResume(reconnectResponse.lastMessageSeq);
|
||||
if (res?.lastMessageSeq) {
|
||||
this.resendReliableMessagesForResume(res.lastMessageSeq);
|
||||
}
|
||||
|
||||
// resume success
|
||||
this.emit(EngineEvent.Resumed);
|
||||
|
||||
return ok();
|
||||
}
|
||||
|
||||
async waitForPCInitialConnection(timeout?: number, abortController?: AbortController) {
|
||||
@@ -1241,7 +1228,10 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
|
||||
} catch (e: any) {
|
||||
// TODO do we need a `failed` state here for the PC?
|
||||
this.pcState = PCState.Disconnected;
|
||||
throw ConnectionError.internal(`could not establish PC connection: ${e.message}`);
|
||||
throw new ConnectionError(
|
||||
`could not establish PC connection, ${e.message}`,
|
||||
ConnectionErrorReason.InternalError,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1422,7 +1412,10 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
|
||||
const transport = subscriber ? this.pcManager.subscriber : this.pcManager.publisher;
|
||||
const transportName = subscriber ? 'Subscriber' : 'Publisher';
|
||||
if (!transport) {
|
||||
throw ConnectionError.internal(`${transportName} connection not set`);
|
||||
throw new ConnectionError(
|
||||
`${transportName} connection not set`,
|
||||
ConnectionErrorReason.InternalError,
|
||||
);
|
||||
}
|
||||
|
||||
let needNegotiation = false;
|
||||
@@ -1441,8 +1434,8 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
|
||||
}
|
||||
if (needNegotiation) {
|
||||
// start negotiation
|
||||
this.negotiate().catch((error) => {
|
||||
log.error(error, this.logContext);
|
||||
this.negotiate().catch((err) => {
|
||||
log.error(err, this.logContext);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1463,8 +1456,9 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
|
||||
await sleep(50);
|
||||
}
|
||||
|
||||
throw ConnectionError.internal(
|
||||
throw new ConnectionError(
|
||||
`could not establish ${transportName} connection, state: ${transport.getICEConnectionState()}`,
|
||||
ConnectionErrorReason.InternalError,
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -180,9 +180,7 @@ describe('RegionUrlProvider', () => {
|
||||
const provider = new RegionUrlProvider('wss://test.livekit.cloud', 'token');
|
||||
fetchMock.mockResolvedValue(createMockResponse(401));
|
||||
|
||||
await expect(provider.fetchRegionSettings()).rejects.toThrow(
|
||||
ConnectionError.notAllowed('Could not fetch region settings: Unauthorized', 401),
|
||||
);
|
||||
await expect(provider.fetchRegionSettings()).rejects.toThrow(ConnectionError);
|
||||
await expect(provider.fetchRegionSettings()).rejects.toMatchObject({
|
||||
reason: ConnectionErrorReason.NotAllowed,
|
||||
status: 401,
|
||||
@@ -193,14 +191,10 @@ describe('RegionUrlProvider', () => {
|
||||
const provider = new RegionUrlProvider('wss://test.livekit.cloud', 'token');
|
||||
fetchMock.mockResolvedValue(createMockResponse(500));
|
||||
|
||||
await expect(provider.fetchRegionSettings()).rejects.toThrow(
|
||||
ConnectionError.internal('Could not fetch region settings: Internal Server Error', {
|
||||
status: 500,
|
||||
}),
|
||||
);
|
||||
await expect(provider.fetchRegionSettings()).rejects.toThrow(ConnectionError);
|
||||
await expect(provider.fetchRegionSettings()).rejects.toMatchObject({
|
||||
reason: ConnectionErrorReason.InternalError,
|
||||
context: { status: 500 },
|
||||
status: 500,
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@@ -45,26 +45,25 @@ export class RegionUrlProvider {
|
||||
const regionSettings = (await regionSettingsResponse.json()) as RegionSettings;
|
||||
return { regionSettings, updatedAtInMs: Date.now(), maxAgeInMs };
|
||||
} else {
|
||||
throw regionSettingsResponse.status === 401
|
||||
? ConnectionError.notAllowed(
|
||||
`Could not fetch region settings: ${regionSettingsResponse.statusText}`,
|
||||
regionSettingsResponse.status,
|
||||
)
|
||||
: ConnectionError.internal(
|
||||
`Could not fetch region settings: ${regionSettingsResponse.statusText}`,
|
||||
{ status: regionSettingsResponse.status },
|
||||
);
|
||||
throw new ConnectionError(
|
||||
`Could not fetch region settings: ${regionSettingsResponse.statusText}`,
|
||||
regionSettingsResponse.status === 401
|
||||
? ConnectionErrorReason.NotAllowed
|
||||
: ConnectionErrorReason.InternalError,
|
||||
regionSettingsResponse.status,
|
||||
);
|
||||
}
|
||||
} catch (e: unknown) {
|
||||
if (e instanceof ConnectionError) {
|
||||
// rethrow connection errors
|
||||
throw e;
|
||||
} else if (signal?.aborted) {
|
||||
throw ConnectionError.cancelled(`Region fetching was aborted`);
|
||||
throw new ConnectionError(`Region fetching was aborted`, ConnectionErrorReason.Cancelled);
|
||||
} else {
|
||||
// wrap other errors as connection errors (e.g. timeouts)
|
||||
throw ConnectionError.serverUnreachable(
|
||||
throw new ConnectionError(
|
||||
`Could not fetch region settings, ${e instanceof Error ? `${e.name}: ${e.message}` : e}`,
|
||||
ConnectionErrorReason.ServerUnreachable,
|
||||
500, // using 500 as a catch-all manually set error code here
|
||||
);
|
||||
}
|
||||
|
||||
@@ -686,7 +686,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
|
||||
try {
|
||||
await BackOffStrategy.getInstance().getBackOffPromise(url);
|
||||
if (abortController.signal.aborted) {
|
||||
ConnectionError.cancelled('Connection attempt aborted');
|
||||
throw new ConnectionError('Connection attempt aborted', ConnectionErrorReason.Cancelled);
|
||||
}
|
||||
await this.attemptConnection(regionUrl ?? url, token, opts, abortController);
|
||||
this.abortController = undefined;
|
||||
@@ -773,7 +773,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
|
||||
roomOptions: InternalRoomOptions,
|
||||
abortController: AbortController,
|
||||
): Promise<JoinResponse> => {
|
||||
const joinResult = await engine.join(
|
||||
const joinResponse = await engine.join(
|
||||
url,
|
||||
token,
|
||||
{
|
||||
@@ -788,13 +788,6 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
|
||||
abortController.signal,
|
||||
);
|
||||
|
||||
// TODO continue propagating Result, we don't need to throw here
|
||||
if (joinResult.isErr()) {
|
||||
throw joinResult.error;
|
||||
}
|
||||
|
||||
const joinResponse = joinResult.value;
|
||||
|
||||
let serverInfo: Partial<ServerInfo> | undefined = joinResponse.serverInfo;
|
||||
if (!serverInfo) {
|
||||
serverInfo = { version: joinResponse.serverVersion, region: joinResponse.serverRegion };
|
||||
@@ -901,9 +894,12 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
|
||||
} catch (err) {
|
||||
await this.engine.close();
|
||||
this.recreateEngine();
|
||||
const resultingError = abortController.signal.aborted
|
||||
? ConnectionError.cancelled(`could not establish signal connection`)
|
||||
: ConnectionError.serverUnreachable(`could not establish signal connection`);
|
||||
const resultingError = new ConnectionError(
|
||||
`could not establish signal connection`,
|
||||
abortController.signal.aborted
|
||||
? ConnectionErrorReason.Cancelled
|
||||
: ConnectionErrorReason.ServerUnreachable,
|
||||
);
|
||||
if (err instanceof Error) {
|
||||
resultingError.message = `${resultingError.message}: ${err.message}`;
|
||||
}
|
||||
@@ -921,7 +917,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
|
||||
if (abortController.signal.aborted) {
|
||||
await this.engine.close();
|
||||
this.recreateEngine();
|
||||
throw ConnectionError.cancelled(`Connection attempt aborted`);
|
||||
throw new ConnectionError(`Connection attempt aborted`, ConnectionErrorReason.Cancelled);
|
||||
}
|
||||
|
||||
try {
|
||||
@@ -978,7 +974,9 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
|
||||
this.log.warn(msg, this.logContext);
|
||||
this.abortController?.abort(msg);
|
||||
// in case the abort controller didn't manage to cancel the connection attempt, reject the connect promise explicitly
|
||||
this.connectFuture?.reject?.(ConnectionError.cancelled('Client initiated disconnect'));
|
||||
this.connectFuture?.reject?.(
|
||||
new ConnectionError('Client initiated disconnect', ConnectionErrorReason.Cancelled),
|
||||
);
|
||||
this.connectFuture = undefined;
|
||||
}
|
||||
|
||||
@@ -1927,7 +1925,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
|
||||
});
|
||||
if (byteLength(response) > MAX_PAYLOAD_BYTES) {
|
||||
responseError = RpcError.builtIn('RESPONSE_PAYLOAD_TOO_LARGE');
|
||||
this.log.warn(`RPC Response payload too large for ${method}`, this.logContext);
|
||||
console.warn(`RPC Response payload too large for ${method}`);
|
||||
} else {
|
||||
responsePayload = response;
|
||||
}
|
||||
@@ -1935,9 +1933,9 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
|
||||
if (error instanceof RpcError) {
|
||||
responseError = error;
|
||||
} else {
|
||||
this.log.warn(
|
||||
console.warn(
|
||||
`Uncaught error returned by RPC handler for ${method}. Returning APPLICATION_ERROR instead.`,
|
||||
{ ...this.logContext, error },
|
||||
error,
|
||||
);
|
||||
responseError = RpcError.builtIn('APPLICATION_ERROR');
|
||||
}
|
||||
|
||||
@@ -10,14 +10,6 @@ export class LivekitError extends Error {
|
||||
}
|
||||
}
|
||||
|
||||
export class SimulatedError extends LivekitError {
|
||||
readonly type = 'simulated';
|
||||
|
||||
constructor(message = 'Simulated failure') {
|
||||
super(-1, message);
|
||||
}
|
||||
}
|
||||
|
||||
export enum ConnectionErrorReason {
|
||||
NotAllowed,
|
||||
ServerUnreachable,
|
||||
@@ -25,132 +17,30 @@ export enum ConnectionErrorReason {
|
||||
Cancelled,
|
||||
LeaveRequest,
|
||||
Timeout,
|
||||
WebSocket,
|
||||
}
|
||||
|
||||
type NotAllowed = {
|
||||
reason: ConnectionErrorReason.NotAllowed;
|
||||
status: number;
|
||||
context?: unknown;
|
||||
};
|
||||
|
||||
type InternalError = {
|
||||
reason: ConnectionErrorReason.InternalError;
|
||||
status: never;
|
||||
context?: { status?: number; statusText?: string };
|
||||
};
|
||||
|
||||
type ConnectionTimeout = {
|
||||
reason: ConnectionErrorReason.Timeout;
|
||||
status: never;
|
||||
context: never;
|
||||
};
|
||||
|
||||
type LeaveRequest = {
|
||||
reason: ConnectionErrorReason.LeaveRequest;
|
||||
status: never;
|
||||
context: DisconnectReason;
|
||||
};
|
||||
|
||||
type Cancelled = {
|
||||
reason: ConnectionErrorReason.Cancelled;
|
||||
status: never;
|
||||
context: never;
|
||||
};
|
||||
|
||||
type ServerUnreachable = {
|
||||
reason: ConnectionErrorReason.ServerUnreachable;
|
||||
export class ConnectionError extends LivekitError {
|
||||
status?: number;
|
||||
context?: never;
|
||||
};
|
||||
|
||||
type WebSocket = {
|
||||
reason: ConnectionErrorReason.WebSocket;
|
||||
status?: number;
|
||||
context?: string;
|
||||
};
|
||||
context?: unknown | DisconnectReason;
|
||||
|
||||
type ConnectionErrorVariants =
|
||||
| NotAllowed
|
||||
| ConnectionTimeout
|
||||
| LeaveRequest
|
||||
| InternalError
|
||||
| Cancelled
|
||||
| ServerUnreachable
|
||||
| WebSocket;
|
||||
|
||||
export class ConnectionError<
|
||||
Variant extends ConnectionErrorVariants = ConnectionErrorVariants,
|
||||
> extends LivekitError {
|
||||
status?: Variant['status'];
|
||||
|
||||
context: Variant['context'];
|
||||
|
||||
reason: Variant['reason'];
|
||||
reason: ConnectionErrorReason;
|
||||
|
||||
reasonName: string;
|
||||
|
||||
readonly name = 'ConnectionError';
|
||||
|
||||
protected constructor(
|
||||
constructor(
|
||||
message: string,
|
||||
reason: Variant['reason'],
|
||||
status?: Variant['status'],
|
||||
context?: Variant['context'],
|
||||
reason: ConnectionErrorReason,
|
||||
status?: number,
|
||||
context?: unknown | DisconnectReason,
|
||||
) {
|
||||
super(1, message);
|
||||
this.name = 'ConnectionError';
|
||||
this.status = status;
|
||||
this.reason = reason;
|
||||
this.context = context;
|
||||
this.reasonName = ConnectionErrorReason[reason];
|
||||
}
|
||||
|
||||
static notAllowed(message: string, status: number, context?: unknown) {
|
||||
return new ConnectionError<NotAllowed>(
|
||||
message,
|
||||
ConnectionErrorReason.NotAllowed,
|
||||
status,
|
||||
context,
|
||||
);
|
||||
}
|
||||
|
||||
static timeout(message: string) {
|
||||
return new ConnectionError<ConnectionTimeout>(message, ConnectionErrorReason.Timeout);
|
||||
}
|
||||
|
||||
static leaveRequest(message: string, context: DisconnectReason) {
|
||||
return new ConnectionError<LeaveRequest>(
|
||||
message,
|
||||
ConnectionErrorReason.LeaveRequest,
|
||||
undefined,
|
||||
context,
|
||||
);
|
||||
}
|
||||
|
||||
static internal(message: string, context?: { status?: number; statusText?: string }) {
|
||||
return new ConnectionError<InternalError>(
|
||||
message,
|
||||
ConnectionErrorReason.InternalError,
|
||||
undefined,
|
||||
context,
|
||||
);
|
||||
}
|
||||
|
||||
static cancelled(message: string) {
|
||||
return new ConnectionError<Cancelled>(message, ConnectionErrorReason.Cancelled);
|
||||
}
|
||||
|
||||
static serverUnreachable(message: string, status?: number) {
|
||||
return new ConnectionError<ServerUnreachable>(
|
||||
message,
|
||||
ConnectionErrorReason.ServerUnreachable,
|
||||
status,
|
||||
);
|
||||
}
|
||||
|
||||
static websocket(message: string, status?: number, reason?: string) {
|
||||
return new ConnectionError<WebSocket>(message, ConnectionErrorReason.WebSocket, status, reason);
|
||||
}
|
||||
}
|
||||
|
||||
export class DeviceUnsupportedError extends LivekitError {
|
||||
|
||||
Reference in New Issue
Block a user