repos / pico

pico services mono repo
git clone https://github.com/picosh/pico.git

commit
a9243f5
parent
f60c265
author
Eric Bower
date
2025-12-26 11:09:38 -0500 EST
chore(pipe): more e2e tests for blocking and non-blocking behavior
1 files changed,  +352, -0
M pkg/apps/pipe/ssh_test.go
+352, -0
  1@@ -659,3 +659,355 @@ func TestAccessControl_AllowedUserViaFullPath(t *testing.T) {
  2 		t.Errorf("alice should receive bob's message on shared topic, got: %q", string(aliceReceived[:n]))
  3 	}
  4 }
  5+
  6+func TestPubSub_BlockingWaitsForSubscriber(t *testing.T) {
  7+	server := NewTestSSHServer(t)
  8+	defer server.Shutdown()
  9+
 10+	user := GenerateUser("alice")
 11+	RegisterUserWithServer(server, user)
 12+
 13+	pubClient, err := user.NewClient()
 14+	if err != nil {
 15+		t.Fatalf("failed to connect publisher: %v", err)
 16+	}
 17+	defer func() { _ = pubClient.Close() }()
 18+
 19+	subClient, err := user.NewClient()
 20+	if err != nil {
 21+		t.Fatalf("failed to connect subscriber: %v", err)
 22+	}
 23+	defer func() { _ = subClient.Close() }()
 24+
 25+	pubSession, err := pubClient.NewSession()
 26+	if err != nil {
 27+		t.Fatalf("failed to create pub session: %v", err)
 28+	}
 29+	defer func() { _ = pubSession.Close() }()
 30+
 31+	pubStdin, err := pubSession.StdinPipe()
 32+	if err != nil {
 33+		t.Fatalf("failed to get pub stdin: %v", err)
 34+	}
 35+
 36+	pubStdout, err := pubSession.StdoutPipe()
 37+	if err != nil {
 38+		t.Fatalf("failed to get pub stdout: %v", err)
 39+	}
 40+
 41+	// Start publisher with blocking enabled (default -b=true)
 42+	// Publisher should wait for subscriber
 43+	if err := pubSession.Start("pub blockingtopic"); err != nil {
 44+		t.Fatalf("failed to start pub: %v", err)
 45+	}
 46+
 47+	// Read initial output - should indicate waiting for subscribers
 48+	initialOutput := make([]byte, 500)
 49+	n, err := pubStdout.Read(initialOutput)
 50+	if err != nil && err != io.EOF {
 51+		t.Fatalf("failed to read initial output: %v", err)
 52+	}
 53+	output := string(initialOutput[:n])
 54+	if !strings.Contains(output, "waiting") {
 55+		t.Errorf("expected 'waiting' message for blocking pub, got: %q", output)
 56+	}
 57+
 58+	// Now start subscriber - this should unblock the publisher
 59+	subSession, err := subClient.NewSession()
 60+	if err != nil {
 61+		t.Fatalf("failed to create sub session: %v", err)
 62+	}
 63+	defer func() { _ = subSession.Close() }()
 64+
 65+	subStdout, err := subSession.StdoutPipe()
 66+	if err != nil {
 67+		t.Fatalf("failed to get sub stdout: %v", err)
 68+	}
 69+
 70+	if err := subSession.Start("sub blockingtopic -c"); err != nil {
 71+		t.Fatalf("failed to start sub: %v", err)
 72+	}
 73+
 74+	time.Sleep(100 * time.Millisecond)
 75+
 76+	// Now send the message
 77+	testMessage := "blocking message"
 78+	_, err = pubStdin.Write([]byte(testMessage))
 79+	if err != nil {
 80+		t.Fatalf("failed to write message: %v", err)
 81+	}
 82+	_ = pubStdin.Close()
 83+
 84+	// Subscriber should receive the message
 85+	received := make([]byte, 100)
 86+	n, err = subStdout.Read(received)
 87+	if err != nil && err != io.EOF {
 88+		t.Logf("read error: %v", err)
 89+	}
 90+
 91+	if !strings.Contains(string(received[:n]), testMessage) {
 92+		t.Errorf("subscriber did not receive blocking message, got: %q, want: %q", string(received[:n]), testMessage)
 93+	}
 94+}
 95+
 96+func TestPubSub_NonBlockingDoesNotWait(t *testing.T) {
 97+	server := NewTestSSHServer(t)
 98+	defer server.Shutdown()
 99+
100+	user := GenerateUser("alice")
101+	RegisterUserWithServer(server, user)
102+
103+	client, err := user.NewClient()
104+	if err != nil {
105+		t.Fatalf("failed to connect: %v", err)
106+	}
107+	defer func() { _ = client.Close() }()
108+
109+	// Publish with -b=false (non-blocking) and no subscriber
110+	// Should complete immediately without waiting
111+	done := make(chan struct{})
112+	var output string
113+	var cmdErr error
114+
115+	go func() {
116+		output, cmdErr = user.RunCommandWithStdin(client, "pub nonblockingtopic -b=false -c", "non-blocking message")
117+		close(done)
118+	}()
119+
120+	select {
121+	case <-done:
122+		// Command completed - this is expected for non-blocking
123+		if cmdErr != nil {
124+			t.Logf("non-blocking pub completed with: %v", cmdErr)
125+		}
126+		t.Logf("non-blocking pub output: %q", output)
127+	case <-time.After(2 * time.Second):
128+		t.Errorf("non-blocking pub should complete immediately, but it blocked")
129+	}
130+}
131+
132+func TestPubSub_BlockingTimeout(t *testing.T) {
133+	server := NewTestSSHServer(t)
134+	defer server.Shutdown()
135+
136+	user := GenerateUser("alice")
137+	RegisterUserWithServer(server, user)
138+
139+	client, err := user.NewClient()
140+	if err != nil {
141+		t.Fatalf("failed to connect: %v", err)
142+	}
143+	defer func() { _ = client.Close() }()
144+
145+	// Publish with blocking and short timeout, no subscriber
146+	// Should timeout after the specified duration
147+	done := make(chan struct{})
148+	var output string
149+
150+	go func() {
151+		output, _ = user.RunCommandWithStdin(client, "pub timeouttopic -b=true -t=500ms", "timeout message")
152+		close(done)
153+	}()
154+
155+	select {
156+	case <-done:
157+		// Command completed due to timeout
158+		if !strings.Contains(output, "timeout") && !strings.Contains(output, "waiting") {
159+			t.Logf("blocking pub with timeout output: %q", output)
160+		}
161+	case <-time.After(3 * time.Second):
162+		t.Errorf("blocking pub with timeout should have timed out after 500ms")
163+	}
164+}
165+
166+func TestSub_WaitsForPublisher(t *testing.T) {
167+	server := NewTestSSHServer(t)
168+	defer server.Shutdown()
169+
170+	user := GenerateUser("alice")
171+	RegisterUserWithServer(server, user)
172+
173+	subClient, err := user.NewClient()
174+	if err != nil {
175+		t.Fatalf("failed to connect subscriber: %v", err)
176+	}
177+	defer func() { _ = subClient.Close() }()
178+
179+	pubClient, err := user.NewClient()
180+	if err != nil {
181+		t.Fatalf("failed to connect publisher: %v", err)
182+	}
183+	defer func() { _ = pubClient.Close() }()
184+
185+	// Start subscriber first - it should wait for publisher
186+	subSession, err := subClient.NewSession()
187+	if err != nil {
188+		t.Fatalf("failed to create sub session: %v", err)
189+	}
190+	defer func() { _ = subSession.Close() }()
191+
192+	subStdout, err := subSession.StdoutPipe()
193+	if err != nil {
194+		t.Fatalf("failed to get sub stdout: %v", err)
195+	}
196+
197+	if err := subSession.Start("sub waitfortopic -c"); err != nil {
198+		t.Fatalf("failed to start sub: %v", err)
199+	}
200+
201+	// Subscriber is now waiting - give it a moment
202+	time.Sleep(100 * time.Millisecond)
203+
204+	// Now publish - subscriber should receive it
205+	testMessage := "delayed publish"
206+	_, err = user.RunCommandWithStdin(pubClient, "pub waitfortopic -c", testMessage)
207+	if err != nil {
208+		t.Logf("pub completed: %v", err)
209+	}
210+
211+	received := make([]byte, 100)
212+	n, err := subStdout.Read(received)
213+	if err != nil && err != io.EOF {
214+		t.Logf("read error: %v", err)
215+	}
216+
217+	if !strings.Contains(string(received[:n]), testMessage) {
218+		t.Errorf("subscriber waiting for publisher did not receive message, got: %q, want: %q", string(received[:n]), testMessage)
219+	}
220+}
221+
222+func TestSub_KeepAliveReceivesMultipleMessages(t *testing.T) {
223+	server := NewTestSSHServer(t)
224+	defer server.Shutdown()
225+
226+	user := GenerateUser("alice")
227+	RegisterUserWithServer(server, user)
228+
229+	subClient, err := user.NewClient()
230+	if err != nil {
231+		t.Fatalf("failed to connect subscriber: %v", err)
232+	}
233+	defer func() { _ = subClient.Close() }()
234+
235+	pubClient1, err := user.NewClient()
236+	if err != nil {
237+		t.Fatalf("failed to connect publisher 1: %v", err)
238+	}
239+	defer func() { _ = pubClient1.Close() }()
240+
241+	pubClient2, err := user.NewClient()
242+	if err != nil {
243+		t.Fatalf("failed to connect publisher 2: %v", err)
244+	}
245+	defer func() { _ = pubClient2.Close() }()
246+
247+	// Start subscriber with keepAlive (-k) flag
248+	subSession, err := subClient.NewSession()
249+	if err != nil {
250+		t.Fatalf("failed to create sub session: %v", err)
251+	}
252+	defer func() { _ = subSession.Close() }()
253+
254+	subStdout, err := subSession.StdoutPipe()
255+	if err != nil {
256+		t.Fatalf("failed to get sub stdout: %v", err)
257+	}
258+
259+	if err := subSession.Start("sub keepalivetopic -k -c"); err != nil {
260+		t.Fatalf("failed to start sub: %v", err)
261+	}
262+
263+	time.Sleep(100 * time.Millisecond)
264+
265+	// Send first message
266+	msg1 := "first message\n"
267+	_, err = user.RunCommandWithStdin(pubClient1, "pub keepalivetopic -c", msg1)
268+	if err != nil {
269+		t.Logf("pub 1 completed: %v", err)
270+	}
271+
272+	received1 := make([]byte, 100)
273+	n1, _ := subStdout.Read(received1)
274+	if !strings.Contains(string(received1[:n1]), "first message") {
275+		t.Errorf("subscriber did not receive first message, got: %q", string(received1[:n1]))
276+	}
277+
278+	// Send second message - subscriber with keepAlive should still receive it
279+	msg2 := "second message\n"
280+	_, err = user.RunCommandWithStdin(pubClient2, "pub keepalivetopic -c", msg2)
281+	if err != nil {
282+		t.Logf("pub 2 completed: %v", err)
283+	}
284+
285+	received2 := make([]byte, 100)
286+	n2, _ := subStdout.Read(received2)
287+	if !strings.Contains(string(received2[:n2]), "second message") {
288+		t.Errorf("subscriber with keepAlive did not receive second message, got: %q", string(received2[:n2]))
289+	}
290+}
291+
292+func TestSub_WithoutKeepAliveExitsAfterPublisher(t *testing.T) {
293+	server := NewTestSSHServer(t)
294+	defer server.Shutdown()
295+
296+	user := GenerateUser("alice")
297+	RegisterUserWithServer(server, user)
298+
299+	subClient, err := user.NewClient()
300+	if err != nil {
301+		t.Fatalf("failed to connect subscriber: %v", err)
302+	}
303+	defer func() { _ = subClient.Close() }()
304+
305+	pubClient, err := user.NewClient()
306+	if err != nil {
307+		t.Fatalf("failed to connect publisher: %v", err)
308+	}
309+	defer func() { _ = pubClient.Close() }()
310+
311+	// Start subscriber without keepAlive
312+	subSession, err := subClient.NewSession()
313+	if err != nil {
314+		t.Fatalf("failed to create sub session: %v", err)
315+	}
316+
317+	subStdout, err := subSession.StdoutPipe()
318+	if err != nil {
319+		t.Fatalf("failed to get sub stdout: %v", err)
320+	}
321+
322+	if err := subSession.Start("sub exitaftertopic -c"); err != nil {
323+		t.Fatalf("failed to start sub: %v", err)
324+	}
325+
326+	time.Sleep(100 * time.Millisecond)
327+
328+	// Publish a message
329+	testMessage := "single message"
330+	_, err = user.RunCommandWithStdin(pubClient, "pub exitaftertopic -c", testMessage)
331+	if err != nil {
332+		t.Logf("pub completed: %v", err)
333+	}
334+
335+	// Read the message
336+	received := make([]byte, 100)
337+	n, _ := subStdout.Read(received)
338+	if !strings.Contains(string(received[:n]), testMessage) {
339+		t.Errorf("subscriber did not receive message, got: %q", string(received[:n]))
340+	}
341+
342+	// Subscriber session should exit after publisher disconnects
343+	done := make(chan error)
344+	go func() {
345+		done <- subSession.Wait()
346+	}()
347+
348+	select {
349+	case err := <-done:
350+		// Session ended as expected
351+		t.Logf("subscriber session ended: %v", err)
352+	case <-time.After(2 * time.Second):
353+		t.Errorf("subscriber without keepAlive should have exited after publisher disconnected")
354+		_ = subSession.Close()
355+	}
356+}