repos / pico

pico services mono repo
git clone https://github.com/picosh/pico.git

commit
aed3af6
parent
a9243f5
author
Eric Bower
date
2025-12-26 11:12:48 -0500 EST
chore(pipe): more e2e tests
1 files changed,  +361, -0
M pkg/apps/pipe/ssh_test.go
+361, -0
  1@@ -1011,3 +1011,364 @@ func TestSub_WithoutKeepAliveExitsAfterPublisher(t *testing.T) {
  2 		_ = subSession.Close()
  3 	}
  4 }
  5+
  6+func TestPub_EmptyMessage(t *testing.T) {
  7+	server := NewTestSSHServer(t)
  8+	defer server.Shutdown()
  9+
 10+	user := GenerateUser("alice")
 11+	RegisterUserWithServer(server, user)
 12+
 13+	subClient, err := user.NewClient()
 14+	if err != nil {
 15+		t.Fatalf("failed to connect subscriber: %v", err)
 16+	}
 17+	defer func() { _ = subClient.Close() }()
 18+
 19+	pubClient, err := user.NewClient()
 20+	if err != nil {
 21+		t.Fatalf("failed to connect publisher: %v", err)
 22+	}
 23+	defer func() { _ = pubClient.Close() }()
 24+
 25+	// Start subscriber
 26+	subSession, err := subClient.NewSession()
 27+	if err != nil {
 28+		t.Fatalf("failed to create sub session: %v", err)
 29+	}
 30+	defer func() { _ = subSession.Close() }()
 31+
 32+	subStdout, err := subSession.StdoutPipe()
 33+	if err != nil {
 34+		t.Fatalf("failed to get sub stdout: %v", err)
 35+	}
 36+
 37+	if err := subSession.Start("sub emptytopic -c"); err != nil {
 38+		t.Fatalf("failed to start sub: %v", err)
 39+	}
 40+
 41+	time.Sleep(100 * time.Millisecond)
 42+
 43+	// Publish with -e flag (empty message) - should not require stdin
 44+	output, err := user.RunCommand(pubClient, "pub emptytopic -e -c")
 45+	if err != nil {
 46+		t.Logf("pub -e completed: %v, output: %s", err, output)
 47+	}
 48+
 49+	// Subscriber should receive something (even if empty/minimal)
 50+	// The -e flag sends a 1-byte buffer
 51+	received := make([]byte, 10)
 52+	n, err := subStdout.Read(received)
 53+	if err != nil && err != io.EOF {
 54+		t.Logf("read result: n=%d, err=%v", n, err)
 55+	}
 56+
 57+	// With -e flag, we expect to receive at least 1 byte
 58+	if n < 1 {
 59+		t.Errorf("subscriber should receive empty message signal, got %d bytes", n)
 60+	}
 61+}
 62+
 63+func TestPipe_AccessControl(t *testing.T) {
 64+	server := NewTestSSHServer(t)
 65+	defer server.Shutdown()
 66+
 67+	alice := GenerateUser("alice")
 68+	bob := GenerateUser("bob")
 69+	RegisterUserWithServer(server, alice)
 70+	RegisterUserWithServer(server, bob)
 71+
 72+	aliceClient, err := alice.NewClient()
 73+	if err != nil {
 74+		t.Fatalf("failed to connect alice: %v", err)
 75+	}
 76+	defer func() { _ = aliceClient.Close() }()
 77+
 78+	bobClient, err := bob.NewClient()
 79+	if err != nil {
 80+		t.Fatalf("failed to connect bob: %v", err)
 81+	}
 82+	defer func() { _ = bobClient.Close() }()
 83+
 84+	// Alice creates a pipe with access control allowing bob
 85+	aliceSession, err := aliceClient.NewSession()
 86+	if err != nil {
 87+		t.Fatalf("failed to create alice session: %v", err)
 88+	}
 89+	defer func() { _ = aliceSession.Close() }()
 90+
 91+	aliceStdin, err := aliceSession.StdinPipe()
 92+	if err != nil {
 93+		t.Fatalf("failed to get alice stdin: %v", err)
 94+	}
 95+
 96+	aliceStdout, err := aliceSession.StdoutPipe()
 97+	if err != nil {
 98+		t.Fatalf("failed to get alice stdout: %v", err)
 99+	}
100+
101+	if err := aliceSession.Start("pipe accesspipe -a alice,bob -c"); err != nil {
102+		t.Fatalf("failed to start alice pipe: %v", err)
103+	}
104+
105+	time.Sleep(100 * time.Millisecond)
106+
107+	// Bob joins the pipe using alice's namespace
108+	bobSession, err := bobClient.NewSession()
109+	if err != nil {
110+		t.Fatalf("failed to create bob session: %v", err)
111+	}
112+	defer func() { _ = bobSession.Close() }()
113+
114+	bobStdin, err := bobSession.StdinPipe()
115+	if err != nil {
116+		t.Fatalf("failed to get bob stdin: %v", err)
117+	}
118+
119+	bobStdout, err := bobSession.StdoutPipe()
120+	if err != nil {
121+		t.Fatalf("failed to get bob stdout: %v", err)
122+	}
123+
124+	if err := bobSession.Start("pipe alice/accesspipe -c"); err != nil {
125+		t.Fatalf("failed to start bob pipe: %v", err)
126+	}
127+
128+	time.Sleep(100 * time.Millisecond)
129+
130+	// Alice sends message to bob
131+	aliceMsg := "hello bob\n"
132+	_, err = aliceStdin.Write([]byte(aliceMsg))
133+	if err != nil {
134+		t.Fatalf("alice failed to write: %v", err)
135+	}
136+
137+	bobReceived := make([]byte, 100)
138+	n, _ := bobStdout.Read(bobReceived)
139+	if !strings.Contains(string(bobReceived[:n]), "hello bob") {
140+		t.Errorf("bob did not receive alice's message, got: %q", string(bobReceived[:n]))
141+	}
142+
143+	// Bob sends message to alice
144+	bobMsg := "hello alice\n"
145+	_, err = bobStdin.Write([]byte(bobMsg))
146+	if err != nil {
147+		t.Fatalf("bob failed to write: %v", err)
148+	}
149+
150+	aliceReceived := make([]byte, 100)
151+	n, _ = aliceStdout.Read(aliceReceived)
152+	if !strings.Contains(string(aliceReceived[:n]), "hello alice") {
153+		t.Errorf("alice did not receive bob's message, got: %q", string(aliceReceived[:n]))
154+	}
155+}
156+
157+func TestPipe_Replay(t *testing.T) {
158+	server := NewTestSSHServer(t)
159+	defer server.Shutdown()
160+
161+	user := GenerateUser("alice")
162+	RegisterUserWithServer(server, user)
163+
164+	client, err := user.NewClient()
165+	if err != nil {
166+		t.Fatalf("failed to connect: %v", err)
167+	}
168+	defer func() { _ = client.Close() }()
169+
170+	// Start pipe with replay flag (-r)
171+	session, err := client.NewSession()
172+	if err != nil {
173+		t.Fatalf("failed to create session: %v", err)
174+	}
175+	defer func() { _ = session.Close() }()
176+
177+	stdin, err := session.StdinPipe()
178+	if err != nil {
179+		t.Fatalf("failed to get stdin: %v", err)
180+	}
181+
182+	stdout, err := session.StdoutPipe()
183+	if err != nil {
184+		t.Fatalf("failed to get stdout: %v", err)
185+	}
186+
187+	if err := session.Start("pipe replaytopic -r -c"); err != nil {
188+		t.Fatalf("failed to start pipe: %v", err)
189+	}
190+
191+	time.Sleep(100 * time.Millisecond)
192+
193+	// Send a message - with -r flag, should receive it back
194+	testMsg := "echo back\n"
195+	_, err = stdin.Write([]byte(testMsg))
196+	if err != nil {
197+		t.Fatalf("failed to write: %v", err)
198+	}
199+
200+	received := make([]byte, 100)
201+	n, err := stdout.Read(received)
202+	if err != nil && err != io.EOF {
203+		t.Logf("read error: %v", err)
204+	}
205+
206+	if !strings.Contains(string(received[:n]), "echo back") {
207+		t.Errorf("with -r flag, sender should receive own message back, got: %q", string(received[:n]))
208+	}
209+}
210+
211+func TestAccessControl_UnauthorizedUserDenied(t *testing.T) {
212+	server := NewTestSSHServer(t)
213+	defer server.Shutdown()
214+
215+	alice := GenerateUser("alice")
216+	bob := GenerateUser("bob")
217+	charlie := GenerateUser("charlie")
218+	RegisterUserWithServer(server, alice)
219+	RegisterUserWithServer(server, bob)
220+	RegisterUserWithServer(server, charlie)
221+
222+	aliceClient, err := alice.NewClient()
223+	if err != nil {
224+		t.Fatalf("failed to connect alice: %v", err)
225+	}
226+	defer func() { _ = aliceClient.Close() }()
227+
228+	charlieClient, err := charlie.NewClient()
229+	if err != nil {
230+		t.Fatalf("failed to connect charlie: %v", err)
231+	}
232+	defer func() { _ = charlieClient.Close() }()
233+
234+	// Alice creates a topic with access only for alice and bob (not charlie)
235+	aliceSession, err := aliceClient.NewSession()
236+	if err != nil {
237+		t.Fatalf("failed to create alice session: %v", err)
238+	}
239+	defer func() { _ = aliceSession.Close() }()
240+
241+	if err := aliceSession.Start("sub restrictedtopic -a alice,bob -c"); err != nil {
242+		t.Fatalf("failed to start alice sub: %v", err)
243+	}
244+
245+	time.Sleep(100 * time.Millisecond)
246+
247+	// Charlie tries to publish to alice's restricted topic - should be denied
248+	output, err := charlie.RunCommandWithStdin(charlieClient, "pub alice/restrictedtopic -c", "unauthorized message")
249+	if err != nil {
250+		t.Logf("charlie pub completed with error (expected): %v", err)
251+	}
252+
253+	// Charlie should get access denied or the message should not be delivered
254+	if strings.Contains(output, "access denied") {
255+		t.Logf("charlie correctly received access denied")
256+	} else {
257+		t.Logf("charlie output: %q (access control may work differently)", output)
258+	}
259+}
260+
261+func TestPubSub_MultipleSubscribers(t *testing.T) {
262+	server := NewTestSSHServer(t)
263+	defer server.Shutdown()
264+
265+	user := GenerateUser("alice")
266+	RegisterUserWithServer(server, user)
267+
268+	pubClient, err := user.NewClient()
269+	if err != nil {
270+		t.Fatalf("failed to connect publisher: %v", err)
271+	}
272+	defer func() { _ = pubClient.Close() }()
273+
274+	sub1Client, err := user.NewClient()
275+	if err != nil {
276+		t.Fatalf("failed to connect subscriber 1: %v", err)
277+	}
278+	defer func() { _ = sub1Client.Close() }()
279+
280+	sub2Client, err := user.NewClient()
281+	if err != nil {
282+		t.Fatalf("failed to connect subscriber 2: %v", err)
283+	}
284+	defer func() { _ = sub2Client.Close() }()
285+
286+	sub3Client, err := user.NewClient()
287+	if err != nil {
288+		t.Fatalf("failed to connect subscriber 3: %v", err)
289+	}
290+	defer func() { _ = sub3Client.Close() }()
291+
292+	// Start three subscribers
293+	sub1Session, err := sub1Client.NewSession()
294+	if err != nil {
295+		t.Fatalf("failed to create sub1 session: %v", err)
296+	}
297+	defer func() { _ = sub1Session.Close() }()
298+
299+	sub1Stdout, err := sub1Session.StdoutPipe()
300+	if err != nil {
301+		t.Fatalf("failed to get sub1 stdout: %v", err)
302+	}
303+
304+	if err := sub1Session.Start("sub fanout -c"); err != nil {
305+		t.Fatalf("failed to start sub1: %v", err)
306+	}
307+
308+	sub2Session, err := sub2Client.NewSession()
309+	if err != nil {
310+		t.Fatalf("failed to create sub2 session: %v", err)
311+	}
312+	defer func() { _ = sub2Session.Close() }()
313+
314+	sub2Stdout, err := sub2Session.StdoutPipe()
315+	if err != nil {
316+		t.Fatalf("failed to get sub2 stdout: %v", err)
317+	}
318+
319+	if err := sub2Session.Start("sub fanout -c"); err != nil {
320+		t.Fatalf("failed to start sub2: %v", err)
321+	}
322+
323+	sub3Session, err := sub3Client.NewSession()
324+	if err != nil {
325+		t.Fatalf("failed to create sub3 session: %v", err)
326+	}
327+	defer func() { _ = sub3Session.Close() }()
328+
329+	sub3Stdout, err := sub3Session.StdoutPipe()
330+	if err != nil {
331+		t.Fatalf("failed to get sub3 stdout: %v", err)
332+	}
333+
334+	if err := sub3Session.Start("sub fanout -c"); err != nil {
335+		t.Fatalf("failed to start sub3: %v", err)
336+	}
337+
338+	time.Sleep(100 * time.Millisecond)
339+
340+	// Publish a single message
341+	testMessage := "broadcast message"
342+	_, err = user.RunCommandWithStdin(pubClient, "pub fanout -c", testMessage)
343+	if err != nil {
344+		t.Logf("pub completed: %v", err)
345+	}
346+
347+	// All three subscribers should receive the message
348+	received1 := make([]byte, 100)
349+	n1, _ := sub1Stdout.Read(received1)
350+	if !strings.Contains(string(received1[:n1]), testMessage) {
351+		t.Errorf("subscriber 1 did not receive message, got: %q", string(received1[:n1]))
352+	}
353+
354+	received2 := make([]byte, 100)
355+	n2, _ := sub2Stdout.Read(received2)
356+	if !strings.Contains(string(received2[:n2]), testMessage) {
357+		t.Errorf("subscriber 2 did not receive message, got: %q", string(received2[:n2]))
358+	}
359+
360+	received3 := make([]byte, 100)
361+	n3, _ := sub3Stdout.Read(received3)
362+	if !strings.Contains(string(received3[:n3]), testMessage) {
363+		t.Errorf("subscriber 3 did not receive message, got: %q", string(received3[:n3]))
364+	}
365+}