diff --git a/.dockerignore b/.dockerignore index 22d759f..167234e 100644 --- a/.dockerignore +++ b/.dockerignore @@ -3,6 +3,7 @@ result/ result dist +.pnpm-store/ .pnpm-debug.log node_modules .env @@ -20,9 +21,11 @@ cookies.txt __debug* ui/ .idea +.idea/ frontend/.pnp.cjs frontend/.pnp.loader.mjs frontend/.yarn/install-state.gz .db.lock livestreams.dat -.git \ No newline at end of file +.vite/deps +archive.txt diff --git a/frontend/src/assets/i18n/en_US.yaml b/frontend/src/assets/i18n/en_US.yaml index 5c838fc..57b6b29 100644 --- a/frontend/src/assets/i18n/en_US.yaml +++ b/frontend/src/assets/i18n/en_US.yaml @@ -79,4 +79,5 @@ keys: The monitor job will be scheduled/triggered by a defined cron expression (defaults to every 5 minutes if left blank). cronExpressionLabel: 'Cron expression' editButtonLabel: 'Edit' - newSubscriptionButton: New subscription \ No newline at end of file + newSubscriptionButton: New subscription + clearCompletedButton: 'Clear completed' \ No newline at end of file diff --git a/frontend/src/components/HomeSpeedDial.tsx b/frontend/src/components/HomeSpeedDial.tsx index 9ada86c..ab8432f 100644 --- a/frontend/src/components/HomeSpeedDial.tsx +++ b/frontend/src/components/HomeSpeedDial.tsx @@ -1,5 +1,6 @@ import AddCircleIcon from '@mui/icons-material/AddCircle' import BuildCircleIcon from '@mui/icons-material/BuildCircle' +import ClearAllIcon from '@mui/icons-material/ClearAll' import DeleteForeverIcon from '@mui/icons-material/DeleteForever' import FolderZipIcon from '@mui/icons-material/FolderZip' import FormatListBulleted from '@mui/icons-material/FormatListBulleted' @@ -42,6 +43,11 @@ const HomeSpeedDial: React.FC = ({ onDownloadOpen, onEditorOpen }) => { tooltipTitle={i18n.t('bulkDownload')} onClick={() => window.open(`${serverAddr}/archive/bulk?token=${localStorage.getItem('token')}`)} /> + } + tooltipTitle={i18n.t('clearCompletedButton')} + onClick={() => client.clearCompleted()} + /> } tooltipTitle={i18n.t('abortAllButton')} diff --git a/frontend/src/lib/rpcClient.ts b/frontend/src/lib/rpcClient.ts index 1ea3d5c..5ee4ab1 100644 --- a/frontend/src/lib/rpcClient.ts +++ b/frontend/src/lib/rpcClient.ts @@ -200,4 +200,11 @@ export class RPCClient { params: [] }) } + + public clearCompleted() { + return this.sendHTTP({ + method: 'Service.ClearCompleted', + params: [] + }) + } } \ No newline at end of file diff --git a/frontend/src/types/index.ts b/frontend/src/types/index.ts index 1c031f2..dbdfd6c 100644 --- a/frontend/src/types/index.ts +++ b/frontend/src/types/index.ts @@ -13,6 +13,7 @@ export type RPCMethods = | "Service.ProgressLivestream" | "Service.KillLivestream" | "Service.KillAllLivestream" + | "Service.ClearCompleted" export type RPCRequest = { method: RPCMethods diff --git a/go.mod b/go.mod index b666154..0542d1b 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/marcopiovanello/yt-dlp-web-ui/v3 -go 1.23 +go 1.24 require ( github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef diff --git a/server/internal/balancer.go b/server/internal/balancer.go index b21a955..5cdb7ab 100644 --- a/server/internal/balancer.go +++ b/server/internal/balancer.go @@ -2,6 +2,7 @@ package internal import ( "container/heap" + "log/slog" ) type LoadBalancer struct { @@ -9,7 +10,29 @@ type LoadBalancer struct { done chan *Worker } -func (b *LoadBalancer) Balance(work chan Process) { +func NewLoadBalancer(numWorker int) *LoadBalancer { + var pool Pool + + doneChan := make(chan *Worker) + + for i := range numWorker { + w := &Worker{ + requests: make(chan *Process, 1), + index: i, + } + go w.Work(doneChan) + pool = append(pool, w) + + slog.Info("spawned worker", slog.Int("index", i)) + } + + return &LoadBalancer{ + pool: pool, + done: doneChan, + } +} + +func (b *LoadBalancer) Balance(work chan *Process) { for { select { case req := <-work: @@ -20,7 +43,7 @@ func (b *LoadBalancer) Balance(work chan Process) { } } -func (b *LoadBalancer) dispatch(req Process) { +func (b *LoadBalancer) dispatch(req *Process) { w := heap.Pop(&b.pool).(*Worker) w.requests <- req w.pending++ diff --git a/server/internal/playlist.go b/server/internal/playlist.go index 72588e7..fcf3f7e 100644 --- a/server/internal/playlist.go +++ b/server/internal/playlist.go @@ -50,7 +50,7 @@ func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB) error { return errors.New("probably not a valid URL") } - if m.Type == "playlist" { + if m.IsPlaylist() { entries := slices.CompactFunc(slices.Compact(m.Entries), func(a common.DownloadInfo, b common.DownloadInfo) bool { return a.URL == b.URL }) diff --git a/server/internal/pool.go b/server/internal/pool.go index 85a6aee..0f86db8 100644 --- a/server/internal/pool.go +++ b/server/internal/pool.go @@ -1,16 +1,24 @@ package internal +// Pool implements heap.Interface interface as a standard priority queue type Pool []*Worker func (h Pool) Len() int { return len(h) } -func (h Pool) Less(i, j int) bool { return h[i].index < h[j].index } -func (h Pool) Swap(i, j int) { h[i], h[j] = h[j], h[i] } -func (h *Pool) Push(x any) { *h = append(*h, x.(*Worker)) } +func (h Pool) Less(i, j int) bool { return h[i].pending < h[j].pending } + +func (h Pool) Swap(i, j int) { + h[i], h[j] = h[j], h[i] + h[i].index = i + h[j].index = j +} + +func (h *Pool) Push(x any) { *h = append(*h, x.(*Worker)) } func (h *Pool) Pop() any { old := *h n := len(old) x := old[n-1] + old[n-1] = nil *h = old[0 : n-1] return x } diff --git a/server/internal/worker.go b/server/internal/worker.go index c3231bb..eb060d8 100644 --- a/server/internal/worker.go +++ b/server/internal/worker.go @@ -1,9 +1,9 @@ package internal type Worker struct { - requests chan Process // downloads to do - pending int // downloads pending - index int // index in the heap + requests chan *Process // downloads to do + pending int // downloads pending + index int // index in the heap } func (w *Worker) Work(done chan *Worker) { diff --git a/server/playlist/types.go b/server/playlist/types.go index 003171a..3cbf97d 100644 --- a/server/playlist/types.go +++ b/server/playlist/types.go @@ -8,3 +8,5 @@ type Metadata struct { PlaylistTitle string `json:"title"` Type string `json:"_type"` } + +func (m *Metadata) IsPlaylist() bool { return m.Type == "playlist" } diff --git a/server/rpc/service.go b/server/rpc/service.go index 457e007..887b132 100644 --- a/server/rpc/service.go +++ b/server/rpc/service.go @@ -183,6 +183,7 @@ func (s *Service) KillAll(args NoArgs, killed *string) error { } slog.Info("succesfully killed process", slog.String("id", proc.Id)) + proc = nil // gc helper } return nil @@ -195,6 +196,35 @@ func (s *Service) Clear(args string, killed *string) error { return nil } +// Removes completed processes +func (s *Service) ClearCompleted(cleared *string) error { + var ( + keys = s.db.Keys() + removeFunc = func(p *internal.Process) error { + defer s.db.Delete(p.Id) + + if p.Progress.Status != internal.StatusCompleted { + return nil + } + + return p.Kill() + } + ) + + for _, key := range *keys { + proc, err := s.db.Get(key) + if err != nil { + return err + } + + if err := removeFunc(proc); err != nil { + return err + } + } + + return nil +} + // FreeSpace gets the available from package sys util func (s *Service) FreeSpace(args NoArgs, free *uint64) error { freeSpace, err := sys.FreeSpace()