better error logging
This commit is contained in:
@@ -201,16 +201,20 @@ func DownloadFile(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
func BulkDownload(mdb *internal.MemoryDB) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
procs := slices.DeleteFunc(*mdb.All(), func(e internal.ProcessResponse) bool {
|
||||
return e.Progress.Status != 2 // status completed
|
||||
ps := slices.DeleteFunc(*mdb.All(), func(e internal.ProcessResponse) bool {
|
||||
return e.Progress.Status != internal.StatusCompleted
|
||||
})
|
||||
|
||||
if len(ps) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
var (
|
||||
buff bytes.Buffer
|
||||
zipWriter = zip.NewWriter(&buff)
|
||||
)
|
||||
|
||||
for _, p := range procs {
|
||||
for _, p := range ps {
|
||||
wr, err := zipWriter.Create(filepath.Base(p.Output.SavedFilePath))
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
|
||||
@@ -2,6 +2,19 @@ package internal
|
||||
|
||||
import "time"
|
||||
|
||||
type ProgressTemplate struct {
|
||||
Percentage string `json:"percentage"`
|
||||
Speed float32 `json:"speed"`
|
||||
Size string `json:"size"`
|
||||
Eta float32 `json:"eta"`
|
||||
}
|
||||
|
||||
type DownloadOutput struct {
|
||||
Path string
|
||||
Filename string
|
||||
SavedFilePath string `json:"savedFilePath"`
|
||||
}
|
||||
|
||||
// Progress for the Running call
|
||||
type DownloadProgress struct {
|
||||
Status int `json:"process_status"`
|
||||
|
||||
@@ -1,19 +1,22 @@
|
||||
package internal
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
|
||||
"github.com/marcopeocchi/yt-dlp-web-ui/server/config"
|
||||
)
|
||||
|
||||
type MessageQueue struct {
|
||||
producerCh chan *Process
|
||||
consumerCh chan struct{}
|
||||
logger *slog.Logger
|
||||
}
|
||||
|
||||
// Creates a new message queue.
|
||||
// By default it will be created with a size equals to nthe number of logical
|
||||
// CPU cores.
|
||||
// The queue size can be set via the qs flag.
|
||||
func NewMessageQueue() *MessageQueue {
|
||||
func NewMessageQueue(logger *slog.Logger) *MessageQueue {
|
||||
size := config.Instance().QueueSize
|
||||
|
||||
if size <= 0 {
|
||||
@@ -23,13 +26,20 @@ func NewMessageQueue() *MessageQueue {
|
||||
return &MessageQueue{
|
||||
producerCh: make(chan *Process, size),
|
||||
consumerCh: make(chan struct{}, size),
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
// Publish a message to the queue and set the task to a peding state.
|
||||
func (m *MessageQueue) Publish(p *Process) {
|
||||
p.SetPending()
|
||||
go p.SetMetadata()
|
||||
go func() {
|
||||
err := p.SetMetadata()
|
||||
m.logger.Error(
|
||||
"failed to retrieve metadata",
|
||||
slog.String("err", err.Error()),
|
||||
)
|
||||
}()
|
||||
m.producerCh <- p
|
||||
}
|
||||
|
||||
|
||||
@@ -2,8 +2,11 @@ package internal
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"regexp"
|
||||
"slices"
|
||||
@@ -35,13 +38,6 @@ const (
|
||||
StatusErrored
|
||||
)
|
||||
|
||||
type ProgressTemplate struct {
|
||||
Percentage string `json:"percentage"`
|
||||
Speed float32 `json:"speed"`
|
||||
Size string `json:"size"`
|
||||
Eta float32 `json:"eta"`
|
||||
}
|
||||
|
||||
// Process descriptor
|
||||
type Process struct {
|
||||
Id string
|
||||
@@ -54,12 +50,6 @@ type Process struct {
|
||||
Logger *slog.Logger
|
||||
}
|
||||
|
||||
type DownloadOutput struct {
|
||||
Path string
|
||||
Filename string
|
||||
SavedFilePath string `json:"savedFilePath"`
|
||||
}
|
||||
|
||||
// Starts spawns/forks a new yt-dlp process and parse its stdout.
|
||||
// The process is spawned to outputting a custom progress text that
|
||||
// Resembles a JSON Object in order to Unmarshal it later.
|
||||
@@ -123,7 +113,6 @@ func (p *Process) Start() {
|
||||
)
|
||||
panic(err)
|
||||
}
|
||||
scan := bufio.NewScanner(r)
|
||||
|
||||
err = cmd.Start()
|
||||
if err != nil {
|
||||
@@ -145,6 +134,8 @@ func (p *Process) Start() {
|
||||
// spawn a goroutine that does the dirty job of parsing the stdout
|
||||
// filling the channel with as many stdout line as yt-dlp produces (producer)
|
||||
go func() {
|
||||
scan := bufio.NewScanner(r)
|
||||
|
||||
defer func() {
|
||||
r.Close()
|
||||
p.Complete()
|
||||
@@ -161,21 +152,24 @@ func (p *Process) Start() {
|
||||
// Slows down the unmarshal operation to every 500ms
|
||||
go func() {
|
||||
rx.Sample(time.Millisecond*500, sourceChan, doneChan, func(event []byte) {
|
||||
stdout := ProgressTemplate{}
|
||||
err := json.Unmarshal(event, &stdout)
|
||||
if err == nil {
|
||||
var progress ProgressTemplate
|
||||
|
||||
if err := json.Unmarshal(event, &progress); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
p.Progress = DownloadProgress{
|
||||
Status: StatusDownloading,
|
||||
Percentage: stdout.Percentage,
|
||||
Speed: stdout.Speed,
|
||||
ETA: stdout.Eta,
|
||||
Percentage: progress.Percentage,
|
||||
Speed: progress.Speed,
|
||||
ETA: progress.Eta,
|
||||
}
|
||||
|
||||
p.Logger.Info("progress",
|
||||
slog.String("id", p.getShortId()),
|
||||
slog.String("url", p.Url),
|
||||
slog.String("percentege", stdout.Percentage),
|
||||
slog.String("percentege", progress.Percentage),
|
||||
)
|
||||
}
|
||||
})
|
||||
}()
|
||||
|
||||
@@ -223,9 +217,13 @@ func (p *Process) Kill() error {
|
||||
// Returns the available format for this URL
|
||||
func (p *Process) GetFormatsSync() (DownloadFormats, error) {
|
||||
cmd := exec.Command(config.Instance().DownloaderPath, p.Url, "-J")
|
||||
stdout, err := cmd.Output()
|
||||
|
||||
stdout, err := cmd.Output()
|
||||
if err != nil {
|
||||
p.Logger.Error(
|
||||
"failed to retrieve metadata",
|
||||
slog.String("err", err.Error()),
|
||||
)
|
||||
return DownloadFormats{}, err
|
||||
}
|
||||
|
||||
@@ -306,7 +304,17 @@ func (p *Process) SetMetadata() error {
|
||||
|
||||
stdout, err := cmd.StdoutPipe()
|
||||
if err != nil {
|
||||
p.Logger.Error("failed retrieving info",
|
||||
p.Logger.Error("failed to connect to stdout",
|
||||
slog.String("id", p.getShortId()),
|
||||
slog.String("url", p.Url),
|
||||
slog.String("err", err.Error()),
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
stderr, err := cmd.StderrPipe()
|
||||
if err != nil {
|
||||
p.Logger.Error("failed to connect to stderr",
|
||||
slog.String("id", p.getShortId()),
|
||||
slog.String("url", p.Url),
|
||||
slog.String("err", err.Error()),
|
||||
@@ -319,27 +327,33 @@ func (p *Process) SetMetadata() error {
|
||||
CreatedAt: time.Now(),
|
||||
}
|
||||
|
||||
err = cmd.Start()
|
||||
if err != nil {
|
||||
if err := cmd.Start(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var bufferedStderr bytes.Buffer
|
||||
|
||||
go func() {
|
||||
io.Copy(&bufferedStderr, stderr)
|
||||
}()
|
||||
|
||||
p.Logger.Info("retrieving metadata",
|
||||
slog.String("id", p.getShortId()),
|
||||
slog.String("url", p.Url),
|
||||
)
|
||||
|
||||
err = json.NewDecoder(stdout).Decode(&info)
|
||||
if err != nil {
|
||||
if err := json.NewDecoder(stdout).Decode(&info); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
p.Info = info
|
||||
p.Progress.Status = StatusPending
|
||||
|
||||
err = cmd.Wait()
|
||||
if err := cmd.Wait(); err != nil {
|
||||
return errors.New(bufferedStderr.String())
|
||||
}
|
||||
|
||||
return err
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Process) getShortId() string {
|
||||
|
||||
@@ -89,7 +89,7 @@ func RunBlocking(cfg *RunConfig) {
|
||||
logger.Error("failed to init database", slog.String("err", err.Error()))
|
||||
}
|
||||
|
||||
mq := internal.NewMessageQueue()
|
||||
mq := internal.NewMessageQueue(logger)
|
||||
go mq.Subscriber()
|
||||
|
||||
srv := newServer(serverConfig{
|
||||
|
||||
Reference in New Issue
Block a user