From c10f60d4d4409e72e32391ef218eb0a251b97b00 Mon Sep 17 00:00:00 2001 From: marcobaobao Date: Thu, 16 May 2024 11:05:08 +0200 Subject: [PATCH] async metadata provider --- main.go | 7 ++++++- server/internal/message_queue.go | 10 ++++++++-- server/internal/process.go | 1 + 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/main.go b/main.go index 02b81a7..8e68425 100644 --- a/main.go +++ b/main.go @@ -41,7 +41,7 @@ var ( func init() { flag.StringVar(&host, "host", "0.0.0.0", "Host where server will listen at") flag.IntVar(&port, "port", 3033, "Port where server will listen at") - flag.IntVar(&queueSize, "qs", runtime.NumCPU(), "Download queue size") + flag.IntVar(&queueSize, "qs", 2, "Queue size (concurrent downloads)") flag.StringVar(&configFile, "conf", "./config.yml", "Config file path") flag.StringVar(&downloadPath, "out", ".", "Where files will be saved") @@ -79,6 +79,11 @@ func main() { c.Username = username c.Password = password + // limit concurrent downloads for systems with 2 or less logical cores + if runtime.NumCPU() <= 2 { + c.QueueSize = 1 + } + // if config file is found it will be merged with the current config struct if err := c.LoadFile(configFile); err != nil { log.Println(cli.BgRed, "config", cli.Reset, err) diff --git a/server/internal/message_queue.go b/server/internal/message_queue.go index 4c21e1a..a048658 100644 --- a/server/internal/message_queue.go +++ b/server/internal/message_queue.go @@ -78,7 +78,13 @@ func (m *MessageQueue) downloadConsumer() { func (m *MessageQueue) metadataSubscriber() { // How many concurrent metadata fetcher jobs are spawned // Since there's ongoing downloads, 1 job at time seems a good compromise - m.eventBus.Subscribe(queueName, func(p *Process) { + sem := semaphore.NewWeighted(1) + + m.eventBus.SubscribeAsync(queueName, func(p *Process) { + //TODO: provide valid context + sem.Acquire(context.TODO(), 1) + defer sem.Release(1) + m.logger.Info("received process from event bus", slog.String("bus", queueName), slog.String("consumer", "metadataConsumer"), @@ -91,5 +97,5 @@ func (m *MessageQueue) metadataSubscriber() { slog.String("err", err.Error()), ) } - }) + }, false) } diff --git a/server/internal/process.go b/server/internal/process.go index 9b73d62..fa3de89 100644 --- a/server/internal/process.go +++ b/server/internal/process.go @@ -82,6 +82,7 @@ func (p *Process) Start() { buildFilename(&p.Output) + //TODO: it spawn another one yt-dlp process, too slow. go p.GetFileName(&out) baseParams := []string{