Files
yt-dlp-webui/server/twitch/monitor.go
Marco Piovanello f4a0f688af Feat twitch livestreams (#334)
* backend code

* fixed twitch authentication
2025-08-25 12:54:16 +02:00

143 lines
3.2 KiB
Go

package twitch
import (
"context"
"encoding/gob"
"fmt"
"iter"
"log/slog"
"maps"
"os"
"path/filepath"
"sync"
"time"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/config"
"github.com/marcopiovanello/yt-dlp-web-ui/v3/server/internal"
)
type Monitor struct {
liveChannel chan *StreamInfo
monitored map[string]*Client
lastState map[string]bool
mu sync.RWMutex
authenticationManager *AuthenticationManager
}
func NewMonitor(authenticationManager *AuthenticationManager) *Monitor {
return &Monitor{
liveChannel: make(chan *StreamInfo, 16),
monitored: make(map[string]*Client),
lastState: make(map[string]bool),
authenticationManager: authenticationManager,
}
}
func (m *Monitor) Add(user string) {
m.mu.Lock()
defer m.mu.Unlock()
m.monitored[user] = NewTwitchClient(m.authenticationManager)
slog.Info("added user to twitch monitor", slog.String("user", user))
}
func (m *Monitor) Monitor(ctx context.Context, interval time.Duration, handler func(url string) error) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
m.mu.RLock()
for user, client := range m.monitored {
u := user
c := client
go func() {
if err := c.PollStream(u, m.liveChannel); err != nil {
slog.Error("polling failed", slog.String("user", u), slog.Any("err", err))
}
}()
}
m.mu.RUnlock()
case stream := <-m.liveChannel:
wasLive := m.lastState[stream.UserName]
if stream.IsLive && !wasLive {
slog.Info("stream went live", slog.String("user", stream.UserName))
if err := handler(fmt.Sprintf("https://www.twitch.tv/%s", stream.UserName)); err != nil {
slog.Error("handler failed", slog.String("user", stream.UserName), slog.Any("err", err))
}
}
m.lastState[stream.UserName] = stream.IsLive
case <-ctx.Done():
slog.Info("stopping twitch monitor")
return
}
}
}
func (m *Monitor) GetMonitoredUsers() iter.Seq[string] {
m.mu.RLock()
defer m.mu.RUnlock()
return maps.Keys(m.monitored)
}
func DEFAULT_DOWNLOAD_HANDLER(db *internal.MemoryDB, mq *internal.MessageQueue) func(url string) error {
return func(url string) error {
p := &internal.Process{
Url: url,
Livestream: true,
Params: []string{"--downloader", "ffmpeg", "--no-part"},
}
db.Set(p)
mq.Publish(p)
return nil
}
}
func (m *Monitor) Persist() error {
filename := filepath.Join(config.Instance().SessionFilePath, "twitch-monitor.dat")
f, err := os.Create(filename)
if err != nil {
return err
}
defer f.Close()
enc := gob.NewEncoder(f)
users := make([]string, 0, len(m.monitored))
for user := range m.monitored {
users = append(users, user)
}
return enc.Encode(users)
}
func (m *Monitor) Restore() error {
filename := filepath.Join(config.Instance().SessionFilePath, "twitch-monitor.dat")
f, err := os.Open(filename)
if err != nil {
if os.IsNotExist(err) {
return nil
}
return err
}
defer f.Close()
dec := gob.NewDecoder(f)
var users []string
if err := dec.Decode(&users); err != nil {
return err
}
m.monitored = make(map[string]*Client)
for _, user := range users {
m.monitored[user] = NewTwitchClient(m.authenticationManager)
}
return nil
}