Compare commits

...

2 Commits

6 changed files with 46 additions and 58 deletions

View File

@@ -4,7 +4,6 @@ import (
"bufio"
"errors"
"io"
"log/slog"
"os"
"os/exec"
"strconv"
@@ -12,6 +11,7 @@ import (
"time"
"github.com/marcopeocchi/yt-dlp-web-ui/server/config"
"github.com/marcopeocchi/yt-dlp-web-ui/server/internal"
)
const (
@@ -27,23 +27,24 @@ type LiveStream struct {
url string
proc *os.Process // used to manually kill the yt-dlp process
status int // whether is monitoring or completed
log chan []byte // keeps tracks of the process logs while monitoring, not when started
done chan *LiveStream // where to signal the completition
waitTimeChan chan time.Duration // time to livestream start
errors chan error
waitTime time.Duration
liveDate time.Time
mq *internal.MessageQueue
db *internal.MemoryDB
}
func New(url string, log chan []byte, done chan *LiveStream) *LiveStream {
func New(url string, done chan *LiveStream, mq *internal.MessageQueue, db *internal.MemoryDB) *LiveStream {
return &LiveStream{
url: url,
done: done,
status: waiting,
waitTime: time.Second * 0,
log: log,
errors: make(chan error),
waitTimeChan: make(chan time.Duration),
mq: mq,
db: db,
}
}
@@ -52,8 +53,9 @@ func (l *LiveStream) Start() error {
cmd := exec.Command(
config.Instance().DownloaderPath,
l.url,
"--wait-for-video", "10", // wait for the stream to be live and recheck every 10 secs
"--wait-for-video", "30", // wait for the stream to be live and recheck every 10 secs
"--no-colors", // no ansi color fuzz
"--simulate",
"--newline",
"--paths", config.Instance().DownloadPath,
)
@@ -65,13 +67,6 @@ func (l *LiveStream) Start() error {
}
defer stdout.Close()
stderr, err := cmd.StderrPipe()
if err != nil {
l.status = errored
return err
}
defer stderr.Close()
if err := cmd.Start(); err != nil {
l.status = errored
return err
@@ -82,37 +77,30 @@ func (l *LiveStream) Start() error {
// Start monitoring when the livestream is goin to be live.
// If already live do nothing.
doneWaiting := make(chan struct{})
go l.monitorStartTime(stdout, doneWaiting)
go l.monitorStartTime(stdout)
go func() {
<-doneWaiting
l.logFFMpeg(io.MultiReader(stdout, stderr))
}()
// Wait to the yt-dlp+ffmpeg process to finish.
// Wait to the simulated download process to finish.
cmd.Wait()
// Set the job as completed and notify the parent the completion.
l.status = completed
l.done <- l
// cleanup
close(doneWaiting)
// Send the started livestream to the message queue! :D
p := &internal.Process{Url: l.url, Livestream: true}
l.db.Set(p)
l.mq.Publish(p)
return nil
}
func (l *LiveStream) monitorStartTime(r io.Reader, doneWait chan struct{}) {
func (l *LiveStream) monitorStartTime(r io.Reader) {
// yt-dlp shows the time in the stdout
scanner := bufio.NewScanner(r)
defer func() {
l.status = inProgress
doneWait <- struct{}{}
close(l.waitTimeChan)
close(l.errors)
}()
// however the time to live is not shown in a new line (and atm there's nothing to do about)
@@ -164,9 +152,8 @@ func (l *LiveStream) monitorStartTime(r io.Reader, doneWait chan struct{}) {
*/
for range TRIES {
scanner.Scan()
line := scanner.Text()
if strings.Contains(line, "Waiting for") {
if strings.Contains(scanner.Text(), "Waiting for") {
waitTimeScanner()
}
}
@@ -222,11 +209,3 @@ func parseTimeSpan(timeStr string) (time.Time, error) {
return start, nil
}
func (l *LiveStream) logFFMpeg(r io.Reader) {
scanner := bufio.NewScanner(r)
for scanner.Scan() {
slog.Info("livestream ffmpeg output", slog.String("url", l.url), slog.String("stdout", scanner.Text()))
}
}

View File

@@ -5,6 +5,7 @@ import (
"time"
"github.com/marcopeocchi/yt-dlp-web-ui/server/config"
"github.com/marcopeocchi/yt-dlp-web-ui/server/internal"
)
func setupTest() {
@@ -15,9 +16,8 @@ func TestLivestream(t *testing.T) {
setupTest()
done := make(chan *LiveStream)
log := make(chan []byte)
ls := New("https://www.youtube.com/watch?v=LSm1daKezcE", log, done)
ls := New("https://www.youtube.com/watch?v=LSm1daKezcE", done, &internal.MessageQueue{}, &internal.MemoryDB{})
go ls.Start()
time.AfterFunc(time.Second*20, func() {

View File

@@ -8,22 +8,26 @@ import (
"time"
"github.com/marcopeocchi/yt-dlp-web-ui/server/config"
"github.com/marcopeocchi/yt-dlp-web-ui/server/internal"
)
type Monitor struct {
db *internal.MemoryDB // where the just started livestream will be published
mq *internal.MessageQueue // where the just started livestream will be published
streams map[string]*LiveStream // keeps track of the livestreams
done chan *LiveStream // to signal individual processes completition
logs chan []byte // to signal individual processes completition
}
func NewMonitor() *Monitor {
func NewMonitor(mq *internal.MessageQueue, db *internal.MemoryDB) *Monitor {
return &Monitor{
mq: mq,
db: db,
streams: make(map[string]*LiveStream),
done: make(chan *LiveStream),
}
}
// Detect each livestream completition, if done remove it from the monitor.
// Detect each livestream completition, if done detach it from the monitor.
func (m *Monitor) Schedule() {
for l := range m.done {
delete(m.streams, l.url)
@@ -31,7 +35,7 @@ func (m *Monitor) Schedule() {
}
func (m *Monitor) Add(url string) {
ls := New(url, m.logs, m.done)
ls := New(url, m.done, m.mq, m.db)
go ls.Start()
m.streams[url] = ls
@@ -111,8 +115,3 @@ func (m *Monitor) Restore() error {
return nil
}
// Return a fan-in logs channel
func (m *Monitor) Logs() <-chan []byte {
return m.logs
}

View File

@@ -63,7 +63,11 @@ func (m *MessageQueue) downloadConsumer() {
)
if p.Progress.Status != StatusCompleted {
p.Start()
if p.Livestream {
go p.Start() // livestreams have higher priorty and will ignore the queue
} else {
p.Start()
}
}
slog.Info("started process",

View File

@@ -36,17 +36,19 @@ const (
StatusDownloading
StatusCompleted
StatusErrored
StatusLivestream
)
// Process descriptor
type Process struct {
Id string
Url string
Params []string
Info DownloadInfo
Progress DownloadProgress
Output DownloadOutput
proc *os.Process
Id string
Url string
Livestream bool
Params []string
Info DownloadInfo
Progress DownloadProgress
Output DownloadOutput
proc *os.Process
}
// Starts spawns/forks a new yt-dlp process and parse its stdout.
@@ -166,6 +168,10 @@ func (p *Process) Start() {
ETA: progress.Eta,
}
if p.Livestream {
p.Progress.Status = StatusLivestream
}
slog.Info("progress",
slog.String("id", p.getShortId()),
slog.String("url", p.Url),

View File

@@ -140,7 +140,7 @@ func RunBlocking(cfg *RunConfig) {
}
func newServer(c serverConfig) *http.Server {
lm := livestream.NewMonitor()
lm := livestream.NewMonitor(c.mq, c.mdb)
go lm.Schedule()
go lm.Restore()