repos / pico

pico services mono repo
git clone https://github.com/picosh/pico.git

pico / pkg / apps / pipe
Eric Bower  ·  2026-01-25

api.go

  1package pipe
  2
  3import (
  4	"bufio"
  5	"bytes"
  6	"context"
  7	"errors"
  8	"fmt"
  9	"io"
 10	"net/http"
 11	"net/url"
 12	"os"
 13	"regexp"
 14	"strings"
 15	"sync"
 16	"time"
 17
 18	"github.com/google/uuid"
 19	"github.com/gorilla/websocket"
 20	"github.com/picosh/pico/pkg/db"
 21	"github.com/picosh/pico/pkg/db/postgres"
 22	"github.com/picosh/pico/pkg/shared"
 23	"github.com/picosh/pico/pkg/shared/router"
 24	"github.com/picosh/utils/pipe"
 25	"github.com/prometheus/client_golang/prometheus/promhttp"
 26)
 27
 28var (
 29	cleanRegex = regexp.MustCompile(`[^0-9a-zA-Z,/]`)
 30	sshClient  *pipe.Client
 31	upgrader   = websocket.Upgrader{
 32		CheckOrigin: func(r *http.Request) bool {
 33			return true
 34		},
 35	}
 36)
 37
 38func serveFile(file string, contentType string) http.HandlerFunc {
 39	return func(w http.ResponseWriter, r *http.Request) {
 40		logger := router.GetLogger(r)
 41		cfg := router.GetCfg(r)
 42
 43		contents, err := os.ReadFile(cfg.StaticPath(fmt.Sprintf("public/%s", file)))
 44		if err != nil {
 45			logger.Error("could not read statis file", "err", err.Error())
 46			http.Error(w, "file not found", 404)
 47		}
 48		w.Header().Add("Content-Type", contentType)
 49
 50		_, err = w.Write(contents)
 51		if err != nil {
 52			logger.Error("could not write static file", "err", err.Error())
 53			http.Error(w, "server error", http.StatusInternalServerError)
 54		}
 55	}
 56}
 57
 58func createStaticRoutes() []router.Route {
 59	return []router.Route{
 60		router.NewRoute("GET", "/main.css", serveFile("main.css", "text/css")),
 61		router.NewRoute("GET", "/smol.css", serveFile("smol.css", "text/css")),
 62		router.NewRoute("GET", "/syntax.css", serveFile("syntax.css", "text/css")),
 63		router.NewRoute("GET", "/card.png", serveFile("card.png", "image/png")),
 64		router.NewRoute("GET", "/favicon-16x16.png", serveFile("favicon-16x16.png", "image/png")),
 65		router.NewRoute("GET", "/favicon-32x32.png", serveFile("favicon-32x32.png", "image/png")),
 66		router.NewRoute("GET", "/apple-touch-icon.png", serveFile("apple-touch-icon.png", "image/png")),
 67		router.NewRoute("GET", "/favicon.ico", serveFile("favicon.ico", "image/x-icon")),
 68		router.NewRoute("GET", "/robots.txt", serveFile("robots.txt", "text/plain")),
 69		router.NewRoute("GET", "/anim.js", serveFile("anim.js", "text/javascript")),
 70	}
 71}
 72
 73type writeFlusher struct {
 74	responseWriter http.ResponseWriter
 75	controller     *http.ResponseController
 76}
 77
 78func (w writeFlusher) Write(p []byte) (n int, err error) {
 79	n, err = w.responseWriter.Write(p)
 80	if err == nil {
 81		err = w.controller.Flush()
 82	}
 83	return
 84}
 85
 86var _ io.Writer = writeFlusher{}
 87
 88func handleSub(pubsub bool) http.HandlerFunc {
 89	return func(w http.ResponseWriter, r *http.Request) {
 90		logger := router.GetLogger(r)
 91
 92		clientInfo := shared.NewPicoPipeClient()
 93		topic, _ := url.PathUnescape(router.GetField(r, 0))
 94
 95		topic = cleanRegex.ReplaceAllString(topic, "")
 96
 97		logger.Info("sub", "topic", topic, "info", clientInfo, "pubsub", pubsub)
 98
 99		params := "-p"
100		if r.URL.Query().Get("persist") == "true" {
101			params += " -k"
102		}
103
104		if accessList := r.URL.Query().Get("access"); accessList != "" {
105			logger.Info("adding access list", "topic", topic, "info", clientInfo, "access", accessList)
106			cleanList := cleanRegex.ReplaceAllString(accessList, "")
107			params += fmt.Sprintf(" -a=%s", cleanList)
108		}
109
110		id := uuid.NewString()
111
112		p, err := sshClient.AddSession(id, fmt.Sprintf("sub %s %s", params, topic), 0, -1, -1)
113		if err != nil {
114			logger.Error("sub error", "topic", topic, "info", clientInfo, "err", err.Error())
115			http.Error(w, "server error", http.StatusInternalServerError)
116			return
117		}
118
119		go func() {
120			<-r.Context().Done()
121			err := sshClient.RemoveSession(id)
122			if err != nil {
123				logger.Error("sub remove error", "topic", topic, "info", clientInfo, "err", err.Error())
124			}
125		}()
126
127		if mime := r.URL.Query().Get("mime"); mime != "" {
128			w.Header().Add("Content-Type", r.URL.Query().Get("mime"))
129		}
130
131		w.WriteHeader(http.StatusOK)
132
133		_, err = io.Copy(writeFlusher{w, http.NewResponseController(w)}, p)
134		if err != nil {
135			logger.Error("sub copy error", "topic", topic, "info", clientInfo, "err", err.Error())
136			return
137		}
138	}
139}
140
141func handlePub(pubsub bool) http.HandlerFunc {
142	return func(w http.ResponseWriter, r *http.Request) {
143		logger := router.GetLogger(r)
144
145		clientInfo := shared.NewPicoPipeClient()
146		topic, _ := url.PathUnescape(router.GetField(r, 0))
147
148		topic = cleanRegex.ReplaceAllString(topic, "")
149
150		logger.Info("pub", "topic", topic, "info", clientInfo)
151
152		params := "-p"
153		if pubsub {
154			params += " -b=false"
155		}
156
157		if accessList := r.URL.Query().Get("access"); accessList != "" {
158			logger.Info("adding access list", "topic", topic, "info", clientInfo, "access", accessList)
159			cleanList := cleanRegex.ReplaceAllString(accessList, "")
160			params += fmt.Sprintf(" -a=%s", cleanList)
161		}
162
163		prefix := r.URL.Query().Get("prefix")
164
165		var wg sync.WaitGroup
166
167		reader := bufio.NewReaderSize(r.Body, 1)
168
169		first := make([]byte, 1)
170
171		nFirst, err := reader.Read(first)
172		if err != nil && !errors.Is(err, io.EOF) {
173			logger.Error("pub peek error", "topic", topic, "info", clientInfo, "err", err.Error())
174			http.Error(w, "server error", http.StatusInternalServerError)
175			return
176		}
177
178		if nFirst == 0 {
179			params += " -e"
180		}
181
182		id := uuid.NewString()
183
184		p, err := sshClient.AddSession(id, fmt.Sprintf("pub %s %s", params, topic), 0, -1, -1)
185		if err != nil {
186			logger.Error("pub error", "topic", topic, "info", clientInfo, "err", err.Error())
187			http.Error(w, "server error", http.StatusInternalServerError)
188			return
189		}
190
191		go func() {
192			<-r.Context().Done()
193			err := sshClient.RemoveSession(id)
194			if err != nil {
195				logger.Error("pub remove error", "topic", topic, "info", clientInfo, "err", err.Error())
196			}
197		}()
198
199		var scanErr error
200		scanStatus := http.StatusInternalServerError
201
202		wg.Add(1)
203
204		go func() {
205			defer wg.Done()
206
207			s := bufio.NewScanner(p)
208			s.Buffer(make([]byte, 32*1024), 32*1024)
209
210			for s.Scan() {
211				if s.Text() == "sending msg ..." {
212					time.Sleep(10 * time.Millisecond)
213					break
214				}
215
216				if strings.HasPrefix(s.Text(), "  ssh ") {
217					f := strings.Fields(s.Text())
218					if len(f) > 1 && f[len(f)-1] != topic {
219						scanErr = fmt.Errorf("pub is not same as used, expected `%s` and received `%s`", topic, f[len(f)-1])
220						scanStatus = http.StatusUnauthorized
221						return
222					}
223				}
224			}
225
226			if err := s.Err(); err != nil {
227				scanErr = err
228				return
229			}
230		}()
231
232		wg.Wait()
233
234		if scanErr != nil {
235			logger.Error("pub scan error", "topic", topic, "info", clientInfo, "err", scanErr.Error())
236
237			msg := "server error"
238			if scanStatus == http.StatusUnauthorized {
239				msg = "access denied"
240			}
241
242			http.Error(w, msg, scanStatus)
243			return
244		}
245
246	outer:
247		for {
248			select {
249			case <-r.Context().Done():
250				break outer
251			default:
252				messageToWrite := first
253				if prefix != "" {
254					messageToWrite = append([]byte(prefix), messageToWrite...)
255				}
256
257				n, err := p.Write(messageToWrite)
258				if err != nil {
259					logger.Error("pub write error", "topic", topic, "info", clientInfo, "err", err.Error())
260					http.Error(w, "server error", http.StatusInternalServerError)
261					return
262				}
263
264				if n > 0 {
265					break outer
266				}
267
268				time.Sleep(10 * time.Millisecond)
269			}
270		}
271
272		_, err = io.Copy(p, reader)
273		if err != nil {
274			logger.Error("pub copy error", "topic", topic, "info", clientInfo, "err", err.Error())
275			http.Error(w, "server error", http.StatusInternalServerError)
276			return
277		}
278
279		w.WriteHeader(http.StatusOK)
280
281		time.Sleep(10 * time.Millisecond)
282	}
283}
284
285func handlePipe() http.HandlerFunc {
286	return func(w http.ResponseWriter, r *http.Request) {
287		logger := router.GetLogger(r)
288
289		c, err := upgrader.Upgrade(w, r, nil)
290		if err != nil {
291			logger.Error("pipe upgrade error", "err", err.Error())
292			return
293		}
294
295		defer func() {
296			_ = c.Close()
297		}()
298
299		clientInfo := shared.NewPicoPipeClient()
300		topic, _ := url.PathUnescape(router.GetField(r, 0))
301
302		topic = cleanRegex.ReplaceAllString(topic, "")
303
304		logger.Info("pipe", "topic", topic, "info", clientInfo)
305
306		params := "-p -c"
307		if r.URL.Query().Get("status") == "true" {
308			params = params[:len(params)-3]
309		}
310
311		if r.URL.Query().Get("replay") == "true" {
312			params += " -r"
313		}
314
315		messageType := websocket.TextMessage
316		if r.URL.Query().Get("binary") == "true" {
317			messageType = websocket.BinaryMessage
318		}
319
320		if accessList := r.URL.Query().Get("access"); accessList != "" {
321			logger.Info("adding access list", "topic", topic, "info", clientInfo, "access", accessList)
322			cleanList := cleanRegex.ReplaceAllString(accessList, "")
323			params += fmt.Sprintf(" -a=%s", cleanList)
324		}
325
326		prefix := r.URL.Query().Get("prefix")
327
328		id := uuid.NewString()
329
330		p, err := sshClient.AddSession(id, fmt.Sprintf("pipe %s %s", params, topic), 0, -1, -1)
331		if err != nil {
332			logger.Error("pipe error", "topic", topic, "info", clientInfo, "err", err.Error())
333			http.Error(w, "server error", http.StatusInternalServerError)
334			return
335		}
336
337		go func() {
338			<-r.Context().Done()
339			err := sshClient.RemoveSession(id)
340			if err != nil {
341				logger.Error("pipe remove error", "topic", topic, "info", clientInfo, "err", err.Error())
342			}
343			_ = c.Close()
344		}()
345
346		var wg sync.WaitGroup
347		wg.Add(2)
348
349		go func() {
350			defer func() {
351				_ = p.Close()
352				_ = c.Close()
353				wg.Done()
354			}()
355
356			for {
357				_, message, err := c.ReadMessage()
358				if err != nil {
359					logger.Error("pipe read error", "topic", topic, "info", clientInfo, "err", err.Error())
360					break
361				}
362
363				_, err = p.Write(message)
364				if err != nil {
365					logger.Error("pipe write error", "topic", topic, "info", clientInfo, "err", err.Error())
366					break
367				}
368			}
369		}()
370
371		go func() {
372			defer func() {
373				_ = p.Close()
374				_ = c.Close()
375				wg.Done()
376			}()
377
378			var messageBuffer []byte
379
380			for {
381				buf := make([]byte, 32*1024)
382
383				n, err := p.Read(buf)
384				if err != nil {
385					logger.Error("pipe read error", "topic", topic, "info", clientInfo, "err", err.Error())
386					break
387				}
388
389				messageBuffer = append(messageBuffer, buf[:n]...)
390
391				if prefix != "" {
392					// Buffer and split on prefix boundaries
393					for {
394						firstIdx := bytes.Index(messageBuffer, []byte(prefix))
395						if firstIdx == -1 {
396							// No prefix found, clear buffer (shouldn't happen in normal use)
397							messageBuffer = nil
398							break
399						}
400
401						// Look for next prefix after the first one
402						secondIdx := bytes.Index(messageBuffer[firstIdx+len(prefix):], []byte(prefix))
403						if secondIdx == -1 {
404							// No complete message yet, keep buffer as is
405							break
406						}
407
408						// We have a complete message, extract and send it
409						messageToSend := messageBuffer[firstIdx : firstIdx+len(prefix)+secondIdx]
410						err = c.WriteMessage(messageType, messageToSend)
411						if err != nil {
412							logger.Error("pipe write error", "topic", topic, "info", clientInfo, "err", err.Error())
413							break
414						}
415
416						// Update buffer to remove sent message
417						messageBuffer = messageBuffer[firstIdx+len(prefix)+secondIdx:]
418					}
419				} else {
420					// No prefix set, send all data as-is
421					if len(messageBuffer) > 0 {
422						err = c.WriteMessage(messageType, messageBuffer)
423						if err != nil {
424							logger.Error("pipe write error", "topic", topic, "info", clientInfo, "err", err.Error())
425							break
426						}
427						messageBuffer = nil
428					}
429				}
430			}
431		}()
432
433		wg.Wait()
434	}
435}
436
437func rssHandler(cfg *shared.ConfigSite, dbpool db.DB) http.HandlerFunc {
438	return func(w http.ResponseWriter, r *http.Request) {
439		apiToken, _ := url.PathUnescape(router.GetField(r, 0))
440		user, err := dbpool.FindUserByToken(apiToken)
441		if err != nil {
442			cfg.Logger.Error(
443				"could not find user for token",
444				"err", err.Error(),
445				"token", apiToken,
446			)
447			http.Error(w, "invalid token", http.StatusNotFound)
448			return
449		}
450		rss, err := MonitorRss(dbpool, user, cfg.Domain)
451		if err != nil {
452			cfg.Logger.Error(
453				"error generating monitor rss feed",
454				"err", err,
455				"token", apiToken,
456			)
457			http.Error(w, "error generating monitor rss feed", http.StatusInternalServerError)
458			return
459		}
460
461		_, err = w.Write([]byte(rss))
462		if err != nil {
463			cfg.Logger.Error(
464				"error with rss response writer",
465				"err", err,
466				"token", apiToken,
467			)
468			http.Error(w, "error generating monitor rss feederror with rss response writer", http.StatusInternalServerError)
469			return
470		}
471	}
472}
473
474func createMainRoutes(staticRoutes []router.Route, cfg *shared.ConfigSite, dbpool db.DB) []router.Route {
475	routes := []router.Route{
476		router.NewRoute("GET", "/", router.CreatePageHandler("html/marketing.page.tmpl")),
477		router.NewRoute("GET", "/check", router.CheckHandler),
478		router.NewRoute("GET", "/rss/(.+)", rssHandler(cfg, dbpool)),
479		router.NewRoute("GET", "/_metrics", promhttp.Handler().ServeHTTP),
480	}
481
482	pipeRoutes := []router.Route{
483		router.NewRoute("GET", "/topic/(.+)", handleSub(false)),
484		router.NewRoute("POST", "/topic/(.+)", handlePub(false)),
485		router.NewRoute("GET", "/pubsub/(.+)", handleSub(true)),
486		router.NewRoute("POST", "/pubsub/(.+)", handlePub(true)),
487		router.NewRoute("GET", "/pipe/(.+)", handlePipe()),
488	}
489
490	for _, route := range pipeRoutes {
491		route.CorsEnabled = true
492		routes = append(routes, route)
493	}
494
495	routes = append(
496		routes,
497		staticRoutes...,
498	)
499
500	return routes
501}
502
503func StartApiServer() {
504	cfg := NewConfigSite("pipe-web")
505	db := postgres.NewDB(cfg.DbURL, cfg.Logger)
506	defer func() {
507		_ = db.Close()
508	}()
509	logger := cfg.Logger
510
511	staticRoutes := createStaticRoutes()
512
513	if cfg.Debug {
514		staticRoutes = router.CreatePProfRoutes(staticRoutes)
515	}
516
517	mainRoutes := createMainRoutes(staticRoutes, cfg, db)
518	subdomainRoutes := staticRoutes
519
520	info := shared.NewPicoPipeClient()
521
522	client, err := pipe.NewClient(context.Background(), logger.With("info", info), info)
523	if err != nil {
524		panic(err)
525	}
526
527	sshClient = client
528
529	pingSession, err := sshClient.AddSession("ping", "pub -b=false -c ping", 0, -1, -1)
530	if err != nil {
531		panic(err)
532	}
533
534	go func() {
535		for {
536			_, err := fmt.Fprintf(pingSession, "%s: pipe-web ping\n", time.Now().UTC().Format(time.RFC3339))
537			if err != nil {
538				logger.Error("pipe ping error", "err", err.Error())
539			}
540
541			time.Sleep(5 * time.Second)
542		}
543	}()
544
545	apiConfig := &router.ApiConfig{
546		Cfg:    cfg,
547		Dbpool: db,
548	}
549	handler := router.CreateServe(mainRoutes, subdomainRoutes, apiConfig)
550	router := http.HandlerFunc(handler)
551
552	portStr := fmt.Sprintf(":%s", cfg.Port)
553	logger.Info(
554		"Starting server on port",
555		"port", cfg.Port,
556		"domain", cfg.Domain,
557	)
558
559	logger.Error("listen", "err", http.ListenAndServe(portStr, router).Error())
560}