main pico / pkg / rsync-receiver / rsyncwire / wire.go
Eric Bower  ·  2026-05-31
  1package rsyncwire
  2
  3import (
  4	"bytes"
  5	"encoding/binary"
  6	"fmt"
  7	"io"
  8	"log/slog"
  9)
 10
 11const (
 12	MsgData  uint8 = 0
 13	MsgInfo  uint8 = 2
 14	MsgError uint8 = 1
 15)
 16
 17const mplexBase = 7
 18
 19type MultiplexWriter struct {
 20	Writer io.Writer
 21}
 22
 23func (w *MultiplexWriter) Write(p []byte) (n int, err error) {
 24	return w.WriteMsg(MsgData, p)
 25}
 26
 27func (w *MultiplexWriter) WriteMsg(tag uint8, p []byte) (n int, err error) {
 28	header := uint32(mplexBase+tag)<<24 | uint32(len(p))
 29	// log.Printf("len %d (hex %x)", len(p), uint32(len(p)))
 30	// log.Printf("header=%v (%x)", header, header)
 31	if err := binary.Write(w.Writer, binary.LittleEndian, header); err != nil {
 32		return 0, err
 33	}
 34	return w.Writer.Write(p)
 35}
 36
 37type MultiplexReader struct {
 38	Reader io.Reader
 39}
 40
 41// rsync.h defines IO_BUFFER_SIZE as 32 * 1024, but gokr-rsyncd increases it to
 42// 256K. Since we use this as the maximum message size, too, we need to at least
 43// match it.
 44const ioBufferSize = 256 * 1024
 45const maxMessageSize = ioBufferSize
 46
 47func (w *MultiplexReader) ReadMsg() (tag uint8, p []byte, err error) {
 48	var header uint32
 49	if err := binary.Read(w.Reader, binary.LittleEndian, &header); err != nil {
 50		return 0, nil, err
 51	}
 52
 53	tag = uint8(header>>24) - mplexBase
 54	length := header & 0x00FFFFFF
 55	if length > maxMessageSize {
 56		// NOTE: if you run into this error, one alternative to bumping
 57		// maxMessageSize is to restructure the program to work with i/o buffer
 58		// windowing.
 59		return 0, nil, fmt.Errorf("length %d exceeds max message size (%d)", length, maxMessageSize)
 60	}
 61	p = make([]byte, int(length))
 62	if _, err := io.ReadFull(w.Reader, p); err != nil {
 63		return 0, nil, err
 64	}
 65	// log.Printf("header=%v (%x), tag=%v, length=%v", header, header, tag, length)
 66	// log.Printf("payload=%x / %q", p, p)
 67	return tag, p, nil
 68}
 69
 70func (w *MultiplexReader) Read(p []byte) (n int, err error) {
 71	tag, payload, err := w.ReadMsg()
 72	if err != nil {
 73		return 0, err
 74	}
 75	if tag == MsgError {
 76		return 0, fmt.Errorf("%s", payload)
 77	}
 78	if tag == MsgInfo {
 79		slog.Debug("info", "payload", payload)
 80	}
 81	if tag != MsgData {
 82		return 0, fmt.Errorf("unexpected tag: got %v, want %v", tag, MsgData)
 83	}
 84	if len(p) < len(payload) {
 85		panic(fmt.Sprintf("not enough buffer space! %d < %d", len(p), len(payload)))
 86	}
 87	return copy(p, payload), nil
 88}
 89
 90type Buffer struct {
 91	// buf.Write() never fails, making for a convenient API.
 92	buf bytes.Buffer
 93}
 94
 95func (b *Buffer) WriteByte(data byte) error {
 96	return binary.Write(&b.buf, binary.LittleEndian, data)
 97}
 98
 99func (b *Buffer) WriteInt32(data int32) {
100	_ = binary.Write(&b.buf, binary.LittleEndian, data)
101}
102
103func (b *Buffer) WriteInt64(data int64) {
104	// send as a 32-bit integer if possible
105	if data <= 0x7FFFFFFF && data >= 0 {
106		b.WriteInt32(int32(data))
107		return
108	}
109	// otherwise, send -1 followed by the 64-bit integer
110	b.WriteInt32(-1)
111	_ = binary.Write(&b.buf, binary.LittleEndian, data)
112}
113
114func (b *Buffer) WriteString(data string) {
115	_, _ = io.WriteString(&b.buf, data)
116}
117
118func (b *Buffer) String() string {
119	return b.buf.String()
120}
121
// Conn encodes values onto Writer and decodes them from Reader, using
// little-endian byte order throughout.
type Conn struct {
	Writer io.Writer
	Reader io.Reader
}

// WriteByte sends a single byte.
func (c *Conn) WriteByte(data byte) error {
	_, err := c.Writer.Write([]byte{data})
	return err
}

// WriteInt32 sends data as 4 little-endian bytes.
func (c *Conn) WriteInt32(data int32) error {
	le := make([]byte, 4)
	binary.LittleEndian.PutUint32(le, uint32(data))
	_, err := c.Writer.Write(le)
	return err
}

// WriteInt64 sends data in the variable-width form: values in
// [0, 0x7FFFFFFF] go out as a plain int32, anything else as the int32 marker
// -1 followed by the full 8 little-endian bytes.
func (c *Conn) WriteInt64(data int64) error {
	fitsShortForm := data >= 0 && data <= 0x7FFFFFFF
	if fitsShortForm {
		return c.WriteInt32(int32(data))
	}
	if err := c.WriteInt32(-1); err != nil {
		return err
	}
	le := make([]byte, 8)
	binary.LittleEndian.PutUint64(le, uint64(data))
	_, err := c.Writer.Write(le)
	return err
}

// WriteString sends the raw bytes of data (no length prefix, no terminator).
func (c *Conn) WriteString(data string) error {
	if _, err := io.WriteString(c.Writer, data); err != nil {
		return err
	}
	return nil
}

// ReadByte receives a single byte.
func (c *Conn) ReadByte() (byte, error) {
	one := make([]byte, 1)
	_, err := io.ReadFull(c.Reader, one)
	if err != nil {
		return 0, err
	}
	return one[0], nil
}

// ReadInt32 receives 4 little-endian bytes and returns them as an int32.
func (c *Conn) ReadInt32() (int32, error) {
	var v int32
	if err := binary.Read(c.Reader, binary.LittleEndian, &v); err != nil {
		return 0, err
	}
	return v, nil
}

// ReadInt64 is the counterpart of WriteInt64: it reads an int32 and, only if
// that is the marker -1, reads the 8-byte value that follows.
func (c *Conn) ReadInt64() (int64, error) {
	short, err := c.ReadInt32()
	if err != nil {
		return 0, err
	}
	if short != -1 {
		// Short form: the value itself was sent as an int32.
		return int64(short), nil
	}

	// Long form: the marker is followed by the full 64-bit value.
	var wide [8]byte
	if _, err := io.ReadFull(c.Reader, wide[:]); err != nil {
		return 0, err
	}
	return int64(binary.LittleEndian.Uint64(wide[:])), nil
}
187
// CountingReader forwards reads to R and keeps a running total of the bytes
// that came back in BytesRead.
type CountingReader struct {
	R         io.Reader
	BytesRead int64
}

// Read reads from R into p and adds the number of bytes obtained to BytesRead,
// including bytes returned alongside an error.
func (r *CountingReader) Read(p []byte) (n int, err error) {
	defer func() { r.BytesRead += int64(n) }()
	return r.R.Read(p)
}
198
// CountingWriter forwards writes to W and keeps a running total of the bytes
// W reported as written in BytesWritten.
type CountingWriter struct {
	W            io.Writer
	BytesWritten int64
}

// Write writes p to W and adds the number of bytes W accepted to BytesWritten,
// including bytes accepted before an error.
func (w *CountingWriter) Write(p []byte) (n int, err error) {
	accepted, werr := w.W.Write(p)
	w.BytesWritten = w.BytesWritten + int64(accepted)
	return accepted, werr
}
209
210func CounterPair(r io.Reader, w io.Writer) (*CountingReader, *CountingWriter) {
211	crd := &CountingReader{R: r}
212	cwr := &CountingWriter{W: w}
213	return crd, cwr
214}