10 feat download queue (#59)

* testing message queue

* better mq syncronisation

* major code refactoring, concern separation.

* bugfix

* code refactoring

* queuesize configurable via flags

* code refactoring

* comments

* code refactoring, updated readme
This commit is contained in:
Marco
2023-06-26 11:27:15 +02:00
committed by GitHub
parent dd753c5f26
commit 3ded768a6f
14 changed files with 283 additions and 118 deletions

View File

@@ -0,0 +1,125 @@
package internal
import (
"errors"
"fmt"
"log"
"os"
"sync"
"github.com/goccy/go-json"
"github.com/google/uuid"
"github.com/marcopeocchi/yt-dlp-web-ui/server/cli"
)
// In-Memory Thread-Safe Key-Value Storage with optional persistence
type MemoryDB struct {
table sync.Map
}
// Get a process pointer given its id
func (m *MemoryDB) Get(id string) (*Process, error) {
entry, ok := m.table.Load(id)
if !ok {
return nil, errors.New("no process found for the given key")
}
return entry.(*Process), nil
}
// Store a pointer of a process and return its id
func (m *MemoryDB) Set(process *Process) string {
id := uuid.Must(uuid.NewRandom()).String()
m.table.Store(id, process)
return id
}
// Update a process info/metadata, given the process id
//
// Deprecated: will be removed anytime soon.
func (m *MemoryDB) UpdateInfo(id string, info DownloadInfo) error {
entry, ok := m.table.Load(id)
if ok {
entry.(*Process).Info = info
m.table.Store(id, entry)
return nil
}
return fmt.Errorf("can't update row with id %s", id)
}
// Update a process progress data, given the process id
// Used for updating completition percentage or ETA.
//
// Deprecated: will be removed anytime soon.
func (m *MemoryDB) UpdateProgress(id string, progress DownloadProgress) error {
entry, ok := m.table.Load(id)
if ok {
entry.(*Process).Progress = progress
m.table.Store(id, entry)
return nil
}
return fmt.Errorf("can't update row with id %s", id)
}
// Removes a process progress, given the process id
func (m *MemoryDB) Delete(id string) {
m.table.Delete(id)
}
func (m *MemoryDB) Keys() *[]string {
running := []string{}
m.table.Range(func(key, value any) bool {
running = append(running, key.(string))
return true
})
return &running
}
// Returns a slice of all currently stored processes progess
func (m *MemoryDB) All() *[]ProcessResponse {
running := []ProcessResponse{}
m.table.Range(func(key, value any) bool {
running = append(running, ProcessResponse{
Id: key.(string),
Info: value.(*Process).Info,
Progress: value.(*Process).Progress,
})
return true
})
return &running
}
// WIP: Persist the database in a single file named "session.dat"
func (m *MemoryDB) Persist() {
running := m.All()
session, err := json.Marshal(Session{
Processes: *running,
})
if err != nil {
log.Println(cli.Red, "Failed to persist database", cli.Reset)
return
}
err = os.WriteFile("session.dat", session, 0700)
if err != nil {
log.Println(cli.Red, "Failed to persist database", cli.Reset)
}
}
// WIP: Restore a persisted state
func (m *MemoryDB) Restore() {
feed, _ := os.ReadFile("session.dat")
session := Session{}
json.Unmarshal(feed, &session)
for _, proc := range session.Processes {
m.table.Store(proc.Id, &Process{
Id: proc.Id,
Url: proc.Info.URL,
Info: proc.Info,
Progress: proc.Progress,
DB: m,
})
}
}

View File

@@ -0,0 +1,46 @@
package internal
import (
"log"
"github.com/marcopeocchi/yt-dlp-web-ui/server/config"
)
type MessageQueue struct {
ch chan *Process
consumerCh chan struct{}
}
// Creates a new message queue.
// By default it will be created with a size equals to nthe number of logical
// CPU cores.
// The queue size can be set via the qs flag.
func NewMessageQueue() *MessageQueue {
size := config.Instance().GetConfig().QueueSize
if size <= 0 {
log.Fatalln("invalid queue size")
}
return &MessageQueue{
ch: make(chan *Process, size),
consumerCh: make(chan struct{}, size),
}
}
// Publish a message to the queue and set the task to a peding state.
func (m *MessageQueue) Publish(p *Process) {
go p.SetPending()
m.ch <- p
}
// Setup the consumer listened which "subscribes" to the queue events.
func (m *MessageQueue) SetupConsumer() {
for msg := range m.ch {
m.consumerCh <- struct{}{}
go func(p *Process) {
p.Start()
<-m.consumerCh
}(msg)
}
}

224
server/internal/process.go Normal file
View File

@@ -0,0 +1,224 @@
package internal
import (
"bufio"
"fmt"
"regexp"
"syscall"
"github.com/goccy/go-json"
"log"
"os"
"os/exec"
"strings"
"time"
"github.com/marcopeocchi/fazzoletti/slices"
"github.com/marcopeocchi/yt-dlp-web-ui/server/config"
)
const template = `download:
{
"eta":%(progress.eta)s,
"percentage":"%(progress._percent_str)s",
"speed":%(progress.speed)s
}`
var (
cfg = config.Instance()
)
const (
StatusPending = iota
StatusDownloading
StatusCompleted
StatusErrored
)
type ProgressTemplate struct {
Percentage string `json:"percentage"`
Speed float32 `json:"speed"`
Size string `json:"size"`
Eta int `json:"eta"`
}
// Process descriptor
type Process struct {
Id string
Url string
Params []string
Info DownloadInfo
Progress DownloadProgress
DB *MemoryDB
Output DownloadOutput
proc *os.Process
}
type DownloadOutput struct {
Path string
Filename string
}
// Starts spawns/forks a new yt-dlp process and parse its stdout.
// The process is spawned to outputting a custom progress text that
// Resembles a JSON Object in order to Unmarshal it later.
// This approach is anyhow not perfect: quotes are not escaped properly.
// Each process is not identified by its PID but by a UUIDv4
func (p *Process) Start() {
// escape bash variable escaping and command piping, you'll never know
// what they might come with...
p.Params = slices.Filter(p.Params, func(e string) bool {
match, _ := regexp.MatchString(`(\$\{)|(\&\&)`, e)
return !match
})
out := DownloadOutput{
Path: cfg.GetConfig().DownloadPath,
Filename: "%(title)s.%(ext)s",
}
if p.Output.Path != "" {
out.Path = p.Output.Path
}
if p.Output.Filename != "" {
out.Filename = p.Output.Filename + ".%(ext)s"
}
params := append([]string{
strings.Split(p.Url, "?list")[0], //no playlist
"--newline",
"--no-colors",
"--no-playlist",
"--progress-template", strings.ReplaceAll(template, "\n", ""),
"-o",
fmt.Sprintf("%s/%s", out.Path, out.Filename),
}, p.Params...)
// ----------------- main block ----------------- //
cmd := exec.Command(cfg.GetConfig().DownloaderPath, params...)
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
r, err := cmd.StdoutPipe()
if err != nil {
log.Panicln(err)
}
scan := bufio.NewScanner(r)
err = cmd.Start()
if err != nil {
log.Panicln(err)
}
p.proc = cmd.Process
// ----------------- info block ----------------- //
// spawn a goroutine that retrieves the info for the download
// --------------- progress block --------------- //
// unbuffered channel connected to stdout
// spawn a goroutine that does the dirty job of parsing the stdout
// filling the channel with as many stdout line as yt-dlp produces (producer)
go func() {
defer func() {
r.Close()
p.Complete()
}()
for scan.Scan() {
stdout := ProgressTemplate{}
err := json.Unmarshal([]byte(scan.Text()), &stdout)
if err == nil {
p.Progress = DownloadProgress{
Status: StatusDownloading,
Percentage: stdout.Percentage,
Speed: stdout.Speed,
ETA: stdout.Eta,
}
shortId := strings.Split(p.Id, "-")[0]
log.Printf("[%s] %s %s\n", shortId, p.Url, p.Progress.Percentage)
}
}
}()
// ------------- end progress block ------------- //
cmd.Wait()
}
// Keep process in the memoryDB but marks it as complete
// Convention: All completed processes has progress -1
// and speed 0 bps.
func (p *Process) Complete() {
p.Progress = DownloadProgress{
Status: StatusCompleted,
Percentage: "-1",
Speed: 0,
ETA: 0,
}
}
// Kill a process and remove it from the memory
func (p *Process) Kill() error {
// yt-dlp uses multiple child process the parent process
// has been spawned with setPgid = true. To properly kill
// all subprocesses a SIGTERM need to be sent to the correct
// process group
if p.proc != nil {
pgid, err := syscall.Getpgid(p.proc.Pid)
if err != nil {
return err
}
err = syscall.Kill(-pgid, syscall.SIGTERM)
log.Println("Killed process", p.Id)
return err
}
p.DB.Delete(p.Id)
return nil
}
// Returns the available format for this URL
func (p *Process) GetFormatsSync() (DownloadFormats, error) {
cmd := exec.Command(cfg.GetConfig().DownloaderPath, p.Url, "-J")
stdout, err := cmd.Output()
if err != nil {
return DownloadFormats{}, err
}
cmd.Wait()
info := DownloadFormats{URL: p.Url}
best := Format{}
json.Unmarshal(stdout, &info)
json.Unmarshal(stdout, &best)
info.Best = best
return info, nil
}
func (p *Process) SetPending() {
p.Id = p.DB.Set(p)
cmd := exec.Command(cfg.GetConfig().DownloaderPath, p.Url, "-J")
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
stdout, err := cmd.Output()
if err != nil {
log.Println("Cannot retrieve info for", p.Url)
}
info := DownloadInfo{
URL: p.Url,
CreatedAt: time.Now(),
}
json.Unmarshal(stdout, &info)
p.Info = info
p.Progress.Status = StatusPending
}

74
server/internal/types.go Normal file
View File

@@ -0,0 +1,74 @@
package internal
import "time"
// Progress for the Running call
type DownloadProgress struct {
Status int `json:"process_status"`
Percentage string `json:"percentage"`
Speed float32 `json:"speed"`
ETA int `json:"eta"`
}
// Used to deser the yt-dlp -J output
type DownloadInfo struct {
URL string `json:"url"`
Title string `json:"title"`
Thumbnail string `json:"thumbnail"`
Resolution string `json:"resolution"`
Size int32 `json:"filesize_approx"`
VCodec string `json:"vcodec"`
ACodec string `json:"acodec"`
Extension string `json:"ext"`
CreatedAt time.Time `json:"created_at"`
}
// Used to deser the formats in the -J output
type DownloadFormats struct {
Formats []Format `json:"formats"`
Best Format `json:"best"`
Thumbnail string `json:"thumbnail"`
Title string `json:"title"`
URL string `json:"url"`
}
// A skimmed yt-dlp format node
type Format struct {
Format_id string `json:"format_id"`
Format_note string `json:"format_note"`
FPS float32 `json:"fps"`
Resolution string `json:"resolution"`
VCodec string `json:"vcodec"`
ACodec string `json:"acodec"`
Size float32 `json:"filesize_approx"`
}
// struct representing the response sent to the client
// as JSON-RPC result field
type ProcessResponse struct {
Id string `json:"id"`
Progress DownloadProgress `json:"progress"`
Info DownloadInfo `json:"info"`
}
// struct representing the current status of the memoryDB
// used for serializaton/persistence reasons
type Session struct {
Processes []ProcessResponse `json:"processes"`
}
// struct representing the intent to stop a specific process
type AbortRequest struct {
Id string `json:"id"`
}
// struct representing the intent to start a download
type DownloadRequest struct {
Url string `json:"url"`
Params []string `json:"params"`
RenameTo string `json:"renameTo"`
Id string
URL string
Path string
Rename string
}