diff --git a/.gitignore b/.gitignore
index 460926c..ec9b1ec 100644
--- a/.gitignore
+++ b/.gitignore
@@ -20,3 +20,4 @@ frontend/.pnp.cjs
frontend/.pnp.loader.mjs
frontend/.yarn/install-state.gz
.db.lock
+livestreams.dat
diff --git a/frontend/src/Layout.tsx b/frontend/src/Layout.tsx
index 042c83e..377bf43 100644
--- a/frontend/src/Layout.tsx
+++ b/frontend/src/Layout.tsx
@@ -2,6 +2,7 @@ import { ThemeProvider } from '@emotion/react'
import ArchiveIcon from '@mui/icons-material/Archive'
import ChevronLeft from '@mui/icons-material/ChevronLeft'
import Dashboard from '@mui/icons-material/Dashboard'
+import LiveTvIcon from '@mui/icons-material/LiveTv'
import Menu from '@mui/icons-material/Menu'
import SettingsIcon from '@mui/icons-material/Settings'
import TerminalIcon from '@mui/icons-material/Terminal'
@@ -121,6 +122,19 @@ export default function Layout() {
+
+
+
+
+
+
+
+
void
+}
+
+const Transition = forwardRef(function Transition(
+ props: TransitionProps & {
+ children: React.ReactElement
+ },
+ ref: React.Ref,
+) {
+ return
+})
+
+const LivestreamDialog: React.FC = ({ open, onClose }) => {
+ const [livestreamURL, setLivestreamURL] = useState('')
+
+ const { i18n } = useI18n()
+ const { client } = useRPC()
+ const { pushMessage } = useToast()
+
+ const exec = (url: string) => client.execLivestream(url)
+
+ return (
+
+ )
+}
+
+export default LivestreamDialog
\ No newline at end of file
diff --git a/frontend/src/components/livestream/LivestreamSpeedDial.tsx b/frontend/src/components/livestream/LivestreamSpeedDial.tsx
new file mode 100644
index 0000000..e047c36
--- /dev/null
+++ b/frontend/src/components/livestream/LivestreamSpeedDial.tsx
@@ -0,0 +1,34 @@
+import AddCircleIcon from '@mui/icons-material/AddCircle'
+import DeleteForeverIcon from '@mui/icons-material/DeleteForever'
+import { SpeedDial, SpeedDialAction, SpeedDialIcon } from '@mui/material'
+import { useI18n } from '../../hooks/useI18n'
+
+type Props = {
+ onOpen: () => void
+ onStopAll: () => void
+}
+
+const LivestreamSpeedDial: React.FC = ({ onOpen, onStopAll }) => {
+ const { i18n } = useI18n()
+
+ return (
+ }
+ >
+ }
+ tooltipTitle={i18n.t('abortAllButton')}
+ onClick={onStopAll}
+ />
+ }
+ tooltipTitle={i18n.t('newDownloadButton')}
+ onClick={onOpen}
+ />
+
+ )
+}
+
+export default LivestreamSpeedDial
\ No newline at end of file
diff --git a/frontend/src/components/livestream/NoLivestreams.tsx b/frontend/src/components/livestream/NoLivestreams.tsx
new file mode 100644
index 0000000..9d338ca
--- /dev/null
+++ b/frontend/src/components/livestream/NoLivestreams.tsx
@@ -0,0 +1,38 @@
+import LiveTvIcon from '@mui/icons-material/LiveTv'
+import { Container, SvgIcon, Typography, styled } from '@mui/material'
+import { useI18n } from '../../hooks/useI18n'
+
+const FlexContainer = styled(Container)({
+ display: 'flex',
+ minWidth: '100%',
+ minHeight: '80vh',
+ alignItems: 'center',
+ justifyContent: 'center',
+ flexDirection: 'column'
+})
+
+const Title = styled(Typography)({
+ display: 'flex',
+ width: '100%',
+ alignItems: 'center',
+ justifyContent: 'center',
+ paddingBottom: '0.5rem'
+})
+
+
+export default function NoLivestreams() {
+ const { i18n } = useI18n()
+
+ return (
+
+
+
+
+
+
+
+ No livestreams monitored
+
+
+ )
+}
\ No newline at end of file
diff --git a/frontend/src/lib/rpcClient.ts b/frontend/src/lib/rpcClient.ts
index 0d0348a..3c0c90e 100644
--- a/frontend/src/lib/rpcClient.ts
+++ b/frontend/src/lib/rpcClient.ts
@@ -1,5 +1,5 @@
import { Observable } from 'rxjs'
-import type { DLMetadata, RPCRequest, RPCResponse, RPCResult } from '../types'
+import type { DLMetadata, LiveStreamProgress, RPCRequest, RPCResponse, RPCResult } from '../types'
import { WebSocketSubject, webSocket } from 'rxjs/webSocket'
@@ -160,9 +160,32 @@ export class RPCClient {
})
}
- public updateExecutable() {
+ public execLivestream(url: string) {
return this.sendHTTP({
- method: 'Service.UpdateExecutable',
+ method: 'Service.ExecLivestream',
+ params: [{
+ URL: url
+ }]
+ })
+ }
+
+ public progressLivestream() {
+ return this.sendHTTP({
+ method: 'Service.ProgressLivestream',
+ params: []
+ })
+ }
+
+ public killLivestream(url: string) {
+ return this.sendHTTP({
+ method: 'Service.KillLivestream',
+ params: [url]
+ })
+ }
+
+ public killAllLivestream() {
+ return this.sendHTTP({
+ method: 'Service.KillAllLivestream',
params: []
})
}
diff --git a/frontend/src/router.tsx b/frontend/src/router.tsx
index 0856472..e48a52f 100644
--- a/frontend/src/router.tsx
+++ b/frontend/src/router.tsx
@@ -8,6 +8,7 @@ const Home = lazy(() => import('./views/Home'))
const Login = lazy(() => import('./views/Login'))
const Archive = lazy(() => import('./views/Archive'))
const Settings = lazy(() => import('./views/Settings'))
+const LiveStream = lazy(() => import('./views/Livestream'))
const ErrorBoundary = lazy(() => import('./components/ErrorBoundary'))
@@ -74,6 +75,14 @@ export const router = createHashRouter([
)
},
+ {
+ path: '/monitor',
+ element: (
+ }>
+
+
+ )
+ },
]
},
])
\ No newline at end of file
diff --git a/frontend/src/types/index.ts b/frontend/src/types/index.ts
index 98fcc04..9160420 100644
--- a/frontend/src/types/index.ts
+++ b/frontend/src/types/index.ts
@@ -9,6 +9,10 @@ export type RPCMethods =
| "Service.ExecPlaylist"
| "Service.DirectoryTree"
| "Service.UpdateExecutable"
+ | "Service.ExecLivestream"
+ | "Service.ProgressLivestream"
+ | "Service.KillLivestream"
+ | "Service.KillAllLivestream"
export type RPCRequest = {
method: RPCMethods
@@ -96,4 +100,10 @@ export type CustomTemplate = {
id: string
name: string
content: string
-}
\ No newline at end of file
+}
+
+export type LiveStreamProgress = Record
\ No newline at end of file
diff --git a/frontend/src/views/Livestream.tsx b/frontend/src/views/Livestream.tsx
new file mode 100644
index 0000000..ee4ddef
--- /dev/null
+++ b/frontend/src/views/Livestream.tsx
@@ -0,0 +1,126 @@
+import {
+ Box,
+ Button,
+ Chip,
+ Container,
+ Paper,
+ Table, TableBody, TableCell, TableContainer, TableHead, TableRow
+} from '@mui/material'
+import { useState } from 'react'
+import { interval } from 'rxjs'
+import LivestreamDialog from '../components/livestream/LivestreamDialog'
+import LivestreamSpeedDial from '../components/livestream/LivestreamSpeedDial'
+import NoLivestreams from '../components/livestream/NoLivestreams'
+import { useSubscription } from '../hooks/observable'
+import { useRPC } from '../hooks/useRPC'
+import { LiveStreamProgress } from '../types'
+
+const LiveStreamMonitorView: React.FC = () => {
+ const { client } = useRPC()
+
+ const [progress, setProgress] = useState()
+ const [openDialog, setOpenDialog] = useState(false)
+
+ useSubscription(interval(1000), () => {
+ client
+ .progressLivestream()
+ .then(r => setProgress(r.result))
+ })
+
+ const formatMicro = (microseconds: number) => {
+ const ms = microseconds / 1_000_000
+ let s = ms / 1000
+
+ const hr = s / 3600
+ s %= 3600
+
+ const mt = s / 60
+ s %= 60
+
+ // huh?
+ const ss = (Math.abs(s - 1)).toFixed(0).padStart(2, '0')
+ const mts = mt.toFixed(0).padStart(2, '0')
+ const hrs = hr.toFixed(0).padStart(2, '0')
+
+ return `${hrs}:${mts}:${ss}`
+ }
+
+ const mapStatusToChip = (status: number): React.ReactNode => {
+ switch (status) {
+ case 0:
+ return
+ case 1:
+ return
+ case 2:
+ return
+ case 3:
+ return
+ default:
+ return
+ }
+ }
+
+ const stopAll = () => client.killAllLivestream()
+ const stop = (url: string) => client.killLivestream(url)
+
+ return (
+ <>
+ setOpenDialog(s => !s)} onStopAll={stopAll} />
+ {progress && Object.keys(progress).length === 0 ?
+ :
+
+
+
+
+
+
+ Livestream URL
+ Status
+ Time to live
+ Starts on
+ Actions
+
+
+
+ {progress && Object.keys(progress).map(k => (
+
+ {k}
+
+ {mapStatusToChip(progress[k].Status)}
+
+
+ {formatMicro(Number(progress[k].WaitTime))}
+
+
+ {new Date(progress[k].LiveDate).toLocaleString()}
+
+
+
+
+
+ ))}
+
+
+
+
+
+ }
+ setOpenDialog(s => !s)}
+ />
+ >
+ )
+}
+
+export default LiveStreamMonitorView
\ No newline at end of file
diff --git a/server/internal/livestream/livestream.go b/server/internal/livestream/livestream.go
new file mode 100644
index 0000000..f9e94a8
--- /dev/null
+++ b/server/internal/livestream/livestream.go
@@ -0,0 +1,193 @@
+package livestream
+
+import (
+ "bufio"
+ "errors"
+ "io"
+ "os"
+ "os/exec"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/marcopeocchi/yt-dlp-web-ui/server/config"
+)
+
+const (
+ waiting = iota
+ inProgress
+ completed
+ errored
+)
+
+// Defines a generic livestream.
+// A livestream is identified by its url.
+type LiveStream struct {
+ url string
+ proc *os.Process // used to manually kill the yt-dlp process
+ status int // whether is monitoring or completed
+ log chan []byte // keeps tracks of the process logs while monitoring, not when started
+ done chan *LiveStream // where to signal the completition
+ waitTimeChan chan time.Duration // time to livestream start
+ errors chan error
+ waitTime time.Duration
+ liveDate time.Time
+}
+
+func New(url string, log chan []byte, done chan *LiveStream) *LiveStream {
+ return &LiveStream{
+ url: url,
+ done: done,
+ status: waiting,
+ waitTime: time.Second * 0,
+ log: log,
+ errors: make(chan error),
+ waitTimeChan: make(chan time.Duration),
+ }
+}
+
+// Start the livestream monitoring process, once completion signals on the done channel
+func (l *LiveStream) Start() error {
+ cmd := exec.Command(
+ config.Instance().DownloaderPath,
+ l.url,
+ "--wait-for-video", "10", // wait for the stream to be live and recheck every 10 secs
+ "--no-colors", // no ansi color fuzz
+ "--paths", config.Instance().DownloadPath,
+ )
+ l.proc = cmd.Process
+
+ stdout, err := cmd.StdoutPipe()
+ if err != nil {
+ return err
+ }
+ defer stdout.Close()
+
+ if err := cmd.Start(); err != nil {
+ l.status = errored
+ return err
+ }
+
+ l.status = waiting
+
+ // Start monitoring when the livestream is goin to be live.
+ // If already live do nothing.
+ go l.monitorStartTime(stdout)
+
+ // Wait to the yt-dlp+ffmpeg process to finish.
+ cmd.Wait()
+
+ // Set the job as completed and notify the parent the completion.
+ l.status = completed
+ l.done <- l
+
+ return nil
+}
+
+func (l *LiveStream) monitorStartTime(r io.Reader) error {
+ // yt-dlp shows the time in the stdout
+ scanner := bufio.NewScanner(r)
+
+ defer func() {
+ close(l.waitTimeChan)
+ close(l.errors)
+ }()
+
+ // however the time to live is not shown in a new line (and atm there's nothing to do about)
+ // use a custom split funciton to set the line separator to \r instead of \r\n or \n
+ scanner.Split(func(data []byte, atEOF bool) (advance int, token []byte, err error) {
+ for i := 0; i < len(data); i++ {
+ if data[i] == '\r' {
+ return i + 1, data[:i], nil
+ }
+ }
+ if !atEOF {
+ return 0, nil, nil
+ }
+
+ return 0, data, bufio.ErrFinalToken
+ })
+
+ // start scanning the stdout
+ for scanner.Scan() {
+ // l.log <- scanner.Bytes()
+
+ parts := strings.Split(scanner.Text(), ": ")
+ if len(parts) < 2 {
+ continue
+ }
+
+ // if this substring is in the current line the download is starting,
+ // no need to monitor the time to live.
+ //TODO: silly
+ if !strings.Contains(scanner.Text(), "Remaining time until next attempt") {
+ l.status = inProgress
+ return nil
+ }
+
+ startsIn := parts[1]
+ parsed, err := parseTimeSpan(startsIn)
+ if err != nil {
+ continue
+ }
+
+ l.liveDate = parsed
+
+ //TODO: check if useing channels is stupid or not
+ // l.waitTimeChan <- time.Until(start)
+ l.waitTime = time.Until(parsed)
+ }
+
+ return nil
+}
+
+func (l *LiveStream) WaitTime() <-chan time.Duration {
+ return l.waitTimeChan
+}
+
+// Kills a livestream process and signal its completition
+func (l *LiveStream) Kill() error {
+ l.done <- l
+
+ if l.proc != nil {
+ return l.proc.Kill()
+ }
+
+ return errors.New("nil yt-dlp process")
+}
+
+// Parse the timespan returned from yt-dlp (time to live)
+//
+// parsed := parseTimeSpan("76:12:15")
+// fmt.Println(parsed) // 2024-07-21 13:59:59.634781 +0200 CEST
+func parseTimeSpan(timeStr string) (time.Time, error) {
+ parts := strings.Split(timeStr, ":")
+
+ hh, err := strconv.Atoi(parts[0])
+ if err != nil {
+ return time.Time{}, err
+ }
+ mm, err := strconv.Atoi(parts[1])
+ if err != nil {
+ return time.Time{}, err
+ }
+ ss, err := strconv.Atoi(parts[2])
+ if err != nil {
+ return time.Time{}, err
+ }
+
+ dd := 0
+
+ if hh > 24 {
+ dd = hh / 24
+ hh = hh % 24
+ }
+
+ start := time.Now()
+ start = start.AddDate(0, 0, dd)
+ start = start.Add(time.Duration(hh) * time.Hour)
+ start = start.Add(time.Duration(mm) * time.Minute)
+ start = start.Add(time.Duration(ss) * time.Second)
+
+ return start, nil
+}
diff --git a/server/internal/livestream/livestream_test.go b/server/internal/livestream/livestream_test.go
new file mode 100644
index 0000000..d374b68
--- /dev/null
+++ b/server/internal/livestream/livestream_test.go
@@ -0,0 +1,36 @@
+package livestream
+
+import (
+ "testing"
+ "time"
+
+ "github.com/marcopeocchi/yt-dlp-web-ui/server/config"
+)
+
+func setupTest() {
+ config.Instance().DownloaderPath = "yt-dlp"
+}
+
+func TestLivestream(t *testing.T) {
+ setupTest()
+
+ done := make(chan *LiveStream)
+ log := make(chan []byte)
+
+ ls := New("https://www.youtube.com/watch?v=LSm1daKezcE", log, done)
+ go ls.Start()
+
+ time.AfterFunc(time.Second*20, func() {
+ ls.Kill()
+ })
+
+ for {
+ select {
+ case wt := <-ls.WaitTime():
+ t.Log(wt)
+ case <-done:
+ t.Log("done")
+ return
+ }
+ }
+}
diff --git a/server/internal/livestream/monitor.go b/server/internal/livestream/monitor.go
new file mode 100644
index 0000000..087fb1a
--- /dev/null
+++ b/server/internal/livestream/monitor.go
@@ -0,0 +1,118 @@
+package livestream
+
+import (
+ "encoding/gob"
+ "log/slog"
+ "os"
+ "path/filepath"
+ "time"
+
+ "github.com/marcopeocchi/yt-dlp-web-ui/server/config"
+)
+
+type Monitor struct {
+ streams map[string]*LiveStream // keeps track of the livestreams
+ done chan *LiveStream // to signal individual processes completition
+ logs chan []byte // to signal individual processes completition
+}
+
+func NewMonitor() *Monitor {
+ return &Monitor{
+ streams: make(map[string]*LiveStream),
+ done: make(chan *LiveStream),
+ }
+}
+
+// Detect each livestream completition, if done remove it from the monitor.
+func (m *Monitor) Schedule() {
+ for l := range m.done {
+ delete(m.streams, l.url)
+ }
+}
+
+func (m *Monitor) Add(url string) {
+ ls := New(url, m.logs, m.done)
+
+ go ls.Start()
+ m.streams[url] = ls
+}
+
+func (m *Monitor) Remove(url string) error {
+ return m.streams[url].Kill()
+}
+
+func (m *Monitor) RemoveAll() error {
+ for _, v := range m.streams {
+ if err := v.Kill(); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (m *Monitor) Status() LiveStreamStatus {
+ status := make(LiveStreamStatus)
+
+ for k, v := range m.streams {
+ // wt, ok := <-v.WaitTime()
+ // if !ok {
+ // continue
+ // }
+
+ status[k] = struct {
+ Status int
+ WaitTime time.Duration
+ LiveDate time.Time
+ }{
+ Status: v.status,
+ WaitTime: v.waitTime,
+ LiveDate: v.liveDate,
+ }
+ }
+
+ return status
+}
+
+// Persist the monitor current state to a file.
+// The file is located in the configured config directory
+func (m *Monitor) Persist() error {
+ fd, err := os.Create(filepath.Join(config.Instance().Dir(), "livestreams.dat"))
+ if err != nil {
+ return err
+ }
+
+ defer fd.Close()
+
+ slog.Debug("persisting livestream monitor state")
+
+ return gob.NewEncoder(fd).Encode(m.streams)
+}
+
+// Restore a saved state and resume the monitored livestreams
+func (m *Monitor) Restore() error {
+ fd, err := os.Open(filepath.Join(config.Instance().Dir(), "livestreams.dat"))
+ if err != nil {
+ return err
+ }
+
+ defer fd.Close()
+
+ restored := make(map[string]*LiveStream)
+
+ if err := gob.NewDecoder(fd).Decode(&restored); err != nil {
+ return err
+ }
+
+ for k := range restored {
+ m.Add(k)
+ }
+
+ slog.Debug("restored livestream monitor state")
+
+ return nil
+}
+
+// Return a fan-in logs channel
+func (m *Monitor) Logs() <-chan []byte {
+ return m.logs
+}
diff --git a/server/internal/livestream/monitor_test.go b/server/internal/livestream/monitor_test.go
new file mode 100644
index 0000000..28ca861
--- /dev/null
+++ b/server/internal/livestream/monitor_test.go
@@ -0,0 +1 @@
+package livestream
diff --git a/server/internal/livestream/status.go b/server/internal/livestream/status.go
new file mode 100644
index 0000000..86d8f8c
--- /dev/null
+++ b/server/internal/livestream/status.go
@@ -0,0 +1,11 @@
+package livestream
+
+import "time"
+
+type LiveStreamStatus = map[string]Status
+
+type Status = struct {
+ Status int
+ WaitTime time.Duration
+ LiveDate time.Time
+}
diff --git a/server/internal/memory_db.go b/server/internal/memory_db.go
index c329fb3..016e3e7 100644
--- a/server/internal/memory_db.go
+++ b/server/internal/memory_db.go
@@ -3,7 +3,6 @@ package internal
import (
"encoding/gob"
"errors"
- "log/slog"
"os"
"path/filepath"
"sync"
@@ -92,7 +91,7 @@ func (m *MemoryDB) Persist() error {
}
// Restore a persisted state
-func (m *MemoryDB) Restore(mq *MessageQueue, logger *slog.Logger) {
+func (m *MemoryDB) Restore(mq *MessageQueue) {
fd, err := os.Open("session.dat")
if err != nil {
return
@@ -112,7 +111,6 @@ func (m *MemoryDB) Restore(mq *MessageQueue, logger *slog.Logger) {
Progress: proc.Progress,
Output: proc.Output,
Params: proc.Params,
- Logger: logger,
}
m.table.Store(proc.Id, restored)
diff --git a/server/internal/message_queue.go b/server/internal/message_queue.go
index 1e00143..2f54a9c 100644
--- a/server/internal/message_queue.go
+++ b/server/internal/message_queue.go
@@ -15,14 +15,13 @@ const queueName = "process:pending"
type MessageQueue struct {
concurrency int
eventBus evbus.Bus
- logger *slog.Logger
}
// Creates a new message queue.
// By default it will be created with a size equals to nthe number of logical
// CPU cores -1.
// The queue size can be set via the qs flag.
-func NewMessageQueue(l *slog.Logger) (*MessageQueue, error) {
+func NewMessageQueue() (*MessageQueue, error) {
qs := config.Instance().QueueSize
if qs <= 0 {
@@ -32,7 +31,6 @@ func NewMessageQueue(l *slog.Logger) (*MessageQueue, error) {
return &MessageQueue{
concurrency: qs,
eventBus: evbus.New(),
- logger: l,
}, nil
}
@@ -59,7 +57,7 @@ func (m *MessageQueue) downloadConsumer() {
sem.Acquire(context.Background(), 1)
defer sem.Release(1)
- m.logger.Info("received process from event bus",
+ slog.Info("received process from event bus",
slog.String("bus", queueName),
slog.String("consumer", "downloadConsumer"),
slog.String("id", p.getShortId()),
@@ -69,7 +67,7 @@ func (m *MessageQueue) downloadConsumer() {
p.Start()
}
- m.logger.Info("started process",
+ slog.Info("started process",
slog.String("bus", queueName),
slog.String("id", p.getShortId()),
)
@@ -88,14 +86,14 @@ func (m *MessageQueue) metadataSubscriber() {
sem.Acquire(context.TODO(), 1)
defer sem.Release(1)
- m.logger.Info("received process from event bus",
+ slog.Info("received process from event bus",
slog.String("bus", queueName),
slog.String("consumer", "metadataConsumer"),
slog.String("id", p.getShortId()),
)
if p.Progress.Status == StatusCompleted {
- m.logger.Warn("proccess has an illegal state",
+ slog.Warn("proccess has an illegal state",
slog.String("id", p.getShortId()),
slog.Int("status", p.Progress.Status),
)
@@ -103,7 +101,7 @@ func (m *MessageQueue) metadataSubscriber() {
}
if err := p.SetMetadata(); err != nil {
- m.logger.Error("failed to retrieve metadata",
+ slog.Error("failed to retrieve metadata",
slog.String("id", p.getShortId()),
slog.String("err", err.Error()),
)
diff --git a/server/internal/playlist.go b/server/internal/playlist.go
index 2ebb99e..f3eed3e 100644
--- a/server/internal/playlist.go
+++ b/server/internal/playlist.go
@@ -19,7 +19,7 @@ type metadata struct {
Type string `json:"_type"`
}
-func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB, logger *slog.Logger) error {
+func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB) error {
var (
downloader = config.Instance().DownloaderPath
cmd = exec.Command(downloader, req.URL, "--flat-playlist", "-J")
@@ -36,7 +36,7 @@ func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB, logger
return err
}
- logger.Info("decoding playlist metadata", slog.String("url", req.URL))
+ slog.Info("decoding playlist metadata", slog.String("url", req.URL))
if err := json.NewDecoder(stdout).Decode(&m); err != nil {
return err
@@ -46,7 +46,7 @@ func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB, logger
return err
}
- logger.Info("decoded playlist metadata", slog.String("url", req.URL))
+ slog.Info("decoded playlist metadata", slog.String("url", req.URL))
if m.Type == "" {
return errors.New("probably not a valid URL")
@@ -57,7 +57,7 @@ func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB, logger
return a.URL == b.URL
})
- logger.Info("playlist detected", slog.String("url", req.URL), slog.Int("count", len(entries)))
+ slog.Info("playlist detected", slog.String("url", req.URL), slog.Int("count", len(entries)))
for i, meta := range entries {
// detect playlist title from metadata since each playlist entry will be
@@ -78,7 +78,6 @@ func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB, logger
Output: DownloadOutput{Filename: req.Rename},
Info: meta,
Params: req.Params,
- Logger: logger,
}
proc.Info.URL = meta.URL
@@ -93,12 +92,11 @@ func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB, logger
proc := &Process{
Url: req.URL,
Params: req.Params,
- Logger: logger,
}
db.Set(proc)
mq.Publish(proc)
- logger.Info("sending new process to message queue", slog.String("url", proc.Url))
+ slog.Info("sending new process to message queue", slog.String("url", proc.Url))
return cmd.Wait()
}
diff --git a/server/internal/process.go b/server/internal/process.go
index 326c4af..f268e66 100644
--- a/server/internal/process.go
+++ b/server/internal/process.go
@@ -47,7 +47,6 @@ type Process struct {
Progress DownloadProgress
Output DownloadOutput
proc *os.Process
- Logger *slog.Logger
}
// Starts spawns/forks a new yt-dlp process and parse its stdout.
@@ -108,7 +107,7 @@ func (p *Process) Start() {
r, err := cmd.StdoutPipe()
if err != nil {
- p.Logger.Error(
+ slog.Error(
"failed to connect to stdout",
slog.String("err", err.Error()),
)
@@ -116,7 +115,7 @@ func (p *Process) Start() {
}
if err := cmd.Start(); err != nil {
- p.Logger.Error(
+ slog.Error(
"failed to start yt-dlp process",
slog.String("err", err.Error()),
)
@@ -167,7 +166,7 @@ func (p *Process) Start() {
ETA: progress.Eta,
}
- p.Logger.Info("progress",
+ slog.Info("progress",
slog.String("id", p.getShortId()),
slog.String("url", p.Url),
slog.String("percentage", progress.Percentage),
@@ -190,7 +189,7 @@ func (p *Process) Complete() {
ETA: 0,
}
- p.Logger.Info("finished",
+ slog.Info("finished",
slog.String("id", p.getShortId()),
slog.String("url", p.Url),
)
@@ -227,7 +226,7 @@ func (p *Process) GetFormats() (DownloadFormats, error) {
stdout, err := cmd.Output()
if err != nil {
- p.Logger.Error("failed to retrieve metadata", slog.String("err", err.Error()))
+ slog.Error("failed to retrieve metadata", slog.String("err", err.Error()))
return DownloadFormats{}, err
}
@@ -247,7 +246,7 @@ func (p *Process) GetFormats() (DownloadFormats, error) {
p.Url,
)
- p.Logger.Info(
+ slog.Info(
"retrieving metadata",
slog.String("caller", "getFormats"),
slog.String("url", p.Url),
@@ -307,7 +306,7 @@ func (p *Process) SetMetadata() error {
stdout, err := cmd.StdoutPipe()
if err != nil {
- p.Logger.Error("failed to connect to stdout",
+ slog.Error("failed to connect to stdout",
slog.String("id", p.getShortId()),
slog.String("url", p.Url),
slog.String("err", err.Error()),
@@ -317,7 +316,7 @@ func (p *Process) SetMetadata() error {
stderr, err := cmd.StderrPipe()
if err != nil {
- p.Logger.Error("failed to connect to stderr",
+ slog.Error("failed to connect to stderr",
slog.String("id", p.getShortId()),
slog.String("url", p.Url),
slog.String("err", err.Error()),
@@ -340,7 +339,7 @@ func (p *Process) SetMetadata() error {
io.Copy(&bufferedStderr, stderr)
}()
- p.Logger.Info("retrieving metadata",
+ slog.Info("retrieving metadata",
slog.String("id", p.getShortId()),
slog.String("url", p.Url),
)
diff --git a/server/rest/common.go b/server/rest/common.go
index fcaf81b..c0d69f8 100644
--- a/server/rest/common.go
+++ b/server/rest/common.go
@@ -2,14 +2,12 @@ package rest
import (
"database/sql"
- "log/slog"
"github.com/marcopeocchi/yt-dlp-web-ui/server/internal"
)
type ContainerArgs struct {
- DB *sql.DB
- MDB *internal.MemoryDB
- MQ *internal.MessageQueue
- Logger *slog.Logger
+ DB *sql.DB
+ MDB *internal.MemoryDB
+ MQ *internal.MessageQueue
}
diff --git a/server/rest/provider.go b/server/rest/provider.go
index d4166d5..3ca408e 100644
--- a/server/rest/provider.go
+++ b/server/rest/provider.go
@@ -15,10 +15,9 @@ var (
func ProvideService(args *ContainerArgs) *Service {
serviceOnce.Do(func() {
service = &Service{
- mdb: args.MDB,
- db: args.DB,
- mq: args.MQ,
- logger: args.Logger,
+ mdb: args.MDB,
+ db: args.DB,
+ mq: args.MQ,
}
})
return service
diff --git a/server/rest/service.go b/server/rest/service.go
index 6047d8d..742485f 100644
--- a/server/rest/service.go
+++ b/server/rest/service.go
@@ -4,7 +4,6 @@ import (
"context"
"database/sql"
"errors"
- "log/slog"
"os"
"os/exec"
"time"
@@ -15,10 +14,9 @@ import (
)
type Service struct {
- mdb *internal.MemoryDB
- db *sql.DB
- mq *internal.MessageQueue
- logger *slog.Logger
+ mdb *internal.MemoryDB
+ db *sql.DB
+ mq *internal.MessageQueue
}
func (s *Service) Exec(req internal.DownloadRequest) (string, error) {
@@ -29,7 +27,6 @@ func (s *Service) Exec(req internal.DownloadRequest) (string, error) {
Path: req.Path,
Filename: req.Rename,
},
- Logger: s.logger,
}
id := s.mdb.Set(p)
diff --git a/server/rpc/container.go b/server/rpc/container.go
index b1ab11c..ae54cd4 100644
--- a/server/rpc/container.go
+++ b/server/rpc/container.go
@@ -1,25 +1,20 @@
package rpc
import (
- "log/slog"
-
"github.com/go-chi/chi/v5"
"github.com/marcopeocchi/yt-dlp-web-ui/server/config"
"github.com/marcopeocchi/yt-dlp-web-ui/server/internal"
+ "github.com/marcopeocchi/yt-dlp-web-ui/server/internal/livestream"
middlewares "github.com/marcopeocchi/yt-dlp-web-ui/server/middleware"
"github.com/marcopeocchi/yt-dlp-web-ui/server/openid"
)
// Dependency injection container.
-func Container(
- db *internal.MemoryDB,
- mq *internal.MessageQueue,
- logger *slog.Logger,
-) *Service {
+func Container(db *internal.MemoryDB, mq *internal.MessageQueue, lm *livestream.Monitor) *Service {
return &Service{
- db: db,
- mq: mq,
- logger: logger,
+ db: db,
+ mq: mq,
+ lm: lm,
}
}
diff --git a/server/rpc/service.go b/server/rpc/service.go
index cd710a4..99bbd83 100644
--- a/server/rpc/service.go
+++ b/server/rpc/service.go
@@ -5,14 +5,15 @@ import (
"log/slog"
"github.com/marcopeocchi/yt-dlp-web-ui/server/internal"
+ "github.com/marcopeocchi/yt-dlp-web-ui/server/internal/livestream"
"github.com/marcopeocchi/yt-dlp-web-ui/server/sys"
"github.com/marcopeocchi/yt-dlp-web-ui/server/updater"
)
type Service struct {
- db *internal.MemoryDB
- mq *internal.MessageQueue
- logger *slog.Logger
+ db *internal.MemoryDB
+ mq *internal.MessageQueue
+ lm *livestream.Monitor
}
type Running []internal.ProcessResponse
@@ -36,7 +37,6 @@ func (s *Service) Exec(args internal.DownloadRequest, result *string) error {
Path: args.Path,
Filename: args.Rename,
},
- Logger: s.logger,
}
s.db.Set(p)
@@ -49,7 +49,7 @@ func (s *Service) Exec(args internal.DownloadRequest, result *string) error {
// Exec spawns a Process.
// The result of the execution is the newly spawned process Id.
func (s *Service) ExecPlaylist(args internal.DownloadRequest, result *string) error {
- err := internal.PlaylistDetect(args, s.mq, s.db, s.logger)
+ err := internal.PlaylistDetect(args, s.mq, s.db)
if err != nil {
return err
}
@@ -58,6 +58,38 @@ func (s *Service) ExecPlaylist(args internal.DownloadRequest, result *string) er
return nil
}
+// TODO: docs
+func (s *Service) ExecLivestream(args internal.DownloadRequest, result *string) error {
+ s.lm.Add(args.URL)
+
+ *result = args.URL
+ return nil
+}
+
+// TODO: docs
+func (s *Service) ProgressLivestream(args NoArgs, result *livestream.LiveStreamStatus) error {
+ *result = s.lm.Status()
+ return nil
+}
+
+// TODO: docs
+func (s *Service) KillLivestream(args string, result *struct{}) error {
+ slog.Info("killing livestream", slog.String("url", args))
+
+ err := s.lm.Remove(args)
+ if err != nil {
+ slog.Error("failed killing livestream", slog.String("url", args), slog.Any("err", err))
+ return err
+ }
+
+ return nil
+}
+
+// TODO: docs
+func (s *Service) KillAllLivestream(args NoArgs, result *struct{}) error {
+ return s.lm.RemoveAll()
+}
+
// Progess retrieves the Progress of a specific Process given its Id
func (s *Service) Progess(args Args, progress *internal.DownloadProgress) error {
proc, err := s.db.Get(args.Id)
@@ -73,7 +105,7 @@ func (s *Service) Progess(args Args, progress *internal.DownloadProgress) error
func (s *Service) Formats(args Args, meta *internal.DownloadFormats) error {
var (
err error
- p = internal.Process{Url: args.URL, Logger: s.logger}
+ p = internal.Process{Url: args.URL}
)
*meta, err = p.GetFormats()
return err
@@ -93,7 +125,7 @@ func (s *Service) Running(args NoArgs, running *Running) error {
// Kill kills a process given its id and remove it from the memoryDB
func (s *Service) Kill(args string, killed *string) error {
- s.logger.Info("Trying killing process with id", slog.String("id", args))
+ slog.Info("Trying killing process with id", slog.String("id", args))
proc, err := s.db.Get(args)
if err != nil {
@@ -105,12 +137,12 @@ func (s *Service) Kill(args string, killed *string) error {
}
if err := proc.Kill(); err != nil {
- s.logger.Info("failed killing process", slog.String("id", proc.Id), slog.Any("err", err))
+ slog.Info("failed killing process", slog.String("id", proc.Id), slog.Any("err", err))
return err
}
s.db.Delete(proc.Id)
- s.logger.Info("succesfully killed process", slog.String("id", proc.Id))
+ slog.Info("succesfully killed process", slog.String("id", proc.Id))
return nil
}
@@ -118,7 +150,7 @@ func (s *Service) Kill(args string, killed *string) error {
// KillAll kills all process unconditionally and removes them from
// the memory db
func (s *Service) KillAll(args NoArgs, killed *string) error {
- s.logger.Info("Killing all spawned processes")
+ slog.Info("Killing all spawned processes")
var (
keys = s.db.Keys()
@@ -140,7 +172,7 @@ func (s *Service) KillAll(args NoArgs, killed *string) error {
}
if err := removeFunc(proc); err != nil {
- s.logger.Info(
+ slog.Info(
"failed killing process",
slog.String("id", proc.Id),
slog.Any("err", err),
@@ -148,7 +180,7 @@ func (s *Service) KillAll(args NoArgs, killed *string) error {
continue
}
- s.logger.Info("succesfully killed process", slog.String("id", proc.Id))
+ slog.Info("succesfully killed process", slog.String("id", proc.Id))
}
return nil
@@ -156,7 +188,7 @@ func (s *Service) KillAll(args NoArgs, killed *string) error {
// Remove a process from the db rendering it unusable if active
func (s *Service) Clear(args string, killed *string) error {
- s.logger.Info("Clearing process with id", slog.String("id", args))
+ slog.Info("Clearing process with id", slog.String("id", args))
s.db.Delete(args)
return nil
}
@@ -190,16 +222,16 @@ func (s *Service) DirectoryTree(args NoArgs, tree *[]string) error {
// Updates the yt-dlp binary using its builtin function
func (s *Service) UpdateExecutable(args NoArgs, updated *bool) error {
- s.logger.Info("Updating yt-dlp executable to the latest release")
+ slog.Info("Updating yt-dlp executable to the latest release")
if err := updater.UpdateExecutable(); err != nil {
- s.logger.Error("Failed updating yt-dlp")
+ slog.Error("Failed updating yt-dlp")
*updated = false
return err
}
*updated = true
- s.logger.Info("Succesfully updated yt-dlp")
+ slog.Info("Succesfully updated yt-dlp")
return nil
}
diff --git a/server/server.go b/server/server.go
index 65e7e06..9b8ebc6 100644
--- a/server/server.go
+++ b/server/server.go
@@ -1,3 +1,4 @@
+// a stupid package name...
package server
import (
@@ -22,6 +23,7 @@ import (
"github.com/marcopeocchi/yt-dlp-web-ui/server/dbutil"
"github.com/marcopeocchi/yt-dlp-web-ui/server/handlers"
"github.com/marcopeocchi/yt-dlp-web-ui/server/internal"
+ "github.com/marcopeocchi/yt-dlp-web-ui/server/internal/livestream"
"github.com/marcopeocchi/yt-dlp-web-ui/server/logging"
middlewares "github.com/marcopeocchi/yt-dlp-web-ui/server/middleware"
"github.com/marcopeocchi/yt-dlp-web-ui/server/openid"
@@ -44,7 +46,6 @@ type RunConfig struct {
type serverConfig struct {
frontend fs.FS
swagger fs.FS
- logger *slog.Logger
host string
port int
mdb *internal.MemoryDB
@@ -57,9 +58,10 @@ func RunBlocking(cfg *RunConfig) {
logWriters := []io.Writer{
os.Stdout,
- logging.NewObservableLogger(),
+ logging.NewObservableLogger(), // for web-ui
}
+ // file based logging
if cfg.FileLogging {
logger, err := logging.NewRotableLogger(cfg.LogFile)
if err != nil {
@@ -76,31 +78,33 @@ func RunBlocking(cfg *RunConfig) {
logWriters = append(logWriters, logger)
}
- logger := slog.New(
- slog.NewTextHandler(io.MultiWriter(logWriters...), &slog.HandlerOptions{}),
- )
+ logger := slog.New(slog.NewTextHandler(io.MultiWriter(logWriters...), &slog.HandlerOptions{
+ Level: slog.LevelInfo, // TODO: detect when launched in debug mode -> slog.LevelDebug
+ }))
+
+ // make the new logger the default one with all the new writers
+ slog.SetDefault(logger)
db, err := sql.Open("sqlite", cfg.DBPath)
if err != nil {
- logger.Error("failed to open database", slog.String("err", err.Error()))
+ slog.Error("failed to open database", slog.String("err", err.Error()))
}
if err := dbutil.Migrate(context.Background(), db); err != nil {
- logger.Error("failed to init database", slog.String("err", err.Error()))
+ slog.Error("failed to init database", slog.String("err", err.Error()))
}
- mq, err := internal.NewMessageQueue(logger)
+ mq, err := internal.NewMessageQueue()
if err != nil {
panic(err)
}
mq.SetupConsumers()
- go mdb.Restore(mq, logger)
+ go mdb.Restore(mq)
srv := newServer(serverConfig{
frontend: cfg.App,
swagger: cfg.Swagger,
- logger: logger,
host: cfg.Host,
port: cfg.Port,
mdb: &mdb,
@@ -109,13 +113,14 @@ func RunBlocking(cfg *RunConfig) {
})
go gracefulShutdown(srv, &mdb)
- go autoPersist(time.Minute*5, &mdb, logger)
+ go autoPersist(time.Minute*5, &mdb)
var (
network = "tcp"
address = fmt.Sprintf("%s:%d", cfg.Host, cfg.Port)
)
+ // support unix sockets
if strings.HasPrefix(cfg.Host, "/") {
network = "unix"
address = cfg.Host
@@ -123,19 +128,30 @@ func RunBlocking(cfg *RunConfig) {
listener, err := net.Listen(network, address)
if err != nil {
- logger.Error("failed to listen", slog.String("err", err.Error()))
+ slog.Error("failed to listen", slog.String("err", err.Error()))
return
}
- logger.Info("yt-dlp-webui started", slog.String("address", address))
+ slog.Info("yt-dlp-webui started", slog.String("address", address))
if err := srv.Serve(listener); err != nil {
- logger.Warn("http server stopped", slog.String("err", err.Error()))
+ slog.Warn("http server stopped", slog.String("err", err.Error()))
}
}
func newServer(c serverConfig) *http.Server {
- service := ytdlpRPC.Container(c.mdb, c.mq, c.logger)
+ lm := livestream.NewMonitor()
+ go lm.Schedule()
+ go lm.Restore()
+
+ go func() {
+ for {
+ lm.Persist()
+ time.Sleep(time.Minute * 5)
+ }
+ }()
+
+ service := ytdlpRPC.Container(c.mdb, c.mq, lm)
rpc.Register(service)
r := chi.NewRouter()
@@ -196,10 +212,9 @@ func newServer(c serverConfig) *http.Server {
// REST API handlers
r.Route("/api/v1", rest.ApplyRouter(&rest.ContainerArgs{
- DB: c.db,
- MDB: c.mdb,
- MQ: c.mq,
- Logger: c.logger,
+ DB: c.db,
+ MDB: c.mdb,
+ MQ: c.mq,
}))
// Logging
@@ -227,15 +242,15 @@ func gracefulShutdown(srv *http.Server, db *internal.MemoryDB) {
}()
}
-func autoPersist(d time.Duration, db *internal.MemoryDB, logger *slog.Logger) {
+func autoPersist(d time.Duration, db *internal.MemoryDB) {
for {
if err := db.Persist(); err != nil {
- logger.Info(
+ slog.Warn(
"failed to persisted session",
slog.String("err", err.Error()),
)
}
- logger.Info("sucessfully persisted session")
+ slog.Debug("sucessfully persisted session")
time.Sleep(d)
}
}