diff --git a/README.md b/README.md index 35d1d8c..cfa4845 100644 --- a/README.md +++ b/README.md @@ -68,6 +68,7 @@ The currently avaible settings are: - Override the output filename - Override the output path - Pass custom yt-dlp arguments safely +- Download queue (limit concurrent downloads) ![](https://i.ibb.co/YdBVcgc/image.png) ![](https://i.ibb.co/Sf102b1/image.png) @@ -84,8 +85,9 @@ Future releases will have: - ~~Multi download~~ *done* - ~~Exctract audio~~ *done* - ~~Format selection~~ *done* -- Download archive +- ~~Download archive~~ *done* - ~~ARM Build~~ *done available through ghcr.io* +- Playlist support ## Troubleshooting - **It says that it isn't connected/ip in the header is not defined.** @@ -118,6 +120,18 @@ docker run -d \ --secret your_rpc_secret ``` +If you wish for limiting the download queue size... + +e.g. limiting max 2 concurrent download. +```sh +docker run -d \ + -p 3033:3033 \ + -e JWT_SECRET randomsecret + -v /path/to/downloads:/downloads \ + marcobaobao/yt-dlp-webui \ + --qs 2 +``` + ## [Prebuilt binaries](https://github.com/marcopeocchi/yt-dlp-web-ui/releases) installation ```sh @@ -134,6 +148,25 @@ yt-dlp-webui --out /home/user/downloads --driver /opt/soemdir/yt-dlp yt-dlp-webui --conf /home/user/.config/yt-dlp-webui.conf ``` +### Arguments +```sh +Usage yt-dlp-webui: + -auth + Enable RPC authentication + -conf string + Config file path + -driver string + yt-dlp executable path (default "yt-dlp") + -out string + Where files will be saved (default ".") + -port int + Port where server will listen at (default 3033) + -qs int + Download queue size (default 8) + -secret string + Secret required for auth +``` + ### Config file By running `yt-dlp-webui` in standalone mode you have the ability to also specify a config file. The config file **will overwrite what have been passed as cli argument**. @@ -149,6 +182,7 @@ downloaderPath: /usr/local/bin/yt-dlp # Optional settings require_auth: true rpc_secret: my_random_secret +queue_size: 4 ``` ### Systemd integration diff --git a/frontend/src/components/DownloadsCardView.tsx b/frontend/src/components/DownloadsCardView.tsx index fde0134..7ac9eca 100644 --- a/frontend/src/components/DownloadsCardView.tsx +++ b/frontend/src/components/DownloadsCardView.tsx @@ -30,6 +30,7 @@ export function DownloadsCardView({ downloads, onStop }: Props) { resolution={download.info.resolution ?? ''} speed={download.progress.speed} size={download.info.filesize_approx ?? 0} + status={download.progress.process_status} /> diff --git a/frontend/src/components/DownloadsListView.tsx b/frontend/src/components/DownloadsListView.tsx index ae9e19e..3f4a26d 100644 --- a/frontend/src/components/DownloadsListView.tsx +++ b/frontend/src/components/DownloadsListView.tsx @@ -55,7 +55,11 @@ export function DownloadsListView({ downloads, onStop }: Props) { download.progress.percentage === '-1' ? 100 : Number(download.progress.percentage.replace('%', '')) } - variant="determinate" + variant={ + download.progress.process_status === 0 + ? 'indeterminate' + : 'determinate' + } color={download.progress.percentage === '-1' ? 'success' : 'primary'} /> diff --git a/frontend/src/components/StackableResult.tsx b/frontend/src/components/StackableResult.tsx index b1d1b14..4b08ed6 100644 --- a/frontend/src/components/StackableResult.tsx +++ b/frontend/src/components/StackableResult.tsx @@ -13,7 +13,7 @@ import { Typography } from '@mui/material' import { useEffect, useState } from 'react' -import { ellipsis, formatSpeedMiB, roundMiB } from '../utils' +import { ellipsis, formatSpeedMiB, mapProcessStatus, roundMiB } from '../utils' type Props = { title: string @@ -23,6 +23,7 @@ type Props = { percentage: string size: number speed: number + status: number onStop: () => void onCopy: () => void } @@ -35,6 +36,7 @@ export function StackableResult({ percentage, speed, size, + status, onStop, onCopy, }: Props) { @@ -80,7 +82,7 @@ export function StackableResult({ } diff --git a/frontend/src/types/index.d.ts b/frontend/src/types/index.d.ts index e812f72..26aecdf 100644 --- a/frontend/src/types/index.d.ts +++ b/frontend/src/types/index.d.ts @@ -37,6 +37,7 @@ type DownloadProgress = { speed: number eta: number percentage: string + process_status: number } export type RPCResult = { diff --git a/frontend/src/utils.ts b/frontend/src/utils.ts index 14cbc3c..79c8f5f 100644 --- a/frontend/src/utils.ts +++ b/frontend/src/utils.ts @@ -99,4 +99,19 @@ export const datetimeCompareFunc = (a: string, b: string) => new Date(a).getTime export function isRPCResponse(object: any): object is RPCResponse { return 'result' in object && 'id' in object +} + +export function mapProcessStatus(status: number) { + switch (status) { + case 0: + return 'Pending' + case 1: + return 'Downloading' + case 2: + return 'Completed' + case 3: + return 'Error' + default: + return 'Pending' + } } \ No newline at end of file diff --git a/main.go b/main.go index 95f4c3e..4666ea4 100644 --- a/main.go +++ b/main.go @@ -5,6 +5,7 @@ import ( "flag" "io/fs" "log" + "runtime" "github.com/marcopeocchi/yt-dlp-web-ui/server" "github.com/marcopeocchi/yt-dlp-web-ui/server/config" @@ -12,20 +13,23 @@ import ( var ( port int + queueSize int configFile string downloadPath string downloaderPath string requireAuth bool rpcSecret string + //go:embed frontend/dist frontend embed.FS ) func init() { flag.IntVar(&port, "port", 3033, "Port where server will listen at") + flag.IntVar(&queueSize, "qs", runtime.NumCPU(), "Download queue size") - flag.StringVar(&configFile, "conf", "", "yt-dlp-WebUI config file path") + flag.StringVar(&configFile, "conf", "", "Config file path") flag.StringVar(&downloadPath, "out", ".", "Where files will be saved") flag.StringVar(&downloaderPath, "driver", "yt-dlp", "yt-dlp executable path") @@ -45,6 +49,7 @@ func main() { c := config.Instance() c.SetPort(port) + c.QueueSize(queueSize) c.DownloadPath(downloadPath) c.DownloaderPath(downloaderPath) diff --git a/server/config/parser.go b/server/config/parser.go index 8cd98c2..1f4b42a 100644 --- a/server/config/parser.go +++ b/server/config/parser.go @@ -15,6 +15,7 @@ type serverConfig struct { DownloaderPath string `yaml:"downloaderPath"` RequireAuth bool `yaml:"require_auth"` RPCSecret string `yaml:"rpc_secret"` + QueueSize int `yaml:"queue_size"` } type config struct { @@ -51,10 +52,15 @@ func (c *config) DownloaderPath(path string) { func (c *config) RequireAuth(value bool) { c.cfg.RequireAuth = value } + func (c *config) RPCSecret(secret string) { c.cfg.RPCSecret = secret } +func (c *config) QueueSize(size int) { + c.cfg.QueueSize = size +} + var instance *config func Instance() *config { diff --git a/server/memory_db.go b/server/internal/memory_db.go similarity index 81% rename from server/memory_db.go rename to server/internal/memory_db.go index 13ef5e2..1c2e8c1 100644 --- a/server/memory_db.go +++ b/server/internal/memory_db.go @@ -1,4 +1,4 @@ -package server +package internal import ( "errors" @@ -20,7 +20,7 @@ type MemoryDB struct { // Get a process pointer given its id func (m *MemoryDB) Get(id string) (*Process, error) { - entry, ok := db.table.Load(id) + entry, ok := m.table.Load(id) if !ok { return nil, errors.New("no process found for the given key") } @@ -30,28 +30,32 @@ func (m *MemoryDB) Get(id string) (*Process, error) { // Store a pointer of a process and return its id func (m *MemoryDB) Set(process *Process) string { id := uuid.Must(uuid.NewRandom()).String() - db.table.Store(id, process) + m.table.Store(id, process) return id } // Update a process info/metadata, given the process id +// +// Deprecated: will be removed anytime soon. func (m *MemoryDB) UpdateInfo(id string, info DownloadInfo) error { - entry, ok := db.table.Load(id) + entry, ok := m.table.Load(id) if ok { entry.(*Process).Info = info - db.table.Store(id, entry) + m.table.Store(id, entry) return nil } return fmt.Errorf("can't update row with id %s", id) } // Update a process progress data, given the process id -// Used for updating completition percentage or ETA +// Used for updating completition percentage or ETA. +// +// Deprecated: will be removed anytime soon. func (m *MemoryDB) UpdateProgress(id string, progress DownloadProgress) error { - entry, ok := db.table.Load(id) + entry, ok := m.table.Load(id) if ok { entry.(*Process).Progress = progress - db.table.Store(id, entry) + m.table.Store(id, entry) return nil } return fmt.Errorf("can't update row with id %s", id) @@ -59,12 +63,12 @@ func (m *MemoryDB) UpdateProgress(id string, progress DownloadProgress) error { // Removes a process progress, given the process id func (m *MemoryDB) Delete(id string) { - db.table.Delete(id) + m.table.Delete(id) } func (m *MemoryDB) Keys() *[]string { running := []string{} - db.table.Range(func(key, value any) bool { + m.table.Range(func(key, value any) bool { running = append(running, key.(string)) return true }) @@ -74,7 +78,7 @@ func (m *MemoryDB) Keys() *[]string { // Returns a slice of all currently stored processes progess func (m *MemoryDB) All() *[]ProcessResponse { running := []ProcessResponse{} - db.table.Range(func(key, value any) bool { + m.table.Range(func(key, value any) bool { running = append(running, ProcessResponse{ Id: key.(string), Info: value.(*Process).Info, @@ -110,12 +114,12 @@ func (m *MemoryDB) Restore() { json.Unmarshal(feed, &session) for _, proc := range session.Processes { - db.table.Store(proc.Id, &Process{ - id: proc.Id, - url: proc.Info.URL, + m.table.Store(proc.Id, &Process{ + Id: proc.Id, + Url: proc.Info.URL, Info: proc.Info, Progress: proc.Progress, - mem: m, + DB: m, }) } } diff --git a/server/internal/message_queue.go b/server/internal/message_queue.go new file mode 100644 index 0000000..da08f03 --- /dev/null +++ b/server/internal/message_queue.go @@ -0,0 +1,46 @@ +package internal + +import ( + "log" + + "github.com/marcopeocchi/yt-dlp-web-ui/server/config" +) + +type MessageQueue struct { + ch chan *Process + consumerCh chan struct{} +} + +// Creates a new message queue. +// By default it will be created with a size equals to nthe number of logical +// CPU cores. +// The queue size can be set via the qs flag. +func NewMessageQueue() *MessageQueue { + size := config.Instance().GetConfig().QueueSize + + if size <= 0 { + log.Fatalln("invalid queue size") + } + + return &MessageQueue{ + ch: make(chan *Process, size), + consumerCh: make(chan struct{}, size), + } +} + +// Publish a message to the queue and set the task to a peding state. +func (m *MessageQueue) Publish(p *Process) { + go p.SetPending() + m.ch <- p +} + +// Setup the consumer listened which "subscribes" to the queue events. +func (m *MessageQueue) SetupConsumer() { + for msg := range m.ch { + m.consumerCh <- struct{}{} + go func(p *Process) { + p.Start() + <-m.consumerCh + }(msg) + } +} diff --git a/server/process.go b/server/internal/process.go similarity index 62% rename from server/process.go rename to server/internal/process.go index 4df3b39..eea53c0 100644 --- a/server/process.go +++ b/server/internal/process.go @@ -1,4 +1,4 @@ -package server +package internal import ( "bufio" @@ -16,7 +16,6 @@ import ( "github.com/marcopeocchi/fazzoletti/slices" "github.com/marcopeocchi/yt-dlp-web-ui/server/config" - "github.com/marcopeocchi/yt-dlp-web-ui/server/rx" ) const template = `download: @@ -30,6 +29,13 @@ var ( cfg = config.Instance() ) +const ( + StatusPending = iota + StatusDownloading + StatusCompleted + StatusErrored +) + type ProgressTemplate struct { Percentage string `json:"percentage"` Speed float32 `json:"speed"` @@ -39,54 +45,55 @@ type ProgressTemplate struct { // Process descriptor type Process struct { - id string - url string - params []string + Id string + Url string + Params []string Info DownloadInfo Progress DownloadProgress - mem *MemoryDB + DB *MemoryDB + Output DownloadOutput proc *os.Process } -type downloadOutput struct { - path string - filaneme string +type DownloadOutput struct { + Path string + Filename string } // Starts spawns/forks a new yt-dlp process and parse its stdout. // The process is spawned to outputting a custom progress text that // Resembles a JSON Object in order to Unmarshal it later. // This approach is anyhow not perfect: quotes are not escaped properly. -// Each process is not identified by its PID but by a UUIDv2 -func (p *Process) Start(path, filename string) { +// Each process is not identified by its PID but by a UUIDv4 +func (p *Process) Start() { // escape bash variable escaping and command piping, you'll never know // what they might come with... - p.params = slices.Filter(p.params, func(e string) bool { + p.Params = slices.Filter(p.Params, func(e string) bool { match, _ := regexp.MatchString(`(\$\{)|(\&\&)`, e) return !match }) - out := downloadOutput{ - path: cfg.GetConfig().DownloadPath, - filaneme: "%(title)s.%(ext)s", + out := DownloadOutput{ + Path: cfg.GetConfig().DownloadPath, + Filename: "%(title)s.%(ext)s", } - if path != "" { - out.path = path + if p.Output.Path != "" { + out.Path = p.Output.Path } - if filename != "" { - out.filaneme = filename + ".%(ext)s" + if p.Output.Filename != "" { + out.Filename = p.Output.Filename + ".%(ext)s" } params := append([]string{ - strings.Split(p.url, "?list")[0], //no playlist + strings.Split(p.Url, "?list")[0], //no playlist "--newline", "--no-colors", "--no-playlist", "--progress-template", strings.ReplaceAll(template, "\n", ""), "-o", - fmt.Sprintf("%s/%s", out.path, out.filaneme), - }, p.params...) + fmt.Sprintf("%s/%s", out.Path, out.Filename), + }, p.Params...) // ----------------- main block ----------------- // cmd := exec.Command(cfg.GetConfig().DownloaderPath, params...) @@ -103,73 +110,56 @@ func (p *Process) Start(path, filename string) { log.Panicln(err) } - p.id = p.mem.Set(p) p.proc = cmd.Process // ----------------- info block ----------------- // // spawn a goroutine that retrieves the info for the download - go func() { - cmd := exec.Command(cfg.GetConfig().DownloaderPath, p.url, "-J") - cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} - - stdout, err := cmd.Output() - if err != nil { - log.Println("Cannot retrieve info for", p.url) - } - info := DownloadInfo{ - URL: p.url, - CreatedAt: time.Now(), - } - json.Unmarshal(stdout, &info) - p.mem.UpdateInfo(p.id, info) - }() // --------------- progress block --------------- // // unbuffered channel connected to stdout - eventChan := make(chan string) // spawn a goroutine that does the dirty job of parsing the stdout // filling the channel with as many stdout line as yt-dlp produces (producer) go func() { - defer r.Close() - defer p.Complete() + defer func() { + r.Close() + p.Complete() + }() + for scan.Scan() { - eventChan <- scan.Text() + stdout := ProgressTemplate{} + err := json.Unmarshal([]byte(scan.Text()), &stdout) + if err == nil { + p.Progress = DownloadProgress{ + Status: StatusDownloading, + Percentage: stdout.Percentage, + Speed: stdout.Speed, + ETA: stdout.Eta, + } + shortId := strings.Split(p.Id, "-")[0] + log.Printf("[%s] %s %s\n", shortId, p.Url, p.Progress.Percentage) + } } - cmd.Wait() }() - // do the unmarshal operation every 250ms (consumer) - go rx.Debounce(time.Millisecond*250, eventChan, func(text string) { - stdout := ProgressTemplate{} - err := json.Unmarshal([]byte(text), &stdout) - if err == nil { - p.mem.UpdateProgress(p.id, DownloadProgress{ - Percentage: stdout.Percentage, - Speed: stdout.Speed, - ETA: stdout.Eta, - }) - shortId := strings.Split(p.id, "-")[0] - log.Printf("[%s] %s %s\n", shortId, p.url, p.Progress.Percentage) - } - }) // ------------- end progress block ------------- // + cmd.Wait() } // Keep process in the memoryDB but marks it as complete // Convention: All completed processes has progress -1 // and speed 0 bps. func (p *Process) Complete() { - p.mem.UpdateProgress(p.id, DownloadProgress{ + p.Progress = DownloadProgress{ + Status: StatusCompleted, Percentage: "-1", Speed: 0, ETA: 0, - }) + } } // Kill a process and remove it from the memory func (p *Process) Kill() error { - p.mem.Delete(p.id) // yt-dlp uses multiple child process the parent process // has been spawned with setPgid = true. To properly kill // all subprocesses a SIGTERM need to be sent to the correct @@ -181,15 +171,17 @@ func (p *Process) Kill() error { } err = syscall.Kill(-pgid, syscall.SIGTERM) - log.Println("Killed process", p.id) + log.Println("Killed process", p.Id) return err } + + p.DB.Delete(p.Id) return nil } // Returns the available format for this URL func (p *Process) GetFormatsSync() (DownloadFormats, error) { - cmd := exec.Command(cfg.GetConfig().DownloaderPath, p.url, "-J") + cmd := exec.Command(cfg.GetConfig().DownloaderPath, p.Url, "-J") stdout, err := cmd.Output() if err != nil { @@ -198,7 +190,7 @@ func (p *Process) GetFormatsSync() (DownloadFormats, error) { cmd.Wait() - info := DownloadFormats{URL: p.url} + info := DownloadFormats{URL: p.Url} best := Format{} json.Unmarshal(stdout, &info) @@ -208,3 +200,25 @@ func (p *Process) GetFormatsSync() (DownloadFormats, error) { return info, nil } + +func (p *Process) SetPending() { + p.Id = p.DB.Set(p) + + cmd := exec.Command(cfg.GetConfig().DownloaderPath, p.Url, "-J") + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + + stdout, err := cmd.Output() + if err != nil { + log.Println("Cannot retrieve info for", p.Url) + } + + info := DownloadInfo{ + URL: p.Url, + CreatedAt: time.Now(), + } + + json.Unmarshal(stdout, &info) + p.Info = info + + p.Progress.Status = StatusPending +} diff --git a/server/types.go b/server/internal/types.go similarity index 93% rename from server/types.go rename to server/internal/types.go index 043441a..da5c0f2 100644 --- a/server/types.go +++ b/server/internal/types.go @@ -1,9 +1,10 @@ -package server +package internal import "time" // Progress for the Running call type DownloadProgress struct { + Status int `json:"process_status"` Percentage string `json:"percentage"` Speed float32 `json:"speed"` ETA int `json:"eta"` @@ -66,4 +67,8 @@ type DownloadRequest struct { Url string `json:"url"` Params []string `json:"params"` RenameTo string `json:"renameTo"` + Id string + URL string + Path string + Rename string } diff --git a/server/service.go b/server/rpc/service.go similarity index 55% rename from server/service.go rename to server/rpc/service.go index 7d92407..e2a597c 100644 --- a/server/service.go +++ b/server/rpc/service.go @@ -1,15 +1,19 @@ -package server +package rpc import ( "log" + "github.com/marcopeocchi/yt-dlp-web-ui/server/internal" "github.com/marcopeocchi/yt-dlp-web-ui/server/sys" "github.com/marcopeocchi/yt-dlp-web-ui/server/updater" ) -type Service int +type Service struct { + db *internal.MemoryDB + mq *internal.MessageQueue +} -type Running []ProcessResponse +type Running []internal.ProcessResponse type Pending []string type NoArgs struct{} @@ -28,19 +32,38 @@ type DownloadSpecificArgs struct { Params []string } +// Dependency injection container. +func Container(db *internal.MemoryDB, mq *internal.MessageQueue) *Service { + return &Service{ + db: db, + mq: mq, + } +} + // Exec spawns a Process. // The result of the execution is the newly spawned process Id. -func (t *Service) Exec(args DownloadSpecificArgs, result *string) error { - log.Println("Spawning new process for", args.URL) - p := Process{mem: &db, url: args.URL, params: args.Params} - p.Start(args.Path, args.Rename) - *result = p.id +func (s *Service) Exec(args DownloadSpecificArgs, result *string) error { + log.Println("Sending new process to message queue", args.URL) + + p := &internal.Process{ + DB: s.db, + Url: args.URL, + Params: args.Params, + Output: internal.DownloadOutput{ + Path: args.Path, + Filename: args.Rename, + }, + } + + s.mq.Publish(p) + *result = p.Id + return nil } // Progess retrieves the Progress of a specific Process given its Id -func (t *Service) Progess(args Args, progress *DownloadProgress) error { - proc, err := db.Get(args.Id) +func (s *Service) Progess(args Args, progress *internal.DownloadProgress) error { + proc, err := s.db.Get(args.Id) if err != nil { return err } @@ -49,29 +72,29 @@ func (t *Service) Progess(args Args, progress *DownloadProgress) error { } // Progess retrieves the Progress of a specific Process given its Id -func (t *Service) Formats(args Args, progress *DownloadFormats) error { +func (s *Service) Formats(args Args, progress *internal.DownloadFormats) error { var err error - p := Process{url: args.URL} + p := internal.Process{Url: args.URL} *progress, err = p.GetFormatsSync() return err } // Pending retrieves a slice of all Pending/Running processes ids -func (t *Service) Pending(args NoArgs, pending *Pending) error { - *pending = *db.Keys() +func (s *Service) Pending(args NoArgs, pending *Pending) error { + *pending = *s.db.Keys() return nil } // Running retrieves a slice of all Processes progress -func (t *Service) Running(args NoArgs, running *Running) error { - *running = *db.All() +func (s *Service) Running(args NoArgs, running *Running) error { + *running = *s.db.All() return nil } // Kill kills a process given its id and remove it from the memoryDB -func (t *Service) Kill(args string, killed *string) error { +func (s *Service) Kill(args string, killed *string) error { log.Println("Trying killing process with id", args) - proc, err := db.Get(args) + proc, err := s.db.Get(args) if err != nil { return err @@ -80,18 +103,18 @@ func (t *Service) Kill(args string, killed *string) error { err = proc.Kill() } - db.Delete(proc.id) + s.db.Delete(proc.Id) return err } // KillAll kills all process unconditionally and removes them from // the memory db -func (t *Service) KillAll(args NoArgs, killed *string) error { +func (s *Service) KillAll(args NoArgs, killed *string) error { log.Println("Killing all spawned processes", args) - keys := db.Keys() + keys := s.db.Keys() var err error for _, key := range *keys { - proc, err := db.Get(key) + proc, err := s.db.Get(key) if err != nil { return err } @@ -103,28 +126,28 @@ func (t *Service) KillAll(args NoArgs, killed *string) error { } // Remove a process from the db rendering it unusable if active -func (t *Service) Clear(args string, killed *string) error { +func (s *Service) Clear(args string, killed *string) error { log.Println("Clearing process with id", args) - db.Delete(args) + s.db.Delete(args) return nil } // FreeSpace gets the available from package sys util -func (t *Service) FreeSpace(args NoArgs, free *uint64) error { +func (s *Service) FreeSpace(args NoArgs, free *uint64) error { freeSpace, err := sys.FreeSpace() *free = freeSpace return err } // Return a flattned tree of the download directory -func (t *Service) DirectoryTree(args NoArgs, tree *[]string) error { +func (s *Service) DirectoryTree(args NoArgs, tree *[]string) error { dfsTree, err := sys.DirectoryTree() *tree = *dfsTree return err } // Updates the yt-dlp binary using its builtin function -func (t *Service) UpdateExecutable(args NoArgs, updated *bool) error { +func (s *Service) UpdateExecutable(args NoArgs, updated *bool) error { log.Println("Updating yt-dlp executable to the latest release") err := updater.UpdateExecutable() if err != nil { diff --git a/server/server.go b/server/server.go index f46cb4d..c3c77c7 100644 --- a/server/server.go +++ b/server/server.go @@ -17,16 +17,21 @@ import ( "github.com/gofiber/fiber/v2/middleware/cors" "github.com/gofiber/fiber/v2/middleware/filesystem" "github.com/gofiber/websocket/v2" + "github.com/marcopeocchi/yt-dlp-web-ui/server/internal" middlewares "github.com/marcopeocchi/yt-dlp-web-ui/server/middleware" "github.com/marcopeocchi/yt-dlp-web-ui/server/rest" + ytdlpRPC "github.com/marcopeocchi/yt-dlp-web-ui/server/rpc" ) -var db MemoryDB - func RunBlocking(port int, frontend fs.FS) { + var db internal.MemoryDB db.Restore() - service := new(Service) + mq := internal.NewMessageQueue() + go mq.SetupConsumer() + + service := ytdlpRPC.Container(&db, mq) + rpc.Register(service) app := fiber.New() @@ -93,13 +98,13 @@ func RunBlocking(port int, frontend fs.FS) { app.Server().StreamRequestBody = true - go gracefulShutdown(app) - go autoPersist(time.Minute * 5) + go gracefulShutdown(app, &db) + go autoPersist(time.Minute*5, &db) log.Fatal(app.Listen(fmt.Sprintf(":%d", port))) } -func gracefulShutdown(app *fiber.App) { +func gracefulShutdown(app *fiber.App, db *internal.MemoryDB) { ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM, @@ -118,7 +123,7 @@ func gracefulShutdown(app *fiber.App) { }() } -func autoPersist(d time.Duration) { +func autoPersist(d time.Duration, db *internal.MemoryDB) { for { db.Persist() time.Sleep(d)