- commit
- 85e157e
- parent
- aed3af6
- author
- Eric Bower
- date
- 2025-12-26 11:21:06 -0500 EST
fix(pipe): tests timing issue
1 files changed,
+30,
-9
+30,
-9
1@@ -701,13 +701,34 @@ func TestPubSub_BlockingWaitsForSubscriber(t *testing.T) {
2 t.Fatalf("failed to start pub: %v", err)
3 }
4
5- // Read initial output - should indicate waiting for subscribers
6- initialOutput := make([]byte, 500)
7- n, err := pubStdout.Read(initialOutput)
8- if err != nil && err != io.EOF {
9- t.Fatalf("failed to read initial output: %v", err)
10+ // Read output until we see "waiting" message or timeout
11+ // Need to read in a loop because Read() may return partial data
12+ var output string
13+ readDone := make(chan struct{})
14+ go func() {
15+ buf := make([]byte, 1024)
16+ for {
17+ n, err := pubStdout.Read(buf)
18+ if n > 0 {
19+ output += string(buf[:n])
20+ if strings.Contains(output, "waiting") {
21+ close(readDone)
22+ return
23+ }
24+ }
25+ if err != nil {
26+ close(readDone)
27+ return
28+ }
29+ }
30+ }()
31+
32+ select {
33+ case <-readDone:
34+ case <-time.After(2 * time.Second):
35+ t.Fatalf("timeout waiting for 'waiting' message, got: %q", output)
36 }
37- output := string(initialOutput[:n])
38+
39 if !strings.Contains(output, "waiting") {
40 t.Errorf("expected 'waiting' message for blocking pub, got: %q", output)
41 }
42@@ -740,13 +761,13 @@ func TestPubSub_BlockingWaitsForSubscriber(t *testing.T) {
43
44 // Subscriber should receive the message
45 received := make([]byte, 100)
46- n, err = subStdout.Read(received)
47+ nRead, err := subStdout.Read(received)
48 if err != nil && err != io.EOF {
49 t.Logf("read error: %v", err)
50 }
51
52- if !strings.Contains(string(received[:n]), testMessage) {
53- t.Errorf("subscriber did not receive blocking message, got: %q, want: %q", string(received[:n]), testMessage)
54+ if !strings.Contains(string(received[:nRead]), testMessage) {
55+ t.Errorf("subscriber did not receive blocking message, got: %q, want: %q", string(received[:nRead]), testMessage)
56 }
57 }
58