diff --git a/server/handlers/archive.go b/server/handlers/archive.go index 4ba2756..b49e515 100644 --- a/server/handlers/archive.go +++ b/server/handlers/archive.go @@ -201,16 +201,20 @@ func DownloadFile(w http.ResponseWriter, r *http.Request) { func BulkDownload(mdb *internal.MemoryDB) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - procs := slices.DeleteFunc(*mdb.All(), func(e internal.ProcessResponse) bool { - return e.Progress.Status != 2 // status completed + ps := slices.DeleteFunc(*mdb.All(), func(e internal.ProcessResponse) bool { + return e.Progress.Status != internal.StatusCompleted }) + if len(ps) == 0 { + return + } + var ( buff bytes.Buffer zipWriter = zip.NewWriter(&buff) ) - for _, p := range procs { + for _, p := range ps { wr, err := zipWriter.Create(filepath.Base(p.Output.SavedFilePath)) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) diff --git a/server/internal/common.go b/server/internal/common.go index 31e99a4..2d0b91e 100644 --- a/server/internal/common.go +++ b/server/internal/common.go @@ -2,6 +2,19 @@ package internal import "time" +type ProgressTemplate struct { + Percentage string `json:"percentage"` + Speed float32 `json:"speed"` + Size string `json:"size"` + Eta float32 `json:"eta"` +} + +type DownloadOutput struct { + Path string + Filename string + SavedFilePath string `json:"savedFilePath"` +} + // Progress for the Running call type DownloadProgress struct { Status int `json:"process_status"` diff --git a/server/internal/message_queue.go b/server/internal/message_queue.go index d6dcb57..d2698ee 100644 --- a/server/internal/message_queue.go +++ b/server/internal/message_queue.go @@ -1,19 +1,22 @@ package internal import ( + "log/slog" + "github.com/marcopeocchi/yt-dlp-web-ui/server/config" ) type MessageQueue struct { producerCh chan *Process consumerCh chan struct{} + logger *slog.Logger } // Creates a new message queue. // By default it will be created with a size equals to nthe number of logical // CPU cores. // The queue size can be set via the qs flag. -func NewMessageQueue() *MessageQueue { +func NewMessageQueue(logger *slog.Logger) *MessageQueue { size := config.Instance().QueueSize if size <= 0 { @@ -23,13 +26,20 @@ func NewMessageQueue() *MessageQueue { return &MessageQueue{ producerCh: make(chan *Process, size), consumerCh: make(chan struct{}, size), + logger: logger, } } // Publish a message to the queue and set the task to a peding state. func (m *MessageQueue) Publish(p *Process) { p.SetPending() - go p.SetMetadata() + go func() { + err := p.SetMetadata() + m.logger.Error( + "failed to retrieve metadata", + slog.String("err", err.Error()), + ) + }() m.producerCh <- p } diff --git a/server/internal/process.go b/server/internal/process.go index ee75790..1abbc7a 100644 --- a/server/internal/process.go +++ b/server/internal/process.go @@ -2,8 +2,11 @@ package internal import ( "bufio" + "bytes" "encoding/json" + "errors" "fmt" + "io" "log/slog" "regexp" "slices" @@ -35,13 +38,6 @@ const ( StatusErrored ) -type ProgressTemplate struct { - Percentage string `json:"percentage"` - Speed float32 `json:"speed"` - Size string `json:"size"` - Eta float32 `json:"eta"` -} - // Process descriptor type Process struct { Id string @@ -54,12 +50,6 @@ type Process struct { Logger *slog.Logger } -type DownloadOutput struct { - Path string - Filename string - SavedFilePath string `json:"savedFilePath"` -} - // Starts spawns/forks a new yt-dlp process and parse its stdout. // The process is spawned to outputting a custom progress text that // Resembles a JSON Object in order to Unmarshal it later. @@ -123,7 +113,6 @@ func (p *Process) Start() { ) panic(err) } - scan := bufio.NewScanner(r) err = cmd.Start() if err != nil { @@ -145,6 +134,8 @@ func (p *Process) Start() { // spawn a goroutine that does the dirty job of parsing the stdout // filling the channel with as many stdout line as yt-dlp produces (producer) go func() { + scan := bufio.NewScanner(r) + defer func() { r.Close() p.Complete() @@ -161,21 +152,24 @@ func (p *Process) Start() { // Slows down the unmarshal operation to every 500ms go func() { rx.Sample(time.Millisecond*500, sourceChan, doneChan, func(event []byte) { - stdout := ProgressTemplate{} - err := json.Unmarshal(event, &stdout) - if err == nil { - p.Progress = DownloadProgress{ - Status: StatusDownloading, - Percentage: stdout.Percentage, - Speed: stdout.Speed, - ETA: stdout.Eta, - } - p.Logger.Info("progress", - slog.String("id", p.getShortId()), - slog.String("url", p.Url), - slog.String("percentege", stdout.Percentage), - ) + var progress ProgressTemplate + + if err := json.Unmarshal(event, &progress); err != nil { + return } + + p.Progress = DownloadProgress{ + Status: StatusDownloading, + Percentage: progress.Percentage, + Speed: progress.Speed, + ETA: progress.Eta, + } + + p.Logger.Info("progress", + slog.String("id", p.getShortId()), + slog.String("url", p.Url), + slog.String("percentege", progress.Percentage), + ) }) }() @@ -223,9 +217,13 @@ func (p *Process) Kill() error { // Returns the available format for this URL func (p *Process) GetFormatsSync() (DownloadFormats, error) { cmd := exec.Command(config.Instance().DownloaderPath, p.Url, "-J") - stdout, err := cmd.Output() + stdout, err := cmd.Output() if err != nil { + p.Logger.Error( + "failed to retrieve metadata", + slog.String("err", err.Error()), + ) return DownloadFormats{}, err } @@ -306,7 +304,17 @@ func (p *Process) SetMetadata() error { stdout, err := cmd.StdoutPipe() if err != nil { - p.Logger.Error("failed retrieving info", + p.Logger.Error("failed to connect to stdout", + slog.String("id", p.getShortId()), + slog.String("url", p.Url), + slog.String("err", err.Error()), + ) + return err + } + + stderr, err := cmd.StderrPipe() + if err != nil { + p.Logger.Error("failed to connect to stderr", slog.String("id", p.getShortId()), slog.String("url", p.Url), slog.String("err", err.Error()), @@ -319,27 +327,33 @@ func (p *Process) SetMetadata() error { CreatedAt: time.Now(), } - err = cmd.Start() - if err != nil { + if err := cmd.Start(); err != nil { return err } + var bufferedStderr bytes.Buffer + + go func() { + io.Copy(&bufferedStderr, stderr) + }() + p.Logger.Info("retrieving metadata", slog.String("id", p.getShortId()), slog.String("url", p.Url), ) - err = json.NewDecoder(stdout).Decode(&info) - if err != nil { + if err := json.NewDecoder(stdout).Decode(&info); err != nil { return err } p.Info = info p.Progress.Status = StatusPending - err = cmd.Wait() + if err := cmd.Wait(); err != nil { + return errors.New(bufferedStderr.String()) + } - return err + return nil } func (p *Process) getShortId() string { diff --git a/server/server.go b/server/server.go index b10a974..7254c1e 100644 --- a/server/server.go +++ b/server/server.go @@ -89,7 +89,7 @@ func RunBlocking(cfg *RunConfig) { logger.Error("failed to init database", slog.String("err", err.Error())) } - mq := internal.NewMessageQueue() + mq := internal.NewMessageQueue(logger) go mq.Subscriber() srv := newServer(serverConfig{