repos / pico

pico services mono repo
git clone https://github.com/picosh/pico.git

pico / pkg / pubsub
Eric Bower  ·  2026-02-03

multicast_test.go

  1package pubsub
  2
  3import (
  4	"bytes"
  5	"context"
  6	"fmt"
  7	"log/slog"
  8	"sync"
  9	"testing"
 10)
 11
 12type Buffer struct {
 13	b bytes.Buffer
 14	m sync.Mutex
 15}
 16
 17func (b *Buffer) Read(p []byte) (n int, err error) {
 18	b.m.Lock()
 19	defer b.m.Unlock()
 20	return b.b.Read(p)
 21}
 22func (b *Buffer) Write(p []byte) (n int, err error) {
 23	b.m.Lock()
 24	defer b.m.Unlock()
 25	return b.b.Write(p)
 26}
 27func (b *Buffer) String() string {
 28	b.m.Lock()
 29	defer b.m.Unlock()
 30	return b.b.String()
 31}
 32
 33func TestMulticastSubBlock(t *testing.T) {
 34	orderActual := ""
 35	orderExpected := "sub-pub-"
 36	actual := new(Buffer)
 37	expected := "some test data"
 38	name := "test-channel"
 39	syncer := make(chan int)
 40
 41	cast := NewMulticast(slog.Default())
 42
 43	var wg sync.WaitGroup
 44	wg.Add(2)
 45
 46	channel := NewChannel(name)
 47
 48	go func() {
 49		orderActual += "sub-"
 50		syncer <- 0
 51		fmt.Println(cast.Sub(context.TODO(), "1", actual, []*Channel{channel}, false))
 52		wg.Done()
 53	}()
 54
 55	<-syncer
 56
 57	go func() {
 58		orderActual += "pub-"
 59		fmt.Println(cast.Pub(context.TODO(), "2", &Buffer{b: *bytes.NewBufferString(expected)}, []*Channel{channel}, true))
 60		wg.Done()
 61	}()
 62
 63	wg.Wait()
 64
 65	if orderActual != orderExpected {
 66		t.Fatalf("\norderActual:(%s)\norderExpected:(%s)", orderActual, orderExpected)
 67	}
 68	if actual.String() != expected {
 69		t.Fatalf("\nactual:(%s)\nexpected:(%s)", actual.String(), expected)
 70	}
 71}
 72
 73func TestMulticastPubBlock(t *testing.T) {
 74	orderActual := ""
 75	orderExpected := "pub-sub-"
 76	actual := new(Buffer)
 77	expected := "some test data"
 78	name := "test-channel"
 79	syncer := make(chan int)
 80
 81	cast := NewMulticast(slog.Default())
 82
 83	var wg sync.WaitGroup
 84	wg.Add(2)
 85
 86	channel := NewChannel(name)
 87
 88	go func() {
 89		orderActual += "pub-"
 90		syncer <- 0
 91		fmt.Println(cast.Pub(context.TODO(), "1", &Buffer{b: *bytes.NewBufferString(expected)}, []*Channel{channel}, true))
 92		wg.Done()
 93	}()
 94
 95	<-syncer
 96
 97	go func() {
 98		orderActual += "sub-"
 99		fmt.Println(cast.Sub(context.TODO(), "2", actual, []*Channel{channel}, false))
100		wg.Done()
101	}()
102
103	wg.Wait()
104
105	if orderActual != orderExpected {
106		t.Fatalf("\norderActual:(%s)\norderExpected:(%s)", orderActual, orderExpected)
107	}
108	if actual.String() != expected {
109		t.Fatalf("\nactual:(%s)\nexpected:(%s)", actual.String(), expected)
110	}
111}
112
113func TestMulticastMultSubs(t *testing.T) {
114	orderActual := ""
115	orderExpected := "sub-sub-pub-"
116	actual := new(Buffer)
117	actualOther := new(Buffer)
118	expected := "some test data"
119	name := "test-channel"
120	syncer := make(chan int)
121
122	cast := NewMulticast(slog.Default())
123
124	var wg sync.WaitGroup
125	wg.Add(3)
126
127	channel := NewChannel(name)
128
129	go func() {
130		orderActual += "sub-"
131		syncer <- 0
132		fmt.Println(cast.Sub(context.TODO(), "1", actual, []*Channel{channel}, false))
133		wg.Done()
134	}()
135
136	<-syncer
137
138	go func() {
139		orderActual += "sub-"
140		syncer <- 0
141		fmt.Println(cast.Sub(context.TODO(), "2", actualOther, []*Channel{channel}, false))
142		wg.Done()
143	}()
144
145	<-syncer
146
147	go func() {
148		orderActual += "pub-"
149		fmt.Println(cast.Pub(context.TODO(), "3", &Buffer{b: *bytes.NewBufferString(expected)}, []*Channel{channel}, true))
150		wg.Done()
151	}()
152
153	wg.Wait()
154
155	if orderActual != orderExpected {
156		t.Fatalf("\norderActual:(%s)\norderExpected:(%s)", orderActual, orderExpected)
157	}
158	if actual.String() != expected {
159		t.Fatalf("\nactual:(%s)\nexpected:(%s)", actual.String(), expected)
160	}
161	if actualOther.String() != expected {
162		t.Fatalf("\nactual:(%s)\nexpected:(%s)", actualOther.String(), expected)
163	}
164}