diff --git a/.gitignore b/.gitignore index f87bef8..167234e 100644 --- a/.gitignore +++ b/.gitignore @@ -27,4 +27,5 @@ frontend/.pnp.loader.mjs frontend/.yarn/install-state.gz .db.lock livestreams.dat -.vite/deps \ No newline at end of file +.vite/deps +archive.txt diff --git a/frontend/src/Layout.tsx b/frontend/src/Layout.tsx index a130239..0d136bc 100644 --- a/frontend/src/Layout.tsx +++ b/frontend/src/Layout.tsx @@ -6,6 +6,7 @@ import LiveTvIcon from '@mui/icons-material/LiveTv' import Menu from '@mui/icons-material/Menu' import SettingsIcon from '@mui/icons-material/Settings' import TerminalIcon from '@mui/icons-material/Terminal' +import UpdateIcon from '@mui/icons-material/Update' import { Box, createTheme } from '@mui/material' import CssBaseline from '@mui/material/CssBaseline' import Divider from '@mui/material/Divider' @@ -140,6 +141,19 @@ export default function Layout() { + + + + + + + + + + <SvgIcon sx={{ fontSize: '200px' }}> + <UpdateIcon /> + </SvgIcon> + + + {i18n.t('subscriptionsEmptyLabel')} + + + ) +} \ No newline at end of file diff --git a/frontend/src/components/subscriptions/SubscriptionsDialog.tsx b/frontend/src/components/subscriptions/SubscriptionsDialog.tsx new file mode 100644 index 0000000..88ee34f --- /dev/null +++ b/frontend/src/components/subscriptions/SubscriptionsDialog.tsx @@ -0,0 +1,164 @@ +import CloseIcon from '@mui/icons-material/Close' +import { + Alert, + AppBar, + Box, + Button, + Container, + Dialog, + Grid, + IconButton, + Paper, + Slide, + TextField, + Toolbar, + Typography +} from '@mui/material' +import { TransitionProps } from '@mui/material/transitions' +import { matchW } from 'fp-ts/lib/Either' +import { pipe } from 'fp-ts/lib/function' +import { useAtomValue } from 'jotai' +import { forwardRef, startTransition, useState } from 'react' +import { customArgsState } from '../../atoms/downloadTemplate' +import { serverURL } from '../../atoms/settings' +import { useToast } from '../../hooks/toast' +import { useI18n } from '../../hooks/useI18n' +import { ffetch } from '../../lib/httpClient' +import { Subscription } from '../../services/subscriptions' +import ExtraDownloadOptions from '../ExtraDownloadOptions' + +type Props = { + open: boolean + onClose: () => void +} + +const Transition = forwardRef(function Transition( + props: TransitionProps & { + children: React.ReactElement + }, + ref: React.Ref, +) { + return +}) + +const SubscriptionsDialog: React.FC = ({ open, onClose }) => { + const [subscriptionURL, setSubscriptionURL] = useState('') + const [subscriptionCron, setSubscriptionCron] = useState('') + + const customArgs = useAtomValue(customArgsState) + + const { i18n } = useI18n() + const { pushMessage } = useToast() + + const baseURL = useAtomValue(serverURL) + + const submit = async (sub: Omit) => { + const task = ffetch(`${baseURL}/subscriptions`, { + method: 'POST', + body: JSON.stringify(sub) + }) + const either = await task() + + pipe( + either, + matchW( + (l) => pushMessage(l, 'error'), + (_) => onClose() + ) + ) + } + + return ( + + + + + + + + {i18n.t('subscriptionsButtonLabel')} + + + + theme.palette.background.default, + minHeight: (theme) => `calc(99vh - ${theme.mixins.toolbar.minHeight}px)` + }}> + + + + + + + + {i18n.t('subscriptionsInfo')} + + + {i18n.t('livestreamExperimentalWarning')} + + + + setSubscriptionURL(e.target.value)} + /> + + + + + + setSubscriptionCron(e.target.value)} + /> + + + + + + + + + + + + ) +} + +export default SubscriptionsDialog \ No newline at end of file diff --git a/frontend/src/components/subscriptions/SubscriptionsEditDialog.tsx b/frontend/src/components/subscriptions/SubscriptionsEditDialog.tsx new file mode 100644 index 0000000..31cf553 --- /dev/null +++ b/frontend/src/components/subscriptions/SubscriptionsEditDialog.tsx @@ -0,0 +1,162 @@ +import CloseIcon from '@mui/icons-material/Close' +import { + Alert, + AppBar, + Box, + Button, + Container, + Dialog, + Grid, + IconButton, + Paper, + Slide, + TextField, + Toolbar, + Typography +} from '@mui/material' +import { TransitionProps } from '@mui/material/transitions' +import { matchW } from 'fp-ts/lib/Either' +import { pipe } from 'fp-ts/lib/function' +import { useAtomValue } from 'jotai' +import { forwardRef, startTransition, useState } from 'react' +import { customArgsState } from '../../atoms/downloadTemplate' +import { serverURL } from '../../atoms/settings' +import { useToast } from '../../hooks/toast' +import { useI18n } from '../../hooks/useI18n' +import { ffetch } from '../../lib/httpClient' +import { Subscription } from '../../services/subscriptions' +import ExtraDownloadOptions from '../ExtraDownloadOptions' + +type Props = { + subscription: Subscription | undefined + onClose: () => void +} + +const Transition = forwardRef(function Transition( + props: TransitionProps & { + children: React.ReactElement + }, + ref: React.Ref, +) { + return +}) + +const SubscriptionsEditDialog: React.FC = ({ subscription, onClose }) => { + const [subscriptionURL, setSubscriptionURL] = useState('') + const [subscriptionCron, setSubscriptionCron] = useState('') + + const customArgs = useAtomValue(customArgsState) + + const { i18n } = useI18n() + const { pushMessage } = useToast() + + const baseURL = useAtomValue(serverURL) + + const editSubscription = async (sub: Subscription) => { + const task = ffetch(`${baseURL}/subscriptions`, { + method: 'PATCH', + body: JSON.stringify(sub) + }) + const either = await task() + + pipe( + either, + matchW( + (l) => pushMessage(l, 'error'), + (_) => onClose() + ) + ) + } + + return ( + + + + onClose()} + aria-label="close" + > + + + + {i18n.t('subscriptionsButtonLabel')} + + + + theme.palette.background.default, + minHeight: (theme) => `calc(99vh - ${theme.mixins.toolbar.minHeight}px)` + }}> + + + + + + + + Editing {subscription?.url} + + + + setSubscriptionURL(e.target.value)} + /> + + + + + + setSubscriptionCron(e.target.value)} + /> + + + + + + + + + + + + ) +} + +export default SubscriptionsEditDialog \ No newline at end of file diff --git a/frontend/src/components/subscriptions/SubscriptionsSpeedDial.tsx b/frontend/src/components/subscriptions/SubscriptionsSpeedDial.tsx new file mode 100644 index 0000000..d5efbb2 --- /dev/null +++ b/frontend/src/components/subscriptions/SubscriptionsSpeedDial.tsx @@ -0,0 +1,27 @@ +import AddCircleIcon from '@mui/icons-material/AddCircle' +import { SpeedDial, SpeedDialAction, SpeedDialIcon } from '@mui/material' +import { useI18n } from '../../hooks/useI18n' + +type Props = { + onOpen: () => void +} + +const SubscriptionsSpeedDial: React.FC = ({ onOpen }) => { + const { i18n } = useI18n() + + return ( + } + > + } + tooltipTitle={i18n.t('newSubscriptionButton')} + onClick={onOpen} + /> + + ) +} + +export default SubscriptionsSpeedDial \ No newline at end of file diff --git a/frontend/src/hooks/useFetch.ts b/frontend/src/hooks/useFetch.ts new file mode 100644 index 0000000..45d596d --- /dev/null +++ b/frontend/src/hooks/useFetch.ts @@ -0,0 +1,35 @@ +import { pipe } from 'fp-ts/lib/function' +import { matchW } from 'fp-ts/lib/TaskEither' +import { useAtomValue } from 'jotai' +import { useEffect, useState } from 'react' +import { serverURL } from '../atoms/settings' +import { ffetch } from '../lib/httpClient' +import { useToast } from './toast' + +const useFetch = (resource: string) => { + const base = useAtomValue(serverURL) + + const { pushMessage } = useToast() + + const [data, setData] = useState() + const [error, setError] = useState() + + const fetcher = () => pipe( + ffetch(`${base}${resource}`), + matchW( + (l) => { + setError(l) + pushMessage(l, 'error') + }, + (r) => setData(r) + ) + )() + + useEffect(() => { + fetcher() + }, []) + + return { data, error, fetcher } +} + +export default useFetch \ No newline at end of file diff --git a/frontend/src/lib/httpClient.ts b/frontend/src/lib/httpClient.ts index ee7a15e..9ddb98a 100644 --- a/frontend/src/lib/httpClient.ts +++ b/frontend/src/lib/httpClient.ts @@ -1,12 +1,6 @@ import { tryCatch } from 'fp-ts/TaskEither' -export const ffetch = (url: string, opt?: RequestInit) => tryCatch( - () => fetcher(url, opt), - (e) => `error while fetching: ${e}` -) - - -const fetcher = async (url: string, opt?: RequestInit) => { +async function fetcher(url: string, opt?: RequestInit): Promise { const jwt = localStorage.getItem('token') if (opt && !opt.headers) { @@ -26,5 +20,11 @@ const fetcher = async (url: string, opt?: RequestInit) => { if (!res.ok) { throw await res.text() } + return res.json() as T -} \ No newline at end of file +} + +export const ffetch = (url: string, opt?: RequestInit) => tryCatch( + () => fetcher(url, opt), + (e) => `error while fetching: ${e}` +) diff --git a/frontend/src/lib/i18n.ts b/frontend/src/lib/i18n.ts index e35af34..b9ac5a6 100644 --- a/frontend/src/lib/i18n.ts +++ b/frontend/src/lib/i18n.ts @@ -38,7 +38,7 @@ export default class Translator { t(key: string): string { if (this.current) { //@ts-ignore - return this.current[key] ?? fallback.keys[key] + return this.current[key] ?? fallback.keys[key] ?? 'caption not defined' } return 'caption not defined' } diff --git a/frontend/src/lib/rpcClient.ts b/frontend/src/lib/rpcClient.ts index d02478a..1ea3d5c 100644 --- a/frontend/src/lib/rpcClient.ts +++ b/frontend/src/lib/rpcClient.ts @@ -42,10 +42,12 @@ export class RPCClient { } private argsSanitizer(args: string): string[] { + const splitOnlyWhitespaces = /[^\s"']+|"([^"]*)"|'([^']*)'/gm + return args - .split(' ') - .map(a => a.trim().replaceAll('"', '')) - .filter(Boolean) + .match(splitOnlyWhitespaces) + ?.map(a => a.trim()) + .filter(Boolean) ?? [] } private async sendHTTP(req: RPCRequest) { diff --git a/frontend/src/router.tsx b/frontend/src/router.tsx index dbee30a..8fbfaa4 100644 --- a/frontend/src/router.tsx +++ b/frontend/src/router.tsx @@ -10,6 +10,7 @@ const Archive = lazy(() => import('./views/Archive')) const Settings = lazy(() => import('./views/Settings')) const LiveStream = lazy(() => import('./views/Livestream')) const Filebrowser = lazy(() => import('./views/Filebrowser')) +const Subscriptions = lazy(() => import('./views/Subscriptions')) const ErrorBoundary = lazy(() => import('./components/ErrorBoundary')) @@ -73,6 +74,19 @@ export const router = createHashRouter([ ) }, + { + path: '/subscriptions', + element: ( + }> + + + ), + errorElement: ( + }> + + + ) + }, { path: '/login', element: ( diff --git a/frontend/src/services/subscriptions.ts b/frontend/src/services/subscriptions.ts new file mode 100644 index 0000000..95bdef4 --- /dev/null +++ b/frontend/src/services/subscriptions.ts @@ -0,0 +1,37 @@ +// import { PaginatedResponse } from '../types' + +export type Subscription = { + id: string + url: string + params: string + cron_expression: string +} + +// class SubscriptionService { +// private _baseURL: string = '' + +// public set baseURL(v: string) { +// this._baseURL = v +// } + +// public async delete(id: string): Promise { + +// } + +// public async listPaginated(start: number, limit: number = 50): Promise> { +// const res = await fetch(`${this._baseURL}/subscriptions?id=${start}&limit=${limit}`) +// const data: PaginatedResponse = await res.json() + +// return data +// } + +// public async submit(sub: Subscription): Promise { + +// } + +// public async edit(sub: Subscription): Promise { + +// } +// } + +// export default SubscriptionService \ No newline at end of file diff --git a/frontend/src/views/Subscriptions.tsx b/frontend/src/views/Subscriptions.tsx new file mode 100644 index 0000000..171f6fe --- /dev/null +++ b/frontend/src/views/Subscriptions.tsx @@ -0,0 +1,157 @@ +import DeleteIcon from '@mui/icons-material/Delete' +import EditIcon from '@mui/icons-material/Edit' +import { + Box, + Button, + Container, + Paper, + Table, TableBody, TableCell, TableContainer, + TableHead, TablePagination, TableRow +} from '@mui/material' +import { matchW } from 'fp-ts/lib/Either' +import { pipe } from 'fp-ts/lib/function' +import { useAtomValue } from 'jotai' +import { useState, useTransition } from 'react' +import { serverURL } from '../atoms/settings' +import LoadingBackdrop from '../components/LoadingBackdrop' +import NoSubscriptions from '../components/subscriptions/NoSubscriptions' +import SubscriptionsDialog from '../components/subscriptions/SubscriptionsDialog' +import SubscriptionsEditDialog from '../components/subscriptions/SubscriptionsEditDialog' +import SubscriptionsSpeedDial from '../components/subscriptions/SubscriptionsSpeedDial' +import { useToast } from '../hooks/toast' +import useFetch from '../hooks/useFetch' +import { useI18n } from '../hooks/useI18n' +import { ffetch } from '../lib/httpClient' +import { Subscription } from '../services/subscriptions' +import { PaginatedResponse } from '../types' + +const SubscriptionsView: React.FC = () => { + const { i18n } = useI18n() + const { pushMessage } = useToast() + + const baseURL = useAtomValue(serverURL) + + const [selectedSubscription, setSelectedSubscription] = useState() + const [openDialog, setOpenDialog] = useState(false) + + const [startId, setStartId] = useState(0) + const [limit, setLimit] = useState(9) + const [page, setPage] = useState(0) + + const { data: subs, fetcher: refecth } = useFetch>( + `/subscriptions?id=${startId}&limit=${limit}` + ) + + const [isPending, startTransition] = useTransition() + + const deleteSubscription = async (id: string) => { + const task = ffetch(`${baseURL}/subscriptions/${id}`, { + method: 'DELETE', + }) + const either = await task() + + pipe( + either, + matchW( + (l) => pushMessage(l, 'error'), + () => refecth() + ) + ) + } + + return ( + <> + + + setOpenDialog(s => !s)} /> + + { + setSelectedSubscription(undefined) + refecth() + }} + /> + { + setOpenDialog(s => !s) + refecth() + }} /> + + {!subs || subs.data.length === 0 ? + : + + + + + + + URL + Params + {i18n.t('cronExpressionLabel')} + Actions + + + + {subs.data.map(x => ( + + {x.url} + + {x.params} + + + {x.cron_expression} + + + + + + + ))} + +
+
+ { + if (p < page) { + setPage(s => (s - 1 <= 0 ? 0 : s - 1)) + setStartId(subs.first) + return + } + setPage(s => s + 1) + setStartId(subs.next) + }} + rowsPerPage={limit} + rowsPerPageOptions={[9, 10, 25, 50, 100]} + onRowsPerPageChange={(e) => { setLimit(parseInt(e.target.value)) }} + /> +
+
} + + ) +} + +export default SubscriptionsView \ No newline at end of file diff --git a/go.mod b/go.mod index 01199ea..b666154 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/golang-jwt/jwt/v5 v5.2.1 github.com/google/uuid v1.6.0 github.com/gorilla/websocket v1.5.3 + github.com/robfig/cron/v3 v3.0.0 golang.org/x/oauth2 v0.25.0 golang.org/x/sync v0.10.0 golang.org/x/sys v0.29.0 diff --git a/go.sum b/go.sum index e1fd5e1..4a1ff13 100644 --- a/go.sum +++ b/go.sum @@ -30,6 +30,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/robfig/cron/v3 v3.0.0 h1:kQ6Cb7aHOHTSzNVNEhmp8EcWKLb4CbiMW9h9VyIhO4E= +github.com/robfig/cron/v3 v3.0.0/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc= diff --git a/server/dbutil/migrate.go b/server/dbutil/migrate.go index 38c4fb5..2ba6f34 100644 --- a/server/dbutil/migrate.go +++ b/server/dbutil/migrate.go @@ -49,6 +49,18 @@ func Migrate(ctx context.Context, db *sql.DB) error { return err } + if _, err := db.ExecContext( + ctx, + `CREATE TABLE IF NOT EXISTS subscriptions ( + id CHAR(36) PRIMARY KEY, + url VARCHAR(2048) UNIQUE NOT NULL, + params TEXT NOT NULL, + cron TEXT + )`, + ); err != nil { + return err + } + if lockFileExists() { return nil } diff --git a/server/internal/memory_db.go b/server/internal/memory_db.go index cffb2d9..c59350f 100644 --- a/server/internal/memory_db.go +++ b/server/internal/memory_db.go @@ -3,6 +3,7 @@ package internal import ( "encoding/gob" "errors" + "log/slog" "os" "path/filepath" "sync" @@ -11,6 +12,8 @@ import ( "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config" ) +var memDbEvents = make(chan *Process) + // In-Memory Thread-Safe Key-Value Storage with optional persistence type MemoryDB struct { table map[string]*Process @@ -144,3 +147,12 @@ func (m *MemoryDB) Restore(mq *MessageQueue) { } } } + +func (m *MemoryDB) EventListener() { + for p := range memDbEvents { + if p.AutoRemove { + slog.Info("compacting MemoryDB", slog.String("id", p.Id)) + m.Delete(p.Id) + } + } +} diff --git a/server/internal/process.go b/server/internal/process.go index d4c6e0c..c28c7ab 100644 --- a/server/internal/process.go +++ b/server/internal/process.go @@ -48,6 +48,7 @@ type Process struct { Id string Url string Livestream bool + AutoRemove bool Params []string Info DownloadInfo Progress DownloadProgress @@ -253,6 +254,8 @@ func (p *Process) Complete() { slog.String("id", p.getShortId()), slog.String("url", p.Url), ) + + memDbEvents <- p } // Kill a process and remove it from the memory diff --git a/server/server.go b/server/server.go index b22442a..429046e 100644 --- a/server/server.go +++ b/server/server.go @@ -32,6 +32,8 @@ import ( "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/rest" ytdlpRPC "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/rpc" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/status" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/task" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/user" _ "modernc.org/sqlite" @@ -107,6 +109,7 @@ func RunBlocking(rc *RunConfig) { } mq.SetupConsumers() go mdb.Restore(mq) + go mdb.EventListener() lm := livestream.NewMonitor(mq, mdb) go lm.Schedule() @@ -151,6 +154,9 @@ func RunBlocking(rc *RunConfig) { func newServer(c serverConfig) *http.Server { archiver.Register(c.db) + cronTaskRunner := task.NewCronTaskRunner(c.mq, c.mdb) + go cronTaskRunner.Spawner(context.TODO()) + service := ytdlpRPC.Container(c.mdb, c.mq, c.lm) rpc.Register(service) @@ -226,6 +232,9 @@ func newServer(c serverConfig) *http.Server { // Status r.Route("/status", status.ApplyRouter(c.mdb)) + // Subscriptions + r.Route("/subscriptions", subscription.Container(c.db, cronTaskRunner).ApplyRouter()) + return &http.Server{Handler: r} } diff --git a/server/subscription/container.go b/server/subscription/container.go new file mode 100644 index 0000000..36f007a --- /dev/null +++ b/server/subscription/container.go @@ -0,0 +1,17 @@ +package subscription + +import ( + "database/sql" + + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/domain" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/task" +) + +func Container(db *sql.DB, runner task.TaskRunner) domain.RestHandler { + var ( + r = provideRepository(db) + s = provideService(r, runner) + h = provideHandler(s) + ) + return h +} diff --git a/server/subscription/data/models.go b/server/subscription/data/models.go new file mode 100644 index 0000000..e5700da --- /dev/null +++ b/server/subscription/data/models.go @@ -0,0 +1,8 @@ +package data + +type Subscription struct { + Id string + URL string + Params string + CronExpr string +} diff --git a/server/subscription/domain/subscription.go b/server/subscription/domain/subscription.go new file mode 100644 index 0000000..3af1f1e --- /dev/null +++ b/server/subscription/domain/subscription.go @@ -0,0 +1,47 @@ +package domain + +import ( + "context" + "net/http" + + "github.com/go-chi/chi/v5" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/data" +) + +type Subscription struct { + Id string `json:"id"` + URL string `json:"url"` + Params string `json:"params"` + CronExpr string `json:"cron_expression"` +} + +type PaginatedResponse[T any] struct { + First int64 `json:"first"` + Next int64 `json:"next"` + Data T `json:"data"` +} + +type Repository interface { + Submit(ctx context.Context, sub *data.Subscription) (*data.Subscription, error) + List(ctx context.Context, start int64, limit int) (*[]data.Subscription, error) + UpdateByExample(ctx context.Context, example *data.Subscription) error + Delete(ctx context.Context, id string) error + GetCursor(ctx context.Context, id string) (int64, error) +} + +type Service interface { + Submit(ctx context.Context, sub *Subscription) (*Subscription, error) + List(ctx context.Context, start int64, limit int) (*PaginatedResponse[[]Subscription], error) + UpdateByExample(ctx context.Context, example *Subscription) error + Delete(ctx context.Context, id string) error + GetCursor(ctx context.Context, id string) (int64, error) +} + +type RestHandler interface { + Submit() http.HandlerFunc + List() http.HandlerFunc + UpdateByExample() http.HandlerFunc + Delete() http.HandlerFunc + GetCursor() http.HandlerFunc + ApplyRouter() func(chi.Router) +} diff --git a/server/subscription/provider.go b/server/subscription/provider.go new file mode 100644 index 0000000..f8dc13f --- /dev/null +++ b/server/subscription/provider.go @@ -0,0 +1,43 @@ +package subscription + +import ( + "database/sql" + "sync" + + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/domain" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/repository" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/rest" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/service" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/task" +) + +var ( + repo domain.Repository + svc domain.Service + hand domain.RestHandler + + repoOnce sync.Once + svcOnce sync.Once + handOnce sync.Once +) + +func provideRepository(db *sql.DB) domain.Repository { + repoOnce.Do(func() { + repo = repository.New(db) + }) + return repo +} + +func provideService(r domain.Repository, runner task.TaskRunner) domain.Service { + svcOnce.Do(func() { + svc = service.New(r, runner) + }) + return svc +} + +func provideHandler(s domain.Service) domain.RestHandler { + handOnce.Do(func() { + hand = rest.New(s) + }) + return hand +} diff --git a/server/subscription/repository/repository.go b/server/subscription/repository/repository.go new file mode 100644 index 0000000..cd4756c --- /dev/null +++ b/server/subscription/repository/repository.go @@ -0,0 +1,133 @@ +package repository + +import ( + "context" + "database/sql" + + "github.com/google/uuid" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/data" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/domain" +) + +type Repository struct { + db *sql.DB +} + +// Delete implements domain.Repository. +func (r *Repository) Delete(ctx context.Context, id string) error { + conn, err := r.db.Conn(ctx) + if err != nil { + return err + } + + defer conn.Close() + + _, err = conn.ExecContext(ctx, "DELETE FROM subscriptions WHERE id = ?", id) + + return err +} + +// GetCursor implements domain.Repository. +func (r *Repository) GetCursor(ctx context.Context, id string) (int64, error) { + conn, err := r.db.Conn(ctx) + if err != nil { + return -1, err + } + + defer conn.Close() + + row := conn.QueryRowContext(ctx, "SELECT rowid FROM subscriptions WHERE id = ?", id) + + var rowId int64 + + if err := row.Scan(&rowId); err != nil { + return -1, err + } + + return rowId, nil +} + +// List implements domain.Repository. +func (r *Repository) List(ctx context.Context, start int64, limit int) (*[]data.Subscription, error) { + conn, err := r.db.Conn(ctx) + if err != nil { + return nil, err + } + + defer conn.Close() + + var elements []data.Subscription + + rows, err := conn.QueryContext(ctx, "SELECT rowid, * FROM subscriptions WHERE rowid > ? LIMIT ?", start, limit) + if err != nil { + return nil, err + } + + for rows.Next() { + var rowId int64 + var element data.Subscription + + if err := rows.Scan( + &rowId, + &element.Id, + &element.URL, + &element.Params, + &element.CronExpr, + ); err != nil { + return &elements, err + } + + elements = append(elements, element) + } + + return &elements, nil +} + +// Submit implements domain.Repository. +func (r *Repository) Submit(ctx context.Context, sub *data.Subscription) (*data.Subscription, error) { + conn, err := r.db.Conn(ctx) + if err != nil { + return nil, err + } + + defer conn.Close() + + _, err = conn.ExecContext( + ctx, + "INSERT INTO subscriptions (id, url, params, cron) VALUES (?, ?, ?, ?)", + uuid.NewString(), + sub.URL, + sub.Params, + sub.CronExpr, + ) + + return sub, err +} + +// UpdateByExample implements domain.Repository. +func (r *Repository) UpdateByExample(ctx context.Context, example *data.Subscription) error { + conn, err := r.db.Conn(ctx) + if err != nil { + return err + } + + defer conn.Close() + + _, err = conn.ExecContext( + ctx, + "UPDATE subscriptions SET url = ?, params = ?, cron = ? WHERE id = ? OR url = ?", + example.URL, + example.Params, + example.CronExpr, + example.Id, + example.URL, + ) + + return err +} + +func New(db *sql.DB) domain.Repository { + return &Repository{ + db: db, + } +} diff --git a/server/subscription/rest/handler.go b/server/subscription/rest/handler.go new file mode 100644 index 0000000..81ffbeb --- /dev/null +++ b/server/subscription/rest/handler.go @@ -0,0 +1,168 @@ +package rest + +import ( + "encoding/json" + "net/http" + "strconv" + + "github.com/go-chi/chi/v5" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config" + middlewares "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/middleware" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/openid" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/domain" +) + +type RestHandler struct { + svc domain.Service +} + +// ApplyRouter implements domain.RestHandler. +func (h *RestHandler) ApplyRouter() func(chi.Router) { + return func(r chi.Router) { + if config.Instance().RequireAuth { + r.Use(middlewares.Authenticated) + } + if config.Instance().UseOpenId { + r.Use(openid.Middleware) + } + + r.Delete("/{id}", h.Delete()) + r.Get("/cursor", h.GetCursor()) + r.Get("/", h.List()) + r.Post("/", h.Submit()) + r.Patch("/", h.UpdateByExample()) + } +} + +// Delete implements domain.RestHandler. +func (h *RestHandler) Delete() http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + w.Header().Set("Content-Type", "application/json") + + id := chi.URLParam(r, "id") + + err := h.svc.Delete(r.Context(), id) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + if err := json.NewEncoder(w).Encode("ok"); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } +} + +// GetCursor implements domain.RestHandler. +func (h *RestHandler) GetCursor() http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + w.Header().Set("Content-Type", "application/json") + + id := chi.URLParam(r, "id") + + cursorId, err := h.svc.GetCursor(r.Context(), id) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + if err := json.NewEncoder(w).Encode(cursorId); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } +} + +// List implements domain.RestHandler. +func (h *RestHandler) List() http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + w.Header().Set("Content-Type", "application/json") + + var ( + startParam = r.URL.Query().Get("id") + LimitParam = r.URL.Query().Get("limit") + ) + + start, err := strconv.Atoi(startParam) + if err != nil { + start = 0 + } + + limit, err := strconv.Atoi(LimitParam) + if err != nil { + limit = 50 + } + + res, err := h.svc.List(r.Context(), int64(start), limit) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + if err := json.NewEncoder(w).Encode(res); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } +} + +// Submit implements domain.RestHandler. +func (h *RestHandler) Submit() http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + w.Header().Set("Content-Type", "application/json") + + var req domain.Subscription + + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + res, err := h.svc.Submit(r.Context(), &req) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + if err := json.NewEncoder(w).Encode(res); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } +} + +// UpdateByExample implements domain.RestHandler. +func (h *RestHandler) UpdateByExample() http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + w.Header().Set("Content-Type", "application/json") + + var req domain.Subscription + + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + if err := h.svc.UpdateByExample(r.Context(), &req); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + if err := json.NewEncoder(w).Encode(req); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } +} + +func New(svc domain.Service) domain.RestHandler { + return &RestHandler{ + svc: svc, + } +} diff --git a/server/subscription/service/service.go b/server/subscription/service/service.go new file mode 100644 index 0000000..1cca374 --- /dev/null +++ b/server/subscription/service/service.go @@ -0,0 +1,140 @@ +package service + +import ( + "context" + "errors" + "math" + + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/data" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/domain" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/task" + "github.com/robfig/cron/v3" +) + +type Service struct { + r domain.Repository + runner task.TaskRunner +} + +func New(r domain.Repository, runner task.TaskRunner) domain.Service { + s := &Service{ + r: r, + runner: runner, + } + + // very crude recoverer + initial, _ := s.List(context.Background(), 0, math.MaxInt) + if initial != nil { + for _, v := range initial.Data { + s.runner.Submit(&v) + } + } + + return s +} + +func fromDB(model *data.Subscription) domain.Subscription { + return domain.Subscription{ + Id: model.Id, + URL: model.URL, + Params: model.Params, + CronExpr: model.CronExpr, + } +} + +func toDB(dto *domain.Subscription) data.Subscription { + return data.Subscription{ + Id: dto.Id, + URL: dto.URL, + Params: dto.Params, + CronExpr: dto.CronExpr, + } +} + +// Delete implements domain.Service. +func (s *Service) Delete(ctx context.Context, id string) error { + return s.r.Delete(ctx, id) +} + +// GetCursor implements domain.Service. +func (s *Service) GetCursor(ctx context.Context, id string) (int64, error) { + return s.r.GetCursor(ctx, id) +} + +// List implements domain.Service. +func (s *Service) List(ctx context.Context, start int64, limit int) ( + *domain.PaginatedResponse[[]domain.Subscription], + error, +) { + dbSubs, err := s.r.List(ctx, start, limit) + if err != nil { + return nil, err + } + + subs := make([]domain.Subscription, len(*dbSubs)) + + for i, v := range *dbSubs { + subs[i] = fromDB(&v) + } + + var ( + first int64 + next int64 + ) + + if len(subs) > 0 { + first, err = s.r.GetCursor(ctx, subs[0].Id) + if err != nil { + return nil, err + } + + next, err = s.r.GetCursor(ctx, subs[len(subs)-1].Id) + if err != nil { + return nil, err + } + } + + return &domain.PaginatedResponse[[]domain.Subscription]{ + First: first, + Next: next, + Data: subs, + }, nil +} + +// Submit implements domain.Service. +func (s *Service) Submit(ctx context.Context, sub *domain.Subscription) (*domain.Subscription, error) { + if sub.CronExpr == "" { + sub.CronExpr = "*/5 * * * *" + } + + _, err := cron.ParseStandard(sub.CronExpr) + if err != nil { + return nil, errors.Join(errors.New("failed parsing cron expression"), err) + } + + subDB, err := s.r.Submit(ctx, &data.Subscription{ + URL: sub.URL, + Params: sub.Params, + CronExpr: sub.CronExpr, + }) + + retval := fromDB(subDB) + + if err := s.runner.Submit(sub); err != nil { + return nil, err + } + + return &retval, err +} + +// UpdateByExample implements domain.Service. +func (s *Service) UpdateByExample(ctx context.Context, example *domain.Subscription) error { + _, err := cron.ParseStandard(example.CronExpr) + if err != nil { + return errors.Join(errors.New("failed parsing cron expression"), err) + } + + e := toDB(example) + + return s.r.UpdateByExample(ctx, &e) +} diff --git a/server/subscription/subscription.go b/server/subscription/subscription.go new file mode 100644 index 0000000..0e1dae2 --- /dev/null +++ b/server/subscription/subscription.go @@ -0,0 +1 @@ +package subscription diff --git a/server/subscription/task/runner.go b/server/subscription/task/runner.go new file mode 100644 index 0000000..f4b5a7f --- /dev/null +++ b/server/subscription/task/runner.go @@ -0,0 +1,117 @@ +package task + +import ( + "bytes" + "context" + "log/slog" + "os/exec" + "path/filepath" + "regexp" + "strings" + "time" + + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/domain" + "github.com/robfig/cron/v3" +) + +type TaskRunner interface { + Submit(subcription *domain.Subscription) error + Spawner(ctx context.Context) + Recoverer() +} + +type taskPair struct { + Schedule cron.Schedule + Subscription *domain.Subscription +} + +type CronTaskRunner struct { + mq *internal.MessageQueue + db *internal.MemoryDB + + tasks chan taskPair + errors chan error +} + +func NewCronTaskRunner(mq *internal.MessageQueue, db *internal.MemoryDB) TaskRunner { + return &CronTaskRunner{ + mq: mq, + db: db, + tasks: make(chan taskPair), + errors: make(chan error), + } +} + +const commandTemplate = "-I1 --flat-playlist --print webpage_url $1" + +var argsSplitterRe = regexp.MustCompile(`(?mi)[^\s"']+|"([^"]*)"|'([^']*)'`) + +func (t *CronTaskRunner) Submit(subcription *domain.Subscription) error { + schedule, err := cron.ParseStandard(subcription.CronExpr) + if err != nil { + return err + } + + job := taskPair{ + Schedule: schedule, + Subscription: subcription, + } + + t.tasks <- job + + return nil +} + +func (t *CronTaskRunner) Spawner(ctx context.Context) { + for task := range t.tasks { + go func() { + for { + slog.Info("fetching latest video for channel", slog.String("channel", task.Subscription.URL)) + + fetcherParams := strings.Split(strings.Replace(commandTemplate, "$1", task.Subscription.URL, 1), " ") + + cmd := exec.CommandContext( + ctx, + config.Instance().DownloaderPath, + fetcherParams..., + ) + + stdout, err := cmd.Output() + if err != nil { + t.errors <- err + return + } + + latestChannelURL := string(bytes.Trim(stdout, "\n")) + + p := &internal.Process{ + Url: latestChannelURL, + Params: append(argsSplitterRe.FindAllString(task.Subscription.Params, 1), []string{ + "--download-archive", + filepath.Join(config.Instance().Dir(), "archive.txt"), + }...), + AutoRemove: true, + } + + t.db.Set(p) + t.mq.Publish(p) + + nextSchedule := time.Until(task.Schedule.Next(time.Now())) + + slog.Info( + "cron task runner next schedule", + slog.String("url", task.Subscription.URL), + slog.Any("duration", nextSchedule), + ) + + time.Sleep(nextSchedule) + } + }() + } +} + +func (t *CronTaskRunner) Recoverer() { + panic("Unimplemented") +}