repos / pico

pico services mono repo
git clone https://github.com/picosh/pico.git

pico / pkg / pubsub
Eric Bower  ·  2026-02-03

regression_test.go

  1package pubsub
  2
  3import (
  4	"bytes"
  5	"context"
  6	"fmt"
  7	"log/slog"
  8	"sync"
  9	"sync/atomic"
 10	"testing"
 11	"time"
 12)
 13
 14// TestChannelMessageOrdering verifies that messages are delivered without panics or corruption.
 15// This applies to both multicast and round-robin dispatchers.
 16func TestChannelMessageOrdering(t *testing.T) {
 17	name := "order-test"
 18	numMessages := 3
 19
 20	// Test with Multicast
 21	t.Run("Multicast", func(t *testing.T) {
 22		cast := NewMulticast(slog.Default())
 23		buf := new(Buffer)
 24		channel := NewChannel(name)
 25
 26		var wg sync.WaitGroup
 27		syncer := make(chan int)
 28
 29		// Subscribe
 30		wg.Add(1)
 31		go func() {
 32			defer wg.Done()
 33			syncer <- 0
 34			_ = cast.Sub(context.TODO(), "sub", buf, []*Channel{channel}, false)
 35		}()
 36
 37		<-syncer
 38
 39		// Publish messages
 40		for i := 0; i < numMessages; i++ {
 41			wg.Add(1)
 42			idx := i
 43			go func() {
 44				defer wg.Done()
 45				msg := fmt.Sprintf("msg%d\n", idx)
 46				_ = cast.Pub(context.TODO(), "pub", &Buffer{b: *bytes.NewBufferString(msg)}, []*Channel{channel}, false)
 47			}()
 48		}
 49
 50		wg.Wait()
 51
 52		// Verify at least some messages were received
 53		content := buf.String()
 54		if len(content) == 0 {
 55			t.Error("Multicast: no messages received")
 56		}
 57	})
 58}
 59
 60// TestDispatcherClientDirection verifies that both dispatchers respect client direction.
 61// Publishers should not receive messages they publish.
 62func TestDispatcherClientDirection(t *testing.T) {
 63	name := "direction-test"
 64
 65	t.Run("Multicast", func(t *testing.T) {
 66		cast := NewMulticast(slog.Default())
 67		pubBuf := new(Buffer)
 68		subBuf := new(Buffer)
 69		channel := NewChannel(name)
 70
 71		var wg sync.WaitGroup
 72
 73		// Publisher (input only)
 74		wg.Add(1)
 75		go func() {
 76			defer wg.Done()
 77			_ = cast.Pub(context.TODO(), "pub", &Buffer{b: *bytes.NewBufferString("test")}, []*Channel{channel}, false)
 78		}()
 79
 80		// Subscriber (output only)
 81		wg.Add(1)
 82		go func() {
 83			defer wg.Done()
 84			_ = cast.Sub(context.TODO(), "sub", subBuf, []*Channel{channel}, false)
 85		}()
 86
 87		wg.Wait()
 88
 89		// Publisher should not receive the message
 90		if pubBuf.String() != "" {
 91			t.Errorf("Publisher received message: %q", pubBuf.String())
 92		}
 93
 94		// Subscriber should receive it
 95		if subBuf.String() != "test" {
 96			t.Errorf("Subscriber should have received message, got: %q", subBuf.String())
 97		}
 98	})
 99}
100
101// TestChannelConcurrentPublishes verifies that concurrent publishes don't cause races or data loss.
102func TestChannelConcurrentPublishes(t *testing.T) {
103	name := "concurrent-test"
104	numPublishers := 10
105	msgsPerPublisher := 5
106	numSubscribers := 3
107
108	t.Run("Multicast", func(t *testing.T) {
109		cast := NewMulticast(slog.Default())
110		buffers := make([]*Buffer, numSubscribers)
111		for i := range buffers {
112			buffers[i] = new(Buffer)
113		}
114		channel := NewChannel(name)
115
116		var wg sync.WaitGroup
117
118		// Subscribe
119		for i := range buffers {
120			wg.Add(1)
121			idx := i
122			go func() {
123				defer wg.Done()
124				_ = cast.Sub(context.TODO(), fmt.Sprintf("sub-%d", idx), buffers[idx], []*Channel{channel}, false)
125			}()
126		}
127		time.Sleep(100 * time.Millisecond)
128
129		// Concurrent publishers
130		pubCount := int32(0)
131		for p := 0; p < numPublishers; p++ {
132			pubID := p
133			for m := 0; m < msgsPerPublisher; m++ {
134				wg.Add(1)
135				msgNum := m
136				go func() {
137					defer wg.Done()
138					msg := fmt.Sprintf("pub%d-msg%d\n", pubID, msgNum)
139					_ = cast.Pub(context.TODO(), fmt.Sprintf("pub-%d", pubID), &Buffer{b: *bytes.NewBufferString(msg)}, []*Channel{channel}, false)
140					atomic.AddInt32(&pubCount, 1)
141				}()
142			}
143		}
144
145		wg.Wait()
146
147		// Verify all messages delivered to all subscribers
148		totalExpectedMessages := numPublishers * msgsPerPublisher
149		for i, buf := range buffers {
150			messageCount := bytes.Count([]byte(buf.String()), []byte("\n"))
151			if messageCount != totalExpectedMessages {
152				t.Errorf("Subscriber %d: expected %d messages, got %d", i, totalExpectedMessages, messageCount)
153			}
154		}
155
156		// Verify all publishes completed
157		if pubCount != int32(totalExpectedMessages) {
158			t.Errorf("Expected %d publishes to complete, got %d", totalExpectedMessages, pubCount)
159		}
160	})
161}
162
163// TestDispatcherEmptySubscribers verifies that dispatchers handle empty subscriber set without panic.
164func TestDispatcherEmptySubscribers(t *testing.T) {
165	name := "empty-subs-test"
166
167	t.Run("Multicast", func(t *testing.T) {
168		cast := NewMulticast(slog.Default())
169		channel := NewChannel(name)
170
171		var wg sync.WaitGroup
172
173		// Publish with no subscribers (should not panic)
174		wg.Add(1)
175		go func() {
176			defer wg.Done()
177			defer func() {
178				if r := recover(); r != nil {
179					t.Errorf("Multicast panicked with no subscribers: %v", r)
180				}
181			}()
182			_ = cast.Pub(context.TODO(), "pub", &Buffer{b: *bytes.NewBufferString("test")}, []*Channel{channel}, false)
183		}()
184
185		wg.Wait()
186		t.Log("Multicast handled empty subscribers correctly")
187	})
188}
189
190// TestDispatcherSingleSubscriber verifies that both dispatchers work correctly with one subscriber.
191func TestDispatcherSingleSubscriber(t *testing.T) {
192	name := "single-sub-test"
193	message := "single-sub-message"
194
195	t.Run("Multicast", func(t *testing.T) {
196		cast := NewMulticast(slog.Default())
197		buf := new(Buffer)
198		channel := NewChannel(name)
199
200		var wg sync.WaitGroup
201
202		// Subscribe
203		wg.Add(1)
204		go func() {
205			defer wg.Done()
206			_ = cast.Sub(context.TODO(), "sub", buf, []*Channel{channel}, false)
207		}()
208
209		time.Sleep(100 * time.Millisecond)
210
211		// Publish
212		wg.Add(1)
213		go func() {
214			defer wg.Done()
215			_ = cast.Pub(context.TODO(), "pub", &Buffer{b: *bytes.NewBufferString(message)}, []*Channel{channel}, false)
216		}()
217
218		wg.Wait()
219
220		if buf.String() != message {
221			t.Errorf("Multicast with single subscriber: expected %q, got %q", message, buf.String())
222		}
223	})
224}