fix(datastream): reader close, writer goroutine leak (#818)

* fix(datastream): reader close, writer goroutine leak

* go mod tidy

* update comment
This commit is contained in:
Anunay Maheshwari
2025-12-23 05:48:51 +05:30
committed by GitHub
parent 411a594c97
commit 2bb3eea850
3 changed files with 46 additions and 12 deletions

2
go.mod
View File

@@ -100,7 +100,7 @@ require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dennwc/iters v1.2.2 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/frostbyte73/core v0.1.1 // indirect
github.com/frostbyte73/core v0.1.1
github.com/fsnotify/fsnotify v1.9.0 // indirect
github.com/gammazero/deque v1.2.0
github.com/go-jose/go-jose/v3 v3.0.4 // indirect

15
room.go
View File

@@ -638,10 +638,25 @@ func (r *Room) cleanup() {
r.engine.Close()
r.LocalParticipant.closeTracks()
r.setSid("", true)
r.byteStreamHandlers.Clear()
r.byteStreamReaders.Range(func(key, value any) bool {
if reader, ok := value.(*ByteStreamReader); ok {
reader.close()
}
return true
})
r.byteStreamReaders.Clear()
r.textStreamHandlers.Clear()
r.textStreamReaders.Range(func(key, value any) bool {
if reader, ok := value.(*TextStreamReader); ok {
reader.close()
}
return true
})
r.textStreamReaders.Clear()
r.rpcHandlers.Clear()
r.LocalParticipant.cleanup()
}

View File

@@ -6,6 +6,7 @@ import (
"sync"
"sync/atomic"
"github.com/frostbyte73/core"
protocol "github.com/livekit/protocol/livekit"
)
@@ -104,7 +105,7 @@ type baseStreamWriter[T any] struct {
onProgress func(progress float64)
chunkIndex uint64
closed atomic.Bool
closed core.Fuse
lock sync.Mutex
writeQueue chan writeTask
@@ -128,8 +129,20 @@ func newBaseStreamWriter[T any](engine *RTCEngine, header *protocol.DataStream_H
// processes write queue asynchronously
func (w *baseStreamWriter[T]) processWriteQueue() {
for task := range w.writeQueue {
w.writeStreamBytes(task.chunks, task.onDone)
for {
select {
case task := <-w.writeQueue:
w.writeStreamBytes(task.chunks, task.onDone)
case <-w.closed.Watch():
// Drain any pending tasks - close sends a trailer and no data should be sent after that
for {
select {
case <-w.writeQueue:
default:
return
}
}
}
}
}
@@ -137,33 +150,39 @@ func (w *baseStreamWriter[T]) processWriteQueue() {
// depending on the type of the stream writer
// onDone is a callback function that will be called when the data provided is written to the stream
func (w *baseStreamWriter[T]) Write(data T, onDone *func()) {
if w.closed.Load() {
if w.closed.IsBroken() {
return
}
var task writeTask
switch v := any(data).(type) {
case []byte:
w.writeQueue <- writeTask{
task = writeTask{
chunks: chunkBytes(v),
onDone: onDone,
}
case string:
w.writeQueue <- writeTask{
task = writeTask{
chunks: chunkUtf8String(v),
onDone: onDone,
}
default:
return
}
select {
case w.writeQueue <- task:
case <-w.closed.Watch():
}
}
// Close the stream, this will send a stream trailer to notify the receiver that the stream is closed
func (w *baseStreamWriter[T]) Close() {
if !w.closed.Load() {
w.closed.Store(true)
w.closed.Once(func() {
w.lock.Lock()
w.engine.publishStreamTrailer(w.streamId, w.destinationIdentities)
w.lock.Unlock()
}
})
}
// writes a list of chunks to the stream
@@ -171,7 +190,7 @@ func (w *baseStreamWriter[T]) writeStreamBytes(chunks [][]byte, onDone *func())
w.lock.Lock()
chunkIndex := w.chunkIndex
for i := 0; i < len(chunks) && !w.closed.Load(); i++ {
for i := 0; i < len(chunks) && !w.closed.IsBroken(); i++ {
chunk := chunks[i]
w.engine.waitForBufferStatusLow(protocol.DataPacket_RELIABLE)