Compare commits
24 Commits
feat-suppo
...
feat-twitc
| Author | SHA1 | Date | |
|---|---|---|---|
| 4f8510bac8 | |||
| 4e1b4bad0a | |||
|
|
14a03d6a77 | ||
|
|
8a73079fad | ||
| f578f44cfd | |||
| cbe16c5c6c | |||
| 3cebaf7f61 | |||
|
|
2d2cb1dc3a | ||
|
|
43bcc40907 | ||
|
|
2af27e51be | ||
|
|
8c18242aaf | ||
|
|
66bebb2529 | ||
|
|
e223e030ac | ||
| e4362468f7 | |||
| 6880f60d14 | |||
|
|
5d4aa7e2a3 | ||
|
|
2845196bc7 | ||
|
|
983915f8aa | ||
| ce2fb13ef2 | |||
| 99069fe5f7 | |||
| 761f26b387 | |||
| eec72bb6e2 | |||
| ceb92d066c | |||
|
|
cf74948840 |
1
.deploystack/docker-run.txt
Normal file
1
.deploystack/docker-run.txt
Normal file
@@ -0,0 +1 @@
|
|||||||
|
docker run -d -p 3033:3033 -v /downloads:/downloads marcobaobao/yt-dlp-webui
|
||||||
@@ -3,6 +3,7 @@
|
|||||||
result/
|
result/
|
||||||
result
|
result
|
||||||
dist
|
dist
|
||||||
|
.pnpm-store/
|
||||||
.pnpm-debug.log
|
.pnpm-debug.log
|
||||||
node_modules
|
node_modules
|
||||||
.env
|
.env
|
||||||
@@ -20,9 +21,11 @@ cookies.txt
|
|||||||
__debug*
|
__debug*
|
||||||
ui/
|
ui/
|
||||||
.idea
|
.idea
|
||||||
|
.idea/
|
||||||
frontend/.pnp.cjs
|
frontend/.pnp.cjs
|
||||||
frontend/.pnp.loader.mjs
|
frontend/.pnp.loader.mjs
|
||||||
frontend/.yarn/install-state.gz
|
frontend/.yarn/install-state.gz
|
||||||
.db.lock
|
.db.lock
|
||||||
livestreams.dat
|
livestreams.dat
|
||||||
.git
|
.vite/deps
|
||||||
|
archive.txt
|
||||||
|
|||||||
1
.gitignore
vendored
1
.gitignore
vendored
@@ -29,3 +29,4 @@ frontend/.yarn/install-state.gz
|
|||||||
livestreams.dat
|
livestreams.dat
|
||||||
.vite/deps
|
.vite/deps
|
||||||
archive.txt
|
archive.txt
|
||||||
|
twitch-monitor.dat
|
||||||
|
|||||||
@@ -24,11 +24,12 @@ COPY --from=ui /usr/src/yt-dlp-webui/frontend /usr/src/yt-dlp-webui/frontend
|
|||||||
RUN CGO_ENABLED=0 GOOS=linux go build -o yt-dlp-webui
|
RUN CGO_ENABLED=0 GOOS=linux go build -o yt-dlp-webui
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
|
|
||||||
# dependencies ----------------------------------------------------------------
|
# Runtime ---------------------------------------------------------------------
|
||||||
FROM alpine:edge
|
FROM python:3.13.2-alpine3.21
|
||||||
|
|
||||||
RUN apk update && \
|
RUN apk update && \
|
||||||
apk add ffmpeg yt-dlp ca-certificates curl wget psmisc
|
apk add ffmpeg ca-certificates curl wget gnutls --no-cache && \
|
||||||
|
pip install "yt-dlp[default,curl-cffi,mutagen,pycryptodomex,phantomjs,secretstorage]"
|
||||||
|
|
||||||
VOLUME /downloads /config
|
VOLUME /downloads /config
|
||||||
|
|
||||||
|
|||||||
12
README.md
12
README.md
@@ -28,7 +28,7 @@ docker pull ghcr.io/marcopiovanello/yt-dlp-web-ui:latest
|
|||||||
## Community stuff
|
## Community stuff
|
||||||
Feel free to join :)
|
Feel free to join :)
|
||||||
|
|
||||||
[](https://discord.gg/3Sj9ZZHv)
|
[Discord](https://discord.gg/GZAX5FfGzE)
|
||||||
|
|
||||||
## Some screeshots
|
## Some screeshots
|
||||||

|

|
||||||
@@ -115,6 +115,16 @@ services:
|
|||||||
restart: unless-stopped
|
restart: unless-stopped
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### ⚡ One-Click Deploy
|
||||||
|
|
||||||
|
| Cloud Provider | Deploy Button |
|
||||||
|
|----------------|---------------|
|
||||||
|
| AWS | <a href="https://deploystack.io/deploy/marcopiovanello-yt-dlp-web-ui?provider=aws&language=cfn"><img src="https://raw.githubusercontent.com/deploystackio/deploy-templates/refs/heads/main/.assets/img/aws.svg" height="38"></a> |
|
||||||
|
| DigitalOcean | <a href="https://deploystack.io/deploy/marcopiovanello-yt-dlp-web-ui?provider=do&language=dop"><img src="https://raw.githubusercontent.com/deploystackio/deploy-templates/refs/heads/main/.assets/img/do.svg" height="38"></a> |
|
||||||
|
| Render | <a href="https://deploystack.io/deploy/marcopiovanello-yt-dlp-web-ui?provider=rnd&language=rnd"><img src="https://raw.githubusercontent.com/deploystackio/deploy-templates/refs/heads/main/.assets/img/rnd.svg" height="38"></a> |
|
||||||
|
|
||||||
|
<sub>Generated by <a href="https://deploystack.io/c/marcopiovanello-yt-dlp-web-ui" target="_blank">DeployStack.io</a></sub>
|
||||||
|
|
||||||
## [Prebuilt binaries](https://github.com/marcopiovanello/yt-dlp-web-ui/releases) installation
|
## [Prebuilt binaries](https://github.com/marcopiovanello/yt-dlp-web-ui/releases) installation
|
||||||
|
|
||||||
```sh
|
```sh
|
||||||
|
|||||||
@@ -31,7 +31,7 @@ keys:
|
|||||||
splashText: No active downloads
|
splashText: No active downloads
|
||||||
archiveTitle: Archive
|
archiveTitle: Archive
|
||||||
clipboardAction: Copied URL to clipboard
|
clipboardAction: Copied URL to clipboard
|
||||||
playlistCheckbox: Download playlist (it will take time, after submitting you may close this window)
|
playlistCheckbox: Download playlist
|
||||||
restartAppMessage: Needs a page reload to take effect
|
restartAppMessage: Needs a page reload to take effect
|
||||||
servedFromReverseProxyCheckbox: Is behind a reverse proxy
|
servedFromReverseProxyCheckbox: Is behind a reverse proxy
|
||||||
urlBase: URL base, for reverse proxy support (subdir), defaults to empty
|
urlBase: URL base, for reverse proxy support (subdir), defaults to empty
|
||||||
@@ -80,3 +80,4 @@ keys:
|
|||||||
cronExpressionLabel: 'Cron expression'
|
cronExpressionLabel: 'Cron expression'
|
||||||
editButtonLabel: 'Edit'
|
editButtonLabel: 'Edit'
|
||||||
newSubscriptionButton: New subscription
|
newSubscriptionButton: New subscription
|
||||||
|
clearCompletedButton: 'Clear completed'
|
||||||
@@ -121,11 +121,18 @@ export const appTitleState = atomWithStorage(
|
|||||||
export const serverAddressAndPortState = atom((get) => {
|
export const serverAddressAndPortState = atom((get) => {
|
||||||
if (get(servedFromReverseProxySubDirState)) {
|
if (get(servedFromReverseProxySubDirState)) {
|
||||||
return `${get(serverAddressState)}/${get(servedFromReverseProxySubDirState)}/`
|
return `${get(serverAddressState)}/${get(servedFromReverseProxySubDirState)}/`
|
||||||
|
.replaceAll('"', '') // XXX: atomWithStorage uses JSON.stringify to serialize
|
||||||
|
.replaceAll('//', '/') // which puts extra double quotes.
|
||||||
}
|
}
|
||||||
if (get(servedFromReverseProxyState)) {
|
if (get(servedFromReverseProxyState)) {
|
||||||
return `${get(serverAddressState)}`
|
return `${get(serverAddressState)}`
|
||||||
|
.replaceAll('"', '')
|
||||||
}
|
}
|
||||||
return `${get(serverAddressState)}:${get(serverPortState)}`
|
|
||||||
|
const sap = `${get(serverAddressState)}:${get(serverPortState)}`
|
||||||
|
.replaceAll('"', '')
|
||||||
|
|
||||||
|
return sap.endsWith('/') ? sap.slice(0, -1) : sap
|
||||||
})
|
})
|
||||||
|
|
||||||
export const serverURL = atom((get) =>
|
export const serverURL = atom((get) =>
|
||||||
@@ -134,15 +141,17 @@ export const serverURL = atom((get) =>
|
|||||||
|
|
||||||
export const rpcWebSocketEndpoint = atom((get) => {
|
export const rpcWebSocketEndpoint = atom((get) => {
|
||||||
const proto = window.location.protocol === 'https:' ? 'wss:' : 'ws:'
|
const proto = window.location.protocol === 'https:' ? 'wss:' : 'ws:'
|
||||||
return `${proto}//${get(serverAddressAndPortState)}/rpc/ws`
|
const sap = get(serverAddressAndPortState)
|
||||||
}
|
|
||||||
)
|
return `${proto}//${sap.endsWith('/') ? sap.slice(0, -1) : sap}/rpc/ws`
|
||||||
|
})
|
||||||
|
|
||||||
export const rpcHTTPEndpoint = atom((get) => {
|
export const rpcHTTPEndpoint = atom((get) => {
|
||||||
const proto = window.location.protocol
|
const proto = window.location.protocol
|
||||||
return `${proto}//${get(serverAddressAndPortState)}/rpc/http`
|
const sap = get(serverAddressAndPortState)
|
||||||
}
|
|
||||||
)
|
return `${proto}//${sap.endsWith('/') ? sap.slice(0, -1) : sap}/rpc/http`
|
||||||
|
})
|
||||||
|
|
||||||
export const serverSideCookiesState = atom<Promise<string>>(async (get) => await pipe(
|
export const serverSideCookiesState = atom<Promise<string>>(async (get) => await pipe(
|
||||||
ffetch<Readonly<{ cookies: string }>>(`${get(serverURL)}/api/v1/cookies`),
|
ffetch<Readonly<{ cookies: string }>>(`${get(serverURL)}/api/v1/cookies`),
|
||||||
@@ -180,5 +189,4 @@ export const settingsState = atom<SettingsState>((get) => ({
|
|||||||
listView: get(listViewState),
|
listView: get(listViewState),
|
||||||
servedFromReverseProxy: get(servedFromReverseProxyState),
|
servedFromReverseProxy: get(servedFromReverseProxyState),
|
||||||
appTitle: get(appTitleState)
|
appTitle: get(appTitleState)
|
||||||
})
|
}))
|
||||||
)
|
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
import AddCircleIcon from '@mui/icons-material/AddCircle'
|
import AddCircleIcon from '@mui/icons-material/AddCircle'
|
||||||
import BuildCircleIcon from '@mui/icons-material/BuildCircle'
|
import BuildCircleIcon from '@mui/icons-material/BuildCircle'
|
||||||
|
import ClearAllIcon from '@mui/icons-material/ClearAll'
|
||||||
import DeleteForeverIcon from '@mui/icons-material/DeleteForever'
|
import DeleteForeverIcon from '@mui/icons-material/DeleteForever'
|
||||||
import FolderZipIcon from '@mui/icons-material/FolderZip'
|
import FolderZipIcon from '@mui/icons-material/FolderZip'
|
||||||
import FormatListBulleted from '@mui/icons-material/FormatListBulleted'
|
import FormatListBulleted from '@mui/icons-material/FormatListBulleted'
|
||||||
@@ -31,6 +32,7 @@ const HomeSpeedDial: React.FC<Props> = ({ onDownloadOpen, onEditorOpen }) => {
|
|||||||
ariaLabel="Home speed dial"
|
ariaLabel="Home speed dial"
|
||||||
sx={{ position: 'absolute', bottom: 64, right: 24 }}
|
sx={{ position: 'absolute', bottom: 64, right: 24 }}
|
||||||
icon={<SpeedDialIcon />}
|
icon={<SpeedDialIcon />}
|
||||||
|
onClick={onDownloadOpen}
|
||||||
>
|
>
|
||||||
<SpeedDialAction
|
<SpeedDialAction
|
||||||
icon={listView ? <ViewAgendaIcon /> : <FormatListBulleted />}
|
icon={listView ? <ViewAgendaIcon /> : <FormatListBulleted />}
|
||||||
@@ -42,6 +44,11 @@ const HomeSpeedDial: React.FC<Props> = ({ onDownloadOpen, onEditorOpen }) => {
|
|||||||
tooltipTitle={i18n.t('bulkDownload')}
|
tooltipTitle={i18n.t('bulkDownload')}
|
||||||
onClick={() => window.open(`${serverAddr}/archive/bulk?token=${localStorage.getItem('token')}`)}
|
onClick={() => window.open(`${serverAddr}/archive/bulk?token=${localStorage.getItem('token')}`)}
|
||||||
/>
|
/>
|
||||||
|
<SpeedDialAction
|
||||||
|
icon={<ClearAllIcon />}
|
||||||
|
tooltipTitle={i18n.t('clearCompletedButton')}
|
||||||
|
onClick={() => client.clearCompleted()}
|
||||||
|
/>
|
||||||
<SpeedDialAction
|
<SpeedDialAction
|
||||||
icon={<DeleteForeverIcon />}
|
icon={<DeleteForeverIcon />}
|
||||||
tooltipTitle={i18n.t('abortAllButton')}
|
tooltipTitle={i18n.t('abortAllButton')}
|
||||||
|
|||||||
@@ -1,6 +1,9 @@
|
|||||||
import { tryCatch } from 'fp-ts/TaskEither'
|
import { tryCatch } from 'fp-ts/TaskEither'
|
||||||
|
import * as J from 'fp-ts/Json'
|
||||||
|
import * as E from 'fp-ts/Either'
|
||||||
|
import { pipe } from 'fp-ts/lib/function'
|
||||||
|
|
||||||
async function fetcher<T>(url: string, opt?: RequestInit): Promise<T> {
|
async function fetcher(url: string, opt?: RequestInit, controller?: AbortController): Promise<string> {
|
||||||
const jwt = localStorage.getItem('token')
|
const jwt = localStorage.getItem('token')
|
||||||
|
|
||||||
if (opt && !opt.headers) {
|
if (opt && !opt.headers) {
|
||||||
@@ -14,17 +17,27 @@ async function fetcher<T>(url: string, opt?: RequestInit): Promise<T> {
|
|||||||
headers: {
|
headers: {
|
||||||
...opt?.headers,
|
...opt?.headers,
|
||||||
'X-Authentication': jwt ?? ''
|
'X-Authentication': jwt ?? ''
|
||||||
}
|
},
|
||||||
|
signal: controller?.signal
|
||||||
})
|
})
|
||||||
|
|
||||||
if (!res.ok) {
|
if (!res.ok) {
|
||||||
throw await res.text()
|
throw await res.text()
|
||||||
}
|
}
|
||||||
|
|
||||||
return res.json() as T
|
|
||||||
|
|
||||||
|
return res.text()
|
||||||
}
|
}
|
||||||
|
|
||||||
export const ffetch = <T>(url: string, opt?: RequestInit) => tryCatch(
|
export const ffetch = <T>(url: string, opt?: RequestInit, controller?: AbortController) => tryCatch(
|
||||||
() => fetcher<T>(url, opt),
|
async () => pipe(
|
||||||
|
await fetcher(url, opt, controller),
|
||||||
|
J.parse,
|
||||||
|
E.match(
|
||||||
|
(l) => l as T,
|
||||||
|
(r) => r as T
|
||||||
|
)
|
||||||
|
),
|
||||||
(e) => `error while fetching: ${e}`
|
(e) => `error while fetching: ${e}`
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -200,4 +200,11 @@ export class RPCClient {
|
|||||||
params: []
|
params: []
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public clearCompleted() {
|
||||||
|
return this.sendHTTP({
|
||||||
|
method: 'Service.ClearCompleted',
|
||||||
|
params: []
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
@@ -13,6 +13,7 @@ export type RPCMethods =
|
|||||||
| "Service.ProgressLivestream"
|
| "Service.ProgressLivestream"
|
||||||
| "Service.KillLivestream"
|
| "Service.KillLivestream"
|
||||||
| "Service.KillAllLivestream"
|
| "Service.KillAllLivestream"
|
||||||
|
| "Service.ClearCompleted"
|
||||||
|
|
||||||
export type RPCRequest = {
|
export type RPCRequest = {
|
||||||
method: RPCMethods
|
method: RPCMethods
|
||||||
|
|||||||
2
go.mod
2
go.mod
@@ -1,6 +1,6 @@
|
|||||||
module github.com/marcopiovanello/yt-dlp-web-ui/v3
|
module github.com/marcopiovanello/yt-dlp-web-ui/v3
|
||||||
|
|
||||||
go 1.23
|
go 1.24
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef
|
github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef
|
||||||
|
|||||||
58
server/archive/utils.go
Normal file
58
server/archive/utils.go
Normal file
@@ -0,0 +1,58 @@
|
|||||||
|
package archive
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"os"
|
||||||
|
"os/exec"
|
||||||
|
"path/filepath"
|
||||||
|
|
||||||
|
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Perform a search on the archive.txt file an determines if a download
|
||||||
|
// has already be done.
|
||||||
|
func DownloadExists(ctx context.Context, url string) (bool, error) {
|
||||||
|
cmd := exec.CommandContext(
|
||||||
|
ctx,
|
||||||
|
config.Instance().DownloaderPath,
|
||||||
|
"--print",
|
||||||
|
"%(extractor)s %(id)s",
|
||||||
|
url,
|
||||||
|
)
|
||||||
|
stdout, err := cmd.Output()
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
extractorAndURL := bytes.Trim(stdout, "\n")
|
||||||
|
|
||||||
|
fd, err := os.Open(filepath.Join(config.Instance().Dir(), "archive.txt"))
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
defer fd.Close()
|
||||||
|
|
||||||
|
scanner := bufio.NewScanner(fd)
|
||||||
|
|
||||||
|
// search linearly for lower memory usage...
|
||||||
|
// the a pre-sorted with hashed values version of the archive.txt file can be loaded in memory
|
||||||
|
// and perform a binary search on it.
|
||||||
|
for scanner.Scan() {
|
||||||
|
if bytes.Equal(scanner.Bytes(), extractorAndURL) {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// data, err := io.ReadAll(fd)
|
||||||
|
// if err != nil {
|
||||||
|
// return false, err
|
||||||
|
// }
|
||||||
|
|
||||||
|
// slices.BinarySearchFunc(data, extractorAndURL, func(a []byte, b []byte) int {
|
||||||
|
// return hash(a).Compare(hash(b))
|
||||||
|
// })
|
||||||
|
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
18
server/common/types.go
Normal file
18
server/common/types.go
Normal file
@@ -0,0 +1,18 @@
|
|||||||
|
package common
|
||||||
|
|
||||||
|
import "time"
|
||||||
|
|
||||||
|
// Used to deser the yt-dlp -J output
|
||||||
|
type DownloadInfo struct {
|
||||||
|
URL string `json:"url"`
|
||||||
|
Title string `json:"title"`
|
||||||
|
Thumbnail string `json:"thumbnail"`
|
||||||
|
Resolution string `json:"resolution"`
|
||||||
|
Size int32 `json:"filesize_approx"`
|
||||||
|
VCodec string `json:"vcodec"`
|
||||||
|
ACodec string `json:"acodec"`
|
||||||
|
Extension string `json:"ext"`
|
||||||
|
OriginalURL string `json:"original_url"`
|
||||||
|
FileName string `json:"filename"`
|
||||||
|
CreatedAt time.Time `json:"created_at"`
|
||||||
|
}
|
||||||
@@ -4,6 +4,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"gopkg.in/yaml.v3"
|
"gopkg.in/yaml.v3"
|
||||||
)
|
)
|
||||||
@@ -28,8 +29,14 @@ type Config struct {
|
|||||||
OpenIdClientId string `yaml:"openid_client_id"`
|
OpenIdClientId string `yaml:"openid_client_id"`
|
||||||
OpenIdClientSecret string `yaml:"openid_client_secret"`
|
OpenIdClientSecret string `yaml:"openid_client_secret"`
|
||||||
OpenIdRedirectURL string `yaml:"openid_redirect_url"`
|
OpenIdRedirectURL string `yaml:"openid_redirect_url"`
|
||||||
|
OpenIdEmailWhitelist []string `yaml:"openid_email_whitelist"`
|
||||||
FrontendPath string `yaml:"frontend_path"`
|
FrontendPath string `yaml:"frontend_path"`
|
||||||
AutoArchive bool `yaml:"auto_archive"`
|
AutoArchive bool `yaml:"auto_archive"`
|
||||||
|
Twitch struct {
|
||||||
|
ClientId string `yaml:"client_id"`
|
||||||
|
ClientSecret string `yaml:"client_secret"`
|
||||||
|
CheckInterval time.Duration `yaml:"check_interval"`
|
||||||
|
} `yaml:"twitch"`
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package internal
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"container/heap"
|
"container/heap"
|
||||||
|
"log/slog"
|
||||||
)
|
)
|
||||||
|
|
||||||
type LoadBalancer struct {
|
type LoadBalancer struct {
|
||||||
@@ -9,7 +10,29 @@ type LoadBalancer struct {
|
|||||||
done chan *Worker
|
done chan *Worker
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *LoadBalancer) Balance(work chan Process) {
|
func NewLoadBalancer(numWorker int) *LoadBalancer {
|
||||||
|
var pool Pool
|
||||||
|
|
||||||
|
doneChan := make(chan *Worker)
|
||||||
|
|
||||||
|
for i := range numWorker {
|
||||||
|
w := &Worker{
|
||||||
|
requests: make(chan *Process, 1),
|
||||||
|
index: i,
|
||||||
|
}
|
||||||
|
go w.Work(doneChan)
|
||||||
|
pool = append(pool, w)
|
||||||
|
|
||||||
|
slog.Info("spawned worker", slog.Int("index", i))
|
||||||
|
}
|
||||||
|
|
||||||
|
return &LoadBalancer{
|
||||||
|
pool: pool,
|
||||||
|
done: doneChan,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *LoadBalancer) Balance(work chan *Process) {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case req := <-work:
|
case req := <-work:
|
||||||
@@ -20,7 +43,7 @@ func (b *LoadBalancer) Balance(work chan Process) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *LoadBalancer) dispatch(req Process) {
|
func (b *LoadBalancer) dispatch(req *Process) {
|
||||||
w := heap.Pop(&b.pool).(*Worker)
|
w := heap.Pop(&b.pool).(*Worker)
|
||||||
w.requests <- req
|
w.requests <- req
|
||||||
w.pending++
|
w.pending++
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
package internal
|
package internal
|
||||||
|
|
||||||
import "time"
|
import (
|
||||||
|
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/common"
|
||||||
|
)
|
||||||
|
|
||||||
// Used to unmarshall yt-dlp progress
|
// Used to unmarshall yt-dlp progress
|
||||||
type ProgressTemplate struct {
|
type ProgressTemplate struct {
|
||||||
@@ -29,27 +31,12 @@ type DownloadProgress struct {
|
|||||||
ETA float64 `json:"eta"`
|
ETA float64 `json:"eta"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Used to deser the yt-dlp -J output
|
|
||||||
type DownloadInfo struct {
|
|
||||||
URL string `json:"url"`
|
|
||||||
Title string `json:"title"`
|
|
||||||
Thumbnail string `json:"thumbnail"`
|
|
||||||
Resolution string `json:"resolution"`
|
|
||||||
Size int32 `json:"filesize_approx"`
|
|
||||||
VCodec string `json:"vcodec"`
|
|
||||||
ACodec string `json:"acodec"`
|
|
||||||
Extension string `json:"ext"`
|
|
||||||
OriginalURL string `json:"original_url"`
|
|
||||||
FileName string `json:"filename"`
|
|
||||||
CreatedAt time.Time `json:"created_at"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// struct representing the response sent to the client
|
// struct representing the response sent to the client
|
||||||
// as JSON-RPC result field
|
// as JSON-RPC result field
|
||||||
type ProcessResponse struct {
|
type ProcessResponse struct {
|
||||||
Id string `json:"id"`
|
Id string `json:"id"`
|
||||||
Progress DownloadProgress `json:"progress"`
|
Progress DownloadProgress `json:"progress"`
|
||||||
Info DownloadInfo `json:"info"`
|
Info common.DownloadInfo `json:"info"`
|
||||||
Output DownloadOutput `json:"output"`
|
Output DownloadOutput `json:"output"`
|
||||||
Params []string `json:"params"`
|
Params []string `json:"params"`
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -141,27 +141,14 @@ func (l *LiveStream) monitorStartTime(r io.Reader) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const TRIES = 5
|
|
||||||
/*
|
|
||||||
if it's waiting a livestream the 5th line will indicate the time to live
|
|
||||||
its a dumb and not robust method.
|
|
||||||
|
|
||||||
example:
|
|
||||||
[youtube] Extracting URL: https://www.youtube.com/watch?v=IQVbGfVVjgY
|
|
||||||
[youtube] IQVbGfVVjgY: Downloading webpage
|
|
||||||
[youtube] IQVbGfVVjgY: Downloading ios player API JSON
|
|
||||||
[youtube] IQVbGfVVjgY: Downloading web creator player API JSON
|
|
||||||
WARNING: [youtube] This live event will begin in 27 minutes. <- STDERR, ignore
|
|
||||||
[wait] Waiting for 00:27:15 - Press Ctrl+C to try now <- 5th line
|
|
||||||
*/
|
|
||||||
for range TRIES {
|
|
||||||
scanner.Scan()
|
scanner.Scan()
|
||||||
|
|
||||||
if strings.Contains(scanner.Text(), "Waiting for") {
|
for !strings.Contains(scanner.Text(), "Waiting for") {
|
||||||
|
scanner.Scan()
|
||||||
|
}
|
||||||
|
|
||||||
waitTimeScanner()
|
waitTimeScanner()
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *LiveStream) WaitTime() <-chan time.Duration {
|
func (l *LiveStream) WaitTime() <-chan time.Duration {
|
||||||
return l.waitTimeChan
|
return l.waitTimeChan
|
||||||
|
|||||||
@@ -9,15 +9,17 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func setupTest() {
|
func setupTest() {
|
||||||
config.Instance().DownloaderPath = "yt-dlp"
|
config.Instance().DownloaderPath = "build/yt-dlp"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const URL = "https://www.youtube.com/watch?v=pwoAyLGOysU"
|
||||||
|
|
||||||
func TestLivestream(t *testing.T) {
|
func TestLivestream(t *testing.T) {
|
||||||
setupTest()
|
setupTest()
|
||||||
|
|
||||||
done := make(chan *LiveStream)
|
done := make(chan *LiveStream)
|
||||||
|
|
||||||
ls := New("https://www.youtube.com/watch?v=LSm1daKezcE", done, &internal.MessageQueue{}, &internal.MemoryDB{})
|
ls := New(URL, done, &internal.MessageQueue{}, &internal.MemoryDB{})
|
||||||
go ls.Start()
|
go ls.Start()
|
||||||
|
|
||||||
time.AfterFunc(time.Second*20, func() {
|
time.AfterFunc(time.Second*20, func() {
|
||||||
|
|||||||
@@ -76,7 +76,7 @@ func (m *Monitor) Status() LiveStreamStatus {
|
|||||||
// Persist the monitor current state to a file.
|
// Persist the monitor current state to a file.
|
||||||
// The file is located in the configured config directory
|
// The file is located in the configured config directory
|
||||||
func (m *Monitor) Persist() error {
|
func (m *Monitor) Persist() error {
|
||||||
fd, err := os.Create(filepath.Join(config.Instance().Dir(), "livestreams.dat"))
|
fd, err := os.Create(filepath.Join(config.Instance().SessionFilePath, "livestreams.dat"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -95,7 +95,7 @@ func (m *Monitor) Persist() error {
|
|||||||
|
|
||||||
// Restore a saved state and resume the monitored livestreams
|
// Restore a saved state and resume the monitored livestreams
|
||||||
func (m *Monitor) Restore() error {
|
func (m *Monitor) Restore() error {
|
||||||
fd, err := os.Open(filepath.Join(config.Instance().Dir(), "livestreams.dat"))
|
fd, err := os.Open(filepath.Join(config.Instance().SessionFilePath, "livestreams.dat"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -9,20 +9,18 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/common"
|
||||||
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
|
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
|
||||||
|
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/playlist"
|
||||||
)
|
)
|
||||||
|
|
||||||
type metadata struct {
|
|
||||||
Entries []DownloadInfo `json:"entries"`
|
|
||||||
Count int `json:"playlist_count"`
|
|
||||||
PlaylistTitle string `json:"title"`
|
|
||||||
Type string `json:"_type"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB) error {
|
func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB) error {
|
||||||
|
params := append(req.Params, "--flat-playlist", "-J")
|
||||||
|
urlWithParams := append([]string{req.URL}, params...)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
downloader = config.Instance().DownloaderPath
|
downloader = config.Instance().DownloaderPath
|
||||||
cmd = exec.Command(downloader, req.URL, "--flat-playlist", "-J")
|
cmd = exec.Command(downloader, urlWithParams...)
|
||||||
)
|
)
|
||||||
|
|
||||||
stdout, err := cmd.StdoutPipe()
|
stdout, err := cmd.StdoutPipe()
|
||||||
@@ -30,7 +28,7 @@ func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
var m metadata
|
var m playlist.Metadata
|
||||||
|
|
||||||
if err := cmd.Start(); err != nil {
|
if err := cmd.Start(); err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -52,13 +50,21 @@ func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB) error {
|
|||||||
return errors.New("probably not a valid URL")
|
return errors.New("probably not a valid URL")
|
||||||
}
|
}
|
||||||
|
|
||||||
if m.Type == "playlist" {
|
if m.IsPlaylist() {
|
||||||
entries := slices.CompactFunc(slices.Compact(m.Entries), func(a DownloadInfo, b DownloadInfo) bool {
|
entries := slices.CompactFunc(slices.Compact(m.Entries), func(a common.DownloadInfo, b common.DownloadInfo) bool {
|
||||||
return a.URL == b.URL
|
return a.URL == b.URL
|
||||||
})
|
})
|
||||||
|
|
||||||
|
entries = slices.DeleteFunc(entries, func(e common.DownloadInfo) bool {
|
||||||
|
return strings.Contains(e.URL, "list=")
|
||||||
|
})
|
||||||
|
|
||||||
slog.Info("playlist detected", slog.String("url", req.URL), slog.Int("count", len(entries)))
|
slog.Info("playlist detected", slog.String("url", req.URL), slog.Int("count", len(entries)))
|
||||||
|
|
||||||
|
if err := playlist.ApplyModifiers(&entries, req.Params); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
for i, meta := range entries {
|
for i, meta := range entries {
|
||||||
// detect playlist title from metadata since each playlist entry will be
|
// detect playlist title from metadata since each playlist entry will be
|
||||||
// treated as an individual download
|
// treated as an individual download
|
||||||
@@ -82,11 +88,13 @@ func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB) error {
|
|||||||
|
|
||||||
proc.Info.URL = meta.URL
|
proc.Info.URL = meta.URL
|
||||||
|
|
||||||
time.Sleep(time.Millisecond)
|
|
||||||
|
|
||||||
db.Set(proc)
|
db.Set(proc)
|
||||||
mq.Publish(proc)
|
mq.Publish(proc)
|
||||||
|
|
||||||
|
proc.Info.CreatedAt = meta.CreatedAt
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
proc := &Process{
|
proc := &Process{
|
||||||
|
|||||||
@@ -1,16 +1,24 @@
|
|||||||
package internal
|
package internal
|
||||||
|
|
||||||
|
// Pool implements heap.Interface interface as a standard priority queue
|
||||||
type Pool []*Worker
|
type Pool []*Worker
|
||||||
|
|
||||||
func (h Pool) Len() int { return len(h) }
|
func (h Pool) Len() int { return len(h) }
|
||||||
func (h Pool) Less(i, j int) bool { return h[i].index < h[j].index }
|
func (h Pool) Less(i, j int) bool { return h[i].pending < h[j].pending }
|
||||||
func (h Pool) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
|
|
||||||
|
func (h Pool) Swap(i, j int) {
|
||||||
|
h[i], h[j] = h[j], h[i]
|
||||||
|
h[i].index = i
|
||||||
|
h[j].index = j
|
||||||
|
}
|
||||||
|
|
||||||
func (h *Pool) Push(x any) { *h = append(*h, x.(*Worker)) }
|
func (h *Pool) Push(x any) { *h = append(*h, x.(*Worker)) }
|
||||||
|
|
||||||
func (h *Pool) Pop() any {
|
func (h *Pool) Pop() any {
|
||||||
old := *h
|
old := *h
|
||||||
n := len(old)
|
n := len(old)
|
||||||
x := old[n-1]
|
x := old[n-1]
|
||||||
|
old[n-1] = nil
|
||||||
*h = old[0 : n-1]
|
*h = old[0 : n-1]
|
||||||
return x
|
return x
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/archiver"
|
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/archiver"
|
||||||
|
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/common"
|
||||||
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
|
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -50,7 +51,7 @@ type Process struct {
|
|||||||
Livestream bool
|
Livestream bool
|
||||||
AutoRemove bool
|
AutoRemove bool
|
||||||
Params []string
|
Params []string
|
||||||
Info DownloadInfo
|
Info common.DownloadInfo
|
||||||
Progress DownloadProgress
|
Progress DownloadProgress
|
||||||
Output DownloadOutput
|
Output DownloadOutput
|
||||||
proc *os.Process
|
proc *os.Process
|
||||||
@@ -99,6 +100,7 @@ func (p *Process) Start() {
|
|||||||
templateReplacer.Replace(downloadTemplate),
|
templateReplacer.Replace(downloadTemplate),
|
||||||
"--progress-template",
|
"--progress-template",
|
||||||
templateReplacer.Replace(postprocessTemplate),
|
templateReplacer.Replace(postprocessTemplate),
|
||||||
|
"--no-exec",
|
||||||
}
|
}
|
||||||
|
|
||||||
// if user asked to manually override the output path...
|
// if user asked to manually override the output path...
|
||||||
@@ -302,7 +304,7 @@ func (p *Process) GetFileName(o *DownloadOutput) error {
|
|||||||
|
|
||||||
func (p *Process) SetPending() {
|
func (p *Process) SetPending() {
|
||||||
// Since video's title isn't available yet, fill in with the URL.
|
// Since video's title isn't available yet, fill in with the URL.
|
||||||
p.Info = DownloadInfo{
|
p.Info = common.DownloadInfo{
|
||||||
URL: p.Url,
|
URL: p.Url,
|
||||||
Title: p.Url,
|
Title: p.Url,
|
||||||
CreatedAt: time.Now(),
|
CreatedAt: time.Now(),
|
||||||
@@ -334,7 +336,7 @@ func (p *Process) SetMetadata() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
info := DownloadInfo{
|
info := common.DownloadInfo{
|
||||||
URL: p.Url,
|
URL: p.Url,
|
||||||
CreatedAt: time.Now(),
|
CreatedAt: time.Now(),
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
package internal
|
package internal
|
||||||
|
|
||||||
type Worker struct {
|
type Worker struct {
|
||||||
requests chan Process // downloads to do
|
requests chan *Process // downloads to do
|
||||||
pending int // downloads pending
|
pending int // downloads pending
|
||||||
index int // index in the heap
|
index int // index in the heap
|
||||||
}
|
}
|
||||||
|
|||||||
20
server/middleware/utils.go
Normal file
20
server/middleware/utils.go
Normal file
@@ -0,0 +1,20 @@
|
|||||||
|
package middlewares
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
|
||||||
|
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/openid"
|
||||||
|
)
|
||||||
|
|
||||||
|
func ApplyAuthenticationByConfig(next http.Handler) http.Handler {
|
||||||
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if config.Instance().RequireAuth {
|
||||||
|
Authenticated(next)
|
||||||
|
}
|
||||||
|
if config.Instance().UseOpenId {
|
||||||
|
openid.Middleware(next)
|
||||||
|
}
|
||||||
|
next.ServeHTTP(w, r)
|
||||||
|
})
|
||||||
|
}
|
||||||
@@ -6,10 +6,12 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"slices"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/coreos/go-oidc/v3/oidc"
|
"github.com/coreos/go-oidc/v3/oidc"
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
|
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
|
||||||
"golang.org/x/oauth2"
|
"golang.org/x/oauth2"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -76,6 +78,21 @@ func doAuthentification(r *http.Request, setCookieCallback func(t *oauth2.Token)
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var claims struct {
|
||||||
|
Email string `json:"email"`
|
||||||
|
Verified bool `json:"email_verified"`
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := idToken.Claims(&claims); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
whitelist := config.Instance().OpenIdEmailWhitelist
|
||||||
|
|
||||||
|
if len(whitelist) > 0 && !slices.Contains(whitelist, claims.Email) {
|
||||||
|
return nil, errors.New("email address not found in ACL")
|
||||||
|
}
|
||||||
|
|
||||||
nonce, err := r.Cookie("nonce")
|
nonce, err := r.Cookie("nonce")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|||||||
86
server/playlist/modifiers.go
Normal file
86
server/playlist/modifiers.go
Normal file
@@ -0,0 +1,86 @@
|
|||||||
|
package playlist
|
||||||
|
|
||||||
|
import (
|
||||||
|
"slices"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
|
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/common"
|
||||||
|
)
|
||||||
|
|
||||||
|
/*
|
||||||
|
Applicable modifiers
|
||||||
|
|
||||||
|
full | short | description
|
||||||
|
---------------------------------------------------------------------------------
|
||||||
|
--playlist-start NUMBER | -I NUMBER: | discard first N entries
|
||||||
|
--playlist-end NUMBER | -I :NUMBER | discard last N entries
|
||||||
|
--playlist-reverse | -I ::-1 | self explanatory
|
||||||
|
--max-downloads NUMBER | | stops after N completed downloads
|
||||||
|
*/
|
||||||
|
|
||||||
|
func ApplyModifiers(entries *[]common.DownloadInfo, args []string) error {
|
||||||
|
for i, modifier := range args {
|
||||||
|
switch modifier {
|
||||||
|
case "--playlist-start":
|
||||||
|
return playlistStart(i, modifier, args, entries)
|
||||||
|
|
||||||
|
case "--playlist-end":
|
||||||
|
return playlistEnd(i, modifier, args, entries)
|
||||||
|
|
||||||
|
case "--max-downloads":
|
||||||
|
return maxDownloads(i, modifier, args, entries)
|
||||||
|
|
||||||
|
case "--playlist-reverse":
|
||||||
|
slices.Reverse(*entries)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func playlistStart(i int, modifier string, args []string, entries *[]common.DownloadInfo) error {
|
||||||
|
if !guard(i, len(modifier)) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
n, err := strconv.Atoi(args[i+1])
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
*entries = (*entries)[n:]
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func playlistEnd(i int, modifier string, args []string, entries *[]common.DownloadInfo) error {
|
||||||
|
if !guard(i, len(modifier)) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
n, err := strconv.Atoi(args[i+1])
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
*entries = (*entries)[:n]
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func maxDownloads(i int, modifier string, args []string, entries *[]common.DownloadInfo) error {
|
||||||
|
if !guard(i, len(modifier)) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
n, err := strconv.Atoi(args[i+1])
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
*entries = (*entries)[0:n]
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func guard(i, len int) bool { return i+1 < len-1 }
|
||||||
12
server/playlist/types.go
Normal file
12
server/playlist/types.go
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
package playlist
|
||||||
|
|
||||||
|
import "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/common"
|
||||||
|
|
||||||
|
type Metadata struct {
|
||||||
|
Entries []common.DownloadInfo `json:"entries"`
|
||||||
|
Count int `json:"playlist_count"`
|
||||||
|
PlaylistTitle string `json:"title"`
|
||||||
|
Type string `json:"_type"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Metadata) IsPlaylist() bool { return m.Type == "playlist" }
|
||||||
@@ -183,6 +183,7 @@ func (s *Service) KillAll(args NoArgs, killed *string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
slog.Info("succesfully killed process", slog.String("id", proc.Id))
|
slog.Info("succesfully killed process", slog.String("id", proc.Id))
|
||||||
|
proc = nil // gc helper
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@@ -195,6 +196,35 @@ func (s *Service) Clear(args string, killed *string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Removes completed processes
|
||||||
|
func (s *Service) ClearCompleted(cleared *string) error {
|
||||||
|
var (
|
||||||
|
keys = s.db.Keys()
|
||||||
|
removeFunc = func(p *internal.Process) error {
|
||||||
|
defer s.db.Delete(p.Id)
|
||||||
|
|
||||||
|
if p.Progress.Status != internal.StatusCompleted {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return p.Kill()
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
for _, key := range *keys {
|
||||||
|
proc, err := s.db.Get(key)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := removeFunc(proc); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// FreeSpace gets the available from package sys util
|
// FreeSpace gets the available from package sys util
|
||||||
func (s *Service) FreeSpace(args NoArgs, free *uint64) error {
|
func (s *Service) FreeSpace(args NoArgs, free *uint64) error {
|
||||||
freeSpace, err := sys.FreeSpace()
|
freeSpace, err := sys.FreeSpace()
|
||||||
|
|||||||
@@ -34,6 +34,7 @@ import (
|
|||||||
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/status"
|
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/status"
|
||||||
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription"
|
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription"
|
||||||
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/task"
|
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/task"
|
||||||
|
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/twitch"
|
||||||
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/user"
|
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/user"
|
||||||
|
|
||||||
_ "modernc.org/sqlite"
|
_ "modernc.org/sqlite"
|
||||||
@@ -51,6 +52,7 @@ type serverConfig struct {
|
|||||||
db *sql.DB
|
db *sql.DB
|
||||||
mq *internal.MessageQueue
|
mq *internal.MessageQueue
|
||||||
lm *livestream.Monitor
|
lm *livestream.Monitor
|
||||||
|
tm *twitch.Monitor
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: change scope
|
// TODO: change scope
|
||||||
@@ -115,17 +117,33 @@ func RunBlocking(rc *RunConfig) {
|
|||||||
go lm.Schedule()
|
go lm.Schedule()
|
||||||
go lm.Restore()
|
go lm.Restore()
|
||||||
|
|
||||||
srv := newServer(serverConfig{
|
tm := twitch.NewMonitor(
|
||||||
|
twitch.NewAuthenticationManager(
|
||||||
|
config.Instance().Twitch.ClientId,
|
||||||
|
config.Instance().Twitch.ClientSecret,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
go tm.Monitor(
|
||||||
|
context.TODO(),
|
||||||
|
config.Instance().Twitch.CheckInterval,
|
||||||
|
twitch.DEFAULT_DOWNLOAD_HANDLER(mdb, mq),
|
||||||
|
)
|
||||||
|
go tm.Restore()
|
||||||
|
|
||||||
|
scfg := serverConfig{
|
||||||
frontend: rc.App,
|
frontend: rc.App,
|
||||||
swagger: rc.Swagger,
|
swagger: rc.Swagger,
|
||||||
mdb: mdb,
|
mdb: mdb,
|
||||||
mq: mq,
|
mq: mq,
|
||||||
db: db,
|
db: db,
|
||||||
lm: lm,
|
lm: lm,
|
||||||
})
|
tm: tm,
|
||||||
|
}
|
||||||
|
|
||||||
go gracefulShutdown(srv, mdb)
|
srv := newServer(scfg)
|
||||||
go autoPersist(time.Minute*5, mdb, lm)
|
|
||||||
|
go gracefulShutdown(srv, &scfg)
|
||||||
|
go autoPersist(time.Minute*5, mdb, lm, tm)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
network = "tcp"
|
network = "tcp"
|
||||||
@@ -188,12 +206,7 @@ func newServer(c serverConfig) *http.Server {
|
|||||||
|
|
||||||
// Filebrowser routes
|
// Filebrowser routes
|
||||||
r.Route("/filebrowser", func(r chi.Router) {
|
r.Route("/filebrowser", func(r chi.Router) {
|
||||||
if config.Instance().RequireAuth {
|
r.Use(middlewares.ApplyAuthenticationByConfig)
|
||||||
r.Use(middlewares.Authenticated)
|
|
||||||
}
|
|
||||||
if config.Instance().UseOpenId {
|
|
||||||
r.Use(openid.Middleware)
|
|
||||||
}
|
|
||||||
r.Post("/downloaded", filebrowser.ListDownloaded)
|
r.Post("/downloaded", filebrowser.ListDownloaded)
|
||||||
r.Post("/delete", filebrowser.DeleteFile)
|
r.Post("/delete", filebrowser.DeleteFile)
|
||||||
r.Get("/d/{id}", filebrowser.DownloadFile)
|
r.Get("/d/{id}", filebrowser.DownloadFile)
|
||||||
@@ -235,10 +248,17 @@ func newServer(c serverConfig) *http.Server {
|
|||||||
// Subscriptions
|
// Subscriptions
|
||||||
r.Route("/subscriptions", subscription.Container(c.db, cronTaskRunner).ApplyRouter())
|
r.Route("/subscriptions", subscription.Container(c.db, cronTaskRunner).ApplyRouter())
|
||||||
|
|
||||||
|
// Twitch
|
||||||
|
r.Route("/twitch", func(r chi.Router) {
|
||||||
|
r.Use(middlewares.ApplyAuthenticationByConfig)
|
||||||
|
r.Get("/all", twitch.GetMonitoredUsers(c.tm))
|
||||||
|
r.Post("/add", twitch.MonitorUserHandler(c.tm))
|
||||||
|
})
|
||||||
|
|
||||||
return &http.Server{Handler: r}
|
return &http.Server{Handler: r}
|
||||||
}
|
}
|
||||||
|
|
||||||
func gracefulShutdown(srv *http.Server, db *internal.MemoryDB) {
|
func gracefulShutdown(srv *http.Server, cfg *serverConfig) {
|
||||||
ctx, stop := signal.NotifyContext(context.Background(),
|
ctx, stop := signal.NotifyContext(context.Background(),
|
||||||
os.Interrupt,
|
os.Interrupt,
|
||||||
syscall.SIGTERM,
|
syscall.SIGTERM,
|
||||||
@@ -250,7 +270,9 @@ func gracefulShutdown(srv *http.Server, db *internal.MemoryDB) {
|
|||||||
slog.Info("shutdown signal received")
|
slog.Info("shutdown signal received")
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
db.Persist()
|
cfg.mdb.Persist()
|
||||||
|
cfg.lm.Persist()
|
||||||
|
cfg.tm.Persist()
|
||||||
|
|
||||||
stop()
|
stop()
|
||||||
srv.Shutdown(context.Background())
|
srv.Shutdown(context.Background())
|
||||||
@@ -258,8 +280,14 @@ func gracefulShutdown(srv *http.Server, db *internal.MemoryDB) {
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
func autoPersist(d time.Duration, db *internal.MemoryDB, lm *livestream.Monitor) {
|
func autoPersist(
|
||||||
|
d time.Duration,
|
||||||
|
db *internal.MemoryDB,
|
||||||
|
lm *livestream.Monitor,
|
||||||
|
tm *twitch.Monitor,
|
||||||
|
) {
|
||||||
for {
|
for {
|
||||||
|
time.Sleep(d)
|
||||||
if err := db.Persist(); err != nil {
|
if err := db.Persist(); err != nil {
|
||||||
slog.Warn("failed to persisted session", slog.Any("err", err))
|
slog.Warn("failed to persisted session", slog.Any("err", err))
|
||||||
}
|
}
|
||||||
@@ -267,7 +295,10 @@ func autoPersist(d time.Duration, db *internal.MemoryDB, lm *livestream.Monitor)
|
|||||||
slog.Warn(
|
slog.Warn(
|
||||||
"failed to persisted livestreams monitor session", slog.Any("err", err.Error()))
|
"failed to persisted livestreams monitor session", slog.Any("err", err.Error()))
|
||||||
}
|
}
|
||||||
|
if err := tm.Persist(); err != nil {
|
||||||
|
slog.Warn(
|
||||||
|
"failed to persisted twitch monitor session", slog.Any("err", err.Error()))
|
||||||
|
}
|
||||||
slog.Debug("sucessfully persisted session")
|
slog.Debug("sucessfully persisted session")
|
||||||
time.Sleep(d)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -53,6 +53,7 @@ func toDB(dto *domain.Subscription) data.Subscription {
|
|||||||
|
|
||||||
// Delete implements domain.Service.
|
// Delete implements domain.Service.
|
||||||
func (s *Service) Delete(ctx context.Context, id string) error {
|
func (s *Service) Delete(ctx context.Context, id string) error {
|
||||||
|
s.runner.StopTask(id)
|
||||||
return s.r.Delete(ctx, id)
|
return s.r.Delete(ctx, id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -7,9 +7,9 @@ import (
|
|||||||
"os/exec"
|
"os/exec"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"regexp"
|
"regexp"
|
||||||
"strings"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/archive"
|
||||||
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
|
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
|
||||||
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal"
|
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal"
|
||||||
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/domain"
|
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/domain"
|
||||||
@@ -19,10 +19,12 @@ import (
|
|||||||
type TaskRunner interface {
|
type TaskRunner interface {
|
||||||
Submit(subcription *domain.Subscription) error
|
Submit(subcription *domain.Subscription) error
|
||||||
Spawner(ctx context.Context)
|
Spawner(ctx context.Context)
|
||||||
|
StopTask(id string) error
|
||||||
Recoverer()
|
Recoverer()
|
||||||
}
|
}
|
||||||
|
|
||||||
type taskPair struct {
|
type monitorTask struct {
|
||||||
|
Done chan struct{}
|
||||||
Schedule cron.Schedule
|
Schedule cron.Schedule
|
||||||
Subscription *domain.Subscription
|
Subscription *domain.Subscription
|
||||||
}
|
}
|
||||||
@@ -31,21 +33,22 @@ type CronTaskRunner struct {
|
|||||||
mq *internal.MessageQueue
|
mq *internal.MessageQueue
|
||||||
db *internal.MemoryDB
|
db *internal.MemoryDB
|
||||||
|
|
||||||
tasks chan taskPair
|
tasks chan monitorTask
|
||||||
errors chan error
|
errors chan error
|
||||||
|
|
||||||
|
running map[string]*monitorTask
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewCronTaskRunner(mq *internal.MessageQueue, db *internal.MemoryDB) TaskRunner {
|
func NewCronTaskRunner(mq *internal.MessageQueue, db *internal.MemoryDB) TaskRunner {
|
||||||
return &CronTaskRunner{
|
return &CronTaskRunner{
|
||||||
mq: mq,
|
mq: mq,
|
||||||
db: db,
|
db: db,
|
||||||
tasks: make(chan taskPair),
|
tasks: make(chan monitorTask),
|
||||||
errors: make(chan error),
|
errors: make(chan error),
|
||||||
|
running: make(map[string]*monitorTask),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const commandTemplate = "-I1 --flat-playlist --print webpage_url $1"
|
|
||||||
|
|
||||||
var argsSplitterRe = regexp.MustCompile(`(?mi)[^\s"']+|"([^"]*)"|'([^']*)'`)
|
var argsSplitterRe = regexp.MustCompile(`(?mi)[^\s"']+|"([^"]*)"|'([^']*)'`)
|
||||||
|
|
||||||
func (t *CronTaskRunner) Submit(subcription *domain.Subscription) error {
|
func (t *CronTaskRunner) Submit(subcription *domain.Subscription) error {
|
||||||
@@ -54,7 +57,8 @@ func (t *CronTaskRunner) Submit(subcription *domain.Subscription) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
job := taskPair{
|
job := monitorTask{
|
||||||
|
Done: make(chan struct{}),
|
||||||
Schedule: schedule,
|
Schedule: schedule,
|
||||||
Subscription: subcription,
|
Subscription: subcription,
|
||||||
}
|
}
|
||||||
@@ -64,54 +68,110 @@ func (t *CronTaskRunner) Submit(subcription *domain.Subscription) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Handles the entire lifecylce of a monitor job.
|
||||||
func (t *CronTaskRunner) Spawner(ctx context.Context) {
|
func (t *CronTaskRunner) Spawner(ctx context.Context) {
|
||||||
for task := range t.tasks {
|
for req := range t.tasks {
|
||||||
|
t.running[req.Subscription.Id] = &req // keep track of the current job
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
ctx, cancel := context.WithCancel(ctx) // inject into the job's context a cancellation singal
|
||||||
|
fetcherEvents := t.doFetch(ctx, &req) // retrieve the channel of events of the job
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-req.Done:
|
||||||
|
slog.Info("stopping cron job and removing schedule", slog.String("url", req.Subscription.URL))
|
||||||
|
cancel()
|
||||||
|
return
|
||||||
|
case <-fetcherEvents:
|
||||||
|
slog.Info("finished monitoring channel", slog.String("url", req.Subscription.URL))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop a currently scheduled job
|
||||||
|
func (t *CronTaskRunner) StopTask(id string) error {
|
||||||
|
task := t.running[id]
|
||||||
|
if task != nil {
|
||||||
|
t.running[id].Done <- struct{}{}
|
||||||
|
delete(t.running, id)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start a fetcher and notify on a channel when a fetcher has completed
|
||||||
|
func (t *CronTaskRunner) doFetch(ctx context.Context, req *monitorTask) <-chan struct{} {
|
||||||
|
completed := make(chan struct{})
|
||||||
|
|
||||||
|
// generator func
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
slog.Info("fetching latest video for channel", slog.String("channel", task.Subscription.URL))
|
sleepFor := t.fetcher(ctx, req)
|
||||||
|
completed <- struct{}{}
|
||||||
|
|
||||||
fetcherParams := strings.Split(strings.Replace(commandTemplate, "$1", task.Subscription.URL, 1), " ")
|
time.Sleep(sleepFor)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return completed
|
||||||
|
}
|
||||||
|
|
||||||
|
// Perform the retrieval of the latest video of the channel.
|
||||||
|
// Returns a time.Duration containing the amount of time to the next schedule.
|
||||||
|
func (t *CronTaskRunner) fetcher(ctx context.Context, req *monitorTask) time.Duration {
|
||||||
|
slog.Info("fetching latest video for channel", slog.String("channel", req.Subscription.URL))
|
||||||
|
|
||||||
|
nextSchedule := time.Until(req.Schedule.Next(time.Now()))
|
||||||
|
|
||||||
cmd := exec.CommandContext(
|
cmd := exec.CommandContext(
|
||||||
ctx,
|
ctx,
|
||||||
config.Instance().DownloaderPath,
|
config.Instance().DownloaderPath,
|
||||||
fetcherParams...,
|
"-I1",
|
||||||
|
"--flat-playlist",
|
||||||
|
"--print", "webpage_url",
|
||||||
|
req.Subscription.URL,
|
||||||
)
|
)
|
||||||
|
|
||||||
stdout, err := cmd.Output()
|
stdout, err := cmd.Output()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.errors <- err
|
t.errors <- err
|
||||||
return
|
return time.Duration(0)
|
||||||
}
|
}
|
||||||
|
|
||||||
latestChannelURL := string(bytes.Trim(stdout, "\n"))
|
latestVideoURL := string(bytes.Trim(stdout, "\n"))
|
||||||
|
|
||||||
|
// if the download exists there's not point in sending it into the message queue.
|
||||||
|
exists, err := archive.DownloadExists(ctx, latestVideoURL)
|
||||||
|
if exists && err == nil {
|
||||||
|
return nextSchedule
|
||||||
|
}
|
||||||
|
|
||||||
p := &internal.Process{
|
p := &internal.Process{
|
||||||
Url: latestChannelURL,
|
Url: latestVideoURL,
|
||||||
Params: append(argsSplitterRe.FindAllString(task.Subscription.Params, 1), []string{
|
Params: append(
|
||||||
|
argsSplitterRe.FindAllString(req.Subscription.Params, 1),
|
||||||
|
[]string{
|
||||||
|
"--break-on-existing",
|
||||||
"--download-archive",
|
"--download-archive",
|
||||||
filepath.Join(config.Instance().Dir(), "archive.txt"),
|
filepath.Join(config.Instance().Dir(), "archive.txt"),
|
||||||
}...),
|
}...),
|
||||||
AutoRemove: true,
|
AutoRemove: true,
|
||||||
}
|
}
|
||||||
|
|
||||||
t.db.Set(p)
|
t.db.Set(p) // give it an id
|
||||||
t.mq.Publish(p)
|
t.mq.Publish(p) // send it to the message queue waiting to be processed
|
||||||
|
|
||||||
nextSchedule := time.Until(task.Schedule.Next(time.Now()))
|
|
||||||
|
|
||||||
slog.Info(
|
slog.Info(
|
||||||
"cron task runner next schedule",
|
"cron task runner next schedule",
|
||||||
slog.String("url", task.Subscription.URL),
|
slog.String("url", req.Subscription.URL),
|
||||||
slog.Any("duration", nextSchedule),
|
slog.Any("duration", nextSchedule),
|
||||||
)
|
)
|
||||||
|
|
||||||
time.Sleep(nextSchedule)
|
return nextSchedule
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *CronTaskRunner) Recoverer() {
|
func (t *CronTaskRunner) Recoverer() {
|
||||||
panic("Unimplemented")
|
panic("unimplemented")
|
||||||
}
|
}
|
||||||
|
|||||||
75
server/twitch/auth.go
Normal file
75
server/twitch/auth.go
Normal file
@@ -0,0 +1,75 @@
|
|||||||
|
package twitch
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
const authURL = "https://id.twitch.tv/oauth2/token"
|
||||||
|
|
||||||
|
type AuthResponse struct {
|
||||||
|
AccessToken string `json:"access_token"`
|
||||||
|
ExpiresIn int `json:"expires_in"`
|
||||||
|
TokenType string `json:"token_type"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type AccessToken struct {
|
||||||
|
Token string
|
||||||
|
Expiry time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
type AuthenticationManager struct {
|
||||||
|
clientId string
|
||||||
|
clientSecret string
|
||||||
|
accesToken *AccessToken
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewAuthenticationManager(clientId, clientSecret string) *AuthenticationManager {
|
||||||
|
return &AuthenticationManager{
|
||||||
|
clientId: clientId,
|
||||||
|
clientSecret: clientSecret,
|
||||||
|
accesToken: &AccessToken{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *AuthenticationManager) GetAccessToken() (*AccessToken, error) {
|
||||||
|
if a.accesToken != nil && a.accesToken.Token != "" && a.accesToken.Expiry.After(time.Now()) {
|
||||||
|
return a.accesToken, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
data := url.Values{}
|
||||||
|
data.Set("client_id", a.clientId)
|
||||||
|
data.Set("client_secret", a.clientSecret)
|
||||||
|
data.Set("grant_type", "client_credentials")
|
||||||
|
|
||||||
|
resp, err := http.PostForm(authURL, data)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("errore richiesta token: %w", err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
return nil, fmt.Errorf("status non OK: %s", resp.Status)
|
||||||
|
}
|
||||||
|
|
||||||
|
var auth AuthResponse
|
||||||
|
if err := json.NewDecoder(resp.Body).Decode(&auth); err != nil {
|
||||||
|
return nil, fmt.Errorf("errore decoding JSON: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
token := &AccessToken{
|
||||||
|
Token: auth.AccessToken,
|
||||||
|
Expiry: time.Now().Add(time.Duration(auth.ExpiresIn) * time.Second),
|
||||||
|
}
|
||||||
|
|
||||||
|
a.accesToken = token
|
||||||
|
|
||||||
|
return token, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *AuthenticationManager) GetClientId() string {
|
||||||
|
return a.clientId
|
||||||
|
}
|
||||||
91
server/twitch/client.go
Normal file
91
server/twitch/client.go
Normal file
@@ -0,0 +1,91 @@
|
|||||||
|
package twitch
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
const twitchAPIURL = "https://api.twitch.tv/helix"
|
||||||
|
|
||||||
|
type Client struct {
|
||||||
|
authenticationManager AuthenticationManager
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewTwitchClient(am *AuthenticationManager) *Client {
|
||||||
|
return &Client{
|
||||||
|
authenticationManager: *am,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type streamResp struct {
|
||||||
|
Data []struct {
|
||||||
|
ID string `json:"id"`
|
||||||
|
UserName string `json:"user_name"`
|
||||||
|
Title string `json:"title"`
|
||||||
|
GameName string `json:"game_name"`
|
||||||
|
StartedAt string `json:"started_at"`
|
||||||
|
} `json:"data"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) doRequest(endpoint string, params map[string]string) ([]byte, error) {
|
||||||
|
token, err := c.authenticationManager.GetAccessToken()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
reqURL := twitchAPIURL + endpoint
|
||||||
|
req, err := http.NewRequest("GET", reqURL, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
q := req.URL.Query()
|
||||||
|
for k, v := range params {
|
||||||
|
q.Set(k, v)
|
||||||
|
}
|
||||||
|
req.URL.RawQuery = q.Encode()
|
||||||
|
|
||||||
|
req.Header.Set("Client-Id", c.authenticationManager.GetClientId())
|
||||||
|
req.Header.Set("Authorization", "Bearer "+token.Token)
|
||||||
|
|
||||||
|
resp, err := http.DefaultClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
return io.ReadAll(resp.Body)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) PollStream(channel string, liveChannel chan<- *StreamInfo) error {
|
||||||
|
body, err := c.doRequest("/streams", map[string]string{"user_login": channel})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var sr streamResp
|
||||||
|
if err := json.Unmarshal(body, &sr); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(sr.Data) == 0 {
|
||||||
|
liveChannel <- &StreamInfo{UserName: channel, IsLive: false}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
s := sr.Data[0]
|
||||||
|
started, _ := time.Parse(time.RFC3339, s.StartedAt)
|
||||||
|
|
||||||
|
liveChannel <- &StreamInfo{
|
||||||
|
ID: s.ID,
|
||||||
|
UserName: s.UserName,
|
||||||
|
Title: s.Title,
|
||||||
|
GameName: s.GameName,
|
||||||
|
StartedAt: started,
|
||||||
|
IsLive: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
142
server/twitch/monitor.go
Normal file
142
server/twitch/monitor.go
Normal file
@@ -0,0 +1,142 @@
|
|||||||
|
package twitch
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/gob"
|
||||||
|
"fmt"
|
||||||
|
"iter"
|
||||||
|
"log/slog"
|
||||||
|
"maps"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
|
||||||
|
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Monitor struct {
|
||||||
|
liveChannel chan *StreamInfo
|
||||||
|
monitored map[string]*Client
|
||||||
|
lastState map[string]bool
|
||||||
|
mu sync.RWMutex
|
||||||
|
authenticationManager *AuthenticationManager
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewMonitor(authenticationManager *AuthenticationManager) *Monitor {
|
||||||
|
return &Monitor{
|
||||||
|
liveChannel: make(chan *StreamInfo, 16),
|
||||||
|
monitored: make(map[string]*Client),
|
||||||
|
lastState: make(map[string]bool),
|
||||||
|
authenticationManager: authenticationManager,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Monitor) Add(user string) {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
m.monitored[user] = NewTwitchClient(m.authenticationManager)
|
||||||
|
slog.Info("added user to twitch monitor", slog.String("user", user))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Monitor) Monitor(ctx context.Context, interval time.Duration, handler func(url string) error) {
|
||||||
|
ticker := time.NewTicker(interval)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ticker.C:
|
||||||
|
m.mu.RLock()
|
||||||
|
for user, client := range m.monitored {
|
||||||
|
u := user
|
||||||
|
c := client
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
if err := c.PollStream(u, m.liveChannel); err != nil {
|
||||||
|
slog.Error("polling failed", slog.String("user", u), slog.Any("err", err))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
m.mu.RUnlock()
|
||||||
|
|
||||||
|
case stream := <-m.liveChannel:
|
||||||
|
wasLive := m.lastState[stream.UserName]
|
||||||
|
if stream.IsLive && !wasLive {
|
||||||
|
slog.Info("stream went live", slog.String("user", stream.UserName))
|
||||||
|
if err := handler(fmt.Sprintf("https://www.twitch.tv/%s", stream.UserName)); err != nil {
|
||||||
|
slog.Error("handler failed", slog.String("user", stream.UserName), slog.Any("err", err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
m.lastState[stream.UserName] = stream.IsLive
|
||||||
|
|
||||||
|
case <-ctx.Done():
|
||||||
|
slog.Info("stopping twitch monitor")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Monitor) GetMonitoredUsers() iter.Seq[string] {
|
||||||
|
m.mu.RLock()
|
||||||
|
defer m.mu.RUnlock()
|
||||||
|
return maps.Keys(m.monitored)
|
||||||
|
}
|
||||||
|
|
||||||
|
func DEFAULT_DOWNLOAD_HANDLER(db *internal.MemoryDB, mq *internal.MessageQueue) func(url string) error {
|
||||||
|
return func(url string) error {
|
||||||
|
p := &internal.Process{
|
||||||
|
Url: url,
|
||||||
|
Livestream: true,
|
||||||
|
Params: []string{"--downloader", "ffmpeg", "--no-part"},
|
||||||
|
}
|
||||||
|
db.Set(p)
|
||||||
|
mq.Publish(p)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Monitor) Persist() error {
|
||||||
|
filename := filepath.Join(config.Instance().SessionFilePath, "twitch-monitor.dat")
|
||||||
|
|
||||||
|
f, err := os.Create(filename)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
|
enc := gob.NewEncoder(f)
|
||||||
|
users := make([]string, 0, len(m.monitored))
|
||||||
|
|
||||||
|
for user := range m.monitored {
|
||||||
|
users = append(users, user)
|
||||||
|
}
|
||||||
|
|
||||||
|
return enc.Encode(users)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Monitor) Restore() error {
|
||||||
|
filename := filepath.Join(config.Instance().SessionFilePath, "twitch-monitor.dat")
|
||||||
|
|
||||||
|
f, err := os.Open(filename)
|
||||||
|
if err != nil {
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
|
dec := gob.NewDecoder(f)
|
||||||
|
var users []string
|
||||||
|
if err := dec.Decode(&users); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
m.monitored = make(map[string]*Client)
|
||||||
|
for _, user := range users {
|
||||||
|
m.monitored[user] = NewTwitchClient(m.authenticationManager)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
40
server/twitch/rest.go
Normal file
40
server/twitch/rest.go
Normal file
@@ -0,0 +1,40 @@
|
|||||||
|
package twitch
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"net/http"
|
||||||
|
"slices"
|
||||||
|
)
|
||||||
|
|
||||||
|
type addUserReq struct {
|
||||||
|
User string `json:"user"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func MonitorUserHandler(m *Monitor) http.HandlerFunc {
|
||||||
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
var req addUserReq
|
||||||
|
|
||||||
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
m.Add(req.User)
|
||||||
|
|
||||||
|
if err := json.NewEncoder(w).Encode("ok"); err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetMonitoredUsers(m *Monitor) http.HandlerFunc {
|
||||||
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
it := m.GetMonitoredUsers()
|
||||||
|
|
||||||
|
if err := json.NewEncoder(w).Encode(slices.Collect(it)); err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
20
server/twitch/types.go
Normal file
20
server/twitch/types.go
Normal file
@@ -0,0 +1,20 @@
|
|||||||
|
package twitch
|
||||||
|
|
||||||
|
import "time"
|
||||||
|
|
||||||
|
type StreamInfo struct {
|
||||||
|
ID string
|
||||||
|
UserName string
|
||||||
|
Title string
|
||||||
|
GameName string
|
||||||
|
StartedAt time.Time
|
||||||
|
IsLive bool
|
||||||
|
}
|
||||||
|
|
||||||
|
type VodInfo struct {
|
||||||
|
ID string
|
||||||
|
Title string
|
||||||
|
URL string
|
||||||
|
Duration string
|
||||||
|
CreatedAt time.Time
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user