load balancer implementation, code refactoring

This commit is contained in:
2024-05-20 08:48:01 +02:00
parent fee7f86182
commit 472db89ea3
9 changed files with 122 additions and 29 deletions

View File

@@ -1,4 +1,4 @@
package dbutils package dbutil
import ( import (
"context" "context"

View File

@@ -0,0 +1,34 @@
package internal
import (
"container/heap"
)
type LoadBalancer struct {
pool Pool
done chan *Worker
}
func (b *LoadBalancer) Balance(work chan Process) {
for {
select {
case req := <-work:
b.dispatch(req)
case w := <-b.done:
b.completed(w)
}
}
}
func (b *LoadBalancer) dispatch(req Process) {
w := heap.Pop(&b.pool).(*Worker)
w.requests <- req
w.pending++
heap.Push(&b.pool, w)
}
func (b *LoadBalancer) completed(w *Worker) {
w.pending--
heap.Remove(&b.pool, w.index)
heap.Push(&b.pool, w)
}

View File

@@ -23,14 +23,17 @@ func (m *MemoryDB) Get(id string) (*Process, error) {
if !ok { if !ok {
return nil, errors.New("no process found for the given key") return nil, errors.New("no process found for the given key")
} }
return entry.(*Process), nil return entry.(*Process), nil
} }
// Store a pointer of a process and return its id // Store a pointer of a process and return its id
func (m *MemoryDB) Set(process *Process) string { func (m *MemoryDB) Set(process *Process) string {
id := uuid.NewString() id := uuid.NewString()
m.table.Store(id, process) m.table.Store(id, process)
process.Id = id process.Id = id
return id return id
} }
@@ -40,17 +43,20 @@ func (m *MemoryDB) Delete(id string) {
} }
func (m *MemoryDB) Keys() *[]string { func (m *MemoryDB) Keys() *[]string {
running := []string{} var running []string
m.table.Range(func(key, value any) bool { m.table.Range(func(key, value any) bool {
running = append(running, key.(string)) running = append(running, key.(string))
return true return true
}) })
return &running return &running
} }
// Returns a slice of all currently stored processes progess // Returns a slice of all currently stored processes progess
func (m *MemoryDB) All() *[]ProcessResponse { func (m *MemoryDB) All() *[]ProcessResponse {
running := []ProcessResponse{} running := []ProcessResponse{}
m.table.Range(func(key, value any) bool { m.table.Range(func(key, value any) bool {
running = append(running, ProcessResponse{ running = append(running, ProcessResponse{
Id: key.(string), Id: key.(string),
@@ -61,6 +67,7 @@ func (m *MemoryDB) All() *[]ProcessResponse {
}) })
return true return true
}) })
return &running return &running
} }
@@ -75,12 +82,9 @@ func (m *MemoryDB) Persist() error {
return errors.Join(errors.New("failed to persist session"), err) return errors.Join(errors.New("failed to persist session"), err)
} }
session := Session{ session := Session{Processes: *running}
Processes: *running,
}
err = gob.NewEncoder(fd).Encode(session) if err := gob.NewEncoder(fd).Encode(session); err != nil {
if err != nil {
return errors.Join(errors.New("failed to persist session"), err) return errors.Join(errors.New("failed to persist session"), err)
} }
@@ -113,7 +117,7 @@ func (m *MemoryDB) Restore(mq *MessageQueue, logger *slog.Logger) {
m.table.Store(proc.Id, restored) m.table.Store(proc.Id, restored)
if restored.Progress.Percentage != "-1" { if restored.Progress.Status != StatusCompleted {
mq.Publish(restored) mq.Publish(restored)
} }
} }

View File

@@ -2,6 +2,7 @@ package internal
import ( import (
"context" "context"
"errors"
"log/slog" "log/slog"
evbus "github.com/asaskevich/EventBus" evbus "github.com/asaskevich/EventBus"
@@ -21,18 +22,18 @@ type MessageQueue struct {
// By default it will be created with a size equals to nthe number of logical // By default it will be created with a size equals to nthe number of logical
// CPU cores -1. // CPU cores -1.
// The queue size can be set via the qs flag. // The queue size can be set via the qs flag.
func NewMessageQueue(l *slog.Logger) *MessageQueue { func NewMessageQueue(l *slog.Logger) (*MessageQueue, error) {
qs := config.Instance().QueueSize qs := config.Instance().QueueSize
if qs <= 0 { if qs <= 0 {
panic("invalid queue size") return nil, errors.New("invalid queue size")
} }
return &MessageQueue{ return &MessageQueue{
concurrency: qs, concurrency: qs,
eventBus: evbus.New(), eventBus: evbus.New(),
logger: l, logger: l,
} }, nil
} }
// Publish a message to the queue and set the task to a peding state. // Publish a message to the queue and set the task to a peding state.

16
server/internal/pool.go Normal file
View File

@@ -0,0 +1,16 @@
package internal
type Pool []*Worker
func (h Pool) Len() int { return len(h) }
func (h Pool) Less(i, j int) bool { return h[i].index < h[j].index }
func (h Pool) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *Pool) Push(x any) { *h = append(*h, x.(*Worker)) }
func (h *Pool) Pop() any {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}

View File

@@ -125,6 +125,8 @@ func (p *Process) Start() {
p.proc = cmd.Process p.proc = cmd.Process
go p.SetMetadata()
// --------------- progress block --------------- // // --------------- progress block --------------- //
var ( var (
sourceChan = make(chan []byte) sourceChan = make(chan []byte)
@@ -139,7 +141,9 @@ func (p *Process) Start() {
defer func() { defer func() {
r.Close() r.Close()
p.Complete() p.Complete()
doneChan <- struct{}{} doneChan <- struct{}{}
close(sourceChan) close(sourceChan)
close(doneChan) close(doneChan)
}() }()
@@ -215,6 +219,7 @@ func (p *Process) Kill() error {
} }
// Returns the available format for this URL // Returns the available format for this URL
// TODO: Move out from process.go
func (p *Process) GetFormatsSync() (DownloadFormats, error) { func (p *Process) GetFormatsSync() (DownloadFormats, error) {
cmd := exec.Command(config.Instance().DownloaderPath, p.Url, "-J") cmd := exec.Command(config.Instance().DownloaderPath, p.Url, "-J")
@@ -356,9 +361,7 @@ func (p *Process) SetMetadata() error {
return nil return nil
} }
func (p *Process) getShortId() string { func (p *Process) getShortId() string { return strings.Split(p.Id, "-")[0] }
return strings.Split(p.Id, "-")[0]
}
func buildFilename(o *DownloadOutput) { func buildFilename(o *DownloadOutput) {
if o.Filename != "" && strings.Contains(o.Filename, ".%(ext)s") { if o.Filename != "" && strings.Contains(o.Filename, ".%(ext)s") {

15
server/internal/worker.go Normal file
View File

@@ -0,0 +1,15 @@
package internal
type Worker struct {
requests chan Process // downloads to do
pending int // downloads pending
index int // index in the heap
}
func (w *Worker) Work(done chan *Worker) {
for {
req := <-w.requests
req.Start()
done <- w
}
}

View File

@@ -54,7 +54,6 @@ func (s *Service) ExecPlaylist(args internal.DownloadRequest, result *string) er
} }
*result = "" *result = ""
return nil return nil
} }
@@ -64,14 +63,17 @@ func (s *Service) Progess(args Args, progress *internal.DownloadProgress) error
if err != nil { if err != nil {
return err return err
} }
*progress = proc.Progress *progress = proc.Progress
return nil return nil
} }
// Progess retrieves available format for a given resource // Progess retrieves available format for a given resource
func (s *Service) Formats(args Args, meta *internal.DownloadFormats) error { func (s *Service) Formats(args Args, meta *internal.DownloadFormats) error {
var err error var (
p := internal.Process{Url: args.URL, Logger: s.logger} err error
p = internal.Process{Url: args.URL, Logger: s.logger}
)
*meta, err = p.GetFormatsSync() *meta, err = p.GetFormatsSync()
return err return err
} }
@@ -91,11 +93,12 @@ func (s *Service) Running(args NoArgs, running *Running) error {
// Kill kills a process given its id and remove it from the memoryDB // Kill kills a process given its id and remove it from the memoryDB
func (s *Service) Kill(args string, killed *string) error { func (s *Service) Kill(args string, killed *string) error {
s.logger.Info("Trying killing process with id", slog.String("id", args)) s.logger.Info("Trying killing process with id", slog.String("id", args))
proc, err := s.db.Get(args)
proc, err := s.db.Get(args)
if err != nil { if err != nil {
return err return err
} }
if proc != nil { if proc != nil {
err = proc.Kill() err = proc.Kill()
s.db.Delete(proc.Id) s.db.Delete(proc.Id)
@@ -120,6 +123,7 @@ func (s *Service) KillAll(args NoArgs, killed *string) error {
if err != nil { if err != nil {
return err return err
} }
if proc != nil { if proc != nil {
proc.Kill() proc.Kill()
s.db.Delete(proc.Id) s.db.Delete(proc.Id)
@@ -139,6 +143,10 @@ func (s *Service) Clear(args string, killed *string) error {
// FreeSpace gets the available from package sys util // FreeSpace gets the available from package sys util
func (s *Service) FreeSpace(args NoArgs, free *uint64) error { func (s *Service) FreeSpace(args NoArgs, free *uint64) error {
freeSpace, err := sys.FreeSpace() freeSpace, err := sys.FreeSpace()
if err != nil {
return err
}
*free = freeSpace *free = freeSpace
return err return err
} }
@@ -146,21 +154,31 @@ func (s *Service) FreeSpace(args NoArgs, free *uint64) error {
// Return a flattned tree of the download directory // Return a flattned tree of the download directory
func (s *Service) DirectoryTree(args NoArgs, tree *[]string) error { func (s *Service) DirectoryTree(args NoArgs, tree *[]string) error {
dfsTree, err := sys.DirectoryTree() dfsTree, err := sys.DirectoryTree()
if err != nil {
*tree = nil
return err
}
if dfsTree != nil { if dfsTree != nil {
*tree = *dfsTree *tree = *dfsTree
} }
return err
return nil
} }
// Updates the yt-dlp binary using its builtin function // Updates the yt-dlp binary using its builtin function
func (s *Service) UpdateExecutable(args NoArgs, updated *bool) error { func (s *Service) UpdateExecutable(args NoArgs, updated *bool) error {
s.logger.Info("Updating yt-dlp executable to the latest release") s.logger.Info("Updating yt-dlp executable to the latest release")
err := updater.UpdateExecutable()
if err != nil { if err := updater.UpdateExecutable(); err != nil {
*updated = true s.logger.Error("Failed updating yt-dlp")
s.logger.Info("Succesfully updated yt-dlp")
return err
}
*updated = false *updated = false
return err return err
}
*updated = true
s.logger.Info("Succesfully updated yt-dlp")
return nil
} }

View File

@@ -19,7 +19,7 @@ import (
"github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5"
"github.com/go-chi/cors" "github.com/go-chi/cors"
"github.com/marcopeocchi/yt-dlp-web-ui/server/config" "github.com/marcopeocchi/yt-dlp-web-ui/server/config"
"github.com/marcopeocchi/yt-dlp-web-ui/server/dbutils" "github.com/marcopeocchi/yt-dlp-web-ui/server/dbutil"
"github.com/marcopeocchi/yt-dlp-web-ui/server/handlers" "github.com/marcopeocchi/yt-dlp-web-ui/server/handlers"
"github.com/marcopeocchi/yt-dlp-web-ui/server/internal" "github.com/marcopeocchi/yt-dlp-web-ui/server/internal"
"github.com/marcopeocchi/yt-dlp-web-ui/server/logging" "github.com/marcopeocchi/yt-dlp-web-ui/server/logging"
@@ -82,12 +82,14 @@ func RunBlocking(cfg *RunConfig) {
logger.Error("failed to open database", slog.String("err", err.Error())) logger.Error("failed to open database", slog.String("err", err.Error()))
} }
err = dbutils.AutoMigrate(context.Background(), db) if err := dbutil.AutoMigrate(context.Background(), db); err != nil {
if err != nil {
logger.Error("failed to init database", slog.String("err", err.Error())) logger.Error("failed to init database", slog.String("err", err.Error()))
} }
mq := internal.NewMessageQueue(logger) mq, err := internal.NewMessageQueue(logger)
if err != nil {
panic(err)
}
mq.SetupConsumers() mq.SetupConsumers()
go mdb.Restore(mq, logger) go mdb.Restore(mq, logger)