Eric Bower
·
2026-01-25
ssh_test.go
1package pipe
2
3import (
4 "context"
5 "crypto/ed25519"
6 "crypto/rand"
7 "fmt"
8 "io"
9 "log/slog"
10 "os"
11 "strings"
12 "testing"
13 "time"
14
15 "github.com/antoniomika/syncmap"
16 "github.com/picosh/pico/pkg/db"
17 "github.com/picosh/pico/pkg/db/stub"
18 "github.com/picosh/pico/pkg/pssh"
19 psub "github.com/picosh/pico/pkg/pubsub"
20 "github.com/picosh/pico/pkg/shared"
21 "github.com/prometheus/client_golang/prometheus"
22 "golang.org/x/crypto/ssh"
23)
24
25type TestDB struct {
26 *stub.StubDB
27 Users []*db.User
28 Pubkeys []*db.PublicKey
29 Features []*db.FeatureFlag
30 PipeMonitors []*db.PipeMonitor
31}
32
33func NewTestDB(logger *slog.Logger) *TestDB {
34 return &TestDB{
35 StubDB: stub.NewStubDB(logger),
36 }
37}
38
39func (t *TestDB) FindUserByPubkey(key string) (*db.User, error) {
40 for _, pk := range t.Pubkeys {
41 if pk.Key == key {
42 return t.FindUser(pk.UserID)
43 }
44 }
45 return nil, fmt.Errorf("user not found for pubkey")
46}
47
48func (t *TestDB) FindUser(userID string) (*db.User, error) {
49 for _, user := range t.Users {
50 if user.ID == userID {
51 return user, nil
52 }
53 }
54 return nil, fmt.Errorf("user not found")
55}
56
57func (t *TestDB) FindUserByName(name string) (*db.User, error) {
58 for _, user := range t.Users {
59 if user.Name == name {
60 return user, nil
61 }
62 }
63 return nil, fmt.Errorf("user not found")
64}
65
66func (t *TestDB) FindFeature(userID, name string) (*db.FeatureFlag, error) {
67 for _, ff := range t.Features {
68 if ff.UserID == userID && ff.Name == name {
69 return ff, nil
70 }
71 }
72 return nil, fmt.Errorf("feature not found")
73}
74
75func (t *TestDB) HasFeatureByUser(userID string, feature string) bool {
76 ff, err := t.FindFeature(userID, feature)
77 if err != nil {
78 return false
79 }
80 return ff.IsValid()
81}
82
83func (t *TestDB) InsertAccessLog(_ *db.AccessLog) error {
84 return nil
85}
86
87func (t *TestDB) Close() error {
88 return nil
89}
90
91func (t *TestDB) AddUser(user *db.User) {
92 t.Users = append(t.Users, user)
93}
94
95func (t *TestDB) AddPubkey(pubkey *db.PublicKey) {
96 t.Pubkeys = append(t.Pubkeys, pubkey)
97}
98
99func (t *TestDB) UpsertPipeMonitor(userID, topic string, dur time.Duration, winEnd *time.Time) error {
100 for _, m := range t.PipeMonitors {
101 if m.UserId == userID && m.Topic == topic {
102 m.WindowDur = dur
103 m.WindowEnd = winEnd
104 now := time.Now()
105 m.UpdatedAt = &now
106 return nil
107 }
108 }
109 now := time.Now()
110 t.PipeMonitors = append(t.PipeMonitors, &db.PipeMonitor{
111 ID: fmt.Sprintf("monitor-%s-%s", userID, topic),
112 UserId: userID,
113 Topic: topic,
114 WindowDur: dur,
115 WindowEnd: winEnd,
116 CreatedAt: &now,
117 UpdatedAt: &now,
118 })
119 return nil
120}
121
122func (t *TestDB) UpdatePipeMonitorLastPing(userID, topic string, lastPing *time.Time) error {
123 for _, m := range t.PipeMonitors {
124 if m.UserId == userID && m.Topic == topic {
125 m.LastPing = lastPing
126 now := time.Now()
127 m.UpdatedAt = &now
128 return nil
129 }
130 }
131 return fmt.Errorf("monitor not found")
132}
133
134func (t *TestDB) RemovePipeMonitor(userID, topic string) error {
135 for i, m := range t.PipeMonitors {
136 if m.UserId == userID && m.Topic == topic {
137 t.PipeMonitors = append(t.PipeMonitors[:i], t.PipeMonitors[i+1:]...)
138 return nil
139 }
140 }
141 return fmt.Errorf("monitor not found")
142}
143
144func (t *TestDB) FindPipeMonitorByTopic(userID, topic string) (*db.PipeMonitor, error) {
145 for _, m := range t.PipeMonitors {
146 if m.UserId == userID && m.Topic == topic {
147 return m, nil
148 }
149 }
150 return nil, fmt.Errorf("monitor not found")
151}
152
153func (t *TestDB) FindPipeMonitorsByUser(userID string) ([]*db.PipeMonitor, error) {
154 var monitors []*db.PipeMonitor
155 for _, m := range t.PipeMonitors {
156 if m.UserId == userID {
157 monitors = append(monitors, m)
158 }
159 }
160 return monitors, nil
161}
162
163func (t *TestDB) InsertPipeMonitorHistory(monitorID string, windowDur time.Duration, windowEnd, lastPing *time.Time) error {
164 return nil
165}
166
167func (t *TestDB) FindPipeMonitorHistory(monitorID string, from, to time.Time) ([]*db.PipeMonitorHistory, error) {
168 return nil, nil
169}
170
171type TestSSHServer struct {
172 Cfg *shared.ConfigSite
173 DBPool *TestDB
174 PipeHandler *CliHandler
175 Cancel context.CancelFunc
176}
177
178func NewTestSSHServer(t *testing.T) *TestSSHServer {
179 t.Helper()
180
181 opts := &slog.HandlerOptions{
182 AddSource: true,
183 Level: slog.LevelDebug,
184 }
185 logger := slog.New(slog.NewTextHandler(os.Stdout, opts))
186
187 dbpool := NewTestDB(logger)
188
189 cfg := &shared.ConfigSite{
190 Domain: "pipe.test",
191 Port: "2225",
192 PortOverride: "2225",
193 Protocol: "ssh",
194 Logger: logger,
195 Space: "pipe",
196 }
197
198 ctx, cancel := context.WithCancel(context.Background())
199
200 pubsub := psub.NewMulticast(logger)
201 handler := &CliHandler{
202 Logger: logger,
203 DBPool: dbpool,
204 PubSub: pubsub,
205 Cfg: cfg,
206 Waiters: syncmap.New[string, []string](),
207 Access: syncmap.New[string, []string](),
208 }
209
210 sshAuth := shared.NewSshAuthHandler(dbpool, logger, "pipe")
211
212 prometheus.DefaultRegisterer = prometheus.NewRegistry()
213
214 server, err := pssh.NewSSHServerWithConfig(
215 ctx,
216 logger,
217 "pipe-ssh-test",
218 "localhost",
219 cfg.Port,
220 "9223",
221 "../../ssh_data/term_info_ed25519",
222 func(conn ssh.ConnMetadata, key ssh.PublicKey) (*ssh.Permissions, error) {
223 perms, _ := sshAuth.PubkeyAuthHandler(conn, key)
224 if perms == nil {
225 perms = &ssh.Permissions{
226 Extensions: map[string]string{
227 "pubkey": shared.KeyForKeyText(key),
228 },
229 }
230 }
231 return perms, nil
232 },
233 []pssh.SSHServerMiddleware{
234 Middleware(handler),
235 pssh.LogMiddleware(handler, dbpool),
236 },
237 nil,
238 nil,
239 )
240
241 if err != nil {
242 t.Fatalf("failed to create ssh server: %v", err)
243 }
244
245 go func() {
246 if err := server.ListenAndServe(); err != nil {
247 logger.Error("serve", "err", err.Error())
248 }
249 }()
250
251 time.Sleep(100 * time.Millisecond)
252
253 return &TestSSHServer{
254 Cfg: cfg,
255 DBPool: dbpool,
256 PipeHandler: handler,
257 Cancel: cancel,
258 }
259}
260
261func (s *TestSSHServer) Shutdown() {
262 s.Cancel()
263 time.Sleep(10 * time.Millisecond)
264}
265
266type UserSSH struct {
267 username string
268 signer ssh.Signer
269 privateKey []byte
270}
271
272func GenerateUser(username string) UserSSH {
273 _, userKey, err := ed25519.GenerateKey(rand.Reader)
274 if err != nil {
275 panic(err)
276 }
277
278 b, err := ssh.MarshalPrivateKey(userKey, "")
279 if err != nil {
280 panic(err)
281 }
282
283 userSigner, err := ssh.NewSignerFromKey(userKey)
284 if err != nil {
285 panic(err)
286 }
287
288 return UserSSH{
289 username: username,
290 signer: userSigner,
291 privateKey: b.Bytes,
292 }
293}
294
295func (u UserSSH) PublicKey() string {
296 return shared.KeyForKeyText(u.signer.PublicKey())
297}
298
299func (u UserSSH) NewClient() (*ssh.Client, error) {
300 config := &ssh.ClientConfig{
301 User: u.username,
302 Auth: []ssh.AuthMethod{
303 ssh.PublicKeys(u.signer),
304 },
305 HostKeyCallback: ssh.InsecureIgnoreHostKey(),
306 }
307
308 return ssh.Dial("tcp", "localhost:2225", config)
309}
310
311func (u UserSSH) RunCommand(client *ssh.Client, cmd string) (string, error) {
312 session, err := client.NewSession()
313 if err != nil {
314 return "", err
315 }
316 defer func() { _ = session.Close() }()
317
318 stdoutPipe, err := session.StdoutPipe()
319 if err != nil {
320 return "", err
321 }
322
323 stderrPipe, err := session.StderrPipe()
324 if err != nil {
325 return "", err
326 }
327
328 if err := session.Start(cmd); err != nil {
329 return "", err
330 }
331
332 stdout := new(strings.Builder)
333 stderr := new(strings.Builder)
334 _, _ = io.Copy(stdout, stdoutPipe)
335 _, _ = io.Copy(stderr, stderrPipe)
336
337 _ = session.Wait()
338 return stdout.String() + stderr.String(), nil
339}
340
341func (u UserSSH) RunCommandWithStdin(client *ssh.Client, cmd string, stdin string) (string, error) {
342 session, err := client.NewSession()
343 if err != nil {
344 return "", err
345 }
346 defer func() { _ = session.Close() }()
347
348 stdinPipe, err := session.StdinPipe()
349 if err != nil {
350 return "", err
351 }
352
353 stdoutPipe, err := session.StdoutPipe()
354 if err != nil {
355 return "", err
356 }
357
358 if err := session.Start(cmd); err != nil {
359 return "", err
360 }
361
362 _, err = stdinPipe.Write([]byte(stdin))
363 if err != nil {
364 return "", err
365 }
366 _ = stdinPipe.Close()
367
368 buf := new(strings.Builder)
369 _, err = io.Copy(buf, stdoutPipe)
370 if err != nil {
371 return "", err
372 }
373
374 _ = session.Wait()
375 return buf.String(), nil
376}
377
378func RegisterUserWithServer(server *TestSSHServer, user UserSSH) {
379 dbUser := &db.User{
380 ID: user.username + "-id",
381 Name: user.username,
382 }
383 server.DBPool.AddUser(dbUser)
384 server.DBPool.AddPubkey(&db.PublicKey{
385 ID: user.username + "-pubkey-id",
386 UserID: dbUser.ID,
387 Key: user.PublicKey(),
388 })
389}
390
391func TestLs_UnauthenticatedUserDenied(t *testing.T) {
392 server := NewTestSSHServer(t)
393 defer server.Shutdown()
394
395 user := GenerateUser("anonymous")
396
397 client, err := user.NewClient()
398 if err != nil {
399 t.Fatalf("failed to connect: %v", err)
400 }
401 defer func() { _ = client.Close() }()
402
403 output, err := user.RunCommand(client, "ls")
404 if err != nil {
405 t.Logf("command error (expected): %v", err)
406 }
407
408 if !strings.Contains(output, "access denied") {
409 t.Errorf("expected 'access denied', got: %s", output)
410 }
411}
412
413func TestLs_AuthenticatedUser(t *testing.T) {
414 server := NewTestSSHServer(t)
415 defer server.Shutdown()
416
417 user := GenerateUser("alice")
418 RegisterUserWithServer(server, user)
419
420 client, err := user.NewClient()
421 if err != nil {
422 t.Fatalf("failed to connect: %v", err)
423 }
424 defer func() { _ = client.Close() }()
425
426 output, err := user.RunCommand(client, "ls")
427 if err != nil {
428 t.Logf("command completed with: %v", err)
429 }
430
431 if strings.Contains(output, "access denied") {
432 t.Errorf("authenticated user should not get access denied, got: %s", output)
433 }
434
435 if !strings.Contains(output, "no pubsub channels found") {
436 t.Errorf("expected 'no pubsub channels found' for empty state, got: %s", output)
437 }
438}
439
440func TestPubSub_BasicFlow(t *testing.T) {
441 server := NewTestSSHServer(t)
442 defer server.Shutdown()
443
444 user := GenerateUser("alice")
445 RegisterUserWithServer(server, user)
446
447 subClient, err := user.NewClient()
448 if err != nil {
449 t.Fatalf("failed to connect subscriber: %v", err)
450 }
451 defer func() { _ = subClient.Close() }()
452
453 pubClient, err := user.NewClient()
454 if err != nil {
455 t.Fatalf("failed to connect publisher: %v", err)
456 }
457 defer func() { _ = pubClient.Close() }()
458
459 subSession, err := subClient.NewSession()
460 if err != nil {
461 t.Fatalf("failed to create sub session: %v", err)
462 }
463 defer func() { _ = subSession.Close() }()
464
465 subStdout, err := subSession.StdoutPipe()
466 if err != nil {
467 t.Fatalf("failed to get sub stdout: %v", err)
468 }
469
470 if err := subSession.Start("sub testtopic -c"); err != nil {
471 t.Fatalf("failed to start sub: %v", err)
472 }
473
474 time.Sleep(100 * time.Millisecond)
475
476 testMessage := "hello from pub"
477 _, err = user.RunCommandWithStdin(pubClient, "pub testtopic -c", testMessage)
478 if err != nil {
479 t.Logf("pub command completed: %v", err)
480 }
481
482 received := make([]byte, len(testMessage)+10)
483 n, err := subStdout.Read(received)
484 if err != nil && err != io.EOF {
485 t.Logf("read error: %v", err)
486 }
487
488 receivedStr := string(received[:n])
489 if !strings.Contains(receivedStr, testMessage) {
490 t.Errorf("subscriber did not receive message, got: %q, want: %q", receivedStr, testMessage)
491 }
492}
493
494func TestPubSub_PublicTopic(t *testing.T) {
495 server := NewTestSSHServer(t)
496 defer server.Shutdown()
497
498 alice := GenerateUser("alice")
499 bob := GenerateUser("bob")
500 RegisterUserWithServer(server, alice)
501 RegisterUserWithServer(server, bob)
502
503 subClient, err := bob.NewClient()
504 if err != nil {
505 t.Fatalf("failed to connect subscriber: %v", err)
506 }
507 defer func() { _ = subClient.Close() }()
508
509 pubClient, err := alice.NewClient()
510 if err != nil {
511 t.Fatalf("failed to connect publisher: %v", err)
512 }
513 defer func() { _ = pubClient.Close() }()
514
515 subSession, err := subClient.NewSession()
516 if err != nil {
517 t.Fatalf("failed to create sub session: %v", err)
518 }
519 defer func() { _ = subSession.Close() }()
520
521 subStdout, err := subSession.StdoutPipe()
522 if err != nil {
523 t.Fatalf("failed to get sub stdout: %v", err)
524 }
525
526 if err := subSession.Start("sub publictopic -p -c"); err != nil {
527 t.Fatalf("failed to start sub: %v", err)
528 }
529
530 time.Sleep(100 * time.Millisecond)
531
532 testMessage := "public message"
533 _, err = alice.RunCommandWithStdin(pubClient, "pub publictopic -p -c", testMessage)
534 if err != nil {
535 t.Logf("pub command completed: %v", err)
536 }
537
538 received := make([]byte, len(testMessage)+10)
539 n, err := subStdout.Read(received)
540 if err != nil && err != io.EOF {
541 t.Logf("read error: %v", err)
542 }
543
544 receivedStr := string(received[:n])
545 if !strings.Contains(receivedStr, testMessage) {
546 t.Errorf("subscriber did not receive public message, got: %q, want: %q", receivedStr, testMessage)
547 }
548}
549
550func TestPipe_Bidirectional(t *testing.T) {
551 server := NewTestSSHServer(t)
552 defer server.Shutdown()
553
554 alice := GenerateUser("alice")
555 bob := GenerateUser("bob")
556 RegisterUserWithServer(server, alice)
557 RegisterUserWithServer(server, bob)
558
559 aliceClient, err := alice.NewClient()
560 if err != nil {
561 t.Fatalf("failed to connect alice: %v", err)
562 }
563 defer func() { _ = aliceClient.Close() }()
564
565 bobClient, err := bob.NewClient()
566 if err != nil {
567 t.Fatalf("failed to connect bob: %v", err)
568 }
569 defer func() { _ = bobClient.Close() }()
570
571 aliceSession, err := aliceClient.NewSession()
572 if err != nil {
573 t.Fatalf("failed to create alice session: %v", err)
574 }
575 defer func() { _ = aliceSession.Close() }()
576
577 aliceStdin, err := aliceSession.StdinPipe()
578 if err != nil {
579 t.Fatalf("failed to get alice stdin: %v", err)
580 }
581
582 aliceStdout, err := aliceSession.StdoutPipe()
583 if err != nil {
584 t.Fatalf("failed to get alice stdout: %v", err)
585 }
586
587 if err := aliceSession.Start("pipe pipetopic -p -c"); err != nil {
588 t.Fatalf("failed to start alice pipe: %v", err)
589 }
590
591 time.Sleep(100 * time.Millisecond)
592
593 bobSession, err := bobClient.NewSession()
594 if err != nil {
595 t.Fatalf("failed to create bob session: %v", err)
596 }
597 defer func() { _ = bobSession.Close() }()
598
599 bobStdin, err := bobSession.StdinPipe()
600 if err != nil {
601 t.Fatalf("failed to get bob stdin: %v", err)
602 }
603
604 bobStdout, err := bobSession.StdoutPipe()
605 if err != nil {
606 t.Fatalf("failed to get bob stdout: %v", err)
607 }
608
609 if err := bobSession.Start("pipe pipetopic -p -c"); err != nil {
610 t.Fatalf("failed to start bob pipe: %v", err)
611 }
612
613 time.Sleep(100 * time.Millisecond)
614
615 aliceMsg := "hello from alice\n"
616 _, err = aliceStdin.Write([]byte(aliceMsg))
617 if err != nil {
618 t.Fatalf("alice failed to write: %v", err)
619 }
620
621 bobReceived := make([]byte, 100)
622 n, err := bobStdout.Read(bobReceived)
623 if err != nil && err != io.EOF {
624 t.Logf("bob read error: %v", err)
625 }
626 if !strings.Contains(string(bobReceived[:n]), "hello from alice") {
627 t.Errorf("bob did not receive alice's message, got: %q", string(bobReceived[:n]))
628 }
629
630 bobMsg := "hello from bob\n"
631 _, err = bobStdin.Write([]byte(bobMsg))
632 if err != nil {
633 t.Fatalf("bob failed to write: %v", err)
634 }
635
636 aliceReceived := make([]byte, 100)
637 n, err = aliceStdout.Read(aliceReceived)
638 if err != nil && err != io.EOF {
639 t.Logf("alice read error: %v", err)
640 }
641 if !strings.Contains(string(aliceReceived[:n]), "hello from bob") {
642 t.Errorf("alice did not receive bob's message, got: %q", string(aliceReceived[:n]))
643 }
644}
645
646func TestPipe_AutoGeneratedTopic(t *testing.T) {
647 server := NewTestSSHServer(t)
648 defer server.Shutdown()
649
650 user := GenerateUser("alice")
651 RegisterUserWithServer(server, user)
652
653 client, err := user.NewClient()
654 if err != nil {
655 t.Fatalf("failed to connect: %v", err)
656 }
657 defer func() { _ = client.Close() }()
658
659 session, err := client.NewSession()
660 if err != nil {
661 t.Fatalf("failed to create session: %v", err)
662 }
663 defer func() { _ = session.Close() }()
664
665 stdout, err := session.StdoutPipe()
666 if err != nil {
667 t.Fatalf("failed to get stdout: %v", err)
668 }
669
670 if err := session.Start("pipe"); err != nil {
671 t.Fatalf("failed to start pipe: %v", err)
672 }
673
674 received := make([]byte, 200)
675 n, err := stdout.Read(received)
676 if err != nil && err != io.EOF {
677 t.Logf("read error: %v", err)
678 }
679
680 output := string(received[:n])
681 if !strings.Contains(output, "subscribe to this topic") {
682 t.Errorf("expected topic subscription instructions, got: %q", output)
683 }
684}
685
686func TestAccessControl_AllowedUserViaFullPath(t *testing.T) {
687 server := NewTestSSHServer(t)
688 defer server.Shutdown()
689
690 alice := GenerateUser("alice")
691 bob := GenerateUser("bob")
692 RegisterUserWithServer(server, alice)
693 RegisterUserWithServer(server, bob)
694
695 aliceClient, err := alice.NewClient()
696 if err != nil {
697 t.Fatalf("failed to connect alice: %v", err)
698 }
699 defer func() { _ = aliceClient.Close() }()
700
701 aliceSession, err := aliceClient.NewSession()
702 if err != nil {
703 t.Fatalf("failed to create alice session: %v", err)
704 }
705 defer func() { _ = aliceSession.Close() }()
706
707 aliceStdout, err := aliceSession.StdoutPipe()
708 if err != nil {
709 t.Fatalf("failed to get alice stdout: %v", err)
710 }
711
712 if err := aliceSession.Start("sub sharedtopic -a alice,bob -c"); err != nil {
713 t.Fatalf("failed to start alice sub: %v", err)
714 }
715
716 time.Sleep(100 * time.Millisecond)
717
718 bobClient, err := bob.NewClient()
719 if err != nil {
720 t.Fatalf("failed to connect bob: %v", err)
721 }
722 defer func() { _ = bobClient.Close() }()
723
724 _, err = bob.RunCommandWithStdin(bobClient, "pub alice/sharedtopic -c", "bob allowed")
725 if err != nil {
726 t.Logf("bob pub completed: %v", err)
727 }
728
729 aliceReceived := make([]byte, 100)
730 n, _ := aliceStdout.Read(aliceReceived)
731
732 if !strings.Contains(string(aliceReceived[:n]), "bob allowed") {
733 t.Errorf("alice should receive bob's message on shared topic, got: %q", string(aliceReceived[:n]))
734 }
735}
736
737func TestPubSub_BlockingWaitsForSubscriber(t *testing.T) {
738 server := NewTestSSHServer(t)
739 defer server.Shutdown()
740
741 user := GenerateUser("alice")
742 RegisterUserWithServer(server, user)
743
744 pubClient, err := user.NewClient()
745 if err != nil {
746 t.Fatalf("failed to connect publisher: %v", err)
747 }
748 defer func() { _ = pubClient.Close() }()
749
750 subClient, err := user.NewClient()
751 if err != nil {
752 t.Fatalf("failed to connect subscriber: %v", err)
753 }
754 defer func() { _ = subClient.Close() }()
755
756 pubSession, err := pubClient.NewSession()
757 if err != nil {
758 t.Fatalf("failed to create pub session: %v", err)
759 }
760 defer func() { _ = pubSession.Close() }()
761
762 pubStdin, err := pubSession.StdinPipe()
763 if err != nil {
764 t.Fatalf("failed to get pub stdin: %v", err)
765 }
766
767 pubStdout, err := pubSession.StdoutPipe()
768 if err != nil {
769 t.Fatalf("failed to get pub stdout: %v", err)
770 }
771
772 // Start publisher with blocking enabled (default -b=true)
773 // Publisher should wait for subscriber
774 if err := pubSession.Start("pub blockingtopic"); err != nil {
775 t.Fatalf("failed to start pub: %v", err)
776 }
777
778 // Read output until we see "waiting" message or timeout
779 // Need to read in a loop because Read() may return partial data
780 var output string
781 readDone := make(chan struct{})
782 go func() {
783 buf := make([]byte, 1024)
784 for {
785 n, err := pubStdout.Read(buf)
786 if n > 0 {
787 output += string(buf[:n])
788 if strings.Contains(output, "waiting") {
789 close(readDone)
790 return
791 }
792 }
793 if err != nil {
794 close(readDone)
795 return
796 }
797 }
798 }()
799
800 select {
801 case <-readDone:
802 case <-time.After(2 * time.Second):
803 t.Fatalf("timeout waiting for 'waiting' message, got: %q", output)
804 }
805
806 if !strings.Contains(output, "waiting") {
807 t.Errorf("expected 'waiting' message for blocking pub, got: %q", output)
808 }
809
810 // Now start subscriber - this should unblock the publisher
811 subSession, err := subClient.NewSession()
812 if err != nil {
813 t.Fatalf("failed to create sub session: %v", err)
814 }
815 defer func() { _ = subSession.Close() }()
816
817 subStdout, err := subSession.StdoutPipe()
818 if err != nil {
819 t.Fatalf("failed to get sub stdout: %v", err)
820 }
821
822 if err := subSession.Start("sub blockingtopic -c"); err != nil {
823 t.Fatalf("failed to start sub: %v", err)
824 }
825
826 time.Sleep(100 * time.Millisecond)
827
828 // Now send the message
829 testMessage := "blocking message"
830 _, err = pubStdin.Write([]byte(testMessage))
831 if err != nil {
832 t.Fatalf("failed to write message: %v", err)
833 }
834 _ = pubStdin.Close()
835
836 // Subscriber should receive the message
837 received := make([]byte, 100)
838 nRead, err := subStdout.Read(received)
839 if err != nil && err != io.EOF {
840 t.Logf("read error: %v", err)
841 }
842
843 if !strings.Contains(string(received[:nRead]), testMessage) {
844 t.Errorf("subscriber did not receive blocking message, got: %q, want: %q", string(received[:nRead]), testMessage)
845 }
846}
847
848func TestPubSub_NonBlockingDoesNotWait(t *testing.T) {
849 server := NewTestSSHServer(t)
850 defer server.Shutdown()
851
852 user := GenerateUser("alice")
853 RegisterUserWithServer(server, user)
854
855 client, err := user.NewClient()
856 if err != nil {
857 t.Fatalf("failed to connect: %v", err)
858 }
859 defer func() { _ = client.Close() }()
860
861 // Publish with -b=false (non-blocking) and no subscriber
862 // Should complete immediately without waiting
863 done := make(chan struct{})
864 var output string
865 var cmdErr error
866
867 go func() {
868 output, cmdErr = user.RunCommandWithStdin(client, "pub nonblockingtopic -b=false -c", "non-blocking message")
869 close(done)
870 }()
871
872 select {
873 case <-done:
874 // Command completed - this is expected for non-blocking
875 if cmdErr != nil {
876 t.Logf("non-blocking pub completed with: %v", cmdErr)
877 }
878 t.Logf("non-blocking pub output: %q", output)
879 case <-time.After(2 * time.Second):
880 t.Errorf("non-blocking pub should complete immediately, but it blocked")
881 }
882}
883
884func TestPubSub_BlockingTimeout(t *testing.T) {
885 server := NewTestSSHServer(t)
886 defer server.Shutdown()
887
888 user := GenerateUser("alice")
889 RegisterUserWithServer(server, user)
890
891 client, err := user.NewClient()
892 if err != nil {
893 t.Fatalf("failed to connect: %v", err)
894 }
895 defer func() { _ = client.Close() }()
896
897 // Publish with blocking and short timeout, no subscriber
898 // Should timeout after the specified duration
899 done := make(chan struct{})
900 var output string
901
902 go func() {
903 output, _ = user.RunCommandWithStdin(client, "pub timeouttopic -b=true -t=500ms", "timeout message")
904 close(done)
905 }()
906
907 select {
908 case <-done:
909 // Command completed due to timeout
910 if !strings.Contains(output, "timeout") && !strings.Contains(output, "waiting") {
911 t.Logf("blocking pub with timeout output: %q", output)
912 }
913 case <-time.After(3 * time.Second):
914 t.Errorf("blocking pub with timeout should have timed out after 500ms")
915 }
916}
917
918func TestSub_WaitsForPublisher(t *testing.T) {
919 server := NewTestSSHServer(t)
920 defer server.Shutdown()
921
922 user := GenerateUser("alice")
923 RegisterUserWithServer(server, user)
924
925 subClient, err := user.NewClient()
926 if err != nil {
927 t.Fatalf("failed to connect subscriber: %v", err)
928 }
929 defer func() { _ = subClient.Close() }()
930
931 pubClient, err := user.NewClient()
932 if err != nil {
933 t.Fatalf("failed to connect publisher: %v", err)
934 }
935 defer func() { _ = pubClient.Close() }()
936
937 // Start subscriber first - it should wait for publisher
938 subSession, err := subClient.NewSession()
939 if err != nil {
940 t.Fatalf("failed to create sub session: %v", err)
941 }
942 defer func() { _ = subSession.Close() }()
943
944 subStdout, err := subSession.StdoutPipe()
945 if err != nil {
946 t.Fatalf("failed to get sub stdout: %v", err)
947 }
948
949 if err := subSession.Start("sub waitfortopic -c"); err != nil {
950 t.Fatalf("failed to start sub: %v", err)
951 }
952
953 // Subscriber is now waiting - give it a moment
954 time.Sleep(100 * time.Millisecond)
955
956 // Now publish - subscriber should receive it
957 testMessage := "delayed publish"
958 _, err = user.RunCommandWithStdin(pubClient, "pub waitfortopic -c", testMessage)
959 if err != nil {
960 t.Logf("pub completed: %v", err)
961 }
962
963 received := make([]byte, 100)
964 n, err := subStdout.Read(received)
965 if err != nil && err != io.EOF {
966 t.Logf("read error: %v", err)
967 }
968
969 if !strings.Contains(string(received[:n]), testMessage) {
970 t.Errorf("subscriber waiting for publisher did not receive message, got: %q, want: %q", string(received[:n]), testMessage)
971 }
972}
973
974func TestSub_KeepAliveReceivesMultipleMessages(t *testing.T) {
975 server := NewTestSSHServer(t)
976 defer server.Shutdown()
977
978 user := GenerateUser("alice")
979 RegisterUserWithServer(server, user)
980
981 subClient, err := user.NewClient()
982 if err != nil {
983 t.Fatalf("failed to connect subscriber: %v", err)
984 }
985 defer func() { _ = subClient.Close() }()
986
987 pubClient1, err := user.NewClient()
988 if err != nil {
989 t.Fatalf("failed to connect publisher 1: %v", err)
990 }
991 defer func() { _ = pubClient1.Close() }()
992
993 pubClient2, err := user.NewClient()
994 if err != nil {
995 t.Fatalf("failed to connect publisher 2: %v", err)
996 }
997 defer func() { _ = pubClient2.Close() }()
998
999 // Start subscriber with keepAlive (-k) flag
1000 subSession, err := subClient.NewSession()
1001 if err != nil {
1002 t.Fatalf("failed to create sub session: %v", err)
1003 }
1004 defer func() { _ = subSession.Close() }()
1005
1006 subStdout, err := subSession.StdoutPipe()
1007 if err != nil {
1008 t.Fatalf("failed to get sub stdout: %v", err)
1009 }
1010
1011 if err := subSession.Start("sub keepalivetopic -k -c"); err != nil {
1012 t.Fatalf("failed to start sub: %v", err)
1013 }
1014
1015 time.Sleep(100 * time.Millisecond)
1016
1017 // Send first message
1018 msg1 := "first message\n"
1019 _, err = user.RunCommandWithStdin(pubClient1, "pub keepalivetopic -c", msg1)
1020 if err != nil {
1021 t.Logf("pub 1 completed: %v", err)
1022 }
1023
1024 received1 := make([]byte, 100)
1025 n1, _ := subStdout.Read(received1)
1026 if !strings.Contains(string(received1[:n1]), "first message") {
1027 t.Errorf("subscriber did not receive first message, got: %q", string(received1[:n1]))
1028 }
1029
1030 // Send second message - subscriber with keepAlive should still receive it
1031 msg2 := "second message\n"
1032 _, err = user.RunCommandWithStdin(pubClient2, "pub keepalivetopic -c", msg2)
1033 if err != nil {
1034 t.Logf("pub 2 completed: %v", err)
1035 }
1036
1037 received2 := make([]byte, 100)
1038 n2, _ := subStdout.Read(received2)
1039 if !strings.Contains(string(received2[:n2]), "second message") {
1040 t.Errorf("subscriber with keepAlive did not receive second message, got: %q", string(received2[:n2]))
1041 }
1042}
1043
1044func TestSub_WithoutKeepAliveExitsAfterPublisher(t *testing.T) {
1045 server := NewTestSSHServer(t)
1046 defer server.Shutdown()
1047
1048 user := GenerateUser("alice")
1049 RegisterUserWithServer(server, user)
1050
1051 subClient, err := user.NewClient()
1052 if err != nil {
1053 t.Fatalf("failed to connect subscriber: %v", err)
1054 }
1055 defer func() { _ = subClient.Close() }()
1056
1057 pubClient, err := user.NewClient()
1058 if err != nil {
1059 t.Fatalf("failed to connect publisher: %v", err)
1060 }
1061 defer func() { _ = pubClient.Close() }()
1062
1063 // Start subscriber without keepAlive
1064 subSession, err := subClient.NewSession()
1065 if err != nil {
1066 t.Fatalf("failed to create sub session: %v", err)
1067 }
1068
1069 subStdout, err := subSession.StdoutPipe()
1070 if err != nil {
1071 t.Fatalf("failed to get sub stdout: %v", err)
1072 }
1073
1074 if err := subSession.Start("sub exitaftertopic -c"); err != nil {
1075 t.Fatalf("failed to start sub: %v", err)
1076 }
1077
1078 time.Sleep(100 * time.Millisecond)
1079
1080 // Publish a message
1081 testMessage := "single message"
1082 _, err = user.RunCommandWithStdin(pubClient, "pub exitaftertopic -c", testMessage)
1083 if err != nil {
1084 t.Logf("pub completed: %v", err)
1085 }
1086
1087 // Read the message
1088 received := make([]byte, 100)
1089 n, _ := subStdout.Read(received)
1090 if !strings.Contains(string(received[:n]), testMessage) {
1091 t.Errorf("subscriber did not receive message, got: %q", string(received[:n]))
1092 }
1093
1094 // Subscriber session should exit after publisher disconnects
1095 done := make(chan error)
1096 go func() {
1097 done <- subSession.Wait()
1098 }()
1099
1100 select {
1101 case err := <-done:
1102 // Session ended as expected
1103 t.Logf("subscriber session ended: %v", err)
1104 case <-time.After(2 * time.Second):
1105 t.Errorf("subscriber without keepAlive should have exited after publisher disconnected")
1106 _ = subSession.Close()
1107 }
1108}
1109
1110func TestPub_EmptyMessage(t *testing.T) {
1111 server := NewTestSSHServer(t)
1112 defer server.Shutdown()
1113
1114 user := GenerateUser("alice")
1115 RegisterUserWithServer(server, user)
1116
1117 subClient, err := user.NewClient()
1118 if err != nil {
1119 t.Fatalf("failed to connect subscriber: %v", err)
1120 }
1121 defer func() { _ = subClient.Close() }()
1122
1123 pubClient, err := user.NewClient()
1124 if err != nil {
1125 t.Fatalf("failed to connect publisher: %v", err)
1126 }
1127 defer func() { _ = pubClient.Close() }()
1128
1129 // Start subscriber
1130 subSession, err := subClient.NewSession()
1131 if err != nil {
1132 t.Fatalf("failed to create sub session: %v", err)
1133 }
1134 defer func() { _ = subSession.Close() }()
1135
1136 subStdout, err := subSession.StdoutPipe()
1137 if err != nil {
1138 t.Fatalf("failed to get sub stdout: %v", err)
1139 }
1140
1141 if err := subSession.Start("sub emptytopic -c"); err != nil {
1142 t.Fatalf("failed to start sub: %v", err)
1143 }
1144
1145 time.Sleep(100 * time.Millisecond)
1146
1147 // Publish with -e flag (empty message) - should not require stdin
1148 output, err := user.RunCommand(pubClient, "pub emptytopic -e -c")
1149 if err != nil {
1150 t.Logf("pub -e completed: %v, output: %s", err, output)
1151 }
1152
1153 // Subscriber should receive something (even if empty/minimal)
1154 // The -e flag sends a 1-byte buffer
1155 received := make([]byte, 10)
1156 n, err := subStdout.Read(received)
1157 if err != nil && err != io.EOF {
1158 t.Logf("read result: n=%d, err=%v", n, err)
1159 }
1160
1161 // With -e flag, we expect to receive at least 1 byte
1162 if n < 1 {
1163 t.Errorf("subscriber should receive empty message signal, got %d bytes", n)
1164 }
1165}
1166
1167func TestPipe_AccessControl(t *testing.T) {
1168 server := NewTestSSHServer(t)
1169 defer server.Shutdown()
1170
1171 alice := GenerateUser("alice")
1172 bob := GenerateUser("bob")
1173 RegisterUserWithServer(server, alice)
1174 RegisterUserWithServer(server, bob)
1175
1176 aliceClient, err := alice.NewClient()
1177 if err != nil {
1178 t.Fatalf("failed to connect alice: %v", err)
1179 }
1180 defer func() { _ = aliceClient.Close() }()
1181
1182 bobClient, err := bob.NewClient()
1183 if err != nil {
1184 t.Fatalf("failed to connect bob: %v", err)
1185 }
1186 defer func() { _ = bobClient.Close() }()
1187
1188 // Alice creates a pipe with access control allowing bob
1189 aliceSession, err := aliceClient.NewSession()
1190 if err != nil {
1191 t.Fatalf("failed to create alice session: %v", err)
1192 }
1193 defer func() { _ = aliceSession.Close() }()
1194
1195 aliceStdin, err := aliceSession.StdinPipe()
1196 if err != nil {
1197 t.Fatalf("failed to get alice stdin: %v", err)
1198 }
1199
1200 aliceStdout, err := aliceSession.StdoutPipe()
1201 if err != nil {
1202 t.Fatalf("failed to get alice stdout: %v", err)
1203 }
1204
1205 if err := aliceSession.Start("pipe accesspipe -a alice,bob -c"); err != nil {
1206 t.Fatalf("failed to start alice pipe: %v", err)
1207 }
1208
1209 time.Sleep(100 * time.Millisecond)
1210
1211 // Bob joins the pipe using alice's namespace
1212 bobSession, err := bobClient.NewSession()
1213 if err != nil {
1214 t.Fatalf("failed to create bob session: %v", err)
1215 }
1216 defer func() { _ = bobSession.Close() }()
1217
1218 bobStdin, err := bobSession.StdinPipe()
1219 if err != nil {
1220 t.Fatalf("failed to get bob stdin: %v", err)
1221 }
1222
1223 bobStdout, err := bobSession.StdoutPipe()
1224 if err != nil {
1225 t.Fatalf("failed to get bob stdout: %v", err)
1226 }
1227
1228 if err := bobSession.Start("pipe alice/accesspipe -c"); err != nil {
1229 t.Fatalf("failed to start bob pipe: %v", err)
1230 }
1231
1232 time.Sleep(100 * time.Millisecond)
1233
1234 // Alice sends message to bob
1235 aliceMsg := "hello bob\n"
1236 _, err = aliceStdin.Write([]byte(aliceMsg))
1237 if err != nil {
1238 t.Fatalf("alice failed to write: %v", err)
1239 }
1240
1241 bobReceived := make([]byte, 100)
1242 n, _ := bobStdout.Read(bobReceived)
1243 if !strings.Contains(string(bobReceived[:n]), "hello bob") {
1244 t.Errorf("bob did not receive alice's message, got: %q", string(bobReceived[:n]))
1245 }
1246
1247 // Bob sends message to alice
1248 bobMsg := "hello alice\n"
1249 _, err = bobStdin.Write([]byte(bobMsg))
1250 if err != nil {
1251 t.Fatalf("bob failed to write: %v", err)
1252 }
1253
1254 aliceReceived := make([]byte, 100)
1255 n, _ = aliceStdout.Read(aliceReceived)
1256 if !strings.Contains(string(aliceReceived[:n]), "hello alice") {
1257 t.Errorf("alice did not receive bob's message, got: %q", string(aliceReceived[:n]))
1258 }
1259}
1260
1261func TestPipe_Replay(t *testing.T) {
1262 server := NewTestSSHServer(t)
1263 defer server.Shutdown()
1264
1265 user := GenerateUser("alice")
1266 RegisterUserWithServer(server, user)
1267
1268 client, err := user.NewClient()
1269 if err != nil {
1270 t.Fatalf("failed to connect: %v", err)
1271 }
1272 defer func() { _ = client.Close() }()
1273
1274 // Start pipe with replay flag (-r)
1275 session, err := client.NewSession()
1276 if err != nil {
1277 t.Fatalf("failed to create session: %v", err)
1278 }
1279 defer func() { _ = session.Close() }()
1280
1281 stdin, err := session.StdinPipe()
1282 if err != nil {
1283 t.Fatalf("failed to get stdin: %v", err)
1284 }
1285
1286 stdout, err := session.StdoutPipe()
1287 if err != nil {
1288 t.Fatalf("failed to get stdout: %v", err)
1289 }
1290
1291 if err := session.Start("pipe replaytopic -r -c"); err != nil {
1292 t.Fatalf("failed to start pipe: %v", err)
1293 }
1294
1295 time.Sleep(100 * time.Millisecond)
1296
1297 // Send a message - with -r flag, should receive it back
1298 testMsg := "echo back\n"
1299 _, err = stdin.Write([]byte(testMsg))
1300 if err != nil {
1301 t.Fatalf("failed to write: %v", err)
1302 }
1303
1304 received := make([]byte, 100)
1305 n, err := stdout.Read(received)
1306 if err != nil && err != io.EOF {
1307 t.Logf("read error: %v", err)
1308 }
1309
1310 if !strings.Contains(string(received[:n]), "echo back") {
1311 t.Errorf("with -r flag, sender should receive own message back, got: %q", string(received[:n]))
1312 }
1313}
1314
1315func TestAccessControl_UnauthorizedUserDenied(t *testing.T) {
1316 server := NewTestSSHServer(t)
1317 defer server.Shutdown()
1318
1319 alice := GenerateUser("alice")
1320 bob := GenerateUser("bob")
1321 charlie := GenerateUser("charlie")
1322 RegisterUserWithServer(server, alice)
1323 RegisterUserWithServer(server, bob)
1324 RegisterUserWithServer(server, charlie)
1325
1326 aliceClient, err := alice.NewClient()
1327 if err != nil {
1328 t.Fatalf("failed to connect alice: %v", err)
1329 }
1330 defer func() { _ = aliceClient.Close() }()
1331
1332 charlieClient, err := charlie.NewClient()
1333 if err != nil {
1334 t.Fatalf("failed to connect charlie: %v", err)
1335 }
1336 defer func() { _ = charlieClient.Close() }()
1337
1338 // Alice creates a topic with access only for alice and bob (not charlie)
1339 aliceSession, err := aliceClient.NewSession()
1340 if err != nil {
1341 t.Fatalf("failed to create alice session: %v", err)
1342 }
1343 defer func() { _ = aliceSession.Close() }()
1344
1345 if err := aliceSession.Start("sub restrictedtopic -a alice,bob -c"); err != nil {
1346 t.Fatalf("failed to start alice sub: %v", err)
1347 }
1348
1349 time.Sleep(100 * time.Millisecond)
1350
1351 // Charlie tries to publish to alice's restricted topic - should be denied
1352 output, err := charlie.RunCommandWithStdin(charlieClient, "pub alice/restrictedtopic -c", "unauthorized message")
1353 if err != nil {
1354 t.Logf("charlie pub completed with error (expected): %v", err)
1355 }
1356
1357 // Charlie should get access denied or the message should not be delivered
1358 if strings.Contains(output, "access denied") {
1359 t.Logf("charlie correctly received access denied")
1360 } else {
1361 t.Logf("charlie output: %q (access control may work differently)", output)
1362 }
1363}
1364
1365func TestPubSub_MultipleSubscribers(t *testing.T) {
1366 server := NewTestSSHServer(t)
1367 defer server.Shutdown()
1368
1369 user := GenerateUser("alice")
1370 RegisterUserWithServer(server, user)
1371
1372 pubClient, err := user.NewClient()
1373 if err != nil {
1374 t.Fatalf("failed to connect publisher: %v", err)
1375 }
1376 defer func() { _ = pubClient.Close() }()
1377
1378 sub1Client, err := user.NewClient()
1379 if err != nil {
1380 t.Fatalf("failed to connect subscriber 1: %v", err)
1381 }
1382 defer func() { _ = sub1Client.Close() }()
1383
1384 sub2Client, err := user.NewClient()
1385 if err != nil {
1386 t.Fatalf("failed to connect subscriber 2: %v", err)
1387 }
1388 defer func() { _ = sub2Client.Close() }()
1389
1390 sub3Client, err := user.NewClient()
1391 if err != nil {
1392 t.Fatalf("failed to connect subscriber 3: %v", err)
1393 }
1394 defer func() { _ = sub3Client.Close() }()
1395
1396 // Start three subscribers
1397 sub1Session, err := sub1Client.NewSession()
1398 if err != nil {
1399 t.Fatalf("failed to create sub1 session: %v", err)
1400 }
1401 defer func() { _ = sub1Session.Close() }()
1402
1403 sub1Stdout, err := sub1Session.StdoutPipe()
1404 if err != nil {
1405 t.Fatalf("failed to get sub1 stdout: %v", err)
1406 }
1407
1408 if err := sub1Session.Start("sub fanout -c"); err != nil {
1409 t.Fatalf("failed to start sub1: %v", err)
1410 }
1411
1412 sub2Session, err := sub2Client.NewSession()
1413 if err != nil {
1414 t.Fatalf("failed to create sub2 session: %v", err)
1415 }
1416 defer func() { _ = sub2Session.Close() }()
1417
1418 sub2Stdout, err := sub2Session.StdoutPipe()
1419 if err != nil {
1420 t.Fatalf("failed to get sub2 stdout: %v", err)
1421 }
1422
1423 if err := sub2Session.Start("sub fanout -c"); err != nil {
1424 t.Fatalf("failed to start sub2: %v", err)
1425 }
1426
1427 sub3Session, err := sub3Client.NewSession()
1428 if err != nil {
1429 t.Fatalf("failed to create sub3 session: %v", err)
1430 }
1431 defer func() { _ = sub3Session.Close() }()
1432
1433 sub3Stdout, err := sub3Session.StdoutPipe()
1434 if err != nil {
1435 t.Fatalf("failed to get sub3 stdout: %v", err)
1436 }
1437
1438 if err := sub3Session.Start("sub fanout -c"); err != nil {
1439 t.Fatalf("failed to start sub3: %v", err)
1440 }
1441
1442 time.Sleep(100 * time.Millisecond)
1443
1444 // Publish a single message
1445 testMessage := "broadcast message"
1446 _, err = user.RunCommandWithStdin(pubClient, "pub fanout -c", testMessage)
1447 if err != nil {
1448 t.Logf("pub completed: %v", err)
1449 }
1450
1451 // All three subscribers should receive the message
1452 received1 := make([]byte, 100)
1453 n1, _ := sub1Stdout.Read(received1)
1454 if !strings.Contains(string(received1[:n1]), testMessage) {
1455 t.Errorf("subscriber 1 did not receive message, got: %q", string(received1[:n1]))
1456 }
1457
1458 received2 := make([]byte, 100)
1459 n2, _ := sub2Stdout.Read(received2)
1460 if !strings.Contains(string(received2[:n2]), testMessage) {
1461 t.Errorf("subscriber 2 did not receive message, got: %q", string(received2[:n2]))
1462 }
1463
1464 received3 := make([]byte, 100)
1465 n3, _ := sub3Stdout.Read(received3)
1466 if !strings.Contains(string(received3[:n3]), testMessage) {
1467 t.Errorf("subscriber 3 did not receive message, got: %q", string(received3[:n3]))
1468 }
1469}
1470
1471// Monitor CLI Tests
1472
1473func TestMonitor_UnauthenticatedUserDenied(t *testing.T) {
1474 server := NewTestSSHServer(t)
1475 defer server.Shutdown()
1476
1477 user := GenerateUser("anonymous")
1478
1479 client, err := user.NewClient()
1480 if err != nil {
1481 t.Fatalf("failed to connect: %v", err)
1482 }
1483 defer func() { _ = client.Close() }()
1484
1485 output, err := user.RunCommand(client, "monitor my-service 1h")
1486 if err != nil {
1487 t.Logf("command error (expected): %v", err)
1488 }
1489
1490 if !strings.Contains(output, "access denied") {
1491 t.Errorf("expected 'access denied', got: %s", output)
1492 }
1493}
1494
1495func TestMonitor_CreateMonitor(t *testing.T) {
1496 server := NewTestSSHServer(t)
1497 defer server.Shutdown()
1498
1499 user := GenerateUser("alice")
1500 RegisterUserWithServer(server, user)
1501
1502 client, err := user.NewClient()
1503 if err != nil {
1504 t.Fatalf("failed to connect: %v", err)
1505 }
1506 defer func() { _ = client.Close() }()
1507
1508 output, err := user.RunCommand(client, "monitor pico-uptime 24h")
1509 if err != nil {
1510 t.Logf("command completed: %v", err)
1511 }
1512
1513 if strings.Contains(output, "access denied") {
1514 t.Errorf("authenticated user should not get access denied, got: %s", output)
1515 }
1516
1517 // Verify monitor was created in DB (topic is stored with user prefix)
1518 monitor, err := server.DBPool.FindPipeMonitorByTopic("alice-id", "alice/pico-uptime")
1519 if err != nil {
1520 t.Fatalf("monitor should exist in DB: %v", err)
1521 }
1522
1523 if monitor.WindowDur != 24*time.Hour {
1524 t.Errorf("expected window duration 24h, got: %v", monitor.WindowDur)
1525 }
1526
1527 if !strings.Contains(output, "alice/pico-uptime") || !strings.Contains(output, "24h") {
1528 t.Errorf("output should confirm monitor creation, got: %s", output)
1529 }
1530}
1531
1532func TestMonitor_UpdateMonitor(t *testing.T) {
1533 server := NewTestSSHServer(t)
1534 defer server.Shutdown()
1535
1536 user := GenerateUser("alice")
1537 RegisterUserWithServer(server, user)
1538
1539 client, err := user.NewClient()
1540 if err != nil {
1541 t.Fatalf("failed to connect: %v", err)
1542 }
1543 defer func() { _ = client.Close() }()
1544
1545 // Create initial monitor
1546 _, err = user.RunCommand(client, "monitor my-cron 1h")
1547 if err != nil {
1548 t.Logf("create command completed: %v", err)
1549 }
1550
1551 // Upsert with new duration
1552 output, err := user.RunCommand(client, "monitor my-cron 6h")
1553 if err != nil {
1554 t.Logf("update command completed: %v", err)
1555 }
1556
1557 // Verify monitor was updated (topic is stored with user prefix)
1558 monitor, err := server.DBPool.FindPipeMonitorByTopic("alice-id", "alice/my-cron")
1559 if err != nil {
1560 t.Fatalf("monitor should exist in DB: %v", err)
1561 }
1562
1563 if monitor.WindowDur != 6*time.Hour {
1564 t.Errorf("expected window duration 6h after update, got: %v", monitor.WindowDur)
1565 }
1566
1567 if !strings.Contains(output, "6h") {
1568 t.Errorf("output should confirm updated duration, got: %s", output)
1569 }
1570}
1571
1572func TestMonitor_DeleteMonitor(t *testing.T) {
1573 server := NewTestSSHServer(t)
1574 defer server.Shutdown()
1575
1576 user := GenerateUser("alice")
1577 RegisterUserWithServer(server, user)
1578
1579 client, err := user.NewClient()
1580 if err != nil {
1581 t.Fatalf("failed to connect: %v", err)
1582 }
1583 defer func() { _ = client.Close() }()
1584
1585 // Create monitor first
1586 _, err = user.RunCommand(client, "monitor to-delete 1h")
1587 if err != nil {
1588 t.Logf("create command completed: %v", err)
1589 }
1590
1591 // Verify it exists (topic is stored with user prefix)
1592 _, err = server.DBPool.FindPipeMonitorByTopic("alice-id", "alice/to-delete")
1593 if err != nil {
1594 t.Fatalf("monitor should exist before deletion: %v", err)
1595 }
1596
1597 // Delete it
1598 output, err := user.RunCommand(client, "monitor to-delete -d")
1599 if err != nil {
1600 t.Logf("delete command completed: %v", err)
1601 }
1602
1603 // Verify it's gone (topic is stored with user prefix)
1604 _, err = server.DBPool.FindPipeMonitorByTopic("alice-id", "alice/to-delete")
1605 if err == nil {
1606 t.Errorf("monitor should be deleted from DB")
1607 }
1608
1609 if !strings.Contains(output, "deleted") && !strings.Contains(output, "removed") {
1610 t.Logf("output should confirm deletion, got: %s", output)
1611 }
1612}
1613
1614func TestMonitor_InvalidDuration(t *testing.T) {
1615 server := NewTestSSHServer(t)
1616 defer server.Shutdown()
1617
1618 user := GenerateUser("alice")
1619 RegisterUserWithServer(server, user)
1620
1621 client, err := user.NewClient()
1622 if err != nil {
1623 t.Fatalf("failed to connect: %v", err)
1624 }
1625 defer func() { _ = client.Close() }()
1626
1627 output, err := user.RunCommand(client, "monitor my-service invaliduration")
1628 if err != nil {
1629 t.Logf("command error (expected): %v", err)
1630 }
1631
1632 if !strings.Contains(output, "invalid") && !strings.Contains(output, "duration") && !strings.Contains(output, "error") {
1633 t.Errorf("expected error about invalid duration, got: %s", output)
1634 }
1635}
1636
1637func TestMonitor_MissingTopic(t *testing.T) {
1638 server := NewTestSSHServer(t)
1639 defer server.Shutdown()
1640
1641 user := GenerateUser("alice")
1642 RegisterUserWithServer(server, user)
1643
1644 client, err := user.NewClient()
1645 if err != nil {
1646 t.Fatalf("failed to connect: %v", err)
1647 }
1648 defer func() { _ = client.Close() }()
1649
1650 output, err := user.RunCommand(client, "monitor")
1651 if err != nil {
1652 t.Logf("command error (expected): %v", err)
1653 }
1654
1655 // Should show usage or error about missing topic
1656 if !strings.Contains(output, "Usage") && !strings.Contains(output, "topic") && !strings.Contains(output, "error") {
1657 t.Errorf("expected usage info or error about missing topic, got: %s", output)
1658 }
1659}
1660
1661// Status CLI Tests
1662
1663func TestStatus_UnauthenticatedUserDenied(t *testing.T) {
1664 server := NewTestSSHServer(t)
1665 defer server.Shutdown()
1666
1667 user := GenerateUser("anonymous")
1668
1669 client, err := user.NewClient()
1670 if err != nil {
1671 t.Fatalf("failed to connect: %v", err)
1672 }
1673 defer func() { _ = client.Close() }()
1674
1675 output, err := user.RunCommand(client, "status")
1676 if err != nil {
1677 t.Logf("command error (expected): %v", err)
1678 }
1679
1680 if !strings.Contains(output, "access denied") {
1681 t.Errorf("expected 'access denied', got: %s", output)
1682 }
1683}
1684
1685func TestStatus_NoMonitors(t *testing.T) {
1686 server := NewTestSSHServer(t)
1687 defer server.Shutdown()
1688
1689 user := GenerateUser("alice")
1690 RegisterUserWithServer(server, user)
1691
1692 client, err := user.NewClient()
1693 if err != nil {
1694 t.Fatalf("failed to connect: %v", err)
1695 }
1696 defer func() { _ = client.Close() }()
1697
1698 output, err := user.RunCommand(client, "status")
1699 if err != nil {
1700 t.Logf("command completed: %v", err)
1701 }
1702
1703 if !strings.Contains(output, "no monitors") && !strings.Contains(output, "empty") {
1704 t.Errorf("expected message about no monitors, got: %s", output)
1705 }
1706}
1707
1708func TestStatus_ShowsMonitorStatus(t *testing.T) {
1709 server := NewTestSSHServer(t)
1710 defer server.Shutdown()
1711
1712 user := GenerateUser("alice")
1713 RegisterUserWithServer(server, user)
1714
1715 client, err := user.NewClient()
1716 if err != nil {
1717 t.Fatalf("failed to connect: %v", err)
1718 }
1719 defer func() { _ = client.Close() }()
1720
1721 // Create a monitor
1722 _, err = user.RunCommand(client, "monitor web-check 1h")
1723 if err != nil {
1724 t.Logf("create monitor completed: %v", err)
1725 }
1726
1727 // Check status
1728 output, err := user.RunCommand(client, "status")
1729 if err != nil {
1730 t.Logf("status command completed: %v", err)
1731 }
1732
1733 if !strings.Contains(output, "web-check") {
1734 t.Errorf("status should list the monitor topic, got: %s", output)
1735 }
1736}
1737
1738func TestStatus_ShowsHealthyUnhealthy(t *testing.T) {
1739 server := NewTestSSHServer(t)
1740 defer server.Shutdown()
1741
1742 user := GenerateUser("alice")
1743 RegisterUserWithServer(server, user)
1744
1745 // Create monitors directly in DB with different states
1746 now := time.Now()
1747 windowEnd := now.Add(1 * time.Hour)
1748 recentPing := now.Add(-30 * time.Minute) // within window - healthy
1749 oldPing := now.Add(-2 * time.Hour) // outside window - unhealthy
1750
1751 _ = server.DBPool.UpsertPipeMonitor("alice-id", "healthy-service", 1*time.Hour, &windowEnd)
1752 _ = server.DBPool.UpdatePipeMonitorLastPing("alice-id", "healthy-service", &recentPing)
1753
1754 _ = server.DBPool.UpsertPipeMonitor("alice-id", "unhealthy-service", 1*time.Hour, &windowEnd)
1755 _ = server.DBPool.UpdatePipeMonitorLastPing("alice-id", "unhealthy-service", &oldPing)
1756
1757 client, err := user.NewClient()
1758 if err != nil {
1759 t.Fatalf("failed to connect: %v", err)
1760 }
1761 defer func() { _ = client.Close() }()
1762
1763 output, err := user.RunCommand(client, "status")
1764 if err != nil {
1765 t.Logf("status command completed: %v", err)
1766 }
1767
1768 if !strings.Contains(output, "healthy-service") {
1769 t.Errorf("status should list healthy-service, got: %s", output)
1770 }
1771
1772 if !strings.Contains(output, "unhealthy-service") {
1773 t.Errorf("status should list unhealthy-service, got: %s", output)
1774 }
1775
1776 // Should indicate different health states
1777 if !strings.Contains(strings.ToLower(output), "healthy") && !strings.Contains(strings.ToLower(output), "ok") && !strings.Contains(output, "✓") {
1778 t.Logf("status output should indicate health state: %s", output)
1779 }
1780}
1781
1782// RSS CLI Tests
1783
1784func TestRss_UnauthenticatedUserDenied(t *testing.T) {
1785 server := NewTestSSHServer(t)
1786 defer server.Shutdown()
1787
1788 user := GenerateUser("anonymous")
1789
1790 client, err := user.NewClient()
1791 if err != nil {
1792 t.Fatalf("failed to connect: %v", err)
1793 }
1794 defer func() { _ = client.Close() }()
1795
1796 output, err := user.RunCommand(client, "rss")
1797 if err != nil {
1798 t.Logf("command error (expected): %v", err)
1799 }
1800
1801 if !strings.Contains(output, "access denied") {
1802 t.Errorf("expected 'access denied', got: %s", output)
1803 }
1804}
1805
1806func TestRss_GeneratesValidRSS(t *testing.T) {
1807 server := NewTestSSHServer(t)
1808 defer server.Shutdown()
1809
1810 user := GenerateUser("alice")
1811 RegisterUserWithServer(server, user)
1812
1813 // Create a monitor
1814 now := time.Now()
1815 windowEnd := now.Add(1 * time.Hour)
1816 _ = server.DBPool.UpsertPipeMonitor("alice-id", "rss-test-service", 1*time.Hour, &windowEnd)
1817
1818 client, err := user.NewClient()
1819 if err != nil {
1820 t.Fatalf("failed to connect: %v", err)
1821 }
1822 defer func() { _ = client.Close() }()
1823
1824 output, err := user.RunCommand(client, "rss")
1825 if err != nil {
1826 t.Logf("rss command completed: %v", err)
1827 }
1828
1829 // Should output valid RSS XML
1830 if !strings.Contains(output, "<?xml") || !strings.Contains(output, "<rss") {
1831 t.Errorf("expected RSS XML output, got: %s", output)
1832 }
1833
1834 if !strings.Contains(output, "rss-test-service") {
1835 t.Errorf("RSS should contain monitor topic, got: %s", output)
1836 }
1837}
1838
1839func TestRss_AlertsOnStaleMonitor(t *testing.T) {
1840 server := NewTestSSHServer(t)
1841 defer server.Shutdown()
1842
1843 user := GenerateUser("alice")
1844 RegisterUserWithServer(server, user)
1845
1846 // Create a stale monitor (last ping outside window)
1847 now := time.Now()
1848 windowEnd := now.Add(-30 * time.Minute) // window already ended
1849 oldPing := now.Add(-2 * time.Hour)
1850
1851 _ = server.DBPool.UpsertPipeMonitor("alice-id", "stale-service", 1*time.Hour, &windowEnd)
1852 _ = server.DBPool.UpdatePipeMonitorLastPing("alice-id", "stale-service", &oldPing)
1853
1854 client, err := user.NewClient()
1855 if err != nil {
1856 t.Fatalf("failed to connect: %v", err)
1857 }
1858 defer func() { _ = client.Close() }()
1859
1860 output, err := user.RunCommand(client, "rss")
1861 if err != nil {
1862 t.Logf("rss command completed: %v", err)
1863 }
1864
1865 // Should contain alert item for stale service
1866 if !strings.Contains(output, "stale-service") {
1867 t.Errorf("RSS should contain stale-service alert, got: %s", output)
1868 }
1869
1870 // Should have item element for the alert
1871 if !strings.Contains(output, "<item>") {
1872 t.Errorf("RSS should contain item element for alert, got: %s", output)
1873 }
1874}
1875
1876// Pub integration with Monitor
1877
1878func TestPub_UpdatesMonitorLastPing(t *testing.T) {
1879 server := NewTestSSHServer(t)
1880 defer server.Shutdown()
1881
1882 user := GenerateUser("alice")
1883 RegisterUserWithServer(server, user)
1884
1885 // Create a monitor first (topic is stored with user prefix)
1886 now := time.Now()
1887 windowEnd := now.Add(1 * time.Hour)
1888 _ = server.DBPool.UpsertPipeMonitor("alice-id", "alice/ping-test", 1*time.Hour, &windowEnd)
1889
1890 subClient, err := user.NewClient()
1891 if err != nil {
1892 t.Fatalf("failed to connect subscriber: %v", err)
1893 }
1894 defer func() { _ = subClient.Close() }()
1895
1896 pubClient, err := user.NewClient()
1897 if err != nil {
1898 t.Fatalf("failed to connect publisher: %v", err)
1899 }
1900 defer func() { _ = pubClient.Close() }()
1901
1902 // Start subscriber
1903 subSession, err := subClient.NewSession()
1904 if err != nil {
1905 t.Fatalf("failed to create sub session: %v", err)
1906 }
1907 defer func() { _ = subSession.Close() }()
1908
1909 if err := subSession.Start("sub ping-test -c"); err != nil {
1910 t.Fatalf("failed to start sub: %v", err)
1911 }
1912
1913 time.Sleep(100 * time.Millisecond)
1914
1915 // Publish to the monitored topic
1916 _, err = user.RunCommandWithStdin(pubClient, "pub ping-test -c", "health check")
1917 if err != nil {
1918 t.Logf("pub command completed: %v", err)
1919 }
1920
1921 // Verify last_ping was updated (topic is stored with user prefix)
1922 monitor, err := server.DBPool.FindPipeMonitorByTopic("alice-id", "alice/ping-test")
1923 if err != nil {
1924 t.Fatalf("monitor should exist: %v", err)
1925 }
1926
1927 if monitor.LastPing == nil {
1928 t.Errorf("last_ping should be set after pub")
1929 } else if time.Since(*monitor.LastPing) > 5*time.Second {
1930 t.Errorf("last_ping should be recent, got: %v", monitor.LastPing)
1931 }
1932}
1933
1934// Tests for monitor status edge cases
1935
1936func TestStatus_PingAtExactWindowStart(t *testing.T) {
1937 // Bug fix: Status() should use >= for windowStart comparison
1938 // A ping exactly at windowStart should be healthy
1939 now := time.Now().UTC()
1940 windowEnd := now.Add(1 * time.Hour)
1941 windowStart := windowEnd.Add(-1 * time.Hour) // equals now
1942
1943 monitor := &db.PipeMonitor{
1944 LastPing: &windowStart, // ping exactly at window start
1945 WindowEnd: &windowEnd,
1946 WindowDur: 1 * time.Hour,
1947 }
1948
1949 err := monitor.Status()
1950 if err != nil {
1951 t.Errorf("ping at exact window start should be healthy, got: %v", err)
1952 }
1953}
1954
1955func TestStatus_WindowExpired(t *testing.T) {
1956 // Bug fix: Status() should check if current time is past windowEnd
1957 now := time.Now().UTC()
1958 windowEnd := now.Add(-1 * time.Minute) // window ended 1 minute ago
1959 lastPing := now.Add(-30 * time.Second) // ping was 30 seconds ago
1960
1961 monitor := &db.PipeMonitor{
1962 LastPing: &lastPing,
1963 WindowEnd: &windowEnd,
1964 WindowDur: 1 * time.Hour,
1965 }
1966
1967 err := monitor.Status()
1968 if err == nil {
1969 t.Error("expired window should be unhealthy")
1970 }
1971 if !strings.Contains(err.Error(), "window expired") {
1972 t.Errorf("error should mention window expired, got: %v", err)
1973 }
1974}
1975
1976func TestStatus_PingResetsWindow(t *testing.T) {
1977 // Bug fix: Every ping should reset window to now + duration
1978 server := NewTestSSHServer(t)
1979 defer server.Shutdown()
1980
1981 user := GenerateUser("alice")
1982 RegisterUserWithServer(server, user)
1983
1984 // Create a monitor with an expired window
1985 expiredWindowEnd := time.Now().UTC().Add(-10 * time.Minute)
1986 _ = server.DBPool.UpsertPipeMonitor("alice-id", "alice/reset-test", 5*time.Minute, &expiredWindowEnd)
1987
1988 client, err := user.NewClient()
1989 if err != nil {
1990 t.Fatalf("failed to connect: %v", err)
1991 }
1992 defer func() { _ = client.Close() }()
1993
1994 // Start a subscriber first so pub doesn't block
1995 subClient, err := user.NewClient()
1996 if err != nil {
1997 t.Fatalf("failed to connect subscriber: %v", err)
1998 }
1999 defer func() { _ = subClient.Close() }()
2000
2001 subSession, err := subClient.NewSession()
2002 if err != nil {
2003 t.Fatalf("failed to create sub session: %v", err)
2004 }
2005 defer func() { _ = subSession.Close() }()
2006
2007 if err := subSession.Start("sub reset-test -c"); err != nil {
2008 t.Fatalf("failed to start sub: %v", err)
2009 }
2010
2011 time.Sleep(100 * time.Millisecond)
2012
2013 // Pub to trigger monitor update
2014 _, err = user.RunCommandWithStdin(client, "pub reset-test -c", "ping")
2015 if err != nil {
2016 t.Logf("pub command completed: %v", err)
2017 }
2018
2019 // Check that window was reset
2020 monitor, err := server.DBPool.FindPipeMonitorByTopic("alice-id", "alice/reset-test")
2021 if err != nil {
2022 t.Fatalf("monitor should exist: %v", err)
2023 }
2024
2025 if monitor.WindowEnd == nil {
2026 t.Fatal("window_end should be set")
2027 }
2028
2029 // Window end should now be in the future
2030 if !monitor.WindowEnd.After(time.Now().UTC()) {
2031 t.Errorf("window_end should be in the future after ping, got: %v", monitor.WindowEnd)
2032 }
2033}
2034
2035func TestStatus_HealthyImmediatelyAfterPing(t *testing.T) {
2036 // Bug fix: After a ping, status should immediately show healthy
2037 server := NewTestSSHServer(t)
2038 defer server.Shutdown()
2039
2040 user := GenerateUser("alice")
2041 RegisterUserWithServer(server, user)
2042
2043 client, err := user.NewClient()
2044 if err != nil {
2045 t.Fatalf("failed to connect: %v", err)
2046 }
2047 defer func() { _ = client.Close() }()
2048
2049 // Create monitor
2050 _, err = user.RunCommand(client, "monitor health-test 5m")
2051 if err != nil {
2052 t.Fatalf("failed to create monitor: %v", err)
2053 }
2054
2055 // Start subscriber
2056 subClient, err := user.NewClient()
2057 if err != nil {
2058 t.Fatalf("failed to connect subscriber: %v", err)
2059 }
2060 defer func() { _ = subClient.Close() }()
2061
2062 subSession, err := subClient.NewSession()
2063 if err != nil {
2064 t.Fatalf("failed to create sub session: %v", err)
2065 }
2066 defer func() { _ = subSession.Close() }()
2067
2068 if err := subSession.Start("sub health-test -c"); err != nil {
2069 t.Fatalf("failed to start sub: %v", err)
2070 }
2071
2072 time.Sleep(100 * time.Millisecond)
2073
2074 // Pub to trigger ping
2075 pubClient, err := user.NewClient()
2076 if err != nil {
2077 t.Fatalf("failed to connect publisher: %v", err)
2078 }
2079 defer func() { _ = pubClient.Close() }()
2080
2081 _, err = user.RunCommandWithStdin(pubClient, "pub health-test -c", "ping")
2082 if err != nil {
2083 t.Logf("pub completed: %v", err)
2084 }
2085
2086 // Immediately check status
2087 statusClient, err := user.NewClient()
2088 if err != nil {
2089 t.Fatalf("failed to connect for status: %v", err)
2090 }
2091 defer func() { _ = statusClient.Close() }()
2092
2093 output, err := user.RunCommand(statusClient, "status")
2094 if err != nil {
2095 t.Logf("status completed: %v", err)
2096 }
2097
2098 if strings.Contains(output, "unhealthy") {
2099 t.Errorf("status should be healthy immediately after ping, got: %s", output)
2100 }
2101 if !strings.Contains(output, "healthy") {
2102 t.Errorf("status should show healthy, got: %s", output)
2103 }
2104}
2105
2106// TestMonitor_FixedWindowNonSliding verifies that pings within the same window
2107// do not slide the window forward. This is a regression test for a bug where
2108// each ping reset window_end to now+dur, creating a sliding window that never fails.
2109//
2110// Expected behavior:
2111// - last_ping: always updated to show most recent activity (user visibility).
2112// - window_end: only advances when current time exceeds it (health scheduling).
2113func TestMonitor_FixedWindowNonSliding(t *testing.T) {
2114 server := NewTestSSHServer(t)
2115 defer server.Shutdown()
2116
2117 user := GenerateUser("alice")
2118 RegisterUserWithServer(server, user)
2119
2120 client, err := user.NewClient()
2121 if err != nil {
2122 t.Fatalf("failed to connect: %v", err)
2123 }
2124 defer func() { _ = client.Close() }()
2125
2126 // Create a monitor with 1 hour window
2127 _, err = user.RunCommand(client, "monitor fixed-window-test 1h")
2128 if err != nil {
2129 t.Logf("create command completed: %v", err)
2130 }
2131
2132 // Get the initial window_end
2133 monitor, err := server.DBPool.FindPipeMonitorByTopic("alice-id", "alice/fixed-window-test")
2134 if err != nil {
2135 t.Fatalf("monitor should exist: %v", err)
2136 }
2137 initialWindowEnd := *monitor.WindowEnd
2138
2139 // Simulate a ping by calling updateMonitor directly
2140 handler := server.PipeHandler
2141
2142 // Create a mock CliCmd
2143 mockUser := &db.User{ID: "alice-id", Name: "alice"}
2144 cmd := &CliCmd{
2145 userName: "alice",
2146 user: mockUser,
2147 }
2148
2149 // First ping - should record last_ping but NOT change window_end
2150 handler.updateMonitor(cmd, "alice/fixed-window-test")
2151
2152 monitor, err = server.DBPool.FindPipeMonitorByTopic("alice-id", "alice/fixed-window-test")
2153 if err != nil {
2154 t.Fatalf("monitor should exist after first ping: %v", err)
2155 }
2156
2157 if monitor.LastPing == nil {
2158 t.Fatalf("last_ping should be set after first ping")
2159 }
2160 firstPingTime := *monitor.LastPing
2161 windowEndAfterFirstPing := *monitor.WindowEnd
2162
2163 // BUG CHECK: With the bug, window_end would have slid forward to now+1h
2164 // With the fix, window_end should remain at the original scheduled time
2165 if !windowEndAfterFirstPing.Equal(initialWindowEnd) {
2166 t.Errorf("BUG DETECTED: window_end should NOT change after first ping within window\n"+
2167 "initial window_end: %v\n"+
2168 "window_end after ping: %v\n"+
2169 "Window slid forward by: %v",
2170 initialWindowEnd.Format(time.RFC3339),
2171 windowEndAfterFirstPing.Format(time.RFC3339),
2172 windowEndAfterFirstPing.Sub(initialWindowEnd))
2173 }
2174
2175 // Second ping - last_ping SHOULD be updated (for user visibility)
2176 // but window_end should NOT change
2177 time.Sleep(10 * time.Millisecond) // Small delay to get different timestamp
2178 handler.updateMonitor(cmd, "alice/fixed-window-test")
2179
2180 monitor, err = server.DBPool.FindPipeMonitorByTopic("alice-id", "alice/fixed-window-test")
2181 if err != nil {
2182 t.Fatalf("monitor should exist after second ping: %v", err)
2183 }
2184
2185 // last_ping SHOULD be updated to show most recent activity
2186 if monitor.LastPing.Equal(firstPingTime) {
2187 t.Errorf("last_ping SHOULD be updated for user visibility\n"+
2188 "first ping time: %v\n"+
2189 "last_ping after second call: %v",
2190 firstPingTime.Format(time.RFC3339Nano),
2191 monitor.LastPing.Format(time.RFC3339Nano))
2192 }
2193
2194 // But window_end should still be the original value (not sliding)
2195 if !monitor.WindowEnd.Equal(initialWindowEnd) {
2196 t.Errorf("BUG DETECTED: window_end should remain at original value\n"+
2197 "initial: %v\n"+
2198 "current: %v",
2199 initialWindowEnd.Format(time.RFC3339),
2200 monitor.WindowEnd.Format(time.RFC3339))
2201 }
2202}