fixed potential goroutine deadlock in message queue
This commit is contained in:
@@ -12,7 +12,6 @@ const queueName = "process:pending"
|
|||||||
type MessageQueue struct {
|
type MessageQueue struct {
|
||||||
eventBus evbus.Bus
|
eventBus evbus.Bus
|
||||||
consumerCh chan struct{}
|
consumerCh chan struct{}
|
||||||
metadataCh chan struct{}
|
|
||||||
logger *slog.Logger
|
logger *slog.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -27,37 +26,38 @@ func NewMessageQueue(l *slog.Logger) *MessageQueue {
|
|||||||
panic("invalid queue size")
|
panic("invalid queue size")
|
||||||
}
|
}
|
||||||
|
|
||||||
mqs := 1
|
|
||||||
|
|
||||||
return &MessageQueue{
|
return &MessageQueue{
|
||||||
eventBus: evbus.New(),
|
eventBus: evbus.New(),
|
||||||
consumerCh: make(chan struct{}, qs),
|
consumerCh: make(chan struct{}, qs),
|
||||||
metadataCh: make(chan struct{}, mqs),
|
|
||||||
logger: l,
|
logger: l,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Publish a message to the queue and set the task to a peding state.
|
// Publish a message to the queue and set the task to a peding state.
|
||||||
func (m *MessageQueue) Publish(p *Process) {
|
func (m *MessageQueue) Publish(p *Process) {
|
||||||
|
// needs to have an id set before
|
||||||
p.SetPending()
|
p.SetPending()
|
||||||
|
|
||||||
m.eventBus.Publish(queueName, p)
|
m.eventBus.Publish(queueName, p)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *MessageQueue) SetupConsumers() {
|
||||||
|
go m.downloadConsumer()
|
||||||
|
go m.metadataSubscriber()
|
||||||
|
}
|
||||||
|
|
||||||
// Setup the consumer listener which subscribes to the changes to the producer
|
// Setup the consumer listener which subscribes to the changes to the producer
|
||||||
// channel and triggers the "download" action.
|
// channel and triggers the "download" action.
|
||||||
func (m *MessageQueue) Subscriber() {
|
func (m *MessageQueue) downloadConsumer() {
|
||||||
go m.metadataSubscriber()
|
|
||||||
|
|
||||||
m.eventBus.SubscribeAsync(queueName, func(p *Process) {
|
m.eventBus.SubscribeAsync(queueName, func(p *Process) {
|
||||||
|
m.consumerCh <- struct{}{}
|
||||||
|
|
||||||
m.logger.Info("received process from event bus",
|
m.logger.Info("received process from event bus",
|
||||||
slog.String("bus", queueName),
|
slog.String("bus", queueName),
|
||||||
slog.String("consumer", "downloadConsumer"),
|
slog.String("consumer", "downloadConsumer"),
|
||||||
slog.String("id", p.Id),
|
slog.String("id", p.Id),
|
||||||
)
|
)
|
||||||
|
|
||||||
go func() {
|
|
||||||
m.consumerCh <- struct{}{}
|
|
||||||
p.Start()
|
p.Start()
|
||||||
|
|
||||||
m.logger.Info("started process",
|
m.logger.Info("started process",
|
||||||
@@ -66,23 +66,15 @@ func (m *MessageQueue) Subscriber() {
|
|||||||
)
|
)
|
||||||
|
|
||||||
<-m.consumerCh
|
<-m.consumerCh
|
||||||
}()
|
|
||||||
}, false)
|
}, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Empties the message queue
|
|
||||||
func (m *MessageQueue) Empty() {
|
|
||||||
for range m.consumerCh {
|
|
||||||
<-m.consumerCh
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Setup the metadata consumer listener which subscribes to the changes to the
|
// Setup the metadata consumer listener which subscribes to the changes to the
|
||||||
// producer channel and adds metadata to each download.
|
// producer channel and adds metadata to each download.
|
||||||
func (m *MessageQueue) metadataSubscriber() {
|
func (m *MessageQueue) metadataSubscriber() {
|
||||||
m.eventBus.SubscribeAsync(queueName, func(p *Process) {
|
// How many concurrent metadata fetcher jobs are spawned
|
||||||
m.metadataCh <- struct{}{}
|
// Since there's ongoing downloads, 1 job at time seems a good compromise
|
||||||
|
m.eventBus.Subscribe(queueName, func(p *Process) {
|
||||||
m.logger.Info("received process from event bus",
|
m.logger.Info("received process from event bus",
|
||||||
slog.String("bus", queueName),
|
slog.String("bus", queueName),
|
||||||
slog.String("consumer", "metadataConsumer"),
|
slog.String("consumer", "metadataConsumer"),
|
||||||
@@ -95,7 +87,12 @@ func (m *MessageQueue) metadataSubscriber() {
|
|||||||
slog.String("err", err.Error()),
|
slog.String("err", err.Error()),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
})
|
||||||
<-m.metadataCh
|
}
|
||||||
}, false)
|
|
||||||
|
// Empties the message queue
|
||||||
|
func (m *MessageQueue) Empty() {
|
||||||
|
for range m.consumerCh {
|
||||||
|
<-m.consumerCh
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -88,8 +88,8 @@ func RunBlocking(cfg *RunConfig) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
mq := internal.NewMessageQueue(logger)
|
mq := internal.NewMessageQueue(logger)
|
||||||
|
mq.SetupConsumers()
|
||||||
|
|
||||||
go mq.Subscriber()
|
|
||||||
go mdb.Restore(mq, logger)
|
go mdb.Restore(mq, logger)
|
||||||
|
|
||||||
srv := newServer(serverConfig{
|
srv := newServer(serverConfig{
|
||||||
|
|||||||
Reference in New Issue
Block a user