ultrago/reactive/creation.go
2024-08-13 19:20:52 +03:00

29 lines
480 B
Go

package reactive
import "context"
func BlockingLoop[T any](gen func() (T, error)) Observable[T] {
return &blockingLoop[T]{gen}
}
type blockingLoop[T any] struct {
gen func() (T, error)
}
func (b *blockingLoop[T]) Observe(ctx context.Context) <-chan Notification[T] {
ch := make(chan Notification[T])
go func() {
defer close(ch)
for {
v, err := b.gen()
if err != nil {
ch <- MakeError[T](err)
} else {
ch <- MakeNext[T](v)
}
}
}()
return ch
}