ultrago/reactive/observer.go
2024-08-13 19:20:52 +03:00

47 lines
850 B
Go

package reactive
import "context"
// Observer is the set of callbacks through which a consumer receives the
// events of a stream of T values.
type Observer[T any] interface {
	// OnNext is invoked with each value emitted by the stream.
	OnNext(value T)
	// OnError is invoked with the error carried by an error notification.
	OnError(err error)
	// OnComplete is invoked when the stream ends without an error.
	OnComplete()
}
// ObserverFn adapts a plain function to the three observer callbacks: the
// function receives every emitted value, while error and completion signals
// are silently dropped.
type ObserverFn[T any] func(T)

// OnNext hands value to the wrapped function.
func (fn ObserverFn[T]) OnNext(value T) {
	fn(value)
}

// OnError ignores the error; an ObserverFn has no error callback.
func (ObserverFn[T]) OnError(error) {}

// OnComplete does nothing; an ObserverFn has no completion callback.
func (ObserverFn[T]) OnComplete() {}
// Subscribe observes o and forwards its notifications to observer, blocking
// the calling goroutine until the stream terminates.
//
// The observable is run under a context derived from ctx. That derived
// context is cancelled on every exit path, so the observable is always told
// to stop and the context's resources are always released.
//
// Termination rules:
//   - o.Observe returns a nil channel: OnComplete is called and Subscribe returns.
//   - ctx is done: OnComplete is called and Subscribe returns.
//   - the notification channel is closed: OnComplete is called and Subscribe returns.
//   - an ErrorNotification arrives: OnError is called with its error and
//     Subscribe returns without calling OnComplete.
//
// NextNotification values are passed to OnNext. Notifications of any other
// kind are ignored.
func Subscribe[T any](ctx context.Context, o Observable[T], observer Observer[T]) {
	subCtx, cancel := context.WithCancel(ctx)
	// Deferred so that the closed-channel path and a panicking observer
	// callback also release the derived context.
	defer cancel()

	notifs := o.Observe(subCtx)
	if notifs == nil {
		observer.OnComplete()
		return
	}

	for {
		select {
		case <-ctx.Done():
			observer.OnComplete()
			return
		case n, ok := <-notifs:
			if !ok {
				observer.OnComplete()
				return
			}
			switch n.Kind() {
			case NextNotification:
				observer.OnNext(n.Value())
			case ErrorNotification:
				observer.OnError(n.Error())
				return
			}
		}
	}
}