Eric Bower
·
2026-02-03
channel.go
1package pubsub
2
3import (
4 "iter"
5 "sync"
6
7 "github.com/antoniomika/syncmap"
8)
9
10type ChannelDirection int
11
12func (d ChannelDirection) String() string {
13 return [...]string{"input", "output", "inputoutput"}[d]
14}
15
16const (
17 ChannelDirectionInput ChannelDirection = iota
18 ChannelDirectionOutput
19 ChannelDirectionInputOutput
20)
21
22type ChannelAction int
23
24func (d ChannelAction) String() string {
25 return [...]string{"data", "close"}[d]
26}
27
28const (
29 ChannelActionData = iota
30 ChannelActionClose
31)
32
33type ChannelMessage struct {
34 Data []byte
35 ClientID string
36 Direction ChannelDirection
37 Action ChannelAction
38}
39
40func NewChannel(topic string) *Channel {
41 return &Channel{
42 Topic: topic,
43 Done: make(chan struct{}),
44 Data: make(chan ChannelMessage),
45 Clients: syncmap.New[string, *Client](),
46 }
47}
48
49/*
50Channel is a container for a topic. It holds the list of clients and
51a data channel to receive a message.
52*/
53type Channel struct {
54 Topic string
55 Done chan struct{}
56 Data chan ChannelMessage
57 Clients *syncmap.Map[string, *Client]
58 handleOnce sync.Once
59 cleanupOnce sync.Once
60 mu sync.Mutex
61 Dispatcher MessageDispatcher
62}
63
64func (c *Channel) GetClients() iter.Seq2[string, *Client] {
65 return c.Clients.Range
66}
67
68func (c *Channel) SetDispatcher(d MessageDispatcher) {
69 c.mu.Lock()
70 defer c.mu.Unlock()
71 if c.Dispatcher == nil {
72 c.Dispatcher = d
73 }
74}
75
76func (c *Channel) GetDispatcher() MessageDispatcher {
77 c.mu.Lock()
78 defer c.mu.Unlock()
79 return c.Dispatcher
80}
81
82func (c *Channel) Cleanup() {
83 c.cleanupOnce.Do(func() {
84 close(c.Done)
85 })
86}
87
88func (c *Channel) Handle() {
89 c.handleOnce.Do(func() {
90 go func() {
91 defer func() {
92 for _, client := range c.GetClients() {
93 client.Cleanup()
94 }
95 }()
96
97 for {
98 select {
99 case <-c.Done:
100 return
101 case data, ok := <-c.Data:
102 if !ok {
103 // Channel is closing, close all client data channels
104 for _, client := range c.GetClients() {
105 client.onceData.Do(func() {
106 close(client.Data)
107 })
108 }
109 return
110 }
111
112 // Collect eligible subscribers
113 subscribers := make([]*Client, 0)
114 for _, client := range c.GetClients() {
115 // Skip input-only clients and senders (unless replay is enabled)
116 if client.Direction == ChannelDirectionInput || (client.ID == data.ClientID && !client.Replay) {
117 continue
118 }
119 subscribers = append(subscribers, client)
120 }
121
122 if len(data.Data) > 0 {
123 // Dispatch message using the configured dispatcher
124 dispatcher := c.GetDispatcher()
125 if dispatcher != nil {
126 _ = dispatcher.Dispatch(data, subscribers, c.Done)
127 }
128 }
129 }
130 }
131 }()
132 })
133}