Compare commits

..

1 Commits

Author SHA1 Message Date
a4ba8cea27 initial support for playlist modifiers
supported modifiers are --playlist-start, --playlist-end, --playlist-reverse, --max-downloads
2025-02-06 11:24:09 +01:00
26 changed files with 124 additions and 366 deletions

View File

@@ -1 +0,0 @@
docker run -d -p 3033:3033 -v /downloads:/downloads marcobaobao/yt-dlp-webui

View File

@@ -3,7 +3,6 @@
result/ result/
result result
dist dist
.pnpm-store/
.pnpm-debug.log .pnpm-debug.log
node_modules node_modules
.env .env
@@ -21,11 +20,9 @@ cookies.txt
__debug* __debug*
ui/ ui/
.idea .idea
.idea/
frontend/.pnp.cjs frontend/.pnp.cjs
frontend/.pnp.loader.mjs frontend/.pnp.loader.mjs
frontend/.yarn/install-state.gz frontend/.yarn/install-state.gz
.db.lock .db.lock
livestreams.dat livestreams.dat
.vite/deps .git
archive.txt

View File

@@ -24,12 +24,11 @@ COPY --from=ui /usr/src/yt-dlp-webui/frontend /usr/src/yt-dlp-webui/frontend
RUN CGO_ENABLED=0 GOOS=linux go build -o yt-dlp-webui RUN CGO_ENABLED=0 GOOS=linux go build -o yt-dlp-webui
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Runtime --------------------------------------------------------------------- # dependencies ----------------------------------------------------------------
FROM python:3.13.2-alpine3.21 FROM alpine:edge
RUN apk update && \ RUN apk update && \
apk add ffmpeg ca-certificates curl wget gnutls --no-cache && \ apk add ffmpeg yt-dlp ca-certificates curl wget psmisc
pip install "yt-dlp[default,curl-cffi,mutagen,pycryptodomex,phantomjs,secretstorage]"
VOLUME /downloads /config VOLUME /downloads /config

View File

@@ -28,7 +28,7 @@ docker pull ghcr.io/marcopiovanello/yt-dlp-web-ui:latest
## Community stuff ## Community stuff
Feel free to join :) Feel free to join :)
[Discord](https://discord.gg/GZAX5FfGzE) [![Discord Banner](https://api.weblutions.com/discord/invite/3Sj9ZZHv/)](https://discord.gg/3Sj9ZZHv)
## Some screeshots ## Some screeshots
![image](https://github.com/user-attachments/assets/fc43a3fb-ecf9-449d-b5cb-5d5635020c00) ![image](https://github.com/user-attachments/assets/fc43a3fb-ecf9-449d-b5cb-5d5635020c00)
@@ -115,16 +115,6 @@ services:
restart: unless-stopped restart: unless-stopped
``` ```
### ⚡ One-Click Deploy
| Cloud Provider | Deploy Button |
|----------------|---------------|
| AWS | <a href="https://deploystack.io/deploy/marcopiovanello-yt-dlp-web-ui?provider=aws&language=cfn"><img src="https://raw.githubusercontent.com/deploystackio/deploy-templates/refs/heads/main/.assets/img/aws.svg" height="38"></a> |
| DigitalOcean | <a href="https://deploystack.io/deploy/marcopiovanello-yt-dlp-web-ui?provider=do&language=dop"><img src="https://raw.githubusercontent.com/deploystackio/deploy-templates/refs/heads/main/.assets/img/do.svg" height="38"></a> |
| Render | <a href="https://deploystack.io/deploy/marcopiovanello-yt-dlp-web-ui?provider=rnd&language=rnd"><img src="https://raw.githubusercontent.com/deploystackio/deploy-templates/refs/heads/main/.assets/img/rnd.svg" height="38"></a> |
<sub>Generated by <a href="https://deploystack.io/c/marcopiovanello-yt-dlp-web-ui" target="_blank">DeployStack.io</a></sub>
## [Prebuilt binaries](https://github.com/marcopiovanello/yt-dlp-web-ui/releases) installation ## [Prebuilt binaries](https://github.com/marcopiovanello/yt-dlp-web-ui/releases) installation
```sh ```sh

View File

@@ -80,4 +80,3 @@ keys:
cronExpressionLabel: 'Cron expression' cronExpressionLabel: 'Cron expression'
editButtonLabel: 'Edit' editButtonLabel: 'Edit'
newSubscriptionButton: New subscription newSubscriptionButton: New subscription
clearCompletedButton: 'Clear completed'

View File

@@ -121,18 +121,11 @@ export const appTitleState = atomWithStorage(
export const serverAddressAndPortState = atom((get) => { export const serverAddressAndPortState = atom((get) => {
if (get(servedFromReverseProxySubDirState)) { if (get(servedFromReverseProxySubDirState)) {
return `${get(serverAddressState)}/${get(servedFromReverseProxySubDirState)}/` return `${get(serverAddressState)}/${get(servedFromReverseProxySubDirState)}/`
.replaceAll('"', '') // XXX: atomWithStorage uses JSON.stringify to serialize
.replaceAll('//', '/') // which puts extra double quotes.
} }
if (get(servedFromReverseProxyState)) { if (get(servedFromReverseProxyState)) {
return `${get(serverAddressState)}` return `${get(serverAddressState)}`
.replaceAll('"', '')
} }
return `${get(serverAddressState)}:${get(serverPortState)}`
const sap = `${get(serverAddressState)}:${get(serverPortState)}`
.replaceAll('"', '')
return sap.endsWith('/') ? sap.slice(0, -1) : sap
}) })
export const serverURL = atom((get) => export const serverURL = atom((get) =>
@@ -141,17 +134,15 @@ export const serverURL = atom((get) =>
export const rpcWebSocketEndpoint = atom((get) => { export const rpcWebSocketEndpoint = atom((get) => {
const proto = window.location.protocol === 'https:' ? 'wss:' : 'ws:' const proto = window.location.protocol === 'https:' ? 'wss:' : 'ws:'
const sap = get(serverAddressAndPortState) return `${proto}//${get(serverAddressAndPortState)}/rpc/ws`
}
return `${proto}//${sap.endsWith('/') ? sap.slice(0, -1) : sap}/rpc/ws` )
})
export const rpcHTTPEndpoint = atom((get) => { export const rpcHTTPEndpoint = atom((get) => {
const proto = window.location.protocol const proto = window.location.protocol
const sap = get(serverAddressAndPortState) return `${proto}//${get(serverAddressAndPortState)}/rpc/http`
}
return `${proto}//${sap.endsWith('/') ? sap.slice(0, -1) : sap}/rpc/http` )
})
export const serverSideCookiesState = atom<Promise<string>>(async (get) => await pipe( export const serverSideCookiesState = atom<Promise<string>>(async (get) => await pipe(
ffetch<Readonly<{ cookies: string }>>(`${get(serverURL)}/api/v1/cookies`), ffetch<Readonly<{ cookies: string }>>(`${get(serverURL)}/api/v1/cookies`),
@@ -189,4 +180,5 @@ export const settingsState = atom<SettingsState>((get) => ({
listView: get(listViewState), listView: get(listViewState),
servedFromReverseProxy: get(servedFromReverseProxyState), servedFromReverseProxy: get(servedFromReverseProxyState),
appTitle: get(appTitleState) appTitle: get(appTitleState)
})) })
)

View File

@@ -1,6 +1,5 @@
import AddCircleIcon from '@mui/icons-material/AddCircle' import AddCircleIcon from '@mui/icons-material/AddCircle'
import BuildCircleIcon from '@mui/icons-material/BuildCircle' import BuildCircleIcon from '@mui/icons-material/BuildCircle'
import ClearAllIcon from '@mui/icons-material/ClearAll'
import DeleteForeverIcon from '@mui/icons-material/DeleteForever' import DeleteForeverIcon from '@mui/icons-material/DeleteForever'
import FolderZipIcon from '@mui/icons-material/FolderZip' import FolderZipIcon from '@mui/icons-material/FolderZip'
import FormatListBulleted from '@mui/icons-material/FormatListBulleted' import FormatListBulleted from '@mui/icons-material/FormatListBulleted'
@@ -32,7 +31,6 @@ const HomeSpeedDial: React.FC<Props> = ({ onDownloadOpen, onEditorOpen }) => {
ariaLabel="Home speed dial" ariaLabel="Home speed dial"
sx={{ position: 'absolute', bottom: 64, right: 24 }} sx={{ position: 'absolute', bottom: 64, right: 24 }}
icon={<SpeedDialIcon />} icon={<SpeedDialIcon />}
onClick={onDownloadOpen}
> >
<SpeedDialAction <SpeedDialAction
icon={listView ? <ViewAgendaIcon /> : <FormatListBulleted />} icon={listView ? <ViewAgendaIcon /> : <FormatListBulleted />}
@@ -44,11 +42,6 @@ const HomeSpeedDial: React.FC<Props> = ({ onDownloadOpen, onEditorOpen }) => {
tooltipTitle={i18n.t('bulkDownload')} tooltipTitle={i18n.t('bulkDownload')}
onClick={() => window.open(`${serverAddr}/archive/bulk?token=${localStorage.getItem('token')}`)} onClick={() => window.open(`${serverAddr}/archive/bulk?token=${localStorage.getItem('token')}`)}
/> />
<SpeedDialAction
icon={<ClearAllIcon />}
tooltipTitle={i18n.t('clearCompletedButton')}
onClick={() => client.clearCompleted()}
/>
<SpeedDialAction <SpeedDialAction
icon={<DeleteForeverIcon />} icon={<DeleteForeverIcon />}
tooltipTitle={i18n.t('abortAllButton')} tooltipTitle={i18n.t('abortAllButton')}

View File

@@ -1,9 +1,6 @@
import { tryCatch } from 'fp-ts/TaskEither' import { tryCatch } from 'fp-ts/TaskEither'
import * as J from 'fp-ts/Json'
import * as E from 'fp-ts/Either'
import { pipe } from 'fp-ts/lib/function'
async function fetcher(url: string, opt?: RequestInit, controller?: AbortController): Promise<string> { async function fetcher<T>(url: string, opt?: RequestInit): Promise<T> {
const jwt = localStorage.getItem('token') const jwt = localStorage.getItem('token')
if (opt && !opt.headers) { if (opt && !opt.headers) {
@@ -17,27 +14,17 @@ async function fetcher(url: string, opt?: RequestInit, controller?: AbortControl
headers: { headers: {
...opt?.headers, ...opt?.headers,
'X-Authentication': jwt ?? '' 'X-Authentication': jwt ?? ''
}, }
signal: controller?.signal
}) })
if (!res.ok) { if (!res.ok) {
throw await res.text() throw await res.text()
} }
return res.json() as T
return res.text()
} }
export const ffetch = <T>(url: string, opt?: RequestInit, controller?: AbortController) => tryCatch( export const ffetch = <T>(url: string, opt?: RequestInit) => tryCatch(
async () => pipe( () => fetcher<T>(url, opt),
await fetcher(url, opt, controller),
J.parse,
E.match(
(l) => l as T,
(r) => r as T
)
),
(e) => `error while fetching: ${e}` (e) => `error while fetching: ${e}`
) )

View File

@@ -200,11 +200,4 @@ export class RPCClient {
params: [] params: []
}) })
} }
public clearCompleted() {
return this.sendHTTP({
method: 'Service.ClearCompleted',
params: []
})
}
} }

View File

@@ -13,7 +13,6 @@ export type RPCMethods =
| "Service.ProgressLivestream" | "Service.ProgressLivestream"
| "Service.KillLivestream" | "Service.KillLivestream"
| "Service.KillAllLivestream" | "Service.KillAllLivestream"
| "Service.ClearCompleted"
export type RPCRequest = { export type RPCRequest = {
method: RPCMethods method: RPCMethods

2
go.mod
View File

@@ -1,6 +1,6 @@
module github.com/marcopiovanello/yt-dlp-web-ui/v3 module github.com/marcopiovanello/yt-dlp-web-ui/v3
go 1.24 go 1.23
require ( require (
github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef

View File

@@ -1,58 +0,0 @@
package archive
import (
"bufio"
"bytes"
"context"
"os"
"os/exec"
"path/filepath"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
)
// Perform a search on the archive.txt file an determines if a download
// has already be done.
func DownloadExists(ctx context.Context, url string) (bool, error) {
cmd := exec.CommandContext(
ctx,
config.Instance().DownloaderPath,
"--print",
"%(extractor)s %(id)s",
url,
)
stdout, err := cmd.Output()
if err != nil {
return false, err
}
extractorAndURL := bytes.Trim(stdout, "\n")
fd, err := os.Open(filepath.Join(config.Instance().Dir(), "archive.txt"))
if err != nil {
return false, err
}
defer fd.Close()
scanner := bufio.NewScanner(fd)
// search linearly for lower memory usage...
// the a pre-sorted with hashed values version of the archive.txt file can be loaded in memory
// and perform a binary search on it.
for scanner.Scan() {
if bytes.Equal(scanner.Bytes(), extractorAndURL) {
return true, nil
}
}
// data, err := io.ReadAll(fd)
// if err != nil {
// return false, err
// }
// slices.BinarySearchFunc(data, extractorAndURL, func(a []byte, b []byte) int {
// return hash(a).Compare(hash(b))
// })
return false, nil
}

View File

@@ -28,7 +28,6 @@ type Config struct {
OpenIdClientId string `yaml:"openid_client_id"` OpenIdClientId string `yaml:"openid_client_id"`
OpenIdClientSecret string `yaml:"openid_client_secret"` OpenIdClientSecret string `yaml:"openid_client_secret"`
OpenIdRedirectURL string `yaml:"openid_redirect_url"` OpenIdRedirectURL string `yaml:"openid_redirect_url"`
OpenIdEmailWhitelist []string `yaml:"openid_email_whitelist"`
FrontendPath string `yaml:"frontend_path"` FrontendPath string `yaml:"frontend_path"`
AutoArchive bool `yaml:"auto_archive"` AutoArchive bool `yaml:"auto_archive"`
} }

View File

@@ -2,7 +2,6 @@ package internal
import ( import (
"container/heap" "container/heap"
"log/slog"
) )
type LoadBalancer struct { type LoadBalancer struct {
@@ -10,29 +9,7 @@ type LoadBalancer struct {
done chan *Worker done chan *Worker
} }
func NewLoadBalancer(numWorker int) *LoadBalancer { func (b *LoadBalancer) Balance(work chan Process) {
var pool Pool
doneChan := make(chan *Worker)
for i := range numWorker {
w := &Worker{
requests: make(chan *Process, 1),
index: i,
}
go w.Work(doneChan)
pool = append(pool, w)
slog.Info("spawned worker", slog.Int("index", i))
}
return &LoadBalancer{
pool: pool,
done: doneChan,
}
}
func (b *LoadBalancer) Balance(work chan *Process) {
for { for {
select { select {
case req := <-work: case req := <-work:
@@ -43,7 +20,7 @@ func (b *LoadBalancer) Balance(work chan *Process) {
} }
} }
func (b *LoadBalancer) dispatch(req *Process) { func (b *LoadBalancer) dispatch(req Process) {
w := heap.Pop(&b.pool).(*Worker) w := heap.Pop(&b.pool).(*Worker)
w.requests <- req w.requests <- req
w.pending++ w.pending++

View File

@@ -141,13 +141,26 @@ func (l *LiveStream) monitorStartTime(r io.Reader) {
} }
} }
const TRIES = 5
/*
if it's waiting a livestream the 5th line will indicate the time to live
its a dumb and not robust method.
example:
[youtube] Extracting URL: https://www.youtube.com/watch?v=IQVbGfVVjgY
[youtube] IQVbGfVVjgY: Downloading webpage
[youtube] IQVbGfVVjgY: Downloading ios player API JSON
[youtube] IQVbGfVVjgY: Downloading web creator player API JSON
WARNING: [youtube] This live event will begin in 27 minutes. <- STDERR, ignore
[wait] Waiting for 00:27:15 - Press Ctrl+C to try now <- 5th line
*/
for range TRIES {
scanner.Scan() scanner.Scan()
for !strings.Contains(scanner.Text(), "Waiting for") { if strings.Contains(scanner.Text(), "Waiting for") {
scanner.Scan()
}
waitTimeScanner() waitTimeScanner()
}
}
} }
func (l *LiveStream) WaitTime() <-chan time.Duration { func (l *LiveStream) WaitTime() <-chan time.Duration {

View File

@@ -9,17 +9,15 @@ import (
) )
func setupTest() { func setupTest() {
config.Instance().DownloaderPath = "build/yt-dlp" config.Instance().DownloaderPath = "yt-dlp"
} }
const URL = "https://www.youtube.com/watch?v=pwoAyLGOysU"
func TestLivestream(t *testing.T) { func TestLivestream(t *testing.T) {
setupTest() setupTest()
done := make(chan *LiveStream) done := make(chan *LiveStream)
ls := New(URL, done, &internal.MessageQueue{}, &internal.MemoryDB{}) ls := New("https://www.youtube.com/watch?v=LSm1daKezcE", done, &internal.MessageQueue{}, &internal.MemoryDB{})
go ls.Start() go ls.Start()
time.AfterFunc(time.Second*20, func() { time.AfterFunc(time.Second*20, func() {

View File

@@ -76,7 +76,7 @@ func (m *Monitor) Status() LiveStreamStatus {
// Persist the monitor current state to a file. // Persist the monitor current state to a file.
// The file is located in the configured config directory // The file is located in the configured config directory
func (m *Monitor) Persist() error { func (m *Monitor) Persist() error {
fd, err := os.Create(filepath.Join(config.Instance().SessionFilePath, "livestreams.dat")) fd, err := os.Create(filepath.Join(config.Instance().Dir(), "livestreams.dat"))
if err != nil { if err != nil {
return err return err
} }
@@ -95,7 +95,7 @@ func (m *Monitor) Persist() error {
// Restore a saved state and resume the monitored livestreams // Restore a saved state and resume the monitored livestreams
func (m *Monitor) Restore() error { func (m *Monitor) Restore() error {
fd, err := os.Open(filepath.Join(config.Instance().SessionFilePath, "livestreams.dat")) fd, err := os.Open(filepath.Join(config.Instance().Dir(), "livestreams.dat"))
if err != nil { if err != nil {
return err return err
} }

View File

@@ -50,7 +50,7 @@ func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB) error {
return errors.New("probably not a valid URL") return errors.New("probably not a valid URL")
} }
if m.IsPlaylist() { if m.Type == "playlist" {
entries := slices.CompactFunc(slices.Compact(m.Entries), func(a common.DownloadInfo, b common.DownloadInfo) bool { entries := slices.CompactFunc(slices.Compact(m.Entries), func(a common.DownloadInfo, b common.DownloadInfo) bool {
return a.URL == b.URL return a.URL == b.URL
}) })
@@ -88,10 +88,10 @@ func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB) error {
proc.Info.URL = meta.URL proc.Info.URL = meta.URL
time.Sleep(time.Millisecond)
db.Set(proc) db.Set(proc)
mq.Publish(proc) mq.Publish(proc)
proc.Info.CreatedAt = meta.CreatedAt
} }
return nil return nil

View File

@@ -1,24 +1,16 @@
package internal package internal
// Pool implements heap.Interface interface as a standard priority queue
type Pool []*Worker type Pool []*Worker
func (h Pool) Len() int { return len(h) } func (h Pool) Len() int { return len(h) }
func (h Pool) Less(i, j int) bool { return h[i].pending < h[j].pending } func (h Pool) Less(i, j int) bool { return h[i].index < h[j].index }
func (h Pool) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h Pool) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
h[i].index = i
h[j].index = j
}
func (h *Pool) Push(x any) { *h = append(*h, x.(*Worker)) } func (h *Pool) Push(x any) { *h = append(*h, x.(*Worker)) }
func (h *Pool) Pop() any { func (h *Pool) Pop() any {
old := *h old := *h
n := len(old) n := len(old)
x := old[n-1] x := old[n-1]
old[n-1] = nil
*h = old[0 : n-1] *h = old[0 : n-1]
return x return x
} }

View File

@@ -93,7 +93,6 @@ func (p *Process) Start() {
baseParams := []string{ baseParams := []string{
strings.Split(p.Url, "?list")[0], //no playlist strings.Split(p.Url, "?list")[0], //no playlist
"--no-exec",
"--newline", "--newline",
"--no-colors", "--no-colors",
"--no-playlist", "--no-playlist",

View File

@@ -1,7 +1,7 @@
package internal package internal
type Worker struct { type Worker struct {
requests chan *Process // downloads to do requests chan Process // downloads to do
pending int // downloads pending pending int // downloads pending
index int // index in the heap index int // index in the heap
} }

View File

@@ -6,12 +6,10 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"net/http" "net/http"
"slices"
"time" "time"
"github.com/coreos/go-oidc/v3/oidc" "github.com/coreos/go-oidc/v3/oidc"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
"golang.org/x/oauth2" "golang.org/x/oauth2"
) )
@@ -78,21 +76,6 @@ func doAuthentification(r *http.Request, setCookieCallback func(t *oauth2.Token)
return nil, err return nil, err
} }
var claims struct {
Email string `json:"email"`
Verified bool `json:"email_verified"`
}
if err := idToken.Claims(&claims); err != nil {
return nil, err
}
whitelist := config.Instance().OpenIdEmailWhitelist
if len(whitelist) > 0 && !slices.Contains(whitelist, claims.Email) {
return nil, errors.New("email address not found in ACL")
}
nonce, err := r.Cookie("nonce") nonce, err := r.Cookie("nonce")
if err != nil { if err != nil {
return nil, err return nil, err

View File

@@ -8,5 +8,3 @@ type Metadata struct {
PlaylistTitle string `json:"title"` PlaylistTitle string `json:"title"`
Type string `json:"_type"` Type string `json:"_type"`
} }
func (m *Metadata) IsPlaylist() bool { return m.Type == "playlist" }

View File

@@ -183,7 +183,6 @@ func (s *Service) KillAll(args NoArgs, killed *string) error {
} }
slog.Info("succesfully killed process", slog.String("id", proc.Id)) slog.Info("succesfully killed process", slog.String("id", proc.Id))
proc = nil // gc helper
} }
return nil return nil
@@ -196,35 +195,6 @@ func (s *Service) Clear(args string, killed *string) error {
return nil return nil
} }
// Removes completed processes
func (s *Service) ClearCompleted(cleared *string) error {
var (
keys = s.db.Keys()
removeFunc = func(p *internal.Process) error {
defer s.db.Delete(p.Id)
if p.Progress.Status != internal.StatusCompleted {
return nil
}
return p.Kill()
}
)
for _, key := range *keys {
proc, err := s.db.Get(key)
if err != nil {
return err
}
if err := removeFunc(proc); err != nil {
return err
}
}
return nil
}
// FreeSpace gets the available from package sys util // FreeSpace gets the available from package sys util
func (s *Service) FreeSpace(args NoArgs, free *uint64) error { func (s *Service) FreeSpace(args NoArgs, free *uint64) error {
freeSpace, err := sys.FreeSpace() freeSpace, err := sys.FreeSpace()

View File

@@ -53,7 +53,6 @@ func toDB(dto *domain.Subscription) data.Subscription {
// Delete implements domain.Service. // Delete implements domain.Service.
func (s *Service) Delete(ctx context.Context, id string) error { func (s *Service) Delete(ctx context.Context, id string) error {
s.runner.StopTask(id)
return s.r.Delete(ctx, id) return s.r.Delete(ctx, id)
} }

View File

@@ -7,9 +7,9 @@ import (
"os/exec" "os/exec"
"path/filepath" "path/filepath"
"regexp" "regexp"
"strings"
"time" "time"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/archive"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/domain" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/domain"
@@ -19,12 +19,10 @@ import (
type TaskRunner interface { type TaskRunner interface {
Submit(subcription *domain.Subscription) error Submit(subcription *domain.Subscription) error
Spawner(ctx context.Context) Spawner(ctx context.Context)
StopTask(id string) error
Recoverer() Recoverer()
} }
type monitorTask struct { type taskPair struct {
Done chan struct{}
Schedule cron.Schedule Schedule cron.Schedule
Subscription *domain.Subscription Subscription *domain.Subscription
} }
@@ -33,22 +31,21 @@ type CronTaskRunner struct {
mq *internal.MessageQueue mq *internal.MessageQueue
db *internal.MemoryDB db *internal.MemoryDB
tasks chan monitorTask tasks chan taskPair
errors chan error errors chan error
running map[string]*monitorTask
} }
func NewCronTaskRunner(mq *internal.MessageQueue, db *internal.MemoryDB) TaskRunner { func NewCronTaskRunner(mq *internal.MessageQueue, db *internal.MemoryDB) TaskRunner {
return &CronTaskRunner{ return &CronTaskRunner{
mq: mq, mq: mq,
db: db, db: db,
tasks: make(chan monitorTask), tasks: make(chan taskPair),
errors: make(chan error), errors: make(chan error),
running: make(map[string]*monitorTask),
} }
} }
const commandTemplate = "-I1 --flat-playlist --print webpage_url $1"
var argsSplitterRe = regexp.MustCompile(`(?mi)[^\s"']+|"([^"]*)"|'([^']*)'`) var argsSplitterRe = regexp.MustCompile(`(?mi)[^\s"']+|"([^"]*)"|'([^']*)'`)
func (t *CronTaskRunner) Submit(subcription *domain.Subscription) error { func (t *CronTaskRunner) Submit(subcription *domain.Subscription) error {
@@ -57,8 +54,7 @@ func (t *CronTaskRunner) Submit(subcription *domain.Subscription) error {
return err return err
} }
job := monitorTask{ job := taskPair{
Done: make(chan struct{}),
Schedule: schedule, Schedule: schedule,
Subscription: subcription, Subscription: subcription,
} }
@@ -68,110 +64,54 @@ func (t *CronTaskRunner) Submit(subcription *domain.Subscription) error {
return nil return nil
} }
// Handles the entire lifecylce of a monitor job.
func (t *CronTaskRunner) Spawner(ctx context.Context) { func (t *CronTaskRunner) Spawner(ctx context.Context) {
for req := range t.tasks { for task := range t.tasks {
t.running[req.Subscription.Id] = &req // keep track of the current job
go func() {
ctx, cancel := context.WithCancel(ctx) // inject into the job's context a cancellation singal
fetcherEvents := t.doFetch(ctx, &req) // retrieve the channel of events of the job
for {
select {
case <-req.Done:
slog.Info("stopping cron job and removing schedule", slog.String("url", req.Subscription.URL))
cancel()
return
case <-fetcherEvents:
slog.Info("finished monitoring channel", slog.String("url", req.Subscription.URL))
}
}
}()
}
}
// Stop a currently scheduled job
func (t *CronTaskRunner) StopTask(id string) error {
task := t.running[id]
if task != nil {
t.running[id].Done <- struct{}{}
delete(t.running, id)
}
return nil
}
// Start a fetcher and notify on a channel when a fetcher has completed
func (t *CronTaskRunner) doFetch(ctx context.Context, req *monitorTask) <-chan struct{} {
completed := make(chan struct{})
// generator func
go func() { go func() {
for { for {
sleepFor := t.fetcher(ctx, req) slog.Info("fetching latest video for channel", slog.String("channel", task.Subscription.URL))
completed <- struct{}{}
time.Sleep(sleepFor) fetcherParams := strings.Split(strings.Replace(commandTemplate, "$1", task.Subscription.URL, 1), " ")
}
}()
return completed
}
// Perform the retrieval of the latest video of the channel.
// Returns a time.Duration containing the amount of time to the next schedule.
func (t *CronTaskRunner) fetcher(ctx context.Context, req *monitorTask) time.Duration {
slog.Info("fetching latest video for channel", slog.String("channel", req.Subscription.URL))
nextSchedule := time.Until(req.Schedule.Next(time.Now()))
cmd := exec.CommandContext( cmd := exec.CommandContext(
ctx, ctx,
config.Instance().DownloaderPath, config.Instance().DownloaderPath,
"-I1", fetcherParams...,
"--flat-playlist",
"--print", "webpage_url",
req.Subscription.URL,
) )
stdout, err := cmd.Output() stdout, err := cmd.Output()
if err != nil { if err != nil {
t.errors <- err t.errors <- err
return time.Duration(0) return
} }
latestVideoURL := string(bytes.Trim(stdout, "\n")) latestChannelURL := string(bytes.Trim(stdout, "\n"))
// if the download exists there's not point in sending it into the message queue.
exists, err := archive.DownloadExists(ctx, latestVideoURL)
if exists && err == nil {
return nextSchedule
}
p := &internal.Process{ p := &internal.Process{
Url: latestVideoURL, Url: latestChannelURL,
Params: append( Params: append(argsSplitterRe.FindAllString(task.Subscription.Params, 1), []string{
argsSplitterRe.FindAllString(req.Subscription.Params, 1),
[]string{
"--break-on-existing",
"--download-archive", "--download-archive",
filepath.Join(config.Instance().Dir(), "archive.txt"), filepath.Join(config.Instance().Dir(), "archive.txt"),
}...), }...),
AutoRemove: true, AutoRemove: true,
} }
t.db.Set(p) // give it an id t.db.Set(p)
t.mq.Publish(p) // send it to the message queue waiting to be processed t.mq.Publish(p)
nextSchedule := time.Until(task.Schedule.Next(time.Now()))
slog.Info( slog.Info(
"cron task runner next schedule", "cron task runner next schedule",
slog.String("url", req.Subscription.URL), slog.String("url", task.Subscription.URL),
slog.Any("duration", nextSchedule), slog.Any("duration", nextSchedule),
) )
return nextSchedule time.Sleep(nextSchedule)
}
}()
}
} }
func (t *CronTaskRunner) Recoverer() { func (t *CronTaskRunner) Recoverer() {
panic("unimplemented") panic("Unimplemented")
} }