changed to an "event bus" approach in the message queue, fixed unset download parameters.

This commit is contained in:
2024-05-14 11:21:02 +02:00
parent 62eadb52a6
commit 8f10baf09b
6 changed files with 70 additions and 37 deletions

1
go.mod
View File

@@ -3,6 +3,7 @@ module github.com/marcopeocchi/yt-dlp-web-ui
go 1.22 go 1.22
require ( require (
github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef
github.com/go-chi/chi/v5 v5.0.12 github.com/go-chi/chi/v5 v5.0.12
github.com/go-chi/cors v1.2.1 github.com/go-chi/cors v1.2.1
github.com/golang-jwt/jwt/v5 v5.2.1 github.com/golang-jwt/jwt/v5 v5.2.1

2
go.sum
View File

@@ -1,3 +1,5 @@
github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef h1:2JGTg6JapxP9/R33ZaagQtAM4EkkSYnIAlOG5EI8gkM=
github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef/go.mod h1:JS7hed4L1fj0hXcyEejnW57/7LCetXggd+vwrRnYeII=
github.com/cenkalti/backoff/v4 v4.0.0/go.mod h1:eEew/i+1Q6OrCDZh3WiXYv3+nJwBASZ8Bog/87DQnVg= github.com/cenkalti/backoff/v4 v4.0.0/go.mod h1:eEew/i+1Q6OrCDZh3WiXYv3+nJwBASZ8Bog/87DQnVg=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=

View File

@@ -39,10 +39,14 @@ var (
) )
func init() { func init() {
qs := runtime.NumCPU() - 1
if qs == 0 {
qs = 1
}
flag.StringVar(&host, "host", "0.0.0.0", "Host where server will listen at") flag.StringVar(&host, "host", "0.0.0.0", "Host where server will listen at")
flag.IntVar(&port, "port", 3033, "Port where server will listen at") flag.IntVar(&port, "port", 3033, "Port where server will listen at")
flag.IntVar(&queueSize, "qs", runtime.NumCPU(), "Download queue size") flag.IntVar(&queueSize, "qs", qs, "Download queue size")
flag.StringVar(&configFile, "conf", "./config.yml", "Config file path") flag.StringVar(&configFile, "conf", "./config.yml", "Config file path")
flag.StringVar(&downloadPath, "out", ".", "Where files will be saved") flag.StringVar(&downloadPath, "out", ".", "Where files will be saved")

View File

@@ -2,13 +2,15 @@ package internal
import ( import (
"log/slog" "log/slog"
"runtime"
evbus "github.com/asaskevich/EventBus"
"github.com/marcopeocchi/yt-dlp-web-ui/server/config" "github.com/marcopeocchi/yt-dlp-web-ui/server/config"
) )
const queueName = "process:pending"
type MessageQueue struct { type MessageQueue struct {
producerCh chan *Process eventBus evbus.Bus
consumerCh chan struct{} consumerCh chan struct{}
metadataCh chan struct{} metadataCh chan struct{}
logger *slog.Logger logger *slog.Logger
@@ -16,19 +18,21 @@ type MessageQueue struct {
// Creates a new message queue. // Creates a new message queue.
// By default it will be created with a size equals to nthe number of logical // By default it will be created with a size equals to nthe number of logical
// CPU cores. // CPU cores -1.
// The queue size can be set via the qs flag. // The queue size can be set via the qs flag.
func NewMessageQueue(l *slog.Logger) *MessageQueue { func NewMessageQueue(l *slog.Logger) *MessageQueue {
size := config.Instance().QueueSize qs := config.Instance().QueueSize
if size <= 0 { if qs <= 0 {
panic("invalid queue size") panic("invalid queue size")
} }
mqs := 1
return &MessageQueue{ return &MessageQueue{
producerCh: make(chan *Process, size), eventBus: evbus.New(),
consumerCh: make(chan struct{}, size), consumerCh: make(chan struct{}, qs),
metadataCh: make(chan struct{}, runtime.NumCPU()), metadataCh: make(chan struct{}, mqs),
logger: l, logger: l,
} }
} }
@@ -36,39 +40,62 @@ func NewMessageQueue(l *slog.Logger) *MessageQueue {
// Publish a message to the queue and set the task to a peding state. // Publish a message to the queue and set the task to a peding state.
func (m *MessageQueue) Publish(p *Process) { func (m *MessageQueue) Publish(p *Process) {
p.SetPending() p.SetPending()
m.metadataCh <- struct{}{}
go func() { m.eventBus.Publish(queueName, p)
if err := p.SetMetadata(); err != nil {
m.logger.Error(
"failed to retrieve metadata",
slog.String("err", err.Error()),
)
}
<-m.metadataCh
}()
m.producerCh <- p
} }
// Setup the consumer listener which subscribes to the changes to the producer // Setup the consumer listener which subscribes to the changes to the producer
// channel and triggers the "download" action. // channel and triggers the "download" action.
func (m *MessageQueue) Subscriber() { func (m *MessageQueue) Subscriber() {
for msg := range m.producerCh { go m.metadataSubscriber()
m.consumerCh <- struct{}{}
go func(p *Process) { m.eventBus.SubscribeAsync(queueName, func(p *Process) {
m.logger.Info("received process from event bus",
slog.String("bus", queueName),
slog.String("consumer", "downloadConsumer"),
slog.String("id", p.Id),
)
go func() {
m.consumerCh <- struct{}{}
p.Start() p.Start()
m.logger.Info("started process",
slog.String("bus", queueName),
slog.String("id", p.Id),
)
<-m.consumerCh <-m.consumerCh
}(msg) }()
} }, false)
} }
// Empties the message queue // Empties the message queue
func (m *MessageQueue) Empty() { func (m *MessageQueue) Empty() {
for range m.producerCh {
<-m.producerCh
}
for range m.consumerCh { for range m.consumerCh {
<-m.consumerCh <-m.consumerCh
} }
} }
// Setup the metadata consumer listener which subscribes to the changes to the
// producer channel and adds metadata to each download.
func (m *MessageQueue) metadataSubscriber() {
m.eventBus.SubscribeAsync(queueName, func(p *Process) {
m.metadataCh <- struct{}{}
m.logger.Info("received process from event bus",
slog.String("bus", queueName),
slog.String("consumer", "metadataConsumer"),
slog.String("id", p.Id),
)
if err := p.SetMetadata(); err != nil {
m.logger.Error("failed to retrieve metadata",
slog.String("id", p.Id),
slog.String("err", err.Error()),
)
}
<-m.metadataCh
}, false)
}

View File

@@ -35,7 +35,7 @@ func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB, logger
return err return err
} }
logger.Info("decoding metadata", slog.String("url", req.URL)) logger.Info("decoding playlist metadata", slog.String("url", req.URL))
if err := json.NewDecoder(stdout).Decode(&m); err != nil { if err := json.NewDecoder(stdout).Decode(&m); err != nil {
return err return err

View File

@@ -84,7 +84,7 @@ func (p *Process) Start() {
go p.GetFileName(&out) go p.GetFileName(&out)
params := []string{ baseParams := []string{
strings.Split(p.Url, "?list")[0], //no playlist strings.Split(p.Url, "?list")[0], //no playlist
"--newline", "--newline",
"--no-colors", "--no-colors",
@@ -94,12 +94,12 @@ func (p *Process) Start() {
} }
// if user asked to manually override the output path... // if user asked to manually override the output path...
if !(slices.Contains(params, "-P") || slices.Contains(params, "--paths")) { if !(slices.Contains(p.Params, "-P") || slices.Contains(p.Params, "--paths")) {
params = append(params, "-o") p.Params = append(p.Params, "-o")
params = append(params, fmt.Sprintf("%s/%s", out.Path, out.Filename)) p.Params = append(p.Params, fmt.Sprintf("%s/%s", out.Path, out.Filename))
} }
params = append(params, p.Params...) params := append(baseParams, p.Params...)
// ----------------- main block ----------------- // // ----------------- main block ----------------- //
cmd := exec.Command(config.Instance().DownloaderPath, params...) cmd := exec.Command(config.Instance().DownloaderPath, params...)
@@ -114,8 +114,7 @@ func (p *Process) Start() {
panic(err) panic(err)
} }
err = cmd.Start() if err := cmd.Start(); err != nil {
if err != nil {
p.Logger.Error( p.Logger.Error(
"failed to start yt-dlp process", "failed to start yt-dlp process",
slog.String("err", err.Error()), slog.String("err", err.Error()),