Logging in webUI, Archive view refactor (#127)

* test logging

* test impl for logging

* implemented "live logging", restyle templates dropdown

* moved extract audio to downloadDialog, fixed labels

* code refactoring

* buffering logs
This commit is contained in:
Marco
2024-01-09 14:29:18 +01:00
committed by GitHub
parent de1d9e6a3c
commit 6aa2d41988
25 changed files with 630 additions and 112 deletions

View File

@@ -8,6 +8,8 @@ import (
)
type Config struct {
CurrentLogFile string
LogPath string `yaml:"log_path"`
Host string `yaml:"host"`
Port int `yaml:"port"`
DownloadPath string `yaml:"downloadPath"`

View File

@@ -4,13 +4,11 @@ import (
"encoding/gob"
"errors"
"fmt"
"log"
"os"
"path/filepath"
"sync"
"github.com/google/uuid"
"github.com/marcopeocchi/yt-dlp-web-ui/server/cli"
"github.com/marcopeocchi/yt-dlp-web-ui/server/config"
)
@@ -94,14 +92,14 @@ func (m *MemoryDB) All() *[]ProcessResponse {
}
// WIP: Persist the database in a single file named "session.dat"
func (m *MemoryDB) Persist() {
func (m *MemoryDB) Persist() error {
running := m.All()
sf := filepath.Join(config.Instance().SessionFilePath, "session.dat")
fd, err := os.Create(sf)
if err != nil {
log.Println(cli.Red, "Failed to persist session", cli.Reset)
return errors.Join(errors.New("failed to persist session"), err)
}
session := Session{
@@ -110,10 +108,10 @@ func (m *MemoryDB) Persist() {
err = gob.NewEncoder(fd).Encode(session)
if err != nil {
log.Println(cli.Red, "Failed to persist session", cli.Reset)
return errors.Join(errors.New("failed to persist session"), err)
}
log.Println(cli.BgBlue, "Successfully serialized session", cli.Reset)
return nil
}
// WIP: Restore a persisted state
@@ -146,6 +144,4 @@ func (m *MemoryDB) Restore() {
go restored.Start()
}
}
log.Println(cli.BgGreen, "Successfully restored session", cli.Reset)
}

View File

@@ -1,8 +1,6 @@
package internal
import (
"log"
"github.com/marcopeocchi/yt-dlp-web-ui/server/config"
)
@@ -19,7 +17,7 @@ func NewMessageQueue() *MessageQueue {
size := config.Instance().QueueSize
if size <= 0 {
log.Fatalln("invalid queue size")
panic("invalid queue size")
}
return &MessageQueue{

View File

@@ -3,12 +3,11 @@ package internal
import (
"encoding/json"
"errors"
"log"
"log/slog"
"os/exec"
"strings"
"time"
"github.com/marcopeocchi/yt-dlp-web-ui/server/cli"
"github.com/marcopeocchi/yt-dlp-web-ui/server/config"
)
@@ -19,7 +18,7 @@ type metadata struct {
Type string `json:"_type"`
}
func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB) error {
func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB, logger *slog.Logger) error {
var (
downloader = config.Instance().DownloaderPath
cmd = exec.Command(downloader, req.URL, "-J")
@@ -37,14 +36,14 @@ func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB) error {
return err
}
log.Println(cli.BgRed, "Decoding metadata", cli.Reset, req.URL)
logger.Info("decoding metadata", slog.String("url", req.URL))
err = json.NewDecoder(stdout).Decode(&m)
if err != nil {
return err
}
log.Println(cli.BgGreen, "Decoded metadata", cli.Reset, req.URL)
logger.Info("decoded metadata", slog.String("url", req.URL))
if m.Type == "" {
cmd.Wait()
@@ -52,8 +51,10 @@ func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB) error {
}
if m.Type == "playlist" {
log.Println(
cli.BgGreen, "Playlist detected", cli.Reset, m.Count, "entries",
logger.Info(
"playlist detected",
slog.String("url", req.URL),
slog.Int("count", m.Count),
)
for i, meta := range m.Entries {
@@ -93,8 +94,7 @@ func PlaylistDetect(req DownloadRequest, mq *MessageQueue, db *MemoryDB) error {
proc := &Process{Url: req.URL, Params: req.Params}
mq.Publish(proc)
log.Println("Sending new process to message queue", proc.Url)
logger.Info("sending new process to message queue", slog.String("url", proc.Url))
err = cmd.Wait()
return err
return cmd.Wait()
}

View File

@@ -4,6 +4,7 @@ import (
"bufio"
"encoding/json"
"fmt"
"log/slog"
"regexp"
"sync"
"syscall"
@@ -50,6 +51,7 @@ type Process struct {
Progress DownloadProgress
Output DownloadOutput
proc *os.Process
Logger *slog.Logger
}
type DownloadOutput struct {
@@ -106,13 +108,21 @@ func (p *Process) Start() {
r, err := cmd.StdoutPipe()
if err != nil {
log.Panicln(err)
p.Logger.Error(
"failed to connect to stdout",
slog.String("err", err.Error()),
)
panic(err)
}
scan := bufio.NewScanner(r)
err = cmd.Start()
if err != nil {
log.Panicln(err)
p.Logger.Error(
"failed to start yt-dlp process",
slog.String("err", err.Error()),
)
panic(err)
}
p.proc = cmd.Process
@@ -151,10 +161,10 @@ func (p *Process) Start() {
Speed: stdout.Speed,
ETA: stdout.Eta,
}
log.Println(
cli.BgGreen, "DL", cli.Reset,
cli.BgBlue, p.getShortId(), cli.Reset,
p.Url, stdout.Percentage,
p.Logger.Info("progress",
slog.String("id", p.getShortId()),
slog.String("url", p.Url),
slog.String("percentege", stdout.Percentage),
)
}
})
@@ -175,12 +185,9 @@ func (p *Process) Complete() {
ETA: 0,
}
shortId := p.getShortId()
log.Println(
cli.BgMagenta, "FINISH", cli.Reset,
cli.BgBlue, shortId, cli.Reset,
p.Url,
p.Logger.Info("finished",
slog.String("id", p.getShortId()),
slog.String("url", p.Url),
)
}
@@ -197,7 +204,7 @@ func (p *Process) Kill() error {
}
err = syscall.Kill(-pgid, syscall.SIGTERM)
log.Println("Killed process", p.Id)
p.Logger.Info("killed process", slog.String("id", p.Id))
return err
}
@@ -233,6 +240,12 @@ func (p *Process) GetFormatsSync() (DownloadFormats, error) {
p.Url,
)
p.Logger.Info(
"retrieving metadata",
slog.String("caller", "getFormats"),
slog.String("url", p.Url),
)
go func() {
decodingError = json.Unmarshal(stdout, &info)
wg.Done()
@@ -264,7 +277,11 @@ func (p *Process) SetMetadata() error {
stdout, err := cmd.StdoutPipe()
if err != nil {
log.Println("Cannot retrieve info for", p.Url)
p.Logger.Error("failed retrieving info",
slog.String("id", p.getShortId()),
slog.String("url", p.Url),
slog.String("err", err.Error()),
)
return err
}
@@ -278,10 +295,9 @@ func (p *Process) SetMetadata() error {
return err
}
log.Println(
cli.BgRed, "Metadata", cli.Reset,
cli.BgBlue, p.getShortId(), cli.Reset,
p.Url,
p.Logger.Info("retrieving metadata",
slog.String("id", p.getShortId()),
slog.String("url", p.Url),
)
err = json.NewDecoder(stdout).Decode(&info)

80
server/logging/handler.go Normal file
View File

@@ -0,0 +1,80 @@
package logging
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"strings"
"github.com/go-chi/chi/v5"
"github.com/gorilla/websocket"
"github.com/marcopeocchi/yt-dlp-web-ui/server/config"
middlewares "github.com/marcopeocchi/yt-dlp-web-ui/server/middleware"
)
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
ReadBufferSize: 1000,
WriteBufferSize: 1000,
}
func webSocket(w http.ResponseWriter, r *http.Request) {
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
for msg := range logsObservable.Observe() {
c.WriteJSON(msg.V)
}
}
func sse(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "SSE not supported", http.StatusInternalServerError)
return
}
for msg := range logsObservable.Observe() {
if msg.E != nil {
http.Error(w, msg.E.Error(), http.StatusInternalServerError)
return
}
var (
b bytes.Buffer
sb strings.Builder
)
if err := json.NewEncoder(&b).Encode(msg.V); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
sb.WriteString("event: log\n")
sb.WriteString("data: " + b.String() + "\n\n")
fmt.Fprint(w, sb.String())
flusher.Flush()
}
}
func ApplyRouter() func(chi.Router) {
return func(r chi.Router) {
if config.Instance().RequireAuth {
r.Use(middlewares.Authenticated)
}
r.Get("/ws", webSocket)
r.Get("/sse", sse)
}
}

View File

@@ -0,0 +1,29 @@
package logging
import (
"time"
"github.com/reactivex/rxgo/v2"
)
var (
logsChan = make(chan rxgo.Item, 100)
logsObservable = rxgo.
FromChannel(logsChan, rxgo.WithBackPressureStrategy(rxgo.Drop)).
BufferWithTime(rxgo.WithDuration(time.Millisecond * 500))
)
type ObservableLogger struct{}
func NewObservableLogger() *ObservableLogger {
return &ObservableLogger{}
}
func (o *ObservableLogger) Write(p []byte) (n int, err error) {
logsChan <- rxgo.Of(string(p))
n = len(p)
err = nil
return
}

View File

@@ -1,6 +1,8 @@
package rpc
import (
"log/slog"
"github.com/go-chi/chi/v5"
"github.com/marcopeocchi/yt-dlp-web-ui/server/config"
"github.com/marcopeocchi/yt-dlp-web-ui/server/internal"
@@ -8,10 +10,15 @@ import (
)
// Dependency injection container.
func Container(db *internal.MemoryDB, mq *internal.MessageQueue) *Service {
func Container(
db *internal.MemoryDB,
mq *internal.MessageQueue,
logger *slog.Logger,
) *Service {
return &Service{
db: db,
mq: mq,
db: db,
mq: mq,
logger: logger,
}
}

View File

@@ -1,7 +1,7 @@
package rpc
import (
"log"
"log/slog"
"github.com/marcopeocchi/yt-dlp-web-ui/server/internal"
"github.com/marcopeocchi/yt-dlp-web-ui/server/sys"
@@ -9,8 +9,9 @@ import (
)
type Service struct {
db *internal.MemoryDB
mq *internal.MessageQueue
db *internal.MemoryDB
mq *internal.MessageQueue
logger *slog.Logger
}
type Running []internal.ProcessResponse
@@ -34,6 +35,7 @@ func (s *Service) Exec(args internal.DownloadRequest, result *string) error {
Path: args.Path,
Filename: args.Rename,
},
Logger: s.logger,
}
s.db.Set(p)
@@ -46,7 +48,7 @@ func (s *Service) Exec(args internal.DownloadRequest, result *string) error {
// Exec spawns a Process.
// The result of the execution is the newly spawned process Id.
func (s *Service) ExecPlaylist(args internal.DownloadRequest, result *string) error {
err := internal.PlaylistDetect(args, s.mq, s.db)
err := internal.PlaylistDetect(args, s.mq, s.db, s.logger)
if err != nil {
return err
}
@@ -88,7 +90,7 @@ func (s *Service) Running(args NoArgs, running *Running) error {
// Kill kills a process given its id and remove it from the memoryDB
func (s *Service) Kill(args string, killed *string) error {
log.Println("Trying killing process with id", args)
s.logger.Info("Trying killing process with id", slog.String("id", args))
proc, err := s.db.Get(args)
if err != nil {
@@ -106,7 +108,7 @@ func (s *Service) Kill(args string, killed *string) error {
// KillAll kills all process unconditionally and removes them from
// the memory db
func (s *Service) KillAll(args NoArgs, killed *string) error {
log.Println("Killing all spawned processes", args)
s.logger.Info("Killing all spawned processes")
keys := s.db.Keys()
var err error
for _, key := range *keys {
@@ -125,7 +127,7 @@ func (s *Service) KillAll(args NoArgs, killed *string) error {
// Remove a process from the db rendering it unusable if active
func (s *Service) Clear(args string, killed *string) error {
log.Println("Clearing process with id", args)
s.logger.Info("Clearing process with id", slog.String("id", args))
s.db.Delete(args)
return nil
}
@@ -148,7 +150,7 @@ func (s *Service) DirectoryTree(args NoArgs, tree *[]string) error {
// Updates the yt-dlp binary using its builtin function
func (s *Service) UpdateExecutable(args NoArgs, updated *bool) error {
log.Println("Updating yt-dlp executable to the latest release")
s.logger.Info("Updating yt-dlp executable to the latest release")
err := updater.UpdateExecutable()
if err != nil {
*updated = true

View File

@@ -4,8 +4,9 @@ import (
"context"
"database/sql"
"fmt"
"io"
"io/fs"
"log"
"log/slog"
"net/http"
"net/rpc"
"os"
@@ -20,6 +21,7 @@ import (
"github.com/marcopeocchi/yt-dlp-web-ui/server/dbutils"
"github.com/marcopeocchi/yt-dlp-web-ui/server/handlers"
"github.com/marcopeocchi/yt-dlp-web-ui/server/internal"
"github.com/marcopeocchi/yt-dlp-web-ui/server/logging"
middlewares "github.com/marcopeocchi/yt-dlp-web-ui/server/middleware"
"github.com/marcopeocchi/yt-dlp-web-ui/server/rest"
ytdlpRPC "github.com/marcopeocchi/yt-dlp-web-ui/server/rpc"
@@ -29,6 +31,7 @@ import (
type serverConfig struct {
frontend fs.FS
logger *slog.Logger
host string
port int
mdb *internal.MemoryDB
@@ -40,14 +43,21 @@ func RunBlocking(host string, port int, frontend fs.FS, dbPath string) {
var mdb internal.MemoryDB
mdb.Restore()
logger := slog.New(
slog.NewTextHandler(
io.MultiWriter(os.Stdout, logging.NewObservableLogger()),
nil,
),
)
db, err := sql.Open("sqlite", dbPath)
if err != nil {
log.Fatalln(err)
logger.Error("failed to open database", slog.String("err", err.Error()))
}
err = dbutils.AutoMigrate(context.Background(), db)
if err != nil {
log.Fatalln(err)
logger.Error("failed to init database", slog.String("err", err.Error()))
}
mq := internal.NewMessageQueue()
@@ -55,6 +65,7 @@ func RunBlocking(host string, port int, frontend fs.FS, dbPath string) {
srv := newServer(serverConfig{
frontend: frontend,
logger: logger,
host: host,
port: port,
mdb: &mdb,
@@ -63,13 +74,15 @@ func RunBlocking(host string, port int, frontend fs.FS, dbPath string) {
})
go gracefulShutdown(srv, &mdb)
go autoPersist(time.Minute*5, &mdb)
go autoPersist(time.Minute*5, &mdb, logger)
log.Fatal(srv.ListenAndServe())
if err := srv.ListenAndServe(); err != nil {
logger.Warn("http server stopped", slog.String("err", err.Error()))
}
}
func newServer(c serverConfig) *http.Server {
service := ytdlpRPC.Container(c.mdb, c.mq)
service := ytdlpRPC.Container(c.mdb, c.mq, c.logger)
rpc.Register(service)
r := chi.NewRouter()
@@ -91,9 +104,7 @@ func newServer(c serverConfig) *http.Server {
r.Use(corsMiddleware.Handler)
r.Use(middleware.Logger)
app := http.FileServer(http.FS(c.frontend))
r.Mount("/", app)
r.Mount("/", http.FileServer(http.FS(c.frontend)))
// Archive routes
r.Route("/archive", func(r chi.Router) {
@@ -118,6 +129,9 @@ func newServer(c serverConfig) *http.Server {
// REST API handlers
r.Route("/api/v1", rest.ApplyRouter(c.db, c.mdb, c.mq))
// Logging
r.Route("/log", logging.ApplyRouter())
return &http.Server{
Addr: fmt.Sprintf("%s:%d", c.host, c.port),
Handler: r,
@@ -133,7 +147,7 @@ func gracefulShutdown(srv *http.Server, db *internal.MemoryDB) {
go func() {
<-ctx.Done()
log.Println("shutdown signal received")
slog.Info("shutdown signal received")
defer func() {
db.Persist()
@@ -143,9 +157,15 @@ func gracefulShutdown(srv *http.Server, db *internal.MemoryDB) {
}()
}
func autoPersist(d time.Duration, db *internal.MemoryDB) {
func autoPersist(d time.Duration, db *internal.MemoryDB, logger *slog.Logger) {
for {
db.Persist()
if err := db.Persist(); err != nil {
logger.Info(
"failed to persisted session",
slog.String("err", err.Error()),
)
}
logger.Info("sucessfully persisted session")
time.Sleep(d)
}
}

55
server/utils/logrotate.go Normal file
View File

@@ -0,0 +1,55 @@
package utils
import (
"io"
"io/fs"
"os"
"path/filepath"
"time"
"github.com/marcopeocchi/yt-dlp-web-ui/server/config"
)
func LogRotate() (*os.File, error) {
logs := findLogs()
for _, log := range logs {
logfd, err := os.Open(log)
if err != nil {
return nil, err
}
gzWriter, err := os.Create(log + ".gz")
if err != nil {
return nil, err
}
_, err = io.Copy(gzWriter, logfd)
if err != nil {
return nil, err
}
}
logfile := time.Now().String() + ".log"
config.Instance().CurrentLogFile = logfile
return os.Create(logfile)
}
func findLogs() []string {
var (
logfiles []string
root = config.Instance().LogPath
)
filepath.WalkDir(root, func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
if filepath.Ext(d.Name()) == ".log" {
logfiles = append(logfiles, path)
}
return nil
})
return logfiles
}