ultrago/reactive/observable.go

46 lines
1 KiB
Go
Raw Normal View History

2024-08-13 15:38:22 +00:00
package reactive
import (
	"context"
	"sync"
)
// Observable is a source of Notification[T] values that are delivered to
// consumers over a channel.
type Observable[T any] interface {
	// Observe returns a receive-only channel of notifications. Whether and
	// how ctx is honoured is left to the implementation.
	Observe(ctx context.Context) <-chan Notification[T]
}
// FnObservable adapts a plain function to the Observable interface.
type FnObservable[T any] func(ctx context.Context) <-chan Notification[T]

// Observe invokes the underlying function with ctx and hands back the channel
// it returns, unchanged.
func (fn FnObservable[T]) Observe(ctx context.Context) <-chan Notification[T] {
	notifications := fn(ctx)
	return notifications
}
// Cold wraps f as an Observable. f is called once per Observe call with that
// call's context, and the channel it returns is what the caller receives.
func Cold[T any](f func(ctx context.Context) <-chan Notification[T]) Observable[T] {
	var obs Observable[T] = FnObservable[T](f)
	return obs
}
// LazyHot returns an Observable whose producer f is started on first use.
//
// The first call to Observe creates an unbuffered channel and runs f in a new
// goroutine, passing it the send side of that channel. Every call to Observe,
// including the first, returns that same channel. The context passed to
// Observe is not used.
//
// Initialization is guarded by a sync.Once so that concurrent Observe calls
// neither race on the shared channel variable nor start f more than once.
//
// LazyHot itself never closes the channel; whether it is closed is up to f.
func LazyHot[T any](f func(chan<- Notification[T])) Observable[T] {
	var (
		once sync.Once
		ch   chan Notification[T]
	)
	return FnObservable[T](func(context.Context) <-chan Notification[T] {
		once.Do(func() {
			ch = make(chan Notification[T])
			// Run the producer in the background so Observe returns at once.
			go func() {
				f(ch)
			}()
		})
		// once.Do returning guarantees the write to ch above is visible here.
		return ch
	})
}
// Hot wraps an existing notification channel as an Observable. Every Observe
// call returns c itself; the context argument is ignored.
func Hot[T any](c <-chan Notification[T]) Observable[T] {
	observe := func(context.Context) <-chan Notification[T] {
		return c
	}
	return FnObservable[T](observe)
}
// FromChannel turns a channel of plain values into an Observable. A goroutine
// is started immediately; it wraps each value received from c with MakeNext
// and forwards it on an unbuffered channel, which is exposed through Hot.
//
// The forwarding goroutine returns once c is closed and drained.
//
// NOTE(review): the output channel is never closed and no completion
// notification is sent when c closes, and the goroutine has no cancellation
// path if nobody receives. Confirm whether consumers rely on this.
func FromChannel[T any](c <-chan T) Observable[T] {
	out := make(chan Notification[T])
	forward := func() {
		for {
			v, ok := <-c
			if !ok {
				// Source closed and drained: stop forwarding.
				return
			}
			out <- MakeNext(v)
		}
	}
	go forward()
	return Hot(out)
}