Eric Bower
·
2026-02-03
cli.go
1package pipe
2
import (
	"bytes"
	"context"
	"database/sql"
	"errors"
	"flag"
	"fmt"
	"io"
	"log/slog"
	"slices"
	"strconv"
	"strings"
	"sync/atomic"
	"text/tabwriter"
	"time"

	"github.com/antoniomika/syncmap"
	"github.com/google/uuid"
	"github.com/gorilla/feeds"
	"github.com/picosh/pico/pkg/db"
	"github.com/picosh/pico/pkg/pssh"
	psub "github.com/picosh/pico/pkg/pubsub"
	"github.com/picosh/pico/pkg/shared"
	gossh "golang.org/x/crypto/ssh"
)
27
28func Middleware(handler *CliHandler) pssh.SSHServerMiddleware {
29 return func(next pssh.SSHServerHandler) pssh.SSHServerHandler {
30 return func(sesh *pssh.SSHServerConnSession) error {
31 ctx := sesh.Context()
32 logger := pssh.GetLogger(sesh)
33 user := pssh.GetUser(sesh)
34
35 args := sesh.Command()
36 if len(args) == 0 {
37 help(handler.Cfg, sesh)
38 return next(sesh)
39 }
40
41 userName := "public"
42 userNameAddition := ""
43 uuidStr := uuid.NewString()
44 isAdmin := false
45 if user != nil {
46 isAdmin = handler.DBPool.HasFeatureByUser(user.ID, "admin")
47 if isAdmin && strings.HasPrefix(sesh.User(), "admin__") {
48 uuidStr = fmt.Sprintf("admin-%s", uuidStr)
49 }
50
51 userName = user.Name
52 if user.PublicKey != nil && user.PublicKey.Name != "" {
53 addition := user.PublicKey.Name
54 identity := sesh.Permissions().Extensions["identity"]
55 if identity != "" && identity != "pubkey" {
56 addition = identity
57 }
58 userNameAddition = fmt.Sprintf("-%s", addition)
59 }
60 }
61
62 pipeCtx, cancel := context.WithCancel(ctx)
63
64 cliCmd := &CliCmd{
65 sesh: sesh,
66 args: args,
67 userName: userName,
68 isAdmin: isAdmin,
69 pipeCtx: pipeCtx,
70 cancel: cancel,
71 user: user,
72 }
73
74 cmd := strings.TrimSpace(args[0])
75 switch cmd {
76 case "help":
77 help(handler.Cfg, sesh)
78 return next(sesh)
79 case "ls":
80 err := handler.ls(cliCmd)
81 if err != nil {
82 logger.Error("ls cmd", "err", err)
83 sesh.Fatal(err)
84 }
85 return next(sesh)
86 case "monitor":
87 err := handler.monitor(cliCmd, user)
88 if err != nil {
89 logger.Error("monitor cmd", "err", err)
90 sesh.Fatal(err)
91 }
92 return next(sesh)
93 case "status":
94 err := handler.status(cliCmd, user)
95 if err != nil {
96 logger.Error("status cmd", "err", err)
97 sesh.Fatal(err)
98 }
99 return next(sesh)
100 case "rss":
101 rss, err := MonitorRss(handler.DBPool, user, handler.Cfg.Domain)
102 _, _ = fmt.Fprintln(sesh, rss)
103 if err != nil {
104 logger.Error("rss cmd", "err", err)
105 sesh.Fatal(err)
106 }
107 return next(sesh)
108 }
109
110 topic := ""
111 cmdArgs := args[1:]
112 if len(args) > 1 && !strings.HasPrefix(args[1], "-") {
113 topic = strings.TrimSpace(args[1])
114 cmdArgs = args[2:]
115 }
116 // sub commands after this line expect clipped args
117 cliCmd.args = cmdArgs
118
119 logger.Info(
120 "pubsub middleware detected command",
121 "args", args,
122 "cmd", cmd,
123 "topic", topic,
124 "cmdArgs", cmdArgs,
125 )
126
127 clientID := fmt.Sprintf(
128 "%s (%s%s@%s)",
129 uuidStr,
130 userName,
131 userNameAddition,
132 sesh.RemoteAddr().String(),
133 )
134
135 go func() {
136 defer cancel()
137
138 ticker := time.NewTicker(5 * time.Second)
139 defer ticker.Stop()
140
141 for {
142 select {
143 case <-pipeCtx.Done():
144 return
145 case <-ticker.C:
146 _, err := sesh.SendRequest("ping@pico.sh", false, nil)
147 if err != nil {
148 logger.Error("error sending ping", "err", err)
149 return
150 }
151 }
152 }
153 }()
154
155 switch cmd {
156 case "pub":
157 err := handler.pub(cliCmd, topic, clientID)
158 if err != nil {
159 logger.Error("pub cmd", "err", err)
160 sesh.Fatal(err)
161 }
162 case "sub":
163 err := handler.sub(cliCmd, topic, clientID)
164 if err != nil {
165 logger.Error("sub cmd", "err", err)
166 sesh.Fatal(err)
167 }
168 case "pipe":
169 err := handler.pipe(cliCmd, topic, clientID)
170 if err != nil {
171 logger.Error("pipe cmd", "err", err)
172 sesh.Fatal(err)
173 }
174 case "uptime":
175 err := handler.uptime(cliCmd, topic, user)
176 if err != nil {
177 logger.Error("uptime cmd", "err", err)
178 sesh.Fatal(err)
179 }
180 }
181
182 return next(sesh)
183 }
184 }
185}
186
187type CliHandler struct {
188 DBPool db.DB
189 Logger *slog.Logger
190 PubSub psub.PubSub
191 Cfg *shared.ConfigSite
192 Waiters *syncmap.Map[string, []string]
193 Access *syncmap.Map[string, []string]
194}
195
196func (h *CliHandler) GetLogger(s *pssh.SSHServerConnSession) *slog.Logger {
197 return h.Logger
198}
199
200type CliCmd struct {
201 sesh *pssh.SSHServerConnSession
202 args []string
203 userName string
204 isAdmin bool
205 pipeCtx context.Context
206 cancel context.CancelFunc
207 user *db.User
208}
209
210func help(cfg *shared.ConfigSite, sesh *pssh.SSHServerConnSession) {
211 data := fmt.Sprintf(`Command: ssh %s <command> [args...]
212
213The simplest authenticated pubsub system. Send messages through
214user-defined topics. Topics are private to the authenticated
215ssh user. The default pubsub model is multicast with bidirectional
216blocking, meaning a publisher ("pub") will send its message to all
217subscribers ("sub"). Further, both "pub" and "sub" will wait for
218at least one event to be sent or received. Pipe ("pipe") allows
219for bidirectional messages to be sent between any clients connected
220to a pipe.
221
222Commands:
223 help Show this help message
224 ls List active pubsub channels
225 pub <topic> [flags] Publish messages to a topic
226 sub <topic> [flags] Subscribe to messages from a topic
227 pipe <topic> [flags] Bidirectional messaging between clients
228
229Monitoring commands:
230 monitor <topic> <duration> Create/update a health monitor for a topic
231 monitor <topic> -d Delete a monitor
232 status Show health status of all monitors
233 uptime Show uptime for a topic
234 rss Get RSS feed of monitor alerts
235
236Use "ssh %s <command> -h" for help on a specific command.
237`, toSshCmd(cfg), toSshCmd(cfg))
238
239 data = strings.ReplaceAll(data, "\n", "\r\n")
240 _, _ = fmt.Fprintln(sesh, data)
241}
242
243func (handler *CliHandler) ls(cmd *CliCmd) error {
244 if cmd.userName == "public" {
245 err := fmt.Errorf("access denied")
246 return err
247 }
248
249 topicFilter := fmt.Sprintf("%s/", cmd.userName)
250 if cmd.isAdmin {
251 topicFilter = ""
252 if len(cmd.args) > 1 {
253 topicFilter = cmd.args[1]
254 }
255 }
256
257 var channels []*psub.Channel
258 waitingChannels := map[string][]string{}
259
260 for topic, channel := range handler.PubSub.GetChannels() {
261 if strings.HasPrefix(topic, topicFilter) {
262 channels = append(channels, channel)
263 }
264 }
265
266 for channel, clients := range handler.Waiters.Range {
267 if strings.HasPrefix(channel, topicFilter) {
268 waitingChannels[channel] = clients
269 }
270 }
271
272 if len(channels) == 0 && len(waitingChannels) == 0 {
273 _, _ = fmt.Fprintln(cmd.sesh, "no pubsub channels found")
274 } else {
275 var outputData string
276 if len(channels) > 0 || len(waitingChannels) > 0 {
277 outputData += "Channel Information\r\n"
278 for _, channel := range channels {
279 extraData := ""
280
281 if accessList, ok := handler.Access.Load(channel.Topic); ok && len(accessList) > 0 {
282 extraData += fmt.Sprintf(" (Access List: %s)", strings.Join(accessList, ", "))
283 }
284
285 outputData += fmt.Sprintf("- %s:%s\r\n", channel.Topic, extraData)
286
287 var pubs []*psub.Client
288 var subs []*psub.Client
289 var pipes []*psub.Client
290
291 for _, client := range channel.GetClients() {
292 switch client.Direction {
293 case psub.ChannelDirectionInput:
294 pubs = append(pubs, client)
295 case psub.ChannelDirectionOutput:
296 subs = append(subs, client)
297 case psub.ChannelDirectionInputOutput:
298 pipes = append(pipes, client)
299 }
300 }
301 outputData += clientInfo(pubs, cmd.isAdmin, "Pubs")
302 outputData += clientInfo(subs, cmd.isAdmin, "Subs")
303 outputData += clientInfo(pipes, cmd.isAdmin, "Pipes")
304 }
305
306 for waitingChannel, channelPubs := range waitingChannels {
307 extraData := ""
308
309 if accessList, ok := handler.Access.Load(waitingChannel); ok && len(accessList) > 0 {
310 extraData += fmt.Sprintf(" (Access List: %s)", strings.Join(accessList, ", "))
311 }
312
313 outputData += fmt.Sprintf("- %s:%s\r\n", waitingChannel, extraData)
314 outputData += fmt.Sprintf(" %s:\r\n", "Waiting Pubs")
315 for _, client := range channelPubs {
316 if strings.HasPrefix(client, "admin-") && !cmd.isAdmin {
317 continue
318 }
319 outputData += fmt.Sprintf(" - %s\r\n", client)
320 }
321 }
322 }
323
324 _, _ = cmd.sesh.Write([]byte(outputData))
325 }
326
327 return nil
328}
329
330func (handler *CliHandler) monitor(cmd *CliCmd, user *db.User) error {
331 if user == nil {
332 return fmt.Errorf("access denied")
333 }
334
335 args := cmd.sesh.Command()
336 topic := ""
337 cmdArgs := args[1:]
338 if len(args) > 1 && !strings.HasPrefix(args[1], "-") {
339 topic = strings.TrimSpace(args[1])
340 cmdArgs = args[2:]
341 }
342
343 monitorCmd := flagSet("monitor", cmd.sesh)
344 del := monitorCmd.Bool("d", false, "Delete the monitor")
345
346 if !flagCheck(monitorCmd, topic, cmdArgs) {
347 return nil
348 }
349
350 if topic == "" {
351 _, _ = fmt.Fprintln(cmd.sesh, "Usage: monitor <topic> <duration>")
352 _, _ = fmt.Fprintln(cmd.sesh, " monitor <topic> -d")
353 return fmt.Errorf("topic is required")
354 }
355
356 // Resolve to fully qualified topic name
357 result := resolveTopic(TopicResolveInput{
358 UserName: cmd.userName,
359 Topic: topic,
360 IsAdmin: cmd.isAdmin,
361 IsPublic: false,
362 })
363 resolvedTopic := result.Name
364
365 if *del {
366 handler.Logger.Info("removing pipe monitor", "topic", resolvedTopic)
367 err := handler.DBPool.RemovePipeMonitor(user.ID, resolvedTopic)
368 if err != nil {
369 return fmt.Errorf("failed to delete monitor: %w", err)
370 }
371 _, _ = fmt.Fprintf(cmd.sesh, "monitor deleted: %s\r\n", resolvedTopic)
372 return nil
373 }
374
375 // Create/update monitor - need duration argument
376 durStr := ""
377 if monitorCmd.NArg() > 0 {
378 durStr = monitorCmd.Arg(0)
379 } else if len(cmdArgs) > 0 {
380 durStr = cmdArgs[0]
381 }
382
383 if durStr == "" {
384 _, _ = fmt.Fprintln(cmd.sesh, "Usage: monitor <topic> <duration>")
385 return fmt.Errorf("duration is required")
386 }
387
388 dur, err := time.ParseDuration(durStr)
389 if err != nil {
390 return fmt.Errorf("invalid duration %q: %w", durStr, err)
391 }
392
393 winEnd := time.Now().UTC().Add(dur)
394 handler.Logger.Info(
395 "upserting pipe monitor",
396 "topic", resolvedTopic,
397 "dur", dur,
398 "window", winEnd.UTC().Format(time.RFC3339),
399 )
400 err = handler.DBPool.UpsertPipeMonitor(user.ID, resolvedTopic, dur, &winEnd)
401 if err != nil {
402 return fmt.Errorf("failed to create monitor: %w", err)
403 }
404
405 _, _ = fmt.Fprintf(cmd.sesh, "monitor created: %s (window: %s)\r\n", resolvedTopic, dur)
406 return nil
407}
408
409func (handler *CliHandler) status(cmd *CliCmd, user *db.User) error {
410 if user == nil {
411 return fmt.Errorf("access denied")
412 }
413
414 monitors, err := handler.DBPool.FindPipeMonitorsByUser(user.ID)
415 if err != nil {
416 return fmt.Errorf("failed to fetch monitors: %w", err)
417 }
418
419 if len(monitors) == 0 {
420 _, _ = fmt.Fprintln(cmd.sesh, "no monitors found")
421 return nil
422 }
423
424 writer := tabwriter.NewWriter(cmd.sesh, 0, 0, 2, ' ', tabwriter.TabIndent)
425 _, _ = fmt.Fprintln(writer, "Topic\tStatus\tWindow\tLast Ping\tWindow End\tReason")
426
427 for _, m := range monitors {
428 status := "healthy"
429 reason := ""
430 if err := m.Status(); err != nil {
431 status = "unhealthy"
432 reason = err.Error()
433 }
434
435 lastPing := "never"
436 if m.LastPing != nil {
437 lastPing = m.LastPing.UTC().Format(time.RFC3339)
438 }
439
440 windowEnd := ""
441 if m.WindowEnd != nil {
442 windowEnd = m.WindowEnd.UTC().Format(time.RFC3339)
443 }
444
445 _, _ = fmt.Fprintf(
446 writer,
447 "%s\t%s\t%s\t%s\t%s\t%s\r\n",
448 m.Topic,
449 status,
450 m.WindowDur.String(),
451 lastPing,
452 windowEnd,
453 reason,
454 )
455 }
456 _ = writer.Flush()
457 return nil
458}
459
460func (handler *CliHandler) uptime(cmd *CliCmd, topic string, user *db.User) error {
461 if user == nil {
462 return fmt.Errorf("access denied")
463 }
464
465 if topic == "" {
466 _, _ = fmt.Fprintln(cmd.sesh, "usage: uptime <topic> [--from <time>] [--to <time>]")
467 _, _ = fmt.Fprintln(cmd.sesh, " --from: start time (RFC3339 or duration like '24h', '7d', default: 24h)")
468 _, _ = fmt.Fprintln(cmd.sesh, " --to: end time (RFC3339, default: now)")
469 return nil
470 }
471
472 fs := flag.NewFlagSet("uptime", flag.ContinueOnError)
473 fs.SetOutput(cmd.sesh)
474 fromStr := fs.String("from", "", "start time (RFC3339 or duration like '24h', '7d')")
475 toStr := fs.String("to", "", "end time (RFC3339, defaults to now)")
476
477 if err := fs.Parse(cmd.args); err != nil {
478 return nil
479 }
480
481 topicResult := resolveTopic(TopicResolveInput{
482 UserName: cmd.userName,
483 Topic: topic,
484 IsAdmin: cmd.isAdmin,
485 IsPublic: false,
486 })
487 resolvedTopic := topicResult.Name
488
489 monitor, err := handler.DBPool.FindPipeMonitorByTopic(user.ID, resolvedTopic)
490 if err != nil {
491 if errors.Is(err, sql.ErrNoRows) {
492 return fmt.Errorf("monitor not found: %s", topic)
493 }
494 return fmt.Errorf("failed to find monitor: %w", err)
495 }
496
497 now := time.Now().UTC()
498 to := now
499 from := now.Add(-24 * time.Hour)
500
501 if *fromStr != "" {
502 if parsed, err := time.Parse(time.RFC3339, *fromStr); err == nil {
503 from = parsed.UTC()
504 } else if dur, err := parseDuration(*fromStr); err == nil {
505 from = now.Add(-dur)
506 } else {
507 return fmt.Errorf("invalid --from value: %s", *fromStr)
508 }
509 }
510
511 if *toStr != "" {
512 if parsed, err := time.Parse(time.RFC3339, *toStr); err == nil {
513 to = parsed.UTC()
514 } else {
515 return fmt.Errorf("invalid --to value: %s", *toStr)
516 }
517 }
518
519 history, err := handler.DBPool.FindPipeMonitorHistory(monitor.ID, from, to)
520 if err != nil {
521 return fmt.Errorf("failed to fetch history: %w", err)
522 }
523
524 result := db.ComputeUptime(history, from, to)
525
526 _, _ = fmt.Fprintf(cmd.sesh, "Monitor: %s\r\n", topic)
527 _, _ = fmt.Fprintf(cmd.sesh, "Period: %s to %s\r\n", from.Format(time.RFC3339), to.Format(time.RFC3339))
528 _, _ = fmt.Fprintf(cmd.sesh, "Total Duration: %s\r\n", result.TotalDuration.Round(time.Second))
529 _, _ = fmt.Fprintf(cmd.sesh, "Uptime Duration: %s\r\n", result.UptimeDuration.Round(time.Second))
530 _, _ = fmt.Fprintf(cmd.sesh, "Uptime: %.2f%%\r\n", result.UptimePercent)
531
532 return nil
533}
534
// parseDuration parses a duration string. It accepts everything
// time.ParseDuration accepts plus a whole number of days written as "<n>d"
// (for example "7d").
//
// The day form must be an integer immediately followed by "d": inputs such as
// "1.5d" or "7xd" are rejected rather than silently truncated, and day counts
// that would overflow time.Duration are rejected as well. An empty string is
// an error.
func parseDuration(s string) (time.Duration, error) {
	if len(s) == 0 {
		return 0, fmt.Errorf("empty duration")
	}
	if s[len(s)-1] != 'd' {
		return time.ParseDuration(s)
	}

	// Largest day count whose nanosecond value still fits in a time.Duration.
	const maxDays = int64((1<<63 - 1) / (24 * time.Hour))

	days, err := strconv.ParseInt(s[:len(s)-1], 10, 64)
	if err != nil || days > maxDays || days < -maxDays {
		return 0, fmt.Errorf("invalid duration: %s", s)
	}
	return time.Duration(days) * 24 * time.Hour, nil
}
550
551func MonitorRss(dbpool db.DB, user *db.User, domain string) (string, error) {
552 if user == nil {
553 return "", fmt.Errorf("access denied")
554 }
555
556 monitors, err := dbpool.FindPipeMonitorsByUser(user.ID)
557 if err != nil {
558 return "", fmt.Errorf("failed to fetch monitors: %w", err)
559 }
560
561 now := time.Now()
562 feed := &feeds.Feed{
563 Title: fmt.Sprintf("Pipe Monitors for %s", user.Name),
564 Link: &feeds.Link{Href: fmt.Sprintf("https://%s", domain)},
565 Description: "Alerts for pipe monitor status changes",
566 Author: &feeds.Author{Name: user.Name},
567 Created: now,
568 }
569
570 var feedItems []*feeds.Item
571 for _, m := range monitors {
572 if err := m.Status(); err != nil {
573 item := &feeds.Item{
574 Id: fmt.Sprintf("%s-%s-%d", user.ID, m.Topic, now.Unix()),
575 Title: fmt.Sprintf("ALERT: %s is unhealthy", m.Topic),
576 Link: &feeds.Link{Href: fmt.Sprintf("https://%s", domain)},
577 Description: err.Error(),
578 Created: now,
579 Updated: now,
580 Author: &feeds.Author{Name: user.Name},
581 }
582 feedItems = append(feedItems, item)
583 }
584 }
585 feed.Items = feedItems
586
587 rss, err := feed.ToRss()
588 if err != nil {
589 return "", fmt.Errorf("failed to generate RSS: %w", err)
590 }
591
592 return rss, nil
593}
594
595func (handler *CliHandler) pub(cmd *CliCmd, topic string, clientID string) error {
596 pubCmd := flagSet("pub", cmd.sesh)
597 access := pubCmd.String("a", "", "Comma separated list of pico usernames or ssh-key fingerprints to allow access to a topic")
598 empty := pubCmd.Bool("e", false, "Send an empty message to subs")
599 public := pubCmd.Bool("p", false, "Publish message to public topic")
600 block := pubCmd.Bool("b", true, "Block writes until a subscriber is available")
601 timeout := pubCmd.Duration("t", 30*24*time.Hour, "Timeout as a Go duration to block for a subscriber to be available. Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'. Default is 30 days.")
602 clean := pubCmd.Bool("c", false, "Don't send status messages")
603 dispatcher := pubCmd.String("d", "multicast", "Type of dispatcher (e.g. multicast, round_robin)")
604
605 if !flagCheck(pubCmd, topic, cmd.args) {
606 return fmt.Errorf("invalid cmd args")
607 }
608
609 if pubCmd.NArg() == 1 && topic == "" {
610 topic = pubCmd.Arg(0)
611 }
612
613 handler.Logger.Info(
614 "flags parsed",
615 "cmd", "pub",
616 "empty", *empty,
617 "public", *public,
618 "block", *block,
619 "timeout", *timeout,
620 "topic", topic,
621 "access", *access,
622 "clean", *clean,
623 "dispatcher", *dispatcher,
624 )
625
626 var accessList []string
627
628 if *access != "" {
629 accessList = parseArgList(*access)
630 }
631
632 var rw io.ReadWriter
633 if *empty {
634 rw = bytes.NewBuffer(make([]byte, 1))
635 } else {
636 rw = cmd.sesh
637 }
638
639 if topic == "" {
640 topic = uuid.NewString()
641 }
642
643 msgFlag := ""
644 if *public {
645 msgFlag = "-p "
646 }
647
648 // Initial resolution to get the topic name for access storage
649 initialResult := resolveTopic(TopicResolveInput{
650 UserName: cmd.userName,
651 Topic: topic,
652 IsAdmin: cmd.isAdmin,
653 IsPublic: *public,
654 })
655 name := initialResult.Name
656
657 var accessListCreator bool
658 _, loaded := handler.Access.LoadOrStore(name, accessList)
659 if !loaded {
660 defer func() {
661 handler.Access.Delete(name)
662 }()
663 accessListCreator = true
664 }
665
666 // Check for existing access list and resolve final topic name
667 existingAccessList, hasExistingAccess := handler.Access.Load(initialResult.WithoutUser)
668 result := resolveTopic(TopicResolveInput{
669 UserName: cmd.userName,
670 Topic: topic,
671 IsAdmin: cmd.isAdmin,
672 IsPublic: *public,
673 ExistingAccessList: existingAccessList,
674 HasExistingAccess: hasExistingAccess,
675 IsAccessCreator: accessListCreator,
676 HasUserAccess: checkAccess(existingAccessList, cmd.userName, cmd.sesh),
677 })
678 name = result.Name
679
680 if result.GenerateNewTopic {
681 topic = uuid.NewString()
682 name = toPublicTopic(topic)
683 }
684
685 if !*clean {
686 fmtTopic := topic
687 if *access != "" {
688 fmtTopic = fmt.Sprintf("%s/%s", cmd.userName, topic)
689 }
690
691 _, _ = fmt.Fprintf(
692 cmd.sesh,
693 "subscribe to this channel:\n ssh %s sub %s%s\n",
694 toSshCmd(handler.Cfg),
695 msgFlag,
696 fmtTopic,
697 )
698 }
699
700 if *block {
701 count := 0
702 for topic, channel := range handler.PubSub.GetChannels() {
703 if topic == name {
704 for _, client := range channel.GetClients() {
705 if client.Direction == psub.ChannelDirectionOutput || client.Direction == psub.ChannelDirectionInputOutput {
706 count++
707 }
708 }
709 break
710 }
711 }
712
713 tt := *timeout
714 if count == 0 {
715 currentWaiters, _ := handler.Waiters.LoadOrStore(name, nil)
716 handler.Waiters.Store(name, append(currentWaiters, clientID))
717
718 termMsg := "no subs found ... waiting"
719 if tt > 0 {
720 termMsg += " " + tt.String()
721 }
722
723 if !*clean {
724 _, _ = fmt.Fprintln(cmd.sesh, termMsg)
725 }
726
727 ready := make(chan struct{})
728
729 go func() {
730 for {
731 select {
732 case <-cmd.pipeCtx.Done():
733 cmd.cancel()
734 return
735 case <-time.After(1 * time.Millisecond):
736 count := 0
737 for topic, channel := range handler.PubSub.GetChannels() {
738 if topic == name {
739 for _, client := range channel.GetClients() {
740 if client.Direction == psub.ChannelDirectionOutput || client.Direction == psub.ChannelDirectionInputOutput {
741 count++
742 }
743 }
744 break
745 }
746 }
747
748 if count > 0 {
749 close(ready)
750 return
751 }
752 }
753 }
754 }()
755
756 select {
757 case <-ready:
758 case <-cmd.pipeCtx.Done():
759 case <-time.After(tt):
760 cmd.cancel()
761
762 if !*clean {
763 return fmt.Errorf("timeout reached, exiting")
764 } else {
765 err := cmd.sesh.Exit(1)
766 if err != nil {
767 handler.Logger.Error("error exiting session", "err", err)
768 }
769
770 _ = cmd.sesh.Close()
771 }
772 }
773
774 newWaiters, _ := handler.Waiters.LoadOrStore(name, nil)
775 newWaiters = slices.DeleteFunc(newWaiters, func(cl string) bool {
776 return cl == clientID
777 })
778 handler.Waiters.Store(name, newWaiters)
779
780 var toDelete []string
781
782 for channel, clients := range handler.Waiters.Range {
783 if len(clients) == 0 {
784 toDelete = append(toDelete, channel)
785 }
786 }
787
788 for _, channel := range toDelete {
789 handler.Waiters.Delete(channel)
790 }
791 }
792 }
793
794 if !*clean {
795 _, _ = fmt.Fprintln(cmd.sesh, "sending msg ...")
796 }
797
798 throttledRW := newThrottledMonitorRW(rw, handler, cmd, name)
799
800 var dsp psub.MessageDispatcher
801 dsp = &psub.MulticastDispatcher{}
802 if *dispatcher == "round_robin" {
803 dsp = &psub.RoundRobinDispatcher{}
804 }
805 channel := psub.NewChannel(name)
806 _ = handler.PubSub.SetDispatcher(dsp, []*psub.Channel{channel})
807
808 err := handler.PubSub.Pub(
809 cmd.pipeCtx,
810 clientID,
811 throttledRW,
812 []*psub.Channel{channel},
813 *block,
814 )
815
816 if !*clean {
817 _, _ = fmt.Fprintln(cmd.sesh, "msg sent!")
818 }
819
820 if err != nil && !*clean {
821 return err
822 }
823
824 handler.updateMonitor(cmd, name)
825
826 return nil
827}
828
829func (handler *CliHandler) updateMonitor(cmd *CliCmd, topic string) {
830 if cmd.user == nil {
831 return
832 }
833
834 handler.Logger.Info("update monitor", "topic", topic)
835 monitor, err := handler.DBPool.FindPipeMonitorByTopic(cmd.user.ID, topic)
836 if err != nil || monitor == nil {
837 handler.Logger.Info("no monitor found", "topic", topic)
838 return
839 }
840
841 now := time.Now().UTC()
842
843 // Fixed window semantics: windows are discrete, non-overlapping time slots.
844 // - last_ping: always updated to show most recent activity (user visibility)
845 // - window_end: only advances when current time exceeds it (health scheduling)
846
847 // If we're past the current window, advance to the window containing `now`
848 newWindowEnd := *monitor.WindowEnd
849 if !now.Before(*monitor.WindowEnd) {
850 // Record history for the completed window before advancing
851 // This captures that the old window was healthy (had activity)
852 if err := handler.DBPool.InsertPipeMonitorHistory(monitor.ID, monitor.WindowDur, monitor.WindowEnd, monitor.LastPing); err != nil {
853 handler.Logger.Error("failed to insert monitor history", "err", err, "topic", topic)
854 }
855
856 // Calculate which window period `now` falls into
857 elapsed := now.Sub(*monitor.WindowEnd)
858 periods := int(elapsed/monitor.WindowDur) + 1
859 newWindowEnd = monitor.WindowEnd.Add(time.Duration(periods) * monitor.WindowDur)
860
861 if err := handler.DBPool.UpsertPipeMonitor(cmd.user.ID, topic, monitor.WindowDur, &newWindowEnd); err != nil {
862 handler.Logger.Error("failed to advance monitor window", "err", err, "topic", topic)
863 }
864 handler.Logger.Info("advanced monitor window",
865 "topic", topic,
866 "oldWindowEnd", monitor.WindowEnd.Format(time.RFC3339),
867 "newWindowEnd", newWindowEnd.Format(time.RFC3339),
868 "periodsMissed", periods-1,
869 )
870 }
871
872 // Always record the latest ping for user visibility
873 if err := handler.DBPool.UpdatePipeMonitorLastPing(cmd.user.ID, topic, &now); err != nil {
874 handler.Logger.Error("failed to update monitor last_ping", "err", err, "topic", topic)
875 }
876
877 handler.Logger.Info("recorded monitor ping",
878 "topic", topic,
879 "pingTime", now.Format(time.RFC3339),
880 "windowEnd", newWindowEnd.Format(time.RFC3339),
881 )
882}
883
884const monitorThrottleInterval = 15 * time.Second
885
886type throttledMonitorRW struct {
887 rw io.ReadWriter
888 handler *CliHandler
889 cmd *CliCmd
890 topic string
891 lastPing atomic.Int64 // Unix nanoseconds
892}
893
894func newThrottledMonitorRW(rw io.ReadWriter, handler *CliHandler, cmd *CliCmd, topic string) *throttledMonitorRW {
895 return &throttledMonitorRW{
896 rw: rw,
897 handler: handler,
898 cmd: cmd,
899 topic: topic,
900 }
901}
902
903func (t *throttledMonitorRW) throttledUpdate() {
904 now := time.Now().UnixNano()
905 last := t.lastPing.Load()
906
907 // First ping (last == 0) or interval elapsed
908 if last == 0 || now-last >= int64(monitorThrottleInterval) {
909 if t.lastPing.CompareAndSwap(last, now) {
910 t.handler.updateMonitor(t.cmd, t.topic)
911 }
912 }
913}
914
915func (t *throttledMonitorRW) Read(p []byte) (int, error) {
916 n, err := t.rw.Read(p)
917 if n > 0 {
918 t.throttledUpdate()
919 }
920 return n, err
921}
922
923func (t *throttledMonitorRW) Write(p []byte) (int, error) {
924 n, err := t.rw.Write(p)
925 if n > 0 {
926 t.throttledUpdate()
927 }
928 return n, err
929}
930
931func (handler *CliHandler) sub(cmd *CliCmd, topic string, clientID string) error {
932 subCmd := flagSet("sub", cmd.sesh)
933 access := subCmd.String("a", "", "Comma separated list of pico usernames or ssh-key fingerprints to allow access to a topic")
934 public := subCmd.Bool("p", false, "Subscribe to a public topic")
935 keepAlive := subCmd.Bool("k", false, "Keep the subscription alive even after the publisher has died")
936 clean := subCmd.Bool("c", false, "Don't send status messages")
937
938 if !flagCheck(subCmd, topic, cmd.args) {
939 return fmt.Errorf("invalid cmd args")
940 }
941
942 if subCmd.NArg() == 1 && topic == "" {
943 topic = subCmd.Arg(0)
944 }
945
946 handler.Logger.Info(
947 "flags parsed",
948 "cmd", cmd,
949 "public", *public,
950 "keepAlive", *keepAlive,
951 "topic", topic,
952 "clean", *clean,
953 "access", *access,
954 )
955
956 var accessList []string
957
958 if *access != "" {
959 accessList = parseArgList(*access)
960 }
961
962 // Initial resolution to get the topic name for access storage
963 initialResult := resolveTopic(TopicResolveInput{
964 UserName: cmd.userName,
965 Topic: topic,
966 IsAdmin: cmd.isAdmin,
967 IsPublic: *public,
968 })
969 name := initialResult.Name
970
971 var accessListCreator bool
972
973 _, loaded := handler.Access.LoadOrStore(name, accessList)
974 if !loaded {
975 defer func() {
976 handler.Access.Delete(name)
977 }()
978 accessListCreator = true
979 }
980
981 // Check for existing access list and resolve final topic name
982 existingAccessList, hasExistingAccess := handler.Access.Load(initialResult.WithoutUser)
983 result := resolveTopic(TopicResolveInput{
984 UserName: cmd.userName,
985 Topic: topic,
986 IsAdmin: cmd.isAdmin,
987 IsPublic: *public,
988 ExistingAccessList: existingAccessList,
989 HasExistingAccess: hasExistingAccess,
990 IsAccessCreator: accessListCreator,
991 HasUserAccess: checkAccess(existingAccessList, cmd.userName, cmd.sesh),
992 })
993 name = result.Name
994
995 if result.AccessDenied {
996 return fmt.Errorf("access denied")
997 }
998
999 err := handler.PubSub.Sub(
1000 cmd.pipeCtx,
1001 clientID,
1002 cmd.sesh,
1003 []*psub.Channel{
1004 psub.NewChannel(name),
1005 },
1006 *keepAlive,
1007 )
1008
1009 if err != nil && !*clean {
1010 return err
1011 }
1012
1013 return nil
1014}
1015
1016func (handler *CliHandler) pipe(cmd *CliCmd, topic string, clientID string) error {
1017 pipeCmd := flagSet("pipe", cmd.sesh)
1018 access := pipeCmd.String("a", "", "Comma separated list of pico usernames or ssh-key fingerprints to allow access to a topic")
1019 public := pipeCmd.Bool("p", false, "Pipe to a public topic")
1020 replay := pipeCmd.Bool("r", false, "Replay messages to the client that sent it")
1021 clean := pipeCmd.Bool("c", false, "Don't send status messages")
1022 dispatcher := pipeCmd.String("d", "multicast", "Type of dispatcher (e.g. multicast, round_robin)")
1023
1024 if !flagCheck(pipeCmd, topic, cmd.args) {
1025 return fmt.Errorf("invalid cmd args")
1026 }
1027
1028 if pipeCmd.NArg() == 1 && topic == "" {
1029 topic = pipeCmd.Arg(0)
1030 }
1031
1032 handler.Logger.Info(
1033 "flags parsed",
1034 "cmd", cmd,
1035 "public", *public,
1036 "replay", *replay,
1037 "topic", topic,
1038 "access", *access,
1039 "clean", *clean,
1040 "dispatcher", *dispatcher,
1041 )
1042
1043 var accessList []string
1044
1045 if *access != "" {
1046 accessList = parseArgList(*access)
1047 }
1048
1049 isCreator := topic == ""
1050 if isCreator {
1051 topic = uuid.NewString()
1052 }
1053
1054 flagMsg := ""
1055 if *public {
1056 flagMsg = "-p "
1057 }
1058
1059 // Initial resolution to get the topic name for access storage
1060 initialResult := resolveTopic(TopicResolveInput{
1061 UserName: cmd.userName,
1062 Topic: topic,
1063 IsAdmin: cmd.isAdmin,
1064 IsPublic: *public,
1065 })
1066 name := initialResult.Name
1067
1068 var accessListCreator bool
1069
1070 _, loaded := handler.Access.LoadOrStore(name, accessList)
1071 if !loaded {
1072 defer func() {
1073 handler.Access.Delete(name)
1074 }()
1075 accessListCreator = true
1076 }
1077
1078 // Check for existing access list and resolve final topic name
1079 existingAccessList, hasExistingAccess := handler.Access.Load(initialResult.WithoutUser)
1080 result := resolveTopic(TopicResolveInput{
1081 UserName: cmd.userName,
1082 Topic: topic,
1083 IsAdmin: cmd.isAdmin,
1084 IsPublic: *public,
1085 ExistingAccessList: existingAccessList,
1086 HasExistingAccess: hasExistingAccess,
1087 IsAccessCreator: accessListCreator,
1088 HasUserAccess: checkAccess(existingAccessList, cmd.userName, cmd.sesh),
1089 })
1090 name = result.Name
1091
1092 if result.GenerateNewTopic {
1093 topic = uuid.NewString()
1094 name = toPublicTopic(topic)
1095 }
1096
1097 if isCreator && !*clean {
1098 fmtTopic := topic
1099 if *access != "" {
1100 fmtTopic = fmt.Sprintf("%s/%s", cmd.userName, topic)
1101 }
1102
1103 _, _ = fmt.Fprintf(
1104 cmd.sesh,
1105 "subscribe to this topic:\n ssh %s sub %s%s\n",
1106 toSshCmd(handler.Cfg),
1107 flagMsg,
1108 fmtTopic,
1109 )
1110 }
1111
1112 throttledRW := newThrottledMonitorRW(cmd.sesh, handler, cmd, name)
1113
1114 var dsp psub.MessageDispatcher
1115 dsp = &psub.MulticastDispatcher{}
1116 if *dispatcher == "round_robin" {
1117 dsp = &psub.RoundRobinDispatcher{}
1118 }
1119 channel := psub.NewChannel(name)
1120 _ = handler.PubSub.SetDispatcher(dsp, []*psub.Channel{channel})
1121
1122 readErr, writeErr := handler.PubSub.Pipe(
1123 cmd.pipeCtx,
1124 clientID,
1125 throttledRW,
1126 []*psub.Channel{
1127 psub.NewChannel(name),
1128 },
1129 *replay,
1130 )
1131
1132 if readErr != nil && !*clean {
1133 return readErr
1134 }
1135
1136 if writeErr != nil && !*clean {
1137 return writeErr
1138 }
1139
1140 handler.updateMonitor(cmd, name)
1141
1142 return nil
1143}
1144
1145func toSshCmd(cfg *shared.ConfigSite) string {
1146 port := ""
1147 if cfg.PortOverride != "22" {
1148 port = fmt.Sprintf("-p %s ", cfg.PortOverride)
1149 }
1150 return fmt.Sprintf("%s%s", port, cfg.Domain)
1151}
1152
// parseArgList splits arg on commas and trims surrounding whitespace from
// every element. Empty elements are kept, so "" yields a single empty string.
func parseArgList(arg string) []string {
	parts := strings.Split(arg, ",")
	for i := range parts {
		parts[i] = strings.TrimSpace(parts[i])
	}
	return parts
}
1161
1162// checkAccess checks if the user has access to a topic based on an access list.
1163func checkAccess(accessList []string, userName string, sesh *pssh.SSHServerConnSession) bool {
1164 for _, acc := range accessList {
1165 if acc == userName {
1166 return true
1167 }
1168
1169 if key := sesh.PublicKey(); key != nil && acc == gossh.FingerprintSHA256(key) {
1170 return true
1171 }
1172 }
1173
1174 return false
1175}
1176
1177func flagSet(cmdName string, sesh *pssh.SSHServerConnSession) *flag.FlagSet {
1178 cmd := flag.NewFlagSet(cmdName, flag.ContinueOnError)
1179 cmd.SetOutput(sesh)
1180 cmd.Usage = func() {
1181 _, _ = fmt.Fprintf(cmd.Output(), "Usage: %s <topic> [args...]\nArgs:\n", cmdName)
1182 cmd.PrintDefaults()
1183 }
1184 return cmd
1185}
1186
// flagCheck parses cmdArgs into cmd and reports whether the command should
// proceed. It returns false when parsing fails or when the positional
// argument is "help"; in the "help" case the usage text is printed as well.
func flagCheck(cmd *flag.FlagSet, posArg string, cmdArgs []string) bool {
	parseErr := cmd.Parse(cmdArgs)

	if posArg == "help" {
		cmd.Usage()
		return false
	}
	return parseErr == nil
}
1198
1199func clientInfo(clients []*psub.Client, isAdmin bool, clientType string) string {
1200 if len(clients) == 0 {
1201 return ""
1202 }
1203
1204 outputData := fmt.Sprintf(" %s:\r\n", clientType)
1205
1206 for _, client := range clients {
1207 if strings.HasPrefix(client.ID, "admin-") && !isAdmin {
1208 continue
1209 }
1210
1211 outputData += fmt.Sprintf(" - %s\r\n", client.ID)
1212 }
1213
1214 return outputData
1215}