Compare commits
1 Commits
feat-twitc
...
feat-openi
| Author | SHA1 | Date | |
|---|---|---|---|
| 9ea000c912 |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -29,4 +29,3 @@ frontend/.yarn/install-state.gz
|
|||||||
livestreams.dat
|
livestreams.dat
|
||||||
.vite/deps
|
.vite/deps
|
||||||
archive.txt
|
archive.txt
|
||||||
twitch-monitor.dat
|
|
||||||
|
|||||||
@@ -24,12 +24,11 @@ COPY --from=ui /usr/src/yt-dlp-webui/frontend /usr/src/yt-dlp-webui/frontend
|
|||||||
RUN CGO_ENABLED=0 GOOS=linux go build -o yt-dlp-webui
|
RUN CGO_ENABLED=0 GOOS=linux go build -o yt-dlp-webui
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
|
|
||||||
# Runtime ---------------------------------------------------------------------
|
# dependencies ----------------------------------------------------------------
|
||||||
FROM python:3.13.2-alpine3.21
|
FROM alpine:edge
|
||||||
|
|
||||||
RUN apk update && \
|
RUN apk update && \
|
||||||
apk add ffmpeg ca-certificates curl wget gnutls --no-cache && \
|
apk add ffmpeg yt-dlp ca-certificates curl wget psmisc
|
||||||
pip install "yt-dlp[default,curl-cffi,mutagen,pycryptodomex,phantomjs,secretstorage]"
|
|
||||||
|
|
||||||
VOLUME /downloads /config
|
VOLUME /downloads /config
|
||||||
|
|
||||||
|
|||||||
@@ -28,7 +28,7 @@ docker pull ghcr.io/marcopiovanello/yt-dlp-web-ui:latest
|
|||||||
## Community stuff
|
## Community stuff
|
||||||
Feel free to join :)
|
Feel free to join :)
|
||||||
|
|
||||||
[Discord](https://discord.gg/GZAX5FfGzE)
|
[](https://discord.gg/WRnVWr4y)
|
||||||
|
|
||||||
## Some screeshots
|
## Some screeshots
|
||||||

|

|
||||||
|
|||||||
@@ -121,18 +121,14 @@ export const appTitleState = atomWithStorage(
|
|||||||
export const serverAddressAndPortState = atom((get) => {
|
export const serverAddressAndPortState = atom((get) => {
|
||||||
if (get(servedFromReverseProxySubDirState)) {
|
if (get(servedFromReverseProxySubDirState)) {
|
||||||
return `${get(serverAddressState)}/${get(servedFromReverseProxySubDirState)}/`
|
return `${get(serverAddressState)}/${get(servedFromReverseProxySubDirState)}/`
|
||||||
.replaceAll('"', '') // XXX: atomWithStorage uses JSON.stringify to serialize
|
.replaceAll('"', '') // TODO: atomWithStorage put extra double quotes on strings
|
||||||
.replaceAll('//', '/') // which puts extra double quotes.
|
|
||||||
}
|
}
|
||||||
if (get(servedFromReverseProxyState)) {
|
if (get(servedFromReverseProxyState)) {
|
||||||
return `${get(serverAddressState)}`
|
return `${get(serverAddressState)}`
|
||||||
.replaceAll('"', '')
|
.replaceAll('"', '')
|
||||||
}
|
}
|
||||||
|
return `${get(serverAddressState)}:${get(serverPortState)}`
|
||||||
const sap = `${get(serverAddressState)}:${get(serverPortState)}`
|
|
||||||
.replaceAll('"', '')
|
.replaceAll('"', '')
|
||||||
|
|
||||||
return sap.endsWith('/') ? sap.slice(0, -1) : sap
|
|
||||||
})
|
})
|
||||||
|
|
||||||
export const serverURL = atom((get) =>
|
export const serverURL = atom((get) =>
|
||||||
@@ -141,16 +137,12 @@ export const serverURL = atom((get) =>
|
|||||||
|
|
||||||
export const rpcWebSocketEndpoint = atom((get) => {
|
export const rpcWebSocketEndpoint = atom((get) => {
|
||||||
const proto = window.location.protocol === 'https:' ? 'wss:' : 'ws:'
|
const proto = window.location.protocol === 'https:' ? 'wss:' : 'ws:'
|
||||||
const sap = get(serverAddressAndPortState)
|
return `${proto}//${get(serverAddressAndPortState)}/rpc/ws`
|
||||||
|
|
||||||
return `${proto}//${sap.endsWith('/') ? sap.slice(0, -1) : sap}/rpc/ws`
|
|
||||||
})
|
})
|
||||||
|
|
||||||
export const rpcHTTPEndpoint = atom((get) => {
|
export const rpcHTTPEndpoint = atom((get) => {
|
||||||
const proto = window.location.protocol
|
const proto = window.location.protocol
|
||||||
const sap = get(serverAddressAndPortState)
|
return `${proto}//${get(serverAddressAndPortState)}/rpc/http`
|
||||||
|
|
||||||
return `${proto}//${sap.endsWith('/') ? sap.slice(0, -1) : sap}/rpc/http`
|
|
||||||
})
|
})
|
||||||
|
|
||||||
export const serverSideCookiesState = atom<Promise<string>>(async (get) => await pipe(
|
export const serverSideCookiesState = atom<Promise<string>>(async (get) => await pipe(
|
||||||
|
|||||||
@@ -32,7 +32,6 @@ const HomeSpeedDial: React.FC<Props> = ({ onDownloadOpen, onEditorOpen }) => {
|
|||||||
ariaLabel="Home speed dial"
|
ariaLabel="Home speed dial"
|
||||||
sx={{ position: 'absolute', bottom: 64, right: 24 }}
|
sx={{ position: 'absolute', bottom: 64, right: 24 }}
|
||||||
icon={<SpeedDialIcon />}
|
icon={<SpeedDialIcon />}
|
||||||
onClick={onDownloadOpen}
|
|
||||||
>
|
>
|
||||||
<SpeedDialAction
|
<SpeedDialAction
|
||||||
icon={listView ? <ViewAgendaIcon /> : <FormatListBulleted />}
|
icon={listView ? <ViewAgendaIcon /> : <FormatListBulleted />}
|
||||||
|
|||||||
@@ -1,9 +1,6 @@
|
|||||||
import { tryCatch } from 'fp-ts/TaskEither'
|
import { tryCatch } from 'fp-ts/TaskEither'
|
||||||
import * as J from 'fp-ts/Json'
|
|
||||||
import * as E from 'fp-ts/Either'
|
|
||||||
import { pipe } from 'fp-ts/lib/function'
|
|
||||||
|
|
||||||
async function fetcher(url: string, opt?: RequestInit, controller?: AbortController): Promise<string> {
|
async function fetcher<T>(url: string, opt?: RequestInit): Promise<T> {
|
||||||
const jwt = localStorage.getItem('token')
|
const jwt = localStorage.getItem('token')
|
||||||
|
|
||||||
if (opt && !opt.headers) {
|
if (opt && !opt.headers) {
|
||||||
@@ -17,27 +14,17 @@ async function fetcher(url: string, opt?: RequestInit, controller?: AbortControl
|
|||||||
headers: {
|
headers: {
|
||||||
...opt?.headers,
|
...opt?.headers,
|
||||||
'X-Authentication': jwt ?? ''
|
'X-Authentication': jwt ?? ''
|
||||||
},
|
}
|
||||||
signal: controller?.signal
|
|
||||||
})
|
})
|
||||||
|
|
||||||
if (!res.ok) {
|
if (!res.ok) {
|
||||||
throw await res.text()
|
throw await res.text()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return res.json() as T
|
||||||
|
|
||||||
return res.text()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export const ffetch = <T>(url: string, opt?: RequestInit, controller?: AbortController) => tryCatch(
|
export const ffetch = <T>(url: string, opt?: RequestInit) => tryCatch(
|
||||||
async () => pipe(
|
() => fetcher<T>(url, opt),
|
||||||
await fetcher(url, opt, controller),
|
|
||||||
J.parse,
|
|
||||||
E.match(
|
|
||||||
(l) => l as T,
|
|
||||||
(r) => r as T
|
|
||||||
)
|
|
||||||
),
|
|
||||||
(e) => `error while fetching: ${e}`
|
(e) => `error while fetching: ${e}`
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -4,7 +4,6 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
|
||||||
|
|
||||||
"gopkg.in/yaml.v3"
|
"gopkg.in/yaml.v3"
|
||||||
)
|
)
|
||||||
@@ -32,11 +31,6 @@ type Config struct {
|
|||||||
OpenIdEmailWhitelist []string `yaml:"openid_email_whitelist"`
|
OpenIdEmailWhitelist []string `yaml:"openid_email_whitelist"`
|
||||||
FrontendPath string `yaml:"frontend_path"`
|
FrontendPath string `yaml:"frontend_path"`
|
||||||
AutoArchive bool `yaml:"auto_archive"`
|
AutoArchive bool `yaml:"auto_archive"`
|
||||||
Twitch struct {
|
|
||||||
ClientId string `yaml:"client_id"`
|
|
||||||
ClientSecret string `yaml:"client_secret"`
|
|
||||||
CheckInterval time.Duration `yaml:"check_interval"`
|
|
||||||
} `yaml:"twitch"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|||||||
@@ -100,7 +100,6 @@ func (p *Process) Start() {
|
|||||||
templateReplacer.Replace(downloadTemplate),
|
templateReplacer.Replace(downloadTemplate),
|
||||||
"--progress-template",
|
"--progress-template",
|
||||||
templateReplacer.Replace(postprocessTemplate),
|
templateReplacer.Replace(postprocessTemplate),
|
||||||
"--no-exec",
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// if user asked to manually override the output path...
|
// if user asked to manually override the output path...
|
||||||
|
|||||||
@@ -1,20 +0,0 @@
|
|||||||
package middlewares
|
|
||||||
|
|
||||||
import (
|
|
||||||
"net/http"
|
|
||||||
|
|
||||||
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
|
|
||||||
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/openid"
|
|
||||||
)
|
|
||||||
|
|
||||||
func ApplyAuthenticationByConfig(next http.Handler) http.Handler {
|
|
||||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
if config.Instance().RequireAuth {
|
|
||||||
Authenticated(next)
|
|
||||||
}
|
|
||||||
if config.Instance().UseOpenId {
|
|
||||||
openid.Middleware(next)
|
|
||||||
}
|
|
||||||
next.ServeHTTP(w, r)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
@@ -34,7 +34,6 @@ import (
|
|||||||
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/status"
|
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/status"
|
||||||
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription"
|
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription"
|
||||||
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/task"
|
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/task"
|
||||||
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/twitch"
|
|
||||||
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/user"
|
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/user"
|
||||||
|
|
||||||
_ "modernc.org/sqlite"
|
_ "modernc.org/sqlite"
|
||||||
@@ -52,7 +51,6 @@ type serverConfig struct {
|
|||||||
db *sql.DB
|
db *sql.DB
|
||||||
mq *internal.MessageQueue
|
mq *internal.MessageQueue
|
||||||
lm *livestream.Monitor
|
lm *livestream.Monitor
|
||||||
tm *twitch.Monitor
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: change scope
|
// TODO: change scope
|
||||||
@@ -117,33 +115,17 @@ func RunBlocking(rc *RunConfig) {
|
|||||||
go lm.Schedule()
|
go lm.Schedule()
|
||||||
go lm.Restore()
|
go lm.Restore()
|
||||||
|
|
||||||
tm := twitch.NewMonitor(
|
srv := newServer(serverConfig{
|
||||||
twitch.NewAuthenticationManager(
|
|
||||||
config.Instance().Twitch.ClientId,
|
|
||||||
config.Instance().Twitch.ClientSecret,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
go tm.Monitor(
|
|
||||||
context.TODO(),
|
|
||||||
config.Instance().Twitch.CheckInterval,
|
|
||||||
twitch.DEFAULT_DOWNLOAD_HANDLER(mdb, mq),
|
|
||||||
)
|
|
||||||
go tm.Restore()
|
|
||||||
|
|
||||||
scfg := serverConfig{
|
|
||||||
frontend: rc.App,
|
frontend: rc.App,
|
||||||
swagger: rc.Swagger,
|
swagger: rc.Swagger,
|
||||||
mdb: mdb,
|
mdb: mdb,
|
||||||
mq: mq,
|
mq: mq,
|
||||||
db: db,
|
db: db,
|
||||||
lm: lm,
|
lm: lm,
|
||||||
tm: tm,
|
})
|
||||||
}
|
|
||||||
|
|
||||||
srv := newServer(scfg)
|
go gracefulShutdown(srv, mdb)
|
||||||
|
go autoPersist(time.Minute*5, mdb, lm)
|
||||||
go gracefulShutdown(srv, &scfg)
|
|
||||||
go autoPersist(time.Minute*5, mdb, lm, tm)
|
|
||||||
|
|
||||||
var (
|
var (
|
||||||
network = "tcp"
|
network = "tcp"
|
||||||
@@ -206,7 +188,12 @@ func newServer(c serverConfig) *http.Server {
|
|||||||
|
|
||||||
// Filebrowser routes
|
// Filebrowser routes
|
||||||
r.Route("/filebrowser", func(r chi.Router) {
|
r.Route("/filebrowser", func(r chi.Router) {
|
||||||
r.Use(middlewares.ApplyAuthenticationByConfig)
|
if config.Instance().RequireAuth {
|
||||||
|
r.Use(middlewares.Authenticated)
|
||||||
|
}
|
||||||
|
if config.Instance().UseOpenId {
|
||||||
|
r.Use(openid.Middleware)
|
||||||
|
}
|
||||||
r.Post("/downloaded", filebrowser.ListDownloaded)
|
r.Post("/downloaded", filebrowser.ListDownloaded)
|
||||||
r.Post("/delete", filebrowser.DeleteFile)
|
r.Post("/delete", filebrowser.DeleteFile)
|
||||||
r.Get("/d/{id}", filebrowser.DownloadFile)
|
r.Get("/d/{id}", filebrowser.DownloadFile)
|
||||||
@@ -248,17 +235,10 @@ func newServer(c serverConfig) *http.Server {
|
|||||||
// Subscriptions
|
// Subscriptions
|
||||||
r.Route("/subscriptions", subscription.Container(c.db, cronTaskRunner).ApplyRouter())
|
r.Route("/subscriptions", subscription.Container(c.db, cronTaskRunner).ApplyRouter())
|
||||||
|
|
||||||
// Twitch
|
|
||||||
r.Route("/twitch", func(r chi.Router) {
|
|
||||||
r.Use(middlewares.ApplyAuthenticationByConfig)
|
|
||||||
r.Get("/all", twitch.GetMonitoredUsers(c.tm))
|
|
||||||
r.Post("/add", twitch.MonitorUserHandler(c.tm))
|
|
||||||
})
|
|
||||||
|
|
||||||
return &http.Server{Handler: r}
|
return &http.Server{Handler: r}
|
||||||
}
|
}
|
||||||
|
|
||||||
func gracefulShutdown(srv *http.Server, cfg *serverConfig) {
|
func gracefulShutdown(srv *http.Server, db *internal.MemoryDB) {
|
||||||
ctx, stop := signal.NotifyContext(context.Background(),
|
ctx, stop := signal.NotifyContext(context.Background(),
|
||||||
os.Interrupt,
|
os.Interrupt,
|
||||||
syscall.SIGTERM,
|
syscall.SIGTERM,
|
||||||
@@ -270,9 +250,7 @@ func gracefulShutdown(srv *http.Server, cfg *serverConfig) {
|
|||||||
slog.Info("shutdown signal received")
|
slog.Info("shutdown signal received")
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
cfg.mdb.Persist()
|
db.Persist()
|
||||||
cfg.lm.Persist()
|
|
||||||
cfg.tm.Persist()
|
|
||||||
|
|
||||||
stop()
|
stop()
|
||||||
srv.Shutdown(context.Background())
|
srv.Shutdown(context.Background())
|
||||||
@@ -280,14 +258,8 @@ func gracefulShutdown(srv *http.Server, cfg *serverConfig) {
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
func autoPersist(
|
func autoPersist(d time.Duration, db *internal.MemoryDB, lm *livestream.Monitor) {
|
||||||
d time.Duration,
|
|
||||||
db *internal.MemoryDB,
|
|
||||||
lm *livestream.Monitor,
|
|
||||||
tm *twitch.Monitor,
|
|
||||||
) {
|
|
||||||
for {
|
for {
|
||||||
time.Sleep(d)
|
|
||||||
if err := db.Persist(); err != nil {
|
if err := db.Persist(); err != nil {
|
||||||
slog.Warn("failed to persisted session", slog.Any("err", err))
|
slog.Warn("failed to persisted session", slog.Any("err", err))
|
||||||
}
|
}
|
||||||
@@ -295,10 +267,7 @@ func autoPersist(
|
|||||||
slog.Warn(
|
slog.Warn(
|
||||||
"failed to persisted livestreams monitor session", slog.Any("err", err.Error()))
|
"failed to persisted livestreams monitor session", slog.Any("err", err.Error()))
|
||||||
}
|
}
|
||||||
if err := tm.Persist(); err != nil {
|
|
||||||
slog.Warn(
|
|
||||||
"failed to persisted twitch monitor session", slog.Any("err", err.Error()))
|
|
||||||
}
|
|
||||||
slog.Debug("sucessfully persisted session")
|
slog.Debug("sucessfully persisted session")
|
||||||
|
time.Sleep(d)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,75 +0,0 @@
|
|||||||
package twitch
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"net/http"
|
|
||||||
"net/url"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
const authURL = "https://id.twitch.tv/oauth2/token"
|
|
||||||
|
|
||||||
type AuthResponse struct {
|
|
||||||
AccessToken string `json:"access_token"`
|
|
||||||
ExpiresIn int `json:"expires_in"`
|
|
||||||
TokenType string `json:"token_type"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type AccessToken struct {
|
|
||||||
Token string
|
|
||||||
Expiry time.Time
|
|
||||||
}
|
|
||||||
|
|
||||||
type AuthenticationManager struct {
|
|
||||||
clientId string
|
|
||||||
clientSecret string
|
|
||||||
accesToken *AccessToken
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewAuthenticationManager(clientId, clientSecret string) *AuthenticationManager {
|
|
||||||
return &AuthenticationManager{
|
|
||||||
clientId: clientId,
|
|
||||||
clientSecret: clientSecret,
|
|
||||||
accesToken: &AccessToken{},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a *AuthenticationManager) GetAccessToken() (*AccessToken, error) {
|
|
||||||
if a.accesToken != nil && a.accesToken.Token != "" && a.accesToken.Expiry.After(time.Now()) {
|
|
||||||
return a.accesToken, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
data := url.Values{}
|
|
||||||
data.Set("client_id", a.clientId)
|
|
||||||
data.Set("client_secret", a.clientSecret)
|
|
||||||
data.Set("grant_type", "client_credentials")
|
|
||||||
|
|
||||||
resp, err := http.PostForm(authURL, data)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("errore richiesta token: %w", err)
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
|
||||||
return nil, fmt.Errorf("status non OK: %s", resp.Status)
|
|
||||||
}
|
|
||||||
|
|
||||||
var auth AuthResponse
|
|
||||||
if err := json.NewDecoder(resp.Body).Decode(&auth); err != nil {
|
|
||||||
return nil, fmt.Errorf("errore decoding JSON: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
token := &AccessToken{
|
|
||||||
Token: auth.AccessToken,
|
|
||||||
Expiry: time.Now().Add(time.Duration(auth.ExpiresIn) * time.Second),
|
|
||||||
}
|
|
||||||
|
|
||||||
a.accesToken = token
|
|
||||||
|
|
||||||
return token, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a *AuthenticationManager) GetClientId() string {
|
|
||||||
return a.clientId
|
|
||||||
}
|
|
||||||
@@ -1,91 +0,0 @@
|
|||||||
package twitch
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/json"
|
|
||||||
"io"
|
|
||||||
"net/http"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
const twitchAPIURL = "https://api.twitch.tv/helix"
|
|
||||||
|
|
||||||
type Client struct {
|
|
||||||
authenticationManager AuthenticationManager
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewTwitchClient(am *AuthenticationManager) *Client {
|
|
||||||
return &Client{
|
|
||||||
authenticationManager: *am,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type streamResp struct {
|
|
||||||
Data []struct {
|
|
||||||
ID string `json:"id"`
|
|
||||||
UserName string `json:"user_name"`
|
|
||||||
Title string `json:"title"`
|
|
||||||
GameName string `json:"game_name"`
|
|
||||||
StartedAt string `json:"started_at"`
|
|
||||||
} `json:"data"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Client) doRequest(endpoint string, params map[string]string) ([]byte, error) {
|
|
||||||
token, err := c.authenticationManager.GetAccessToken()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
reqURL := twitchAPIURL + endpoint
|
|
||||||
req, err := http.NewRequest("GET", reqURL, nil)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
q := req.URL.Query()
|
|
||||||
for k, v := range params {
|
|
||||||
q.Set(k, v)
|
|
||||||
}
|
|
||||||
req.URL.RawQuery = q.Encode()
|
|
||||||
|
|
||||||
req.Header.Set("Client-Id", c.authenticationManager.GetClientId())
|
|
||||||
req.Header.Set("Authorization", "Bearer "+token.Token)
|
|
||||||
|
|
||||||
resp, err := http.DefaultClient.Do(req)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
return io.ReadAll(resp.Body)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Client) PollStream(channel string, liveChannel chan<- *StreamInfo) error {
|
|
||||||
body, err := c.doRequest("/streams", map[string]string{"user_login": channel})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
var sr streamResp
|
|
||||||
if err := json.Unmarshal(body, &sr); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(sr.Data) == 0 {
|
|
||||||
liveChannel <- &StreamInfo{UserName: channel, IsLive: false}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
s := sr.Data[0]
|
|
||||||
started, _ := time.Parse(time.RFC3339, s.StartedAt)
|
|
||||||
|
|
||||||
liveChannel <- &StreamInfo{
|
|
||||||
ID: s.ID,
|
|
||||||
UserName: s.UserName,
|
|
||||||
Title: s.Title,
|
|
||||||
GameName: s.GameName,
|
|
||||||
StartedAt: started,
|
|
||||||
IsLive: true,
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
@@ -1,142 +0,0 @@
|
|||||||
package twitch
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"encoding/gob"
|
|
||||||
"fmt"
|
|
||||||
"iter"
|
|
||||||
"log/slog"
|
|
||||||
"maps"
|
|
||||||
"os"
|
|
||||||
"path/filepath"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
|
|
||||||
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Monitor struct {
|
|
||||||
liveChannel chan *StreamInfo
|
|
||||||
monitored map[string]*Client
|
|
||||||
lastState map[string]bool
|
|
||||||
mu sync.RWMutex
|
|
||||||
authenticationManager *AuthenticationManager
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewMonitor(authenticationManager *AuthenticationManager) *Monitor {
|
|
||||||
return &Monitor{
|
|
||||||
liveChannel: make(chan *StreamInfo, 16),
|
|
||||||
monitored: make(map[string]*Client),
|
|
||||||
lastState: make(map[string]bool),
|
|
||||||
authenticationManager: authenticationManager,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Monitor) Add(user string) {
|
|
||||||
m.mu.Lock()
|
|
||||||
defer m.mu.Unlock()
|
|
||||||
m.monitored[user] = NewTwitchClient(m.authenticationManager)
|
|
||||||
slog.Info("added user to twitch monitor", slog.String("user", user))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Monitor) Monitor(ctx context.Context, interval time.Duration, handler func(url string) error) {
|
|
||||||
ticker := time.NewTicker(interval)
|
|
||||||
defer ticker.Stop()
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ticker.C:
|
|
||||||
m.mu.RLock()
|
|
||||||
for user, client := range m.monitored {
|
|
||||||
u := user
|
|
||||||
c := client
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
if err := c.PollStream(u, m.liveChannel); err != nil {
|
|
||||||
slog.Error("polling failed", slog.String("user", u), slog.Any("err", err))
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
m.mu.RUnlock()
|
|
||||||
|
|
||||||
case stream := <-m.liveChannel:
|
|
||||||
wasLive := m.lastState[stream.UserName]
|
|
||||||
if stream.IsLive && !wasLive {
|
|
||||||
slog.Info("stream went live", slog.String("user", stream.UserName))
|
|
||||||
if err := handler(fmt.Sprintf("https://www.twitch.tv/%s", stream.UserName)); err != nil {
|
|
||||||
slog.Error("handler failed", slog.String("user", stream.UserName), slog.Any("err", err))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
m.lastState[stream.UserName] = stream.IsLive
|
|
||||||
|
|
||||||
case <-ctx.Done():
|
|
||||||
slog.Info("stopping twitch monitor")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Monitor) GetMonitoredUsers() iter.Seq[string] {
|
|
||||||
m.mu.RLock()
|
|
||||||
defer m.mu.RUnlock()
|
|
||||||
return maps.Keys(m.monitored)
|
|
||||||
}
|
|
||||||
|
|
||||||
func DEFAULT_DOWNLOAD_HANDLER(db *internal.MemoryDB, mq *internal.MessageQueue) func(url string) error {
|
|
||||||
return func(url string) error {
|
|
||||||
p := &internal.Process{
|
|
||||||
Url: url,
|
|
||||||
Livestream: true,
|
|
||||||
Params: []string{"--downloader", "ffmpeg", "--no-part"},
|
|
||||||
}
|
|
||||||
db.Set(p)
|
|
||||||
mq.Publish(p)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Monitor) Persist() error {
|
|
||||||
filename := filepath.Join(config.Instance().SessionFilePath, "twitch-monitor.dat")
|
|
||||||
|
|
||||||
f, err := os.Create(filename)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer f.Close()
|
|
||||||
|
|
||||||
enc := gob.NewEncoder(f)
|
|
||||||
users := make([]string, 0, len(m.monitored))
|
|
||||||
|
|
||||||
for user := range m.monitored {
|
|
||||||
users = append(users, user)
|
|
||||||
}
|
|
||||||
|
|
||||||
return enc.Encode(users)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Monitor) Restore() error {
|
|
||||||
filename := filepath.Join(config.Instance().SessionFilePath, "twitch-monitor.dat")
|
|
||||||
|
|
||||||
f, err := os.Open(filename)
|
|
||||||
if err != nil {
|
|
||||||
if os.IsNotExist(err) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer f.Close()
|
|
||||||
|
|
||||||
dec := gob.NewDecoder(f)
|
|
||||||
var users []string
|
|
||||||
if err := dec.Decode(&users); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
m.monitored = make(map[string]*Client)
|
|
||||||
for _, user := range users {
|
|
||||||
m.monitored[user] = NewTwitchClient(m.authenticationManager)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
@@ -1,40 +0,0 @@
|
|||||||
package twitch
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/json"
|
|
||||||
"net/http"
|
|
||||||
"slices"
|
|
||||||
)
|
|
||||||
|
|
||||||
type addUserReq struct {
|
|
||||||
User string `json:"user"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func MonitorUserHandler(m *Monitor) http.HandlerFunc {
|
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
var req addUserReq
|
|
||||||
|
|
||||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
m.Add(req.User)
|
|
||||||
|
|
||||||
if err := json.NewEncoder(w).Encode("ok"); err != nil {
|
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func GetMonitoredUsers(m *Monitor) http.HandlerFunc {
|
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
it := m.GetMonitoredUsers()
|
|
||||||
|
|
||||||
if err := json.NewEncoder(w).Encode(slices.Collect(it)); err != nil {
|
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,20 +0,0 @@
|
|||||||
package twitch
|
|
||||||
|
|
||||||
import "time"
|
|
||||||
|
|
||||||
type StreamInfo struct {
|
|
||||||
ID string
|
|
||||||
UserName string
|
|
||||||
Title string
|
|
||||||
GameName string
|
|
||||||
StartedAt time.Time
|
|
||||||
IsLive bool
|
|
||||||
}
|
|
||||||
|
|
||||||
type VodInfo struct {
|
|
||||||
ID string
|
|
||||||
Title string
|
|
||||||
URL string
|
|
||||||
Duration string
|
|
||||||
CreatedAt time.Time
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user