Compare commits: v3.2.6...feat-suppo
1 commit: a4ba8cea27
@@ -1 +0,0 @@
-docker run -d -p 3033:3033 -v /downloads:/downloads marcobaobao/yt-dlp-webui
@@ -3,7 +3,6 @@
 result/
 result
 dist
-.pnpm-store/
 .pnpm-debug.log
 node_modules
 .env
@@ -21,11 +20,9 @@ cookies.txt
 __debug*
 ui/
 .idea
-.idea/
 frontend/.pnp.cjs
 frontend/.pnp.loader.mjs
 frontend/.yarn/install-state.gz
 .db.lock
 livestreams.dat
-.vite/deps
-archive.txt
+.git
README.md (12 changed lines)

@@ -28,7 +28,7 @@ docker pull ghcr.io/marcopiovanello/yt-dlp-web-ui:latest
 ## Community stuff
 Feel free to join :)
 
-[](https://discord.gg/WRnVWr4y)
+[](https://discord.gg/3Sj9ZZHv)
 
 ## Some screeshots
 
@@ -115,16 +115,6 @@ services:
     restart: unless-stopped
 ```
 
-### ⚡ One-Click Deploy
-
-| Cloud Provider | Deploy Button |
-|----------------|---------------|
-| AWS | <a href="https://deploystack.io/deploy/marcopiovanello-yt-dlp-web-ui?provider=aws&language=cfn"><img src="https://raw.githubusercontent.com/deploystackio/deploy-templates/refs/heads/main/.assets/img/aws.svg" height="38"></a> |
-| DigitalOcean | <a href="https://deploystack.io/deploy/marcopiovanello-yt-dlp-web-ui?provider=do&language=dop"><img src="https://raw.githubusercontent.com/deploystackio/deploy-templates/refs/heads/main/.assets/img/do.svg" height="38"></a> |
-| Render | <a href="https://deploystack.io/deploy/marcopiovanello-yt-dlp-web-ui?provider=rnd&language=rnd"><img src="https://raw.githubusercontent.com/deploystackio/deploy-templates/refs/heads/main/.assets/img/rnd.svg" height="38"></a> |
-
-<sub>Generated by <a href="https://deploystack.io/c/marcopiovanello-yt-dlp-web-ui" target="_blank">DeployStack.io</a></sub>
-
 ## [Prebuilt binaries](https://github.com/marcopiovanello/yt-dlp-web-ui/releases) installation
 
 ```sh
@@ -79,5 +79,4 @@ keys:
 The monitor job will be scheduled/triggered by a defined cron expression (defaults to every 5 minutes if left blank).
 cronExpressionLabel: 'Cron expression'
 editButtonLabel: 'Edit'
 newSubscriptionButton: New subscription
-clearCompletedButton: 'Clear completed'
@@ -121,14 +121,11 @@ export const appTitleState = atomWithStorage(
 export const serverAddressAndPortState = atom((get) => {
   if (get(servedFromReverseProxySubDirState)) {
     return `${get(serverAddressState)}/${get(servedFromReverseProxySubDirState)}/`
-      .replaceAll('"', '') // TODO: atomWithStorage put extra double quotes on strings
   }
   if (get(servedFromReverseProxyState)) {
     return `${get(serverAddressState)}`
-      .replaceAll('"', '')
   }
   return `${get(serverAddressState)}:${get(serverPortState)}`
-    .replaceAll('"', '')
 })
 
 export const serverURL = atom((get) =>
@@ -138,12 +135,14 @@ export const serverURL = atom((get) =>
 export const rpcWebSocketEndpoint = atom((get) => {
   const proto = window.location.protocol === 'https:' ? 'wss:' : 'ws:'
   return `${proto}//${get(serverAddressAndPortState)}/rpc/ws`
-})
+}
+)
 
 export const rpcHTTPEndpoint = atom((get) => {
   const proto = window.location.protocol
   return `${proto}//${get(serverAddressAndPortState)}/rpc/http`
-})
+}
+)
 
 export const serverSideCookiesState = atom<Promise<string>>(async (get) => await pipe(
   ffetch<Readonly<{ cookies: string }>>(`${get(serverURL)}/api/v1/cookies`),
@@ -181,4 +180,5 @@ export const settingsState = atom<SettingsState>((get) => ({
   listView: get(listViewState),
   servedFromReverseProxy: get(servedFromReverseProxyState),
   appTitle: get(appTitleState)
-}))
+})
+)
@@ -1,6 +1,5 @@
 import AddCircleIcon from '@mui/icons-material/AddCircle'
 import BuildCircleIcon from '@mui/icons-material/BuildCircle'
-import ClearAllIcon from '@mui/icons-material/ClearAll'
 import DeleteForeverIcon from '@mui/icons-material/DeleteForever'
 import FolderZipIcon from '@mui/icons-material/FolderZip'
 import FormatListBulleted from '@mui/icons-material/FormatListBulleted'
@@ -43,11 +42,6 @@ const HomeSpeedDial: React.FC<Props> = ({ onDownloadOpen, onEditorOpen }) => {
         tooltipTitle={i18n.t('bulkDownload')}
         onClick={() => window.open(`${serverAddr}/archive/bulk?token=${localStorage.getItem('token')}`)}
       />
-      <SpeedDialAction
-        icon={<ClearAllIcon />}
-        tooltipTitle={i18n.t('clearCompletedButton')}
-        onClick={() => client.clearCompleted()}
-      />
       <SpeedDialAction
         icon={<DeleteForeverIcon />}
         tooltipTitle={i18n.t('abortAllButton')}
@@ -200,11 +200,4 @@ export class RPCClient {
       params: []
     })
   }
 
-  public clearCompleted() {
-    return this.sendHTTP({
-      method: 'Service.ClearCompleted',
-      params: []
-    })
-  }
 }

@@ -13,7 +13,6 @@ export type RPCMethods =
   | "Service.ProgressLivestream"
   | "Service.KillLivestream"
   | "Service.KillAllLivestream"
-  | "Service.ClearCompleted"
 
 export type RPCRequest = {
   method: RPCMethods
go.mod (2 changed lines)

@@ -1,6 +1,6 @@
 module github.com/marcopiovanello/yt-dlp-web-ui/v3
 
-go 1.24
+go 1.23
 
 require (
 	github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef
@@ -1,58 +0,0 @@
-package archive
-
-import (
-	"bufio"
-	"bytes"
-	"context"
-	"os"
-	"os/exec"
-	"path/filepath"
-
-	"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
-)
-
-// Perform a search on the archive.txt file an determines if a download
-// has already be done.
-func DownloadExists(ctx context.Context, url string) (bool, error) {
-	cmd := exec.CommandContext(
-		ctx,
-		config.Instance().DownloaderPath,
-		"--print",
-		"%(extractor)s %(id)s",
-		url,
-	)
-	stdout, err := cmd.Output()
-	if err != nil {
-		return false, err
-	}
-
-	extractorAndURL := bytes.Trim(stdout, "\n")
-
-	fd, err := os.Open(filepath.Join(config.Instance().Dir(), "archive.txt"))
-	if err != nil {
-		return false, err
-	}
-	defer fd.Close()
-
-	scanner := bufio.NewScanner(fd)
-
-	// search linearly for lower memory usage...
-	// the a pre-sorted with hashed values version of the archive.txt file can be loaded in memory
-	// and perform a binary search on it.
-	for scanner.Scan() {
-		if bytes.Equal(scanner.Bytes(), extractorAndURL) {
-			return true, nil
-		}
-	}
-
-	// data, err := io.ReadAll(fd)
-	// if err != nil {
-	// 	return false, err
-	// }
-
-	// slices.BinarySearchFunc(data, extractorAndURL, func(a []byte, b []byte) int {
-	// 	return hash(a).Compare(hash(b))
-	// })
-
-	return false, nil
-}
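
Editor's note: the deleted archive package justifies its linear scan in a comment and leaves a commented-out sketch of a faster variant that loads a pre-sorted archive into memory and binary-searches it. A minimal, self-contained sketch of that idea follows; the file path, function names and the plain string sort are assumptions for illustration, not project code.

```go
package main

import (
	"bufio"
	"fmt"
	"os"
	"slices"
	"strings"
)

// loadArchive reads every non-empty line of an archive file into a sorted slice.
func loadArchive(path string) ([]string, error) {
	fd, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer fd.Close()

	var entries []string
	scanner := bufio.NewScanner(fd)
	for scanner.Scan() {
		if line := strings.TrimSpace(scanner.Text()); line != "" {
			entries = append(entries, line)
		}
	}
	if err := scanner.Err(); err != nil {
		return nil, err
	}

	slices.Sort(entries) // sort once so every lookup can binary-search
	return entries, nil
}

// downloadExists reports whether an "extractor id" pair is already archived.
func downloadExists(sorted []string, extractorAndID string) bool {
	_, found := slices.BinarySearch(sorted, extractorAndID)
	return found
}

func main() {
	entries, err := loadArchive("archive.txt") // hypothetical path
	if err != nil {
		fmt.Println("no archive yet:", err)
		return
	}
	fmt.Println(downloadExists(entries, "youtube IQVbGfVVjgY"))
}
```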
@@ -2,7 +2,6 @@ package internal
 
 import (
 	"container/heap"
-	"log/slog"
 )
 
 type LoadBalancer struct {
@@ -10,29 +9,7 @@ type LoadBalancer struct {
 	done chan *Worker
 }
 
-func NewLoadBalancer(numWorker int) *LoadBalancer {
-	var pool Pool
-
-	doneChan := make(chan *Worker)
-
-	for i := range numWorker {
-		w := &Worker{
-			requests: make(chan *Process, 1),
-			index: i,
-		}
-		go w.Work(doneChan)
-		pool = append(pool, w)
-
-		slog.Info("spawned worker", slog.Int("index", i))
-	}
-
-	return &LoadBalancer{
-		pool: pool,
-		done: doneChan,
-	}
-}
-
-func (b *LoadBalancer) Balance(work chan *Process) {
+func (b *LoadBalancer) Balance(work chan Process) {
 	for {
 		select {
 		case req := <-work:
@@ -43,7 +20,7 @@ func (b *LoadBalancer) Balance(work chan *Process) {
 	}
 }
 
-func (b *LoadBalancer) dispatch(req *Process) {
+func (b *LoadBalancer) dispatch(req Process) {
 	w := heap.Pop(&b.pool).(*Worker)
 	w.requests <- req
 	w.pending++
@@ -141,13 +141,26 @@ func (l *LiveStream) monitorStartTime(r io.Reader) {
 		}
 	}
 
-	scanner.Scan()
+	const TRIES = 5
+	/*
+		if it's waiting a livestream the 5th line will indicate the time to live
+		its a dumb and not robust method.
 
-	for !strings.Contains(scanner.Text(), "Waiting for") {
+		example:
+		[youtube] Extracting URL: https://www.youtube.com/watch?v=IQVbGfVVjgY
+		[youtube] IQVbGfVVjgY: Downloading webpage
+		[youtube] IQVbGfVVjgY: Downloading ios player API JSON
+		[youtube] IQVbGfVVjgY: Downloading web creator player API JSON
+		WARNING: [youtube] This live event will begin in 27 minutes. <- STDERR, ignore
+		[wait] Waiting for 00:27:15 - Press Ctrl+C to try now <- 5th line
+	*/
+	for range TRIES {
 		scanner.Scan()
-	}
 
-	waitTimeScanner()
+		if strings.Contains(scanner.Text(), "Waiting for") {
+			waitTimeScanner()
+		}
+	}
 }
 
 func (l *LiveStream) WaitTime() <-chan time.Duration {
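
Editor's note: the hunk above scans up to TRIES lines of yt-dlp output and, when one contains "Waiting for", hands it to waitTimeScanner(), which ultimately feeds WaitTime() <-chan time.Duration. A hedged sketch of turning such a line into a time.Duration follows; the regexp and the function names are illustrative, not the project's actual parser.

```go
package main

import (
	"fmt"
	"regexp"
	"time"
)

var waitRe = regexp.MustCompile(`Waiting for (\d{2}):(\d{2}):(\d{2})`)

// parseWaitTime extracts the HH:MM:SS countdown printed by yt-dlp, if present.
func parseWaitTime(line string) (time.Duration, bool) {
	m := waitRe.FindStringSubmatch(line)
	if m == nil {
		return 0, false
	}
	// m[1..3] are two-digit numbers by construction of the regexp.
	d, err := time.ParseDuration(fmt.Sprintf("%sh%sm%ss", m[1], m[2], m[3]))
	if err != nil {
		return 0, false
	}
	return d, true
}

func main() {
	line := "[wait] Waiting for 00:27:15 - Press Ctrl+C to try now"
	if d, ok := parseWaitTime(line); ok {
		fmt.Println("livestream starts in", d) // 27m15s
	}
}
```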
@@ -9,17 +9,15 @@ import (
 )
 
 func setupTest() {
-	config.Instance().DownloaderPath = "build/yt-dlp"
+	config.Instance().DownloaderPath = "yt-dlp"
 }
 
-const URL = "https://www.youtube.com/watch?v=pwoAyLGOysU"
-
 func TestLivestream(t *testing.T) {
 	setupTest()
 
 	done := make(chan *LiveStream)
 
-	ls := New(URL, done, &internal.MessageQueue{}, &internal.MemoryDB{})
+	ls := New("https://www.youtube.com/watch?v=LSm1daKezcE", done, &internal.MessageQueue{}, &internal.MemoryDB{})
 	go ls.Start()
 
 	time.AfterFunc(time.Second*20, func() {
@@ -76,7 +76,7 @@ func (m *Monitor) Status() LiveStreamStatus {
 // Persist the monitor current state to a file.
 // The file is located in the configured config directory
 func (m *Monitor) Persist() error {
-	fd, err := os.Create(filepath.Join(config.Instance().SessionFilePath, "livestreams.dat"))
+	fd, err := os.Create(filepath.Join(config.Instance().Dir(), "livestreams.dat"))
 	if err != nil {
 		return err
 	}
@@ -95,7 +95,7 @@ func (m *Monitor) Persist() error {
 
 // Restore a saved state and resume the monitored livestreams
 func (m *Monitor) Restore() error {
-	fd, err := os.Open(filepath.Join(config.Instance().SessionFilePath, "livestreams.dat"))
+	fd, err := os.Open(filepath.Join(config.Instance().Dir(), "livestreams.dat"))
 	if err != nil {
 		return err
 	}
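
Editor's note: Persist and Restore above only move livestreams.dat from SessionFilePath to config.Instance().Dir(). For context, a minimal sketch of the save/load shape they implement; gob encoding, the savedStream type and the temp directory are assumptions for illustration, since the diff does not show which encoder the project uses.

```go
package main

import (
	"encoding/gob"
	"os"
	"path/filepath"
)

type savedStream struct{ URL string }

// persist writes the monitored livestreams to livestreams.dat in the given directory.
func persist(dir string, streams []savedStream) error {
	fd, err := os.Create(filepath.Join(dir, "livestreams.dat"))
	if err != nil {
		return err
	}
	defer fd.Close()
	return gob.NewEncoder(fd).Encode(streams)
}

// restore reads the saved livestreams back from the same file.
func restore(dir string) ([]savedStream, error) {
	fd, err := os.Open(filepath.Join(dir, "livestreams.dat"))
	if err != nil {
		return nil, err
	}
	defer fd.Close()
	var streams []savedStream
	err = gob.NewDecoder(fd).Decode(&streams)
	return streams, err
}

func main() {
	_ = persist(os.TempDir(), []savedStream{{URL: "https://example.com/live"}})
	_, _ = restore(os.TempDir())
}
```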
@@ -50,7 +50,7 @@ func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB) error {
 		return errors.New("probably not a valid URL")
 	}
 
-	if m.IsPlaylist() {
+	if m.Type == "playlist" {
 		entries := slices.CompactFunc(slices.Compact(m.Entries), func(a common.DownloadInfo, b common.DownloadInfo) bool {
 			return a.URL == b.URL
 		})
@@ -88,10 +88,10 @@ func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB) error {
 
 			proc.Info.URL = meta.URL
 
+			time.Sleep(time.Millisecond)
 			db.Set(proc)
 			mq.Publish(proc)
 
-			proc.Info.CreatedAt = meta.CreatedAt
 		}
 
 	return nil
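
Editor's note: the hunk above deduplicates playlist entries with slices.Compact followed by slices.CompactFunc keyed on URL; both only collapse adjacent equal elements. A tiny standalone illustration of that behaviour, with a hypothetical downloadInfo type standing in for common.DownloadInfo.

```go
package main

import (
	"fmt"
	"slices"
)

type downloadInfo struct{ URL string }

func main() {
	entries := []downloadInfo{
		{URL: "https://youtu.be/a"},
		{URL: "https://youtu.be/a"}, // adjacent duplicate: removed
		{URL: "https://youtu.be/b"},
		{URL: "https://youtu.be/a"}, // not adjacent: kept
	}

	deduped := slices.CompactFunc(slices.Compact(entries), func(a, b downloadInfo) bool {
		return a.URL == b.URL
	})

	fmt.Println(deduped) // [{https://youtu.be/a} {https://youtu.be/b} {https://youtu.be/a}]
}
```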
@@ -1,24 +1,16 @@
 package internal
 
-// Pool implements heap.Interface interface as a standard priority queue
 type Pool []*Worker
 
 func (h Pool) Len() int { return len(h) }
-func (h Pool) Less(i, j int) bool { return h[i].pending < h[j].pending }
-
-func (h Pool) Swap(i, j int) {
-	h[i], h[j] = h[j], h[i]
-	h[i].index = i
-	h[j].index = j
-}
-
-func (h *Pool) Push(x any) { *h = append(*h, x.(*Worker)) }
+func (h Pool) Less(i, j int) bool { return h[i].index < h[j].index }
+func (h Pool) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
+func (h *Pool) Push(x any) { *h = append(*h, x.(*Worker)) }
 
 func (h *Pool) Pop() any {
 	old := *h
 	n := len(old)
 	x := old[n-1]
-	old[n-1] = nil
 	*h = old[0 : n-1]
 	return x
 }
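
Editor's note: Pool implements heap.Interface so the load balancer's dispatch can heap.Pop the next Worker (see the dispatch hunk earlier). The old Less ordered workers by fewest pending downloads, the new one by index. A self-contained sketch of the least-pending variant; pushing the worker back after assigning a job is an assumption, since the diff does not show that part.

```go
package main

import (
	"container/heap"
	"fmt"
)

type worker struct {
	id      int
	pending int
}

type pool []*worker

func (p pool) Len() int           { return len(p) }
func (p pool) Less(i, j int) bool { return p[i].pending < p[j].pending }
func (p pool) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }
func (p *pool) Push(x any)        { *p = append(*p, x.(*worker)) }
func (p *pool) Pop() any {
	old := *p
	n := len(old)
	x := old[n-1]
	*p = old[:n-1]
	return x
}

// dispatch pops the least-loaded worker, assigns one job and pushes it back.
func dispatch(p *pool) *worker {
	w := heap.Pop(p).(*worker)
	w.pending++
	heap.Push(p, w)
	return w
}

func main() {
	p := &pool{{id: 0, pending: 2}, {id: 1, pending: 0}, {id: 2, pending: 1}}
	heap.Init(p)

	for i := 0; i < 3; i++ {
		w := dispatch(p)
		fmt.Printf("job %d -> worker %d (now %d pending)\n", i, w.id, w.pending)
	}
}
```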
@@ -1,9 +1,9 @@
 package internal
 
 type Worker struct {
-	requests chan *Process // downloads to do
+	requests chan Process // downloads to do
 	pending int // downloads pending
 	index int // index in the heap
 }
 
 func (w *Worker) Work(done chan *Worker) {
@@ -8,5 +8,3 @@ type Metadata struct {
 	PlaylistTitle string `json:"title"`
 	Type string `json:"_type"`
 }
-
-func (m *Metadata) IsPlaylist() bool { return m.Type == "playlist" }
@@ -183,7 +183,6 @@ func (s *Service) KillAll(args NoArgs, killed *string) error {
 		}
 
 		slog.Info("succesfully killed process", slog.String("id", proc.Id))
-		proc = nil // gc helper
 	}
 
 	return nil
@@ -196,35 +195,6 @@ func (s *Service) Clear(args string, killed *string) error {
 	return nil
 }
 
-// Removes completed processes
-func (s *Service) ClearCompleted(cleared *string) error {
-	var (
-		keys = s.db.Keys()
-		removeFunc = func(p *internal.Process) error {
-			defer s.db.Delete(p.Id)
-
-			if p.Progress.Status != internal.StatusCompleted {
-				return nil
-			}
-
-			return p.Kill()
-		}
-	)
-
-	for _, key := range *keys {
-		proc, err := s.db.Get(key)
-		if err != nil {
-			return err
-		}
-
-		if err := removeFunc(proc); err != nil {
-			return err
-		}
-	}
-
-	return nil
-}
-
 // FreeSpace gets the available from package sys util
 func (s *Service) FreeSpace(args NoArgs, free *uint64) error {
 	freeSpace, err := sys.FreeSpace()
@@ -53,7 +53,6 @@ func toDB(dto *domain.Subscription) data.Subscription {
 
 // Delete implements domain.Service.
 func (s *Service) Delete(ctx context.Context, id string) error {
-	s.runner.StopTask(id)
 	return s.r.Delete(ctx, id)
 }
 
@@ -7,9 +7,9 @@ import (
 	"os/exec"
 	"path/filepath"
 	"regexp"
+	"strings"
 	"time"
 
-	"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/archive"
 	"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
 	"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal"
 	"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/domain"
@@ -19,12 +19,10 @@ import (
 type TaskRunner interface {
 	Submit(subcription *domain.Subscription) error
 	Spawner(ctx context.Context)
-	StopTask(id string) error
 	Recoverer()
 }
 
-type monitorTask struct {
-	Done chan struct{}
+type taskPair struct {
 	Schedule cron.Schedule
 	Subscription *domain.Subscription
 }
@@ -33,22 +31,21 @@ type CronTaskRunner struct {
 	mq *internal.MessageQueue
 	db *internal.MemoryDB
 
-	tasks chan monitorTask
+	tasks chan taskPair
 	errors chan error
 
-	running map[string]*monitorTask
 }
 
 func NewCronTaskRunner(mq *internal.MessageQueue, db *internal.MemoryDB) TaskRunner {
 	return &CronTaskRunner{
 		mq: mq,
 		db: db,
-		tasks: make(chan monitorTask),
+		tasks: make(chan taskPair),
 		errors: make(chan error),
-		running: make(map[string]*monitorTask),
 	}
 }
 
+const commandTemplate = "-I1 --flat-playlist --print webpage_url $1"
+
 var argsSplitterRe = regexp.MustCompile(`(?mi)[^\s"']+|"([^"]*)"|'([^']*)'`)
 
 func (t *CronTaskRunner) Submit(subcription *domain.Subscription) error {
@@ -57,8 +54,7 @@ func (t *CronTaskRunner) Submit(subcription *domain.Subscription) error {
 		return err
 	}
 
-	job := monitorTask{
-		Done: make(chan struct{}),
+	job := taskPair{
 		Schedule: schedule,
 		Subscription: subcription,
 	}
@@ -68,110 +64,54 @@ func (t *CronTaskRunner) Submit(subcription *domain.Subscription) error {
 	return nil
 }
 
-// Handles the entire lifecylce of a monitor job.
 func (t *CronTaskRunner) Spawner(ctx context.Context) {
-	for req := range t.tasks {
-		t.running[req.Subscription.Id] = &req // keep track of the current job
+	for task := range t.tasks {
 
 		go func() {
-			ctx, cancel := context.WithCancel(ctx) // inject into the job's context a cancellation singal
-			fetcherEvents := t.doFetch(ctx, &req) // retrieve the channel of events of the job
 
 			for {
-				select {
-				case <-req.Done:
-					slog.Info("stopping cron job and removing schedule", slog.String("url", req.Subscription.URL))
-					cancel()
+				slog.Info("fetching latest video for channel", slog.String("channel", task.Subscription.URL))
+
+				fetcherParams := strings.Split(strings.Replace(commandTemplate, "$1", task.Subscription.URL, 1), " ")
+
+				cmd := exec.CommandContext(
+					ctx,
+					config.Instance().DownloaderPath,
+					fetcherParams...,
+				)
+
+				stdout, err := cmd.Output()
+				if err != nil {
+					t.errors <- err
 					return
-				case <-fetcherEvents:
-					slog.Info("finished monitoring channel", slog.String("url", req.Subscription.URL))
 				}
 
+				latestChannelURL := string(bytes.Trim(stdout, "\n"))
+
+				p := &internal.Process{
+					Url: latestChannelURL,
+					Params: append(argsSplitterRe.FindAllString(task.Subscription.Params, 1), []string{
+						"--download-archive",
+						filepath.Join(config.Instance().Dir(), "archive.txt"),
+					}...),
+					AutoRemove: true,
+				}
+
+				t.db.Set(p)
+				t.mq.Publish(p)
+
+				nextSchedule := time.Until(task.Schedule.Next(time.Now()))
+
+				slog.Info(
+					"cron task runner next schedule",
+					slog.String("url", task.Subscription.URL),
+					slog.Any("duration", nextSchedule),
+				)
+
+				time.Sleep(nextSchedule)
 			}
 		}()
 	}
 }
 
-// Stop a currently scheduled job
-func (t *CronTaskRunner) StopTask(id string) error {
-	task := t.running[id]
-	if task != nil {
-		t.running[id].Done <- struct{}{}
-		delete(t.running, id)
-	}
-	return nil
-}
-
-// Start a fetcher and notify on a channel when a fetcher has completed
-func (t *CronTaskRunner) doFetch(ctx context.Context, req *monitorTask) <-chan struct{} {
-	completed := make(chan struct{})
-
-	// generator func
-	go func() {
-		for {
-			sleepFor := t.fetcher(ctx, req)
-			completed <- struct{}{}
-
-			time.Sleep(sleepFor)
-		}
-	}()
-
-	return completed
-}
-
-// Perform the retrieval of the latest video of the channel.
-// Returns a time.Duration containing the amount of time to the next schedule.
-func (t *CronTaskRunner) fetcher(ctx context.Context, req *monitorTask) time.Duration {
-	slog.Info("fetching latest video for channel", slog.String("channel", req.Subscription.URL))
-
-	nextSchedule := time.Until(req.Schedule.Next(time.Now()))
-
-	cmd := exec.CommandContext(
-		ctx,
-		config.Instance().DownloaderPath,
-		"-I1",
-		"--flat-playlist",
-		"--print", "webpage_url",
-		req.Subscription.URL,
-	)
-
-	stdout, err := cmd.Output()
-	if err != nil {
-		t.errors <- err
-		return time.Duration(0)
-	}
-
-	latestVideoURL := string(bytes.Trim(stdout, "\n"))
-
-	// if the download exists there's not point in sending it into the message queue.
-	exists, err := archive.DownloadExists(ctx, latestVideoURL)
-	if exists && err == nil {
-		return nextSchedule
-	}
-
-	p := &internal.Process{
-		Url: latestVideoURL,
-		Params: append(
-			argsSplitterRe.FindAllString(req.Subscription.Params, 1),
-			[]string{
-				"--break-on-existing",
-				"--download-archive",
-				filepath.Join(config.Instance().Dir(), "archive.txt"),
-			}...),
-		AutoRemove: true,
-	}
-
-	t.db.Set(p) // give it an id
-	t.mq.Publish(p) // send it to the message queue waiting to be processed
-
-	slog.Info(
-		"cron task runner next schedule",
-		slog.String("url", req.Subscription.URL),
-		slog.Any("duration", nextSchedule),
-	)
-
-	return nextSchedule
-}
-
 func (t *CronTaskRunner) Recoverer() {
-	panic("unimplemented")
+	panic("Unimplemented")
 }
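
Editor's note: the rewritten Spawner sleeps for time.Until(task.Schedule.Next(time.Now())) between fetches. Assuming the schedule comes from a standard cron parser such as github.com/robfig/cron/v3 (the diff only shows the cron.Schedule type), the next-run computation looks like the sketch below; "*/5 * * * *" mirrors the every-5-minutes default mentioned in the locale hunk.

```go
package main

import (
	"fmt"
	"time"

	"github.com/robfig/cron/v3"
)

func main() {
	schedule, err := cron.ParseStandard("*/5 * * * *") // every 5 minutes
	if err != nil {
		panic(err)
	}

	// Duration until the next scheduled run; the runner sleeps for this long
	// before fetching the channel's latest video again.
	next := time.Until(schedule.Next(time.Now()))
	fmt.Println("next monitor run in", next.Round(time.Second))
}
```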