repos / pico

pico services mono repo
git clone https://github.com/picosh/pico.git

pico / pkg / pubsub
Eric Bower  ·  2026-04-21

regression_test.go

  1package pubsub
  2
  3import (
  4	"bytes"
  5	"context"
  6	"fmt"
  7	"log/slog"
  8	"sync"
  9	"testing"
 10	"time"
 11)
 12
 13// TestChannelMessageOrdering verifies that messages are delivered without panics or corruption.
 14// This applies to both multicast and round-robin dispatchers.
 15func TestChannelMessageOrdering(t *testing.T) {
 16	name := "order-test"
 17	numMessages := 3
 18
 19	// Test with Multicast
 20	t.Run("Multicast", func(t *testing.T) {
 21		cast := NewMulticast(slog.Default())
 22		buf := new(Buffer)
 23		channel := NewChannel(name)
 24
 25		var wg sync.WaitGroup
 26		syncer := make(chan int)
 27
 28		// Subscribe
 29		wg.Add(1)
 30		go func() {
 31			defer wg.Done()
 32			syncer <- 0
 33			_ = cast.Sub(context.TODO(), "sub", buf, []*Channel{channel}, false)
 34		}()
 35
 36		<-syncer
 37
 38		// Publish messages
 39		for i := 0; i < numMessages; i++ {
 40			wg.Add(1)
 41			idx := i
 42			go func() {
 43				defer wg.Done()
 44				msg := fmt.Sprintf("msg%d\n", idx)
 45				_ = cast.Pub(context.TODO(), "pub", &Buffer{b: *bytes.NewBufferString(msg)}, []*Channel{channel}, false)
 46			}()
 47		}
 48
 49		wg.Wait()
 50
 51		// Verify at least some messages were received
 52		content := buf.String()
 53		if len(content) == 0 {
 54			t.Error("Multicast: no messages received")
 55		}
 56	})
 57}
 58
 59// TestDispatcherClientDirection verifies that both dispatchers respect client direction.
 60// Publishers should not receive messages they publish.
 61func TestDispatcherClientDirection(t *testing.T) {
 62	name := "direction-test"
 63
 64	t.Run("Multicast", func(t *testing.T) {
 65		cast := NewMulticast(slog.Default())
 66		pubBuf := new(Buffer)
 67		subBuf := new(Buffer)
 68		channel := NewChannel(name)
 69
 70		var wg sync.WaitGroup
 71
 72		// Publisher (input only)
 73		wg.Add(1)
 74		go func() {
 75			defer wg.Done()
 76			_ = cast.Pub(context.TODO(), "pub", &Buffer{b: *bytes.NewBufferString("test")}, []*Channel{channel}, false)
 77		}()
 78
 79		// Subscriber (output only)
 80		wg.Add(1)
 81		go func() {
 82			defer wg.Done()
 83			_ = cast.Sub(context.TODO(), "sub", subBuf, []*Channel{channel}, false)
 84		}()
 85
 86		wg.Wait()
 87
 88		// Publisher should not receive the message
 89		if pubBuf.String() != "" {
 90			t.Errorf("Publisher received message: %q", pubBuf.String())
 91		}
 92
 93		// Subscriber should receive it
 94		if subBuf.String() != "test" {
 95			t.Errorf("Subscriber should have received message, got: %q", subBuf.String())
 96		}
 97	})
 98}
 99
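// TestChannelConcurrentPublishes verifies that concurrent publishes don't cause races or data loss.
// NOTE: this test is currently disabled (the whole function below is commented out). Its body
// calls atomic.AddInt32, so re-enabling it also requires importing "sync/atomic".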
101// func TestChannelConcurrentPublishes(t *testing.T) {
102// 	name := "concurrent-test"
103// 	numPublishers := 10
104// 	msgsPerPublisher := 5
105// 	numSubscribers := 3
106
107// 	t.Run("Multicast", func(t *testing.T) {
108// 		cast := NewMulticast(slog.Default())
109// 		buffers := make([]*Buffer, numSubscribers)
110// 		for i := range buffers {
111// 			buffers[i] = new(Buffer)
112// 		}
113// 		channel := NewChannel(name)
114
115// 		var wg sync.WaitGroup
116
117// 		// Subscribe
118// 		for i := range buffers {
119// 			wg.Add(1)
120// 			idx := i
121// 			go func() {
122// 				defer wg.Done()
123// 				_ = cast.Sub(context.TODO(), fmt.Sprintf("sub-%d", idx), buffers[idx], []*Channel{channel}, false)
124// 			}()
125// 		}
126// 		time.Sleep(100 * time.Millisecond)
127
128// 		// Concurrent publishers
129// 		pubCount := int32(0)
130// 		for p := 0; p < numPublishers; p++ {
131// 			pubID := p
132// 			for m := 0; m < msgsPerPublisher; m++ {
133// 				wg.Add(1)
134// 				msgNum := m
135// 				go func() {
136// 					defer wg.Done()
137// 					msg := fmt.Sprintf("pub%d-msg%d\n", pubID, msgNum)
138// 					_ = cast.Pub(context.TODO(), fmt.Sprintf("pub-%d", pubID), &Buffer{b: *bytes.NewBufferString(msg)}, []*Channel{channel}, false)
139// 					atomic.AddInt32(&pubCount, 1)
140// 				}()
141// 			}
142// 		}
143
144// 		wg.Wait()
145
146// 		// Verify all messages delivered to all subscribers
147// 		totalExpectedMessages := numPublishers * msgsPerPublisher
148// 		for i, buf := range buffers {
149// 			messageCount := bytes.Count([]byte(buf.String()), []byte("\n"))
150// 			if messageCount != totalExpectedMessages {
151// 				t.Errorf("Subscriber %d: expected %d messages, got %d", i, totalExpectedMessages, messageCount)
152// 			}
153// 		}
154
155// 		// Verify all publishes completed
156// 		if pubCount != int32(totalExpectedMessages) {
157// 			t.Errorf("Expected %d publishes to complete, got %d", totalExpectedMessages, pubCount)
158// 		}
159// 	})
160// }
161
162// TestDispatcherEmptySubscribers verifies that dispatchers handle empty subscriber set without panic.
163func TestDispatcherEmptySubscribers(t *testing.T) {
164	name := "empty-subs-test"
165
166	t.Run("Multicast", func(t *testing.T) {
167		cast := NewMulticast(slog.Default())
168		channel := NewChannel(name)
169
170		var wg sync.WaitGroup
171
172		// Publish with no subscribers (should not panic)
173		wg.Add(1)
174		go func() {
175			defer wg.Done()
176			defer func() {
177				if r := recover(); r != nil {
178					t.Errorf("Multicast panicked with no subscribers: %v", r)
179				}
180			}()
181			_ = cast.Pub(context.TODO(), "pub", &Buffer{b: *bytes.NewBufferString("test")}, []*Channel{channel}, false)
182		}()
183
184		wg.Wait()
185		t.Log("Multicast handled empty subscribers correctly")
186	})
187}
188
189// TestDispatcherSingleSubscriber verifies that both dispatchers work correctly with one subscriber.
190func TestDispatcherSingleSubscriber(t *testing.T) {
191	name := "single-sub-test"
192	message := "single-sub-message"
193
194	t.Run("Multicast", func(t *testing.T) {
195		cast := NewMulticast(slog.Default())
196		buf := new(Buffer)
197		channel := NewChannel(name)
198
199		var wg sync.WaitGroup
200
201		// Subscribe
202		wg.Add(1)
203		go func() {
204			defer wg.Done()
205			_ = cast.Sub(context.TODO(), "sub", buf, []*Channel{channel}, false)
206		}()
207
208		time.Sleep(100 * time.Millisecond)
209
210		// Publish
211		wg.Add(1)
212		go func() {
213			defer wg.Done()
214			_ = cast.Pub(context.TODO(), "pub", &Buffer{b: *bytes.NewBufferString(message)}, []*Channel{channel}, false)
215		}()
216
217		wg.Wait()
218
219		if buf.String() != message {
220			t.Errorf("Multicast with single subscriber: expected %q, got %q", message, buf.String())
221		}
222	})
223}