From 472db89ea3b9aedd0ee9428b642428f01690d453 Mon Sep 17 00:00:00 2001 From: marcobaobao Date: Mon, 20 May 2024 08:48:01 +0200 Subject: [PATCH] load balancer implementation, code refactoring --- server/{dbutils => dbutil}/migrate.go | 2 +- server/internal/balancer.go | 34 +++++++++++++++++++++++ server/internal/memory_db.go | 18 +++++++----- server/internal/message_queue.go | 7 +++-- server/internal/pool.go | 16 +++++++++++ server/internal/process.go | 9 ++++-- server/internal/worker.go | 15 ++++++++++ server/rpc/service.go | 40 +++++++++++++++++++-------- server/server.go | 10 ++++--- 9 files changed, 122 insertions(+), 29 deletions(-) rename server/{dbutils => dbutil}/migrate.go (96%) create mode 100644 server/internal/balancer.go create mode 100644 server/internal/pool.go create mode 100644 server/internal/worker.go diff --git a/server/dbutils/migrate.go b/server/dbutil/migrate.go similarity index 96% rename from server/dbutils/migrate.go rename to server/dbutil/migrate.go index 4aa9693..a933d09 100644 --- a/server/dbutils/migrate.go +++ b/server/dbutil/migrate.go @@ -1,4 +1,4 @@ -package dbutils +package dbutil import ( "context" diff --git a/server/internal/balancer.go b/server/internal/balancer.go new file mode 100644 index 0000000..b21a955 --- /dev/null +++ b/server/internal/balancer.go @@ -0,0 +1,34 @@ +package internal + +import ( + "container/heap" +) + +type LoadBalancer struct { + pool Pool + done chan *Worker +} + +func (b *LoadBalancer) Balance(work chan Process) { + for { + select { + case req := <-work: + b.dispatch(req) + case w := <-b.done: + b.completed(w) + } + } +} + +func (b *LoadBalancer) dispatch(req Process) { + w := heap.Pop(&b.pool).(*Worker) + w.requests <- req + w.pending++ + heap.Push(&b.pool, w) +} + +func (b *LoadBalancer) completed(w *Worker) { + w.pending-- + heap.Remove(&b.pool, w.index) + heap.Push(&b.pool, w) +} diff --git a/server/internal/memory_db.go b/server/internal/memory_db.go index a36f7a7..c329fb3 100644 --- a/server/internal/memory_db.go +++ b/server/internal/memory_db.go @@ -23,14 +23,17 @@ func (m *MemoryDB) Get(id string) (*Process, error) { if !ok { return nil, errors.New("no process found for the given key") } + return entry.(*Process), nil } // Store a pointer of a process and return its id func (m *MemoryDB) Set(process *Process) string { id := uuid.NewString() + m.table.Store(id, process) process.Id = id + return id } @@ -40,17 +43,20 @@ func (m *MemoryDB) Delete(id string) { } func (m *MemoryDB) Keys() *[]string { - running := []string{} + var running []string + m.table.Range(func(key, value any) bool { running = append(running, key.(string)) return true }) + return &running } // Returns a slice of all currently stored processes progess func (m *MemoryDB) All() *[]ProcessResponse { running := []ProcessResponse{} + m.table.Range(func(key, value any) bool { running = append(running, ProcessResponse{ Id: key.(string), @@ -61,6 +67,7 @@ func (m *MemoryDB) All() *[]ProcessResponse { }) return true }) + return &running } @@ -75,12 +82,9 @@ func (m *MemoryDB) Persist() error { return errors.Join(errors.New("failed to persist session"), err) } - session := Session{ - Processes: *running, - } + session := Session{Processes: *running} - err = gob.NewEncoder(fd).Encode(session) - if err != nil { + if err := gob.NewEncoder(fd).Encode(session); err != nil { return errors.Join(errors.New("failed to persist session"), err) } @@ -113,7 +117,7 @@ func (m *MemoryDB) Restore(mq *MessageQueue, logger *slog.Logger) { m.table.Store(proc.Id, restored) - if restored.Progress.Percentage != "-1" { + if restored.Progress.Status != StatusCompleted { mq.Publish(restored) } } diff --git a/server/internal/message_queue.go b/server/internal/message_queue.go index a048658..ddb027e 100644 --- a/server/internal/message_queue.go +++ b/server/internal/message_queue.go @@ -2,6 +2,7 @@ package internal import ( "context" + "errors" "log/slog" evbus "github.com/asaskevich/EventBus" @@ -21,18 +22,18 @@ type MessageQueue struct { // By default it will be created with a size equals to nthe number of logical // CPU cores -1. // The queue size can be set via the qs flag. -func NewMessageQueue(l *slog.Logger) *MessageQueue { +func NewMessageQueue(l *slog.Logger) (*MessageQueue, error) { qs := config.Instance().QueueSize if qs <= 0 { - panic("invalid queue size") + return nil, errors.New("invalid queue size") } return &MessageQueue{ concurrency: qs, eventBus: evbus.New(), logger: l, - } + }, nil } // Publish a message to the queue and set the task to a peding state. diff --git a/server/internal/pool.go b/server/internal/pool.go new file mode 100644 index 0000000..85a6aee --- /dev/null +++ b/server/internal/pool.go @@ -0,0 +1,16 @@ +package internal + +type Pool []*Worker + +func (h Pool) Len() int { return len(h) } +func (h Pool) Less(i, j int) bool { return h[i].index < h[j].index } +func (h Pool) Swap(i, j int) { h[i], h[j] = h[j], h[i] } +func (h *Pool) Push(x any) { *h = append(*h, x.(*Worker)) } + +func (h *Pool) Pop() any { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} diff --git a/server/internal/process.go b/server/internal/process.go index fa3de89..50b9ca6 100644 --- a/server/internal/process.go +++ b/server/internal/process.go @@ -125,6 +125,8 @@ func (p *Process) Start() { p.proc = cmd.Process + go p.SetMetadata() + // --------------- progress block --------------- // var ( sourceChan = make(chan []byte) @@ -139,7 +141,9 @@ func (p *Process) Start() { defer func() { r.Close() p.Complete() + doneChan <- struct{}{} + close(sourceChan) close(doneChan) }() @@ -215,6 +219,7 @@ func (p *Process) Kill() error { } // Returns the available format for this URL +// TODO: Move out from process.go func (p *Process) GetFormatsSync() (DownloadFormats, error) { cmd := exec.Command(config.Instance().DownloaderPath, p.Url, "-J") @@ -356,9 +361,7 @@ func (p *Process) SetMetadata() error { return nil } -func (p *Process) getShortId() string { - return strings.Split(p.Id, "-")[0] -} +func (p *Process) getShortId() string { return strings.Split(p.Id, "-")[0] } func buildFilename(o *DownloadOutput) { if o.Filename != "" && strings.Contains(o.Filename, ".%(ext)s") { diff --git a/server/internal/worker.go b/server/internal/worker.go new file mode 100644 index 0000000..c3231bb --- /dev/null +++ b/server/internal/worker.go @@ -0,0 +1,15 @@ +package internal + +type Worker struct { + requests chan Process // downloads to do + pending int // downloads pending + index int // index in the heap +} + +func (w *Worker) Work(done chan *Worker) { + for { + req := <-w.requests + req.Start() + done <- w + } +} diff --git a/server/rpc/service.go b/server/rpc/service.go index 1b86ef1..ac70705 100644 --- a/server/rpc/service.go +++ b/server/rpc/service.go @@ -54,7 +54,6 @@ func (s *Service) ExecPlaylist(args internal.DownloadRequest, result *string) er } *result = "" - return nil } @@ -64,14 +63,17 @@ func (s *Service) Progess(args Args, progress *internal.DownloadProgress) error if err != nil { return err } + *progress = proc.Progress return nil } // Progess retrieves available format for a given resource func (s *Service) Formats(args Args, meta *internal.DownloadFormats) error { - var err error - p := internal.Process{Url: args.URL, Logger: s.logger} + var ( + err error + p = internal.Process{Url: args.URL, Logger: s.logger} + ) *meta, err = p.GetFormatsSync() return err } @@ -91,11 +93,12 @@ func (s *Service) Running(args NoArgs, running *Running) error { // Kill kills a process given its id and remove it from the memoryDB func (s *Service) Kill(args string, killed *string) error { s.logger.Info("Trying killing process with id", slog.String("id", args)) - proc, err := s.db.Get(args) + proc, err := s.db.Get(args) if err != nil { return err } + if proc != nil { err = proc.Kill() s.db.Delete(proc.Id) @@ -120,6 +123,7 @@ func (s *Service) KillAll(args NoArgs, killed *string) error { if err != nil { return err } + if proc != nil { proc.Kill() s.db.Delete(proc.Id) @@ -139,6 +143,10 @@ func (s *Service) Clear(args string, killed *string) error { // FreeSpace gets the available from package sys util func (s *Service) FreeSpace(args NoArgs, free *uint64) error { freeSpace, err := sys.FreeSpace() + if err != nil { + return err + } + *free = freeSpace return err } @@ -146,21 +154,31 @@ func (s *Service) FreeSpace(args NoArgs, free *uint64) error { // Return a flattned tree of the download directory func (s *Service) DirectoryTree(args NoArgs, tree *[]string) error { dfsTree, err := sys.DirectoryTree() + + if err != nil { + *tree = nil + return err + } + if dfsTree != nil { *tree = *dfsTree } - return err + + return nil } // Updates the yt-dlp binary using its builtin function func (s *Service) UpdateExecutable(args NoArgs, updated *bool) error { s.logger.Info("Updating yt-dlp executable to the latest release") - err := updater.UpdateExecutable() - if err != nil { - *updated = true - s.logger.Info("Succesfully updated yt-dlp") + + if err := updater.UpdateExecutable(); err != nil { + s.logger.Error("Failed updating yt-dlp") + *updated = false return err } - *updated = false - return err + + *updated = true + s.logger.Info("Succesfully updated yt-dlp") + + return nil } diff --git a/server/server.go b/server/server.go index 0ad2a4d..7904f86 100644 --- a/server/server.go +++ b/server/server.go @@ -19,7 +19,7 @@ import ( "github.com/go-chi/chi/v5" "github.com/go-chi/cors" "github.com/marcopeocchi/yt-dlp-web-ui/server/config" - "github.com/marcopeocchi/yt-dlp-web-ui/server/dbutils" + "github.com/marcopeocchi/yt-dlp-web-ui/server/dbutil" "github.com/marcopeocchi/yt-dlp-web-ui/server/handlers" "github.com/marcopeocchi/yt-dlp-web-ui/server/internal" "github.com/marcopeocchi/yt-dlp-web-ui/server/logging" @@ -82,12 +82,14 @@ func RunBlocking(cfg *RunConfig) { logger.Error("failed to open database", slog.String("err", err.Error())) } - err = dbutils.AutoMigrate(context.Background(), db) - if err != nil { + if err := dbutil.AutoMigrate(context.Background(), db); err != nil { logger.Error("failed to init database", slog.String("err", err.Error())) } - mq := internal.NewMessageQueue(logger) + mq, err := internal.NewMessageQueue(logger) + if err != nil { + panic(err) + } mq.SetupConsumers() go mdb.Restore(mq, logger)