repos / pico

pico services mono repo
git clone https://github.com/picosh/pico.git

commit
b2aa9b3
parent
2c02fb4
author
Eric Bower
date
2026-01-08 15:43:06 -0500 EST
feat(pipe): monitors

This change introduces pipe monitors: a way to receive status updates on
your pipes.
8 files changed,  +1686, -20
M Makefile
+2, -1
 1@@ -143,10 +143,11 @@ migrate:
 2 	$(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20250410_add_index_analytics_visits_host_list.sql
 3 	$(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20250418_add_project_post_idx_analytics.sql
 4 	$(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20251217_add_access_logs_table.sql
 5+	$(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20251226_add_pipe_monitoring.sql
 6 .PHONY: migrate
 7 
 8 latest:
 9-	$(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20251217_add_access_logs_table.sql
10+	$(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20251226_add_pipe_monitoring.sql
11 .PHONY: latest
12 
13 psql:
M pkg/apps/pipe/cli.go
+432, -9
  1@@ -3,16 +3,21 @@ package pipe
  2 import (
  3 	"bytes"
  4 	"context"
  5+	"database/sql"
  6+	"errors"
  7 	"flag"
  8 	"fmt"
  9 	"io"
 10 	"log/slog"
 11 	"slices"
 12 	"strings"
 13+	"sync/atomic"
 14+	"text/tabwriter"
 15 	"time"
 16 
 17 	"github.com/antoniomika/syncmap"
 18 	"github.com/google/uuid"
 19+	"github.com/gorilla/feeds"
 20 	"github.com/picosh/pico/pkg/db"
 21 	"github.com/picosh/pico/pkg/pssh"
 22 	"github.com/picosh/pico/pkg/shared"
 23@@ -58,6 +63,7 @@ func Middleware(handler *CliHandler) pssh.SSHServerMiddleware {
 24 				isAdmin:  isAdmin,
 25 				pipeCtx:  pipeCtx,
 26 				cancel:   cancel,
 27+				user:     user,
 28 			}
 29 
 30 			cmd := strings.TrimSpace(args[0])
 31@@ -68,6 +74,28 @@ func Middleware(handler *CliHandler) pssh.SSHServerMiddleware {
 32 			case "ls":
 33 				err := handler.ls(cliCmd)
 34 				if err != nil {
 35+					logger.Error("ls cmd", "err", err)
 36+					sesh.Fatal(err)
 37+				}
 38+				return next(sesh)
 39+			case "monitor":
 40+				err := handler.monitor(cliCmd, user)
 41+				if err != nil {
 42+					logger.Error("monitor cmd", "err", err)
 43+					sesh.Fatal(err)
 44+				}
 45+				return next(sesh)
 46+			case "status":
 47+				err := handler.status(cliCmd, user)
 48+				if err != nil {
 49+					logger.Error("status cmd", "err", err)
 50+					sesh.Fatal(err)
 51+				}
 52+				return next(sesh)
 53+			case "rss":
 54+				err := handler.rss(cliCmd, user)
 55+				if err != nil {
 56+					logger.Error("rss cmd", "err", err)
 57 					sesh.Fatal(err)
 58 				}
 59 				return next(sesh)
 60@@ -122,16 +150,25 @@ func Middleware(handler *CliHandler) pssh.SSHServerMiddleware {
 61 			case "pub":
 62 				err := handler.pub(cliCmd, topic, clientID)
 63 				if err != nil {
 64+					logger.Error("pub cmd", "err", err)
 65 					sesh.Fatal(err)
 66 				}
 67 			case "sub":
 68 				err := handler.sub(cliCmd, topic, clientID)
 69 				if err != nil {
 70+					logger.Error("sub cmd", "err", err)
 71 					sesh.Fatal(err)
 72 				}
 73 			case "pipe":
 74 				err := handler.pipe(cliCmd, topic, clientID)
 75 				if err != nil {
 76+					logger.Error("pipe cmd", "err", err)
 77+					sesh.Fatal(err)
 78+				}
 79+			case "uptime":
 80+				err := handler.uptime(cliCmd, topic, user)
 81+				if err != nil {
 82+					logger.Error("uptime cmd", "err", err)
 83 					sesh.Fatal(err)
 84 				}
 85 			}
 86@@ -161,10 +198,11 @@ type CliCmd struct {
 87 	isAdmin  bool
 88 	pipeCtx  context.Context
 89 	cancel   context.CancelFunc
 90+	user     *db.User
 91 }
 92 
 93 func help(cfg *shared.ConfigSite, sesh *pssh.SSHServerConnSession) {
 94-	data := fmt.Sprintf(`Command: ssh %s <help | ls | pub | sub | pipe> <topic> [-h | args...]
 95+	data := fmt.Sprintf(`Command: ssh %s <command> [args...]
 96 
 97 The simplest authenticated pubsub system.  Send messages through
 98 user-defined topics.  Topics are private to the authenticated
 99@@ -175,13 +213,22 @@ at least one event to be sent or received. Pipe ("pipe") allows
100 for bidirectional messages to be sent between any clients connected
101 to a pipe.
102 
103-Think of these different commands in terms of the direction the
104-data is being sent:
105+Commands:
106+  help                        Show this help message
107+  ls                          List active pubsub channels
108+  pub <topic> [flags]         Publish messages to a topic
109+  sub <topic> [flags]         Subscribe to messages from a topic
110+  pipe <topic> [flags]        Bidirectional messaging between clients
111+
112+Monitoring commands:
113+  monitor <topic> <duration>  Create/update a health monitor for a topic
114+  monitor <topic> -d          Delete a monitor
115+  status                      Show health status of all monitors
116+  uptime                      Show uptime for a topic
117+  rss                         Get RSS feed of monitor alerts
118 
119-- pub => writes to client
120-- sub => reads from client
121-- pipe => read and write between clients
122-`, toSshCmd(cfg))
123+Use "ssh %s <command> -h" for help on a specific command.
124+`, toSshCmd(cfg), toSshCmd(cfg))
125 
126 	data = strings.ReplaceAll(data, "\n", "\r\n")
127 	_, _ = fmt.Fprintln(sesh, data)
128@@ -274,6 +321,272 @@ func (handler *CliHandler) ls(cmd *CliCmd) error {
129 	return nil
130 }
131 
132+func (handler *CliHandler) monitor(cmd *CliCmd, user *db.User) error {
133+	if user == nil {
134+		return fmt.Errorf("access denied")
135+	}
136+
137+	args := cmd.sesh.Command()
138+	topic := ""
139+	cmdArgs := args[1:]
140+	if len(args) > 1 && !strings.HasPrefix(args[1], "-") {
141+		topic = strings.TrimSpace(args[1])
142+		cmdArgs = args[2:]
143+	}
144+
145+	monitorCmd := flagSet("monitor", cmd.sesh)
146+	del := monitorCmd.Bool("d", false, "Delete the monitor")
147+
148+	if !flagCheck(monitorCmd, topic, cmdArgs) {
149+		return nil
150+	}
151+
152+	if topic == "" {
153+		_, _ = fmt.Fprintln(cmd.sesh, "Usage: monitor <topic> <duration>")
154+		_, _ = fmt.Fprintln(cmd.sesh, "       monitor <topic> -d")
155+		return fmt.Errorf("topic is required")
156+	}
157+
158+	// Resolve to fully qualified topic name
159+	result := resolveTopic(TopicResolveInput{
160+		UserName: cmd.userName,
161+		Topic:    topic,
162+		IsAdmin:  cmd.isAdmin,
163+		IsPublic: false,
164+	})
165+	resolvedTopic := result.Name
166+
167+	if *del {
168+		handler.Logger.Info("removing pipe monitor", "topic", resolvedTopic)
169+		err := handler.DBPool.RemovePipeMonitor(user.ID, resolvedTopic)
170+		if err != nil {
171+			return fmt.Errorf("failed to delete monitor: %w", err)
172+		}
173+		_, _ = fmt.Fprintf(cmd.sesh, "monitor deleted: %s\r\n", resolvedTopic)
174+		return nil
175+	}
176+
177+	// Create/update monitor - need duration argument
178+	durStr := ""
179+	if monitorCmd.NArg() > 0 {
180+		durStr = monitorCmd.Arg(0)
181+	} else if len(cmdArgs) > 0 {
182+		durStr = cmdArgs[0]
183+	}
184+
185+	if durStr == "" {
186+		_, _ = fmt.Fprintln(cmd.sesh, "Usage: monitor <topic> <duration>")
187+		return fmt.Errorf("duration is required")
188+	}
189+
190+	dur, err := time.ParseDuration(durStr)
191+	if err != nil {
192+		return fmt.Errorf("invalid duration %q: %w", durStr, err)
193+	}
194+
195+	winEnd := time.Now().UTC().Add(dur)
196+	handler.Logger.Info(
197+		"upserting pipe monitor",
198+		"topic", resolvedTopic,
199+		"dur", dur,
200+		"window", winEnd.UTC().Format(time.RFC3339),
201+	)
202+	err = handler.DBPool.UpsertPipeMonitor(user.ID, resolvedTopic, dur, &winEnd)
203+	if err != nil {
204+		return fmt.Errorf("failed to create monitor: %w", err)
205+	}
206+
207+	_, _ = fmt.Fprintf(cmd.sesh, "monitor created: %s (window: %s)\r\n", resolvedTopic, dur)
208+	return nil
209+}
210+
211+func (handler *CliHandler) status(cmd *CliCmd, user *db.User) error {
212+	if user == nil {
213+		return fmt.Errorf("access denied")
214+	}
215+
216+	monitors, err := handler.DBPool.FindPipeMonitorsByUser(user.ID)
217+	if err != nil {
218+		return fmt.Errorf("failed to fetch monitors: %w", err)
219+	}
220+
221+	if len(monitors) == 0 {
222+		_, _ = fmt.Fprintln(cmd.sesh, "no monitors found")
223+		return nil
224+	}
225+
226+	writer := tabwriter.NewWriter(cmd.sesh, 0, 0, 2, ' ', tabwriter.TabIndent)
227+	_, _ = fmt.Fprintln(writer, "Topic\tStatus\tWindow\tLast Ping\tWindow End\tReason")
228+
229+	for _, m := range monitors {
230+		status := "healthy"
231+		reason := ""
232+		if err := m.Status(); err != nil {
233+			status = "unhealthy"
234+			reason = err.Error()
235+		}
236+
237+		lastPing := "never"
238+		if m.LastPing != nil {
239+			lastPing = m.LastPing.UTC().Format(time.RFC3339)
240+		}
241+
242+		windowEnd := ""
243+		if m.WindowEnd != nil {
244+			windowEnd = m.WindowEnd.UTC().Format(time.RFC3339)
245+		}
246+
247+		_, _ = fmt.Fprintf(
248+			writer,
249+			"%s\t%s\t%s\t%s\t%s\t%s\r\n",
250+			m.Topic,
251+			status,
252+			m.WindowDur.String(),
253+			lastPing,
254+			windowEnd,
255+			reason,
256+		)
257+	}
258+	_ = writer.Flush()
259+	return nil
260+}
261+
262+func (handler *CliHandler) uptime(cmd *CliCmd, topic string, user *db.User) error {
263+	if user == nil {
264+		return fmt.Errorf("access denied")
265+	}
266+
267+	if topic == "" {
268+		_, _ = fmt.Fprintln(cmd.sesh, "usage: uptime <topic> [--from <time>] [--to <time>]")
269+		_, _ = fmt.Fprintln(cmd.sesh, "  --from: start time (RFC3339 or duration like '24h', '7d', default: 24h)")
270+		_, _ = fmt.Fprintln(cmd.sesh, "  --to: end time (RFC3339, default: now)")
271+		return nil
272+	}
273+
274+	fs := flag.NewFlagSet("uptime", flag.ContinueOnError)
275+	fs.SetOutput(cmd.sesh)
276+	fromStr := fs.String("from", "", "start time (RFC3339 or duration like '24h', '7d')")
277+	toStr := fs.String("to", "", "end time (RFC3339, defaults to now)")
278+
279+	if err := fs.Parse(cmd.args); err != nil {
280+		return nil
281+	}
282+
283+	topicResult := resolveTopic(TopicResolveInput{
284+		UserName: cmd.userName,
285+		Topic:    topic,
286+		IsAdmin:  cmd.isAdmin,
287+		IsPublic: false,
288+	})
289+	resolvedTopic := topicResult.Name
290+
291+	monitor, err := handler.DBPool.FindPipeMonitorByTopic(user.ID, resolvedTopic)
292+	if err != nil {
293+		if errors.Is(err, sql.ErrNoRows) {
294+			return fmt.Errorf("monitor not found: %s", topic)
295+		}
296+		return fmt.Errorf("failed to find monitor: %w", err)
297+	}
298+
299+	now := time.Now().UTC()
300+	to := now
301+	from := now.Add(-24 * time.Hour)
302+
303+	if *fromStr != "" {
304+		if parsed, err := time.Parse(time.RFC3339, *fromStr); err == nil {
305+			from = parsed.UTC()
306+		} else if dur, err := parseDuration(*fromStr); err == nil {
307+			from = now.Add(-dur)
308+		} else {
309+			return fmt.Errorf("invalid --from value: %s", *fromStr)
310+		}
311+	}
312+
313+	if *toStr != "" {
314+		if parsed, err := time.Parse(time.RFC3339, *toStr); err == nil {
315+			to = parsed.UTC()
316+		} else {
317+			return fmt.Errorf("invalid --to value: %s", *toStr)
318+		}
319+	}
320+
321+	history, err := handler.DBPool.FindPipeMonitorHistory(monitor.ID, from, to)
322+	if err != nil {
323+		return fmt.Errorf("failed to fetch history: %w", err)
324+	}
325+
326+	result := db.ComputeUptime(history, from, to)
327+
328+	_, _ = fmt.Fprintf(cmd.sesh, "Monitor: %s\r\n", topic)
329+	_, _ = fmt.Fprintf(cmd.sesh, "Period: %s to %s\r\n", from.Format(time.RFC3339), to.Format(time.RFC3339))
330+	_, _ = fmt.Fprintf(cmd.sesh, "Total Duration: %s\r\n", result.TotalDuration.Round(time.Second))
331+	_, _ = fmt.Fprintf(cmd.sesh, "Uptime Duration: %s\r\n", result.UptimeDuration.Round(time.Second))
332+	_, _ = fmt.Fprintf(cmd.sesh, "Uptime: %.2f%%\r\n", result.UptimePercent)
333+
334+	return nil
335+}
336+
337+func parseDuration(s string) (time.Duration, error) {
338+	if len(s) == 0 {
339+		return 0, fmt.Errorf("empty duration")
340+	}
341+	last := s[len(s)-1]
342+	if last == 'd' {
343+		var n int
344+		_, err := fmt.Sscanf(s, "%d", &n)
345+		if err != nil {
346+			return 0, fmt.Errorf("invalid duration: %s", s)
347+		}
348+		return time.Duration(n) * 24 * time.Hour, nil
349+	}
350+	return time.ParseDuration(s)
351+}
352+
353+func (handler *CliHandler) rss(cmd *CliCmd, user *db.User) error {
354+	if user == nil {
355+		return fmt.Errorf("access denied")
356+	}
357+
358+	monitors, err := handler.DBPool.FindPipeMonitorsByUser(user.ID)
359+	if err != nil {
360+		return fmt.Errorf("failed to fetch monitors: %w", err)
361+	}
362+
363+	now := time.Now()
364+	feed := &feeds.Feed{
365+		Title:       fmt.Sprintf("Pipe Monitors for %s", user.Name),
366+		Link:        &feeds.Link{Href: fmt.Sprintf("https://%s", handler.Cfg.Domain)},
367+		Description: "Alerts for pipe monitor status changes",
368+		Author:      &feeds.Author{Name: user.Name},
369+		Created:     now,
370+	}
371+
372+	var feedItems []*feeds.Item
373+	for _, m := range monitors {
374+		if err := m.Status(); err != nil {
375+			item := &feeds.Item{
376+				Id:          fmt.Sprintf("%s-%s-%d", user.ID, m.Topic, now.Unix()),
377+				Title:       fmt.Sprintf("ALERT: %s is unhealthy", m.Topic),
378+				Link:        &feeds.Link{Href: fmt.Sprintf("https://%s", handler.Cfg.Domain)},
379+				Description: err.Error(),
380+				Created:     now,
381+				Updated:     now,
382+				Author:      &feeds.Author{Name: user.Name},
383+			}
384+			feedItems = append(feedItems, item)
385+		}
386+	}
387+	feed.Items = feedItems
388+
389+	rss, err := feed.ToRss()
390+	if err != nil {
391+		return fmt.Errorf("failed to generate RSS: %w", err)
392+	}
393+
394+	_, _ = fmt.Fprint(cmd.sesh, rss)
395+	return nil
396+}
397+
398 func (handler *CliHandler) pub(cmd *CliCmd, topic string, clientID string) error {
399 	pubCmd := flagSet("pub", cmd.sesh)
400 	access := pubCmd.String("a", "", "Comma separated list of pico usernames or ssh-key fingerprints to allow access to a topic")
401@@ -475,10 +788,12 @@ func (handler *CliHandler) pub(cmd *CliCmd, topic string, clientID string) error
402 		_, _ = fmt.Fprintln(cmd.sesh, "sending msg ...")
403 	}
404 
405+	throttledRW := newThrottledMonitorRW(rw, handler, cmd, name)
406+
407 	err := handler.PubSub.Pub(
408 		cmd.pipeCtx,
409 		clientID,
410-		rw,
411+		throttledRW,
412 		[]*psub.Channel{
413 			psub.NewChannel(name),
414 		},
415@@ -493,9 +808,113 @@ func (handler *CliHandler) pub(cmd *CliCmd, topic string, clientID string) error
416 		return err
417 	}
418 
419+	handler.updateMonitor(cmd, name)
420+
421 	return nil
422 }
423 
424+func (handler *CliHandler) updateMonitor(cmd *CliCmd, topic string) {
425+	if cmd.user == nil {
426+		return
427+	}
428+
429+	handler.Logger.Info("update monitor", "topic", topic)
430+	monitor, err := handler.DBPool.FindPipeMonitorByTopic(cmd.user.ID, topic)
431+	if err != nil || monitor == nil {
432+		handler.Logger.Info("no monitor found", "topic", topic)
433+		return
434+	}
435+
436+	now := time.Now().UTC()
437+
438+	// Fixed window semantics: windows are discrete, non-overlapping time slots.
439+	// - last_ping: always updated to show most recent activity (user visibility)
440+	// - window_end: only advances when current time exceeds it (health scheduling)
441+
442+	// If we're past the current window, advance to the window containing `now`
443+	newWindowEnd := *monitor.WindowEnd
444+	if !now.Before(*monitor.WindowEnd) {
445+		// Record history for the completed window before advancing
446+		// This captures that the old window was healthy (had activity)
447+		if err := handler.DBPool.InsertPipeMonitorHistory(monitor.ID, monitor.WindowDur, monitor.WindowEnd, monitor.LastPing); err != nil {
448+			handler.Logger.Error("failed to insert monitor history", "err", err, "topic", topic)
449+		}
450+
451+		// Calculate which window period `now` falls into
452+		elapsed := now.Sub(*monitor.WindowEnd)
453+		periods := int(elapsed/monitor.WindowDur) + 1
454+		newWindowEnd = monitor.WindowEnd.Add(time.Duration(periods) * monitor.WindowDur)
455+
456+		if err := handler.DBPool.UpsertPipeMonitor(cmd.user.ID, topic, monitor.WindowDur, &newWindowEnd); err != nil {
457+			handler.Logger.Error("failed to advance monitor window", "err", err, "topic", topic)
458+		}
459+		handler.Logger.Info("advanced monitor window",
460+			"topic", topic,
461+			"oldWindowEnd", monitor.WindowEnd.Format(time.RFC3339),
462+			"newWindowEnd", newWindowEnd.Format(time.RFC3339),
463+			"periodsMissed", periods-1,
464+		)
465+	}
466+
467+	// Always record the latest ping for user visibility
468+	if err := handler.DBPool.UpdatePipeMonitorLastPing(cmd.user.ID, topic, &now); err != nil {
469+		handler.Logger.Error("failed to update monitor last_ping", "err", err, "topic", topic)
470+	}
471+
472+	handler.Logger.Info("recorded monitor ping",
473+		"topic", topic,
474+		"pingTime", now.Format(time.RFC3339),
475+		"windowEnd", newWindowEnd.Format(time.RFC3339),
476+	)
477+}
478+
479+const monitorThrottleInterval = 15 * time.Second
480+
481+type throttledMonitorRW struct {
482+	rw       io.ReadWriter
483+	handler  *CliHandler
484+	cmd      *CliCmd
485+	topic    string
486+	lastPing atomic.Int64 // Unix nanoseconds
487+}
488+
489+func newThrottledMonitorRW(rw io.ReadWriter, handler *CliHandler, cmd *CliCmd, topic string) *throttledMonitorRW {
490+	return &throttledMonitorRW{
491+		rw:      rw,
492+		handler: handler,
493+		cmd:     cmd,
494+		topic:   topic,
495+	}
496+}
497+
498+func (t *throttledMonitorRW) throttledUpdate() {
499+	now := time.Now().UnixNano()
500+	last := t.lastPing.Load()
501+
502+	// First ping (last == 0) or interval elapsed
503+	if last == 0 || now-last >= int64(monitorThrottleInterval) {
504+		if t.lastPing.CompareAndSwap(last, now) {
505+			t.handler.updateMonitor(t.cmd, t.topic)
506+		}
507+	}
508+}
509+
510+func (t *throttledMonitorRW) Read(p []byte) (int, error) {
511+	n, err := t.rw.Read(p)
512+	if n > 0 {
513+		t.throttledUpdate()
514+	}
515+	return n, err
516+}
517+
518+func (t *throttledMonitorRW) Write(p []byte) (int, error) {
519+	n, err := t.rw.Write(p)
520+	if n > 0 {
521+		t.throttledUpdate()
522+	}
523+	return n, err
524+}
525+
526 func (handler *CliHandler) sub(cmd *CliCmd, topic string, clientID string) error {
527 	subCmd := flagSet("sub", cmd.sesh)
528 	access := subCmd.String("a", "", "Comma separated list of pico usernames or ssh-key fingerprints to allow access to a topic")
529@@ -675,10 +1094,12 @@ func (handler *CliHandler) pipe(cmd *CliCmd, topic string, clientID string) erro
530 		)
531 	}
532 
533+	throttledRW := newThrottledMonitorRW(cmd.sesh, handler, cmd, name)
534+
535 	readErr, writeErr := handler.PubSub.Pipe(
536 		cmd.pipeCtx,
537 		clientID,
538-		cmd.sesh,
539+		throttledRW,
540 		[]*psub.Channel{
541 			psub.NewChannel(name),
542 		},
543@@ -693,6 +1114,8 @@ func (handler *CliHandler) pipe(cmd *CliCmd, topic string, clientID string) erro
544 		return writeErr
545 	}
546 
547+	handler.updateMonitor(cmd, name)
548+
549 	return nil
550 }
551 
M pkg/apps/pipe/ssh_test.go
+817, -9
  1@@ -25,9 +25,10 @@ import (
  2 
  3 type TestDB struct {
  4 	*stub.StubDB
  5-	Users    []*db.User
  6-	Pubkeys  []*db.PublicKey
  7-	Features []*db.FeatureFlag
  8+	Users        []*db.User
  9+	Pubkeys      []*db.PublicKey
 10+	Features     []*db.FeatureFlag
 11+	PipeMonitors []*db.PipeMonitor
 12 }
 13 
 14 func NewTestDB(logger *slog.Logger) *TestDB {
 15@@ -96,10 +97,83 @@ func (t *TestDB) AddPubkey(pubkey *db.PublicKey) {
 16 	t.Pubkeys = append(t.Pubkeys, pubkey)
 17 }
 18 
 19+func (t *TestDB) UpsertPipeMonitor(userID, topic string, dur time.Duration, winEnd *time.Time) error {
 20+	for _, m := range t.PipeMonitors {
 21+		if m.UserId == userID && m.Topic == topic {
 22+			m.WindowDur = dur
 23+			m.WindowEnd = winEnd
 24+			now := time.Now()
 25+			m.UpdatedAt = &now
 26+			return nil
 27+		}
 28+	}
 29+	now := time.Now()
 30+	t.PipeMonitors = append(t.PipeMonitors, &db.PipeMonitor{
 31+		ID:        fmt.Sprintf("monitor-%s-%s", userID, topic),
 32+		UserId:    userID,
 33+		Topic:     topic,
 34+		WindowDur: dur,
 35+		WindowEnd: winEnd,
 36+		CreatedAt: &now,
 37+		UpdatedAt: &now,
 38+	})
 39+	return nil
 40+}
 41+
 42+func (t *TestDB) UpdatePipeMonitorLastPing(userID, topic string, lastPing *time.Time) error {
 43+	for _, m := range t.PipeMonitors {
 44+		if m.UserId == userID && m.Topic == topic {
 45+			m.LastPing = lastPing
 46+			now := time.Now()
 47+			m.UpdatedAt = &now
 48+			return nil
 49+		}
 50+	}
 51+	return fmt.Errorf("monitor not found")
 52+}
 53+
 54+func (t *TestDB) RemovePipeMonitor(userID, topic string) error {
 55+	for i, m := range t.PipeMonitors {
 56+		if m.UserId == userID && m.Topic == topic {
 57+			t.PipeMonitors = append(t.PipeMonitors[:i], t.PipeMonitors[i+1:]...)
 58+			return nil
 59+		}
 60+	}
 61+	return fmt.Errorf("monitor not found")
 62+}
 63+
 64+func (t *TestDB) FindPipeMonitorByTopic(userID, topic string) (*db.PipeMonitor, error) {
 65+	for _, m := range t.PipeMonitors {
 66+		if m.UserId == userID && m.Topic == topic {
 67+			return m, nil
 68+		}
 69+	}
 70+	return nil, fmt.Errorf("monitor not found")
 71+}
 72+
 73+func (t *TestDB) FindPipeMonitorsByUser(userID string) ([]*db.PipeMonitor, error) {
 74+	var monitors []*db.PipeMonitor
 75+	for _, m := range t.PipeMonitors {
 76+		if m.UserId == userID {
 77+			monitors = append(monitors, m)
 78+		}
 79+	}
 80+	return monitors, nil
 81+}
 82+
 83+func (t *TestDB) InsertPipeMonitorHistory(monitorID string, windowDur time.Duration, windowEnd, lastPing *time.Time) error {
 84+	return nil
 85+}
 86+
 87+func (t *TestDB) FindPipeMonitorHistory(monitorID string, from, to time.Time) ([]*db.PipeMonitorHistory, error) {
 88+	return nil, nil
 89+}
 90+
 91 type TestSSHServer struct {
 92-	Cfg    *shared.ConfigSite
 93-	DBPool *TestDB
 94-	Cancel context.CancelFunc
 95+	Cfg         *shared.ConfigSite
 96+	DBPool      *TestDB
 97+	PipeHandler *CliHandler
 98+	Cancel      context.CancelFunc
 99 }
100 
101 func NewTestSSHServer(t *testing.T) *TestSSHServer {
102@@ -178,9 +252,10 @@ func NewTestSSHServer(t *testing.T) *TestSSHServer {
103 	time.Sleep(100 * time.Millisecond)
104 
105 	return &TestSSHServer{
106-		Cfg:    cfg,
107-		DBPool: dbpool,
108-		Cancel: cancel,
109+		Cfg:         cfg,
110+		DBPool:      dbpool,
111+		PipeHandler: handler,
112+		Cancel:      cancel,
113 	}
114 }
115 
116@@ -1393,3 +1468,736 @@ func TestPubSub_MultipleSubscribers(t *testing.T) {
117 		t.Errorf("subscriber 3 did not receive message, got: %q", string(received3[:n3]))
118 	}
119 }
120+
121+// Monitor CLI Tests
122+
123+func TestMonitor_UnauthenticatedUserDenied(t *testing.T) {
124+	server := NewTestSSHServer(t)
125+	defer server.Shutdown()
126+
127+	user := GenerateUser("anonymous")
128+
129+	client, err := user.NewClient()
130+	if err != nil {
131+		t.Fatalf("failed to connect: %v", err)
132+	}
133+	defer func() { _ = client.Close() }()
134+
135+	output, err := user.RunCommand(client, "monitor my-service 1h")
136+	if err != nil {
137+		t.Logf("command error (expected): %v", err)
138+	}
139+
140+	if !strings.Contains(output, "access denied") {
141+		t.Errorf("expected 'access denied', got: %s", output)
142+	}
143+}
144+
145+func TestMonitor_CreateMonitor(t *testing.T) {
146+	server := NewTestSSHServer(t)
147+	defer server.Shutdown()
148+
149+	user := GenerateUser("alice")
150+	RegisterUserWithServer(server, user)
151+
152+	client, err := user.NewClient()
153+	if err != nil {
154+		t.Fatalf("failed to connect: %v", err)
155+	}
156+	defer func() { _ = client.Close() }()
157+
158+	output, err := user.RunCommand(client, "monitor pico-uptime 24h")
159+	if err != nil {
160+		t.Logf("command completed: %v", err)
161+	}
162+
163+	if strings.Contains(output, "access denied") {
164+		t.Errorf("authenticated user should not get access denied, got: %s", output)
165+	}
166+
167+	// Verify monitor was created in DB (topic is stored with user prefix)
168+	monitor, err := server.DBPool.FindPipeMonitorByTopic("alice-id", "alice/pico-uptime")
169+	if err != nil {
170+		t.Fatalf("monitor should exist in DB: %v", err)
171+	}
172+
173+	if monitor.WindowDur != 24*time.Hour {
174+		t.Errorf("expected window duration 24h, got: %v", monitor.WindowDur)
175+	}
176+
177+	if !strings.Contains(output, "alice/pico-uptime") || !strings.Contains(output, "24h") {
178+		t.Errorf("output should confirm monitor creation, got: %s", output)
179+	}
180+}
181+
182+func TestMonitor_UpdateMonitor(t *testing.T) {
183+	server := NewTestSSHServer(t)
184+	defer server.Shutdown()
185+
186+	user := GenerateUser("alice")
187+	RegisterUserWithServer(server, user)
188+
189+	client, err := user.NewClient()
190+	if err != nil {
191+		t.Fatalf("failed to connect: %v", err)
192+	}
193+	defer func() { _ = client.Close() }()
194+
195+	// Create initial monitor
196+	_, err = user.RunCommand(client, "monitor my-cron 1h")
197+	if err != nil {
198+		t.Logf("create command completed: %v", err)
199+	}
200+
201+	// Upsert with new duration
202+	output, err := user.RunCommand(client, "monitor my-cron 6h")
203+	if err != nil {
204+		t.Logf("update command completed: %v", err)
205+	}
206+
207+	// Verify monitor was updated (topic is stored with user prefix)
208+	monitor, err := server.DBPool.FindPipeMonitorByTopic("alice-id", "alice/my-cron")
209+	if err != nil {
210+		t.Fatalf("monitor should exist in DB: %v", err)
211+	}
212+
213+	if monitor.WindowDur != 6*time.Hour {
214+		t.Errorf("expected window duration 6h after update, got: %v", monitor.WindowDur)
215+	}
216+
217+	if !strings.Contains(output, "6h") {
218+		t.Errorf("output should confirm updated duration, got: %s", output)
219+	}
220+}
221+
222+func TestMonitor_DeleteMonitor(t *testing.T) {
223+	server := NewTestSSHServer(t)
224+	defer server.Shutdown()
225+
226+	user := GenerateUser("alice")
227+	RegisterUserWithServer(server, user)
228+
229+	client, err := user.NewClient()
230+	if err != nil {
231+		t.Fatalf("failed to connect: %v", err)
232+	}
233+	defer func() { _ = client.Close() }()
234+
235+	// Create monitor first
236+	_, err = user.RunCommand(client, "monitor to-delete 1h")
237+	if err != nil {
238+		t.Logf("create command completed: %v", err)
239+	}
240+
241+	// Verify it exists (topic is stored with user prefix)
242+	_, err = server.DBPool.FindPipeMonitorByTopic("alice-id", "alice/to-delete")
243+	if err != nil {
244+		t.Fatalf("monitor should exist before deletion: %v", err)
245+	}
246+
247+	// Delete it
248+	output, err := user.RunCommand(client, "monitor to-delete -d")
249+	if err != nil {
250+		t.Logf("delete command completed: %v", err)
251+	}
252+
253+	// Verify it's gone (topic is stored with user prefix)
254+	_, err = server.DBPool.FindPipeMonitorByTopic("alice-id", "alice/to-delete")
255+	if err == nil {
256+		t.Errorf("monitor should be deleted from DB")
257+	}
258+
259+	if !strings.Contains(output, "deleted") && !strings.Contains(output, "removed") {
260+		t.Logf("output should confirm deletion, got: %s", output)
261+	}
262+}
263+
264+func TestMonitor_InvalidDuration(t *testing.T) {
265+	server := NewTestSSHServer(t)
266+	defer server.Shutdown()
267+
268+	user := GenerateUser("alice")
269+	RegisterUserWithServer(server, user)
270+
271+	client, err := user.NewClient()
272+	if err != nil {
273+		t.Fatalf("failed to connect: %v", err)
274+	}
275+	defer func() { _ = client.Close() }()
276+
277+	output, err := user.RunCommand(client, "monitor my-service invaliduration")
278+	if err != nil {
279+		t.Logf("command error (expected): %v", err)
280+	}
281+
282+	if !strings.Contains(output, "invalid") && !strings.Contains(output, "duration") && !strings.Contains(output, "error") {
283+		t.Errorf("expected error about invalid duration, got: %s", output)
284+	}
285+}
286+
287+func TestMonitor_MissingTopic(t *testing.T) {
288+	server := NewTestSSHServer(t)
289+	defer server.Shutdown()
290+
291+	user := GenerateUser("alice")
292+	RegisterUserWithServer(server, user)
293+
294+	client, err := user.NewClient()
295+	if err != nil {
296+		t.Fatalf("failed to connect: %v", err)
297+	}
298+	defer func() { _ = client.Close() }()
299+
300+	output, err := user.RunCommand(client, "monitor")
301+	if err != nil {
302+		t.Logf("command error (expected): %v", err)
303+	}
304+
305+	// Should show usage or error about missing topic
306+	if !strings.Contains(output, "Usage") && !strings.Contains(output, "topic") && !strings.Contains(output, "error") {
307+		t.Errorf("expected usage info or error about missing topic, got: %s", output)
308+	}
309+}
310+
311+// Status CLI Tests
312+
313+func TestStatus_UnauthenticatedUserDenied(t *testing.T) {
314+	server := NewTestSSHServer(t)
315+	defer server.Shutdown()
316+
317+	user := GenerateUser("anonymous")
318+
319+	client, err := user.NewClient()
320+	if err != nil {
321+		t.Fatalf("failed to connect: %v", err)
322+	}
323+	defer func() { _ = client.Close() }()
324+
325+	output, err := user.RunCommand(client, "status")
326+	if err != nil {
327+		t.Logf("command error (expected): %v", err)
328+	}
329+
330+	if !strings.Contains(output, "access denied") {
331+		t.Errorf("expected 'access denied', got: %s", output)
332+	}
333+}
334+
335+func TestStatus_NoMonitors(t *testing.T) {
336+	server := NewTestSSHServer(t)
337+	defer server.Shutdown()
338+
339+	user := GenerateUser("alice")
340+	RegisterUserWithServer(server, user)
341+
342+	client, err := user.NewClient()
343+	if err != nil {
344+		t.Fatalf("failed to connect: %v", err)
345+	}
346+	defer func() { _ = client.Close() }()
347+
348+	output, err := user.RunCommand(client, "status")
349+	if err != nil {
350+		t.Logf("command completed: %v", err)
351+	}
352+
353+	if !strings.Contains(output, "no monitors") && !strings.Contains(output, "empty") {
354+		t.Errorf("expected message about no monitors, got: %s", output)
355+	}
356+}
357+
358+func TestStatus_ShowsMonitorStatus(t *testing.T) {
359+	server := NewTestSSHServer(t)
360+	defer server.Shutdown()
361+
362+	user := GenerateUser("alice")
363+	RegisterUserWithServer(server, user)
364+
365+	client, err := user.NewClient()
366+	if err != nil {
367+		t.Fatalf("failed to connect: %v", err)
368+	}
369+	defer func() { _ = client.Close() }()
370+
371+	// Create a monitor
372+	_, err = user.RunCommand(client, "monitor web-check 1h")
373+	if err != nil {
374+		t.Logf("create monitor completed: %v", err)
375+	}
376+
377+	// Check status
378+	output, err := user.RunCommand(client, "status")
379+	if err != nil {
380+		t.Logf("status command completed: %v", err)
381+	}
382+
383+	if !strings.Contains(output, "web-check") {
384+		t.Errorf("status should list the monitor topic, got: %s", output)
385+	}
386+}
387+
388+func TestStatus_ShowsHealthyUnhealthy(t *testing.T) {
389+	server := NewTestSSHServer(t)
390+	defer server.Shutdown()
391+
392+	user := GenerateUser("alice")
393+	RegisterUserWithServer(server, user)
394+
395+	// Create monitors directly in DB with different states
396+	now := time.Now()
397+	windowEnd := now.Add(1 * time.Hour)
398+	recentPing := now.Add(-30 * time.Minute) // within window - healthy
399+	oldPing := now.Add(-2 * time.Hour)       // outside window - unhealthy
400+
401+	_ = server.DBPool.UpsertPipeMonitor("alice-id", "healthy-service", 1*time.Hour, &windowEnd)
402+	_ = server.DBPool.UpdatePipeMonitorLastPing("alice-id", "healthy-service", &recentPing)
403+
404+	_ = server.DBPool.UpsertPipeMonitor("alice-id", "unhealthy-service", 1*time.Hour, &windowEnd)
405+	_ = server.DBPool.UpdatePipeMonitorLastPing("alice-id", "unhealthy-service", &oldPing)
406+
407+	client, err := user.NewClient()
408+	if err != nil {
409+		t.Fatalf("failed to connect: %v", err)
410+	}
411+	defer func() { _ = client.Close() }()
412+
413+	output, err := user.RunCommand(client, "status")
414+	if err != nil {
415+		t.Logf("status command completed: %v", err)
416+	}
417+
418+	if !strings.Contains(output, "healthy-service") {
419+		t.Errorf("status should list healthy-service, got: %s", output)
420+	}
421+
422+	if !strings.Contains(output, "unhealthy-service") {
423+		t.Errorf("status should list unhealthy-service, got: %s", output)
424+	}
425+
426+	// Should indicate different health states
427+	if !strings.Contains(strings.ToLower(output), "healthy") && !strings.Contains(strings.ToLower(output), "ok") && !strings.Contains(output, "✓") {
428+		t.Logf("status output should indicate health state: %s", output)
429+	}
430+}
431+
432+// RSS CLI Tests
433+
434+func TestRss_UnauthenticatedUserDenied(t *testing.T) {
435+	server := NewTestSSHServer(t)
436+	defer server.Shutdown()
437+
438+	user := GenerateUser("anonymous")
439+
440+	client, err := user.NewClient()
441+	if err != nil {
442+		t.Fatalf("failed to connect: %v", err)
443+	}
444+	defer func() { _ = client.Close() }()
445+
446+	output, err := user.RunCommand(client, "rss")
447+	if err != nil {
448+		t.Logf("command error (expected): %v", err)
449+	}
450+
451+	if !strings.Contains(output, "access denied") {
452+		t.Errorf("expected 'access denied', got: %s", output)
453+	}
454+}
455+
456+func TestRss_GeneratesValidRSS(t *testing.T) {
457+	server := NewTestSSHServer(t)
458+	defer server.Shutdown()
459+
460+	user := GenerateUser("alice")
461+	RegisterUserWithServer(server, user)
462+
463+	// Create a monitor
464+	now := time.Now()
465+	windowEnd := now.Add(1 * time.Hour)
466+	_ = server.DBPool.UpsertPipeMonitor("alice-id", "rss-test-service", 1*time.Hour, &windowEnd)
467+
468+	client, err := user.NewClient()
469+	if err != nil {
470+		t.Fatalf("failed to connect: %v", err)
471+	}
472+	defer func() { _ = client.Close() }()
473+
474+	output, err := user.RunCommand(client, "rss")
475+	if err != nil {
476+		t.Logf("rss command completed: %v", err)
477+	}
478+
479+	// Should output valid RSS XML
480+	if !strings.Contains(output, "<?xml") || !strings.Contains(output, "<rss") {
481+		t.Errorf("expected RSS XML output, got: %s", output)
482+	}
483+
484+	if !strings.Contains(output, "rss-test-service") {
485+		t.Errorf("RSS should contain monitor topic, got: %s", output)
486+	}
487+}
488+
489+func TestRss_AlertsOnStaleMonitor(t *testing.T) {
490+	server := NewTestSSHServer(t)
491+	defer server.Shutdown()
492+
493+	user := GenerateUser("alice")
494+	RegisterUserWithServer(server, user)
495+
496+	// Create a stale monitor (last ping outside window)
497+	now := time.Now()
498+	windowEnd := now.Add(-30 * time.Minute) // window already ended
499+	oldPing := now.Add(-2 * time.Hour)
500+
501+	_ = server.DBPool.UpsertPipeMonitor("alice-id", "stale-service", 1*time.Hour, &windowEnd)
502+	_ = server.DBPool.UpdatePipeMonitorLastPing("alice-id", "stale-service", &oldPing)
503+
504+	client, err := user.NewClient()
505+	if err != nil {
506+		t.Fatalf("failed to connect: %v", err)
507+	}
508+	defer func() { _ = client.Close() }()
509+
510+	output, err := user.RunCommand(client, "rss")
511+	if err != nil {
512+		t.Logf("rss command completed: %v", err)
513+	}
514+
515+	// Should contain alert item for stale service
516+	if !strings.Contains(output, "stale-service") {
517+		t.Errorf("RSS should contain stale-service alert, got: %s", output)
518+	}
519+
520+	// Should have item element for the alert
521+	if !strings.Contains(output, "<item>") {
522+		t.Errorf("RSS should contain item element for alert, got: %s", output)
523+	}
524+}
525+
526+// Pub integration with Monitor
527+
528+func TestPub_UpdatesMonitorLastPing(t *testing.T) {
529+	server := NewTestSSHServer(t)
530+	defer server.Shutdown()
531+
532+	user := GenerateUser("alice")
533+	RegisterUserWithServer(server, user)
534+
535+	// Create a monitor first (topic is stored with user prefix)
536+	now := time.Now()
537+	windowEnd := now.Add(1 * time.Hour)
538+	_ = server.DBPool.UpsertPipeMonitor("alice-id", "alice/ping-test", 1*time.Hour, &windowEnd)
539+
540+	subClient, err := user.NewClient()
541+	if err != nil {
542+		t.Fatalf("failed to connect subscriber: %v", err)
543+	}
544+	defer func() { _ = subClient.Close() }()
545+
546+	pubClient, err := user.NewClient()
547+	if err != nil {
548+		t.Fatalf("failed to connect publisher: %v", err)
549+	}
550+	defer func() { _ = pubClient.Close() }()
551+
552+	// Start subscriber
553+	subSession, err := subClient.NewSession()
554+	if err != nil {
555+		t.Fatalf("failed to create sub session: %v", err)
556+	}
557+	defer func() { _ = subSession.Close() }()
558+
559+	if err := subSession.Start("sub ping-test -c"); err != nil {
560+		t.Fatalf("failed to start sub: %v", err)
561+	}
562+
563+	time.Sleep(100 * time.Millisecond)
564+
565+	// Publish to the monitored topic
566+	_, err = user.RunCommandWithStdin(pubClient, "pub ping-test -c", "health check")
567+	if err != nil {
568+		t.Logf("pub command completed: %v", err)
569+	}
570+
571+	// Verify last_ping was updated (topic is stored with user prefix)
572+	monitor, err := server.DBPool.FindPipeMonitorByTopic("alice-id", "alice/ping-test")
573+	if err != nil {
574+		t.Fatalf("monitor should exist: %v", err)
575+	}
576+
577+	if monitor.LastPing == nil {
578+		t.Errorf("last_ping should be set after pub")
579+	} else if time.Since(*monitor.LastPing) > 5*time.Second {
580+		t.Errorf("last_ping should be recent, got: %v", monitor.LastPing)
581+	}
582+}
583+
584+// Tests for monitor status edge cases
585+
586+func TestStatus_PingAtExactWindowStart(t *testing.T) {
587+	// Bug fix: Status() should use >= for windowStart comparison
588+	// A ping exactly at windowStart should be healthy
589+	now := time.Now().UTC()
590+	windowEnd := now.Add(1 * time.Hour)
591+	windowStart := windowEnd.Add(-1 * time.Hour) // equals now
592+
593+	monitor := &db.PipeMonitor{
594+		LastPing:  &windowStart, // ping exactly at window start
595+		WindowEnd: &windowEnd,
596+		WindowDur: 1 * time.Hour,
597+	}
598+
599+	err := monitor.Status()
600+	if err != nil {
601+		t.Errorf("ping at exact window start should be healthy, got: %v", err)
602+	}
603+}
604+
605+func TestStatus_WindowExpired(t *testing.T) {
606+	// Bug fix: Status() should check if current time is past windowEnd
607+	now := time.Now().UTC()
608+	windowEnd := now.Add(-1 * time.Minute) // window ended 1 minute ago
609+	lastPing := now.Add(-30 * time.Second) // ping was 30 seconds ago
610+
611+	monitor := &db.PipeMonitor{
612+		LastPing:  &lastPing,
613+		WindowEnd: &windowEnd,
614+		WindowDur: 1 * time.Hour,
615+	}
616+
617+	err := monitor.Status()
618+	if err == nil {
619+		t.Error("expired window should be unhealthy")
620+	}
621+	if !strings.Contains(err.Error(), "window expired") {
622+		t.Errorf("error should mention window expired, got: %v", err)
623+	}
624+}
625+
626+func TestStatus_PingResetsWindow(t *testing.T) {
627+	// Bug fix: Every ping should reset window to now + duration
628+	server := NewTestSSHServer(t)
629+	defer server.Shutdown()
630+
631+	user := GenerateUser("alice")
632+	RegisterUserWithServer(server, user)
633+
634+	// Create a monitor with an expired window
635+	expiredWindowEnd := time.Now().UTC().Add(-10 * time.Minute)
636+	_ = server.DBPool.UpsertPipeMonitor("alice-id", "alice/reset-test", 5*time.Minute, &expiredWindowEnd)
637+
638+	client, err := user.NewClient()
639+	if err != nil {
640+		t.Fatalf("failed to connect: %v", err)
641+	}
642+	defer func() { _ = client.Close() }()
643+
644+	// Start a subscriber first so pub doesn't block
645+	subClient, err := user.NewClient()
646+	if err != nil {
647+		t.Fatalf("failed to connect subscriber: %v", err)
648+	}
649+	defer func() { _ = subClient.Close() }()
650+
651+	subSession, err := subClient.NewSession()
652+	if err != nil {
653+		t.Fatalf("failed to create sub session: %v", err)
654+	}
655+	defer func() { _ = subSession.Close() }()
656+
657+	if err := subSession.Start("sub reset-test -c"); err != nil {
658+		t.Fatalf("failed to start sub: %v", err)
659+	}
660+
661+	time.Sleep(100 * time.Millisecond)
662+
663+	// Pub to trigger monitor update
664+	_, err = user.RunCommandWithStdin(client, "pub reset-test -c", "ping")
665+	if err != nil {
666+		t.Logf("pub command completed: %v", err)
667+	}
668+
669+	// Check that window was reset
670+	monitor, err := server.DBPool.FindPipeMonitorByTopic("alice-id", "alice/reset-test")
671+	if err != nil {
672+		t.Fatalf("monitor should exist: %v", err)
673+	}
674+
675+	if monitor.WindowEnd == nil {
676+		t.Fatal("window_end should be set")
677+	}
678+
679+	// Window end should now be in the future
680+	if !monitor.WindowEnd.After(time.Now().UTC()) {
681+		t.Errorf("window_end should be in the future after ping, got: %v", monitor.WindowEnd)
682+	}
683+}
684+
685+func TestStatus_HealthyImmediatelyAfterPing(t *testing.T) {
686+	// Bug fix: After a ping, status should immediately show healthy
687+	server := NewTestSSHServer(t)
688+	defer server.Shutdown()
689+
690+	user := GenerateUser("alice")
691+	RegisterUserWithServer(server, user)
692+
693+	client, err := user.NewClient()
694+	if err != nil {
695+		t.Fatalf("failed to connect: %v", err)
696+	}
697+	defer func() { _ = client.Close() }()
698+
699+	// Create monitor
700+	_, err = user.RunCommand(client, "monitor health-test 5m")
701+	if err != nil {
702+		t.Fatalf("failed to create monitor: %v", err)
703+	}
704+
705+	// Start subscriber
706+	subClient, err := user.NewClient()
707+	if err != nil {
708+		t.Fatalf("failed to connect subscriber: %v", err)
709+	}
710+	defer func() { _ = subClient.Close() }()
711+
712+	subSession, err := subClient.NewSession()
713+	if err != nil {
714+		t.Fatalf("failed to create sub session: %v", err)
715+	}
716+	defer func() { _ = subSession.Close() }()
717+
718+	if err := subSession.Start("sub health-test -c"); err != nil {
719+		t.Fatalf("failed to start sub: %v", err)
720+	}
721+
722+	time.Sleep(100 * time.Millisecond)
723+
724+	// Pub to trigger ping
725+	pubClient, err := user.NewClient()
726+	if err != nil {
727+		t.Fatalf("failed to connect publisher: %v", err)
728+	}
729+	defer func() { _ = pubClient.Close() }()
730+
731+	_, err = user.RunCommandWithStdin(pubClient, "pub health-test -c", "ping")
732+	if err != nil {
733+		t.Logf("pub completed: %v", err)
734+	}
735+
736+	// Immediately check status
737+	statusClient, err := user.NewClient()
738+	if err != nil {
739+		t.Fatalf("failed to connect for status: %v", err)
740+	}
741+	defer func() { _ = statusClient.Close() }()
742+
743+	output, err := user.RunCommand(statusClient, "status")
744+	if err != nil {
745+		t.Logf("status completed: %v", err)
746+	}
747+
748+	if strings.Contains(output, "unhealthy") {
749+		t.Errorf("status should be healthy immediately after ping, got: %s", output)
750+	}
751+	if !strings.Contains(output, "healthy") {
752+		t.Errorf("status should show healthy, got: %s", output)
753+	}
754+}
755+
756+// TestMonitor_FixedWindowNonSliding verifies that pings within the same window
757+// do not slide the window forward. This is a regression test for a bug where
758+// each ping reset window_end to now+dur, creating a sliding window that never fails.
759+//
760+// Expected behavior:
761+//   - last_ping: always updated to show most recent activity (user visibility).
762+//   - window_end: only advances when current time exceeds it (health scheduling).
763+func TestMonitor_FixedWindowNonSliding(t *testing.T) {
764+	server := NewTestSSHServer(t)
765+	defer server.Shutdown()
766+
767+	user := GenerateUser("alice")
768+	RegisterUserWithServer(server, user)
769+
770+	client, err := user.NewClient()
771+	if err != nil {
772+		t.Fatalf("failed to connect: %v", err)
773+	}
774+	defer func() { _ = client.Close() }()
775+
776+	// Create a monitor with 1 hour window
777+	_, err = user.RunCommand(client, "monitor fixed-window-test 1h")
778+	if err != nil {
779+		t.Logf("create command completed: %v", err)
780+	}
781+
782+	// Get the initial window_end
783+	monitor, err := server.DBPool.FindPipeMonitorByTopic("alice-id", "alice/fixed-window-test")
784+	if err != nil {
785+		t.Fatalf("monitor should exist: %v", err)
786+	}
787+	initialWindowEnd := *monitor.WindowEnd
788+
789+	// Simulate a ping by calling updateMonitor directly
790+	handler := server.PipeHandler
791+
792+	// Create a mock CliCmd
793+	mockUser := &db.User{ID: "alice-id", Name: "alice"}
794+	cmd := &CliCmd{
795+		userName: "alice",
796+		user:     mockUser,
797+	}
798+
799+	// First ping - should record last_ping but NOT change window_end
800+	handler.updateMonitor(cmd, "alice/fixed-window-test")
801+
802+	monitor, err = server.DBPool.FindPipeMonitorByTopic("alice-id", "alice/fixed-window-test")
803+	if err != nil {
804+		t.Fatalf("monitor should exist after first ping: %v", err)
805+	}
806+
807+	if monitor.LastPing == nil {
808+		t.Fatalf("last_ping should be set after first ping")
809+	}
810+	firstPingTime := *monitor.LastPing
811+	windowEndAfterFirstPing := *monitor.WindowEnd
812+
813+	// BUG CHECK: With the bug, window_end would have slid forward to now+1h
814+	// With the fix, window_end should remain at the original scheduled time
815+	if !windowEndAfterFirstPing.Equal(initialWindowEnd) {
816+		t.Errorf("BUG DETECTED: window_end should NOT change after first ping within window\n"+
817+			"initial window_end: %v\n"+
818+			"window_end after ping: %v\n"+
819+			"Window slid forward by: %v",
820+			initialWindowEnd.Format(time.RFC3339),
821+			windowEndAfterFirstPing.Format(time.RFC3339),
822+			windowEndAfterFirstPing.Sub(initialWindowEnd))
823+	}
824+
825+	// Second ping - last_ping SHOULD be updated (for user visibility)
826+	// but window_end should NOT change
827+	time.Sleep(10 * time.Millisecond) // Small delay to get different timestamp
828+	handler.updateMonitor(cmd, "alice/fixed-window-test")
829+
830+	monitor, err = server.DBPool.FindPipeMonitorByTopic("alice-id", "alice/fixed-window-test")
831+	if err != nil {
832+		t.Fatalf("monitor should exist after second ping: %v", err)
833+	}
834+
835+	// last_ping SHOULD be updated to show most recent activity
836+	if monitor.LastPing.Equal(firstPingTime) {
837+		t.Errorf("last_ping SHOULD be updated for user visibility\n"+
838+			"first ping time: %v\n"+
839+			"last_ping after second call: %v",
840+			firstPingTime.Format(time.RFC3339Nano),
841+			monitor.LastPing.Format(time.RFC3339Nano))
842+	}
843+
844+	// But window_end should still be the original value (not sliding)
845+	if !monitor.WindowEnd.Equal(initialWindowEnd) {
846+		t.Errorf("BUG DETECTED: window_end should remain at original value\n"+
847+			"initial: %v\n"+
848+			"current: %v",
849+			initialWindowEnd.Format(time.RFC3339),
850+			monitor.WindowEnd.Format(time.RFC3339))
851+	}
852+}
M pkg/db/db.go
+141, -0
  1@@ -5,6 +5,7 @@ import (
  2 	"database/sql/driver"
  3 	"encoding/json"
  4 	"errors"
  5+	"fmt"
  6 	"regexp"
  7 	"time"
  8 )
  9@@ -378,6 +379,137 @@ type TunsEventLog struct {
 10 	CreatedAt      *time.Time `json:"created_at" db:"created_at"`
 11 }
 12 
 13+type PipeMonitor struct {
 14+	ID        string        `json:"id" db:"id"`
 15+	UserId    string        `json:"user_id" db:"user_id"`
 16+	Topic     string        `json:"topic" db:"topic"`
 17+	WindowDur time.Duration `json:"window_dur" db:"window_dur"`
 18+	WindowEnd *time.Time    `json:"window_end" db:"window_end"`
 19+	LastPing  *time.Time    `json:"last_ping" db:"last_ping"`
 20+	CreatedAt *time.Time    `json:"created_at" db:"created_at"`
 21+	UpdatedAt *time.Time    `json:"updated_at" db:"updated_at"`
 22+}
 23+
 24+type PipeMonitorHistory struct {
 25+	ID        string        `json:"id" db:"id"`
 26+	MonitorID string        `json:"monitor_id" db:"monitor_id"`
 27+	WindowDur time.Duration `json:"window_dur" db:"window_dur"`
 28+	WindowEnd *time.Time    `json:"window_end" db:"window_end"`
 29+	LastPing  *time.Time    `json:"last_ping" db:"last_ping"`
 30+	CreatedAt *time.Time    `json:"created_at" db:"created_at"`
 31+	UpdatedAt *time.Time    `json:"updated_at" db:"updated_at"`
 32+}
 33+
 34+type UptimeResult struct {
 35+	TotalDuration  time.Duration
 36+	UptimeDuration time.Duration
 37+	UptimePercent  float64
 38+}
 39+
 40+func ComputeUptime(history []*PipeMonitorHistory, from, to time.Time) UptimeResult {
 41+	totalDuration := to.Sub(from)
 42+	if totalDuration <= 0 {
 43+		return UptimeResult{}
 44+	}
 45+
 46+	if len(history) == 0 {
 47+		return UptimeResult{TotalDuration: totalDuration}
 48+	}
 49+
 50+	type interval struct {
 51+		start, end time.Time
 52+	}
 53+
 54+	var intervals []interval
 55+	for _, h := range history {
 56+		if h.WindowEnd == nil {
 57+			continue
 58+		}
 59+		windowStart := h.WindowEnd.Add(-h.WindowDur)
 60+		windowEnd := *h.WindowEnd
 61+
 62+		if windowStart.Before(from) {
 63+			windowStart = from
 64+		}
 65+		if windowEnd.After(to) {
 66+			windowEnd = to
 67+		}
 68+
 69+		if windowStart.Before(windowEnd) {
 70+			intervals = append(intervals, interval{start: windowStart, end: windowEnd})
 71+		}
 72+	}
 73+
 74+	if len(intervals) == 0 {
 75+		return UptimeResult{TotalDuration: totalDuration}
 76+	}
 77+
 78+	// Sort by start time
 79+	for i := range intervals {
 80+		for j := i + 1; j < len(intervals); j++ {
 81+			if intervals[j].start.Before(intervals[i].start) {
 82+				intervals[i], intervals[j] = intervals[j], intervals[i]
 83+			}
 84+		}
 85+	}
 86+
 87+	// Merge overlapping intervals
 88+	merged := []interval{intervals[0]}
 89+	for _, curr := range intervals[1:] {
 90+		last := &merged[len(merged)-1]
 91+		if !curr.start.After(last.end) {
 92+			if curr.end.After(last.end) {
 93+				last.end = curr.end
 94+			}
 95+		} else {
 96+			merged = append(merged, curr)
 97+		}
 98+	}
 99+
100+	var uptimeDuration time.Duration
101+	for _, iv := range merged {
102+		uptimeDuration += iv.end.Sub(iv.start)
103+	}
104+
105+	uptimePercent := float64(uptimeDuration) / float64(totalDuration) * 100
106+
107+	return UptimeResult{
108+		TotalDuration:  totalDuration,
109+		UptimeDuration: uptimeDuration,
110+		UptimePercent:  uptimePercent,
111+	}
112+}
113+
114+func (m *PipeMonitor) Status() error {
115+	if m.LastPing == nil {
116+		return fmt.Errorf("no ping received yet")
117+	}
118+	if m.WindowEnd == nil {
119+		return fmt.Errorf("window end not set")
120+	}
121+	now := time.Now().UTC()
122+	if now.After(*m.WindowEnd) {
123+		return fmt.Errorf(
124+			"window expired at %s",
125+			m.WindowEnd.UTC().Format("2006-01-02 15:04:05Z"),
126+		)
127+	}
128+	windowStart := m.WindowEnd.Add(-m.WindowDur)
129+	lastPingAfterStart := !m.LastPing.Before(windowStart)
130+	if !lastPingAfterStart {
131+		return fmt.Errorf(
132+			"last ping before window start: %s",
133+			windowStart.UTC().Format("2006-01-02 15:04:05Z"),
134+		)
135+	}
136+	return nil
137+}
138+
139+func (m *PipeMonitor) GetNextWindow() *time.Time {
140+	win := m.WindowEnd.Add(m.WindowDur)
141+	return &win
142+}
143+
144 var NameValidator = regexp.MustCompile("^[a-zA-Z0-9]{1,50}$")
145 var DenyList = []string{
146 	"admin",
147@@ -468,5 +600,14 @@ type DB interface {
148 	FindPubkeysInAccessLogs(userID string) ([]string, error)
149 	FindAccessLogsByPubkey(pubkey string, fromDate *time.Time) ([]*AccessLog, error)
150 
151+	UpsertPipeMonitor(userID, topic string, dur time.Duration, winEnd *time.Time) error
152+	UpdatePipeMonitorLastPing(userID, topic string, lastPing *time.Time) error
153+	RemovePipeMonitor(userID, topic string) error
154+	FindPipeMonitorByTopic(userID, topic string) (*PipeMonitor, error)
155+	FindPipeMonitorsByUser(userID string) ([]*PipeMonitor, error)
156+
157+	InsertPipeMonitorHistory(monitorID string, windowDur time.Duration, windowEnd, lastPing *time.Time) error
158+	FindPipeMonitorHistory(monitorID string, from, to time.Time) ([]*PipeMonitorHistory, error)
159+
160 	Close() error
161 }
M pkg/db/postgres/storage.go
+73, -0
 1@@ -1519,3 +1519,76 @@ func (me *PsqlDB) InsertAccessLog(log *db.AccessLog) error {
 2 	)
 3 	return err
 4 }
 5+
 6+func (me *PsqlDB) UpsertPipeMonitor(userID, topic string, dur time.Duration, winEnd *time.Time) error {
 7+	durStr := fmt.Sprintf("%d seconds", int64(dur.Seconds()))
 8+	_, err := me.Db.Exec(
 9+		`INSERT INTO pipe_monitors (user_id, topic, window_dur, window_end)
10+		VALUES ($1, $2, $3::interval, $4)
11+		ON CONFLICT (user_id, topic) DO UPDATE SET window_dur = $3::interval, window_end = $4, updated_at = NOW();`,
12+		userID,
13+		topic,
14+		durStr,
15+		winEnd,
16+	)
17+	return err
18+}
19+
20+func (me *PsqlDB) UpdatePipeMonitorLastPing(userID, topic string, lastPing *time.Time) error {
21+	_, err := me.Db.Exec(
22+		`UPDATE pipe_monitors SET last_ping = $3, updated_at = NOW() WHERE user_id = $1 AND topic = $2;`,
23+		userID,
24+		topic,
25+		lastPing,
26+	)
27+	return err
28+}
29+
30+func (me *PsqlDB) RemovePipeMonitor(userID, topic string) error {
31+	_, err := me.Db.Exec(
32+		`DELETE FROM pipe_monitors WHERE user_id = $1 AND topic = $2;`,
33+		userID,
34+		topic,
35+	)
36+	return err
37+}
38+
39+func (me *PsqlDB) FindPipeMonitorByTopic(userID, topic string) (*db.PipeMonitor, error) {
40+	monitor := &db.PipeMonitor{}
41+	err := me.Db.Get(monitor, `SELECT id, user_id, topic, (EXTRACT(EPOCH FROM window_dur) * 1000000000)::bigint as window_dur, window_end, last_ping, created_at, updated_at FROM pipe_monitors WHERE user_id = $1 AND topic = $2;`, userID, topic)
42+	if err != nil {
43+		return nil, err
44+	}
45+	return monitor, nil
46+}
47+
48+func (me *PsqlDB) FindPipeMonitorsByUser(userID string) ([]*db.PipeMonitor, error) {
49+	var monitors []*db.PipeMonitor
50+	err := me.Db.Select(&monitors, `SELECT id, user_id, topic, (EXTRACT(EPOCH FROM window_dur) * 1000000000)::bigint as window_dur, window_end, last_ping, created_at, updated_at FROM pipe_monitors WHERE user_id = $1 ORDER BY topic;`, userID)
51+	if err != nil {
52+		return nil, err
53+	}
54+	return monitors, nil
55+}
56+
57+func (me *PsqlDB) InsertPipeMonitorHistory(monitorID string, windowDur time.Duration, windowEnd, lastPing *time.Time) error {
58+	durStr := fmt.Sprintf("%d seconds", int64(windowDur.Seconds()))
59+	_, err := me.Db.Exec(
60+		`INSERT INTO pipe_monitors_history (monitor_id, window_dur, window_end, last_ping) VALUES ($1, $2::interval, $3, $4)`,
61+		monitorID, durStr, windowEnd, lastPing,
62+	)
63+	return err
64+}
65+
66+func (me *PsqlDB) FindPipeMonitorHistory(monitorID string, from, to time.Time) ([]*db.PipeMonitorHistory, error) {
67+	var history []*db.PipeMonitorHistory
68+	err := me.Db.Select(
69+		&history,
70+		`SELECT id, monitor_id, (EXTRACT(EPOCH FROM window_dur) * 1000000000)::bigint as window_dur, window_end, last_ping, created_at, updated_at FROM pipe_monitors_history WHERE monitor_id = $1 AND last_ping <= $2 AND window_end >= $3 ORDER BY last_ping ASC`,
71+		monitorID, to, from,
72+	)
73+	if err != nil {
74+		return nil, err
75+	}
76+	return history, nil
77+}
M pkg/db/postgres/storage_test.go
+157, -1
  1@@ -169,7 +169,7 @@ func cleanupTestData(t *testing.T) {
  2 		"access_logs", "tuns_event_logs", "analytics_visits",
  3 		"feed_items", "post_aliases", "post_tags", "posts",
  4 		"projects", "feature_flags", "payment_history", "tokens",
  5-		"public_keys", "app_users",
  6+		"public_keys", "pipe_monitors", "app_users",
  7 	}
  8 	for _, table := range tables {
  9 		_, err := testDB.Db.Exec(fmt.Sprintf("DELETE FROM %s", table))
 10@@ -1472,3 +1472,159 @@ func TestPaymentHistoryData_JSONBRoundtrip(t *testing.T) {
 11 		t.Errorf("expected tx_id 'tx789', got '%s'", txId)
 12 	}
 13 }
 14+
 15+// ============ Pipe Monitor Tests ============
 16+
 17+func TestUpsertPipeMonitor(t *testing.T) {
 18+	cleanupTestData(t)
 19+
 20+	user, _ := testDB.RegisterUser("pipemonitorowner", "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAI pipemonitorowner", "comment")
 21+
 22+	winEnd := time.Now().Add(time.Hour)
 23+	err := testDB.UpsertPipeMonitor(user.ID, "test-topic", 5*time.Minute, &winEnd)
 24+	if err != nil {
 25+		t.Fatalf("UpsertPipeMonitor failed: %v", err)
 26+	}
 27+
 28+	monitor, err := testDB.FindPipeMonitorByTopic(user.ID, "test-topic")
 29+	if err != nil {
 30+		t.Fatalf("FindPipeMonitorByTopic failed: %v", err)
 31+	}
 32+	if monitor.Topic != "test-topic" {
 33+		t.Errorf("expected topic 'test-topic', got '%s'", monitor.Topic)
 34+	}
 35+	if monitor.WindowDur != 5*time.Minute {
 36+		t.Errorf("expected window_dur 5m, got %v", monitor.WindowDur)
 37+	}
 38+}
 39+
 40+func TestUpsertPipeMonitor_Update(t *testing.T) {
 41+	cleanupTestData(t)
 42+
 43+	user, _ := testDB.RegisterUser("pipeupdateowner", "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAI pipeupdateowner", "comment")
 44+
 45+	winEnd1 := time.Now().Add(time.Hour)
 46+	err := testDB.UpsertPipeMonitor(user.ID, "update-topic", 5*time.Minute, &winEnd1)
 47+	if err != nil {
 48+		t.Fatalf("first UpsertPipeMonitor failed: %v", err)
 49+	}
 50+
 51+	winEnd2 := time.Now().Add(2 * time.Hour)
 52+	err = testDB.UpsertPipeMonitor(user.ID, "update-topic", 10*time.Minute, &winEnd2)
 53+	if err != nil {
 54+		t.Fatalf("second UpsertPipeMonitor failed: %v", err)
 55+	}
 56+
 57+	monitor, err := testDB.FindPipeMonitorByTopic(user.ID, "update-topic")
 58+	if err != nil {
 59+		t.Fatalf("FindPipeMonitorByTopic failed: %v", err)
 60+	}
 61+	if monitor.WindowDur != 10*time.Minute {
 62+		t.Errorf("expected window_dur 10m after update, got %v", monitor.WindowDur)
 63+	}
 64+}
 65+
 66+func TestUpdatePipeMonitorLastPing(t *testing.T) {
 67+	cleanupTestData(t)
 68+
 69+	user, _ := testDB.RegisterUser("pipepingowner", "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAI pipepingowner", "comment")
 70+
 71+	winEnd := time.Now().Add(time.Hour)
 72+	err := testDB.UpsertPipeMonitor(user.ID, "ping-topic", 5*time.Minute, &winEnd)
 73+	if err != nil {
 74+		t.Fatalf("UpsertPipeMonitor failed: %v", err)
 75+	}
 76+
 77+	lastPing := time.Now()
 78+	err = testDB.UpdatePipeMonitorLastPing(user.ID, "ping-topic", &lastPing)
 79+	if err != nil {
 80+		t.Fatalf("UpdatePipeMonitorLastPing failed: %v", err)
 81+	}
 82+
 83+	monitor, err := testDB.FindPipeMonitorByTopic(user.ID, "ping-topic")
 84+	if err != nil {
 85+		t.Fatalf("FindPipeMonitorByTopic failed: %v", err)
 86+	}
 87+	if monitor.LastPing == nil {
 88+		t.Error("expected last_ping to be set, got nil")
 89+	}
 90+}
 91+
 92+func TestRemovePipeMonitor(t *testing.T) {
 93+	cleanupTestData(t)
 94+
 95+	user, _ := testDB.RegisterUser("piperemoveowner", "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAI piperemoveowner", "comment")
 96+
 97+	winEnd := time.Now().Add(time.Hour)
 98+	err := testDB.UpsertPipeMonitor(user.ID, "remove-topic", 5*time.Minute, &winEnd)
 99+	if err != nil {
100+		t.Fatalf("UpsertPipeMonitor failed: %v", err)
101+	}
102+
103+	err = testDB.RemovePipeMonitor(user.ID, "remove-topic")
104+	if err != nil {
105+		t.Fatalf("RemovePipeMonitor failed: %v", err)
106+	}
107+
108+	_, err = testDB.FindPipeMonitorByTopic(user.ID, "remove-topic")
109+	if err == nil {
110+		t.Error("expected error finding removed monitor, got nil")
111+	}
112+}
113+
114+func TestFindPipeMonitorByTopic_NotFound(t *testing.T) {
115+	cleanupTestData(t)
116+
117+	user, _ := testDB.RegisterUser("pipenotfoundowner", "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAI pipenotfoundowner", "comment")
118+
119+	_, err := testDB.FindPipeMonitorByTopic(user.ID, "nonexistent-topic")
120+	if err == nil {
121+		t.Error("expected error for nonexistent monitor, got nil")
122+	}
123+}
124+
125+func TestFindPipeMonitorsByUser(t *testing.T) {
126+	cleanupTestData(t)
127+
128+	user, _ := testDB.RegisterUser("pipemonlistowner", "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAI pipemonlistowner", "comment")
129+
130+	winEnd := time.Now().Add(time.Hour)
131+	_ = testDB.UpsertPipeMonitor(user.ID, "service-a", 5*time.Minute, &winEnd)
132+	_ = testDB.UpsertPipeMonitor(user.ID, "service-b", 10*time.Minute, &winEnd)
133+	_ = testDB.UpsertPipeMonitor(user.ID, "service-c", 1*time.Hour, &winEnd)
134+
135+	monitors, err := testDB.FindPipeMonitorsByUser(user.ID)
136+	if err != nil {
137+		t.Fatalf("FindPipeMonitorsByUser failed: %v", err)
138+	}
139+
140+	if len(monitors) != 3 {
141+		t.Errorf("expected 3 monitors, got %d", len(monitors))
142+	}
143+
144+	// Should be ordered by topic
145+	if monitors[0].Topic != "service-a" {
146+		t.Errorf("expected first topic 'service-a', got %s", monitors[0].Topic)
147+	}
148+	if monitors[1].Topic != "service-b" {
149+		t.Errorf("expected second topic 'service-b', got %s", monitors[1].Topic)
150+	}
151+	if monitors[2].Topic != "service-c" {
152+		t.Errorf("expected third topic 'service-c', got %s", monitors[2].Topic)
153+	}
154+}
155+
156+func TestFindPipeMonitorsByUser_Empty(t *testing.T) {
157+	cleanupTestData(t)
158+
159+	user, _ := testDB.RegisterUser("pipenomonitors", "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAI pipenomonitors", "comment")
160+
161+	monitors, err := testDB.FindPipeMonitorsByUser(user.ID)
162+	if err != nil {
163+		t.Fatalf("FindPipeMonitorsByUser failed: %v", err)
164+	}
165+
166+	if len(monitors) != 0 {
167+		t.Errorf("expected 0 monitors for user with none, got %d", len(monitors))
168+	}
169+}
M pkg/db/stub/stub.go
+28, -0
 1@@ -243,3 +243,31 @@ func (me *StubDB) FindPubkeysInAccessLogs(userID string) ([]string, error) {
 2 func (me *StubDB) InsertAccessLog(log *db.AccessLog) error {
 3 	return errNotImpl
 4 }
 5+
 6+func (me *StubDB) UpsertPipeMonitor(userID, topic string, dur time.Duration, winEnd *time.Time) error {
 7+	return errNotImpl
 8+}
 9+
10+func (me *StubDB) UpdatePipeMonitorLastPing(userID, topic string, lastPing *time.Time) error {
11+	return errNotImpl
12+}
13+
14+func (me *StubDB) RemovePipeMonitor(userID, topic string) error {
15+	return errNotImpl
16+}
17+
18+func (me *StubDB) FindPipeMonitorByTopic(userID, topic string) (*db.PipeMonitor, error) {
19+	return nil, errNotImpl
20+}
21+
22+func (me *StubDB) FindPipeMonitorsByUser(userID string) ([]*db.PipeMonitor, error) {
23+	return nil, errNotImpl
24+}
25+
26+func (me *StubDB) InsertPipeMonitorHistory(monitorID string, windowDur time.Duration, windowEnd, lastPing *time.Time) error {
27+	return errNotImpl
28+}
29+
30+func (me *StubDB) FindPipeMonitorHistory(monitorID string, from, to time.Time) ([]*db.PipeMonitorHistory, error) {
31+	return nil, errNotImpl
32+}
A sql/migrations/20251226_add_pipe_monitoring.sql
+36, -0
 1@@ -0,0 +1,36 @@
 2+CREATE TABLE IF NOT EXISTS pipe_monitors (
 3+  id uuid NOT NULL DEFAULT uuid_generate_v4(),
 4+  user_id uuid NOT NULL,
 5+  topic text NOT NULL,
 6+  window_dur interval NOT NULL,
 7+  window_end timestamp without time zone NOT NULL DEFAULT NOW(),
 8+  last_ping timestamp,
 9+  created_at timestamp without time zone NOT NULL DEFAULT NOW(),
10+  updated_at timestamp without time zone NOT NULL DEFAULT NOW(),
11+  CONSTRAINT pipe_monitors_unique_topic UNIQUE (user_id, topic),
12+  CONSTRAINT pipe_monitoring_pkey PRIMARY KEY (id),
13+  CONSTRAINT fk_pipe_monitoring_app_users
14+    FOREIGN KEY(user_id)
15+  REFERENCES app_users(id)
16+  ON DELETE CASCADE
17+  ON UPDATE CASCADE
18+);
19+
20+CREATE TABLE IF NOT EXISTS pipe_monitors_history (
21+  id uuid NOT NULL DEFAULT uuid_generate_v4(),
22+  monitor_id uuid NOT NULL,
23+  window_dur interval NOT NULL,
24+  window_end timestamp without time zone NOT NULL DEFAULT NOW(),
25+  last_ping timestamp,
26+  created_at timestamp without time zone NOT NULL DEFAULT NOW(),
27+  updated_at timestamp without time zone NOT NULL DEFAULT NOW(),
28+  CONSTRAINT pipe_monitor_history_pkey PRIMARY KEY (id),
29+  CONSTRAINT fk_pipe_monitor_history_pipe_monitors
30+    FOREIGN KEY(monitor_id)
31+  REFERENCES pipe_monitors(id)
32+  ON DELETE CASCADE
33+  ON UPDATE CASCADE
34+);
35+
36+CREATE INDEX IF NOT EXISTS idx_pipe_mon_hist_monitor_last_ping ON pipe_monitors_history (monitor_id, last_ping);
37+CREATE INDEX IF NOT EXISTS idx_pipe_mon_hist_monitor_window_end ON pipe_monitors_history (monitor_id, window_end);