diff --git a/server/internal/message_queue.go b/server/internal/message_queue.go index 302c8fe..b36edb5 100644 --- a/server/internal/message_queue.go +++ b/server/internal/message_queue.go @@ -2,6 +2,7 @@ package internal import ( "log/slog" + "runtime" "github.com/marcopeocchi/yt-dlp-web-ui/server/config" ) @@ -9,6 +10,7 @@ import ( type MessageQueue struct { producerCh chan *Process consumerCh chan struct{} + metadataCh chan struct{} logger *slog.Logger } @@ -26,6 +28,7 @@ func NewMessageQueue(l *slog.Logger) *MessageQueue { return &MessageQueue{ producerCh: make(chan *Process, size), consumerCh: make(chan struct{}, size), + metadataCh: make(chan struct{}, runtime.NumCPU()), logger: l, } } @@ -33,6 +36,8 @@ func NewMessageQueue(l *slog.Logger) *MessageQueue { // Publish a message to the queue and set the task to a peding state. func (m *MessageQueue) Publish(p *Process) { p.SetPending() + m.metadataCh <- struct{}{} + go func() { if err := p.SetMetadata(); err != nil { m.logger.Error( @@ -40,14 +45,9 @@ func (m *MessageQueue) Publish(p *Process) { slog.String("err", err.Error()), ) } + <-m.metadataCh }() - m.producerCh <- p -} -// Publish a message to the queue and set the task to a peding state. -// ENSURE P IS PART OF A PLAYLIST -// Needs a further review -func (m *MessageQueue) PublishPlaylistEntry(p *Process) { m.producerCh <- p } diff --git a/server/internal/playlist.go b/server/internal/playlist.go index 6d708ba..eff9513 100644 --- a/server/internal/playlist.go +++ b/server/internal/playlist.go @@ -55,9 +55,7 @@ func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB, logger slog.Int("count", m.Count), ) - for i, meta := range m.Entries { - delta := time.Second.Microseconds() * int64(i+1) - + for _, meta := range m.Entries { // detect playlist title from metadata since each playlist entry will be // treated as an individual download req.Rename = strings.Replace( @@ -77,11 +75,11 @@ func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB, logger } proc.Info.URL = meta.OriginalURL - proc.Info.CreatedAt = time.Now().Add(time.Duration(delta)) + + time.Sleep(time.Millisecond) db.Set(proc) - proc.SetPending() - mq.PublishPlaylistEntry(proc) + mq.Publish(proc) } err = cmd.Wait() @@ -94,6 +92,7 @@ func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB, logger Logger: logger, } + db.Set(proc) mq.Publish(proc) logger.Info("sending new process to message queue", slog.String("url", proc.Url))