diff --git a/frontend/src/atoms/downloadTemplate.ts b/frontend/src/atoms/downloadTemplate.ts index 10c206c..346931f 100644 --- a/frontend/src/atoms/downloadTemplate.ts +++ b/frontend/src/atoms/downloadTemplate.ts @@ -1,16 +1,15 @@ -import { atom, selector } from 'recoil' -import { CustomTemplate } from '../types' -import { ffetch } from '../lib/httpClient' -import { serverURL } from './settings' -import { pipe } from 'fp-ts/lib/function' import { getOrElse } from 'fp-ts/lib/Either' +import { pipe } from 'fp-ts/lib/function' +import { atom, selector } from 'recoil' +import { ffetch } from '../lib/httpClient' +import { CustomTemplate } from '../types' +import { serverSideCookiesState, serverURL } from './settings' -export const cookiesTemplateState = atom({ +export const cookiesTemplateState = selector({ key: 'cookiesTemplateState', - default: localStorage.getItem('cookiesTemplate') ?? '', - effects: [ - ({ onSet }) => onSet(e => localStorage.setItem('cookiesTemplate', e)) - ] + get: ({ get }) => get(serverSideCookiesState) + ? '--cookies=cookies.txt' + : '' }) export const customArgsState = atom({ diff --git a/frontend/src/atoms/settings.ts b/frontend/src/atoms/settings.ts index aea61e2..fd6eb31 100644 --- a/frontend/src/atoms/settings.ts +++ b/frontend/src/atoms/settings.ts @@ -1,4 +1,7 @@ +import { pipe } from 'fp-ts/lib/function' +import { matchW } from 'fp-ts/lib/TaskEither' import { atom, selector } from 'recoil' +import { ffetch } from '../lib/httpClient' import { prefersDarkMode } from '../utils' export const languages = [ @@ -187,13 +190,15 @@ export const rpcHTTPEndpoint = selector({ } }) -export const cookiesState = atom({ - key: 'cookiesState', - default: localStorage.getItem('yt-dlp-cookies') ?? '', - effects: [ - ({ onSet }) => - onSet(c => localStorage.setItem('yt-dlp-cookies', c)) - ] +export const serverSideCookiesState = selector({ + key: 'serverSideCookiesState', + get: async ({ get }) => await pipe( + ffetch>(`${get(serverURL)}/api/v1/cookies`), + matchW( + () => '', + (r) => r.cookies + ) + )() }) const themeSelector = selector({ diff --git a/frontend/src/atoms/status.ts b/frontend/src/atoms/status.ts index 5d24fef..2951203 100644 --- a/frontend/src/atoms/status.ts +++ b/frontend/src/atoms/status.ts @@ -1,5 +1,10 @@ +import { pipe } from 'fp-ts/lib/function' +import { of } from 'fp-ts/lib/Task' +import { getOrElse } from 'fp-ts/lib/TaskEither' import { atom, selector } from 'recoil' +import { ffetch } from '../lib/httpClient' import { rpcClientState } from './rpc' +import { serverURL } from './settings' export const connectedState = atom({ key: 'connectedState', @@ -22,4 +27,15 @@ export const availableDownloadPathsState = selector({ .catch(() => ({ result: [] })) return res.result } +}) + +export const ytdlpVersionState = selector({ + key: 'ytdlpVersionState', + get: async ({ get }) => await pipe( + ffetch(`${get(serverURL)}/api/v1/version`), + getOrElse(() => pipe( + 'unknown version', + of + )), + )() }) \ No newline at end of file diff --git a/frontend/src/components/CookiesTextField.tsx b/frontend/src/components/CookiesTextField.tsx index 4d2d148..40781f4 100644 --- a/frontend/src/components/CookiesTextField.tsx +++ b/frontend/src/components/CookiesTextField.tsx @@ -1,22 +1,20 @@ -import { TextField } from '@mui/material' +import { Button, TextField } from '@mui/material' import * as A from 'fp-ts/Array' import * as E from 'fp-ts/Either' import * as O from 'fp-ts/Option' +import { matchW } from 'fp-ts/lib/TaskEither' import { pipe } from 'fp-ts/lib/function' import { useMemo } from 'react' -import { useRecoilState, useRecoilValue } from 'recoil' +import { useRecoilValue } from 'recoil' import { Subject, debounceTime, distinctUntilChanged } from 'rxjs' -import { cookiesTemplateState } from '../atoms/downloadTemplate' -import { cookiesState, serverURL } from '../atoms/settings' +import { serverSideCookiesState, serverURL } from '../atoms/settings' import { useSubscription } from '../hooks/observable' import { useToast } from '../hooks/toast' import { ffetch } from '../lib/httpClient' const validateCookie = (cookie: string) => pipe( cookie, - cookie => cookie.replace(/\s\s+/g, ' '), - cookie => cookie.replaceAll('\t', ' '), - cookie => cookie.split(' '), + cookie => cookie.split('\t'), E.of, E.flatMap( E.fromPredicate( @@ -68,13 +66,19 @@ const validateCookie = (cookie: string) => pipe( ), ) +const noopValidator = (s: string): E.Either => pipe( + s, + s => s.split('\t'), + E.of +) + +const isCommentOrNewLine = (s: string) => s === '' || s.startsWith('\n') || s.startsWith('#') + const CookiesTextField: React.FC = () => { const serverAddr = useRecoilValue(serverURL) - const [, setCookies] = useRecoilState(cookiesTemplateState) - const [savedCookies, setSavedCookies] = useRecoilState(cookiesState) + const savedCookies = useRecoilValue(serverSideCookiesState) const { pushMessage } = useToast() - const flag = '--cookies=cookies.txt' const cookies$ = useMemo(() => new Subject(), []) @@ -86,28 +90,41 @@ const CookiesTextField: React.FC = () => { }) })() + const deleteCookies = () => pipe( + ffetch(`${serverAddr}/api/v1/cookies`, { + method: 'DELETE', + }), + matchW( + (l) => pushMessage(l, 'error'), + (_) => { + pushMessage('Deleted cookies', 'success') + pushMessage(`Reload the page to apply the changes`, 'info') + } + ) + )() + const validateNetscapeCookies = (cookies: string) => pipe( cookies, cookies => cookies.split('\n'), - cookies => cookies.filter(f => !f.startsWith('\n')), // empty lines - cookies => cookies.filter(f => !f.startsWith('# ')), // comments - cookies => cookies.filter(Boolean), // empty lines - A.map(validateCookie), - A.mapWithIndex((i, either) => pipe( + A.map(c => isCommentOrNewLine(c) ? noopValidator(c) : validateCookie(c)), // validate line + A.mapWithIndex((i, either) => pipe( // detect errors and return the either either, - E.matchW( - (l) => pushMessage(`Error in line ${i + 1}: ${l}`, 'warning'), - () => E.isRight(either) + E.match( + (l) => { + pushMessage(`Error in line ${i + 1}: ${l}`, 'warning') + return either + }, + (_) => either ), )), - A.filter(Boolean), - A.match( - () => false, - (c) => { - pushMessage(`Valid ${c.length} Netscape cookies`, 'info') - return true - } - ) + A.filter(c => E.isRight(c)), // filter the line who didn't pass the validation + A.map(E.getOrElse(() => new Array())), // cast the array of eithers to an array of tokens + A.filter(f => f.length > 0), // filter the empty tokens + A.map(f => f.join('\t')), // join the tokens in a TAB separated string + A.reduce('', (c, n) => `${c}${n}\n`), // reduce all to a single string separated by \n + parsed => parsed.length > 0 // if nothing has passed the validation return none + ? O.some(parsed) + : O.none ) useSubscription( @@ -117,22 +134,17 @@ const CookiesTextField: React.FC = () => { ), (cookies) => pipe( cookies, - cookies => { - setSavedCookies(cookies) - return cookies - }, validateNetscapeCookies, - O.fromPredicate(f => f === true), O.match( - () => setCookies(''), - async () => { + () => pushMessage('No valid cookies', 'warning'), + async (some) => { pipe( - await submitCookies(cookies), + await submitCookies(some.trimEnd()), E.match( (l) => pushMessage(`${l}`, 'error'), () => { - pushMessage(`Saved Netscape cookies`, 'success') - setCookies(flag) + pushMessage(`Saved ${some.split('\n').length} Netscape cookies`, 'success') + pushMessage('Reload the page to apply the changes', 'info') } ) ) @@ -142,15 +154,18 @@ const CookiesTextField: React.FC = () => { ) return ( - cookies$.next(e.currentTarget.value)} - /> + <> + cookies$.next(e.currentTarget.value)} + /> + + ) } diff --git a/frontend/src/components/Footer.tsx b/frontend/src/components/Footer.tsx index bd0a8e5..7b52c47 100644 --- a/frontend/src/components/Footer.tsx +++ b/frontend/src/components/Footer.tsx @@ -37,7 +37,9 @@ const Footer: React.FC = () => {
{/* TODO: make it dynamic */} - + + +
{ - const serverAddr = useRecoilValue(serverURL) - - const [version, setVersion] = useState('') - const { pushMessage } = useToast() - - const fetchVersion = async () => { - const res = await fetch(`${serverAddr}/api/v1/version`, { - headers: { - 'X-Authentication': localStorage.getItem('token') ?? '' - } - }) - - if (!res.ok) { - return pushMessage(await res.text(), 'error') - } - - setVersion(await res.json()) - } - - useEffect(() => { - fetchVersion() - }, []) + const version = useRecoilValue(ytdlpVersionState) return ( version diff --git a/frontend/src/lib/rpcClient.ts b/frontend/src/lib/rpcClient.ts index 3c0c90e..a5d00dc 100644 --- a/frontend/src/lib/rpcClient.ts +++ b/frontend/src/lib/rpcClient.ts @@ -82,7 +82,9 @@ export class RPCClient { : '' const sanitizedArgs = this.argsSanitizer( - req.args.replace('-o', '').replace(rename, '') + req.args + .replace('-o', '') + .replace(rename, '') ) if (req.playlist) { @@ -177,14 +179,14 @@ export class RPCClient { } public killLivestream(url: string) { - return this.sendHTTP({ + return this.sendHTTP({ method: 'Service.KillLivestream', params: [url] }) } public killAllLivestream() { - return this.sendHTTP({ + return this.sendHTTP({ method: 'Service.KillAllLivestream', params: [] }) diff --git a/frontend/src/views/Settings.tsx b/frontend/src/views/Settings.tsx index 695249e..269f393 100644 --- a/frontend/src/views/Settings.tsx +++ b/frontend/src/views/Settings.tsx @@ -18,7 +18,7 @@ import { Typography, capitalize } from '@mui/material' -import { useEffect, useMemo, useState } from 'react' +import { Suspense, useEffect, useMemo, useState } from 'react' import { useRecoilState } from 'recoil' import { Subject, @@ -347,7 +347,9 @@ export default function Settings() { Cookies - + + + diff --git a/server/internal/message_queue.go b/server/internal/message_queue.go index 4ae100b..62f7be8 100644 --- a/server/internal/message_queue.go +++ b/server/internal/message_queue.go @@ -63,6 +63,10 @@ func (m *MessageQueue) downloadConsumer() { ) if p.Progress.Status != StatusCompleted { + slog.Info("started process", + slog.String("bus", queueName), + slog.String("id", p.getShortId()), + ) if p.Livestream { // livestreams have higher priorty and they ignore the semaphore go p.Start() @@ -70,11 +74,6 @@ func (m *MessageQueue) downloadConsumer() { p.Start() } } - - slog.Info("started process", - slog.String("bus", queueName), - slog.String("id", p.getShortId()), - ) }, false) } diff --git a/server/internal/process.go b/server/internal/process.go index 25f96a7..1426df6 100644 --- a/server/internal/process.go +++ b/server/internal/process.go @@ -3,6 +3,7 @@ package internal import ( "bufio" "bytes" + "context" "encoding/json" "errors" "fmt" @@ -21,7 +22,6 @@ import ( "github.com/marcopeocchi/yt-dlp-web-ui/server/cli" "github.com/marcopeocchi/yt-dlp-web-ui/server/config" - "github.com/marcopeocchi/yt-dlp-web-ui/server/rx" ) const template = `download: @@ -36,7 +36,6 @@ const ( StatusDownloading StatusCompleted StatusErrored - StatusLivestream ) // Process descriptor @@ -103,81 +102,102 @@ func (p *Process) Start() { params := append(baseParams, p.Params...) - // ----------------- main block ----------------- // + slog.Info("requesting download", slog.String("url", p.Url), slog.Any("params", params)) + cmd := exec.Command(config.Instance().DownloaderPath, params...) cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} - r, err := cmd.StdoutPipe() + stdout, err := cmd.StdoutPipe() if err != nil { - slog.Error( - "failed to connect to stdout", - slog.String("err", err.Error()), - ) + slog.Error("failed to connect to stdout", slog.Any("err", err)) + panic(err) + } + + stderr, err := cmd.StderrPipe() + if err != nil { + slog.Error("failed to connect to stdout", slog.Any("err", err)) panic(err) } if err := cmd.Start(); err != nil { - slog.Error( - "failed to start yt-dlp process", - slog.String("err", err.Error()), - ) + slog.Error("failed to start yt-dlp process", slog.Any("err", err)) panic(err) } p.proc = cmd.Process - // --------------- progress block --------------- // - var ( - sourceChan = make(chan []byte) - doneChan = make(chan struct{}) - ) + ctx, cancel := context.WithCancel(context.Background()) + defer func() { + stdout.Close() + p.Complete() + cancel() + }() - // spawn a goroutine that does the dirty job of parsing the stdout - // filling the channel with as many stdout line as yt-dlp produces (producer) + logs := make(chan []byte) + go produceLogs(stdout, logs) + go p.consumeLogs(ctx, logs) + + go p.detectYtDlpErrors(stderr) + + cmd.Wait() +} + +func produceLogs(r io.Reader, logs chan<- []byte) { go func() { - scan := bufio.NewScanner(r) + scanner := bufio.NewScanner(r) - defer func() { - r.Close() - p.Complete() - - doneChan <- struct{}{} - - close(sourceChan) - close(doneChan) - }() - - for scan.Scan() { - sourceChan <- scan.Bytes() + for scanner.Scan() { + logs <- scanner.Bytes() } }() +} - // Slows down the unmarshal operation to every 500ms - go func() { - rx.Sample(time.Millisecond*500, sourceChan, doneChan, func(event []byte) { - var progress ProgressTemplate - - if err := json.Unmarshal(event, &progress); err != nil { - return - } - - p.Progress = DownloadProgress{ - Status: StatusDownloading, - Percentage: progress.Percentage, - Speed: progress.Speed, - ETA: progress.Eta, - } - - slog.Info("progress", +func (p *Process) consumeLogs(ctx context.Context, logs <-chan []byte) { + for { + select { + case <-ctx.Done(): + slog.Info("detaching from yt-dlp stdout", slog.String("id", p.getShortId()), slog.String("url", p.Url), - slog.String("percentage", progress.Percentage), ) - }) - }() + return + case entry := <-logs: + p.parseLogEntry(entry) + } + } +} - // ------------- end progress block ------------- // - cmd.Wait() +func (p *Process) parseLogEntry(entry []byte) { + var progress ProgressTemplate + + if err := json.Unmarshal(entry, &progress); err != nil { + return + } + + p.Progress = DownloadProgress{ + Status: StatusDownloading, + Percentage: progress.Percentage, + Speed: progress.Speed, + ETA: progress.Eta, + } + + slog.Info("progress", + slog.String("id", p.getShortId()), + slog.String("url", p.Url), + slog.String("percentage", progress.Percentage), + ) +} + +func (p *Process) detectYtDlpErrors(r io.Reader) { + scanner := bufio.NewScanner(r) + + for scanner.Scan() { + slog.Error("yt-dlp process error", + slog.String("id", p.getShortId()), + slog.String("url", p.Url), + slog.String("err", scanner.Text()), + ) + } } // Keep process in the memoryDB but marks it as complete @@ -222,6 +242,7 @@ func (p *Process) Kill() error { } // Returns the available format for this URL +// // TODO: Move out from process.go func (p *Process) GetFormats() (DownloadFormats, error) { cmd := exec.Command(config.Instance().DownloaderPath, p.Url, "-J") diff --git a/server/rest/container.go b/server/rest/container.go index 07bf32c..c61ecc6 100644 --- a/server/rest/container.go +++ b/server/rest/container.go @@ -28,7 +28,9 @@ func ApplyRouter(args *ContainerArgs) func(chi.Router) { r.Post("/exec", h.Exec()) r.Get("/running", h.Running()) r.Get("/version", h.GetVersion()) + r.Get("/cookies", h.GetCookies()) r.Post("/cookies", h.SetCookies()) + r.Delete("/cookies", h.DeleteCookies()) r.Post("/template", h.AddTemplate()) r.Get("/template/all", h.GetTemplates()) r.Delete("/template/{id}", h.DeleteTemplate()) diff --git a/server/rest/handlers.go b/server/rest/handlers.go index 5474953..deeeecb 100644 --- a/server/rest/handlers.go +++ b/server/rest/handlers.go @@ -60,6 +60,27 @@ func (h *Handler) Running() http.HandlerFunc { } } +func (h *Handler) GetCookies() http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + + cookies, err := h.service.GetCookies(r.Context()) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + res := &internal.SetCookiesRequest{ + Cookies: string(cookies), + } + + if err := json.NewEncoder(w).Encode(res); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } +} + func (h *Handler) SetCookies() http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() @@ -87,6 +108,23 @@ func (h *Handler) SetCookies() http.HandlerFunc { } } +func (h *Handler) DeleteCookies() http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + + err := h.service.SetCookies(r.Context(), "") + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + err = json.NewEncoder(w).Encode("ok") + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } + } +} + func (h *Handler) AddTemplate() http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() diff --git a/server/rest/service.go b/server/rest/service.go index 742485f..b8b3cc8 100644 --- a/server/rest/service.go +++ b/server/rest/service.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "errors" + "io" "os" "os/exec" "time" @@ -44,6 +45,22 @@ func (s *Service) Running(ctx context.Context) (*[]internal.ProcessResponse, err } } +func (s *Service) GetCookies(ctx context.Context) ([]byte, error) { + fd, err := os.Open("cookies.txt") + if err != nil { + return nil, err + } + + defer fd.Close() + + cookies, err := io.ReadAll(fd) + if err != nil { + return nil, err + } + + return cookies, nil +} + func (s *Service) SetCookies(ctx context.Context, cookies string) error { fd, err := os.Create("cookies.txt") if err != nil {