diff --git a/server/rx/extensions.go b/server/rx/extensions.go index 1a994d5..38c8f90 100644 --- a/server/rx/extensions.go +++ b/server/rx/extensions.go @@ -6,12 +6,20 @@ import "time" // // Debounce emits the most recently emitted value from the source // withing the timespan set by the span time.Duration -func Sample[T any](span time.Duration, source chan T, done chan struct{}, fn func(e T)) { - ticker := time.NewTicker(span) +func Sample(span time.Duration, source chan []byte, done chan struct{}, fn func(e []byte)) { + var ( + item []byte + ticker = time.NewTicker(span) + ) + for { select { case <-ticker.C: - fn(<-source) + if item != nil { + fn(item) + } + case <-source: + item = <-source case <-done: ticker.Stop() return