repos / pico

pico services mono repo
git clone https://github.com/picosh/pico.git

pico / pkg / apps / pipe
Eric Bower  ·  2026-01-25

ssh_test.go

   1package pipe
   2
   3import (
   4	"context"
   5	"crypto/ed25519"
   6	"crypto/rand"
   7	"fmt"
   8	"io"
   9	"log/slog"
  10	"os"
  11	"strings"
  12	"testing"
  13	"time"
  14
  15	"github.com/antoniomika/syncmap"
  16	"github.com/picosh/pico/pkg/db"
  17	"github.com/picosh/pico/pkg/db/stub"
  18	"github.com/picosh/pico/pkg/pssh"
  19	psub "github.com/picosh/pico/pkg/pubsub"
  20	"github.com/picosh/pico/pkg/shared"
  21	"github.com/prometheus/client_golang/prometheus"
  22	"golang.org/x/crypto/ssh"
  23)
  24
  25type TestDB struct {
  26	*stub.StubDB
  27	Users        []*db.User
  28	Pubkeys      []*db.PublicKey
  29	Features     []*db.FeatureFlag
  30	PipeMonitors []*db.PipeMonitor
  31}
  32
  33func NewTestDB(logger *slog.Logger) *TestDB {
  34	return &TestDB{
  35		StubDB: stub.NewStubDB(logger),
  36	}
  37}
  38
  39func (t *TestDB) FindUserByPubkey(key string) (*db.User, error) {
  40	for _, pk := range t.Pubkeys {
  41		if pk.Key == key {
  42			return t.FindUser(pk.UserID)
  43		}
  44	}
  45	return nil, fmt.Errorf("user not found for pubkey")
  46}
  47
  48func (t *TestDB) FindUser(userID string) (*db.User, error) {
  49	for _, user := range t.Users {
  50		if user.ID == userID {
  51			return user, nil
  52		}
  53	}
  54	return nil, fmt.Errorf("user not found")
  55}
  56
  57func (t *TestDB) FindUserByName(name string) (*db.User, error) {
  58	for _, user := range t.Users {
  59		if user.Name == name {
  60			return user, nil
  61		}
  62	}
  63	return nil, fmt.Errorf("user not found")
  64}
  65
  66func (t *TestDB) FindFeature(userID, name string) (*db.FeatureFlag, error) {
  67	for _, ff := range t.Features {
  68		if ff.UserID == userID && ff.Name == name {
  69			return ff, nil
  70		}
  71	}
  72	return nil, fmt.Errorf("feature not found")
  73}
  74
  75func (t *TestDB) HasFeatureByUser(userID string, feature string) bool {
  76	ff, err := t.FindFeature(userID, feature)
  77	if err != nil {
  78		return false
  79	}
  80	return ff.IsValid()
  81}
  82
  83func (t *TestDB) InsertAccessLog(_ *db.AccessLog) error {
  84	return nil
  85}
  86
  87func (t *TestDB) Close() error {
  88	return nil
  89}
  90
  91func (t *TestDB) AddUser(user *db.User) {
  92	t.Users = append(t.Users, user)
  93}
  94
  95func (t *TestDB) AddPubkey(pubkey *db.PublicKey) {
  96	t.Pubkeys = append(t.Pubkeys, pubkey)
  97}
  98
  99func (t *TestDB) UpsertPipeMonitor(userID, topic string, dur time.Duration, winEnd *time.Time) error {
 100	for _, m := range t.PipeMonitors {
 101		if m.UserId == userID && m.Topic == topic {
 102			m.WindowDur = dur
 103			m.WindowEnd = winEnd
 104			now := time.Now()
 105			m.UpdatedAt = &now
 106			return nil
 107		}
 108	}
 109	now := time.Now()
 110	t.PipeMonitors = append(t.PipeMonitors, &db.PipeMonitor{
 111		ID:        fmt.Sprintf("monitor-%s-%s", userID, topic),
 112		UserId:    userID,
 113		Topic:     topic,
 114		WindowDur: dur,
 115		WindowEnd: winEnd,
 116		CreatedAt: &now,
 117		UpdatedAt: &now,
 118	})
 119	return nil
 120}
 121
 122func (t *TestDB) UpdatePipeMonitorLastPing(userID, topic string, lastPing *time.Time) error {
 123	for _, m := range t.PipeMonitors {
 124		if m.UserId == userID && m.Topic == topic {
 125			m.LastPing = lastPing
 126			now := time.Now()
 127			m.UpdatedAt = &now
 128			return nil
 129		}
 130	}
 131	return fmt.Errorf("monitor not found")
 132}
 133
 134func (t *TestDB) RemovePipeMonitor(userID, topic string) error {
 135	for i, m := range t.PipeMonitors {
 136		if m.UserId == userID && m.Topic == topic {
 137			t.PipeMonitors = append(t.PipeMonitors[:i], t.PipeMonitors[i+1:]...)
 138			return nil
 139		}
 140	}
 141	return fmt.Errorf("monitor not found")
 142}
 143
 144func (t *TestDB) FindPipeMonitorByTopic(userID, topic string) (*db.PipeMonitor, error) {
 145	for _, m := range t.PipeMonitors {
 146		if m.UserId == userID && m.Topic == topic {
 147			return m, nil
 148		}
 149	}
 150	return nil, fmt.Errorf("monitor not found")
 151}
 152
 153func (t *TestDB) FindPipeMonitorsByUser(userID string) ([]*db.PipeMonitor, error) {
 154	var monitors []*db.PipeMonitor
 155	for _, m := range t.PipeMonitors {
 156		if m.UserId == userID {
 157			monitors = append(monitors, m)
 158		}
 159	}
 160	return monitors, nil
 161}
 162
 163func (t *TestDB) InsertPipeMonitorHistory(monitorID string, windowDur time.Duration, windowEnd, lastPing *time.Time) error {
 164	return nil
 165}
 166
 167func (t *TestDB) FindPipeMonitorHistory(monitorID string, from, to time.Time) ([]*db.PipeMonitorHistory, error) {
 168	return nil, nil
 169}
 170
 171type TestSSHServer struct {
 172	Cfg         *shared.ConfigSite
 173	DBPool      *TestDB
 174	PipeHandler *CliHandler
 175	Cancel      context.CancelFunc
 176}
 177
 178func NewTestSSHServer(t *testing.T) *TestSSHServer {
 179	t.Helper()
 180
 181	opts := &slog.HandlerOptions{
 182		AddSource: true,
 183		Level:     slog.LevelDebug,
 184	}
 185	logger := slog.New(slog.NewTextHandler(os.Stdout, opts))
 186
 187	dbpool := NewTestDB(logger)
 188
 189	cfg := &shared.ConfigSite{
 190		Domain:       "pipe.test",
 191		Port:         "2225",
 192		PortOverride: "2225",
 193		Protocol:     "ssh",
 194		Logger:       logger,
 195		Space:        "pipe",
 196	}
 197
 198	ctx, cancel := context.WithCancel(context.Background())
 199
 200	pubsub := psub.NewMulticast(logger)
 201	handler := &CliHandler{
 202		Logger:  logger,
 203		DBPool:  dbpool,
 204		PubSub:  pubsub,
 205		Cfg:     cfg,
 206		Waiters: syncmap.New[string, []string](),
 207		Access:  syncmap.New[string, []string](),
 208	}
 209
 210	sshAuth := shared.NewSshAuthHandler(dbpool, logger, "pipe")
 211
 212	prometheus.DefaultRegisterer = prometheus.NewRegistry()
 213
 214	server, err := pssh.NewSSHServerWithConfig(
 215		ctx,
 216		logger,
 217		"pipe-ssh-test",
 218		"localhost",
 219		cfg.Port,
 220		"9223",
 221		"../../ssh_data/term_info_ed25519",
 222		func(conn ssh.ConnMetadata, key ssh.PublicKey) (*ssh.Permissions, error) {
 223			perms, _ := sshAuth.PubkeyAuthHandler(conn, key)
 224			if perms == nil {
 225				perms = &ssh.Permissions{
 226					Extensions: map[string]string{
 227						"pubkey": shared.KeyForKeyText(key),
 228					},
 229				}
 230			}
 231			return perms, nil
 232		},
 233		[]pssh.SSHServerMiddleware{
 234			Middleware(handler),
 235			pssh.LogMiddleware(handler, dbpool),
 236		},
 237		nil,
 238		nil,
 239	)
 240
 241	if err != nil {
 242		t.Fatalf("failed to create ssh server: %v", err)
 243	}
 244
 245	go func() {
 246		if err := server.ListenAndServe(); err != nil {
 247			logger.Error("serve", "err", err.Error())
 248		}
 249	}()
 250
 251	time.Sleep(100 * time.Millisecond)
 252
 253	return &TestSSHServer{
 254		Cfg:         cfg,
 255		DBPool:      dbpool,
 256		PipeHandler: handler,
 257		Cancel:      cancel,
 258	}
 259}
 260
 261func (s *TestSSHServer) Shutdown() {
 262	s.Cancel()
 263	time.Sleep(10 * time.Millisecond)
 264}
 265
 266type UserSSH struct {
 267	username   string
 268	signer     ssh.Signer
 269	privateKey []byte
 270}
 271
 272func GenerateUser(username string) UserSSH {
 273	_, userKey, err := ed25519.GenerateKey(rand.Reader)
 274	if err != nil {
 275		panic(err)
 276	}
 277
 278	b, err := ssh.MarshalPrivateKey(userKey, "")
 279	if err != nil {
 280		panic(err)
 281	}
 282
 283	userSigner, err := ssh.NewSignerFromKey(userKey)
 284	if err != nil {
 285		panic(err)
 286	}
 287
 288	return UserSSH{
 289		username:   username,
 290		signer:     userSigner,
 291		privateKey: b.Bytes,
 292	}
 293}
 294
 295func (u UserSSH) PublicKey() string {
 296	return shared.KeyForKeyText(u.signer.PublicKey())
 297}
 298
 299func (u UserSSH) NewClient() (*ssh.Client, error) {
 300	config := &ssh.ClientConfig{
 301		User: u.username,
 302		Auth: []ssh.AuthMethod{
 303			ssh.PublicKeys(u.signer),
 304		},
 305		HostKeyCallback: ssh.InsecureIgnoreHostKey(),
 306	}
 307
 308	return ssh.Dial("tcp", "localhost:2225", config)
 309}
 310
 311func (u UserSSH) RunCommand(client *ssh.Client, cmd string) (string, error) {
 312	session, err := client.NewSession()
 313	if err != nil {
 314		return "", err
 315	}
 316	defer func() { _ = session.Close() }()
 317
 318	stdoutPipe, err := session.StdoutPipe()
 319	if err != nil {
 320		return "", err
 321	}
 322
 323	stderrPipe, err := session.StderrPipe()
 324	if err != nil {
 325		return "", err
 326	}
 327
 328	if err := session.Start(cmd); err != nil {
 329		return "", err
 330	}
 331
 332	stdout := new(strings.Builder)
 333	stderr := new(strings.Builder)
 334	_, _ = io.Copy(stdout, stdoutPipe)
 335	_, _ = io.Copy(stderr, stderrPipe)
 336
 337	_ = session.Wait()
 338	return stdout.String() + stderr.String(), nil
 339}
 340
 341func (u UserSSH) RunCommandWithStdin(client *ssh.Client, cmd string, stdin string) (string, error) {
 342	session, err := client.NewSession()
 343	if err != nil {
 344		return "", err
 345	}
 346	defer func() { _ = session.Close() }()
 347
 348	stdinPipe, err := session.StdinPipe()
 349	if err != nil {
 350		return "", err
 351	}
 352
 353	stdoutPipe, err := session.StdoutPipe()
 354	if err != nil {
 355		return "", err
 356	}
 357
 358	if err := session.Start(cmd); err != nil {
 359		return "", err
 360	}
 361
 362	_, err = stdinPipe.Write([]byte(stdin))
 363	if err != nil {
 364		return "", err
 365	}
 366	_ = stdinPipe.Close()
 367
 368	buf := new(strings.Builder)
 369	_, err = io.Copy(buf, stdoutPipe)
 370	if err != nil {
 371		return "", err
 372	}
 373
 374	_ = session.Wait()
 375	return buf.String(), nil
 376}
 377
 378func RegisterUserWithServer(server *TestSSHServer, user UserSSH) {
 379	dbUser := &db.User{
 380		ID:   user.username + "-id",
 381		Name: user.username,
 382	}
 383	server.DBPool.AddUser(dbUser)
 384	server.DBPool.AddPubkey(&db.PublicKey{
 385		ID:     user.username + "-pubkey-id",
 386		UserID: dbUser.ID,
 387		Key:    user.PublicKey(),
 388	})
 389}
 390
 391func TestLs_UnauthenticatedUserDenied(t *testing.T) {
 392	server := NewTestSSHServer(t)
 393	defer server.Shutdown()
 394
 395	user := GenerateUser("anonymous")
 396
 397	client, err := user.NewClient()
 398	if err != nil {
 399		t.Fatalf("failed to connect: %v", err)
 400	}
 401	defer func() { _ = client.Close() }()
 402
 403	output, err := user.RunCommand(client, "ls")
 404	if err != nil {
 405		t.Logf("command error (expected): %v", err)
 406	}
 407
 408	if !strings.Contains(output, "access denied") {
 409		t.Errorf("expected 'access denied', got: %s", output)
 410	}
 411}
 412
 413func TestLs_AuthenticatedUser(t *testing.T) {
 414	server := NewTestSSHServer(t)
 415	defer server.Shutdown()
 416
 417	user := GenerateUser("alice")
 418	RegisterUserWithServer(server, user)
 419
 420	client, err := user.NewClient()
 421	if err != nil {
 422		t.Fatalf("failed to connect: %v", err)
 423	}
 424	defer func() { _ = client.Close() }()
 425
 426	output, err := user.RunCommand(client, "ls")
 427	if err != nil {
 428		t.Logf("command completed with: %v", err)
 429	}
 430
 431	if strings.Contains(output, "access denied") {
 432		t.Errorf("authenticated user should not get access denied, got: %s", output)
 433	}
 434
 435	if !strings.Contains(output, "no pubsub channels found") {
 436		t.Errorf("expected 'no pubsub channels found' for empty state, got: %s", output)
 437	}
 438}
 439
 440func TestPubSub_BasicFlow(t *testing.T) {
 441	server := NewTestSSHServer(t)
 442	defer server.Shutdown()
 443
 444	user := GenerateUser("alice")
 445	RegisterUserWithServer(server, user)
 446
 447	subClient, err := user.NewClient()
 448	if err != nil {
 449		t.Fatalf("failed to connect subscriber: %v", err)
 450	}
 451	defer func() { _ = subClient.Close() }()
 452
 453	pubClient, err := user.NewClient()
 454	if err != nil {
 455		t.Fatalf("failed to connect publisher: %v", err)
 456	}
 457	defer func() { _ = pubClient.Close() }()
 458
 459	subSession, err := subClient.NewSession()
 460	if err != nil {
 461		t.Fatalf("failed to create sub session: %v", err)
 462	}
 463	defer func() { _ = subSession.Close() }()
 464
 465	subStdout, err := subSession.StdoutPipe()
 466	if err != nil {
 467		t.Fatalf("failed to get sub stdout: %v", err)
 468	}
 469
 470	if err := subSession.Start("sub testtopic -c"); err != nil {
 471		t.Fatalf("failed to start sub: %v", err)
 472	}
 473
 474	time.Sleep(100 * time.Millisecond)
 475
 476	testMessage := "hello from pub"
 477	_, err = user.RunCommandWithStdin(pubClient, "pub testtopic -c", testMessage)
 478	if err != nil {
 479		t.Logf("pub command completed: %v", err)
 480	}
 481
 482	received := make([]byte, len(testMessage)+10)
 483	n, err := subStdout.Read(received)
 484	if err != nil && err != io.EOF {
 485		t.Logf("read error: %v", err)
 486	}
 487
 488	receivedStr := string(received[:n])
 489	if !strings.Contains(receivedStr, testMessage) {
 490		t.Errorf("subscriber did not receive message, got: %q, want: %q", receivedStr, testMessage)
 491	}
 492}
 493
 494func TestPubSub_PublicTopic(t *testing.T) {
 495	server := NewTestSSHServer(t)
 496	defer server.Shutdown()
 497
 498	alice := GenerateUser("alice")
 499	bob := GenerateUser("bob")
 500	RegisterUserWithServer(server, alice)
 501	RegisterUserWithServer(server, bob)
 502
 503	subClient, err := bob.NewClient()
 504	if err != nil {
 505		t.Fatalf("failed to connect subscriber: %v", err)
 506	}
 507	defer func() { _ = subClient.Close() }()
 508
 509	pubClient, err := alice.NewClient()
 510	if err != nil {
 511		t.Fatalf("failed to connect publisher: %v", err)
 512	}
 513	defer func() { _ = pubClient.Close() }()
 514
 515	subSession, err := subClient.NewSession()
 516	if err != nil {
 517		t.Fatalf("failed to create sub session: %v", err)
 518	}
 519	defer func() { _ = subSession.Close() }()
 520
 521	subStdout, err := subSession.StdoutPipe()
 522	if err != nil {
 523		t.Fatalf("failed to get sub stdout: %v", err)
 524	}
 525
 526	if err := subSession.Start("sub publictopic -p -c"); err != nil {
 527		t.Fatalf("failed to start sub: %v", err)
 528	}
 529
 530	time.Sleep(100 * time.Millisecond)
 531
 532	testMessage := "public message"
 533	_, err = alice.RunCommandWithStdin(pubClient, "pub publictopic -p -c", testMessage)
 534	if err != nil {
 535		t.Logf("pub command completed: %v", err)
 536	}
 537
 538	received := make([]byte, len(testMessage)+10)
 539	n, err := subStdout.Read(received)
 540	if err != nil && err != io.EOF {
 541		t.Logf("read error: %v", err)
 542	}
 543
 544	receivedStr := string(received[:n])
 545	if !strings.Contains(receivedStr, testMessage) {
 546		t.Errorf("subscriber did not receive public message, got: %q, want: %q", receivedStr, testMessage)
 547	}
 548}
 549
 550func TestPipe_Bidirectional(t *testing.T) {
 551	server := NewTestSSHServer(t)
 552	defer server.Shutdown()
 553
 554	alice := GenerateUser("alice")
 555	bob := GenerateUser("bob")
 556	RegisterUserWithServer(server, alice)
 557	RegisterUserWithServer(server, bob)
 558
 559	aliceClient, err := alice.NewClient()
 560	if err != nil {
 561		t.Fatalf("failed to connect alice: %v", err)
 562	}
 563	defer func() { _ = aliceClient.Close() }()
 564
 565	bobClient, err := bob.NewClient()
 566	if err != nil {
 567		t.Fatalf("failed to connect bob: %v", err)
 568	}
 569	defer func() { _ = bobClient.Close() }()
 570
 571	aliceSession, err := aliceClient.NewSession()
 572	if err != nil {
 573		t.Fatalf("failed to create alice session: %v", err)
 574	}
 575	defer func() { _ = aliceSession.Close() }()
 576
 577	aliceStdin, err := aliceSession.StdinPipe()
 578	if err != nil {
 579		t.Fatalf("failed to get alice stdin: %v", err)
 580	}
 581
 582	aliceStdout, err := aliceSession.StdoutPipe()
 583	if err != nil {
 584		t.Fatalf("failed to get alice stdout: %v", err)
 585	}
 586
 587	if err := aliceSession.Start("pipe pipetopic -p -c"); err != nil {
 588		t.Fatalf("failed to start alice pipe: %v", err)
 589	}
 590
 591	time.Sleep(100 * time.Millisecond)
 592
 593	bobSession, err := bobClient.NewSession()
 594	if err != nil {
 595		t.Fatalf("failed to create bob session: %v", err)
 596	}
 597	defer func() { _ = bobSession.Close() }()
 598
 599	bobStdin, err := bobSession.StdinPipe()
 600	if err != nil {
 601		t.Fatalf("failed to get bob stdin: %v", err)
 602	}
 603
 604	bobStdout, err := bobSession.StdoutPipe()
 605	if err != nil {
 606		t.Fatalf("failed to get bob stdout: %v", err)
 607	}
 608
 609	if err := bobSession.Start("pipe pipetopic -p -c"); err != nil {
 610		t.Fatalf("failed to start bob pipe: %v", err)
 611	}
 612
 613	time.Sleep(100 * time.Millisecond)
 614
 615	aliceMsg := "hello from alice\n"
 616	_, err = aliceStdin.Write([]byte(aliceMsg))
 617	if err != nil {
 618		t.Fatalf("alice failed to write: %v", err)
 619	}
 620
 621	bobReceived := make([]byte, 100)
 622	n, err := bobStdout.Read(bobReceived)
 623	if err != nil && err != io.EOF {
 624		t.Logf("bob read error: %v", err)
 625	}
 626	if !strings.Contains(string(bobReceived[:n]), "hello from alice") {
 627		t.Errorf("bob did not receive alice's message, got: %q", string(bobReceived[:n]))
 628	}
 629
 630	bobMsg := "hello from bob\n"
 631	_, err = bobStdin.Write([]byte(bobMsg))
 632	if err != nil {
 633		t.Fatalf("bob failed to write: %v", err)
 634	}
 635
 636	aliceReceived := make([]byte, 100)
 637	n, err = aliceStdout.Read(aliceReceived)
 638	if err != nil && err != io.EOF {
 639		t.Logf("alice read error: %v", err)
 640	}
 641	if !strings.Contains(string(aliceReceived[:n]), "hello from bob") {
 642		t.Errorf("alice did not receive bob's message, got: %q", string(aliceReceived[:n]))
 643	}
 644}
 645
 646func TestPipe_AutoGeneratedTopic(t *testing.T) {
 647	server := NewTestSSHServer(t)
 648	defer server.Shutdown()
 649
 650	user := GenerateUser("alice")
 651	RegisterUserWithServer(server, user)
 652
 653	client, err := user.NewClient()
 654	if err != nil {
 655		t.Fatalf("failed to connect: %v", err)
 656	}
 657	defer func() { _ = client.Close() }()
 658
 659	session, err := client.NewSession()
 660	if err != nil {
 661		t.Fatalf("failed to create session: %v", err)
 662	}
 663	defer func() { _ = session.Close() }()
 664
 665	stdout, err := session.StdoutPipe()
 666	if err != nil {
 667		t.Fatalf("failed to get stdout: %v", err)
 668	}
 669
 670	if err := session.Start("pipe"); err != nil {
 671		t.Fatalf("failed to start pipe: %v", err)
 672	}
 673
 674	received := make([]byte, 200)
 675	n, err := stdout.Read(received)
 676	if err != nil && err != io.EOF {
 677		t.Logf("read error: %v", err)
 678	}
 679
 680	output := string(received[:n])
 681	if !strings.Contains(output, "subscribe to this topic") {
 682		t.Errorf("expected topic subscription instructions, got: %q", output)
 683	}
 684}
 685
 686func TestAccessControl_AllowedUserViaFullPath(t *testing.T) {
 687	server := NewTestSSHServer(t)
 688	defer server.Shutdown()
 689
 690	alice := GenerateUser("alice")
 691	bob := GenerateUser("bob")
 692	RegisterUserWithServer(server, alice)
 693	RegisterUserWithServer(server, bob)
 694
 695	aliceClient, err := alice.NewClient()
 696	if err != nil {
 697		t.Fatalf("failed to connect alice: %v", err)
 698	}
 699	defer func() { _ = aliceClient.Close() }()
 700
 701	aliceSession, err := aliceClient.NewSession()
 702	if err != nil {
 703		t.Fatalf("failed to create alice session: %v", err)
 704	}
 705	defer func() { _ = aliceSession.Close() }()
 706
 707	aliceStdout, err := aliceSession.StdoutPipe()
 708	if err != nil {
 709		t.Fatalf("failed to get alice stdout: %v", err)
 710	}
 711
 712	if err := aliceSession.Start("sub sharedtopic -a alice,bob -c"); err != nil {
 713		t.Fatalf("failed to start alice sub: %v", err)
 714	}
 715
 716	time.Sleep(100 * time.Millisecond)
 717
 718	bobClient, err := bob.NewClient()
 719	if err != nil {
 720		t.Fatalf("failed to connect bob: %v", err)
 721	}
 722	defer func() { _ = bobClient.Close() }()
 723
 724	_, err = bob.RunCommandWithStdin(bobClient, "pub alice/sharedtopic -c", "bob allowed")
 725	if err != nil {
 726		t.Logf("bob pub completed: %v", err)
 727	}
 728
 729	aliceReceived := make([]byte, 100)
 730	n, _ := aliceStdout.Read(aliceReceived)
 731
 732	if !strings.Contains(string(aliceReceived[:n]), "bob allowed") {
 733		t.Errorf("alice should receive bob's message on shared topic, got: %q", string(aliceReceived[:n]))
 734	}
 735}
 736
 737func TestPubSub_BlockingWaitsForSubscriber(t *testing.T) {
 738	server := NewTestSSHServer(t)
 739	defer server.Shutdown()
 740
 741	user := GenerateUser("alice")
 742	RegisterUserWithServer(server, user)
 743
 744	pubClient, err := user.NewClient()
 745	if err != nil {
 746		t.Fatalf("failed to connect publisher: %v", err)
 747	}
 748	defer func() { _ = pubClient.Close() }()
 749
 750	subClient, err := user.NewClient()
 751	if err != nil {
 752		t.Fatalf("failed to connect subscriber: %v", err)
 753	}
 754	defer func() { _ = subClient.Close() }()
 755
 756	pubSession, err := pubClient.NewSession()
 757	if err != nil {
 758		t.Fatalf("failed to create pub session: %v", err)
 759	}
 760	defer func() { _ = pubSession.Close() }()
 761
 762	pubStdin, err := pubSession.StdinPipe()
 763	if err != nil {
 764		t.Fatalf("failed to get pub stdin: %v", err)
 765	}
 766
 767	pubStdout, err := pubSession.StdoutPipe()
 768	if err != nil {
 769		t.Fatalf("failed to get pub stdout: %v", err)
 770	}
 771
 772	// Start publisher with blocking enabled (default -b=true)
 773	// Publisher should wait for subscriber
 774	if err := pubSession.Start("pub blockingtopic"); err != nil {
 775		t.Fatalf("failed to start pub: %v", err)
 776	}
 777
 778	// Read output until we see "waiting" message or timeout
 779	// Need to read in a loop because Read() may return partial data
 780	var output string
 781	readDone := make(chan struct{})
 782	go func() {
 783		buf := make([]byte, 1024)
 784		for {
 785			n, err := pubStdout.Read(buf)
 786			if n > 0 {
 787				output += string(buf[:n])
 788				if strings.Contains(output, "waiting") {
 789					close(readDone)
 790					return
 791				}
 792			}
 793			if err != nil {
 794				close(readDone)
 795				return
 796			}
 797		}
 798	}()
 799
 800	select {
 801	case <-readDone:
 802	case <-time.After(2 * time.Second):
 803		t.Fatalf("timeout waiting for 'waiting' message, got: %q", output)
 804	}
 805
 806	if !strings.Contains(output, "waiting") {
 807		t.Errorf("expected 'waiting' message for blocking pub, got: %q", output)
 808	}
 809
 810	// Now start subscriber - this should unblock the publisher
 811	subSession, err := subClient.NewSession()
 812	if err != nil {
 813		t.Fatalf("failed to create sub session: %v", err)
 814	}
 815	defer func() { _ = subSession.Close() }()
 816
 817	subStdout, err := subSession.StdoutPipe()
 818	if err != nil {
 819		t.Fatalf("failed to get sub stdout: %v", err)
 820	}
 821
 822	if err := subSession.Start("sub blockingtopic -c"); err != nil {
 823		t.Fatalf("failed to start sub: %v", err)
 824	}
 825
 826	time.Sleep(100 * time.Millisecond)
 827
 828	// Now send the message
 829	testMessage := "blocking message"
 830	_, err = pubStdin.Write([]byte(testMessage))
 831	if err != nil {
 832		t.Fatalf("failed to write message: %v", err)
 833	}
 834	_ = pubStdin.Close()
 835
 836	// Subscriber should receive the message
 837	received := make([]byte, 100)
 838	nRead, err := subStdout.Read(received)
 839	if err != nil && err != io.EOF {
 840		t.Logf("read error: %v", err)
 841	}
 842
 843	if !strings.Contains(string(received[:nRead]), testMessage) {
 844		t.Errorf("subscriber did not receive blocking message, got: %q, want: %q", string(received[:nRead]), testMessage)
 845	}
 846}
 847
 848func TestPubSub_NonBlockingDoesNotWait(t *testing.T) {
 849	server := NewTestSSHServer(t)
 850	defer server.Shutdown()
 851
 852	user := GenerateUser("alice")
 853	RegisterUserWithServer(server, user)
 854
 855	client, err := user.NewClient()
 856	if err != nil {
 857		t.Fatalf("failed to connect: %v", err)
 858	}
 859	defer func() { _ = client.Close() }()
 860
 861	// Publish with -b=false (non-blocking) and no subscriber
 862	// Should complete immediately without waiting
 863	done := make(chan struct{})
 864	var output string
 865	var cmdErr error
 866
 867	go func() {
 868		output, cmdErr = user.RunCommandWithStdin(client, "pub nonblockingtopic -b=false -c", "non-blocking message")
 869		close(done)
 870	}()
 871
 872	select {
 873	case <-done:
 874		// Command completed - this is expected for non-blocking
 875		if cmdErr != nil {
 876			t.Logf("non-blocking pub completed with: %v", cmdErr)
 877		}
 878		t.Logf("non-blocking pub output: %q", output)
 879	case <-time.After(2 * time.Second):
 880		t.Errorf("non-blocking pub should complete immediately, but it blocked")
 881	}
 882}
 883
 884func TestPubSub_BlockingTimeout(t *testing.T) {
 885	server := NewTestSSHServer(t)
 886	defer server.Shutdown()
 887
 888	user := GenerateUser("alice")
 889	RegisterUserWithServer(server, user)
 890
 891	client, err := user.NewClient()
 892	if err != nil {
 893		t.Fatalf("failed to connect: %v", err)
 894	}
 895	defer func() { _ = client.Close() }()
 896
 897	// Publish with blocking and short timeout, no subscriber
 898	// Should timeout after the specified duration
 899	done := make(chan struct{})
 900	var output string
 901
 902	go func() {
 903		output, _ = user.RunCommandWithStdin(client, "pub timeouttopic -b=true -t=500ms", "timeout message")
 904		close(done)
 905	}()
 906
 907	select {
 908	case <-done:
 909		// Command completed due to timeout
 910		if !strings.Contains(output, "timeout") && !strings.Contains(output, "waiting") {
 911			t.Logf("blocking pub with timeout output: %q", output)
 912		}
 913	case <-time.After(3 * time.Second):
 914		t.Errorf("blocking pub with timeout should have timed out after 500ms")
 915	}
 916}
 917
 918func TestSub_WaitsForPublisher(t *testing.T) {
 919	server := NewTestSSHServer(t)
 920	defer server.Shutdown()
 921
 922	user := GenerateUser("alice")
 923	RegisterUserWithServer(server, user)
 924
 925	subClient, err := user.NewClient()
 926	if err != nil {
 927		t.Fatalf("failed to connect subscriber: %v", err)
 928	}
 929	defer func() { _ = subClient.Close() }()
 930
 931	pubClient, err := user.NewClient()
 932	if err != nil {
 933		t.Fatalf("failed to connect publisher: %v", err)
 934	}
 935	defer func() { _ = pubClient.Close() }()
 936
 937	// Start subscriber first - it should wait for publisher
 938	subSession, err := subClient.NewSession()
 939	if err != nil {
 940		t.Fatalf("failed to create sub session: %v", err)
 941	}
 942	defer func() { _ = subSession.Close() }()
 943
 944	subStdout, err := subSession.StdoutPipe()
 945	if err != nil {
 946		t.Fatalf("failed to get sub stdout: %v", err)
 947	}
 948
 949	if err := subSession.Start("sub waitfortopic -c"); err != nil {
 950		t.Fatalf("failed to start sub: %v", err)
 951	}
 952
 953	// Subscriber is now waiting - give it a moment
 954	time.Sleep(100 * time.Millisecond)
 955
 956	// Now publish - subscriber should receive it
 957	testMessage := "delayed publish"
 958	_, err = user.RunCommandWithStdin(pubClient, "pub waitfortopic -c", testMessage)
 959	if err != nil {
 960		t.Logf("pub completed: %v", err)
 961	}
 962
 963	received := make([]byte, 100)
 964	n, err := subStdout.Read(received)
 965	if err != nil && err != io.EOF {
 966		t.Logf("read error: %v", err)
 967	}
 968
 969	if !strings.Contains(string(received[:n]), testMessage) {
 970		t.Errorf("subscriber waiting for publisher did not receive message, got: %q, want: %q", string(received[:n]), testMessage)
 971	}
 972}
 973
 974func TestSub_KeepAliveReceivesMultipleMessages(t *testing.T) {
 975	server := NewTestSSHServer(t)
 976	defer server.Shutdown()
 977
 978	user := GenerateUser("alice")
 979	RegisterUserWithServer(server, user)
 980
 981	subClient, err := user.NewClient()
 982	if err != nil {
 983		t.Fatalf("failed to connect subscriber: %v", err)
 984	}
 985	defer func() { _ = subClient.Close() }()
 986
 987	pubClient1, err := user.NewClient()
 988	if err != nil {
 989		t.Fatalf("failed to connect publisher 1: %v", err)
 990	}
 991	defer func() { _ = pubClient1.Close() }()
 992
 993	pubClient2, err := user.NewClient()
 994	if err != nil {
 995		t.Fatalf("failed to connect publisher 2: %v", err)
 996	}
 997	defer func() { _ = pubClient2.Close() }()
 998
 999	// Start subscriber with keepAlive (-k) flag
1000	subSession, err := subClient.NewSession()
1001	if err != nil {
1002		t.Fatalf("failed to create sub session: %v", err)
1003	}
1004	defer func() { _ = subSession.Close() }()
1005
1006	subStdout, err := subSession.StdoutPipe()
1007	if err != nil {
1008		t.Fatalf("failed to get sub stdout: %v", err)
1009	}
1010
1011	if err := subSession.Start("sub keepalivetopic -k -c"); err != nil {
1012		t.Fatalf("failed to start sub: %v", err)
1013	}
1014
1015	time.Sleep(100 * time.Millisecond)
1016
1017	// Send first message
1018	msg1 := "first message\n"
1019	_, err = user.RunCommandWithStdin(pubClient1, "pub keepalivetopic -c", msg1)
1020	if err != nil {
1021		t.Logf("pub 1 completed: %v", err)
1022	}
1023
1024	received1 := make([]byte, 100)
1025	n1, _ := subStdout.Read(received1)
1026	if !strings.Contains(string(received1[:n1]), "first message") {
1027		t.Errorf("subscriber did not receive first message, got: %q", string(received1[:n1]))
1028	}
1029
1030	// Send second message - subscriber with keepAlive should still receive it
1031	msg2 := "second message\n"
1032	_, err = user.RunCommandWithStdin(pubClient2, "pub keepalivetopic -c", msg2)
1033	if err != nil {
1034		t.Logf("pub 2 completed: %v", err)
1035	}
1036
1037	received2 := make([]byte, 100)
1038	n2, _ := subStdout.Read(received2)
1039	if !strings.Contains(string(received2[:n2]), "second message") {
1040		t.Errorf("subscriber with keepAlive did not receive second message, got: %q", string(received2[:n2]))
1041	}
1042}
1043
1044func TestSub_WithoutKeepAliveExitsAfterPublisher(t *testing.T) {
1045	server := NewTestSSHServer(t)
1046	defer server.Shutdown()
1047
1048	user := GenerateUser("alice")
1049	RegisterUserWithServer(server, user)
1050
1051	subClient, err := user.NewClient()
1052	if err != nil {
1053		t.Fatalf("failed to connect subscriber: %v", err)
1054	}
1055	defer func() { _ = subClient.Close() }()
1056
1057	pubClient, err := user.NewClient()
1058	if err != nil {
1059		t.Fatalf("failed to connect publisher: %v", err)
1060	}
1061	defer func() { _ = pubClient.Close() }()
1062
1063	// Start subscriber without keepAlive
1064	subSession, err := subClient.NewSession()
1065	if err != nil {
1066		t.Fatalf("failed to create sub session: %v", err)
1067	}
1068
1069	subStdout, err := subSession.StdoutPipe()
1070	if err != nil {
1071		t.Fatalf("failed to get sub stdout: %v", err)
1072	}
1073
1074	if err := subSession.Start("sub exitaftertopic -c"); err != nil {
1075		t.Fatalf("failed to start sub: %v", err)
1076	}
1077
1078	time.Sleep(100 * time.Millisecond)
1079
1080	// Publish a message
1081	testMessage := "single message"
1082	_, err = user.RunCommandWithStdin(pubClient, "pub exitaftertopic -c", testMessage)
1083	if err != nil {
1084		t.Logf("pub completed: %v", err)
1085	}
1086
1087	// Read the message
1088	received := make([]byte, 100)
1089	n, _ := subStdout.Read(received)
1090	if !strings.Contains(string(received[:n]), testMessage) {
1091		t.Errorf("subscriber did not receive message, got: %q", string(received[:n]))
1092	}
1093
1094	// Subscriber session should exit after publisher disconnects
1095	done := make(chan error)
1096	go func() {
1097		done <- subSession.Wait()
1098	}()
1099
1100	select {
1101	case err := <-done:
1102		// Session ended as expected
1103		t.Logf("subscriber session ended: %v", err)
1104	case <-time.After(2 * time.Second):
1105		t.Errorf("subscriber without keepAlive should have exited after publisher disconnected")
1106		_ = subSession.Close()
1107	}
1108}
1109
1110func TestPub_EmptyMessage(t *testing.T) {
1111	server := NewTestSSHServer(t)
1112	defer server.Shutdown()
1113
1114	user := GenerateUser("alice")
1115	RegisterUserWithServer(server, user)
1116
1117	subClient, err := user.NewClient()
1118	if err != nil {
1119		t.Fatalf("failed to connect subscriber: %v", err)
1120	}
1121	defer func() { _ = subClient.Close() }()
1122
1123	pubClient, err := user.NewClient()
1124	if err != nil {
1125		t.Fatalf("failed to connect publisher: %v", err)
1126	}
1127	defer func() { _ = pubClient.Close() }()
1128
1129	// Start subscriber
1130	subSession, err := subClient.NewSession()
1131	if err != nil {
1132		t.Fatalf("failed to create sub session: %v", err)
1133	}
1134	defer func() { _ = subSession.Close() }()
1135
1136	subStdout, err := subSession.StdoutPipe()
1137	if err != nil {
1138		t.Fatalf("failed to get sub stdout: %v", err)
1139	}
1140
1141	if err := subSession.Start("sub emptytopic -c"); err != nil {
1142		t.Fatalf("failed to start sub: %v", err)
1143	}
1144
1145	time.Sleep(100 * time.Millisecond)
1146
1147	// Publish with -e flag (empty message) - should not require stdin
1148	output, err := user.RunCommand(pubClient, "pub emptytopic -e -c")
1149	if err != nil {
1150		t.Logf("pub -e completed: %v, output: %s", err, output)
1151	}
1152
1153	// Subscriber should receive something (even if empty/minimal)
1154	// The -e flag sends a 1-byte buffer
1155	received := make([]byte, 10)
1156	n, err := subStdout.Read(received)
1157	if err != nil && err != io.EOF {
1158		t.Logf("read result: n=%d, err=%v", n, err)
1159	}
1160
1161	// With -e flag, we expect to receive at least 1 byte
1162	if n < 1 {
1163		t.Errorf("subscriber should receive empty message signal, got %d bytes", n)
1164	}
1165}
1166
1167func TestPipe_AccessControl(t *testing.T) {
1168	server := NewTestSSHServer(t)
1169	defer server.Shutdown()
1170
1171	alice := GenerateUser("alice")
1172	bob := GenerateUser("bob")
1173	RegisterUserWithServer(server, alice)
1174	RegisterUserWithServer(server, bob)
1175
1176	aliceClient, err := alice.NewClient()
1177	if err != nil {
1178		t.Fatalf("failed to connect alice: %v", err)
1179	}
1180	defer func() { _ = aliceClient.Close() }()
1181
1182	bobClient, err := bob.NewClient()
1183	if err != nil {
1184		t.Fatalf("failed to connect bob: %v", err)
1185	}
1186	defer func() { _ = bobClient.Close() }()
1187
1188	// Alice creates a pipe with access control allowing bob
1189	aliceSession, err := aliceClient.NewSession()
1190	if err != nil {
1191		t.Fatalf("failed to create alice session: %v", err)
1192	}
1193	defer func() { _ = aliceSession.Close() }()
1194
1195	aliceStdin, err := aliceSession.StdinPipe()
1196	if err != nil {
1197		t.Fatalf("failed to get alice stdin: %v", err)
1198	}
1199
1200	aliceStdout, err := aliceSession.StdoutPipe()
1201	if err != nil {
1202		t.Fatalf("failed to get alice stdout: %v", err)
1203	}
1204
1205	if err := aliceSession.Start("pipe accesspipe -a alice,bob -c"); err != nil {
1206		t.Fatalf("failed to start alice pipe: %v", err)
1207	}
1208
1209	time.Sleep(100 * time.Millisecond)
1210
1211	// Bob joins the pipe using alice's namespace
1212	bobSession, err := bobClient.NewSession()
1213	if err != nil {
1214		t.Fatalf("failed to create bob session: %v", err)
1215	}
1216	defer func() { _ = bobSession.Close() }()
1217
1218	bobStdin, err := bobSession.StdinPipe()
1219	if err != nil {
1220		t.Fatalf("failed to get bob stdin: %v", err)
1221	}
1222
1223	bobStdout, err := bobSession.StdoutPipe()
1224	if err != nil {
1225		t.Fatalf("failed to get bob stdout: %v", err)
1226	}
1227
1228	if err := bobSession.Start("pipe alice/accesspipe -c"); err != nil {
1229		t.Fatalf("failed to start bob pipe: %v", err)
1230	}
1231
1232	time.Sleep(100 * time.Millisecond)
1233
1234	// Alice sends message to bob
1235	aliceMsg := "hello bob\n"
1236	_, err = aliceStdin.Write([]byte(aliceMsg))
1237	if err != nil {
1238		t.Fatalf("alice failed to write: %v", err)
1239	}
1240
1241	bobReceived := make([]byte, 100)
1242	n, _ := bobStdout.Read(bobReceived)
1243	if !strings.Contains(string(bobReceived[:n]), "hello bob") {
1244		t.Errorf("bob did not receive alice's message, got: %q", string(bobReceived[:n]))
1245	}
1246
1247	// Bob sends message to alice
1248	bobMsg := "hello alice\n"
1249	_, err = bobStdin.Write([]byte(bobMsg))
1250	if err != nil {
1251		t.Fatalf("bob failed to write: %v", err)
1252	}
1253
1254	aliceReceived := make([]byte, 100)
1255	n, _ = aliceStdout.Read(aliceReceived)
1256	if !strings.Contains(string(aliceReceived[:n]), "hello alice") {
1257		t.Errorf("alice did not receive bob's message, got: %q", string(aliceReceived[:n]))
1258	}
1259}
1260
1261func TestPipe_Replay(t *testing.T) {
1262	server := NewTestSSHServer(t)
1263	defer server.Shutdown()
1264
1265	user := GenerateUser("alice")
1266	RegisterUserWithServer(server, user)
1267
1268	client, err := user.NewClient()
1269	if err != nil {
1270		t.Fatalf("failed to connect: %v", err)
1271	}
1272	defer func() { _ = client.Close() }()
1273
1274	// Start pipe with replay flag (-r)
1275	session, err := client.NewSession()
1276	if err != nil {
1277		t.Fatalf("failed to create session: %v", err)
1278	}
1279	defer func() { _ = session.Close() }()
1280
1281	stdin, err := session.StdinPipe()
1282	if err != nil {
1283		t.Fatalf("failed to get stdin: %v", err)
1284	}
1285
1286	stdout, err := session.StdoutPipe()
1287	if err != nil {
1288		t.Fatalf("failed to get stdout: %v", err)
1289	}
1290
1291	if err := session.Start("pipe replaytopic -r -c"); err != nil {
1292		t.Fatalf("failed to start pipe: %v", err)
1293	}
1294
1295	time.Sleep(100 * time.Millisecond)
1296
1297	// Send a message - with -r flag, should receive it back
1298	testMsg := "echo back\n"
1299	_, err = stdin.Write([]byte(testMsg))
1300	if err != nil {
1301		t.Fatalf("failed to write: %v", err)
1302	}
1303
1304	received := make([]byte, 100)
1305	n, err := stdout.Read(received)
1306	if err != nil && err != io.EOF {
1307		t.Logf("read error: %v", err)
1308	}
1309
1310	if !strings.Contains(string(received[:n]), "echo back") {
1311		t.Errorf("with -r flag, sender should receive own message back, got: %q", string(received[:n]))
1312	}
1313}
1314
1315func TestAccessControl_UnauthorizedUserDenied(t *testing.T) {
1316	server := NewTestSSHServer(t)
1317	defer server.Shutdown()
1318
1319	alice := GenerateUser("alice")
1320	bob := GenerateUser("bob")
1321	charlie := GenerateUser("charlie")
1322	RegisterUserWithServer(server, alice)
1323	RegisterUserWithServer(server, bob)
1324	RegisterUserWithServer(server, charlie)
1325
1326	aliceClient, err := alice.NewClient()
1327	if err != nil {
1328		t.Fatalf("failed to connect alice: %v", err)
1329	}
1330	defer func() { _ = aliceClient.Close() }()
1331
1332	charlieClient, err := charlie.NewClient()
1333	if err != nil {
1334		t.Fatalf("failed to connect charlie: %v", err)
1335	}
1336	defer func() { _ = charlieClient.Close() }()
1337
1338	// Alice creates a topic with access only for alice and bob (not charlie)
1339	aliceSession, err := aliceClient.NewSession()
1340	if err != nil {
1341		t.Fatalf("failed to create alice session: %v", err)
1342	}
1343	defer func() { _ = aliceSession.Close() }()
1344
1345	if err := aliceSession.Start("sub restrictedtopic -a alice,bob -c"); err != nil {
1346		t.Fatalf("failed to start alice sub: %v", err)
1347	}
1348
1349	time.Sleep(100 * time.Millisecond)
1350
1351	// Charlie tries to publish to alice's restricted topic - should be denied
1352	output, err := charlie.RunCommandWithStdin(charlieClient, "pub alice/restrictedtopic -c", "unauthorized message")
1353	if err != nil {
1354		t.Logf("charlie pub completed with error (expected): %v", err)
1355	}
1356
1357	// Charlie should get access denied or the message should not be delivered
1358	if strings.Contains(output, "access denied") {
1359		t.Logf("charlie correctly received access denied")
1360	} else {
1361		t.Logf("charlie output: %q (access control may work differently)", output)
1362	}
1363}
1364
1365func TestPubSub_MultipleSubscribers(t *testing.T) {
1366	server := NewTestSSHServer(t)
1367	defer server.Shutdown()
1368
1369	user := GenerateUser("alice")
1370	RegisterUserWithServer(server, user)
1371
1372	pubClient, err := user.NewClient()
1373	if err != nil {
1374		t.Fatalf("failed to connect publisher: %v", err)
1375	}
1376	defer func() { _ = pubClient.Close() }()
1377
1378	sub1Client, err := user.NewClient()
1379	if err != nil {
1380		t.Fatalf("failed to connect subscriber 1: %v", err)
1381	}
1382	defer func() { _ = sub1Client.Close() }()
1383
1384	sub2Client, err := user.NewClient()
1385	if err != nil {
1386		t.Fatalf("failed to connect subscriber 2: %v", err)
1387	}
1388	defer func() { _ = sub2Client.Close() }()
1389
1390	sub3Client, err := user.NewClient()
1391	if err != nil {
1392		t.Fatalf("failed to connect subscriber 3: %v", err)
1393	}
1394	defer func() { _ = sub3Client.Close() }()
1395
1396	// Start three subscribers
1397	sub1Session, err := sub1Client.NewSession()
1398	if err != nil {
1399		t.Fatalf("failed to create sub1 session: %v", err)
1400	}
1401	defer func() { _ = sub1Session.Close() }()
1402
1403	sub1Stdout, err := sub1Session.StdoutPipe()
1404	if err != nil {
1405		t.Fatalf("failed to get sub1 stdout: %v", err)
1406	}
1407
1408	if err := sub1Session.Start("sub fanout -c"); err != nil {
1409		t.Fatalf("failed to start sub1: %v", err)
1410	}
1411
1412	sub2Session, err := sub2Client.NewSession()
1413	if err != nil {
1414		t.Fatalf("failed to create sub2 session: %v", err)
1415	}
1416	defer func() { _ = sub2Session.Close() }()
1417
1418	sub2Stdout, err := sub2Session.StdoutPipe()
1419	if err != nil {
1420		t.Fatalf("failed to get sub2 stdout: %v", err)
1421	}
1422
1423	if err := sub2Session.Start("sub fanout -c"); err != nil {
1424		t.Fatalf("failed to start sub2: %v", err)
1425	}
1426
1427	sub3Session, err := sub3Client.NewSession()
1428	if err != nil {
1429		t.Fatalf("failed to create sub3 session: %v", err)
1430	}
1431	defer func() { _ = sub3Session.Close() }()
1432
1433	sub3Stdout, err := sub3Session.StdoutPipe()
1434	if err != nil {
1435		t.Fatalf("failed to get sub3 stdout: %v", err)
1436	}
1437
1438	if err := sub3Session.Start("sub fanout -c"); err != nil {
1439		t.Fatalf("failed to start sub3: %v", err)
1440	}
1441
1442	time.Sleep(100 * time.Millisecond)
1443
1444	// Publish a single message
1445	testMessage := "broadcast message"
1446	_, err = user.RunCommandWithStdin(pubClient, "pub fanout -c", testMessage)
1447	if err != nil {
1448		t.Logf("pub completed: %v", err)
1449	}
1450
1451	// All three subscribers should receive the message
1452	received1 := make([]byte, 100)
1453	n1, _ := sub1Stdout.Read(received1)
1454	if !strings.Contains(string(received1[:n1]), testMessage) {
1455		t.Errorf("subscriber 1 did not receive message, got: %q", string(received1[:n1]))
1456	}
1457
1458	received2 := make([]byte, 100)
1459	n2, _ := sub2Stdout.Read(received2)
1460	if !strings.Contains(string(received2[:n2]), testMessage) {
1461		t.Errorf("subscriber 2 did not receive message, got: %q", string(received2[:n2]))
1462	}
1463
1464	received3 := make([]byte, 100)
1465	n3, _ := sub3Stdout.Read(received3)
1466	if !strings.Contains(string(received3[:n3]), testMessage) {
1467		t.Errorf("subscriber 3 did not receive message, got: %q", string(received3[:n3]))
1468	}
1469}
1470
// Monitor CLI Tests
1472
1473func TestMonitor_UnauthenticatedUserDenied(t *testing.T) {
1474	server := NewTestSSHServer(t)
1475	defer server.Shutdown()
1476
1477	user := GenerateUser("anonymous")
1478
1479	client, err := user.NewClient()
1480	if err != nil {
1481		t.Fatalf("failed to connect: %v", err)
1482	}
1483	defer func() { _ = client.Close() }()
1484
1485	output, err := user.RunCommand(client, "monitor my-service 1h")
1486	if err != nil {
1487		t.Logf("command error (expected): %v", err)
1488	}
1489
1490	if !strings.Contains(output, "access denied") {
1491		t.Errorf("expected 'access denied', got: %s", output)
1492	}
1493}
1494
1495func TestMonitor_CreateMonitor(t *testing.T) {
1496	server := NewTestSSHServer(t)
1497	defer server.Shutdown()
1498
1499	user := GenerateUser("alice")
1500	RegisterUserWithServer(server, user)
1501
1502	client, err := user.NewClient()
1503	if err != nil {
1504		t.Fatalf("failed to connect: %v", err)
1505	}
1506	defer func() { _ = client.Close() }()
1507
1508	output, err := user.RunCommand(client, "monitor pico-uptime 24h")
1509	if err != nil {
1510		t.Logf("command completed: %v", err)
1511	}
1512
1513	if strings.Contains(output, "access denied") {
1514		t.Errorf("authenticated user should not get access denied, got: %s", output)
1515	}
1516
1517	// Verify monitor was created in DB (topic is stored with user prefix)
1518	monitor, err := server.DBPool.FindPipeMonitorByTopic("alice-id", "alice/pico-uptime")
1519	if err != nil {
1520		t.Fatalf("monitor should exist in DB: %v", err)
1521	}
1522
1523	if monitor.WindowDur != 24*time.Hour {
1524		t.Errorf("expected window duration 24h, got: %v", monitor.WindowDur)
1525	}
1526
1527	if !strings.Contains(output, "alice/pico-uptime") || !strings.Contains(output, "24h") {
1528		t.Errorf("output should confirm monitor creation, got: %s", output)
1529	}
1530}
1531
1532func TestMonitor_UpdateMonitor(t *testing.T) {
1533	server := NewTestSSHServer(t)
1534	defer server.Shutdown()
1535
1536	user := GenerateUser("alice")
1537	RegisterUserWithServer(server, user)
1538
1539	client, err := user.NewClient()
1540	if err != nil {
1541		t.Fatalf("failed to connect: %v", err)
1542	}
1543	defer func() { _ = client.Close() }()
1544
1545	// Create initial monitor
1546	_, err = user.RunCommand(client, "monitor my-cron 1h")
1547	if err != nil {
1548		t.Logf("create command completed: %v", err)
1549	}
1550
1551	// Upsert with new duration
1552	output, err := user.RunCommand(client, "monitor my-cron 6h")
1553	if err != nil {
1554		t.Logf("update command completed: %v", err)
1555	}
1556
1557	// Verify monitor was updated (topic is stored with user prefix)
1558	monitor, err := server.DBPool.FindPipeMonitorByTopic("alice-id", "alice/my-cron")
1559	if err != nil {
1560		t.Fatalf("monitor should exist in DB: %v", err)
1561	}
1562
1563	if monitor.WindowDur != 6*time.Hour {
1564		t.Errorf("expected window duration 6h after update, got: %v", monitor.WindowDur)
1565	}
1566
1567	if !strings.Contains(output, "6h") {
1568		t.Errorf("output should confirm updated duration, got: %s", output)
1569	}
1570}
1571
1572func TestMonitor_DeleteMonitor(t *testing.T) {
1573	server := NewTestSSHServer(t)
1574	defer server.Shutdown()
1575
1576	user := GenerateUser("alice")
1577	RegisterUserWithServer(server, user)
1578
1579	client, err := user.NewClient()
1580	if err != nil {
1581		t.Fatalf("failed to connect: %v", err)
1582	}
1583	defer func() { _ = client.Close() }()
1584
1585	// Create monitor first
1586	_, err = user.RunCommand(client, "monitor to-delete 1h")
1587	if err != nil {
1588		t.Logf("create command completed: %v", err)
1589	}
1590
1591	// Verify it exists (topic is stored with user prefix)
1592	_, err = server.DBPool.FindPipeMonitorByTopic("alice-id", "alice/to-delete")
1593	if err != nil {
1594		t.Fatalf("monitor should exist before deletion: %v", err)
1595	}
1596
1597	// Delete it
1598	output, err := user.RunCommand(client, "monitor to-delete -d")
1599	if err != nil {
1600		t.Logf("delete command completed: %v", err)
1601	}
1602
1603	// Verify it's gone (topic is stored with user prefix)
1604	_, err = server.DBPool.FindPipeMonitorByTopic("alice-id", "alice/to-delete")
1605	if err == nil {
1606		t.Errorf("monitor should be deleted from DB")
1607	}
1608
1609	if !strings.Contains(output, "deleted") && !strings.Contains(output, "removed") {
1610		t.Logf("output should confirm deletion, got: %s", output)
1611	}
1612}
1613
1614func TestMonitor_InvalidDuration(t *testing.T) {
1615	server := NewTestSSHServer(t)
1616	defer server.Shutdown()
1617
1618	user := GenerateUser("alice")
1619	RegisterUserWithServer(server, user)
1620
1621	client, err := user.NewClient()
1622	if err != nil {
1623		t.Fatalf("failed to connect: %v", err)
1624	}
1625	defer func() { _ = client.Close() }()
1626
1627	output, err := user.RunCommand(client, "monitor my-service invaliduration")
1628	if err != nil {
1629		t.Logf("command error (expected): %v", err)
1630	}
1631
1632	if !strings.Contains(output, "invalid") && !strings.Contains(output, "duration") && !strings.Contains(output, "error") {
1633		t.Errorf("expected error about invalid duration, got: %s", output)
1634	}
1635}
1636
1637func TestMonitor_MissingTopic(t *testing.T) {
1638	server := NewTestSSHServer(t)
1639	defer server.Shutdown()
1640
1641	user := GenerateUser("alice")
1642	RegisterUserWithServer(server, user)
1643
1644	client, err := user.NewClient()
1645	if err != nil {
1646		t.Fatalf("failed to connect: %v", err)
1647	}
1648	defer func() { _ = client.Close() }()
1649
1650	output, err := user.RunCommand(client, "monitor")
1651	if err != nil {
1652		t.Logf("command error (expected): %v", err)
1653	}
1654
1655	// Should show usage or error about missing topic
1656	if !strings.Contains(output, "Usage") && !strings.Contains(output, "topic") && !strings.Contains(output, "error") {
1657		t.Errorf("expected usage info or error about missing topic, got: %s", output)
1658	}
1659}
1660
// Status CLI Tests
1662
1663func TestStatus_UnauthenticatedUserDenied(t *testing.T) {
1664	server := NewTestSSHServer(t)
1665	defer server.Shutdown()
1666
1667	user := GenerateUser("anonymous")
1668
1669	client, err := user.NewClient()
1670	if err != nil {
1671		t.Fatalf("failed to connect: %v", err)
1672	}
1673	defer func() { _ = client.Close() }()
1674
1675	output, err := user.RunCommand(client, "status")
1676	if err != nil {
1677		t.Logf("command error (expected): %v", err)
1678	}
1679
1680	if !strings.Contains(output, "access denied") {
1681		t.Errorf("expected 'access denied', got: %s", output)
1682	}
1683}
1684
1685func TestStatus_NoMonitors(t *testing.T) {
1686	server := NewTestSSHServer(t)
1687	defer server.Shutdown()
1688
1689	user := GenerateUser("alice")
1690	RegisterUserWithServer(server, user)
1691
1692	client, err := user.NewClient()
1693	if err != nil {
1694		t.Fatalf("failed to connect: %v", err)
1695	}
1696	defer func() { _ = client.Close() }()
1697
1698	output, err := user.RunCommand(client, "status")
1699	if err != nil {
1700		t.Logf("command completed: %v", err)
1701	}
1702
1703	if !strings.Contains(output, "no monitors") && !strings.Contains(output, "empty") {
1704		t.Errorf("expected message about no monitors, got: %s", output)
1705	}
1706}
1707
1708func TestStatus_ShowsMonitorStatus(t *testing.T) {
1709	server := NewTestSSHServer(t)
1710	defer server.Shutdown()
1711
1712	user := GenerateUser("alice")
1713	RegisterUserWithServer(server, user)
1714
1715	client, err := user.NewClient()
1716	if err != nil {
1717		t.Fatalf("failed to connect: %v", err)
1718	}
1719	defer func() { _ = client.Close() }()
1720
1721	// Create a monitor
1722	_, err = user.RunCommand(client, "monitor web-check 1h")
1723	if err != nil {
1724		t.Logf("create monitor completed: %v", err)
1725	}
1726
1727	// Check status
1728	output, err := user.RunCommand(client, "status")
1729	if err != nil {
1730		t.Logf("status command completed: %v", err)
1731	}
1732
1733	if !strings.Contains(output, "web-check") {
1734		t.Errorf("status should list the monitor topic, got: %s", output)
1735	}
1736}
1737
1738func TestStatus_ShowsHealthyUnhealthy(t *testing.T) {
1739	server := NewTestSSHServer(t)
1740	defer server.Shutdown()
1741
1742	user := GenerateUser("alice")
1743	RegisterUserWithServer(server, user)
1744
1745	// Create monitors directly in DB with different states
1746	now := time.Now()
1747	windowEnd := now.Add(1 * time.Hour)
1748	recentPing := now.Add(-30 * time.Minute) // within window - healthy
1749	oldPing := now.Add(-2 * time.Hour)       // outside window - unhealthy
1750
1751	_ = server.DBPool.UpsertPipeMonitor("alice-id", "healthy-service", 1*time.Hour, &windowEnd)
1752	_ = server.DBPool.UpdatePipeMonitorLastPing("alice-id", "healthy-service", &recentPing)
1753
1754	_ = server.DBPool.UpsertPipeMonitor("alice-id", "unhealthy-service", 1*time.Hour, &windowEnd)
1755	_ = server.DBPool.UpdatePipeMonitorLastPing("alice-id", "unhealthy-service", &oldPing)
1756
1757	client, err := user.NewClient()
1758	if err != nil {
1759		t.Fatalf("failed to connect: %v", err)
1760	}
1761	defer func() { _ = client.Close() }()
1762
1763	output, err := user.RunCommand(client, "status")
1764	if err != nil {
1765		t.Logf("status command completed: %v", err)
1766	}
1767
1768	if !strings.Contains(output, "healthy-service") {
1769		t.Errorf("status should list healthy-service, got: %s", output)
1770	}
1771
1772	if !strings.Contains(output, "unhealthy-service") {
1773		t.Errorf("status should list unhealthy-service, got: %s", output)
1774	}
1775
1776	// Should indicate different health states
1777	if !strings.Contains(strings.ToLower(output), "healthy") && !strings.Contains(strings.ToLower(output), "ok") && !strings.Contains(output, "✓") {
1778		t.Logf("status output should indicate health state: %s", output)
1779	}
1780}
1781
// RSS CLI Tests
1783
1784func TestRss_UnauthenticatedUserDenied(t *testing.T) {
1785	server := NewTestSSHServer(t)
1786	defer server.Shutdown()
1787
1788	user := GenerateUser("anonymous")
1789
1790	client, err := user.NewClient()
1791	if err != nil {
1792		t.Fatalf("failed to connect: %v", err)
1793	}
1794	defer func() { _ = client.Close() }()
1795
1796	output, err := user.RunCommand(client, "rss")
1797	if err != nil {
1798		t.Logf("command error (expected): %v", err)
1799	}
1800
1801	if !strings.Contains(output, "access denied") {
1802		t.Errorf("expected 'access denied', got: %s", output)
1803	}
1804}
1805
1806func TestRss_GeneratesValidRSS(t *testing.T) {
1807	server := NewTestSSHServer(t)
1808	defer server.Shutdown()
1809
1810	user := GenerateUser("alice")
1811	RegisterUserWithServer(server, user)
1812
1813	// Create a monitor
1814	now := time.Now()
1815	windowEnd := now.Add(1 * time.Hour)
1816	_ = server.DBPool.UpsertPipeMonitor("alice-id", "rss-test-service", 1*time.Hour, &windowEnd)
1817
1818	client, err := user.NewClient()
1819	if err != nil {
1820		t.Fatalf("failed to connect: %v", err)
1821	}
1822	defer func() { _ = client.Close() }()
1823
1824	output, err := user.RunCommand(client, "rss")
1825	if err != nil {
1826		t.Logf("rss command completed: %v", err)
1827	}
1828
1829	// Should output valid RSS XML
1830	if !strings.Contains(output, "<?xml") || !strings.Contains(output, "<rss") {
1831		t.Errorf("expected RSS XML output, got: %s", output)
1832	}
1833
1834	if !strings.Contains(output, "rss-test-service") {
1835		t.Errorf("RSS should contain monitor topic, got: %s", output)
1836	}
1837}
1838
1839func TestRss_AlertsOnStaleMonitor(t *testing.T) {
1840	server := NewTestSSHServer(t)
1841	defer server.Shutdown()
1842
1843	user := GenerateUser("alice")
1844	RegisterUserWithServer(server, user)
1845
1846	// Create a stale monitor (last ping outside window)
1847	now := time.Now()
1848	windowEnd := now.Add(-30 * time.Minute) // window already ended
1849	oldPing := now.Add(-2 * time.Hour)
1850
1851	_ = server.DBPool.UpsertPipeMonitor("alice-id", "stale-service", 1*time.Hour, &windowEnd)
1852	_ = server.DBPool.UpdatePipeMonitorLastPing("alice-id", "stale-service", &oldPing)
1853
1854	client, err := user.NewClient()
1855	if err != nil {
1856		t.Fatalf("failed to connect: %v", err)
1857	}
1858	defer func() { _ = client.Close() }()
1859
1860	output, err := user.RunCommand(client, "rss")
1861	if err != nil {
1862		t.Logf("rss command completed: %v", err)
1863	}
1864
1865	// Should contain alert item for stale service
1866	if !strings.Contains(output, "stale-service") {
1867		t.Errorf("RSS should contain stale-service alert, got: %s", output)
1868	}
1869
1870	// Should have item element for the alert
1871	if !strings.Contains(output, "<item>") {
1872		t.Errorf("RSS should contain item element for alert, got: %s", output)
1873	}
1874}
1875
// Pub integration with Monitor
1877
1878func TestPub_UpdatesMonitorLastPing(t *testing.T) {
1879	server := NewTestSSHServer(t)
1880	defer server.Shutdown()
1881
1882	user := GenerateUser("alice")
1883	RegisterUserWithServer(server, user)
1884
1885	// Create a monitor first (topic is stored with user prefix)
1886	now := time.Now()
1887	windowEnd := now.Add(1 * time.Hour)
1888	_ = server.DBPool.UpsertPipeMonitor("alice-id", "alice/ping-test", 1*time.Hour, &windowEnd)
1889
1890	subClient, err := user.NewClient()
1891	if err != nil {
1892		t.Fatalf("failed to connect subscriber: %v", err)
1893	}
1894	defer func() { _ = subClient.Close() }()
1895
1896	pubClient, err := user.NewClient()
1897	if err != nil {
1898		t.Fatalf("failed to connect publisher: %v", err)
1899	}
1900	defer func() { _ = pubClient.Close() }()
1901
1902	// Start subscriber
1903	subSession, err := subClient.NewSession()
1904	if err != nil {
1905		t.Fatalf("failed to create sub session: %v", err)
1906	}
1907	defer func() { _ = subSession.Close() }()
1908
1909	if err := subSession.Start("sub ping-test -c"); err != nil {
1910		t.Fatalf("failed to start sub: %v", err)
1911	}
1912
1913	time.Sleep(100 * time.Millisecond)
1914
1915	// Publish to the monitored topic
1916	_, err = user.RunCommandWithStdin(pubClient, "pub ping-test -c", "health check")
1917	if err != nil {
1918		t.Logf("pub command completed: %v", err)
1919	}
1920
1921	// Verify last_ping was updated (topic is stored with user prefix)
1922	monitor, err := server.DBPool.FindPipeMonitorByTopic("alice-id", "alice/ping-test")
1923	if err != nil {
1924		t.Fatalf("monitor should exist: %v", err)
1925	}
1926
1927	if monitor.LastPing == nil {
1928		t.Errorf("last_ping should be set after pub")
1929	} else if time.Since(*monitor.LastPing) > 5*time.Second {
1930		t.Errorf("last_ping should be recent, got: %v", monitor.LastPing)
1931	}
1932}
1933
// Tests for monitor status edge cases
1935
1936func TestStatus_PingAtExactWindowStart(t *testing.T) {
1937	// Bug fix: Status() should use >= for windowStart comparison
1938	// A ping exactly at windowStart should be healthy
1939	now := time.Now().UTC()
1940	windowEnd := now.Add(1 * time.Hour)
1941	windowStart := windowEnd.Add(-1 * time.Hour) // equals now
1942
1943	monitor := &db.PipeMonitor{
1944		LastPing:  &windowStart, // ping exactly at window start
1945		WindowEnd: &windowEnd,
1946		WindowDur: 1 * time.Hour,
1947	}
1948
1949	err := monitor.Status()
1950	if err != nil {
1951		t.Errorf("ping at exact window start should be healthy, got: %v", err)
1952	}
1953}
1954
1955func TestStatus_WindowExpired(t *testing.T) {
1956	// Bug fix: Status() should check if current time is past windowEnd
1957	now := time.Now().UTC()
1958	windowEnd := now.Add(-1 * time.Minute) // window ended 1 minute ago
1959	lastPing := now.Add(-30 * time.Second) // ping was 30 seconds ago
1960
1961	monitor := &db.PipeMonitor{
1962		LastPing:  &lastPing,
1963		WindowEnd: &windowEnd,
1964		WindowDur: 1 * time.Hour,
1965	}
1966
1967	err := monitor.Status()
1968	if err == nil {
1969		t.Error("expired window should be unhealthy")
1970	}
1971	if !strings.Contains(err.Error(), "window expired") {
1972		t.Errorf("error should mention window expired, got: %v", err)
1973	}
1974}
1975
1976func TestStatus_PingResetsWindow(t *testing.T) {
1977	// Bug fix: Every ping should reset window to now + duration
1978	server := NewTestSSHServer(t)
1979	defer server.Shutdown()
1980
1981	user := GenerateUser("alice")
1982	RegisterUserWithServer(server, user)
1983
1984	// Create a monitor with an expired window
1985	expiredWindowEnd := time.Now().UTC().Add(-10 * time.Minute)
1986	_ = server.DBPool.UpsertPipeMonitor("alice-id", "alice/reset-test", 5*time.Minute, &expiredWindowEnd)
1987
1988	client, err := user.NewClient()
1989	if err != nil {
1990		t.Fatalf("failed to connect: %v", err)
1991	}
1992	defer func() { _ = client.Close() }()
1993
1994	// Start a subscriber first so pub doesn't block
1995	subClient, err := user.NewClient()
1996	if err != nil {
1997		t.Fatalf("failed to connect subscriber: %v", err)
1998	}
1999	defer func() { _ = subClient.Close() }()
2000
2001	subSession, err := subClient.NewSession()
2002	if err != nil {
2003		t.Fatalf("failed to create sub session: %v", err)
2004	}
2005	defer func() { _ = subSession.Close() }()
2006
2007	if err := subSession.Start("sub reset-test -c"); err != nil {
2008		t.Fatalf("failed to start sub: %v", err)
2009	}
2010
2011	time.Sleep(100 * time.Millisecond)
2012
2013	// Pub to trigger monitor update
2014	_, err = user.RunCommandWithStdin(client, "pub reset-test -c", "ping")
2015	if err != nil {
2016		t.Logf("pub command completed: %v", err)
2017	}
2018
2019	// Check that window was reset
2020	monitor, err := server.DBPool.FindPipeMonitorByTopic("alice-id", "alice/reset-test")
2021	if err != nil {
2022		t.Fatalf("monitor should exist: %v", err)
2023	}
2024
2025	if monitor.WindowEnd == nil {
2026		t.Fatal("window_end should be set")
2027	}
2028
2029	// Window end should now be in the future
2030	if !monitor.WindowEnd.After(time.Now().UTC()) {
2031		t.Errorf("window_end should be in the future after ping, got: %v", monitor.WindowEnd)
2032	}
2033}
2034
2035func TestStatus_HealthyImmediatelyAfterPing(t *testing.T) {
2036	// Bug fix: After a ping, status should immediately show healthy
2037	server := NewTestSSHServer(t)
2038	defer server.Shutdown()
2039
2040	user := GenerateUser("alice")
2041	RegisterUserWithServer(server, user)
2042
2043	client, err := user.NewClient()
2044	if err != nil {
2045		t.Fatalf("failed to connect: %v", err)
2046	}
2047	defer func() { _ = client.Close() }()
2048
2049	// Create monitor
2050	_, err = user.RunCommand(client, "monitor health-test 5m")
2051	if err != nil {
2052		t.Fatalf("failed to create monitor: %v", err)
2053	}
2054
2055	// Start subscriber
2056	subClient, err := user.NewClient()
2057	if err != nil {
2058		t.Fatalf("failed to connect subscriber: %v", err)
2059	}
2060	defer func() { _ = subClient.Close() }()
2061
2062	subSession, err := subClient.NewSession()
2063	if err != nil {
2064		t.Fatalf("failed to create sub session: %v", err)
2065	}
2066	defer func() { _ = subSession.Close() }()
2067
2068	if err := subSession.Start("sub health-test -c"); err != nil {
2069		t.Fatalf("failed to start sub: %v", err)
2070	}
2071
2072	time.Sleep(100 * time.Millisecond)
2073
2074	// Pub to trigger ping
2075	pubClient, err := user.NewClient()
2076	if err != nil {
2077		t.Fatalf("failed to connect publisher: %v", err)
2078	}
2079	defer func() { _ = pubClient.Close() }()
2080
2081	_, err = user.RunCommandWithStdin(pubClient, "pub health-test -c", "ping")
2082	if err != nil {
2083		t.Logf("pub completed: %v", err)
2084	}
2085
2086	// Immediately check status
2087	statusClient, err := user.NewClient()
2088	if err != nil {
2089		t.Fatalf("failed to connect for status: %v", err)
2090	}
2091	defer func() { _ = statusClient.Close() }()
2092
2093	output, err := user.RunCommand(statusClient, "status")
2094	if err != nil {
2095		t.Logf("status completed: %v", err)
2096	}
2097
2098	if strings.Contains(output, "unhealthy") {
2099		t.Errorf("status should be healthy immediately after ping, got: %s", output)
2100	}
2101	if !strings.Contains(output, "healthy") {
2102		t.Errorf("status should show healthy, got: %s", output)
2103	}
2104}
2105
2106// TestMonitor_FixedWindowNonSliding verifies that pings within the same window
2107// do not slide the window forward. This is a regression test for a bug where
2108// each ping reset window_end to now+dur, creating a sliding window that never fails.
2109//
2110// Expected behavior:
2111//   - last_ping: always updated to show most recent activity (user visibility).
2112//   - window_end: only advances when current time exceeds it (health scheduling).
2113func TestMonitor_FixedWindowNonSliding(t *testing.T) {
2114	server := NewTestSSHServer(t)
2115	defer server.Shutdown()
2116
2117	user := GenerateUser("alice")
2118	RegisterUserWithServer(server, user)
2119
2120	client, err := user.NewClient()
2121	if err != nil {
2122		t.Fatalf("failed to connect: %v", err)
2123	}
2124	defer func() { _ = client.Close() }()
2125
2126	// Create a monitor with 1 hour window
2127	_, err = user.RunCommand(client, "monitor fixed-window-test 1h")
2128	if err != nil {
2129		t.Logf("create command completed: %v", err)
2130	}
2131
2132	// Get the initial window_end
2133	monitor, err := server.DBPool.FindPipeMonitorByTopic("alice-id", "alice/fixed-window-test")
2134	if err != nil {
2135		t.Fatalf("monitor should exist: %v", err)
2136	}
2137	initialWindowEnd := *monitor.WindowEnd
2138
2139	// Simulate a ping by calling updateMonitor directly
2140	handler := server.PipeHandler
2141
2142	// Create a mock CliCmd
2143	mockUser := &db.User{ID: "alice-id", Name: "alice"}
2144	cmd := &CliCmd{
2145		userName: "alice",
2146		user:     mockUser,
2147	}
2148
2149	// First ping - should record last_ping but NOT change window_end
2150	handler.updateMonitor(cmd, "alice/fixed-window-test")
2151
2152	monitor, err = server.DBPool.FindPipeMonitorByTopic("alice-id", "alice/fixed-window-test")
2153	if err != nil {
2154		t.Fatalf("monitor should exist after first ping: %v", err)
2155	}
2156
2157	if monitor.LastPing == nil {
2158		t.Fatalf("last_ping should be set after first ping")
2159	}
2160	firstPingTime := *monitor.LastPing
2161	windowEndAfterFirstPing := *monitor.WindowEnd
2162
2163	// BUG CHECK: With the bug, window_end would have slid forward to now+1h
2164	// With the fix, window_end should remain at the original scheduled time
2165	if !windowEndAfterFirstPing.Equal(initialWindowEnd) {
2166		t.Errorf("BUG DETECTED: window_end should NOT change after first ping within window\n"+
2167			"initial window_end: %v\n"+
2168			"window_end after ping: %v\n"+
2169			"Window slid forward by: %v",
2170			initialWindowEnd.Format(time.RFC3339),
2171			windowEndAfterFirstPing.Format(time.RFC3339),
2172			windowEndAfterFirstPing.Sub(initialWindowEnd))
2173	}
2174
2175	// Second ping - last_ping SHOULD be updated (for user visibility)
2176	// but window_end should NOT change
2177	time.Sleep(10 * time.Millisecond) // Small delay to get different timestamp
2178	handler.updateMonitor(cmd, "alice/fixed-window-test")
2179
2180	monitor, err = server.DBPool.FindPipeMonitorByTopic("alice-id", "alice/fixed-window-test")
2181	if err != nil {
2182		t.Fatalf("monitor should exist after second ping: %v", err)
2183	}
2184
2185	// last_ping SHOULD be updated to show most recent activity
2186	if monitor.LastPing.Equal(firstPingTime) {
2187		t.Errorf("last_ping SHOULD be updated for user visibility\n"+
2188			"first ping time: %v\n"+
2189			"last_ping after second call: %v",
2190			firstPingTime.Format(time.RFC3339Nano),
2191			monitor.LastPing.Format(time.RFC3339Nano))
2192	}
2193
2194	// But window_end should still be the original value (not sliding)
2195	if !monitor.WindowEnd.Equal(initialWindowEnd) {
2196		t.Errorf("BUG DETECTED: window_end should remain at original value\n"+
2197			"initial: %v\n"+
2198			"current: %v",
2199			initialWindowEnd.Format(time.RFC3339),
2200			monitor.WindowEnd.Format(time.RFC3339))
2201	}
2202}