Eric Bower
·
2026-02-03
regression_test.go
1package pubsub
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "log/slog"
8 "sync"
9 "sync/atomic"
10 "testing"
11 "time"
12)
13
14// TestChannelMessageOrdering verifies that messages are delivered without panics or corruption.
15// This applies to both multicast and round-robin dispatchers.
16func TestChannelMessageOrdering(t *testing.T) {
17 name := "order-test"
18 numMessages := 3
19
20 // Test with Multicast
21 t.Run("Multicast", func(t *testing.T) {
22 cast := NewMulticast(slog.Default())
23 buf := new(Buffer)
24 channel := NewChannel(name)
25
26 var wg sync.WaitGroup
27 syncer := make(chan int)
28
29 // Subscribe
30 wg.Add(1)
31 go func() {
32 defer wg.Done()
33 syncer <- 0
34 _ = cast.Sub(context.TODO(), "sub", buf, []*Channel{channel}, false)
35 }()
36
37 <-syncer
38
39 // Publish messages
40 for i := 0; i < numMessages; i++ {
41 wg.Add(1)
42 idx := i
43 go func() {
44 defer wg.Done()
45 msg := fmt.Sprintf("msg%d\n", idx)
46 _ = cast.Pub(context.TODO(), "pub", &Buffer{b: *bytes.NewBufferString(msg)}, []*Channel{channel}, false)
47 }()
48 }
49
50 wg.Wait()
51
52 // Verify at least some messages were received
53 content := buf.String()
54 if len(content) == 0 {
55 t.Error("Multicast: no messages received")
56 }
57 })
58}
59
60// TestDispatcherClientDirection verifies that both dispatchers respect client direction.
61// Publishers should not receive messages they publish.
62func TestDispatcherClientDirection(t *testing.T) {
63 name := "direction-test"
64
65 t.Run("Multicast", func(t *testing.T) {
66 cast := NewMulticast(slog.Default())
67 pubBuf := new(Buffer)
68 subBuf := new(Buffer)
69 channel := NewChannel(name)
70
71 var wg sync.WaitGroup
72
73 // Publisher (input only)
74 wg.Add(1)
75 go func() {
76 defer wg.Done()
77 _ = cast.Pub(context.TODO(), "pub", &Buffer{b: *bytes.NewBufferString("test")}, []*Channel{channel}, false)
78 }()
79
80 // Subscriber (output only)
81 wg.Add(1)
82 go func() {
83 defer wg.Done()
84 _ = cast.Sub(context.TODO(), "sub", subBuf, []*Channel{channel}, false)
85 }()
86
87 wg.Wait()
88
89 // Publisher should not receive the message
90 if pubBuf.String() != "" {
91 t.Errorf("Publisher received message: %q", pubBuf.String())
92 }
93
94 // Subscriber should receive it
95 if subBuf.String() != "test" {
96 t.Errorf("Subscriber should have received message, got: %q", subBuf.String())
97 }
98 })
99}
100
101// TestChannelConcurrentPublishes verifies that concurrent publishes don't cause races or data loss.
102func TestChannelConcurrentPublishes(t *testing.T) {
103 name := "concurrent-test"
104 numPublishers := 10
105 msgsPerPublisher := 5
106 numSubscribers := 3
107
108 t.Run("Multicast", func(t *testing.T) {
109 cast := NewMulticast(slog.Default())
110 buffers := make([]*Buffer, numSubscribers)
111 for i := range buffers {
112 buffers[i] = new(Buffer)
113 }
114 channel := NewChannel(name)
115
116 var wg sync.WaitGroup
117
118 // Subscribe
119 for i := range buffers {
120 wg.Add(1)
121 idx := i
122 go func() {
123 defer wg.Done()
124 _ = cast.Sub(context.TODO(), fmt.Sprintf("sub-%d", idx), buffers[idx], []*Channel{channel}, false)
125 }()
126 }
127 time.Sleep(100 * time.Millisecond)
128
129 // Concurrent publishers
130 pubCount := int32(0)
131 for p := 0; p < numPublishers; p++ {
132 pubID := p
133 for m := 0; m < msgsPerPublisher; m++ {
134 wg.Add(1)
135 msgNum := m
136 go func() {
137 defer wg.Done()
138 msg := fmt.Sprintf("pub%d-msg%d\n", pubID, msgNum)
139 _ = cast.Pub(context.TODO(), fmt.Sprintf("pub-%d", pubID), &Buffer{b: *bytes.NewBufferString(msg)}, []*Channel{channel}, false)
140 atomic.AddInt32(&pubCount, 1)
141 }()
142 }
143 }
144
145 wg.Wait()
146
147 // Verify all messages delivered to all subscribers
148 totalExpectedMessages := numPublishers * msgsPerPublisher
149 for i, buf := range buffers {
150 messageCount := bytes.Count([]byte(buf.String()), []byte("\n"))
151 if messageCount != totalExpectedMessages {
152 t.Errorf("Subscriber %d: expected %d messages, got %d", i, totalExpectedMessages, messageCount)
153 }
154 }
155
156 // Verify all publishes completed
157 if pubCount != int32(totalExpectedMessages) {
158 t.Errorf("Expected %d publishes to complete, got %d", totalExpectedMessages, pubCount)
159 }
160 })
161}
162
163// TestDispatcherEmptySubscribers verifies that dispatchers handle empty subscriber set without panic.
164func TestDispatcherEmptySubscribers(t *testing.T) {
165 name := "empty-subs-test"
166
167 t.Run("Multicast", func(t *testing.T) {
168 cast := NewMulticast(slog.Default())
169 channel := NewChannel(name)
170
171 var wg sync.WaitGroup
172
173 // Publish with no subscribers (should not panic)
174 wg.Add(1)
175 go func() {
176 defer wg.Done()
177 defer func() {
178 if r := recover(); r != nil {
179 t.Errorf("Multicast panicked with no subscribers: %v", r)
180 }
181 }()
182 _ = cast.Pub(context.TODO(), "pub", &Buffer{b: *bytes.NewBufferString("test")}, []*Channel{channel}, false)
183 }()
184
185 wg.Wait()
186 t.Log("Multicast handled empty subscribers correctly")
187 })
188}
189
190// TestDispatcherSingleSubscriber verifies that both dispatchers work correctly with one subscriber.
191func TestDispatcherSingleSubscriber(t *testing.T) {
192 name := "single-sub-test"
193 message := "single-sub-message"
194
195 t.Run("Multicast", func(t *testing.T) {
196 cast := NewMulticast(slog.Default())
197 buf := new(Buffer)
198 channel := NewChannel(name)
199
200 var wg sync.WaitGroup
201
202 // Subscribe
203 wg.Add(1)
204 go func() {
205 defer wg.Done()
206 _ = cast.Sub(context.TODO(), "sub", buf, []*Channel{channel}, false)
207 }()
208
209 time.Sleep(100 * time.Millisecond)
210
211 // Publish
212 wg.Add(1)
213 go func() {
214 defer wg.Done()
215 _ = cast.Pub(context.TODO(), "pub", &Buffer{b: *bytes.NewBufferString(message)}, []*Channel{channel}, false)
216 }()
217
218 wg.Wait()
219
220 if buf.String() != message {
221 t.Errorf("Multicast with single subscriber: expected %q, got %q", message, buf.String())
222 }
223 })
224}