handle cancellation of scheduled cron jobs

This commit is contained in:
2025-02-06 19:28:03 +01:00
parent ceb92d066c
commit eec72bb6e2
2 changed files with 104 additions and 48 deletions

View File

@@ -53,6 +53,7 @@ func toDB(dto *domain.Subscription) data.Subscription {
// Delete implements domain.Service. // Delete implements domain.Service.
func (s *Service) Delete(ctx context.Context, id string) error { func (s *Service) Delete(ctx context.Context, id string) error {
s.runner.StopTask(id)
return s.r.Delete(ctx, id) return s.r.Delete(ctx, id)
} }

View File

@@ -19,10 +19,12 @@ import (
type TaskRunner interface { type TaskRunner interface {
Submit(subcription *domain.Subscription) error Submit(subcription *domain.Subscription) error
Spawner(ctx context.Context) Spawner(ctx context.Context)
StopTask(id string) error
Recoverer() Recoverer()
} }
type taskPair struct { type monitorTask struct {
Done chan struct{}
Schedule cron.Schedule Schedule cron.Schedule
Subscription *domain.Subscription Subscription *domain.Subscription
} }
@@ -31,20 +33,26 @@ type CronTaskRunner struct {
mq *internal.MessageQueue mq *internal.MessageQueue
db *internal.MemoryDB db *internal.MemoryDB
tasks chan taskPair tasks chan monitorTask
errors chan error errors chan error
running map[string]*monitorTask
} }
func NewCronTaskRunner(mq *internal.MessageQueue, db *internal.MemoryDB) TaskRunner { func NewCronTaskRunner(mq *internal.MessageQueue, db *internal.MemoryDB) TaskRunner {
return &CronTaskRunner{ return &CronTaskRunner{
mq: mq, mq: mq,
db: db, db: db,
tasks: make(chan taskPair), tasks: make(chan monitorTask),
errors: make(chan error), errors: make(chan error),
running: make(map[string]*monitorTask),
} }
} }
const commandTemplate = "-I1 --flat-playlist --print webpage_url $1" const (
commandTemplate = "-I1 --flat-playlist --print webpage_url $1"
getVideoIdTemplate = "--print \"%(extractor)s %(id)s\" $1"
)
var argsSplitterRe = regexp.MustCompile(`(?mi)[^\s"']+|"([^"]*)"|'([^']*)'`) var argsSplitterRe = regexp.MustCompile(`(?mi)[^\s"']+|"([^"]*)"|'([^']*)'`)
@@ -54,7 +62,8 @@ func (t *CronTaskRunner) Submit(subcription *domain.Subscription) error {
return err return err
} }
job := taskPair{ job := monitorTask{
Done: make(chan struct{}),
Schedule: schedule, Schedule: schedule,
Subscription: subcription, Subscription: subcription,
} }
@@ -64,13 +73,59 @@ func (t *CronTaskRunner) Submit(subcription *domain.Subscription) error {
return nil return nil
} }
// Handles the entire lifecylce of a monitor job.
func (t *CronTaskRunner) Spawner(ctx context.Context) { func (t *CronTaskRunner) Spawner(ctx context.Context) {
for task := range t.tasks { for req := range t.tasks {
t.running[req.Subscription.Id] = &req // keep track of the current job
go func() {
ctx, cancel := context.WithCancel(ctx) // inject into the job's context a cancellation singal
fetcherEvents := t.doFetch(ctx, &req) // retrieve the channel of events of the job
for {
select {
case <-req.Done:
slog.Info("stopping cron job and removing schedule", slog.String("url", req.Subscription.URL))
cancel()
return
case <-fetcherEvents:
slog.Info("finished monitoring channel", slog.String("url", req.Subscription.URL))
}
}
}()
}
}
// Stop a currently scheduled job
func (t *CronTaskRunner) StopTask(id string) error {
t.running[id].Done <- struct{}{}
delete(t.running, id)
return nil
}
// Notify on a channel when a fetcher has completed
func (t *CronTaskRunner) doFetch(ctx context.Context, req *monitorTask) <-chan struct{} {
events := make(chan struct{})
// generator func
go func() { go func() {
for { for {
slog.Info("fetching latest video for channel", slog.String("channel", task.Subscription.URL)) sleepFor := t.fetcher(ctx, req)
events <- struct{}{}
fetcherParams := strings.Split(strings.Replace(commandTemplate, "$1", task.Subscription.URL, 1), " ") time.Sleep(sleepFor)
}
}()
return events
}
// Perform the retrieval of the latest video of the channel.
// Returns a time.Duration containing the amount of time to the next schedule.
func (t *CronTaskRunner) fetcher(ctx context.Context, req *monitorTask) time.Duration {
slog.Info("fetching latest video for channel", slog.String("channel", req.Subscription.URL))
fetcherParams := strings.Split(strings.Replace(commandTemplate, "$1", req.Subscription.URL, 1), " ")
cmd := exec.CommandContext( cmd := exec.CommandContext(
ctx, ctx,
@@ -81,14 +136,17 @@ func (t *CronTaskRunner) Spawner(ctx context.Context) {
stdout, err := cmd.Output() stdout, err := cmd.Output()
if err != nil { if err != nil {
t.errors <- err t.errors <- err
return return time.Duration(0)
} }
latestChannelURL := string(bytes.Trim(stdout, "\n")) latestChannelURL := string(bytes.Trim(stdout, "\n"))
p := &internal.Process{ p := &internal.Process{
Url: latestChannelURL, Url: latestChannelURL,
Params: append(argsSplitterRe.FindAllString(task.Subscription.Params, 1), []string{ Params: append(
argsSplitterRe.FindAllString(req.Subscription.Params, 1),
[]string{
"--break-on-existing",
"--download-archive", "--download-archive",
filepath.Join(config.Instance().Dir(), "archive.txt"), filepath.Join(config.Instance().Dir(), "archive.txt"),
}...), }...),
@@ -98,20 +156,17 @@ func (t *CronTaskRunner) Spawner(ctx context.Context) {
t.db.Set(p) t.db.Set(p)
t.mq.Publish(p) t.mq.Publish(p)
nextSchedule := time.Until(task.Schedule.Next(time.Now())) nextSchedule := time.Until(req.Schedule.Next(time.Now()))
slog.Info( slog.Info(
"cron task runner next schedule", "cron task runner next schedule",
slog.String("url", task.Subscription.URL), slog.String("url", req.Subscription.URL),
slog.Any("duration", nextSchedule), slog.Any("duration", nextSchedule),
) )
time.Sleep(nextSchedule) return nextSchedule
}
}()
}
} }
func (t *CronTaskRunner) Recoverer() { func (t *CronTaskRunner) Recoverer() {
panic("Unimplemented") panic("unimplemented")
} }