Recorder v2 (#13)
Some checks failed
Release to Docker / docker (push) Has been cancelled

* dump

* first success

* remove output

* mp4 filesink success

* rtmp

* refactor

* config updates

* cleaning, stop

* beeg changes

* s3

* s3 credentials

* tests

* nonlinux builds

* build test

* skip mage

* pipeline build tags

* delete old service dockerfile

* readme update

* request tests, file output

* rearrange readme

* get docker working

* cleaning

* file pipieline

* 🤦

* audio caps filter

* remove scaling

* framerate

* use SetProperty

* refactor

* more refactoring

* dump

* fix init

* fixed gst_debug

* remove separate gst_log_level

* docker updates

* fix contexts

* update protocol

* multiple rtmp

* bins

* remove mock, enforce init, better logging

* multiple rtmp working

* add and remove output

* fix tests
This commit is contained in:
David Colburn
2021-10-08 14:12:36 -07:00
committed by GitHub
parent db02598d5a
commit 30a250fe0c
53 changed files with 2279 additions and 6532 deletions

1
.checksumgo Normal file
View File

@@ -0,0 +1 @@
3285f5eb18fbfb346423d625201efec1364f305d

View File

@@ -6,7 +6,6 @@ on:
- '*'
paths-ignore:
- web
- recorder
pull_request:
types:
- opened
@@ -17,9 +16,6 @@ on:
jobs:
test:
runs-on: ubuntu-latest
defaults:
run:
working-directory: ./service
steps:
- uses: actions/checkout@v2
@@ -30,13 +26,12 @@ jobs:
~/go/bin
~/bin/protoc
~/.cache
key: livekit-recorder-service
key: livekit-recorder
- uses: shogo82148/actions-setup-redis@v1.10.3
with:
redis-version: '6.x'
auto-start: true
- run: redis-cli ping
- name: Set up Go
uses: actions/setup-go@v2
@@ -46,25 +41,5 @@ jobs:
- name: Download Go modules
run: go mod download
- name: Download protoc
run: |
curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v3.15.8/protoc-3.15.8-linux-x86_64.zip
sudo unzip protoc-3.15.8-linux-x86_64.zip -d /usr
sudo chmod 755 /usr/bin/protoc
- name: Install protobuf generators
run: go install google.golang.org/protobuf/cmd/protoc-gen-go
- name: Mage Build
uses: magefile/mage-action@v1
with:
version: latest
args: build
workdir: ./service
- name: Mage Test
uses: magefile/mage-action@v1
with:
version: latest
args: test
workdir: ./service
- name: Go test
run: go test -tags=test ./...

View File

@@ -3,8 +3,9 @@ on:
push:
branches: [ main ]
paths-ignore:
- service/**
- recorder/**
- cmd/**
- pkg/**
- version/**
jobs:
deploy:

View File

@@ -20,8 +20,8 @@ jobs:
~/go/bin
~/bin/protoc
~/.cache
key: ${{ runner.os }}-recorder-service-${{ hashFiles('**/go.sum') }}
restore-keys: ${{ runner.os }}-recorder-service
key: ${{ runner.os }}-recorder-${{ hashFiles('**/go.sum') }}
restore-keys: ${{ runner.os }}-recorder
- uses: actions/cache@v2
with:
@@ -29,8 +29,8 @@ jobs:
key: ${{ runner.os }}-recorder-modules-${{ hashFiles('**/package-lock.json') }}
restore-keys: ${{ runner.os }}-recorder-modules
- name: Recorder docker meta
id: recorder-meta
- name: Docker metadata
id: docker-md
uses: docker/metadata-action@v3
with:
images: livekit/livekit-recorder
@@ -39,25 +39,6 @@ jobs:
type=semver,pattern=v{{version}}
type=semver,pattern=v{{major}}.{{minor}}
- name: Service docker meta
id: service-meta
uses: docker/metadata-action@v3
with:
images: livekit/livekit-recorder-service
# generate Docker tags based on the following events/attributes
tags: |
type=semver,pattern=v{{version}}
type=semver,pattern=v{{major}}.{{minor}}
- name: Set up Node
uses: actions/setup-node@v2
with:
node-version: '16.x'
- name: Download node modules
run: npm install
working-directory: ./recorder
- name: Set up Go
uses: actions/setup-go@v2
with:
@@ -65,7 +46,6 @@ jobs:
- name: Download Go modules
run: go mod download
working-directory: ./service
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v1
@@ -76,20 +56,11 @@ jobs:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Build and push recorder
uses: docker/build-push-action@v2
with:
context: ./recorder
push: true
platforms: linux/amd64
tags: ${{ steps.recorder-meta.outputs.tags }}
labels: ${{ steps.recorder-meta.outputs.labels }}
- name: Build and push service
- name: Build and push
uses: docker/build-push-action@v2
with:
context: .
push: true
platforms: linux/amd64
tags: ${{ steps.service-meta.outputs.tags }}
labels: ${{ steps.service-meta.outputs.labels }}
tags: ${{ steps.docker-md.outputs.tags }}
labels: ${{ steps.docker-md.outputs.labels }}

1
.gitignore vendored
View File

@@ -1 +1,2 @@
.idea/
bin/

View File

@@ -1,50 +1,48 @@
FROM golang:1.16-alpine as builder
FROM restreamio/gstreamer:1.18.5.0-dev as builder
WORKDIR /workspace
RUN apt-get update && apt-get install -y golang
# Copy the Go Modules manifests
COPY service/go.mod go.mod
COPY service/go.sum go.sum
COPY go.mod .
COPY go.sum .
# cache deps before building and copying source so that we don't need to re-download as much
# and so that source changes don't invalidate our downloaded layer
RUN go mod download
# Copy the go source
COPY service/cmd/ cmd/
COPY service/pkg/ pkg/
COPY service/version/ version/
COPY cmd/ cmd/
COPY pkg/ pkg/
COPY version/ version/
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 GO111MODULE=on go build -a -o livekit-recorder-service ./cmd/server
WORKDIR /workspace
RUN CGO_ENABLED=1 GOOS=linux GOARCH=amd64 GO111MODULE=on go build -a -o livekit-recorder ./cmd/server
FROM buildkite/puppeteer:latest
FROM restreamio/gstreamer:1.18.5.0-dev
COPY --from=builder /workspace/livekit-recorder-service /livekit-recorder-service
COPY --from=builder /workspace/livekit-recorder /livekit-recorder
# Install pulse audio
RUN apt-get -qq update && apt-get install -y pulseaudio
# install deps
RUN apt-get update && \
apt-get install -y curl unzip wget gnupg xvfb pulseaudio gstreamer1.0-pulseaudio
# install chrome
RUN wget https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb && \
apt-get install -y ./google-chrome-stable_current_amd64.deb
# install chromedriver
RUN wget -N http://chromedriver.storage.googleapis.com/2.46/chromedriver_linux64.zip && \
unzip chromedriver_linux64.zip && \
chmod +x chromedriver && \
mv -f chromedriver /usr/local/bin/chromedriver
# Add root user to group for pulseaudio access
RUN adduser root pulse-access
# xvfb
RUN apt-get install -y xvfb
# create xdg_runtime_dir
RUN mkdir -pv ~/.cache/xdgr
# ffmpeg
RUN apt-get install -y ffmpeg
# node
RUN apt-get install -y nodejs
# Copy recorder
WORKDIR /app
COPY recorder/package.json recorder/package-lock.json recorder/tsconfig.json ./
COPY recorder/src ./src
RUN npm install
# Silence error about livekit-server-sdk protos
RUN npx tsc src/*.ts; exit 0
# Run the service
WORKDIR /
# run
COPY entrypoint.sh .
ENTRYPOINT ["./entrypoint.sh"]

232
README.md
View File

@@ -3,15 +3,233 @@
All your live recording needs in one place.
Record any website using our recorder, or deploy our service to manage it for you.
## Service
## How it works
Simply deploy the [service](https://github.com/livekit/livekit-recorder/tree/main/service), and your livekit server will handle the rest.
The recorder launches Chrome and navigates to the supplied url, grabs audio from pulse and video from a virtual frame buffer, and feeds them into GStreamer.
You can write the output as mp4 to a file or upload it to s3, or forward the output to one or multiple rtmp streams.
## Recorder
## Config
Want to record, but haven't made the switch to LiveKit yet? No problem! We built a standalone
[recorder](https://github.com/livekit/livekit-recorder/tree/main/recorder) that works with any provider (or any website)!
Both the standalone recorder and recorder service take a yaml config file. If you will be using templates with your recording requests, `ws_url` is required, and to record by room name instead of token, `api_key` and
`api_secret` are also required. When running in service mode, `redis` config is required (with the same db as your LiveKit server), as this is how it receives requests.
## Templates
All config options:
Use our pre-built recording [templates](https://github.com/livekit/livekit-recorder/tree/main/web), or create your own.
```yaml
api_key: livekit server api key (required if using templates without supplying tokens)
api_secret: livekit server api secret (required if using templates without supplying tokens)
ws_url: livekit server ws url (required if using templates)
health_port: http port to serve status (optional)
log_level: valid levels are debug, info, warn, error, fatal, or panic. Defaults to debug
redis: (service mode only)
address: redis address, including port
username: redis username (optional)
password: redis password (optional)
db: redis db (optional)
s3: (required if using s3 output)
access_key: s3 access key
secret: s3 access secret
region: s3 region
defaults:
preset: defaults to "NONE", see options below
width: defaults to 1920
height: defaults to 1080
depth: defaults to 24
framerate: defaults to 30
audio_bitrate: defaults to 128 (kbps)
audio_frequency: defaults to 44100 (Hz)
video_bitrate: defaults to 4500 (kbps)
```
### Presets
| Preset | width | height | framerate | video_bitrate |
|--- |--- |--- |--- |--- |
| "HD_30" | 1280 | 720 | 30 | 3000 |
| "HD_60" | 1280 | 720 | 60 | 4500 |
| "FULL_HD_30" | 1920 | 1080 | 30 | 4500 |
| "FULL_HD_60" | 1920 | 1080 | 60 | 6000 |
If you don't supply any options with your config defaults or the request, it defaults to FULL_HD_30.
## Request
See StartRecordingRequest [here](https://github.com/livekit/protocol/blob/main/livekit_recording.proto#L16).
When using standalone mode, the request can be input as a json file. In service mode, these requests will be made through
the LiveKit server's recording api.
### Input
You can input either a `url` to record from, or choose a `template` and a `layout`, and supply either a `room_name` or `token`.
We currently have 4 templates available - grid or speaker, each available in light or dark.
Your config will need your server api key and secret, along with the websocket url.
Check out our [web README](https://github.com/livekit/livekit-recorder/tree/main/web) to learn more or create your own.
### Output
You can either output to a `file`, upload to an `s3_url`, or write to one or more `rtmp` `urls`.
### Options
You can also override any defaults set in your `config.yaml`.
All request options:
```json
{
"url": "<your-recording-domain.com>",
"template": {
"layout": "<grid|speaker>-<light|dark>",
"room_name": "<room-to-record>",
"token": "<token>"
},
"file": "/out/recording.mp4",
"s3_url": "bucket/path/filename.mp4",
"rtmp": {
"urls": ["<rtmp://stream-url.com>"]
},
"options": {
"preset": "FULL_HD_60",
"width": 1920,
"height": 1080,
"depth": 24,
"framerate": 60,
"audio_bitrate": 128,
"audio_frequency": 44100,
"video_bitrate": 6000
}
}
```
# Service Mode
Simply deploy the service, and submit requests through your LiveKit server.
### How it works
The service listens to a redis subscription and waits for the LiveKit server to make a reservation. Once the reservation
is made to ensure availability, the service waits for a StartRecording request from the server before launching the recorder.
The recorder will be stopped by either a `END_RECORDING` signal from the server, or automatically when the last participant leaves if using our templates.
A single service instance can record one room at a time.
### Deployment
See guides and deployment docs at https://docs.livekit.io/guides/recording
### Running locally
If you want to try running against a local livekit server, you'll need to make a couple changes:
* open `/usr/local/etc/redis.conf` and comment out the line that says `bind 127.0.0.1`
* change `protected-mode yes` to `protected-mode no` in the same file
* add `--network host` to your `docker run` command
* update your redis address from `localhost` to your host ip as docker sees it:
* on linux, this should be `172.17.0.1`
* on mac or windows, run `docker run -it --rm alpine nslookup host.docker.internal` and you should see something like
`Name: host.docker.internal
Address: 192.168.65.2`
These changes allow the service to connect to your local redis instance from inside the docker container.
Finally, to build and run:
```bash
docker build -t livekit-recorder .
docker run --network host \
-e SERVICE_MODE=1 \
-e LIVEKIT_RECORDER_CONFIG="$(cat config.yaml)" \
livekit-recorder
```
You can then use our [cli](https://github.com/livekit/livekit-cli) to submit recording requests to your server.
# Examples
Start by filling in a config.yaml:
```
api_key: <livekit-server-api-key>
api_secret: <livekit-server-api-secret>
ws_url: <livekit-server-ws-url>
s3:
access_key: <s3-access-key>
secret: <s3-secret>
region: <s3-region>
```
## Basic recording
basic.json:
```json
{
"template": {
"layout": "speaker-dark",
"room_name": "my-room"
},
"file": "/out/demo.mp4"
}
```
```bash
mkdir -p ~/livekit/output
docker run --rm \
-e LIVEKIT_RECORDER_CONFIG="$(cat config.yaml)" \
-e RECORDING_REQUEST="$(cat basic.json)" \
-v ~/livekit/recordings:/out \
livekit/livekit-recorder
```
## Record at 720p, with 2048kbps video bitrate, and upload result to s3
s3.json:
```json
{
"url": "https://www.youtube.com/watch?v=BHACKCNDMW8",
"s3_url": "bucket/path/filename.mp4",
"options": {
"width": "1280",
"height": "720",
"video_bitrate": 2048
}
}
```
```bash
docker run --name my-recorder --rm \
-e LIVEKIT_RECORDER_CONFIG="$(cat config.yaml)" \
-e RECORDING_REQUEST="$(cat s3.json)" \
livekit/livekit-recorder
```
```bash
docker stop my-recorder
```
## Stream to Twitch at 1080p, 60fps
twitch.json:
```json
{
"template": {
"layout": "speaker-dark",
"token": "<recording-token>"
},
"rtmp": {
"urls": ["rtmp://live.twitch.tv/app/<stream-key>"]
},
"options": {
"preset": "FULL_HD_60"
}
}
```
```bash
docker run --rm \
-e LIVEKIT_RECORDER_CONFIG="$(cat config.yaml)" \
-e RECORDING_REQUEST="$(cat twitch.json)" \
livekit/livekit-recorder
```
## Ending a recording
Once started, there are a number of ways to end the recording:
* `docker stop <container>`
* if using our templates, the recorder will stop automatically when the last participant leaves
* if using your own webpage, logging `END_RECORDING` to the console
With any of these methods, the recorder will stop ffmpeg and finish uploading before shutting down.

118
cmd/server/main.go Normal file
View File

@@ -0,0 +1,118 @@
package main
import (
"errors"
"fmt"
"io/ioutil"
"os"
"github.com/go-logr/zapr"
"github.com/livekit/protocol/logger"
livekit "github.com/livekit/protocol/proto"
"github.com/urfave/cli/v2"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/protobuf/encoding/protojson"
"github.com/livekit/livekit-recorder/pkg/config"
"github.com/livekit/livekit-recorder/version"
)
func main() {
app := &cli.App{
Name: "livekit-recorder",
Usage: "LiveKit Recorder",
Description: "runs the recorder in standalone mode or as a service",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "service-mode",
Usage: "run recorder service",
EnvVars: []string{"SERVICE_MODE"},
},
&cli.StringFlag{
Name: "config",
Usage: "path to LiveKit recording config defaults",
},
&cli.StringFlag{
Name: "config-body",
Usage: "Default LiveKit recording config in JSON, typically passed in as an env var in a container",
EnvVars: []string{"LIVEKIT_RECORDER_CONFIG"},
},
&cli.StringFlag{
Name: "request",
Usage: "path to json StartRecordingRequest file",
},
&cli.StringFlag{
Name: "request-body",
Usage: "StartRecordingRequest json",
EnvVars: []string{"RECORDING_REQUEST"},
},
},
Action: run,
Version: version.Version,
}
if err := app.Run(os.Args); err != nil {
fmt.Println(err)
}
}
func run(c *cli.Context) error {
if c.Bool("service-mode") {
return runService(c)
}
return runRecorder(c)
}
func getConfig(c *cli.Context) (*config.Config, error) {
configFile := c.String("config")
configBody := c.String("config-body")
if configBody == "" {
if configFile != "" {
content, err := ioutil.ReadFile(configFile)
if err != nil {
return nil, err
}
configBody = string(content)
} else {
return nil, errors.New("missing config")
}
}
return config.NewConfig(configBody)
}
func getRequest(c *cli.Context) (*livekit.StartRecordingRequest, error) {
reqFile := c.String("request")
reqBody := c.String("request-body")
var content []byte
var err error
if reqBody != "" {
content = []byte(reqBody)
} else if reqFile != "" {
content, err = ioutil.ReadFile(reqFile)
if err != nil {
return nil, err
}
} else {
return nil, errors.New("missing request")
}
req := &livekit.StartRecordingRequest{}
err = protojson.Unmarshal(content, req)
return req, err
}
func initLogger(level string) {
conf := zap.NewProductionConfig()
if level != "" {
lvl := zapcore.Level(0)
if err := lvl.UnmarshalText([]byte(level)); err == nil {
conf.Level = zap.NewAtomicLevelAt(lvl)
}
}
l, _ := conf.Build()
logger.SetLogger(zapr.NewLogger(l), "livekit-recorder")
}

47
cmd/server/recorder.go Normal file
View File

@@ -0,0 +1,47 @@
package main
import (
"errors"
"os"
"os/signal"
"syscall"
"github.com/livekit/protocol/logger"
"github.com/urfave/cli/v2"
"github.com/livekit/livekit-recorder/pkg/recorder"
"github.com/livekit/livekit-recorder/pkg/service"
)
func runRecorder(c *cli.Context) error {
conf, err := getConfig(c)
if err != nil {
return err
}
req, err := getRequest(c)
if err != nil {
return err
}
initLogger(conf.LogLevel)
rec := recorder.NewRecorder(conf)
if err = rec.Validate(req); err != nil {
return err
}
stopChan := make(chan os.Signal, 1)
signal.Notify(stopChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
go func() {
sig := <-stopChan
logger.Infow("Exit requested, stopping recording and shutting down", "signal", sig)
rec.Stop()
}()
res := rec.Run("standalone")
service.LogResult(res)
if res.Error == "" {
return nil
}
return errors.New(res.Error)
}

61
cmd/server/service.go Normal file
View File

@@ -0,0 +1,61 @@
package main
import (
"fmt"
"net/http"
"os"
"os/signal"
"syscall"
"github.com/livekit/protocol/logger"
"github.com/urfave/cli/v2"
"github.com/livekit/livekit-recorder/pkg/messaging"
"github.com/livekit/livekit-recorder/pkg/service"
)
func runService(c *cli.Context) error {
conf, err := getConfig(c)
if err != nil {
return err
}
initLogger(conf.LogLevel)
rc, err := messaging.NewMessageBus(conf)
if err != nil {
return err
}
svc := service.NewService(conf, rc)
if conf.HealthPort != 0 {
go http.ListenAndServe(fmt.Sprintf(":%d", conf.HealthPort), &handler{svc: svc})
}
finishChan := make(chan os.Signal, 1)
signal.Notify(finishChan, syscall.SIGTERM, syscall.SIGQUIT)
stopChan := make(chan os.Signal, 1)
signal.Notify(stopChan, syscall.SIGINT)
go func() {
select {
case sig := <-finishChan:
logger.Infow("Exit requested, finishing recording then shutting down", "signal", sig)
svc.Stop(false)
case sig := <-stopChan:
logger.Infow("Exit requested, stopping recording and shutting down", "signal", sig)
svc.Stop(true)
}
}()
return svc.Run()
}
type handler struct {
svc *service.Service
}
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(h.svc.Status()))
}

View File

@@ -26,7 +26,7 @@ term_handler() {
trap 'kill ${!}; term_handler' SIGTERM
# Run service
./livekit-recorder-service &
XDG_RUNTIME_DIR=$PATH:~/.cache/xdgr ./livekit-recorder &
pid="$!"
# Wait forever

24
go.mod Normal file
View File

@@ -0,0 +1,24 @@
module github.com/livekit/livekit-recorder
go 1.16
require (
github.com/aws/aws-sdk-go v1.40.55
github.com/chromedp/cdproto v0.0.0-20210921215903-b0b4414ddbe0 // indirect
github.com/chromedp/chromedp v0.7.4
github.com/go-logr/zapr v1.0.0
github.com/go-redis/redis/v8 v8.11.3
github.com/livekit/protocol v0.9.7-0.20211008020017-77ddad9c418c
github.com/magefile/mage v1.11.0
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.7.0
github.com/tinyzimmer/go-glib v0.0.24
github.com/tinyzimmer/go-gst v0.2.30
github.com/urfave/cli/v2 v2.3.0
go.uber.org/zap v1.18.1
golang.org/x/net v0.0.0-20211004195052-b30845b58a23 // indirect
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6 // indirect
golang.org/x/text v0.3.7 // indirect
google.golang.org/protobuf v1.27.1
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
)

View File

@@ -1,8 +1,17 @@
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/aws/aws-sdk-go v1.40.55 h1:l5xNBmafEAfdJIIe32QbTLG3j/JscH8fzZqYyMlntuI=
github.com/aws/aws-sdk-go v1.40.55/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chromedp/cdproto v0.0.0-20210713064928-7d28b402946a/go.mod h1:At5TxYYdxkbQL0TSefRjhLE3Q0lgvqKKMSFUglJ7i1U=
github.com/chromedp/cdproto v0.0.0-20210921215903-b0b4414ddbe0 h1:vrVr4waSUQaerQiDqL2S9tzZ6V7SdRyXUGn+S7dORJc=
github.com/chromedp/cdproto v0.0.0-20210921215903-b0b4414ddbe0/go.mod h1:At5TxYYdxkbQL0TSefRjhLE3Q0lgvqKKMSFUglJ7i1U=
github.com/chromedp/chromedp v0.7.4 h1:U+0d3WbB/Oj4mDuBOI0P7S3PJEued5UZIl5AJ3QulwU=
github.com/chromedp/chromedp v0.7.4/go.mod h1:dBj+SXuQHznp6ZPwZeDDEBZKwclUwDLbZ0hjMialMYs=
github.com/chromedp/sysutil v1.0.0 h1:+ZxhTpfpZlmchB58ih/LBHX52ky7w2VhQVKQMucy3Ic=
github.com/chromedp/sysutil v1.0.0/go.mod h1:kgWmDdq8fTzXYcKIBqIYvRRTnYb9aNS9moAV0xufSww=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d h1:U+s90UTSYgptZMwQh2aRr3LuazLJIa+Pg3Kc1ylSYVY=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -25,6 +34,12 @@ github.com/go-logr/zapr v1.0.0/go.mod h1:t7rgfcj/l02iFgbQxqhQeoyWA9jX2+2enc4PUHF
github.com/go-redis/redis/v8 v8.11.3 h1:GCjoYp8c+yQTJfc0n69iwSiHjvuAdruxl7elnZCxgt8=
github.com/go-redis/redis/v8 v8.11.3/go.mod h1:xNJ9xDG09FsIPwh3bWdk+0oDWHbtF9rPN0F/oD9XeKc=
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU=
github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM=
github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og=
github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.1.0 h1:7RFti/xnNkMJnrK7D1yQ/iCIB5OrrY/54/H930kIbHA=
github.com/gobwas/ws v1.1.0/go.mod h1:nzvNcVha5eUziGrbxFCo6qFIojQHjJV5cLYIbezhfL0=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
@@ -43,6 +58,12 @@ github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/jxskiss/base62 v0.0.0-20191017122030-4f11678b909b h1:XUr8tvMEILhphQPp3TFcIudb5KTOzFeD0pJyDn5+5QI=
github.com/jxskiss/base62 v0.0.0-20191017122030-4f11678b909b/go.mod h1:a5Mn24iYVJRUQSkFupGByqykzD+k+wFI8J91zGHuPf8=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
@@ -51,10 +72,14 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY/mKrcvA=
github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/livekit/protocol v0.9.2 h1:ZX5m7DjmRMHtYH5oM85QRMuJlr4feTfD5UL22h9HmtM=
github.com/livekit/protocol v0.9.2/go.mod h1:MEKn847Iu/2U8ClZyUmEm2oHn8k9fnSHy81Wv8kkSDo=
github.com/livekit/protocol v0.9.7-0.20211008020017-77ddad9c418c h1:pygJt4/XKiB8u6d1vxHUsrBDIZKnraTWz+I97oN5pTU=
github.com/livekit/protocol v0.9.7-0.20211008020017-77ddad9c418c/go.mod h1:MEKn847Iu/2U8ClZyUmEm2oHn8k9fnSHy81Wv8kkSDo=
github.com/magefile/mage v1.11.0 h1:C/55Ywp9BpgVVclD3lRnSYCwXTYxmSppIgLeDYlNuls=
github.com/magefile/mage v1.11.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mattn/go-pointer v0.0.1 h1:n+XhsuGeVO6MEAp7xyEukFINEa+Quek5psIR/ylA6o0=
github.com/mattn/go-pointer v0.0.1/go.mod h1:2zXcozF6qYGgmsG+SeTZz3oAbFLdD3OWqnUbNvJZAlc=
github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0/go.mod h1:fcEyUyXZXoV4Abw8DX0t7wyL8mCDxXyU4iAFZfT3IHw=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
@@ -70,6 +95,8 @@ github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1y
github.com/onsi/gomega v1.10.3/go.mod h1:V9xEwhxec5O8UDM77eCW8vLymOMltsqPVYWrpDsH8xc=
github.com/onsi/gomega v1.15.0 h1:WjP/FQ/sk43MRmnEcT+MlDw2TFvkrXlprrPST/IudjU=
github.com/onsi/gomega v1.15.0/go.mod h1:cIuvLEne0aoVhAgh/O6ac0Op8WWw9H6eYCriF+tEHG0=
github.com/orisano/pixelmatch v0.0.0-20210112091706-4fa4c7ba91d5 h1:1SoBaSPudixRecmlHXb/GxmaD3fLMtHIDN13QujwQuc=
github.com/orisano/pixelmatch v0.0.0-20210112091706-4fa4c7ba91d5/go.mod h1:nZgzbfBr3hhjoZnS66nKrHmduYNpc34ny7RK4z5/HM0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@@ -86,6 +113,10 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/tinyzimmer/go-glib v0.0.24 h1:ktZZC22/9t88kGRgNEFV/SESgIWhGHE+q7Z7Qj++luw=
github.com/tinyzimmer/go-glib v0.0.24/go.mod h1:ltV0gO6xNFzZhsIRbFXv8RTq9NGoNT2dmAER4YmZfaM=
github.com/tinyzimmer/go-gst v0.2.30 h1:S84dVy4bJleoaSexkdbJOa18T9jRKtUUDQKaBMyBfN0=
github.com/tinyzimmer/go-gst v0.2.30/go.mod h1:V4h+HPS3mVGSwUJ7IBi3WAkJWITZasebERyqk3TEXUM=
github.com/twitchtv/twirp v8.1.0+incompatible h1:KGXanpa9LXdVE/V5P/tA27rkKFmXRGCtSNT7zdeeVOY=
github.com/twitchtv/twirp v8.1.0+incompatible/go.mod h1:RRJoFSAmTEh2weEqWtpPE3vFK5YBhA6bqp2l1kfCC5A=
github.com/urfave/cli/v2 v2.3.0 h1:qph92Y649prgesehzOrQjdWyxFOp/QVM+6imKHad91M=
@@ -105,6 +136,7 @@ go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b h1:wSOdpTq0/eI46Ez/LkDwIsAKA71YP2SRKBODiRWM0as=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
@@ -118,8 +150,10 @@ golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0/go.mod h1:sp8m0HH+o8qH0wwXwY
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201026091529-146b70c837a4/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 h1:DzZ89McO9/gWPsQXS/FVKAlG02ZjaQ6AlZRBimEYOd0=
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211004195052-b30845b58a23 h1:j34uvNZ757YpJXjsTk19wPCR/3tAhHPT4EMFysLc9Xg=
golang.org/x/net v0.0.0-20211004195052-b30845b58a23/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -132,14 +166,18 @@ golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201207223542-d4d67f95c62d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da h1:b3NXsE2LusjYGGjL5bxEVZZORm/YEFFrWFjR8eFrw/c=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210525143221-35b2ab0089ea/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6 h1:foEbQz/B0Oz6YIqu/69kfXPYeFQAuuMYFkjaqXzl5Wo=
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
@@ -167,6 +205,7 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b h1:QRR6H1YWRnHb4Y/HeNFCTJLFVxaq6wH4YuVdsUOr75U=
gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/square/go-jose.v2 v2.5.1 h1:7odma5RETjNHWJnR32wx8t+Io4djHE1PqxCFx3iiZ2w=
gopkg.in/square/go-jose.v2 v2.5.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=

View File

@@ -17,7 +17,7 @@ import (
"github.com/magefile/mage/mg"
"github.com/livekit/livekit-recorder/service/version"
"github.com/livekit/livekit-recorder/version"
)
const goChecksumFile = ".checksumgo"
@@ -40,7 +40,7 @@ type modInfo struct {
// run unit tests
func Test() error {
cmd := exec.Command("go", "test", "./...")
cmd := exec.Command("go", "test", "--tags=test", "./...")
connectStd(cmd)
return cmd.Run()
}

181
pkg/config/config.go Normal file
View File

@@ -0,0 +1,181 @@
package config
import (
"fmt"
"os"
"gopkg.in/yaml.v3"
livekit "github.com/livekit/protocol/proto"
)
const Display = ":99"
type Config struct {
ApiKey string `yaml:"api_key"`
ApiSecret string `yaml:"api_secret"`
WsUrl string `yaml:"ws_url"`
HealthPort int `yaml:"health_port"`
LogLevel string `yaml:"log_level"`
Redis RedisConfig `yaml:"redis"`
S3 S3Config `yaml:"s3"`
Defaults *livekit.RecordingOptions `yaml:"defaults"`
}
type RedisConfig struct {
Address string `yaml:"address"`
Username string `yaml:"username"`
Password string `yaml:"password"`
DB int `yaml:"db"`
}
type S3Config struct {
AccessKey string `yaml:"access_key"`
Secret string `yaml:"secret"`
Region string `yaml:"region"`
}
func NewConfig(confString string) (*Config, error) {
// start with defaults
conf := &Config{
LogLevel: "debug",
Defaults: &livekit.RecordingOptions{
Width: 1920,
Height: 1080,
Depth: 24,
Framerate: 30,
AudioBitrate: 128,
AudioFrequency: 44100,
VideoBitrate: 4500,
},
}
if confString != "" {
if err := yaml.Unmarshal([]byte(confString), conf); err != nil {
return nil, fmt.Errorf("could not parse config: %v", err)
}
}
// apply preset options
if conf.Defaults.Preset != livekit.RecordingPreset_NONE {
conf.Defaults = fromPreset(conf.Defaults.Preset)
}
if err := os.Setenv("DISPLAY", Display); err != nil {
return nil, err
}
var gstDebug int
switch conf.LogLevel {
case "debug":
gstDebug = 3
case "info", "warn":
gstDebug = 2
case "error":
gstDebug = 1
case "panic":
gstDebug = 0
}
if err := os.Setenv("GST_DEBUG", fmt.Sprint(gstDebug)); err != nil {
return nil, err
}
return conf, nil
}
func TestConfig() *Config {
return &Config{
Redis: RedisConfig{
Address: "localhost:6379",
},
Defaults: &livekit.RecordingOptions{
Width: 1920,
Height: 1080,
Depth: 24,
Framerate: 30,
AudioBitrate: 128,
AudioFrequency: 44100,
VideoBitrate: 4500,
},
}
}
func (c *Config) ApplyDefaults(req *livekit.StartRecordingRequest) {
if req.Options == nil {
req.Options = &livekit.RecordingOptions{}
}
if req.Options.Preset != livekit.RecordingPreset_NONE {
req.Options = fromPreset(req.Options.Preset)
return
}
if req.Options.Width == 0 || req.Options.Height == 0 {
req.Options.Width = c.Defaults.Width
req.Options.Height = c.Defaults.Height
}
if req.Options.Depth == 0 {
req.Options.Depth = c.Defaults.Depth
}
if req.Options.Framerate == 0 {
req.Options.Framerate = c.Defaults.Framerate
}
if req.Options.AudioBitrate == 0 {
req.Options.AudioBitrate = c.Defaults.AudioBitrate
}
if req.Options.AudioFrequency == 0 {
req.Options.AudioFrequency = c.Defaults.AudioFrequency
}
if req.Options.VideoBitrate == 0 {
req.Options.VideoBitrate = c.Defaults.VideoBitrate
}
return
}
func fromPreset(preset livekit.RecordingPreset) *livekit.RecordingOptions {
switch preset {
case livekit.RecordingPreset_HD_30:
return &livekit.RecordingOptions{
Width: 1280,
Height: 720,
Depth: 24,
Framerate: 30,
AudioBitrate: 128,
AudioFrequency: 44100,
VideoBitrate: 3000,
}
case livekit.RecordingPreset_HD_60:
return &livekit.RecordingOptions{
Width: 1280,
Height: 720,
Depth: 24,
Framerate: 60,
AudioBitrate: 128,
AudioFrequency: 44100,
VideoBitrate: 4500,
}
case livekit.RecordingPreset_FULL_HD_30:
return &livekit.RecordingOptions{
Width: 1920,
Height: 1080,
Depth: 24,
Framerate: 30,
AudioBitrate: 128,
AudioFrequency: 44100,
VideoBitrate: 4500,
}
case livekit.RecordingPreset_FULL_HD_60:
return &livekit.RecordingOptions{
Width: 1920,
Height: 1080,
Depth: 24,
Framerate: 60,
AudioBitrate: 128,
AudioFrequency: 44100,
VideoBitrate: 6000,
}
default:
return nil
}
}

93
pkg/config/config_test.go Normal file
View File

@@ -0,0 +1,93 @@
package config_test
import (
"testing"
livekit "github.com/livekit/protocol/proto"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/encoding/protojson"
)
var testRequests = []string{`
{
"template": {
"layout": "grid-light",
"token": "recording-token"
},
"s3_url": "bucket/path/filename.mp4"
}
`, `
{
"template": {
"layout": "speaker-dark",
"room_name": "test-room"
},
"file": "/out/filename.mp4",
"options": {
"preset": "FULL_HD_30"
}
}
`, `
{
"url": "https://www.youtube.com/watch?v=dQw4w9WgXcQ",
"rtmp": {
"urls": ["rtmp://stream-url.com", "rtmp://live.twitch.tv/app/stream-key"]
},
"options": {
"width": 1920,
"height": 1080
}
}
`}
func TestRequests(t *testing.T) {
t.Run("template and s3", func(t *testing.T) {
req := &livekit.StartRecordingRequest{}
require.NoError(t, protojson.Unmarshal([]byte(testRequests[0]), req))
require.NotNil(t, req.Input)
require.NotNil(t, req.Output)
template, ok := req.Input.(*livekit.StartRecordingRequest_Template)
require.True(t, ok)
require.Equal(t, "grid-light", template.Template.Layout)
token, ok := template.Template.Room.(*livekit.RecordingTemplate_Token)
require.True(t, ok)
require.Equal(t, "recording-token", token.Token)
s3, ok := req.Output.(*livekit.StartRecordingRequest_S3Url)
require.True(t, ok)
require.Equal(t, "bucket/path/filename.mp4", s3.S3Url)
})
t.Run("file and preset", func(t *testing.T) {
req := &livekit.StartRecordingRequest{}
require.NoError(t, protojson.Unmarshal([]byte(testRequests[1]), req))
require.NotNil(t, req.Input)
require.NotNil(t, req.Output)
require.NotNil(t, req.Options)
template, ok := req.Input.(*livekit.StartRecordingRequest_Template)
require.True(t, ok)
require.Equal(t, "speaker-dark", template.Template.Layout)
roomName, ok := template.Template.Room.(*livekit.RecordingTemplate_RoomName)
require.True(t, ok)
require.Equal(t, "test-room", roomName.RoomName)
file, ok := req.Output.(*livekit.StartRecordingRequest_File)
require.True(t, ok)
require.Equal(t, "/out/filename.mp4", file.File)
require.Equal(t, livekit.RecordingPreset_FULL_HD_30, req.Options.Preset)
})
t.Run("rtmp and options", func(t *testing.T) {
req := &livekit.StartRecordingRequest{}
require.NoError(t, protojson.Unmarshal([]byte(testRequests[2]), req))
require.NotNil(t, req.Input)
require.NotNil(t, req.Output)
require.NotNil(t, req.Options)
url, ok := req.Input.(*livekit.StartRecordingRequest_Url)
require.True(t, ok)
require.Equal(t, "https://www.youtube.com/watch?v=dQw4w9WgXcQ", url.Url)
rtmp, ok := req.Output.(*livekit.StartRecordingRequest_Rtmp)
require.True(t, ok)
expected := []string{"rtmp://stream-url.com", "rtmp://live.twitch.tv/app/stream-key"}
require.Equal(t, expected, rtmp.Rtmp.Urls)
require.Equal(t, int32(1920), req.Options.Width)
})
}

View File

@@ -0,0 +1,13 @@
// +build test
package display
type Display struct{}
func New() *Display { return &Display{} }
func (d *Display) Launch(url string, width, height, depth int) error {
return nil
}
func (d *Display) Close() {}

106
pkg/display/display_prod.go Normal file
View File

@@ -0,0 +1,106 @@
// +build !test
package display
import (
"context"
"fmt"
"os"
"os/exec"
"github.com/chromedp/chromedp"
"github.com/livekit/protocol/logger"
"github.com/livekit/livekit-recorder/pkg/config"
)
type Display struct {
xvfb *exec.Cmd
chromeCancel func()
}
func New() *Display { return &Display{} }
func (d *Display) Launch(url string, width, height, depth int) error {
if err := d.launchXvfb(width, height, depth); err != nil {
return err
}
if err := d.launchChrome(url, width, height); err != nil {
return err
}
return nil
}
func (d *Display) launchXvfb(width, height, depth int) error {
dims := fmt.Sprintf("%dx%dx%d", width, height, depth)
logger.Debugw("launching xvfb", "dims", dims)
xvfb := exec.Command("Xvfb", config.Display, "-screen", "0", dims, "-ac", "-nolisten", "tcp")
if err := xvfb.Start(); err != nil {
return err
}
d.xvfb = xvfb
return nil
}
func (d *Display) launchChrome(url string, width, height int) error {
logger.Debugw("launching chrome")
opts := []chromedp.ExecAllocatorOption{
chromedp.NoFirstRun,
chromedp.NoDefaultBrowserCheck,
chromedp.DisableGPU,
chromedp.NoSandbox,
// puppeteer default behavior
chromedp.Flag("disable-infobars", true),
chromedp.Flag("excludeSwitches", "enable-automation"),
chromedp.Flag("disable-background-networking", true),
chromedp.Flag("enable-features", "NetworkService,NetworkServiceInProcess"),
chromedp.Flag("disable-background-timer-throttling", true),
chromedp.Flag("disable-backgrounding-occluded-windows", true),
chromedp.Flag("disable-breakpad", true),
chromedp.Flag("disable-client-side-phishing-detection", true),
chromedp.Flag("disable-default-apps", true),
chromedp.Flag("disable-dev-shm-usage", true),
chromedp.Flag("disable-extensions", true),
chromedp.Flag("disable-features", "site-per-process,TranslateUI,BlinkGenPropertyTrees"),
chromedp.Flag("disable-hang-monitor", true),
chromedp.Flag("disable-ipc-flooding-protection", true),
chromedp.Flag("disable-popup-blocking", true),
chromedp.Flag("disable-prompt-on-repost", true),
chromedp.Flag("disable-renderer-backgrounding", true),
chromedp.Flag("disable-sync", true),
chromedp.Flag("force-color-profile", "srgb"),
chromedp.Flag("metrics-recording-only", true),
chromedp.Flag("safebrowsing-disable-auto-update", true),
chromedp.Flag("password-store", "basic"),
chromedp.Flag("use-mock-keychain", true),
// custom args
chromedp.Flag("kiosk", true),
chromedp.Flag("enable-automation", false),
chromedp.Flag("autoplay-policy", "no-user-gesture-required"),
chromedp.Flag("window-position", "0,0"),
chromedp.Flag("window-size", fmt.Sprintf("%d,%d", width, height)),
chromedp.Flag("display", config.Display),
}
allocCtx, _ := chromedp.NewExecAllocator(context.Background(), opts...)
ctx, cancel := chromedp.NewContext(allocCtx)
d.chromeCancel = cancel
return chromedp.Run(ctx, chromedp.Navigate(url))
}
func (d *Display) Close() {
if d.chromeCancel != nil {
d.chromeCancel()
d.chromeCancel = nil
}
if d.xvfb != nil {
err := d.xvfb.Process.Signal(os.Interrupt)
if err != nil {
logger.Errorw("failed to kill xvfb", err)
}
d.xvfb = nil
}
}

View File

@@ -1,4 +1,4 @@
package service
package messaging
import (
"context"
@@ -8,7 +8,7 @@ import (
"github.com/livekit/protocol/utils"
"github.com/pkg/errors"
"github.com/livekit/livekit-recorder/service/pkg/config"
"github.com/livekit/livekit-recorder/pkg/config"
)
func NewMessageBus(conf *config.Config) (utils.MessageBus, error) {

11
pkg/pipeline/errors.go Normal file
View File

@@ -0,0 +1,11 @@
package pipeline
import "errors"
var (
ErrCannotAddToFile = errors.New("cannot add rtmp output to file recording")
ErrCannotRemoveFromFile = errors.New("cannot remove rtmp output from file recording")
ErrGhostPadFailed = errors.New("failed to add ghost pad to bin")
ErrOutputAlreadyExists = errors.New("output already exists")
ErrOutputNotFound = errors.New("output not found")
)

189
pkg/pipeline/input.go Normal file
View File

@@ -0,0 +1,189 @@
// +build !test
package pipeline
import (
"fmt"
livekit "github.com/livekit/protocol/proto"
"github.com/tinyzimmer/go-gst/gst"
)
type InputBin struct {
isStream bool
bin *gst.Bin
audioElements []*gst.Element
videoElements []*gst.Element
audioQueue *gst.Element
videoQueue *gst.Element
mux *gst.Element
}
func newInputBin(isStream bool, options *livekit.RecordingOptions) (*InputBin, error) {
// create audio elements
pulseSrc, err := gst.NewElement("pulsesrc")
if err != nil {
return nil, err
}
audioConvert, err := gst.NewElement("audioconvert")
if err != nil {
return nil, err
}
audioCapsFilter, err := gst.NewElement("capsfilter")
if err != nil {
return nil, err
}
err = audioCapsFilter.SetProperty("caps", gst.NewCapsFromString(
fmt.Sprintf("audio/x-raw,format=S16LE,layout=interleaved,rate=%d,channels=2", options.AudioFrequency),
))
if err != nil {
return nil, err
}
faac, err := gst.NewElement("faac")
if err != nil {
return nil, err
}
err = faac.SetProperty("bitrate", int(options.AudioBitrate*1000))
if err != nil {
return nil, err
}
audioQueue, err := gst.NewElement("queue")
if err != nil {
return nil, err
}
err = audioQueue.SetProperty("flush-on-eos", true)
if err != nil {
return nil, err
}
// create video elements
xImageSrc, err := gst.NewElement("ximagesrc")
if err != nil {
return nil, err
}
err = xImageSrc.SetProperty("show-pointer", false)
if err != nil {
return nil, err
}
videoConvert, err := gst.NewElement("videoconvert")
if err != nil {
return nil, err
}
videoCapsFilter, err := gst.NewElement("capsfilter")
if err != nil {
return nil, err
}
err = videoCapsFilter.SetProperty("caps", gst.NewCapsFromString(
fmt.Sprintf("video/x-raw,framerate=%d/1", options.Framerate),
))
if err != nil {
return nil, err
}
x264Enc, err := gst.NewElement("x264enc")
if err != nil {
return nil, err
}
err = x264Enc.SetProperty("bitrate", uint(options.VideoBitrate))
if err != nil {
return nil, err
}
x264Enc.SetArg("speed-preset", "veryfast")
x264Enc.SetArg("tune", "zerolatency")
videoQueue, err := gst.NewElement("queue")
if err != nil {
return nil, err
}
err = videoQueue.SetProperty("flush-on-eos", true)
if err != nil {
return nil, err
}
// create mux
var mux *gst.Element
if isStream {
mux, err = gst.NewElement("flvmux")
if err != nil {
return nil, err
}
err = mux.Set("streamable", true)
if err != nil {
return nil, err
}
} else {
mux, err = gst.NewElement("mp4mux")
if err != nil {
return nil, err
}
err = mux.SetProperty("faststart", true)
if err != nil {
return nil, err
}
}
// create bin
bin := gst.NewBin("input")
err = bin.AddMany(
// audio
pulseSrc, audioConvert, audioCapsFilter, faac, audioQueue,
// video
xImageSrc, videoConvert, videoCapsFilter, x264Enc, videoQueue,
// mux
mux,
)
if err != nil {
return nil, err
}
// create ghost pad
ghostPad := gst.NewGhostPad("src", mux.GetStaticPad("src"))
if !bin.AddPad(ghostPad.Pad) {
return nil, ErrGhostPadFailed
}
return &InputBin{
isStream: isStream,
bin: bin,
audioElements: []*gst.Element{pulseSrc, audioConvert, audioCapsFilter, faac, audioQueue},
videoElements: []*gst.Element{xImageSrc, videoConvert, videoCapsFilter, x264Enc, videoQueue},
audioQueue: audioQueue,
videoQueue: videoQueue,
mux: mux,
}, nil
}
func (b *InputBin) Link() error {
// link audio elements
if err := gst.ElementLinkMany(b.audioElements...); err != nil {
return err
}
// link video elements
if err := gst.ElementLinkMany(b.videoElements...); err != nil {
return err
}
// link audio and video queues to mux
var muxAudioPad, muxVideoPad *gst.Pad
if b.isStream {
muxAudioPad = b.mux.GetRequestPad("audio")
muxVideoPad = b.mux.GetRequestPad("video")
} else {
muxAudioPad = b.mux.GetRequestPad("audio_%u")
muxVideoPad = b.mux.GetRequestPad("video_%u")
}
if err := requireLink(b.audioQueue.GetStaticPad("src"), muxAudioPad); err != nil {
return err
}
if err := requireLink(b.videoQueue.GetStaticPad("src"), muxVideoPad); err != nil {
return err
}
return nil
}

253
pkg/pipeline/output.go Normal file
View File

@@ -0,0 +1,253 @@
// +build !test
package pipeline
import (
"fmt"
"github.com/livekit/protocol/logger"
"github.com/tinyzimmer/go-gst/gst"
)
type OutputBin struct {
isStream bool
bin *gst.Bin
// file only
fileSink *gst.Element
// rtmp only
tee *gst.Element
urls map[string]int
queues []*gst.Element
rtmpSinks []*gst.Element
}
func newFileOutputBin(filename string) (*OutputBin, error) {
// create elements
sink, err := gst.NewElement("filesink")
if err != nil {
return nil, err
}
if err = sink.SetProperty("location", filename); err != nil {
return nil, err
}
if err = sink.SetProperty("sync", false); err != nil {
return nil, err
}
// create bin
bin := gst.NewBin("output")
if err = bin.Add(sink); err != nil {
return nil, err
}
// add ghost pad
ghostPad := gst.NewGhostPad("sink", sink.GetStaticPad("sink"))
if !bin.AddPad(ghostPad.Pad) {
return nil, ErrGhostPadFailed
}
return &OutputBin{
isStream: false,
bin: bin,
fileSink: sink,
}, nil
}
func newRtmpOutputBin(urls []string) (*OutputBin, error) {
// create elements
tee, err := gst.NewElement("tee")
if err != nil {
return nil, err
}
bin := gst.NewBin("output")
if err = bin.Add(tee); err != nil {
return nil, err
}
indexes := make(map[string]int)
queues := make([]*gst.Element, len(urls))
sinks := make([]*gst.Element, len(urls))
for i, url := range urls {
indexes[url] = i
queues[i], err = gst.NewElement("queue")
if err != nil {
return nil, err
}
sink, err := gst.NewElement("rtmpsink")
if err != nil {
return nil, err
}
if err = sink.SetProperty("sync", false); err != nil {
return nil, err
}
if err = sink.Set("location", url); err != nil {
return nil, err
}
sinks[i] = sink
}
// create bin
if err = bin.AddMany(queues...); err != nil {
return nil, err
}
if err = bin.AddMany(sinks...); err != nil {
return nil, err
}
// add ghost pad
ghostPad := gst.NewGhostPad("sink", tee.GetStaticPad("sink"))
if !bin.AddPad(ghostPad.Pad) {
return nil, ErrGhostPadFailed
}
return &OutputBin{
isStream: true,
bin: bin,
tee: tee,
urls: indexes,
queues: queues,
rtmpSinks: sinks,
}, nil
}
func (b *OutputBin) Link() error {
if !b.isStream {
return nil
}
for i, q := range b.queues {
// link queue to rtmp sink
if err := q.Link(b.rtmpSinks[i]); err != nil {
return err
}
// link tee to queue
if err := requireLink(
b.tee.GetRequestPad(fmt.Sprintf("src_%d", i)),
q.GetStaticPad("sink")); err != nil {
return err
}
}
return nil
}
func (b *OutputBin) AddRtmpSink(url string) error {
if !b.isStream {
return ErrCannotAddToFile
}
if _, ok := b.urls[url]; ok {
return ErrOutputAlreadyExists
}
idx := -1
for i, q := range b.queues {
if q == nil {
idx = i
break
}
}
queue, err := gst.NewElement("queue")
if err != nil {
return err
}
sink, err := gst.NewElement("rtmpsink")
if err != nil {
return err
}
if err = sink.SetProperty("sync", false); err != nil {
return err
}
if err = sink.Set("location", url); err != nil {
return err
}
// add to bin
if err = b.bin.AddMany(queue, sink); err != nil {
return err
}
if idx == -1 {
idx = len(b.urls)
b.queues = append(b.queues, queue)
b.rtmpSinks = append(b.rtmpSinks, sink)
} else {
b.queues[idx] = queue
b.rtmpSinks[idx] = sink
}
b.urls[url] = idx
// link queue to sink
if err = queue.Link(sink); err != nil {
return err
}
teeSrcPad := b.tee.GetRequestPad(fmt.Sprintf("src_%d", idx))
teeSrcPad.AddProbe(gst.PadProbeTypeBlockDownstream, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn {
// link tee to queue
if err = requireLink(pad, queue.GetStaticPad("sink")); err != nil {
logger.Errorw("failed to link tee to queue", err)
}
// sync state
queue.SyncStateWithParent()
sink.SyncStateWithParent()
return gst.PadProbeRemove
})
return nil
}
func (b *OutputBin) RemoveRtmpSink(url string) error {
if !b.isStream {
return ErrCannotRemoveFromFile
}
idx, ok := b.urls[url]
if !ok {
return ErrOutputNotFound
}
queue := b.queues[idx]
sink := b.rtmpSinks[idx]
srcPad := b.tee.GetStaticPad(fmt.Sprintf("src_%d", idx))
srcPad.AddProbe(gst.PadProbeTypeBlockDownstream, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn {
// remove probe
pad.RemoveProbe(uint64(info.ID()))
// unlink queue
pad.Unlink(queue.GetStaticPad("sink"))
// send EOS to queue
queue.GetStaticPad("sink").SendEvent(gst.NewEOSEvent())
// remove from bin
if err := b.bin.RemoveMany(queue, sink); err != nil {
logger.Errorw("failed to remove rtmp queue", err)
}
if err := queue.SetState(gst.StateNull); err != nil {
logger.Errorw("failed stop rtmp queue", err)
}
if err := sink.SetState(gst.StateNull); err != nil {
logger.Errorw("failed to stop rtmp sink", err)
}
// release tee src pad
b.tee.ReleaseRequestPad(pad)
return gst.PadProbeOK
})
delete(b.urls, url)
b.queues[idx] = nil
b.rtmpSinks[idx] = nil
return nil
}

View File

@@ -0,0 +1,54 @@
// +build test
package pipeline
import (
"time"
livekit "github.com/livekit/protocol/proto"
)
type Pipeline struct {
isStream bool
kill chan struct{}
}
func NewRtmpPipeline(rtmp []string, options *livekit.RecordingOptions) (*Pipeline, error) {
return &Pipeline{
isStream: true,
kill: make(chan struct{}, 1),
}, nil
}
func NewFilePipeline(filename string, options *livekit.RecordingOptions) (*Pipeline, error) {
return &Pipeline{
isStream: false,
kill: make(chan struct{}, 1),
}, nil
}
func (p *Pipeline) Start() error {
select {
case <-time.After(time.Second * 3):
case <-p.kill:
}
return nil
}
func (p *Pipeline) AddOutput(url string) error {
if !p.isStream {
return ErrCannotAddToFile
}
return nil
}
func (p *Pipeline) RemoveOutput(url string) error {
if !p.isStream {
return ErrCannotRemoveFromFile
}
return nil
}
func (p *Pipeline) Close() {
p.kill <- struct{}{}
}

View File

@@ -0,0 +1,139 @@
// +build !test
package pipeline
import (
"fmt"
"github.com/livekit/protocol/logger"
livekit "github.com/livekit/protocol/proto"
"github.com/tinyzimmer/go-glib/glib"
"github.com/tinyzimmer/go-gst/gst"
)
// gst.Init needs to be called before using gst but after gst package loads
var initialized = false
type Pipeline struct {
pipeline *gst.Pipeline
output *OutputBin
}
func NewRtmpPipeline(urls []string, options *livekit.RecordingOptions) (*Pipeline, error) {
if !initialized {
gst.Init(nil)
initialized = true
}
input, err := newInputBin(true, options)
if err != nil {
return nil, err
}
output, err := newRtmpOutputBin(urls)
if err != nil {
return nil, err
}
return newPipeline(input, output)
}
func NewFilePipeline(filename string, options *livekit.RecordingOptions) (*Pipeline, error) {
if !initialized {
gst.Init(nil)
initialized = true
}
input, err := newInputBin(false, options)
if err != nil {
return nil, err
}
output, err := newFileOutputBin(filename)
if err != nil {
return nil, err
}
return newPipeline(input, output)
}
func newPipeline(input *InputBin, output *OutputBin) (*Pipeline, error) {
// elements must be added to pipeline before linking
pipeline, err := gst.NewPipeline("pipeline")
if err != nil {
return nil, err
}
// add bins to pipeline
if err = pipeline.AddMany(input.bin.Element, output.bin.Element); err != nil {
return nil, err
}
// link bin elements
if err = input.Link(); err != nil {
return nil, err
}
if err = output.Link(); err != nil {
return nil, err
}
// link bins
if err = input.bin.Link(output.bin.Element); err != nil {
return nil, err
}
// TODO: output bin error handling
return &Pipeline{
pipeline: pipeline,
output: output,
}, nil
}
func (p *Pipeline) Start() error {
loop := glib.NewMainLoop(glib.MainContextDefault(), false)
p.pipeline.GetPipelineBus().AddWatch(func(msg *gst.Message) bool {
switch msg.Type() {
case gst.MessageEOS:
logger.Infow("EOS received")
_ = p.pipeline.BlockSetState(gst.StateNull)
logger.Infow("pipeline stopped")
loop.Quit()
case gst.MessageError:
gErr := msg.ParseError()
logger.Errorw("message error", gErr, "debug", gErr.DebugString())
loop.Quit()
default:
fmt.Println(msg)
}
return true
})
// start playing
err := p.pipeline.SetState(gst.StatePlaying)
if err != nil {
return err
}
// Block and iterate on the main loop
loop.Run()
return nil
}
func (p *Pipeline) AddOutput(url string) error {
return p.output.AddRtmpSink(url)
}
func (p *Pipeline) RemoveOutput(url string) error {
return p.output.RemoveRtmpSink(url)
}
func (p *Pipeline) Close() {
logger.Debugw("Sending EOS to pipeline")
p.pipeline.SendEvent(gst.NewEOSEvent())
}
func requireLink(src, sink *gst.Pad) error {
if linkReturn := src.Link(sink); linkReturn != gst.PadLinkOK {
return fmt.Errorf("pad link: %s", linkReturn.String())
}
return nil
}

143
pkg/recorder/recorder.go Normal file
View File

@@ -0,0 +1,143 @@
package recorder
import (
"errors"
"strings"
"time"
"github.com/livekit/protocol/logger"
livekit "github.com/livekit/protocol/proto"
"github.com/livekit/livekit-recorder/pkg/config"
"github.com/livekit/livekit-recorder/pkg/display"
"github.com/livekit/livekit-recorder/pkg/pipeline"
)
type Recorder struct {
conf *config.Config
req *livekit.StartRecordingRequest
url string
filename string
display *display.Display
pipeline *pipeline.Pipeline
}
func NewRecorder(conf *config.Config) *Recorder {
return &Recorder{
conf: conf,
}
}
func (r *Recorder) Validate(req *livekit.StartRecordingRequest) error {
r.conf.ApplyDefaults(req)
// validate input
url, err := r.getInputUrl(req)
if err != nil {
return err
}
// validate output
switch req.Output.(type) {
case *livekit.StartRecordingRequest_S3Url:
s3 := req.Output.(*livekit.StartRecordingRequest_S3Url).S3Url
idx := strings.LastIndex(s3, "/")
if idx < 6 ||
!strings.HasPrefix(s3, "s3://") ||
!strings.HasSuffix(s3, ".mp4") {
return errors.New("s3 output must be s3://bucket/{path/}filename.mp4")
}
r.filename = s3[idx+1:]
case *livekit.StartRecordingRequest_Rtmp:
case *livekit.StartRecordingRequest_File:
filename := req.Output.(*livekit.StartRecordingRequest_File).File
if !strings.HasSuffix(filename, ".mp4") {
return errors.New("file output must be {path/}filename.mp4")
}
r.filename = filename
default:
return errors.New("missing output")
}
r.req = req
r.url = url
return nil
}
// Run blocks until completion
func (r *Recorder) Run(recordingId string) *livekit.RecordingResult {
r.display = display.New()
options := r.req.Options
err := r.display.Launch(r.url, int(options.Width), int(options.Height), int(options.Depth))
res := &livekit.RecordingResult{Id: recordingId}
if r.req == nil {
res.Error = "recorder not initialized"
return res
}
r.pipeline, err = r.getPipeline(r.req)
if err != nil {
logger.Errorw("error building pipeline", err)
res.Error = err.Error()
return res
}
start := time.Now()
err = r.pipeline.Start()
if err != nil {
logger.Errorw("error running pipeline", err)
res.Error = err.Error()
return res
}
res.Duration = time.Since(start).Milliseconds() / 1000
if s3, ok := r.req.Output.(*livekit.StartRecordingRequest_S3Url); ok {
if err = r.upload(s3.S3Url); err != nil {
res.Error = err.Error()
return res
}
res.DownloadUrl = s3.S3Url
}
return res
}
func (r *Recorder) getPipeline(req *livekit.StartRecordingRequest) (*pipeline.Pipeline, error) {
switch req.Output.(type) {
case *livekit.StartRecordingRequest_Rtmp:
return pipeline.NewRtmpPipeline(req.Output.(*livekit.StartRecordingRequest_Rtmp).Rtmp.Urls, req.Options)
case *livekit.StartRecordingRequest_S3Url, *livekit.StartRecordingRequest_File:
return pipeline.NewFilePipeline(r.filename, req.Options)
}
return nil, errors.New("output missing")
}
func (r *Recorder) AddOutput(url string) error {
logger.Debugw("Add Output", "url", url)
if r.pipeline == nil {
return errors.New("missing pipeline")
}
return r.pipeline.AddOutput(url)
}
func (r *Recorder) RemoveOutput(url string) error {
logger.Debugw("Remove Output", "url", url)
if r.pipeline == nil {
return errors.New("missing pipeline")
}
return r.pipeline.RemoveOutput(url)
}
func (r *Recorder) Stop() {
if p := r.pipeline; p != nil {
p.Close()
}
}
// should only be called after pipeline completes
func (r *Recorder) Close() {
r.display.Close()
}

59
pkg/recorder/request.go Normal file
View File

@@ -0,0 +1,59 @@
package recorder
import (
"errors"
"fmt"
"net/url"
"time"
"github.com/livekit/protocol/auth"
livekit "github.com/livekit/protocol/proto"
"github.com/livekit/protocol/utils"
)
func (r *Recorder) getInputUrl(req *livekit.StartRecordingRequest) (string, error) {
switch req.Input.(type) {
case *livekit.StartRecordingRequest_Url:
return req.Input.(*livekit.StartRecordingRequest_Url).Url, nil
case *livekit.StartRecordingRequest_Template:
template := req.Input.(*livekit.StartRecordingRequest_Template).Template
var token string
switch template.Room.(type) {
case *livekit.RecordingTemplate_RoomName:
var err error
token, err = r.buildToken(template.Room.(*livekit.RecordingTemplate_RoomName).RoomName)
if err != nil {
return "", err
}
case *livekit.RecordingTemplate_Token:
token = template.Room.(*livekit.RecordingTemplate_Token).Token
default:
return "", errors.New("token or room name required")
}
return fmt.Sprintf("https://recorder.livekit.io/#/%s?url=%s&token=%s",
template.Layout, url.QueryEscape(r.conf.WsUrl), token), nil
default:
return "", errors.New("input url or template required")
}
}
func (r *Recorder) buildToken(roomName string) (string, error) {
f := false
t := true
grant := &auth.VideoGrant{
RoomRecord: true,
Room: roomName,
CanPublish: &f,
CanSubscribe: &t,
Hidden: true,
}
at := auth.NewAccessToken(r.conf.ApiKey, r.conf.ApiSecret).
AddGrant(grant).
SetIdentity(utils.NewGuid(utils.RecordingPrefix)).
SetValidFor(24 * time.Hour)
return at.ToJWT()
}

7
pkg/recorder/s3_mock.go Normal file
View File

@@ -0,0 +1,7 @@
// +build test
package recorder
func (r *Recorder) upload(s3Url string) error {
return nil
}

64
pkg/recorder/s3_prod.go Normal file
View File

@@ -0,0 +1,64 @@
// +build !test
package recorder
import (
"bytes"
"os"
"strings"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
)
// TODO: multi-part uploads
func (r *Recorder) upload(s3Url string) error {
sess, err := session.NewSession(&aws.Config{
Credentials: credentials.NewStaticCredentials(
r.conf.S3.AccessKey,
r.conf.S3.Secret,
"",
),
Region: aws.String(r.conf.S3.Region),
})
if err != nil {
return err
}
file, err := os.Open(r.filename)
if err != nil {
return err
}
defer file.Close()
fileInfo, _ := file.Stat()
size := fileInfo.Size()
buffer := make([]byte, size)
if _, err = file.Read(buffer); err != nil {
return err
}
var bucket, key string
if strings.HasPrefix(s3Url, "s3://") {
s3Url = s3Url[5:]
}
if idx := strings.Index(s3Url, "/"); idx != -1 {
bucket = s3Url[:idx]
key = s3Url[idx+1:]
} else {
bucket = s3Url
key = "recording.mp4"
}
_, err = s3.New(sess).PutObject(&s3.PutObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
Body: bytes.NewReader(buffer),
ContentLength: aws.Int64(size),
ContentType: aws.String("video/mp4"),
})
return err
}

143
pkg/service/handler.go Normal file
View File

@@ -0,0 +1,143 @@
package service
import (
"errors"
"fmt"
"time"
"github.com/livekit/protocol/logger"
livekit "github.com/livekit/protocol/proto"
"github.com/livekit/protocol/recording"
"google.golang.org/protobuf/proto"
)
func (s *Service) handleRecording() {
// subscribe to request channel
requests, err := s.bus.Subscribe(s.ctx, recording.RequestChannel(s.recordingId))
if err != nil {
return
}
defer requests.Close()
// ready to accept requests
err = s.handleResponse(s.recordingId, "", nil)
if err != nil {
return
}
// listen for rpcs
logger.Debugw("Waiting for requests", "recordingId", s.recordingId)
result := make(chan *livekit.RecordingResult, 1)
for {
select {
case <-s.kill:
// kill signal received, stop recorder
if status := s.status.Load(); status != Stopping {
s.status.Store(Stopping)
s.rec.Stop()
}
case res := <-result:
// recording stopped, send results to result channel
LogResult(res)
b, err := proto.Marshal(res)
if err != nil {
logger.Errorw("Failed to marshal results", err)
} else if err = s.bus.Publish(s.ctx, recording.ResultChannel, b); err != nil {
logger.Errorw("Failed to write results", err)
}
// clean up
s.rec.Close()
return
case msg := <-requests.Channel():
// unmarshal request
req := &livekit.RecordingRequest{}
err = proto.Unmarshal(requests.Payload(msg), req)
if err != nil {
logger.Errorw("Failed to read request", err, "recordingId", s.recordingId)
continue
}
s.handleRequest(req, result)
}
}
}
func (s *Service) handleRequest(req *livekit.RecordingRequest, result chan *livekit.RecordingResult) {
logger.Debugw("handling request", "recordingId", s.recordingId, "requestId", req.RequestId)
var err error
switch req.Request.(type) {
case *livekit.RecordingRequest_Start:
if status := s.status.Load(); status != Reserved {
err = fmt.Errorf("tried calling start with state %s", status)
break
}
// launch recorder
start := req.Request.(*livekit.RecordingRequest_Start).Start
err = s.rec.Validate(start)
if err != nil {
break
}
s.status.Store(Recording)
go func() {
// blocks until recorder is finished
result <- s.rec.Run(s.recordingId)
}()
case *livekit.RecordingRequest_AddOutput:
if status := s.status.Load(); status != Recording {
err = fmt.Errorf("tried calling AddOutput with status %s", status)
break
}
err = s.rec.AddOutput(req.Request.(*livekit.RecordingRequest_AddOutput).AddOutput.RtmpUrl)
case *livekit.RecordingRequest_RemoveOutput:
if status := s.status.Load(); status != Recording {
err = fmt.Errorf("tried calling RemoveOutput with status %s", status)
break
}
err = s.rec.RemoveOutput(req.Request.(*livekit.RecordingRequest_RemoveOutput).RemoveOutput.RtmpUrl)
case *livekit.RecordingRequest_End:
if status := s.status.Load(); status != Recording {
err = fmt.Errorf("tried calling End with status %s", status)
break
}
s.status.Store(Stopping)
s.rec.Stop()
}
_ = s.handleResponse(s.recordingId, req.RequestId, err)
}
func (s *Service) handleResponse(recordingId, requestId string, err error) error {
logger.Debugw("Sending response", "recordingId", recordingId, "requestId", requestId)
var message string
if err != nil {
logger.Errorw("Error handling request", err,
"recordingId", recordingId, "requestId", requestId)
message = err.Error()
}
b, err := proto.Marshal(&livekit.RecordingResponse{
RequestId: requestId,
Error: message,
})
if err != nil {
return err
}
return s.bus.Publish(s.ctx, recording.ResponseChannel(recordingId), b)
}
func LogResult(res *livekit.RecordingResult) {
if res.Error != "" {
logger.Errorw("recording failed", errors.New(res.Error))
} else {
values := []interface{}{"duration", time.Duration(res.Duration * 1e9)}
if res.DownloadUrl != "" {
values = append(values, "url", res.DownloadUrl)
}
logger.Infow("recording complete", values...)
}
}

106
pkg/service/service.go Normal file
View File

@@ -0,0 +1,106 @@
package service
import (
"context"
"sync/atomic"
"time"
"github.com/livekit/protocol/logger"
livekit "github.com/livekit/protocol/proto"
"github.com/livekit/protocol/recording"
"github.com/livekit/protocol/utils"
"google.golang.org/protobuf/proto"
"github.com/livekit/livekit-recorder/pkg/config"
"github.com/livekit/livekit-recorder/pkg/recorder"
)
type Service struct {
ctx context.Context
conf *config.Config
bus utils.MessageBus
status atomic.Value // Status
shutdown chan struct{}
kill chan struct{}
rec *recorder.Recorder
recordingId string
}
type Status string
const (
Available Status = "available"
Reserved Status = "reserved"
Recording Status = "recording"
Stopping Status = "stopping"
)
func NewService(conf *config.Config, bus utils.MessageBus) *Service {
return &Service{
ctx: context.Background(),
conf: conf,
bus: bus,
status: atomic.Value{},
shutdown: make(chan struct{}, 1),
kill: make(chan struct{}, 1),
}
}
func (s *Service) Run() error {
// TODO: catch panics
logger.Debugw("Starting service")
reservations, err := s.bus.SubscribeQueue(context.Background(), recording.ReservationChannel)
if err != nil {
return err
}
defer reservations.Close()
for {
s.status.Store(Available)
logger.Debugw("Recorder waiting")
select {
case <-s.shutdown:
logger.Debugw("Shutting down")
return nil
case msg := <-reservations.Channel():
logger.Debugw("Request received")
req := &livekit.RecordingReservation{}
err := proto.Unmarshal(reservations.Payload(msg), req)
if err != nil {
logger.Errorw("Malformed request", err)
continue
}
if req.SubmittedAt < time.Now().Add(-recording.ReservationTimeout).UnixNano()/1e6 {
logger.Debugw("Discarding old request", "ID", req.Id)
continue
}
s.status.Store(Reserved)
logger.Debugw("Request claimed", "ID", req.Id)
// handleRecording blocks until recording is finished
s.recordingId = req.Id
s.rec = recorder.NewRecorder(s.conf)
s.handleRecording()
s.rec = nil
s.recordingId = ""
}
}
}
func (s *Service) Status() Status {
return s.status.Load().(Status)
}
func (s *Service) Stop(kill bool) {
s.shutdown <- struct{}{}
if kill {
s.kill <- struct{}{}
}
}

148
pkg/service/service_test.go Normal file
View File

@@ -0,0 +1,148 @@
package service
import (
"context"
"testing"
"time"
livekit "github.com/livekit/protocol/proto"
"github.com/livekit/protocol/recording"
"github.com/livekit/protocol/utils"
"github.com/stretchr/testify/require"
"github.com/livekit/livekit-recorder/pkg/config"
"github.com/livekit/livekit-recorder/pkg/messaging"
"github.com/livekit/livekit-recorder/pkg/pipeline"
)
func TestService(t *testing.T) {
conf := config.TestConfig()
bus, err := messaging.NewMessageBus(conf)
require.NoError(t, err)
svc := NewService(conf, bus)
go func() {
require.NoError(t, svc.Run())
}()
// wait for service to start
time.Sleep(time.Millisecond * 100)
var id1, id2, id3 string
t.Run("Reservation", func(t *testing.T) {
require.Equal(t, Available, svc.Status())
id1, err = recording.ReserveRecorder(bus)
require.NoError(t, err)
require.Equal(t, Reserved, svc.Status())
})
t.Run("Double reservation fails", func(t *testing.T) {
// second reservation should fail
_, err = recording.ReserveRecorder(bus)
require.Error(t, err)
})
t.Run("Start recording", func(t *testing.T) {
require.NoError(t, recording.RPC(context.Background(), bus, id1, &livekit.RecordingRequest{
RequestId: utils.RandomSecret(),
Request: &livekit.RecordingRequest_Start{
Start: startRecordingRequest(true),
},
}))
})
t.Run("RPC validation", func(t *testing.T) {
require.Equal(t, pipeline.ErrCannotAddToFile,
recording.RPC(context.Background(), bus, id1, &livekit.RecordingRequest{
RequestId: utils.RandomSecret(),
Request: &livekit.RecordingRequest_AddOutput{
AddOutput: &livekit.AddOutputRequest{
RecordingId: id1,
RtmpUrl: "rtmp://fake-url.com?stream-id=xyz",
},
},
}),
)
})
t.Run("Recording completes", func(t *testing.T) {
time.Sleep(time.Millisecond * 3100)
require.Equal(t, Available, svc.Status())
})
t.Run("RPCs", func(t *testing.T) {
id2, err = recording.ReserveRecorder(bus)
require.NoError(t, err)
require.NoError(t, recording.RPC(context.Background(), bus, id2, &livekit.RecordingRequest{
RequestId: utils.RandomSecret(),
Request: &livekit.RecordingRequest_Start{
Start: startRecordingRequest(false),
},
}))
require.NoError(t, recording.RPC(context.Background(), bus, id2, &livekit.RecordingRequest{
RequestId: utils.RandomSecret(),
Request: &livekit.RecordingRequest_AddOutput{
AddOutput: &livekit.AddOutputRequest{
RecordingId: id2,
RtmpUrl: "rtmp://fake-url.com?stream-id=xyz",
},
},
}))
})
t.Run("Stop recording", func(t *testing.T) {
require.NoError(t, recording.RPC(context.Background(), bus, id2, &livekit.RecordingRequest{
RequestId: utils.RandomSecret(),
Request: &livekit.RecordingRequest_End{
End: &livekit.EndRecordingRequest{
RecordingId: id2,
},
},
}))
status := svc.Status()
require.True(t, status == Stopping || status == Available)
})
t.Run("Kill service", func(t *testing.T) {
id3, err = recording.ReserveRecorder(bus)
require.NoError(t, err)
require.NoError(t, recording.RPC(context.Background(), bus, id3, &livekit.RecordingRequest{
RequestId: utils.RandomSecret(),
Request: &livekit.RecordingRequest_Start{
Start: startRecordingRequest(false),
},
}))
svc.Stop(true)
time.Sleep(time.Millisecond * 100)
status := svc.Status()
// status will show available for a very small amount of time on shutdown
require.True(t, status == Stopping || status == Available)
})
}
func startRecordingRequest(s3 bool) *livekit.StartRecordingRequest {
req := &livekit.StartRecordingRequest{
Input: &livekit.StartRecordingRequest_Template{Template: &livekit.RecordingTemplate{
Layout: "speaker-dark",
Room: &livekit.RecordingTemplate_Token{
Token: "fake-recording-token",
},
}},
}
if s3 {
req.Output = &livekit.StartRecordingRequest_S3Url{
S3Url: "s3://livekit/test.mp4",
}
} else {
req.Output = &livekit.StartRecordingRequest_Rtmp{
Rtmp: &livekit.RtmpOutput{
Urls: []string{"rtmp://stream.io/test"},
},
}
}
return req
}

View File

@@ -1,21 +0,0 @@
{
"env": {
"browser": true,
"es2021": true
},
"extends": [
"eslint:recommended",
"plugin:@typescript-eslint/recommended"
],
"parser": "@typescript-eslint/parser",
"parserOptions": {
"ecmaVersion": 12,
"sourceType": "module"
},
"plugins": [
"@typescript-eslint"
],
"rules": {
"@typescript-eslint/no-var-requires": 0
}
}

109
recorder/.gitignore vendored
View File

@@ -1,109 +0,0 @@
# Logs
logs
*.log
npm-debug.log*
yarn-debug.log*
yarn-error.log*
lerna-debug.log*
# Diagnostic reports (https://nodejs.org/api/report.html)
report.[0-9]*.[0-9]*.[0-9]*.[0-9]*.json
# Runtime data
pids
*.pid
*.seed
*.pid.lock
# Directory for instrumented libs generated by jscoverage/JSCover
lib-cov
# Coverage directory used by tools like istanbul
coverage
*.lcov
# nyc test coverage
.nyc_output
# Grunt intermediate storage (https://gruntjs.com/creating-plugins#storing-task-files)
.grunt
# Bower dependency directory (https://bower.io/)
bower_components
# node-waf configuration
.lock-wscript
# Compiled binary addons (https://nodejs.org/api/addons.html)
build/Release
# Dependency directories
node_modules/
jspm_packages/
# TypeScript v1 declaration files
typings/
# TypeScript cache
*.tsbuildinfo
# Optional npm cache directory
.npm
# Optional eslint cache
.eslintcache
# Microbundle cache
.rpt2_cache/
.rts2_cache_cjs/
.rts2_cache_es/
.rts2_cache_umd/
# Optional REPL history
.node_repl_history
# Output of 'npm pack'
*.tgz
# Yarn Integrity file
.yarn-integrity
# dotenv environment variables file
.env
.env.test
# parcel-bundler cache (https://parceljs.org/)
.cache
# Next.js build output
.next
# Nuxt.js build / generate output
.nuxt
dist
# Gatsby files
.cache/
# Comment in the public line in if your project uses Gatsby and *not* Next.js
# https://nextjs.org/blog/next-9-1#public-directory-support
# public
# vuepress build output
.vuepress/dist
# Serverless directories
.serverless/
# FuseBox cache
.fusebox/
# DynamoDB Local files
.dynamodb/
# TernJS port file
.tern-port
.idea/
config/
recording.mp4
src/*.js

View File

@@ -1,25 +0,0 @@
FROM buildkite/puppeteer:latest
# Install pulse audio
RUN apt-get -qq update && apt-get install -y pulseaudio
# Add root user to group for pulseaudio access
RUN adduser root pulse-access
# xvfb
RUN apt-get install -y xvfb
# ffmpeg
RUN apt-get install -y ffmpeg
# Copy recorder
WORKDIR /app
COPY package.json package-lock.json tsconfig.json ./
COPY src ./src
RUN npm install
# Silence error about livekit-server-sdk protos
RUN npx tsc src/*.ts; exit 0
COPY entrypoint.sh .
ENTRYPOINT ["./entrypoint.sh"]

View File

@@ -1,265 +0,0 @@
# LiveKit Recorder
## How it works
The recorder launches Chrome and navigates to the supplied url, grabs audio from pulse and video from a virtual frame
buffer, and feeds them into ffmpeg. You can write the output as mp4 to a file or upload it to s3, or forward the
output to a rtmp stream.
It can be used standalone to make a single recording of any webpage, or it can be managed by our
[recorder service](https://github.com/livekit/livekit-recorder/tree/main/service).
## Config
The recorder expects a json config in the `LIVEKIT_RECORDER_CONFIG` environment variable:
```bash
LIVEKIT_RECORDER_CONFIG="$(cat config.json)"
```
Input: Either `url` or `template` required. For `template`, either `token`, or `api_key`, `api_secret`, and `room_name`
required.
Output: Either `file`, `rtmp`, or `s3` required.
See [Input](#input) and [Output](#output) sections below for more details.
All config options:
```yaml
{
"api_key": livekit server api key - required if using template with room_name instead of token
"api_secret": livekit server api secret - required if using template with room_name instead of token
"input": {
"url": custom url of recording web page
"template": {
"layout": <grid|speaker>-<light|dark>
"ws_url": livekit server websocket url
"token": livekit access token
"room_name": room name
}
}
"output": {
"file": filename
"rtmp": rtmp url
"s3": {
"access_key": aws access id
"secret": aws secret
"bucket": s3 bucket
"key": filename
}
}
"options": {
"preset": valid values are 720p30, 720p60, 1080p30, or 1080p60
"input_width": defaults to 1920 (optional)
"input_height": defaults to 1080 (optional)
"depth": defaults to 24 (optional)
"framerate": defaults to 30 (optional)
"output_width": scale output width (optional)
"output_height": scale output height (optional)
"audio_bitrate": defaults to 128 (kbps, optional)
"audio_frequency": defaults to 44100 (optional)
"video_bitrate": defaults to 4500 (kpbs, optional)
}
}
```
The `options.preset` field will provide defaults using the following values:
| Preset | input_width | input_height | framerate | video_bitrate |
|--- |--- |--- |--- |--- |
| 720p30 | 1280 | 720 | 30 | 3000 |
| 720p60 | 1280 | 720 | 60 | 4500 |
| 1080p30 | 1920 | 1080 | 30 | 4500 |
| 1080p60 | 1920 | 1080 | 60 | 6000 |
If you don't supply any options, it defaults to 1080p 30 fps.
## Input
### Using templates
We currently have 4 templates available - grid or speaker, each available in light or dark.
Just supply your server api key and secret, along with the websocket url.
Check out our [templates README](https://github.com/livekit/livekit-recorder/tree/main/web) to learn more or create your own.
```json
{
"api_key": "<key>",
"api_secret": "<secret>",
"input": {
"template": {
"layout": "<grid|speaker>-<light|dark>",
"ws_url": "wss://your-livekit-address.com",
"room_name": "room-to-record"
}
}
}
```
Or, to use your own token instead of having the recorder generate one:
```json
{
"input": {
"template": {
"layout": "<grid|speaker>-<light|dark>",
"ws_url": "wss://your-livekit-address.com",
"token": "<token>"
}
}
}
```
### Using a custom webpage
You can also save or stream any other webpage - just supply the url.
```json
{
"input": {
"url": "your-recording-domain.com"
}
}
```
## Output
### Save to file
```json
{
"output": {
"file": "/app/out/recording.mp4"
}
}
```
Note: your local mounted directory needs to exist, and the docker directory should match file output (i.e. `/app/out`)
```bash
mkdir -p ~/livekit/output
docker run --rm -e LIVEKIT_RECORDER_CONFIG="$(cat config.json)" \
-v ~/livekit/output:/app/out \
livekit/livekit-recorder
```
### Upload to S3
```json
{
"output": {
"S3": {
"access_key": "<aws-access-key>",
"secret": "<aws-secret>",
"bucket": "<bucket-name>",
"key": "recording.mp4"
}
}
}
```
```bash
docker run --rm -e LIVEKIT_RECORDER_CONFIG="$(cat config.json)" livekit/livekit-recorder
```
### RTMP
```json
{
"output": {
"rtmp": "<rtmp://stream-url.com>"
}
}
```
```bash
docker run --rm -e LIVEKIT_RECORDER_CONFIG="$(cat config.json)" livekit/livekit-recorder
```
## Ending a recording
Once started, there are a number of ways to end the recording:
* `docker stop <container>`
* if using our templates, the recorder will stop automatically when the last participant leaves
* if using your own webpage, logging `END_RECORDING` to the console
With any of these methods, the recorder will stop ffmpeg and finish uploading before shutting down.
## Examples
### Basic recording
basic.json:
```json
{
"api_key": "<server-api-key>",
"api_secret": "<server-api-secret>",
"input": {
"template": {
"layout": "speaker-dark",
"ws_url": "<wss://livekit.your-domain.com>",
"room_name": "<my-room>"
}
},
"output": {
"file": "/app/out/recording.mp4"
}
}
```
```bash
mkdir -p ~/livekit/output
docker run --rm -e LIVEKIT_RECORDER_CONFIG="$(cat basic.json)" \
-v ~/livekit/output:/app/out \
livekit/livekit-recorder
```
### Record custom url at 720p, 60fps and upload to s3
s3.json:
```json
{
"input": {
"url": "https://your-recording-domain.com",
},
"output": {
"s3": {
"access_key": "<aws-access-key>",
"secret": "<aws-secret>",
"bucket": "<my-bucket>",
"key": "recording.mp4"
},
},
"options": {
"preset": "720p60"
}
}
```
```bash
docker run --rm --name my-recorder -e LIVEKIT_RECORDER_CONFIG="$(cat s3.json)" livekit/livekit-recorder
```
```bash
docker stop my-recorder
```
### Stream to Twitch, scaling output from 1080p to 720p
twitch.json:
```json
{
"input": {
"template": {
"layout": "speaker-dark",
"ws_url": "<wss://livekit.your-domain.com>",
"token": "<recording-token>"
}
},
"output": {
"rtmp": "rtmp://live.twitch.tv/app/<stream-key>",
},
"options": {
"input_width": 1920,
"input_height": 1080,
"output_width": 1280,
"output_height": 720
}
}
```
```bash
docker run --rm -e LIVEKIT_RECORDER_CONFIG="$(cat twitch.json)" livekit/livekit-recorder
```

View File

@@ -1,33 +0,0 @@
#!/usr/bin/env bash
set -euxo pipefail
# Cleanup to be "stateless" on startup, otherwise pulseaudio daemon can't start
rm -rf /var/run/pulse /var/lib/pulse /root/.config/pulse
# Start pulseaudio as system wide daemon; for debugging it helps to start in non-daemon mode
pulseaudio -D --verbose --exit-idle-time=-1 --system --disallow-exit
# Load audio sink
pactl load-module module-null-sink sink_name="grab" sink_properties=device.description="monitorOUT"
# Forward signals to node
pid=0
# SIGTERM-handler
term_handler() {
if [ $pid -ne 0 ]; then
kill -SIGTERM "$pid"
wait "$pid"
fi
exit 143; # 128 + 15 -- SIGTERM
}
# On callback, kill the last background process, which is `tail -f /dev/null` and execute the specified handler
trap 'kill ${!}; term_handler' SIGTERM
# Run recorder
node src/record.js &
pid="$!"
# Wait
wait ${!}

File diff suppressed because it is too large Load Diff

View File

@@ -1,22 +0,0 @@
{
"name": "livekit-recording",
"description": "LiveKit Recording",
"repository": "git@github.com:livekit/livekit-recorder.git",
"author": "David Colburn <xero73@gmail.com>",
"license": "Apache-2.0",
"devDependencies": {
"@types/node": "^16.4.1",
"@typescript-eslint/eslint-plugin": "^4.28.4",
"@typescript-eslint/parser": "^4.28.4",
"eslint": "^7.31.0",
"ts-node": "^10.1.0",
"typescript": "^4.3.5"
},
"dependencies": {
"@types/puppeteer": "^5.4.4",
"aws-sdk": "^2.953.0",
"livekit-server-sdk": "^0.4.7",
"puppeteer": "^10.1.0",
"xvfb": "^0.4.0"
}
}

View File

@@ -1,166 +0,0 @@
import {AccessToken} from "livekit-server-sdk";
import {S3} from "aws-sdk";
import {readFileSync} from "fs";
type Config = {
api_key?: string
api_secret?: string
input: {
url?: string
template?: {
layout: string
ws_url: string
token?: string
room_name?: string
}
}
output: {
file?: string
rtmp?: string
s3?: {
bucket: string
key: string
access_key?: string
secret?: string
}
}
options: {
preset?: string | number
input_width: number
input_height: number
output_width?: number
output_height?: number
depth: number
framerate: number
audio_bitrate: number
audio_frequency: number
video_bitrate: number
}
}
export function loadConfig(): Config {
if (!process.env.LIVEKIT_RECORDER_CONFIG) {
throw Error('LIVEKIT_RECORDER_CONFIG, LIVEKIT_URL or Template required')
}
// load config from env
const json = JSON.parse(process.env.LIVEKIT_RECORDER_CONFIG)
const conf: Config = {
api_key: json.api_key,
api_secret: json.api_secret,
input: json.input,
output: json.output,
options: {
input_width: 1920,
input_height: 1080,
depth: 24,
framerate: 30,
audio_bitrate: 128,
audio_frequency: 44100,
video_bitrate: 4500,
}
}
switch(json.options?.preset) {
case "720p30":
case "HD_30":
case 1:
conf.options.input_width = 1280
conf.options.input_height = 720
conf.options.video_bitrate = 3000
break
case "720p60":
case "HD_60":
case 2:
conf.options.input_width = 1280
conf.options.input_height = 720
conf.options.framerate = 60
break
case "1080p30":
case "FULL_HD_30":
case 3:
// default
break
case "1080p60":
case "FULL_HD_60":
case 4:
conf.options.framerate = 60
conf.options.video_bitrate = 6000
break
default:
conf.options = {...conf.options, ...json.options}
}
// write to file if no output specified
if (!(conf.output.file || conf.output.rtmp)) {
const now = new Date().toISOString().
replace(/T/, '_').
replace(/\..+/, '')
conf.output.file = `recording_${now}.mp4`
}
return conf
}
export function getUrl(conf: Config): string {
const template = conf.input.template
if (template) {
let token: string
if (template.token) {
token = template.token
} else if (template.room_name && conf.api_key && conf.api_secret) {
token = buildRecorderToken(template.room_name, conf.api_key, conf.api_secret)
} else {
throw Error('Either token, or room name, api key, and secret required')
}
return `https://recorder.livekit.io/#/${template.layout}?url=${encodeURIComponent(template.ws_url)}&token=${token}`
} else if (conf.input.url) {
return conf.input.url
}
throw Error('Input url or template required')
}
export function upload(conf: Config): void {
if (!conf.output.s3) {
return
}
if (!conf.output.file) {
throw Error("output missing")
}
let s3: S3
if (conf.output.s3.access_key && conf.output.s3.secret) {
s3 = new S3({accessKeyId: conf.output.s3.access_key, secretAccessKey: conf.output.s3.secret})
} else {
s3 = new S3()
}
const params = {
Bucket: conf.output.s3.bucket,
Key: conf.output.s3.key,
Body: readFileSync(conf.output.file)
}
s3.upload(params, undefined,function(err, data) {
if (err) {
console.log(err)
} else {
console.log(`file uploaded to ${data.Location}`)
}
})
}
function buildRecorderToken(room: string, key: string, secret: string): string {
const at = new AccessToken(key, secret, {
identity: 'recorder-'+(Math.random()+1).toString(36).substring(2),
})
at.addGrant({
roomJoin: true,
room: room,
canPublish: false,
canSubscribe: true,
hidden: true,
})
return at.toJwt()
}

View File

@@ -1,121 +0,0 @@
import { loadConfig, getUrl, upload} from "./config"
import { Browser, Page, launch } from 'puppeteer'
import { spawn } from 'child_process'
const Xvfb = require('xvfb');
(async () => {
const conf = loadConfig()
// start xvfb
const xvfb = new Xvfb({
displayNum: 10,
silent: true,
xvfb_args: ['-screen', '0', `${conf.options.input_width}x${conf.options.input_height}x${conf.options.depth}`, '-ac']
})
xvfb.start((err: Error) => { if (err) { console.log(err) } })
// launch puppeteer
const browser: Browser = await launch({
headless: false,
defaultViewport: {width: conf.options.input_width, height: conf.options.input_height},
ignoreDefaultArgs: ["--enable-automation"],
args: [
'--kiosk', // full screen, no info bar
'--no-sandbox', // required when running as root
'--autoplay-policy=no-user-gesture-required', // autoplay
'--window-position=0,0',
`--window-size=${conf.options.input_width},${conf.options.input_height}`,
`--display=${xvfb.display()}`,
]
})
// load page
const url = getUrl(conf)
const page: Page = await browser.newPage()
await page.goto(url, {waitUntil: "load"})
// ffmpeg args
const args: string[] = [
'-fflags', 'nobuffer', // reduce delay
'-fflags', '+igndts', // generate dts
'-y', // automatically overwrite
// audio (pulse grab)
'-thread_queue_size', '1024', // avoid thread message queue blocking
'-ac', '2', // 2 channels
'-f', 'pulse', '-i', 'grab.monitor',
// video (x11 grab)
"-draw_mouse", "0", // don't draw the mouse
'-thread_queue_size', '1024', // avoid thread message queue blocking
'-s', `${conf.options.input_width}x${conf.options.input_height}`,
'-r', `${conf.options.framerate}`,
'-f', 'x11grab', '-i', `${xvfb.display()}.0`,
// output audio
'-c:a', 'aac', '-b:a', `${conf.options.audio_bitrate}k`, '-ar', `${conf.options.audio_frequency}`,
'-ac', '2', '-af', 'aresample=async=1',
// output video
'-c:v', 'libx264', '-preset', 'veryfast', '-tune', 'zerolatency',
'-b:v', `${conf.options.video_bitrate}k`,
]
// output scaling
if (conf.options.output_width && conf.options.output_height) {
args.push('-s', `${conf.options.output_width}x${conf.options.output_height}`)
}
// output location
if (conf.output.rtmp) {
args.push(
// streaming settings
'-maxrate', `${conf.options.video_bitrate}k`,
'-bufsize', `${conf.options.video_bitrate * 2}k`,
'-f', 'flv', conf.output.rtmp,
)
console.log(`Streaming to ${conf.output.rtmp}`)
} else if (conf.output.file) {
const filename = conf.output.file
args.push(filename)
console.log(`Writing to ${filename}`)
} else {
throw Error("Missing ffmpeg output")
}
// spawn ffmpeg
console.log('Start recording')
const ffmpeg = spawn('ffmpeg', args)
ffmpeg.stdout.pipe(process.stdout)
ffmpeg.stderr.pipe(process.stderr)
ffmpeg.on('error', (err) => {
console.log(`ffmpeg error: ${err}`)
})
ffmpeg.on('close', (code, signal) => {
console.log(`ffmpeg closed. code: ${code}, signal: ${signal}`)
xvfb.stop()
upload(conf)
});
let stopped = false
const stop = async () => {
if (stopped) {
return
}
stopped = true
console.log('End recording')
ffmpeg.kill('SIGINT')
await browser.close()
}
process.once('SIGINT', await stop)
process.once('SIGTERM', await stop)
// wait for END_RECORDING
page.on('console', async (msg) => {
if (msg.text() === 'END_RECORDING') {
await stop()
}
})
})().catch((err) => {
console.log(err)
});

View File

@@ -1,11 +0,0 @@
{
"compilerOptions": {
"target": "es2015", /* Specify ECMAScript target version: 'ES3' (default), 'ES5', 'ES2015', 'ES2016', 'ES2017', 'ES2018', 'ES2019', 'ES2020', 'ES2021', or 'ESNEXT'. */
"module": "commonjs", /* Specify module code generation: 'none', 'commonjs', 'amd', 'system', 'umd', 'es2015', 'es2020', or 'ESNext'. */
"allowJs": true, /* Allow javascript files to be compiled. */
"strict": true, /* Enable all strict type-checking options. */
"esModuleInterop": true, /* Enables emit interoperability between CommonJS and ES Modules via creation of namespace objects for all imports. Implies 'allowSyntheticDefaultImports'. */
"skipLibCheck": true, /* Skip type checking of declaration files. */
"forceConsistentCasingInFileNames": true /* Disallow inconsistently-cased references to the same file. */
},
}

22
service/.gitignore vendored
View File

@@ -1,22 +0,0 @@
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
# Test binary, built with `go test -c`
*.test
# checksums of file tree
.checksumgo
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# generated files
proto/
# Dependency directories (remove the comment below to include it)
# vendor/
bin/

View File

@@ -1,78 +0,0 @@
# LiveKit Recorder Service
## How it works
The service listens to a redis subscription and waits for the LiveKit server to make a reservation. Once the reservation
is made to ensure availability, the service waits for a `START_RECORDING` signal from the server before launching the
[recorder](https://github.com/livekit/livekit-recorder/tree/main/recorder). The recorder will be stopped by
either a `END_RECORDING` signal from the server, or automatically when the last participant leaves if using our templates.
A single instance can record one room at a time.
## Guides
See guides and deployment docs at https://docs.livekit.io/guides/recording
## Config
The only required field is redis address. This must be the same redis address used by your LiveKit server.
If you want to use templates without supplying your own tokens, `api_key` and
`api_secret` are also required.
The `options.preset` field will provide defaults using the following values:
| Preset | input_width | input_height | framerate | video_bitrate |
|--- |--- |--- |--- |--- |
| 720p30 | 1280 | 720 | 30 | 3000 |
| 720p60 | 1280 | 720 | 60 | 4500 |
| 1080p30 | 1920 | 1080 | 30 | 4500 |
| 1080p60 | 1920 | 1080 | 60 | 6000 |
If you don't supply any options, it defaults to 1080p 30 fps.
```yaml
redis:
address: redis address, including port (required)
username: redis username (optional)
password: redis password (optional)
db: redis db (optional)
# template config
ws_url: livekit server ws url (required if using templates)
api_key: livekit server api key (required if using templates without supplying tokens)
api_secret: livekit server api secret (required if using templates without supplying tokens)
# default recording options (all optional)
options:
preset: valid options are "720p30", "720p60", "1080p30", or "1080p60"
input_width: defaults to 1920
input_height: defaults to 1080
depth: defaults to 24
framerate: defaults to 30
width: defaults to 0 (no scaling)
height: defaults to 0 (no scaling)
audio_bitrate: defaults to 128 (kbps)
audio_frequency: defaults to 44100 (Hz)
video_bitrate: defaults to 4500 (kbps)
log_level: valid levels are debug, info, warn, error, fatal, or panic (optional)
```
## Running locally
If you want to try running against a local livekit server, you'll need to make a couple changes:
* open `/usr/local/etc/redis.conf` and comment out the line that says `bind 127.0.0.1`
* change `protected-mode yes` to `protected-mode no` in the same file
* add `--network host` to your `docker run` command
* update your redis address from `localhost` to your host ip as docker sees it:
* on linux, this should be `172.17.0.1`
* on mac or windows, run `docker run -it --rm alpine nslookup host.docker.internal` and you should see something like
`Name: host.docker.internal
Address: 192.168.65.2`
These changes allow the service to connect to your local redis instance from inside the docker container.
Finally, to build and run:
```bash
docker build -t recorder-svc .
docker run --network host -e REDIS_HOST="192.168.65.2:6379" recorder-svc
```
You can then use our [cli](https://github.com/livekit/livekit-cli) to submit recording requests to your server.

View File

@@ -1,135 +0,0 @@
package main
import (
"fmt"
"io/ioutil"
"net/http"
"os"
"os/signal"
"syscall"
"github.com/go-logr/zapr"
"github.com/livekit/protocol/logger"
"github.com/urfave/cli/v2"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"github.com/livekit/livekit-recorder/service/pkg/config"
"github.com/livekit/livekit-recorder/service/pkg/service"
"github.com/livekit/livekit-recorder/service/version"
)
func main() {
app := &cli.App{
Name: "livekit-recorder-service",
Usage: "LiveKit Recorder Service",
Description: "runs the recording service",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "config",
Usage: "path to LiveKit recording config defaults",
},
&cli.StringFlag{
Name: "config-body",
Usage: "Default LiveKit recording config in JSON, typically passed in as an env var in a container",
EnvVars: []string{"LIVEKIT_RECORDER_SVC_CONFIG"},
},
&cli.StringFlag{
Name: "redis-host",
Usage: "host (incl. port) to redis server",
EnvVars: []string{"REDIS_HOST"},
},
},
Action: runService,
Version: version.Version,
}
if err := app.Run(os.Args); err != nil {
fmt.Println(err)
}
}
func getConfig(c *cli.Context) (*config.Config, error) {
confString, err := getConfigString(c)
if err != nil {
return nil, err
}
return config.NewConfig(confString, c)
}
func initLogger(level string) {
conf := zap.NewProductionConfig()
if level != "" {
lvl := zapcore.Level(0)
if err := lvl.UnmarshalText([]byte(level)); err == nil {
conf.Level = zap.NewAtomicLevelAt(lvl)
}
}
l, _ := conf.Build()
logger.SetLogger(zapr.NewLogger(l), "livekit-recorder")
}
func runService(c *cli.Context) error {
conf, err := getConfig(c)
if err != nil {
return err
}
initLogger(conf.LogLevel)
rc, err := service.NewMessageBus(conf)
if err != nil {
return err
}
worker := service.InitializeWorker(conf, rc)
if conf.HealthPort != 0 {
h := &handler{worker: worker}
go http.ListenAndServe(fmt.Sprintf(":%d", conf.HealthPort), h)
}
finishChan := make(chan os.Signal, 1)
signal.Notify(finishChan, syscall.SIGTERM, syscall.SIGQUIT)
stopChan := make(chan os.Signal, 1)
signal.Notify(stopChan, syscall.SIGINT)
go func() {
select {
case sig := <-finishChan:
logger.Infow("Exit requested, finishing recording then shutting down", "signal", sig)
worker.Stop(false)
case sig := <-stopChan:
logger.Infow("Exit requested, stopping recording and shutting down", "signal", sig)
worker.Stop(true)
}
}()
return worker.Start()
}
func getConfigString(c *cli.Context) (string, error) {
configFile := c.String("config")
configBody := c.String("config-body")
if configBody == "" {
if configFile != "" {
content, err := ioutil.ReadFile(configFile)
if err != nil {
return "", err
}
configBody = string(content)
}
}
return configBody, nil
}
type handler struct {
worker *service.Worker
}
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(h.worker.Status()))
}

View File

@@ -1,16 +0,0 @@
module github.com/livekit/livekit-recorder/service
go 1.16
require (
github.com/go-logr/zapr v1.0.0
github.com/go-redis/redis/v8 v8.11.3
github.com/livekit/protocol v0.9.2
github.com/magefile/mage v1.11.0
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.7.0
github.com/urfave/cli/v2 v2.3.0
go.uber.org/zap v1.18.1
google.golang.org/protobuf v1.27.1
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
)

View File

@@ -1,91 +0,0 @@
package config
import (
"fmt"
"github.com/urfave/cli/v2"
"gopkg.in/yaml.v3"
livekit "github.com/livekit/protocol/proto"
)
type Config struct {
Redis RedisConfig `yaml:"redis"`
ApiKey string `yaml:"api_key"`
ApiSecret string `yaml:"api_secret"`
WsUrl string `yaml:"ws_url"`
S3 S3Config `yaml:"s3"`
HealthPort int `yaml:"health_port"`
Options *livekit.RecordingOptions `yaml:"options"`
LogLevel string `yaml:"log_level"`
Test bool `yaml:"-"`
}
type RedisConfig struct {
Address string `yaml:"address"`
Username string `yaml:"username"`
Password string `yaml:"password"`
DB int `yaml:"db"`
}
type S3Config struct {
AccessKey string `yaml:"access_key"`
Secret string `yaml:"secret"`
}
func NewConfig(confString string, c *cli.Context) (*Config, error) {
// start with defaults
conf := &Config{
Redis: RedisConfig{},
Options: &livekit.RecordingOptions{
InputWidth: 1920,
InputHeight: 1080,
Depth: 24,
Framerate: 25,
AudioBitrate: 128,
AudioFrequency: 44100,
VideoBitrate: 4500,
},
LogLevel: "debug",
}
if confString != "" {
if err := yaml.Unmarshal([]byte(confString), conf); err != nil {
return nil, fmt.Errorf("could not parse config: %v", err)
}
}
if c != nil {
if err := conf.updateFromCLI(c); err != nil {
return nil, err
}
}
return conf, nil
}
func TestConfig() *Config {
return &Config{
Redis: RedisConfig{
Address: "localhost:6379",
},
Options: &livekit.RecordingOptions{
InputWidth: 1920,
InputHeight: 1080,
Depth: 24,
Framerate: 25,
AudioBitrate: 128,
AudioFrequency: 44100,
VideoBitrate: 4500,
},
Test: true,
}
}
func (conf *Config) updateFromCLI(c *cli.Context) error {
if c.IsSet("redis-host") {
conf.Redis.Address = c.String("redis-host")
}
return nil
}

View File

@@ -1,73 +0,0 @@
package config
import (
"testing"
"github.com/stretchr/testify/require"
livekit "github.com/livekit/protocol/proto"
)
func TestMerge(t *testing.T) {
defaults := &Config{
Redis: RedisConfig{},
WsUrl: "wss://testing.livekit.io",
S3: S3Config{
AccessKey: "s3key",
Secret: "s3secret",
},
Options: &livekit.RecordingOptions{
InputWidth: 1920,
InputHeight: 1080,
Depth: 24,
Framerate: 30,
AudioBitrate: 128,
AudioFrequency: 44100,
VideoBitrate: 4500,
},
}
req := &livekit.RecordingReservation{
Id: "id",
Request: &livekit.StartRecordingRequest{
Input: &livekit.RecordingInput{
Template: &livekit.RecordingTemplate{
Layout: "grid-dark",
Token: "token",
},
},
Output: &livekit.RecordingOutput{
S3Path: "bucket/recording.mp4",
},
Options: &livekit.RecordingOptions{
Framerate: 60,
VideoBitrate: 6000,
},
},
}
merged, err := Merge(defaults, req)
require.NoError(t, err)
expected := "{\"input\":{\"template\":{\"layout\":\"grid-dark\",\"ws_url\":\"wss://testing.livekit.io\",\"token\":\"token\"}},\"output\":{\"s3\":{\"bucket\":\"bucket\",\"key\":\"recording.mp4\",\"access_key\":\"s3key\",\"secret\":\"s3secret\"}},\"options\":{\"audio_bitrate\":128,\"audio_frequency\":44100,\"depth\":24,\"framerate\":60,\"input_height\":1080,\"input_width\":1920,\"video_bitrate\":6000}}"
require.Equal(t, expected, merged)
req = &livekit.RecordingReservation{
Id: "id",
Request: &livekit.StartRecordingRequest{
Input: &livekit.RecordingInput{
Template: &livekit.RecordingTemplate{
Layout: "grid-dark",
Token: "token",
},
},
Output: &livekit.RecordingOutput{
S3Path: "bucket/recording.mp4",
},
},
}
merged, err = Merge(defaults, req)
require.NoError(t, err)
expected = "{\"input\":{\"template\":{\"layout\":\"grid-dark\",\"ws_url\":\"wss://testing.livekit.io\",\"token\":\"token\"}},\"output\":{\"s3\":{\"bucket\":\"bucket\",\"key\":\"recording.mp4\",\"access_key\":\"s3key\",\"secret\":\"s3secret\"}},\"options\":{\"input_width\":1920,\"input_height\":1080,\"depth\":24,\"framerate\":30,\"audio_bitrate\":128,\"audio_frequency\":44100,\"video_bitrate\":4500}}"
require.Equal(t, expected, merged)
}

View File

@@ -1,112 +0,0 @@
package config
import (
"encoding/json"
"errors"
"strings"
livekit "github.com/livekit/protocol/proto"
)
type Request struct {
ApiKey string `json:"api_key,omitempty"`
ApiSecret string `json:"api_secret,omitempty"`
Input *RequestInput `json:"input,omitempty"`
Output *RequestOutput `json:"output,omitempty"`
Options interface{} `json:"options,omitempty"`
}
type RequestInput struct {
Template *RequestTemplate `json:"template,omitempty"`
Url string `json:"url,omitempty"`
}
type RequestTemplate struct {
Layout string `json:"layout,omitempty"`
WsUrl string `json:"ws_url,omitempty"`
Token string `json:"token,omitempty"`
RoomName string `json:"room_name,omitempty"`
}
type RequestOutput struct {
S3 *RequestS3 `json:"s3,omitempty"`
Rtmp string `json:"rtmp,omitempty"`
}
type RequestS3 struct {
Bucket string `json:"bucket,omitempty"`
Key string `json:"key,omitempty"`
AccessKey string `json:"access_key,omitempty"`
Secret string `json:"secret,omitempty"`
}
// Creates a recorder config by combining service defaults with reservation request
func Merge(defaults *Config, res *livekit.RecordingReservation) (string, error) {
req := res.Request
if req.Input == nil || req.Output == nil {
return "", errors.New("input and output required")
}
conf := &Request{
ApiKey: defaults.ApiKey,
ApiSecret: defaults.ApiSecret,
Input: &RequestInput{
Url: req.Input.Url,
},
Output: &RequestOutput{
Rtmp: req.Output.Rtmp,
},
}
if req.Input.Template != nil {
conf.Input.Template = &RequestTemplate{
Layout: req.Input.Template.Layout,
WsUrl: defaults.WsUrl,
Token: req.Input.Template.Token,
RoomName: req.Input.Template.RoomName,
}
}
if idx := strings.Index(req.Output.S3Path, "/"); idx != -1 {
conf.Output.S3 = &RequestS3{
Bucket: req.Output.S3Path[:idx],
Key: req.Output.S3Path[idx+1:],
AccessKey: defaults.S3.AccessKey,
Secret: defaults.S3.Secret,
}
}
if req.Options != nil {
if req.Options.Preset != livekit.RecordingPreset_NONE {
conf.Options = map[string]interface{}{"preset": req.Options.Preset}
} else {
// combine options
options := make(map[string]interface{})
jsonDefaults, err := json.Marshal(defaults.Options)
if err != nil {
return "", err
}
err = json.Unmarshal(jsonDefaults, &options)
if err != nil {
return "", err
}
jsonReq, err := json.Marshal(req.Options)
if err != nil {
return "", err
}
err = json.Unmarshal(jsonReq, &options)
if err != nil {
return "", err
}
conf.Options = options
}
} else {
conf.Options = defaults.Options
}
b, err := json.Marshal(conf)
return string(b), err
}

View File

@@ -1,195 +0,0 @@
package service
import (
"context"
"fmt"
"os"
"os/exec"
"sync/atomic"
"syscall"
"time"
"github.com/livekit/protocol/logger"
livekit "github.com/livekit/protocol/proto"
"github.com/livekit/protocol/utils"
"google.golang.org/protobuf/proto"
"github.com/livekit/livekit-recorder/service/pkg/config"
)
type Worker struct {
ctx context.Context
bus utils.MessageBus
defaults *config.Config
status atomic.Value // Status
shutdown chan struct{}
kill chan struct{}
mock bool
}
type Status string
const (
Available Status = "available"
Reserved Status = "reserved"
Recording Status = "recording"
lockDuration = time.Second * 5
)
func InitializeWorker(conf *config.Config, bus utils.MessageBus) *Worker {
return &Worker{
ctx: context.Background(),
bus: bus,
defaults: conf,
status: atomic.Value{},
shutdown: make(chan struct{}, 1),
kill: make(chan struct{}, 1),
mock: conf.Test,
}
}
func (w *Worker) Start() error {
logger.Debugw("Starting worker", "mock", w.mock)
reservations, err := w.bus.Subscribe(context.Background(), utils.ReservationChannel)
if err != nil {
return err
}
defer reservations.Close()
for {
w.status.Store(Available)
logger.Debugw("Recorder waiting")
select {
case <-w.shutdown:
logger.Debugw("Shutting down")
return nil
case msg := <-reservations.Channel():
logger.Debugw("Request received")
req := &livekit.RecordingReservation{}
err := proto.Unmarshal(reservations.Payload(msg), req)
if err != nil {
logger.Errorw("Malformed request", err)
continue
}
if req.SubmittedAt < time.Now().Add(-utils.ReservationTimeout).UnixNano() {
logger.Debugw("Discarding old request", "ID", req.Id)
continue
}
claimed, err := w.bus.Lock(w.ctx, w.getKey(req.Id), lockDuration)
if err != nil {
logger.Errorw("Request failed", err, "ID", req.Id)
return err
} else if !claimed {
logger.Debugw("Request already claimed", "ID", req.Id)
continue
}
w.status.Store(Reserved)
logger.Debugw("Request claimed", "ID", req.Id)
res, err := w.Record(req)
b, _ := proto.Marshal(res)
_ = w.bus.Publish(w.ctx, utils.RecordingResultChannel, b)
if err != nil {
return err
}
}
}
}
func (w *Worker) Record(req *livekit.RecordingReservation) (res *livekit.RecordingResult, err error) {
res = &livekit.RecordingResult{Id: req.Id}
var startedAt time.Time
defer func() {
if err != nil {
logger.Errorw("Recorder failed", err)
res.Error = err.Error()
} else {
res.Duration = time.Since(startedAt).Milliseconds()
}
}()
start, err := w.bus.Subscribe(w.ctx, utils.StartRecordingChannel(req.Id))
if err != nil {
return
}
defer start.Close()
stop, err := w.bus.Subscribe(w.ctx, utils.EndRecordingChannel(req.Id))
if err != nil {
return
}
defer stop.Close()
err = w.bus.Publish(w.ctx, utils.ReservationResponseChannel(req.Id), nil)
if err != nil {
return
}
// send recording started message
<-start.Channel()
w.status.Store(Recording)
conf, err := config.Merge(w.defaults, req)
if err != nil {
return
}
// Launch node recorder
var cmd *exec.Cmd
logger.Debugw("Launching recorder", "ID", req.Id)
if w.mock {
cmd = exec.Command("sleep", "3")
} else {
cmd = exec.Command("node", "app/src/record.js")
cmd.Env = append(cmd.Env, fmt.Sprintf("LIVEKIT_RECORDER_CONFIG=%s", conf))
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
}
err = cmd.Start()
if err != nil {
return
}
startedAt = time.Now()
logger.Infow("Recording started", "ID", req.Id)
done := make(chan error, 1)
go func() {
done <- cmd.Wait()
}()
select {
case err = <-done:
break
case <-stop.Channel():
logger.Infow("Recording stopped by livekit server", "ID", req.Id)
err = cmd.Process.Signal(syscall.SIGTERM)
case <-w.kill:
logger.Infow("Recording stopped by recording service interrupt", "ID", req.Id)
err = cmd.Process.Signal(syscall.SIGTERM)
}
return
}
func (w *Worker) Status() Status {
return w.status.Load().(Status)
}
func (w *Worker) Stop(kill bool) {
w.shutdown <- struct{}{}
if kill {
w.kill <- struct{}{}
}
}
func (w *Worker) getKey(id string) string {
return fmt.Sprintf("recording-lock-%s", id)
}

View File

@@ -1,147 +0,0 @@
package service
import (
"context"
"testing"
"time"
livekit "github.com/livekit/protocol/proto"
"github.com/livekit/protocol/utils"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
"github.com/livekit/livekit-recorder/service/pkg/config"
)
func TestWorker(t *testing.T) {
conf := config.TestConfig()
rc, err := NewMessageBus(conf)
require.NoError(t, err)
worker := InitializeWorker(conf, rc)
go func() {
err := worker.Start()
require.NoError(t, err)
}()
// wait for worker to start
time.Sleep(time.Millisecond * 100)
t.Run("Submit", func(t *testing.T) {
require.Equal(t, Available, worker.Status())
submit(t, rc, worker)
// wait to finish
time.Sleep(time.Millisecond * 3100)
require.Equal(t, Available, worker.Status())
})
t.Run("Reserved", func(t *testing.T) {
require.Equal(t, Available, worker.Status())
submit(t, rc, worker)
submitReserved(t, rc)
// wait to finish
time.Sleep(time.Millisecond * 3100)
require.Equal(t, Available, worker.Status())
})
t.Run("Stop", func(t *testing.T) {
require.Equal(t, Available, worker.Status())
id := submit(t, rc, worker)
// server ends recording
require.NoError(t, rc.Publish(context.Background(), utils.EndRecordingChannel(id), nil))
time.Sleep(time.Millisecond * 50)
// check that recording has ended early
require.Equal(t, Available, worker.Status())
})
t.Run("Kill", func(t *testing.T) {
require.Equal(t, Available, worker.Status())
submit(t, rc, worker)
// worker is killed
worker.Stop(true)
time.Sleep(time.Millisecond * 50)
// check that recording has ended early
require.Equal(t, Available, worker.Status())
})
}
func submit(t *testing.T, rc utils.MessageBus, worker *Worker) string {
// send recording reservation
req := &livekit.RecordingReservation{
SubmittedAt: time.Now().UnixNano(),
Request: &livekit.StartRecordingRequest{
Input: &livekit.RecordingInput{
Template: &livekit.RecordingTemplate{
Layout: "speaker-light",
Token: "token",
},
},
Output: &livekit.RecordingOutput{
S3Path: "bucket/recording.mp4",
},
},
}
// server sends reservation
id, err := reserveRecorder(rc, req)
require.NoError(t, err)
// check that worker is reserved
require.Equal(t, Reserved, worker.Status())
// start recording
require.NoError(t, rc.Publish(context.Background(), utils.StartRecordingChannel(id), nil))
time.Sleep(time.Millisecond * 50)
// check that worker is recording
require.Equal(t, Recording, worker.Status())
return id
}
func submitReserved(t *testing.T, rc utils.MessageBus) {
// send recording reservation
req := &livekit.RecordingReservation{
SubmittedAt: time.Now().UnixNano(),
Request: &livekit.StartRecordingRequest{
Input: &livekit.RecordingInput{
Template: &livekit.RecordingTemplate{
Layout: "speaker-light",
Token: "token",
},
},
Output: &livekit.RecordingOutput{
S3Path: "bucket/recording.mp4",
},
},
}
// server sends reservation
_, err := reserveRecorder(rc, req)
require.Error(t, err)
}
func reserveRecorder(rc utils.MessageBus, req *livekit.RecordingReservation) (string, error) {
id := utils.NewGuid(utils.RecordingPrefix)
req.Id = id
b, err := proto.Marshal(req)
if err != nil {
return "", err
}
sub, _ := rc.Subscribe(context.Background(), utils.ReservationResponseChannel(id))
defer sub.Close()
err = rc.Publish(context.Background(), utils.ReservationChannel, string(b))
if err != nil {
return "", err
}
select {
case <-sub.Channel():
return id, nil
case <-time.After(utils.RecorderTimeout):
return "", errors.New("no recorders available")
}
}

View File

@@ -1,3 +0,0 @@
package version
const Version = "0.2.5"

3
version/version.go Normal file
View File

@@ -0,0 +1,3 @@
package version
const Version = "0.3.0"