46 lines
1 KiB
Go
46 lines
1 KiB
Go
|
package reactive
|
||
|
|
||
|
import "context"
|
||
|
|
||
|
type Observable[T any] interface {
|
||
|
Observe(ctx context.Context) <-chan Notification[T]
|
||
|
}
|
||
|
|
||
|
type FnObservable[T any] func(ctx context.Context) <-chan Notification[T]
|
||
|
|
||
|
func (f FnObservable[T]) Observe(ctx context.Context) <-chan Notification[T] {
|
||
|
return f(ctx)
|
||
|
}
|
||
|
|
||
|
func Cold[T any](f func(ctx context.Context) <-chan Notification[T]) Observable[T] {
|
||
|
return FnObservable[T](f)
|
||
|
}
|
||
|
|
||
|
func LazyHot[T any](f func(chan<- Notification[T])) Observable[T] {
|
||
|
var lazy chan Notification[T] = nil
|
||
|
|
||
|
return FnObservable[T](func(ctx context.Context) <-chan Notification[T] {
|
||
|
if lazy == nil {
|
||
|
lazy = make(chan Notification[T])
|
||
|
go func() {
|
||
|
f(lazy)
|
||
|
}()
|
||
|
}
|
||
|
return lazy
|
||
|
})
|
||
|
}
|
||
|
|
||
|
func Hot[T any](c <-chan Notification[T]) Observable[T] {
|
||
|
return FnObservable[T](func(ctx context.Context) <-chan Notification[T] { return c })
|
||
|
}
|
||
|
|
||
|
func FromChannel[T any](c <-chan T) Observable[T] {
|
||
|
ch := make(chan Notification[T])
|
||
|
go func() {
|
||
|
for v := range c {
|
||
|
ch <- MakeNext(v)
|
||
|
}
|
||
|
}()
|
||
|
return Hot(ch)
|
||
|
}
|