prevent completed download restart

This commit is contained in:
2024-07-02 11:37:06 +02:00
parent 98f0ea3bd2
commit 3edebbdb6c
4 changed files with 34 additions and 20 deletions

View File

@@ -65,7 +65,9 @@ func (m *MessageQueue) downloadConsumer() {
slog.String("id", p.getShortId()), slog.String("id", p.getShortId()),
) )
p.Start() if p.Progress.Status != StatusCompleted {
p.Start()
}
m.logger.Info("started process", m.logger.Info("started process",
slog.String("bus", queueName), slog.String("bus", queueName),
@@ -92,6 +94,14 @@ func (m *MessageQueue) metadataSubscriber() {
slog.String("id", p.getShortId()), slog.String("id", p.getShortId()),
) )
if p.Progress.Status == StatusCompleted {
m.logger.Warn("proccess has an illegal state",
slog.String("id", p.getShortId()),
slog.Int("status", p.Progress.Status),
)
return
}
if err := p.SetMetadata(); err != nil { if err := p.SetMetadata(); err != nil {
m.logger.Error("failed to retrieve metadata", m.logger.Error("failed to retrieve metadata",
slog.String("id", p.getShortId()), slog.String("id", p.getShortId()),

View File

@@ -198,6 +198,9 @@ func (p *Process) Complete() {
// Kill a process and remove it from the memory // Kill a process and remove it from the memory
func (p *Process) Kill() error { func (p *Process) Kill() error {
defer func() {
p.Progress.Status = StatusCompleted
}()
// yt-dlp uses multiple child process the parent process // yt-dlp uses multiple child process the parent process
// has been spawned with setPgid = true. To properly kill // has been spawned with setPgid = true. To properly kill
// all subprocesses a SIGTERM need to be sent to the correct // all subprocesses a SIGTERM need to be sent to the correct

View File

@@ -2,7 +2,6 @@ package rpc
import ( import (
"io" "io"
"log"
"net/http" "net/http"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
@@ -30,8 +29,6 @@ func WebSocket(w http.ResponseWriter, r *http.Request) {
for { for {
mtype, reader, err := c.NextReader() mtype, reader, err := c.NextReader()
if err != nil { if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
log.Println(err)
break break
} }
@@ -40,7 +37,6 @@ func WebSocket(w http.ResponseWriter, r *http.Request) {
writer, err := c.NextWriter(mtype) writer, err := c.NextWriter(mtype)
if err != nil { if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
log.Println(err)
break break
} }

View File

@@ -121,8 +121,11 @@ func (s *Service) KillAll(args NoArgs, killed *string) error {
s.logger.Info("Killing all spawned processes") s.logger.Info("Killing all spawned processes")
var ( var (
keys = s.db.Keys() keys = s.db.Keys()
err error removeFunc = func(p *internal.Process) error {
defer s.db.Delete(p.Id)
return p.Kill()
}
) )
for _, key := range *keys { for _, key := range *keys {
@@ -131,22 +134,24 @@ func (s *Service) KillAll(args NoArgs, killed *string) error {
return err return err
} }
if proc != nil { if proc == nil {
err := proc.Kill() s.db.Delete(key)
if err != nil { continue
s.logger.Info(
"failed killing process",
slog.String("id", proc.Id),
slog.String("err", err.Error()),
)
return err
}
s.logger.Info("succesfully killed process", slog.String("id", proc.Id))
} }
if err := removeFunc(proc); err != nil {
s.logger.Info(
"failed killing process",
slog.String("id", proc.Id),
slog.Any("err", err),
)
continue
}
s.logger.Info("succesfully killed process", slog.String("id", proc.Id))
} }
return err return nil
} }
// Remove a process from the db rendering it unusable if active // Remove a process from the db rendering it unusable if active