Feat livestream support (#180)

* experimental livestrea support

* test livestream

* update wait time detection

* update livestream functions

* persist and restore livestreams monitor session

* fan-in logging

* deps update

* added live time display

* livestream monitor prototype

* changed to default logger instead of passing *slog.Logger everywhere

* code refactoring, comments
This commit is contained in:
Marco Piovanello
2024-08-19 22:08:09 +02:00
committed by GitHub
parent a64798644a
commit fd5e62e23b
25 changed files with 865 additions and 95 deletions

View File

@@ -0,0 +1,193 @@
package livestream
import (
"bufio"
"errors"
"io"
"os"
"os/exec"
"strconv"
"strings"
"time"
"github.com/marcopeocchi/yt-dlp-web-ui/server/config"
)
const (
waiting = iota
inProgress
completed
errored
)
// Defines a generic livestream.
// A livestream is identified by its url.
type LiveStream struct {
url string
proc *os.Process // used to manually kill the yt-dlp process
status int // whether is monitoring or completed
log chan []byte // keeps tracks of the process logs while monitoring, not when started
done chan *LiveStream // where to signal the completition
waitTimeChan chan time.Duration // time to livestream start
errors chan error
waitTime time.Duration
liveDate time.Time
}
func New(url string, log chan []byte, done chan *LiveStream) *LiveStream {
return &LiveStream{
url: url,
done: done,
status: waiting,
waitTime: time.Second * 0,
log: log,
errors: make(chan error),
waitTimeChan: make(chan time.Duration),
}
}
// Start the livestream monitoring process, once completion signals on the done channel
func (l *LiveStream) Start() error {
cmd := exec.Command(
config.Instance().DownloaderPath,
l.url,
"--wait-for-video", "10", // wait for the stream to be live and recheck every 10 secs
"--no-colors", // no ansi color fuzz
"--paths", config.Instance().DownloadPath,
)
l.proc = cmd.Process
stdout, err := cmd.StdoutPipe()
if err != nil {
return err
}
defer stdout.Close()
if err := cmd.Start(); err != nil {
l.status = errored
return err
}
l.status = waiting
// Start monitoring when the livestream is goin to be live.
// If already live do nothing.
go l.monitorStartTime(stdout)
// Wait to the yt-dlp+ffmpeg process to finish.
cmd.Wait()
// Set the job as completed and notify the parent the completion.
l.status = completed
l.done <- l
return nil
}
func (l *LiveStream) monitorStartTime(r io.Reader) error {
// yt-dlp shows the time in the stdout
scanner := bufio.NewScanner(r)
defer func() {
close(l.waitTimeChan)
close(l.errors)
}()
// however the time to live is not shown in a new line (and atm there's nothing to do about)
// use a custom split funciton to set the line separator to \r instead of \r\n or \n
scanner.Split(func(data []byte, atEOF bool) (advance int, token []byte, err error) {
for i := 0; i < len(data); i++ {
if data[i] == '\r' {
return i + 1, data[:i], nil
}
}
if !atEOF {
return 0, nil, nil
}
return 0, data, bufio.ErrFinalToken
})
// start scanning the stdout
for scanner.Scan() {
// l.log <- scanner.Bytes()
parts := strings.Split(scanner.Text(), ": ")
if len(parts) < 2 {
continue
}
// if this substring is in the current line the download is starting,
// no need to monitor the time to live.
//TODO: silly
if !strings.Contains(scanner.Text(), "Remaining time until next attempt") {
l.status = inProgress
return nil
}
startsIn := parts[1]
parsed, err := parseTimeSpan(startsIn)
if err != nil {
continue
}
l.liveDate = parsed
//TODO: check if useing channels is stupid or not
// l.waitTimeChan <- time.Until(start)
l.waitTime = time.Until(parsed)
}
return nil
}
func (l *LiveStream) WaitTime() <-chan time.Duration {
return l.waitTimeChan
}
// Kills a livestream process and signal its completition
func (l *LiveStream) Kill() error {
l.done <- l
if l.proc != nil {
return l.proc.Kill()
}
return errors.New("nil yt-dlp process")
}
// Parse the timespan returned from yt-dlp (time to live)
//
// parsed := parseTimeSpan("76:12:15")
// fmt.Println(parsed) // 2024-07-21 13:59:59.634781 +0200 CEST
func parseTimeSpan(timeStr string) (time.Time, error) {
parts := strings.Split(timeStr, ":")
hh, err := strconv.Atoi(parts[0])
if err != nil {
return time.Time{}, err
}
mm, err := strconv.Atoi(parts[1])
if err != nil {
return time.Time{}, err
}
ss, err := strconv.Atoi(parts[2])
if err != nil {
return time.Time{}, err
}
dd := 0
if hh > 24 {
dd = hh / 24
hh = hh % 24
}
start := time.Now()
start = start.AddDate(0, 0, dd)
start = start.Add(time.Duration(hh) * time.Hour)
start = start.Add(time.Duration(mm) * time.Minute)
start = start.Add(time.Duration(ss) * time.Second)
return start, nil
}

View File

@@ -0,0 +1,36 @@
package livestream
import (
"testing"
"time"
"github.com/marcopeocchi/yt-dlp-web-ui/server/config"
)
func setupTest() {
config.Instance().DownloaderPath = "yt-dlp"
}
func TestLivestream(t *testing.T) {
setupTest()
done := make(chan *LiveStream)
log := make(chan []byte)
ls := New("https://www.youtube.com/watch?v=LSm1daKezcE", log, done)
go ls.Start()
time.AfterFunc(time.Second*20, func() {
ls.Kill()
})
for {
select {
case wt := <-ls.WaitTime():
t.Log(wt)
case <-done:
t.Log("done")
return
}
}
}

View File

@@ -0,0 +1,118 @@
package livestream
import (
"encoding/gob"
"log/slog"
"os"
"path/filepath"
"time"
"github.com/marcopeocchi/yt-dlp-web-ui/server/config"
)
type Monitor struct {
streams map[string]*LiveStream // keeps track of the livestreams
done chan *LiveStream // to signal individual processes completition
logs chan []byte // to signal individual processes completition
}
func NewMonitor() *Monitor {
return &Monitor{
streams: make(map[string]*LiveStream),
done: make(chan *LiveStream),
}
}
// Detect each livestream completition, if done remove it from the monitor.
func (m *Monitor) Schedule() {
for l := range m.done {
delete(m.streams, l.url)
}
}
func (m *Monitor) Add(url string) {
ls := New(url, m.logs, m.done)
go ls.Start()
m.streams[url] = ls
}
func (m *Monitor) Remove(url string) error {
return m.streams[url].Kill()
}
func (m *Monitor) RemoveAll() error {
for _, v := range m.streams {
if err := v.Kill(); err != nil {
return err
}
}
return nil
}
func (m *Monitor) Status() LiveStreamStatus {
status := make(LiveStreamStatus)
for k, v := range m.streams {
// wt, ok := <-v.WaitTime()
// if !ok {
// continue
// }
status[k] = struct {
Status int
WaitTime time.Duration
LiveDate time.Time
}{
Status: v.status,
WaitTime: v.waitTime,
LiveDate: v.liveDate,
}
}
return status
}
// Persist the monitor current state to a file.
// The file is located in the configured config directory
func (m *Monitor) Persist() error {
fd, err := os.Create(filepath.Join(config.Instance().Dir(), "livestreams.dat"))
if err != nil {
return err
}
defer fd.Close()
slog.Debug("persisting livestream monitor state")
return gob.NewEncoder(fd).Encode(m.streams)
}
// Restore a saved state and resume the monitored livestreams
func (m *Monitor) Restore() error {
fd, err := os.Open(filepath.Join(config.Instance().Dir(), "livestreams.dat"))
if err != nil {
return err
}
defer fd.Close()
restored := make(map[string]*LiveStream)
if err := gob.NewDecoder(fd).Decode(&restored); err != nil {
return err
}
for k := range restored {
m.Add(k)
}
slog.Debug("restored livestream monitor state")
return nil
}
// Return a fan-in logs channel
func (m *Monitor) Logs() <-chan []byte {
return m.logs
}

View File

@@ -0,0 +1 @@
package livestream

View File

@@ -0,0 +1,11 @@
package livestream
import "time"
type LiveStreamStatus = map[string]Status
type Status = struct {
Status int
WaitTime time.Duration
LiveDate time.Time
}

View File

@@ -3,7 +3,6 @@ package internal
import (
"encoding/gob"
"errors"
"log/slog"
"os"
"path/filepath"
"sync"
@@ -92,7 +91,7 @@ func (m *MemoryDB) Persist() error {
}
// Restore a persisted state
func (m *MemoryDB) Restore(mq *MessageQueue, logger *slog.Logger) {
func (m *MemoryDB) Restore(mq *MessageQueue) {
fd, err := os.Open("session.dat")
if err != nil {
return
@@ -112,7 +111,6 @@ func (m *MemoryDB) Restore(mq *MessageQueue, logger *slog.Logger) {
Progress: proc.Progress,
Output: proc.Output,
Params: proc.Params,
Logger: logger,
}
m.table.Store(proc.Id, restored)

View File

@@ -15,14 +15,13 @@ const queueName = "process:pending"
type MessageQueue struct {
concurrency int
eventBus evbus.Bus
logger *slog.Logger
}
// Creates a new message queue.
// By default it will be created with a size equals to nthe number of logical
// CPU cores -1.
// The queue size can be set via the qs flag.
func NewMessageQueue(l *slog.Logger) (*MessageQueue, error) {
func NewMessageQueue() (*MessageQueue, error) {
qs := config.Instance().QueueSize
if qs <= 0 {
@@ -32,7 +31,6 @@ func NewMessageQueue(l *slog.Logger) (*MessageQueue, error) {
return &MessageQueue{
concurrency: qs,
eventBus: evbus.New(),
logger: l,
}, nil
}
@@ -59,7 +57,7 @@ func (m *MessageQueue) downloadConsumer() {
sem.Acquire(context.Background(), 1)
defer sem.Release(1)
m.logger.Info("received process from event bus",
slog.Info("received process from event bus",
slog.String("bus", queueName),
slog.String("consumer", "downloadConsumer"),
slog.String("id", p.getShortId()),
@@ -69,7 +67,7 @@ func (m *MessageQueue) downloadConsumer() {
p.Start()
}
m.logger.Info("started process",
slog.Info("started process",
slog.String("bus", queueName),
slog.String("id", p.getShortId()),
)
@@ -88,14 +86,14 @@ func (m *MessageQueue) metadataSubscriber() {
sem.Acquire(context.TODO(), 1)
defer sem.Release(1)
m.logger.Info("received process from event bus",
slog.Info("received process from event bus",
slog.String("bus", queueName),
slog.String("consumer", "metadataConsumer"),
slog.String("id", p.getShortId()),
)
if p.Progress.Status == StatusCompleted {
m.logger.Warn("proccess has an illegal state",
slog.Warn("proccess has an illegal state",
slog.String("id", p.getShortId()),
slog.Int("status", p.Progress.Status),
)
@@ -103,7 +101,7 @@ func (m *MessageQueue) metadataSubscriber() {
}
if err := p.SetMetadata(); err != nil {
m.logger.Error("failed to retrieve metadata",
slog.Error("failed to retrieve metadata",
slog.String("id", p.getShortId()),
slog.String("err", err.Error()),
)

View File

@@ -19,7 +19,7 @@ type metadata struct {
Type string `json:"_type"`
}
func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB, logger *slog.Logger) error {
func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB) error {
var (
downloader = config.Instance().DownloaderPath
cmd = exec.Command(downloader, req.URL, "--flat-playlist", "-J")
@@ -36,7 +36,7 @@ func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB, logger
return err
}
logger.Info("decoding playlist metadata", slog.String("url", req.URL))
slog.Info("decoding playlist metadata", slog.String("url", req.URL))
if err := json.NewDecoder(stdout).Decode(&m); err != nil {
return err
@@ -46,7 +46,7 @@ func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB, logger
return err
}
logger.Info("decoded playlist metadata", slog.String("url", req.URL))
slog.Info("decoded playlist metadata", slog.String("url", req.URL))
if m.Type == "" {
return errors.New("probably not a valid URL")
@@ -57,7 +57,7 @@ func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB, logger
return a.URL == b.URL
})
logger.Info("playlist detected", slog.String("url", req.URL), slog.Int("count", len(entries)))
slog.Info("playlist detected", slog.String("url", req.URL), slog.Int("count", len(entries)))
for i, meta := range entries {
// detect playlist title from metadata since each playlist entry will be
@@ -78,7 +78,6 @@ func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB, logger
Output: DownloadOutput{Filename: req.Rename},
Info: meta,
Params: req.Params,
Logger: logger,
}
proc.Info.URL = meta.URL
@@ -93,12 +92,11 @@ func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB, logger
proc := &Process{
Url: req.URL,
Params: req.Params,
Logger: logger,
}
db.Set(proc)
mq.Publish(proc)
logger.Info("sending new process to message queue", slog.String("url", proc.Url))
slog.Info("sending new process to message queue", slog.String("url", proc.Url))
return cmd.Wait()
}

View File

@@ -47,7 +47,6 @@ type Process struct {
Progress DownloadProgress
Output DownloadOutput
proc *os.Process
Logger *slog.Logger
}
// Starts spawns/forks a new yt-dlp process and parse its stdout.
@@ -108,7 +107,7 @@ func (p *Process) Start() {
r, err := cmd.StdoutPipe()
if err != nil {
p.Logger.Error(
slog.Error(
"failed to connect to stdout",
slog.String("err", err.Error()),
)
@@ -116,7 +115,7 @@ func (p *Process) Start() {
}
if err := cmd.Start(); err != nil {
p.Logger.Error(
slog.Error(
"failed to start yt-dlp process",
slog.String("err", err.Error()),
)
@@ -167,7 +166,7 @@ func (p *Process) Start() {
ETA: progress.Eta,
}
p.Logger.Info("progress",
slog.Info("progress",
slog.String("id", p.getShortId()),
slog.String("url", p.Url),
slog.String("percentage", progress.Percentage),
@@ -190,7 +189,7 @@ func (p *Process) Complete() {
ETA: 0,
}
p.Logger.Info("finished",
slog.Info("finished",
slog.String("id", p.getShortId()),
slog.String("url", p.Url),
)
@@ -227,7 +226,7 @@ func (p *Process) GetFormats() (DownloadFormats, error) {
stdout, err := cmd.Output()
if err != nil {
p.Logger.Error("failed to retrieve metadata", slog.String("err", err.Error()))
slog.Error("failed to retrieve metadata", slog.String("err", err.Error()))
return DownloadFormats{}, err
}
@@ -247,7 +246,7 @@ func (p *Process) GetFormats() (DownloadFormats, error) {
p.Url,
)
p.Logger.Info(
slog.Info(
"retrieving metadata",
slog.String("caller", "getFormats"),
slog.String("url", p.Url),
@@ -307,7 +306,7 @@ func (p *Process) SetMetadata() error {
stdout, err := cmd.StdoutPipe()
if err != nil {
p.Logger.Error("failed to connect to stdout",
slog.Error("failed to connect to stdout",
slog.String("id", p.getShortId()),
slog.String("url", p.Url),
slog.String("err", err.Error()),
@@ -317,7 +316,7 @@ func (p *Process) SetMetadata() error {
stderr, err := cmd.StderrPipe()
if err != nil {
p.Logger.Error("failed to connect to stderr",
slog.Error("failed to connect to stderr",
slog.String("id", p.getShortId()),
slog.String("url", p.Url),
slog.String("err", err.Error()),
@@ -340,7 +339,7 @@ func (p *Process) SetMetadata() error {
io.Copy(&bufferedStderr, stderr)
}()
p.Logger.Info("retrieving metadata",
slog.Info("retrieving metadata",
slog.String("id", p.getShortId()),
slog.String("url", p.Url),
)