diff --git a/go.mod b/go.mod index 9fdaf70..33ca57a 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,6 @@ module github.com/marcopiovanello/yt-dlp-web-ui/v3 go 1.24 require ( - github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef github.com/coreos/go-oidc/v3 v3.15.0 github.com/go-chi/chi/v5 v5.2.3 github.com/go-chi/cors v1.2.2 @@ -11,14 +10,26 @@ require ( github.com/google/uuid v1.6.0 github.com/gorilla/websocket v1.5.3 github.com/robfig/cron/v3 v3.0.1 + github.com/spf13/viper v1.20.1 go.etcd.io/bbolt v1.4.3 + golang.org/x/crypto v0.41.0 golang.org/x/oauth2 v0.30.0 - golang.org/x/sync v0.16.0 golang.org/x/sys v0.35.0 - gopkg.in/yaml.v3 v3.0.1 ) require ( + github.com/fsnotify/fsnotify v1.8.0 // indirect github.com/go-jose/go-jose/v4 v4.1.2 // indirect - golang.org/x/crypto v0.41.0 // indirect + github.com/go-viper/mapstructure/v2 v2.2.1 // indirect + github.com/pelletier/go-toml/v2 v2.2.3 // indirect + github.com/sagikazarmark/locafero v0.7.0 // indirect + github.com/sourcegraph/conc v0.3.0 // indirect + github.com/spf13/afero v1.12.0 // indirect + github.com/spf13/cast v1.7.1 // indirect + github.com/spf13/pflag v1.0.6 // indirect + github.com/subosito/gotenv v1.6.0 // indirect + go.uber.org/atomic v1.9.0 // indirect + go.uber.org/multierr v1.9.0 // indirect + golang.org/x/text v0.28.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index be4aadf..fe6cd85 100644 --- a/go.sum +++ b/go.sum @@ -1,29 +1,64 @@ -github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef h1:2JGTg6JapxP9/R33ZaagQtAM4EkkSYnIAlOG5EI8gkM= -github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef/go.mod h1:JS7hed4L1fj0hXcyEejnW57/7LCetXggd+vwrRnYeII= github.com/coreos/go-oidc/v3 v3.15.0 h1:R6Oz8Z4bqWR7VFQ+sPSvZPQv4x8M+sJkDO5ojgwlyAg= github.com/coreos/go-oidc/v3 v3.15.0/go.mod h1:HaZ3szPaZ0e4r6ebqvsLWlk2Tn+aejfmrfah6hnSYEU= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= +github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= +github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M= +github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= github.com/go-chi/chi/v5 v5.2.3 h1:WQIt9uxdsAbgIYgid+BpYc+liqQZGMHRaUwp0JUcvdE= github.com/go-chi/chi/v5 v5.2.3/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops= github.com/go-chi/cors v1.2.2 h1:Jmey33TE+b+rB7fT8MUy1u0I4L+NARQlK6LhzKPSyQE= github.com/go-chi/cors v1.2.2/go.mod h1:sSbTewc+6wYHBBCW7ytsFSn836hqM7JxpglAy2Vzc58= github.com/go-jose/go-jose/v4 v4.1.2 h1:TK/7NqRQZfgAh+Td8AlsrvtPoUyiHh0LqVvokh+1vHI= github.com/go-jose/go-jose/v4 v4.1.2/go.mod h1:22cg9HWM1pOlnRiY+9cQYJ9XHmya1bYW8OeDM6Ku6Oo= +github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss= +github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= github.com/golang-jwt/jwt/v5 v5.3.0 h1:pv4AsKCKKZuqlgs5sUmn4x8UlGa0kEVt/puTpKx9vvo= github.com/golang-jwt/jwt/v5 v5.3.0/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/pelletier/go-toml/v2 v2.2.3 h1:YmeHyLY8mFWbdkNWwpr+qIL2bEqT0o95WSdkNHvL12M= +github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xlb6rzQu9GuUkc= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= +github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/sagikazarmark/locafero v0.7.0 h1:5MqpDsTGNDhY8sGp0Aowyf0qKsPrhewaLSsFaodPcyo= +github.com/sagikazarmark/locafero v0.7.0/go.mod h1:2za3Cg5rMaTMoG/2Ulr9AwtFaIppKXTRYnozin4aB5k= +github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= +github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= +github.com/spf13/afero v1.12.0 h1:UcOPyRBYczmFn6yvphxkn9ZEOY65cpwGKb5mL36mrqs= +github.com/spf13/afero v1.12.0/go.mod h1:ZTlWwG4/ahT8W7T0WQ5uYmjI9duaLQGy3Q2OAl4sk/4= +github.com/spf13/cast v1.7.1 h1:cuNEagBQEHWN1FnbGEjCXL2szYEXqfJPbP2HNUaca9Y= +github.com/spf13/cast v1.7.1/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= +github.com/spf13/pflag v1.0.6 h1:jFzHGLGAlb3ruxLB8MhbI6A8+AQX/2eW4qeyNZXNp2o= +github.com/spf13/pflag v1.0.6/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/spf13/viper v1.20.1 h1:ZMi+z/lvLyPSCoNtFCpqjy0S4kPbirhpTMwl8BkW9X4= +github.com/spf13/viper v1.20.1/go.mod h1:P9Mdzt1zoHIG8m2eZQinpiBjo6kCmZSKBClNNqjJvu4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= +github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= go.etcd.io/bbolt v1.4.3 h1:dEadXpI6G79deX5prL3QRNP6JB8UxVkqo4UPnHaNXJo= go.etcd.io/bbolt v1.4.3/go.mod h1:tKQlpPaYCVFctUIgFKFnAlvbmB3tpy1vkTnDWohtc0E= +go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= +go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= +go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ= golang.org/x/crypto v0.41.0 h1:WKYxWedPGCTVVl5+WHSSrOBT0O8lx32+zxmHxijgXp4= golang.org/x/crypto v0.41.0/go.mod h1:pO5AFd7FA68rFak7rOAGVuygIISepHftHnr8dr6+sUc= golang.org/x/oauth2 v0.30.0 h1:dnDm7JmhM45NNpd8FDDeLhK6FwqbOf4MLCM9zb1BOHI= @@ -32,7 +67,10 @@ golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw= golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng= +golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main.go b/main.go index 813ed19..546150b 100644 --- a/main.go +++ b/main.go @@ -1,117 +1,102 @@ package main import ( + "context" "embed" "flag" "io/fs" - "log" + "log/slog" "os" + "os/signal" "runtime" + "syscall" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server" - "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/cli" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/openid" + + "github.com/spf13/viper" ) -var ( - host string - port int - queueSize int - configFile string - downloadPath string - downloaderPath string - sessionFilePath string - localDatabasePath string - frontendPath string +//go:embed frontend/dist/index.html +//go:embed frontend/dist/assets/* +var frontend embed.FS - requireAuth bool - username string - password string - - userFromEnv = os.Getenv("USERNAME") - passFromEnv = os.Getenv("PASSWORD") - - logFile string - enableFileLogging bool - - //go:embed frontend/dist/index.html - //go:embed frontend/dist/assets/* - frontend embed.FS - - //go:embed openapi/* - swagger embed.FS -) - -func init() { - flag.StringVar(&host, "host", "0.0.0.0", "Host where server will listen at") - flag.IntVar(&port, "port", 3033, "Port where server will listen at") - flag.IntVar(&queueSize, "qs", 2, "Queue size (concurrent downloads)") - - flag.StringVar(&configFile, "conf", "./config.yml", "Config file path") - flag.StringVar(&downloadPath, "out", ".", "Where files will be saved") - flag.StringVar(&downloaderPath, "driver", "yt-dlp", "yt-dlp executable path") - flag.StringVar(&sessionFilePath, "session", ".", "session file path") - flag.StringVar(&localDatabasePath, "db", "local.db", "local database path") - flag.StringVar(&frontendPath, "web", "", "frontend web resources path") - - flag.BoolVar(&enableFileLogging, "fl", false, "enable outputting logs to a file") - flag.StringVar(&logFile, "lf", "yt-dlp-webui.log", "set log file location") - - flag.BoolVar(&requireAuth, "auth", false, "Enable RPC authentication") - flag.StringVar(&username, "user", userFromEnv, "Username required for auth") - flag.StringVar(&password, "pass", passFromEnv, "Password required for auth") - - flag.Parse() -} +//go:embed openapi/* +var swagger embed.FS func main() { - frontend, err := fs.Sub(frontend, "frontend/dist") - if err != nil { - log.Fatalln(err) + // Parse optional config path from flag + var configFile string + flag.StringVar(&configFile, "conf", "./config.yml", "Config file path") + flag.Parse() + + v := viper.New() + v.SetConfigFile(configFile) + v.SetConfigType("yaml") + + // Defaults + v.SetDefault("server.host", "0.0.0.0") + v.SetDefault("server.port", 3033) + v.SetDefault("server.queue_size", 2) + v.SetDefault("paths.download_path", ".") + v.SetDefault("paths.downloader_path", "yt-dlp") + v.SetDefault("paths.local_database_path", ".") + v.SetDefault("logging.log_path", "yt-dlp-webui.log") + v.SetDefault("logging.enable_file_logging", false) + v.SetDefault("authentication.require_auth", false) + + // Env binding + v.SetEnvPrefix("APP") + v.AutomaticEnv() + + // Load YAML file if exists + if err := v.ReadInConfig(); err != nil { + slog.Debug("using defaults") } - if frontendPath != "" { - frontend = os.DirFS(frontendPath) + cfg := config.Instance() + if err := v.Unmarshal(&cfg); err != nil { + slog.Error("failed to load config", "error", err) } - c := config.Instance() - - { - // init the config struct with the values from flags - // TODO: find an alternative way to populate the config struct from flags or config file - c.Host = host - c.Port = port - - c.QueueSize = queueSize - - c.DownloadPath = downloadPath - c.DownloaderPath = downloaderPath - c.SessionFilePath = sessionFilePath - c.LocalDatabasePath = localDatabasePath - - c.LogPath = logFile - c.EnableFileLogging = enableFileLogging - - c.RequireAuth = requireAuth - c.Username = username - c.Password = password + if cfg.Server.QueueSize <= 0 || runtime.NumCPU() <= 2 { + cfg.Server.QueueSize = 2 } - // limit concurrent downloads for systems with 2 or less logical cores - if runtime.NumCPU() <= 2 { - c.QueueSize = 1 - } - - // if config file is found it will be merged with the current config struct - if err := c.LoadFile(configFile); err != nil { - log.Println(cli.BgRed, "config", cli.Reset, err) + // 6. Frontend FS + var appFS fs.FS + if fp := v.GetString("frontend_path"); fp != "" { + appFS = os.DirFS(fp) + } else { + sub, err := fs.Sub(frontend, "frontend/dist") + if err != nil { + slog.Error("failed to load embedded frontend", "error", err) + os.Exit(1) + } + appFS = sub } + // Configure OpenID if needed openid.Configure() - server.RunBlocking(&server.RunConfig{ - App: frontend, + // Graceful shutdown + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + slog.Info("starting server", + "host", cfg.Server.Host, + "port", cfg.Server.Port, + "queue_size", cfg.Server.QueueSize, + ) + + if err := server.Run(ctx, &server.RunConfig{ + App: appFS, Swagger: swagger, - }) + }); err != nil { + slog.Error("server stopped with error", "error", err) + os.Exit(1) + } + + slog.Info("server exited cleanly") } diff --git a/server/archive/rest/handler.go b/server/archive/rest/handler.go index 0fe5b5c..40a4666 100644 --- a/server/archive/rest/handler.go +++ b/server/archive/rest/handler.go @@ -146,10 +146,10 @@ func (h *Handler) GetCursor() http.HandlerFunc { // ApplyRouter implements domain.RestHandler. func (h *Handler) ApplyRouter() func(chi.Router) { return func(r chi.Router) { - if config.Instance().RequireAuth { + if config.Instance().Authentication.RequireAuth { r.Use(middlewares.Authenticated) } - if config.Instance().UseOpenId { + if config.Instance().OpenId.UseOpenId { r.Use(openid.Middleware) } diff --git a/server/archive/utils.go b/server/archive/utils.go index 89eaa40..09f29e3 100644 --- a/server/archive/utils.go +++ b/server/archive/utils.go @@ -16,7 +16,7 @@ import ( func DownloadExists(ctx context.Context, url string) (bool, error) { cmd := exec.CommandContext( ctx, - config.Instance().DownloaderPath, + config.Instance().Paths.DownloaderPath, "--print", "%(extractor)s %(id)s", url, diff --git a/server/archiver/archiver.go b/server/archiver/archiver.go index 3da056c..7ae88f4 100644 --- a/server/archiver/archiver.go +++ b/server/archiver/archiver.go @@ -5,15 +5,12 @@ import ( "database/sql" "log/slog" - evbus "github.com/asaskevich/EventBus" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/archive" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config" ) -const QueueName = "process:archive" - var ( - eventBus = evbus.New() + ch = make(chan *Message, 1) archiveService archive.Service ) @@ -25,18 +22,20 @@ func Register(db *sql.DB) { } func init() { - eventBus.Subscribe(QueueName, func(m *Message) { - slog.Info( - "archiving completed download", - slog.String("title", m.Title), - slog.String("source", m.Source), - ) - archiveService.Archive(context.Background(), m) - }) + go func() { + for m := range ch { + slog.Info( + "archiving completed download", + slog.String("title", m.Title), + slog.String("source", m.Source), + ) + archiveService.Archive(context.Background(), m) + } + }() } func Publish(m *Message) { if config.Instance().AutoArchive { - eventBus.Publish(QueueName, m) + ch <- m } } diff --git a/server/config/config.go b/server/config/config.go index c9448fd..cd2046f 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -1,42 +1,64 @@ package config import ( - "os" "path/filepath" "sync" "time" - - "gopkg.in/yaml.v3" ) type Config struct { - LogPath string `yaml:"log_path"` - EnableFileLogging bool `yaml:"enable_file_logging"` - BaseURL string `yaml:"base_url"` - Host string `yaml:"host"` - Port int `yaml:"port"` - DownloadPath string `yaml:"downloadPath"` - DownloaderPath string `yaml:"downloaderPath"` - RequireAuth bool `yaml:"require_auth"` - Username string `yaml:"username"` - Password string `yaml:"password"` - QueueSize int `yaml:"queue_size"` - LocalDatabasePath string `yaml:"local_database_path"` - SessionFilePath string `yaml:"session_file_path"` - path string // private - UseOpenId bool `yaml:"use_openid"` - OpenIdProviderURL string `yaml:"openid_provider_url"` - OpenIdClientId string `yaml:"openid_client_id"` - OpenIdClientSecret string `yaml:"openid_client_secret"` - OpenIdRedirectURL string `yaml:"openid_redirect_url"` - OpenIdEmailWhitelist []string `yaml:"openid_email_whitelist"` - FrontendPath string `yaml:"frontend_path"` - AutoArchive bool `yaml:"auto_archive"` - Twitch struct { - ClientId string `yaml:"client_id"` - ClientSecret string `yaml:"client_secret"` - CheckInterval time.Duration `yaml:"check_interval"` - } `yaml:"twitch"` + Server ServerConfig `yaml:"server"` + Logging LoggingConfig `yaml:"logging"` + Paths PathsConfig `yaml:"paths"` + Authentication AuthConfig `yaml:"authentication"` + OpenId OpenIdConfig `yaml:"openid"` + Frontend FrontendConfig `yaml:"frontend"` + AutoArchive bool `yaml:"auto_archive"` + Twitch TwitchConfig `yaml:"twitch"` + path string +} + +type ServerConfig struct { + BaseURL string `yaml:"base_url"` + Host string `yaml:"host"` + Port int `yaml:"port"` + QueueSize int `yaml:"queue_size"` +} + +type LoggingConfig struct { + LogPath string `yaml:"log_path"` + EnableFileLogging bool `yaml:"enable_file_logging"` +} + +type PathsConfig struct { + DownloadPath string `yaml:"download_path"` + DownloaderPath string `yaml:"downloader_path"` + LocalDatabasePath string `yaml:"local_database_path"` +} + +type AuthConfig struct { + RequireAuth bool `yaml:"require_auth"` + Username string `yaml:"username"` + PasswordHash string `yaml:"password"` +} + +type OpenIdConfig struct { + UseOpenId bool `yaml:"use_openid"` + ProviderURL string `yaml:"openid_provider_url"` + ClientId string `yaml:"openid_client_id"` + ClientSecret string `yaml:"openid_client_secret"` + RedirectURL string `yaml:"openid_redirect_url"` + EmailWhitelist []string `yaml:"openid_email_whitelist"` +} + +type FrontendConfig struct { + FrontendPath string `yaml:"frontend_path"` +} + +type TwitchConfig struct { + ClientId string `yaml:"client_id"` + ClientSecret string `yaml:"client_secret"` + CheckInterval time.Duration `yaml:"check_interval"` } var ( @@ -54,22 +76,6 @@ func Instance() *Config { return instance } -// Initialises the Config struct given its config file -func (c *Config) LoadFile(filename string) error { - fd, err := os.Open(filename) - if err != nil { - return err - } - - c.path = filename - - if err := yaml.NewDecoder(fd).Decode(c); err != nil { - return err - } - - return nil -} - // Path of the directory containing the config file func (c *Config) Dir() string { return filepath.Dir(c.path) } diff --git a/server/filebrowser/handlers.go b/server/filebrowser/handlers.go index f584719..5d81335 100644 --- a/server/filebrowser/handlers.go +++ b/server/filebrowser/handlers.go @@ -89,7 +89,7 @@ type ListRequest struct { } func ListDownloaded(w http.ResponseWriter, r *http.Request) { - root := config.Instance().DownloadPath + root := config.Instance().Paths.DownloadPath req := new(ListRequest) if err := json.NewDecoder(r.Body).Decode(&req); err != nil { @@ -157,7 +157,7 @@ func SendFile(w http.ResponseWriter, r *http.Request) { filename := string(decoded) - root := config.Instance().DownloadPath + root := config.Instance().Paths.DownloadPath if strings.Contains(filepath.Dir(filepath.Clean(filename)), filepath.Clean(root)) { http.ServeFile(w, r, filename) @@ -189,7 +189,7 @@ func DownloadFile(w http.ResponseWriter, r *http.Request) { filename := string(decoded) - root := config.Instance().DownloadPath + root := config.Instance().Paths.DownloadPath if strings.Contains(filepath.Dir(filepath.Clean(filename)), filepath.Clean(root)) { w.Header().Add("Content-Disposition", "inline; filename=\""+filepath.Base(filename)+"\"") diff --git a/server/formats/parser.go b/server/formats/parser.go index cc42787..e8ecba1 100644 --- a/server/formats/parser.go +++ b/server/formats/parser.go @@ -10,7 +10,7 @@ import ( ) func ParseURL(url string) (*Metadata, error) { - cmd := exec.Command(config.Instance().DownloaderPath, url, "-J") + cmd := exec.Command(config.Instance().Paths.DownloaderPath, url, "-J") stdout, err := cmd.Output() if err != nil { diff --git a/server/internal/downloaders/generic.go b/server/internal/downloaders/generic.go index 0088d77..1b6228d 100644 --- a/server/internal/downloaders/generic.go +++ b/server/internal/downloaders/generic.go @@ -63,7 +63,7 @@ func (g *GenericDownloader) Start() error { g.Params = argsSanitizer(g.Params) out := internal.DownloadOutput{ - Path: config.Instance().DownloadPath, + Path: config.Instance().Paths.DownloadPath, Filename: "%(title)s.%(ext)s", } @@ -101,7 +101,7 @@ func (g *GenericDownloader) Start() error { slog.Info("requesting download", slog.String("url", g.URL), slog.Any("params", params)) - cmd := exec.Command(config.Instance().DownloaderPath, params...) + cmd := exec.Command(config.Instance().Paths.DownloaderPath, params...) cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} stdout, err := cmd.StdoutPipe() diff --git a/server/internal/downloaders/livestream.go b/server/internal/downloaders/livestream.go index 86b6af1..0eb2a9d 100644 --- a/server/internal/downloaders/livestream.go +++ b/server/internal/downloaders/livestream.go @@ -57,7 +57,7 @@ func (l *LiveStreamDownloader) Start() error { params := append(baseParams, "-o", "-") - cmd := exec.Command(config.Instance().DownloaderPath, params...) + cmd := exec.Command(config.Instance().Paths.DownloaderPath, params...) cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} // stdout = media stream @@ -102,11 +102,11 @@ func (l *LiveStreamDownloader) Start() error { if !l.hasFileWriter() { go func() { filepath.Join( - config.Instance().DownloadPath, + config.Instance().Paths.DownloadPath, fmt.Sprintf("%s (live) %s.mp4", l.Id, time.Now().Format(time.ANSIC)), ) - defaultPath := filepath.Join(config.Instance().DownloadPath) + defaultPath := filepath.Join(config.Instance().Paths.DownloadPath) f, err := os.Create(defaultPath) if err != nil { slog.Error("failed to create fallback file", slog.Any("err", err)) diff --git a/server/internal/kv/store.go b/server/internal/kv/store.go index 0ae8788..caf5769 100644 --- a/server/internal/kv/store.go +++ b/server/internal/kv/store.go @@ -1,17 +1,13 @@ package kv import ( - "encoding/gob" "encoding/json" "errors" "log/slog" - "os" - "path/filepath" "runtime" "sync" "time" - "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/downloaders" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/queue" @@ -111,28 +107,6 @@ func (m *Store) All() *[]internal.ProcessSnapshot { return &running } -// Persist the database in a single file named "session.dat" -func (m *Store) Persist() error { - running := m.All() - - sf := filepath.Join(config.Instance().SessionFilePath, "session.dat") - - fd, err := os.Create(sf) - if err != nil { - return errors.Join(errors.New("failed to persist session"), err) - } - - m.mu.RLock() - defer m.mu.RUnlock() - session := Session{Processes: *running} - - if err := gob.NewEncoder(fd).Encode(session); err != nil { - return errors.Join(errors.New("failed to persist session"), err) - } - - return nil -} - // Restore a persisted state func (m *Store) Restore(mq *queue.MessageQueue) { m.mu.Lock() diff --git a/server/internal/livestream/livestream.go b/server/internal/livestream/livestream.go index 3d54005..48299eb 100644 --- a/server/internal/livestream/livestream.go +++ b/server/internal/livestream/livestream.go @@ -54,13 +54,13 @@ func New(url string, done chan *LiveStream, mq *queue.MessageQueue, store *kv.St // Start the livestream monitoring process, once completion signals on the done channel func (l *LiveStream) Start() error { cmd := exec.Command( - config.Instance().DownloaderPath, + config.Instance().Paths.DownloaderPath, l.url, "--wait-for-video", "30", // wait for the stream to be live and recheck every 10 secs "--no-colors", // no ansi color fuzz "--simulate", "--newline", - "--paths", config.Instance().DownloadPath, + "--paths", config.Instance().Paths.DownloadPath, ) stdout, err := cmd.StdoutPipe() diff --git a/server/internal/livestream/livestream_test.go b/server/internal/livestream/livestream_test.go index 3883a0a..f39c2e7 100644 --- a/server/internal/livestream/livestream_test.go +++ b/server/internal/livestream/livestream_test.go @@ -10,7 +10,7 @@ import ( ) func setupTest() { - config.Instance().DownloaderPath = "build/yt-dlp" + config.Instance().Paths.DownloaderPath = "build/yt-dlp" } const URL = "https://www.youtube.com/watch?v=pwoAyLGOysU" diff --git a/server/internal/livestream/monitor.go b/server/internal/livestream/monitor.go index b42affc..253730e 100644 --- a/server/internal/livestream/monitor.go +++ b/server/internal/livestream/monitor.go @@ -17,6 +17,11 @@ type Monitor struct { } func NewMonitor(mq *queue.MessageQueue, store *kv.Store, db *bolt.DB) *Monitor { + db.Update(func(tx *bolt.Tx) error { + _, err := tx.CreateBucketIfNotExists(bucket) + return err + }) + return &Monitor{ mq: mq, db: db, diff --git a/server/internal/metadata/fetchers.go b/server/internal/metadata/fetchers.go index 900052f..f03f70a 100644 --- a/server/internal/metadata/fetchers.go +++ b/server/internal/metadata/fetchers.go @@ -15,7 +15,7 @@ import ( ) func DefaultFetcher(url string) (*common.DownloadMetadata, error) { - cmd := exec.Command(config.Instance().DownloaderPath, url, "-J") + cmd := exec.Command(config.Instance().Paths.DownloaderPath, url, "-J") cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} stdout, err := cmd.StdoutPipe() diff --git a/server/internal/pipeline/rest.go b/server/internal/pipeline/rest.go new file mode 100644 index 0000000..db73a9b --- /dev/null +++ b/server/internal/pipeline/rest.go @@ -0,0 +1,92 @@ +package pipeline + +import ( + "encoding/json" + "net/http" + + "github.com/go-chi/chi/v5" + bolt "go.etcd.io/bbolt" +) + +type handler struct { + store *Store +} + +func NewRestHandler(db *bolt.DB) *handler { + store, _ := NewStore(db) + return &handler{ + store: store, + } +} + +func (h *handler) GetPipeline(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + + id := chi.URLParam(r, "id") + + p, err := h.store.Get(id) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + if err := json.NewEncoder(w).Encode(p); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + +func (h *handler) GetAllPipelines(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + + p, err := h.store.List() + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + if err := json.NewEncoder(w).Encode(p); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + +func (h *handler) SavePipeline(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + + defer r.Body.Close() + var req Pipeline + + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + id, err := h.store.Save(req) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + if err := json.NewEncoder(w).Encode(id); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + +func (h *handler) DeletePipeline(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + + id := chi.URLParam(r, "id") + + err := h.store.Delete(id) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + if err := json.NewEncoder(w).Encode("ok"); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} diff --git a/server/internal/pipeline/store.go b/server/internal/pipeline/store.go index 7240f95..d641742 100644 --- a/server/internal/pipeline/store.go +++ b/server/internal/pipeline/store.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" + "github.com/google/uuid" bolt "go.etcd.io/bbolt" ) @@ -13,6 +14,7 @@ type Step struct { Type string `json:"type"` // es. "transcoder", "filewriter" FFmpegArgs []string `json:"ffmpeg_args,omitempty"` // args da passare a ffmpeg Path string `json:"path,omitempty"` // solo per filewriter + Extension string `json:"extension,omitempty"` // solo per filewriter } type Pipeline struct { @@ -25,14 +27,9 @@ type Store struct { db *bolt.DB } -func NewStore(path string) (*Store, error) { - db, err := bolt.Open(path, 0600, nil) - if err != nil { - return nil, err - } - +func NewStore(db *bolt.DB) (*Store, error) { // init bucket - err = db.Update(func(tx *bolt.Tx) error { + err := db.Update(func(tx *bolt.Tx) error { _, err := tx.CreateBucketIfNotExists(bucket) return err }) @@ -43,13 +40,17 @@ func NewStore(path string) (*Store, error) { return &Store{db: db}, nil } -func (s *Store) Save(p Pipeline) error { - data, err := json.Marshal(p) - if err != nil { - return err +func (s *Store) Save(p Pipeline) (string, error) { + if p.ID == "" { + p.ID = uuid.NewString() } - return s.db.Update(func(tx *bolt.Tx) error { + data, err := json.Marshal(p) + if err != nil { + return "", err + } + + return p.ID, s.db.Update(func(tx *bolt.Tx) error { b := tx.Bucket(bucket) return b.Put([]byte(p.ID), data) }) @@ -93,3 +94,10 @@ func (s *Store) List() ([]Pipeline, error) { return result, nil } + +func (s *Store) Delete(id string) error { + return s.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket(bucket) + return b.Delete([]byte(id)) + }) +} diff --git a/server/internal/queue/message_queue.go b/server/internal/queue/message_queue.go index 7d501f8..8ac799b 100644 --- a/server/internal/queue/message_queue.go +++ b/server/internal/queue/message_queue.go @@ -5,101 +5,119 @@ import ( "errors" "log/slog" - evbus "github.com/asaskevich/EventBus" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/downloaders" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/metadata" - "golang.org/x/sync/semaphore" ) -const queueName = "process:pending" - type MessageQueue struct { - concurrency int - eventBus evbus.Bus + concurrency int + downloadQueue chan downloaders.Downloader + metadataQueue chan downloaders.Downloader + ctx context.Context + cancel context.CancelFunc } -// Creates a new message queue. -// By default it will be created with a size equals to nthe number of logical -// CPU cores -1. -// The queue size can be set via the qs flag. func NewMessageQueue() (*MessageQueue, error) { - qs := config.Instance().QueueSize - + qs := config.Instance().Server.QueueSize if qs <= 0 { return nil, errors.New("invalid queue size") } + ctx, cancel := context.WithCancel(context.Background()) + return &MessageQueue{ - concurrency: qs, - eventBus: evbus.New(), + concurrency: qs, + downloadQueue: make(chan downloaders.Downloader, qs*2), + metadataQueue: make(chan downloaders.Downloader, qs*4), + ctx: ctx, + cancel: cancel, }, nil } -// Publish a message to the queue and set the task to a peding state. -func (m *MessageQueue) Publish(p downloaders.Downloader) { - // needs to have an id set before - p.SetPending(true) +// Publish download job +func (m *MessageQueue) Publish(d downloaders.Downloader) { + d.SetPending(true) - m.eventBus.Publish(queueName, p) + select { + case m.downloadQueue <- d: + slog.Info("published download", slog.String("id", d.GetId())) + case <-m.ctx.Done(): + slog.Warn("queue stopped, dropping download", slog.String("id", d.GetId())) + } } +// Workers: download + metadata func (m *MessageQueue) SetupConsumers() { - go m.downloadConsumer() - go m.metadataSubscriber() + // N parallel workers for downloadQueue + for i := 0; i < m.concurrency; i++ { + go m.downloadWorker(i) + } + + // 1 serial worker for metadata + go m.metadataWorker() } -// Setup the consumer listener which subscribes to the changes to the producer -// channel and triggers the "download" action. -func (m *MessageQueue) downloadConsumer() { - sem := semaphore.NewWeighted(int64(m.concurrency)) - - m.eventBus.SubscribeAsync(queueName, func(p downloaders.Downloader) { - sem.Acquire(context.Background(), 1) - defer sem.Release(1) - - slog.Info("received process from event bus", - slog.String("bus", queueName), - slog.String("consumer", "downloadConsumer"), - slog.String("id", p.GetId()), - ) - - if !p.IsCompleted() { - slog.Info("started process", - slog.String("bus", queueName), - slog.String("id", p.GetId()), - ) - p.Start() - } - }, false) -} - -// Setup the metadata consumer listener which subscribes to the changes to the -// producer channel and adds metadata to each download. -func (m *MessageQueue) metadataSubscriber() { - // How many concurrent metadata fetcher jobs are spawned - // Since there's ongoing downloads, 1 job at time seems a good compromise - sem := semaphore.NewWeighted(1) - - m.eventBus.SubscribeAsync(queueName, func(p downloaders.Downloader) { - sem.Acquire(context.Background(), 1) - defer sem.Release(1) - - slog.Info("received process from event bus", - slog.String("bus", queueName), - slog.String("consumer", "metadataConsumer"), - slog.String("id", p.GetId()), - ) - - if p.IsCompleted() { - slog.Warn("proccess has an illegal state", - slog.String("id", p.GetId()), - slog.String("status", "completed"), - ) +// Worker dei download +func (m *MessageQueue) downloadWorker(workerId int) { + for { + select { + case <-m.ctx.Done(): return + case p := <-m.downloadQueue: + if p == nil { + continue + } + if p.IsCompleted() { + continue + } + + slog.Info("download worker started", + slog.Int("worker", workerId), + slog.String("id", p.GetId()), + ) + + p.Start() + + // after the download starts succesfully we pass it to the metadata queue + select { + case m.metadataQueue <- p: + slog.Info("queued for metadata", slog.String("id", p.GetId())) + case <-m.ctx.Done(): + return + } } - - p.SetMetadata(metadata.DefaultFetcher) - - }, false) + } +} + +func (m *MessageQueue) metadataWorker() { + for { + select { + case <-m.ctx.Done(): + return + case p := <-m.metadataQueue: + if p == nil { + continue + } + + slog.Info("metadata worker started", + slog.String("id", p.GetId()), + ) + + if p.IsCompleted() { + slog.Warn("metadata skipped, illegal state", + slog.String("id", p.GetId()), + ) + continue + } + + p.SetMetadata(metadata.DefaultFetcher) + } + } +} + +func (m *MessageQueue) Stop() { + m.cancel() + close(m.downloadQueue) + close(m.metadataQueue) } diff --git a/server/logging/handler.go b/server/logging/handler.go index 5385e80..d843c2f 100644 --- a/server/logging/handler.go +++ b/server/logging/handler.go @@ -91,10 +91,10 @@ func sse(logger *ObservableLogger) http.HandlerFunc { func ApplyRouter(logger *ObservableLogger) func(chi.Router) { return func(r chi.Router) { - if config.Instance().RequireAuth { + if config.Instance().Authentication.RequireAuth { r.Use(middlewares.Authenticated) } - if config.Instance().UseOpenId { + if config.Instance().OpenId.UseOpenId { r.Use(openid.Middleware) } r.Get("/ws", webSocket(logger)) diff --git a/server/middleware/utils.go b/server/middleware/utils.go index eec5be7..d9e1d96 100644 --- a/server/middleware/utils.go +++ b/server/middleware/utils.go @@ -8,14 +8,14 @@ import ( ) func ApplyAuthenticationByConfig(next http.Handler) http.Handler { - handler := next + handler := next - if config.Instance().RequireAuth { - handler = Authenticated(handler) - } - if config.Instance().UseOpenId { - handler = openid.Middleware(handler) - } + if config.Instance().Authentication.RequireAuth { + handler = Authenticated(handler) + } + if config.Instance().OpenId.UseOpenId { + handler = openid.Middleware(handler) + } - return handler -} \ No newline at end of file + return handler +} diff --git a/server/openid/config.go b/server/openid/config.go index aba3538..70f3ed3 100644 --- a/server/openid/config.go +++ b/server/openid/config.go @@ -14,24 +14,27 @@ var ( ) func Configure() { - if !config.Instance().UseOpenId { + if !config.Instance().OpenId.UseOpenId { return } - provider, err := oidc.NewProvider(context.Background(), config.Instance().OpenIdProviderURL) + provider, err := oidc.NewProvider( + context.Background(), + config.Instance().OpenId.ProviderURL, + ) if err != nil { panic(err) } oauth2Config = oauth2.Config{ - ClientID: config.Instance().OpenIdClientId, - ClientSecret: config.Instance().OpenIdClientSecret, - RedirectURL: config.Instance().OpenIdRedirectURL, + ClientID: config.Instance().OpenId.ClientId, + ClientSecret: config.Instance().OpenId.ClientSecret, + RedirectURL: config.Instance().OpenId.RedirectURL, Endpoint: provider.Endpoint(), Scopes: []string{oidc.ScopeOpenID, "profile", "email"}, } verifier = provider.Verifier(&oidc.Config{ - ClientID: config.Instance().OpenIdClientId, + ClientID: config.Instance().OpenId.ClientId, }) } diff --git a/server/openid/handler.go b/server/openid/handler.go index 2b906b2..d8a4191 100644 --- a/server/openid/handler.go +++ b/server/openid/handler.go @@ -87,7 +87,7 @@ func doAuthentification(r *http.Request, setCookieCallback func(t *oauth2.Token) return nil, err } - whitelist := config.Instance().OpenIdEmailWhitelist + whitelist := config.Instance().OpenId.EmailWhitelist if len(whitelist) > 0 && !slices.Contains(whitelist, claims.Email) { return nil, errors.New("email address not found in ACL") diff --git a/server/playlist/playlist.go b/server/playlist/playlist.go index d3c865a..9c80ffe 100644 --- a/server/playlist/playlist.go +++ b/server/playlist/playlist.go @@ -22,7 +22,7 @@ func PlaylistDetect(req internal.DownloadRequest, mq *queue.MessageQueue, db *kv urlWithParams := append([]string{req.URL}, params...) var ( - downloader = config.Instance().DownloaderPath + downloader = config.Instance().Paths.DownloaderPath cmd = exec.Command(downloader, urlWithParams...) ) diff --git a/server/rest/container.go b/server/rest/container.go index 0c386aa..b319e4d 100644 --- a/server/rest/container.go +++ b/server/rest/container.go @@ -19,10 +19,10 @@ func ApplyRouter(args *ContainerArgs) func(chi.Router) { h := Container(args) return func(r chi.Router) { - if config.Instance().RequireAuth { + if config.Instance().Authentication.RequireAuth { r.Use(middlewares.Authenticated) } - if config.Instance().UseOpenId { + if config.Instance().OpenId.UseOpenId { r.Use(openid.Middleware) } r.Post("/exec", h.Exec()) diff --git a/server/rest/service.go b/server/rest/service.go index 31616b6..3bf70d7 100644 --- a/server/rest/service.go +++ b/server/rest/service.go @@ -179,7 +179,7 @@ func (s *Service) GetVersion(ctx context.Context) (string, string, error) { ctx, cancel := context.WithTimeout(ctx, time.Second*10) defer cancel() - cmd := exec.CommandContext(ctx, config.Instance().DownloaderPath, "--version") + cmd := exec.CommandContext(ctx, config.Instance().Paths.DownloaderPath, "--version") go func() { stdout, _ := cmd.Output() result <- string(stdout) diff --git a/server/rpc/container.go b/server/rpc/container.go index 9ca1b8e..9ec6281 100644 --- a/server/rpc/container.go +++ b/server/rpc/container.go @@ -22,10 +22,10 @@ func Container(db *kv.Store, mq *queue.MessageQueue, lm *livestream.Monitor) *Se // RPC service must be registered before applying this router! func ApplyRouter() func(chi.Router) { return func(r chi.Router) { - if config.Instance().RequireAuth { + if config.Instance().Authentication.RequireAuth { r.Use(middlewares.Authenticated) } - if config.Instance().UseOpenId { + if config.Instance().OpenId.UseOpenId { r.Use(openid.Middleware) } r.Get("/ws", WebSocket) diff --git a/server/server.go b/server/server.go index 6e7bcfc..f6b0e06 100644 --- a/server/server.go +++ b/server/server.go @@ -11,10 +11,8 @@ import ( "net/http" "net/rpc" "os" - "os/signal" "path/filepath" "strings" - "syscall" "time" "github.com/go-chi/chi/v5" @@ -23,6 +21,7 @@ import ( "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/filebrowser" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/kv" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/livestream" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/pipeline" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/queue" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/logging" middlewares "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/middleware" @@ -44,29 +43,30 @@ type RunConfig struct { } type serverConfig struct { - frontend fs.FS - swagger fs.FS - mdb *kv.Store - db *bolt.DB - mq *queue.MessageQueue - lm *livestream.Monitor - tm *twitch.Monitor + frontend fs.FS + swagger fs.FS + mdb *kv.Store + db *bolt.DB + mq *queue.MessageQueue + lm *livestream.Monitor + taskRunner task.TaskRunner + twitchMonitor *twitch.Monitor } // TODO: change scope var observableLogger = logging.NewObservableLogger() -func RunBlocking(rc *RunConfig) { - dbPath := filepath.Join(config.Instance().SessionFilePath, "bolt.db") +func Run(ctx context.Context, rc *RunConfig) error { + dbPath := filepath.Join(config.Instance().Paths.LocalDatabasePath, "bolt.db") boltdb, err := bolt.Open(dbPath, 0600, nil) if err != nil { - panic(err) + return err } mdb, err := kv.NewStore(boltdb, time.Second*15) if err != nil { - panic(err) + return err } // ---- LOGGING --------------------------------------------------- @@ -78,10 +78,10 @@ func RunBlocking(rc *RunConfig) { conf := config.Instance() // file based logging - if conf.EnableFileLogging { - logger, err := logging.NewRotableLogger(conf.LogPath) + if conf.Logging.EnableFileLogging { + logger, err := logging.NewRotableLogger(conf.Logging.LogPath) if err != nil { - panic(err) + return err } defer logger.Rotate() @@ -106,7 +106,7 @@ func RunBlocking(rc *RunConfig) { mq, err := queue.NewMessageQueue() if err != nil { - panic(err) + return err } mq.SetupConsumers() go mdb.Restore(mq) @@ -124,41 +124,45 @@ func RunBlocking(rc *RunConfig) { boltdb, ) go tm.Monitor( - context.TODO(), + ctx, config.Instance().Twitch.CheckInterval, twitch.DEFAULT_DOWNLOAD_HANDLER(mdb, mq), ) go tm.Restore() + cronTaskRunner := task.NewCronTaskRunner(mq, mdb) + go cronTaskRunner.Spawner(ctx) + scfg := serverConfig{ - frontend: rc.App, - swagger: rc.Swagger, - mdb: mdb, - db: boltdb, - mq: mq, - lm: lm, - tm: tm, + frontend: rc.App, + swagger: rc.Swagger, + mdb: mdb, + db: boltdb, + mq: mq, + lm: lm, + twitchMonitor: tm, + taskRunner: cronTaskRunner, } srv := newServer(scfg) - go gracefulShutdown(srv, &scfg) + go gracefulShutdown(ctx, srv, &scfg) var ( network = "tcp" - address = fmt.Sprintf("%s:%d", conf.Host, conf.Port) + address = fmt.Sprintf("%s:%d", conf.Server.Host, conf.Server.Port) ) // support unix sockets - if strings.HasPrefix(conf.Host, "/") { + if strings.HasPrefix(conf.Server.Host, "/") { network = "unix" - address = conf.Host + address = conf.Server.Host } listener, err := net.Listen(network, address) if err != nil { slog.Error("failed to listen", slog.String("err", err.Error())) - return + return err } slog.Info("yt-dlp-webui started", slog.String("address", address)) @@ -166,14 +170,12 @@ func RunBlocking(rc *RunConfig) { if err := srv.Serve(listener); err != nil { slog.Warn("http server stopped", slog.String("err", err.Error())) } + + return nil } func newServer(c serverConfig) *http.Server { // archiver.Register(c.db) - - cronTaskRunner := task.NewCronTaskRunner(c.mq, c.mdb) - go cronTaskRunner.Spawner(context.TODO()) - service := ytdlpRPC.Container(c.mdb, c.mq, c.lm) rpc.Register(service) @@ -197,7 +199,7 @@ func newServer(c serverConfig) *http.Server { // use in dev // r.Use(middleware.Logger) - baseUrl := config.Instance().BaseURL + baseUrl := config.Instance().Server.BaseURL r.Mount(baseUrl+"/", http.StripPrefix(baseUrl, http.FileServerFS(c.frontend))) // swagger @@ -246,36 +248,35 @@ func newServer(c serverConfig) *http.Server { r.Route("/status", status.ApplyRouter(c.mdb)) // Subscriptions - r.Route("/subscriptions", subscription.Container(c.db, cronTaskRunner).ApplyRouter()) + r.Route("/subscriptions", subscription.Container(c.db, c.taskRunner).ApplyRouter()) // Twitch r.Route("/twitch", func(r chi.Router) { r.Use(middlewares.ApplyAuthenticationByConfig) - r.Get("/users", twitch.GetMonitoredUsers(c.tm)) - r.Post("/user", twitch.MonitorUserHandler(c.tm)) - r.Delete("/user/{user}", twitch.DeleteUser(c.tm)) + r.Get("/users", twitch.GetMonitoredUsers(c.twitchMonitor)) + r.Post("/user", twitch.MonitorUserHandler(c.twitchMonitor)) + r.Delete("/user/{user}", twitch.DeleteUser(c.twitchMonitor)) + }) + + // Pipelines + r.Route("/pipelines", func(r chi.Router) { + h := pipeline.NewRestHandler(c.db) + r.Use(middlewares.ApplyAuthenticationByConfig) + r.Get("/id/{id}", h.GetPipeline) + r.Get("/all", h.GetAllPipelines) + r.Post("/", h.SavePipeline) + r.Delete("/id/{id}", h.DeletePipeline) }) return &http.Server{Handler: r} } -func gracefulShutdown(srv *http.Server, cfg *serverConfig) { - ctx, stop := signal.NotifyContext(context.Background(), - os.Interrupt, - syscall.SIGTERM, - syscall.SIGQUIT, - ) +func gracefulShutdown(ctx context.Context, srv *http.Server, cfg *serverConfig) { + <-ctx.Done() + slog.Info("shutdown signal received") - go func() { - <-ctx.Done() - slog.Info("shutdown signal received") - - defer func() { - cfg.mdb.Persist() - cfg.db.Close() - - stop() - srv.Shutdown(context.Background()) - }() + defer func() { + cfg.db.Close() + srv.Shutdown(context.Background()) }() } diff --git a/server/subscription/rest/handler.go b/server/subscription/rest/handler.go index 81ffbeb..ffc492d 100644 --- a/server/subscription/rest/handler.go +++ b/server/subscription/rest/handler.go @@ -19,10 +19,10 @@ type RestHandler struct { // ApplyRouter implements domain.RestHandler. func (h *RestHandler) ApplyRouter() func(chi.Router) { return func(r chi.Router) { - if config.Instance().RequireAuth { + if config.Instance().Authentication.RequireAuth { r.Use(middlewares.Authenticated) } - if config.Instance().UseOpenId { + if config.Instance().OpenId.UseOpenId { r.Use(openid.Middleware) } diff --git a/server/subscription/task/runner.go b/server/subscription/task/runner.go index dbf948a..3d6647a 100644 --- a/server/subscription/task/runner.go +++ b/server/subscription/task/runner.go @@ -129,7 +129,7 @@ func (t *CronTaskRunner) fetcher(ctx context.Context, req *monitorTask) time.Dur cmd := exec.CommandContext( ctx, - config.Instance().DownloaderPath, + config.Instance().Paths.DownloaderPath, "-I1", "--flat-playlist", "--print", "webpage_url", diff --git a/server/sys/fs.go b/server/sys/fs.go index fca2c45..11e0873 100644 --- a/server/sys/fs.go +++ b/server/sys/fs.go @@ -14,7 +14,7 @@ import ( // FreeSpace gets the available Bytes writable to download directory func FreeSpace() (uint64, error) { var stat unix.Statfs_t - unix.Statfs(config.Instance().DownloadPath, &stat) + unix.Statfs(config.Instance().Paths.DownloadPath, &stat) return (stat.Bavail * uint64(stat.Bsize)), nil } @@ -27,7 +27,7 @@ func DirectoryTree() (*[]string, error) { } var ( - rootPath = config.Instance().DownloadPath + rootPath = config.Instance().Paths.DownloadPath stack = internal.NewStack[Node]() flattened = make([]string, 0) diff --git a/server/twitch/monitor.go b/server/twitch/monitor.go index 82ae0f9..9875361 100644 --- a/server/twitch/monitor.go +++ b/server/twitch/monitor.go @@ -121,7 +121,7 @@ func DEFAULT_DOWNLOAD_HANDLER(db *kv.Store, mq *queue.MessageQueue) func(user st var ( url = fmt.Sprintf("https://www.twitch.tv/%s", user) filename = filepath.Join( - config.Instance().DownloadPath, + config.Instance().Paths.DownloadPath, fmt.Sprintf("%s (live) %s", user, time.Now().Format(time.ANSIC)), ) ext = ".webm" diff --git a/server/updater/update.go b/server/updater/update.go index 17f8159..e91e6c8 100644 --- a/server/updater/update.go +++ b/server/updater/update.go @@ -8,7 +8,7 @@ import ( // Update using the builtin function of yt-dlp func UpdateExecutable() error { - cmd := exec.Command(config.Instance().DownloaderPath, "-U") + cmd := exec.Command(config.Instance().Paths.DownloaderPath, "-U") err := cmd.Start() if err != nil { diff --git a/server/user/handlers.go b/server/user/handlers.go index 621450f..11d26e6 100644 --- a/server/user/handlers.go +++ b/server/user/handlers.go @@ -8,6 +8,7 @@ import ( "github.com/golang-jwt/jwt/v5" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config" + "golang.org/x/crypto/bcrypt" ) const TOKEN_COOKIE_NAME = "jwt-yt-dlp-webui" @@ -26,11 +27,17 @@ func Login(w http.ResponseWriter, r *http.Request) { } var ( - username = config.Instance().Username - password = config.Instance().Password + username = config.Instance().Authentication.Username + passwordHash = config.Instance().Authentication.PasswordHash ) - if username != req.Username || password != req.Password { + err := bcrypt.CompareHashAndPassword([]byte(passwordHash), []byte(req.Password)) + if err != nil { + http.Error(w, "invalid username or password", http.StatusBadRequest) + return + } + + if username != req.Username { http.Error(w, "invalid username or password", http.StatusBadRequest) return }