Eric Bower
·
2026-04-21
regression_test.go
1package pubsub
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "log/slog"
8 "sync"
9 "testing"
10 "time"
11)
12
13// TestChannelMessageOrdering verifies that messages are delivered without panics or corruption.
14// This applies to both multicast and round-robin dispatchers.
15func TestChannelMessageOrdering(t *testing.T) {
16 name := "order-test"
17 numMessages := 3
18
19 // Test with Multicast
20 t.Run("Multicast", func(t *testing.T) {
21 cast := NewMulticast(slog.Default())
22 buf := new(Buffer)
23 channel := NewChannel(name)
24
25 var wg sync.WaitGroup
26 syncer := make(chan int)
27
28 // Subscribe
29 wg.Add(1)
30 go func() {
31 defer wg.Done()
32 syncer <- 0
33 _ = cast.Sub(context.TODO(), "sub", buf, []*Channel{channel}, false)
34 }()
35
36 <-syncer
37
38 // Publish messages
39 for i := 0; i < numMessages; i++ {
40 wg.Add(1)
41 idx := i
42 go func() {
43 defer wg.Done()
44 msg := fmt.Sprintf("msg%d\n", idx)
45 _ = cast.Pub(context.TODO(), "pub", &Buffer{b: *bytes.NewBufferString(msg)}, []*Channel{channel}, false)
46 }()
47 }
48
49 wg.Wait()
50
51 // Verify at least some messages were received
52 content := buf.String()
53 if len(content) == 0 {
54 t.Error("Multicast: no messages received")
55 }
56 })
57}
58
59// TestDispatcherClientDirection verifies that both dispatchers respect client direction.
60// Publishers should not receive messages they publish.
61func TestDispatcherClientDirection(t *testing.T) {
62 name := "direction-test"
63
64 t.Run("Multicast", func(t *testing.T) {
65 cast := NewMulticast(slog.Default())
66 pubBuf := new(Buffer)
67 subBuf := new(Buffer)
68 channel := NewChannel(name)
69
70 var wg sync.WaitGroup
71
72 // Publisher (input only)
73 wg.Add(1)
74 go func() {
75 defer wg.Done()
76 _ = cast.Pub(context.TODO(), "pub", &Buffer{b: *bytes.NewBufferString("test")}, []*Channel{channel}, false)
77 }()
78
79 // Subscriber (output only)
80 wg.Add(1)
81 go func() {
82 defer wg.Done()
83 _ = cast.Sub(context.TODO(), "sub", subBuf, []*Channel{channel}, false)
84 }()
85
86 wg.Wait()
87
88 // Publisher should not receive the message
89 if pubBuf.String() != "" {
90 t.Errorf("Publisher received message: %q", pubBuf.String())
91 }
92
93 // Subscriber should receive it
94 if subBuf.String() != "test" {
95 t.Errorf("Subscriber should have received message, got: %q", subBuf.String())
96 }
97 })
98}
99
100// TestChannelConcurrentPublishes verifies that concurrent publishes don't cause races or data loss.
101// func TestChannelConcurrentPublishes(t *testing.T) {
102// name := "concurrent-test"
103// numPublishers := 10
104// msgsPerPublisher := 5
105// numSubscribers := 3
106
107// t.Run("Multicast", func(t *testing.T) {
108// cast := NewMulticast(slog.Default())
109// buffers := make([]*Buffer, numSubscribers)
110// for i := range buffers {
111// buffers[i] = new(Buffer)
112// }
113// channel := NewChannel(name)
114
115// var wg sync.WaitGroup
116
117// // Subscribe
118// for i := range buffers {
119// wg.Add(1)
120// idx := i
121// go func() {
122// defer wg.Done()
123// _ = cast.Sub(context.TODO(), fmt.Sprintf("sub-%d", idx), buffers[idx], []*Channel{channel}, false)
124// }()
125// }
126// time.Sleep(100 * time.Millisecond)
127
128// // Concurrent publishers
129// pubCount := int32(0)
130// for p := 0; p < numPublishers; p++ {
131// pubID := p
132// for m := 0; m < msgsPerPublisher; m++ {
133// wg.Add(1)
134// msgNum := m
135// go func() {
136// defer wg.Done()
137// msg := fmt.Sprintf("pub%d-msg%d\n", pubID, msgNum)
138// _ = cast.Pub(context.TODO(), fmt.Sprintf("pub-%d", pubID), &Buffer{b: *bytes.NewBufferString(msg)}, []*Channel{channel}, false)
139// atomic.AddInt32(&pubCount, 1)
140// }()
141// }
142// }
143
144// wg.Wait()
145
146// // Verify all messages delivered to all subscribers
147// totalExpectedMessages := numPublishers * msgsPerPublisher
148// for i, buf := range buffers {
149// messageCount := bytes.Count([]byte(buf.String()), []byte("\n"))
150// if messageCount != totalExpectedMessages {
151// t.Errorf("Subscriber %d: expected %d messages, got %d", i, totalExpectedMessages, messageCount)
152// }
153// }
154
155// // Verify all publishes completed
156// if pubCount != int32(totalExpectedMessages) {
157// t.Errorf("Expected %d publishes to complete, got %d", totalExpectedMessages, pubCount)
158// }
159// })
160// }
161
162// TestDispatcherEmptySubscribers verifies that dispatchers handle empty subscriber set without panic.
163func TestDispatcherEmptySubscribers(t *testing.T) {
164 name := "empty-subs-test"
165
166 t.Run("Multicast", func(t *testing.T) {
167 cast := NewMulticast(slog.Default())
168 channel := NewChannel(name)
169
170 var wg sync.WaitGroup
171
172 // Publish with no subscribers (should not panic)
173 wg.Add(1)
174 go func() {
175 defer wg.Done()
176 defer func() {
177 if r := recover(); r != nil {
178 t.Errorf("Multicast panicked with no subscribers: %v", r)
179 }
180 }()
181 _ = cast.Pub(context.TODO(), "pub", &Buffer{b: *bytes.NewBufferString("test")}, []*Channel{channel}, false)
182 }()
183
184 wg.Wait()
185 t.Log("Multicast handled empty subscribers correctly")
186 })
187}
188
189// TestDispatcherSingleSubscriber verifies that both dispatchers work correctly with one subscriber.
190func TestDispatcherSingleSubscriber(t *testing.T) {
191 name := "single-sub-test"
192 message := "single-sub-message"
193
194 t.Run("Multicast", func(t *testing.T) {
195 cast := NewMulticast(slog.Default())
196 buf := new(Buffer)
197 channel := NewChannel(name)
198
199 var wg sync.WaitGroup
200
201 // Subscribe
202 wg.Add(1)
203 go func() {
204 defer wg.Done()
205 _ = cast.Sub(context.TODO(), "sub", buf, []*Channel{channel}, false)
206 }()
207
208 time.Sleep(100 * time.Millisecond)
209
210 // Publish
211 wg.Add(1)
212 go func() {
213 defer wg.Done()
214 _ = cast.Pub(context.TODO(), "pub", &Buffer{b: *bytes.NewBufferString(message)}, []*Channel{channel}, false)
215 }()
216
217 wg.Wait()
218
219 if buf.String() != message {
220 t.Errorf("Multicast with single subscriber: expected %q, got %q", message, buf.String())
221 }
222 })
223}