Use cookies saved server side (#188)

* retrieve cookies stored server side

fixed netscape cookies validation pipeline

* code refactoring
This commit is contained in:
Marco Piovanello
2024-08-26 10:09:02 +02:00
committed by GitHub
parent 64df0e0b32
commit bb4db5d342
13 changed files with 247 additions and 152 deletions

View File

@@ -63,6 +63,10 @@ func (m *MessageQueue) downloadConsumer() {
)
if p.Progress.Status != StatusCompleted {
slog.Info("started process",
slog.String("bus", queueName),
slog.String("id", p.getShortId()),
)
if p.Livestream {
// livestreams have higher priorty and they ignore the semaphore
go p.Start()
@@ -70,11 +74,6 @@ func (m *MessageQueue) downloadConsumer() {
p.Start()
}
}
slog.Info("started process",
slog.String("bus", queueName),
slog.String("id", p.getShortId()),
)
}, false)
}

View File

@@ -3,6 +3,7 @@ package internal
import (
"bufio"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
@@ -21,7 +22,6 @@ import (
"github.com/marcopeocchi/yt-dlp-web-ui/server/cli"
"github.com/marcopeocchi/yt-dlp-web-ui/server/config"
"github.com/marcopeocchi/yt-dlp-web-ui/server/rx"
)
const template = `download:
@@ -36,7 +36,6 @@ const (
StatusDownloading
StatusCompleted
StatusErrored
StatusLivestream
)
// Process descriptor
@@ -103,81 +102,102 @@ func (p *Process) Start() {
params := append(baseParams, p.Params...)
// ----------------- main block ----------------- //
slog.Info("requesting download", slog.String("url", p.Url), slog.Any("params", params))
cmd := exec.Command(config.Instance().DownloaderPath, params...)
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
r, err := cmd.StdoutPipe()
stdout, err := cmd.StdoutPipe()
if err != nil {
slog.Error(
"failed to connect to stdout",
slog.String("err", err.Error()),
)
slog.Error("failed to connect to stdout", slog.Any("err", err))
panic(err)
}
stderr, err := cmd.StderrPipe()
if err != nil {
slog.Error("failed to connect to stdout", slog.Any("err", err))
panic(err)
}
if err := cmd.Start(); err != nil {
slog.Error(
"failed to start yt-dlp process",
slog.String("err", err.Error()),
)
slog.Error("failed to start yt-dlp process", slog.Any("err", err))
panic(err)
}
p.proc = cmd.Process
// --------------- progress block --------------- //
var (
sourceChan = make(chan []byte)
doneChan = make(chan struct{})
)
ctx, cancel := context.WithCancel(context.Background())
defer func() {
stdout.Close()
p.Complete()
cancel()
}()
// spawn a goroutine that does the dirty job of parsing the stdout
// filling the channel with as many stdout line as yt-dlp produces (producer)
logs := make(chan []byte)
go produceLogs(stdout, logs)
go p.consumeLogs(ctx, logs)
go p.detectYtDlpErrors(stderr)
cmd.Wait()
}
func produceLogs(r io.Reader, logs chan<- []byte) {
go func() {
scan := bufio.NewScanner(r)
scanner := bufio.NewScanner(r)
defer func() {
r.Close()
p.Complete()
doneChan <- struct{}{}
close(sourceChan)
close(doneChan)
}()
for scan.Scan() {
sourceChan <- scan.Bytes()
for scanner.Scan() {
logs <- scanner.Bytes()
}
}()
}
// Slows down the unmarshal operation to every 500ms
go func() {
rx.Sample(time.Millisecond*500, sourceChan, doneChan, func(event []byte) {
var progress ProgressTemplate
if err := json.Unmarshal(event, &progress); err != nil {
return
}
p.Progress = DownloadProgress{
Status: StatusDownloading,
Percentage: progress.Percentage,
Speed: progress.Speed,
ETA: progress.Eta,
}
slog.Info("progress",
func (p *Process) consumeLogs(ctx context.Context, logs <-chan []byte) {
for {
select {
case <-ctx.Done():
slog.Info("detaching from yt-dlp stdout",
slog.String("id", p.getShortId()),
slog.String("url", p.Url),
slog.String("percentage", progress.Percentage),
)
})
}()
return
case entry := <-logs:
p.parseLogEntry(entry)
}
}
}
// ------------- end progress block ------------- //
cmd.Wait()
func (p *Process) parseLogEntry(entry []byte) {
var progress ProgressTemplate
if err := json.Unmarshal(entry, &progress); err != nil {
return
}
p.Progress = DownloadProgress{
Status: StatusDownloading,
Percentage: progress.Percentage,
Speed: progress.Speed,
ETA: progress.Eta,
}
slog.Info("progress",
slog.String("id", p.getShortId()),
slog.String("url", p.Url),
slog.String("percentage", progress.Percentage),
)
}
func (p *Process) detectYtDlpErrors(r io.Reader) {
scanner := bufio.NewScanner(r)
for scanner.Scan() {
slog.Error("yt-dlp process error",
slog.String("id", p.getShortId()),
slog.String("url", p.Url),
slog.String("err", scanner.Text()),
)
}
}
// Keep process in the memoryDB but marks it as complete
@@ -222,6 +242,7 @@ func (p *Process) Kill() error {
}
// Returns the available format for this URL
//
// TODO: Move out from process.go
func (p *Process) GetFormats() (DownloadFormats, error) {
cmd := exec.Command(config.Instance().DownloaderPath, p.Url, "-J")