From 2c30bff45dcc6da1b2d9fcd18bfc1e0d3b8f5843 Mon Sep 17 00:00:00 2001 From: marcobaobao Date: Thu, 5 Sep 2024 15:32:51 +0200 Subject: [PATCH] changed memory_db internals to sync.Map to map+iterators+mutex --- go.mod | 8 ++--- go.sum | 7 +++++ server/internal/memory_db.go | 59 +++++++++++++++++++++++++----------- server/server.go | 10 +++--- 4 files changed, 57 insertions(+), 27 deletions(-) diff --git a/go.mod b/go.mod index 438a979..b82a98a 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/marcopeocchi/yt-dlp-web-ui -go 1.22 +go 1.23 require ( github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef @@ -11,9 +11,9 @@ require ( github.com/google/uuid v1.6.0 github.com/gorilla/websocket v1.5.3 github.com/reactivex/rxgo/v2 v2.5.0 - golang.org/x/oauth2 v0.22.0 + golang.org/x/oauth2 v0.23.0 golang.org/x/sync v0.8.0 - golang.org/x/sys v0.24.0 + golang.org/x/sys v0.25.0 gopkg.in/yaml.v3 v3.0.1 modernc.org/sqlite v1.32.0 ) @@ -34,7 +34,7 @@ require ( github.com/teivah/onecontext v1.3.0 // indirect golang.org/x/crypto v0.26.0 // indirect modernc.org/gc/v3 v3.0.0-20240801135723-a856999a2e4a // indirect - modernc.org/libc v1.59.9 // indirect + modernc.org/libc v1.60.1 // indirect modernc.org/mathutil v1.6.0 // indirect modernc.org/memory v1.8.0 // indirect modernc.org/strutil v1.2.0 // indirect diff --git a/go.sum b/go.sum index 366a716..cb55842 100644 --- a/go.sum +++ b/go.sum @@ -69,6 +69,8 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/oauth2 v0.22.0 h1:BzDx2FehcG7jJwgWLELCdmLuxk2i+x9UDpSiss2u0ZA= golang.org/x/oauth2 v0.22.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= +golang.org/x/oauth2 v0.23.0 h1:PbgcYx2W7i4LvjJWEbf0ngHV6qJYr86PkAV3bXdLEbs= +golang.org/x/oauth2 v0.23.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= @@ -77,6 +79,8 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= +golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= @@ -93,6 +97,7 @@ modernc.org/cc/v4 v4.21.4 h1:3Be/Rdo1fpr8GrQ7IVw9OHtplU4gWbb+wNgeoBMmGLQ= modernc.org/cc/v4 v4.21.4/go.mod h1:HM7VJTZbUCR3rV8EYBi9wxnJ0ZBRiGE5OeGXNA0IsLQ= modernc.org/ccgo/v4 v4.20.7 h1:skrinQsjxWfvj6nbC3ztZPJy+NuwmB3hV9zX/pthNYQ= modernc.org/ccgo/v4 v4.20.7/go.mod h1:UOkI3JSG2zT4E2ioHlncSOZsXbuDCZLvPi3uMlZT5GY= +modernc.org/ccgo/v4 v4.21.0 h1:kKPI3dF7RIag8YcToh5ZwDcVMIv6VGa0ED5cvh0LMW4= modernc.org/fileutil v1.3.0 h1:gQ5SIzK3H9kdfai/5x41oQiKValumqNTDXMvKo62HvE= modernc.org/fileutil v1.3.0/go.mod h1:XatxS8fZi3pS8/hKG2GH/ArUogfxjpEKs3Ku3aK4JyQ= modernc.org/gc/v2 v2.5.0 h1:bJ9ChznK1L1mUtAQtxi0wi5AtAs5jQuw4PrPHO5pb6M= @@ -101,6 +106,8 @@ modernc.org/gc/v3 v3.0.0-20240801135723-a856999a2e4a h1:CfbpOLEo2IwNzJdMvE8aiRbP modernc.org/gc/v3 v3.0.0-20240801135723-a856999a2e4a/go.mod h1:Qz0X07sNOR1jWYCrJMEnbW/X55x206Q7Vt4mz6/wHp4= modernc.org/libc v1.59.9 h1:k+nNDDakwipimgmJ1D9H466LhFeSkaPPycAs1OZiDmY= modernc.org/libc v1.59.9/go.mod h1:EY/egGEU7Ju66eU6SBqCNYaFUDuc4npICkMWnU5EE3A= +modernc.org/libc v1.60.1 h1:at373l8IFRTkJIkAU85BIuUoBM4T1b51ds0E1ovPG2s= +modernc.org/libc v1.60.1/go.mod h1:xJuobKuNxKH3RUatS7GjR+suWj+5c2K7bi4m/S5arOY= modernc.org/mathutil v1.6.0 h1:fRe9+AmYlaej+64JsEEhoWuAYBkOtQiMEU7n/XgfYi4= modernc.org/mathutil v1.6.0/go.mod h1:Ui5Q9q1TR2gFm0AQRqQUaBWFLAhQpCwNcuhBOSedWPo= modernc.org/memory v1.8.0 h1:IqGTL6eFMaDZZhEWwcREgeMXYwmW83LYW8cROZYkg+E= diff --git a/server/internal/memory_db.go b/server/internal/memory_db.go index 016e3e7..f7d38a8 100644 --- a/server/internal/memory_db.go +++ b/server/internal/memory_db.go @@ -3,6 +3,7 @@ package internal import ( "encoding/gob" "errors" + "maps" "os" "path/filepath" "sync" @@ -13,41 +14,57 @@ import ( // In-Memory Thread-Safe Key-Value Storage with optional persistence type MemoryDB struct { - table sync.Map + table map[string]*Process + mu sync.RWMutex +} + +func NewMemoryDB() *MemoryDB { + return &MemoryDB{ + table: make(map[string]*Process), + } } // Get a process pointer given its id func (m *MemoryDB) Get(id string) (*Process, error) { - entry, ok := m.table.Load(id) + m.mu.RLock() + defer m.mu.RUnlock() + + entry, ok := m.table[id] if !ok { return nil, errors.New("no process found for the given key") } - return entry.(*Process), nil + return entry, nil } // Store a pointer of a process and return its id func (m *MemoryDB) Set(process *Process) string { id := uuid.NewString() - m.table.Store(id, process) + m.mu.Lock() process.Id = id + m.table[id] = process + m.mu.Unlock() return id } // Removes a process progress, given the process id func (m *MemoryDB) Delete(id string) { - m.table.Delete(id) + m.mu.Lock() + delete(m.table, id) + m.mu.Unlock() } func (m *MemoryDB) Keys() *[]string { var running []string - m.table.Range(func(key, value any) bool { - running = append(running, key.(string)) - return true - }) + m.mu.RLock() + defer m.mu.RUnlock() + + for id := range maps.Keys(m.table) { + running = append(running, id) + } return &running } @@ -56,16 +73,17 @@ func (m *MemoryDB) Keys() *[]string { func (m *MemoryDB) All() *[]ProcessResponse { running := []ProcessResponse{} - m.table.Range(func(key, value any) bool { + m.mu.RLock() + for k, v := range maps.All(m.table) { running = append(running, ProcessResponse{ - Id: key.(string), - Info: value.(*Process).Info, - Progress: value.(*Process).Progress, - Output: value.(*Process).Output, - Params: value.(*Process).Params, + Id: k, + Info: v.Info, + Progress: v.Progress, + Output: v.Output, + Params: v.Params, }) - return true - }) + } + m.mu.RUnlock() return &running } @@ -81,6 +99,8 @@ func (m *MemoryDB) Persist() error { return errors.Join(errors.New("failed to persist session"), err) } + m.mu.RLock() + defer m.mu.RUnlock() session := Session{Processes: *running} if err := gob.NewEncoder(fd).Encode(session); err != nil { @@ -103,6 +123,9 @@ func (m *MemoryDB) Restore(mq *MessageQueue) { return } + m.mu.Lock() + defer m.mu.Unlock() + for _, proc := range session.Processes { restored := &Process{ Id: proc.Id, @@ -113,7 +136,7 @@ func (m *MemoryDB) Restore(mq *MessageQueue) { Params: proc.Params, } - m.table.Store(proc.Id, restored) + m.table[proc.Id] = restored if restored.Progress.Status != StatusCompleted { mq.Publish(restored) diff --git a/server/server.go b/server/server.go index 037a5e4..5bb3888 100644 --- a/server/server.go +++ b/server/server.go @@ -55,7 +55,7 @@ type serverConfig struct { } func RunBlocking(cfg *RunConfig) { - var mdb internal.MemoryDB + mdb := internal.NewMemoryDB() // ---- LOGGING --------------------------------------------------- logWriters := []io.Writer{ @@ -104,7 +104,7 @@ func RunBlocking(cfg *RunConfig) { mq.SetupConsumers() go mdb.Restore(mq) - lm := livestream.NewMonitor(mq, &mdb) + lm := livestream.NewMonitor(mq, mdb) go lm.Schedule() go lm.Restore() @@ -113,14 +113,14 @@ func RunBlocking(cfg *RunConfig) { swagger: cfg.Swagger, host: cfg.Host, port: cfg.Port, - mdb: &mdb, + mdb: mdb, mq: mq, db: db, lm: lm, }) - go gracefulShutdown(srv, &mdb) - go autoPersist(time.Minute*5, &mdb) + go gracefulShutdown(srv, mdb) + go autoPersist(time.Minute*5, mdb) var ( network = "tcp"