67 lines
1.2 KiB
Go
67 lines
1.2 KiB
Go
|
package reactive
|
||
|
|
||
|
import "context"
|
||
|
|
||
|
func CatchError[T any](source Observable[T], handle func(error, Observable[T]) Observable[T]) Observable[T] {
|
||
|
return &catchError[T]{source, handle}
|
||
|
}
|
||
|
|
||
|
type catchError[T any] struct {
|
||
|
source Observable[T]
|
||
|
handle func(error, Observable[T]) Observable[T]
|
||
|
}
|
||
|
|
||
|
func (c *catchError[T]) Observe(ctx context.Context) <-chan Notification[T] {
|
||
|
ch := make(chan Notification[T])
|
||
|
|
||
|
go func() {
|
||
|
defer close(ch)
|
||
|
|
||
|
for {
|
||
|
select {
|
||
|
case n := <-c.source.Observe(ctx):
|
||
|
if n.Kind() == ErrorNotification {
|
||
|
Copy(c.handle(n.Error(), c.source).Observe(ctx), ch)
|
||
|
} else {
|
||
|
ch <- n
|
||
|
}
|
||
|
case <-ctx.Done():
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
return ch
|
||
|
}
|
||
|
|
||
|
func OnError[T any](source Observable[T], handle func(error)) Observable[T] {
|
||
|
return &onError[T]{source, handle}
|
||
|
}
|
||
|
|
||
|
type onError[T any] struct {
|
||
|
source Observable[T]
|
||
|
handle func(error)
|
||
|
}
|
||
|
|
||
|
func (o *onError[T]) Observe(ctx context.Context) <-chan Notification[T] {
|
||
|
ch := make(chan Notification[T])
|
||
|
|
||
|
go func() {
|
||
|
defer close(ch)
|
||
|
|
||
|
for {
|
||
|
select {
|
||
|
case n := <-o.source.Observe(ctx):
|
||
|
if n.Kind() == ErrorNotification {
|
||
|
o.handle(n.Error())
|
||
|
}
|
||
|
ch <- n
|
||
|
case <-ctx.Done():
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
return ch
|
||
|
}
|