changed memory_db internals to sync.Map to map+iterators+mutex
This commit is contained in:
@@ -3,6 +3,7 @@ package internal
|
||||
import (
|
||||
"encoding/gob"
|
||||
"errors"
|
||||
"maps"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
@@ -13,41 +14,57 @@ import (
|
||||
|
||||
// In-Memory Thread-Safe Key-Value Storage with optional persistence
|
||||
type MemoryDB struct {
|
||||
table sync.Map
|
||||
table map[string]*Process
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
func NewMemoryDB() *MemoryDB {
|
||||
return &MemoryDB{
|
||||
table: make(map[string]*Process),
|
||||
}
|
||||
}
|
||||
|
||||
// Get a process pointer given its id
|
||||
func (m *MemoryDB) Get(id string) (*Process, error) {
|
||||
entry, ok := m.table.Load(id)
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
||||
entry, ok := m.table[id]
|
||||
if !ok {
|
||||
return nil, errors.New("no process found for the given key")
|
||||
}
|
||||
|
||||
return entry.(*Process), nil
|
||||
return entry, nil
|
||||
}
|
||||
|
||||
// Store a pointer of a process and return its id
|
||||
func (m *MemoryDB) Set(process *Process) string {
|
||||
id := uuid.NewString()
|
||||
|
||||
m.table.Store(id, process)
|
||||
m.mu.Lock()
|
||||
process.Id = id
|
||||
m.table[id] = process
|
||||
m.mu.Unlock()
|
||||
|
||||
return id
|
||||
}
|
||||
|
||||
// Removes a process progress, given the process id
|
||||
func (m *MemoryDB) Delete(id string) {
|
||||
m.table.Delete(id)
|
||||
m.mu.Lock()
|
||||
delete(m.table, id)
|
||||
m.mu.Unlock()
|
||||
}
|
||||
|
||||
func (m *MemoryDB) Keys() *[]string {
|
||||
var running []string
|
||||
|
||||
m.table.Range(func(key, value any) bool {
|
||||
running = append(running, key.(string))
|
||||
return true
|
||||
})
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
||||
for id := range maps.Keys(m.table) {
|
||||
running = append(running, id)
|
||||
}
|
||||
|
||||
return &running
|
||||
}
|
||||
@@ -56,16 +73,17 @@ func (m *MemoryDB) Keys() *[]string {
|
||||
func (m *MemoryDB) All() *[]ProcessResponse {
|
||||
running := []ProcessResponse{}
|
||||
|
||||
m.table.Range(func(key, value any) bool {
|
||||
m.mu.RLock()
|
||||
for k, v := range maps.All(m.table) {
|
||||
running = append(running, ProcessResponse{
|
||||
Id: key.(string),
|
||||
Info: value.(*Process).Info,
|
||||
Progress: value.(*Process).Progress,
|
||||
Output: value.(*Process).Output,
|
||||
Params: value.(*Process).Params,
|
||||
Id: k,
|
||||
Info: v.Info,
|
||||
Progress: v.Progress,
|
||||
Output: v.Output,
|
||||
Params: v.Params,
|
||||
})
|
||||
return true
|
||||
})
|
||||
}
|
||||
m.mu.RUnlock()
|
||||
|
||||
return &running
|
||||
}
|
||||
@@ -81,6 +99,8 @@ func (m *MemoryDB) Persist() error {
|
||||
return errors.Join(errors.New("failed to persist session"), err)
|
||||
}
|
||||
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
session := Session{Processes: *running}
|
||||
|
||||
if err := gob.NewEncoder(fd).Encode(session); err != nil {
|
||||
@@ -103,6 +123,9 @@ func (m *MemoryDB) Restore(mq *MessageQueue) {
|
||||
return
|
||||
}
|
||||
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
for _, proc := range session.Processes {
|
||||
restored := &Process{
|
||||
Id: proc.Id,
|
||||
@@ -113,7 +136,7 @@ func (m *MemoryDB) Restore(mq *MessageQueue) {
|
||||
Params: proc.Params,
|
||||
}
|
||||
|
||||
m.table.Store(proc.Id, restored)
|
||||
m.table[proc.Id] = restored
|
||||
|
||||
if restored.Progress.Status != StatusCompleted {
|
||||
mq.Publish(restored)
|
||||
|
||||
Reference in New Issue
Block a user