code refactoring: cancellation signal for stdout parsers

This commit is contained in:
2024-08-23 11:54:10 +02:00
parent c4075fb640
commit fceb36c723

View File

@@ -2,6 +2,7 @@ package livestream
import ( import (
"bufio" "bufio"
"context"
"errors" "errors"
"io" "io"
"log/slog" "log/slog"
@@ -30,7 +31,6 @@ type LiveStream struct {
log chan []byte // keeps tracks of the process logs while monitoring, not when started log chan []byte // keeps tracks of the process logs while monitoring, not when started
done chan *LiveStream // where to signal the completition done chan *LiveStream // where to signal the completition
waitTimeChan chan time.Duration // time to livestream start waitTimeChan chan time.Duration // time to livestream start
errors chan error
waitTime time.Duration waitTime time.Duration
liveDate time.Time liveDate time.Time
} }
@@ -42,13 +42,15 @@ func New(url string, log chan []byte, done chan *LiveStream) *LiveStream {
status: waiting, status: waiting,
waitTime: time.Second * 0, waitTime: time.Second * 0,
log: log, log: log,
errors: make(chan error),
waitTimeChan: make(chan time.Duration), waitTimeChan: make(chan time.Duration),
} }
} }
// Start the livestream monitoring process, once completion signals on the done channel // Start the livestream monitoring process, once completion signals on the done channel
func (l *LiveStream) Start() error { func (l *LiveStream) Start() error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cmd := exec.Command( cmd := exec.Command(
config.Instance().DownloaderPath, config.Instance().DownloaderPath,
l.url, l.url,
@@ -85,9 +87,11 @@ func (l *LiveStream) Start() error {
doneWaiting := make(chan struct{}) doneWaiting := make(chan struct{})
go l.monitorStartTime(stdout, doneWaiting) go l.monitorStartTime(stdout, doneWaiting)
//TODO: FFmpeg cannot be logged since is a subprocess of yt-dlp.
// It also may have implication on how the process is killed.
go func() { go func() {
<-doneWaiting <-doneWaiting
l.logFFMpeg(io.MultiReader(stdout, stderr)) l.logFFMpeg(ctx, io.MultiReader(stdout, stderr))
}() }()
// Wait to the yt-dlp+ffmpeg process to finish. // Wait to the yt-dlp+ffmpeg process to finish.
@@ -112,7 +116,6 @@ func (l *LiveStream) monitorStartTime(r io.Reader, doneWait chan struct{}) {
doneWait <- struct{}{} doneWait <- struct{}{}
close(l.waitTimeChan) close(l.waitTimeChan)
close(l.errors)
}() }()
// however the time to live is not shown in a new line (and atm there's nothing to do about) // however the time to live is not shown in a new line (and atm there's nothing to do about)
@@ -164,9 +167,8 @@ func (l *LiveStream) monitorStartTime(r io.Reader, doneWait chan struct{}) {
*/ */
for range TRIES { for range TRIES {
scanner.Scan() scanner.Scan()
line := scanner.Text()
if strings.Contains(line, "Waiting for") { if strings.Contains(scanner.Text(), "Waiting for") {
waitTimeScanner() waitTimeScanner()
} }
} }
@@ -223,10 +225,15 @@ func parseTimeSpan(timeStr string) (time.Time, error) {
return start, nil return start, nil
} }
func (l *LiveStream) logFFMpeg(r io.Reader) { func (l *LiveStream) logFFMpeg(ctx context.Context, r io.Reader) {
scanner := bufio.NewScanner(r) scanner := bufio.NewScanner(r)
select {
case <-ctx.Done():
return
default:
for scanner.Scan() { for scanner.Scan() {
slog.Info("livestream ffmpeg output", slog.String("url", l.url), slog.String("stdout", scanner.Text())) slog.Info("livestream output", slog.String("url", l.url), slog.String("stdout", scanner.Text()))
}
} }
} }