async metadata provider

This commit is contained in:
2024-05-16 11:05:08 +02:00
parent da84eb14f3
commit c10f60d4d4
3 changed files with 15 additions and 3 deletions

View File

@@ -41,7 +41,7 @@ var (
func init() {
flag.StringVar(&host, "host", "0.0.0.0", "Host where server will listen at")
flag.IntVar(&port, "port", 3033, "Port where server will listen at")
flag.IntVar(&queueSize, "qs", runtime.NumCPU(), "Download queue size")
flag.IntVar(&queueSize, "qs", 2, "Queue size (concurrent downloads)")
flag.StringVar(&configFile, "conf", "./config.yml", "Config file path")
flag.StringVar(&downloadPath, "out", ".", "Where files will be saved")
@@ -79,6 +79,11 @@ func main() {
c.Username = username
c.Password = password
// limit concurrent downloads for systems with 2 or less logical cores
if runtime.NumCPU() <= 2 {
c.QueueSize = 1
}
// if config file is found it will be merged with the current config struct
if err := c.LoadFile(configFile); err != nil {
log.Println(cli.BgRed, "config", cli.Reset, err)

View File

@@ -78,7 +78,13 @@ func (m *MessageQueue) downloadConsumer() {
func (m *MessageQueue) metadataSubscriber() {
// How many concurrent metadata fetcher jobs are spawned
// Since there's ongoing downloads, 1 job at time seems a good compromise
m.eventBus.Subscribe(queueName, func(p *Process) {
sem := semaphore.NewWeighted(1)
m.eventBus.SubscribeAsync(queueName, func(p *Process) {
//TODO: provide valid context
sem.Acquire(context.TODO(), 1)
defer sem.Release(1)
m.logger.Info("received process from event bus",
slog.String("bus", queueName),
slog.String("consumer", "metadataConsumer"),
@@ -91,5 +97,5 @@ func (m *MessageQueue) metadataSubscriber() {
slog.String("err", err.Error()),
)
}
})
}, false)
}

View File

@@ -82,6 +82,7 @@ func (p *Process) Start() {
buildFilename(&p.Output)
//TODO: it spawn another one yt-dlp process, too slow.
go p.GetFileName(&out)
baseParams := []string{