commit be7cee87e0f5e20fa05916906355356139c2d106 Author: Borodinov Ilya Date: Tue Aug 13 18:38:22 2024 +0300 initial commit diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..277c9be --- /dev/null +++ b/LICENSE @@ -0,0 +1,11 @@ +Anti-GitHub License (AGHL) v1 (based on the MIT license) + +Copyright (c) 2024 Ilya Borodinov + +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +1. The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + +2. The Software shall not be published, distributed, or otherwise made available on GitHub.com or any of its subdomains or affiliated websites. Failure to comply with this condition will immediately revoke the rights granted hereunder. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/cli/go.mod b/cli/go.mod new file mode 100644 index 0000000..cc4dc4e --- /dev/null +++ b/cli/go.mod @@ -0,0 +1,28 @@ +module git.minkystudios.ru/noth/ultrago/cli + +go 1.22.5 + +require ( + github.com/atotto/clipboard v0.1.4 // indirect + github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect + github.com/charmbracelet/bubbles v0.16.1 // indirect + github.com/charmbracelet/bubbletea v0.24.2 // indirect + github.com/charmbracelet/lipgloss v0.7.1 // indirect + github.com/containerd/console v1.0.4-0.20230313162750-1ae8d489ac81 // indirect + github.com/erikgeiser/promptkit v0.9.0 // indirect + github.com/lucasb-eyer/go-colorful v1.2.0 // indirect + github.com/mattn/go-isatty v0.0.19 // indirect + github.com/mattn/go-localereader v0.0.1 // indirect + github.com/mattn/go-runewidth v0.0.15 // indirect + github.com/muesli/ansi v0.0.0-20230316100256-276c6243b2f6 // indirect + github.com/muesli/cancelreader v0.2.2 // indirect + github.com/muesli/reflow v0.3.0 // indirect + github.com/muesli/termenv v0.15.2 // indirect + github.com/reeflective/readline v1.0.15 // indirect + github.com/rivo/uniseg v0.4.4 // indirect + golang.org/x/exp v0.0.0-20220827204233-334a2380cb91 // indirect + golang.org/x/sync v0.3.0 // indirect + golang.org/x/sys v0.11.0 // indirect + golang.org/x/term v0.11.0 // indirect + golang.org/x/text v0.12.0 // indirect +) diff --git a/cli/go.sum b/cli/go.sum new file mode 100644 index 0000000..9f9943f --- /dev/null +++ b/cli/go.sum @@ -0,0 +1,57 @@ +github.com/atotto/clipboard v0.1.4 h1:EH0zSVneZPSuFR11BlR9YppQTVDbh5+16AmcJi4g1z4= +github.com/atotto/clipboard v0.1.4/go.mod h1:ZY9tmq7sm5xIbd9bOK4onWV4S6X0u6GY7Vn0Yu86PYI= +github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiErDT4WkJ2k= +github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8= +github.com/charmbracelet/bubbles v0.16.1 h1:6uzpAAaT9ZqKssntbvZMlksWHruQLNxg49H5WdeuYSY= +github.com/charmbracelet/bubbles v0.16.1/go.mod h1:2QCp9LFlEsBQMvIYERr7Ww2H2bA7xen1idUDIzm/+Xc= +github.com/charmbracelet/bubbletea v0.24.2 h1:uaQIKx9Ai6Gdh5zpTbGiWpytMU+CfsPp06RaW2cx/SY= +github.com/charmbracelet/bubbletea v0.24.2/go.mod h1:XdrNrV4J8GiyshTtx3DNuYkR1FDaJmO3l2nejekbsgg= +github.com/charmbracelet/lipgloss v0.7.1 h1:17WMwi7N1b1rVWOjMT+rCh7sQkvDU75B2hbZpc5Kc1E= +github.com/charmbracelet/lipgloss v0.7.1/go.mod h1:yG0k3giv8Qj8edTCbbg6AlQ5e8KNWpFujkNawKNhE2c= +github.com/containerd/console v1.0.4-0.20230313162750-1ae8d489ac81 h1:q2hJAaP1k2wIvVRd/hEHD7lacgqrCPS+k8g1MndzfWY= +github.com/containerd/console v1.0.4-0.20230313162750-1ae8d489ac81/go.mod h1:YynlIjWYF8myEu6sdkwKIvGQq+cOckRm6So2avqoYAk= +github.com/erikgeiser/promptkit v0.9.0 h1:3qL1mS/ntCrXdb8sTP/ka82CJ9kEQaGuYXNrYJkWYBc= +github.com/erikgeiser/promptkit v0.9.0/go.mod h1:pU9dtogSe3Jlc2AY77EP7R4WFP/vgD4v+iImC83KsCo= +github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY= +github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0= +github.com/mattn/go-isatty v0.0.18 h1:DOKFKCQ7FNG2L1rbrmstDN4QVRdS89Nkh85u68Uwp98= +github.com/mattn/go-isatty v0.0.18/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= +github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-localereader v0.0.1 h1:ygSAOl7ZXTx4RdPYinUpg6W99U8jWvWi9Ye2JC/oIi4= +github.com/mattn/go-localereader v0.0.1/go.mod h1:8fBrzywKY7BI3czFoHkuzRoWE9C+EiG4R1k4Cjx5p88= +github.com/mattn/go-runewidth v0.0.12/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk= +github.com/mattn/go-runewidth v0.0.14 h1:+xnbZSEeDbOIg5/mE6JF0w6n9duR1l3/WmbinWVwUuU= +github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U= +github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/muesli/ansi v0.0.0-20230316100256-276c6243b2f6 h1:ZK8zHtRHOkbHy6Mmr5D264iyp3TiX5OmNcI5cIARiQI= +github.com/muesli/ansi v0.0.0-20230316100256-276c6243b2f6/go.mod h1:CJlz5H+gyd6CUWT45Oy4q24RdLyn7Md9Vj2/ldJBSIo= +github.com/muesli/cancelreader v0.2.2 h1:3I4Kt4BQjOR54NavqnDogx/MIoWBFa0StPA8ELUXHmA= +github.com/muesli/cancelreader v0.2.2/go.mod h1:3XuTXfFS2VjM+HTLZY9Ak0l6eUKfijIfMUZ4EgX0QYo= +github.com/muesli/reflow v0.3.0 h1:IFsN6K9NfGtjeggFP+68I4chLZV2yIKsXJFNZ+eWh6s= +github.com/muesli/reflow v0.3.0/go.mod h1:pbwTDkVPibjO2kyvBQRBxTWEEGDGq0FlB1BIKtnHY/8= +github.com/muesli/termenv v0.15.2 h1:GohcuySI0QmI3wN8Ok9PtKGkgkFIk7y6Vpb5PvrY+Wo= +github.com/muesli/termenv v0.15.2/go.mod h1:Epx+iuz8sNs7mNKhxzH4fWXGNpZwUaJKRS1noLXviQ8= +github.com/reeflective/readline v1.0.15 h1:uB/M1sAc2yZGO14Ujgr/imLwQXqGdOhDDWAEHF+MBaE= +github.com/reeflective/readline v1.0.15/go.mod h1:3iOe/qyb2jEy0KqLrNlb/CojBVqxga9ACqz/VU22H6A= +github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis= +github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +golang.org/x/exp v0.0.0-20220827204233-334a2380cb91 h1:tnebWN09GYg9OLPss1KXj8txwZc6X6uMr6VFdcGNbHw= +golang.org/x/exp v0.0.0-20220827204233-334a2380cb91/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE= +golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= +golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= +golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= +golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.8.0 h1:n5xxQn2i3PC0yLAbjTpNT85q/Kgzcr2gIoX9OrJUols= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.11.0 h1:F9tnn/DA/Im8nCwm+fX+1/eBwi4qFjRT++MhtVC4ZX0= +golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU= +golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc= +golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= diff --git a/cli/input.go b/cli/input.go new file mode 100644 index 0000000..31124a6 --- /dev/null +++ b/cli/input.go @@ -0,0 +1,33 @@ +package cli + +import "github.com/erikgeiser/promptkit/textinput" + +func WithPretty(inp *textinput.TextInput) { + const template = ` +{{- Foreground "2" (Bold "?") }} {{ .Prompt }} +{{- Foreground "8" ">" }} {{ .Input }} + ` + inp.Template = template +} + +type InputOption func(*textinput.TextInput) + +func WithDefault(defaultValue string) InputOption { + return func(inp *textinput.TextInput) { + inp.Placeholder = defaultValue + } +} + +func WithValidate(validate func(string) error) InputOption { + return func(inp *textinput.TextInput) { + inp.Validate = validate + } +} + +func Ask(prompt string, opts ...InputOption) (string, error) { + inp := textinput.New(prompt) + for _, opt := range opts { + opt(inp) + } + return inp.RunPrompt() +} diff --git a/go.work b/go.work new file mode 100644 index 0000000..7e65098 --- /dev/null +++ b/go.work @@ -0,0 +1,6 @@ +go 1.22.5 + +use ( + ./cli + ./reactive +) diff --git a/go.work.sum b/go.work.sum new file mode 100644 index 0000000..51d10cf --- /dev/null +++ b/go.work.sum @@ -0,0 +1,9 @@ +github.com/charmbracelet/harmonica v0.2.0/go.mod h1:KSri/1RMQOZLbw7AHqgcBycp8pgJnQMYYT8QZRqZ1Ao= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/sahilm/fuzzy v0.1.0/go.mod h1:VFvziUEIMCrT6A6tw2RFIXPXXmzXbOsSHF0DOI8ZK9Y= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= diff --git a/reactive/channels.go b/reactive/channels.go new file mode 100644 index 0000000..bd3dbf3 --- /dev/null +++ b/reactive/channels.go @@ -0,0 +1,7 @@ +package reactive + +func Copy[T any](in <-chan T, out chan<- T) { + for v := range in { + out <- v + } +} diff --git a/reactive/go.mod b/reactive/go.mod new file mode 100644 index 0000000..b9037b0 --- /dev/null +++ b/reactive/go.mod @@ -0,0 +1,3 @@ +module git.minkystudios.ru/noth/ultrago/reactive + +go 1.22.5 diff --git a/reactive/multicast/multicast.go b/reactive/multicast/multicast.go new file mode 100644 index 0000000..4525107 --- /dev/null +++ b/reactive/multicast/multicast.go @@ -0,0 +1,406 @@ +package multicast + +import ( + "fmt" + "math" + "runtime" + "sync" + "sync/atomic" + "time" +) + +const _PADDING = 1 // 0 turns padding off, 1 turns it on. + +const _EXTRA_PADDING = 0 * 64 // multiples of 64, benefits inconclusive. + +type pad60 [_PADDING * (_EXTRA_PADDING + 60)]byte + +type pad56 [_PADDING * (_EXTRA_PADDING + 56)]byte + +type pad48 [_PADDING * (_EXTRA_PADDING + 48)]byte + +type pad40 [_PADDING * (_EXTRA_PADDING + 40)]byte + +type pad32 [_PADDING * (_EXTRA_PADDING + 32)]byte + +// Activity of committer +const ( + resting uint32 = iota + working +) + +// Activity of endpoints +const ( + idling uint32 = iota + enumerating + creating +) + +// State of endpoint and channel +const ( + active uint64 = iota + canceled + closed +) + +// Cursor is parked so it does not influence advancing the commit index. +const ( + parked uint64 = math.MaxUint64 +) + +const ( + // ReplayAll can be passed to NewEndpoint to retain as many of the + // previously sent messages as possible that are still in the buffer. + ReplayAll uint64 = math.MaxUint64 +) + +// Chan is a fast, concurrent multi-(casting,sending,receiving) buffered +// channel. It is implemented using only sync/atomic operations. Spinlocks using +// runtime.Gosched() are used in situations where goroutines are waiting or +// contending for resources. +type Chan[T any] struct { + buffer []T + _________a pad40 + begin uint64 + _________b pad56 + end uint64 + _________c pad56 + commit uint64 + _________d pad56 + mod uint64 + _________e pad56 + endpoints endpoints[T] + + err error + ____________f pad48 + channelState uint64 // active, closed + ____________g pad56 + + write uint64 + _________________h pad56 + start time.Time + _________________i pad40 + written []int64 // nanoseconds since start + _________________j pad40 + committerActivity uint32 // resting, working + _________________k pad60 + + receivers *sync.Cond + _________________l pad56 +} + +type endpoints[T any] struct { + entry []Endpoint[T] + len uint32 + endpointsActivity uint32 // idling, enumerating, creating + ________ pad32 +} + +type ChannelError string + +func (e ChannelError) Error() string { return string(e) } + +// ErrOutOfEndpoints is returned by NewEndpoint when the maximum number of +// endpoints has already been created. +const ErrOutOfEndpoints = ChannelError("out of endpoints") + +func (e *endpoints[T]) NewForChan(c *Chan[T], keep uint64) (*Endpoint[T], error) { + for !atomic.CompareAndSwapUint32(&e.endpointsActivity, idling, creating) { + runtime.Gosched() + } + defer atomic.StoreUint32(&e.endpointsActivity, idling) + var start uint64 + commit := c.commitData() + begin := atomic.LoadUint64(&c.begin) + if commit-begin <= keep { + start = begin + } else { + start = commit - keep + } + if int(e.len) == len(e.entry) { + for index := uint32(0); index < e.len; index++ { + ep := &e.entry[index] + if atomic.CompareAndSwapUint64(&ep.cursor, parked, start) { + ep.endpointState = atomic.LoadUint64(&c.channelState) + ep.lastActive = time.Now() + return ep, nil + } + } + return nil, ErrOutOfEndpoints + } + ep := &e.entry[e.len] + ep.Chan = c + ep.cursor = start + ep.endpointState = atomic.LoadUint64(&c.channelState) + ep.lastActive = time.Now() + e.len++ + return ep, nil +} + +func (e *endpoints[T]) Access(access func(*endpoints[T])) bool { + contention := false + for !atomic.CompareAndSwapUint32(&e.endpointsActivity, idling, enumerating) { + runtime.Gosched() + contention = true + } + access(e) + atomic.StoreUint32(&e.endpointsActivity, idling) + return !contention +} + +// NewChan creates a new channel. The parameters bufferCapacity and +// endpointCapacity determine the size of the message buffer and maximum +// number of concurrent receiving endpoints respectively. +// +// Note that bufferCapacity is always scaled up to a power of 2 so e.g. +// specifying 400 will create a buffer of 512 (2^9). Also because of this a +// bufferCapacity of 0 is scaled up to 1 (2^0). +func NewChan[T any](bufferCapacity int, endpointCapacity int) *Chan[T] { + size := uint64(1) << uint(math.Ceil(math.Log2(float64(bufferCapacity)))) + c := &Chan[T]{ + end: size, + mod: size - 1, + buffer: make([]T, size), + start: time.Now(), + written: make([]int64, size), + endpoints: endpoints[T]{ + entry: make([]Endpoint, endpointCapacity), + }, + } + c.receivers = sync.NewCond(c) + return c +} + +// Lock, empty method so we can pass *Chan to sync.NewCond as a Locker. +func (c *Chan[T]) Lock() {} + +// Unlock, empty method so we can pass *Chan to sync.NewCond as a Locker. +func (c *Chan[T]) Unlock() {} + +// Endpoint is returned by a call to NewEndpoint on the channel. Every +// endpoint should be used by only a single goroutine, so no sharing between +// goroutines. +type Endpoint[T any] struct { + *Chan[T] + _____________a pad56 + cursor uint64 + _____________b pad56 + endpointState uint64 // active, canceled, closed + _____________c pad56 + lastActive time.Time // track activity to deterime when to sleep + _____________d pad40 + endpointClosed uint64 // active, closed + _____________e pad56 +} + +func (c *Chan[T]) commitData() uint64 { + commit := atomic.LoadUint64(&c.commit) + if commit >= atomic.LoadUint64(&c.write) { + return commit + } + if !atomic.CompareAndSwapUint32(&c.committerActivity, resting, working) { + return commit + } + commit = atomic.LoadUint64(&c.commit) + newcommit := commit + for ; atomic.LoadInt64(&c.written[newcommit&c.mod])&1 == 1; newcommit++ { + atomic.AddInt64(&c.written[newcommit&c.mod], -1) + if newcommit >= atomic.LoadUint64(&c.end) { + break + } + } + write := atomic.LoadUint64(&c.write) + if newcommit > write { + panic(fmt.Sprintf("commitData: range error (commit=%d,write=%d,newcommit=%d)", commit, write, newcommit)) + } + if newcommit > commit { + if !atomic.CompareAndSwapUint64(&c.commit, commit, newcommit) { + panic(fmt.Sprintf("commitData; swap error (c.commit=%d,%d,%d)", c.commit, commit, newcommit)) + } + c.receivers.Broadcast() + } + atomic.StoreUint32(&c.committerActivity, resting) + return atomic.LoadUint64(&c.commit) +} + +func (c *Chan[T]) slideBuffer() bool { + slowestCursor := parked + spinlock := c.endpoints.Access(func(endpoints *endpoints) { + for i := uint32(0); i < endpoints.len; i++ { + cursor := atomic.LoadUint64(&endpoints.entry[i].cursor) + if cursor < slowestCursor { + slowestCursor = cursor + } + } + if atomic.LoadUint64(&c.begin) < slowestCursor && slowestCursor <= atomic.LoadUint64(&c.end) { + if c.mod < 16 { + atomic.AddUint64(&c.begin, 1) + atomic.AddUint64(&c.end, 1) + } else { + atomic.StoreUint64(&c.begin, slowestCursor) + atomic.StoreUint64(&c.end, slowestCursor+c.mod+1) + } + } else { + slowestCursor = parked + } + }) + if slowestCursor == parked { + if spinlock { + runtime.Gosched() + } + if atomic.LoadUint64(&c.channelState) != active { + return false + } + } + return true +} + +// FastSend can be used to send values to the channel from a SINGLE goroutine. +// Also, this does not record the time a message was sent, so the maxAge value +// passed to Range will be ignored. +// +// Note, that when the number of unread messages has reached bufferCapacity, then +// the call to FastSend will block until the slowest Endpoint has read another +// message. +func (c *Chan[T]) FastSend(value T) { + for c.commit == c.end { + if !c.slideBuffer() { + return + } + } + c.buffer[c.commit&c.mod] = value + atomic.AddUint64(&c.commit, 1) + c.receivers.Broadcast() +} + +// Send can be used by concurrent goroutines to send values to the channel. +// +// Note, that when the number of unread messages has reached bufferCapacity, then +// the call to Send will block until the slowest Endpoint has read another +// message. +func (c *Chan[T]) Send(value T) { + write := atomic.AddUint64(&c.write, 1) - 1 + for write >= atomic.LoadUint64(&c.end) { + if !c.slideBuffer() { + return + } + } + c.buffer[write&c.mod] = value + updated := time.Since(c.start).Nanoseconds() + if updated == 0 { + panic("clock failure; zero duration measured") + } + atomic.StoreInt64(&c.written[write&c.mod], updated<<1+1) + c.receivers.Broadcast() +} + +// Close will close the channel. Pass in an error or nil. Endpoints continue to +// receive data until the buffer is empty. Only then will the close notification +// be delivered to the Range function. +func (c *Chan[T]) Close(err error) { + if atomic.CompareAndSwapUint64(&c.channelState, active, closed) { + c.err = err + c.endpoints.Access(func(endpoints *endpoints) { + for i := uint32(0); i < endpoints.len; i++ { + atomic.CompareAndSwapUint64(&endpoints.entry[i].endpointState, active, closed) + } + }) + } + c.receivers.Broadcast() +} + +// Closed returns true when the channel was closed using the Close method. +func (c *Chan[T]) Closed() bool { + return atomic.LoadUint64(&c.channelState) >= closed +} + +// NewEndpoint will create a new channel endpoint that can be used to receive +// from the channel. The argument keep specifies how many entries of the +// existing channel buffer to keep. +// +// After Close is called on the channel, any endpoints created after that +// will still receive the number of messages as indicated in the keep parameter +// and then subsequently the close. +// +// An endpoint that is canceled or read until it is exhausted (after channel was +// closed) will be reused by NewEndpoint. +func (c *Chan[T]) NewEndpoint(keep uint64) (*Endpoint[T], error) { + return c.endpoints.NewForChan(c, keep) +} + +// Range will call the passed in foreach function with all the messages in +// the buffer, followed by all the messages received. When the foreach function +// returns true Range will continue, when you return false this is the same as +// calling Cancel. When canceled the foreach will never be called again. +// Passing a maxAge duration other than 0 will skip messages that are older +// than maxAge. +// +// When the channel is closed, eventually when the buffer is exhausted the close +// with optional error will be notified by calling foreach one last time with +// the closed parameter set to true. +func (e *Endpoint[T]) Range(next func(value T, closed bool) bool, err func(err error, closed bool) bool, maxAge time.Duration) { + e.lastActive = time.Now() + for { + commit := e.commitData() + for ; e.cursor == commit; commit = e.commitData() { + if atomic.CompareAndSwapUint64(&e.endpointState, canceled, canceled) { + atomic.StoreUint64(&e.cursor, parked) + return + } + if atomic.LoadUint64(&e.commit) < atomic.LoadUint64(&e.write) { + if e.endpointClosed == 1 { + panic(fmt.Sprintf("data written after closing endpoint; commit(%d) write(%d)", + atomic.LoadUint64(&e.commit), atomic.LoadUint64(&e.write))) + } + runtime.Gosched() + e.lastActive = time.Now() + } else { + now := time.Now() + if now.Before(e.lastActive.Add(1 * time.Millisecond)) { + if atomic.CompareAndSwapUint64(&e.endpointState, closed, closed) { + e.endpointClosed = 1 + } + runtime.Gosched() + } else if now.Before(e.lastActive.Add(250 * time.Millisecond)) { + if atomic.CompareAndSwapUint64(&e.endpointState, closed, closed) { + err(e.err, true) + atomic.StoreUint64(&e.cursor, parked) + return + } + runtime.Gosched() + } else { + e.receivers.Wait() + e.lastActive = time.Now() + } + } + } + + for ; e.cursor != commit; atomic.AddUint64(&e.cursor, 1) { + item := e.buffer[e.cursor&e.mod] + emit := true + if maxAge != 0 { + stale := time.Since(e.start).Nanoseconds() - maxAge.Nanoseconds() + updated := atomic.LoadInt64(&e.written[e.cursor&e.mod]) >> 1 + if updated != 0 && updated <= stale { + emit = false + } + } + if emit && !next(item, false) { + atomic.StoreUint64(&e.endpointState, canceled) + } + if atomic.LoadUint64(&e.endpointState) == canceled { + atomic.StoreUint64(&e.cursor, parked) + return + } + } + e.lastActive = time.Now() + } +} + +// Cancel cancels the endpoint, making it available to be reused when +// NewEndpoint is called on the channel. When canceled the foreach function +// passed to Range is not notified, instead just never called again. +func (e *Endpoint[T]) Cancel() { + atomic.CompareAndSwapUint64(&e.endpointState, active, canceled) + e.receivers.Broadcast() +} diff --git a/reactive/notification.go b/reactive/notification.go new file mode 100644 index 0000000..f014378 --- /dev/null +++ b/reactive/notification.go @@ -0,0 +1,49 @@ +package reactive + +type NotificationKind int + +const ( + NextNotification NotificationKind = iota + ErrorNotification +) + +type Notification[T any] interface { + Value() T + Error() error + + Kind() NotificationKind +} + +func MakeNext[T any](v T) Notification[T] { + return val[T]{v} +} +func MakeError[T any](e error) Notification[T] { + return err[T]{e} +} + +type val[T any] struct { + value T +} +type err[T any] struct { + err error +} + +func (v val[T]) Value() T { + return v.value +} +func (v val[T]) Error() error { + return nil +} +func (v val[T]) Kind() NotificationKind { + return NextNotification +} + +func (e err[T]) Value() T { + panic("tried to .Value() on an Error notification") +} +func (e err[T]) Error() error { + return e.err +} +func (e err[T]) Kind() NotificationKind { + return ErrorNotification +} diff --git a/reactive/observable.go b/reactive/observable.go new file mode 100644 index 0000000..90dde29 --- /dev/null +++ b/reactive/observable.go @@ -0,0 +1,45 @@ +package reactive + +import "context" + +type Observable[T any] interface { + Observe(ctx context.Context) <-chan Notification[T] +} + +type FnObservable[T any] func(ctx context.Context) <-chan Notification[T] + +func (f FnObservable[T]) Observe(ctx context.Context) <-chan Notification[T] { + return f(ctx) +} + +func Cold[T any](f func(ctx context.Context) <-chan Notification[T]) Observable[T] { + return FnObservable[T](f) +} + +func LazyHot[T any](f func(chan<- Notification[T])) Observable[T] { + var lazy chan Notification[T] = nil + + return FnObservable[T](func(ctx context.Context) <-chan Notification[T] { + if lazy == nil { + lazy = make(chan Notification[T]) + go func() { + f(lazy) + }() + } + return lazy + }) +} + +func Hot[T any](c <-chan Notification[T]) Observable[T] { + return FnObservable[T](func(ctx context.Context) <-chan Notification[T] { return c }) +} + +func FromChannel[T any](c <-chan T) Observable[T] { + ch := make(chan Notification[T]) + go func() { + for v := range c { + ch <- MakeNext(v) + } + }() + return Hot(ch) +} diff --git a/reactive/observer.go b/reactive/observer.go new file mode 100644 index 0000000..cb5b718 --- /dev/null +++ b/reactive/observer.go @@ -0,0 +1,42 @@ +package reactive + +import "context" + +type Observer[T any] interface { + OnNext(T) + OnError(error) + OnComplete() +} + +type ObserverFn[T any] func(T) + +func (f ObserverFn[T]) OnNext(v T) { f(v) } +func (f ObserverFn[T]) OnError(err error) {} +func (f ObserverFn[T]) OnComplete() {} + +func Subscribe[T any](ctx context.Context, o Observable[T], observer Observer[T]) { + notifs := o.Observe(ctx) + if notifs == nil { + observer.OnComplete() + return + } + + for { + select { + case <-ctx.Done(): + observer.OnComplete() + return + case n, ok := <-notifs: + if !ok { + observer.OnComplete() + return + } + switch n.Kind() { + case NextNotification: + observer.OnNext(n.Value()) + case ErrorNotification: + observer.OnError(n.Error()) + } + } + } +} diff --git a/reactive/subject.go b/reactive/subject.go new file mode 100644 index 0000000..4208d73 --- /dev/null +++ b/reactive/subject.go @@ -0,0 +1,74 @@ +package reactive + +import ( + "context" + + "codeberg.org/noth/ultrago/reactive/multicast" +) + +type Subject[T any] struct { + multi *multicast.Chan[Notification[T]] + err error +} + +func NewSubject[T any]() *Subject[T] { + return &Subject[T]{ + multi: multicast.NewChan[Notification[T]](0, 0), + } +} + +func (s *Subject[T]) Observe(ctx context.Context) <-chan Notification[T] { + if s.err != nil { + ch := make(chan Notification[T]) + ch <- MakeError[T](s.err) + return ch + } + + if s.multi.Closed() { + return nil + } + + endpoint, err := s.multi.NewEndpoint(0) + if err != nil { + // revisit behavior? maybe return a channel with MakeError(err)? + panic(err) + } + + ch := make(chan Notification[T]) + endpoint.Range(func(value Notification[T], closed bool) bool { + ch <- value + return true + }, func(err error, closed bool) bool { + if err != nil { + ch <- MakeError[T](err) + } + if closed { + close(ch) + } + return true + }, 0) + + go func() { + <-ctx.Done() + endpoint.Cancel() + }() + + return ch +} + +func (s *Subject[T]) OnNext(value T) { + s.multi.FastSend(MakeNext[T](value)) +} + +func (s *Subject[T]) OnError(err error) { + s.multi.FastSend(MakeError[T](err)) + s.multi.Close(err) +} + +func (s *Subject[T]) OnCompleted() { + s.multi.Close(nil) +} + +func (s *Subject[T]) Closed() bool { + return s.multi.Closed() +} diff --git a/reactive/switch_map.go b/reactive/switch_map.go new file mode 100644 index 0000000..5ecd0f3 --- /dev/null +++ b/reactive/switch_map.go @@ -0,0 +1,54 @@ +package reactive + +import ( + "context" + "sync/atomic" +) + +func SwitchMap[T, R any](source Observable[T], project func(T) Observable[R]) Observable[R] { + ctx, cancel := context.WithCancel(context.Background()) + + return &switchMap[T, R]{ctx, cancel, source, project, false, atomic.Bool{}} +} + +type switchMap[T, R any] struct { + ctx context.Context + cancel context.CancelFunc + source Observable[T] + project func(T) Observable[R] + completed bool + innerCompleted atomic.Bool +} + +func (s *switchMap[T, R]) reset() { + s.cancel() + s.ctx, s.cancel = context.WithCancel(context.Background()) +} + +func (s *switchMap[T, R]) Observe(ctx context.Context) <-chan Notification[R] { + observing := s.source.Observe(ctx) + res := make(chan Notification[R]) + + go func() { + for notification := range observing { + switch notification.Kind() { + case NextNotification: + s.innerCompleted.Store(false) + s.reset() + inner := s.project(notification.Value()) + go func() { + Copy(inner.Observe(s.ctx), res) + s.innerCompleted.Store(true) + }() + case ErrorNotification: + res <- MakeError[R](notification.Error()) + } + } + + if s.innerCompleted.Load() { + close(res) + } + }() + + return res +} diff --git a/reactive/tap.go b/reactive/tap.go new file mode 100644 index 0000000..ad1e478 --- /dev/null +++ b/reactive/tap.go @@ -0,0 +1,41 @@ +package reactive + +import "context" + +func Tap[T any](source Observable[T], observer Observer[T]) Observable[T] { + return &tap[T]{source, observer, false} +} + +func TapResponse[T any](source Observable[T], observer Observer[T]) Observable[T] { + return &tap[T]{source, observer, true} +} + +type tap[T any] struct { + source Observable[T] + observer Observer[T] + noError bool +} + +func (t *tap[T]) Observe(ctx context.Context) <-chan Notification[T] { + observing := t.source.Observe(ctx) + res := make(chan Notification[T]) + + go func() { + for notification := range observing { + switch notification.Kind() { + case NextNotification: + res <- notification + t.observer.OnNext(notification.Value()) + case ErrorNotification: + res <- notification + if !t.noError { + t.observer.OnError(notification.Error()) + } + } + } + t.observer.OnComplete() + close(res) + }() + + return res +}