support for cron based subscriptions management

This commit is contained in:
2025-02-04 13:58:58 +01:00
parent 016d8557e6
commit ff93bd552f
30 changed files with 1388 additions and 17 deletions

View File

@@ -49,6 +49,18 @@ func Migrate(ctx context.Context, db *sql.DB) error {
return err
}
if _, err := db.ExecContext(
ctx,
`CREATE TABLE IF NOT EXISTS subscriptions (
id CHAR(36) PRIMARY KEY,
url VARCHAR(2048) UNIQUE NOT NULL,
params TEXT NOT NULL,
cron TEXT
)`,
); err != nil {
return err
}
if lockFileExists() {
return nil
}

View File

@@ -3,6 +3,7 @@ package internal
import (
"encoding/gob"
"errors"
"log/slog"
"os"
"path/filepath"
"sync"
@@ -11,6 +12,8 @@ import (
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
)
var memDbEvents = make(chan *Process)
// In-Memory Thread-Safe Key-Value Storage with optional persistence
type MemoryDB struct {
table map[string]*Process
@@ -144,3 +147,12 @@ func (m *MemoryDB) Restore(mq *MessageQueue) {
}
}
}
func (m *MemoryDB) EventListener() {
for p := range memDbEvents {
if p.AutoRemove {
slog.Info("compacting MemoryDB", slog.String("id", p.Id))
m.Delete(p.Id)
}
}
}

View File

@@ -48,6 +48,7 @@ type Process struct {
Id string
Url string
Livestream bool
AutoRemove bool
Params []string
Info DownloadInfo
Progress DownloadProgress
@@ -253,6 +254,8 @@ func (p *Process) Complete() {
slog.String("id", p.getShortId()),
slog.String("url", p.Url),
)
memDbEvents <- p
}
// Kill a process and remove it from the memory

View File

@@ -32,6 +32,8 @@ import (
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/rest"
ytdlpRPC "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/rpc"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/status"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/task"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/user"
_ "modernc.org/sqlite"
@@ -107,6 +109,7 @@ func RunBlocking(rc *RunConfig) {
}
mq.SetupConsumers()
go mdb.Restore(mq)
go mdb.EventListener()
lm := livestream.NewMonitor(mq, mdb)
go lm.Schedule()
@@ -151,6 +154,9 @@ func RunBlocking(rc *RunConfig) {
func newServer(c serverConfig) *http.Server {
archiver.Register(c.db)
cronTaskRunner := task.NewCronTaskRunner(c.mq, c.mdb)
go cronTaskRunner.Spawner(context.TODO())
service := ytdlpRPC.Container(c.mdb, c.mq, c.lm)
rpc.Register(service)
@@ -226,6 +232,9 @@ func newServer(c serverConfig) *http.Server {
// Status
r.Route("/status", status.ApplyRouter(c.mdb))
// Subscriptions
r.Route("/subscriptions", subscription.Container(c.db, cronTaskRunner).ApplyRouter())
return &http.Server{Handler: r}
}

View File

@@ -0,0 +1,17 @@
package subscription
import (
"database/sql"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/domain"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/task"
)
func Container(db *sql.DB, runner task.TaskRunner) domain.RestHandler {
var (
r = provideRepository(db)
s = provideService(r, runner)
h = provideHandler(s)
)
return h
}

View File

@@ -0,0 +1,8 @@
package data
type Subscription struct {
Id string
URL string
Params string
CronExpr string
}

View File

@@ -0,0 +1,47 @@
package domain
import (
"context"
"net/http"
"github.com/go-chi/chi/v5"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/data"
)
type Subscription struct {
Id string `json:"id"`
URL string `json:"url"`
Params string `json:"params"`
CronExpr string `json:"cron_expression"`
}
type PaginatedResponse[T any] struct {
First int64 `json:"first"`
Next int64 `json:"next"`
Data T `json:"data"`
}
type Repository interface {
Submit(ctx context.Context, sub *data.Subscription) (*data.Subscription, error)
List(ctx context.Context, start int64, limit int) (*[]data.Subscription, error)
UpdateByExample(ctx context.Context, example *data.Subscription) error
Delete(ctx context.Context, id string) error
GetCursor(ctx context.Context, id string) (int64, error)
}
type Service interface {
Submit(ctx context.Context, sub *Subscription) (*Subscription, error)
List(ctx context.Context, start int64, limit int) (*PaginatedResponse[[]Subscription], error)
UpdateByExample(ctx context.Context, example *Subscription) error
Delete(ctx context.Context, id string) error
GetCursor(ctx context.Context, id string) (int64, error)
}
type RestHandler interface {
Submit() http.HandlerFunc
List() http.HandlerFunc
UpdateByExample() http.HandlerFunc
Delete() http.HandlerFunc
GetCursor() http.HandlerFunc
ApplyRouter() func(chi.Router)
}

View File

@@ -0,0 +1,43 @@
package subscription
import (
"database/sql"
"sync"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/domain"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/repository"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/rest"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/service"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/task"
)
var (
repo domain.Repository
svc domain.Service
hand domain.RestHandler
repoOnce sync.Once
svcOnce sync.Once
handOnce sync.Once
)
func provideRepository(db *sql.DB) domain.Repository {
repoOnce.Do(func() {
repo = repository.New(db)
})
return repo
}
func provideService(r domain.Repository, runner task.TaskRunner) domain.Service {
svcOnce.Do(func() {
svc = service.New(r, runner)
})
return svc
}
func provideHandler(s domain.Service) domain.RestHandler {
handOnce.Do(func() {
hand = rest.New(s)
})
return hand
}

View File

@@ -0,0 +1,133 @@
package repository
import (
"context"
"database/sql"
"github.com/google/uuid"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/data"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/domain"
)
type Repository struct {
db *sql.DB
}
// Delete implements domain.Repository.
func (r *Repository) Delete(ctx context.Context, id string) error {
conn, err := r.db.Conn(ctx)
if err != nil {
return err
}
defer conn.Close()
_, err = conn.ExecContext(ctx, "DELETE FROM subscriptions WHERE id = ?", id)
return err
}
// GetCursor implements domain.Repository.
func (r *Repository) GetCursor(ctx context.Context, id string) (int64, error) {
conn, err := r.db.Conn(ctx)
if err != nil {
return -1, err
}
defer conn.Close()
row := conn.QueryRowContext(ctx, "SELECT rowid FROM subscriptions WHERE id = ?", id)
var rowId int64
if err := row.Scan(&rowId); err != nil {
return -1, err
}
return rowId, nil
}
// List implements domain.Repository.
func (r *Repository) List(ctx context.Context, start int64, limit int) (*[]data.Subscription, error) {
conn, err := r.db.Conn(ctx)
if err != nil {
return nil, err
}
defer conn.Close()
var elements []data.Subscription
rows, err := conn.QueryContext(ctx, "SELECT rowid, * FROM subscriptions WHERE rowid > ? LIMIT ?", start, limit)
if err != nil {
return nil, err
}
for rows.Next() {
var rowId int64
var element data.Subscription
if err := rows.Scan(
&rowId,
&element.Id,
&element.URL,
&element.Params,
&element.CronExpr,
); err != nil {
return &elements, err
}
elements = append(elements, element)
}
return &elements, nil
}
// Submit implements domain.Repository.
func (r *Repository) Submit(ctx context.Context, sub *data.Subscription) (*data.Subscription, error) {
conn, err := r.db.Conn(ctx)
if err != nil {
return nil, err
}
defer conn.Close()
_, err = conn.ExecContext(
ctx,
"INSERT INTO subscriptions (id, url, params, cron) VALUES (?, ?, ?, ?)",
uuid.NewString(),
sub.URL,
sub.Params,
sub.CronExpr,
)
return sub, err
}
// UpdateByExample implements domain.Repository.
func (r *Repository) UpdateByExample(ctx context.Context, example *data.Subscription) error {
conn, err := r.db.Conn(ctx)
if err != nil {
return err
}
defer conn.Close()
_, err = conn.ExecContext(
ctx,
"UPDATE subscriptions SET url = ?, params = ?, cron = ? WHERE id = ? OR url = ?",
example.URL,
example.Params,
example.CronExpr,
example.Id,
example.URL,
)
return err
}
func New(db *sql.DB) domain.Repository {
return &Repository{
db: db,
}
}

View File

@@ -0,0 +1,168 @@
package rest
import (
"encoding/json"
"net/http"
"strconv"
"github.com/go-chi/chi/v5"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
middlewares "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/middleware"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/openid"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/domain"
)
type RestHandler struct {
svc domain.Service
}
// ApplyRouter implements domain.RestHandler.
func (h *RestHandler) ApplyRouter() func(chi.Router) {
return func(r chi.Router) {
if config.Instance().RequireAuth {
r.Use(middlewares.Authenticated)
}
if config.Instance().UseOpenId {
r.Use(openid.Middleware)
}
r.Delete("/{id}", h.Delete())
r.Get("/cursor", h.GetCursor())
r.Get("/", h.List())
r.Post("/", h.Submit())
r.Patch("/", h.UpdateByExample())
}
}
// Delete implements domain.RestHandler.
func (h *RestHandler) Delete() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
w.Header().Set("Content-Type", "application/json")
id := chi.URLParam(r, "id")
err := h.svc.Delete(r.Context(), id)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if err := json.NewEncoder(w).Encode("ok"); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
}
// GetCursor implements domain.RestHandler.
func (h *RestHandler) GetCursor() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
w.Header().Set("Content-Type", "application/json")
id := chi.URLParam(r, "id")
cursorId, err := h.svc.GetCursor(r.Context(), id)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if err := json.NewEncoder(w).Encode(cursorId); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
}
// List implements domain.RestHandler.
func (h *RestHandler) List() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
w.Header().Set("Content-Type", "application/json")
var (
startParam = r.URL.Query().Get("id")
LimitParam = r.URL.Query().Get("limit")
)
start, err := strconv.Atoi(startParam)
if err != nil {
start = 0
}
limit, err := strconv.Atoi(LimitParam)
if err != nil {
limit = 50
}
res, err := h.svc.List(r.Context(), int64(start), limit)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if err := json.NewEncoder(w).Encode(res); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
}
// Submit implements domain.RestHandler.
func (h *RestHandler) Submit() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
w.Header().Set("Content-Type", "application/json")
var req domain.Subscription
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
res, err := h.svc.Submit(r.Context(), &req)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if err := json.NewEncoder(w).Encode(res); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
}
// UpdateByExample implements domain.RestHandler.
func (h *RestHandler) UpdateByExample() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
w.Header().Set("Content-Type", "application/json")
var req domain.Subscription
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if err := h.svc.UpdateByExample(r.Context(), &req); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if err := json.NewEncoder(w).Encode(req); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
}
func New(svc domain.Service) domain.RestHandler {
return &RestHandler{
svc: svc,
}
}

View File

@@ -0,0 +1,140 @@
package service
import (
"context"
"errors"
"math"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/data"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/domain"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/task"
"github.com/robfig/cron/v3"
)
type Service struct {
r domain.Repository
runner task.TaskRunner
}
func New(r domain.Repository, runner task.TaskRunner) domain.Service {
s := &Service{
r: r,
runner: runner,
}
// very crude recoverer
initial, _ := s.List(context.Background(), 0, math.MaxInt)
if initial != nil {
for _, v := range initial.Data {
s.runner.Submit(&v)
}
}
return s
}
func fromDB(model *data.Subscription) domain.Subscription {
return domain.Subscription{
Id: model.Id,
URL: model.URL,
Params: model.Params,
CronExpr: model.CronExpr,
}
}
func toDB(dto *domain.Subscription) data.Subscription {
return data.Subscription{
Id: dto.Id,
URL: dto.URL,
Params: dto.Params,
CronExpr: dto.CronExpr,
}
}
// Delete implements domain.Service.
func (s *Service) Delete(ctx context.Context, id string) error {
return s.r.Delete(ctx, id)
}
// GetCursor implements domain.Service.
func (s *Service) GetCursor(ctx context.Context, id string) (int64, error) {
return s.r.GetCursor(ctx, id)
}
// List implements domain.Service.
func (s *Service) List(ctx context.Context, start int64, limit int) (
*domain.PaginatedResponse[[]domain.Subscription],
error,
) {
dbSubs, err := s.r.List(ctx, start, limit)
if err != nil {
return nil, err
}
subs := make([]domain.Subscription, len(*dbSubs))
for i, v := range *dbSubs {
subs[i] = fromDB(&v)
}
var (
first int64
next int64
)
if len(subs) > 0 {
first, err = s.r.GetCursor(ctx, subs[0].Id)
if err != nil {
return nil, err
}
next, err = s.r.GetCursor(ctx, subs[len(subs)-1].Id)
if err != nil {
return nil, err
}
}
return &domain.PaginatedResponse[[]domain.Subscription]{
First: first,
Next: next,
Data: subs,
}, nil
}
// Submit implements domain.Service.
func (s *Service) Submit(ctx context.Context, sub *domain.Subscription) (*domain.Subscription, error) {
if sub.CronExpr == "" {
sub.CronExpr = "*/5 * * * *"
}
_, err := cron.ParseStandard(sub.CronExpr)
if err != nil {
return nil, errors.Join(errors.New("failed parsing cron expression"), err)
}
subDB, err := s.r.Submit(ctx, &data.Subscription{
URL: sub.URL,
Params: sub.Params,
CronExpr: sub.CronExpr,
})
retval := fromDB(subDB)
if err := s.runner.Submit(sub); err != nil {
return nil, err
}
return &retval, err
}
// UpdateByExample implements domain.Service.
func (s *Service) UpdateByExample(ctx context.Context, example *domain.Subscription) error {
_, err := cron.ParseStandard(example.CronExpr)
if err != nil {
return errors.Join(errors.New("failed parsing cron expression"), err)
}
e := toDB(example)
return s.r.UpdateByExample(ctx, &e)
}

View File

@@ -0,0 +1 @@
package subscription

View File

@@ -0,0 +1,117 @@
package task
import (
"bytes"
"context"
"log/slog"
"os/exec"
"path/filepath"
"regexp"
"strings"
"time"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/domain"
"github.com/robfig/cron/v3"
)
type TaskRunner interface {
Submit(subcription *domain.Subscription) error
Spawner(ctx context.Context)
Recoverer()
}
type taskPair struct {
Schedule cron.Schedule
Subscription *domain.Subscription
}
type CronTaskRunner struct {
mq *internal.MessageQueue
db *internal.MemoryDB
tasks chan taskPair
errors chan error
}
func NewCronTaskRunner(mq *internal.MessageQueue, db *internal.MemoryDB) TaskRunner {
return &CronTaskRunner{
mq: mq,
db: db,
tasks: make(chan taskPair),
errors: make(chan error),
}
}
const commandTemplate = "-I1 --flat-playlist --print webpage_url $1"
var argsSplitterRe = regexp.MustCompile(`(?mi)[^\s"']+|"([^"]*)"|'([^']*)'`)
func (t *CronTaskRunner) Submit(subcription *domain.Subscription) error {
schedule, err := cron.ParseStandard(subcription.CronExpr)
if err != nil {
return err
}
job := taskPair{
Schedule: schedule,
Subscription: subcription,
}
t.tasks <- job
return nil
}
func (t *CronTaskRunner) Spawner(ctx context.Context) {
for task := range t.tasks {
go func() {
for {
slog.Info("fetching latest video for channel", slog.String("channel", task.Subscription.URL))
fetcherParams := strings.Split(strings.Replace(commandTemplate, "$1", task.Subscription.URL, 1), " ")
cmd := exec.CommandContext(
ctx,
config.Instance().DownloaderPath,
fetcherParams...,
)
stdout, err := cmd.Output()
if err != nil {
t.errors <- err
return
}
latestChannelURL := string(bytes.Trim(stdout, "\n"))
p := &internal.Process{
Url: latestChannelURL,
Params: append(argsSplitterRe.FindAllString(task.Subscription.Params, 1), []string{
"--download-archive",
filepath.Join(config.Instance().Dir(), "archive.txt"),
}...),
AutoRemove: true,
}
t.db.Set(p)
t.mq.Publish(p)
nextSchedule := time.Until(task.Schedule.Next(time.Now()))
slog.Info(
"cron task runner next schedule",
slog.String("url", task.Subscription.URL),
slog.Any("duration", nextSchedule),
)
time.Sleep(nextSchedule)
}
}()
}
}
func (t *CronTaskRunner) Recoverer() {
panic("Unimplemented")
}