diff --git a/reactive/creation.go b/reactive/creation.go new file mode 100644 index 0000000..7949033 --- /dev/null +++ b/reactive/creation.go @@ -0,0 +1,29 @@ +package reactive + +import "context" + +func BlockingLoop[T any](gen func() (T, error)) Observable[T] { + return &blockingLoop[T]{gen} +} + +type blockingLoop[T any] struct { + gen func() (T, error) +} + +func (b *blockingLoop[T]) Observe(ctx context.Context) <-chan Notification[T] { + ch := make(chan Notification[T]) + + go func() { + defer close(ch) + for { + v, err := b.gen() + if err != nil { + ch <- MakeError[T](err) + } else { + ch <- MakeNext[T](v) + } + } + }() + + return ch +} diff --git a/reactive/errors.go b/reactive/errors.go new file mode 100644 index 0000000..2b244b6 --- /dev/null +++ b/reactive/errors.go @@ -0,0 +1,66 @@ +package reactive + +import "context" + +func CatchError[T any](source Observable[T], handle func(error, Observable[T]) Observable[T]) Observable[T] { + return &catchError[T]{source, handle} +} + +type catchError[T any] struct { + source Observable[T] + handle func(error, Observable[T]) Observable[T] +} + +func (c *catchError[T]) Observe(ctx context.Context) <-chan Notification[T] { + ch := make(chan Notification[T]) + + go func() { + defer close(ch) + + for { + select { + case n := <-c.source.Observe(ctx): + if n.Kind() == ErrorNotification { + Copy(c.handle(n.Error(), c.source).Observe(ctx), ch) + } else { + ch <- n + } + case <-ctx.Done(): + return + } + } + }() + + return ch +} + +func OnError[T any](source Observable[T], handle func(error)) Observable[T] { + return &onError[T]{source, handle} +} + +type onError[T any] struct { + source Observable[T] + handle func(error) +} + +func (o *onError[T]) Observe(ctx context.Context) <-chan Notification[T] { + ch := make(chan Notification[T]) + + go func() { + defer close(ch) + + for { + select { + case n := <-o.source.Observe(ctx): + if n.Kind() == ErrorNotification { + o.handle(n.Error()) + } + ch <- n + case <-ctx.Done(): + return + } + } + }() + + return ch +} diff --git a/reactive/for_each.go b/reactive/for_each.go new file mode 100644 index 0000000..89637e5 --- /dev/null +++ b/reactive/for_each.go @@ -0,0 +1,38 @@ +package reactive + +import "context" + +func ForEach[T any](source Observable[T], handle func(T)) <-chan struct{} { + done := make(chan struct{}) + + go func() { + defer close(done) + + for n := range source.Observe(context.Background()) { + if n.Kind() == NextNotification { + handle(n.Value()) + } + } + }() + + return done +} + +func OnEach[T any](source Observable[T], next func(T), err func(error)) <-chan struct{} { + done := make(chan struct{}) + + go func() { + defer close(done) + + for n := range source.Observe(context.Background()) { + switch n.Kind() { + case NextNotification: + next(n.Value()) + case ErrorNotification: + err(n.Error()) + } + } + }() + + return done +} diff --git a/reactive/multicast/multicast.go b/reactive/multicast/multicast.go index 4525107..725b72f 100644 --- a/reactive/multicast/multicast.go +++ b/reactive/multicast/multicast.go @@ -164,7 +164,7 @@ func NewChan[T any](bufferCapacity int, endpointCapacity int) *Chan[T] { start: time.Now(), written: make([]int64, size), endpoints: endpoints[T]{ - entry: make([]Endpoint, endpointCapacity), + entry: make([]Endpoint[T], endpointCapacity), }, } c.receivers = sync.NewCond(c) @@ -225,7 +225,7 @@ func (c *Chan[T]) commitData() uint64 { func (c *Chan[T]) slideBuffer() bool { slowestCursor := parked - spinlock := c.endpoints.Access(func(endpoints *endpoints) { + spinlock := c.endpoints.Access(func(endpoints *endpoints[T]) { for i := uint32(0); i < endpoints.len; i++ { cursor := atomic.LoadUint64(&endpoints.entry[i].cursor) if cursor < slowestCursor { @@ -300,7 +300,7 @@ func (c *Chan[T]) Send(value T) { func (c *Chan[T]) Close(err error) { if atomic.CompareAndSwapUint64(&c.channelState, active, closed) { c.err = err - c.endpoints.Access(func(endpoints *endpoints) { + c.endpoints.Access(func(endpoints *endpoints[T]) { for i := uint32(0); i < endpoints.len; i++ { atomic.CompareAndSwapUint64(&endpoints.entry[i].endpointState, active, closed) } diff --git a/reactive/net.go b/reactive/net.go new file mode 100644 index 0000000..abeb008 --- /dev/null +++ b/reactive/net.go @@ -0,0 +1,23 @@ +package reactive + +import ( + "net" + "net/netip" +) + +func Listen(network string, address string) (Observable[net.Conn], error) { + lis, err := net.Listen(network, address) + if err != nil { + return nil, err + } + return BlockingLoop(lis.Accept), nil +} + +func ListenTCP(network string, address netip.AddrPort) (Observable[*net.TCPConn], error) { + lis, err := net.ListenTCP("tcp", net.TCPAddrFromAddrPort(address)) + if err != nil { + return nil, err + } + + return BlockingLoop(lis.AcceptTCP), nil +} diff --git a/reactive/observer.go b/reactive/observer.go index cb5b718..56cd3f5 100644 --- a/reactive/observer.go +++ b/reactive/observer.go @@ -15,8 +15,10 @@ func (f ObserverFn[T]) OnError(err error) {} func (f ObserverFn[T]) OnComplete() {} func Subscribe[T any](ctx context.Context, o Observable[T], observer Observer[T]) { - notifs := o.Observe(ctx) + ewrap, cancel := context.WithCancel(ctx) + notifs := o.Observe(ewrap) if notifs == nil { + cancel() observer.OnComplete() return } @@ -24,6 +26,7 @@ func Subscribe[T any](ctx context.Context, o Observable[T], observer Observer[T] for { select { case <-ctx.Done(): + cancel() observer.OnComplete() return case n, ok := <-notifs: @@ -36,6 +39,8 @@ func Subscribe[T any](ctx context.Context, o Observable[T], observer Observer[T] observer.OnNext(n.Value()) case ErrorNotification: observer.OnError(n.Error()) + cancel() + return } } }