From 4c35b0b41fbe9a5855b4d359cf56d7284e4cdcff Mon Sep 17 00:00:00 2001 From: marcobaobao Date: Sat, 30 Aug 2025 10:18:41 +0200 Subject: [PATCH] refactoring-1 introduced pipelines and abstracted download process.go in Downloader interface --- server/common/types.go | 2 +- server/filebrowser/handlers.go | 5 +- server/internal/balancer.go | 57 --- server/internal/common_types.go | 23 +- server/internal/downloaders/common.go | 42 ++ server/internal/downloaders/downloader.go | 26 ++ server/internal/downloaders/generic.go | 211 ++++++++++ server/internal/downloaders/livestream.go | 205 ++++++++++ server/internal/downloaders/logging.go | 68 +++ server/internal/downloaders/playlist.go | 1 + server/internal/downloaders/utils.go | 76 ++++ server/internal/{memory_db.go => kv/store.go} | 86 ++-- server/internal/kv/types.go | 9 + server/internal/livestream/livestream.go | 24 +- server/internal/livestream/livestream_test.go | 5 +- server/internal/livestream/monitor.go | 9 +- server/internal/metadata/fetchers.go | 57 +++ server/internal/pipes/FileWriter.go | 45 ++ server/internal/pipes/Transcoder.go | 66 +++ server/internal/pipes/pipe.go | 8 + server/internal/pool.go | 24 -- server/internal/process.go | 386 ------------------ server/internal/{ => queue}/message_queue.go | 41 +- server/internal/worker.go | 15 - server/playlist/modifiers.go | 8 +- server/{internal => playlist}/playlist.go | 46 +-- server/playlist/types.go | 8 +- server/rest/common.go | 7 +- server/rest/service.go | 29 +- server/rpc/container.go | 5 +- server/rpc/service.go | 74 ++-- server/server.go | 13 +- server/status/repository/repository.go | 11 +- server/status/status.go | 6 +- server/subscription/task/runner.go | 24 +- server/twitch/monitor.go | 51 ++- 36 files changed, 1067 insertions(+), 706 deletions(-) delete mode 100644 server/internal/balancer.go create mode 100644 server/internal/downloaders/common.go create mode 100644 server/internal/downloaders/downloader.go create mode 100644 server/internal/downloaders/generic.go create mode 100644 server/internal/downloaders/livestream.go create mode 100644 server/internal/downloaders/logging.go create mode 100644 server/internal/downloaders/playlist.go create mode 100644 server/internal/downloaders/utils.go rename server/internal/{memory_db.go => kv/store.go} (53%) create mode 100644 server/internal/kv/types.go create mode 100644 server/internal/metadata/fetchers.go create mode 100644 server/internal/pipes/FileWriter.go create mode 100644 server/internal/pipes/Transcoder.go create mode 100644 server/internal/pipes/pipe.go delete mode 100644 server/internal/pool.go delete mode 100644 server/internal/process.go rename server/internal/{ => queue}/message_queue.go (72%) delete mode 100644 server/internal/worker.go rename server/{internal => playlist}/playlist.go (64%) diff --git a/server/common/types.go b/server/common/types.go index 51825d1..c366036 100644 --- a/server/common/types.go +++ b/server/common/types.go @@ -3,7 +3,7 @@ package common import "time" // Used to deser the yt-dlp -J output -type DownloadInfo struct { +type DownloadMetadata struct { URL string `json:"url"` Title string `json:"title"` Thumbnail string `json:"thumbnail"` diff --git a/server/filebrowser/handlers.go b/server/filebrowser/handlers.go index 1e1621f..f584719 100644 --- a/server/filebrowser/handlers.go +++ b/server/filebrowser/handlers.go @@ -19,6 +19,7 @@ import ( "github.com/go-chi/chi/v5" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/kv" ) /* @@ -207,9 +208,9 @@ func DownloadFile(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusUnauthorized) } -func BulkDownload(mdb *internal.MemoryDB) http.HandlerFunc { +func BulkDownload(mdb *kv.Store) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - ps := slices.DeleteFunc(*mdb.All(), func(e internal.ProcessResponse) bool { + ps := slices.DeleteFunc(*mdb.All(), func(e internal.ProcessSnapshot) bool { return e.Progress.Status != internal.StatusCompleted }) diff --git a/server/internal/balancer.go b/server/internal/balancer.go deleted file mode 100644 index 5cdb7ab..0000000 --- a/server/internal/balancer.go +++ /dev/null @@ -1,57 +0,0 @@ -package internal - -import ( - "container/heap" - "log/slog" -) - -type LoadBalancer struct { - pool Pool - done chan *Worker -} - -func NewLoadBalancer(numWorker int) *LoadBalancer { - var pool Pool - - doneChan := make(chan *Worker) - - for i := range numWorker { - w := &Worker{ - requests: make(chan *Process, 1), - index: i, - } - go w.Work(doneChan) - pool = append(pool, w) - - slog.Info("spawned worker", slog.Int("index", i)) - } - - return &LoadBalancer{ - pool: pool, - done: doneChan, - } -} - -func (b *LoadBalancer) Balance(work chan *Process) { - for { - select { - case req := <-work: - b.dispatch(req) - case w := <-b.done: - b.completed(w) - } - } -} - -func (b *LoadBalancer) dispatch(req *Process) { - w := heap.Pop(&b.pool).(*Worker) - w.requests <- req - w.pending++ - heap.Push(&b.pool, w) -} - -func (b *LoadBalancer) completed(w *Worker) { - w.pending-- - heap.Remove(&b.pool, w.index) - heap.Push(&b.pool, w) -} diff --git a/server/internal/common_types.go b/server/internal/common_types.go index 3a6af76..69fe473 100644 --- a/server/internal/common_types.go +++ b/server/internal/common_types.go @@ -33,18 +33,19 @@ type DownloadProgress struct { // struct representing the response sent to the client // as JSON-RPC result field -type ProcessResponse struct { - Id string `json:"id"` - Progress DownloadProgress `json:"progress"` - Info common.DownloadInfo `json:"info"` - Output DownloadOutput `json:"output"` - Params []string `json:"params"` +type ProcessSnapshot struct { + Id string `json:"id"` + Progress DownloadProgress `json:"progress"` + Info common.DownloadMetadata `json:"info"` + Output DownloadOutput `json:"output"` + Params []string `json:"params"` + DownloaderName string `json:"downloader_name"` } // struct representing the current status of the memoryDB // used for serializaton/persistence reasons type Session struct { - Processes []ProcessResponse `json:"processes"` + Snapshots []ProcessSnapshot `json:"processes"` } // struct representing the intent to stop a specific process @@ -72,3 +73,11 @@ type CustomTemplate struct { Name string `json:"name"` Content string `json:"content"` } + +const ( + StatusPending = iota + StatusDownloading + StatusCompleted + StatusErrored + StatusLiveStream +) diff --git a/server/internal/downloaders/common.go b/server/internal/downloaders/common.go new file mode 100644 index 0000000..9b4eb34 --- /dev/null +++ b/server/internal/downloaders/common.go @@ -0,0 +1,42 @@ +package downloaders + +import ( + "log/slog" + "sync" + + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/common" +) + +type DownloaderBase struct { + Id string + URL string + Metadata common.DownloadMetadata + Pending bool + Completed bool + mutex sync.Mutex +} + +func (d *DownloaderBase) FetchMetadata(fetcher func(url string) (*common.DownloadMetadata, error)) { + d.mutex.Lock() + defer d.mutex.Unlock() + + meta, err := fetcher(d.URL) + if err != nil { + slog.Error("failed to retrieve metadata", slog.Any("err", err)) + return + } + + d.Metadata = *meta +} + +func (d *DownloaderBase) SetPending(p bool) { + d.mutex.Lock() + defer d.mutex.Unlock() + d.Pending = p +} + +func (d *DownloaderBase) Complete() { + d.mutex.Lock() + defer d.mutex.Unlock() + d.Completed = true +} diff --git a/server/internal/downloaders/downloader.go b/server/internal/downloaders/downloader.go new file mode 100644 index 0000000..e694440 --- /dev/null +++ b/server/internal/downloaders/downloader.go @@ -0,0 +1,26 @@ +package downloaders + +import ( + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/common" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal" +) + +type Downloader interface { + Start() error + Stop() error + Status() *internal.ProcessSnapshot + + SetOutput(output internal.DownloadOutput) + SetProgress(progress internal.DownloadProgress) + SetMetadata(fetcher func(url string) (*common.DownloadMetadata, error)) + SetPending(p bool) + + IsCompleted() bool + + UpdateSavedFilePath(path string) + + RestoreFromSnapshot(*internal.ProcessSnapshot) error + + GetId() string + GetUrl() string +} diff --git a/server/internal/downloaders/generic.go b/server/internal/downloaders/generic.go new file mode 100644 index 0000000..0088d77 --- /dev/null +++ b/server/internal/downloaders/generic.go @@ -0,0 +1,211 @@ +package downloaders + +import ( + "context" + "errors" + "fmt" + "log/slog" + "os" + "os/exec" + "slices" + "strings" + "syscall" + + "github.com/google/uuid" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/common" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal" +) + +const downloadTemplate = `download: +{ + "eta":%(progress.eta)s, + "percentage":"%(progress._percent_str)s", + "speed":%(progress.speed)s +}` + +// filename not returning the correct extension after postprocess +const postprocessTemplate = `postprocess: +{ + "filepath":"%(info.filepath)s" +} +` + +type GenericDownloader struct { + Params []string + + AutoRemove bool + + progress internal.DownloadProgress + output internal.DownloadOutput + + proc *os.Process + + logConsumer LogConsumer + + // embedded + DownloaderBase +} + +func NewGenericDownload(url string, params []string) Downloader { + g := &GenericDownloader{ + logConsumer: NewJSONLogConsumer(), + } + // in base + g.Id = uuid.NewString() + g.URL = url + return g +} + +func (g *GenericDownloader) Start() error { + g.SetPending(true) + + g.Params = argsSanitizer(g.Params) + + out := internal.DownloadOutput{ + Path: config.Instance().DownloadPath, + Filename: "%(title)s.%(ext)s", + } + + if g.output.Path != "" { + out.Path = g.output.Path + } + + if g.output.Filename != "" { + out.Filename = g.output.Filename + } + + buildFilename(&g.output) + + templateReplacer := strings.NewReplacer("\n", "", "\t", "", " ", "") + + baseParams := []string{ + strings.Split(g.URL, "?list")[0], //no playlist + "--newline", + "--no-colors", + "--no-playlist", + "--progress-template", + templateReplacer.Replace(downloadTemplate), + "--progress-template", + templateReplacer.Replace(postprocessTemplate), + "--no-exec", + } + + // if user asked to manually override the output path... + if !(slices.Contains(g.Params, "-P") || slices.Contains(g.Params, "--paths")) { + g.Params = append(g.Params, "-o") + g.Params = append(g.Params, fmt.Sprintf("%s/%s", out.Path, out.Filename)) + } + + params := append(baseParams, g.Params...) + + slog.Info("requesting download", slog.String("url", g.URL), slog.Any("params", params)) + + cmd := exec.Command(config.Instance().DownloaderPath, params...) + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + + stdout, err := cmd.StdoutPipe() + if err != nil { + slog.Error("failed to get a stdout pipe", slog.Any("err", err)) + panic(err) + } + + stderr, err := cmd.StderrPipe() + if err != nil { + slog.Error("failed to get a stderr pipe", slog.Any("err", err)) + panic(err) + } + + if err := cmd.Start(); err != nil { + slog.Error("failed to start yt-dlp process", slog.Any("err", err)) + panic(err) + } + + g.proc = cmd.Process + + ctx, cancel := context.WithCancel(context.Background()) + defer func() { + stdout.Close() + g.Complete() + cancel() + }() + + logs := make(chan []byte) + go produceLogs(stdout, logs) + go consumeLogs(ctx, logs, g.logConsumer, g) + + go printYtDlpErrors(stderr, g.Id, g.URL) + + g.SetPending(false) + return cmd.Wait() +} + +func (g *GenericDownloader) Stop() error { + defer func() { + g.progress.Status = internal.StatusCompleted + g.Complete() + }() + // yt-dlp uses multiple child process the parent process + // has been spawned with setPgid = true. To properly kill + // all subprocesses a SIGTERM need to be sent to the correct + // process group + if g.proc == nil { + return errors.New("*os.Process not set") + } + + pgid, err := syscall.Getpgid(g.proc.Pid) + if err != nil { + return err + } + if err := syscall.Kill(-pgid, syscall.SIGTERM); err != nil { + return err + } + + return nil +} + +func (g *GenericDownloader) Status() *internal.ProcessSnapshot { + return &internal.ProcessSnapshot{ + Id: g.Id, + Info: g.Metadata, + Progress: g.progress, + Output: g.output, + Params: g.Params, + DownloaderName: "generic", + } +} + +func (g *GenericDownloader) UpdateSavedFilePath(p string) { g.output.SavedFilePath = p } + +func (g *GenericDownloader) SetOutput(o internal.DownloadOutput) { g.output = o } +func (g *GenericDownloader) SetProgress(p internal.DownloadProgress) { g.progress = p } + +func (g *GenericDownloader) SetMetadata(fetcher func(url string) (*common.DownloadMetadata, error)) { + g.FetchMetadata(fetcher) +} + +func (g *GenericDownloader) SetPending(p bool) { + g.Pending = p +} + +func (g *GenericDownloader) GetId() string { return g.Id } +func (g *GenericDownloader) GetUrl() string { return g.URL } + +func (g *GenericDownloader) RestoreFromSnapshot(snap *internal.ProcessSnapshot) error { + if snap == nil { + return errors.New("cannot restore nil snapshot") + } + + s := *snap + + g.Id = s.Id + g.URL = s.Info.URL + g.Metadata = s.Info + g.progress = s.Progress + g.output = s.Output + g.Params = s.Params + + return nil +} + +func (g *GenericDownloader) IsCompleted() bool { return g.Completed } diff --git a/server/internal/downloaders/livestream.go b/server/internal/downloaders/livestream.go new file mode 100644 index 0000000..86b6af1 --- /dev/null +++ b/server/internal/downloaders/livestream.go @@ -0,0 +1,205 @@ +package downloaders + +import ( + "context" + "errors" + "fmt" + "io" + "log/slog" + "os" + "os/exec" + "path/filepath" + "slices" + "syscall" + "time" + + "github.com/google/uuid" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/common" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/pipes" +) + +type LiveStreamDownloader struct { + progress internal.DownloadProgress + + proc *os.Process + + logConsumer LogConsumer + + pipes []pipes.Pipe + + // embedded + DownloaderBase +} + +func NewLiveStreamDownloader(url string, pipes []pipes.Pipe) Downloader { + l := &LiveStreamDownloader{ + logConsumer: NewFFMpegLogConsumer(), + pipes: pipes, + } + // in base + l.Id = uuid.NewString() + l.URL = url + return l +} + +func (l *LiveStreamDownloader) Start() error { + l.SetPending(true) + + baseParams := []string{ + l.URL, + "--newline", + "--no-colors", + "--no-playlist", + "--no-exec", + } + + params := append(baseParams, "-o", "-") + + cmd := exec.Command(config.Instance().DownloaderPath, params...) + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + + // stdout = media stream + media, err := cmd.StdoutPipe() + if err != nil { + slog.Error("failed to get media stdout", slog.Any("err", err)) + panic(err) + } + + // stderr = log/progress + stderr, err := cmd.StderrPipe() + if err != nil { + slog.Error("failed to get stderr pipe", slog.Any("err", err)) + panic(err) + } + + if err := cmd.Start(); err != nil { + slog.Error("failed to start yt-dlp process", slog.Any("err", err)) + panic(err) + } + + l.proc = cmd.Process + + ctx, cancel := context.WithCancel(context.Background()) + defer func() { + l.Complete() + cancel() + }() + + // --- costruisci pipeline --- + reader := io.Reader(media) + for _, pipe := range l.pipes { + nr, err := pipe.Connect(reader) + if err != nil { + slog.Error("pipe failed", slog.String("pipe", pipe.Name()), slog.Any("err", err)) + return err + } + reader = nr + } + + // --- fallback: se nessun FileWriter, scrivi su file --- + if !l.hasFileWriter() { + go func() { + filepath.Join( + config.Instance().DownloadPath, + fmt.Sprintf("%s (live) %s.mp4", l.Id, time.Now().Format(time.ANSIC)), + ) + + defaultPath := filepath.Join(config.Instance().DownloadPath) + f, err := os.Create(defaultPath) + if err != nil { + slog.Error("failed to create fallback file", slog.Any("err", err)) + return + } + defer f.Close() + + _, err = io.Copy(f, reader) + if err != nil { + slog.Error("copy error", slog.Any("err", err)) + } + slog.Info("download saved", slog.String("path", defaultPath)) + }() + } + + // --- logs consumer --- + logs := make(chan []byte) + go produceLogs(stderr, logs) + go consumeLogs(ctx, logs, l.logConsumer, l) + + l.progress.Status = internal.StatusLiveStream + + return cmd.Wait() +} + +func (l *LiveStreamDownloader) Stop() error { + defer func() { + l.progress.Status = internal.StatusCompleted + l.Complete() + }() + // yt-dlp uses multiple child process the parent process + // has been spawned with setPgid = true. To properly kill + // all subprocesses a SIGTERM need to be sent to the correct + // process group + if l.proc == nil { + return errors.New("*os.Process not set") + } + + pgid, err := syscall.Getpgid(l.proc.Pid) + if err != nil { + return err + } + if err := syscall.Kill(-pgid, syscall.SIGTERM); err != nil { + return err + } + + return nil +} + +func (l *LiveStreamDownloader) Status() *internal.ProcessSnapshot { + return &internal.ProcessSnapshot{ + Id: l.Id, + Info: l.Metadata, + Progress: l.progress, + DownloaderName: "livestream", + } +} + +func (l *LiveStreamDownloader) UpdateSavedFilePath(p string) {} + +func (l *LiveStreamDownloader) SetOutput(o internal.DownloadOutput) {} +func (l *LiveStreamDownloader) SetProgress(p internal.DownloadProgress) { l.progress = p } + +func (l *LiveStreamDownloader) SetMetadata(fetcher func(url string) (*common.DownloadMetadata, error)) { + l.FetchMetadata(fetcher) +} + +func (l *LiveStreamDownloader) SetPending(p bool) { + l.Pending = p +} + +func (l *LiveStreamDownloader) GetId() string { return l.Id } +func (l *LiveStreamDownloader) GetUrl() string { return l.URL } + +func (l *LiveStreamDownloader) RestoreFromSnapshot(snap *internal.ProcessSnapshot) error { + if snap == nil { + return errors.New("cannot restore nil snapshot") + } + + s := *snap + + l.Id = s.Id + l.URL = s.Info.URL + l.Metadata = s.Info + l.progress = s.Progress + + return nil +} + +func (l *LiveStreamDownloader) IsCompleted() bool { return l.Completed } + +func (l *LiveStreamDownloader) hasFileWriter() bool { + return slices.ContainsFunc(l.pipes, func(p pipes.Pipe) bool { + return p.Name() == "file-writer" + }) +} diff --git a/server/internal/downloaders/logging.go b/server/internal/downloaders/logging.go new file mode 100644 index 0000000..b05fdab --- /dev/null +++ b/server/internal/downloaders/logging.go @@ -0,0 +1,68 @@ +package downloaders + +import ( + "encoding/json" + "log/slog" + "strings" + + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal" +) + +type LogConsumer interface { + GetName() string + ParseLogEntry(entry []byte, downloader Downloader) +} + +type JSONLogConsumer struct{} + +func NewJSONLogConsumer() LogConsumer { + return &JSONLogConsumer{} +} + +func (j *JSONLogConsumer) GetName() string { return "json-log-consumer" } + +func (j *JSONLogConsumer) ParseLogEntry(entry []byte, d Downloader) { + var progress internal.ProgressTemplate + var postprocess internal.PostprocessTemplate + + if err := json.Unmarshal(entry, &progress); err == nil { + d.SetProgress(internal.DownloadProgress{ + Status: internal.StatusDownloading, + Percentage: progress.Percentage, + Speed: progress.Speed, + ETA: progress.Eta, + }) + + slog.Info("progress", + slog.String("id", j.GetShortId(d.GetId())), + slog.String("url", d.GetUrl()), + slog.String("percentage", progress.Percentage), + ) + } + + if err := json.Unmarshal(entry, &postprocess); err == nil { + d.UpdateSavedFilePath(postprocess.FilePath) + } +} + +func (j *JSONLogConsumer) GetShortId(id string) string { + return strings.Split(id, "-")[0] +} + +//TODO: split in different files + +type FFMpegLogConsumer struct{} + +func NewFFMpegLogConsumer() LogConsumer { + return &JSONLogConsumer{} +} + +func (f *FFMpegLogConsumer) GetName() string { return "ffmpeg-log-consumer" } + +func (f *FFMpegLogConsumer) ParseLogEntry(entry []byte, d Downloader) { + slog.Info("ffmpeg output", + slog.String("id", d.GetId()), + slog.String("url", d.GetUrl()), + slog.String("output", string(entry)), + ) +} diff --git a/server/internal/downloaders/playlist.go b/server/internal/downloaders/playlist.go new file mode 100644 index 0000000..3b0598e --- /dev/null +++ b/server/internal/downloaders/playlist.go @@ -0,0 +1 @@ +package downloaders diff --git a/server/internal/downloaders/utils.go b/server/internal/downloaders/utils.go new file mode 100644 index 0000000..2cc4634 --- /dev/null +++ b/server/internal/downloaders/utils.go @@ -0,0 +1,76 @@ +package downloaders + +import ( + "bufio" + "context" + "io" + "log/slog" + "regexp" + "slices" + "strings" + + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal" +) + +func argsSanitizer(params []string) []string { + params = slices.DeleteFunc(params, func(e string) bool { + match, _ := regexp.MatchString(`(\$\{)|(\&\&)`, e) + return match + }) + + params = slices.DeleteFunc(params, func(e string) bool { + return e == "" + }) + + return params +} + +func buildFilename(o *internal.DownloadOutput) { + if o.Filename != "" && strings.Contains(o.Filename, ".%(ext)s") { + o.Filename += ".%(ext)s" + } + + o.Filename = strings.Replace( + o.Filename, + ".%(ext)s.%(ext)s", + ".%(ext)s", + 1, + ) +} + +func produceLogs(r io.Reader, logs chan<- []byte) { + go func() { + scanner := bufio.NewScanner(r) + + for scanner.Scan() { + logs <- scanner.Bytes() + } + }() +} + +func consumeLogs(ctx context.Context, logs <-chan []byte, c LogConsumer, d Downloader) { + for { + select { + case <-ctx.Done(): + slog.Info("detaching logs", + slog.String("url", d.GetUrl()), + slog.String("id", c.GetName()), + ) + return + case entry := <-logs: + c.ParseLogEntry(entry, d) + } + } +} + +func printYtDlpErrors(stdout io.Reader, shortId, url string) { + scanner := bufio.NewScanner(stdout) + + for scanner.Scan() { + slog.Error("yt-dlp process error", + slog.String("id", shortId), + slog.String("url", url), + slog.String("err", scanner.Text()), + ) + } +} diff --git a/server/internal/memory_db.go b/server/internal/kv/store.go similarity index 53% rename from server/internal/memory_db.go rename to server/internal/kv/store.go index c59350f..ceff592 100644 --- a/server/internal/memory_db.go +++ b/server/internal/kv/store.go @@ -1,4 +1,4 @@ -package internal +package kv import ( "encoding/gob" @@ -6,28 +6,31 @@ import ( "log/slog" "os" "path/filepath" + "runtime" "sync" - "github.com/google/uuid" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/downloaders" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/queue" ) -var memDbEvents = make(chan *Process) +var memDbEvents = make(chan downloaders.Downloader, runtime.NumCPU()) // In-Memory Thread-Safe Key-Value Storage with optional persistence -type MemoryDB struct { - table map[string]*Process +type Store struct { + table map[string]downloaders.Downloader mu sync.RWMutex } -func NewMemoryDB() *MemoryDB { - return &MemoryDB{ - table: make(map[string]*Process), +func NewStore() *Store { + return &Store{ + table: make(map[string]downloaders.Downloader), } } // Get a process pointer given its id -func (m *MemoryDB) Get(id string) (*Process, error) { +func (m *Store) Get(id string) (downloaders.Downloader, error) { m.mu.RLock() defer m.mu.RUnlock() @@ -40,25 +43,22 @@ func (m *MemoryDB) Get(id string) (*Process, error) { } // Store a pointer of a process and return its id -func (m *MemoryDB) Set(process *Process) string { - id := uuid.NewString() - +func (m *Store) Set(d downloaders.Downloader) string { m.mu.Lock() - process.Id = id - m.table[id] = process + m.table[d.GetId()] = d m.mu.Unlock() - return id + return d.GetId() } // Removes a process progress, given the process id -func (m *MemoryDB) Delete(id string) { +func (m *Store) Delete(id string) { m.mu.Lock() delete(m.table, id) m.mu.Unlock() } -func (m *MemoryDB) Keys() *[]string { +func (m *Store) Keys() *[]string { var running []string m.mu.RLock() @@ -72,18 +72,12 @@ func (m *MemoryDB) Keys() *[]string { } // Returns a slice of all currently stored processes progess -func (m *MemoryDB) All() *[]ProcessResponse { - running := []ProcessResponse{} +func (m *Store) All() *[]internal.ProcessSnapshot { + running := []internal.ProcessSnapshot{} m.mu.RLock() - for k, v := range m.table { - running = append(running, ProcessResponse{ - Id: k, - Info: v.Info, - Progress: v.Progress, - Output: v.Output, - Params: v.Params, - }) + for _, v := range m.table { + running = append(running, *(v.Status())) } m.mu.RUnlock() @@ -91,7 +85,7 @@ func (m *MemoryDB) All() *[]ProcessResponse { } // Persist the database in a single file named "session.dat" -func (m *MemoryDB) Persist() error { +func (m *Store) Persist() error { running := m.All() sf := filepath.Join(config.Instance().SessionFilePath, "session.dat") @@ -113,7 +107,7 @@ func (m *MemoryDB) Persist() error { } // Restore a persisted state -func (m *MemoryDB) Restore(mq *MessageQueue) { +func (m *Store) Restore(mq *queue.MessageQueue) { sf := filepath.Join(config.Instance().SessionFilePath, "session.dat") fd, err := os.Open(sf) @@ -130,29 +124,31 @@ func (m *MemoryDB) Restore(mq *MessageQueue) { m.mu.Lock() defer m.mu.Unlock() - for _, proc := range session.Processes { - restored := &Process{ - Id: proc.Id, - Url: proc.Info.URL, - Info: proc.Info, - Progress: proc.Progress, - Output: proc.Output, - Params: proc.Params, - } + for _, snap := range session.Processes { + var restored downloaders.Downloader - m.table[proc.Id] = restored + if snap.DownloaderName == "generic" { + d := downloaders.NewGenericDownload("", []string{}) + err := d.RestoreFromSnapshot(&snap) + if err != nil { + continue + } + restored = d - if restored.Progress.Status != StatusCompleted { - mq.Publish(restored) + m.table[snap.Id] = restored + + if !restored.(*downloaders.GenericDownloader).DownloaderBase.Completed { + mq.Publish(restored) + } } } } -func (m *MemoryDB) EventListener() { +func (m *Store) EventListener() { for p := range memDbEvents { - if p.AutoRemove { - slog.Info("compacting MemoryDB", slog.String("id", p.Id)) - m.Delete(p.Id) + if p.Status().DownloaderName == "livestream" { + slog.Info("compacting Store", slog.String("id", p.GetId())) + m.Delete(p.GetId()) } } } diff --git a/server/internal/kv/types.go b/server/internal/kv/types.go new file mode 100644 index 0000000..74452d0 --- /dev/null +++ b/server/internal/kv/types.go @@ -0,0 +1,9 @@ +package kv + +import "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal" + +// struct representing the current status of the memoryDB +// used for serializaton/persistence reasons +type Session struct { + Processes []internal.ProcessSnapshot `json:"processes"` +} diff --git a/server/internal/livestream/livestream.go b/server/internal/livestream/livestream.go index 1867436..6a4bdc4 100644 --- a/server/internal/livestream/livestream.go +++ b/server/internal/livestream/livestream.go @@ -11,7 +11,10 @@ import ( "time" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config" - "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/downloaders" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/kv" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/pipes" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/queue" ) const ( @@ -32,11 +35,11 @@ type LiveStream struct { waitTime time.Duration liveDate time.Time - mq *internal.MessageQueue - db *internal.MemoryDB + mq *queue.MessageQueue + db *kv.Store } -func New(url string, done chan *LiveStream, mq *internal.MessageQueue, db *internal.MemoryDB) *LiveStream { +func New(url string, done chan *LiveStream, mq *queue.MessageQueue, db *kv.Store) *LiveStream { return &LiveStream{ url: url, done: done, @@ -87,13 +90,12 @@ func (l *LiveStream) Start() error { l.done <- l // Send the started livestream to the message queue! :D - p := &internal.Process{ - Url: l.url, - Livestream: true, - Params: []string{"--downloader", "ffmpeg", "--no-part"}, - } - l.db.Set(p) - l.mq.Publish(p) + + //TODO: add pipes + d := downloaders.NewLiveStreamDownloader(l.url, []pipes.Pipe{}) + + l.db.Set(d) + l.mq.Publish(d) return nil } diff --git a/server/internal/livestream/livestream_test.go b/server/internal/livestream/livestream_test.go index 50fa141..3883a0a 100644 --- a/server/internal/livestream/livestream_test.go +++ b/server/internal/livestream/livestream_test.go @@ -5,7 +5,8 @@ import ( "time" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config" - "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/kv" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/queue" ) func setupTest() { @@ -19,7 +20,7 @@ func TestLivestream(t *testing.T) { done := make(chan *LiveStream) - ls := New(URL, done, &internal.MessageQueue{}, &internal.MemoryDB{}) + ls := New(URL, done, &queue.MessageQueue{}, &kv.Store{}) go ls.Start() time.AfterFunc(time.Second*20, func() { diff --git a/server/internal/livestream/monitor.go b/server/internal/livestream/monitor.go index 79f052c..3b0cecf 100644 --- a/server/internal/livestream/monitor.go +++ b/server/internal/livestream/monitor.go @@ -8,17 +8,18 @@ import ( "path/filepath" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config" - "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/kv" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/queue" ) type Monitor struct { - db *internal.MemoryDB // where the just started livestream will be published - mq *internal.MessageQueue // where the just started livestream will be published + db *kv.Store // where the just started livestream will be published + mq *queue.MessageQueue // where the just started livestream will be published streams map[string]*LiveStream // keeps track of the livestreams done chan *LiveStream // to signal individual processes completition } -func NewMonitor(mq *internal.MessageQueue, db *internal.MemoryDB) *Monitor { +func NewMonitor(mq *queue.MessageQueue, db *kv.Store) *Monitor { return &Monitor{ mq: mq, db: db, diff --git a/server/internal/metadata/fetchers.go b/server/internal/metadata/fetchers.go new file mode 100644 index 0000000..900052f --- /dev/null +++ b/server/internal/metadata/fetchers.go @@ -0,0 +1,57 @@ +package metadata + +import ( + "bytes" + "encoding/json" + "errors" + "io" + "log/slog" + "os/exec" + "syscall" + "time" + + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/common" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config" +) + +func DefaultFetcher(url string) (*common.DownloadMetadata, error) { + cmd := exec.Command(config.Instance().DownloaderPath, url, "-J") + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + + stdout, err := cmd.StdoutPipe() + if err != nil { + return nil, err + } + + stderr, err := cmd.StderrPipe() + if err != nil { + return nil, err + } + + meta := common.DownloadMetadata{ + URL: url, + CreatedAt: time.Now(), + } + + if err := cmd.Start(); err != nil { + return nil, err + } + + var bufferedStderr bytes.Buffer + + go func() { + io.Copy(&bufferedStderr, stderr) + }() + + slog.Info("retrieving metadata", slog.String("url", url)) + + if err := json.NewDecoder(stdout).Decode(&meta); err != nil { + return nil, err + } + + if err := cmd.Wait(); err != nil { + return nil, errors.New(bufferedStderr.String()) + } + + return &meta, nil +} diff --git a/server/internal/pipes/FileWriter.go b/server/internal/pipes/FileWriter.go new file mode 100644 index 0000000..894fcac --- /dev/null +++ b/server/internal/pipes/FileWriter.go @@ -0,0 +1,45 @@ +package pipes + +import ( + "io" + "log/slog" + "os" +) + +type FileWriter struct { + Path string + IsFinal bool +} + +func (f *FileWriter) Name() string { return "file-writer" } + +func (f *FileWriter) Connect(r io.Reader) (io.Reader, error) { + file, err := os.Create(f.Path) + if err != nil { + return nil, err + } + + if f.IsFinal { + go func() { + defer file.Close() + if _, err := io.Copy(file, r); err != nil { + slog.Error("FileWriter (final) error", slog.Any("err", err)) + } + }() + return r, nil + } + + pr, pw := io.Pipe() + + go func() { + defer file.Close() + defer pw.Close() + + writer := io.MultiWriter(file, pw) + if _, err := io.Copy(writer, r); err != nil { + slog.Error("FileWriter (pipeline) error", slog.Any("err", err)) + } + }() + + return pr, nil +} diff --git a/server/internal/pipes/Transcoder.go b/server/internal/pipes/Transcoder.go new file mode 100644 index 0000000..49febbc --- /dev/null +++ b/server/internal/pipes/Transcoder.go @@ -0,0 +1,66 @@ +package pipes + +import ( + "bufio" + "errors" + "io" + "log/slog" + "os/exec" + "strings" +) + +type Transcoder struct { + Args []string +} + +func (t *Transcoder) Name() string { return "ffmpeg-transcoder" } + +func (t *Transcoder) Connect(r io.Reader) (io.Reader, error) { + cmd := exec.Command("ffmpeg", + append([]string{"-i", "pipe:0"}, append(t.Args, "-f", "webm", "pipe:1")...)..., + ) + + stdin, err := cmd.StdinPipe() + if err != nil { + return nil, err + } + stdout, err := cmd.StdoutPipe() + if err != nil { + return nil, err + } + stderr, err := cmd.StderrPipe() + if err != nil { + return nil, err + } + + go func() { + reader := bufio.NewReader(stderr) + var line string + + for { + part, err := reader.ReadString('\r') + line += part + if err != nil { + break + } + + line = strings.TrimRight(line, "\r\n") + slog.Info("ffmpeg transcoder", slog.String("log", line)) + line = "" + } + }() + + go func() { + defer stdin.Close() + _, err := io.Copy(stdin, r) + if err != nil && !errors.Is(err, io.EOF) { + slog.Error("transcoder stdin error", slog.Any("err", err)) + } + }() + + if err := cmd.Start(); err != nil { + return nil, err + } + + return stdout, nil +} diff --git a/server/internal/pipes/pipe.go b/server/internal/pipes/pipe.go new file mode 100644 index 0000000..35d22e7 --- /dev/null +++ b/server/internal/pipes/pipe.go @@ -0,0 +1,8 @@ +package pipes + +import "io" + +type Pipe interface { + Name() string + Connect(r io.Reader) (io.Reader, error) +} diff --git a/server/internal/pool.go b/server/internal/pool.go deleted file mode 100644 index 0f86db8..0000000 --- a/server/internal/pool.go +++ /dev/null @@ -1,24 +0,0 @@ -package internal - -// Pool implements heap.Interface interface as a standard priority queue -type Pool []*Worker - -func (h Pool) Len() int { return len(h) } -func (h Pool) Less(i, j int) bool { return h[i].pending < h[j].pending } - -func (h Pool) Swap(i, j int) { - h[i], h[j] = h[j], h[i] - h[i].index = i - h[j].index = j -} - -func (h *Pool) Push(x any) { *h = append(*h, x.(*Worker)) } - -func (h *Pool) Pop() any { - old := *h - n := len(old) - x := old[n-1] - old[n-1] = nil - *h = old[0 : n-1] - return x -} diff --git a/server/internal/process.go b/server/internal/process.go deleted file mode 100644 index cffd51d..0000000 --- a/server/internal/process.go +++ /dev/null @@ -1,386 +0,0 @@ -package internal - -import ( - "bufio" - "bytes" - "context" - "encoding/json" - "errors" - "fmt" - "io" - "log/slog" - "regexp" - "slices" - "syscall" - - "os" - "os/exec" - "strings" - "time" - - "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/archiver" - "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/common" - "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config" -) - -const downloadTemplate = `download: -{ - "eta":%(progress.eta)s, - "percentage":"%(progress._percent_str)s", - "speed":%(progress.speed)s -}` - -// filename not returning the correct extension after postprocess -const postprocessTemplate = `postprocess: -{ - "filepath":"%(info.filepath)s" -} -` - -const ( - StatusPending = iota - StatusDownloading - StatusCompleted - StatusErrored -) - -// Process descriptor -type Process struct { - Id string - Url string - Livestream bool - AutoRemove bool - Params []string - Info common.DownloadInfo - Progress DownloadProgress - Output DownloadOutput - proc *os.Process -} - -// Starts spawns/forks a new yt-dlp process and parse its stdout. -// The process is spawned to outputting a custom progress text that -// Resembles a JSON Object in order to Unmarshal it later. -// This approach is anyhow not perfect: quotes are not escaped properly. -// Each process is not identified by its PID but by a UUIDv4 -func (p *Process) Start() { - // escape bash variable escaping and command piping, you'll never know - // what they might come with... - p.Params = slices.DeleteFunc(p.Params, func(e string) bool { - match, _ := regexp.MatchString(`(\$\{)|(\&\&)`, e) - return match - }) - - p.Params = slices.DeleteFunc(p.Params, func(e string) bool { - return e == "" - }) - - out := DownloadOutput{ - Path: config.Instance().DownloadPath, - Filename: "%(title)s.%(ext)s", - } - - if p.Output.Path != "" { - out.Path = p.Output.Path - } - - if p.Output.Filename != "" { - out.Filename = p.Output.Filename - } - - buildFilename(&p.Output) - - templateReplacer := strings.NewReplacer("\n", "", "\t", "", " ", "") - - baseParams := []string{ - strings.Split(p.Url, "?list")[0], //no playlist - "--newline", - "--no-colors", - "--no-playlist", - "--progress-template", - templateReplacer.Replace(downloadTemplate), - "--progress-template", - templateReplacer.Replace(postprocessTemplate), - "--no-exec", - } - - // if user asked to manually override the output path... - if !(slices.Contains(p.Params, "-P") || slices.Contains(p.Params, "--paths")) { - p.Params = append(p.Params, "-o") - p.Params = append(p.Params, fmt.Sprintf("%s/%s", out.Path, out.Filename)) - } - - params := append(baseParams, p.Params...) - - slog.Info("requesting download", slog.String("url", p.Url), slog.Any("params", params)) - - cmd := exec.Command(config.Instance().DownloaderPath, params...) - cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} - - stdout, err := cmd.StdoutPipe() - if err != nil { - slog.Error("failed to get a stdout pipe", slog.Any("err", err)) - panic(err) - } - - stderr, err := cmd.StderrPipe() - if err != nil { - slog.Error("failed to get a stderr pipe", slog.Any("err", err)) - panic(err) - } - - if err := cmd.Start(); err != nil { - slog.Error("failed to start yt-dlp process", slog.Any("err", err)) - panic(err) - } - - p.proc = cmd.Process - - ctx, cancel := context.WithCancel(context.Background()) - defer func() { - stdout.Close() - p.Complete() - cancel() - }() - - logs := make(chan []byte) - go produceLogs(stdout, logs) - go p.consumeLogs(ctx, logs) - - go p.detectYtDlpErrors(stderr) - - cmd.Wait() -} - -func produceLogs(r io.Reader, logs chan<- []byte) { - go func() { - scanner := bufio.NewScanner(r) - - for scanner.Scan() { - logs <- scanner.Bytes() - } - }() -} - -func (p *Process) consumeLogs(ctx context.Context, logs <-chan []byte) { - for { - select { - case <-ctx.Done(): - slog.Info("detaching from yt-dlp stdout", - slog.String("id", p.getShortId()), - slog.String("url", p.Url), - ) - return - case entry := <-logs: - p.parseLogEntry(entry) - } - } -} - -func (p *Process) parseLogEntry(entry []byte) { - var progress ProgressTemplate - var postprocess PostprocessTemplate - - if err := json.Unmarshal(entry, &progress); err == nil { - p.Progress = DownloadProgress{ - Status: StatusDownloading, - Percentage: progress.Percentage, - Speed: progress.Speed, - ETA: progress.Eta, - } - - slog.Info("progress", - slog.String("id", p.getShortId()), - slog.String("url", p.Url), - slog.String("percentage", progress.Percentage), - ) - } - - if err := json.Unmarshal(entry, &postprocess); err == nil { - p.Output.SavedFilePath = postprocess.FilePath - - // slog.Info("postprocess", - // slog.String("id", p.getShortId()), - // slog.String("url", p.Url), - // slog.String("filepath", postprocess.FilePath), - // ) - } -} - -func (p *Process) detectYtDlpErrors(r io.Reader) { - scanner := bufio.NewScanner(r) - - for scanner.Scan() { - slog.Error("yt-dlp process error", - slog.String("id", p.getShortId()), - slog.String("url", p.Url), - slog.String("err", scanner.Text()), - ) - } -} - -// Keep process in the memoryDB but marks it as complete -// Convention: All completed processes has progress -1 -// and speed 0 bps. -func (p *Process) Complete() { - // auto archive - // TODO: it's not that deterministic :/ - if p.Progress.Percentage == "" && p.Progress.Speed == 0 { - var serializedMetadata bytes.Buffer - - json.NewEncoder(&serializedMetadata).Encode(p.Info) - - archiver.Publish(&archiver.Message{ - Id: p.Id, - Path: p.Output.SavedFilePath, - Title: p.Info.Title, - Thumbnail: p.Info.Thumbnail, - Source: p.Url, - Metadata: serializedMetadata.String(), - CreatedAt: p.Info.CreatedAt, - }) - } - - p.Progress = DownloadProgress{ - Status: StatusCompleted, - Percentage: "-1", - Speed: 0, - ETA: 0, - } - - // for safety, if the filename is not set, set it with original function - if p.Output.SavedFilePath == "" { - p.GetFileName(&p.Output) - } - - slog.Info("finished", - slog.String("id", p.getShortId()), - slog.String("url", p.Url), - ) - - memDbEvents <- p -} - -// Kill a process and remove it from the memory -func (p *Process) Kill() error { - defer func() { - p.Progress.Status = StatusCompleted - }() - // yt-dlp uses multiple child process the parent process - // has been spawned with setPgid = true. To properly kill - // all subprocesses a SIGTERM need to be sent to the correct - // process group - if p.proc == nil { - return errors.New("*os.Process not set") - } - - pgid, err := syscall.Getpgid(p.proc.Pid) - if err != nil { - return err - } - if err := syscall.Kill(-pgid, syscall.SIGTERM); err != nil { - return err - } - - return nil -} - -func (p *Process) GetFileName(o *DownloadOutput) error { - cmd := exec.Command( - config.Instance().DownloaderPath, - "--print", "filename", - "-o", fmt.Sprintf("%s/%s", o.Path, o.Filename), - p.Url, - ) - cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} - - out, err := cmd.Output() - if err != nil { - return err - } - - p.Output.SavedFilePath = strings.Trim(string(out), "\n") - return nil -} - -func (p *Process) SetPending() { - // Since video's title isn't available yet, fill in with the URL. - p.Info = common.DownloadInfo{ - URL: p.Url, - Title: p.Url, - CreatedAt: time.Now(), - } - p.Progress.Status = StatusPending -} - -func (p *Process) SetMetadata() error { - cmd := exec.Command(config.Instance().DownloaderPath, p.Url, "-J") - cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} - - stdout, err := cmd.StdoutPipe() - if err != nil { - slog.Error("failed to connect to stdout", - slog.String("id", p.getShortId()), - slog.String("url", p.Url), - slog.String("err", err.Error()), - ) - return err - } - - stderr, err := cmd.StderrPipe() - if err != nil { - slog.Error("failed to connect to stderr", - slog.String("id", p.getShortId()), - slog.String("url", p.Url), - slog.String("err", err.Error()), - ) - return err - } - - info := common.DownloadInfo{ - URL: p.Url, - CreatedAt: time.Now(), - } - - if err := cmd.Start(); err != nil { - return err - } - - var bufferedStderr bytes.Buffer - - go func() { - io.Copy(&bufferedStderr, stderr) - }() - - slog.Info("retrieving metadata", - slog.String("id", p.getShortId()), - slog.String("url", p.Url), - ) - - if err := json.NewDecoder(stdout).Decode(&info); err != nil { - return err - } - - p.Info = info - p.Progress.Status = StatusPending - - if err := cmd.Wait(); err != nil { - return errors.New(bufferedStderr.String()) - } - - return nil -} - -func (p *Process) getShortId() string { return strings.Split(p.Id, "-")[0] } - -func buildFilename(o *DownloadOutput) { - if o.Filename != "" && strings.Contains(o.Filename, ".%(ext)s") { - o.Filename += ".%(ext)s" - } - - o.Filename = strings.Replace( - o.Filename, - ".%(ext)s.%(ext)s", - ".%(ext)s", - 1, - ) -} diff --git a/server/internal/message_queue.go b/server/internal/queue/message_queue.go similarity index 72% rename from server/internal/message_queue.go rename to server/internal/queue/message_queue.go index 0c5a4a5..7d501f8 100644 --- a/server/internal/message_queue.go +++ b/server/internal/queue/message_queue.go @@ -1,4 +1,4 @@ -package internal +package queue import ( "context" @@ -7,6 +7,8 @@ import ( evbus "github.com/asaskevich/EventBus" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/downloaders" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/metadata" "golang.org/x/sync/semaphore" ) @@ -35,9 +37,9 @@ func NewMessageQueue() (*MessageQueue, error) { } // Publish a message to the queue and set the task to a peding state. -func (m *MessageQueue) Publish(p *Process) { +func (m *MessageQueue) Publish(p downloaders.Downloader) { // needs to have an id set before - p.SetPending() + p.SetPending(true) m.eventBus.Publish(queueName, p) } @@ -52,27 +54,22 @@ func (m *MessageQueue) SetupConsumers() { func (m *MessageQueue) downloadConsumer() { sem := semaphore.NewWeighted(int64(m.concurrency)) - m.eventBus.SubscribeAsync(queueName, func(p *Process) { + m.eventBus.SubscribeAsync(queueName, func(p downloaders.Downloader) { sem.Acquire(context.Background(), 1) defer sem.Release(1) slog.Info("received process from event bus", slog.String("bus", queueName), slog.String("consumer", "downloadConsumer"), - slog.String("id", p.getShortId()), + slog.String("id", p.GetId()), ) - if p.Progress.Status != StatusCompleted { + if !p.IsCompleted() { slog.Info("started process", slog.String("bus", queueName), - slog.String("id", p.getShortId()), + slog.String("id", p.GetId()), ) - if p.Livestream { - // livestreams have higher priorty and they ignore the semaphore - go p.Start() - } else { - p.Start() - } + p.Start() } }, false) } @@ -84,29 +81,25 @@ func (m *MessageQueue) metadataSubscriber() { // Since there's ongoing downloads, 1 job at time seems a good compromise sem := semaphore.NewWeighted(1) - m.eventBus.SubscribeAsync(queueName, func(p *Process) { + m.eventBus.SubscribeAsync(queueName, func(p downloaders.Downloader) { sem.Acquire(context.Background(), 1) defer sem.Release(1) slog.Info("received process from event bus", slog.String("bus", queueName), slog.String("consumer", "metadataConsumer"), - slog.String("id", p.getShortId()), + slog.String("id", p.GetId()), ) - if p.Progress.Status == StatusCompleted { + if p.IsCompleted() { slog.Warn("proccess has an illegal state", - slog.String("id", p.getShortId()), - slog.Int("status", p.Progress.Status), + slog.String("id", p.GetId()), + slog.String("status", "completed"), ) return } - if err := p.SetMetadata(); err != nil { - slog.Error("failed to retrieve metadata", - slog.String("id", p.getShortId()), - slog.String("err", err.Error()), - ) - } + p.SetMetadata(metadata.DefaultFetcher) + }, false) } diff --git a/server/internal/worker.go b/server/internal/worker.go deleted file mode 100644 index eb060d8..0000000 --- a/server/internal/worker.go +++ /dev/null @@ -1,15 +0,0 @@ -package internal - -type Worker struct { - requests chan *Process // downloads to do - pending int // downloads pending - index int // index in the heap -} - -func (w *Worker) Work(done chan *Worker) { - for { - req := <-w.requests - req.Start() - done <- w - } -} diff --git a/server/playlist/modifiers.go b/server/playlist/modifiers.go index 37adc1e..24cb152 100644 --- a/server/playlist/modifiers.go +++ b/server/playlist/modifiers.go @@ -18,7 +18,7 @@ import ( --max-downloads NUMBER | | stops after N completed downloads */ -func ApplyModifiers(entries *[]common.DownloadInfo, args []string) error { +func ApplyModifiers(entries *[]common.DownloadMetadata, args []string) error { for i, modifier := range args { switch modifier { case "--playlist-start": @@ -38,7 +38,7 @@ func ApplyModifiers(entries *[]common.DownloadInfo, args []string) error { return nil } -func playlistStart(i int, modifier string, args []string, entries *[]common.DownloadInfo) error { +func playlistStart(i int, modifier string, args []string, entries *[]common.DownloadMetadata) error { if !guard(i, len(modifier)) { return nil } @@ -53,7 +53,7 @@ func playlistStart(i int, modifier string, args []string, entries *[]common.Down return nil } -func playlistEnd(i int, modifier string, args []string, entries *[]common.DownloadInfo) error { +func playlistEnd(i int, modifier string, args []string, entries *[]common.DownloadMetadata) error { if !guard(i, len(modifier)) { return nil } @@ -68,7 +68,7 @@ func playlistEnd(i int, modifier string, args []string, entries *[]common.Downlo return nil } -func maxDownloads(i int, modifier string, args []string, entries *[]common.DownloadInfo) error { +func maxDownloads(i int, modifier string, args []string, entries *[]common.DownloadMetadata) error { if !guard(i, len(modifier)) { return nil } diff --git a/server/internal/playlist.go b/server/playlist/playlist.go similarity index 64% rename from server/internal/playlist.go rename to server/playlist/playlist.go index fcf3f7e..d3c865a 100644 --- a/server/internal/playlist.go +++ b/server/playlist/playlist.go @@ -1,4 +1,4 @@ -package internal +package playlist import ( "encoding/json" @@ -11,10 +11,13 @@ import ( "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/common" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config" - "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/playlist" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/downloaders" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/kv" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/queue" ) -func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB) error { +func PlaylistDetect(req internal.DownloadRequest, mq *queue.MessageQueue, db *kv.Store) error { params := append(req.Params, "--flat-playlist", "-J") urlWithParams := append([]string{req.URL}, params...) @@ -28,7 +31,7 @@ func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB) error { return err } - var m playlist.Metadata + var m Metadata if err := cmd.Start(); err != nil { return err @@ -51,17 +54,17 @@ func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB) error { } if m.IsPlaylist() { - entries := slices.CompactFunc(slices.Compact(m.Entries), func(a common.DownloadInfo, b common.DownloadInfo) bool { + entries := slices.CompactFunc(slices.Compact(m.Entries), func(a common.DownloadMetadata, b common.DownloadMetadata) bool { return a.URL == b.URL }) - entries = slices.DeleteFunc(entries, func(e common.DownloadInfo) bool { + entries = slices.DeleteFunc(entries, func(e common.DownloadMetadata) bool { return strings.Contains(e.URL, "list=") }) slog.Info("playlist detected", slog.String("url", req.URL), slog.Int("count", len(entries))) - if err := playlist.ApplyModifiers(&entries, req.Params); err != nil { + if err := ApplyModifiers(&entries, req.Params); err != nil { return err } @@ -78,33 +81,22 @@ func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB) error { //XXX: it's idiotic but it works: virtually delay the creation time meta.CreatedAt = time.Now().Add(time.Millisecond * time.Duration(i*10)) - proc := &Process{ - Url: meta.URL, - Progress: DownloadProgress{}, - Output: DownloadOutput{Filename: req.Rename}, - Info: meta, - Params: req.Params, - } + downloader := downloaders.NewGenericDownload(meta.URL, req.Params) + downloader.SetOutput(internal.DownloadOutput{Filename: req.Rename}) + // downloader.SetMetadata(meta) - proc.Info.URL = meta.URL - - db.Set(proc) - mq.Publish(proc) - - proc.Info.CreatedAt = meta.CreatedAt + db.Set(downloader) + mq.Publish(downloader) } return nil } - proc := &Process{ - Url: req.URL, - Params: req.Params, - } + d := downloaders.NewGenericDownload(req.URL, req.Params) - db.Set(proc) - mq.Publish(proc) - slog.Info("sending new process to message queue", slog.String("url", proc.Url)) + db.Set(d) + mq.Publish(d) + slog.Info("sending new process to message queue", slog.String("url", d.GetUrl())) return cmd.Wait() } diff --git a/server/playlist/types.go b/server/playlist/types.go index 3cbf97d..523736c 100644 --- a/server/playlist/types.go +++ b/server/playlist/types.go @@ -3,10 +3,10 @@ package playlist import "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/common" type Metadata struct { - Entries []common.DownloadInfo `json:"entries"` - Count int `json:"playlist_count"` - PlaylistTitle string `json:"title"` - Type string `json:"_type"` + Entries []common.DownloadMetadata `json:"entries"` + Count int `json:"playlist_count"` + PlaylistTitle string `json:"title"` + Type string `json:"_type"` } func (m *Metadata) IsPlaylist() bool { return m.Type == "playlist" } diff --git a/server/rest/common.go b/server/rest/common.go index 4d45dfc..5347102 100644 --- a/server/rest/common.go +++ b/server/rest/common.go @@ -3,11 +3,12 @@ package rest import ( "database/sql" - "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/kv" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/queue" ) type ContainerArgs struct { DB *sql.DB - MDB *internal.MemoryDB - MQ *internal.MessageQueue + MDB *kv.Store + MQ *queue.MessageQueue } diff --git a/server/rest/service.go b/server/rest/service.go index 006a682..7bed586 100644 --- a/server/rest/service.go +++ b/server/rest/service.go @@ -12,41 +12,42 @@ import ( "github.com/google/uuid" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/downloaders" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/kv" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/livestream" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/queue" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/playlist" ) type Service struct { - mdb *internal.MemoryDB + mdb *kv.Store db *sql.DB - mq *internal.MessageQueue + mq *queue.MessageQueue lm *livestream.Monitor } func (s *Service) Exec(req internal.DownloadRequest) (string, error) { - p := &internal.Process{ - Url: req.URL, - Params: req.Params, - Output: internal.DownloadOutput{ - Path: req.Path, - Filename: req.Rename, - }, - } + d := downloaders.NewGenericDownload(req.URL, req.Params) + d.SetOutput(internal.DownloadOutput{ + Path: req.Path, + Filename: req.Rename, + }) - id := s.mdb.Set(p) - s.mq.Publish(p) + id := s.mdb.Set(d) + s.mq.Publish(d) return id, nil } func (s *Service) ExecPlaylist(req internal.DownloadRequest) error { - return internal.PlaylistDetect(req, s.mq, s.mdb) + return playlist.PlaylistDetect(req, s.mq, s.mdb) } func (s *Service) ExecLivestream(req internal.DownloadRequest) { s.lm.Add(req.URL) } -func (s *Service) Running(ctx context.Context) (*[]internal.ProcessResponse, error) { +func (s *Service) Running(ctx context.Context) (*[]internal.ProcessSnapshot, error) { select { case <-ctx.Done(): return nil, context.Canceled diff --git a/server/rpc/container.go b/server/rpc/container.go index 1beac03..9ca1b8e 100644 --- a/server/rpc/container.go +++ b/server/rpc/container.go @@ -3,14 +3,15 @@ package rpc import ( "github.com/go-chi/chi/v5" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config" - "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/kv" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/livestream" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/queue" middlewares "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/middleware" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/openid" ) // Dependency injection container. -func Container(db *internal.MemoryDB, mq *internal.MessageQueue, lm *livestream.Monitor) *Service { +func Container(db *kv.Store, mq *queue.MessageQueue, lm *livestream.Monitor) *Service { return &Service{ db: db, mq: mq, diff --git a/server/rpc/service.go b/server/rpc/service.go index 887b132..abe1274 100644 --- a/server/rpc/service.go +++ b/server/rpc/service.go @@ -6,18 +6,22 @@ import ( "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/formats" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/downloaders" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/kv" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/livestream" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/queue" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/playlist" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/sys" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/updater" ) type Service struct { - db *internal.MemoryDB - mq *internal.MessageQueue + db *kv.Store + mq *queue.MessageQueue lm *livestream.Monitor } -type Running []internal.ProcessResponse +type Running []internal.ProcessSnapshot type Pending []string type NoArgs struct{} @@ -25,26 +29,23 @@ type NoArgs struct{} // Exec spawns a Process. // The result of the execution is the newly spawned process Id. func (s *Service) Exec(args internal.DownloadRequest, result *string) error { - p := &internal.Process{ - Url: args.URL, - Params: args.Params, - Output: internal.DownloadOutput{ - Path: args.Path, - Filename: args.Rename, - }, - } + d := downloaders.NewGenericDownload(args.URL, args.Params) + d.SetOutput(internal.DownloadOutput{ + Path: args.Path, + Filename: args.Rename, + }) - s.db.Set(p) - s.mq.Publish(p) + s.db.Set(d) + s.mq.Publish(d) - *result = p.Id + *result = d.GetId() return nil } // Exec spawns a Process. // The result of the execution is the newly spawned process Id. func (s *Service) ExecPlaylist(args internal.DownloadRequest, result *string) error { - err := internal.PlaylistDetect(args, s.mq, s.db) + err := playlist.PlaylistDetect(args, s.mq, s.db) if err != nil { return err } @@ -87,12 +88,12 @@ func (s *Service) KillAllLivestream(args NoArgs, result *struct{}) error { // Progess retrieves the Progress of a specific Process given its Id func (s *Service) Progess(args internal.DownloadRequest, progress *internal.DownloadProgress) error { - proc, err := s.db.Get(args.Id) + dl, err := s.db.Get(args.Id) if err != nil { return err } - *progress = proc.Progress + *progress = dl.Status().Progress return nil } @@ -106,7 +107,7 @@ func (s *Service) Formats(args internal.DownloadRequest, meta *formats.Metadata) } if metadata.IsPlaylist() { - go internal.PlaylistDetect(args, s.mq, s.db) + go playlist.PlaylistDetect(args, s.mq, s.db) } *meta = *metadata @@ -129,22 +130,22 @@ func (s *Service) Running(args NoArgs, running *Running) error { func (s *Service) Kill(args string, killed *string) error { slog.Info("Trying killing process with id", slog.String("id", args)) - proc, err := s.db.Get(args) + download, err := s.db.Get(args) if err != nil { return err } - if proc == nil { + if download == nil { return errors.New("nil process") } - if err := proc.Kill(); err != nil { - slog.Info("failed killing process", slog.String("id", proc.Id), slog.Any("err", err)) + if err := download.Stop(); err != nil { + slog.Info("failed killing process", slog.String("id", download.GetId()), slog.Any("err", err)) return err } - s.db.Delete(proc.Id) - slog.Info("succesfully killed process", slog.String("id", proc.Id)) + s.db.Delete(download.GetId()) + slog.Info("succesfully killed process", slog.String("id", download.GetId())) return nil } @@ -156,34 +157,33 @@ func (s *Service) KillAll(args NoArgs, killed *string) error { var ( keys = s.db.Keys() - removeFunc = func(p *internal.Process) error { - defer s.db.Delete(p.Id) - return p.Kill() + removeFunc = func(d downloaders.Downloader) error { + defer s.db.Delete(d.GetId()) + return d.Stop() } ) for _, key := range *keys { - proc, err := s.db.Get(key) + dl, err := s.db.Get(key) if err != nil { return err } - if proc == nil { + if dl == nil { s.db.Delete(key) continue } - if err := removeFunc(proc); err != nil { + if err := removeFunc(dl); err != nil { slog.Info( "failed killing process", - slog.String("id", proc.Id), + slog.String("id", dl.GetId()), slog.Any("err", err), ) continue } - slog.Info("succesfully killed process", slog.String("id", proc.Id)) - proc = nil // gc helper + slog.Info("succesfully killed process", slog.String("id", dl.GetId())) } return nil @@ -200,14 +200,14 @@ func (s *Service) Clear(args string, killed *string) error { func (s *Service) ClearCompleted(cleared *string) error { var ( keys = s.db.Keys() - removeFunc = func(p *internal.Process) error { - defer s.db.Delete(p.Id) + removeFunc = func(d downloaders.Downloader) error { + defer s.db.Delete(d.GetId()) - if p.Progress.Status != internal.StatusCompleted { + if !d.IsCompleted() { return nil } - return p.Kill() + return d.Stop() } ) diff --git a/server/server.go b/server/server.go index 75bcaa9..066cafc 100644 --- a/server/server.go +++ b/server/server.go @@ -24,8 +24,9 @@ import ( "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/dbutil" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/filebrowser" - "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/kv" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/livestream" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/queue" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/logging" middlewares "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/middleware" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/openid" @@ -48,9 +49,9 @@ type RunConfig struct { type serverConfig struct { frontend fs.FS swagger fs.FS - mdb *internal.MemoryDB + mdb *kv.Store db *sql.DB - mq *internal.MessageQueue + mq *queue.MessageQueue lm *livestream.Monitor tm *twitch.Monitor } @@ -59,7 +60,7 @@ type serverConfig struct { var observableLogger = logging.NewObservableLogger() func RunBlocking(rc *RunConfig) { - mdb := internal.NewMemoryDB() + mdb := kv.NewStore() // ---- LOGGING --------------------------------------------------- logWriters := []io.Writer{ @@ -105,7 +106,7 @@ func RunBlocking(rc *RunConfig) { slog.Error("failed to init database", slog.String("err", err.Error())) } - mq, err := internal.NewMessageQueue() + mq, err := queue.NewMessageQueue() if err != nil { panic(err) } @@ -283,7 +284,7 @@ func gracefulShutdown(srv *http.Server, cfg *serverConfig) { func autoPersist( d time.Duration, - db *internal.MemoryDB, + db *kv.Store, lm *livestream.Monitor, tm *twitch.Monitor, ) { diff --git a/server/status/repository/repository.go b/server/status/repository/repository.go index 9504b57..dc8f119 100644 --- a/server/status/repository/repository.go +++ b/server/status/repository/repository.go @@ -5,11 +5,12 @@ import ( "slices" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/kv" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/status/domain" ) type Repository struct { - mdb *internal.MemoryDB + mdb *kv.Store } // DownloadSpeed implements domain.Repository. @@ -29,7 +30,7 @@ func (r *Repository) DownloadSpeed(ctx context.Context) int64 { func (r *Repository) Completed(ctx context.Context) int { processes := r.mdb.All() - completed := slices.DeleteFunc(*processes, func(p internal.ProcessResponse) bool { + completed := slices.DeleteFunc(*processes, func(p internal.ProcessSnapshot) bool { return p.Progress.Status != internal.StatusCompleted }) @@ -40,7 +41,7 @@ func (r *Repository) Completed(ctx context.Context) int { func (r *Repository) Downloading(ctx context.Context) int { processes := r.mdb.All() - downloading := slices.DeleteFunc(*processes, func(p internal.ProcessResponse) bool { + downloading := slices.DeleteFunc(*processes, func(p internal.ProcessSnapshot) bool { return p.Progress.Status != internal.StatusDownloading }) @@ -51,14 +52,14 @@ func (r *Repository) Downloading(ctx context.Context) int { func (r *Repository) Pending(ctx context.Context) int { processes := r.mdb.All() - pending := slices.DeleteFunc(*processes, func(p internal.ProcessResponse) bool { + pending := slices.DeleteFunc(*processes, func(p internal.ProcessSnapshot) bool { return p.Progress.Status != internal.StatusPending }) return len(pending) } -func New(mdb *internal.MemoryDB) domain.Repository { +func New(mdb *kv.Store) domain.Repository { return &Repository{ mdb: mdb, } diff --git a/server/status/status.go b/server/status/status.go index 7b28d44..2e213c6 100644 --- a/server/status/status.go +++ b/server/status/status.go @@ -2,16 +2,16 @@ package status import ( "github.com/go-chi/chi/v5" - "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/kv" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/status/repository" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/status/rest" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/status/service" ) -func ApplyRouter(mdb *internal.MemoryDB) func(chi.Router) { +func ApplyRouter(mdb *kv.Store) func(chi.Router) { var ( r = repository.New(mdb) - s = service.New(r, nil) //TODO: nil, wtf? + s = service.New(r, nil) h = rest.New(s) ) diff --git a/server/subscription/task/runner.go b/server/subscription/task/runner.go index a9823a2..dbf948a 100644 --- a/server/subscription/task/runner.go +++ b/server/subscription/task/runner.go @@ -11,7 +11,9 @@ import ( "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/archive" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config" - "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/downloaders" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/kv" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/queue" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/domain" "github.com/robfig/cron/v3" ) @@ -30,8 +32,8 @@ type monitorTask struct { } type CronTaskRunner struct { - mq *internal.MessageQueue - db *internal.MemoryDB + mq *queue.MessageQueue + db *kv.Store tasks chan monitorTask errors chan error @@ -39,7 +41,7 @@ type CronTaskRunner struct { running map[string]*monitorTask } -func NewCronTaskRunner(mq *internal.MessageQueue, db *internal.MemoryDB) TaskRunner { +func NewCronTaskRunner(mq *queue.MessageQueue, db *kv.Store) TaskRunner { return &CronTaskRunner{ mq: mq, db: db, @@ -148,20 +150,20 @@ func (t *CronTaskRunner) fetcher(ctx context.Context, req *monitorTask) time.Dur return nextSchedule } - p := &internal.Process{ - Url: latestVideoURL, - Params: append( + // TODO: autoremove hook + d := downloaders.NewGenericDownload( + latestVideoURL, + append( argsSplitterRe.FindAllString(req.Subscription.Params, 1), []string{ "--break-on-existing", "--download-archive", filepath.Join(config.Instance().Dir(), "archive.txt"), }...), - AutoRemove: true, - } + ) - t.db.Set(p) // give it an id - t.mq.Publish(p) // send it to the message queue waiting to be processed + t.db.Set(d) // give it an id + t.mq.Publish(d) // send it to the message queue waiting to be processed slog.Info( "cron task runner next schedule", diff --git a/server/twitch/monitor.go b/server/twitch/monitor.go index d803d6d..a729d2d 100644 --- a/server/twitch/monitor.go +++ b/server/twitch/monitor.go @@ -13,7 +13,10 @@ import ( "time" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config" - "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/downloaders" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/kv" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/pipes" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/queue" ) type Monitor struct { @@ -40,7 +43,7 @@ func (m *Monitor) Add(user string) { slog.Info("added user to twitch monitor", slog.String("user", user)) } -func (m *Monitor) Monitor(ctx context.Context, interval time.Duration, handler func(url string) error) { +func (m *Monitor) Monitor(ctx context.Context, interval time.Duration, handler func(user string) error) { ticker := time.NewTicker(interval) defer ticker.Stop() @@ -64,7 +67,7 @@ func (m *Monitor) Monitor(ctx context.Context, interval time.Duration, handler f wasLive := m.lastState[stream.UserName] if stream.IsLive && !wasLive { slog.Info("stream went live", slog.String("user", stream.UserName)) - if err := handler(fmt.Sprintf("https://www.twitch.tv/%s", stream.UserName)); err != nil { + if err := handler(stream.UserName); err != nil { slog.Error("handler failed", slog.String("user", stream.UserName), slog.Any("err", err)) } } @@ -90,15 +93,39 @@ func (m *Monitor) DeleteUser(user string) { delete(m.lastState, user) } -func DEFAULT_DOWNLOAD_HANDLER(db *internal.MemoryDB, mq *internal.MessageQueue) func(url string) error { - return func(url string) error { - p := &internal.Process{ - Url: url, - Livestream: true, - Params: []string{"--downloader", "ffmpeg", "--no-part"}, - } - db.Set(p) - mq.Publish(p) +func DEFAULT_DOWNLOAD_HANDLER(db *kv.Store, mq *queue.MessageQueue) func(user string) error { + return func(user string) error { + var ( + url = fmt.Sprintf("https://www.twitch.tv/%s", user) + filename = filepath.Join( + config.Instance().DownloadPath, + fmt.Sprintf("%s (live) %s", user, time.Now().Format(time.ANSIC)), + ) + ext = ".webm" + path = filename + ext + ) + + d := downloaders.NewLiveStreamDownloader(url, []pipes.Pipe{ + // &pipes.FileWriter{ + // Path: filename + ".mp4", + // IsFinal: false, + // }, + &pipes.Transcoder{ + Args: []string{ + "-c:a", "libopus", + "-c:v", "libsvtav1", + "-crf", "30", + "-preset", "7", + }, + }, + &pipes.FileWriter{ + Path: path, + IsFinal: true, + }, + }) + + db.Set(d) + mq.Publish(d) return nil } }