initial commit

This commit is contained in:
Borodinov Ilya 2024-08-13 18:38:22 +03:00
commit be7cee87e0
Signed by: noth
GPG key ID: 75503B2EF596D1BD
15 changed files with 865 additions and 0 deletions

11
LICENSE Normal file
View file

@ -0,0 +1,11 @@
Anti-GitHub License (AGHL) v1 (based on the MIT license)
Copyright (c) 2024 Ilya Borodinov <borodinov.ilya@gmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
1. The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
2. The Software shall not be published, distributed, or otherwise made available on GitHub.com or any of its subdomains or affiliated websites. Failure to comply with this condition will immediately revoke the rights granted hereunder.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

28
cli/go.mod Normal file
View file

@ -0,0 +1,28 @@
module git.minkystudios.ru/noth/ultrago/cli
go 1.22.5
require (
github.com/atotto/clipboard v0.1.4 // indirect
github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect
github.com/charmbracelet/bubbles v0.16.1 // indirect
github.com/charmbracelet/bubbletea v0.24.2 // indirect
github.com/charmbracelet/lipgloss v0.7.1 // indirect
github.com/containerd/console v1.0.4-0.20230313162750-1ae8d489ac81 // indirect
github.com/erikgeiser/promptkit v0.9.0 // indirect
github.com/lucasb-eyer/go-colorful v1.2.0 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/mattn/go-localereader v0.0.1 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/muesli/ansi v0.0.0-20230316100256-276c6243b2f6 // indirect
github.com/muesli/cancelreader v0.2.2 // indirect
github.com/muesli/reflow v0.3.0 // indirect
github.com/muesli/termenv v0.15.2 // indirect
github.com/reeflective/readline v1.0.15 // indirect
github.com/rivo/uniseg v0.4.4 // indirect
golang.org/x/exp v0.0.0-20220827204233-334a2380cb91 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.11.0 // indirect
golang.org/x/term v0.11.0 // indirect
golang.org/x/text v0.12.0 // indirect
)

57
cli/go.sum Normal file
View file

@ -0,0 +1,57 @@
github.com/atotto/clipboard v0.1.4 h1:EH0zSVneZPSuFR11BlR9YppQTVDbh5+16AmcJi4g1z4=
github.com/atotto/clipboard v0.1.4/go.mod h1:ZY9tmq7sm5xIbd9bOK4onWV4S6X0u6GY7Vn0Yu86PYI=
github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiErDT4WkJ2k=
github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8=
github.com/charmbracelet/bubbles v0.16.1 h1:6uzpAAaT9ZqKssntbvZMlksWHruQLNxg49H5WdeuYSY=
github.com/charmbracelet/bubbles v0.16.1/go.mod h1:2QCp9LFlEsBQMvIYERr7Ww2H2bA7xen1idUDIzm/+Xc=
github.com/charmbracelet/bubbletea v0.24.2 h1:uaQIKx9Ai6Gdh5zpTbGiWpytMU+CfsPp06RaW2cx/SY=
github.com/charmbracelet/bubbletea v0.24.2/go.mod h1:XdrNrV4J8GiyshTtx3DNuYkR1FDaJmO3l2nejekbsgg=
github.com/charmbracelet/lipgloss v0.7.1 h1:17WMwi7N1b1rVWOjMT+rCh7sQkvDU75B2hbZpc5Kc1E=
github.com/charmbracelet/lipgloss v0.7.1/go.mod h1:yG0k3giv8Qj8edTCbbg6AlQ5e8KNWpFujkNawKNhE2c=
github.com/containerd/console v1.0.4-0.20230313162750-1ae8d489ac81 h1:q2hJAaP1k2wIvVRd/hEHD7lacgqrCPS+k8g1MndzfWY=
github.com/containerd/console v1.0.4-0.20230313162750-1ae8d489ac81/go.mod h1:YynlIjWYF8myEu6sdkwKIvGQq+cOckRm6So2avqoYAk=
github.com/erikgeiser/promptkit v0.9.0 h1:3qL1mS/ntCrXdb8sTP/ka82CJ9kEQaGuYXNrYJkWYBc=
github.com/erikgeiser/promptkit v0.9.0/go.mod h1:pU9dtogSe3Jlc2AY77EP7R4WFP/vgD4v+iImC83KsCo=
github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY=
github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0=
github.com/mattn/go-isatty v0.0.18 h1:DOKFKCQ7FNG2L1rbrmstDN4QVRdS89Nkh85u68Uwp98=
github.com/mattn/go-isatty v0.0.18/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-localereader v0.0.1 h1:ygSAOl7ZXTx4RdPYinUpg6W99U8jWvWi9Ye2JC/oIi4=
github.com/mattn/go-localereader v0.0.1/go.mod h1:8fBrzywKY7BI3czFoHkuzRoWE9C+EiG4R1k4Cjx5p88=
github.com/mattn/go-runewidth v0.0.12/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk=
github.com/mattn/go-runewidth v0.0.14 h1:+xnbZSEeDbOIg5/mE6JF0w6n9duR1l3/WmbinWVwUuU=
github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U=
github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/muesli/ansi v0.0.0-20230316100256-276c6243b2f6 h1:ZK8zHtRHOkbHy6Mmr5D264iyp3TiX5OmNcI5cIARiQI=
github.com/muesli/ansi v0.0.0-20230316100256-276c6243b2f6/go.mod h1:CJlz5H+gyd6CUWT45Oy4q24RdLyn7Md9Vj2/ldJBSIo=
github.com/muesli/cancelreader v0.2.2 h1:3I4Kt4BQjOR54NavqnDogx/MIoWBFa0StPA8ELUXHmA=
github.com/muesli/cancelreader v0.2.2/go.mod h1:3XuTXfFS2VjM+HTLZY9Ak0l6eUKfijIfMUZ4EgX0QYo=
github.com/muesli/reflow v0.3.0 h1:IFsN6K9NfGtjeggFP+68I4chLZV2yIKsXJFNZ+eWh6s=
github.com/muesli/reflow v0.3.0/go.mod h1:pbwTDkVPibjO2kyvBQRBxTWEEGDGq0FlB1BIKtnHY/8=
github.com/muesli/termenv v0.15.2 h1:GohcuySI0QmI3wN8Ok9PtKGkgkFIk7y6Vpb5PvrY+Wo=
github.com/muesli/termenv v0.15.2/go.mod h1:Epx+iuz8sNs7mNKhxzH4fWXGNpZwUaJKRS1noLXviQ8=
github.com/reeflective/readline v1.0.15 h1:uB/M1sAc2yZGO14Ujgr/imLwQXqGdOhDDWAEHF+MBaE=
github.com/reeflective/readline v1.0.15/go.mod h1:3iOe/qyb2jEy0KqLrNlb/CojBVqxga9ACqz/VU22H6A=
github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis=
github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
golang.org/x/exp v0.0.0-20220827204233-334a2380cb91 h1:tnebWN09GYg9OLPss1KXj8txwZc6X6uMr6VFdcGNbHw=
golang.org/x/exp v0.0.0-20220827204233-334a2380cb91/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.8.0 h1:n5xxQn2i3PC0yLAbjTpNT85q/Kgzcr2gIoX9OrJUols=
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
golang.org/x/term v0.11.0 h1:F9tnn/DA/Im8nCwm+fX+1/eBwi4qFjRT++MhtVC4ZX0=
golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU=
golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc=
golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=

33
cli/input.go Normal file
View file

@ -0,0 +1,33 @@
package cli
import "github.com/erikgeiser/promptkit/textinput"
func WithPretty(inp *textinput.TextInput) {
const template = `
{{- Foreground "2" (Bold "?") }} {{ .Prompt }}
{{- Foreground "8" ">" }} {{ .Input }}
`
inp.Template = template
}
type InputOption func(*textinput.TextInput)
func WithDefault(defaultValue string) InputOption {
return func(inp *textinput.TextInput) {
inp.Placeholder = defaultValue
}
}
func WithValidate(validate func(string) error) InputOption {
return func(inp *textinput.TextInput) {
inp.Validate = validate
}
}
func Ask(prompt string, opts ...InputOption) (string, error) {
inp := textinput.New(prompt)
for _, opt := range opts {
opt(inp)
}
return inp.RunPrompt()
}

6
go.work Normal file
View file

@ -0,0 +1,6 @@
go 1.22.5
use (
./cli
./reactive
)

9
go.work.sum Normal file
View file

@ -0,0 +1,9 @@
github.com/charmbracelet/harmonica v0.2.0/go.mod h1:KSri/1RMQOZLbw7AHqgcBycp8pgJnQMYYT8QZRqZ1Ao=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/sahilm/fuzzy v0.1.0/go.mod h1:VFvziUEIMCrT6A6tw2RFIXPXXmzXbOsSHF0DOI8ZK9Y=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=

7
reactive/channels.go Normal file
View file

@ -0,0 +1,7 @@
package reactive
func Copy[T any](in <-chan T, out chan<- T) {
for v := range in {
out <- v
}
}

3
reactive/go.mod Normal file
View file

@ -0,0 +1,3 @@
module git.minkystudios.ru/noth/ultrago/reactive
go 1.22.5

View file

@ -0,0 +1,406 @@
package multicast
import (
"fmt"
"math"
"runtime"
"sync"
"sync/atomic"
"time"
)
const _PADDING = 1 // 0 turns padding off, 1 turns it on.
const _EXTRA_PADDING = 0 * 64 // multiples of 64, benefits inconclusive.
type pad60 [_PADDING * (_EXTRA_PADDING + 60)]byte
type pad56 [_PADDING * (_EXTRA_PADDING + 56)]byte
type pad48 [_PADDING * (_EXTRA_PADDING + 48)]byte
type pad40 [_PADDING * (_EXTRA_PADDING + 40)]byte
type pad32 [_PADDING * (_EXTRA_PADDING + 32)]byte
// Activity of committer
const (
resting uint32 = iota
working
)
// Activity of endpoints
const (
idling uint32 = iota
enumerating
creating
)
// State of endpoint and channel
const (
active uint64 = iota
canceled
closed
)
// Cursor is parked so it does not influence advancing the commit index.
const (
parked uint64 = math.MaxUint64
)
const (
// ReplayAll can be passed to NewEndpoint to retain as many of the
// previously sent messages as possible that are still in the buffer.
ReplayAll uint64 = math.MaxUint64
)
// Chan is a fast, concurrent multi-(casting,sending,receiving) buffered
// channel. It is implemented using only sync/atomic operations. Spinlocks using
// runtime.Gosched() are used in situations where goroutines are waiting or
// contending for resources.
type Chan[T any] struct {
buffer []T
_________a pad40
begin uint64
_________b pad56
end uint64
_________c pad56
commit uint64
_________d pad56
mod uint64
_________e pad56
endpoints endpoints[T]
err error
____________f pad48
channelState uint64 // active, closed
____________g pad56
write uint64
_________________h pad56
start time.Time
_________________i pad40
written []int64 // nanoseconds since start
_________________j pad40
committerActivity uint32 // resting, working
_________________k pad60
receivers *sync.Cond
_________________l pad56
}
type endpoints[T any] struct {
entry []Endpoint[T]
len uint32
endpointsActivity uint32 // idling, enumerating, creating
________ pad32
}
type ChannelError string
func (e ChannelError) Error() string { return string(e) }
// ErrOutOfEndpoints is returned by NewEndpoint when the maximum number of
// endpoints has already been created.
const ErrOutOfEndpoints = ChannelError("out of endpoints")
func (e *endpoints[T]) NewForChan(c *Chan[T], keep uint64) (*Endpoint[T], error) {
for !atomic.CompareAndSwapUint32(&e.endpointsActivity, idling, creating) {
runtime.Gosched()
}
defer atomic.StoreUint32(&e.endpointsActivity, idling)
var start uint64
commit := c.commitData()
begin := atomic.LoadUint64(&c.begin)
if commit-begin <= keep {
start = begin
} else {
start = commit - keep
}
if int(e.len) == len(e.entry) {
for index := uint32(0); index < e.len; index++ {
ep := &e.entry[index]
if atomic.CompareAndSwapUint64(&ep.cursor, parked, start) {
ep.endpointState = atomic.LoadUint64(&c.channelState)
ep.lastActive = time.Now()
return ep, nil
}
}
return nil, ErrOutOfEndpoints
}
ep := &e.entry[e.len]
ep.Chan = c
ep.cursor = start
ep.endpointState = atomic.LoadUint64(&c.channelState)
ep.lastActive = time.Now()
e.len++
return ep, nil
}
func (e *endpoints[T]) Access(access func(*endpoints[T])) bool {
contention := false
for !atomic.CompareAndSwapUint32(&e.endpointsActivity, idling, enumerating) {
runtime.Gosched()
contention = true
}
access(e)
atomic.StoreUint32(&e.endpointsActivity, idling)
return !contention
}
// NewChan creates a new channel. The parameters bufferCapacity and
// endpointCapacity determine the size of the message buffer and maximum
// number of concurrent receiving endpoints respectively.
//
// Note that bufferCapacity is always scaled up to a power of 2 so e.g.
// specifying 400 will create a buffer of 512 (2^9). Also because of this a
// bufferCapacity of 0 is scaled up to 1 (2^0).
func NewChan[T any](bufferCapacity int, endpointCapacity int) *Chan[T] {
size := uint64(1) << uint(math.Ceil(math.Log2(float64(bufferCapacity))))
c := &Chan[T]{
end: size,
mod: size - 1,
buffer: make([]T, size),
start: time.Now(),
written: make([]int64, size),
endpoints: endpoints[T]{
entry: make([]Endpoint, endpointCapacity),
},
}
c.receivers = sync.NewCond(c)
return c
}
// Lock, empty method so we can pass *Chan to sync.NewCond as a Locker.
func (c *Chan[T]) Lock() {}
// Unlock, empty method so we can pass *Chan to sync.NewCond as a Locker.
func (c *Chan[T]) Unlock() {}
// Endpoint is returned by a call to NewEndpoint on the channel. Every
// endpoint should be used by only a single goroutine, so no sharing between
// goroutines.
type Endpoint[T any] struct {
*Chan[T]
_____________a pad56
cursor uint64
_____________b pad56
endpointState uint64 // active, canceled, closed
_____________c pad56
lastActive time.Time // track activity to deterime when to sleep
_____________d pad40
endpointClosed uint64 // active, closed
_____________e pad56
}
func (c *Chan[T]) commitData() uint64 {
commit := atomic.LoadUint64(&c.commit)
if commit >= atomic.LoadUint64(&c.write) {
return commit
}
if !atomic.CompareAndSwapUint32(&c.committerActivity, resting, working) {
return commit
}
commit = atomic.LoadUint64(&c.commit)
newcommit := commit
for ; atomic.LoadInt64(&c.written[newcommit&c.mod])&1 == 1; newcommit++ {
atomic.AddInt64(&c.written[newcommit&c.mod], -1)
if newcommit >= atomic.LoadUint64(&c.end) {
break
}
}
write := atomic.LoadUint64(&c.write)
if newcommit > write {
panic(fmt.Sprintf("commitData: range error (commit=%d,write=%d,newcommit=%d)", commit, write, newcommit))
}
if newcommit > commit {
if !atomic.CompareAndSwapUint64(&c.commit, commit, newcommit) {
panic(fmt.Sprintf("commitData; swap error (c.commit=%d,%d,%d)", c.commit, commit, newcommit))
}
c.receivers.Broadcast()
}
atomic.StoreUint32(&c.committerActivity, resting)
return atomic.LoadUint64(&c.commit)
}
func (c *Chan[T]) slideBuffer() bool {
slowestCursor := parked
spinlock := c.endpoints.Access(func(endpoints *endpoints) {
for i := uint32(0); i < endpoints.len; i++ {
cursor := atomic.LoadUint64(&endpoints.entry[i].cursor)
if cursor < slowestCursor {
slowestCursor = cursor
}
}
if atomic.LoadUint64(&c.begin) < slowestCursor && slowestCursor <= atomic.LoadUint64(&c.end) {
if c.mod < 16 {
atomic.AddUint64(&c.begin, 1)
atomic.AddUint64(&c.end, 1)
} else {
atomic.StoreUint64(&c.begin, slowestCursor)
atomic.StoreUint64(&c.end, slowestCursor+c.mod+1)
}
} else {
slowestCursor = parked
}
})
if slowestCursor == parked {
if spinlock {
runtime.Gosched()
}
if atomic.LoadUint64(&c.channelState) != active {
return false
}
}
return true
}
// FastSend can be used to send values to the channel from a SINGLE goroutine.
// Also, this does not record the time a message was sent, so the maxAge value
// passed to Range will be ignored.
//
// Note, that when the number of unread messages has reached bufferCapacity, then
// the call to FastSend will block until the slowest Endpoint has read another
// message.
func (c *Chan[T]) FastSend(value T) {
for c.commit == c.end {
if !c.slideBuffer() {
return
}
}
c.buffer[c.commit&c.mod] = value
atomic.AddUint64(&c.commit, 1)
c.receivers.Broadcast()
}
// Send can be used by concurrent goroutines to send values to the channel.
//
// Note, that when the number of unread messages has reached bufferCapacity, then
// the call to Send will block until the slowest Endpoint has read another
// message.
func (c *Chan[T]) Send(value T) {
write := atomic.AddUint64(&c.write, 1) - 1
for write >= atomic.LoadUint64(&c.end) {
if !c.slideBuffer() {
return
}
}
c.buffer[write&c.mod] = value
updated := time.Since(c.start).Nanoseconds()
if updated == 0 {
panic("clock failure; zero duration measured")
}
atomic.StoreInt64(&c.written[write&c.mod], updated<<1+1)
c.receivers.Broadcast()
}
// Close will close the channel. Pass in an error or nil. Endpoints continue to
// receive data until the buffer is empty. Only then will the close notification
// be delivered to the Range function.
func (c *Chan[T]) Close(err error) {
if atomic.CompareAndSwapUint64(&c.channelState, active, closed) {
c.err = err
c.endpoints.Access(func(endpoints *endpoints) {
for i := uint32(0); i < endpoints.len; i++ {
atomic.CompareAndSwapUint64(&endpoints.entry[i].endpointState, active, closed)
}
})
}
c.receivers.Broadcast()
}
// Closed returns true when the channel was closed using the Close method.
func (c *Chan[T]) Closed() bool {
return atomic.LoadUint64(&c.channelState) >= closed
}
// NewEndpoint will create a new channel endpoint that can be used to receive
// from the channel. The argument keep specifies how many entries of the
// existing channel buffer to keep.
//
// After Close is called on the channel, any endpoints created after that
// will still receive the number of messages as indicated in the keep parameter
// and then subsequently the close.
//
// An endpoint that is canceled or read until it is exhausted (after channel was
// closed) will be reused by NewEndpoint.
func (c *Chan[T]) NewEndpoint(keep uint64) (*Endpoint[T], error) {
return c.endpoints.NewForChan(c, keep)
}
// Range will call the passed in foreach function with all the messages in
// the buffer, followed by all the messages received. When the foreach function
// returns true Range will continue, when you return false this is the same as
// calling Cancel. When canceled the foreach will never be called again.
// Passing a maxAge duration other than 0 will skip messages that are older
// than maxAge.
//
// When the channel is closed, eventually when the buffer is exhausted the close
// with optional error will be notified by calling foreach one last time with
// the closed parameter set to true.
func (e *Endpoint[T]) Range(next func(value T, closed bool) bool, err func(err error, closed bool) bool, maxAge time.Duration) {
e.lastActive = time.Now()
for {
commit := e.commitData()
for ; e.cursor == commit; commit = e.commitData() {
if atomic.CompareAndSwapUint64(&e.endpointState, canceled, canceled) {
atomic.StoreUint64(&e.cursor, parked)
return
}
if atomic.LoadUint64(&e.commit) < atomic.LoadUint64(&e.write) {
if e.endpointClosed == 1 {
panic(fmt.Sprintf("data written after closing endpoint; commit(%d) write(%d)",
atomic.LoadUint64(&e.commit), atomic.LoadUint64(&e.write)))
}
runtime.Gosched()
e.lastActive = time.Now()
} else {
now := time.Now()
if now.Before(e.lastActive.Add(1 * time.Millisecond)) {
if atomic.CompareAndSwapUint64(&e.endpointState, closed, closed) {
e.endpointClosed = 1
}
runtime.Gosched()
} else if now.Before(e.lastActive.Add(250 * time.Millisecond)) {
if atomic.CompareAndSwapUint64(&e.endpointState, closed, closed) {
err(e.err, true)
atomic.StoreUint64(&e.cursor, parked)
return
}
runtime.Gosched()
} else {
e.receivers.Wait()
e.lastActive = time.Now()
}
}
}
for ; e.cursor != commit; atomic.AddUint64(&e.cursor, 1) {
item := e.buffer[e.cursor&e.mod]
emit := true
if maxAge != 0 {
stale := time.Since(e.start).Nanoseconds() - maxAge.Nanoseconds()
updated := atomic.LoadInt64(&e.written[e.cursor&e.mod]) >> 1
if updated != 0 && updated <= stale {
emit = false
}
}
if emit && !next(item, false) {
atomic.StoreUint64(&e.endpointState, canceled)
}
if atomic.LoadUint64(&e.endpointState) == canceled {
atomic.StoreUint64(&e.cursor, parked)
return
}
}
e.lastActive = time.Now()
}
}
// Cancel cancels the endpoint, making it available to be reused when
// NewEndpoint is called on the channel. When canceled the foreach function
// passed to Range is not notified, instead just never called again.
func (e *Endpoint[T]) Cancel() {
atomic.CompareAndSwapUint64(&e.endpointState, active, canceled)
e.receivers.Broadcast()
}

49
reactive/notification.go Normal file
View file

@ -0,0 +1,49 @@
package reactive
type NotificationKind int
const (
NextNotification NotificationKind = iota
ErrorNotification
)
type Notification[T any] interface {
Value() T
Error() error
Kind() NotificationKind
}
func MakeNext[T any](v T) Notification[T] {
return val[T]{v}
}
func MakeError[T any](e error) Notification[T] {
return err[T]{e}
}
type val[T any] struct {
value T
}
type err[T any] struct {
err error
}
func (v val[T]) Value() T {
return v.value
}
func (v val[T]) Error() error {
return nil
}
func (v val[T]) Kind() NotificationKind {
return NextNotification
}
func (e err[T]) Value() T {
panic("tried to .Value() on an Error notification")
}
func (e err[T]) Error() error {
return e.err
}
func (e err[T]) Kind() NotificationKind {
return ErrorNotification
}

45
reactive/observable.go Normal file
View file

@ -0,0 +1,45 @@
package reactive
import "context"
type Observable[T any] interface {
Observe(ctx context.Context) <-chan Notification[T]
}
type FnObservable[T any] func(ctx context.Context) <-chan Notification[T]
func (f FnObservable[T]) Observe(ctx context.Context) <-chan Notification[T] {
return f(ctx)
}
func Cold[T any](f func(ctx context.Context) <-chan Notification[T]) Observable[T] {
return FnObservable[T](f)
}
func LazyHot[T any](f func(chan<- Notification[T])) Observable[T] {
var lazy chan Notification[T] = nil
return FnObservable[T](func(ctx context.Context) <-chan Notification[T] {
if lazy == nil {
lazy = make(chan Notification[T])
go func() {
f(lazy)
}()
}
return lazy
})
}
func Hot[T any](c <-chan Notification[T]) Observable[T] {
return FnObservable[T](func(ctx context.Context) <-chan Notification[T] { return c })
}
func FromChannel[T any](c <-chan T) Observable[T] {
ch := make(chan Notification[T])
go func() {
for v := range c {
ch <- MakeNext(v)
}
}()
return Hot(ch)
}

42
reactive/observer.go Normal file
View file

@ -0,0 +1,42 @@
package reactive
import "context"
type Observer[T any] interface {
OnNext(T)
OnError(error)
OnComplete()
}
type ObserverFn[T any] func(T)
func (f ObserverFn[T]) OnNext(v T) { f(v) }
func (f ObserverFn[T]) OnError(err error) {}
func (f ObserverFn[T]) OnComplete() {}
func Subscribe[T any](ctx context.Context, o Observable[T], observer Observer[T]) {
notifs := o.Observe(ctx)
if notifs == nil {
observer.OnComplete()
return
}
for {
select {
case <-ctx.Done():
observer.OnComplete()
return
case n, ok := <-notifs:
if !ok {
observer.OnComplete()
return
}
switch n.Kind() {
case NextNotification:
observer.OnNext(n.Value())
case ErrorNotification:
observer.OnError(n.Error())
}
}
}
}

74
reactive/subject.go Normal file
View file

@ -0,0 +1,74 @@
package reactive
import (
"context"
"codeberg.org/noth/ultrago/reactive/multicast"
)
type Subject[T any] struct {
multi *multicast.Chan[Notification[T]]
err error
}
func NewSubject[T any]() *Subject[T] {
return &Subject[T]{
multi: multicast.NewChan[Notification[T]](0, 0),
}
}
func (s *Subject[T]) Observe(ctx context.Context) <-chan Notification[T] {
if s.err != nil {
ch := make(chan Notification[T])
ch <- MakeError[T](s.err)
return ch
}
if s.multi.Closed() {
return nil
}
endpoint, err := s.multi.NewEndpoint(0)
if err != nil {
// revisit behavior? maybe return a channel with MakeError(err)?
panic(err)
}
ch := make(chan Notification[T])
endpoint.Range(func(value Notification[T], closed bool) bool {
ch <- value
return true
}, func(err error, closed bool) bool {
if err != nil {
ch <- MakeError[T](err)
}
if closed {
close(ch)
}
return true
}, 0)
go func() {
<-ctx.Done()
endpoint.Cancel()
}()
return ch
}
func (s *Subject[T]) OnNext(value T) {
s.multi.FastSend(MakeNext[T](value))
}
func (s *Subject[T]) OnError(err error) {
s.multi.FastSend(MakeError[T](err))
s.multi.Close(err)
}
func (s *Subject[T]) OnCompleted() {
s.multi.Close(nil)
}
func (s *Subject[T]) Closed() bool {
return s.multi.Closed()
}

54
reactive/switch_map.go Normal file
View file

@ -0,0 +1,54 @@
package reactive
import (
"context"
"sync/atomic"
)
func SwitchMap[T, R any](source Observable[T], project func(T) Observable[R]) Observable[R] {
ctx, cancel := context.WithCancel(context.Background())
return &switchMap[T, R]{ctx, cancel, source, project, false, atomic.Bool{}}
}
type switchMap[T, R any] struct {
ctx context.Context
cancel context.CancelFunc
source Observable[T]
project func(T) Observable[R]
completed bool
innerCompleted atomic.Bool
}
func (s *switchMap[T, R]) reset() {
s.cancel()
s.ctx, s.cancel = context.WithCancel(context.Background())
}
func (s *switchMap[T, R]) Observe(ctx context.Context) <-chan Notification[R] {
observing := s.source.Observe(ctx)
res := make(chan Notification[R])
go func() {
for notification := range observing {
switch notification.Kind() {
case NextNotification:
s.innerCompleted.Store(false)
s.reset()
inner := s.project(notification.Value())
go func() {
Copy(inner.Observe(s.ctx), res)
s.innerCompleted.Store(true)
}()
case ErrorNotification:
res <- MakeError[R](notification.Error())
}
}
if s.innerCompleted.Load() {
close(res)
}
}()
return res
}

41
reactive/tap.go Normal file
View file

@ -0,0 +1,41 @@
package reactive
import "context"
func Tap[T any](source Observable[T], observer Observer[T]) Observable[T] {
return &tap[T]{source, observer, false}
}
func TapResponse[T any](source Observable[T], observer Observer[T]) Observable[T] {
return &tap[T]{source, observer, true}
}
type tap[T any] struct {
source Observable[T]
observer Observer[T]
noError bool
}
func (t *tap[T]) Observe(ctx context.Context) <-chan Notification[T] {
observing := t.source.Observe(ctx)
res := make(chan Notification[T])
go func() {
for notification := range observing {
switch notification.Kind() {
case NextNotification:
res <- notification
t.observer.OnNext(notification.Value())
case ErrorNotification:
res <- notification
if !t.noError {
t.observer.OnError(notification.Error())
}
}
}
t.observer.OnComplete()
close(res)
}()
return res
}