Eric Bower
·
2026-02-03
multicast_test.go
1package pubsub
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "log/slog"
8 "sync"
9 "testing"
10)
11
// Buffer wraps a bytes.Buffer and takes a mutex around every Read, Write and
// String call, so those methods can be invoked from multiple goroutines.
type Buffer struct {
	// m guards b.
	m sync.Mutex
	b bytes.Buffer
}

// Read locks the buffer and delegates to the underlying bytes.Buffer.Read,
// returning its byte count and error unchanged.
func (buf *Buffer) Read(p []byte) (n int, err error) {
	buf.m.Lock()
	// Deferred so the lock is released even if the underlying call panics.
	defer buf.m.Unlock()

	return buf.b.Read(p)
}

// Write locks the buffer and delegates to the underlying bytes.Buffer.Write,
// returning its byte count and error unchanged.
func (buf *Buffer) Write(p []byte) (n int, err error) {
	buf.m.Lock()
	defer buf.m.Unlock()

	return buf.b.Write(p)
}

// String locks the buffer and returns the result of the underlying
// bytes.Buffer.String.
func (buf *Buffer) String() string {
	buf.m.Lock()
	defer buf.m.Unlock()

	return buf.b.String()
}
32
33func TestMulticastSubBlock(t *testing.T) {
34 orderActual := ""
35 orderExpected := "sub-pub-"
36 actual := new(Buffer)
37 expected := "some test data"
38 name := "test-channel"
39 syncer := make(chan int)
40
41 cast := NewMulticast(slog.Default())
42
43 var wg sync.WaitGroup
44 wg.Add(2)
45
46 channel := NewChannel(name)
47
48 go func() {
49 orderActual += "sub-"
50 syncer <- 0
51 fmt.Println(cast.Sub(context.TODO(), "1", actual, []*Channel{channel}, false))
52 wg.Done()
53 }()
54
55 <-syncer
56
57 go func() {
58 orderActual += "pub-"
59 fmt.Println(cast.Pub(context.TODO(), "2", &Buffer{b: *bytes.NewBufferString(expected)}, []*Channel{channel}, true))
60 wg.Done()
61 }()
62
63 wg.Wait()
64
65 if orderActual != orderExpected {
66 t.Fatalf("\norderActual:(%s)\norderExpected:(%s)", orderActual, orderExpected)
67 }
68 if actual.String() != expected {
69 t.Fatalf("\nactual:(%s)\nexpected:(%s)", actual.String(), expected)
70 }
71}
72
73func TestMulticastPubBlock(t *testing.T) {
74 orderActual := ""
75 orderExpected := "pub-sub-"
76 actual := new(Buffer)
77 expected := "some test data"
78 name := "test-channel"
79 syncer := make(chan int)
80
81 cast := NewMulticast(slog.Default())
82
83 var wg sync.WaitGroup
84 wg.Add(2)
85
86 channel := NewChannel(name)
87
88 go func() {
89 orderActual += "pub-"
90 syncer <- 0
91 fmt.Println(cast.Pub(context.TODO(), "1", &Buffer{b: *bytes.NewBufferString(expected)}, []*Channel{channel}, true))
92 wg.Done()
93 }()
94
95 <-syncer
96
97 go func() {
98 orderActual += "sub-"
99 fmt.Println(cast.Sub(context.TODO(), "2", actual, []*Channel{channel}, false))
100 wg.Done()
101 }()
102
103 wg.Wait()
104
105 if orderActual != orderExpected {
106 t.Fatalf("\norderActual:(%s)\norderExpected:(%s)", orderActual, orderExpected)
107 }
108 if actual.String() != expected {
109 t.Fatalf("\nactual:(%s)\nexpected:(%s)", actual.String(), expected)
110 }
111}
112
113func TestMulticastMultSubs(t *testing.T) {
114 orderActual := ""
115 orderExpected := "sub-sub-pub-"
116 actual := new(Buffer)
117 actualOther := new(Buffer)
118 expected := "some test data"
119 name := "test-channel"
120 syncer := make(chan int)
121
122 cast := NewMulticast(slog.Default())
123
124 var wg sync.WaitGroup
125 wg.Add(3)
126
127 channel := NewChannel(name)
128
129 go func() {
130 orderActual += "sub-"
131 syncer <- 0
132 fmt.Println(cast.Sub(context.TODO(), "1", actual, []*Channel{channel}, false))
133 wg.Done()
134 }()
135
136 <-syncer
137
138 go func() {
139 orderActual += "sub-"
140 syncer <- 0
141 fmt.Println(cast.Sub(context.TODO(), "2", actualOther, []*Channel{channel}, false))
142 wg.Done()
143 }()
144
145 <-syncer
146
147 go func() {
148 orderActual += "pub-"
149 fmt.Println(cast.Pub(context.TODO(), "3", &Buffer{b: *bytes.NewBufferString(expected)}, []*Channel{channel}, true))
150 wg.Done()
151 }()
152
153 wg.Wait()
154
155 if orderActual != orderExpected {
156 t.Fatalf("\norderActual:(%s)\norderExpected:(%s)", orderActual, orderExpected)
157 }
158 if actual.String() != expected {
159 t.Fatalf("\nactual:(%s)\nexpected:(%s)", actual.String(), expected)
160 }
161 if actualOther.String() != expected {
162 t.Fatalf("\nactual:(%s)\nexpected:(%s)", actualOther.String(), expected)
163 }
164}