Dropping rxgo (#201)

* rxgo event source to channel with drop strategy

* code optimizations
This commit is contained in:
Marco Piovanello
2024-09-18 17:49:25 +02:00
committed by GitHub
parent a00059ca88
commit 64fbdbbbdf
6 changed files with 114 additions and 135 deletions

View File

@@ -3,9 +3,9 @@ package logging
import (
"bytes"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"strings"
"github.com/go-chi/chi/v5"
"github.com/gorilla/websocket"
@@ -22,58 +22,74 @@ var upgrader = websocket.Upgrader{
WriteBufferSize: 1000,
}
func webSocket(w http.ResponseWriter, r *http.Request) {
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
for msg := range logsObservable.Observe() {
c.WriteJSON(msg.V)
}
}
func sse(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "SSE not supported", http.StatusInternalServerError)
return
}
for msg := range logsObservable.Observe() {
if msg.E != nil {
http.Error(w, msg.E.Error(), http.StatusInternalServerError)
return
}
var (
b bytes.Buffer
sb strings.Builder
)
if err := json.NewEncoder(&b).Encode(msg.V); err != nil {
func webSocket(logger *ObservableLogger) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
sb.WriteString("event: log\n")
sb.WriteString("data: ")
sb.WriteString(b.String())
sb.WriteRune('\n')
sb.WriteRune('\n')
logs := logger.Observe(r.Context())
fmt.Fprint(w, sb.String())
flusher.Flush()
for {
select {
case <-r.Context().Done():
return
case msg := <-logs:
c.WriteJSON(msg)
}
}
}
}
func ApplyRouter() func(chi.Router) {
func sse(logger *ObservableLogger) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "SSE not supported", http.StatusInternalServerError)
return
}
logs := logger.Observe(r.Context())
for {
select {
case <-r.Context().Done():
slog.Info("detaching from logger")
return
case msg, ok := <-logs:
if !ok {
http.Error(w, "closed logs channel", http.StatusInternalServerError)
return
}
var b bytes.Buffer
b.WriteString("event: log\n")
b.WriteString("data: ")
if err := json.NewEncoder(&b).Encode(msg); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
b.WriteRune('\n')
b.WriteRune('\n')
io.Copy(w, &b)
flusher.Flush()
}
}
}
}
func ApplyRouter(logger *ObservableLogger) func(chi.Router) {
return func(r chi.Router) {
if config.Instance().RequireAuth {
r.Use(middlewares.Authenticated)
@@ -81,7 +97,7 @@ func ApplyRouter() func(chi.Router) {
if config.Instance().UseOpenId {
r.Use(openid.Middleware)
}
r.Get("/ws", webSocket)
r.Get("/sse", sse)
r.Get("/ws", webSocket(logger))
r.Get("/sse", sse(logger))
}
}

View File

@@ -1,40 +1,53 @@
package logging
import (
"time"
"github.com/reactivex/rxgo/v2"
"context"
)
/*
Logger implementation using the observable pattern.
Implements io.Writer interface.
Logger implementation using the observable pattern.
Implements io.Writer interface.
The observable is an event source which drops everythigng unless there's
a subscriber connected.
The observable is an event source which drops everythigng unless there's
a subscriber connected.
The observer implementatios are a http ServerSentEvents handler and a
websocket one in handler.go
The observer implementatios are a http ServerSentEvents handler and a
websocket one in handler.go
*/
var (
logsChan = make(chan rxgo.Item, 100)
logsObservable = rxgo.
FromEventSource(logsChan, rxgo.WithBackPressureStrategy(rxgo.Drop)).
BufferWithTime(rxgo.WithDuration(time.Millisecond * 500))
)
type ObservableLogger struct{}
type ObservableLogger struct {
logsChan chan []byte
}
func NewObservableLogger() *ObservableLogger {
return &ObservableLogger{}
return &ObservableLogger{
logsChan: make(chan []byte, 100),
}
}
func (o *ObservableLogger) Write(p []byte) (n int, err error) {
logsChan <- rxgo.Of(string(p))
n = len(p)
err = nil
return
select {
case o.logsChan <- p:
n = len(p)
err = nil
return
default:
return
}
}
func (o *ObservableLogger) Observe(ctx context.Context) <-chan string {
logs := make(chan string)
go func() {
for {
select {
case <-ctx.Done():
return
case logLine := <-o.logsChan:
logs <- string(logLine)
}
}
}()
return logs
}

View File

@@ -47,13 +47,16 @@ type serverConfig struct {
lm *livestream.Monitor
}
// TODO: change scope
var observableLogger = logging.NewObservableLogger()
func RunBlocking(rc *RunConfig) {
mdb := internal.NewMemoryDB()
// ---- LOGGING ---------------------------------------------------
logWriters := []io.Writer{
os.Stdout,
logging.NewObservableLogger(), // for web-ui
observableLogger, // for web-ui
}
conf := config.Instance()
@@ -207,7 +210,7 @@ func newServer(c serverConfig) *http.Server {
}))
// Logging
r.Route("/log", logging.ApplyRouter())
r.Route("/log", logging.ApplyRouter(observableLogger))
return &http.Server{Handler: r}
}