diff --git a/server/subscription/service/service.go b/server/subscription/service/service.go index 1cca374..eb8bc77 100644 --- a/server/subscription/service/service.go +++ b/server/subscription/service/service.go @@ -53,6 +53,7 @@ func toDB(dto *domain.Subscription) data.Subscription { // Delete implements domain.Service. func (s *Service) Delete(ctx context.Context, id string) error { + s.runner.StopTask(id) return s.r.Delete(ctx, id) } diff --git a/server/subscription/task/runner.go b/server/subscription/task/runner.go index f4b5a7f..03a4812 100644 --- a/server/subscription/task/runner.go +++ b/server/subscription/task/runner.go @@ -19,10 +19,12 @@ import ( type TaskRunner interface { Submit(subcription *domain.Subscription) error Spawner(ctx context.Context) + StopTask(id string) error Recoverer() } -type taskPair struct { +type monitorTask struct { + Done chan struct{} Schedule cron.Schedule Subscription *domain.Subscription } @@ -31,20 +33,26 @@ type CronTaskRunner struct { mq *internal.MessageQueue db *internal.MemoryDB - tasks chan taskPair + tasks chan monitorTask errors chan error + + running map[string]*monitorTask } func NewCronTaskRunner(mq *internal.MessageQueue, db *internal.MemoryDB) TaskRunner { return &CronTaskRunner{ - mq: mq, - db: db, - tasks: make(chan taskPair), - errors: make(chan error), + mq: mq, + db: db, + tasks: make(chan monitorTask), + errors: make(chan error), + running: make(map[string]*monitorTask), } } -const commandTemplate = "-I1 --flat-playlist --print webpage_url $1" +const ( + commandTemplate = "-I1 --flat-playlist --print webpage_url $1" + getVideoIdTemplate = "--print \"%(extractor)s %(id)s\" $1" +) var argsSplitterRe = regexp.MustCompile(`(?mi)[^\s"']+|"([^"]*)"|'([^']*)'`) @@ -54,7 +62,8 @@ func (t *CronTaskRunner) Submit(subcription *domain.Subscription) error { return err } - job := taskPair{ + job := monitorTask{ + Done: make(chan struct{}), Schedule: schedule, Subscription: subcription, } @@ -64,54 +73,100 @@ func (t *CronTaskRunner) Submit(subcription *domain.Subscription) error { return nil } +// Handles the entire lifecylce of a monitor job. func (t *CronTaskRunner) Spawner(ctx context.Context) { - for task := range t.tasks { + for req := range t.tasks { + t.running[req.Subscription.Id] = &req // keep track of the current job + go func() { + ctx, cancel := context.WithCancel(ctx) // inject into the job's context a cancellation singal + fetcherEvents := t.doFetch(ctx, &req) // retrieve the channel of events of the job + for { - slog.Info("fetching latest video for channel", slog.String("channel", task.Subscription.URL)) - - fetcherParams := strings.Split(strings.Replace(commandTemplate, "$1", task.Subscription.URL, 1), " ") - - cmd := exec.CommandContext( - ctx, - config.Instance().DownloaderPath, - fetcherParams..., - ) - - stdout, err := cmd.Output() - if err != nil { - t.errors <- err + select { + case <-req.Done: + slog.Info("stopping cron job and removing schedule", slog.String("url", req.Subscription.URL)) + cancel() return + case <-fetcherEvents: + slog.Info("finished monitoring channel", slog.String("url", req.Subscription.URL)) } - - latestChannelURL := string(bytes.Trim(stdout, "\n")) - - p := &internal.Process{ - Url: latestChannelURL, - Params: append(argsSplitterRe.FindAllString(task.Subscription.Params, 1), []string{ - "--download-archive", - filepath.Join(config.Instance().Dir(), "archive.txt"), - }...), - AutoRemove: true, - } - - t.db.Set(p) - t.mq.Publish(p) - - nextSchedule := time.Until(task.Schedule.Next(time.Now())) - - slog.Info( - "cron task runner next schedule", - slog.String("url", task.Subscription.URL), - slog.Any("duration", nextSchedule), - ) - - time.Sleep(nextSchedule) } }() } } -func (t *CronTaskRunner) Recoverer() { - panic("Unimplemented") +// Stop a currently scheduled job +func (t *CronTaskRunner) StopTask(id string) error { + t.running[id].Done <- struct{}{} + delete(t.running, id) + return nil +} + +// Notify on a channel when a fetcher has completed +func (t *CronTaskRunner) doFetch(ctx context.Context, req *monitorTask) <-chan struct{} { + events := make(chan struct{}) + + // generator func + go func() { + for { + sleepFor := t.fetcher(ctx, req) + events <- struct{}{} + + time.Sleep(sleepFor) + } + }() + + return events +} + +// Perform the retrieval of the latest video of the channel. +// Returns a time.Duration containing the amount of time to the next schedule. +func (t *CronTaskRunner) fetcher(ctx context.Context, req *monitorTask) time.Duration { + slog.Info("fetching latest video for channel", slog.String("channel", req.Subscription.URL)) + + fetcherParams := strings.Split(strings.Replace(commandTemplate, "$1", req.Subscription.URL, 1), " ") + + cmd := exec.CommandContext( + ctx, + config.Instance().DownloaderPath, + fetcherParams..., + ) + + stdout, err := cmd.Output() + if err != nil { + t.errors <- err + return time.Duration(0) + } + + latestChannelURL := string(bytes.Trim(stdout, "\n")) + + p := &internal.Process{ + Url: latestChannelURL, + Params: append( + argsSplitterRe.FindAllString(req.Subscription.Params, 1), + []string{ + "--break-on-existing", + "--download-archive", + filepath.Join(config.Instance().Dir(), "archive.txt"), + }...), + AutoRemove: true, + } + + t.db.Set(p) + t.mq.Publish(p) + + nextSchedule := time.Until(req.Schedule.Next(time.Now())) + + slog.Info( + "cron task runner next schedule", + slog.String("url", req.Subscription.URL), + slog.Any("duration", nextSchedule), + ) + + return nextSchedule +} + +func (t *CronTaskRunner) Recoverer() { + panic("unimplemented") }