47 lines
850 B
Go
47 lines
850 B
Go
package reactive
|
|
|
|
import "context"
|
|
|
|
// Observer is the set of callbacks through which a consumer receives the
// notifications of a stream of T values.
type Observer[T any] interface {
	// OnNext is called with each value carried by a next notification.
	OnNext(T)

	// OnError is called with the error carried by an error notification.
	OnError(error)

	// OnComplete is called when the stream ends without an error
	// notification.
	OnComplete()
}
|
|
|
|
// ObserverFn adapts a plain function to the Observer method set: the function
// receives every value passed to OnNext, while errors and completion are
// ignored.
type ObserverFn[T any] func(T)

// OnNext invokes the wrapped function with v.
func (f ObserverFn[T]) OnNext(v T) { f(v) }

// OnError is a no-op; the error is intentionally discarded because the
// adapter only carries a value handler.
func (ObserverFn[T]) OnError(error) {}

// OnComplete is a no-op.
func (ObserverFn[T]) OnComplete() {}
|
|
|
|
func Subscribe[T any](ctx context.Context, o Observable[T], observer Observer[T]) {
|
|
ewrap, cancel := context.WithCancel(ctx)
|
|
notifs := o.Observe(ewrap)
|
|
if notifs == nil {
|
|
cancel()
|
|
observer.OnComplete()
|
|
return
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
cancel()
|
|
observer.OnComplete()
|
|
return
|
|
case n, ok := <-notifs:
|
|
if !ok {
|
|
observer.OnComplete()
|
|
return
|
|
}
|
|
switch n.Kind() {
|
|
case NextNotification:
|
|
observer.OnNext(n.Value())
|
|
case ErrorNotification:
|
|
observer.OnError(n.Error())
|
|
cancel()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|