refactoring: config struct & pipelines
This commit is contained in:
@@ -5,101 +5,119 @@ import (
|
||||
"errors"
|
||||
"log/slog"
|
||||
|
||||
evbus "github.com/asaskevich/EventBus"
|
||||
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
|
||||
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/downloaders"
|
||||
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/metadata"
|
||||
"golang.org/x/sync/semaphore"
|
||||
)
|
||||
|
||||
const queueName = "process:pending"
|
||||
|
||||
type MessageQueue struct {
|
||||
concurrency int
|
||||
eventBus evbus.Bus
|
||||
concurrency int
|
||||
downloadQueue chan downloaders.Downloader
|
||||
metadataQueue chan downloaders.Downloader
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
// Creates a new message queue.
|
||||
// By default it will be created with a size equals to nthe number of logical
|
||||
// CPU cores -1.
|
||||
// The queue size can be set via the qs flag.
|
||||
func NewMessageQueue() (*MessageQueue, error) {
|
||||
qs := config.Instance().QueueSize
|
||||
|
||||
qs := config.Instance().Server.QueueSize
|
||||
if qs <= 0 {
|
||||
return nil, errors.New("invalid queue size")
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
return &MessageQueue{
|
||||
concurrency: qs,
|
||||
eventBus: evbus.New(),
|
||||
concurrency: qs,
|
||||
downloadQueue: make(chan downloaders.Downloader, qs*2),
|
||||
metadataQueue: make(chan downloaders.Downloader, qs*4),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Publish a message to the queue and set the task to a peding state.
|
||||
func (m *MessageQueue) Publish(p downloaders.Downloader) {
|
||||
// needs to have an id set before
|
||||
p.SetPending(true)
|
||||
// Publish download job
|
||||
func (m *MessageQueue) Publish(d downloaders.Downloader) {
|
||||
d.SetPending(true)
|
||||
|
||||
m.eventBus.Publish(queueName, p)
|
||||
select {
|
||||
case m.downloadQueue <- d:
|
||||
slog.Info("published download", slog.String("id", d.GetId()))
|
||||
case <-m.ctx.Done():
|
||||
slog.Warn("queue stopped, dropping download", slog.String("id", d.GetId()))
|
||||
}
|
||||
}
|
||||
|
||||
// Workers: download + metadata
|
||||
func (m *MessageQueue) SetupConsumers() {
|
||||
go m.downloadConsumer()
|
||||
go m.metadataSubscriber()
|
||||
// N parallel workers for downloadQueue
|
||||
for i := 0; i < m.concurrency; i++ {
|
||||
go m.downloadWorker(i)
|
||||
}
|
||||
|
||||
// 1 serial worker for metadata
|
||||
go m.metadataWorker()
|
||||
}
|
||||
|
||||
// Setup the consumer listener which subscribes to the changes to the producer
|
||||
// channel and triggers the "download" action.
|
||||
func (m *MessageQueue) downloadConsumer() {
|
||||
sem := semaphore.NewWeighted(int64(m.concurrency))
|
||||
|
||||
m.eventBus.SubscribeAsync(queueName, func(p downloaders.Downloader) {
|
||||
sem.Acquire(context.Background(), 1)
|
||||
defer sem.Release(1)
|
||||
|
||||
slog.Info("received process from event bus",
|
||||
slog.String("bus", queueName),
|
||||
slog.String("consumer", "downloadConsumer"),
|
||||
slog.String("id", p.GetId()),
|
||||
)
|
||||
|
||||
if !p.IsCompleted() {
|
||||
slog.Info("started process",
|
||||
slog.String("bus", queueName),
|
||||
slog.String("id", p.GetId()),
|
||||
)
|
||||
p.Start()
|
||||
}
|
||||
}, false)
|
||||
}
|
||||
|
||||
// Setup the metadata consumer listener which subscribes to the changes to the
|
||||
// producer channel and adds metadata to each download.
|
||||
func (m *MessageQueue) metadataSubscriber() {
|
||||
// How many concurrent metadata fetcher jobs are spawned
|
||||
// Since there's ongoing downloads, 1 job at time seems a good compromise
|
||||
sem := semaphore.NewWeighted(1)
|
||||
|
||||
m.eventBus.SubscribeAsync(queueName, func(p downloaders.Downloader) {
|
||||
sem.Acquire(context.Background(), 1)
|
||||
defer sem.Release(1)
|
||||
|
||||
slog.Info("received process from event bus",
|
||||
slog.String("bus", queueName),
|
||||
slog.String("consumer", "metadataConsumer"),
|
||||
slog.String("id", p.GetId()),
|
||||
)
|
||||
|
||||
if p.IsCompleted() {
|
||||
slog.Warn("proccess has an illegal state",
|
||||
slog.String("id", p.GetId()),
|
||||
slog.String("status", "completed"),
|
||||
)
|
||||
// Worker dei download
|
||||
func (m *MessageQueue) downloadWorker(workerId int) {
|
||||
for {
|
||||
select {
|
||||
case <-m.ctx.Done():
|
||||
return
|
||||
case p := <-m.downloadQueue:
|
||||
if p == nil {
|
||||
continue
|
||||
}
|
||||
if p.IsCompleted() {
|
||||
continue
|
||||
}
|
||||
|
||||
slog.Info("download worker started",
|
||||
slog.Int("worker", workerId),
|
||||
slog.String("id", p.GetId()),
|
||||
)
|
||||
|
||||
p.Start()
|
||||
|
||||
// after the download starts succesfully we pass it to the metadata queue
|
||||
select {
|
||||
case m.metadataQueue <- p:
|
||||
slog.Info("queued for metadata", slog.String("id", p.GetId()))
|
||||
case <-m.ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
p.SetMetadata(metadata.DefaultFetcher)
|
||||
|
||||
}, false)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *MessageQueue) metadataWorker() {
|
||||
for {
|
||||
select {
|
||||
case <-m.ctx.Done():
|
||||
return
|
||||
case p := <-m.metadataQueue:
|
||||
if p == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
slog.Info("metadata worker started",
|
||||
slog.String("id", p.GetId()),
|
||||
)
|
||||
|
||||
if p.IsCompleted() {
|
||||
slog.Warn("metadata skipped, illegal state",
|
||||
slog.String("id", p.GetId()),
|
||||
)
|
||||
continue
|
||||
}
|
||||
|
||||
p.SetMetadata(metadata.DefaultFetcher)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *MessageQueue) Stop() {
|
||||
m.cancel()
|
||||
close(m.downloadQueue)
|
||||
close(m.metadataQueue)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user