code refactoring, dependencies update
This commit is contained in:
@@ -64,7 +64,8 @@ func (m *MessageQueue) downloadConsumer() {
|
||||
|
||||
if p.Progress.Status != StatusCompleted {
|
||||
if p.Livestream {
|
||||
go p.Start() // livestreams have higher priorty and will ignore the queue
|
||||
// livestreams have higher priorty and they ignore the semaphore
|
||||
go p.Start()
|
||||
} else {
|
||||
p.Start()
|
||||
}
|
||||
|
||||
@@ -168,10 +168,6 @@ func (p *Process) Start() {
|
||||
ETA: progress.Eta,
|
||||
}
|
||||
|
||||
if p.Livestream {
|
||||
p.Progress.Status = StatusLivestream
|
||||
}
|
||||
|
||||
slog.Info("progress",
|
||||
slog.String("id", p.getShortId()),
|
||||
slog.String("url", p.Url),
|
||||
|
||||
@@ -51,11 +51,13 @@ type serverConfig struct {
|
||||
mdb *internal.MemoryDB
|
||||
db *sql.DB
|
||||
mq *internal.MessageQueue
|
||||
lm *livestream.Monitor
|
||||
}
|
||||
|
||||
func RunBlocking(cfg *RunConfig) {
|
||||
var mdb internal.MemoryDB
|
||||
|
||||
// ---- LOGGING ---------------------------------------------------
|
||||
logWriters := []io.Writer{
|
||||
os.Stdout,
|
||||
logging.NewObservableLogger(), // for web-ui
|
||||
@@ -84,6 +86,7 @@ func RunBlocking(cfg *RunConfig) {
|
||||
|
||||
// make the new logger the default one with all the new writers
|
||||
slog.SetDefault(logger)
|
||||
// ----------------------------------------------------------------
|
||||
|
||||
db, err := sql.Open("sqlite", cfg.DBPath)
|
||||
if err != nil {
|
||||
@@ -99,9 +102,12 @@ func RunBlocking(cfg *RunConfig) {
|
||||
panic(err)
|
||||
}
|
||||
mq.SetupConsumers()
|
||||
|
||||
go mdb.Restore(mq)
|
||||
|
||||
lm := livestream.NewMonitor(mq, &mdb)
|
||||
go lm.Schedule()
|
||||
go lm.Restore()
|
||||
|
||||
srv := newServer(serverConfig{
|
||||
frontend: cfg.App,
|
||||
swagger: cfg.Swagger,
|
||||
@@ -110,6 +116,7 @@ func RunBlocking(cfg *RunConfig) {
|
||||
mdb: &mdb,
|
||||
mq: mq,
|
||||
db: db,
|
||||
lm: lm,
|
||||
})
|
||||
|
||||
go gracefulShutdown(srv, &mdb)
|
||||
@@ -140,18 +147,14 @@ func RunBlocking(cfg *RunConfig) {
|
||||
}
|
||||
|
||||
func newServer(c serverConfig) *http.Server {
|
||||
lm := livestream.NewMonitor(c.mq, c.mdb)
|
||||
go lm.Schedule()
|
||||
go lm.Restore()
|
||||
|
||||
go func() {
|
||||
for {
|
||||
lm.Persist()
|
||||
c.lm.Persist()
|
||||
time.Sleep(time.Minute * 5)
|
||||
}
|
||||
}()
|
||||
|
||||
service := ytdlpRPC.Container(c.mdb, c.mq, lm)
|
||||
service := ytdlpRPC.Container(c.mdb, c.mq, c.lm)
|
||||
rpc.Register(service)
|
||||
|
||||
r := chi.NewRouter()
|
||||
@@ -236,6 +239,7 @@ func gracefulShutdown(srv *http.Server, db *internal.MemoryDB) {
|
||||
|
||||
defer func() {
|
||||
db.Persist()
|
||||
|
||||
stop()
|
||||
srv.Shutdown(context.Background())
|
||||
}()
|
||||
|
||||
Reference in New Issue
Block a user