changed map+rwMutext to sync.Map

This commit is contained in:
2023-03-01 15:06:11 +01:00
parent 72857882e4
commit aaad68a42c
5 changed files with 63 additions and 68 deletions

View File

@@ -1,6 +1,8 @@
package server package server
import ( import (
"errors"
"fmt"
"log" "log"
"os" "os"
"sync" "sync"
@@ -13,86 +15,74 @@ import (
// In-Memory volatile Thread-Safe Key-Value Storage // In-Memory volatile Thread-Safe Key-Value Storage
type MemoryDB struct { type MemoryDB struct {
table map[string]*Process table sync.Map
mu sync.Mutex
}
// Inits the db with an empty map of string->Process pointer
func (m *MemoryDB) New() {
m.table = make(map[string]*Process)
} }
// Get a process pointer given its id // Get a process pointer given its id
func (m *MemoryDB) Get(id string) *Process { func (m *MemoryDB) Get(id string) (*Process, error) {
m.mu.Lock() entry, ok := db.table.Load(id)
res := m.table[id] if !ok {
m.mu.Unlock() return nil, errors.New("no process found for the given key")
return res }
return entry.(*Process), nil
} }
// Store a pointer of a process and return its id // Store a pointer of a process and return its id
func (m *MemoryDB) Set(process *Process) string { func (m *MemoryDB) Set(process *Process) string {
id := uuid.Must(uuid.NewRandom()).String() id := uuid.Must(uuid.NewRandom()).String()
m.mu.Lock() db.table.Store(id, process)
m.table[id] = process
m.mu.Unlock()
return id return id
} }
// Update a process info/metadata, given the process id // Update a process info/metadata, given the process id
func (m *MemoryDB) Update(id string, info DownloadInfo) { func (m *MemoryDB) UpdateInfo(id string, info DownloadInfo) error {
m.mu.Lock() entry, ok := db.table.Load(id)
if m.table[id] != nil { if ok {
m.table[id].Info = info entry.(*Process).Info = info
db.table.Store(id, entry)
return nil
} }
m.mu.Unlock() return fmt.Errorf("can't update row with id %s", id)
} }
// Update a process progress data, given the process id // Update a process progress data, given the process id
// Used for updating completition percentage or ETA // Used for updating completition percentage or ETA
func (m *MemoryDB) UpdateProgress(id string, progress DownloadProgress) { func (m *MemoryDB) UpdateProgress(id string, progress DownloadProgress) error {
m.mu.Lock() entry, ok := db.table.Load(id)
if m.table[id] != nil { if ok {
m.table[id].Progress = progress entry.(*Process).Progress = progress
db.table.Store(id, entry)
return nil
} }
m.mu.Unlock() return fmt.Errorf("can't update row with id %s", id)
} }
// Removes a process progress, given the process id // Removes a process progress, given the process id
func (m *MemoryDB) Delete(id string) { func (m *MemoryDB) Delete(id string) {
m.mu.Lock() db.table.Delete(id)
delete(m.table, id)
m.mu.Unlock()
} }
// Returns a slice of all currently stored processes id func (m *MemoryDB) Keys() *[]string {
func (m *MemoryDB) Keys() []string { running := []string{}
m.mu.Lock() db.table.Range(func(key, value any) bool {
keys := make([]string, len(m.table)) running = append(running, key.(string))
i := 0 return true
for k := range m.table { })
keys[i] = k return &running
i++
}
m.mu.Unlock()
return keys
} }
// Returns a slice of all currently stored processes progess // Returns a slice of all currently stored processes progess
func (m *MemoryDB) All() []ProcessResponse { func (m *MemoryDB) All() *[]ProcessResponse {
running := make([]ProcessResponse, len(m.table)) running := []ProcessResponse{}
i := 0 db.table.Range(func(key, value any) bool {
for k, v := range m.table { running = append(running, ProcessResponse{
if v != nil { Id: key.(string),
running[i] = ProcessResponse{ Info: value.(*Process).Info,
Id: k, Progress: value.(*Process).Progress,
Info: v.Info, })
Progress: v.Progress, return true
} })
i++ return &running
}
}
return running
} }
// WIP: Persist the database in a single file named "session.dat" // WIP: Persist the database in a single file named "session.dat"
@@ -100,7 +90,7 @@ func (m *MemoryDB) Persist() {
running := m.All() running := m.All()
session, err := json.Marshal(Session{ session, err := json.Marshal(Session{
Processes: running, Processes: *running,
}) })
if err != nil { if err != nil {
log.Println(cli.Red, "Failed to persist database", cli.Reset) log.Println(cli.Red, "Failed to persist database", cli.Reset)

View File

@@ -118,7 +118,7 @@ func (p *Process) Start(path, filename string) {
} }
info := DownloadInfo{URL: p.url} info := DownloadInfo{URL: p.url}
json.Unmarshal(stdout, &info) json.Unmarshal(stdout, &info)
p.mem.Update(p.id, info) p.mem.UpdateInfo(p.id, info)
}() }()
// --------------- progress block --------------- // // --------------- progress block --------------- //

View File

@@ -18,7 +18,7 @@ import "time"
// -t-> |> // -t-> |>
// //
// --A-----C-----G-------|> // --A-----C-----G-------|>
func Debounce(interval time.Duration, source chan string, cb func(emit string)) { func Debounce(interval time.Duration, source chan string, f func(emit string)) {
var item string var item string
timer := time.NewTimer(interval) timer := time.NewTimer(interval)
for { for {
@@ -27,7 +27,7 @@ func Debounce(interval time.Duration, source chan string, cb func(emit string))
timer.Reset(interval) timer.Reset(interval)
case <-timer.C: case <-timer.C:
if item != "" { if item != "" {
cb(item) f(item)
} }
} }
} }

View File

@@ -17,10 +17,6 @@ import (
var db MemoryDB var db MemoryDB
func init() {
db.New()
}
func RunBlocking(ctx context.Context) { func RunBlocking(ctx context.Context) {
fe := ctx.Value("frontend").(fs.SubFS) fe := ctx.Value("frontend").(fs.SubFS)
port := ctx.Value("port").(int) port := ctx.Value("port").(int)

View File

@@ -40,7 +40,11 @@ func (t *Service) Exec(args DownloadSpecificArgs, result *string) error {
// Progess retrieves the Progress of a specific Process given its Id // Progess retrieves the Progress of a specific Process given its Id
func (t *Service) Progess(args Args, progress *DownloadProgress) error { func (t *Service) Progess(args Args, progress *DownloadProgress) error {
*progress = db.Get(args.Id).Progress proc, err := db.Get(args.Id)
if err != nil {
return err
}
*progress = proc.Progress
return nil return nil
} }
@@ -54,21 +58,23 @@ func (t *Service) Formats(args Args, progress *DownloadFormats) error {
// Pending retrieves a slice of all Pending/Running processes ids // Pending retrieves a slice of all Pending/Running processes ids
func (t *Service) Pending(args NoArgs, pending *Pending) error { func (t *Service) Pending(args NoArgs, pending *Pending) error {
*pending = Pending(db.Keys()) *pending = *db.Keys()
return nil return nil
} }
// Running retrieves a slice of all Processes progress // Running retrieves a slice of all Processes progress
func (t *Service) Running(args NoArgs, running *Running) error { func (t *Service) Running(args NoArgs, running *Running) error {
*running = db.All() *running = *db.All()
return nil return nil
} }
// Kill kills a process given its id and remove it from the memoryDB // Kill kills a process given its id and remove it from the memoryDB
func (t *Service) Kill(args string, killed *string) error { func (t *Service) Kill(args string, killed *string) error {
log.Println("Trying killing process with id", args) log.Println("Trying killing process with id", args)
proc := db.Get(args) proc, err := db.Get(args)
var err error if err != nil {
return err
}
if proc != nil { if proc != nil {
err = proc.Kill() err = proc.Kill()
} }
@@ -81,8 +87,11 @@ func (t *Service) KillAll(args NoArgs, killed *string) error {
log.Println("Killing all spawned processes", args) log.Println("Killing all spawned processes", args)
keys := db.Keys() keys := db.Keys()
var err error var err error
for _, key := range keys { for _, key := range *keys {
proc := db.Get(key) proc, err := db.Get(key)
if err != nil {
return err
}
if proc != nil { if proc != nil {
proc.Kill() proc.Kill()
} }