From 658d43f9ea88f79fcfa4dc6a243a0359e2bac748 Mon Sep 17 00:00:00 2001 From: marcobaobao Date: Sun, 31 Aug 2025 20:58:54 +0200 Subject: [PATCH] migrated to boltdb from sqlite + session files --- go.mod | 30 ++-- go.sum | 89 +++------- server/internal/kv/store.go | 85 ++++++--- server/internal/livestream/livestream.go | 10 +- server/internal/livestream/monitor.go | 80 +++------ server/internal/pipeline/store.go | 95 ++++++++++ server/middleware/utils.go | 2 + server/rest/common.go | 8 +- server/rest/provider.go | 6 +- server/rest/service.go | 111 ++++++------ server/server.go | 67 +++---- server/subscription/container.go | 6 +- server/subscription/provider.go | 5 +- server/subscription/repository/repository.go | 175 ++++++++++--------- server/twitch/monitor.go | 79 ++++----- 15 files changed, 448 insertions(+), 400 deletions(-) create mode 100644 server/internal/pipeline/store.go diff --git a/go.mod b/go.mod index 0542d1b..9fdaf70 100644 --- a/go.mod +++ b/go.mod @@ -4,29 +4,21 @@ go 1.24 require ( github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef - github.com/coreos/go-oidc/v3 v3.12.0 - github.com/go-chi/chi/v5 v5.2.0 - github.com/go-chi/cors v1.2.1 - github.com/golang-jwt/jwt/v5 v5.2.1 + github.com/coreos/go-oidc/v3 v3.15.0 + github.com/go-chi/chi/v5 v5.2.3 + github.com/go-chi/cors v1.2.2 + github.com/golang-jwt/jwt/v5 v5.3.0 github.com/google/uuid v1.6.0 github.com/gorilla/websocket v1.5.3 - github.com/robfig/cron/v3 v3.0.0 - golang.org/x/oauth2 v0.25.0 - golang.org/x/sync v0.10.0 - golang.org/x/sys v0.29.0 + github.com/robfig/cron/v3 v3.0.1 + go.etcd.io/bbolt v1.4.3 + golang.org/x/oauth2 v0.30.0 + golang.org/x/sync v0.16.0 + golang.org/x/sys v0.35.0 gopkg.in/yaml.v3 v3.0.1 - modernc.org/sqlite v1.34.5 ) require ( - github.com/dustin/go-humanize v1.0.1 // indirect - github.com/go-jose/go-jose/v4 v4.0.4 // indirect - github.com/mattn/go-isatty v0.0.20 // indirect - github.com/ncruces/go-strftime v0.1.9 // indirect - github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect - golang.org/x/crypto v0.32.0 // indirect - golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c // indirect - modernc.org/libc v1.61.11 // indirect - modernc.org/mathutil v1.7.1 // indirect - modernc.org/memory v1.8.2 // indirect + github.com/go-jose/go-jose/v4 v4.1.2 // indirect + golang.org/x/crypto v0.41.0 // indirect ) diff --git a/go.sum b/go.sum index 4a1ff13..be4aadf 100644 --- a/go.sum +++ b/go.sum @@ -1,79 +1,38 @@ github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef h1:2JGTg6JapxP9/R33ZaagQtAM4EkkSYnIAlOG5EI8gkM= github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef/go.mod h1:JS7hed4L1fj0hXcyEejnW57/7LCetXggd+vwrRnYeII= -github.com/coreos/go-oidc/v3 v3.12.0 h1:sJk+8G2qq94rDI6ehZ71Bol3oUHy63qNYmkiSjrc/Jo= -github.com/coreos/go-oidc/v3 v3.12.0/go.mod h1:gE3LgjOgFoHi9a4ce4/tJczr0Ai2/BoDhf0r5lltWI0= +github.com/coreos/go-oidc/v3 v3.15.0 h1:R6Oz8Z4bqWR7VFQ+sPSvZPQv4x8M+sJkDO5ojgwlyAg= +github.com/coreos/go-oidc/v3 v3.15.0/go.mod h1:HaZ3szPaZ0e4r6ebqvsLWlk2Tn+aejfmrfah6hnSYEU= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= -github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= -github.com/go-chi/chi/v5 v5.2.0 h1:Aj1EtB0qR2Rdo2dG4O94RIU35w2lvQSj6BRA4+qwFL0= -github.com/go-chi/chi/v5 v5.2.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8= -github.com/go-chi/cors v1.2.1 h1:xEC8UT3Rlp2QuWNEr4Fs/c2EAGVKBwy/1vHx3bppil4= -github.com/go-chi/cors v1.2.1/go.mod h1:sSbTewc+6wYHBBCW7ytsFSn836hqM7JxpglAy2Vzc58= -github.com/go-jose/go-jose/v4 v4.0.4 h1:VsjPI33J0SB9vQM6PLmNjoHqMQNGPiZ0rHL7Ni7Q6/E= -github.com/go-jose/go-jose/v4 v4.0.4/go.mod h1:NKb5HO1EZccyMpiZNbdUw/14tiXNyUJh188dfnMCAfc= -github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk= -github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/pprof v0.0.0-20240409012703-83162a5b38cd h1:gbpYu9NMq8jhDVbvlGkMFWCjLFlqqEZjEmObmhUy6Vo= -github.com/google/pprof v0.0.0-20240409012703-83162a5b38cd/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw= +github.com/go-chi/chi/v5 v5.2.3 h1:WQIt9uxdsAbgIYgid+BpYc+liqQZGMHRaUwp0JUcvdE= +github.com/go-chi/chi/v5 v5.2.3/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops= +github.com/go-chi/cors v1.2.2 h1:Jmey33TE+b+rB7fT8MUy1u0I4L+NARQlK6LhzKPSyQE= +github.com/go-chi/cors v1.2.2/go.mod h1:sSbTewc+6wYHBBCW7ytsFSn836hqM7JxpglAy2Vzc58= +github.com/go-jose/go-jose/v4 v4.1.2 h1:TK/7NqRQZfgAh+Td8AlsrvtPoUyiHh0LqVvokh+1vHI= +github.com/go-jose/go-jose/v4 v4.1.2/go.mod h1:22cg9HWM1pOlnRiY+9cQYJ9XHmya1bYW8OeDM6Ku6Oo= +github.com/golang-jwt/jwt/v5 v5.3.0 h1:pv4AsKCKKZuqlgs5sUmn4x8UlGa0kEVt/puTpKx9vvo= +github.com/golang-jwt/jwt/v5 v5.3.0/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= -github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= -github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4= -github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= -github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= -github.com/robfig/cron/v3 v3.0.0 h1:kQ6Cb7aHOHTSzNVNEhmp8EcWKLb4CbiMW9h9VyIhO4E= -github.com/robfig/cron/v3 v3.0.0/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc= -golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc= -golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c h1:KL/ZBHXgKGVmuZBZ01Lt57yE5ws8ZPSkkihmEyq7FXc= -golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c/go.mod h1:tujkw807nyEEAamNbDrEGzRav+ilXA7PCRAd6xsmwiU= -golang.org/x/mod v0.22.0 h1:D4nJWe9zXqHOmWqj4VMOJhvzj7bEZg4wEYa759z1pH4= -golang.org/x/mod v0.22.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY= -golang.org/x/oauth2 v0.25.0 h1:CY4y7XT9v0cRI9oupztF8AgiIu99L/ksR/Xp/6jrZ70= -golang.org/x/oauth2 v0.25.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= -golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= -golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= -golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/tools v0.29.0 h1:Xx0h3TtM9rzQpQuR4dKLrdglAmCEN5Oi+P74JdhdzXE= -golang.org/x/tools v0.29.0/go.mod h1:KMQVMRsVxU6nHCFXrBPhDB8XncLNLM0lIy/F14RP588= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.etcd.io/bbolt v1.4.3 h1:dEadXpI6G79deX5prL3QRNP6JB8UxVkqo4UPnHaNXJo= +go.etcd.io/bbolt v1.4.3/go.mod h1:tKQlpPaYCVFctUIgFKFnAlvbmB3tpy1vkTnDWohtc0E= +golang.org/x/crypto v0.41.0 h1:WKYxWedPGCTVVl5+WHSSrOBT0O8lx32+zxmHxijgXp4= +golang.org/x/crypto v0.41.0/go.mod h1:pO5AFd7FA68rFak7rOAGVuygIISepHftHnr8dr6+sUc= +golang.org/x/oauth2 v0.30.0 h1:dnDm7JmhM45NNpd8FDDeLhK6FwqbOf4MLCM9zb1BOHI= +golang.org/x/oauth2 v0.30.0/go.mod h1:B++QgG3ZKulg6sRPGD/mqlHQs5rB3Ml9erfeDY7xKlU= +golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw= +golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= +golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -modernc.org/cc/v4 v4.24.4 h1:TFkx1s6dCkQpd6dKurBNmpo+G8Zl4Sq/ztJ+2+DEsh0= -modernc.org/cc/v4 v4.24.4/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0= -modernc.org/ccgo/v4 v4.23.15 h1:wFDan71KnYqeHz4eF63vmGE6Q6Pc0PUGDpP0PRMYjDc= -modernc.org/ccgo/v4 v4.23.15/go.mod h1:nJX30dks/IWuBOnVa7VRii9Me4/9TZ1SC9GNtmARTy0= -modernc.org/fileutil v1.3.0 h1:gQ5SIzK3H9kdfai/5x41oQiKValumqNTDXMvKo62HvE= -modernc.org/fileutil v1.3.0/go.mod h1:XatxS8fZi3pS8/hKG2GH/ArUogfxjpEKs3Ku3aK4JyQ= -modernc.org/gc/v2 v2.6.2 h1:YBXi5Kqp6aCK3fIxwKQ3/fErvawVKwjOLItxj1brGds= -modernc.org/gc/v2 v2.6.2/go.mod h1:YgIahr1ypgfe7chRuJi2gD7DBQiKSLMPgBQe9oIiito= -modernc.org/libc v1.61.11 h1:6sZG8uB6EMMG7iTLPTndi8jyTdgAQNIeLGjCFICACZw= -modernc.org/libc v1.61.11/go.mod h1:HHX+srFdn839oaJRd0W8hBM3eg+mieyZCAjWwB08/nM= -modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU= -modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg= -modernc.org/memory v1.8.2 h1:cL9L4bcoAObu4NkxOlKWBWtNHIsnnACGF/TbqQ6sbcI= -modernc.org/memory v1.8.2/go.mod h1:ZbjSvMO5NQ1A2i3bWeDiVMxIorXwdClKE/0SZ+BMotU= -modernc.org/opt v0.1.4 h1:2kNGMRiUjrp4LcaPuLY2PzUfqM/w9N23quVwhKt5Qm8= -modernc.org/opt v0.1.4/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns= -modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w= -modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE= -modernc.org/sqlite v1.34.5 h1:Bb6SR13/fjp15jt70CL4f18JIN7p7dnMExd+UFnF15g= -modernc.org/sqlite v1.34.5/go.mod h1:YLuNmX9NKs8wRNK2ko1LW1NGYcc9FkBO69JOt1AR9JE= -modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0= -modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A= -modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y= -modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= diff --git a/server/internal/kv/store.go b/server/internal/kv/store.go index ceff592..0ae8788 100644 --- a/server/internal/kv/store.go +++ b/server/internal/kv/store.go @@ -2,31 +2,58 @@ package kv import ( "encoding/gob" + "encoding/json" "errors" "log/slog" "os" "path/filepath" "runtime" "sync" + "time" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/downloaders" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/queue" + + bolt "go.etcd.io/bbolt" ) -var memDbEvents = make(chan downloaders.Downloader, runtime.NumCPU()) +var ( + bucket = []byte("downloads") + memDbEvents = make(chan downloaders.Downloader, runtime.NumCPU()) +) // In-Memory Thread-Safe Key-Value Storage with optional persistence type Store struct { + db *bolt.DB table map[string]downloaders.Downloader mu sync.RWMutex } -func NewStore() *Store { - return &Store{ +func NewStore(db *bolt.DB, snaptshotInteval time.Duration) (*Store, error) { + // init bucket + err := db.Update(func(tx *bolt.Tx) error { + _, err := tx.CreateBucketIfNotExists(bucket) + return err + }) + if err != nil { + return nil, err + } + + s := &Store{ + db: db, table: make(map[string]downloaders.Downloader), } + + go func() { + ticker := time.NewTicker(snaptshotInteval) + for range ticker.C { + s.Snapshot() + } + }() + + return s, err } // Get a process pointer given its id @@ -108,25 +135,25 @@ func (m *Store) Persist() error { // Restore a persisted state func (m *Store) Restore(mq *queue.MessageQueue) { - sf := filepath.Join(config.Instance().SessionFilePath, "session.dat") - - fd, err := os.Open(sf) - if err != nil { - return - } - - var session Session - - if err := gob.NewDecoder(fd).Decode(&session); err != nil { - return - } - m.mu.Lock() defer m.mu.Unlock() - for _, snap := range session.Processes { - var restored downloaders.Downloader + var snapshot []internal.ProcessSnapshot + m.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket(bucket) + return b.ForEach(func(k, v []byte) error { + var snap internal.ProcessSnapshot + if err := json.Unmarshal(v, &snap); err != nil { + return err + } + snapshot = append(snapshot, snap) + return nil + }) + }) + + for _, snap := range snapshot { + var restored downloaders.Downloader if snap.DownloaderName == "generic" { d := downloaders.NewGenericDownload("", []string{}) err := d.RestoreFromSnapshot(&snap) @@ -134,9 +161,7 @@ func (m *Store) Restore(mq *queue.MessageQueue) { continue } restored = d - m.table[snap.Id] = restored - if !restored.(*downloaders.GenericDownloader).DownloaderBase.Completed { mq.Publish(restored) } @@ -152,3 +177,23 @@ func (m *Store) EventListener() { } } } + +func (m *Store) Snapshot() error { + slog.Debug("snapshotting downloads state") + + running := m.All() + + return m.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket(bucket) + for _, v := range *running { + data, err := json.Marshal(v) + if err != nil { + return err + } + if err := b.Put([]byte(v.Id), data); err != nil { + return err + } + } + return nil + }) +} diff --git a/server/internal/livestream/livestream.go b/server/internal/livestream/livestream.go index 6a4bdc4..3d54005 100644 --- a/server/internal/livestream/livestream.go +++ b/server/internal/livestream/livestream.go @@ -35,11 +35,11 @@ type LiveStream struct { waitTime time.Duration liveDate time.Time - mq *queue.MessageQueue - db *kv.Store + mq *queue.MessageQueue + store *kv.Store } -func New(url string, done chan *LiveStream, mq *queue.MessageQueue, db *kv.Store) *LiveStream { +func New(url string, done chan *LiveStream, mq *queue.MessageQueue, store *kv.Store) *LiveStream { return &LiveStream{ url: url, done: done, @@ -47,7 +47,7 @@ func New(url string, done chan *LiveStream, mq *queue.MessageQueue, db *kv.Store waitTime: time.Second * 0, waitTimeChan: make(chan time.Duration), mq: mq, - db: db, + store: store, } } @@ -94,7 +94,7 @@ func (l *LiveStream) Start() error { //TODO: add pipes d := downloaders.NewLiveStreamDownloader(l.url, []pipes.Pipe{}) - l.db.Set(d) + l.store.Set(d) l.mq.Publish(d) return nil diff --git a/server/internal/livestream/monitor.go b/server/internal/livestream/monitor.go index 3b0cecf..b42affc 100644 --- a/server/internal/livestream/monitor.go +++ b/server/internal/livestream/monitor.go @@ -1,28 +1,26 @@ package livestream import ( - "encoding/gob" - "log/slog" - "maps" - "os" - "path/filepath" - - "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/kv" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/queue" + bolt "go.etcd.io/bbolt" ) +var bucket = []byte("livestreams") + type Monitor struct { - db *kv.Store // where the just started livestream will be published + db *bolt.DB + store *kv.Store // where the just started livestream will be published mq *queue.MessageQueue // where the just started livestream will be published streams map[string]*LiveStream // keeps track of the livestreams done chan *LiveStream // to signal individual processes completition } -func NewMonitor(mq *queue.MessageQueue, db *kv.Store) *Monitor { +func NewMonitor(mq *queue.MessageQueue, store *kv.Store, db *bolt.DB) *Monitor { return &Monitor{ mq: mq, db: db, + store: store, streams: make(map[string]*LiveStream), done: make(chan *LiveStream), } @@ -32,14 +30,24 @@ func NewMonitor(mq *queue.MessageQueue, db *kv.Store) *Monitor { func (m *Monitor) Schedule() { for l := range m.done { delete(m.streams, l.url) + + m.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket(bucket) + return b.Delete([]byte(l.url)) + }) } } func (m *Monitor) Add(url string) { - ls := New(url, m.done, m.mq, m.db) + ls := New(url, m.done, m.mq, m.store) go ls.Start() m.streams[url] = ls + + m.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket(bucket) + return b.Put([]byte(url), []byte{}) + }) } func (m *Monitor) Remove(url string) error { @@ -59,11 +67,6 @@ func (m *Monitor) Status() LiveStreamStatus { status := make(LiveStreamStatus) for k, v := range m.streams { - // wt, ok := <-v.WaitTime() - // if !ok { - // continue - // } - status[k] = Status{ Status: v.status, WaitTime: v.waitTime, @@ -74,46 +77,13 @@ func (m *Monitor) Status() LiveStreamStatus { return status } -// Persist the monitor current state to a file. -// The file is located in the configured config directory -func (m *Monitor) Persist() error { - fd, err := os.Create(filepath.Join(config.Instance().SessionFilePath, "livestreams.dat")) - if err != nil { - return err - } - - defer fd.Close() - - slog.Debug("persisting livestream monitor state") - - var toPersist []string - for url := range maps.Keys(m.streams) { - toPersist = append(toPersist, url) - } - - return gob.NewEncoder(fd).Encode(toPersist) -} - // Restore a saved state and resume the monitored livestreams func (m *Monitor) Restore() error { - fd, err := os.Open(filepath.Join(config.Instance().SessionFilePath, "livestreams.dat")) - if err != nil { - return err - } - - defer fd.Close() - - var toRestore []string - - if err := gob.NewDecoder(fd).Decode(&toRestore); err != nil { - return err - } - - for _, url := range toRestore { - m.Add(url) - } - - slog.Debug("restored livestream monitor state") - - return nil + return m.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket(bucket) + return b.ForEach(func(k, v []byte) error { + m.Add(string(k)) + return nil + }) + }) } diff --git a/server/internal/pipeline/store.go b/server/internal/pipeline/store.go new file mode 100644 index 0000000..7240f95 --- /dev/null +++ b/server/internal/pipeline/store.go @@ -0,0 +1,95 @@ +package pipeline + +import ( + "encoding/json" + "fmt" + + bolt "go.etcd.io/bbolt" +) + +var bucket = []byte("pipelines") + +type Step struct { + Type string `json:"type"` // es. "transcoder", "filewriter" + FFmpegArgs []string `json:"ffmpeg_args,omitempty"` // args da passare a ffmpeg + Path string `json:"path,omitempty"` // solo per filewriter +} + +type Pipeline struct { + ID string `json:"id"` + Name string `json:"name"` + Steps []Step `json:"steps"` +} + +type Store struct { + db *bolt.DB +} + +func NewStore(path string) (*Store, error) { + db, err := bolt.Open(path, 0600, nil) + if err != nil { + return nil, err + } + + // init bucket + err = db.Update(func(tx *bolt.Tx) error { + _, err := tx.CreateBucketIfNotExists(bucket) + return err + }) + if err != nil { + return nil, err + } + + return &Store{db: db}, nil +} + +func (s *Store) Save(p Pipeline) error { + data, err := json.Marshal(p) + if err != nil { + return err + } + + return s.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket(bucket) + return b.Put([]byte(p.ID), data) + }) +} + +func (s *Store) Get(id string) (*Pipeline, error) { + var p Pipeline + + err := s.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket(bucket) + v := b.Get([]byte(id)) + if v == nil { + return fmt.Errorf("pipeline %s not found", id) + } + return json.Unmarshal(v, &p) + }) + if err != nil { + return nil, err + } + + return &p, nil +} + +func (s *Store) List() ([]Pipeline, error) { + var result []Pipeline + + err := s.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket(bucket) + return b.ForEach(func(k, v []byte) error { + var p Pipeline + if err := json.Unmarshal(v, &p); err != nil { + return err + } + result = append(result, p) + return nil + }) + }) + if err != nil { + return nil, err + } + + return result, nil +} diff --git a/server/middleware/utils.go b/server/middleware/utils.go index 4d6640e..3956ac8 100644 --- a/server/middleware/utils.go +++ b/server/middleware/utils.go @@ -11,9 +11,11 @@ func ApplyAuthenticationByConfig(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if config.Instance().RequireAuth { Authenticated(next) + return } if config.Instance().UseOpenId { openid.Middleware(next) + return } next.ServeHTTP(w, r) }) diff --git a/server/rest/common.go b/server/rest/common.go index 5347102..57c09c9 100644 --- a/server/rest/common.go +++ b/server/rest/common.go @@ -1,14 +1,16 @@ package rest import ( - "database/sql" - "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/kv" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/livestream" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/queue" + + bolt "go.etcd.io/bbolt" ) type ContainerArgs struct { - DB *sql.DB + DB *bolt.DB MDB *kv.Store MQ *queue.MessageQueue + LM *livestream.Monitor } diff --git a/server/rest/provider.go b/server/rest/provider.go index 3ca408e..fe14114 100644 --- a/server/rest/provider.go +++ b/server/rest/provider.go @@ -14,11 +14,7 @@ var ( func ProvideService(args *ContainerArgs) *Service { serviceOnce.Do(func() { - service = &Service{ - mdb: args.MDB, - db: args.DB, - mq: args.MQ, - } + service = NewService(args.MDB, args.DB, args.MQ, args.LM) }) return service } diff --git a/server/rest/service.go b/server/rest/service.go index 7bed586..31616b6 100644 --- a/server/rest/service.go +++ b/server/rest/service.go @@ -2,8 +2,9 @@ package rest import ( "context" - "database/sql" + "encoding/json" "errors" + "fmt" "io" "os" "os/exec" @@ -17,15 +18,35 @@ import ( "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/livestream" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/queue" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/playlist" + + bolt "go.etcd.io/bbolt" ) type Service struct { mdb *kv.Store - db *sql.DB + db *bolt.DB mq *queue.MessageQueue lm *livestream.Monitor } +func NewService( + mdb *kv.Store, + db *bolt.DB, + mq *queue.MessageQueue, + lm *livestream.Monitor, +) *Service { + db.Update(func(tx *bolt.Tx) error { + _, err := tx.CreateBucketIfNotExists([]byte("templates")) + return err + }) + return &Service{ + mdb: mdb, + db: db, + mq: mq, + lm: lm, + } +} + func (s *Service) Exec(req internal.DownloadRequest) (string, error) { d := downloaders.NewGenericDownload(req.URL, req.Params) d.SetOutput(internal.DownloadOutput{ @@ -85,64 +106,56 @@ func (s *Service) SetCookies(ctx context.Context, cookies string) error { } func (s *Service) SaveTemplate(ctx context.Context, template *internal.CustomTemplate) error { - conn, err := s.db.Conn(ctx) - if err != nil { - return err - } - - defer conn.Close() - - _, err = conn.ExecContext( - ctx, - "INSERT INTO templates (id, name, content) VALUES (?, ?, ?)", - uuid.NewString(), - template.Name, - template.Content, - ) - - return err + return s.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte("templates")) + v, err := json.Marshal(template) + if err != nil { + return err + } + return b.Put([]byte(uuid.NewString()), v) + }) } func (s *Service) GetTemplates(ctx context.Context) (*[]internal.CustomTemplate, error) { - conn, err := s.db.Conn(ctx) - if err != nil { - return nil, err - } - - defer conn.Close() - - rows, err := conn.QueryContext(ctx, "SELECT * FROM templates") - if err != nil { - return nil, err - } - - defer rows.Close() - templates := make([]internal.CustomTemplate, 0) - for rows.Next() { - t := internal.CustomTemplate{} - - err := rows.Scan(&t.Id, &t.Name, &t.Content) - if err != nil { - return nil, err + err := s.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte("templates")) + if b == nil { + return nil // bucket vuoto, restituisco lista vuota } - templates = append(templates, t) + return b.ForEach(func(k, v []byte) error { + var t internal.CustomTemplate + if err := json.Unmarshal(v, &t); err != nil { + return err + } + templates = append(templates, t) + return nil + }) + }) + + if err != nil { + return nil, err } return &templates, nil } func (s *Service) UpdateTemplate(ctx context.Context, t *internal.CustomTemplate) (*internal.CustomTemplate, error) { - conn, err := s.db.Conn(ctx) + data, err := json.Marshal(t) if err != nil { return nil, err } - defer conn.Close() + err = s.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte("templates")) + if b == nil { + return fmt.Errorf("bucket templates not found") + } + return b.Put([]byte(t.Id), data) + }) - _, err = conn.ExecContext(ctx, "UPDATE templates SET name = ?, content = ? WHERE id = ?", t.Name, t.Content, t.Id) if err != nil { return nil, err } @@ -151,16 +164,10 @@ func (s *Service) UpdateTemplate(ctx context.Context, t *internal.CustomTemplate } func (s *Service) DeleteTemplate(ctx context.Context, id string) error { - conn, err := s.db.Conn(ctx) - if err != nil { - return err - } - - defer conn.Close() - - _, err = conn.ExecContext(ctx, "DELETE FROM templates WHERE id = ?", id) - - return err + return s.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte("templates")) + return b.Delete([]byte(id)) + }) } func (s *Service) GetVersion(ctx context.Context) (string, string, error) { diff --git a/server/server.go b/server/server.go index 066cafc..6e7bcfc 100644 --- a/server/server.go +++ b/server/server.go @@ -3,7 +3,6 @@ package server import ( "context" - "database/sql" "fmt" "io" "io/fs" @@ -13,16 +12,14 @@ import ( "net/rpc" "os" "os/signal" + "path/filepath" "strings" "syscall" "time" "github.com/go-chi/chi/v5" "github.com/go-chi/cors" - "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/archive" - "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/archiver" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config" - "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/dbutil" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/filebrowser" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/kv" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/livestream" @@ -38,7 +35,7 @@ import ( "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/twitch" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/user" - _ "modernc.org/sqlite" + bolt "go.etcd.io/bbolt" ) type RunConfig struct { @@ -50,7 +47,7 @@ type serverConfig struct { frontend fs.FS swagger fs.FS mdb *kv.Store - db *sql.DB + db *bolt.DB mq *queue.MessageQueue lm *livestream.Monitor tm *twitch.Monitor @@ -60,7 +57,17 @@ type serverConfig struct { var observableLogger = logging.NewObservableLogger() func RunBlocking(rc *RunConfig) { - mdb := kv.NewStore() + dbPath := filepath.Join(config.Instance().SessionFilePath, "bolt.db") + + boltdb, err := bolt.Open(dbPath, 0600, nil) + if err != nil { + panic(err) + } + + mdb, err := kv.NewStore(boltdb, time.Second*15) + if err != nil { + panic(err) + } // ---- LOGGING --------------------------------------------------- logWriters := []io.Writer{ @@ -97,15 +104,6 @@ func RunBlocking(rc *RunConfig) { slog.SetDefault(logger) // ---------------------------------------------------------------- - db, err := sql.Open("sqlite", conf.LocalDatabasePath) - if err != nil { - slog.Error("failed to open database", slog.String("err", err.Error())) - } - - if err := dbutil.Migrate(context.Background(), db); err != nil { - slog.Error("failed to init database", slog.String("err", err.Error())) - } - mq, err := queue.NewMessageQueue() if err != nil { panic(err) @@ -114,7 +112,7 @@ func RunBlocking(rc *RunConfig) { go mdb.Restore(mq) go mdb.EventListener() - lm := livestream.NewMonitor(mq, mdb) + lm := livestream.NewMonitor(mq, mdb, boltdb) go lm.Schedule() go lm.Restore() @@ -123,6 +121,7 @@ func RunBlocking(rc *RunConfig) { config.Instance().Twitch.ClientId, config.Instance().Twitch.ClientSecret, ), + boltdb, ) go tm.Monitor( context.TODO(), @@ -135,8 +134,8 @@ func RunBlocking(rc *RunConfig) { frontend: rc.App, swagger: rc.Swagger, mdb: mdb, + db: boltdb, mq: mq, - db: db, lm: lm, tm: tm, } @@ -144,7 +143,6 @@ func RunBlocking(rc *RunConfig) { srv := newServer(scfg) go gracefulShutdown(srv, &scfg) - go autoPersist(time.Minute*5, mdb, lm, tm) var ( network = "tcp" @@ -171,7 +169,7 @@ func RunBlocking(rc *RunConfig) { } func newServer(c serverConfig) *http.Server { - archiver.Register(c.db) + // archiver.Register(c.db) cronTaskRunner := task.NewCronTaskRunner(c.mq, c.mdb) go cronTaskRunner.Spawner(context.TODO()) @@ -216,7 +214,7 @@ func newServer(c serverConfig) *http.Server { }) // Archive routes - r.Route("/archive", archive.ApplyRouter(c.db)) + // r.Route("/archive", archive.ApplyRouter(c.db)) // Authentication routes r.Route("/auth", func(r chi.Router) { @@ -238,6 +236,7 @@ func newServer(c serverConfig) *http.Server { DB: c.db, MDB: c.mdb, MQ: c.mq, + LM: c.lm, })) // Logging @@ -273,34 +272,10 @@ func gracefulShutdown(srv *http.Server, cfg *serverConfig) { defer func() { cfg.mdb.Persist() - cfg.lm.Persist() - cfg.tm.Persist() + cfg.db.Close() stop() srv.Shutdown(context.Background()) }() }() } - -func autoPersist( - d time.Duration, - db *kv.Store, - lm *livestream.Monitor, - tm *twitch.Monitor, -) { - for { - time.Sleep(d) - if err := db.Persist(); err != nil { - slog.Warn("failed to persisted session", slog.Any("err", err)) - } - if err := lm.Persist(); err != nil { - slog.Warn( - "failed to persisted livestreams monitor session", slog.Any("err", err.Error())) - } - if err := tm.Persist(); err != nil { - slog.Warn( - "failed to persisted twitch monitor session", slog.Any("err", err.Error())) - } - slog.Debug("sucessfully persisted session") - } -} diff --git a/server/subscription/container.go b/server/subscription/container.go index 36f007a..065bc7c 100644 --- a/server/subscription/container.go +++ b/server/subscription/container.go @@ -1,13 +1,13 @@ package subscription import ( - "database/sql" - "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/domain" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/task" + + bolt "go.etcd.io/bbolt" ) -func Container(db *sql.DB, runner task.TaskRunner) domain.RestHandler { +func Container(db *bolt.DB, runner task.TaskRunner) domain.RestHandler { var ( r = provideRepository(db) s = provideService(r, runner) diff --git a/server/subscription/provider.go b/server/subscription/provider.go index f8dc13f..e3f0797 100644 --- a/server/subscription/provider.go +++ b/server/subscription/provider.go @@ -1,7 +1,6 @@ package subscription import ( - "database/sql" "sync" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/domain" @@ -9,6 +8,8 @@ import ( "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/rest" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/service" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/task" + + bolt "go.etcd.io/bbolt" ) var ( @@ -21,7 +22,7 @@ var ( handOnce sync.Once ) -func provideRepository(db *sql.DB) domain.Repository { +func provideRepository(db *bolt.DB) domain.Repository { repoOnce.Do(func() { repo = repository.New(db) }) diff --git a/server/subscription/repository/repository.go b/server/subscription/repository/repository.go index cd4756c..2709097 100644 --- a/server/subscription/repository/repository.go +++ b/server/subscription/repository/repository.go @@ -2,131 +2,142 @@ package repository import ( "context" - "database/sql" + "encoding/json" + "fmt" "github.com/google/uuid" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/data" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/domain" + bolt "go.etcd.io/bbolt" ) +var bucketName = []byte("subscriptions") + type Repository struct { - db *sql.DB + db *bolt.DB } // Delete implements domain.Repository. func (r *Repository) Delete(ctx context.Context, id string) error { - conn, err := r.db.Conn(ctx) - if err != nil { - return err - } - - defer conn.Close() - - _, err = conn.ExecContext(ctx, "DELETE FROM subscriptions WHERE id = ?", id) - - return err + return r.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket(bucketName) + return b.Delete([]byte(id)) + }) } // GetCursor implements domain.Repository. -func (r *Repository) GetCursor(ctx context.Context, id string) (int64, error) { - conn, err := r.db.Conn(ctx) +func (s *Repository) GetCursor(ctx context.Context, id string) (int64, error) { + var cursor int64 + + err := s.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte("subscriptions")) + v := b.Get([]byte(id)) + if v == nil { + return fmt.Errorf("subscription %s not found", id) + } + + var data struct { + Cursor int64 `json:"cursor"` + } + + if err := json.Unmarshal(v, &data); err != nil { + return err + } + cursor = data.Cursor + return nil + }) + if err != nil { return -1, err } - defer conn.Close() - - row := conn.QueryRowContext(ctx, "SELECT rowid FROM subscriptions WHERE id = ?", id) - - var rowId int64 - - if err := row.Scan(&rowId); err != nil { - return -1, err - } - - return rowId, nil + return cursor, nil } // List implements domain.Repository. func (r *Repository) List(ctx context.Context, start int64, limit int) (*[]data.Subscription, error) { - conn, err := r.db.Conn(ctx) + var subs []data.Subscription + + err := r.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket(bucketName) + return b.ForEach(func(k, v []byte) error { + var sub data.Subscription + if err := json.Unmarshal(v, &sub); err != nil { + return err + } + subs = append(subs, sub) + return nil + }) + }) + if err != nil { return nil, err } - defer conn.Close() - - var elements []data.Subscription - - rows, err := conn.QueryContext(ctx, "SELECT rowid, * FROM subscriptions WHERE rowid > ? LIMIT ?", start, limit) - if err != nil { - return nil, err - } - - for rows.Next() { - var rowId int64 - var element data.Subscription - - if err := rows.Scan( - &rowId, - &element.Id, - &element.URL, - &element.Params, - &element.CronExpr, - ); err != nil { - return &elements, err - } - - elements = append(elements, element) - } - - return &elements, nil + return &subs, nil } // Submit implements domain.Repository. -func (r *Repository) Submit(ctx context.Context, sub *data.Subscription) (*data.Subscription, error) { - conn, err := r.db.Conn(ctx) +func (s *Repository) Submit(ctx context.Context, sub *data.Subscription) (*data.Subscription, error) { + if sub.Id == "" { + sub.Id = uuid.NewString() + } + + data, err := json.Marshal(sub) if err != nil { return nil, err } - defer conn.Close() + err = s.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte("subscriptions")) + return b.Put([]byte(sub.Id), data) + }) - _, err = conn.ExecContext( - ctx, - "INSERT INTO subscriptions (id, url, params, cron) VALUES (?, ?, ?, ?)", - uuid.NewString(), - sub.URL, - sub.Params, - sub.CronExpr, - ) + if err != nil { + return nil, err + } - return sub, err + return sub, nil } // UpdateByExample implements domain.Repository. -func (r *Repository) UpdateByExample(ctx context.Context, example *data.Subscription) error { - conn, err := r.db.Conn(ctx) - if err != nil { - return err - } +func (s *Repository) UpdateByExample(ctx context.Context, example *data.Subscription) error { + return s.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte("subscriptions")) - defer conn.Close() + return b.ForEach(func(k, v []byte) error { + var sub data.Subscription + if err := json.Unmarshal(v, &sub); err != nil { + return err + } - _, err = conn.ExecContext( - ctx, - "UPDATE subscriptions SET url = ?, params = ?, cron = ? WHERE id = ? OR url = ?", - example.URL, - example.Params, - example.CronExpr, - example.Id, - example.URL, - ) + if sub.Id == example.Id || sub.URL == example.URL { + // aggiorna i campi + sub.URL = example.URL + sub.Params = example.Params + sub.CronExpr = example.CronExpr - return err + data, err := json.Marshal(sub) + if err != nil { + return err + } + + if err := b.Put(k, data); err != nil { + return err + } + } + + return nil + }) + }) } -func New(db *sql.DB) domain.Repository { +func New(db *bolt.DB) domain.Repository { + db.Update(func(tx *bolt.Tx) error { + _, err := tx.CreateBucketIfNotExists(bucketName) + return err + }) + return &Repository{ db: db, } diff --git a/server/twitch/monitor.go b/server/twitch/monitor.go index a729d2d..82ae0f9 100644 --- a/server/twitch/monitor.go +++ b/server/twitch/monitor.go @@ -2,12 +2,10 @@ package twitch import ( "context" - "encoding/gob" "fmt" "iter" "log/slog" "maps" - "os" "path/filepath" "sync" "time" @@ -17,29 +15,48 @@ import ( "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/kv" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/pipes" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal/queue" + + bolt "go.etcd.io/bbolt" ) +var bucket = []byte("twitch-monitor") + type Monitor struct { liveChannel chan *StreamInfo monitored map[string]*Client lastState map[string]bool mu sync.RWMutex + db *bolt.DB authenticationManager *AuthenticationManager } -func NewMonitor(authenticationManager *AuthenticationManager) *Monitor { +func NewMonitor(authenticationManager *AuthenticationManager, db *bolt.DB) *Monitor { + db.Update(func(tx *bolt.Tx) error { + _, err := tx.CreateBucketIfNotExists(bucket) + return err + }) + return &Monitor{ liveChannel: make(chan *StreamInfo, 16), monitored: make(map[string]*Client), lastState: make(map[string]bool), authenticationManager: authenticationManager, + db: db, } } func (m *Monitor) Add(user string) { m.mu.Lock() - defer m.mu.Unlock() m.monitored[user] = NewTwitchClient(m.authenticationManager) + m.mu.Unlock() + + m.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket(bucket) + //TODO: the empty byte array will be replaced with configs per user + err := b.Put([]byte(user), []byte("")) + return err + }) + slog.Info("added user to twitch monitor", slog.String("user", user)) } @@ -88,9 +105,15 @@ func (m *Monitor) GetMonitoredUsers() iter.Seq[string] { func (m *Monitor) DeleteUser(user string) { m.mu.Lock() - defer m.mu.Unlock() delete(m.monitored, user) delete(m.lastState, user) + m.mu.Unlock() + + m.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket(bucket) + err := b.Delete([]byte(user)) + return err + }) } func DEFAULT_DOWNLOAD_HANDLER(db *kv.Store, mq *queue.MessageQueue) func(user string) error { @@ -106,10 +129,6 @@ func DEFAULT_DOWNLOAD_HANDLER(db *kv.Store, mq *queue.MessageQueue) func(user st ) d := downloaders.NewLiveStreamDownloader(url, []pipes.Pipe{ - // &pipes.FileWriter{ - // Path: filename + ".mp4", - // IsFinal: false, - // }, &pipes.Transcoder{ Args: []string{ "-c:a", "libopus", @@ -130,42 +149,16 @@ func DEFAULT_DOWNLOAD_HANDLER(db *kv.Store, mq *queue.MessageQueue) func(user st } } -func (m *Monitor) Persist() error { - filename := filepath.Join(config.Instance().SessionFilePath, "twitch-monitor.dat") - - f, err := os.Create(filename) - if err != nil { - return err - } - defer f.Close() - - enc := gob.NewEncoder(f) - users := make([]string, 0, len(m.monitored)) - - for user := range m.monitored { - users = append(users, user) - } - - return enc.Encode(users) -} - func (m *Monitor) Restore() error { - filename := filepath.Join(config.Instance().SessionFilePath, "twitch-monitor.dat") - - f, err := os.Open(filename) - if err != nil { - if os.IsNotExist(err) { - return nil - } - return err - } - defer f.Close() - - dec := gob.NewDecoder(f) var users []string - if err := dec.Decode(&users); err != nil { - return err - } + + m.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket(bucket) + return b.ForEach(func(k, v []byte) error { + users = append(users, string(k)) + return nil + }) + }) m.monitored = make(map[string]*Client) for _, user := range users {