ultrago/reactive/for_each.go

39 lines
675 B
Go
Raw Permalink Normal View History

2024-08-13 16:20:52 +00:00
package reactive
import "context"
// ForEach consumes source on a new goroutine and calls handle with the value
// of every notification whose kind is NextNotification. Notifications of any
// other kind are skipped without being reported.
//
// The observation is started with context.Background(), so the caller has no
// way to cancel it through a context. The returned channel is closed once the
// sequence returned by source.Observe has been fully consumed.
func ForEach[T any](source Observable[T], handle func(T)) <-chan struct{} {
	finished := make(chan struct{})

	consume := func() {
		// Signal completion however the loop below ends.
		defer close(finished)

		stream := source.Observe(context.Background())
		for item := range stream {
			if item.Kind() != NextNotification {
				continue
			}
			handle(item.Value())
		}
	}
	go consume()

	return finished
}
// OnEach consumes source on a new goroutine and dispatches each notification
// by kind: next receives the value of every NextNotification, and err receives
// the error of every ErrorNotification. Notifications of any other kind are
// skipped.
//
// The observation is started with context.Background(), so the caller has no
// way to cancel it through a context. The returned channel is closed once the
// sequence returned by source.Observe has been fully consumed.
func OnEach[T any](source Observable[T], next func(T), err func(error)) <-chan struct{} {
	finished := make(chan struct{})

	consume := func() {
		// Signal completion however the loop below ends.
		defer close(finished)

		stream := source.Observe(context.Background())
		for item := range stream {
			kind := item.Kind()
			if kind == NextNotification {
				next(item.Value())
			} else if kind == ErrorNotification {
				err(item.Error())
			}
		}
	}
	go consume()

	return finished
}