From 8f10baf09ba2eea72a13d87cbb6ef73d10de1465 Mon Sep 17 00:00:00 2001 From: marcobaobao Date: Tue, 14 May 2024 11:21:02 +0200 Subject: [PATCH] changed to an "event bus" approach in the message queue, fixed unset download parameters. --- go.mod | 1 + go.sum | 2 + main.go | 6 ++- server/internal/message_queue.go | 83 +++++++++++++++++++++----------- server/internal/playlist.go | 2 +- server/internal/process.go | 13 +++-- 6 files changed, 70 insertions(+), 37 deletions(-) diff --git a/go.mod b/go.mod index 38705d7..aa3922f 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/marcopeocchi/yt-dlp-web-ui go 1.22 require ( + github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef github.com/go-chi/chi/v5 v5.0.12 github.com/go-chi/cors v1.2.1 github.com/golang-jwt/jwt/v5 v5.2.1 diff --git a/go.sum b/go.sum index 8bac504..ad38ecd 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef h1:2JGTg6JapxP9/R33ZaagQtAM4EkkSYnIAlOG5EI8gkM= +github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef/go.mod h1:JS7hed4L1fj0hXcyEejnW57/7LCetXggd+vwrRnYeII= github.com/cenkalti/backoff/v4 v4.0.0/go.mod h1:eEew/i+1Q6OrCDZh3WiXYv3+nJwBASZ8Bog/87DQnVg= github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= diff --git a/main.go b/main.go index 3545547..27dbc19 100644 --- a/main.go +++ b/main.go @@ -39,10 +39,14 @@ var ( ) func init() { + qs := runtime.NumCPU() - 1 + if qs == 0 { + qs = 1 + } flag.StringVar(&host, "host", "0.0.0.0", "Host where server will listen at") flag.IntVar(&port, "port", 3033, "Port where server will listen at") - flag.IntVar(&queueSize, "qs", runtime.NumCPU(), "Download queue size") + flag.IntVar(&queueSize, "qs", qs, "Download queue size") flag.StringVar(&configFile, "conf", "./config.yml", "Config file path") flag.StringVar(&downloadPath, "out", ".", "Where files will be saved") diff --git a/server/internal/message_queue.go b/server/internal/message_queue.go index b36edb5..0d32a94 100644 --- a/server/internal/message_queue.go +++ b/server/internal/message_queue.go @@ -2,13 +2,15 @@ package internal import ( "log/slog" - "runtime" + evbus "github.com/asaskevich/EventBus" "github.com/marcopeocchi/yt-dlp-web-ui/server/config" ) +const queueName = "process:pending" + type MessageQueue struct { - producerCh chan *Process + eventBus evbus.Bus consumerCh chan struct{} metadataCh chan struct{} logger *slog.Logger @@ -16,19 +18,21 @@ type MessageQueue struct { // Creates a new message queue. // By default it will be created with a size equals to nthe number of logical -// CPU cores. +// CPU cores -1. // The queue size can be set via the qs flag. func NewMessageQueue(l *slog.Logger) *MessageQueue { - size := config.Instance().QueueSize + qs := config.Instance().QueueSize - if size <= 0 { + if qs <= 0 { panic("invalid queue size") } + mqs := 1 + return &MessageQueue{ - producerCh: make(chan *Process, size), - consumerCh: make(chan struct{}, size), - metadataCh: make(chan struct{}, runtime.NumCPU()), + eventBus: evbus.New(), + consumerCh: make(chan struct{}, qs), + metadataCh: make(chan struct{}, mqs), logger: l, } } @@ -36,39 +40,62 @@ func NewMessageQueue(l *slog.Logger) *MessageQueue { // Publish a message to the queue and set the task to a peding state. func (m *MessageQueue) Publish(p *Process) { p.SetPending() - m.metadataCh <- struct{}{} - go func() { - if err := p.SetMetadata(); err != nil { - m.logger.Error( - "failed to retrieve metadata", - slog.String("err", err.Error()), - ) - } - <-m.metadataCh - }() - - m.producerCh <- p + m.eventBus.Publish(queueName, p) } // Setup the consumer listener which subscribes to the changes to the producer // channel and triggers the "download" action. func (m *MessageQueue) Subscriber() { - for msg := range m.producerCh { - m.consumerCh <- struct{}{} - go func(p *Process) { + go m.metadataSubscriber() + + m.eventBus.SubscribeAsync(queueName, func(p *Process) { + m.logger.Info("received process from event bus", + slog.String("bus", queueName), + slog.String("consumer", "downloadConsumer"), + slog.String("id", p.Id), + ) + + go func() { + m.consumerCh <- struct{}{} p.Start() + + m.logger.Info("started process", + slog.String("bus", queueName), + slog.String("id", p.Id), + ) + <-m.consumerCh - }(msg) - } + }() + }, false) } // Empties the message queue func (m *MessageQueue) Empty() { - for range m.producerCh { - <-m.producerCh - } for range m.consumerCh { <-m.consumerCh } } + +// Setup the metadata consumer listener which subscribes to the changes to the +// producer channel and adds metadata to each download. +func (m *MessageQueue) metadataSubscriber() { + m.eventBus.SubscribeAsync(queueName, func(p *Process) { + m.metadataCh <- struct{}{} + + m.logger.Info("received process from event bus", + slog.String("bus", queueName), + slog.String("consumer", "metadataConsumer"), + slog.String("id", p.Id), + ) + + if err := p.SetMetadata(); err != nil { + m.logger.Error("failed to retrieve metadata", + slog.String("id", p.Id), + slog.String("err", err.Error()), + ) + } + + <-m.metadataCh + }, false) +} diff --git a/server/internal/playlist.go b/server/internal/playlist.go index eff9513..b6dc224 100644 --- a/server/internal/playlist.go +++ b/server/internal/playlist.go @@ -35,7 +35,7 @@ func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB, logger return err } - logger.Info("decoding metadata", slog.String("url", req.URL)) + logger.Info("decoding playlist metadata", slog.String("url", req.URL)) if err := json.NewDecoder(stdout).Decode(&m); err != nil { return err diff --git a/server/internal/process.go b/server/internal/process.go index 54b65db..9b73d62 100644 --- a/server/internal/process.go +++ b/server/internal/process.go @@ -84,7 +84,7 @@ func (p *Process) Start() { go p.GetFileName(&out) - params := []string{ + baseParams := []string{ strings.Split(p.Url, "?list")[0], //no playlist "--newline", "--no-colors", @@ -94,12 +94,12 @@ func (p *Process) Start() { } // if user asked to manually override the output path... - if !(slices.Contains(params, "-P") || slices.Contains(params, "--paths")) { - params = append(params, "-o") - params = append(params, fmt.Sprintf("%s/%s", out.Path, out.Filename)) + if !(slices.Contains(p.Params, "-P") || slices.Contains(p.Params, "--paths")) { + p.Params = append(p.Params, "-o") + p.Params = append(p.Params, fmt.Sprintf("%s/%s", out.Path, out.Filename)) } - params = append(params, p.Params...) + params := append(baseParams, p.Params...) // ----------------- main block ----------------- // cmd := exec.Command(config.Instance().DownloaderPath, params...) @@ -114,8 +114,7 @@ func (p *Process) Start() { panic(err) } - err = cmd.Start() - if err != nil { + if err := cmd.Start(); err != nil { p.Logger.Error( "failed to start yt-dlp process", slog.String("err", err.Error()),