refactoring-1

Introduced pipelines and abstracted the download process (formerly process.go) behind a Downloader interface
This commit is contained in:
2025-08-30 10:18:41 +02:00
parent 9ca7bb9377
commit 4c35b0b41f
36 changed files with 1067 additions and 706 deletions
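A minimal sketch of how the refactored pieces are meant to compose, based only on the signatures introduced in this commit (the enqueue helper and the example yt-dlp flags are illustrative assumptions, not part of the changeset): a concrete Downloader is built, indexed in the kv.Store by its own id, and handed to the queue.MessageQueue, whose consumers mark it pending and eventually call Start().

package main

import (
	"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/downloaders"
	"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/kv"
	"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/queue"
)

// enqueue is a hypothetical helper illustrating the intended flow.
func enqueue(url string, store *kv.Store, mq *queue.MessageQueue) string {
	// build a concrete Downloader behind the interface
	d := downloaders.NewGenericDownload(url, []string{"-f", "best"})
	// the store indexes the downloader by its own id (d.GetId())
	id := store.Set(d)
	// the queue marks it pending and schedules Start() on a consumer
	mq.Publish(d)
	return id
}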

View File

@@ -1,57 +0,0 @@
package internal
import (
"container/heap"
"log/slog"
)
type LoadBalancer struct {
pool Pool
done chan *Worker
}
func NewLoadBalancer(numWorker int) *LoadBalancer {
var pool Pool
doneChan := make(chan *Worker)
for i := range numWorker {
w := &Worker{
requests: make(chan *Process, 1),
index: i,
}
go w.Work(doneChan)
pool = append(pool, w)
slog.Info("spawned worker", slog.Int("index", i))
}
return &LoadBalancer{
pool: pool,
done: doneChan,
}
}
func (b *LoadBalancer) Balance(work chan *Process) {
for {
select {
case req := <-work:
b.dispatch(req)
case w := <-b.done:
b.completed(w)
}
}
}
func (b *LoadBalancer) dispatch(req *Process) {
w := heap.Pop(&b.pool).(*Worker)
w.requests <- req
w.pending++
heap.Push(&b.pool, w)
}
func (b *LoadBalancer) completed(w *Worker) {
w.pending--
heap.Remove(&b.pool, w.index)
heap.Push(&b.pool, w)
}

View File

@@ -33,18 +33,19 @@ type DownloadProgress struct {
// struct representing the response sent to the client
// as JSON-RPC result field
type ProcessResponse struct {
Id string `json:"id"`
Progress DownloadProgress `json:"progress"`
Info common.DownloadInfo `json:"info"`
Output DownloadOutput `json:"output"`
Params []string `json:"params"`
type ProcessSnapshot struct {
Id string `json:"id"`
Progress DownloadProgress `json:"progress"`
Info common.DownloadMetadata `json:"info"`
Output DownloadOutput `json:"output"`
Params []string `json:"params"`
DownloaderName string `json:"downloader_name"`
}
// struct representing the current status of the memoryDB
// used for serialization/persistence reasons
type Session struct {
Processes []ProcessResponse `json:"processes"`
Snapshots []ProcessSnapshot `json:"processes"`
}
// struct representing the intent to stop a specific process
@@ -72,3 +73,11 @@ type CustomTemplate struct {
Name string `json:"name"`
Content string `json:"content"`
}
const (
StatusPending = iota
StatusDownloading
StatusCompleted
StatusErrored
StatusLiveStream
)

View File

@@ -0,0 +1,42 @@
package downloaders
import (
"log/slog"
"sync"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/common"
)
type DownloaderBase struct {
Id string
URL string
Metadata common.DownloadMetadata
Pending bool
Completed bool
mutex sync.Mutex
}
func (d *DownloaderBase) FetchMetadata(fetcher func(url string) (*common.DownloadMetadata, error)) {
d.mutex.Lock()
defer d.mutex.Unlock()
meta, err := fetcher(d.URL)
if err != nil {
slog.Error("failed to retrieve metadata", slog.Any("err", err))
return
}
d.Metadata = *meta
}
func (d *DownloaderBase) SetPending(p bool) {
d.mutex.Lock()
defer d.mutex.Unlock()
d.Pending = p
}
func (d *DownloaderBase) Complete() {
d.mutex.Lock()
defer d.mutex.Unlock()
d.Completed = true
}

View File

@@ -0,0 +1,26 @@
package downloaders
import (
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/common"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal"
)
type Downloader interface {
Start() error
Stop() error
Status() *internal.ProcessSnapshot
SetOutput(output internal.DownloadOutput)
SetProgress(progress internal.DownloadProgress)
SetMetadata(fetcher func(url string) (*common.DownloadMetadata, error))
SetPending(p bool)
IsCompleted() bool
UpdateSavedFilePath(path string)
RestoreFromSnapshot(*internal.ProcessSnapshot) error
GetId() string
GetUrl() string
}
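As a hedged illustration of this contract (mirroring what the kv store's Restore code does later in this commit), a downloader can be flattened into a ProcessSnapshot and hydrated back; roundTrip is a hypothetical helper that would live in this package.

// roundTrip is illustrative only and not part of this commit.
func roundTrip(d Downloader) (Downloader, error) {
	snap := d.Status() // serialization-friendly view of the download

	restored := NewGenericDownload("", []string{}) // empty shell, then hydrate it
	if err := restored.RestoreFromSnapshot(snap); err != nil {
		return nil, err
	}
	return restored, nil
}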

View File

@@ -0,0 +1,211 @@
package downloaders
import (
"context"
"errors"
"fmt"
"log/slog"
"os"
"os/exec"
"slices"
"strings"
"syscall"
"github.com/google/uuid"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/common"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal"
)
const downloadTemplate = `download:
{
"eta":%(progress.eta)s,
"percentage":"%(progress._percent_str)s",
"speed":%(progress.speed)s
}`
// the filename is not returned with the correct extension after postprocessing
const postprocessTemplate = `postprocess:
{
"filepath":"%(info.filepath)s"
}
`
type GenericDownloader struct {
Params []string
AutoRemove bool
progress internal.DownloadProgress
output internal.DownloadOutput
proc *os.Process
logConsumer LogConsumer
// embedded
DownloaderBase
}
func NewGenericDownload(url string, params []string) Downloader {
g := &GenericDownloader{
Params: params,
logConsumer: NewJSONLogConsumer(),
}
// set on the embedded DownloaderBase
g.Id = uuid.NewString()
g.URL = url
return g
}
func (g *GenericDownloader) Start() error {
g.SetPending(true)
g.Params = argsSanitizer(g.Params)
out := internal.DownloadOutput{
Path: config.Instance().DownloadPath,
Filename: "%(title)s.%(ext)s",
}
if g.output.Path != "" {
out.Path = g.output.Path
}
if g.output.Filename != "" {
out.Filename = g.output.Filename
}
buildFilename(&g.output)
templateReplacer := strings.NewReplacer("\n", "", "\t", "", " ", "")
baseParams := []string{
strings.Split(g.URL, "?list")[0], //no playlist
"--newline",
"--no-colors",
"--no-playlist",
"--progress-template",
templateReplacer.Replace(downloadTemplate),
"--progress-template",
templateReplacer.Replace(postprocessTemplate),
"--no-exec",
}
// unless the user manually overrode the output path, set our own -o template
if !(slices.Contains(g.Params, "-P") || slices.Contains(g.Params, "--paths")) {
g.Params = append(g.Params, "-o")
g.Params = append(g.Params, fmt.Sprintf("%s/%s", out.Path, out.Filename))
}
params := append(baseParams, g.Params...)
slog.Info("requesting download", slog.String("url", g.URL), slog.Any("params", params))
cmd := exec.Command(config.Instance().DownloaderPath, params...)
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
stdout, err := cmd.StdoutPipe()
if err != nil {
slog.Error("failed to get a stdout pipe", slog.Any("err", err))
panic(err)
}
stderr, err := cmd.StderrPipe()
if err != nil {
slog.Error("failed to get a stderr pipe", slog.Any("err", err))
panic(err)
}
if err := cmd.Start(); err != nil {
slog.Error("failed to start yt-dlp process", slog.Any("err", err))
panic(err)
}
g.proc = cmd.Process
ctx, cancel := context.WithCancel(context.Background())
defer func() {
stdout.Close()
g.Complete()
cancel()
}()
logs := make(chan []byte)
go produceLogs(stdout, logs)
go consumeLogs(ctx, logs, g.logConsumer, g)
go printYtDlpErrors(stderr, g.Id, g.URL)
g.SetPending(false)
return cmd.Wait()
}
func (g *GenericDownloader) Stop() error {
defer func() {
g.progress.Status = internal.StatusCompleted
g.Complete()
}()
// yt-dlp spawns multiple child processes and the parent process
// has been started with Setpgid = true. To properly kill
// all subprocesses, a SIGTERM needs to be sent to the
// process group.
if g.proc == nil {
return errors.New("*os.Process not set")
}
pgid, err := syscall.Getpgid(g.proc.Pid)
if err != nil {
return err
}
if err := syscall.Kill(-pgid, syscall.SIGTERM); err != nil {
return err
}
return nil
}
func (g *GenericDownloader) Status() *internal.ProcessSnapshot {
return &internal.ProcessSnapshot{
Id: g.Id,
Info: g.Metadata,
Progress: g.progress,
Output: g.output,
Params: g.Params,
DownloaderName: "generic",
}
}
func (g *GenericDownloader) UpdateSavedFilePath(p string) { g.output.SavedFilePath = p }
func (g *GenericDownloader) SetOutput(o internal.DownloadOutput) { g.output = o }
func (g *GenericDownloader) SetProgress(p internal.DownloadProgress) { g.progress = p }
func (g *GenericDownloader) SetMetadata(fetcher func(url string) (*common.DownloadMetadata, error)) {
g.FetchMetadata(fetcher)
}
func (g *GenericDownloader) SetPending(p bool) {
// delegate to the mutex-protected implementation on DownloaderBase
g.DownloaderBase.SetPending(p)
}
func (g *GenericDownloader) GetId() string { return g.Id }
func (g *GenericDownloader) GetUrl() string { return g.URL }
func (g *GenericDownloader) RestoreFromSnapshot(snap *internal.ProcessSnapshot) error {
if snap == nil {
return errors.New("cannot restore nil snapshot")
}
s := *snap
g.Id = s.Id
g.URL = s.Info.URL
g.Metadata = s.Info
g.progress = s.Progress
g.output = s.Output
g.Params = s.Params
return nil
}
func (g *GenericDownloader) IsCompleted() bool { return g.Completed }

View File

@@ -0,0 +1,205 @@
package downloaders
import (
"context"
"errors"
"fmt"
"io"
"log/slog"
"os"
"os/exec"
"path/filepath"
"slices"
"syscall"
"time"
"github.com/google/uuid"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/common"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/pipes"
)
type LiveStreamDownloader struct {
progress internal.DownloadProgress
proc *os.Process
logConsumer LogConsumer
pipes []pipes.Pipe
// embedded
DownloaderBase
}
func NewLiveStreamDownloader(url string, pipes []pipes.Pipe) Downloader {
l := &LiveStreamDownloader{
logConsumer: NewFFMpegLogConsumer(),
pipes: pipes,
}
// set on the embedded DownloaderBase
l.Id = uuid.NewString()
l.URL = url
return l
}
func (l *LiveStreamDownloader) Start() error {
l.SetPending(true)
baseParams := []string{
l.URL,
"--newline",
"--no-colors",
"--no-playlist",
"--no-exec",
}
params := append(baseParams, "-o", "-")
cmd := exec.Command(config.Instance().DownloaderPath, params...)
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
// stdout = media stream
media, err := cmd.StdoutPipe()
if err != nil {
slog.Error("failed to get media stdout", slog.Any("err", err))
panic(err)
}
// stderr = log/progress
stderr, err := cmd.StderrPipe()
if err != nil {
slog.Error("failed to get stderr pipe", slog.Any("err", err))
panic(err)
}
if err := cmd.Start(); err != nil {
slog.Error("failed to start yt-dlp process", slog.Any("err", err))
panic(err)
}
l.proc = cmd.Process
ctx, cancel := context.WithCancel(context.Background())
defer func() {
l.Complete()
cancel()
}()
// --- build the pipeline ---
reader := io.Reader(media)
for _, pipe := range l.pipes {
nr, err := pipe.Connect(reader)
if err != nil {
slog.Error("pipe failed", slog.String("pipe", pipe.Name()), slog.Any("err", err))
return err
}
reader = nr
}
// --- fallback: if no FileWriter pipe is present, write the stream to a file ---
if !l.hasFileWriter() {
go func() {
defaultPath := filepath.Join(
config.Instance().DownloadPath,
fmt.Sprintf("%s (live) %s.mp4", l.Id, time.Now().Format(time.ANSIC)),
)
f, err := os.Create(defaultPath)
if err != nil {
slog.Error("failed to create fallback file", slog.Any("err", err))
return
}
defer f.Close()
_, err = io.Copy(f, reader)
if err != nil {
slog.Error("copy error", slog.Any("err", err))
}
slog.Info("download saved", slog.String("path", defaultPath))
}()
}
// --- logs consumer ---
logs := make(chan []byte)
go produceLogs(stderr, logs)
go consumeLogs(ctx, logs, l.logConsumer, l)
l.progress.Status = internal.StatusLiveStream
return cmd.Wait()
}
func (l *LiveStreamDownloader) Stop() error {
defer func() {
l.progress.Status = internal.StatusCompleted
l.Complete()
}()
// yt-dlp spawns multiple child processes and the parent process
// has been started with Setpgid = true. To properly kill
// all subprocesses, a SIGTERM needs to be sent to the
// process group.
if l.proc == nil {
return errors.New("*os.Process not set")
}
pgid, err := syscall.Getpgid(l.proc.Pid)
if err != nil {
return err
}
if err := syscall.Kill(-pgid, syscall.SIGTERM); err != nil {
return err
}
return nil
}
func (l *LiveStreamDownloader) Status() *internal.ProcessSnapshot {
return &internal.ProcessSnapshot{
Id: l.Id,
Info: l.Metadata,
Progress: l.progress,
DownloaderName: "livestream",
}
}
func (l *LiveStreamDownloader) UpdateSavedFilePath(p string) {}
func (l *LiveStreamDownloader) SetOutput(o internal.DownloadOutput) {}
func (l *LiveStreamDownloader) SetProgress(p internal.DownloadProgress) { l.progress = p }
func (l *LiveStreamDownloader) SetMetadata(fetcher func(url string) (*common.DownloadMetadata, error)) {
l.FetchMetadata(fetcher)
}
func (l *LiveStreamDownloader) SetPending(p bool) {
// delegate to the mutex-protected implementation on DownloaderBase
l.DownloaderBase.SetPending(p)
}
func (l *LiveStreamDownloader) GetId() string { return l.Id }
func (l *LiveStreamDownloader) GetUrl() string { return l.URL }
func (l *LiveStreamDownloader) RestoreFromSnapshot(snap *internal.ProcessSnapshot) error {
if snap == nil {
return errors.New("cannot restore nil snapshot")
}
s := *snap
l.Id = s.Id
l.URL = s.Info.URL
l.Metadata = s.Info
l.progress = s.Progress
return nil
}
func (l *LiveStreamDownloader) IsCompleted() bool { return l.Completed }
func (l *LiveStreamDownloader) hasFileWriter() bool {
return slices.ContainsFunc(l.pipes, func(p pipes.Pipe) bool {
return p.Name() == "file-writer"
})
}
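A hedged example of wiring this downloader to the pipes added later in this commit; the ffmpeg arguments, the output path and the error handling are illustrative assumptions. Because the chain ends with a FileWriter, hasFileWriter() is true and the fallback writer above is skipped.

package main

import (
	"log/slog"

	"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/downloaders"
	"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/pipes"
)

func startLive(url string) downloaders.Downloader {
	d := downloaders.NewLiveStreamDownloader(url, []pipes.Pipe{
		// re-encode the raw stream (the Transcoder always forces webm on stdout)
		&pipes.Transcoder{Args: []string{"-c:v", "libvpx-vp9"}},
		// final sink: write the transcoded stream to disk
		&pipes.FileWriter{Path: "/downloads/stream.webm", IsFinal: true},
	})
	go func() {
		if err := d.Start(); err != nil {
			slog.Error("livestream failed", slog.Any("err", err))
		}
	}()
	return d
}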

View File

@@ -0,0 +1,68 @@
package downloaders
import (
"encoding/json"
"log/slog"
"strings"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal"
)
type LogConsumer interface {
GetName() string
ParseLogEntry(entry []byte, downloader Downloader)
}
type JSONLogConsumer struct{}
func NewJSONLogConsumer() LogConsumer {
return &JSONLogConsumer{}
}
func (j *JSONLogConsumer) GetName() string { return "json-log-consumer" }
func (j *JSONLogConsumer) ParseLogEntry(entry []byte, d Downloader) {
var progress internal.ProgressTemplate
var postprocess internal.PostprocessTemplate
if err := json.Unmarshal(entry, &progress); err == nil {
d.SetProgress(internal.DownloadProgress{
Status: internal.StatusDownloading,
Percentage: progress.Percentage,
Speed: progress.Speed,
ETA: progress.Eta,
})
slog.Info("progress",
slog.String("id", j.GetShortId(d.GetId())),
slog.String("url", d.GetUrl()),
slog.String("percentage", progress.Percentage),
)
}
if err := json.Unmarshal(entry, &postprocess); err == nil {
d.UpdateSavedFilePath(postprocess.FilePath)
}
}
func (j *JSONLogConsumer) GetShortId(id string) string {
return strings.Split(id, "-")[0]
}
// TODO: split into different files
type FFMpegLogConsumer struct{}
func NewFFMpegLogConsumer() LogConsumer {
return &FFMpegLogConsumer{}
}
func (f *FFMpegLogConsumer) GetName() string { return "ffmpeg-log-consumer" }
func (f *FFMpegLogConsumer) ParseLogEntry(entry []byte, d Downloader) {
slog.Info("ffmpeg output",
slog.String("id", d.GetId()),
slog.String("url", d.GetUrl()),
slog.String("output", string(entry)),
)
}

View File

@@ -0,0 +1 @@
package downloaders

View File

@@ -0,0 +1,76 @@
package downloaders
import (
"bufio"
"context"
"io"
"log/slog"
"regexp"
"slices"
"strings"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal"
)
func argsSanitizer(params []string) []string {
params = slices.DeleteFunc(params, func(e string) bool {
match, _ := regexp.MatchString(`(\$\{)|(\&\&)`, e)
return match
})
params = slices.DeleteFunc(params, func(e string) bool {
return e == ""
})
return params
}
func buildFilename(o *internal.DownloadOutput) {
if o.Filename != "" && strings.Contains(o.Filename, ".%(ext)s") {
o.Filename += ".%(ext)s"
}
o.Filename = strings.Replace(
o.Filename,
".%(ext)s.%(ext)s",
".%(ext)s",
1,
)
}
func produceLogs(r io.Reader, logs chan<- []byte) {
go func() {
scanner := bufio.NewScanner(r)
for scanner.Scan() {
logs <- scanner.Bytes()
}
}()
}
func consumeLogs(ctx context.Context, logs <-chan []byte, c LogConsumer, d Downloader) {
for {
select {
case <-ctx.Done():
slog.Info("detaching logs",
slog.String("url", d.GetUrl()),
slog.String("id", c.GetName()),
)
return
case entry := <-logs:
c.ParseLogEntry(entry, d)
}
}
}
func printYtDlpErrors(stderr io.Reader, shortId, url string) {
scanner := bufio.NewScanner(stderr)
for scanner.Scan() {
slog.Error("yt-dlp process error",
slog.String("id", shortId),
slog.String("url", url),
slog.String("err", scanner.Text()),
)
}
}
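To make the sanitizer's behaviour concrete, a hypothetical example-style test (same package, "fmt" imported): empty entries and anything containing "${" or "&&" are dropped, which is exactly what the regexp above matches.

func Example_argsSanitizer() {
	in := []string{"-f", "best", "", "--exec ${PWD}/x", "a && b"}
	fmt.Println(argsSanitizer(in))
	// Output: [-f best]
}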

View File

@@ -1,4 +1,4 @@
package internal
package kv
import (
"encoding/gob"
@@ -6,28 +6,31 @@ import (
"log/slog"
"os"
"path/filepath"
"runtime"
"sync"
"github.com/google/uuid"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/downloaders"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/queue"
)
var memDbEvents = make(chan *Process)
var memDbEvents = make(chan downloaders.Downloader, runtime.NumCPU())
// In-Memory Thread-Safe Key-Value Storage with optional persistence
type MemoryDB struct {
table map[string]*Process
type Store struct {
table map[string]downloaders.Downloader
mu sync.RWMutex
}
func NewMemoryDB() *MemoryDB {
return &MemoryDB{
table: make(map[string]*Process),
func NewStore() *Store {
return &Store{
table: make(map[string]downloaders.Downloader),
}
}
// Get a stored downloader given its id
func (m *MemoryDB) Get(id string) (*Process, error) {
func (m *Store) Get(id string) (downloaders.Downloader, error) {
m.mu.RLock()
defer m.mu.RUnlock()
@@ -40,25 +43,22 @@ func (m *MemoryDB) Get(id string) (*Process, error) {
}
// Store a downloader and return its id
func (m *MemoryDB) Set(process *Process) string {
id := uuid.NewString()
func (m *Store) Set(d downloaders.Downloader) string {
m.mu.Lock()
process.Id = id
m.table[id] = process
m.table[d.GetId()] = d
m.mu.Unlock()
return id
return d.GetId()
}
// Removes a stored downloader, given its id
func (m *MemoryDB) Delete(id string) {
func (m *Store) Delete(id string) {
m.mu.Lock()
delete(m.table, id)
m.mu.Unlock()
}
func (m *MemoryDB) Keys() *[]string {
func (m *Store) Keys() *[]string {
var running []string
m.mu.RLock()
@@ -72,18 +72,12 @@ func (m *MemoryDB) Keys() *[]string {
}
// Returns a slice of snapshots of all currently stored downloaders
func (m *MemoryDB) All() *[]ProcessResponse {
running := []ProcessResponse{}
func (m *Store) All() *[]internal.ProcessSnapshot {
running := []internal.ProcessSnapshot{}
m.mu.RLock()
for k, v := range m.table {
running = append(running, ProcessResponse{
Id: k,
Info: v.Info,
Progress: v.Progress,
Output: v.Output,
Params: v.Params,
})
for _, v := range m.table {
running = append(running, *(v.Status()))
}
m.mu.RUnlock()
@@ -91,7 +85,7 @@ func (m *MemoryDB) All() *[]ProcessResponse {
}
// Persist the database in a single file named "session.dat"
func (m *MemoryDB) Persist() error {
func (m *Store) Persist() error {
running := m.All()
sf := filepath.Join(config.Instance().SessionFilePath, "session.dat")
@@ -113,7 +107,7 @@ func (m *MemoryDB) Persist() error {
}
// Restore a persisted state
func (m *MemoryDB) Restore(mq *MessageQueue) {
func (m *Store) Restore(mq *queue.MessageQueue) {
sf := filepath.Join(config.Instance().SessionFilePath, "session.dat")
fd, err := os.Open(sf)
@@ -130,29 +124,31 @@ func (m *MemoryDB) Restore(mq *MessageQueue) {
m.mu.Lock()
defer m.mu.Unlock()
for _, proc := range session.Processes {
restored := &Process{
Id: proc.Id,
Url: proc.Info.URL,
Info: proc.Info,
Progress: proc.Progress,
Output: proc.Output,
Params: proc.Params,
}
for _, snap := range session.Processes {
var restored downloaders.Downloader
m.table[proc.Id] = restored
if snap.DownloaderName == "generic" {
d := downloaders.NewGenericDownload("", []string{})
err := d.RestoreFromSnapshot(&snap)
if err != nil {
continue
}
restored = d
if restored.Progress.Status != StatusCompleted {
mq.Publish(restored)
m.table[snap.Id] = restored
if !restored.(*downloaders.GenericDownloader).DownloaderBase.Completed {
mq.Publish(restored)
}
}
}
}
func (m *MemoryDB) EventListener() {
func (m *Store) EventListener() {
for p := range memDbEvents {
if p.AutoRemove {
slog.Info("compacting MemoryDB", slog.String("id", p.Id))
m.Delete(p.Id)
if p.Status().DownloaderName == "livestream" {
slog.Info("compacting Store", slog.String("id", p.GetId()))
m.Delete(p.GetId())
}
}
}

View File

@@ -0,0 +1,9 @@
package kv
import "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal"
// struct representing the current state of the kv Store
// used for serialization/persistence reasons
type Session struct {
Processes []internal.ProcessSnapshot `json:"processes"`
}

View File

@@ -11,7 +11,10 @@ import (
"time"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/downloaders"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/kv"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/pipes"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/queue"
)
const (
@@ -32,11 +35,11 @@ type LiveStream struct {
waitTime time.Duration
liveDate time.Time
mq *internal.MessageQueue
db *internal.MemoryDB
mq *queue.MessageQueue
db *kv.Store
}
func New(url string, done chan *LiveStream, mq *internal.MessageQueue, db *internal.MemoryDB) *LiveStream {
func New(url string, done chan *LiveStream, mq *queue.MessageQueue, db *kv.Store) *LiveStream {
return &LiveStream{
url: url,
done: done,
@@ -87,13 +90,12 @@ func (l *LiveStream) Start() error {
l.done <- l
// Send the started livestream to the message queue! :D
p := &internal.Process{
Url: l.url,
Livestream: true,
Params: []string{"--downloader", "ffmpeg", "--no-part"},
}
l.db.Set(p)
l.mq.Publish(p)
//TODO: add pipes
d := downloaders.NewLiveStreamDownloader(l.url, []pipes.Pipe{})
l.db.Set(d)
l.mq.Publish(d)
return nil
}

View File

@@ -5,7 +5,8 @@ import (
"time"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/kv"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/queue"
)
func setupTest() {
@@ -19,7 +20,7 @@ func TestLivestream(t *testing.T) {
done := make(chan *LiveStream)
ls := New(URL, done, &internal.MessageQueue{}, &internal.MemoryDB{})
ls := New(URL, done, &queue.MessageQueue{}, &kv.Store{})
go ls.Start()
time.AfterFunc(time.Second*20, func() {

View File

@@ -8,17 +8,18 @@ import (
"path/filepath"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/kv"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/queue"
)
type Monitor struct {
db *internal.MemoryDB // where the just started livestream will be published
mq *internal.MessageQueue // where the just started livestream will be published
db *kv.Store // where the just started livestream will be stored
mq *queue.MessageQueue // where the just started livestream will be published
streams map[string]*LiveStream // keeps track of the livestreams
done chan *LiveStream // to signal individual process completion
}
func NewMonitor(mq *internal.MessageQueue, db *internal.MemoryDB) *Monitor {
func NewMonitor(mq *queue.MessageQueue, db *kv.Store) *Monitor {
return &Monitor{
mq: mq,
db: db,

View File

@@ -0,0 +1,57 @@
package metadata
import (
"bytes"
"encoding/json"
"errors"
"io"
"log/slog"
"os/exec"
"syscall"
"time"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/common"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
)
func DefaultFetcher(url string) (*common.DownloadMetadata, error) {
cmd := exec.Command(config.Instance().DownloaderPath, url, "-J")
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
stderr, err := cmd.StderrPipe()
if err != nil {
return nil, err
}
meta := common.DownloadMetadata{
URL: url,
CreatedAt: time.Now(),
}
if err := cmd.Start(); err != nil {
return nil, err
}
var bufferedStderr bytes.Buffer
go func() {
io.Copy(&bufferedStderr, stderr)
}()
slog.Info("retrieving metadata", slog.String("url", url))
if err := json.NewDecoder(stdout).Decode(&meta); err != nil {
return nil, err
}
if err := cmd.Wait(); err != nil {
return nil, errors.New(bufferedStderr.String())
}
return &meta, nil
}

View File

@@ -0,0 +1,45 @@
package pipes
import (
"io"
"log/slog"
"os"
)
type FileWriter struct {
Path string
IsFinal bool
}
func (f *FileWriter) Name() string { return "file-writer" }
func (f *FileWriter) Connect(r io.Reader) (io.Reader, error) {
file, err := os.Create(f.Path)
if err != nil {
return nil, err
}
if f.IsFinal {
go func() {
defer file.Close()
if _, err := io.Copy(file, r); err != nil {
slog.Error("FileWriter (final) error", slog.Any("err", err))
}
}()
return r, nil
}
pr, pw := io.Pipe()
go func() {
defer file.Close()
defer pw.Close()
writer := io.MultiWriter(file, pw)
if _, err := io.Copy(writer, r); err != nil {
slog.Error("FileWriter (pipeline) error", slog.Any("err", err))
}
}()
return pr, nil
}

View File

@@ -0,0 +1,66 @@
package pipes
import (
"bufio"
"errors"
"io"
"log/slog"
"os/exec"
"strings"
)
type Transcoder struct {
Args []string
}
func (t *Transcoder) Name() string { return "ffmpeg-transcoder" }
func (t *Transcoder) Connect(r io.Reader) (io.Reader, error) {
cmd := exec.Command("ffmpeg",
append([]string{"-i", "pipe:0"}, append(t.Args, "-f", "webm", "pipe:1")...)...,
)
stdin, err := cmd.StdinPipe()
if err != nil {
return nil, err
}
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
stderr, err := cmd.StderrPipe()
if err != nil {
return nil, err
}
go func() {
reader := bufio.NewReader(stderr)
var line string
for {
part, err := reader.ReadString('\r')
line += part
if err != nil {
break
}
line = strings.TrimRight(line, "\r\n")
slog.Info("ffmpeg transcoder", slog.String("log", line))
line = ""
}
}()
go func() {
defer stdin.Close()
_, err := io.Copy(stdin, r)
if err != nil && !errors.Is(err, io.EOF) {
slog.Error("transcoder stdin error", slog.Any("err", err))
}
}()
if err := cmd.Start(); err != nil {
return nil, err
}
return stdout, nil
}

View File

@@ -0,0 +1,8 @@
package pipes
import "io"
type Pipe interface {
Name() string
Connect(r io.Reader) (io.Reader, error)
}
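Any stage that can wrap an io.Reader satisfies this contract. A hypothetical pass-through pipe (not part of this commit) that counts the bytes flowing through the chain could look like this; slot it anywhere in the []pipes.Pipe slice handed to NewLiveStreamDownloader to observe throughput.

package pipes

import (
	"io"
	"sync/atomic"
)

// ByteCounter is an illustrative Pipe: it forwards the stream untouched
// while keeping a running total of the bytes that passed through it.
type ByteCounter struct {
	total atomic.Int64
}

func (b *ByteCounter) Name() string { return "byte-counter" }

func (b *ByteCounter) Connect(r io.Reader) (io.Reader, error) {
	return &countingReader{r: r, total: &b.total}, nil
}

func (b *ByteCounter) Total() int64 { return b.total.Load() }

type countingReader struct {
	r     io.Reader
	total *atomic.Int64
}

func (c *countingReader) Read(p []byte) (int, error) {
	n, err := c.r.Read(p)
	c.total.Add(int64(n))
	return n, err
}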

View File

@@ -1,110 +0,0 @@
package internal
import (
"encoding/json"
"errors"
"log/slog"
"os/exec"
"slices"
"strings"
"time"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/common"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/playlist"
)
func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB) error {
params := append(req.Params, "--flat-playlist", "-J")
urlWithParams := append([]string{req.URL}, params...)
var (
downloader = config.Instance().DownloaderPath
cmd = exec.Command(downloader, urlWithParams...)
)
stdout, err := cmd.StdoutPipe()
if err != nil {
return err
}
var m playlist.Metadata
if err := cmd.Start(); err != nil {
return err
}
slog.Info("decoding playlist metadata", slog.String("url", req.URL))
if err := json.NewDecoder(stdout).Decode(&m); err != nil {
return err
}
if err := cmd.Wait(); err != nil {
return err
}
slog.Info("decoded playlist metadata", slog.String("url", req.URL))
if m.Type == "" {
return errors.New("probably not a valid URL")
}
if m.IsPlaylist() {
entries := slices.CompactFunc(slices.Compact(m.Entries), func(a common.DownloadInfo, b common.DownloadInfo) bool {
return a.URL == b.URL
})
entries = slices.DeleteFunc(entries, func(e common.DownloadInfo) bool {
return strings.Contains(e.URL, "list=")
})
slog.Info("playlist detected", slog.String("url", req.URL), slog.Int("count", len(entries)))
if err := playlist.ApplyModifiers(&entries, req.Params); err != nil {
return err
}
for i, meta := range entries {
// detect playlist title from metadata since each playlist entry will be
// treated as an individual download
req.Rename = strings.Replace(
req.Rename,
"%(playlist_title)s",
m.PlaylistTitle,
1,
)
//XXX: it's idiotic but it works: virtually delay the creation time
meta.CreatedAt = time.Now().Add(time.Millisecond * time.Duration(i*10))
proc := &Process{
Url: meta.URL,
Progress: DownloadProgress{},
Output: DownloadOutput{Filename: req.Rename},
Info: meta,
Params: req.Params,
}
proc.Info.URL = meta.URL
db.Set(proc)
mq.Publish(proc)
proc.Info.CreatedAt = meta.CreatedAt
}
return nil
}
proc := &Process{
Url: req.URL,
Params: req.Params,
}
db.Set(proc)
mq.Publish(proc)
slog.Info("sending new process to message queue", slog.String("url", proc.Url))
return cmd.Wait()
}

View File

@@ -1,24 +0,0 @@
package internal
// Pool implements heap.Interface interface as a standard priority queue
type Pool []*Worker
func (h Pool) Len() int { return len(h) }
func (h Pool) Less(i, j int) bool { return h[i].pending < h[j].pending }
func (h Pool) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
h[i].index = i
h[j].index = j
}
func (h *Pool) Push(x any) { *h = append(*h, x.(*Worker)) }
func (h *Pool) Pop() any {
old := *h
n := len(old)
x := old[n-1]
old[n-1] = nil
*h = old[0 : n-1]
return x
}

View File

@@ -1,386 +0,0 @@
package internal
import (
"bufio"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"regexp"
"slices"
"syscall"
"os"
"os/exec"
"strings"
"time"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/archiver"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/common"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
)
const downloadTemplate = `download:
{
"eta":%(progress.eta)s,
"percentage":"%(progress._percent_str)s",
"speed":%(progress.speed)s
}`
// filename not returning the correct extension after postprocess
const postprocessTemplate = `postprocess:
{
"filepath":"%(info.filepath)s"
}
`
const (
StatusPending = iota
StatusDownloading
StatusCompleted
StatusErrored
)
// Process descriptor
type Process struct {
Id string
Url string
Livestream bool
AutoRemove bool
Params []string
Info common.DownloadInfo
Progress DownloadProgress
Output DownloadOutput
proc *os.Process
}
// Starts spawns/forks a new yt-dlp process and parse its stdout.
// The process is spawned to outputting a custom progress text that
// Resembles a JSON Object in order to Unmarshal it later.
// This approach is anyhow not perfect: quotes are not escaped properly.
// Each process is not identified by its PID but by a UUIDv4
func (p *Process) Start() {
// escape bash variable escaping and command piping, you'll never know
// what they might come with...
p.Params = slices.DeleteFunc(p.Params, func(e string) bool {
match, _ := regexp.MatchString(`(\$\{)|(\&\&)`, e)
return match
})
p.Params = slices.DeleteFunc(p.Params, func(e string) bool {
return e == ""
})
out := DownloadOutput{
Path: config.Instance().DownloadPath,
Filename: "%(title)s.%(ext)s",
}
if p.Output.Path != "" {
out.Path = p.Output.Path
}
if p.Output.Filename != "" {
out.Filename = p.Output.Filename
}
buildFilename(&p.Output)
templateReplacer := strings.NewReplacer("\n", "", "\t", "", " ", "")
baseParams := []string{
strings.Split(p.Url, "?list")[0], //no playlist
"--newline",
"--no-colors",
"--no-playlist",
"--progress-template",
templateReplacer.Replace(downloadTemplate),
"--progress-template",
templateReplacer.Replace(postprocessTemplate),
"--no-exec",
}
// if user asked to manually override the output path...
if !(slices.Contains(p.Params, "-P") || slices.Contains(p.Params, "--paths")) {
p.Params = append(p.Params, "-o")
p.Params = append(p.Params, fmt.Sprintf("%s/%s", out.Path, out.Filename))
}
params := append(baseParams, p.Params...)
slog.Info("requesting download", slog.String("url", p.Url), slog.Any("params", params))
cmd := exec.Command(config.Instance().DownloaderPath, params...)
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
stdout, err := cmd.StdoutPipe()
if err != nil {
slog.Error("failed to get a stdout pipe", slog.Any("err", err))
panic(err)
}
stderr, err := cmd.StderrPipe()
if err != nil {
slog.Error("failed to get a stderr pipe", slog.Any("err", err))
panic(err)
}
if err := cmd.Start(); err != nil {
slog.Error("failed to start yt-dlp process", slog.Any("err", err))
panic(err)
}
p.proc = cmd.Process
ctx, cancel := context.WithCancel(context.Background())
defer func() {
stdout.Close()
p.Complete()
cancel()
}()
logs := make(chan []byte)
go produceLogs(stdout, logs)
go p.consumeLogs(ctx, logs)
go p.detectYtDlpErrors(stderr)
cmd.Wait()
}
func produceLogs(r io.Reader, logs chan<- []byte) {
go func() {
scanner := bufio.NewScanner(r)
for scanner.Scan() {
logs <- scanner.Bytes()
}
}()
}
func (p *Process) consumeLogs(ctx context.Context, logs <-chan []byte) {
for {
select {
case <-ctx.Done():
slog.Info("detaching from yt-dlp stdout",
slog.String("id", p.getShortId()),
slog.String("url", p.Url),
)
return
case entry := <-logs:
p.parseLogEntry(entry)
}
}
}
func (p *Process) parseLogEntry(entry []byte) {
var progress ProgressTemplate
var postprocess PostprocessTemplate
if err := json.Unmarshal(entry, &progress); err == nil {
p.Progress = DownloadProgress{
Status: StatusDownloading,
Percentage: progress.Percentage,
Speed: progress.Speed,
ETA: progress.Eta,
}
slog.Info("progress",
slog.String("id", p.getShortId()),
slog.String("url", p.Url),
slog.String("percentage", progress.Percentage),
)
}
if err := json.Unmarshal(entry, &postprocess); err == nil {
p.Output.SavedFilePath = postprocess.FilePath
// slog.Info("postprocess",
// slog.String("id", p.getShortId()),
// slog.String("url", p.Url),
// slog.String("filepath", postprocess.FilePath),
// )
}
}
func (p *Process) detectYtDlpErrors(r io.Reader) {
scanner := bufio.NewScanner(r)
for scanner.Scan() {
slog.Error("yt-dlp process error",
slog.String("id", p.getShortId()),
slog.String("url", p.Url),
slog.String("err", scanner.Text()),
)
}
}
// Keep process in the memoryDB but marks it as complete
// Convention: All completed processes has progress -1
// and speed 0 bps.
func (p *Process) Complete() {
// auto archive
// TODO: it's not that deterministic :/
if p.Progress.Percentage == "" && p.Progress.Speed == 0 {
var serializedMetadata bytes.Buffer
json.NewEncoder(&serializedMetadata).Encode(p.Info)
archiver.Publish(&archiver.Message{
Id: p.Id,
Path: p.Output.SavedFilePath,
Title: p.Info.Title,
Thumbnail: p.Info.Thumbnail,
Source: p.Url,
Metadata: serializedMetadata.String(),
CreatedAt: p.Info.CreatedAt,
})
}
p.Progress = DownloadProgress{
Status: StatusCompleted,
Percentage: "-1",
Speed: 0,
ETA: 0,
}
// for safety, if the filename is not set, set it with original function
if p.Output.SavedFilePath == "" {
p.GetFileName(&p.Output)
}
slog.Info("finished",
slog.String("id", p.getShortId()),
slog.String("url", p.Url),
)
memDbEvents <- p
}
// Kill a process and remove it from the memory
func (p *Process) Kill() error {
defer func() {
p.Progress.Status = StatusCompleted
}()
// yt-dlp uses multiple child process the parent process
// has been spawned with setPgid = true. To properly kill
// all subprocesses a SIGTERM need to be sent to the correct
// process group
if p.proc == nil {
return errors.New("*os.Process not set")
}
pgid, err := syscall.Getpgid(p.proc.Pid)
if err != nil {
return err
}
if err := syscall.Kill(-pgid, syscall.SIGTERM); err != nil {
return err
}
return nil
}
func (p *Process) GetFileName(o *DownloadOutput) error {
cmd := exec.Command(
config.Instance().DownloaderPath,
"--print", "filename",
"-o", fmt.Sprintf("%s/%s", o.Path, o.Filename),
p.Url,
)
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
out, err := cmd.Output()
if err != nil {
return err
}
p.Output.SavedFilePath = strings.Trim(string(out), "\n")
return nil
}
func (p *Process) SetPending() {
// Since video's title isn't available yet, fill in with the URL.
p.Info = common.DownloadInfo{
URL: p.Url,
Title: p.Url,
CreatedAt: time.Now(),
}
p.Progress.Status = StatusPending
}
func (p *Process) SetMetadata() error {
cmd := exec.Command(config.Instance().DownloaderPath, p.Url, "-J")
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
stdout, err := cmd.StdoutPipe()
if err != nil {
slog.Error("failed to connect to stdout",
slog.String("id", p.getShortId()),
slog.String("url", p.Url),
slog.String("err", err.Error()),
)
return err
}
stderr, err := cmd.StderrPipe()
if err != nil {
slog.Error("failed to connect to stderr",
slog.String("id", p.getShortId()),
slog.String("url", p.Url),
slog.String("err", err.Error()),
)
return err
}
info := common.DownloadInfo{
URL: p.Url,
CreatedAt: time.Now(),
}
if err := cmd.Start(); err != nil {
return err
}
var bufferedStderr bytes.Buffer
go func() {
io.Copy(&bufferedStderr, stderr)
}()
slog.Info("retrieving metadata",
slog.String("id", p.getShortId()),
slog.String("url", p.Url),
)
if err := json.NewDecoder(stdout).Decode(&info); err != nil {
return err
}
p.Info = info
p.Progress.Status = StatusPending
if err := cmd.Wait(); err != nil {
return errors.New(bufferedStderr.String())
}
return nil
}
func (p *Process) getShortId() string { return strings.Split(p.Id, "-")[0] }
func buildFilename(o *DownloadOutput) {
if o.Filename != "" && strings.Contains(o.Filename, ".%(ext)s") {
o.Filename += ".%(ext)s"
}
o.Filename = strings.Replace(
o.Filename,
".%(ext)s.%(ext)s",
".%(ext)s",
1,
)
}

View File

@@ -1,4 +1,4 @@
package internal
package queue
import (
"context"
@@ -7,6 +7,8 @@ import (
evbus "github.com/asaskevich/EventBus"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/downloaders"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/metadata"
"golang.org/x/sync/semaphore"
)
@@ -35,9 +37,9 @@ func NewMessageQueue() (*MessageQueue, error) {
}
// Publish a message to the queue and set the task to a pending state.
func (m *MessageQueue) Publish(p *Process) {
func (m *MessageQueue) Publish(p downloaders.Downloader) {
// needs to have an id set before
p.SetPending()
p.SetPending(true)
m.eventBus.Publish(queueName, p)
}
@@ -52,27 +54,22 @@ func (m *MessageQueue) SetupConsumers() {
func (m *MessageQueue) downloadConsumer() {
sem := semaphore.NewWeighted(int64(m.concurrency))
m.eventBus.SubscribeAsync(queueName, func(p *Process) {
m.eventBus.SubscribeAsync(queueName, func(p downloaders.Downloader) {
sem.Acquire(context.Background(), 1)
defer sem.Release(1)
slog.Info("received process from event bus",
slog.String("bus", queueName),
slog.String("consumer", "downloadConsumer"),
slog.String("id", p.getShortId()),
slog.String("id", p.GetId()),
)
if p.Progress.Status != StatusCompleted {
if !p.IsCompleted() {
slog.Info("started process",
slog.String("bus", queueName),
slog.String("id", p.getShortId()),
slog.String("id", p.GetId()),
)
if p.Livestream {
// livestreams have higher priorty and they ignore the semaphore
go p.Start()
} else {
p.Start()
}
p.Start()
}
}, false)
}
@@ -84,29 +81,25 @@ func (m *MessageQueue) metadataSubscriber() {
// Since there are ongoing downloads, 1 job at a time seems a good compromise
sem := semaphore.NewWeighted(1)
m.eventBus.SubscribeAsync(queueName, func(p *Process) {
m.eventBus.SubscribeAsync(queueName, func(p downloaders.Downloader) {
sem.Acquire(context.Background(), 1)
defer sem.Release(1)
slog.Info("received process from event bus",
slog.String("bus", queueName),
slog.String("consumer", "metadataConsumer"),
slog.String("id", p.getShortId()),
slog.String("id", p.GetId()),
)
if p.Progress.Status == StatusCompleted {
if p.IsCompleted() {
slog.Warn("proccess has an illegal state",
slog.String("id", p.getShortId()),
slog.Int("status", p.Progress.Status),
slog.String("id", p.GetId()),
slog.String("status", "completed"),
)
return
}
if err := p.SetMetadata(); err != nil {
slog.Error("failed to retrieve metadata",
slog.String("id", p.getShortId()),
slog.String("err", err.Error()),
)
}
p.SetMetadata(metadata.DefaultFetcher)
}, false)
}

View File

@@ -1,15 +0,0 @@
package internal
type Worker struct {
requests chan *Process // downloads to do
pending int // downloads pending
index int // index in the heap
}
func (w *Worker) Work(done chan *Worker) {
for {
req := <-w.requests
req.Start()
done <- w
}
}