Compare commits

...

4 Commits

Author SHA1 Message Date
991bea1a7b refactoring: config struct & pipelines 2025-09-04 15:33:07 +02:00
5dbe6d886f Merge remote-tracking branch 'origin/master' into feat-pipelines 2025-09-02 10:18:31 +02:00
658d43f9ea migrated to boltdb from sqlite + session files 2025-08-31 20:58:54 +02:00
4c35b0b41f refactoring-1
introduced pipelines and abstracted download process.go in Downloader interface
2025-08-30 10:18:41 +02:00
61 changed files with 2069 additions and 1515 deletions

45
go.mod
View File

@@ -3,30 +3,33 @@ module github.com/marcopiovanello/yt-dlp-web-ui/v3
go 1.24
require (
github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef
github.com/coreos/go-oidc/v3 v3.12.0
github.com/go-chi/chi/v5 v5.2.0
github.com/go-chi/cors v1.2.1
github.com/golang-jwt/jwt/v5 v5.2.1
github.com/coreos/go-oidc/v3 v3.15.0
github.com/go-chi/chi/v5 v5.2.3
github.com/go-chi/cors v1.2.2
github.com/golang-jwt/jwt/v5 v5.3.0
github.com/google/uuid v1.6.0
github.com/gorilla/websocket v1.5.3
github.com/robfig/cron/v3 v3.0.0
golang.org/x/oauth2 v0.25.0
golang.org/x/sync v0.10.0
golang.org/x/sys v0.29.0
gopkg.in/yaml.v3 v3.0.1
modernc.org/sqlite v1.34.5
github.com/robfig/cron/v3 v3.0.1
github.com/spf13/viper v1.20.1
go.etcd.io/bbolt v1.4.3
golang.org/x/crypto v0.41.0
golang.org/x/oauth2 v0.30.0
golang.org/x/sys v0.35.0
)
require (
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/go-jose/go-jose/v4 v4.0.4 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/ncruces/go-strftime v0.1.9 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
golang.org/x/crypto v0.32.0 // indirect
golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c // indirect
modernc.org/libc v1.61.11 // indirect
modernc.org/mathutil v1.7.1 // indirect
modernc.org/memory v1.8.2 // indirect
github.com/fsnotify/fsnotify v1.8.0 // indirect
github.com/go-jose/go-jose/v4 v4.1.2 // indirect
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
github.com/sagikazarmark/locafero v0.7.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.12.0 // indirect
github.com/spf13/cast v1.7.1 // indirect
github.com/spf13/pflag v1.0.6 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/text v0.28.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

129
go.sum
View File

@@ -1,79 +1,76 @@
github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef h1:2JGTg6JapxP9/R33ZaagQtAM4EkkSYnIAlOG5EI8gkM=
github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef/go.mod h1:JS7hed4L1fj0hXcyEejnW57/7LCetXggd+vwrRnYeII=
github.com/coreos/go-oidc/v3 v3.12.0 h1:sJk+8G2qq94rDI6ehZ71Bol3oUHy63qNYmkiSjrc/Jo=
github.com/coreos/go-oidc/v3 v3.12.0/go.mod h1:gE3LgjOgFoHi9a4ce4/tJczr0Ai2/BoDhf0r5lltWI0=
github.com/coreos/go-oidc/v3 v3.15.0 h1:R6Oz8Z4bqWR7VFQ+sPSvZPQv4x8M+sJkDO5ojgwlyAg=
github.com/coreos/go-oidc/v3 v3.15.0/go.mod h1:HaZ3szPaZ0e4r6ebqvsLWlk2Tn+aejfmrfah6hnSYEU=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/go-chi/chi/v5 v5.2.0 h1:Aj1EtB0qR2Rdo2dG4O94RIU35w2lvQSj6BRA4+qwFL0=
github.com/go-chi/chi/v5 v5.2.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
github.com/go-chi/cors v1.2.1 h1:xEC8UT3Rlp2QuWNEr4Fs/c2EAGVKBwy/1vHx3bppil4=
github.com/go-chi/cors v1.2.1/go.mod h1:sSbTewc+6wYHBBCW7ytsFSn836hqM7JxpglAy2Vzc58=
github.com/go-jose/go-jose/v4 v4.0.4 h1:VsjPI33J0SB9vQM6PLmNjoHqMQNGPiZ0rHL7Ni7Q6/E=
github.com/go-jose/go-jose/v4 v4.0.4/go.mod h1:NKb5HO1EZccyMpiZNbdUw/14tiXNyUJh188dfnMCAfc=
github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk=
github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M=
github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
github.com/go-chi/chi/v5 v5.2.3 h1:WQIt9uxdsAbgIYgid+BpYc+liqQZGMHRaUwp0JUcvdE=
github.com/go-chi/chi/v5 v5.2.3/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops=
github.com/go-chi/cors v1.2.2 h1:Jmey33TE+b+rB7fT8MUy1u0I4L+NARQlK6LhzKPSyQE=
github.com/go-chi/cors v1.2.2/go.mod h1:sSbTewc+6wYHBBCW7ytsFSn836hqM7JxpglAy2Vzc58=
github.com/go-jose/go-jose/v4 v4.1.2 h1:TK/7NqRQZfgAh+Td8AlsrvtPoUyiHh0LqVvokh+1vHI=
github.com/go-jose/go-jose/v4 v4.1.2/go.mod h1:22cg9HWM1pOlnRiY+9cQYJ9XHmya1bYW8OeDM6Ku6Oo=
github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss=
github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM=
github.com/golang-jwt/jwt/v5 v5.3.0 h1:pv4AsKCKKZuqlgs5sUmn4x8UlGa0kEVt/puTpKx9vvo=
github.com/golang-jwt/jwt/v5 v5.3.0/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/pprof v0.0.0-20240409012703-83162a5b38cd h1:gbpYu9NMq8jhDVbvlGkMFWCjLFlqqEZjEmObmhUy6Vo=
github.com/google/pprof v0.0.0-20240409012703-83162a5b38cd/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4=
github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pelletier/go-toml/v2 v2.2.3 h1:YmeHyLY8mFWbdkNWwpr+qIL2bEqT0o95WSdkNHvL12M=
github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xlb6rzQu9GuUkc=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/robfig/cron/v3 v3.0.0 h1:kQ6Cb7aHOHTSzNVNEhmp8EcWKLb4CbiMW9h9VyIhO4E=
github.com/robfig/cron/v3 v3.0.0/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc=
golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc=
golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c h1:KL/ZBHXgKGVmuZBZ01Lt57yE5ws8ZPSkkihmEyq7FXc=
golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c/go.mod h1:tujkw807nyEEAamNbDrEGzRav+ilXA7PCRAd6xsmwiU=
golang.org/x/mod v0.22.0 h1:D4nJWe9zXqHOmWqj4VMOJhvzj7bEZg4wEYa759z1pH4=
golang.org/x/mod v0.22.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY=
golang.org/x/oauth2 v0.25.0 h1:CY4y7XT9v0cRI9oupztF8AgiIu99L/ksR/Xp/6jrZ70=
golang.org/x/oauth2 v0.25.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/tools v0.29.0 h1:Xx0h3TtM9rzQpQuR4dKLrdglAmCEN5Oi+P74JdhdzXE=
golang.org/x/tools v0.29.0/go.mod h1:KMQVMRsVxU6nHCFXrBPhDB8XncLNLM0lIy/F14RP588=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/sagikazarmark/locafero v0.7.0 h1:5MqpDsTGNDhY8sGp0Aowyf0qKsPrhewaLSsFaodPcyo=
github.com/sagikazarmark/locafero v0.7.0/go.mod h1:2za3Cg5rMaTMoG/2Ulr9AwtFaIppKXTRYnozin4aB5k=
github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
github.com/spf13/afero v1.12.0 h1:UcOPyRBYczmFn6yvphxkn9ZEOY65cpwGKb5mL36mrqs=
github.com/spf13/afero v1.12.0/go.mod h1:ZTlWwG4/ahT8W7T0WQ5uYmjI9duaLQGy3Q2OAl4sk/4=
github.com/spf13/cast v1.7.1 h1:cuNEagBQEHWN1FnbGEjCXL2szYEXqfJPbP2HNUaca9Y=
github.com/spf13/cast v1.7.1/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
github.com/spf13/pflag v1.0.6 h1:jFzHGLGAlb3ruxLB8MhbI6A8+AQX/2eW4qeyNZXNp2o=
github.com/spf13/pflag v1.0.6/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.20.1 h1:ZMi+z/lvLyPSCoNtFCpqjy0S4kPbirhpTMwl8BkW9X4=
github.com/spf13/viper v1.20.1/go.mod h1:P9Mdzt1zoHIG8m2eZQinpiBjo6kCmZSKBClNNqjJvu4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
go.etcd.io/bbolt v1.4.3 h1:dEadXpI6G79deX5prL3QRNP6JB8UxVkqo4UPnHaNXJo=
go.etcd.io/bbolt v1.4.3/go.mod h1:tKQlpPaYCVFctUIgFKFnAlvbmB3tpy1vkTnDWohtc0E=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI=
go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ=
golang.org/x/crypto v0.41.0 h1:WKYxWedPGCTVVl5+WHSSrOBT0O8lx32+zxmHxijgXp4=
golang.org/x/crypto v0.41.0/go.mod h1:pO5AFd7FA68rFak7rOAGVuygIISepHftHnr8dr6+sUc=
golang.org/x/oauth2 v0.30.0 h1:dnDm7JmhM45NNpd8FDDeLhK6FwqbOf4MLCM9zb1BOHI=
golang.org/x/oauth2 v0.30.0/go.mod h1:B++QgG3ZKulg6sRPGD/mqlHQs5rB3Ml9erfeDY7xKlU=
golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw=
golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng=
golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
modernc.org/cc/v4 v4.24.4 h1:TFkx1s6dCkQpd6dKurBNmpo+G8Zl4Sq/ztJ+2+DEsh0=
modernc.org/cc/v4 v4.24.4/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0=
modernc.org/ccgo/v4 v4.23.15 h1:wFDan71KnYqeHz4eF63vmGE6Q6Pc0PUGDpP0PRMYjDc=
modernc.org/ccgo/v4 v4.23.15/go.mod h1:nJX30dks/IWuBOnVa7VRii9Me4/9TZ1SC9GNtmARTy0=
modernc.org/fileutil v1.3.0 h1:gQ5SIzK3H9kdfai/5x41oQiKValumqNTDXMvKo62HvE=
modernc.org/fileutil v1.3.0/go.mod h1:XatxS8fZi3pS8/hKG2GH/ArUogfxjpEKs3Ku3aK4JyQ=
modernc.org/gc/v2 v2.6.2 h1:YBXi5Kqp6aCK3fIxwKQ3/fErvawVKwjOLItxj1brGds=
modernc.org/gc/v2 v2.6.2/go.mod h1:YgIahr1ypgfe7chRuJi2gD7DBQiKSLMPgBQe9oIiito=
modernc.org/libc v1.61.11 h1:6sZG8uB6EMMG7iTLPTndi8jyTdgAQNIeLGjCFICACZw=
modernc.org/libc v1.61.11/go.mod h1:HHX+srFdn839oaJRd0W8hBM3eg+mieyZCAjWwB08/nM=
modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU=
modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg=
modernc.org/memory v1.8.2 h1:cL9L4bcoAObu4NkxOlKWBWtNHIsnnACGF/TbqQ6sbcI=
modernc.org/memory v1.8.2/go.mod h1:ZbjSvMO5NQ1A2i3bWeDiVMxIorXwdClKE/0SZ+BMotU=
modernc.org/opt v0.1.4 h1:2kNGMRiUjrp4LcaPuLY2PzUfqM/w9N23quVwhKt5Qm8=
modernc.org/opt v0.1.4/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns=
modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w=
modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE=
modernc.org/sqlite v1.34.5 h1:Bb6SR13/fjp15jt70CL4f18JIN7p7dnMExd+UFnF15g=
modernc.org/sqlite v1.34.5/go.mod h1:YLuNmX9NKs8wRNK2ko1LW1NGYcc9FkBO69JOt1AR9JE=
modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0=
modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A=
modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y=
modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM=

161
main.go
View File

@@ -1,117 +1,102 @@
package main
import (
"context"
"embed"
"flag"
"io/fs"
"log"
"log/slog"
"os"
"os/signal"
"runtime"
"syscall"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/cli"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/openid"
"github.com/spf13/viper"
)
var (
host string
port int
queueSize int
configFile string
downloadPath string
downloaderPath string
sessionFilePath string
localDatabasePath string
frontendPath string
//go:embed frontend/dist/index.html
//go:embed frontend/dist/assets/*
var frontend embed.FS
requireAuth bool
username string
password string
userFromEnv = os.Getenv("USERNAME")
passFromEnv = os.Getenv("PASSWORD")
logFile string
enableFileLogging bool
//go:embed frontend/dist/index.html
//go:embed frontend/dist/assets/*
frontend embed.FS
//go:embed openapi/*
swagger embed.FS
)
func init() {
flag.StringVar(&host, "host", "0.0.0.0", "Host where server will listen at")
flag.IntVar(&port, "port", 3033, "Port where server will listen at")
flag.IntVar(&queueSize, "qs", 2, "Queue size (concurrent downloads)")
flag.StringVar(&configFile, "conf", "./config.yml", "Config file path")
flag.StringVar(&downloadPath, "out", ".", "Where files will be saved")
flag.StringVar(&downloaderPath, "driver", "yt-dlp", "yt-dlp executable path")
flag.StringVar(&sessionFilePath, "session", ".", "session file path")
flag.StringVar(&localDatabasePath, "db", "local.db", "local database path")
flag.StringVar(&frontendPath, "web", "", "frontend web resources path")
flag.BoolVar(&enableFileLogging, "fl", false, "enable outputting logs to a file")
flag.StringVar(&logFile, "lf", "yt-dlp-webui.log", "set log file location")
flag.BoolVar(&requireAuth, "auth", false, "Enable RPC authentication")
flag.StringVar(&username, "user", userFromEnv, "Username required for auth")
flag.StringVar(&password, "pass", passFromEnv, "Password required for auth")
flag.Parse()
}
//go:embed openapi/*
var swagger embed.FS
func main() {
frontend, err := fs.Sub(frontend, "frontend/dist")
if err != nil {
log.Fatalln(err)
// Parse optional config path from flag
var configFile string
flag.StringVar(&configFile, "conf", "./config.yml", "Config file path")
flag.Parse()
v := viper.New()
v.SetConfigFile(configFile)
v.SetConfigType("yaml")
// Defaults
v.SetDefault("server.host", "0.0.0.0")
v.SetDefault("server.port", 3033)
v.SetDefault("server.queue_size", 2)
v.SetDefault("paths.download_path", ".")
v.SetDefault("paths.downloader_path", "yt-dlp")
v.SetDefault("paths.local_database_path", ".")
v.SetDefault("logging.log_path", "yt-dlp-webui.log")
v.SetDefault("logging.enable_file_logging", false)
v.SetDefault("authentication.require_auth", false)
// Env binding
v.SetEnvPrefix("APP")
v.AutomaticEnv()
// Load YAML file if exists
if err := v.ReadInConfig(); err != nil {
slog.Debug("using defaults")
}
if frontendPath != "" {
frontend = os.DirFS(frontendPath)
cfg := config.Instance()
if err := v.Unmarshal(&cfg); err != nil {
slog.Error("failed to load config", "error", err)
}
c := config.Instance()
{
// init the config struct with the values from flags
// TODO: find an alternative way to populate the config struct from flags or config file
c.Host = host
c.Port = port
c.QueueSize = queueSize
c.DownloadPath = downloadPath
c.DownloaderPath = downloaderPath
c.SessionFilePath = sessionFilePath
c.LocalDatabasePath = localDatabasePath
c.LogPath = logFile
c.EnableFileLogging = enableFileLogging
c.RequireAuth = requireAuth
c.Username = username
c.Password = password
if cfg.Server.QueueSize <= 0 || runtime.NumCPU() <= 2 {
cfg.Server.QueueSize = 2
}
// limit concurrent downloads for systems with 2 or less logical cores
if runtime.NumCPU() <= 2 {
c.QueueSize = 1
}
// if config file is found it will be merged with the current config struct
if err := c.LoadFile(configFile); err != nil {
log.Println(cli.BgRed, "config", cli.Reset, err)
// 6. Frontend FS
var appFS fs.FS
if fp := v.GetString("frontend_path"); fp != "" {
appFS = os.DirFS(fp)
} else {
sub, err := fs.Sub(frontend, "frontend/dist")
if err != nil {
slog.Error("failed to load embedded frontend", "error", err)
os.Exit(1)
}
appFS = sub
}
// Configure OpenID if needed
openid.Configure()
server.RunBlocking(&server.RunConfig{
App: frontend,
// Graceful shutdown
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
slog.Info("starting server",
"host", cfg.Server.Host,
"port", cfg.Server.Port,
"queue_size", cfg.Server.QueueSize,
)
if err := server.Run(ctx, &server.RunConfig{
App: appFS,
Swagger: swagger,
})
}); err != nil {
slog.Error("server stopped with error", "error", err)
os.Exit(1)
}
slog.Info("server exited cleanly")
}

View File

@@ -146,10 +146,10 @@ func (h *Handler) GetCursor() http.HandlerFunc {
// ApplyRouter implements domain.RestHandler.
func (h *Handler) ApplyRouter() func(chi.Router) {
return func(r chi.Router) {
if config.Instance().RequireAuth {
if config.Instance().Authentication.RequireAuth {
r.Use(middlewares.Authenticated)
}
if config.Instance().UseOpenId {
if config.Instance().OpenId.UseOpenId {
r.Use(openid.Middleware)
}

View File

@@ -16,7 +16,7 @@ import (
func DownloadExists(ctx context.Context, url string) (bool, error) {
cmd := exec.CommandContext(
ctx,
config.Instance().DownloaderPath,
config.Instance().Paths.DownloaderPath,
"--print",
"%(extractor)s %(id)s",
url,

View File

@@ -5,15 +5,12 @@ import (
"database/sql"
"log/slog"
evbus "github.com/asaskevich/EventBus"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/archive"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
)
const QueueName = "process:archive"
var (
eventBus = evbus.New()
ch = make(chan *Message, 1)
archiveService archive.Service
)
@@ -25,18 +22,20 @@ func Register(db *sql.DB) {
}
func init() {
eventBus.Subscribe(QueueName, func(m *Message) {
slog.Info(
"archiving completed download",
slog.String("title", m.Title),
slog.String("source", m.Source),
)
archiveService.Archive(context.Background(), m)
})
go func() {
for m := range ch {
slog.Info(
"archiving completed download",
slog.String("title", m.Title),
slog.String("source", m.Source),
)
archiveService.Archive(context.Background(), m)
}
}()
}
func Publish(m *Message) {
if config.Instance().AutoArchive {
eventBus.Publish(QueueName, m)
ch <- m
}
}

View File

@@ -3,7 +3,7 @@ package common
import "time"
// Used to deser the yt-dlp -J output
type DownloadInfo struct {
type DownloadMetadata struct {
URL string `json:"url"`
Title string `json:"title"`
Thumbnail string `json:"thumbnail"`

View File

@@ -1,42 +1,64 @@
package config
import (
"os"
"path/filepath"
"sync"
"time"
"gopkg.in/yaml.v3"
)
type Config struct {
LogPath string `yaml:"log_path"`
EnableFileLogging bool `yaml:"enable_file_logging"`
BaseURL string `yaml:"base_url"`
Host string `yaml:"host"`
Port int `yaml:"port"`
DownloadPath string `yaml:"downloadPath"`
DownloaderPath string `yaml:"downloaderPath"`
RequireAuth bool `yaml:"require_auth"`
Username string `yaml:"username"`
Password string `yaml:"password"`
QueueSize int `yaml:"queue_size"`
LocalDatabasePath string `yaml:"local_database_path"`
SessionFilePath string `yaml:"session_file_path"`
path string // private
UseOpenId bool `yaml:"use_openid"`
OpenIdProviderURL string `yaml:"openid_provider_url"`
OpenIdClientId string `yaml:"openid_client_id"`
OpenIdClientSecret string `yaml:"openid_client_secret"`
OpenIdRedirectURL string `yaml:"openid_redirect_url"`
OpenIdEmailWhitelist []string `yaml:"openid_email_whitelist"`
FrontendPath string `yaml:"frontend_path"`
AutoArchive bool `yaml:"auto_archive"`
Twitch struct {
ClientId string `yaml:"client_id"`
ClientSecret string `yaml:"client_secret"`
CheckInterval time.Duration `yaml:"check_interval"`
} `yaml:"twitch"`
Server ServerConfig `yaml:"server"`
Logging LoggingConfig `yaml:"logging"`
Paths PathsConfig `yaml:"paths"`
Authentication AuthConfig `yaml:"authentication"`
OpenId OpenIdConfig `yaml:"openid"`
Frontend FrontendConfig `yaml:"frontend"`
AutoArchive bool `yaml:"auto_archive"`
Twitch TwitchConfig `yaml:"twitch"`
path string
}
type ServerConfig struct {
BaseURL string `yaml:"base_url"`
Host string `yaml:"host"`
Port int `yaml:"port"`
QueueSize int `yaml:"queue_size"`
}
type LoggingConfig struct {
LogPath string `yaml:"log_path"`
EnableFileLogging bool `yaml:"enable_file_logging"`
}
type PathsConfig struct {
DownloadPath string `yaml:"download_path"`
DownloaderPath string `yaml:"downloader_path"`
LocalDatabasePath string `yaml:"local_database_path"`
}
type AuthConfig struct {
RequireAuth bool `yaml:"require_auth"`
Username string `yaml:"username"`
PasswordHash string `yaml:"password"`
}
type OpenIdConfig struct {
UseOpenId bool `yaml:"use_openid"`
ProviderURL string `yaml:"openid_provider_url"`
ClientId string `yaml:"openid_client_id"`
ClientSecret string `yaml:"openid_client_secret"`
RedirectURL string `yaml:"openid_redirect_url"`
EmailWhitelist []string `yaml:"openid_email_whitelist"`
}
type FrontendConfig struct {
FrontendPath string `yaml:"frontend_path"`
}
type TwitchConfig struct {
ClientId string `yaml:"client_id"`
ClientSecret string `yaml:"client_secret"`
CheckInterval time.Duration `yaml:"check_interval"`
}
var (
@@ -54,22 +76,6 @@ func Instance() *Config {
return instance
}
// Initialises the Config struct given its config file
func (c *Config) LoadFile(filename string) error {
fd, err := os.Open(filename)
if err != nil {
return err
}
c.path = filename
if err := yaml.NewDecoder(fd).Decode(c); err != nil {
return err
}
return nil
}
// Path of the directory containing the config file
func (c *Config) Dir() string { return filepath.Dir(c.path) }

View File

@@ -19,6 +19,7 @@ import (
"github.com/go-chi/chi/v5"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/kv"
)
/*
@@ -88,7 +89,7 @@ type ListRequest struct {
}
func ListDownloaded(w http.ResponseWriter, r *http.Request) {
root := config.Instance().DownloadPath
root := config.Instance().Paths.DownloadPath
req := new(ListRequest)
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
@@ -156,7 +157,7 @@ func SendFile(w http.ResponseWriter, r *http.Request) {
filename := string(decoded)
root := config.Instance().DownloadPath
root := config.Instance().Paths.DownloadPath
if strings.Contains(filepath.Dir(filepath.Clean(filename)), filepath.Clean(root)) {
http.ServeFile(w, r, filename)
@@ -188,7 +189,7 @@ func DownloadFile(w http.ResponseWriter, r *http.Request) {
filename := string(decoded)
root := config.Instance().DownloadPath
root := config.Instance().Paths.DownloadPath
if strings.Contains(filepath.Dir(filepath.Clean(filename)), filepath.Clean(root)) {
w.Header().Add("Content-Disposition", "inline; filename=\""+filepath.Base(filename)+"\"")
@@ -207,9 +208,9 @@ func DownloadFile(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusUnauthorized)
}
func BulkDownload(mdb *internal.MemoryDB) http.HandlerFunc {
func BulkDownload(mdb *kv.Store) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ps := slices.DeleteFunc(*mdb.All(), func(e internal.ProcessResponse) bool {
ps := slices.DeleteFunc(*mdb.All(), func(e internal.ProcessSnapshot) bool {
return e.Progress.Status != internal.StatusCompleted
})

View File

@@ -10,7 +10,7 @@ import (
)
func ParseURL(url string) (*Metadata, error) {
cmd := exec.Command(config.Instance().DownloaderPath, url, "-J")
cmd := exec.Command(config.Instance().Paths.DownloaderPath, url, "-J")
stdout, err := cmd.Output()
if err != nil {

View File

@@ -1,57 +0,0 @@
package internal
import (
"container/heap"
"log/slog"
)
type LoadBalancer struct {
pool Pool
done chan *Worker
}
func NewLoadBalancer(numWorker int) *LoadBalancer {
var pool Pool
doneChan := make(chan *Worker)
for i := range numWorker {
w := &Worker{
requests: make(chan *Process, 1),
index: i,
}
go w.Work(doneChan)
pool = append(pool, w)
slog.Info("spawned worker", slog.Int("index", i))
}
return &LoadBalancer{
pool: pool,
done: doneChan,
}
}
func (b *LoadBalancer) Balance(work chan *Process) {
for {
select {
case req := <-work:
b.dispatch(req)
case w := <-b.done:
b.completed(w)
}
}
}
func (b *LoadBalancer) dispatch(req *Process) {
w := heap.Pop(&b.pool).(*Worker)
w.requests <- req
w.pending++
heap.Push(&b.pool, w)
}
func (b *LoadBalancer) completed(w *Worker) {
w.pending--
heap.Remove(&b.pool, w.index)
heap.Push(&b.pool, w)
}

View File

@@ -33,18 +33,19 @@ type DownloadProgress struct {
// struct representing the response sent to the client
// as JSON-RPC result field
type ProcessResponse struct {
Id string `json:"id"`
Progress DownloadProgress `json:"progress"`
Info common.DownloadInfo `json:"info"`
Output DownloadOutput `json:"output"`
Params []string `json:"params"`
type ProcessSnapshot struct {
Id string `json:"id"`
Progress DownloadProgress `json:"progress"`
Info common.DownloadMetadata `json:"info"`
Output DownloadOutput `json:"output"`
Params []string `json:"params"`
DownloaderName string `json:"downloader_name"`
}
// struct representing the current status of the memoryDB
// used for serializaton/persistence reasons
type Session struct {
Processes []ProcessResponse `json:"processes"`
Snapshots []ProcessSnapshot `json:"processes"`
}
// struct representing the intent to stop a specific process
@@ -72,3 +73,11 @@ type CustomTemplate struct {
Name string `json:"name"`
Content string `json:"content"`
}
const (
StatusPending = iota
StatusDownloading
StatusCompleted
StatusErrored
StatusLiveStream
)

View File

@@ -0,0 +1,42 @@
package downloaders
import (
"log/slog"
"sync"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/common"
)
type DownloaderBase struct {
Id string
URL string
Metadata common.DownloadMetadata
Pending bool
Completed bool
mutex sync.Mutex
}
func (d *DownloaderBase) FetchMetadata(fetcher func(url string) (*common.DownloadMetadata, error)) {
d.mutex.Lock()
defer d.mutex.Unlock()
meta, err := fetcher(d.URL)
if err != nil {
slog.Error("failed to retrieve metadata", slog.Any("err", err))
return
}
d.Metadata = *meta
}
func (d *DownloaderBase) SetPending(p bool) {
d.mutex.Lock()
defer d.mutex.Unlock()
d.Pending = p
}
func (d *DownloaderBase) Complete() {
d.mutex.Lock()
defer d.mutex.Unlock()
d.Completed = true
}

View File

@@ -0,0 +1,26 @@
package downloaders
import (
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/common"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal"
)
type Downloader interface {
Start() error
Stop() error
Status() *internal.ProcessSnapshot
SetOutput(output internal.DownloadOutput)
SetProgress(progress internal.DownloadProgress)
SetMetadata(fetcher func(url string) (*common.DownloadMetadata, error))
SetPending(p bool)
IsCompleted() bool
UpdateSavedFilePath(path string)
RestoreFromSnapshot(*internal.ProcessSnapshot) error
GetId() string
GetUrl() string
}

View File

@@ -0,0 +1,211 @@
package downloaders
import (
"context"
"errors"
"fmt"
"log/slog"
"os"
"os/exec"
"slices"
"strings"
"syscall"
"github.com/google/uuid"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/common"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal"
)
const downloadTemplate = `download:
{
"eta":%(progress.eta)s,
"percentage":"%(progress._percent_str)s",
"speed":%(progress.speed)s
}`
// filename not returning the correct extension after postprocess
const postprocessTemplate = `postprocess:
{
"filepath":"%(info.filepath)s"
}
`
type GenericDownloader struct {
Params []string
AutoRemove bool
progress internal.DownloadProgress
output internal.DownloadOutput
proc *os.Process
logConsumer LogConsumer
// embedded
DownloaderBase
}
func NewGenericDownload(url string, params []string) Downloader {
g := &GenericDownloader{
logConsumer: NewJSONLogConsumer(),
}
// in base
g.Id = uuid.NewString()
g.URL = url
return g
}
func (g *GenericDownloader) Start() error {
g.SetPending(true)
g.Params = argsSanitizer(g.Params)
out := internal.DownloadOutput{
Path: config.Instance().Paths.DownloadPath,
Filename: "%(title)s.%(ext)s",
}
if g.output.Path != "" {
out.Path = g.output.Path
}
if g.output.Filename != "" {
out.Filename = g.output.Filename
}
buildFilename(&g.output)
templateReplacer := strings.NewReplacer("\n", "", "\t", "", " ", "")
baseParams := []string{
strings.Split(g.URL, "?list")[0], //no playlist
"--newline",
"--no-colors",
"--no-playlist",
"--progress-template",
templateReplacer.Replace(downloadTemplate),
"--progress-template",
templateReplacer.Replace(postprocessTemplate),
"--no-exec",
}
// if user asked to manually override the output path...
if !(slices.Contains(g.Params, "-P") || slices.Contains(g.Params, "--paths")) {
g.Params = append(g.Params, "-o")
g.Params = append(g.Params, fmt.Sprintf("%s/%s", out.Path, out.Filename))
}
params := append(baseParams, g.Params...)
slog.Info("requesting download", slog.String("url", g.URL), slog.Any("params", params))
cmd := exec.Command(config.Instance().Paths.DownloaderPath, params...)
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
stdout, err := cmd.StdoutPipe()
if err != nil {
slog.Error("failed to get a stdout pipe", slog.Any("err", err))
panic(err)
}
stderr, err := cmd.StderrPipe()
if err != nil {
slog.Error("failed to get a stderr pipe", slog.Any("err", err))
panic(err)
}
if err := cmd.Start(); err != nil {
slog.Error("failed to start yt-dlp process", slog.Any("err", err))
panic(err)
}
g.proc = cmd.Process
ctx, cancel := context.WithCancel(context.Background())
defer func() {
stdout.Close()
g.Complete()
cancel()
}()
logs := make(chan []byte)
go produceLogs(stdout, logs)
go consumeLogs(ctx, logs, g.logConsumer, g)
go printYtDlpErrors(stderr, g.Id, g.URL)
g.SetPending(false)
return cmd.Wait()
}
func (g *GenericDownloader) Stop() error {
defer func() {
g.progress.Status = internal.StatusCompleted
g.Complete()
}()
// yt-dlp uses multiple child process the parent process
// has been spawned with setPgid = true. To properly kill
// all subprocesses a SIGTERM need to be sent to the correct
// process group
if g.proc == nil {
return errors.New("*os.Process not set")
}
pgid, err := syscall.Getpgid(g.proc.Pid)
if err != nil {
return err
}
if err := syscall.Kill(-pgid, syscall.SIGTERM); err != nil {
return err
}
return nil
}
func (g *GenericDownloader) Status() *internal.ProcessSnapshot {
return &internal.ProcessSnapshot{
Id: g.Id,
Info: g.Metadata,
Progress: g.progress,
Output: g.output,
Params: g.Params,
DownloaderName: "generic",
}
}
func (g *GenericDownloader) UpdateSavedFilePath(p string) { g.output.SavedFilePath = p }
func (g *GenericDownloader) SetOutput(o internal.DownloadOutput) { g.output = o }
func (g *GenericDownloader) SetProgress(p internal.DownloadProgress) { g.progress = p }
func (g *GenericDownloader) SetMetadata(fetcher func(url string) (*common.DownloadMetadata, error)) {
g.FetchMetadata(fetcher)
}
func (g *GenericDownloader) SetPending(p bool) {
g.Pending = p
}
func (g *GenericDownloader) GetId() string { return g.Id }
func (g *GenericDownloader) GetUrl() string { return g.URL }
func (g *GenericDownloader) RestoreFromSnapshot(snap *internal.ProcessSnapshot) error {
if snap == nil {
return errors.New("cannot restore nil snapshot")
}
s := *snap
g.Id = s.Id
g.URL = s.Info.URL
g.Metadata = s.Info
g.progress = s.Progress
g.output = s.Output
g.Params = s.Params
return nil
}
func (g *GenericDownloader) IsCompleted() bool { return g.Completed }

View File

@@ -0,0 +1,205 @@
package downloaders
import (
"context"
"errors"
"fmt"
"io"
"log/slog"
"os"
"os/exec"
"path/filepath"
"slices"
"syscall"
"time"
"github.com/google/uuid"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/common"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/pipes"
)
type LiveStreamDownloader struct {
progress internal.DownloadProgress
proc *os.Process
logConsumer LogConsumer
pipes []pipes.Pipe
// embedded
DownloaderBase
}
func NewLiveStreamDownloader(url string, pipes []pipes.Pipe) Downloader {
l := &LiveStreamDownloader{
logConsumer: NewFFMpegLogConsumer(),
pipes: pipes,
}
// in base
l.Id = uuid.NewString()
l.URL = url
return l
}
func (l *LiveStreamDownloader) Start() error {
l.SetPending(true)
baseParams := []string{
l.URL,
"--newline",
"--no-colors",
"--no-playlist",
"--no-exec",
}
params := append(baseParams, "-o", "-")
cmd := exec.Command(config.Instance().Paths.DownloaderPath, params...)
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
// stdout = media stream
media, err := cmd.StdoutPipe()
if err != nil {
slog.Error("failed to get media stdout", slog.Any("err", err))
panic(err)
}
// stderr = log/progress
stderr, err := cmd.StderrPipe()
if err != nil {
slog.Error("failed to get stderr pipe", slog.Any("err", err))
panic(err)
}
if err := cmd.Start(); err != nil {
slog.Error("failed to start yt-dlp process", slog.Any("err", err))
panic(err)
}
l.proc = cmd.Process
ctx, cancel := context.WithCancel(context.Background())
defer func() {
l.Complete()
cancel()
}()
// --- costruisci pipeline ---
reader := io.Reader(media)
for _, pipe := range l.pipes {
nr, err := pipe.Connect(reader)
if err != nil {
slog.Error("pipe failed", slog.String("pipe", pipe.Name()), slog.Any("err", err))
return err
}
reader = nr
}
// --- fallback: se nessun FileWriter, scrivi su file ---
if !l.hasFileWriter() {
go func() {
filepath.Join(
config.Instance().Paths.DownloadPath,
fmt.Sprintf("%s (live) %s.mp4", l.Id, time.Now().Format(time.ANSIC)),
)
defaultPath := filepath.Join(config.Instance().Paths.DownloadPath)
f, err := os.Create(defaultPath)
if err != nil {
slog.Error("failed to create fallback file", slog.Any("err", err))
return
}
defer f.Close()
_, err = io.Copy(f, reader)
if err != nil {
slog.Error("copy error", slog.Any("err", err))
}
slog.Info("download saved", slog.String("path", defaultPath))
}()
}
// --- logs consumer ---
logs := make(chan []byte)
go produceLogs(stderr, logs)
go consumeLogs(ctx, logs, l.logConsumer, l)
l.progress.Status = internal.StatusLiveStream
return cmd.Wait()
}
func (l *LiveStreamDownloader) Stop() error {
defer func() {
l.progress.Status = internal.StatusCompleted
l.Complete()
}()
// yt-dlp uses multiple child process the parent process
// has been spawned with setPgid = true. To properly kill
// all subprocesses a SIGTERM need to be sent to the correct
// process group
if l.proc == nil {
return errors.New("*os.Process not set")
}
pgid, err := syscall.Getpgid(l.proc.Pid)
if err != nil {
return err
}
if err := syscall.Kill(-pgid, syscall.SIGTERM); err != nil {
return err
}
return nil
}
func (l *LiveStreamDownloader) Status() *internal.ProcessSnapshot {
return &internal.ProcessSnapshot{
Id: l.Id,
Info: l.Metadata,
Progress: l.progress,
DownloaderName: "livestream",
}
}
func (l *LiveStreamDownloader) UpdateSavedFilePath(p string) {}
func (l *LiveStreamDownloader) SetOutput(o internal.DownloadOutput) {}
func (l *LiveStreamDownloader) SetProgress(p internal.DownloadProgress) { l.progress = p }
func (l *LiveStreamDownloader) SetMetadata(fetcher func(url string) (*common.DownloadMetadata, error)) {
l.FetchMetadata(fetcher)
}
func (l *LiveStreamDownloader) SetPending(p bool) {
l.Pending = p
}
func (l *LiveStreamDownloader) GetId() string { return l.Id }
func (l *LiveStreamDownloader) GetUrl() string { return l.URL }
func (l *LiveStreamDownloader) RestoreFromSnapshot(snap *internal.ProcessSnapshot) error {
if snap == nil {
return errors.New("cannot restore nil snapshot")
}
s := *snap
l.Id = s.Id
l.URL = s.Info.URL
l.Metadata = s.Info
l.progress = s.Progress
return nil
}
func (l *LiveStreamDownloader) IsCompleted() bool { return l.Completed }
func (l *LiveStreamDownloader) hasFileWriter() bool {
return slices.ContainsFunc(l.pipes, func(p pipes.Pipe) bool {
return p.Name() == "file-writer"
})
}

View File

@@ -0,0 +1,68 @@
package downloaders
import (
"encoding/json"
"log/slog"
"strings"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal"
)
type LogConsumer interface {
GetName() string
ParseLogEntry(entry []byte, downloader Downloader)
}
type JSONLogConsumer struct{}
func NewJSONLogConsumer() LogConsumer {
return &JSONLogConsumer{}
}
func (j *JSONLogConsumer) GetName() string { return "json-log-consumer" }
func (j *JSONLogConsumer) ParseLogEntry(entry []byte, d Downloader) {
var progress internal.ProgressTemplate
var postprocess internal.PostprocessTemplate
if err := json.Unmarshal(entry, &progress); err == nil {
d.SetProgress(internal.DownloadProgress{
Status: internal.StatusDownloading,
Percentage: progress.Percentage,
Speed: progress.Speed,
ETA: progress.Eta,
})
slog.Info("progress",
slog.String("id", j.GetShortId(d.GetId())),
slog.String("url", d.GetUrl()),
slog.String("percentage", progress.Percentage),
)
}
if err := json.Unmarshal(entry, &postprocess); err == nil {
d.UpdateSavedFilePath(postprocess.FilePath)
}
}
func (j *JSONLogConsumer) GetShortId(id string) string {
return strings.Split(id, "-")[0]
}
//TODO: split in different files
type FFMpegLogConsumer struct{}
func NewFFMpegLogConsumer() LogConsumer {
return &JSONLogConsumer{}
}
func (f *FFMpegLogConsumer) GetName() string { return "ffmpeg-log-consumer" }
func (f *FFMpegLogConsumer) ParseLogEntry(entry []byte, d Downloader) {
slog.Info("ffmpeg output",
slog.String("id", d.GetId()),
slog.String("url", d.GetUrl()),
slog.String("output", string(entry)),
)
}

View File

@@ -0,0 +1 @@
package downloaders

View File

@@ -0,0 +1,76 @@
package downloaders
import (
"bufio"
"context"
"io"
"log/slog"
"regexp"
"slices"
"strings"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal"
)
func argsSanitizer(params []string) []string {
params = slices.DeleteFunc(params, func(e string) bool {
match, _ := regexp.MatchString(`(\$\{)|(\&\&)`, e)
return match
})
params = slices.DeleteFunc(params, func(e string) bool {
return e == ""
})
return params
}
func buildFilename(o *internal.DownloadOutput) {
if o.Filename != "" && strings.Contains(o.Filename, ".%(ext)s") {
o.Filename += ".%(ext)s"
}
o.Filename = strings.Replace(
o.Filename,
".%(ext)s.%(ext)s",
".%(ext)s",
1,
)
}
func produceLogs(r io.Reader, logs chan<- []byte) {
go func() {
scanner := bufio.NewScanner(r)
for scanner.Scan() {
logs <- scanner.Bytes()
}
}()
}
func consumeLogs(ctx context.Context, logs <-chan []byte, c LogConsumer, d Downloader) {
for {
select {
case <-ctx.Done():
slog.Info("detaching logs",
slog.String("url", d.GetUrl()),
slog.String("id", c.GetName()),
)
return
case entry := <-logs:
c.ParseLogEntry(entry, d)
}
}
}
func printYtDlpErrors(stdout io.Reader, shortId, url string) {
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
slog.Error("yt-dlp process error",
slog.String("id", shortId),
slog.String("url", url),
slog.String("err", scanner.Text()),
)
}
}

173
server/internal/kv/store.go Normal file
View File

@@ -0,0 +1,173 @@
package kv
import (
"encoding/json"
"errors"
"log/slog"
"runtime"
"sync"
"time"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/downloaders"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/queue"
bolt "go.etcd.io/bbolt"
)
var (
bucket = []byte("downloads")
memDbEvents = make(chan downloaders.Downloader, runtime.NumCPU())
)
// In-Memory Thread-Safe Key-Value Storage with optional persistence
type Store struct {
db *bolt.DB
table map[string]downloaders.Downloader
mu sync.RWMutex
}
func NewStore(db *bolt.DB, snaptshotInteval time.Duration) (*Store, error) {
// init bucket
err := db.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(bucket)
return err
})
if err != nil {
return nil, err
}
s := &Store{
db: db,
table: make(map[string]downloaders.Downloader),
}
go func() {
ticker := time.NewTicker(snaptshotInteval)
for range ticker.C {
s.Snapshot()
}
}()
return s, err
}
// Get a process pointer given its id
func (m *Store) Get(id string) (downloaders.Downloader, error) {
m.mu.RLock()
defer m.mu.RUnlock()
entry, ok := m.table[id]
if !ok {
return nil, errors.New("no process found for the given key")
}
return entry, nil
}
// Store a pointer of a process and return its id
func (m *Store) Set(d downloaders.Downloader) string {
m.mu.Lock()
m.table[d.GetId()] = d
m.mu.Unlock()
return d.GetId()
}
// Removes a process progress, given the process id
func (m *Store) Delete(id string) {
m.mu.Lock()
delete(m.table, id)
m.mu.Unlock()
}
func (m *Store) Keys() *[]string {
var running []string
m.mu.RLock()
defer m.mu.RUnlock()
for id := range m.table {
running = append(running, id)
}
return &running
}
// Returns a slice of all currently stored processes progess
func (m *Store) All() *[]internal.ProcessSnapshot {
running := []internal.ProcessSnapshot{}
m.mu.RLock()
for _, v := range m.table {
running = append(running, *(v.Status()))
}
m.mu.RUnlock()
return &running
}
// Restore a persisted state
func (m *Store) Restore(mq *queue.MessageQueue) {
m.mu.Lock()
defer m.mu.Unlock()
var snapshot []internal.ProcessSnapshot
m.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket(bucket)
return b.ForEach(func(k, v []byte) error {
var snap internal.ProcessSnapshot
if err := json.Unmarshal(v, &snap); err != nil {
return err
}
snapshot = append(snapshot, snap)
return nil
})
})
for _, snap := range snapshot {
var restored downloaders.Downloader
if snap.DownloaderName == "generic" {
d := downloaders.NewGenericDownload("", []string{})
err := d.RestoreFromSnapshot(&snap)
if err != nil {
continue
}
restored = d
m.table[snap.Id] = restored
if !restored.(*downloaders.GenericDownloader).DownloaderBase.Completed {
mq.Publish(restored)
}
}
}
}
func (m *Store) EventListener() {
for p := range memDbEvents {
if p.Status().DownloaderName == "livestream" {
slog.Info("compacting Store", slog.String("id", p.GetId()))
m.Delete(p.GetId())
}
}
}
func (m *Store) Snapshot() error {
slog.Debug("snapshotting downloads state")
running := m.All()
return m.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(bucket)
for _, v := range *running {
data, err := json.Marshal(v)
if err != nil {
return err
}
if err := b.Put([]byte(v.Id), data); err != nil {
return err
}
}
return nil
})
}

View File

@@ -0,0 +1,9 @@
package kv
import "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal"
// struct representing the current status of the memoryDB
// used for serializaton/persistence reasons
type Session struct {
Processes []internal.ProcessSnapshot `json:"processes"`
}

View File

@@ -11,7 +11,10 @@ import (
"time"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/downloaders"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/kv"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/pipes"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/queue"
)
const (
@@ -32,11 +35,11 @@ type LiveStream struct {
waitTime time.Duration
liveDate time.Time
mq *internal.MessageQueue
db *internal.MemoryDB
mq *queue.MessageQueue
store *kv.Store
}
func New(url string, done chan *LiveStream, mq *internal.MessageQueue, db *internal.MemoryDB) *LiveStream {
func New(url string, done chan *LiveStream, mq *queue.MessageQueue, store *kv.Store) *LiveStream {
return &LiveStream{
url: url,
done: done,
@@ -44,20 +47,20 @@ func New(url string, done chan *LiveStream, mq *internal.MessageQueue, db *inter
waitTime: time.Second * 0,
waitTimeChan: make(chan time.Duration),
mq: mq,
db: db,
store: store,
}
}
// Start the livestream monitoring process, once completion signals on the done channel
func (l *LiveStream) Start() error {
cmd := exec.Command(
config.Instance().DownloaderPath,
config.Instance().Paths.DownloaderPath,
l.url,
"--wait-for-video", "30", // wait for the stream to be live and recheck every 10 secs
"--no-colors", // no ansi color fuzz
"--simulate",
"--newline",
"--paths", config.Instance().DownloadPath,
"--paths", config.Instance().Paths.DownloadPath,
)
stdout, err := cmd.StdoutPipe()
@@ -87,13 +90,12 @@ func (l *LiveStream) Start() error {
l.done <- l
// Send the started livestream to the message queue! :D
p := &internal.Process{
Url: l.url,
Livestream: true,
Params: []string{"--downloader", "ffmpeg", "--no-part"},
}
l.db.Set(p)
l.mq.Publish(p)
//TODO: add pipes
d := downloaders.NewLiveStreamDownloader(l.url, []pipes.Pipe{})
l.store.Set(d)
l.mq.Publish(d)
return nil
}

View File

@@ -5,11 +5,12 @@ import (
"time"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/kv"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/queue"
)
func setupTest() {
config.Instance().DownloaderPath = "build/yt-dlp"
config.Instance().Paths.DownloaderPath = "build/yt-dlp"
}
const URL = "https://www.youtube.com/watch?v=pwoAyLGOysU"
@@ -19,7 +20,7 @@ func TestLivestream(t *testing.T) {
done := make(chan *LiveStream)
ls := New(URL, done, &internal.MessageQueue{}, &internal.MemoryDB{})
ls := New(URL, done, &queue.MessageQueue{}, &kv.Store{})
go ls.Start()
time.AfterFunc(time.Second*20, func() {

View File

@@ -1,27 +1,31 @@
package livestream
import (
"encoding/gob"
"log/slog"
"maps"
"os"
"path/filepath"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/kv"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/queue"
bolt "go.etcd.io/bbolt"
)
var bucket = []byte("livestreams")
type Monitor struct {
db *internal.MemoryDB // where the just started livestream will be published
mq *internal.MessageQueue // where the just started livestream will be published
db *bolt.DB
store *kv.Store // where the just started livestream will be published
mq *queue.MessageQueue // where the just started livestream will be published
streams map[string]*LiveStream // keeps track of the livestreams
done chan *LiveStream // to signal individual processes completition
}
func NewMonitor(mq *internal.MessageQueue, db *internal.MemoryDB) *Monitor {
func NewMonitor(mq *queue.MessageQueue, store *kv.Store, db *bolt.DB) *Monitor {
db.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(bucket)
return err
})
return &Monitor{
mq: mq,
db: db,
store: store,
streams: make(map[string]*LiveStream),
done: make(chan *LiveStream),
}
@@ -31,14 +35,24 @@ func NewMonitor(mq *internal.MessageQueue, db *internal.MemoryDB) *Monitor {
func (m *Monitor) Schedule() {
for l := range m.done {
delete(m.streams, l.url)
m.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(bucket)
return b.Delete([]byte(l.url))
})
}
}
func (m *Monitor) Add(url string) {
ls := New(url, m.done, m.mq, m.db)
ls := New(url, m.done, m.mq, m.store)
go ls.Start()
m.streams[url] = ls
m.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(bucket)
return b.Put([]byte(url), []byte{})
})
}
func (m *Monitor) Remove(url string) error {
@@ -58,11 +72,6 @@ func (m *Monitor) Status() LiveStreamStatus {
status := make(LiveStreamStatus)
for k, v := range m.streams {
// wt, ok := <-v.WaitTime()
// if !ok {
// continue
// }
status[k] = Status{
Status: v.status,
WaitTime: v.waitTime,
@@ -73,46 +82,13 @@ func (m *Monitor) Status() LiveStreamStatus {
return status
}
// Persist the monitor current state to a file.
// The file is located in the configured config directory
func (m *Monitor) Persist() error {
fd, err := os.Create(filepath.Join(config.Instance().SessionFilePath, "livestreams.dat"))
if err != nil {
return err
}
defer fd.Close()
slog.Debug("persisting livestream monitor state")
var toPersist []string
for url := range maps.Keys(m.streams) {
toPersist = append(toPersist, url)
}
return gob.NewEncoder(fd).Encode(toPersist)
}
// Restore a saved state and resume the monitored livestreams
func (m *Monitor) Restore() error {
fd, err := os.Open(filepath.Join(config.Instance().SessionFilePath, "livestreams.dat"))
if err != nil {
return err
}
defer fd.Close()
var toRestore []string
if err := gob.NewDecoder(fd).Decode(&toRestore); err != nil {
return err
}
for _, url := range toRestore {
m.Add(url)
}
slog.Debug("restored livestream monitor state")
return nil
return m.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket(bucket)
return b.ForEach(func(k, v []byte) error {
m.Add(string(k))
return nil
})
})
}

View File

@@ -1,158 +0,0 @@
package internal
import (
"encoding/gob"
"errors"
"log/slog"
"os"
"path/filepath"
"sync"
"github.com/google/uuid"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
)
var memDbEvents = make(chan *Process)
// In-Memory Thread-Safe Key-Value Storage with optional persistence
type MemoryDB struct {
table map[string]*Process
mu sync.RWMutex
}
func NewMemoryDB() *MemoryDB {
return &MemoryDB{
table: make(map[string]*Process),
}
}
// Get a process pointer given its id
func (m *MemoryDB) Get(id string) (*Process, error) {
m.mu.RLock()
defer m.mu.RUnlock()
entry, ok := m.table[id]
if !ok {
return nil, errors.New("no process found for the given key")
}
return entry, nil
}
// Store a pointer of a process and return its id
func (m *MemoryDB) Set(process *Process) string {
id := uuid.NewString()
m.mu.Lock()
process.Id = id
m.table[id] = process
m.mu.Unlock()
return id
}
// Removes a process progress, given the process id
func (m *MemoryDB) Delete(id string) {
m.mu.Lock()
delete(m.table, id)
m.mu.Unlock()
}
func (m *MemoryDB) Keys() *[]string {
var running []string
m.mu.RLock()
defer m.mu.RUnlock()
for id := range m.table {
running = append(running, id)
}
return &running
}
// Returns a slice of all currently stored processes progess
func (m *MemoryDB) All() *[]ProcessResponse {
running := []ProcessResponse{}
m.mu.RLock()
for k, v := range m.table {
running = append(running, ProcessResponse{
Id: k,
Info: v.Info,
Progress: v.Progress,
Output: v.Output,
Params: v.Params,
})
}
m.mu.RUnlock()
return &running
}
// Persist the database in a single file named "session.dat"
func (m *MemoryDB) Persist() error {
running := m.All()
sf := filepath.Join(config.Instance().SessionFilePath, "session.dat")
fd, err := os.Create(sf)
if err != nil {
return errors.Join(errors.New("failed to persist session"), err)
}
m.mu.RLock()
defer m.mu.RUnlock()
session := Session{Processes: *running}
if err := gob.NewEncoder(fd).Encode(session); err != nil {
return errors.Join(errors.New("failed to persist session"), err)
}
return nil
}
// Restore a persisted state
func (m *MemoryDB) Restore(mq *MessageQueue) {
sf := filepath.Join(config.Instance().SessionFilePath, "session.dat")
fd, err := os.Open(sf)
if err != nil {
return
}
var session Session
if err := gob.NewDecoder(fd).Decode(&session); err != nil {
return
}
m.mu.Lock()
defer m.mu.Unlock()
for _, proc := range session.Processes {
restored := &Process{
Id: proc.Id,
Url: proc.Info.URL,
Info: proc.Info,
Progress: proc.Progress,
Output: proc.Output,
Params: proc.Params,
}
m.table[proc.Id] = restored
if restored.Progress.Status != StatusCompleted {
mq.Publish(restored)
}
}
}
func (m *MemoryDB) EventListener() {
for p := range memDbEvents {
if p.AutoRemove {
slog.Info("compacting MemoryDB", slog.String("id", p.Id))
m.Delete(p.Id)
}
}
}

View File

@@ -1,112 +0,0 @@
package internal
import (
"context"
"errors"
"log/slog"
evbus "github.com/asaskevich/EventBus"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
"golang.org/x/sync/semaphore"
)
const queueName = "process:pending"
type MessageQueue struct {
concurrency int
eventBus evbus.Bus
}
// Creates a new message queue.
// By default it will be created with a size equals to nthe number of logical
// CPU cores -1.
// The queue size can be set via the qs flag.
func NewMessageQueue() (*MessageQueue, error) {
qs := config.Instance().QueueSize
if qs <= 0 {
return nil, errors.New("invalid queue size")
}
return &MessageQueue{
concurrency: qs,
eventBus: evbus.New(),
}, nil
}
// Publish a message to the queue and set the task to a peding state.
func (m *MessageQueue) Publish(p *Process) {
// needs to have an id set before
p.SetPending()
m.eventBus.Publish(queueName, p)
}
func (m *MessageQueue) SetupConsumers() {
go m.downloadConsumer()
go m.metadataSubscriber()
}
// Setup the consumer listener which subscribes to the changes to the producer
// channel and triggers the "download" action.
func (m *MessageQueue) downloadConsumer() {
sem := semaphore.NewWeighted(int64(m.concurrency))
m.eventBus.SubscribeAsync(queueName, func(p *Process) {
sem.Acquire(context.Background(), 1)
defer sem.Release(1)
slog.Info("received process from event bus",
slog.String("bus", queueName),
slog.String("consumer", "downloadConsumer"),
slog.String("id", p.getShortId()),
)
if p.Progress.Status != StatusCompleted {
slog.Info("started process",
slog.String("bus", queueName),
slog.String("id", p.getShortId()),
)
if p.Livestream {
// livestreams have higher priorty and they ignore the semaphore
go p.Start()
} else {
p.Start()
}
}
}, false)
}
// Setup the metadata consumer listener which subscribes to the changes to the
// producer channel and adds metadata to each download.
func (m *MessageQueue) metadataSubscriber() {
// How many concurrent metadata fetcher jobs are spawned
// Since there's ongoing downloads, 1 job at time seems a good compromise
sem := semaphore.NewWeighted(1)
m.eventBus.SubscribeAsync(queueName, func(p *Process) {
sem.Acquire(context.Background(), 1)
defer sem.Release(1)
slog.Info("received process from event bus",
slog.String("bus", queueName),
slog.String("consumer", "metadataConsumer"),
slog.String("id", p.getShortId()),
)
if p.Progress.Status == StatusCompleted {
slog.Warn("proccess has an illegal state",
slog.String("id", p.getShortId()),
slog.Int("status", p.Progress.Status),
)
return
}
if err := p.SetMetadata(); err != nil {
slog.Error("failed to retrieve metadata",
slog.String("id", p.getShortId()),
slog.String("err", err.Error()),
)
}
}, false)
}

View File

@@ -0,0 +1,57 @@
package metadata
import (
"bytes"
"encoding/json"
"errors"
"io"
"log/slog"
"os/exec"
"syscall"
"time"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/common"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
)
func DefaultFetcher(url string) (*common.DownloadMetadata, error) {
cmd := exec.Command(config.Instance().Paths.DownloaderPath, url, "-J")
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
stderr, err := cmd.StderrPipe()
if err != nil {
return nil, err
}
meta := common.DownloadMetadata{
URL: url,
CreatedAt: time.Now(),
}
if err := cmd.Start(); err != nil {
return nil, err
}
var bufferedStderr bytes.Buffer
go func() {
io.Copy(&bufferedStderr, stderr)
}()
slog.Info("retrieving metadata", slog.String("url", url))
if err := json.NewDecoder(stdout).Decode(&meta); err != nil {
return nil, err
}
if err := cmd.Wait(); err != nil {
return nil, errors.New(bufferedStderr.String())
}
return &meta, nil
}

View File

@@ -0,0 +1,92 @@
package pipeline
import (
"encoding/json"
"net/http"
"github.com/go-chi/chi/v5"
bolt "go.etcd.io/bbolt"
)
type handler struct {
store *Store
}
func NewRestHandler(db *bolt.DB) *handler {
store, _ := NewStore(db)
return &handler{
store: store,
}
}
func (h *handler) GetPipeline(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
id := chi.URLParam(r, "id")
p, err := h.store.Get(id)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if err := json.NewEncoder(w).Encode(p); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
func (h *handler) GetAllPipelines(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
p, err := h.store.List()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if err := json.NewEncoder(w).Encode(p); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
func (h *handler) SavePipeline(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
defer r.Body.Close()
var req Pipeline
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
id, err := h.store.Save(req)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if err := json.NewEncoder(w).Encode(id); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
func (h *handler) DeletePipeline(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
id := chi.URLParam(r, "id")
err := h.store.Delete(id)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if err := json.NewEncoder(w).Encode("ok"); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}

View File

@@ -0,0 +1,103 @@
package pipeline
import (
"encoding/json"
"fmt"
"github.com/google/uuid"
bolt "go.etcd.io/bbolt"
)
var bucket = []byte("pipelines")
type Step struct {
Type string `json:"type"` // es. "transcoder", "filewriter"
FFmpegArgs []string `json:"ffmpeg_args,omitempty"` // args da passare a ffmpeg
Path string `json:"path,omitempty"` // solo per filewriter
Extension string `json:"extension,omitempty"` // solo per filewriter
}
type Pipeline struct {
ID string `json:"id"`
Name string `json:"name"`
Steps []Step `json:"steps"`
}
type Store struct {
db *bolt.DB
}
func NewStore(db *bolt.DB) (*Store, error) {
// init bucket
err := db.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(bucket)
return err
})
if err != nil {
return nil, err
}
return &Store{db: db}, nil
}
func (s *Store) Save(p Pipeline) (string, error) {
if p.ID == "" {
p.ID = uuid.NewString()
}
data, err := json.Marshal(p)
if err != nil {
return "", err
}
return p.ID, s.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(bucket)
return b.Put([]byte(p.ID), data)
})
}
func (s *Store) Get(id string) (*Pipeline, error) {
var p Pipeline
err := s.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket(bucket)
v := b.Get([]byte(id))
if v == nil {
return fmt.Errorf("pipeline %s not found", id)
}
return json.Unmarshal(v, &p)
})
if err != nil {
return nil, err
}
return &p, nil
}
func (s *Store) List() ([]Pipeline, error) {
var result []Pipeline
err := s.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket(bucket)
return b.ForEach(func(k, v []byte) error {
var p Pipeline
if err := json.Unmarshal(v, &p); err != nil {
return err
}
result = append(result, p)
return nil
})
})
if err != nil {
return nil, err
}
return result, nil
}
func (s *Store) Delete(id string) error {
return s.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(bucket)
return b.Delete([]byte(id))
})
}

View File

@@ -0,0 +1,45 @@
package pipes
import (
"io"
"log/slog"
"os"
)
type FileWriter struct {
Path string
IsFinal bool
}
func (f *FileWriter) Name() string { return "file-writer" }
func (f *FileWriter) Connect(r io.Reader) (io.Reader, error) {
file, err := os.Create(f.Path)
if err != nil {
return nil, err
}
if f.IsFinal {
go func() {
defer file.Close()
if _, err := io.Copy(file, r); err != nil {
slog.Error("FileWriter (final) error", slog.Any("err", err))
}
}()
return r, nil
}
pr, pw := io.Pipe()
go func() {
defer file.Close()
defer pw.Close()
writer := io.MultiWriter(file, pw)
if _, err := io.Copy(writer, r); err != nil {
slog.Error("FileWriter (pipeline) error", slog.Any("err", err))
}
}()
return pr, nil
}

View File

@@ -0,0 +1,66 @@
package pipes
import (
"bufio"
"errors"
"io"
"log/slog"
"os/exec"
"strings"
)
type Transcoder struct {
Args []string
}
func (t *Transcoder) Name() string { return "ffmpeg-transcoder" }
func (t *Transcoder) Connect(r io.Reader) (io.Reader, error) {
cmd := exec.Command("ffmpeg",
append([]string{"-i", "pipe:0"}, append(t.Args, "-f", "webm", "pipe:1")...)...,
)
stdin, err := cmd.StdinPipe()
if err != nil {
return nil, err
}
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
stderr, err := cmd.StderrPipe()
if err != nil {
return nil, err
}
go func() {
reader := bufio.NewReader(stderr)
var line string
for {
part, err := reader.ReadString('\r')
line += part
if err != nil {
break
}
line = strings.TrimRight(line, "\r\n")
slog.Info("ffmpeg transcoder", slog.String("log", line))
line = ""
}
}()
go func() {
defer stdin.Close()
_, err := io.Copy(stdin, r)
if err != nil && !errors.Is(err, io.EOF) {
slog.Error("transcoder stdin error", slog.Any("err", err))
}
}()
if err := cmd.Start(); err != nil {
return nil, err
}
return stdout, nil
}

View File

@@ -0,0 +1,8 @@
package pipes
import "io"
type Pipe interface {
Name() string
Connect(r io.Reader) (io.Reader, error)
}

View File

@@ -1,24 +0,0 @@
package internal
// Pool implements heap.Interface interface as a standard priority queue
type Pool []*Worker
func (h Pool) Len() int { return len(h) }
func (h Pool) Less(i, j int) bool { return h[i].pending < h[j].pending }
func (h Pool) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
h[i].index = i
h[j].index = j
}
func (h *Pool) Push(x any) { *h = append(*h, x.(*Worker)) }
func (h *Pool) Pop() any {
old := *h
n := len(old)
x := old[n-1]
old[n-1] = nil
*h = old[0 : n-1]
return x
}

View File

@@ -1,386 +0,0 @@
package internal
import (
"bufio"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"regexp"
"slices"
"syscall"
"os"
"os/exec"
"strings"
"time"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/archiver"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/common"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
)
const downloadTemplate = `download:
{
"eta":%(progress.eta)s,
"percentage":"%(progress._percent_str)s",
"speed":%(progress.speed)s
}`
// filename not returning the correct extension after postprocess
const postprocessTemplate = `postprocess:
{
"filepath":"%(info.filepath)s"
}
`
const (
StatusPending = iota
StatusDownloading
StatusCompleted
StatusErrored
)
// Process descriptor
type Process struct {
Id string
Url string
Livestream bool
AutoRemove bool
Params []string
Info common.DownloadInfo
Progress DownloadProgress
Output DownloadOutput
proc *os.Process
}
// Starts spawns/forks a new yt-dlp process and parse its stdout.
// The process is spawned to outputting a custom progress text that
// Resembles a JSON Object in order to Unmarshal it later.
// This approach is anyhow not perfect: quotes are not escaped properly.
// Each process is not identified by its PID but by a UUIDv4
func (p *Process) Start() {
// escape bash variable escaping and command piping, you'll never know
// what they might come with...
p.Params = slices.DeleteFunc(p.Params, func(e string) bool {
match, _ := regexp.MatchString(`(\$\{)|(\&\&)`, e)
return match
})
p.Params = slices.DeleteFunc(p.Params, func(e string) bool {
return e == ""
})
out := DownloadOutput{
Path: config.Instance().DownloadPath,
Filename: "%(title)s.%(ext)s",
}
if p.Output.Path != "" {
out.Path = p.Output.Path
}
if p.Output.Filename != "" {
out.Filename = p.Output.Filename
}
buildFilename(&p.Output)
templateReplacer := strings.NewReplacer("\n", "", "\t", "", " ", "")
baseParams := []string{
strings.Split(p.Url, "?list")[0], //no playlist
"--newline",
"--no-colors",
"--no-playlist",
"--progress-template",
templateReplacer.Replace(downloadTemplate),
"--progress-template",
templateReplacer.Replace(postprocessTemplate),
"--no-exec",
}
// if user asked to manually override the output path...
if !(slices.Contains(p.Params, "-P") || slices.Contains(p.Params, "--paths")) {
p.Params = append(p.Params, "-o")
p.Params = append(p.Params, fmt.Sprintf("%s/%s", out.Path, out.Filename))
}
params := append(baseParams, p.Params...)
slog.Info("requesting download", slog.String("url", p.Url), slog.Any("params", params))
cmd := exec.Command(config.Instance().DownloaderPath, params...)
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
stdout, err := cmd.StdoutPipe()
if err != nil {
slog.Error("failed to get a stdout pipe", slog.Any("err", err))
panic(err)
}
stderr, err := cmd.StderrPipe()
if err != nil {
slog.Error("failed to get a stderr pipe", slog.Any("err", err))
panic(err)
}
if err := cmd.Start(); err != nil {
slog.Error("failed to start yt-dlp process", slog.Any("err", err))
panic(err)
}
p.proc = cmd.Process
ctx, cancel := context.WithCancel(context.Background())
defer func() {
stdout.Close()
p.Complete()
cancel()
}()
logs := make(chan []byte)
go produceLogs(stdout, logs)
go p.consumeLogs(ctx, logs)
go p.detectYtDlpErrors(stderr)
cmd.Wait()
}
func produceLogs(r io.Reader, logs chan<- []byte) {
go func() {
scanner := bufio.NewScanner(r)
for scanner.Scan() {
logs <- scanner.Bytes()
}
}()
}
func (p *Process) consumeLogs(ctx context.Context, logs <-chan []byte) {
for {
select {
case <-ctx.Done():
slog.Info("detaching from yt-dlp stdout",
slog.String("id", p.getShortId()),
slog.String("url", p.Url),
)
return
case entry := <-logs:
p.parseLogEntry(entry)
}
}
}
func (p *Process) parseLogEntry(entry []byte) {
var progress ProgressTemplate
var postprocess PostprocessTemplate
if err := json.Unmarshal(entry, &progress); err == nil {
p.Progress = DownloadProgress{
Status: StatusDownloading,
Percentage: progress.Percentage,
Speed: progress.Speed,
ETA: progress.Eta,
}
slog.Info("progress",
slog.String("id", p.getShortId()),
slog.String("url", p.Url),
slog.String("percentage", progress.Percentage),
)
}
if err := json.Unmarshal(entry, &postprocess); err == nil {
p.Output.SavedFilePath = postprocess.FilePath
// slog.Info("postprocess",
// slog.String("id", p.getShortId()),
// slog.String("url", p.Url),
// slog.String("filepath", postprocess.FilePath),
// )
}
}
func (p *Process) detectYtDlpErrors(r io.Reader) {
scanner := bufio.NewScanner(r)
for scanner.Scan() {
slog.Error("yt-dlp process error",
slog.String("id", p.getShortId()),
slog.String("url", p.Url),
slog.String("err", scanner.Text()),
)
}
}
// Keep process in the memoryDB but marks it as complete
// Convention: All completed processes has progress -1
// and speed 0 bps.
func (p *Process) Complete() {
// auto archive
// TODO: it's not that deterministic :/
if p.Progress.Percentage == "" && p.Progress.Speed == 0 {
var serializedMetadata bytes.Buffer
json.NewEncoder(&serializedMetadata).Encode(p.Info)
archiver.Publish(&archiver.Message{
Id: p.Id,
Path: p.Output.SavedFilePath,
Title: p.Info.Title,
Thumbnail: p.Info.Thumbnail,
Source: p.Url,
Metadata: serializedMetadata.String(),
CreatedAt: p.Info.CreatedAt,
})
}
p.Progress = DownloadProgress{
Status: StatusCompleted,
Percentage: "-1",
Speed: 0,
ETA: 0,
}
// for safety, if the filename is not set, set it with original function
if p.Output.SavedFilePath == "" {
p.GetFileName(&p.Output)
}
slog.Info("finished",
slog.String("id", p.getShortId()),
slog.String("url", p.Url),
)
memDbEvents <- p
}
// Kill a process and remove it from the memory
func (p *Process) Kill() error {
defer func() {
p.Progress.Status = StatusCompleted
}()
// yt-dlp uses multiple child process the parent process
// has been spawned with setPgid = true. To properly kill
// all subprocesses a SIGTERM need to be sent to the correct
// process group
if p.proc == nil {
return errors.New("*os.Process not set")
}
pgid, err := syscall.Getpgid(p.proc.Pid)
if err != nil {
return err
}
if err := syscall.Kill(-pgid, syscall.SIGTERM); err != nil {
return err
}
return nil
}
func (p *Process) GetFileName(o *DownloadOutput) error {
cmd := exec.Command(
config.Instance().DownloaderPath,
"--print", "filename",
"-o", fmt.Sprintf("%s/%s", o.Path, o.Filename),
p.Url,
)
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
out, err := cmd.Output()
if err != nil {
return err
}
p.Output.SavedFilePath = strings.Trim(string(out), "\n")
return nil
}
func (p *Process) SetPending() {
// Since video's title isn't available yet, fill in with the URL.
p.Info = common.DownloadInfo{
URL: p.Url,
Title: p.Url,
CreatedAt: time.Now(),
}
p.Progress.Status = StatusPending
}
func (p *Process) SetMetadata() error {
cmd := exec.Command(config.Instance().DownloaderPath, p.Url, "-J")
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
stdout, err := cmd.StdoutPipe()
if err != nil {
slog.Error("failed to connect to stdout",
slog.String("id", p.getShortId()),
slog.String("url", p.Url),
slog.String("err", err.Error()),
)
return err
}
stderr, err := cmd.StderrPipe()
if err != nil {
slog.Error("failed to connect to stderr",
slog.String("id", p.getShortId()),
slog.String("url", p.Url),
slog.String("err", err.Error()),
)
return err
}
info := common.DownloadInfo{
URL: p.Url,
CreatedAt: time.Now(),
}
if err := cmd.Start(); err != nil {
return err
}
var bufferedStderr bytes.Buffer
go func() {
io.Copy(&bufferedStderr, stderr)
}()
slog.Info("retrieving metadata",
slog.String("id", p.getShortId()),
slog.String("url", p.Url),
)
if err := json.NewDecoder(stdout).Decode(&info); err != nil {
return err
}
p.Info = info
p.Progress.Status = StatusPending
if err := cmd.Wait(); err != nil {
return errors.New(bufferedStderr.String())
}
return nil
}
func (p *Process) getShortId() string { return strings.Split(p.Id, "-")[0] }
func buildFilename(o *DownloadOutput) {
if o.Filename != "" && strings.Contains(o.Filename, ".%(ext)s") {
o.Filename += ".%(ext)s"
}
o.Filename = strings.Replace(
o.Filename,
".%(ext)s.%(ext)s",
".%(ext)s",
1,
)
}

View File

@@ -0,0 +1,123 @@
package queue
import (
"context"
"errors"
"log/slog"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/downloaders"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/metadata"
)
type MessageQueue struct {
concurrency int
downloadQueue chan downloaders.Downloader
metadataQueue chan downloaders.Downloader
ctx context.Context
cancel context.CancelFunc
}
func NewMessageQueue() (*MessageQueue, error) {
qs := config.Instance().Server.QueueSize
if qs <= 0 {
return nil, errors.New("invalid queue size")
}
ctx, cancel := context.WithCancel(context.Background())
return &MessageQueue{
concurrency: qs,
downloadQueue: make(chan downloaders.Downloader, qs*2),
metadataQueue: make(chan downloaders.Downloader, qs*4),
ctx: ctx,
cancel: cancel,
}, nil
}
// Publish download job
func (m *MessageQueue) Publish(d downloaders.Downloader) {
d.SetPending(true)
select {
case m.downloadQueue <- d:
slog.Info("published download", slog.String("id", d.GetId()))
case <-m.ctx.Done():
slog.Warn("queue stopped, dropping download", slog.String("id", d.GetId()))
}
}
// Workers: download + metadata
func (m *MessageQueue) SetupConsumers() {
// N parallel workers for downloadQueue
for i := 0; i < m.concurrency; i++ {
go m.downloadWorker(i)
}
// 1 serial worker for metadata
go m.metadataWorker()
}
// Worker dei download
func (m *MessageQueue) downloadWorker(workerId int) {
for {
select {
case <-m.ctx.Done():
return
case p := <-m.downloadQueue:
if p == nil {
continue
}
if p.IsCompleted() {
continue
}
slog.Info("download worker started",
slog.Int("worker", workerId),
slog.String("id", p.GetId()),
)
p.Start()
// after the download starts succesfully we pass it to the metadata queue
select {
case m.metadataQueue <- p:
slog.Info("queued for metadata", slog.String("id", p.GetId()))
case <-m.ctx.Done():
return
}
}
}
}
func (m *MessageQueue) metadataWorker() {
for {
select {
case <-m.ctx.Done():
return
case p := <-m.metadataQueue:
if p == nil {
continue
}
slog.Info("metadata worker started",
slog.String("id", p.GetId()),
)
if p.IsCompleted() {
slog.Warn("metadata skipped, illegal state",
slog.String("id", p.GetId()),
)
continue
}
p.SetMetadata(metadata.DefaultFetcher)
}
}
}
func (m *MessageQueue) Stop() {
m.cancel()
close(m.downloadQueue)
close(m.metadataQueue)
}

View File

@@ -1,15 +0,0 @@
package internal
type Worker struct {
requests chan *Process // downloads to do
pending int // downloads pending
index int // index in the heap
}
func (w *Worker) Work(done chan *Worker) {
for {
req := <-w.requests
req.Start()
done <- w
}
}

View File

@@ -91,10 +91,10 @@ func sse(logger *ObservableLogger) http.HandlerFunc {
func ApplyRouter(logger *ObservableLogger) func(chi.Router) {
return func(r chi.Router) {
if config.Instance().RequireAuth {
if config.Instance().Authentication.RequireAuth {
r.Use(middlewares.Authenticated)
}
if config.Instance().UseOpenId {
if config.Instance().OpenId.UseOpenId {
r.Use(openid.Middleware)
}
r.Get("/ws", webSocket(logger))

View File

@@ -8,14 +8,14 @@ import (
)
func ApplyAuthenticationByConfig(next http.Handler) http.Handler {
handler := next
handler := next
if config.Instance().RequireAuth {
handler = Authenticated(handler)
}
if config.Instance().UseOpenId {
handler = openid.Middleware(handler)
}
if config.Instance().Authentication.RequireAuth {
handler = Authenticated(handler)
}
if config.Instance().OpenId.UseOpenId {
handler = openid.Middleware(handler)
}
return handler
return handler
}

View File

@@ -14,24 +14,27 @@ var (
)
func Configure() {
if !config.Instance().UseOpenId {
if !config.Instance().OpenId.UseOpenId {
return
}
provider, err := oidc.NewProvider(context.Background(), config.Instance().OpenIdProviderURL)
provider, err := oidc.NewProvider(
context.Background(),
config.Instance().OpenId.ProviderURL,
)
if err != nil {
panic(err)
}
oauth2Config = oauth2.Config{
ClientID: config.Instance().OpenIdClientId,
ClientSecret: config.Instance().OpenIdClientSecret,
RedirectURL: config.Instance().OpenIdRedirectURL,
ClientID: config.Instance().OpenId.ClientId,
ClientSecret: config.Instance().OpenId.ClientSecret,
RedirectURL: config.Instance().OpenId.RedirectURL,
Endpoint: provider.Endpoint(),
Scopes: []string{oidc.ScopeOpenID, "profile", "email"},
}
verifier = provider.Verifier(&oidc.Config{
ClientID: config.Instance().OpenIdClientId,
ClientID: config.Instance().OpenId.ClientId,
})
}

View File

@@ -87,7 +87,7 @@ func doAuthentification(r *http.Request, setCookieCallback func(t *oauth2.Token)
return nil, err
}
whitelist := config.Instance().OpenIdEmailWhitelist
whitelist := config.Instance().OpenId.EmailWhitelist
if len(whitelist) > 0 && !slices.Contains(whitelist, claims.Email) {
return nil, errors.New("email address not found in ACL")

View File

@@ -18,7 +18,7 @@ import (
--max-downloads NUMBER | | stops after N completed downloads
*/
func ApplyModifiers(entries *[]common.DownloadInfo, args []string) error {
func ApplyModifiers(entries *[]common.DownloadMetadata, args []string) error {
for i, modifier := range args {
switch modifier {
case "--playlist-start":
@@ -38,7 +38,7 @@ func ApplyModifiers(entries *[]common.DownloadInfo, args []string) error {
return nil
}
func playlistStart(i int, modifier string, args []string, entries *[]common.DownloadInfo) error {
func playlistStart(i int, modifier string, args []string, entries *[]common.DownloadMetadata) error {
if !guard(i, len(modifier)) {
return nil
}
@@ -53,7 +53,7 @@ func playlistStart(i int, modifier string, args []string, entries *[]common.Down
return nil
}
func playlistEnd(i int, modifier string, args []string, entries *[]common.DownloadInfo) error {
func playlistEnd(i int, modifier string, args []string, entries *[]common.DownloadMetadata) error {
if !guard(i, len(modifier)) {
return nil
}
@@ -68,7 +68,7 @@ func playlistEnd(i int, modifier string, args []string, entries *[]common.Downlo
return nil
}
func maxDownloads(i int, modifier string, args []string, entries *[]common.DownloadInfo) error {
func maxDownloads(i int, modifier string, args []string, entries *[]common.DownloadMetadata) error {
if !guard(i, len(modifier)) {
return nil
}

View File

@@ -1,4 +1,4 @@
package internal
package playlist
import (
"encoding/json"
@@ -11,15 +11,18 @@ import (
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/common"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/playlist"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/downloaders"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/kv"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/queue"
)
func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB) error {
func PlaylistDetect(req internal.DownloadRequest, mq *queue.MessageQueue, db *kv.Store) error {
params := append(req.Params, "--flat-playlist", "-J")
urlWithParams := append([]string{req.URL}, params...)
var (
downloader = config.Instance().DownloaderPath
downloader = config.Instance().Paths.DownloaderPath
cmd = exec.Command(downloader, urlWithParams...)
)
@@ -28,7 +31,7 @@ func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB) error {
return err
}
var m playlist.Metadata
var m Metadata
if err := cmd.Start(); err != nil {
return err
@@ -51,17 +54,17 @@ func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB) error {
}
if m.IsPlaylist() {
entries := slices.CompactFunc(slices.Compact(m.Entries), func(a common.DownloadInfo, b common.DownloadInfo) bool {
entries := slices.CompactFunc(slices.Compact(m.Entries), func(a common.DownloadMetadata, b common.DownloadMetadata) bool {
return a.URL == b.URL
})
entries = slices.DeleteFunc(entries, func(e common.DownloadInfo) bool {
entries = slices.DeleteFunc(entries, func(e common.DownloadMetadata) bool {
return strings.Contains(e.URL, "list=")
})
slog.Info("playlist detected", slog.String("url", req.URL), slog.Int("count", len(entries)))
if err := playlist.ApplyModifiers(&entries, req.Params); err != nil {
if err := ApplyModifiers(&entries, req.Params); err != nil {
return err
}
@@ -78,33 +81,22 @@ func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB) error {
//XXX: it's idiotic but it works: virtually delay the creation time
meta.CreatedAt = time.Now().Add(time.Millisecond * time.Duration(i*10))
proc := &Process{
Url: meta.URL,
Progress: DownloadProgress{},
Output: DownloadOutput{Filename: req.Rename},
Info: meta,
Params: req.Params,
}
downloader := downloaders.NewGenericDownload(meta.URL, req.Params)
downloader.SetOutput(internal.DownloadOutput{Filename: req.Rename})
// downloader.SetMetadata(meta)
proc.Info.URL = meta.URL
db.Set(proc)
mq.Publish(proc)
proc.Info.CreatedAt = meta.CreatedAt
db.Set(downloader)
mq.Publish(downloader)
}
return nil
}
proc := &Process{
Url: req.URL,
Params: req.Params,
}
d := downloaders.NewGenericDownload(req.URL, req.Params)
db.Set(proc)
mq.Publish(proc)
slog.Info("sending new process to message queue", slog.String("url", proc.Url))
db.Set(d)
mq.Publish(d)
slog.Info("sending new process to message queue", slog.String("url", d.GetUrl()))
return cmd.Wait()
}

View File

@@ -3,10 +3,10 @@ package playlist
import "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/common"
type Metadata struct {
Entries []common.DownloadInfo `json:"entries"`
Count int `json:"playlist_count"`
PlaylistTitle string `json:"title"`
Type string `json:"_type"`
Entries []common.DownloadMetadata `json:"entries"`
Count int `json:"playlist_count"`
PlaylistTitle string `json:"title"`
Type string `json:"_type"`
}
func (m *Metadata) IsPlaylist() bool { return m.Type == "playlist" }

View File

@@ -1,13 +1,16 @@
package rest
import (
"database/sql"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/kv"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/livestream"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/queue"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal"
bolt "go.etcd.io/bbolt"
)
type ContainerArgs struct {
DB *sql.DB
MDB *internal.MemoryDB
MQ *internal.MessageQueue
DB *bolt.DB
MDB *kv.Store
MQ *queue.MessageQueue
LM *livestream.Monitor
}

View File

@@ -19,10 +19,10 @@ func ApplyRouter(args *ContainerArgs) func(chi.Router) {
h := Container(args)
return func(r chi.Router) {
if config.Instance().RequireAuth {
if config.Instance().Authentication.RequireAuth {
r.Use(middlewares.Authenticated)
}
if config.Instance().UseOpenId {
if config.Instance().OpenId.UseOpenId {
r.Use(openid.Middleware)
}
r.Post("/exec", h.Exec())

View File

@@ -14,11 +14,7 @@ var (
func ProvideService(args *ContainerArgs) *Service {
serviceOnce.Do(func() {
service = &Service{
mdb: args.MDB,
db: args.DB,
mq: args.MQ,
}
service = NewService(args.MDB, args.DB, args.MQ, args.LM)
})
return service
}

View File

@@ -2,8 +2,9 @@ package rest
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"os/exec"
@@ -12,41 +13,62 @@ import (
"github.com/google/uuid"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/downloaders"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/kv"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/livestream"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/queue"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/playlist"
bolt "go.etcd.io/bbolt"
)
type Service struct {
mdb *internal.MemoryDB
db *sql.DB
mq *internal.MessageQueue
mdb *kv.Store
db *bolt.DB
mq *queue.MessageQueue
lm *livestream.Monitor
}
func (s *Service) Exec(req internal.DownloadRequest) (string, error) {
p := &internal.Process{
Url: req.URL,
Params: req.Params,
Output: internal.DownloadOutput{
Path: req.Path,
Filename: req.Rename,
},
func NewService(
mdb *kv.Store,
db *bolt.DB,
mq *queue.MessageQueue,
lm *livestream.Monitor,
) *Service {
db.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucketIfNotExists([]byte("templates"))
return err
})
return &Service{
mdb: mdb,
db: db,
mq: mq,
lm: lm,
}
}
id := s.mdb.Set(p)
s.mq.Publish(p)
func (s *Service) Exec(req internal.DownloadRequest) (string, error) {
d := downloaders.NewGenericDownload(req.URL, req.Params)
d.SetOutput(internal.DownloadOutput{
Path: req.Path,
Filename: req.Rename,
})
id := s.mdb.Set(d)
s.mq.Publish(d)
return id, nil
}
func (s *Service) ExecPlaylist(req internal.DownloadRequest) error {
return internal.PlaylistDetect(req, s.mq, s.mdb)
return playlist.PlaylistDetect(req, s.mq, s.mdb)
}
func (s *Service) ExecLivestream(req internal.DownloadRequest) {
s.lm.Add(req.URL)
}
func (s *Service) Running(ctx context.Context) (*[]internal.ProcessResponse, error) {
func (s *Service) Running(ctx context.Context) (*[]internal.ProcessSnapshot, error) {
select {
case <-ctx.Done():
return nil, context.Canceled
@@ -84,64 +106,56 @@ func (s *Service) SetCookies(ctx context.Context, cookies string) error {
}
func (s *Service) SaveTemplate(ctx context.Context, template *internal.CustomTemplate) error {
conn, err := s.db.Conn(ctx)
if err != nil {
return err
}
defer conn.Close()
_, err = conn.ExecContext(
ctx,
"INSERT INTO templates (id, name, content) VALUES (?, ?, ?)",
uuid.NewString(),
template.Name,
template.Content,
)
return err
return s.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("templates"))
v, err := json.Marshal(template)
if err != nil {
return err
}
return b.Put([]byte(uuid.NewString()), v)
})
}
func (s *Service) GetTemplates(ctx context.Context) (*[]internal.CustomTemplate, error) {
conn, err := s.db.Conn(ctx)
if err != nil {
return nil, err
}
defer conn.Close()
rows, err := conn.QueryContext(ctx, "SELECT * FROM templates")
if err != nil {
return nil, err
}
defer rows.Close()
templates := make([]internal.CustomTemplate, 0)
for rows.Next() {
t := internal.CustomTemplate{}
err := rows.Scan(&t.Id, &t.Name, &t.Content)
if err != nil {
return nil, err
err := s.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("templates"))
if b == nil {
return nil // bucket vuoto, restituisco lista vuota
}
templates = append(templates, t)
return b.ForEach(func(k, v []byte) error {
var t internal.CustomTemplate
if err := json.Unmarshal(v, &t); err != nil {
return err
}
templates = append(templates, t)
return nil
})
})
if err != nil {
return nil, err
}
return &templates, nil
}
func (s *Service) UpdateTemplate(ctx context.Context, t *internal.CustomTemplate) (*internal.CustomTemplate, error) {
conn, err := s.db.Conn(ctx)
data, err := json.Marshal(t)
if err != nil {
return nil, err
}
defer conn.Close()
err = s.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("templates"))
if b == nil {
return fmt.Errorf("bucket templates not found")
}
return b.Put([]byte(t.Id), data)
})
_, err = conn.ExecContext(ctx, "UPDATE templates SET name = ?, content = ? WHERE id = ?", t.Name, t.Content, t.Id)
if err != nil {
return nil, err
}
@@ -150,16 +164,10 @@ func (s *Service) UpdateTemplate(ctx context.Context, t *internal.CustomTemplate
}
func (s *Service) DeleteTemplate(ctx context.Context, id string) error {
conn, err := s.db.Conn(ctx)
if err != nil {
return err
}
defer conn.Close()
_, err = conn.ExecContext(ctx, "DELETE FROM templates WHERE id = ?", id)
return err
return s.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("templates"))
return b.Delete([]byte(id))
})
}
func (s *Service) GetVersion(ctx context.Context) (string, string, error) {
@@ -171,7 +179,7 @@ func (s *Service) GetVersion(ctx context.Context) (string, string, error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
cmd := exec.CommandContext(ctx, config.Instance().DownloaderPath, "--version")
cmd := exec.CommandContext(ctx, config.Instance().Paths.DownloaderPath, "--version")
go func() {
stdout, _ := cmd.Output()
result <- string(stdout)

View File

@@ -3,14 +3,15 @@ package rpc
import (
"github.com/go-chi/chi/v5"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/kv"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/livestream"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/queue"
middlewares "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/middleware"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/openid"
)
// Dependency injection container.
func Container(db *internal.MemoryDB, mq *internal.MessageQueue, lm *livestream.Monitor) *Service {
func Container(db *kv.Store, mq *queue.MessageQueue, lm *livestream.Monitor) *Service {
return &Service{
db: db,
mq: mq,
@@ -21,10 +22,10 @@ func Container(db *internal.MemoryDB, mq *internal.MessageQueue, lm *livestream.
// RPC service must be registered before applying this router!
func ApplyRouter() func(chi.Router) {
return func(r chi.Router) {
if config.Instance().RequireAuth {
if config.Instance().Authentication.RequireAuth {
r.Use(middlewares.Authenticated)
}
if config.Instance().UseOpenId {
if config.Instance().OpenId.UseOpenId {
r.Use(openid.Middleware)
}
r.Get("/ws", WebSocket)

View File

@@ -6,18 +6,22 @@ import (
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/formats"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/downloaders"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/kv"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/livestream"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/queue"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/playlist"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/sys"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/updater"
)
type Service struct {
db *internal.MemoryDB
mq *internal.MessageQueue
db *kv.Store
mq *queue.MessageQueue
lm *livestream.Monitor
}
type Running []internal.ProcessResponse
type Running []internal.ProcessSnapshot
type Pending []string
type NoArgs struct{}
@@ -25,26 +29,23 @@ type NoArgs struct{}
// Exec spawns a Process.
// The result of the execution is the newly spawned process Id.
func (s *Service) Exec(args internal.DownloadRequest, result *string) error {
p := &internal.Process{
Url: args.URL,
Params: args.Params,
Output: internal.DownloadOutput{
Path: args.Path,
Filename: args.Rename,
},
}
d := downloaders.NewGenericDownload(args.URL, args.Params)
d.SetOutput(internal.DownloadOutput{
Path: args.Path,
Filename: args.Rename,
})
s.db.Set(p)
s.mq.Publish(p)
s.db.Set(d)
s.mq.Publish(d)
*result = p.Id
*result = d.GetId()
return nil
}
// Exec spawns a Process.
// The result of the execution is the newly spawned process Id.
func (s *Service) ExecPlaylist(args internal.DownloadRequest, result *string) error {
err := internal.PlaylistDetect(args, s.mq, s.db)
err := playlist.PlaylistDetect(args, s.mq, s.db)
if err != nil {
return err
}
@@ -87,12 +88,12 @@ func (s *Service) KillAllLivestream(args NoArgs, result *struct{}) error {
// Progess retrieves the Progress of a specific Process given its Id
func (s *Service) Progess(args internal.DownloadRequest, progress *internal.DownloadProgress) error {
proc, err := s.db.Get(args.Id)
dl, err := s.db.Get(args.Id)
if err != nil {
return err
}
*progress = proc.Progress
*progress = dl.Status().Progress
return nil
}
@@ -106,7 +107,7 @@ func (s *Service) Formats(args internal.DownloadRequest, meta *formats.Metadata)
}
if metadata.IsPlaylist() {
go internal.PlaylistDetect(args, s.mq, s.db)
go playlist.PlaylistDetect(args, s.mq, s.db)
}
*meta = *metadata
@@ -129,22 +130,22 @@ func (s *Service) Running(args NoArgs, running *Running) error {
func (s *Service) Kill(args string, killed *string) error {
slog.Info("Trying killing process with id", slog.String("id", args))
proc, err := s.db.Get(args)
download, err := s.db.Get(args)
if err != nil {
return err
}
if proc == nil {
if download == nil {
return errors.New("nil process")
}
if err := proc.Kill(); err != nil {
slog.Info("failed killing process", slog.String("id", proc.Id), slog.Any("err", err))
if err := download.Stop(); err != nil {
slog.Info("failed killing process", slog.String("id", download.GetId()), slog.Any("err", err))
return err
}
s.db.Delete(proc.Id)
slog.Info("succesfully killed process", slog.String("id", proc.Id))
s.db.Delete(download.GetId())
slog.Info("succesfully killed process", slog.String("id", download.GetId()))
return nil
}
@@ -156,34 +157,33 @@ func (s *Service) KillAll(args NoArgs, killed *string) error {
var (
keys = s.db.Keys()
removeFunc = func(p *internal.Process) error {
defer s.db.Delete(p.Id)
return p.Kill()
removeFunc = func(d downloaders.Downloader) error {
defer s.db.Delete(d.GetId())
return d.Stop()
}
)
for _, key := range *keys {
proc, err := s.db.Get(key)
dl, err := s.db.Get(key)
if err != nil {
return err
}
if proc == nil {
if dl == nil {
s.db.Delete(key)
continue
}
if err := removeFunc(proc); err != nil {
if err := removeFunc(dl); err != nil {
slog.Info(
"failed killing process",
slog.String("id", proc.Id),
slog.String("id", dl.GetId()),
slog.Any("err", err),
)
continue
}
slog.Info("succesfully killed process", slog.String("id", proc.Id))
proc = nil // gc helper
slog.Info("succesfully killed process", slog.String("id", dl.GetId()))
}
return nil
@@ -200,14 +200,14 @@ func (s *Service) Clear(args string, killed *string) error {
func (s *Service) ClearCompleted(cleared *string) error {
var (
keys = s.db.Keys()
removeFunc = func(p *internal.Process) error {
defer s.db.Delete(p.Id)
removeFunc = func(d downloaders.Downloader) error {
defer s.db.Delete(d.GetId())
if p.Progress.Status != internal.StatusCompleted {
if !d.IsCompleted() {
return nil
}
return p.Kill()
return d.Stop()
}
)

View File

@@ -3,7 +3,6 @@ package server
import (
"context"
"database/sql"
"fmt"
"io"
"io/fs"
@@ -12,20 +11,18 @@ import (
"net/http"
"net/rpc"
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"
"time"
"github.com/go-chi/chi/v5"
"github.com/go-chi/cors"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/archive"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/archiver"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/dbutil"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/filebrowser"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/kv"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/livestream"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/pipeline"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/queue"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/logging"
middlewares "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/middleware"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/openid"
@@ -37,7 +34,7 @@ import (
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/twitch"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/user"
_ "modernc.org/sqlite"
bolt "go.etcd.io/bbolt"
)
type RunConfig struct {
@@ -46,20 +43,31 @@ type RunConfig struct {
}
type serverConfig struct {
frontend fs.FS
swagger fs.FS
mdb *internal.MemoryDB
db *sql.DB
mq *internal.MessageQueue
lm *livestream.Monitor
tm *twitch.Monitor
frontend fs.FS
swagger fs.FS
mdb *kv.Store
db *bolt.DB
mq *queue.MessageQueue
lm *livestream.Monitor
taskRunner task.TaskRunner
twitchMonitor *twitch.Monitor
}
// TODO: change scope
var observableLogger = logging.NewObservableLogger()
func RunBlocking(rc *RunConfig) {
mdb := internal.NewMemoryDB()
func Run(ctx context.Context, rc *RunConfig) error {
dbPath := filepath.Join(config.Instance().Paths.LocalDatabasePath, "bolt.db")
boltdb, err := bolt.Open(dbPath, 0600, nil)
if err != nil {
return err
}
mdb, err := kv.NewStore(boltdb, time.Second*15)
if err != nil {
return err
}
// ---- LOGGING ---------------------------------------------------
logWriters := []io.Writer{
@@ -70,10 +78,10 @@ func RunBlocking(rc *RunConfig) {
conf := config.Instance()
// file based logging
if conf.EnableFileLogging {
logger, err := logging.NewRotableLogger(conf.LogPath)
if conf.Logging.EnableFileLogging {
logger, err := logging.NewRotableLogger(conf.Logging.LogPath)
if err != nil {
panic(err)
return err
}
defer logger.Rotate()
@@ -96,24 +104,15 @@ func RunBlocking(rc *RunConfig) {
slog.SetDefault(logger)
// ----------------------------------------------------------------
db, err := sql.Open("sqlite", conf.LocalDatabasePath)
mq, err := queue.NewMessageQueue()
if err != nil {
slog.Error("failed to open database", slog.String("err", err.Error()))
}
if err := dbutil.Migrate(context.Background(), db); err != nil {
slog.Error("failed to init database", slog.String("err", err.Error()))
}
mq, err := internal.NewMessageQueue()
if err != nil {
panic(err)
return err
}
mq.SetupConsumers()
go mdb.Restore(mq)
go mdb.EventListener()
lm := livestream.NewMonitor(mq, mdb)
lm := livestream.NewMonitor(mq, mdb, boltdb)
go lm.Schedule()
go lm.Restore()
@@ -122,44 +121,48 @@ func RunBlocking(rc *RunConfig) {
config.Instance().Twitch.ClientId,
config.Instance().Twitch.ClientSecret,
),
boltdb,
)
go tm.Monitor(
context.TODO(),
ctx,
config.Instance().Twitch.CheckInterval,
twitch.DEFAULT_DOWNLOAD_HANDLER(mdb, mq),
)
go tm.Restore()
cronTaskRunner := task.NewCronTaskRunner(mq, mdb)
go cronTaskRunner.Spawner(ctx)
scfg := serverConfig{
frontend: rc.App,
swagger: rc.Swagger,
mdb: mdb,
mq: mq,
db: db,
lm: lm,
tm: tm,
frontend: rc.App,
swagger: rc.Swagger,
mdb: mdb,
db: boltdb,
mq: mq,
lm: lm,
twitchMonitor: tm,
taskRunner: cronTaskRunner,
}
srv := newServer(scfg)
go gracefulShutdown(srv, &scfg)
go autoPersist(time.Minute*5, mdb, lm, tm)
go gracefulShutdown(ctx, srv, &scfg)
var (
network = "tcp"
address = fmt.Sprintf("%s:%d", conf.Host, conf.Port)
address = fmt.Sprintf("%s:%d", conf.Server.Host, conf.Server.Port)
)
// support unix sockets
if strings.HasPrefix(conf.Host, "/") {
if strings.HasPrefix(conf.Server.Host, "/") {
network = "unix"
address = conf.Host
address = conf.Server.Host
}
listener, err := net.Listen(network, address)
if err != nil {
slog.Error("failed to listen", slog.String("err", err.Error()))
return
return err
}
slog.Info("yt-dlp-webui started", slog.String("address", address))
@@ -167,14 +170,12 @@ func RunBlocking(rc *RunConfig) {
if err := srv.Serve(listener); err != nil {
slog.Warn("http server stopped", slog.String("err", err.Error()))
}
return nil
}
func newServer(c serverConfig) *http.Server {
archiver.Register(c.db)
cronTaskRunner := task.NewCronTaskRunner(c.mq, c.mdb)
go cronTaskRunner.Spawner(context.TODO())
// archiver.Register(c.db)
service := ytdlpRPC.Container(c.mdb, c.mq, c.lm)
rpc.Register(service)
@@ -198,7 +199,7 @@ func newServer(c serverConfig) *http.Server {
// use in dev
// r.Use(middleware.Logger)
baseUrl := config.Instance().BaseURL
baseUrl := config.Instance().Server.BaseURL
r.Mount(baseUrl+"/", http.StripPrefix(baseUrl, http.FileServerFS(c.frontend)))
// swagger
@@ -215,7 +216,7 @@ func newServer(c serverConfig) *http.Server {
})
// Archive routes
r.Route("/archive", archive.ApplyRouter(c.db))
// r.Route("/archive", archive.ApplyRouter(c.db))
// Authentication routes
r.Route("/auth", func(r chi.Router) {
@@ -237,6 +238,7 @@ func newServer(c serverConfig) *http.Server {
DB: c.db,
MDB: c.mdb,
MQ: c.mq,
LM: c.lm,
}))
// Logging
@@ -246,60 +248,35 @@ func newServer(c serverConfig) *http.Server {
r.Route("/status", status.ApplyRouter(c.mdb))
// Subscriptions
r.Route("/subscriptions", subscription.Container(c.db, cronTaskRunner).ApplyRouter())
r.Route("/subscriptions", subscription.Container(c.db, c.taskRunner).ApplyRouter())
// Twitch
r.Route("/twitch", func(r chi.Router) {
r.Use(middlewares.ApplyAuthenticationByConfig)
r.Get("/users", twitch.GetMonitoredUsers(c.tm))
r.Post("/user", twitch.MonitorUserHandler(c.tm))
r.Delete("/user/{user}", twitch.DeleteUser(c.tm))
r.Get("/users", twitch.GetMonitoredUsers(c.twitchMonitor))
r.Post("/user", twitch.MonitorUserHandler(c.twitchMonitor))
r.Delete("/user/{user}", twitch.DeleteUser(c.twitchMonitor))
})
// Pipelines
r.Route("/pipelines", func(r chi.Router) {
h := pipeline.NewRestHandler(c.db)
r.Use(middlewares.ApplyAuthenticationByConfig)
r.Get("/id/{id}", h.GetPipeline)
r.Get("/all", h.GetAllPipelines)
r.Post("/", h.SavePipeline)
r.Delete("/id/{id}", h.DeletePipeline)
})
return &http.Server{Handler: r}
}
func gracefulShutdown(srv *http.Server, cfg *serverConfig) {
ctx, stop := signal.NotifyContext(context.Background(),
os.Interrupt,
syscall.SIGTERM,
syscall.SIGQUIT,
)
func gracefulShutdown(ctx context.Context, srv *http.Server, cfg *serverConfig) {
<-ctx.Done()
slog.Info("shutdown signal received")
go func() {
<-ctx.Done()
slog.Info("shutdown signal received")
defer func() {
cfg.mdb.Persist()
cfg.lm.Persist()
cfg.tm.Persist()
stop()
srv.Shutdown(context.Background())
}()
defer func() {
cfg.db.Close()
srv.Shutdown(context.Background())
}()
}
func autoPersist(
d time.Duration,
db *internal.MemoryDB,
lm *livestream.Monitor,
tm *twitch.Monitor,
) {
for {
time.Sleep(d)
if err := db.Persist(); err != nil {
slog.Warn("failed to persisted session", slog.Any("err", err))
}
if err := lm.Persist(); err != nil {
slog.Warn(
"failed to persisted livestreams monitor session", slog.Any("err", err.Error()))
}
if err := tm.Persist(); err != nil {
slog.Warn(
"failed to persisted twitch monitor session", slog.Any("err", err.Error()))
}
slog.Debug("sucessfully persisted session")
}
}

View File

@@ -5,11 +5,12 @@ import (
"slices"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/kv"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/status/domain"
)
type Repository struct {
mdb *internal.MemoryDB
mdb *kv.Store
}
// DownloadSpeed implements domain.Repository.
@@ -29,7 +30,7 @@ func (r *Repository) DownloadSpeed(ctx context.Context) int64 {
func (r *Repository) Completed(ctx context.Context) int {
processes := r.mdb.All()
completed := slices.DeleteFunc(*processes, func(p internal.ProcessResponse) bool {
completed := slices.DeleteFunc(*processes, func(p internal.ProcessSnapshot) bool {
return p.Progress.Status != internal.StatusCompleted
})
@@ -40,7 +41,7 @@ func (r *Repository) Completed(ctx context.Context) int {
func (r *Repository) Downloading(ctx context.Context) int {
processes := r.mdb.All()
downloading := slices.DeleteFunc(*processes, func(p internal.ProcessResponse) bool {
downloading := slices.DeleteFunc(*processes, func(p internal.ProcessSnapshot) bool {
return p.Progress.Status != internal.StatusDownloading
})
@@ -51,14 +52,14 @@ func (r *Repository) Downloading(ctx context.Context) int {
func (r *Repository) Pending(ctx context.Context) int {
processes := r.mdb.All()
pending := slices.DeleteFunc(*processes, func(p internal.ProcessResponse) bool {
pending := slices.DeleteFunc(*processes, func(p internal.ProcessSnapshot) bool {
return p.Progress.Status != internal.StatusPending
})
return len(pending)
}
func New(mdb *internal.MemoryDB) domain.Repository {
func New(mdb *kv.Store) domain.Repository {
return &Repository{
mdb: mdb,
}

View File

@@ -2,16 +2,16 @@ package status
import (
"github.com/go-chi/chi/v5"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/kv"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/status/repository"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/status/rest"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/status/service"
)
func ApplyRouter(mdb *internal.MemoryDB) func(chi.Router) {
func ApplyRouter(mdb *kv.Store) func(chi.Router) {
var (
r = repository.New(mdb)
s = service.New(r, nil) //TODO: nil, wtf?
s = service.New(r, nil)
h = rest.New(s)
)

View File

@@ -1,13 +1,13 @@
package subscription
import (
"database/sql"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/domain"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/task"
bolt "go.etcd.io/bbolt"
)
func Container(db *sql.DB, runner task.TaskRunner) domain.RestHandler {
func Container(db *bolt.DB, runner task.TaskRunner) domain.RestHandler {
var (
r = provideRepository(db)
s = provideService(r, runner)

View File

@@ -1,7 +1,6 @@
package subscription
import (
"database/sql"
"sync"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/domain"
@@ -9,6 +8,8 @@ import (
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/rest"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/service"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/task"
bolt "go.etcd.io/bbolt"
)
var (
@@ -21,7 +22,7 @@ var (
handOnce sync.Once
)
func provideRepository(db *sql.DB) domain.Repository {
func provideRepository(db *bolt.DB) domain.Repository {
repoOnce.Do(func() {
repo = repository.New(db)
})

View File

@@ -2,131 +2,142 @@ package repository
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"github.com/google/uuid"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/data"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/domain"
bolt "go.etcd.io/bbolt"
)
var bucketName = []byte("subscriptions")
type Repository struct {
db *sql.DB
db *bolt.DB
}
// Delete implements domain.Repository.
func (r *Repository) Delete(ctx context.Context, id string) error {
conn, err := r.db.Conn(ctx)
if err != nil {
return err
}
defer conn.Close()
_, err = conn.ExecContext(ctx, "DELETE FROM subscriptions WHERE id = ?", id)
return err
return r.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(bucketName)
return b.Delete([]byte(id))
})
}
// GetCursor implements domain.Repository.
func (r *Repository) GetCursor(ctx context.Context, id string) (int64, error) {
conn, err := r.db.Conn(ctx)
func (s *Repository) GetCursor(ctx context.Context, id string) (int64, error) {
var cursor int64
err := s.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("subscriptions"))
v := b.Get([]byte(id))
if v == nil {
return fmt.Errorf("subscription %s not found", id)
}
var data struct {
Cursor int64 `json:"cursor"`
}
if err := json.Unmarshal(v, &data); err != nil {
return err
}
cursor = data.Cursor
return nil
})
if err != nil {
return -1, err
}
defer conn.Close()
row := conn.QueryRowContext(ctx, "SELECT rowid FROM subscriptions WHERE id = ?", id)
var rowId int64
if err := row.Scan(&rowId); err != nil {
return -1, err
}
return rowId, nil
return cursor, nil
}
// List implements domain.Repository.
func (r *Repository) List(ctx context.Context, start int64, limit int) (*[]data.Subscription, error) {
conn, err := r.db.Conn(ctx)
var subs []data.Subscription
err := r.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket(bucketName)
return b.ForEach(func(k, v []byte) error {
var sub data.Subscription
if err := json.Unmarshal(v, &sub); err != nil {
return err
}
subs = append(subs, sub)
return nil
})
})
if err != nil {
return nil, err
}
defer conn.Close()
var elements []data.Subscription
rows, err := conn.QueryContext(ctx, "SELECT rowid, * FROM subscriptions WHERE rowid > ? LIMIT ?", start, limit)
if err != nil {
return nil, err
}
for rows.Next() {
var rowId int64
var element data.Subscription
if err := rows.Scan(
&rowId,
&element.Id,
&element.URL,
&element.Params,
&element.CronExpr,
); err != nil {
return &elements, err
}
elements = append(elements, element)
}
return &elements, nil
return &subs, nil
}
// Submit implements domain.Repository.
func (r *Repository) Submit(ctx context.Context, sub *data.Subscription) (*data.Subscription, error) {
conn, err := r.db.Conn(ctx)
func (s *Repository) Submit(ctx context.Context, sub *data.Subscription) (*data.Subscription, error) {
if sub.Id == "" {
sub.Id = uuid.NewString()
}
data, err := json.Marshal(sub)
if err != nil {
return nil, err
}
defer conn.Close()
err = s.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("subscriptions"))
return b.Put([]byte(sub.Id), data)
})
_, err = conn.ExecContext(
ctx,
"INSERT INTO subscriptions (id, url, params, cron) VALUES (?, ?, ?, ?)",
uuid.NewString(),
sub.URL,
sub.Params,
sub.CronExpr,
)
if err != nil {
return nil, err
}
return sub, err
return sub, nil
}
// UpdateByExample implements domain.Repository.
func (r *Repository) UpdateByExample(ctx context.Context, example *data.Subscription) error {
conn, err := r.db.Conn(ctx)
if err != nil {
return err
}
func (s *Repository) UpdateByExample(ctx context.Context, example *data.Subscription) error {
return s.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("subscriptions"))
defer conn.Close()
return b.ForEach(func(k, v []byte) error {
var sub data.Subscription
if err := json.Unmarshal(v, &sub); err != nil {
return err
}
_, err = conn.ExecContext(
ctx,
"UPDATE subscriptions SET url = ?, params = ?, cron = ? WHERE id = ? OR url = ?",
example.URL,
example.Params,
example.CronExpr,
example.Id,
example.URL,
)
if sub.Id == example.Id || sub.URL == example.URL {
// aggiorna i campi
sub.URL = example.URL
sub.Params = example.Params
sub.CronExpr = example.CronExpr
return err
data, err := json.Marshal(sub)
if err != nil {
return err
}
if err := b.Put(k, data); err != nil {
return err
}
}
return nil
})
})
}
func New(db *sql.DB) domain.Repository {
func New(db *bolt.DB) domain.Repository {
db.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(bucketName)
return err
})
return &Repository{
db: db,
}

View File

@@ -19,10 +19,10 @@ type RestHandler struct {
// ApplyRouter implements domain.RestHandler.
func (h *RestHandler) ApplyRouter() func(chi.Router) {
return func(r chi.Router) {
if config.Instance().RequireAuth {
if config.Instance().Authentication.RequireAuth {
r.Use(middlewares.Authenticated)
}
if config.Instance().UseOpenId {
if config.Instance().OpenId.UseOpenId {
r.Use(openid.Middleware)
}

View File

@@ -11,7 +11,9 @@ import (
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/archive"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/downloaders"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/kv"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/queue"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/domain"
"github.com/robfig/cron/v3"
)
@@ -30,8 +32,8 @@ type monitorTask struct {
}
type CronTaskRunner struct {
mq *internal.MessageQueue
db *internal.MemoryDB
mq *queue.MessageQueue
db *kv.Store
tasks chan monitorTask
errors chan error
@@ -39,7 +41,7 @@ type CronTaskRunner struct {
running map[string]*monitorTask
}
func NewCronTaskRunner(mq *internal.MessageQueue, db *internal.MemoryDB) TaskRunner {
func NewCronTaskRunner(mq *queue.MessageQueue, db *kv.Store) TaskRunner {
return &CronTaskRunner{
mq: mq,
db: db,
@@ -127,7 +129,7 @@ func (t *CronTaskRunner) fetcher(ctx context.Context, req *monitorTask) time.Dur
cmd := exec.CommandContext(
ctx,
config.Instance().DownloaderPath,
config.Instance().Paths.DownloaderPath,
"-I1",
"--flat-playlist",
"--print", "webpage_url",
@@ -148,20 +150,20 @@ func (t *CronTaskRunner) fetcher(ctx context.Context, req *monitorTask) time.Dur
return nextSchedule
}
p := &internal.Process{
Url: latestVideoURL,
Params: append(
// TODO: autoremove hook
d := downloaders.NewGenericDownload(
latestVideoURL,
append(
argsSplitterRe.FindAllString(req.Subscription.Params, 1),
[]string{
"--break-on-existing",
"--download-archive",
filepath.Join(config.Instance().Dir(), "archive.txt"),
}...),
AutoRemove: true,
}
)
t.db.Set(p) // give it an id
t.mq.Publish(p) // send it to the message queue waiting to be processed
t.db.Set(d) // give it an id
t.mq.Publish(d) // send it to the message queue waiting to be processed
slog.Info(
"cron task runner next schedule",

View File

@@ -14,7 +14,7 @@ import (
// FreeSpace gets the available Bytes writable to download directory
func FreeSpace() (uint64, error) {
var stat unix.Statfs_t
unix.Statfs(config.Instance().DownloadPath, &stat)
unix.Statfs(config.Instance().Paths.DownloadPath, &stat)
return (stat.Bavail * uint64(stat.Bsize)), nil
}
@@ -27,7 +27,7 @@ func DirectoryTree() (*[]string, error) {
}
var (
rootPath = config.Instance().DownloadPath
rootPath = config.Instance().Paths.DownloadPath
stack = internal.NewStack[Node]()
flattened = make([]string, 0)

View File

@@ -2,45 +2,65 @@ package twitch
import (
"context"
"encoding/gob"
"fmt"
"iter"
"log/slog"
"maps"
"os"
"path/filepath"
"sync"
"time"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/downloaders"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/kv"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/pipes"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/queue"
bolt "go.etcd.io/bbolt"
)
var bucket = []byte("twitch-monitor")
type Monitor struct {
liveChannel chan *StreamInfo
monitored map[string]*Client
lastState map[string]bool
mu sync.RWMutex
db *bolt.DB
authenticationManager *AuthenticationManager
}
func NewMonitor(authenticationManager *AuthenticationManager) *Monitor {
func NewMonitor(authenticationManager *AuthenticationManager, db *bolt.DB) *Monitor {
db.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(bucket)
return err
})
return &Monitor{
liveChannel: make(chan *StreamInfo, 16),
monitored: make(map[string]*Client),
lastState: make(map[string]bool),
authenticationManager: authenticationManager,
db: db,
}
}
func (m *Monitor) Add(user string) {
m.mu.Lock()
defer m.mu.Unlock()
m.monitored[user] = NewTwitchClient(m.authenticationManager)
m.mu.Unlock()
m.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(bucket)
//TODO: the empty byte array will be replaced with configs per user
err := b.Put([]byte(user), []byte(""))
return err
})
slog.Info("added user to twitch monitor", slog.String("user", user))
}
func (m *Monitor) Monitor(ctx context.Context, interval time.Duration, handler func(url string) error) {
func (m *Monitor) Monitor(ctx context.Context, interval time.Duration, handler func(user string) error) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
@@ -64,7 +84,7 @@ func (m *Monitor) Monitor(ctx context.Context, interval time.Duration, handler f
wasLive := m.lastState[stream.UserName]
if stream.IsLive && !wasLive {
slog.Info("stream went live", slog.String("user", stream.UserName))
if err := handler(fmt.Sprintf("https://www.twitch.tv/%s", stream.UserName)); err != nil {
if err := handler(stream.UserName); err != nil {
slog.Error("handler failed", slog.String("user", stream.UserName), slog.Any("err", err))
}
}
@@ -85,60 +105,60 @@ func (m *Monitor) GetMonitoredUsers() iter.Seq[string] {
func (m *Monitor) DeleteUser(user string) {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.monitored, user)
delete(m.lastState, user)
m.mu.Unlock()
m.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(bucket)
err := b.Delete([]byte(user))
return err
})
}
func DEFAULT_DOWNLOAD_HANDLER(db *internal.MemoryDB, mq *internal.MessageQueue) func(url string) error {
return func(url string) error {
p := &internal.Process{
Url: url,
Livestream: true,
Params: []string{"--downloader", "ffmpeg", "--no-part"},
}
db.Set(p)
mq.Publish(p)
func DEFAULT_DOWNLOAD_HANDLER(db *kv.Store, mq *queue.MessageQueue) func(user string) error {
return func(user string) error {
var (
url = fmt.Sprintf("https://www.twitch.tv/%s", user)
filename = filepath.Join(
config.Instance().Paths.DownloadPath,
fmt.Sprintf("%s (live) %s", user, time.Now().Format(time.ANSIC)),
)
ext = ".webm"
path = filename + ext
)
d := downloaders.NewLiveStreamDownloader(url, []pipes.Pipe{
&pipes.Transcoder{
Args: []string{
"-c:a", "libopus",
"-c:v", "libsvtav1",
"-crf", "30",
"-preset", "7",
},
},
&pipes.FileWriter{
Path: path,
IsFinal: true,
},
})
db.Set(d)
mq.Publish(d)
return nil
}
}
func (m *Monitor) Persist() error {
filename := filepath.Join(config.Instance().SessionFilePath, "twitch-monitor.dat")
f, err := os.Create(filename)
if err != nil {
return err
}
defer f.Close()
enc := gob.NewEncoder(f)
users := make([]string, 0, len(m.monitored))
for user := range m.monitored {
users = append(users, user)
}
return enc.Encode(users)
}
func (m *Monitor) Restore() error {
filename := filepath.Join(config.Instance().SessionFilePath, "twitch-monitor.dat")
f, err := os.Open(filename)
if err != nil {
if os.IsNotExist(err) {
return nil
}
return err
}
defer f.Close()
dec := gob.NewDecoder(f)
var users []string
if err := dec.Decode(&users); err != nil {
return err
}
m.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket(bucket)
return b.ForEach(func(k, v []byte) error {
users = append(users, string(k))
return nil
})
})
m.monitored = make(map[string]*Client)
for _, user := range users {

View File

@@ -8,7 +8,7 @@ import (
// Update using the builtin function of yt-dlp
func UpdateExecutable() error {
cmd := exec.Command(config.Instance().DownloaderPath, "-U")
cmd := exec.Command(config.Instance().Paths.DownloaderPath, "-U")
err := cmd.Start()
if err != nil {

View File

@@ -8,6 +8,7 @@ import (
"github.com/golang-jwt/jwt/v5"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
"golang.org/x/crypto/bcrypt"
)
const TOKEN_COOKIE_NAME = "jwt-yt-dlp-webui"
@@ -26,11 +27,17 @@ func Login(w http.ResponseWriter, r *http.Request) {
}
var (
username = config.Instance().Username
password = config.Instance().Password
username = config.Instance().Authentication.Username
passwordHash = config.Instance().Authentication.PasswordHash
)
if username != req.Username || password != req.Password {
err := bcrypt.CompareHashAndPassword([]byte(passwordHash), []byte(req.Password))
if err != nil {
http.Error(w, "invalid username or password", http.StatusBadRequest)
return
}
if username != req.Username {
http.Error(w, "invalid username or password", http.StatusBadRequest)
return
}