main pico / pkg / rsync-receiver / rsyncsender / sender.go
Eric Bower  ·  2026-05-31
  1package rsyncsender
  2
import (
	"encoding/binary"
	"fmt"
	"io"
	"os"
	"sort"

	"github.com/mmcloughlin/md4"
	"github.com/picosh/pico/pkg/rsync-receiver/rsync"
	"github.com/picosh/pico/pkg/rsync-receiver/rsyncchecksum"
	"github.com/picosh/pico/pkg/rsync-receiver/rsynccommon"
	"github.com/picosh/pico/pkg/rsync-receiver/utils"
)
 15
 16// rsync/sender.c:send_files().
 17func (st *Transfer) SendFiles(fileList *fileList) error {
 18	phase := 0
 19	for {
 20		// receive data about receiver’s copy of the file list contents (not
 21		// ordered)
 22		// see (*rsync.Receiver).Generator()
 23		fileIndex, err := st.Conn.ReadInt32()
 24		if err != nil {
 25			return err
 26		}
 27		if fileIndex == -1 {
 28			if phase == 0 {
 29				phase++
 30				// acknowledge phase change by sending -1
 31				if err := st.Conn.WriteInt32(-1); err != nil {
 32					return err
 33				}
 34				continue
 35			}
 36			break
 37		}
 38
 39		if st.Opts.DryRun() {
 40			if err := st.Conn.WriteInt32(fileIndex); err != nil {
 41				return err
 42			}
 43			continue
 44		}
 45
 46		head, err := st.receiveSums()
 47		if err != nil {
 48			return err
 49		}
 50
 51		// The following quotes are citations from
 52		// https://www.samba.org/~tridge/phd_thesis.pdf, section 3.2.6 The
 53		// signature search algorithm (PDF page 64).
 54
 55		// rsync/match.c:build_hash_table
 56		targets := make([]target, len(head.Sums))
 57		tagTable := make(map[uint16]int) // TODO: or int32 more specifically?
 58		{
 59			// “The first step in the algorithm is to sort the received
 60			// signatures by a 16 bit hash of the fast signature.”
 61			for idx, sum := range head.Sums {
 62				targets[idx] = target{
 63					index: int32(idx),
 64					tag:   rsyncchecksum.Tag(sum.Sum1),
 65				}
 66			}
 67			sort.Slice(targets, func(i, j int) bool {
 68				return targets[i].tag < targets[j].tag
 69			})
 70
 71			// “A 16 bit index table is then formed which takes a 16 bit hash
 72			// value and gives an index into the sorted signature table which
 73			// points to the first entry in the table which has a matching
 74			// hash.”
 75			for idx := len(head.Sums) - 1; idx >= 0; idx-- {
 76				tagTable[targets[idx].tag] = idx
 77			}
 78		}
 79
 80		st.lastMatch = 0
 81		if len(head.Sums) == 0 {
 82			// fast path: send the whole file
 83			err = st.sendFile(fileIndex, fileList.Files[fileIndex])
 84		} else {
 85			err = st.hashSearch(targets, tagTable, head, fileIndex, fileList.Files[fileIndex])
 86		}
 87		if err != nil {
 88			if _, ok := err.(*os.PathError); ok {
 89				// OpenFile() failed. Log the error (server side only) and
 90				// proceed. Only starting with protocol 30, an I/O error flag is
 91				// sent after the file transfer phase.
 92				if os.IsNotExist(err) {
 93					st.Logger.Debug("file has vanished", "file", fileList.Files[fileIndex])
 94				} else {
 95					st.Logger.Error("sendFiles", "err", err)
 96				}
 97				continue
 98			} else {
 99				return err
100			}
101		}
102	}
103
104	// phase done
105	if err := st.Conn.WriteInt32(-1); err != nil {
106		return err
107	}
108
109	return nil
110}
111
112// rsync/sender.c:receive_sums().
113func (st *Transfer) receiveSums() (rsync.SumHead, error) {
114	var head rsync.SumHead
115	if err := head.ReadFrom(st.Conn); err != nil {
116		return head, err
117	}
118	var offset int64
119	head.Sums = make([]rsync.SumBuf, int(head.ChecksumCount))
120	for i := int32(0); i < head.ChecksumCount; i++ {
121		shortChecksum, err := st.Conn.ReadInt32()
122		if err != nil {
123			return head, err
124		}
125		sb := rsync.SumBuf{
126			Index:  i,
127			Offset: offset,
128			Sum1:   uint32(shortChecksum),
129		}
130		if i == head.ChecksumCount-1 && head.RemainderLength != 0 {
131			sb.Len = int64(head.RemainderLength)
132		} else {
133			sb.Len = int64(head.BlockLength)
134		}
135		offset += sb.Len
136		n, err := io.ReadFull(st.Conn.Reader, sb.Sum2[:head.ChecksumLength])
137		if err != nil {
138			return head, err
139		}
140		_ = n
141		// log.Printf("chunk[%d] len=%d offset=%.0f sum1=%08x, sum2=%x",
142		// 	i, sb.len, float64(sb.offset), sb.sum1, sb.sum2[:n])
143		head.Sums[i] = sb
144	}
145	return head, nil
146}
147
148func (st *Transfer) sendFile(fileIndex int32, fl utils.SenderFile) error {
149	// rsync/rsync.h defines CHUNK_SIZE as 32 * 1024. openrsync (tridge)
150	// uses 256K, but standard rsync rejects tokens larger than 32K.
151	const chunkSize = 32 * 1024
152
153	fi, r, err := st.Files.Read(&fl)
154	if err != nil {
155		return err
156	}
157	defer func() { _ = r.Close() }()
158
159	if err := st.Conn.WriteInt32(fileIndex); err != nil {
160		return err
161	}
162
163	sh := rsynccommon.SumSizesSqroot(fi.Size())
164	// log.Printf("sh = %+v", sh)
165	if err := sh.WriteTo(st.Conn); err != nil {
166		return err
167	}
168
169	h := md4.New()
170	_ = binary.Write(h, binary.LittleEndian, st.Seed) // hash.Hash.Write never fails
171
172	buf := make([]byte, chunkSize)
173	for {
174		shouldBreak := false
175		n, err := r.Read(buf)
176		if err != nil {
177			if err == io.EOF {
178				shouldBreak = true
179			} else {
180				return err
181			}
182		}
183		chunk := buf[:n]
184
185		if len(chunk) == 0 {
186			break
187		}
188
189		_, err = h.Write(chunk)
190		if err != nil {
191			return err
192		}
193		// chunk size (“rawtok” variable in openrsync)
194		if err := st.Conn.WriteInt32(int32(len(chunk))); err != nil {
195			return err
196		}
197		if _, err := st.Conn.Writer.Write(chunk); err != nil {
198			return err
199		}
200
201		if shouldBreak {
202			break
203		}
204	}
205	// transfer finished:
206	if err := st.Conn.WriteInt32(0); err != nil {
207		return err
208	}
209
210	sum := h.Sum(nil)
211	// log.Printf("sum: %x (len = %d)", sum, len(sum))
212	if _, err := st.Conn.Writer.Write(sum); err != nil {
213		return err
214	}
215	return nil
216}