Compare commits

..

2 Commits

6 changed files with 46 additions and 58 deletions

View File

@@ -4,7 +4,6 @@ import (
"bufio" "bufio"
"errors" "errors"
"io" "io"
"log/slog"
"os" "os"
"os/exec" "os/exec"
"strconv" "strconv"
@@ -12,6 +11,7 @@ import (
"time" "time"
"github.com/marcopeocchi/yt-dlp-web-ui/server/config" "github.com/marcopeocchi/yt-dlp-web-ui/server/config"
"github.com/marcopeocchi/yt-dlp-web-ui/server/internal"
) )
const ( const (
@@ -27,23 +27,24 @@ type LiveStream struct {
url string url string
proc *os.Process // used to manually kill the yt-dlp process proc *os.Process // used to manually kill the yt-dlp process
status int // whether is monitoring or completed status int // whether is monitoring or completed
log chan []byte // keeps tracks of the process logs while monitoring, not when started
done chan *LiveStream // where to signal the completition done chan *LiveStream // where to signal the completition
waitTimeChan chan time.Duration // time to livestream start waitTimeChan chan time.Duration // time to livestream start
errors chan error
waitTime time.Duration waitTime time.Duration
liveDate time.Time liveDate time.Time
mq *internal.MessageQueue
db *internal.MemoryDB
} }
func New(url string, log chan []byte, done chan *LiveStream) *LiveStream { func New(url string, done chan *LiveStream, mq *internal.MessageQueue, db *internal.MemoryDB) *LiveStream {
return &LiveStream{ return &LiveStream{
url: url, url: url,
done: done, done: done,
status: waiting, status: waiting,
waitTime: time.Second * 0, waitTime: time.Second * 0,
log: log,
errors: make(chan error),
waitTimeChan: make(chan time.Duration), waitTimeChan: make(chan time.Duration),
mq: mq,
db: db,
} }
} }
@@ -52,8 +53,9 @@ func (l *LiveStream) Start() error {
cmd := exec.Command( cmd := exec.Command(
config.Instance().DownloaderPath, config.Instance().DownloaderPath,
l.url, l.url,
"--wait-for-video", "10", // wait for the stream to be live and recheck every 10 secs "--wait-for-video", "30", // wait for the stream to be live and recheck every 10 secs
"--no-colors", // no ansi color fuzz "--no-colors", // no ansi color fuzz
"--simulate",
"--newline", "--newline",
"--paths", config.Instance().DownloadPath, "--paths", config.Instance().DownloadPath,
) )
@@ -65,13 +67,6 @@ func (l *LiveStream) Start() error {
} }
defer stdout.Close() defer stdout.Close()
stderr, err := cmd.StderrPipe()
if err != nil {
l.status = errored
return err
}
defer stderr.Close()
if err := cmd.Start(); err != nil { if err := cmd.Start(); err != nil {
l.status = errored l.status = errored
return err return err
@@ -82,37 +77,30 @@ func (l *LiveStream) Start() error {
// Start monitoring when the livestream is goin to be live. // Start monitoring when the livestream is goin to be live.
// If already live do nothing. // If already live do nothing.
doneWaiting := make(chan struct{}) go l.monitorStartTime(stdout)
go l.monitorStartTime(stdout, doneWaiting)
go func() { // Wait to the simulated download process to finish.
<-doneWaiting
l.logFFMpeg(io.MultiReader(stdout, stderr))
}()
// Wait to the yt-dlp+ffmpeg process to finish.
cmd.Wait() cmd.Wait()
// Set the job as completed and notify the parent the completion. // Set the job as completed and notify the parent the completion.
l.status = completed l.status = completed
l.done <- l l.done <- l
// cleanup // Send the started livestream to the message queue! :D
close(doneWaiting) p := &internal.Process{Url: l.url, Livestream: true}
l.db.Set(p)
l.mq.Publish(p)
return nil return nil
} }
func (l *LiveStream) monitorStartTime(r io.Reader, doneWait chan struct{}) { func (l *LiveStream) monitorStartTime(r io.Reader) {
// yt-dlp shows the time in the stdout // yt-dlp shows the time in the stdout
scanner := bufio.NewScanner(r) scanner := bufio.NewScanner(r)
defer func() { defer func() {
l.status = inProgress l.status = inProgress
doneWait <- struct{}{}
close(l.waitTimeChan) close(l.waitTimeChan)
close(l.errors)
}() }()
// however the time to live is not shown in a new line (and atm there's nothing to do about) // however the time to live is not shown in a new line (and atm there's nothing to do about)
@@ -164,9 +152,8 @@ func (l *LiveStream) monitorStartTime(r io.Reader, doneWait chan struct{}) {
*/ */
for range TRIES { for range TRIES {
scanner.Scan() scanner.Scan()
line := scanner.Text()
if strings.Contains(line, "Waiting for") { if strings.Contains(scanner.Text(), "Waiting for") {
waitTimeScanner() waitTimeScanner()
} }
} }
@@ -222,11 +209,3 @@ func parseTimeSpan(timeStr string) (time.Time, error) {
return start, nil return start, nil
} }
func (l *LiveStream) logFFMpeg(r io.Reader) {
scanner := bufio.NewScanner(r)
for scanner.Scan() {
slog.Info("livestream ffmpeg output", slog.String("url", l.url), slog.String("stdout", scanner.Text()))
}
}

View File

@@ -5,6 +5,7 @@ import (
"time" "time"
"github.com/marcopeocchi/yt-dlp-web-ui/server/config" "github.com/marcopeocchi/yt-dlp-web-ui/server/config"
"github.com/marcopeocchi/yt-dlp-web-ui/server/internal"
) )
func setupTest() { func setupTest() {
@@ -15,9 +16,8 @@ func TestLivestream(t *testing.T) {
setupTest() setupTest()
done := make(chan *LiveStream) done := make(chan *LiveStream)
log := make(chan []byte)
ls := New("https://www.youtube.com/watch?v=LSm1daKezcE", log, done) ls := New("https://www.youtube.com/watch?v=LSm1daKezcE", done, &internal.MessageQueue{}, &internal.MemoryDB{})
go ls.Start() go ls.Start()
time.AfterFunc(time.Second*20, func() { time.AfterFunc(time.Second*20, func() {

View File

@@ -8,22 +8,26 @@ import (
"time" "time"
"github.com/marcopeocchi/yt-dlp-web-ui/server/config" "github.com/marcopeocchi/yt-dlp-web-ui/server/config"
"github.com/marcopeocchi/yt-dlp-web-ui/server/internal"
) )
type Monitor struct { type Monitor struct {
db *internal.MemoryDB // where the just started livestream will be published
mq *internal.MessageQueue // where the just started livestream will be published
streams map[string]*LiveStream // keeps track of the livestreams streams map[string]*LiveStream // keeps track of the livestreams
done chan *LiveStream // to signal individual processes completition done chan *LiveStream // to signal individual processes completition
logs chan []byte // to signal individual processes completition
} }
func NewMonitor() *Monitor { func NewMonitor(mq *internal.MessageQueue, db *internal.MemoryDB) *Monitor {
return &Monitor{ return &Monitor{
mq: mq,
db: db,
streams: make(map[string]*LiveStream), streams: make(map[string]*LiveStream),
done: make(chan *LiveStream), done: make(chan *LiveStream),
} }
} }
// Detect each livestream completition, if done remove it from the monitor. // Detect each livestream completition, if done detach it from the monitor.
func (m *Monitor) Schedule() { func (m *Monitor) Schedule() {
for l := range m.done { for l := range m.done {
delete(m.streams, l.url) delete(m.streams, l.url)
@@ -31,7 +35,7 @@ func (m *Monitor) Schedule() {
} }
func (m *Monitor) Add(url string) { func (m *Monitor) Add(url string) {
ls := New(url, m.logs, m.done) ls := New(url, m.done, m.mq, m.db)
go ls.Start() go ls.Start()
m.streams[url] = ls m.streams[url] = ls
@@ -111,8 +115,3 @@ func (m *Monitor) Restore() error {
return nil return nil
} }
// Return a fan-in logs channel
func (m *Monitor) Logs() <-chan []byte {
return m.logs
}

View File

@@ -63,8 +63,12 @@ func (m *MessageQueue) downloadConsumer() {
) )
if p.Progress.Status != StatusCompleted { if p.Progress.Status != StatusCompleted {
if p.Livestream {
go p.Start() // livestreams have higher priorty and will ignore the queue
} else {
p.Start() p.Start()
} }
}
slog.Info("started process", slog.Info("started process",
slog.String("bus", queueName), slog.String("bus", queueName),

View File

@@ -36,12 +36,14 @@ const (
StatusDownloading StatusDownloading
StatusCompleted StatusCompleted
StatusErrored StatusErrored
StatusLivestream
) )
// Process descriptor // Process descriptor
type Process struct { type Process struct {
Id string Id string
Url string Url string
Livestream bool
Params []string Params []string
Info DownloadInfo Info DownloadInfo
Progress DownloadProgress Progress DownloadProgress
@@ -166,6 +168,10 @@ func (p *Process) Start() {
ETA: progress.Eta, ETA: progress.Eta,
} }
if p.Livestream {
p.Progress.Status = StatusLivestream
}
slog.Info("progress", slog.Info("progress",
slog.String("id", p.getShortId()), slog.String("id", p.getShortId()),
slog.String("url", p.Url), slog.String("url", p.Url),

View File

@@ -140,7 +140,7 @@ func RunBlocking(cfg *RunConfig) {
} }
func newServer(c serverConfig) *http.Server { func newServer(c serverConfig) *http.Server {
lm := livestream.NewMonitor() lm := livestream.NewMonitor(c.mq, c.mdb)
go lm.Schedule() go lm.Schedule()
go lm.Restore() go lm.Restore()