diff --git a/server/archive/utils.go b/server/archive/utils.go new file mode 100644 index 0000000..89eaa40 --- /dev/null +++ b/server/archive/utils.go @@ -0,0 +1,58 @@ +package archive + +import ( + "bufio" + "bytes" + "context" + "os" + "os/exec" + "path/filepath" + + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config" +) + +// DownloadExists runs the downloader with --print "%(extractor)s %(id)s" for url and reports whether +// a line equal to that output is present in archive.txt, i.e. whether the download has already been done. +func DownloadExists(ctx context.Context, url string) (bool, error) { + cmd := exec.CommandContext( + ctx, + config.Instance().DownloaderPath, + "--print", + "%(extractor)s %(id)s", + url, + ) + stdout, err := cmd.Output() + if err != nil { + return false, err + } + + extractorAndURL := bytes.Trim(stdout, "\n") + + fd, err := os.Open(filepath.Join(config.Instance().Dir(), "archive.txt")) + if err != nil { + return false, err + } + defer fd.Close() + + scanner := bufio.NewScanner(fd) + + // Search linearly for lower memory usage. + // Alternatively, a pre-sorted version of the archive.txt file with hashed values could be loaded in memory + // and binary-searched. NOTE(review): scanner.Err() is never checked after the loop below, so a read error is reported as (false, nil). 
+ for scanner.Scan() { + if bytes.Equal(scanner.Bytes(), extractorAndURL) { + return true, nil + } + } + + // data, err := io.ReadAll(fd) + // if err != nil { + // return false, err + // } + + // slices.BinarySearchFunc(data, extractorAndURL, func(a []byte, b []byte) int { + // return hash(a).Compare(hash(b)) + // }) + + return false, nil +} diff --git a/server/subscription/task/runner.go b/server/subscription/task/runner.go index 03a4812..e02f0b8 100644 --- a/server/subscription/task/runner.go +++ b/server/subscription/task/runner.go @@ -7,9 +7,9 @@ import ( "os/exec" "path/filepath" "regexp" - "strings" "time" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/archive" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/domain" @@ -49,11 +49,6 @@ func NewCronTaskRunner(mq *internal.MessageQueue, db *internal.MemoryDB) TaskRun } } -const ( - commandTemplate = "-I1 --flat-playlist --print webpage_url $1" - getVideoIdTemplate = "--print \"%(extractor)s %(id)s\" $1" -) - var argsSplitterRe = regexp.MustCompile(`(?mi)[^\s"']+|"([^"]*)"|'([^']*)'`) func (t *CronTaskRunner) Submit(subcription *domain.Subscription) error { @@ -103,21 +98,21 @@ func (t *CronTaskRunner) StopTask(id string) error { return nil } -// Notify on a channel when a fetcher has completed +// doFetch starts a goroutine that runs the fetcher in a loop and notifies on the returned channel each time a fetch has completed. NOTE(review): the loop has no exit path and time.Sleep does not observe ctx; confirm the goroutine is meant to outlive cancellation. func (t *CronTaskRunner) doFetch(ctx context.Context, req *monitorTask) <-chan struct{} { - events := make(chan struct{}) + completed := make(chan struct{}) // generator func go func() { for { sleepFor := t.fetcher(ctx, req) - events <- struct{}{} + completed <- struct{}{} time.Sleep(sleepFor) } }() - return events + return completed } // Perform the retrieval of the latest video of the channel. 
@@ -125,12 +120,15 @@ func (t *CronTaskRunner) doFetch(ctx context.Context, req *monitorTask) <-chan s func (t *CronTaskRunner) fetcher(ctx context.Context, req *monitorTask) time.Duration { slog.Info("fetching latest video for channel", slog.String("channel", req.Subscription.URL)) - fetcherParams := strings.Split(strings.Replace(commandTemplate, "$1", req.Subscription.URL, 1), " ") + nextSchedule := time.Until(req.Schedule.Next(time.Now())) cmd := exec.CommandContext( ctx, config.Instance().DownloaderPath, - fetcherParams..., + "-I1", + "--flat-playlist", + "--print", "webpage_url", + req.Subscription.URL, ) stdout, err := cmd.Output() @@ -139,10 +137,16 @@ func (t *CronTaskRunner) fetcher(ctx context.Context, req *monitorTask) time.Dur return time.Duration(0) } - latestChannelURL := string(bytes.Trim(stdout, "\n")) + latestVideoURL := string(bytes.Trim(stdout, "\n")) + + // if the download exists there's no point in sending it into the message queue. NOTE(review): an error from DownloadExists is ignored here and the download is queued anyway; confirm this best-effort behavior is intended. + exists, err := archive.DownloadExists(ctx, latestVideoURL) + if exists && err == nil { + return nextSchedule + } p := &internal.Process{ - Url: latestChannelURL, + Url: latestVideoURL, Params: append( argsSplitterRe.FindAllString(req.Subscription.Params, 1), []string{ @@ -153,10 +157,8 @@ func (t *CronTaskRunner) fetcher(ctx context.Context, req *monitorTask) time.Dur AutoRemove: true, } - t.db.Set(p) - t.mq.Publish(p) - - nextSchedule := time.Until(req.Schedule.Next(time.Now())) + t.db.Set(p) // give it an id + t.mq.Publish(p) // send it to the message queue waiting to be processed slog.Info( "cron task runner next schedule",