diff --git a/.gitignore b/.gitignore index 167234e..d7c33d0 100644 --- a/.gitignore +++ b/.gitignore @@ -29,3 +29,4 @@ frontend/.yarn/install-state.gz livestreams.dat .vite/deps archive.txt +twitch-monitor.dat diff --git a/server/config/config.go b/server/config/config.go index 5693d13..0c9fa61 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -4,6 +4,7 @@ import ( "os" "path/filepath" "sync" + "time" "gopkg.in/yaml.v3" ) @@ -31,6 +32,11 @@ type Config struct { OpenIdEmailWhitelist []string `yaml:"openid_email_whitelist"` FrontendPath string `yaml:"frontend_path"` AutoArchive bool `yaml:"auto_archive"` + Twitch struct { + ClientId string `yaml:"client_id"` + ClientSecret string `yaml:"client_secret"` + CheckInterval time.Duration `yaml:"check_interval"` + } `yaml:"twitch"` } var ( diff --git a/server/middleware/utils.go b/server/middleware/utils.go new file mode 100644 index 0000000..4d6640e --- /dev/null +++ b/server/middleware/utils.go @@ -0,0 +1,20 @@ +package middlewares + +import ( + "net/http" + + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/openid" +) + +func ApplyAuthenticationByConfig(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if config.Instance().RequireAuth { + Authenticated(next) + } + if config.Instance().UseOpenId { + openid.Middleware(next) + } + next.ServeHTTP(w, r) + }) +} diff --git a/server/server.go b/server/server.go index 429046e..3a26e00 100644 --- a/server/server.go +++ b/server/server.go @@ -34,6 +34,7 @@ import ( "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/status" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/subscription/task" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/twitch" "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/user" _ "modernc.org/sqlite" @@ -51,6 +52,7 @@ type serverConfig struct { db *sql.DB mq *internal.MessageQueue lm *livestream.Monitor + tm *twitch.Monitor } // TODO: change scope @@ -115,17 +117,33 @@ func RunBlocking(rc *RunConfig) { go lm.Schedule() go lm.Restore() - srv := newServer(serverConfig{ + tm := twitch.NewMonitor( + twitch.NewAuthenticationManager( + config.Instance().Twitch.ClientId, + config.Instance().Twitch.ClientSecret, + ), + ) + go tm.Monitor( + context.TODO(), + config.Instance().Twitch.CheckInterval, + twitch.DEFAULT_DOWNLOAD_HANDLER(mdb, mq), + ) + go tm.Restore() + + scfg := serverConfig{ frontend: rc.App, swagger: rc.Swagger, mdb: mdb, mq: mq, db: db, lm: lm, - }) + tm: tm, + } - go gracefulShutdown(srv, mdb) - go autoPersist(time.Minute*5, mdb, lm) + srv := newServer(scfg) + + go gracefulShutdown(srv, &scfg) + go autoPersist(time.Minute*5, mdb, lm, tm) var ( network = "tcp" @@ -188,12 +206,7 @@ func newServer(c serverConfig) *http.Server { // Filebrowser routes r.Route("/filebrowser", func(r chi.Router) { - if config.Instance().RequireAuth { - r.Use(middlewares.Authenticated) - } - if config.Instance().UseOpenId { - r.Use(openid.Middleware) - } + r.Use(middlewares.ApplyAuthenticationByConfig) r.Post("/downloaded", filebrowser.ListDownloaded) r.Post("/delete", filebrowser.DeleteFile) r.Get("/d/{id}", filebrowser.DownloadFile) @@ -235,10 +248,17 @@ func newServer(c serverConfig) *http.Server { // Subscriptions r.Route("/subscriptions", subscription.Container(c.db, cronTaskRunner).ApplyRouter()) + // Twitch + r.Route("/twitch", func(r chi.Router) { + r.Use(middlewares.ApplyAuthenticationByConfig) + r.Get("/all", twitch.GetMonitoredUsers(c.tm)) + r.Post("/add", twitch.MonitorUserHandler(c.tm)) + }) + return &http.Server{Handler: r} } -func gracefulShutdown(srv *http.Server, db *internal.MemoryDB) { +func gracefulShutdown(srv *http.Server, cfg *serverConfig) { ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM, @@ -250,7 +270,9 @@ func gracefulShutdown(srv *http.Server, db *internal.MemoryDB) { slog.Info("shutdown signal received") defer func() { - db.Persist() + cfg.mdb.Persist() + cfg.lm.Persist() + cfg.tm.Persist() stop() srv.Shutdown(context.Background()) @@ -258,8 +280,14 @@ func gracefulShutdown(srv *http.Server, db *internal.MemoryDB) { }() } -func autoPersist(d time.Duration, db *internal.MemoryDB, lm *livestream.Monitor) { +func autoPersist( + d time.Duration, + db *internal.MemoryDB, + lm *livestream.Monitor, + tm *twitch.Monitor, +) { for { + time.Sleep(d) if err := db.Persist(); err != nil { slog.Warn("failed to persisted session", slog.Any("err", err)) } @@ -267,7 +295,10 @@ func autoPersist(d time.Duration, db *internal.MemoryDB, lm *livestream.Monitor) slog.Warn( "failed to persisted livestreams monitor session", slog.Any("err", err.Error())) } + if err := tm.Persist(); err != nil { + slog.Warn( + "failed to persisted twitch monitor session", slog.Any("err", err.Error())) + } slog.Debug("sucessfully persisted session") - time.Sleep(d) } } diff --git a/server/twitch/auth.go b/server/twitch/auth.go new file mode 100644 index 0000000..4a67f35 --- /dev/null +++ b/server/twitch/auth.go @@ -0,0 +1,75 @@ +package twitch + +import ( + "encoding/json" + "fmt" + "net/http" + "net/url" + "time" +) + +const authURL = "https://id.twitch.tv/oauth2/token" + +type AuthResponse struct { + AccessToken string `json:"access_token"` + ExpiresIn int `json:"expires_in"` + TokenType string `json:"token_type"` +} + +type AccessToken struct { + Token string + Expiry time.Time +} + +type AuthenticationManager struct { + clientId string + clientSecret string + accesToken *AccessToken +} + +func NewAuthenticationManager(clientId, clientSecret string) *AuthenticationManager { + return &AuthenticationManager{ + clientId: clientId, + clientSecret: clientSecret, + accesToken: &AccessToken{}, + } +} + +func (a *AuthenticationManager) GetAccessToken() (*AccessToken, error) { + if a.accesToken != nil && a.accesToken.Token != "" && a.accesToken.Expiry.After(time.Now()) { + return a.accesToken, nil + } + + data := url.Values{} + data.Set("client_id", a.clientId) + data.Set("client_secret", a.clientSecret) + data.Set("grant_type", "client_credentials") + + resp, err := http.PostForm(authURL, data) + if err != nil { + return nil, fmt.Errorf("errore richiesta token: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("status non OK: %s", resp.Status) + } + + var auth AuthResponse + if err := json.NewDecoder(resp.Body).Decode(&auth); err != nil { + return nil, fmt.Errorf("errore decoding JSON: %w", err) + } + + token := &AccessToken{ + Token: auth.AccessToken, + Expiry: time.Now().Add(time.Duration(auth.ExpiresIn) * time.Second), + } + + a.accesToken = token + + return token, nil +} + +func (a *AuthenticationManager) GetClientId() string { + return a.clientId +} diff --git a/server/twitch/client.go b/server/twitch/client.go new file mode 100644 index 0000000..c4c1560 --- /dev/null +++ b/server/twitch/client.go @@ -0,0 +1,91 @@ +package twitch + +import ( + "encoding/json" + "io" + "net/http" + "time" +) + +const twitchAPIURL = "https://api.twitch.tv/helix" + +type Client struct { + authenticationManager AuthenticationManager +} + +func NewTwitchClient(am *AuthenticationManager) *Client { + return &Client{ + authenticationManager: *am, + } +} + +type streamResp struct { + Data []struct { + ID string `json:"id"` + UserName string `json:"user_name"` + Title string `json:"title"` + GameName string `json:"game_name"` + StartedAt string `json:"started_at"` + } `json:"data"` +} + +func (c *Client) doRequest(endpoint string, params map[string]string) ([]byte, error) { + token, err := c.authenticationManager.GetAccessToken() + if err != nil { + return nil, err + } + + reqURL := twitchAPIURL + endpoint + req, err := http.NewRequest("GET", reqURL, nil) + if err != nil { + return nil, err + } + + q := req.URL.Query() + for k, v := range params { + q.Set(k, v) + } + req.URL.RawQuery = q.Encode() + + req.Header.Set("Client-Id", c.authenticationManager.GetClientId()) + req.Header.Set("Authorization", "Bearer "+token.Token) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + return io.ReadAll(resp.Body) +} + +func (c *Client) PollStream(channel string, liveChannel chan<- *StreamInfo) error { + body, err := c.doRequest("/streams", map[string]string{"user_login": channel}) + if err != nil { + return err + } + + var sr streamResp + if err := json.Unmarshal(body, &sr); err != nil { + return err + } + + if len(sr.Data) == 0 { + liveChannel <- &StreamInfo{UserName: channel, IsLive: false} + return nil + } + + s := sr.Data[0] + started, _ := time.Parse(time.RFC3339, s.StartedAt) + + liveChannel <- &StreamInfo{ + ID: s.ID, + UserName: s.UserName, + Title: s.Title, + GameName: s.GameName, + StartedAt: started, + IsLive: true, + } + + return nil +} diff --git a/server/twitch/monitor.go b/server/twitch/monitor.go new file mode 100644 index 0000000..c07b984 --- /dev/null +++ b/server/twitch/monitor.go @@ -0,0 +1,142 @@ +package twitch + +import ( + "context" + "encoding/gob" + "fmt" + "iter" + "log/slog" + "maps" + "os" + "path/filepath" + "sync" + "time" + + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config" + "github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal" +) + +type Monitor struct { + liveChannel chan *StreamInfo + monitored map[string]*Client + lastState map[string]bool + mu sync.RWMutex + authenticationManager *AuthenticationManager +} + +func NewMonitor(authenticationManager *AuthenticationManager) *Monitor { + return &Monitor{ + liveChannel: make(chan *StreamInfo, 16), + monitored: make(map[string]*Client), + lastState: make(map[string]bool), + authenticationManager: authenticationManager, + } +} + +func (m *Monitor) Add(user string) { + m.mu.Lock() + defer m.mu.Unlock() + m.monitored[user] = NewTwitchClient(m.authenticationManager) + slog.Info("added user to twitch monitor", slog.String("user", user)) +} + +func (m *Monitor) Monitor(ctx context.Context, interval time.Duration, handler func(url string) error) { + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + m.mu.RLock() + for user, client := range m.monitored { + u := user + c := client + + go func() { + if err := c.PollStream(u, m.liveChannel); err != nil { + slog.Error("polling failed", slog.String("user", u), slog.Any("err", err)) + } + }() + } + m.mu.RUnlock() + + case stream := <-m.liveChannel: + wasLive := m.lastState[stream.UserName] + if stream.IsLive && !wasLive { + slog.Info("stream went live", slog.String("user", stream.UserName)) + if err := handler(fmt.Sprintf("https://www.twitch.tv/%s", stream.UserName)); err != nil { + slog.Error("handler failed", slog.String("user", stream.UserName), slog.Any("err", err)) + } + } + m.lastState[stream.UserName] = stream.IsLive + + case <-ctx.Done(): + slog.Info("stopping twitch monitor") + return + } + } +} + +func (m *Monitor) GetMonitoredUsers() iter.Seq[string] { + m.mu.RLock() + defer m.mu.RUnlock() + return maps.Keys(m.monitored) +} + +func DEFAULT_DOWNLOAD_HANDLER(db *internal.MemoryDB, mq *internal.MessageQueue) func(url string) error { + return func(url string) error { + p := &internal.Process{ + Url: url, + Livestream: true, + Params: []string{"--downloader", "ffmpeg", "--no-part"}, + } + db.Set(p) + mq.Publish(p) + return nil + } +} + +func (m *Monitor) Persist() error { + filename := filepath.Join(config.Instance().SessionFilePath, "twitch-monitor.dat") + + f, err := os.Create(filename) + if err != nil { + return err + } + defer f.Close() + + enc := gob.NewEncoder(f) + users := make([]string, 0, len(m.monitored)) + + for user := range m.monitored { + users = append(users, user) + } + + return enc.Encode(users) +} + +func (m *Monitor) Restore() error { + filename := filepath.Join(config.Instance().SessionFilePath, "twitch-monitor.dat") + + f, err := os.Open(filename) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } + defer f.Close() + + dec := gob.NewDecoder(f) + var users []string + if err := dec.Decode(&users); err != nil { + return err + } + + m.monitored = make(map[string]*Client) + for _, user := range users { + m.monitored[user] = NewTwitchClient(m.authenticationManager) + } + + return nil +} diff --git a/server/twitch/rest.go b/server/twitch/rest.go new file mode 100644 index 0000000..7b8238b --- /dev/null +++ b/server/twitch/rest.go @@ -0,0 +1,40 @@ +package twitch + +import ( + "encoding/json" + "net/http" + "slices" +) + +type addUserReq struct { + User string `json:"user"` +} + +func MonitorUserHandler(m *Monitor) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var req addUserReq + + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + m.Add(req.User) + + if err := json.NewEncoder(w).Encode("ok"); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } +} + +func GetMonitoredUsers(m *Monitor) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + it := m.GetMonitoredUsers() + + if err := json.NewEncoder(w).Encode(slices.Collect(it)); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } +} diff --git a/server/twitch/types.go b/server/twitch/types.go new file mode 100644 index 0000000..ed03aa5 --- /dev/null +++ b/server/twitch/types.go @@ -0,0 +1,20 @@ +package twitch + +import "time" + +type StreamInfo struct { + ID string + UserName string + Title string + GameName string + StartedAt time.Time + IsLive bool +} + +type VodInfo struct { + ID string + Title string + URL string + Duration string + CreatedAt time.Time +}