diff --git a/frontend/src/atoms/downloads.ts b/frontend/src/atoms/downloads.ts index c3ca2e0..cedf6e3 100644 --- a/frontend/src/atoms/downloads.ts +++ b/frontend/src/atoms/downloads.ts @@ -1,7 +1,22 @@ -import { atom } from 'recoil' +import * as O from 'fp-ts/Option' +import { pipe } from 'fp-ts/lib/function' +import { atom, selector } from 'recoil' import { RPCResult } from '../types' -export const activeDownloadsState = atom({ +export const downloadsState = atom>({ + key: 'downloadsState', + default: O.none +}) + +export const loadingDownloadsState = selector({ + key: 'loadingDownloadsState', + get: ({ get }) => O.isNone(get(downloadsState)) +}) + +export const activeDownloadsState = selector({ key: 'activeDownloadsState', - default: undefined + get: ({ get }) => pipe( + get(downloadsState), + O.getOrElse(() => new Array()) + ) }) \ No newline at end of file diff --git a/frontend/src/components/Downloads.tsx b/frontend/src/components/Downloads.tsx index fe6f1aa..6b49797 100644 --- a/frontend/src/components/Downloads.tsx +++ b/frontend/src/components/Downloads.tsx @@ -1,6 +1,6 @@ import { useEffect } from 'react' import { useRecoilState, useRecoilValue } from 'recoil' -import { activeDownloadsState } from '../atoms/downloads' +import { loadingDownloadsState } from '../atoms/downloads' import { listViewState } from '../atoms/settings' import { loadingAtom } from '../atoms/ui' import DownloadsCardView from './DownloadsCardView' @@ -8,15 +8,17 @@ import DownloadsListView from './DownloadsListView' const Downloads: React.FC = () => { const listView = useRecoilValue(listViewState) - const active = useRecoilValue(activeDownloadsState) + const loadingDownloads = useRecoilValue(loadingDownloadsState) const [isLoading, setIsLoading] = useRecoilState(loadingAtom) useEffect(() => { - if (active) { - setIsLoading(false) + if (loadingDownloads) { + setIsLoading(true) + return } - }, [active?.length, isLoading]) + setIsLoading(false) + }, [loadingDownloads, isLoading]) if (listView) { return ( diff --git a/frontend/src/components/DownloadsCardView.tsx b/frontend/src/components/DownloadsCardView.tsx index d2fc404..1fb6668 100644 --- a/frontend/src/components/DownloadsCardView.tsx +++ b/frontend/src/components/DownloadsCardView.tsx @@ -7,7 +7,7 @@ import { useRPC } from '../hooks/useRPC' import DownloadCard from './DownloadCard' const DownloadsCardView: React.FC = () => { - const downloads = useRecoilValue(activeDownloadsState) ?? [] + const downloads = useRecoilValue(activeDownloadsState) const { i18n } = useI18n() const { client } = useRPC() diff --git a/frontend/src/components/DownloadsListView.tsx b/frontend/src/components/DownloadsListView.tsx index 2324134..36963ea 100644 --- a/frontend/src/components/DownloadsListView.tsx +++ b/frontend/src/components/DownloadsListView.tsx @@ -18,7 +18,7 @@ import { ellipsis, formatSpeedMiB, roundMiB } from "../utils" const DownloadsListView: React.FC = () => { - const downloads = useRecoilValue(activeDownloadsState) ?? [] + const downloads = useRecoilValue(activeDownloadsState) const { client } = useRPC() diff --git a/frontend/src/components/SocketSubscriber.tsx b/frontend/src/components/SocketSubscriber.tsx index 3252aab..543ee9e 100644 --- a/frontend/src/components/SocketSubscriber.tsx +++ b/frontend/src/components/SocketSubscriber.tsx @@ -1,7 +1,8 @@ +import * as O from 'fp-ts/Option' import { useMemo } from 'react' import { useRecoilState, useRecoilValue } from 'recoil' import { interval, share, take } from 'rxjs' -import { activeDownloadsState } from '../atoms/downloads' +import { downloadsState } from '../atoms/downloads' import { serverAddressAndPortState } from '../atoms/settings' import { connectedState } from '../atoms/status' import { useSubscription } from '../hooks/observable' @@ -14,7 +15,7 @@ interface Props extends React.HTMLAttributes { } const SocketSubscriber: React.FC = ({ children }) => { const [, setIsConnected] = useRecoilState(connectedState) - const [, setActive] = useRecoilState(activeDownloadsState) + const [, setDownloads] = useRecoilState(downloadsState) const serverAddressAndPort = useRecoilValue(serverAddressAndPortState) @@ -38,14 +39,18 @@ const SocketSubscriber: React.FC = ({ children }) => { if (!isRPCResponse(event)) { return } if (!Array.isArray(event.result)) { return } - setActive( - (event.result || []) - .filter(f => !!f.info.url) - .sort((a, b) => datetimeCompareFunc( - b.info.created_at, - a.info.created_at, - )) - ) + if (event.result) { + return setDownloads( + O.of(event.result + .filter(f => !!f.info.url).sort((a, b) => datetimeCompareFunc( + b.info.created_at, + a.info.created_at, + )) + ) + ) + } + + setDownloads(O.none) }, (err) => { console.error(err) diff --git a/server/internal/process.go b/server/internal/process.go index 4e0ab2e..711f6a8 100644 --- a/server/internal/process.go +++ b/server/internal/process.go @@ -17,6 +17,7 @@ import ( "github.com/marcopeocchi/fazzoletti/slices" "github.com/marcopeocchi/yt-dlp-web-ui/server/cli" "github.com/marcopeocchi/yt-dlp-web-ui/server/config" + "github.com/marcopeocchi/yt-dlp-web-ui/server/rx" ) const template = `download: @@ -81,6 +82,7 @@ func (p *Process) Start() { if p.Output.Path != "" { out.Path = p.Output.Path } + if p.Output.Filename != "" { out.Filename = p.Output.Filename + ".%(ext)s" } @@ -113,11 +115,11 @@ func (p *Process) Start() { p.proc = cmd.Process - // ----------------- info block ----------------- // - // spawn a goroutine that retrieves the info for the download - // --------------- progress block --------------- // - // unbuffered channel connected to stdout + var ( + sourceChan = make(chan []byte) + doneChan = make(chan struct{}) + ) // spawn a goroutine that does the dirty job of parsing the stdout // filling the channel with as many stdout line as yt-dlp produces (producer) @@ -125,9 +127,19 @@ func (p *Process) Start() { defer func() { r.Close() p.Complete() + doneChan <- struct{}{} + close(sourceChan) + close(doneChan) }() for scan.Scan() { + sourceChan <- scan.Bytes() + } + }() + + // Slows down the unmarshal operation to every 500ms + go func() { + rx.Sample(time.Millisecond*500, sourceChan, doneChan, func(event []byte) { stdout := ProgressTemplate{} err := json.Unmarshal(scan.Bytes(), &stdout) if err == nil { @@ -143,7 +155,7 @@ func (p *Process) Start() { p.Url, stdout.Percentage, ) } - } + }) }() // ------------- end progress block ------------- // diff --git a/server/rx/extensions.go b/server/rx/extensions.go index 54600d0..1a994d5 100644 --- a/server/rx/extensions.go +++ b/server/rx/extensions.go @@ -2,47 +2,19 @@ package rx import "time" -/* - Package rx contains: - - Definitions for common reactive programming functions/patterns -*/ - -// ReactiveX inspired debounce function. -// -// Debounce emits a string from the source channel only after a particular -// time span determined a Go Interval -// -// --A--B--CD--EFG-------|> -// -// -t-> |> -// -t-> |> t is a timer tick -// -t-> |> -// -// --A-----C-----G-------|> -func Debounce(interval time.Duration, source chan string, f func(emit string)) { - var item string - timer := time.NewTimer(interval) - for { - select { - case item = <-source: - timer.Reset(interval) - case <-timer.C: - if item != "" { - f(item) - } - } - } -} - // ReactiveX inspired sample function. // // Debounce emits the most recently emitted value from the source // withing the timespan set by the span time.Duration -func Sample[T any](span time.Duration, source chan T, cb func(emit T)) { - timer := time.NewTimer(span) +func Sample[T any](span time.Duration, source chan T, done chan struct{}, fn func(e T)) { + ticker := time.NewTicker(span) for { - <-timer.C - cb(<-source) - timer.Reset(span) + select { + case <-ticker.C: + fn(<-source) + case <-done: + ticker.Stop() + return + } } }