10 playlist download (#71)

* leveraging message queue for playlist entries DL

* playlist support implemented

It's a little bit slow but solid enough :D
This commit is contained in:
Marco
2023-07-28 11:44:38 +02:00
committed by GitHub
parent d4f656fd87
commit 68c829c40e
15 changed files with 257 additions and 58 deletions

View File

@@ -12,15 +12,16 @@ type DownloadProgress struct {
// Used to deser the yt-dlp -J output
type DownloadInfo struct {
URL string `json:"url"`
Title string `json:"title"`
Thumbnail string `json:"thumbnail"`
Resolution string `json:"resolution"`
Size int32 `json:"filesize_approx"`
VCodec string `json:"vcodec"`
ACodec string `json:"acodec"`
Extension string `json:"ext"`
CreatedAt time.Time `json:"created_at"`
URL string `json:"url"`
Title string `json:"title"`
Thumbnail string `json:"thumbnail"`
Resolution string `json:"resolution"`
Size int32 `json:"filesize_approx"`
VCodec string `json:"vcodec"`
ACodec string `json:"acodec"`
Extension string `json:"ext"`
OriginalURL string `json:"original_url"`
CreatedAt time.Time `json:"created_at"`
}
// Used to deser the formats in the -J output
@@ -64,11 +65,9 @@ type AbortRequest struct {
// struct representing the intent to start a download
type DownloadRequest struct {
Url string `json:"url"`
Params []string `json:"params"`
RenameTo string `json:"renameTo"`
Id string
URL string
Path string
Rename string
Id string
URL string
Path string
Rename string
Params []string
}

View File

@@ -30,6 +30,7 @@ func (m *MemoryDB) Get(id string) (*Process, error) {
func (m *MemoryDB) Set(process *Process) string {
id := uuid.Must(uuid.NewRandom()).String()
m.table.Store(id, process)
process.Id = id
return id
}
@@ -129,7 +130,6 @@ func (m *MemoryDB) Restore() {
Url: proc.Info.URL,
Info: proc.Info,
Progress: proc.Progress,
DB: m,
})
}

View File

@@ -30,7 +30,15 @@ func NewMessageQueue() *MessageQueue {
// Publish a message to the queue and set the task to a peding state.
func (m *MessageQueue) Publish(p *Process) {
go p.SetPending()
p.SetPending()
go p.SetMetadata()
m.producerCh <- p
}
// Publish a message to the queue and set the task to a peding state.
// ENSURE P IS PART OF A PLAYLIST
// Needs a further review
func (m *MessageQueue) PublishPlaylistEntry(p *Process) {
m.producerCh <- p
}
@@ -45,3 +53,13 @@ func (m *MessageQueue) Subscriber() {
}(msg)
}
}
// Empties the message queue
func (m *MessageQueue) Empty() {
for range m.producerCh {
<-m.producerCh
}
for range m.consumerCh {
<-m.consumerCh
}
}

View File

@@ -0,0 +1,81 @@
package internal
import (
"errors"
"log"
"os/exec"
"time"
"github.com/goccy/go-json"
"github.com/marcopeocchi/yt-dlp-web-ui/server/cli"
)
type metadata struct {
Entries []DownloadInfo `json:"entries"`
Count int `json:"playlist_count"`
Type string `json:"_type"`
}
func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB) error {
cmd := exec.Command(cfg.GetConfig().DownloaderPath, req.URL, "-J")
stdout, err := cmd.StdoutPipe()
if err != nil {
return err
}
m := metadata{}
err = cmd.Start()
if err != nil {
return err
}
log.Println(cli.BgRed, "Decoding metadata", cli.Reset, req.URL)
err = json.NewDecoder(stdout).Decode(&m)
if err != nil {
return err
}
log.Println(cli.BgGreen, "Decoded metadata", cli.Reset, req.URL)
if m.Type == "" {
cmd.Wait()
return errors.New("probably not a valid URL")
}
if m.Type == "playlist" {
log.Println(
cli.BgGreen, "Playlist detected", cli.Reset, m.Count, "entries",
)
for _, meta := range m.Entries {
proc := &Process{
Url: meta.OriginalURL,
Progress: DownloadProgress{},
Output: DownloadOutput{},
Info: meta,
Params: req.Params,
}
proc.Info.URL = meta.OriginalURL
proc.Info.CreatedAt = time.Now().Add(time.Second)
db.Set(proc)
proc.SetPending()
mq.PublishPlaylistEntry(proc)
}
err = cmd.Wait()
return err
}
proc := &Process{Url: req.URL, Params: req.Params}
mq.Publish(proc)
log.Println("Sending new process to message queue", proc.Url)
err = cmd.Wait()
return err
}

View File

@@ -4,6 +4,7 @@ import (
"bufio"
"fmt"
"regexp"
"sync"
"syscall"
"github.com/goccy/go-json"
@@ -50,7 +51,6 @@ type Process struct {
Params []string
Info DownloadInfo
Progress DownloadProgress
DB *MemoryDB
Output DownloadOutput
proc *os.Process
}
@@ -128,7 +128,7 @@ func (p *Process) Start() {
for scan.Scan() {
stdout := ProgressTemplate{}
err := json.Unmarshal([]byte(scan.Text()), &stdout)
err := json.Unmarshal(scan.Bytes(), &stdout)
if err == nil {
p.Progress = DownloadProgress{
Status: StatusDownloading,
@@ -175,26 +175,49 @@ func (p *Process) Kill() error {
return err
}
p.DB.Delete(p.Id)
return nil
}
// Returns the available format for this URL
func (p *Process) GetFormatsSync() (DownloadFormats, error) {
cmd := exec.Command(cfg.GetConfig().DownloaderPath, p.Url, "-J")
stdout, err := cmd.Output()
stdout, err := cmd.StdoutPipe()
if err != nil {
return DownloadFormats{}, err
}
cmd.Wait()
info := DownloadFormats{URL: p.Url}
best := Format{}
json.Unmarshal(stdout, &info)
json.Unmarshal(stdout, &best)
var (
wg sync.WaitGroup
decodingError error
)
wg.Add(2)
err = cmd.Start()
if err != nil {
return DownloadFormats{}, err
}
go func() {
decodingError = json.NewDecoder(stdout).Decode(&info)
wg.Done()
}()
go func() {
decodingError = json.NewDecoder(stdout).Decode(&best)
wg.Done()
}()
wg.Wait()
cmd.Wait()
if decodingError != nil {
return DownloadFormats{}, err
}
info.Best = best
@@ -202,14 +225,17 @@ func (p *Process) GetFormatsSync() (DownloadFormats, error) {
}
func (p *Process) SetPending() {
p.Id = p.DB.Set(p)
p.Progress.Status = StatusPending
}
func (p *Process) SetMetadata() error {
cmd := exec.Command(cfg.GetConfig().DownloaderPath, p.Url, "-J")
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
stdout, err := cmd.Output()
stdout, err := cmd.StdoutPipe()
if err != nil {
log.Println("Cannot retrieve info for", p.Url)
return err
}
info := DownloadInfo{
@@ -217,8 +243,20 @@ func (p *Process) SetPending() {
CreatedAt: time.Now(),
}
json.Unmarshal(stdout, &info)
p.Info = info
err = cmd.Start()
if err != nil {
return err
}
err = json.NewDecoder(stdout).Decode(&info)
if err != nil {
return err
}
p.Info = info
p.Progress.Status = StatusPending
err = cmd.Wait()
return err
}