From 3205711bb1becb0653a56a97e596f345f22568ad Mon Sep 17 00:00:00 2001 From: marcobaobao Date: Tue, 20 Aug 2024 18:50:42 +0200 Subject: [PATCH] improved livestream waiting --- server/internal/livestream/livestream.go | 103 ++++++++++++++++------- 1 file changed, 73 insertions(+), 30 deletions(-) diff --git a/server/internal/livestream/livestream.go b/server/internal/livestream/livestream.go index 9cbce4c..28472ac 100644 --- a/server/internal/livestream/livestream.go +++ b/server/internal/livestream/livestream.go @@ -4,6 +4,7 @@ import ( "bufio" "errors" "io" + "log/slog" "os" "os/exec" "strconv" @@ -53,6 +54,7 @@ func (l *LiveStream) Start() error { l.url, "--wait-for-video", "10", // wait for the stream to be live and recheck every 10 secs "--no-colors", // no ansi color fuzz + "--newline", "--paths", config.Instance().DownloadPath, ) l.proc = cmd.Process @@ -72,7 +74,13 @@ func (l *LiveStream) Start() error { // Start monitoring when the livestream is goin to be live. // If already live do nothing. - go l.monitorStartTime(stdout) + doneWaiting := make(chan struct{}) + go l.monitorStartTime(stdout, doneWaiting) + + go func() { + <-doneWaiting + l.logFFMpeg(stdout) + }() // Wait to the yt-dlp+ffmpeg process to finish. cmd.Wait() @@ -81,14 +89,18 @@ func (l *LiveStream) Start() error { l.status = completed l.done <- l + // cleanup + close(doneWaiting) + return nil } -func (l *LiveStream) monitorStartTime(r io.Reader) error { +func (l *LiveStream) monitorStartTime(r io.Reader, doneWait chan struct{}) { // yt-dlp shows the time in the stdout scanner := bufio.NewScanner(r) defer func() { + doneWait <- struct{}{} close(l.waitTimeChan) close(l.errors) }() @@ -97,7 +109,7 @@ func (l *LiveStream) monitorStartTime(r io.Reader) error { // use a custom split funciton to set the line separator to \r instead of \r\n or \n scanner.Split(func(data []byte, atEOF bool) (advance int, token []byte, err error) { for i := 0; i < len(data); i++ { - if data[i] == '\r' { + if data[i] == '\r' || data[i] == '\n' { return i + 1, data[:i], nil } } @@ -108,37 +120,60 @@ func (l *LiveStream) monitorStartTime(r io.Reader) error { return 0, data, bufio.ErrFinalToken }) - // start scanning the stdout - for scanner.Scan() { - // l.log <- scanner.Bytes() + waitTimeScanner := func() { + for scanner.Scan() { + // l.log <- scanner.Bytes() - parts := strings.Split(scanner.Text(), ": ") - if len(parts) < 2 { - continue + // if this substring is in the current line the download is starting, + // no need to monitor the time to live. + //TODO: silly + if !strings.Contains(scanner.Text(), "Remaining time until next attempt") { + l.status = inProgress + return + } + + parts := strings.Split(scanner.Text(), ": ") + if len(parts) < 2 { + continue + } + + startsIn := parts[1] + parsed, err := parseTimeSpan(startsIn) + if err != nil { + continue + } + + l.liveDate = parsed + + //TODO: check if using channels is stupid or not + // l.waitTimeChan <- time.Until(start) + l.waitTime = time.Until(parsed) } - - // if this substring is in the current line the download is starting, - // no need to monitor the time to live. - //TODO: silly - if !strings.Contains(scanner.Text(), "Remaining time until next attempt") { - l.status = inProgress - return nil - } - - startsIn := parts[1] - parsed, err := parseTimeSpan(startsIn) - if err != nil { - continue - } - - l.liveDate = parsed - - //TODO: check if using channels is stupid or not - // l.waitTimeChan <- time.Until(start) - l.waitTime = time.Until(parsed) } - return nil + const TRIES = 5 + + /* + if it's waiting a livestream the 5th line will indicate the time to live + its a dumb and not robust method. + + example: + [youtube] Extracting URL: https://www.youtube.com/watch?v=IQVbGfVVjgY + [youtube] IQVbGfVVjgY: Downloading webpage + [youtube] IQVbGfVVjgY: Downloading ios player API JSON + [youtube] IQVbGfVVjgY: Downloading web creator player API JSON + WARNING: [youtube] This live event will begin in 27 minutes. <- STDERR, ignore + [wait] Waiting for 00:27:15 - Press Ctrl+C to try now <- 5th line + */ + + for range TRIES { + scanner.Scan() + line := scanner.Text() + + if strings.Contains(line, "Waiting for") { + waitTimeScanner() + } + } } func (l *LiveStream) WaitTime() <-chan time.Duration { @@ -191,3 +226,11 @@ func parseTimeSpan(timeStr string) (time.Time, error) { return start, nil } + +func (l *LiveStream) logFFMpeg(r io.Reader) { + scanner := bufio.NewScanner(r) + + for scanner.Scan() { + slog.Info("livestream ffmpeg output", slog.String("url", l.url), slog.String("stdout", scanner.Text())) + } +}