42 lines
882 B
Go
42 lines
882 B
Go
|
package reactive
|
||
|
|
||
|
import "context"
|
||
|
|
||
|
func Tap[T any](source Observable[T], observer Observer[T]) Observable[T] {
|
||
|
return &tap[T]{source, observer, false}
|
||
|
}
|
||
|
|
||
|
func TapResponse[T any](source Observable[T], observer Observer[T]) Observable[T] {
|
||
|
return &tap[T]{source, observer, true}
|
||
|
}
|
||
|
|
||
|
type tap[T any] struct {
|
||
|
source Observable[T]
|
||
|
observer Observer[T]
|
||
|
noError bool
|
||
|
}
|
||
|
|
||
|
func (t *tap[T]) Observe(ctx context.Context) <-chan Notification[T] {
|
||
|
observing := t.source.Observe(ctx)
|
||
|
res := make(chan Notification[T])
|
||
|
|
||
|
go func() {
|
||
|
for notification := range observing {
|
||
|
switch notification.Kind() {
|
||
|
case NextNotification:
|
||
|
res <- notification
|
||
|
t.observer.OnNext(notification.Value())
|
||
|
case ErrorNotification:
|
||
|
res <- notification
|
||
|
if !t.noError {
|
||
|
t.observer.OnError(notification.Error())
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
t.observer.OnComplete()
|
||
|
close(res)
|
||
|
}()
|
||
|
|
||
|
return res
|
||
|
}
|