repos / pico

pico services mono repo
git clone https://github.com/picosh/pico.git

commit
4b5c506
parent
deea40d
author
Eric Bower
date
2026-01-25 11:14:06 -0500 EST
refactor: move picosh/pubsub into repo
12 files changed,  +704, -6
M go.mod
M go.sum
M go.mod
+0, -1
1@@ -45,7 +45,6 @@ require (
2 	github.com/mmcdole/gofeed v1.3.0
3 	github.com/neurosnap/go-exif-remove v0.0.0-20221010134343-50d1e3c35577
4 	github.com/picosh/go-rsync-receiver v0.0.0-20250304201040-fcc11dd22d79
5-	github.com/picosh/pubsub v0.0.0-20241114191831-ec8f16c0eb88
6 	github.com/picosh/utils v0.0.0-20260125160622-5c3a9e231ec6
7 	github.com/pkg/sftp v1.13.9
8 	github.com/prometheus/client_golang v1.22.0
M go.sum
+0, -2
1@@ -694,8 +694,6 @@ github.com/peterbourgon/diskv/v3 v3.0.1 h1:x06SQA46+PKIUftmEujdwSEpIx8kR+M9eLYsU
2 github.com/peterbourgon/diskv/v3 v3.0.1/go.mod h1:kJ5Ny7vLdARGU3WUuy6uzO6T0nb/2gWcT1JiBvRmb5o=
3 github.com/picosh/go-rsync-receiver v0.0.0-20250304201040-fcc11dd22d79 h1:MyB9P43hlQ6A2FoP9LGeiTBL3WKToW4gcWd6lQPg/Zg=
4 github.com/picosh/go-rsync-receiver v0.0.0-20250304201040-fcc11dd22d79/go.mod h1:4ZICsr6bESoHP8He9DqROlZiMw4hHHjcbDzhtTTDQzA=
5-github.com/picosh/pubsub v0.0.0-20241114191831-ec8f16c0eb88 h1:hdxE6rquHHw1/eeqS1b+ojLaxGtN8zOiTUclPwaVbPg=
6-github.com/picosh/pubsub v0.0.0-20241114191831-ec8f16c0eb88/go.mod h1:+9hDKIDHQCvGFigCVlIl589BwpT9R4boKhUVc/OgRU4=
7 github.com/picosh/utils v0.0.0-20260125160622-5c3a9e231ec6 h1:9KfCtfcx7vrSyGU1K9whdE1crll9Aq+nAZ6c0FzuzvE=
8 github.com/picosh/utils v0.0.0-20260125160622-5c3a9e231ec6/go.mod h1:HogYEyJ43IGXrOa3D/kjM1pkzNAyh+pejRyv8Eo//pk=
9 github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU=
M pkg/apps/pipe/cli.go
+1, -1
 1@@ -20,8 +20,8 @@ import (
 2 	"github.com/gorilla/feeds"
 3 	"github.com/picosh/pico/pkg/db"
 4 	"github.com/picosh/pico/pkg/pssh"
 5+	psub "github.com/picosh/pico/pkg/pubsub"
 6 	"github.com/picosh/pico/pkg/shared"
 7-	psub "github.com/picosh/pubsub"
 8 	gossh "golang.org/x/crypto/ssh"
 9 )
10 
M pkg/apps/pipe/ssh.go
+1, -1
 1@@ -9,8 +9,8 @@ import (
 2 	"github.com/antoniomika/syncmap"
 3 	"github.com/picosh/pico/pkg/db/postgres"
 4 	"github.com/picosh/pico/pkg/pssh"
 5+	psub "github.com/picosh/pico/pkg/pubsub"
 6 	"github.com/picosh/pico/pkg/shared"
 7-	psub "github.com/picosh/pubsub"
 8 	"golang.org/x/crypto/ssh"
 9 )
10 
M pkg/apps/pipe/ssh_test.go
+1, -1
 1@@ -16,8 +16,8 @@ import (
 2 	"github.com/picosh/pico/pkg/db"
 3 	"github.com/picosh/pico/pkg/db/stub"
 4 	"github.com/picosh/pico/pkg/pssh"
 5+	psub "github.com/picosh/pico/pkg/pubsub"
 6 	"github.com/picosh/pico/pkg/shared"
 7-	psub "github.com/picosh/pubsub"
 8 	"github.com/prometheus/client_golang/prometheus"
 9 	"golang.org/x/crypto/ssh"
10 )
A pkg/pubsub/README.md
+58, -0
 1@@ -0,0 +1,58 @@
 2+# pubsub
 3+
 4+A generic pubsub implementation for Go.
 5+
 6+```go
 7+package main
 8+
 9+import (
10+	"bytes"
11+	"context"
12+	"fmt"
13+	"log/slog"
14+
15+	"github.com/picosh/pubsub"
16+)
17+
18+func main() {
19+	ctx := context.TODO()
20+	logger := slog.Default()
21+	broker := pubsub.NewMulticast(logger)
22+
23+	chann := []*pubsub.Channel{
24+		pubsub.NewChannel("my-topic"),
25+	}
26+
27+	go func() {
28+		writer := bytes.NewBufferString("my data")
29+		_ = broker.Pub(ctx, "pubID", writer, chann, false)
30+	}()
31+
32+	reader := bytes.NewBufferString("")
33+	_ = broker.Sub(ctx, "subID", reader, chann, false)
34+
35+	// result
36+	fmt.Println("data from pub:", reader)
37+}
38+```
39+
40+## pubsub over ssh
41+
42+The simplest pubsub system for everyday automation needs.
43+
44+Using `wish` we can integrate our pubsub system into an SSH app.
45+
46+[![asciicast](https://asciinema.org/a/674287.svg)](https://asciinema.org/a/674287)
47+
48+```bash
49+# term 1
50+mkdir ./ssh_data
51+cat ~/.ssh/id_ed25519 ./ssh_data/authorized_keys
52+go run ./cmd/example
53+
54+# term 2
55+ssh -p 2222 localhost sub xyz
56+
57+# term 3
58+echo "hello world" | ssh -p 2222 localhost pub xyz
59+```
A pkg/pubsub/broker.go
+203, -0
  1@@ -0,0 +1,203 @@
  2+package pubsub
  3+
  4+import (
  5+	"errors"
  6+	"io"
  7+	"iter"
  8+	"log/slog"
  9+	"sync"
 10+	"time"
 11+
 12+	"github.com/antoniomika/syncmap"
 13+)
 14+
 15+/*
 16+Broker receives published messages and dispatches the message to the
 17+subscribing clients. An message contains a message topic that clients
 18+subscribe to and brokers use these subscription lists for determining the
 19+clients to receive the message.
 20+*/
 21+type Broker interface {
 22+	GetChannels() iter.Seq2[string, *Channel]
 23+	GetClients() iter.Seq2[string, *Client]
 24+	Connect(*Client, []*Channel) (error, error)
 25+}
 26+
 27+type BaseBroker struct {
 28+	Channels *syncmap.Map[string, *Channel]
 29+	Logger   *slog.Logger
 30+}
 31+
 32+func (b *BaseBroker) Cleanup() {
 33+	toRemove := []string{}
 34+	for _, channel := range b.GetChannels() {
 35+		count := 0
 36+
 37+		for range channel.GetClients() {
 38+			count++
 39+		}
 40+
 41+		if count == 0 {
 42+			channel.Cleanup()
 43+			toRemove = append(toRemove, channel.Topic)
 44+		}
 45+	}
 46+
 47+	for _, channel := range toRemove {
 48+		b.Channels.Delete(channel)
 49+	}
 50+}
 51+
 52+func (b *BaseBroker) GetChannels() iter.Seq2[string, *Channel] {
 53+	return b.Channels.Range
 54+}
 55+
 56+func (b *BaseBroker) GetClients() iter.Seq2[string, *Client] {
 57+	return func(yield func(string, *Client) bool) {
 58+		for _, channel := range b.GetChannels() {
 59+			channel.Clients.Range(yield)
 60+		}
 61+	}
 62+}
 63+
 64+func (b *BaseBroker) Connect(client *Client, channels []*Channel) (error, error) {
 65+	for _, channel := range channels {
 66+		dataChannel := b.ensureChannel(channel)
 67+		dataChannel.Clients.Store(client.ID, client)
 68+		client.Channels.Store(dataChannel.Topic, dataChannel)
 69+		defer func() {
 70+			client.Channels.Delete(channel.Topic)
 71+			dataChannel.Clients.Delete(client.ID)
 72+
 73+			client.Cleanup()
 74+
 75+			count := 0
 76+			for _, cl := range dataChannel.GetClients() {
 77+				if cl.Direction == ChannelDirectionInput || cl.Direction == ChannelDirectionInputOutput {
 78+					count++
 79+				}
 80+			}
 81+
 82+			if count == 0 {
 83+				for _, cl := range dataChannel.GetClients() {
 84+					if !cl.KeepAlive {
 85+						cl.Cleanup()
 86+					}
 87+				}
 88+			}
 89+
 90+			b.Cleanup()
 91+		}()
 92+	}
 93+
 94+	var (
 95+		inputErr  error
 96+		outputErr error
 97+		wg        sync.WaitGroup
 98+	)
 99+
100+	// Pub
101+	if client.Direction == ChannelDirectionInput || client.Direction == ChannelDirectionInputOutput {
102+		wg.Add(1)
103+		go func() {
104+			defer wg.Done()
105+			for {
106+				data := make([]byte, 32*1024)
107+				n, err := client.ReadWriter.Read(data)
108+				data = data[:n]
109+
110+				channelMessage := ChannelMessage{
111+					Data:      data,
112+					ClientID:  client.ID,
113+					Direction: ChannelDirectionInput,
114+				}
115+
116+				if client.BlockWrite {
117+				mainLoop:
118+					for {
119+						count := 0
120+						for _, channel := range client.GetChannels() {
121+							for _, chanClient := range channel.GetClients() {
122+								if chanClient.Direction == ChannelDirectionOutput || chanClient.Direction == ChannelDirectionInputOutput {
123+									count++
124+								}
125+							}
126+						}
127+
128+						if count > 0 {
129+							break mainLoop
130+						}
131+
132+						select {
133+						case <-client.Done:
134+							break mainLoop
135+						case <-time.After(1 * time.Millisecond):
136+							continue
137+						}
138+					}
139+				}
140+
141+				var sendwg sync.WaitGroup
142+
143+				for _, channel := range client.GetChannels() {
144+					sendwg.Add(1)
145+					go func() {
146+						defer sendwg.Done()
147+						select {
148+						case channel.Data <- channelMessage:
149+						case <-client.Done:
150+						case <-channel.Done:
151+						}
152+					}()
153+				}
154+
155+				sendwg.Wait()
156+
157+				if err != nil {
158+					if errors.Is(err, io.EOF) {
159+						return
160+					}
161+					inputErr = err
162+					return
163+				}
164+			}
165+		}()
166+	}
167+
168+	// Sub
169+	if client.Direction == ChannelDirectionOutput || client.Direction == ChannelDirectionInputOutput {
170+		wg.Add(1)
171+		go func() {
172+			defer wg.Done()
173+		mainLoop:
174+			for {
175+				select {
176+				case data, ok := <-client.Data:
177+					_, err := client.ReadWriter.Write(data.Data)
178+					if err != nil {
179+						outputErr = err
180+						break mainLoop
181+					}
182+
183+					if !ok {
184+						break mainLoop
185+					}
186+				case <-client.Done:
187+					break mainLoop
188+				}
189+			}
190+		}()
191+	}
192+
193+	wg.Wait()
194+
195+	return inputErr, outputErr
196+}
197+
198+func (b *BaseBroker) ensureChannel(channel *Channel) *Channel {
199+	dataChannel, _ := b.Channels.LoadOrStore(channel.Topic, channel)
200+	dataChannel.Handle()
201+	return dataChannel
202+}
203+
204+var _ Broker = (*BaseBroker)(nil)
A pkg/pubsub/channel.go
+114, -0
  1@@ -0,0 +1,114 @@
  2+package pubsub
  3+
  4+import (
  5+	"iter"
  6+	"sync"
  7+
  8+	"github.com/antoniomika/syncmap"
  9+)
 10+
 11+type ChannelDirection int
 12+
 13+func (d ChannelDirection) String() string {
 14+	return [...]string{"input", "output", "inputoutput"}[d]
 15+}
 16+
 17+const (
 18+	ChannelDirectionInput ChannelDirection = iota
 19+	ChannelDirectionOutput
 20+	ChannelDirectionInputOutput
 21+)
 22+
 23+type ChannelAction int
 24+
 25+func (d ChannelAction) String() string {
 26+	return [...]string{"data", "close"}[d]
 27+}
 28+
 29+const (
 30+	ChannelActionData = iota
 31+	ChannelActionClose
 32+)
 33+
 34+type ChannelMessage struct {
 35+	Data      []byte
 36+	ClientID  string
 37+	Direction ChannelDirection
 38+	Action    ChannelAction
 39+}
 40+
 41+func NewChannel(topic string) *Channel {
 42+	return &Channel{
 43+		Topic:   topic,
 44+		Done:    make(chan struct{}),
 45+		Data:    make(chan ChannelMessage),
 46+		Clients: syncmap.New[string, *Client](),
 47+	}
 48+}
 49+
 50+/*
 51+Channel is a container for a topic.  It holds the list of clients and
 52+a data channel to receive a message.
 53+*/
 54+type Channel struct {
 55+	Topic       string
 56+	Done        chan struct{}
 57+	Data        chan ChannelMessage
 58+	Clients     *syncmap.Map[string, *Client]
 59+	handleOnce  sync.Once
 60+	cleanupOnce sync.Once
 61+}
 62+
 63+func (c *Channel) GetClients() iter.Seq2[string, *Client] {
 64+	return c.Clients.Range
 65+}
 66+
 67+func (c *Channel) Cleanup() {
 68+	c.cleanupOnce.Do(func() {
 69+		close(c.Done)
 70+	})
 71+}
 72+
 73+func (c *Channel) Handle() {
 74+	c.handleOnce.Do(func() {
 75+		go func() {
 76+			defer func() {
 77+				for _, client := range c.GetClients() {
 78+					client.Cleanup()
 79+				}
 80+			}()
 81+
 82+			for {
 83+				select {
 84+				case <-c.Done:
 85+					return
 86+				case data, ok := <-c.Data:
 87+					var wg sync.WaitGroup
 88+					for _, client := range c.GetClients() {
 89+						if client.Direction == ChannelDirectionInput || (client.ID == data.ClientID && !client.Replay) {
 90+							continue
 91+						}
 92+
 93+						wg.Add(1)
 94+						go func() {
 95+							defer wg.Done()
 96+							if !ok {
 97+								client.onceData.Do(func() {
 98+									close(client.Data)
 99+								})
100+								return
101+							}
102+
103+							select {
104+							case client.Data <- data:
105+							case <-client.Done:
106+							case <-c.Done:
107+							}
108+						}()
109+					}
110+					wg.Wait()
111+				}
112+			}
113+		}()
114+	})
115+}
A pkg/pubsub/client.go
+52, -0
 1@@ -0,0 +1,52 @@
 2+package pubsub
 3+
 4+import (
 5+	"io"
 6+	"iter"
 7+	"sync"
 8+
 9+	"github.com/antoniomika/syncmap"
10+)
11+
12+func NewClient(ID string, rw io.ReadWriter, direction ChannelDirection, blockWrite, replay, keepAlive bool) *Client {
13+	return &Client{
14+		ID:         ID,
15+		ReadWriter: rw,
16+		Direction:  direction,
17+		Channels:   syncmap.New[string, *Channel](),
18+		Done:       make(chan struct{}),
19+		Data:       make(chan ChannelMessage),
20+		Replay:     replay,
21+		BlockWrite: blockWrite,
22+		KeepAlive:  keepAlive,
23+	}
24+}
25+
26+/*
27+Client is the container for holding state between multiple devices.  A
28+client has a direction (input, output, inputout) as well as a way to
29+send data to all the associated channels.
30+*/
31+type Client struct {
32+	ID         string
33+	ReadWriter io.ReadWriter
34+	Channels   *syncmap.Map[string, *Channel]
35+	Direction  ChannelDirection
36+	Done       chan struct{}
37+	Data       chan ChannelMessage
38+	Replay     bool
39+	BlockWrite bool
40+	KeepAlive  bool
41+	once       sync.Once
42+	onceData   sync.Once
43+}
44+
45+func (c *Client) GetChannels() iter.Seq2[string, *Channel] {
46+	return c.Channels.Range
47+}
48+
49+func (c *Client) Cleanup() {
50+	c.once.Do(func() {
51+		close(c.Done)
52+	})
53+}
A pkg/pubsub/multicast.go
+84, -0
 1@@ -0,0 +1,84 @@
 2+package pubsub
 3+
 4+import (
 5+	"context"
 6+	"errors"
 7+	"io"
 8+	"iter"
 9+	"log/slog"
10+
11+	"github.com/antoniomika/syncmap"
12+)
13+
14+/*
15+Multicast is a flexible, bidirectional broker.
16+
17+It provides the most pure version of our PubSub interface which lets
18+end-developers build one-to-many connections between publishers and
19+subscribers and vice versa.
20+
21+It doesn't provide any topic filtering capabilities and is only
22+concerned with sending data to and from an `io.ReadWriter` via our
23+channels.
24+*/
25+type Multicast struct {
26+	Broker
27+	Logger *slog.Logger
28+}
29+
30+func NewMulticast(logger *slog.Logger) *Multicast {
31+	return &Multicast{
32+		Logger: logger,
33+		Broker: &BaseBroker{
34+			Channels: syncmap.New[string, *Channel](),
35+			Logger:   logger.With(slog.Bool("broker", true)),
36+		},
37+	}
38+}
39+
40+func (p *Multicast) getClients(direction ChannelDirection) iter.Seq2[string, *Client] {
41+	return func(yield func(string, *Client) bool) {
42+		for clientID, client := range p.GetClients() {
43+			if client.Direction == direction {
44+				yield(clientID, client)
45+			}
46+		}
47+	}
48+}
49+
50+func (p *Multicast) GetPipes() iter.Seq2[string, *Client] {
51+	return p.getClients(ChannelDirectionInputOutput)
52+}
53+
54+func (p *Multicast) GetPubs() iter.Seq2[string, *Client] {
55+	return p.getClients(ChannelDirectionInput)
56+}
57+
58+func (p *Multicast) GetSubs() iter.Seq2[string, *Client] {
59+	return p.getClients(ChannelDirectionOutput)
60+}
61+
62+func (p *Multicast) connect(ctx context.Context, ID string, rw io.ReadWriter, channels []*Channel, direction ChannelDirection, blockWrite bool, replay, keepAlive bool) (error, error) {
63+	client := NewClient(ID, rw, direction, blockWrite, replay, keepAlive)
64+
65+	go func() {
66+		<-ctx.Done()
67+		client.Cleanup()
68+	}()
69+
70+	return p.Connect(client, channels)
71+}
72+
73+func (p *Multicast) Pipe(ctx context.Context, ID string, rw io.ReadWriter, channels []*Channel, replay bool) (error, error) {
74+	return p.connect(ctx, ID, rw, channels, ChannelDirectionInputOutput, false, replay, false)
75+}
76+
77+func (p *Multicast) Pub(ctx context.Context, ID string, rw io.ReadWriter, channels []*Channel, blockWrite bool) error {
78+	return errors.Join(p.connect(ctx, ID, rw, channels, ChannelDirectionInput, blockWrite, false, false))
79+}
80+
81+func (p *Multicast) Sub(ctx context.Context, ID string, rw io.ReadWriter, channels []*Channel, keepAlive bool) error {
82+	return errors.Join(p.connect(ctx, ID, rw, channels, ChannelDirectionOutput, false, false, keepAlive))
83+}
84+
85+var _ PubSub = (*Multicast)(nil)
A pkg/pubsub/multicast_test.go
+164, -0
  1@@ -0,0 +1,164 @@
  2+package pubsub
  3+
  4+import (
  5+	"bytes"
  6+	"context"
  7+	"fmt"
  8+	"log/slog"
  9+	"sync"
 10+	"testing"
 11+)
 12+
 13+type Buffer struct {
 14+	b bytes.Buffer
 15+	m sync.Mutex
 16+}
 17+
 18+func (b *Buffer) Read(p []byte) (n int, err error) {
 19+	b.m.Lock()
 20+	defer b.m.Unlock()
 21+	return b.b.Read(p)
 22+}
 23+func (b *Buffer) Write(p []byte) (n int, err error) {
 24+	b.m.Lock()
 25+	defer b.m.Unlock()
 26+	return b.b.Write(p)
 27+}
 28+func (b *Buffer) String() string {
 29+	b.m.Lock()
 30+	defer b.m.Unlock()
 31+	return b.b.String()
 32+}
 33+
 34+func TestMulticastSubBlock(t *testing.T) {
 35+	orderActual := ""
 36+	orderExpected := "sub-pub-"
 37+	actual := new(Buffer)
 38+	expected := "some test data"
 39+	name := "test-channel"
 40+	syncer := make(chan int)
 41+
 42+	cast := NewMulticast(slog.Default())
 43+
 44+	var wg sync.WaitGroup
 45+	wg.Add(2)
 46+
 47+	channel := NewChannel(name)
 48+
 49+	go func() {
 50+		orderActual += "sub-"
 51+		syncer <- 0
 52+		fmt.Println(cast.Sub(context.TODO(), "1", actual, []*Channel{channel}, false))
 53+		wg.Done()
 54+	}()
 55+
 56+	<-syncer
 57+
 58+	go func() {
 59+		orderActual += "pub-"
 60+		fmt.Println(cast.Pub(context.TODO(), "2", &Buffer{b: *bytes.NewBufferString(expected)}, []*Channel{channel}, true))
 61+		wg.Done()
 62+	}()
 63+
 64+	wg.Wait()
 65+
 66+	if orderActual != orderExpected {
 67+		t.Fatalf("\norderActual:(%s)\norderExpected:(%s)", orderActual, orderExpected)
 68+	}
 69+	if actual.String() != expected {
 70+		t.Fatalf("\nactual:(%s)\nexpected:(%s)", actual, expected)
 71+	}
 72+}
 73+
 74+func TestMulticastPubBlock(t *testing.T) {
 75+	orderActual := ""
 76+	orderExpected := "pub-sub-"
 77+	actual := new(Buffer)
 78+	expected := "some test data"
 79+	name := "test-channel"
 80+	syncer := make(chan int)
 81+
 82+	cast := NewMulticast(slog.Default())
 83+
 84+	var wg sync.WaitGroup
 85+	wg.Add(2)
 86+
 87+	channel := NewChannel(name)
 88+
 89+	go func() {
 90+		orderActual += "pub-"
 91+		syncer <- 0
 92+		fmt.Println(cast.Pub(context.TODO(), "1", &Buffer{b: *bytes.NewBufferString(expected)}, []*Channel{channel}, true))
 93+		wg.Done()
 94+	}()
 95+
 96+	<-syncer
 97+
 98+	go func() {
 99+		orderActual += "sub-"
100+		wg.Done()
101+		fmt.Println(cast.Sub(context.TODO(), "2", actual, []*Channel{channel}, false))
102+	}()
103+
104+	wg.Wait()
105+
106+	if orderActual != orderExpected {
107+		t.Fatalf("\norderActual:(%s)\norderExpected:(%s)", orderActual, orderExpected)
108+	}
109+	if actual.String() != expected {
110+		t.Fatalf("\nactual:(%s)\nexpected:(%s)", actual, expected)
111+	}
112+}
113+
114+func TestMulticastMultSubs(t *testing.T) {
115+	orderActual := ""
116+	orderExpected := "sub-sub-pub-"
117+	actual := new(Buffer)
118+	actualOther := new(Buffer)
119+	expected := "some test data"
120+	name := "test-channel"
121+	syncer := make(chan int)
122+
123+	cast := NewMulticast(slog.Default())
124+
125+	var wg sync.WaitGroup
126+	wg.Add(3)
127+
128+	channel := NewChannel(name)
129+
130+	go func() {
131+		orderActual += "sub-"
132+		syncer <- 0
133+		fmt.Println(cast.Sub(context.TODO(), "1", actual, []*Channel{channel}, false))
134+		wg.Done()
135+	}()
136+
137+	<-syncer
138+
139+	go func() {
140+		orderActual += "sub-"
141+		syncer <- 0
142+		fmt.Println(cast.Sub(context.TODO(), "2", actualOther, []*Channel{channel}, false))
143+		wg.Done()
144+	}()
145+
146+	<-syncer
147+
148+	go func() {
149+		orderActual += "pub-"
150+		fmt.Println(cast.Pub(context.TODO(), "3", &Buffer{b: *bytes.NewBufferString(expected)}, []*Channel{channel}, true))
151+		wg.Done()
152+	}()
153+
154+	wg.Wait()
155+
156+	if orderActual != orderExpected {
157+		t.Fatalf("\norderActual:(%s)\norderExpected:(%s)", orderActual, orderExpected)
158+	}
159+	if actual.String() != expected {
160+		t.Fatalf("\nactual:(%s)\nexpected:(%s)", actual, expected)
161+	}
162+	if actualOther.String() != expected {
163+		t.Fatalf("\nactual:(%s)\nexpected:(%s)", actualOther, expected)
164+	}
165+}
A pkg/pubsub/pubsub.go
+26, -0
 1@@ -0,0 +1,26 @@
 2+package pubsub
 3+
 4+import (
 5+	"context"
 6+	"io"
 7+	"iter"
 8+)
 9+
10+/*
11+PubSub is our take on a basic publisher and subscriber interface.
12+
13+It has a few notable requirements:
14+- Each operation must accept an array of channels
15+- A way to send, receive, and stream data between clients
16+
17+PubSub also inherits the properties of a Broker.
18+*/
19+type PubSub interface {
20+	Broker
21+	GetPubs() iter.Seq2[string, *Client]
22+	GetSubs() iter.Seq2[string, *Client]
23+	GetPipes() iter.Seq2[string, *Client]
24+	Pipe(ctx context.Context, ID string, rw io.ReadWriter, channels []*Channel, replay bool) (error, error)
25+	Sub(ctx context.Context, ID string, rw io.ReadWriter, channels []*Channel, keepAlive bool) error
26+	Pub(ctx context.Context, ID string, rw io.ReadWriter, channels []*Channel, blockWrite bool) error
27+}