feat(examples): openai realtime voice (#676)

* feat(examples): openai realtime voice

* fix: static check

* fix: remove deprecated write silence option
This commit is contained in:
Anunay Maheshwari
2025-05-29 11:45:57 +05:30
committed by GitHub
parent ab4282b03f
commit c464d000c8
8 changed files with 326 additions and 1 deletions

5
.gitignore vendored
View File

@@ -28,4 +28,7 @@ bin/
*.DS_Store
# data stream test files
*.mp4
*.mp4
#env
.env

View File

@@ -0,0 +1,8 @@
API_KEY=your-openai-api-key
WSS_URL=wss://api.openai.com/v1/realtime
MODEL=gpt-4o-realtime-preview-2024-12-17
LK_HOST=ws://localhost:7880
LK_API_KEY=devkey
LK_API_SECRET=secret
LK_ROOM_NAME=test
LK_PARTICIPANT_IDENTITY=go-agent

View File

@@ -0,0 +1,64 @@
package main
import (
"net/http"
"net/url"
"os"
"github.com/gorilla/websocket"
"github.com/joho/godotenv"
lksdk "github.com/livekit/server-sdk-go/v2"
)
func loadEnv() {
err := godotenv.Load()
if err != nil {
panic(err)
}
}
func connectToRealtimeAPI() (*websocket.Conn, string, error) {
wssUrl := os.Getenv("WSS_URL")
apiKey := os.Getenv("API_KEY")
model := os.Getenv("MODEL")
u, err := url.Parse(wssUrl)
if err != nil {
panic(err)
}
q := u.Query()
q.Add("model", model)
u.RawQuery = q.Encode()
header := http.Header{}
header.Add("Authorization", "Bearer "+apiKey)
header.Add("OpenAI-Beta", "realtime=v1")
conn, _, err := websocket.DefaultDialer.Dial(u.String(), header)
if err != nil {
return nil, "", err
}
return conn, wssUrl, nil
}
func connectToLKRoom(cb *lksdk.RoomCallback) (*lksdk.Room, error) {
host := os.Getenv("LK_HOST")
apiKey := os.Getenv("LK_API_KEY")
apiSecret := os.Getenv("LK_API_SECRET")
roomName := os.Getenv("LK_ROOM_NAME")
participantIdentity := os.Getenv("LK_PARTICIPANT_IDENTITY")
room, err := lksdk.ConnectToRoom(host, lksdk.ConnectInfo{
APIKey: apiKey,
APISecret: apiSecret,
RoomName: roomName,
ParticipantIdentity: participantIdentity,
}, cb)
if err != nil {
return nil, err
}
return room, nil
}

View File

@@ -0,0 +1,108 @@
package main
import (
"os"
"os/signal"
"syscall"
"github.com/livekit/media-sdk"
"github.com/livekit/protocol/logger"
lksdk "github.com/livekit/server-sdk-go/v2"
lkmedia "github.com/livekit/server-sdk-go/v2/pkg/media"
"github.com/pion/webrtc/v4"
)
func callbacksForLkRoom(handler *RealtimeAPIHandler) *lksdk.RoomCallback {
var pcmRemoteTrack *lkmedia.PCMRemoteTrack
return &lksdk.RoomCallback{
ParticipantCallback: lksdk.ParticipantCallback{
OnTrackSubscribed: func(track *webrtc.TrackRemote, publication *lksdk.RemoteTrackPublication, rp *lksdk.RemoteParticipant) {
if pcmRemoteTrack != nil {
// only handle one track
return
}
pcmRemoteTrack, _ = handleSubscribe(track, handler)
},
},
OnDisconnected: func() {
if pcmRemoteTrack != nil {
pcmRemoteTrack.Close()
pcmRemoteTrack = nil
}
},
OnDisconnectedWithReason: func(reason lksdk.DisconnectionReason) {
if pcmRemoteTrack != nil {
pcmRemoteTrack.Close()
pcmRemoteTrack = nil
}
},
}
}
func main() {
loadEnv()
audioWriterChan := make(chan media.PCM16Sample)
defer close(audioWriterChan)
handler, err := NewRealtimeAPIHandler(&RealtimeAPIHandlerCallbacks{
OnAudioReceived: func(audio media.PCM16Sample) {
audioWriterChan <- audio
},
})
if err != nil {
panic(err)
}
defer handler.Close()
room, err := connectToLKRoom(callbacksForLkRoom(handler))
if err != nil {
panic(err)
}
defer room.Disconnect()
go handlePublish(room, audioWriterChan)
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT)
<-sigChan
}
func handlePublish(room *lksdk.Room, audioWriterChan chan media.PCM16Sample) {
publishTrack, err := lkmedia.NewPCMLocalTrack(24000, 1, logger.GetLogger())
if err != nil {
panic(err)
}
defer func() {
publishTrack.ClearQueue()
publishTrack.Close()
}()
if _, err = room.LocalParticipant.PublishTrack(publishTrack, &lksdk.TrackPublicationOptions{
Name: "test",
}); err != nil {
panic(err)
}
for sample := range audioWriterChan {
if err := publishTrack.WriteSample(sample); err != nil {
logger.Errorw("Failed to write sample", err)
}
}
}
func handleSubscribe(track *webrtc.TrackRemote, handler *RealtimeAPIHandler) (*lkmedia.PCMRemoteTrack, error) {
if track.Codec().MimeType != webrtc.MimeTypeOpus {
logger.Warnw("Received non-opus track", nil, "track", track.Codec().MimeType)
}
writer := NewRemoteTrackWriter(handler)
trackWriter, err := lkmedia.NewPCMRemoteTrack(track, writer, lkmedia.WithTargetSampleRate(24000))
if err != nil {
logger.Errorw("Failed to create remote track", err)
return nil, err
}
return trackWriter, nil
}

View File

@@ -0,0 +1,104 @@
package main
import (
"encoding/base64"
"encoding/binary"
"encoding/json"
"fmt"
"github.com/gorilla/websocket"
"github.com/livekit/media-sdk"
"github.com/livekit/protocol/logger"
)
type RealtimeAPIHandler struct {
conn *websocket.Conn
url string
cb *RealtimeAPIHandlerCallbacks
}
type RealtimeAPIHandlerCallbacks struct {
OnAudioReceived func(audio media.PCM16Sample)
}
func NewRealtimeAPIHandler(cb *RealtimeAPIHandlerCallbacks) (*RealtimeAPIHandler, error) {
conn, url, err := connectToRealtimeAPI()
if err != nil {
return nil, err
}
ws := &RealtimeAPIHandler{
conn: conn,
url: url,
cb: cb,
}
go ws.readMessages()
return ws, nil
}
func (h *RealtimeAPIHandler) Url() string {
return h.url
}
func (h *RealtimeAPIHandler) sendMessage(messageType string, key string, value string) error {
message := fmt.Sprintf(`{"type": "%s", "%s": "%s"}`, messageType, key, value)
err := h.conn.WriteMessage(websocket.TextMessage, []byte(message))
if err != nil {
return err
}
return nil
}
func (h *RealtimeAPIHandler) SendAudioChunk(sample media.PCM16Sample) error {
bytes := make([]byte, len(sample)*2)
for i, s := range sample {
binary.LittleEndian.PutUint16(bytes[i*2:], uint16(s))
}
encodedSample := base64.StdEncoding.EncodeToString(bytes)
return h.sendMessage("input_audio_buffer.append", "audio", encodedSample)
}
func (h *RealtimeAPIHandler) readMessages() {
for {
_, message, err := h.conn.ReadMessage()
if err != nil {
logger.Errorw("Error reading message", err)
h.Close()
break
}
h.handleMessage(string(message))
}
}
func (h *RealtimeAPIHandler) handleMessage(message string) {
var data map[string]interface{}
err := json.Unmarshal([]byte(message), &data)
if err != nil {
logger.Errorw("Failed to unmarshal message", err)
return
}
switch data["type"] {
case "response.audio.delta":
audioBase64 := data["delta"].(string)
audioBytes, err := base64.StdEncoding.DecodeString(audioBase64)
audioPCM16 := make(media.PCM16Sample, len(audioBytes)/2)
for i := 0; i < len(audioBytes); i += 2 {
audioPCM16[i/2] = int16(binary.LittleEndian.Uint16(audioBytes[i : i+2]))
}
if err != nil {
logger.Errorw("Failed to decode audio", err)
return
}
h.cb.OnAudioReceived(audioPCM16)
default:
logger.Errorw("Unknown message type", nil, "type", data["type"])
}
}
func (m *RealtimeAPIHandler) Close() error {
return m.conn.Close()
}

View File

@@ -0,0 +1,35 @@
package main
import (
"errors"
"go.uber.org/atomic"
"github.com/livekit/media-sdk"
)
var ErrClosed = errors.New("writer is closed")
type RemoteTrackWriter struct {
handler *RealtimeAPIHandler
closed atomic.Bool
}
func NewRemoteTrackWriter(handler *RealtimeAPIHandler) *RemoteTrackWriter {
return &RemoteTrackWriter{
handler: handler,
}
}
func (w *RemoteTrackWriter) WriteSample(sample media.PCM16Sample) error {
if w.closed.Load() {
return ErrClosed
}
return w.handler.SendAudioChunk(sample)
}
func (w *RemoteTrackWriter) Close() error {
w.closed.Swap(true)
return nil
}

1
go.mod
View File

@@ -46,6 +46,7 @@ require (
github.com/google/cel-go v0.25.0 // indirect
github.com/jfreymuth/oggvorbis v1.0.5 // indirect
github.com/jfreymuth/vorbis v1.0.2 // indirect
github.com/joho/godotenv v1.5.1
github.com/jxskiss/base62 v1.1.0 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/klauspost/cpuid/v2 v2.2.10 // indirect

2
go.sum
View File

@@ -79,6 +79,8 @@ github.com/jfreymuth/oggvorbis v1.0.5 h1:u+Ck+R0eLSRhgq8WTmffYnrVtSztJcYrl588DM4
github.com/jfreymuth/oggvorbis v1.0.5/go.mod h1:1U4pqWmghcoVsCJJ4fRBKv9peUJMBHixthRlBeD6uII=
github.com/jfreymuth/vorbis v1.0.2 h1:m1xH6+ZI4thH927pgKD8JOH4eaGRm18rEE9/0WKjvNE=
github.com/jfreymuth/vorbis v1.0.2/go.mod h1:DoftRo4AznKnShRl1GxiTFCseHr4zR9BN3TWXyuzrqQ=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/jxskiss/base62 v1.1.0 h1:A5zbF8v8WXx2xixnAKD2w+abC+sIzYJX+nxmhA6HWFw=
github.com/jxskiss/base62 v1.1.0/go.mod h1:HhWAlUXvxKThfOlZbcuFzsqwtF5TcqS9ru3y5GfjWAc=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=