Code refactoring, added clear button
This commit is contained in:
@@ -2,6 +2,7 @@ package internal
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
"log/slog"
|
||||
)
|
||||
|
||||
type LoadBalancer struct {
|
||||
@@ -9,7 +10,29 @@ type LoadBalancer struct {
|
||||
done chan *Worker
|
||||
}
|
||||
|
||||
func (b *LoadBalancer) Balance(work chan Process) {
|
||||
func NewLoadBalancer(numWorker int) *LoadBalancer {
|
||||
var pool Pool
|
||||
|
||||
doneChan := make(chan *Worker)
|
||||
|
||||
for i := range numWorker {
|
||||
w := &Worker{
|
||||
requests: make(chan *Process, 1),
|
||||
index: i,
|
||||
}
|
||||
go w.Work(doneChan)
|
||||
pool = append(pool, w)
|
||||
|
||||
slog.Info("spawned worker", slog.Int("index", i))
|
||||
}
|
||||
|
||||
return &LoadBalancer{
|
||||
pool: pool,
|
||||
done: doneChan,
|
||||
}
|
||||
}
|
||||
|
||||
func (b *LoadBalancer) Balance(work chan *Process) {
|
||||
for {
|
||||
select {
|
||||
case req := <-work:
|
||||
@@ -20,7 +43,7 @@ func (b *LoadBalancer) Balance(work chan Process) {
|
||||
}
|
||||
}
|
||||
|
||||
func (b *LoadBalancer) dispatch(req Process) {
|
||||
func (b *LoadBalancer) dispatch(req *Process) {
|
||||
w := heap.Pop(&b.pool).(*Worker)
|
||||
w.requests <- req
|
||||
w.pending++
|
||||
|
||||
@@ -50,7 +50,7 @@ func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB) error {
|
||||
return errors.New("probably not a valid URL")
|
||||
}
|
||||
|
||||
if m.Type == "playlist" {
|
||||
if m.IsPlaylist() {
|
||||
entries := slices.CompactFunc(slices.Compact(m.Entries), func(a common.DownloadInfo, b common.DownloadInfo) bool {
|
||||
return a.URL == b.URL
|
||||
})
|
||||
|
||||
@@ -1,16 +1,24 @@
|
||||
package internal
|
||||
|
||||
// Pool implements heap.Interface interface as a standard priority queue
|
||||
type Pool []*Worker
|
||||
|
||||
func (h Pool) Len() int { return len(h) }
|
||||
func (h Pool) Less(i, j int) bool { return h[i].index < h[j].index }
|
||||
func (h Pool) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
|
||||
func (h *Pool) Push(x any) { *h = append(*h, x.(*Worker)) }
|
||||
func (h Pool) Less(i, j int) bool { return h[i].pending < h[j].pending }
|
||||
|
||||
func (h Pool) Swap(i, j int) {
|
||||
h[i], h[j] = h[j], h[i]
|
||||
h[i].index = i
|
||||
h[j].index = j
|
||||
}
|
||||
|
||||
func (h *Pool) Push(x any) { *h = append(*h, x.(*Worker)) }
|
||||
|
||||
func (h *Pool) Pop() any {
|
||||
old := *h
|
||||
n := len(old)
|
||||
x := old[n-1]
|
||||
old[n-1] = nil
|
||||
*h = old[0 : n-1]
|
||||
return x
|
||||
}
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
package internal
|
||||
|
||||
type Worker struct {
|
||||
requests chan Process // downloads to do
|
||||
pending int // downloads pending
|
||||
index int // index in the heap
|
||||
requests chan *Process // downloads to do
|
||||
pending int // downloads pending
|
||||
index int // index in the heap
|
||||
}
|
||||
|
||||
func (w *Worker) Work(done chan *Worker) {
|
||||
|
||||
@@ -8,3 +8,5 @@ type Metadata struct {
|
||||
PlaylistTitle string `json:"title"`
|
||||
Type string `json:"_type"`
|
||||
}
|
||||
|
||||
func (m *Metadata) IsPlaylist() bool { return m.Type == "playlist" }
|
||||
|
||||
@@ -183,6 +183,7 @@ func (s *Service) KillAll(args NoArgs, killed *string) error {
|
||||
}
|
||||
|
||||
slog.Info("succesfully killed process", slog.String("id", proc.Id))
|
||||
proc = nil // gc helper
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -195,6 +196,35 @@ func (s *Service) Clear(args string, killed *string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Removes completed processes
|
||||
func (s *Service) ClearCompleted(cleared *string) error {
|
||||
var (
|
||||
keys = s.db.Keys()
|
||||
removeFunc = func(p *internal.Process) error {
|
||||
defer s.db.Delete(p.Id)
|
||||
|
||||
if p.Progress.Status != internal.StatusCompleted {
|
||||
return nil
|
||||
}
|
||||
|
||||
return p.Kill()
|
||||
}
|
||||
)
|
||||
|
||||
for _, key := range *keys {
|
||||
proc, err := s.db.Get(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := removeFunc(proc); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// FreeSpace gets the available from package sys util
|
||||
func (s *Service) FreeSpace(args NoArgs, free *uint64) error {
|
||||
freeSpace, err := sys.FreeSpace()
|
||||
|
||||
Reference in New Issue
Block a user