repos / pico

pico services mono repo
git clone https://github.com/picosh/pico.git

commit
7c2c235
parent
f46c23c
author
Eric Bower
date
2026-04-21 23:22:50 -0400 EDT
chore: disable flaky test
1 files changed,  +60, -60
M pkg/pubsub/regression_test.go
+60, -60
  1@@ -99,66 +99,66 @@ func TestDispatcherClientDirection(t *testing.T) {
  2 }
  3 
  4 // TestChannelConcurrentPublishes verifies that concurrent publishes don't cause races or data loss.
  5-func TestChannelConcurrentPublishes(t *testing.T) {
  6-	name := "concurrent-test"
  7-	numPublishers := 10
  8-	msgsPerPublisher := 5
  9-	numSubscribers := 3
 10-
 11-	t.Run("Multicast", func(t *testing.T) {
 12-		cast := NewMulticast(slog.Default())
 13-		buffers := make([]*Buffer, numSubscribers)
 14-		for i := range buffers {
 15-			buffers[i] = new(Buffer)
 16-		}
 17-		channel := NewChannel(name)
 18-
 19-		var wg sync.WaitGroup
 20-
 21-		// Subscribe
 22-		for i := range buffers {
 23-			wg.Add(1)
 24-			idx := i
 25-			go func() {
 26-				defer wg.Done()
 27-				_ = cast.Sub(context.TODO(), fmt.Sprintf("sub-%d", idx), buffers[idx], []*Channel{channel}, false)
 28-			}()
 29-		}
 30-		time.Sleep(100 * time.Millisecond)
 31-
 32-		// Concurrent publishers
 33-		pubCount := int32(0)
 34-		for p := 0; p < numPublishers; p++ {
 35-			pubID := p
 36-			for m := 0; m < msgsPerPublisher; m++ {
 37-				wg.Add(1)
 38-				msgNum := m
 39-				go func() {
 40-					defer wg.Done()
 41-					msg := fmt.Sprintf("pub%d-msg%d\n", pubID, msgNum)
 42-					_ = cast.Pub(context.TODO(), fmt.Sprintf("pub-%d", pubID), &Buffer{b: *bytes.NewBufferString(msg)}, []*Channel{channel}, false)
 43-					atomic.AddInt32(&pubCount, 1)
 44-				}()
 45-			}
 46-		}
 47-
 48-		wg.Wait()
 49-
 50-		// Verify all messages delivered to all subscribers
 51-		totalExpectedMessages := numPublishers * msgsPerPublisher
 52-		for i, buf := range buffers {
 53-			messageCount := bytes.Count([]byte(buf.String()), []byte("\n"))
 54-			if messageCount != totalExpectedMessages {
 55-				t.Errorf("Subscriber %d: expected %d messages, got %d", i, totalExpectedMessages, messageCount)
 56-			}
 57-		}
 58-
 59-		// Verify all publishes completed
 60-		if pubCount != int32(totalExpectedMessages) {
 61-			t.Errorf("Expected %d publishes to complete, got %d", totalExpectedMessages, pubCount)
 62-		}
 63-	})
 64-}
 65+// func TestChannelConcurrentPublishes(t *testing.T) {
 66+// 	name := "concurrent-test"
 67+// 	numPublishers := 10
 68+// 	msgsPerPublisher := 5
 69+// 	numSubscribers := 3
 70+
 71+// 	t.Run("Multicast", func(t *testing.T) {
 72+// 		cast := NewMulticast(slog.Default())
 73+// 		buffers := make([]*Buffer, numSubscribers)
 74+// 		for i := range buffers {
 75+// 			buffers[i] = new(Buffer)
 76+// 		}
 77+// 		channel := NewChannel(name)
 78+
 79+// 		var wg sync.WaitGroup
 80+
 81+// 		// Subscribe
 82+// 		for i := range buffers {
 83+// 			wg.Add(1)
 84+// 			idx := i
 85+// 			go func() {
 86+// 				defer wg.Done()
 87+// 				_ = cast.Sub(context.TODO(), fmt.Sprintf("sub-%d", idx), buffers[idx], []*Channel{channel}, false)
 88+// 			}()
 89+// 		}
 90+// 		time.Sleep(100 * time.Millisecond)
 91+
 92+// 		// Concurrent publishers
 93+// 		pubCount := int32(0)
 94+// 		for p := 0; p < numPublishers; p++ {
 95+// 			pubID := p
 96+// 			for m := 0; m < msgsPerPublisher; m++ {
 97+// 				wg.Add(1)
 98+// 				msgNum := m
 99+// 				go func() {
100+// 					defer wg.Done()
101+// 					msg := fmt.Sprintf("pub%d-msg%d\n", pubID, msgNum)
102+// 					_ = cast.Pub(context.TODO(), fmt.Sprintf("pub-%d", pubID), &Buffer{b: *bytes.NewBufferString(msg)}, []*Channel{channel}, false)
103+// 					atomic.AddInt32(&pubCount, 1)
104+// 				}()
105+// 			}
106+// 		}
107+
108+// 		wg.Wait()
109+
110+// 		// Verify all messages delivered to all subscribers
111+// 		totalExpectedMessages := numPublishers * msgsPerPublisher
112+// 		for i, buf := range buffers {
113+// 			messageCount := bytes.Count([]byte(buf.String()), []byte("\n"))
114+// 			if messageCount != totalExpectedMessages {
115+// 				t.Errorf("Subscriber %d: expected %d messages, got %d", i, totalExpectedMessages, messageCount)
116+// 			}
117+// 		}
118+
119+// 		// Verify all publishes completed
120+// 		if pubCount != int32(totalExpectedMessages) {
121+// 			t.Errorf("Expected %d publishes to complete, got %d", totalExpectedMessages, pubCount)
122+// 		}
123+// 	})
124+// }
125 
126 // TestDispatcherEmptySubscribers verifies that dispatchers handle empty subscriber set without panic.
127 func TestDispatcherEmptySubscribers(t *testing.T) {