diff --git a/server/internal/message_queue.go b/server/internal/message_queue.go index da08f03..7421a10 100644 --- a/server/internal/message_queue.go +++ b/server/internal/message_queue.go @@ -7,7 +7,7 @@ import ( ) type MessageQueue struct { - ch chan *Process + producerCh chan *Process consumerCh chan struct{} } @@ -23,7 +23,7 @@ func NewMessageQueue() *MessageQueue { } return &MessageQueue{ - ch: make(chan *Process, size), + producerCh: make(chan *Process, size), consumerCh: make(chan struct{}, size), } } @@ -31,12 +31,13 @@ func NewMessageQueue() *MessageQueue { // Publish a message to the queue and set the task to a peding state. func (m *MessageQueue) Publish(p *Process) { go p.SetPending() - m.ch <- p + m.producerCh <- p } -// Setup the consumer listened which "subscribes" to the queue events. -func (m *MessageQueue) SetupConsumer() { - for msg := range m.ch { +// Setup the consumer listener which subscribes to the changes to the producer +// channel and triggers the "download" action. +func (m *MessageQueue) Subscriber() { + for msg := range m.producerCh { m.consumerCh <- struct{}{} go func(p *Process) { p.Start() diff --git a/server/server.go b/server/server.go index c3c77c7..04776bc 100644 --- a/server/server.go +++ b/server/server.go @@ -28,10 +28,9 @@ func RunBlocking(port int, frontend fs.FS) { db.Restore() mq := internal.NewMessageQueue() - go mq.SetupConsumer() + go mq.Subscriber() service := ytdlpRPC.Container(&db, mq) - rpc.Register(service) app := fiber.New()