repos / pico

pico services mono repo
git clone https://github.com/picosh/pico.git

pico / pkg / apps / pipe
Eric Bower  ·  2026-02-03

cli.go

   1package pipe
   2
import (
	"bytes"
	"context"
	"database/sql"
	"errors"
	"flag"
	"fmt"
	"io"
	"log/slog"
	"slices"
	"strconv"
	"strings"
	"sync/atomic"
	"text/tabwriter"
	"time"

	"github.com/antoniomika/syncmap"
	"github.com/google/uuid"
	"github.com/gorilla/feeds"
	"github.com/picosh/pico/pkg/db"
	"github.com/picosh/pico/pkg/pssh"
	psub "github.com/picosh/pico/pkg/pubsub"
	"github.com/picosh/pico/pkg/shared"
	gossh "golang.org/x/crypto/ssh"
)
  27
  28func Middleware(handler *CliHandler) pssh.SSHServerMiddleware {
  29	return func(next pssh.SSHServerHandler) pssh.SSHServerHandler {
  30		return func(sesh *pssh.SSHServerConnSession) error {
  31			ctx := sesh.Context()
  32			logger := pssh.GetLogger(sesh)
  33			user := pssh.GetUser(sesh)
  34
  35			args := sesh.Command()
  36			if len(args) == 0 {
  37				help(handler.Cfg, sesh)
  38				return next(sesh)
  39			}
  40
  41			userName := "public"
  42			userNameAddition := ""
  43			uuidStr := uuid.NewString()
  44			isAdmin := false
  45			if user != nil {
  46				isAdmin = handler.DBPool.HasFeatureByUser(user.ID, "admin")
  47				if isAdmin && strings.HasPrefix(sesh.User(), "admin__") {
  48					uuidStr = fmt.Sprintf("admin-%s", uuidStr)
  49				}
  50
  51				userName = user.Name
  52				if user.PublicKey != nil && user.PublicKey.Name != "" {
  53					addition := user.PublicKey.Name
  54					identity := sesh.Permissions().Extensions["identity"]
  55					if identity != "" && identity != "pubkey" {
  56						addition = identity
  57					}
  58					userNameAddition = fmt.Sprintf("-%s", addition)
  59				}
  60			}
  61
  62			pipeCtx, cancel := context.WithCancel(ctx)
  63
  64			cliCmd := &CliCmd{
  65				sesh:     sesh,
  66				args:     args,
  67				userName: userName,
  68				isAdmin:  isAdmin,
  69				pipeCtx:  pipeCtx,
  70				cancel:   cancel,
  71				user:     user,
  72			}
  73
  74			cmd := strings.TrimSpace(args[0])
  75			switch cmd {
  76			case "help":
  77				help(handler.Cfg, sesh)
  78				return next(sesh)
  79			case "ls":
  80				err := handler.ls(cliCmd)
  81				if err != nil {
  82					logger.Error("ls cmd", "err", err)
  83					sesh.Fatal(err)
  84				}
  85				return next(sesh)
  86			case "monitor":
  87				err := handler.monitor(cliCmd, user)
  88				if err != nil {
  89					logger.Error("monitor cmd", "err", err)
  90					sesh.Fatal(err)
  91				}
  92				return next(sesh)
  93			case "status":
  94				err := handler.status(cliCmd, user)
  95				if err != nil {
  96					logger.Error("status cmd", "err", err)
  97					sesh.Fatal(err)
  98				}
  99				return next(sesh)
 100			case "rss":
 101				rss, err := MonitorRss(handler.DBPool, user, handler.Cfg.Domain)
 102				_, _ = fmt.Fprintln(sesh, rss)
 103				if err != nil {
 104					logger.Error("rss cmd", "err", err)
 105					sesh.Fatal(err)
 106				}
 107				return next(sesh)
 108			}
 109
 110			topic := ""
 111			cmdArgs := args[1:]
 112			if len(args) > 1 && !strings.HasPrefix(args[1], "-") {
 113				topic = strings.TrimSpace(args[1])
 114				cmdArgs = args[2:]
 115			}
 116			// sub commands after this line expect clipped args
 117			cliCmd.args = cmdArgs
 118
 119			logger.Info(
 120				"pubsub middleware detected command",
 121				"args", args,
 122				"cmd", cmd,
 123				"topic", topic,
 124				"cmdArgs", cmdArgs,
 125			)
 126
 127			clientID := fmt.Sprintf(
 128				"%s (%s%s@%s)",
 129				uuidStr,
 130				userName,
 131				userNameAddition,
 132				sesh.RemoteAddr().String(),
 133			)
 134
 135			go func() {
 136				defer cancel()
 137
 138				ticker := time.NewTicker(5 * time.Second)
 139				defer ticker.Stop()
 140
 141				for {
 142					select {
 143					case <-pipeCtx.Done():
 144						return
 145					case <-ticker.C:
 146						_, err := sesh.SendRequest("ping@pico.sh", false, nil)
 147						if err != nil {
 148							logger.Error("error sending ping", "err", err)
 149							return
 150						}
 151					}
 152				}
 153			}()
 154
 155			switch cmd {
 156			case "pub":
 157				err := handler.pub(cliCmd, topic, clientID)
 158				if err != nil {
 159					logger.Error("pub cmd", "err", err)
 160					sesh.Fatal(err)
 161				}
 162			case "sub":
 163				err := handler.sub(cliCmd, topic, clientID)
 164				if err != nil {
 165					logger.Error("sub cmd", "err", err)
 166					sesh.Fatal(err)
 167				}
 168			case "pipe":
 169				err := handler.pipe(cliCmd, topic, clientID)
 170				if err != nil {
 171					logger.Error("pipe cmd", "err", err)
 172					sesh.Fatal(err)
 173				}
 174			case "uptime":
 175				err := handler.uptime(cliCmd, topic, user)
 176				if err != nil {
 177					logger.Error("uptime cmd", "err", err)
 178					sesh.Fatal(err)
 179				}
 180			}
 181
 182			return next(sesh)
 183		}
 184	}
 185}
 186
// CliHandler holds the shared dependencies and state used by the pipe CLI
// commands.
type CliHandler struct {
	// DBPool is used for feature checks and pipe monitor persistence.
	DBPool  db.DB
	// Logger is the handler-level logger returned by GetLogger.
	Logger  *slog.Logger
	// PubSub is the broker that pub, sub and pipe operate on.
	PubSub  psub.PubSub
	// Cfg is the site configuration (used for help text and the RSS domain).
	Cfg     *shared.ConfigSite
	// Waiters maps a resolved topic name to the client IDs of publishers that
	// are blocked waiting for a subscriber.
	Waiters *syncmap.Map[string, []string]
	// Access maps a resolved topic name to the access list stored for it.
	Access  *syncmap.Map[string, []string]
}
 195
// GetLogger returns the handler's logger. The session argument is not used.
func (h *CliHandler) GetLogger(s *pssh.SSHServerConnSession) *slog.Logger {
	return h.Logger
}
 199
// CliCmd carries the per-invocation state passed to each command handler.
type CliCmd struct {
	// sesh is the SSH session the command reads from and writes to.
	sesh     *pssh.SSHServerConnSession
	// args holds the full command line for ls/monitor/status; Middleware
	// replaces it with the arguments after the command (and topic, if given)
	// before running pub, sub, pipe and uptime.
	args     []string
	// userName is the authenticated user's name, or "public" when there is none.
	userName string
	// isAdmin reports whether the user has the "admin" feature.
	isAdmin  bool
	// pipeCtx is cancelled when the command should stop; cancel cancels it.
	pipeCtx  context.Context
	cancel   context.CancelFunc
	// user is nil for unauthenticated sessions.
	user     *db.User
}
 209
 210func help(cfg *shared.ConfigSite, sesh *pssh.SSHServerConnSession) {
 211	data := fmt.Sprintf(`Command: ssh %s <command> [args...]
 212
 213The simplest authenticated pubsub system.  Send messages through
 214user-defined topics.  Topics are private to the authenticated
 215ssh user.  The default pubsub model is multicast with bidirectional
 216blocking, meaning a publisher ("pub") will send its message to all
 217subscribers ("sub").  Further, both "pub" and "sub" will wait for
 218at least one event to be sent or received. Pipe ("pipe") allows
 219for bidirectional messages to be sent between any clients connected
 220to a pipe.
 221
 222Commands:
 223  help                        Show this help message
 224  ls                          List active pubsub channels
 225  pub <topic> [flags]         Publish messages to a topic
 226  sub <topic> [flags]         Subscribe to messages from a topic
 227  pipe <topic> [flags]        Bidirectional messaging between clients
 228
 229Monitoring commands:
 230  monitor <topic> <duration>  Create/update a health monitor for a topic
 231  monitor <topic> -d          Delete a monitor
 232  status                      Show health status of all monitors
 233  uptime                      Show uptime for a topic
 234  rss                         Get RSS feed of monitor alerts
 235
 236Use "ssh %s <command> -h" for help on a specific command.
 237`, toSshCmd(cfg), toSshCmd(cfg))
 238
 239	data = strings.ReplaceAll(data, "\n", "\r\n")
 240	_, _ = fmt.Fprintln(sesh, data)
 241}
 242
 243func (handler *CliHandler) ls(cmd *CliCmd) error {
 244	if cmd.userName == "public" {
 245		err := fmt.Errorf("access denied")
 246		return err
 247	}
 248
 249	topicFilter := fmt.Sprintf("%s/", cmd.userName)
 250	if cmd.isAdmin {
 251		topicFilter = ""
 252		if len(cmd.args) > 1 {
 253			topicFilter = cmd.args[1]
 254		}
 255	}
 256
 257	var channels []*psub.Channel
 258	waitingChannels := map[string][]string{}
 259
 260	for topic, channel := range handler.PubSub.GetChannels() {
 261		if strings.HasPrefix(topic, topicFilter) {
 262			channels = append(channels, channel)
 263		}
 264	}
 265
 266	for channel, clients := range handler.Waiters.Range {
 267		if strings.HasPrefix(channel, topicFilter) {
 268			waitingChannels[channel] = clients
 269		}
 270	}
 271
 272	if len(channels) == 0 && len(waitingChannels) == 0 {
 273		_, _ = fmt.Fprintln(cmd.sesh, "no pubsub channels found")
 274	} else {
 275		var outputData string
 276		if len(channels) > 0 || len(waitingChannels) > 0 {
 277			outputData += "Channel Information\r\n"
 278			for _, channel := range channels {
 279				extraData := ""
 280
 281				if accessList, ok := handler.Access.Load(channel.Topic); ok && len(accessList) > 0 {
 282					extraData += fmt.Sprintf(" (Access List: %s)", strings.Join(accessList, ", "))
 283				}
 284
 285				outputData += fmt.Sprintf("- %s:%s\r\n", channel.Topic, extraData)
 286
 287				var pubs []*psub.Client
 288				var subs []*psub.Client
 289				var pipes []*psub.Client
 290
 291				for _, client := range channel.GetClients() {
 292					switch client.Direction {
 293					case psub.ChannelDirectionInput:
 294						pubs = append(pubs, client)
 295					case psub.ChannelDirectionOutput:
 296						subs = append(subs, client)
 297					case psub.ChannelDirectionInputOutput:
 298						pipes = append(pipes, client)
 299					}
 300				}
 301				outputData += clientInfo(pubs, cmd.isAdmin, "Pubs")
 302				outputData += clientInfo(subs, cmd.isAdmin, "Subs")
 303				outputData += clientInfo(pipes, cmd.isAdmin, "Pipes")
 304			}
 305
 306			for waitingChannel, channelPubs := range waitingChannels {
 307				extraData := ""
 308
 309				if accessList, ok := handler.Access.Load(waitingChannel); ok && len(accessList) > 0 {
 310					extraData += fmt.Sprintf(" (Access List: %s)", strings.Join(accessList, ", "))
 311				}
 312
 313				outputData += fmt.Sprintf("- %s:%s\r\n", waitingChannel, extraData)
 314				outputData += fmt.Sprintf("  %s:\r\n", "Waiting Pubs")
 315				for _, client := range channelPubs {
 316					if strings.HasPrefix(client, "admin-") && !cmd.isAdmin {
 317						continue
 318					}
 319					outputData += fmt.Sprintf("  - %s\r\n", client)
 320				}
 321			}
 322		}
 323
 324		_, _ = cmd.sesh.Write([]byte(outputData))
 325	}
 326
 327	return nil
 328}
 329
 330func (handler *CliHandler) monitor(cmd *CliCmd, user *db.User) error {
 331	if user == nil {
 332		return fmt.Errorf("access denied")
 333	}
 334
 335	args := cmd.sesh.Command()
 336	topic := ""
 337	cmdArgs := args[1:]
 338	if len(args) > 1 && !strings.HasPrefix(args[1], "-") {
 339		topic = strings.TrimSpace(args[1])
 340		cmdArgs = args[2:]
 341	}
 342
 343	monitorCmd := flagSet("monitor", cmd.sesh)
 344	del := monitorCmd.Bool("d", false, "Delete the monitor")
 345
 346	if !flagCheck(monitorCmd, topic, cmdArgs) {
 347		return nil
 348	}
 349
 350	if topic == "" {
 351		_, _ = fmt.Fprintln(cmd.sesh, "Usage: monitor <topic> <duration>")
 352		_, _ = fmt.Fprintln(cmd.sesh, "       monitor <topic> -d")
 353		return fmt.Errorf("topic is required")
 354	}
 355
 356	// Resolve to fully qualified topic name
 357	result := resolveTopic(TopicResolveInput{
 358		UserName: cmd.userName,
 359		Topic:    topic,
 360		IsAdmin:  cmd.isAdmin,
 361		IsPublic: false,
 362	})
 363	resolvedTopic := result.Name
 364
 365	if *del {
 366		handler.Logger.Info("removing pipe monitor", "topic", resolvedTopic)
 367		err := handler.DBPool.RemovePipeMonitor(user.ID, resolvedTopic)
 368		if err != nil {
 369			return fmt.Errorf("failed to delete monitor: %w", err)
 370		}
 371		_, _ = fmt.Fprintf(cmd.sesh, "monitor deleted: %s\r\n", resolvedTopic)
 372		return nil
 373	}
 374
 375	// Create/update monitor - need duration argument
 376	durStr := ""
 377	if monitorCmd.NArg() > 0 {
 378		durStr = monitorCmd.Arg(0)
 379	} else if len(cmdArgs) > 0 {
 380		durStr = cmdArgs[0]
 381	}
 382
 383	if durStr == "" {
 384		_, _ = fmt.Fprintln(cmd.sesh, "Usage: monitor <topic> <duration>")
 385		return fmt.Errorf("duration is required")
 386	}
 387
 388	dur, err := time.ParseDuration(durStr)
 389	if err != nil {
 390		return fmt.Errorf("invalid duration %q: %w", durStr, err)
 391	}
 392
 393	winEnd := time.Now().UTC().Add(dur)
 394	handler.Logger.Info(
 395		"upserting pipe monitor",
 396		"topic", resolvedTopic,
 397		"dur", dur,
 398		"window", winEnd.UTC().Format(time.RFC3339),
 399	)
 400	err = handler.DBPool.UpsertPipeMonitor(user.ID, resolvedTopic, dur, &winEnd)
 401	if err != nil {
 402		return fmt.Errorf("failed to create monitor: %w", err)
 403	}
 404
 405	_, _ = fmt.Fprintf(cmd.sesh, "monitor created: %s (window: %s)\r\n", resolvedTopic, dur)
 406	return nil
 407}
 408
 409func (handler *CliHandler) status(cmd *CliCmd, user *db.User) error {
 410	if user == nil {
 411		return fmt.Errorf("access denied")
 412	}
 413
 414	monitors, err := handler.DBPool.FindPipeMonitorsByUser(user.ID)
 415	if err != nil {
 416		return fmt.Errorf("failed to fetch monitors: %w", err)
 417	}
 418
 419	if len(monitors) == 0 {
 420		_, _ = fmt.Fprintln(cmd.sesh, "no monitors found")
 421		return nil
 422	}
 423
 424	writer := tabwriter.NewWriter(cmd.sesh, 0, 0, 2, ' ', tabwriter.TabIndent)
 425	_, _ = fmt.Fprintln(writer, "Topic\tStatus\tWindow\tLast Ping\tWindow End\tReason")
 426
 427	for _, m := range monitors {
 428		status := "healthy"
 429		reason := ""
 430		if err := m.Status(); err != nil {
 431			status = "unhealthy"
 432			reason = err.Error()
 433		}
 434
 435		lastPing := "never"
 436		if m.LastPing != nil {
 437			lastPing = m.LastPing.UTC().Format(time.RFC3339)
 438		}
 439
 440		windowEnd := ""
 441		if m.WindowEnd != nil {
 442			windowEnd = m.WindowEnd.UTC().Format(time.RFC3339)
 443		}
 444
 445		_, _ = fmt.Fprintf(
 446			writer,
 447			"%s\t%s\t%s\t%s\t%s\t%s\r\n",
 448			m.Topic,
 449			status,
 450			m.WindowDur.String(),
 451			lastPing,
 452			windowEnd,
 453			reason,
 454		)
 455	}
 456	_ = writer.Flush()
 457	return nil
 458}
 459
 460func (handler *CliHandler) uptime(cmd *CliCmd, topic string, user *db.User) error {
 461	if user == nil {
 462		return fmt.Errorf("access denied")
 463	}
 464
 465	if topic == "" {
 466		_, _ = fmt.Fprintln(cmd.sesh, "usage: uptime <topic> [--from <time>] [--to <time>]")
 467		_, _ = fmt.Fprintln(cmd.sesh, "  --from: start time (RFC3339 or duration like '24h', '7d', default: 24h)")
 468		_, _ = fmt.Fprintln(cmd.sesh, "  --to: end time (RFC3339, default: now)")
 469		return nil
 470	}
 471
 472	fs := flag.NewFlagSet("uptime", flag.ContinueOnError)
 473	fs.SetOutput(cmd.sesh)
 474	fromStr := fs.String("from", "", "start time (RFC3339 or duration like '24h', '7d')")
 475	toStr := fs.String("to", "", "end time (RFC3339, defaults to now)")
 476
 477	if err := fs.Parse(cmd.args); err != nil {
 478		return nil
 479	}
 480
 481	topicResult := resolveTopic(TopicResolveInput{
 482		UserName: cmd.userName,
 483		Topic:    topic,
 484		IsAdmin:  cmd.isAdmin,
 485		IsPublic: false,
 486	})
 487	resolvedTopic := topicResult.Name
 488
 489	monitor, err := handler.DBPool.FindPipeMonitorByTopic(user.ID, resolvedTopic)
 490	if err != nil {
 491		if errors.Is(err, sql.ErrNoRows) {
 492			return fmt.Errorf("monitor not found: %s", topic)
 493		}
 494		return fmt.Errorf("failed to find monitor: %w", err)
 495	}
 496
 497	now := time.Now().UTC()
 498	to := now
 499	from := now.Add(-24 * time.Hour)
 500
 501	if *fromStr != "" {
 502		if parsed, err := time.Parse(time.RFC3339, *fromStr); err == nil {
 503			from = parsed.UTC()
 504		} else if dur, err := parseDuration(*fromStr); err == nil {
 505			from = now.Add(-dur)
 506		} else {
 507			return fmt.Errorf("invalid --from value: %s", *fromStr)
 508		}
 509	}
 510
 511	if *toStr != "" {
 512		if parsed, err := time.Parse(time.RFC3339, *toStr); err == nil {
 513			to = parsed.UTC()
 514		} else {
 515			return fmt.Errorf("invalid --to value: %s", *toStr)
 516		}
 517	}
 518
 519	history, err := handler.DBPool.FindPipeMonitorHistory(monitor.ID, from, to)
 520	if err != nil {
 521		return fmt.Errorf("failed to fetch history: %w", err)
 522	}
 523
 524	result := db.ComputeUptime(history, from, to)
 525
 526	_, _ = fmt.Fprintf(cmd.sesh, "Monitor: %s\r\n", topic)
 527	_, _ = fmt.Fprintf(cmd.sesh, "Period: %s to %s\r\n", from.Format(time.RFC3339), to.Format(time.RFC3339))
 528	_, _ = fmt.Fprintf(cmd.sesh, "Total Duration: %s\r\n", result.TotalDuration.Round(time.Second))
 529	_, _ = fmt.Fprintf(cmd.sesh, "Uptime Duration: %s\r\n", result.UptimeDuration.Round(time.Second))
 530	_, _ = fmt.Fprintf(cmd.sesh, "Uptime: %.2f%%\r\n", result.UptimePercent)
 531
 532	return nil
 533}
 534
 535func parseDuration(s string) (time.Duration, error) {
 536	if len(s) == 0 {
 537		return 0, fmt.Errorf("empty duration")
 538	}
 539	last := s[len(s)-1]
 540	if last == 'd' {
 541		var n int
 542		_, err := fmt.Sscanf(s, "%d", &n)
 543		if err != nil {
 544			return 0, fmt.Errorf("invalid duration: %s", s)
 545		}
 546		return time.Duration(n) * 24 * time.Hour, nil
 547	}
 548	return time.ParseDuration(s)
 549}
 550
 551func MonitorRss(dbpool db.DB, user *db.User, domain string) (string, error) {
 552	if user == nil {
 553		return "", fmt.Errorf("access denied")
 554	}
 555
 556	monitors, err := dbpool.FindPipeMonitorsByUser(user.ID)
 557	if err != nil {
 558		return "", fmt.Errorf("failed to fetch monitors: %w", err)
 559	}
 560
 561	now := time.Now()
 562	feed := &feeds.Feed{
 563		Title:       fmt.Sprintf("Pipe Monitors for %s", user.Name),
 564		Link:        &feeds.Link{Href: fmt.Sprintf("https://%s", domain)},
 565		Description: "Alerts for pipe monitor status changes",
 566		Author:      &feeds.Author{Name: user.Name},
 567		Created:     now,
 568	}
 569
 570	var feedItems []*feeds.Item
 571	for _, m := range monitors {
 572		if err := m.Status(); err != nil {
 573			item := &feeds.Item{
 574				Id:          fmt.Sprintf("%s-%s-%d", user.ID, m.Topic, now.Unix()),
 575				Title:       fmt.Sprintf("ALERT: %s is unhealthy", m.Topic),
 576				Link:        &feeds.Link{Href: fmt.Sprintf("https://%s", domain)},
 577				Description: err.Error(),
 578				Created:     now,
 579				Updated:     now,
 580				Author:      &feeds.Author{Name: user.Name},
 581			}
 582			feedItems = append(feedItems, item)
 583		}
 584	}
 585	feed.Items = feedItems
 586
 587	rss, err := feed.ToRss()
 588	if err != nil {
 589		return "", fmt.Errorf("failed to generate RSS: %w", err)
 590	}
 591
 592	return rss, nil
 593}
 594
 595func (handler *CliHandler) pub(cmd *CliCmd, topic string, clientID string) error {
 596	pubCmd := flagSet("pub", cmd.sesh)
 597	access := pubCmd.String("a", "", "Comma separated list of pico usernames or ssh-key fingerprints to allow access to a topic")
 598	empty := pubCmd.Bool("e", false, "Send an empty message to subs")
 599	public := pubCmd.Bool("p", false, "Publish message to public topic")
 600	block := pubCmd.Bool("b", true, "Block writes until a subscriber is available")
 601	timeout := pubCmd.Duration("t", 30*24*time.Hour, "Timeout as a Go duration to block for a subscriber to be available. Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'. Default is 30 days.")
 602	clean := pubCmd.Bool("c", false, "Don't send status messages")
 603	dispatcher := pubCmd.String("d", "multicast", "Type of dispatcher (e.g. multicast, round_robin)")
 604
 605	if !flagCheck(pubCmd, topic, cmd.args) {
 606		return fmt.Errorf("invalid cmd args")
 607	}
 608
 609	if pubCmd.NArg() == 1 && topic == "" {
 610		topic = pubCmd.Arg(0)
 611	}
 612
 613	handler.Logger.Info(
 614		"flags parsed",
 615		"cmd", "pub",
 616		"empty", *empty,
 617		"public", *public,
 618		"block", *block,
 619		"timeout", *timeout,
 620		"topic", topic,
 621		"access", *access,
 622		"clean", *clean,
 623		"dispatcher", *dispatcher,
 624	)
 625
 626	var accessList []string
 627
 628	if *access != "" {
 629		accessList = parseArgList(*access)
 630	}
 631
 632	var rw io.ReadWriter
 633	if *empty {
 634		rw = bytes.NewBuffer(make([]byte, 1))
 635	} else {
 636		rw = cmd.sesh
 637	}
 638
 639	if topic == "" {
 640		topic = uuid.NewString()
 641	}
 642
 643	msgFlag := ""
 644	if *public {
 645		msgFlag = "-p "
 646	}
 647
 648	// Initial resolution to get the topic name for access storage
 649	initialResult := resolveTopic(TopicResolveInput{
 650		UserName: cmd.userName,
 651		Topic:    topic,
 652		IsAdmin:  cmd.isAdmin,
 653		IsPublic: *public,
 654	})
 655	name := initialResult.Name
 656
 657	var accessListCreator bool
 658	_, loaded := handler.Access.LoadOrStore(name, accessList)
 659	if !loaded {
 660		defer func() {
 661			handler.Access.Delete(name)
 662		}()
 663		accessListCreator = true
 664	}
 665
 666	// Check for existing access list and resolve final topic name
 667	existingAccessList, hasExistingAccess := handler.Access.Load(initialResult.WithoutUser)
 668	result := resolveTopic(TopicResolveInput{
 669		UserName:           cmd.userName,
 670		Topic:              topic,
 671		IsAdmin:            cmd.isAdmin,
 672		IsPublic:           *public,
 673		ExistingAccessList: existingAccessList,
 674		HasExistingAccess:  hasExistingAccess,
 675		IsAccessCreator:    accessListCreator,
 676		HasUserAccess:      checkAccess(existingAccessList, cmd.userName, cmd.sesh),
 677	})
 678	name = result.Name
 679
 680	if result.GenerateNewTopic {
 681		topic = uuid.NewString()
 682		name = toPublicTopic(topic)
 683	}
 684
 685	if !*clean {
 686		fmtTopic := topic
 687		if *access != "" {
 688			fmtTopic = fmt.Sprintf("%s/%s", cmd.userName, topic)
 689		}
 690
 691		_, _ = fmt.Fprintf(
 692			cmd.sesh,
 693			"subscribe to this channel:\n  ssh %s sub %s%s\n",
 694			toSshCmd(handler.Cfg),
 695			msgFlag,
 696			fmtTopic,
 697		)
 698	}
 699
 700	if *block {
 701		count := 0
 702		for topic, channel := range handler.PubSub.GetChannels() {
 703			if topic == name {
 704				for _, client := range channel.GetClients() {
 705					if client.Direction == psub.ChannelDirectionOutput || client.Direction == psub.ChannelDirectionInputOutput {
 706						count++
 707					}
 708				}
 709				break
 710			}
 711		}
 712
 713		tt := *timeout
 714		if count == 0 {
 715			currentWaiters, _ := handler.Waiters.LoadOrStore(name, nil)
 716			handler.Waiters.Store(name, append(currentWaiters, clientID))
 717
 718			termMsg := "no subs found ... waiting"
 719			if tt > 0 {
 720				termMsg += " " + tt.String()
 721			}
 722
 723			if !*clean {
 724				_, _ = fmt.Fprintln(cmd.sesh, termMsg)
 725			}
 726
 727			ready := make(chan struct{})
 728
 729			go func() {
 730				for {
 731					select {
 732					case <-cmd.pipeCtx.Done():
 733						cmd.cancel()
 734						return
 735					case <-time.After(1 * time.Millisecond):
 736						count := 0
 737						for topic, channel := range handler.PubSub.GetChannels() {
 738							if topic == name {
 739								for _, client := range channel.GetClients() {
 740									if client.Direction == psub.ChannelDirectionOutput || client.Direction == psub.ChannelDirectionInputOutput {
 741										count++
 742									}
 743								}
 744								break
 745							}
 746						}
 747
 748						if count > 0 {
 749							close(ready)
 750							return
 751						}
 752					}
 753				}
 754			}()
 755
 756			select {
 757			case <-ready:
 758			case <-cmd.pipeCtx.Done():
 759			case <-time.After(tt):
 760				cmd.cancel()
 761
 762				if !*clean {
 763					return fmt.Errorf("timeout reached, exiting")
 764				} else {
 765					err := cmd.sesh.Exit(1)
 766					if err != nil {
 767						handler.Logger.Error("error exiting session", "err", err)
 768					}
 769
 770					_ = cmd.sesh.Close()
 771				}
 772			}
 773
 774			newWaiters, _ := handler.Waiters.LoadOrStore(name, nil)
 775			newWaiters = slices.DeleteFunc(newWaiters, func(cl string) bool {
 776				return cl == clientID
 777			})
 778			handler.Waiters.Store(name, newWaiters)
 779
 780			var toDelete []string
 781
 782			for channel, clients := range handler.Waiters.Range {
 783				if len(clients) == 0 {
 784					toDelete = append(toDelete, channel)
 785				}
 786			}
 787
 788			for _, channel := range toDelete {
 789				handler.Waiters.Delete(channel)
 790			}
 791		}
 792	}
 793
 794	if !*clean {
 795		_, _ = fmt.Fprintln(cmd.sesh, "sending msg ...")
 796	}
 797
 798	throttledRW := newThrottledMonitorRW(rw, handler, cmd, name)
 799
 800	var dsp psub.MessageDispatcher
 801	dsp = &psub.MulticastDispatcher{}
 802	if *dispatcher == "round_robin" {
 803		dsp = &psub.RoundRobinDispatcher{}
 804	}
 805	channel := psub.NewChannel(name)
 806	_ = handler.PubSub.SetDispatcher(dsp, []*psub.Channel{channel})
 807
 808	err := handler.PubSub.Pub(
 809		cmd.pipeCtx,
 810		clientID,
 811		throttledRW,
 812		[]*psub.Channel{channel},
 813		*block,
 814	)
 815
 816	if !*clean {
 817		_, _ = fmt.Fprintln(cmd.sesh, "msg sent!")
 818	}
 819
 820	if err != nil && !*clean {
 821		return err
 822	}
 823
 824	handler.updateMonitor(cmd, name)
 825
 826	return nil
 827}
 828
 829func (handler *CliHandler) updateMonitor(cmd *CliCmd, topic string) {
 830	if cmd.user == nil {
 831		return
 832	}
 833
 834	handler.Logger.Info("update monitor", "topic", topic)
 835	monitor, err := handler.DBPool.FindPipeMonitorByTopic(cmd.user.ID, topic)
 836	if err != nil || monitor == nil {
 837		handler.Logger.Info("no monitor found", "topic", topic)
 838		return
 839	}
 840
 841	now := time.Now().UTC()
 842
 843	// Fixed window semantics: windows are discrete, non-overlapping time slots.
 844	// - last_ping: always updated to show most recent activity (user visibility)
 845	// - window_end: only advances when current time exceeds it (health scheduling)
 846
 847	// If we're past the current window, advance to the window containing `now`
 848	newWindowEnd := *monitor.WindowEnd
 849	if !now.Before(*monitor.WindowEnd) {
 850		// Record history for the completed window before advancing
 851		// This captures that the old window was healthy (had activity)
 852		if err := handler.DBPool.InsertPipeMonitorHistory(monitor.ID, monitor.WindowDur, monitor.WindowEnd, monitor.LastPing); err != nil {
 853			handler.Logger.Error("failed to insert monitor history", "err", err, "topic", topic)
 854		}
 855
 856		// Calculate which window period `now` falls into
 857		elapsed := now.Sub(*monitor.WindowEnd)
 858		periods := int(elapsed/monitor.WindowDur) + 1
 859		newWindowEnd = monitor.WindowEnd.Add(time.Duration(periods) * monitor.WindowDur)
 860
 861		if err := handler.DBPool.UpsertPipeMonitor(cmd.user.ID, topic, monitor.WindowDur, &newWindowEnd); err != nil {
 862			handler.Logger.Error("failed to advance monitor window", "err", err, "topic", topic)
 863		}
 864		handler.Logger.Info("advanced monitor window",
 865			"topic", topic,
 866			"oldWindowEnd", monitor.WindowEnd.Format(time.RFC3339),
 867			"newWindowEnd", newWindowEnd.Format(time.RFC3339),
 868			"periodsMissed", periods-1,
 869		)
 870	}
 871
 872	// Always record the latest ping for user visibility
 873	if err := handler.DBPool.UpdatePipeMonitorLastPing(cmd.user.ID, topic, &now); err != nil {
 874		handler.Logger.Error("failed to update monitor last_ping", "err", err, "topic", topic)
 875	}
 876
 877	handler.Logger.Info("recorded monitor ping",
 878		"topic", topic,
 879		"pingTime", now.Format(time.RFC3339),
 880		"windowEnd", newWindowEnd.Format(time.RFC3339),
 881	)
 882}
 883
// monitorThrottleInterval is the minimum time between two monitor updates
// triggered by traffic through a throttledMonitorRW.
const monitorThrottleInterval = 15 * time.Second
 885
// throttledMonitorRW wraps an io.ReadWriter and updates the monitor of a
// topic whenever data is read or written, at most once per
// monitorThrottleInterval.
type throttledMonitorRW struct {
	// rw is the wrapped reader/writer that all I/O is delegated to.
	rw       io.ReadWriter
	// handler and cmd are passed to updateMonitor when an update is due.
	handler  *CliHandler
	cmd      *CliCmd
	// topic is the topic name whose monitor is updated.
	topic    string
	// lastPing is the time of the last monitor update; zero means none yet.
	lastPing atomic.Int64 // Unix nanoseconds
}
 893
 894func newThrottledMonitorRW(rw io.ReadWriter, handler *CliHandler, cmd *CliCmd, topic string) *throttledMonitorRW {
 895	return &throttledMonitorRW{
 896		rw:      rw,
 897		handler: handler,
 898		cmd:     cmd,
 899		topic:   topic,
 900	}
 901}
 902
 903func (t *throttledMonitorRW) throttledUpdate() {
 904	now := time.Now().UnixNano()
 905	last := t.lastPing.Load()
 906
 907	// First ping (last == 0) or interval elapsed
 908	if last == 0 || now-last >= int64(monitorThrottleInterval) {
 909		if t.lastPing.CompareAndSwap(last, now) {
 910			t.handler.updateMonitor(t.cmd, t.topic)
 911		}
 912	}
 913}
 914
 915func (t *throttledMonitorRW) Read(p []byte) (int, error) {
 916	n, err := t.rw.Read(p)
 917	if n > 0 {
 918		t.throttledUpdate()
 919	}
 920	return n, err
 921}
 922
 923func (t *throttledMonitorRW) Write(p []byte) (int, error) {
 924	n, err := t.rw.Write(p)
 925	if n > 0 {
 926		t.throttledUpdate()
 927	}
 928	return n, err
 929}
 930
 931func (handler *CliHandler) sub(cmd *CliCmd, topic string, clientID string) error {
 932	subCmd := flagSet("sub", cmd.sesh)
 933	access := subCmd.String("a", "", "Comma separated list of pico usernames or ssh-key fingerprints to allow access to a topic")
 934	public := subCmd.Bool("p", false, "Subscribe to a public topic")
 935	keepAlive := subCmd.Bool("k", false, "Keep the subscription alive even after the publisher has died")
 936	clean := subCmd.Bool("c", false, "Don't send status messages")
 937
 938	if !flagCheck(subCmd, topic, cmd.args) {
 939		return fmt.Errorf("invalid cmd args")
 940	}
 941
 942	if subCmd.NArg() == 1 && topic == "" {
 943		topic = subCmd.Arg(0)
 944	}
 945
 946	handler.Logger.Info(
 947		"flags parsed",
 948		"cmd", cmd,
 949		"public", *public,
 950		"keepAlive", *keepAlive,
 951		"topic", topic,
 952		"clean", *clean,
 953		"access", *access,
 954	)
 955
 956	var accessList []string
 957
 958	if *access != "" {
 959		accessList = parseArgList(*access)
 960	}
 961
 962	// Initial resolution to get the topic name for access storage
 963	initialResult := resolveTopic(TopicResolveInput{
 964		UserName: cmd.userName,
 965		Topic:    topic,
 966		IsAdmin:  cmd.isAdmin,
 967		IsPublic: *public,
 968	})
 969	name := initialResult.Name
 970
 971	var accessListCreator bool
 972
 973	_, loaded := handler.Access.LoadOrStore(name, accessList)
 974	if !loaded {
 975		defer func() {
 976			handler.Access.Delete(name)
 977		}()
 978		accessListCreator = true
 979	}
 980
 981	// Check for existing access list and resolve final topic name
 982	existingAccessList, hasExistingAccess := handler.Access.Load(initialResult.WithoutUser)
 983	result := resolveTopic(TopicResolveInput{
 984		UserName:           cmd.userName,
 985		Topic:              topic,
 986		IsAdmin:            cmd.isAdmin,
 987		IsPublic:           *public,
 988		ExistingAccessList: existingAccessList,
 989		HasExistingAccess:  hasExistingAccess,
 990		IsAccessCreator:    accessListCreator,
 991		HasUserAccess:      checkAccess(existingAccessList, cmd.userName, cmd.sesh),
 992	})
 993	name = result.Name
 994
 995	if result.AccessDenied {
 996		return fmt.Errorf("access denied")
 997	}
 998
 999	err := handler.PubSub.Sub(
1000		cmd.pipeCtx,
1001		clientID,
1002		cmd.sesh,
1003		[]*psub.Channel{
1004			psub.NewChannel(name),
1005		},
1006		*keepAlive,
1007	)
1008
1009	if err != nil && !*clean {
1010		return err
1011	}
1012
1013	return nil
1014}
1015
1016func (handler *CliHandler) pipe(cmd *CliCmd, topic string, clientID string) error {
1017	pipeCmd := flagSet("pipe", cmd.sesh)
1018	access := pipeCmd.String("a", "", "Comma separated list of pico usernames or ssh-key fingerprints to allow access to a topic")
1019	public := pipeCmd.Bool("p", false, "Pipe to a public topic")
1020	replay := pipeCmd.Bool("r", false, "Replay messages to the client that sent it")
1021	clean := pipeCmd.Bool("c", false, "Don't send status messages")
1022	dispatcher := pipeCmd.String("d", "multicast", "Type of dispatcher (e.g. multicast, round_robin)")
1023
1024	if !flagCheck(pipeCmd, topic, cmd.args) {
1025		return fmt.Errorf("invalid cmd args")
1026	}
1027
1028	if pipeCmd.NArg() == 1 && topic == "" {
1029		topic = pipeCmd.Arg(0)
1030	}
1031
1032	handler.Logger.Info(
1033		"flags parsed",
1034		"cmd", cmd,
1035		"public", *public,
1036		"replay", *replay,
1037		"topic", topic,
1038		"access", *access,
1039		"clean", *clean,
1040		"dispatcher", *dispatcher,
1041	)
1042
1043	var accessList []string
1044
1045	if *access != "" {
1046		accessList = parseArgList(*access)
1047	}
1048
1049	isCreator := topic == ""
1050	if isCreator {
1051		topic = uuid.NewString()
1052	}
1053
1054	flagMsg := ""
1055	if *public {
1056		flagMsg = "-p "
1057	}
1058
1059	// Initial resolution to get the topic name for access storage
1060	initialResult := resolveTopic(TopicResolveInput{
1061		UserName: cmd.userName,
1062		Topic:    topic,
1063		IsAdmin:  cmd.isAdmin,
1064		IsPublic: *public,
1065	})
1066	name := initialResult.Name
1067
1068	var accessListCreator bool
1069
1070	_, loaded := handler.Access.LoadOrStore(name, accessList)
1071	if !loaded {
1072		defer func() {
1073			handler.Access.Delete(name)
1074		}()
1075		accessListCreator = true
1076	}
1077
1078	// Check for existing access list and resolve final topic name
1079	existingAccessList, hasExistingAccess := handler.Access.Load(initialResult.WithoutUser)
1080	result := resolveTopic(TopicResolveInput{
1081		UserName:           cmd.userName,
1082		Topic:              topic,
1083		IsAdmin:            cmd.isAdmin,
1084		IsPublic:           *public,
1085		ExistingAccessList: existingAccessList,
1086		HasExistingAccess:  hasExistingAccess,
1087		IsAccessCreator:    accessListCreator,
1088		HasUserAccess:      checkAccess(existingAccessList, cmd.userName, cmd.sesh),
1089	})
1090	name = result.Name
1091
1092	if result.GenerateNewTopic {
1093		topic = uuid.NewString()
1094		name = toPublicTopic(topic)
1095	}
1096
1097	if isCreator && !*clean {
1098		fmtTopic := topic
1099		if *access != "" {
1100			fmtTopic = fmt.Sprintf("%s/%s", cmd.userName, topic)
1101		}
1102
1103		_, _ = fmt.Fprintf(
1104			cmd.sesh,
1105			"subscribe to this topic:\n  ssh %s sub %s%s\n",
1106			toSshCmd(handler.Cfg),
1107			flagMsg,
1108			fmtTopic,
1109		)
1110	}
1111
1112	throttledRW := newThrottledMonitorRW(cmd.sesh, handler, cmd, name)
1113
1114	var dsp psub.MessageDispatcher
1115	dsp = &psub.MulticastDispatcher{}
1116	if *dispatcher == "round_robin" {
1117		dsp = &psub.RoundRobinDispatcher{}
1118	}
1119	channel := psub.NewChannel(name)
1120	_ = handler.PubSub.SetDispatcher(dsp, []*psub.Channel{channel})
1121
1122	readErr, writeErr := handler.PubSub.Pipe(
1123		cmd.pipeCtx,
1124		clientID,
1125		throttledRW,
1126		[]*psub.Channel{
1127			psub.NewChannel(name),
1128		},
1129		*replay,
1130	)
1131
1132	if readErr != nil && !*clean {
1133		return readErr
1134	}
1135
1136	if writeErr != nil && !*clean {
1137		return writeErr
1138	}
1139
1140	handler.updateMonitor(cmd, name)
1141
1142	return nil
1143}
1144
1145func toSshCmd(cfg *shared.ConfigSite) string {
1146	port := ""
1147	if cfg.PortOverride != "22" {
1148		port = fmt.Sprintf("-p %s ", cfg.PortOverride)
1149	}
1150	return fmt.Sprintf("%s%s", port, cfg.Domain)
1151}
1152
// parseArgList splits arg on commas and strips surrounding whitespace from
// every element. Empty elements are kept, so "" yields a single empty
// string and "a,,b" yields three elements.
func parseArgList(arg string) []string {
	fields := strings.Split(arg, ",")
	trimmed := make([]string, 0, len(fields))
	for _, field := range fields {
		trimmed = append(trimmed, strings.TrimSpace(field))
	}
	return trimmed
}
1161
1162// checkAccess checks if the user has access to a topic based on an access list.
1163func checkAccess(accessList []string, userName string, sesh *pssh.SSHServerConnSession) bool {
1164	for _, acc := range accessList {
1165		if acc == userName {
1166			return true
1167		}
1168
1169		if key := sesh.PublicKey(); key != nil && acc == gossh.FingerprintSHA256(key) {
1170			return true
1171		}
1172	}
1173
1174	return false
1175}
1176
1177func flagSet(cmdName string, sesh *pssh.SSHServerConnSession) *flag.FlagSet {
1178	cmd := flag.NewFlagSet(cmdName, flag.ContinueOnError)
1179	cmd.SetOutput(sesh)
1180	cmd.Usage = func() {
1181		_, _ = fmt.Fprintf(cmd.Output(), "Usage: %s <topic> [args...]\nArgs:\n", cmdName)
1182		cmd.PrintDefaults()
1183	}
1184	return cmd
1185}
1186
// flagCheck parses cmdArgs with cmd and reports whether the command should
// proceed. It returns false when parsing fails or when posArg is "help"; in
// the "help" case the usage message is printed as well.
func flagCheck(cmd *flag.FlagSet, posArg string, cmdArgs []string) bool {
	parseErr := cmd.Parse(cmdArgs)

	if posArg == "help" {
		cmd.Usage()
		return false
	}

	return parseErr == nil
}
1198
1199func clientInfo(clients []*psub.Client, isAdmin bool, clientType string) string {
1200	if len(clients) == 0 {
1201		return ""
1202	}
1203
1204	outputData := fmt.Sprintf("  %s:\r\n", clientType)
1205
1206	for _, client := range clients {
1207		if strings.HasPrefix(client.ID, "admin-") && !isAdmin {
1208			continue
1209		}
1210
1211		outputData += fmt.Sprintf("  - %s\r\n", client.ID)
1212	}
1213
1214	return outputData
1215}