repos / pico

pico services mono repo
git clone https://github.com/picosh/pico.git

commit
a812bed
parent
46cac13
author
Antonio Mika
date
2025-01-27 22:02:02 -0500 EST
Add websocket to pipe web
3 files changed,  +122, -0
M go.mod
M go.sum
M go.mod
+1, -0
1@@ -42,6 +42,7 @@ require (
2 	github.com/google/go-cmp v0.6.0
3 	github.com/google/uuid v1.6.0
4 	github.com/gorilla/feeds v1.2.0
5+	github.com/gorilla/websocket v1.5.3
6 	github.com/jmoiron/sqlx v1.4.0
7 	github.com/lib/pq v1.10.9
8 	github.com/microcosm-cc/bluemonday v1.0.27
M go.sum
+2, -0
1@@ -457,6 +457,8 @@ github.com/gorilla/css v1.0.1 h1:ntNaBIghp6JmvWnxbZKANoLyuXTPZ4cAMlo6RyhlbO8=
2 github.com/gorilla/css v1.0.1/go.mod h1:BvnYkspnSzMmwRK+b8/xgNPLiIuNZr6vbZBTPQ2A3b0=
3 github.com/gorilla/feeds v1.2.0 h1:O6pBiXJ5JHhPvqy53NsjKOThq+dNFm8+DFrxBEdzSCc=
4 github.com/gorilla/feeds v1.2.0/go.mod h1:WMib8uJP3BbY+X8Szd1rA5Pzhdfh+HCCAYT2z7Fza6Y=
5+github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
6+github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
7 github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
8 github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
9 github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
M pipe/api.go
+119, -0
  1@@ -15,6 +15,7 @@ import (
  2 	"time"
  3 
  4 	"github.com/google/uuid"
  5+	"github.com/gorilla/websocket"
  6 	"github.com/picosh/pico/db/postgres"
  7 	"github.com/picosh/pico/shared"
  8 	"github.com/picosh/utils/pipe"
  9@@ -23,6 +24,11 @@ import (
 10 var (
 11 	cleanRegex = regexp.MustCompile(`[^0-9a-zA-Z,/]`)
 12 	sshClient  *pipe.Client
 13+	upgrader   = websocket.Upgrader{
 14+		CheckOrigin: func(r *http.Request) bool {
 15+			return true
 16+		},
 17+	}
 18 )
 19 
 20 func serveFile(file string, contentType string) http.HandlerFunc {
 21@@ -264,6 +270,118 @@ func handlePub(pubsub bool) http.HandlerFunc {
 22 	}
 23 }
 24 
 25+func handlePipe() http.HandlerFunc {
 26+	return func(w http.ResponseWriter, r *http.Request) {
 27+		logger := shared.GetLogger(r)
 28+
 29+		c, err := upgrader.Upgrade(w, r, nil)
 30+		if err != nil {
 31+			logger.Error("pipe upgrade error", "err", err.Error())
 32+			return
 33+		}
 34+
 35+		defer c.Close()
 36+
 37+		clientInfo := shared.NewPicoPipeClient()
 38+		topic, _ := url.PathUnescape(shared.GetField(r, 0))
 39+
 40+		topic = cleanRegex.ReplaceAllString(topic, "")
 41+
 42+		logger.Info("pipe", "topic", topic, "info", clientInfo)
 43+
 44+		params := "-p -c"
 45+		if r.URL.Query().Get("status") == "true" {
 46+			params = params[:len(params)-3]
 47+		}
 48+
 49+		if r.URL.Query().Get("replay") == "true" {
 50+			params += " -r"
 51+		}
 52+
 53+		messageType := websocket.TextMessage
 54+		if r.URL.Query().Get("binary") == "true" {
 55+			messageType = websocket.BinaryMessage
 56+		}
 57+
 58+		if accessList := r.URL.Query().Get("access"); accessList != "" {
 59+			logger.Info("adding access list", "topic", topic, "info", clientInfo, "access", accessList)
 60+			cleanList := cleanRegex.ReplaceAllString(accessList, "")
 61+			params += fmt.Sprintf(" -a=%s", cleanList)
 62+		}
 63+
 64+		id := uuid.NewString()
 65+
 66+		p, err := sshClient.AddSession(id, fmt.Sprintf("pipe %s %s", params, topic), 0, -1, -1)
 67+		if err != nil {
 68+			logger.Error("pipe error", "topic", topic, "info", clientInfo, "err", err.Error())
 69+			http.Error(w, "server error", http.StatusInternalServerError)
 70+			return
 71+		}
 72+
 73+		go func() {
 74+			<-r.Context().Done()
 75+			err := sshClient.RemoveSession(id)
 76+			if err != nil {
 77+				logger.Error("pipe remove error", "topic", topic, "info", clientInfo, "err", err.Error())
 78+			}
 79+			c.Close()
 80+		}()
 81+
 82+		var wg sync.WaitGroup
 83+		wg.Add(2)
 84+
 85+		go func() {
 86+			defer func() {
 87+				p.Close()
 88+				c.Close()
 89+				wg.Done()
 90+			}()
 91+
 92+			for {
 93+				_, message, err := c.ReadMessage()
 94+				if err != nil {
 95+					logger.Error("pipe read error", "topic", topic, "info", clientInfo, "err", err.Error())
 96+					break
 97+				}
 98+
 99+				_, err = p.Write(message)
100+				if err != nil {
101+					logger.Error("pipe write error", "topic", topic, "info", clientInfo, "err", err.Error())
102+					break
103+				}
104+			}
105+		}()
106+
107+		go func() {
108+			defer func() {
109+				p.Close()
110+				c.Close()
111+				wg.Done()
112+			}()
113+
114+			for {
115+				buf := make([]byte, 32*1024)
116+
117+				n, err := p.Read(buf)
118+				if err != nil {
119+					logger.Error("pipe read error", "topic", topic, "info", clientInfo, "err", err.Error())
120+					break
121+				}
122+
123+				buf = buf[:n]
124+
125+				err = c.WriteMessage(messageType, buf)
126+				if err != nil {
127+					logger.Error("pipe write error", "topic", topic, "info", clientInfo, "err", err.Error())
128+					break
129+				}
130+			}
131+		}()
132+
133+		wg.Wait()
134+	}
135+}
136+
137 func createMainRoutes(staticRoutes []shared.Route) []shared.Route {
138 	routes := []shared.Route{
139 		shared.NewRoute("GET", "/", shared.CreatePageHandler("html/marketing.page.tmpl")),
140@@ -275,6 +393,7 @@ func createMainRoutes(staticRoutes []shared.Route) []shared.Route {
141 		shared.NewRoute("POST", "/topic/(.+)", handlePub(false)),
142 		shared.NewRoute("GET", "/pubsub/(.+)", handleSub(true)),
143 		shared.NewRoute("POST", "/pubsub/(.+)", handlePub(true)),
144+		shared.NewRoute("GET", "/pipe/(.+)", handlePipe()),
145 	}
146 
147 	for _, route := range pipeRoutes {