From 3edebbdb6c9f68b7c78a8bfa38c571d71071fd06 Mon Sep 17 00:00:00 2001 From: marcobaobao Date: Tue, 2 Jul 2024 11:37:06 +0200 Subject: [PATCH] prevent completed download restart --- server/internal/message_queue.go | 12 ++++++++++- server/internal/process.go | 3 +++ server/rpc/handlers.go | 4 ---- server/rpc/service.go | 35 ++++++++++++++++++-------------- 4 files changed, 34 insertions(+), 20 deletions(-) diff --git a/server/internal/message_queue.go b/server/internal/message_queue.go index 47143b5..1e00143 100644 --- a/server/internal/message_queue.go +++ b/server/internal/message_queue.go @@ -65,7 +65,9 @@ func (m *MessageQueue) downloadConsumer() { slog.String("id", p.getShortId()), ) - p.Start() + if p.Progress.Status != StatusCompleted { + p.Start() + } m.logger.Info("started process", slog.String("bus", queueName), @@ -92,6 +94,14 @@ func (m *MessageQueue) metadataSubscriber() { slog.String("id", p.getShortId()), ) + if p.Progress.Status == StatusCompleted { + m.logger.Warn("proccess has an illegal state", + slog.String("id", p.getShortId()), + slog.Int("status", p.Progress.Status), + ) + return + } + if err := p.SetMetadata(); err != nil { m.logger.Error("failed to retrieve metadata", slog.String("id", p.getShortId()), diff --git a/server/internal/process.go b/server/internal/process.go index 5ad37ed..933f9c4 100644 --- a/server/internal/process.go +++ b/server/internal/process.go @@ -198,6 +198,9 @@ func (p *Process) Complete() { // Kill a process and remove it from the memory func (p *Process) Kill() error { + defer func() { + p.Progress.Status = StatusCompleted + }() // yt-dlp uses multiple child process the parent process // has been spawned with setPgid = true. To properly kill // all subprocesses a SIGTERM need to be sent to the correct diff --git a/server/rpc/handlers.go b/server/rpc/handlers.go index 9d760ae..5cd209e 100644 --- a/server/rpc/handlers.go +++ b/server/rpc/handlers.go @@ -2,7 +2,6 @@ package rpc import ( "io" - "log" "net/http" "github.com/gorilla/websocket" @@ -30,8 +29,6 @@ func WebSocket(w http.ResponseWriter, r *http.Request) { for { mtype, reader, err := c.NextReader() if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - log.Println(err) break } @@ -40,7 +37,6 @@ func WebSocket(w http.ResponseWriter, r *http.Request) { writer, err := c.NextWriter(mtype) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) - log.Println(err) break } diff --git a/server/rpc/service.go b/server/rpc/service.go index 5c0f67e..b786c7a 100644 --- a/server/rpc/service.go +++ b/server/rpc/service.go @@ -121,8 +121,11 @@ func (s *Service) KillAll(args NoArgs, killed *string) error { s.logger.Info("Killing all spawned processes") var ( - keys = s.db.Keys() - err error + keys = s.db.Keys() + removeFunc = func(p *internal.Process) error { + defer s.db.Delete(p.Id) + return p.Kill() + } ) for _, key := range *keys { @@ -131,22 +134,24 @@ func (s *Service) KillAll(args NoArgs, killed *string) error { return err } - if proc != nil { - err := proc.Kill() - if err != nil { - s.logger.Info( - "failed killing process", - slog.String("id", proc.Id), - slog.String("err", err.Error()), - ) - return err - } - - s.logger.Info("succesfully killed process", slog.String("id", proc.Id)) + if proc == nil { + s.db.Delete(key) + continue } + + if err := removeFunc(proc); err != nil { + s.logger.Info( + "failed killing process", + slog.String("id", proc.Id), + slog.Any("err", err), + ) + continue + } + + s.logger.Info("succesfully killed process", slog.String("id", proc.Id)) } - return err + return nil } // Remove a process from the db rendering it unusable if active