Compare commits
2 Commits
v3.2.0
...
v3.2.0-fix
| Author | SHA1 | Date | |
|---|---|---|---|
| 54771b2d78 | |||
| fceb36c723 |
@@ -4,7 +4,6 @@ import (
|
|||||||
"bufio"
|
"bufio"
|
||||||
"errors"
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"log/slog"
|
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"strconv"
|
"strconv"
|
||||||
@@ -12,6 +11,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/marcopeocchi/yt-dlp-web-ui/server/config"
|
"github.com/marcopeocchi/yt-dlp-web-ui/server/config"
|
||||||
|
"github.com/marcopeocchi/yt-dlp-web-ui/server/internal"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@@ -27,23 +27,24 @@ type LiveStream struct {
|
|||||||
url string
|
url string
|
||||||
proc *os.Process // used to manually kill the yt-dlp process
|
proc *os.Process // used to manually kill the yt-dlp process
|
||||||
status int // whether is monitoring or completed
|
status int // whether is monitoring or completed
|
||||||
log chan []byte // keeps tracks of the process logs while monitoring, not when started
|
|
||||||
done chan *LiveStream // where to signal the completition
|
done chan *LiveStream // where to signal the completition
|
||||||
waitTimeChan chan time.Duration // time to livestream start
|
waitTimeChan chan time.Duration // time to livestream start
|
||||||
errors chan error
|
|
||||||
waitTime time.Duration
|
waitTime time.Duration
|
||||||
liveDate time.Time
|
liveDate time.Time
|
||||||
|
|
||||||
|
mq *internal.MessageQueue
|
||||||
|
db *internal.MemoryDB
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(url string, log chan []byte, done chan *LiveStream) *LiveStream {
|
func New(url string, done chan *LiveStream, mq *internal.MessageQueue, db *internal.MemoryDB) *LiveStream {
|
||||||
return &LiveStream{
|
return &LiveStream{
|
||||||
url: url,
|
url: url,
|
||||||
done: done,
|
done: done,
|
||||||
status: waiting,
|
status: waiting,
|
||||||
waitTime: time.Second * 0,
|
waitTime: time.Second * 0,
|
||||||
log: log,
|
|
||||||
errors: make(chan error),
|
|
||||||
waitTimeChan: make(chan time.Duration),
|
waitTimeChan: make(chan time.Duration),
|
||||||
|
mq: mq,
|
||||||
|
db: db,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -52,8 +53,9 @@ func (l *LiveStream) Start() error {
|
|||||||
cmd := exec.Command(
|
cmd := exec.Command(
|
||||||
config.Instance().DownloaderPath,
|
config.Instance().DownloaderPath,
|
||||||
l.url,
|
l.url,
|
||||||
"--wait-for-video", "10", // wait for the stream to be live and recheck every 10 secs
|
"--wait-for-video", "30", // wait for the stream to be live and recheck every 10 secs
|
||||||
"--no-colors", // no ansi color fuzz
|
"--no-colors", // no ansi color fuzz
|
||||||
|
"--simulate",
|
||||||
"--newline",
|
"--newline",
|
||||||
"--paths", config.Instance().DownloadPath,
|
"--paths", config.Instance().DownloadPath,
|
||||||
)
|
)
|
||||||
@@ -65,13 +67,6 @@ func (l *LiveStream) Start() error {
|
|||||||
}
|
}
|
||||||
defer stdout.Close()
|
defer stdout.Close()
|
||||||
|
|
||||||
stderr, err := cmd.StderrPipe()
|
|
||||||
if err != nil {
|
|
||||||
l.status = errored
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer stderr.Close()
|
|
||||||
|
|
||||||
if err := cmd.Start(); err != nil {
|
if err := cmd.Start(); err != nil {
|
||||||
l.status = errored
|
l.status = errored
|
||||||
return err
|
return err
|
||||||
@@ -82,37 +77,30 @@ func (l *LiveStream) Start() error {
|
|||||||
|
|
||||||
// Start monitoring when the livestream is goin to be live.
|
// Start monitoring when the livestream is goin to be live.
|
||||||
// If already live do nothing.
|
// If already live do nothing.
|
||||||
doneWaiting := make(chan struct{})
|
go l.monitorStartTime(stdout)
|
||||||
go l.monitorStartTime(stdout, doneWaiting)
|
|
||||||
|
|
||||||
go func() {
|
// Wait to the simulated download process to finish.
|
||||||
<-doneWaiting
|
|
||||||
l.logFFMpeg(io.MultiReader(stdout, stderr))
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Wait to the yt-dlp+ffmpeg process to finish.
|
|
||||||
cmd.Wait()
|
cmd.Wait()
|
||||||
|
|
||||||
// Set the job as completed and notify the parent the completion.
|
// Set the job as completed and notify the parent the completion.
|
||||||
l.status = completed
|
l.status = completed
|
||||||
l.done <- l
|
l.done <- l
|
||||||
|
|
||||||
// cleanup
|
// Send the started livestream to the message queue! :D
|
||||||
close(doneWaiting)
|
p := &internal.Process{Url: l.url, Livestream: true}
|
||||||
|
l.db.Set(p)
|
||||||
|
l.mq.Publish(p)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *LiveStream) monitorStartTime(r io.Reader, doneWait chan struct{}) {
|
func (l *LiveStream) monitorStartTime(r io.Reader) {
|
||||||
// yt-dlp shows the time in the stdout
|
// yt-dlp shows the time in the stdout
|
||||||
scanner := bufio.NewScanner(r)
|
scanner := bufio.NewScanner(r)
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
l.status = inProgress
|
l.status = inProgress
|
||||||
doneWait <- struct{}{}
|
|
||||||
|
|
||||||
close(l.waitTimeChan)
|
close(l.waitTimeChan)
|
||||||
close(l.errors)
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// however the time to live is not shown in a new line (and atm there's nothing to do about)
|
// however the time to live is not shown in a new line (and atm there's nothing to do about)
|
||||||
@@ -164,9 +152,8 @@ func (l *LiveStream) monitorStartTime(r io.Reader, doneWait chan struct{}) {
|
|||||||
*/
|
*/
|
||||||
for range TRIES {
|
for range TRIES {
|
||||||
scanner.Scan()
|
scanner.Scan()
|
||||||
line := scanner.Text()
|
|
||||||
|
|
||||||
if strings.Contains(line, "Waiting for") {
|
if strings.Contains(scanner.Text(), "Waiting for") {
|
||||||
waitTimeScanner()
|
waitTimeScanner()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -222,11 +209,3 @@ func parseTimeSpan(timeStr string) (time.Time, error) {
|
|||||||
|
|
||||||
return start, nil
|
return start, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *LiveStream) logFFMpeg(r io.Reader) {
|
|
||||||
scanner := bufio.NewScanner(r)
|
|
||||||
|
|
||||||
for scanner.Scan() {
|
|
||||||
slog.Info("livestream ffmpeg output", slog.String("url", l.url), slog.String("stdout", scanner.Text()))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/marcopeocchi/yt-dlp-web-ui/server/config"
|
"github.com/marcopeocchi/yt-dlp-web-ui/server/config"
|
||||||
|
"github.com/marcopeocchi/yt-dlp-web-ui/server/internal"
|
||||||
)
|
)
|
||||||
|
|
||||||
func setupTest() {
|
func setupTest() {
|
||||||
@@ -15,9 +16,8 @@ func TestLivestream(t *testing.T) {
|
|||||||
setupTest()
|
setupTest()
|
||||||
|
|
||||||
done := make(chan *LiveStream)
|
done := make(chan *LiveStream)
|
||||||
log := make(chan []byte)
|
|
||||||
|
|
||||||
ls := New("https://www.youtube.com/watch?v=LSm1daKezcE", log, done)
|
ls := New("https://www.youtube.com/watch?v=LSm1daKezcE", done, &internal.MessageQueue{}, &internal.MemoryDB{})
|
||||||
go ls.Start()
|
go ls.Start()
|
||||||
|
|
||||||
time.AfterFunc(time.Second*20, func() {
|
time.AfterFunc(time.Second*20, func() {
|
||||||
|
|||||||
@@ -8,22 +8,26 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/marcopeocchi/yt-dlp-web-ui/server/config"
|
"github.com/marcopeocchi/yt-dlp-web-ui/server/config"
|
||||||
|
"github.com/marcopeocchi/yt-dlp-web-ui/server/internal"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Monitor struct {
|
type Monitor struct {
|
||||||
|
db *internal.MemoryDB // where the just started livestream will be published
|
||||||
|
mq *internal.MessageQueue // where the just started livestream will be published
|
||||||
streams map[string]*LiveStream // keeps track of the livestreams
|
streams map[string]*LiveStream // keeps track of the livestreams
|
||||||
done chan *LiveStream // to signal individual processes completition
|
done chan *LiveStream // to signal individual processes completition
|
||||||
logs chan []byte // to signal individual processes completition
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMonitor() *Monitor {
|
func NewMonitor(mq *internal.MessageQueue, db *internal.MemoryDB) *Monitor {
|
||||||
return &Monitor{
|
return &Monitor{
|
||||||
|
mq: mq,
|
||||||
|
db: db,
|
||||||
streams: make(map[string]*LiveStream),
|
streams: make(map[string]*LiveStream),
|
||||||
done: make(chan *LiveStream),
|
done: make(chan *LiveStream),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Detect each livestream completition, if done remove it from the monitor.
|
// Detect each livestream completition, if done detach it from the monitor.
|
||||||
func (m *Monitor) Schedule() {
|
func (m *Monitor) Schedule() {
|
||||||
for l := range m.done {
|
for l := range m.done {
|
||||||
delete(m.streams, l.url)
|
delete(m.streams, l.url)
|
||||||
@@ -31,7 +35,7 @@ func (m *Monitor) Schedule() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *Monitor) Add(url string) {
|
func (m *Monitor) Add(url string) {
|
||||||
ls := New(url, m.logs, m.done)
|
ls := New(url, m.done, m.mq, m.db)
|
||||||
|
|
||||||
go ls.Start()
|
go ls.Start()
|
||||||
m.streams[url] = ls
|
m.streams[url] = ls
|
||||||
@@ -111,8 +115,3 @@ func (m *Monitor) Restore() error {
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Return a fan-in logs channel
|
|
||||||
func (m *Monitor) Logs() <-chan []byte {
|
|
||||||
return m.logs
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -63,7 +63,11 @@ func (m *MessageQueue) downloadConsumer() {
|
|||||||
)
|
)
|
||||||
|
|
||||||
if p.Progress.Status != StatusCompleted {
|
if p.Progress.Status != StatusCompleted {
|
||||||
p.Start()
|
if p.Livestream {
|
||||||
|
go p.Start() // livestreams have higher priorty and will ignore the queue
|
||||||
|
} else {
|
||||||
|
p.Start()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
slog.Info("started process",
|
slog.Info("started process",
|
||||||
|
|||||||
@@ -36,17 +36,19 @@ const (
|
|||||||
StatusDownloading
|
StatusDownloading
|
||||||
StatusCompleted
|
StatusCompleted
|
||||||
StatusErrored
|
StatusErrored
|
||||||
|
StatusLivestream
|
||||||
)
|
)
|
||||||
|
|
||||||
// Process descriptor
|
// Process descriptor
|
||||||
type Process struct {
|
type Process struct {
|
||||||
Id string
|
Id string
|
||||||
Url string
|
Url string
|
||||||
Params []string
|
Livestream bool
|
||||||
Info DownloadInfo
|
Params []string
|
||||||
Progress DownloadProgress
|
Info DownloadInfo
|
||||||
Output DownloadOutput
|
Progress DownloadProgress
|
||||||
proc *os.Process
|
Output DownloadOutput
|
||||||
|
proc *os.Process
|
||||||
}
|
}
|
||||||
|
|
||||||
// Starts spawns/forks a new yt-dlp process and parse its stdout.
|
// Starts spawns/forks a new yt-dlp process and parse its stdout.
|
||||||
@@ -166,6 +168,10 @@ func (p *Process) Start() {
|
|||||||
ETA: progress.Eta,
|
ETA: progress.Eta,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if p.Livestream {
|
||||||
|
p.Progress.Status = StatusLivestream
|
||||||
|
}
|
||||||
|
|
||||||
slog.Info("progress",
|
slog.Info("progress",
|
||||||
slog.String("id", p.getShortId()),
|
slog.String("id", p.getShortId()),
|
||||||
slog.String("url", p.Url),
|
slog.String("url", p.Url),
|
||||||
|
|||||||
@@ -140,7 +140,7 @@ func RunBlocking(cfg *RunConfig) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func newServer(c serverConfig) *http.Server {
|
func newServer(c serverConfig) *http.Server {
|
||||||
lm := livestream.NewMonitor()
|
lm := livestream.NewMonitor(c.mq, c.mdb)
|
||||||
go lm.Schedule()
|
go lm.Schedule()
|
||||||
go lm.Restore()
|
go lm.Restore()
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user