- commit
- 633c17a
- parent
- 7fe30e4
- author
- Eric Bower
- date
- 2026-01-22 18:51:10 -0500 EST
feat(pipe): ?prefix= query param for pub and pipe
1 files changed,
+53,
-7
+53,
-7
1@@ -2,6 +2,7 @@ package pipe
2
3 import (
4 "bufio"
5+ "bytes"
6 "context"
7 "errors"
8 "fmt"
9@@ -158,6 +159,8 @@ func handlePub(pubsub bool) http.HandlerFunc {
10 params += fmt.Sprintf(" -a=%s", cleanList)
11 }
12
13+ prefix := r.URL.Query().Get("prefix")
14+
15 var wg sync.WaitGroup
16
17 reader := bufio.NewReaderSize(r.Body, 1)
18@@ -245,7 +248,12 @@ func handlePub(pubsub bool) http.HandlerFunc {
19 case <-r.Context().Done():
20 break outer
21 default:
22- n, err := p.Write(first)
23+ messageToWrite := first
24+ if prefix != "" {
25+ messageToWrite = append([]byte(prefix), messageToWrite...)
26+ }
27+
28+ n, err := p.Write(messageToWrite)
29 if err != nil {
30 logger.Error("pub write error", "topic", topic, "info", clientInfo, "err", err.Error())
31 http.Error(w, "server error", http.StatusInternalServerError)
32@@ -314,6 +322,8 @@ func handlePipe() http.HandlerFunc {
33 params += fmt.Sprintf(" -a=%s", cleanList)
34 }
35
36+ prefix := r.URL.Query().Get("prefix")
37+
38 id := uuid.NewString()
39
40 p, err := sshClient.AddSession(id, fmt.Sprintf("pipe %s %s", params, topic), 0, -1, -1)
41@@ -364,6 +374,8 @@ func handlePipe() http.HandlerFunc {
42 wg.Done()
43 }()
44
45+ var messageBuffer []byte
46+
47 for {
48 buf := make([]byte, 32*1024)
49
50@@ -373,12 +385,46 @@ func handlePipe() http.HandlerFunc {
51 break
52 }
53
54- buf = buf[:n]
55-
56- err = c.WriteMessage(messageType, buf)
57- if err != nil {
58- logger.Error("pipe write error", "topic", topic, "info", clientInfo, "err", err.Error())
59- break
60+ messageBuffer = append(messageBuffer, buf[:n]...)
61+
62+ if prefix != "" {
63+ // Buffer and split on prefix boundaries
64+ for {
65+ firstIdx := bytes.Index(messageBuffer, []byte(prefix))
66+ if firstIdx == -1 {
67+ // No prefix found, clear buffer (shouldn't happen in normal use)
68+ messageBuffer = nil
69+ break
70+ }
71+
72+ // Look for next prefix after the first one
73+ secondIdx := bytes.Index(messageBuffer[firstIdx+len(prefix):], []byte(prefix))
74+ if secondIdx == -1 {
75+ // No complete message yet, keep buffer as is
76+ break
77+ }
78+
79+ // We have a complete message, extract and send it
80+ messageToSend := messageBuffer[firstIdx : firstIdx+len(prefix)+secondIdx]
81+ err = c.WriteMessage(messageType, messageToSend)
82+ if err != nil {
83+ logger.Error("pipe write error", "topic", topic, "info", clientInfo, "err", err.Error())
84+ break
85+ }
86+
87+ // Update buffer to remove sent message
88+ messageBuffer = messageBuffer[firstIdx+len(prefix)+secondIdx:]
89+ }
90+ } else {
91+ // No prefix set, send all data as-is
92+ if len(messageBuffer) > 0 {
93+ err = c.WriteMessage(messageType, messageBuffer)
94+ if err != nil {
95+ logger.Error("pipe write error", "topic", topic, "info", clientInfo, "err", err.Error())
96+ break
97+ }
98+ messageBuffer = nil
99+ }
100 }
101 }
102 }()