// api.go — Eric Bower · 2026-01-25
1package pipe
2
3import (
4 "bufio"
5 "bytes"
6 "context"
7 "errors"
8 "fmt"
9 "io"
10 "net/http"
11 "net/url"
12 "os"
13 "regexp"
14 "strings"
15 "sync"
16 "time"
17
18 "github.com/google/uuid"
19 "github.com/gorilla/websocket"
20 "github.com/picosh/pico/pkg/db"
21 "github.com/picosh/pico/pkg/db/postgres"
22 "github.com/picosh/pico/pkg/shared"
23 "github.com/picosh/pico/pkg/shared/router"
24 "github.com/picosh/utils/pipe"
25 "github.com/prometheus/client_golang/prometheus/promhttp"
26)
27
28var (
29 cleanRegex = regexp.MustCompile(`[^0-9a-zA-Z,/]`)
30 sshClient *pipe.Client
31 upgrader = websocket.Upgrader{
32 CheckOrigin: func(r *http.Request) bool {
33 return true
34 },
35 }
36)
37
38func serveFile(file string, contentType string) http.HandlerFunc {
39 return func(w http.ResponseWriter, r *http.Request) {
40 logger := router.GetLogger(r)
41 cfg := router.GetCfg(r)
42
43 contents, err := os.ReadFile(cfg.StaticPath(fmt.Sprintf("public/%s", file)))
44 if err != nil {
45 logger.Error("could not read statis file", "err", err.Error())
46 http.Error(w, "file not found", 404)
47 }
48 w.Header().Add("Content-Type", contentType)
49
50 _, err = w.Write(contents)
51 if err != nil {
52 logger.Error("could not write static file", "err", err.Error())
53 http.Error(w, "server error", http.StatusInternalServerError)
54 }
55 }
56}
57
58func createStaticRoutes() []router.Route {
59 return []router.Route{
60 router.NewRoute("GET", "/main.css", serveFile("main.css", "text/css")),
61 router.NewRoute("GET", "/smol.css", serveFile("smol.css", "text/css")),
62 router.NewRoute("GET", "/syntax.css", serveFile("syntax.css", "text/css")),
63 router.NewRoute("GET", "/card.png", serveFile("card.png", "image/png")),
64 router.NewRoute("GET", "/favicon-16x16.png", serveFile("favicon-16x16.png", "image/png")),
65 router.NewRoute("GET", "/favicon-32x32.png", serveFile("favicon-32x32.png", "image/png")),
66 router.NewRoute("GET", "/apple-touch-icon.png", serveFile("apple-touch-icon.png", "image/png")),
67 router.NewRoute("GET", "/favicon.ico", serveFile("favicon.ico", "image/x-icon")),
68 router.NewRoute("GET", "/robots.txt", serveFile("robots.txt", "text/plain")),
69 router.NewRoute("GET", "/anim.js", serveFile("anim.js", "text/javascript")),
70 }
71}
72
// writeFlusher is an io.Writer that flushes the HTTP response after every
// successful write, so each chunk is pushed out as soon as it is written.
type writeFlusher struct {
	responseWriter http.ResponseWriter
	controller     *http.ResponseController
}

// Write forwards p to the response writer. When that write fails, its byte
// count and error are returned and no flush is attempted. Otherwise the
// response is flushed and the flush result is returned as the error.
func (w writeFlusher) Write(p []byte) (int, error) {
	written, err := w.responseWriter.Write(p)
	if err != nil {
		return written, err
	}

	return written, w.controller.Flush()
}

// Compile-time check that writeFlusher satisfies io.Writer.
var _ io.Writer = writeFlusher{}
87
88func handleSub(pubsub bool) http.HandlerFunc {
89 return func(w http.ResponseWriter, r *http.Request) {
90 logger := router.GetLogger(r)
91
92 clientInfo := shared.NewPicoPipeClient()
93 topic, _ := url.PathUnescape(router.GetField(r, 0))
94
95 topic = cleanRegex.ReplaceAllString(topic, "")
96
97 logger.Info("sub", "topic", topic, "info", clientInfo, "pubsub", pubsub)
98
99 params := "-p"
100 if r.URL.Query().Get("persist") == "true" {
101 params += " -k"
102 }
103
104 if accessList := r.URL.Query().Get("access"); accessList != "" {
105 logger.Info("adding access list", "topic", topic, "info", clientInfo, "access", accessList)
106 cleanList := cleanRegex.ReplaceAllString(accessList, "")
107 params += fmt.Sprintf(" -a=%s", cleanList)
108 }
109
110 id := uuid.NewString()
111
112 p, err := sshClient.AddSession(id, fmt.Sprintf("sub %s %s", params, topic), 0, -1, -1)
113 if err != nil {
114 logger.Error("sub error", "topic", topic, "info", clientInfo, "err", err.Error())
115 http.Error(w, "server error", http.StatusInternalServerError)
116 return
117 }
118
119 go func() {
120 <-r.Context().Done()
121 err := sshClient.RemoveSession(id)
122 if err != nil {
123 logger.Error("sub remove error", "topic", topic, "info", clientInfo, "err", err.Error())
124 }
125 }()
126
127 if mime := r.URL.Query().Get("mime"); mime != "" {
128 w.Header().Add("Content-Type", r.URL.Query().Get("mime"))
129 }
130
131 w.WriteHeader(http.StatusOK)
132
133 _, err = io.Copy(writeFlusher{w, http.NewResponseController(w)}, p)
134 if err != nil {
135 logger.Error("sub copy error", "topic", topic, "info", clientInfo, "err", err.Error())
136 return
137 }
138 }
139}
140
141func handlePub(pubsub bool) http.HandlerFunc {
142 return func(w http.ResponseWriter, r *http.Request) {
143 logger := router.GetLogger(r)
144
145 clientInfo := shared.NewPicoPipeClient()
146 topic, _ := url.PathUnescape(router.GetField(r, 0))
147
148 topic = cleanRegex.ReplaceAllString(topic, "")
149
150 logger.Info("pub", "topic", topic, "info", clientInfo)
151
152 params := "-p"
153 if pubsub {
154 params += " -b=false"
155 }
156
157 if accessList := r.URL.Query().Get("access"); accessList != "" {
158 logger.Info("adding access list", "topic", topic, "info", clientInfo, "access", accessList)
159 cleanList := cleanRegex.ReplaceAllString(accessList, "")
160 params += fmt.Sprintf(" -a=%s", cleanList)
161 }
162
163 prefix := r.URL.Query().Get("prefix")
164
165 var wg sync.WaitGroup
166
167 reader := bufio.NewReaderSize(r.Body, 1)
168
169 first := make([]byte, 1)
170
171 nFirst, err := reader.Read(first)
172 if err != nil && !errors.Is(err, io.EOF) {
173 logger.Error("pub peek error", "topic", topic, "info", clientInfo, "err", err.Error())
174 http.Error(w, "server error", http.StatusInternalServerError)
175 return
176 }
177
178 if nFirst == 0 {
179 params += " -e"
180 }
181
182 id := uuid.NewString()
183
184 p, err := sshClient.AddSession(id, fmt.Sprintf("pub %s %s", params, topic), 0, -1, -1)
185 if err != nil {
186 logger.Error("pub error", "topic", topic, "info", clientInfo, "err", err.Error())
187 http.Error(w, "server error", http.StatusInternalServerError)
188 return
189 }
190
191 go func() {
192 <-r.Context().Done()
193 err := sshClient.RemoveSession(id)
194 if err != nil {
195 logger.Error("pub remove error", "topic", topic, "info", clientInfo, "err", err.Error())
196 }
197 }()
198
199 var scanErr error
200 scanStatus := http.StatusInternalServerError
201
202 wg.Add(1)
203
204 go func() {
205 defer wg.Done()
206
207 s := bufio.NewScanner(p)
208 s.Buffer(make([]byte, 32*1024), 32*1024)
209
210 for s.Scan() {
211 if s.Text() == "sending msg ..." {
212 time.Sleep(10 * time.Millisecond)
213 break
214 }
215
216 if strings.HasPrefix(s.Text(), " ssh ") {
217 f := strings.Fields(s.Text())
218 if len(f) > 1 && f[len(f)-1] != topic {
219 scanErr = fmt.Errorf("pub is not same as used, expected `%s` and received `%s`", topic, f[len(f)-1])
220 scanStatus = http.StatusUnauthorized
221 return
222 }
223 }
224 }
225
226 if err := s.Err(); err != nil {
227 scanErr = err
228 return
229 }
230 }()
231
232 wg.Wait()
233
234 if scanErr != nil {
235 logger.Error("pub scan error", "topic", topic, "info", clientInfo, "err", scanErr.Error())
236
237 msg := "server error"
238 if scanStatus == http.StatusUnauthorized {
239 msg = "access denied"
240 }
241
242 http.Error(w, msg, scanStatus)
243 return
244 }
245
246 outer:
247 for {
248 select {
249 case <-r.Context().Done():
250 break outer
251 default:
252 messageToWrite := first
253 if prefix != "" {
254 messageToWrite = append([]byte(prefix), messageToWrite...)
255 }
256
257 n, err := p.Write(messageToWrite)
258 if err != nil {
259 logger.Error("pub write error", "topic", topic, "info", clientInfo, "err", err.Error())
260 http.Error(w, "server error", http.StatusInternalServerError)
261 return
262 }
263
264 if n > 0 {
265 break outer
266 }
267
268 time.Sleep(10 * time.Millisecond)
269 }
270 }
271
272 _, err = io.Copy(p, reader)
273 if err != nil {
274 logger.Error("pub copy error", "topic", topic, "info", clientInfo, "err", err.Error())
275 http.Error(w, "server error", http.StatusInternalServerError)
276 return
277 }
278
279 w.WriteHeader(http.StatusOK)
280
281 time.Sleep(10 * time.Millisecond)
282 }
283}
284
285func handlePipe() http.HandlerFunc {
286 return func(w http.ResponseWriter, r *http.Request) {
287 logger := router.GetLogger(r)
288
289 c, err := upgrader.Upgrade(w, r, nil)
290 if err != nil {
291 logger.Error("pipe upgrade error", "err", err.Error())
292 return
293 }
294
295 defer func() {
296 _ = c.Close()
297 }()
298
299 clientInfo := shared.NewPicoPipeClient()
300 topic, _ := url.PathUnescape(router.GetField(r, 0))
301
302 topic = cleanRegex.ReplaceAllString(topic, "")
303
304 logger.Info("pipe", "topic", topic, "info", clientInfo)
305
306 params := "-p -c"
307 if r.URL.Query().Get("status") == "true" {
308 params = params[:len(params)-3]
309 }
310
311 if r.URL.Query().Get("replay") == "true" {
312 params += " -r"
313 }
314
315 messageType := websocket.TextMessage
316 if r.URL.Query().Get("binary") == "true" {
317 messageType = websocket.BinaryMessage
318 }
319
320 if accessList := r.URL.Query().Get("access"); accessList != "" {
321 logger.Info("adding access list", "topic", topic, "info", clientInfo, "access", accessList)
322 cleanList := cleanRegex.ReplaceAllString(accessList, "")
323 params += fmt.Sprintf(" -a=%s", cleanList)
324 }
325
326 prefix := r.URL.Query().Get("prefix")
327
328 id := uuid.NewString()
329
330 p, err := sshClient.AddSession(id, fmt.Sprintf("pipe %s %s", params, topic), 0, -1, -1)
331 if err != nil {
332 logger.Error("pipe error", "topic", topic, "info", clientInfo, "err", err.Error())
333 http.Error(w, "server error", http.StatusInternalServerError)
334 return
335 }
336
337 go func() {
338 <-r.Context().Done()
339 err := sshClient.RemoveSession(id)
340 if err != nil {
341 logger.Error("pipe remove error", "topic", topic, "info", clientInfo, "err", err.Error())
342 }
343 _ = c.Close()
344 }()
345
346 var wg sync.WaitGroup
347 wg.Add(2)
348
349 go func() {
350 defer func() {
351 _ = p.Close()
352 _ = c.Close()
353 wg.Done()
354 }()
355
356 for {
357 _, message, err := c.ReadMessage()
358 if err != nil {
359 logger.Error("pipe read error", "topic", topic, "info", clientInfo, "err", err.Error())
360 break
361 }
362
363 _, err = p.Write(message)
364 if err != nil {
365 logger.Error("pipe write error", "topic", topic, "info", clientInfo, "err", err.Error())
366 break
367 }
368 }
369 }()
370
371 go func() {
372 defer func() {
373 _ = p.Close()
374 _ = c.Close()
375 wg.Done()
376 }()
377
378 var messageBuffer []byte
379
380 for {
381 buf := make([]byte, 32*1024)
382
383 n, err := p.Read(buf)
384 if err != nil {
385 logger.Error("pipe read error", "topic", topic, "info", clientInfo, "err", err.Error())
386 break
387 }
388
389 messageBuffer = append(messageBuffer, buf[:n]...)
390
391 if prefix != "" {
392 // Buffer and split on prefix boundaries
393 for {
394 firstIdx := bytes.Index(messageBuffer, []byte(prefix))
395 if firstIdx == -1 {
396 // No prefix found, clear buffer (shouldn't happen in normal use)
397 messageBuffer = nil
398 break
399 }
400
401 // Look for next prefix after the first one
402 secondIdx := bytes.Index(messageBuffer[firstIdx+len(prefix):], []byte(prefix))
403 if secondIdx == -1 {
404 // No complete message yet, keep buffer as is
405 break
406 }
407
408 // We have a complete message, extract and send it
409 messageToSend := messageBuffer[firstIdx : firstIdx+len(prefix)+secondIdx]
410 err = c.WriteMessage(messageType, messageToSend)
411 if err != nil {
412 logger.Error("pipe write error", "topic", topic, "info", clientInfo, "err", err.Error())
413 break
414 }
415
416 // Update buffer to remove sent message
417 messageBuffer = messageBuffer[firstIdx+len(prefix)+secondIdx:]
418 }
419 } else {
420 // No prefix set, send all data as-is
421 if len(messageBuffer) > 0 {
422 err = c.WriteMessage(messageType, messageBuffer)
423 if err != nil {
424 logger.Error("pipe write error", "topic", topic, "info", clientInfo, "err", err.Error())
425 break
426 }
427 messageBuffer = nil
428 }
429 }
430 }
431 }()
432
433 wg.Wait()
434 }
435}
436
437func rssHandler(cfg *shared.ConfigSite, dbpool db.DB) http.HandlerFunc {
438 return func(w http.ResponseWriter, r *http.Request) {
439 apiToken, _ := url.PathUnescape(router.GetField(r, 0))
440 user, err := dbpool.FindUserByToken(apiToken)
441 if err != nil {
442 cfg.Logger.Error(
443 "could not find user for token",
444 "err", err.Error(),
445 "token", apiToken,
446 )
447 http.Error(w, "invalid token", http.StatusNotFound)
448 return
449 }
450 rss, err := MonitorRss(dbpool, user, cfg.Domain)
451 if err != nil {
452 cfg.Logger.Error(
453 "error generating monitor rss feed",
454 "err", err,
455 "token", apiToken,
456 )
457 http.Error(w, "error generating monitor rss feed", http.StatusInternalServerError)
458 return
459 }
460
461 _, err = w.Write([]byte(rss))
462 if err != nil {
463 cfg.Logger.Error(
464 "error with rss response writer",
465 "err", err,
466 "token", apiToken,
467 )
468 http.Error(w, "error generating monitor rss feederror with rss response writer", http.StatusInternalServerError)
469 return
470 }
471 }
472}
473
474func createMainRoutes(staticRoutes []router.Route, cfg *shared.ConfigSite, dbpool db.DB) []router.Route {
475 routes := []router.Route{
476 router.NewRoute("GET", "/", router.CreatePageHandler("html/marketing.page.tmpl")),
477 router.NewRoute("GET", "/check", router.CheckHandler),
478 router.NewRoute("GET", "/rss/(.+)", rssHandler(cfg, dbpool)),
479 router.NewRoute("GET", "/_metrics", promhttp.Handler().ServeHTTP),
480 }
481
482 pipeRoutes := []router.Route{
483 router.NewRoute("GET", "/topic/(.+)", handleSub(false)),
484 router.NewRoute("POST", "/topic/(.+)", handlePub(false)),
485 router.NewRoute("GET", "/pubsub/(.+)", handleSub(true)),
486 router.NewRoute("POST", "/pubsub/(.+)", handlePub(true)),
487 router.NewRoute("GET", "/pipe/(.+)", handlePipe()),
488 }
489
490 for _, route := range pipeRoutes {
491 route.CorsEnabled = true
492 routes = append(routes, route)
493 }
494
495 routes = append(
496 routes,
497 staticRoutes...,
498 )
499
500 return routes
501}
502
503func StartApiServer() {
504 cfg := NewConfigSite("pipe-web")
505 db := postgres.NewDB(cfg.DbURL, cfg.Logger)
506 defer func() {
507 _ = db.Close()
508 }()
509 logger := cfg.Logger
510
511 staticRoutes := createStaticRoutes()
512
513 if cfg.Debug {
514 staticRoutes = router.CreatePProfRoutes(staticRoutes)
515 }
516
517 mainRoutes := createMainRoutes(staticRoutes, cfg, db)
518 subdomainRoutes := staticRoutes
519
520 info := shared.NewPicoPipeClient()
521
522 client, err := pipe.NewClient(context.Background(), logger.With("info", info), info)
523 if err != nil {
524 panic(err)
525 }
526
527 sshClient = client
528
529 pingSession, err := sshClient.AddSession("ping", "pub -b=false -c ping", 0, -1, -1)
530 if err != nil {
531 panic(err)
532 }
533
534 go func() {
535 for {
536 _, err := fmt.Fprintf(pingSession, "%s: pipe-web ping\n", time.Now().UTC().Format(time.RFC3339))
537 if err != nil {
538 logger.Error("pipe ping error", "err", err.Error())
539 }
540
541 time.Sleep(5 * time.Second)
542 }
543 }()
544
545 apiConfig := &router.ApiConfig{
546 Cfg: cfg,
547 Dbpool: db,
548 }
549 handler := router.CreateServe(mainRoutes, subdomainRoutes, apiConfig)
550 router := http.HandlerFunc(handler)
551
552 portStr := fmt.Sprintf(":%s", cfg.Port)
553 logger.Info(
554 "Starting server on port",
555 "port", cfg.Port,
556 "domain", cfg.Domain,
557 )
558
559 logger.Error("listen", "err", http.ListenAndServe(portStr, router).Error())
560}