code refactor

This commit is contained in:
2023-07-26 09:51:00 +02:00
parent 49ded2e0f6
commit 82b22db7ae
2 changed files with 8 additions and 8 deletions

View File

@@ -7,7 +7,7 @@ import (
)
type MessageQueue struct {
ch chan *Process
producerCh chan *Process
consumerCh chan struct{}
}
@@ -23,7 +23,7 @@ func NewMessageQueue() *MessageQueue {
}
return &MessageQueue{
ch: make(chan *Process, size),
producerCh: make(chan *Process, size),
consumerCh: make(chan struct{}, size),
}
}
@@ -31,12 +31,13 @@ func NewMessageQueue() *MessageQueue {
// Publish a message to the queue and set the task to a peding state.
func (m *MessageQueue) Publish(p *Process) {
go p.SetPending()
m.ch <- p
m.producerCh <- p
}
// Setup the consumer listened which "subscribes" to the queue events.
func (m *MessageQueue) SetupConsumer() {
for msg := range m.ch {
// Setup the consumer listener which subscribes to the changes to the producer
// channel and triggers the "download" action.
func (m *MessageQueue) Subscriber() {
for msg := range m.producerCh {
m.consumerCh <- struct{}{}
go func(p *Process) {
p.Start()

View File

@@ -28,10 +28,9 @@ func RunBlocking(port int, frontend fs.FS) {
db.Restore()
mq := internal.NewMessageQueue()
go mq.SetupConsumer()
go mq.Subscriber()
service := ytdlpRPC.Container(&db, mq)
rpc.Register(service)
app := fiber.New()