From f5f0af7e1e65e86e45d9cef0ce46eb355f9c2de3 Mon Sep 17 00:00:00 2001 From: marcobaobao Date: Thu, 16 May 2024 10:10:42 +0200 Subject: [PATCH] fixed potential goroutine deadlock in message queue --- server/internal/message_queue.go | 55 +++++++++++++++----------------- server/server.go | 2 +- 2 files changed, 27 insertions(+), 30 deletions(-) diff --git a/server/internal/message_queue.go b/server/internal/message_queue.go index 0d32a94..79a2536 100644 --- a/server/internal/message_queue.go +++ b/server/internal/message_queue.go @@ -12,7 +12,6 @@ const queueName = "process:pending" type MessageQueue struct { eventBus evbus.Bus consumerCh chan struct{} - metadataCh chan struct{} logger *slog.Logger } @@ -27,62 +26,55 @@ func NewMessageQueue(l *slog.Logger) *MessageQueue { panic("invalid queue size") } - mqs := 1 - return &MessageQueue{ eventBus: evbus.New(), consumerCh: make(chan struct{}, qs), - metadataCh: make(chan struct{}, mqs), logger: l, } } // Publish a message to the queue and set the task to a peding state. func (m *MessageQueue) Publish(p *Process) { + // needs to have an id set before p.SetPending() m.eventBus.Publish(queueName, p) } +func (m *MessageQueue) SetupConsumers() { + go m.downloadConsumer() + go m.metadataSubscriber() +} + // Setup the consumer listener which subscribes to the changes to the producer // channel and triggers the "download" action. -func (m *MessageQueue) Subscriber() { - go m.metadataSubscriber() - +func (m *MessageQueue) downloadConsumer() { m.eventBus.SubscribeAsync(queueName, func(p *Process) { + m.consumerCh <- struct{}{} + m.logger.Info("received process from event bus", slog.String("bus", queueName), slog.String("consumer", "downloadConsumer"), slog.String("id", p.Id), ) - go func() { - m.consumerCh <- struct{}{} - p.Start() + p.Start() - m.logger.Info("started process", - slog.String("bus", queueName), - slog.String("id", p.Id), - ) + m.logger.Info("started process", + slog.String("bus", queueName), + slog.String("id", p.Id), + ) - <-m.consumerCh - }() - }, false) -} - -// Empties the message queue -func (m *MessageQueue) Empty() { - for range m.consumerCh { <-m.consumerCh - } + }, false) } // Setup the metadata consumer listener which subscribes to the changes to the // producer channel and adds metadata to each download. func (m *MessageQueue) metadataSubscriber() { - m.eventBus.SubscribeAsync(queueName, func(p *Process) { - m.metadataCh <- struct{}{} - + // How many concurrent metadata fetcher jobs are spawned + // Since there's ongoing downloads, 1 job at time seems a good compromise + m.eventBus.Subscribe(queueName, func(p *Process) { m.logger.Info("received process from event bus", slog.String("bus", queueName), slog.String("consumer", "metadataConsumer"), @@ -95,7 +87,12 @@ func (m *MessageQueue) metadataSubscriber() { slog.String("err", err.Error()), ) } - - <-m.metadataCh - }, false) + }) +} + +// Empties the message queue +func (m *MessageQueue) Empty() { + for range m.consumerCh { + <-m.consumerCh + } } diff --git a/server/server.go b/server/server.go index e4c3b52..0ad2a4d 100644 --- a/server/server.go +++ b/server/server.go @@ -88,8 +88,8 @@ func RunBlocking(cfg *RunConfig) { } mq := internal.NewMessageQueue(logger) + mq.SetupConsumers() - go mq.Subscriber() go mdb.Restore(mq, logger) srv := newServer(serverConfig{