subscriptions: prevent downloading already existing file

This commit is contained in:
2025-02-07 09:37:47 +01:00
parent eec72bb6e2
commit 761f26b387
2 changed files with 78 additions and 18 deletions

58
server/archive/utils.go Normal file
View File

@@ -0,0 +1,58 @@
package archive
import (
"bufio"
"bytes"
"context"
"os"
"os/exec"
"path/filepath"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
)
// Perform a search on the archive.txt file an determines if a download
// has already be done.
func DownloadExists(ctx context.Context, url string) (bool, error) {
cmd := exec.CommandContext(
ctx,
config.Instance().DownloaderPath,
"--print",
"%(extractor)s %(id)s",
url,
)
stdout, err := cmd.Output()
if err != nil {
return false, err
}
extractorAndURL := bytes.Trim(stdout, "\n")
fd, err := os.Open(filepath.Join(config.Instance().Dir(), "archive.txt"))
if err != nil {
return false, err
}
defer fd.Close()
scanner := bufio.NewScanner(fd)
// search linearly for lower memory usage...
// the a pre-sorted with hashed values version of the archive.txt file can be loaded in memory
// and perform a binary search on it.
for scanner.Scan() {
if bytes.Equal(scanner.Bytes(), extractorAndURL) {
return true, nil
}
}
// data, err := io.ReadAll(fd)
// if err != nil {
// return false, err
// }
// slices.BinarySearchFunc(data, extractorAndURL, func(a []byte, b []byte) int {
// return hash(a).Compare(hash(b))
// })
return false, nil
}

View File

@@ -7,9 +7,9 @@ import (
"os/exec" "os/exec"
"path/filepath" "path/filepath"
"regexp" "regexp"
"strings"
"time" "time"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/archive"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/domain" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/domain"
@@ -49,11 +49,6 @@ func NewCronTaskRunner(mq *internal.MessageQueue, db *internal.MemoryDB) TaskRun
} }
} }
const (
commandTemplate = "-I1 --flat-playlist --print webpage_url $1"
getVideoIdTemplate = "--print \"%(extractor)s %(id)s\" $1"
)
var argsSplitterRe = regexp.MustCompile(`(?mi)[^\s"']+|"([^"]*)"|'([^']*)'`) var argsSplitterRe = regexp.MustCompile(`(?mi)[^\s"']+|"([^"]*)"|'([^']*)'`)
func (t *CronTaskRunner) Submit(subcription *domain.Subscription) error { func (t *CronTaskRunner) Submit(subcription *domain.Subscription) error {
@@ -103,21 +98,21 @@ func (t *CronTaskRunner) StopTask(id string) error {
return nil return nil
} }
// Notify on a channel when a fetcher has completed // Start a fetcher and notify on a channel when a fetcher has completed
func (t *CronTaskRunner) doFetch(ctx context.Context, req *monitorTask) <-chan struct{} { func (t *CronTaskRunner) doFetch(ctx context.Context, req *monitorTask) <-chan struct{} {
events := make(chan struct{}) completed := make(chan struct{})
// generator func // generator func
go func() { go func() {
for { for {
sleepFor := t.fetcher(ctx, req) sleepFor := t.fetcher(ctx, req)
events <- struct{}{} completed <- struct{}{}
time.Sleep(sleepFor) time.Sleep(sleepFor)
} }
}() }()
return events return completed
} }
// Perform the retrieval of the latest video of the channel. // Perform the retrieval of the latest video of the channel.
@@ -125,12 +120,15 @@ func (t *CronTaskRunner) doFetch(ctx context.Context, req *monitorTask) <-chan s
func (t *CronTaskRunner) fetcher(ctx context.Context, req *monitorTask) time.Duration { func (t *CronTaskRunner) fetcher(ctx context.Context, req *monitorTask) time.Duration {
slog.Info("fetching latest video for channel", slog.String("channel", req.Subscription.URL)) slog.Info("fetching latest video for channel", slog.String("channel", req.Subscription.URL))
fetcherParams := strings.Split(strings.Replace(commandTemplate, "$1", req.Subscription.URL, 1), " ") nextSchedule := time.Until(req.Schedule.Next(time.Now()))
cmd := exec.CommandContext( cmd := exec.CommandContext(
ctx, ctx,
config.Instance().DownloaderPath, config.Instance().DownloaderPath,
fetcherParams..., "-I1",
"--flat-playlist",
"--print", "webpage_url",
req.Subscription.URL,
) )
stdout, err := cmd.Output() stdout, err := cmd.Output()
@@ -139,10 +137,16 @@ func (t *CronTaskRunner) fetcher(ctx context.Context, req *monitorTask) time.Dur
return time.Duration(0) return time.Duration(0)
} }
latestChannelURL := string(bytes.Trim(stdout, "\n")) latestVideoURL := string(bytes.Trim(stdout, "\n"))
// if the download exists there's not point in sending it into the message queue.
exists, err := archive.DownloadExists(ctx, latestVideoURL)
if exists && err == nil {
return nextSchedule
}
p := &internal.Process{ p := &internal.Process{
Url: latestChannelURL, Url: latestVideoURL,
Params: append( Params: append(
argsSplitterRe.FindAllString(req.Subscription.Params, 1), argsSplitterRe.FindAllString(req.Subscription.Params, 1),
[]string{ []string{
@@ -153,10 +157,8 @@ func (t *CronTaskRunner) fetcher(ctx context.Context, req *monitorTask) time.Dur
AutoRemove: true, AutoRemove: true,
} }
t.db.Set(p) t.db.Set(p) // give it an id
t.mq.Publish(p) t.mq.Publish(p) // send it to the message queue waiting to be processed
nextSchedule := time.Until(req.Schedule.Next(time.Now()))
slog.Info( slog.Info(
"cron task runner next schedule", "cron task runner next schedule",