Eric Bower
·
2026-05-31
1package rsyncwire
2
3import (
4 "bytes"
5 "encoding/binary"
6 "fmt"
7 "io"
8 "log/slog"
9)
10
// Message tags used by the multiplexed framing. The tag travels in the top
// byte of each 4-byte frame header, offset by mplexBase.
const (
	// MsgData tags ordinary stream payload; it is the tag used by
	// MultiplexWriter.Write and the only tag MultiplexReader.Read hands to
	// its caller.
	MsgData uint8 = 0
	// MsgError tags a message whose payload MultiplexReader.Read returns as
	// an error.
	MsgError uint8 = 1
	// MsgInfo tags a message whose payload MultiplexReader.Read logs at
	// debug level.
	MsgInfo uint8 = 2
)

// mplexBase is added to a message tag before the sum is stored in the top
// byte of a frame header, and subtracted again when a header is decoded.
const mplexBase = 7
18
19type MultiplexWriter struct {
20 Writer io.Writer
21}
22
23func (w *MultiplexWriter) Write(p []byte) (n int, err error) {
24 return w.WriteMsg(MsgData, p)
25}
26
27func (w *MultiplexWriter) WriteMsg(tag uint8, p []byte) (n int, err error) {
28 header := uint32(mplexBase+tag)<<24 | uint32(len(p))
29 // log.Printf("len %d (hex %x)", len(p), uint32(len(p)))
30 // log.Printf("header=%v (%x)", header, header)
31 if err := binary.Write(w.Writer, binary.LittleEndian, header); err != nil {
32 return 0, err
33 }
34 return w.Writer.Write(p)
35}
36
37type MultiplexReader struct {
38 Reader io.Reader
39}
40
41// rsync.h defines IO_BUFFER_SIZE as 32 * 1024, but gokr-rsyncd increases it to
42// 256K. Since we use this as the maximum message size, too, we need to at least
43// match it.
44const ioBufferSize = 256 * 1024
45const maxMessageSize = ioBufferSize
46
47func (w *MultiplexReader) ReadMsg() (tag uint8, p []byte, err error) {
48 var header uint32
49 if err := binary.Read(w.Reader, binary.LittleEndian, &header); err != nil {
50 return 0, nil, err
51 }
52
53 tag = uint8(header>>24) - mplexBase
54 length := header & 0x00FFFFFF
55 if length > maxMessageSize {
56 // NOTE: if you run into this error, one alternative to bumping
57 // maxMessageSize is to restructure the program to work with i/o buffer
58 // windowing.
59 return 0, nil, fmt.Errorf("length %d exceeds max message size (%d)", length, maxMessageSize)
60 }
61 p = make([]byte, int(length))
62 if _, err := io.ReadFull(w.Reader, p); err != nil {
63 return 0, nil, err
64 }
65 // log.Printf("header=%v (%x), tag=%v, length=%v", header, header, tag, length)
66 // log.Printf("payload=%x / %q", p, p)
67 return tag, p, nil
68}
69
70func (w *MultiplexReader) Read(p []byte) (n int, err error) {
71 tag, payload, err := w.ReadMsg()
72 if err != nil {
73 return 0, err
74 }
75 if tag == MsgError {
76 return 0, fmt.Errorf("%s", payload)
77 }
78 if tag == MsgInfo {
79 slog.Debug("info", "payload", payload)
80 }
81 if tag != MsgData {
82 return 0, fmt.Errorf("unexpected tag: got %v, want %v", tag, MsgData)
83 }
84 if len(p) < len(payload) {
85 panic(fmt.Sprintf("not enough buffer space! %d < %d", len(p), len(payload)))
86 }
87 return copy(p, payload), nil
88}
89
// Buffer collects wire-encoded values in memory. Integers are stored in
// little-endian byte order.
type Buffer struct {
	// Writing into a bytes.Buffer never returns an error, which is why most
	// methods on Buffer have no error result.
	buf bytes.Buffer
}

// WriteByte appends a single byte. The returned error is always nil.
func (b *Buffer) WriteByte(data byte) error {
	return b.buf.WriteByte(data)
}

// WriteInt32 appends data as 4 little-endian bytes.
func (b *Buffer) WriteInt32(data int32) {
	var raw [4]byte
	binary.LittleEndian.PutUint32(raw[:], uint32(data))
	_, _ = b.buf.Write(raw[:]) // bytes.Buffer.Write always returns a nil error
}

// WriteInt64 appends data using the variable-width encoding: values in
// [0, 0x7FFFFFFF] are written as a plain 32-bit integer; every other value is
// written as the 32-bit marker -1 followed by the full 64-bit integer.
func (b *Buffer) WriteInt64(data int64) {
	fitsInt32 := data >= 0 && data <= 0x7FFFFFFF
	if fitsInt32 {
		b.WriteInt32(int32(data))
		return
	}

	// Marker (-1 as int32) and the 64-bit value, laid out back to back.
	var raw [12]byte
	binary.LittleEndian.PutUint32(raw[:4], 0xFFFFFFFF)
	binary.LittleEndian.PutUint64(raw[4:], uint64(data))
	_, _ = b.buf.Write(raw[:]) // bytes.Buffer.Write always returns a nil error
}

// WriteString appends the bytes of data without any length prefix or
// terminator.
func (b *Buffer) WriteString(data string) {
	_, _ = io.WriteString(&b.buf, data)
}

// String returns everything written so far as a string.
func (b *Buffer) String() string {
	return b.buf.String()
}
121
// Conn reads and writes wire-encoded values over a pair of streams. Integers
// are transferred in little-endian byte order.
type Conn struct {
	// Writer receives everything the Write* methods encode.
	Writer io.Writer
	// Reader supplies the bytes the Read* methods decode.
	Reader io.Reader
}

// WriteByte writes a single byte to Writer.
func (c *Conn) WriteByte(data byte) error {
	return binary.Write(c.Writer, binary.LittleEndian, data)
}

// WriteInt32 writes data to Writer as 4 little-endian bytes.
func (c *Conn) WriteInt32(data int32) error {
	return binary.Write(c.Writer, binary.LittleEndian, data)
}

// WriteInt64 writes data to Writer using the variable-width encoding: values
// in [0, 0x7FFFFFFF] are sent as a plain 32-bit integer; every other value is
// sent as the 32-bit marker -1 followed by the full 64-bit integer.
func (c *Conn) WriteInt64(data int64) error {
	// send as a 32-bit integer if possible
	if data <= 0x7FFFFFFF && data >= 0 {
		return c.WriteInt32(int32(data))
	}
	// otherwise, send -1 followed by the 64-bit integer
	if err := c.WriteInt32(-1); err != nil {
		return err
	}
	return binary.Write(c.Writer, binary.LittleEndian, data)
}

// WriteString writes the bytes of data to Writer without any length prefix or
// terminator.
func (c *Conn) WriteString(data string) error {
	_, err := io.WriteString(c.Writer, data)
	return err
}

// ReadByte reads a single byte from Reader.
func (c *Conn) ReadByte() (byte, error) {
	var buf [1]byte
	if _, err := io.ReadFull(c.Reader, buf[:]); err != nil {
		return 0, err
	}
	return buf[0], nil
}

// ReadInt32 reads 4 little-endian bytes from Reader and returns them as an
// int32.
func (c *Conn) ReadInt32() (int32, error) {
	var buf [4]byte
	if _, err := io.ReadFull(c.Reader, buf[:]); err != nil {
		return 0, err
	}
	return int32(binary.LittleEndian.Uint32(buf[:])), nil
}

// ReadInt64 reads a value in the encoding produced by WriteInt64: a 32-bit
// integer that is the value itself unless it equals -1, in which case the
// value follows as a 64-bit integer.
//
// If the stream ends after the -1 marker but before the 64-bit value is
// complete, the error is io.ErrUnexpectedEOF rather than io.EOF, so that a
// truncated value is never mistaken for a clean end of stream.
func (c *Conn) ReadInt64() (int64, error) {
	short, err := c.ReadInt32()
	if err != nil {
		return 0, err
	}
	if short != -1 {
		// The value was small enough to fit into a 32 bit int, so it was
		// transferred directly.
		return int64(short), nil
	}

	// Otherwise, -1 was transmitted, followed by the int64.
	var buf [8]byte
	if _, err := io.ReadFull(c.Reader, buf[:]); err != nil {
		// The marker has already been consumed, so the value is mid-way
		// through; running out of input here means truncation.
		if err == io.EOF {
			err = io.ErrUnexpectedEOF
		}
		return 0, err
	}
	return int64(binary.LittleEndian.Uint64(buf[:])), nil
}
187
// CountingReader passes reads through to R and keeps a running total of the
// bytes read.
type CountingReader struct {
	// R is the reader being measured.
	R io.Reader
	// BytesRead is the sum of the byte counts returned by R.Read so far.
	BytesRead int64
}

// Read reads from R into p and adds the number of bytes R reported to
// BytesRead, whether or not R also returned an error.
func (r *CountingReader) Read(p []byte) (n int, err error) {
	got, readErr := r.R.Read(p)
	r.BytesRead = r.BytesRead + int64(got)
	return got, readErr
}
198
// CountingWriter passes writes through to W and keeps a running total of the
// bytes written.
type CountingWriter struct {
	// W is the writer being measured.
	W io.Writer
	// BytesWritten is the sum of the byte counts returned by W.Write so far.
	BytesWritten int64
}

// Write writes p to W and adds the number of bytes W reported to
// BytesWritten, whether or not W also returned an error.
func (w *CountingWriter) Write(p []byte) (n int, err error) {
	put, writeErr := w.W.Write(p)
	w.BytesWritten = w.BytesWritten + int64(put)
	return put, writeErr
}
209
210func CounterPair(r io.Reader, w io.Writer) (*CountingReader, *CountingWriter) {
211 crd := &CountingReader{R: r}
212 cwr := &CountingWriter{W: w}
213 return crd, cwr
214}