This commit is contained in:
Borodinov Ilya 2024-08-13 19:20:52 +03:00
parent 483863005e
commit e87cd1f843
Signed by: noth
GPG key ID: 75503B2EF596D1BD
6 changed files with 165 additions and 4 deletions

29
reactive/creation.go Normal file
View file

@ -0,0 +1,29 @@
package reactive
import "context"
func BlockingLoop[T any](gen func() (T, error)) Observable[T] {
return &blockingLoop[T]{gen}
}
type blockingLoop[T any] struct {
gen func() (T, error)
}
func (b *blockingLoop[T]) Observe(ctx context.Context) <-chan Notification[T] {
ch := make(chan Notification[T])
go func() {
defer close(ch)
for {
v, err := b.gen()
if err != nil {
ch <- MakeError[T](err)
} else {
ch <- MakeNext[T](v)
}
}
}()
return ch
}

66
reactive/errors.go Normal file
View file

@ -0,0 +1,66 @@
package reactive
import "context"
func CatchError[T any](source Observable[T], handle func(error, Observable[T]) Observable[T]) Observable[T] {
return &catchError[T]{source, handle}
}
type catchError[T any] struct {
source Observable[T]
handle func(error, Observable[T]) Observable[T]
}
func (c *catchError[T]) Observe(ctx context.Context) <-chan Notification[T] {
ch := make(chan Notification[T])
go func() {
defer close(ch)
for {
select {
case n := <-c.source.Observe(ctx):
if n.Kind() == ErrorNotification {
Copy(c.handle(n.Error(), c.source).Observe(ctx), ch)
} else {
ch <- n
}
case <-ctx.Done():
return
}
}
}()
return ch
}
func OnError[T any](source Observable[T], handle func(error)) Observable[T] {
return &onError[T]{source, handle}
}
type onError[T any] struct {
source Observable[T]
handle func(error)
}
func (o *onError[T]) Observe(ctx context.Context) <-chan Notification[T] {
ch := make(chan Notification[T])
go func() {
defer close(ch)
for {
select {
case n := <-o.source.Observe(ctx):
if n.Kind() == ErrorNotification {
o.handle(n.Error())
}
ch <- n
case <-ctx.Done():
return
}
}
}()
return ch
}

38
reactive/for_each.go Normal file
View file

@ -0,0 +1,38 @@
package reactive
import "context"
func ForEach[T any](source Observable[T], handle func(T)) <-chan struct{} {
done := make(chan struct{})
go func() {
defer close(done)
for n := range source.Observe(context.Background()) {
if n.Kind() == NextNotification {
handle(n.Value())
}
}
}()
return done
}
func OnEach[T any](source Observable[T], next func(T), err func(error)) <-chan struct{} {
done := make(chan struct{})
go func() {
defer close(done)
for n := range source.Observe(context.Background()) {
switch n.Kind() {
case NextNotification:
next(n.Value())
case ErrorNotification:
err(n.Error())
}
}
}()
return done
}

View file

@ -164,7 +164,7 @@ func NewChan[T any](bufferCapacity int, endpointCapacity int) *Chan[T] {
start: time.Now(), start: time.Now(),
written: make([]int64, size), written: make([]int64, size),
endpoints: endpoints[T]{ endpoints: endpoints[T]{
entry: make([]Endpoint, endpointCapacity), entry: make([]Endpoint[T], endpointCapacity),
}, },
} }
c.receivers = sync.NewCond(c) c.receivers = sync.NewCond(c)
@ -225,7 +225,7 @@ func (c *Chan[T]) commitData() uint64 {
func (c *Chan[T]) slideBuffer() bool { func (c *Chan[T]) slideBuffer() bool {
slowestCursor := parked slowestCursor := parked
spinlock := c.endpoints.Access(func(endpoints *endpoints) { spinlock := c.endpoints.Access(func(endpoints *endpoints[T]) {
for i := uint32(0); i < endpoints.len; i++ { for i := uint32(0); i < endpoints.len; i++ {
cursor := atomic.LoadUint64(&endpoints.entry[i].cursor) cursor := atomic.LoadUint64(&endpoints.entry[i].cursor)
if cursor < slowestCursor { if cursor < slowestCursor {
@ -300,7 +300,7 @@ func (c *Chan[T]) Send(value T) {
func (c *Chan[T]) Close(err error) { func (c *Chan[T]) Close(err error) {
if atomic.CompareAndSwapUint64(&c.channelState, active, closed) { if atomic.CompareAndSwapUint64(&c.channelState, active, closed) {
c.err = err c.err = err
c.endpoints.Access(func(endpoints *endpoints) { c.endpoints.Access(func(endpoints *endpoints[T]) {
for i := uint32(0); i < endpoints.len; i++ { for i := uint32(0); i < endpoints.len; i++ {
atomic.CompareAndSwapUint64(&endpoints.entry[i].endpointState, active, closed) atomic.CompareAndSwapUint64(&endpoints.entry[i].endpointState, active, closed)
} }

23
reactive/net.go Normal file
View file

@ -0,0 +1,23 @@
package reactive
import (
"net"
"net/netip"
)
func Listen(network string, address string) (Observable[net.Conn], error) {
lis, err := net.Listen(network, address)
if err != nil {
return nil, err
}
return BlockingLoop(lis.Accept), nil
}
func ListenTCP(network string, address netip.AddrPort) (Observable[*net.TCPConn], error) {
lis, err := net.ListenTCP("tcp", net.TCPAddrFromAddrPort(address))
if err != nil {
return nil, err
}
return BlockingLoop(lis.AcceptTCP), nil
}

View file

@ -15,8 +15,10 @@ func (f ObserverFn[T]) OnError(err error) {}
func (f ObserverFn[T]) OnComplete() {} func (f ObserverFn[T]) OnComplete() {}
func Subscribe[T any](ctx context.Context, o Observable[T], observer Observer[T]) { func Subscribe[T any](ctx context.Context, o Observable[T], observer Observer[T]) {
notifs := o.Observe(ctx) ewrap, cancel := context.WithCancel(ctx)
notifs := o.Observe(ewrap)
if notifs == nil { if notifs == nil {
cancel()
observer.OnComplete() observer.OnComplete()
return return
} }
@ -24,6 +26,7 @@ func Subscribe[T any](ctx context.Context, o Observable[T], observer Observer[T]
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
cancel()
observer.OnComplete() observer.OnComplete()
return return
case n, ok := <-notifs: case n, ok := <-notifs:
@ -36,6 +39,8 @@ func Subscribe[T any](ctx context.Context, o Observable[T], observer Observer[T]
observer.OnNext(n.Value()) observer.OnNext(n.Value())
case ErrorNotification: case ErrorNotification:
observer.OnError(n.Error()) observer.OnError(n.Error())
cancel()
return
} }
} }
} }