Eric Bower
·
2026-05-31
1package rsyncsender
2
3import (
4 "encoding/binary"
5 "io"
6 "os"
7 "sort"
8
9 "github.com/mmcloughlin/md4"
10 "github.com/picosh/pico/pkg/rsync-receiver/rsync"
11 "github.com/picosh/pico/pkg/rsync-receiver/rsyncchecksum"
12 "github.com/picosh/pico/pkg/rsync-receiver/rsynccommon"
13 "github.com/picosh/pico/pkg/rsync-receiver/utils"
14)
15
16// rsync/sender.c:send_files().
17func (st *Transfer) SendFiles(fileList *fileList) error {
18 phase := 0
19 for {
20 // receive data about receiver’s copy of the file list contents (not
21 // ordered)
22 // see (*rsync.Receiver).Generator()
23 fileIndex, err := st.Conn.ReadInt32()
24 if err != nil {
25 return err
26 }
27 if fileIndex == -1 {
28 if phase == 0 {
29 phase++
30 // acknowledge phase change by sending -1
31 if err := st.Conn.WriteInt32(-1); err != nil {
32 return err
33 }
34 continue
35 }
36 break
37 }
38
39 if st.Opts.DryRun() {
40 if err := st.Conn.WriteInt32(fileIndex); err != nil {
41 return err
42 }
43 continue
44 }
45
46 head, err := st.receiveSums()
47 if err != nil {
48 return err
49 }
50
51 // The following quotes are citations from
52 // https://www.samba.org/~tridge/phd_thesis.pdf, section 3.2.6 The
53 // signature search algorithm (PDF page 64).
54
55 // rsync/match.c:build_hash_table
56 targets := make([]target, len(head.Sums))
57 tagTable := make(map[uint16]int) // TODO: or int32 more specifically?
58 {
59 // “The first step in the algorithm is to sort the received
60 // signatures by a 16 bit hash of the fast signature.”
61 for idx, sum := range head.Sums {
62 targets[idx] = target{
63 index: int32(idx),
64 tag: rsyncchecksum.Tag(sum.Sum1),
65 }
66 }
67 sort.Slice(targets, func(i, j int) bool {
68 return targets[i].tag < targets[j].tag
69 })
70
71 // “A 16 bit index table is then formed which takes a 16 bit hash
72 // value and gives an index into the sorted signature table which
73 // points to the first entry in the table which has a matching
74 // hash.”
75 for idx := len(head.Sums) - 1; idx >= 0; idx-- {
76 tagTable[targets[idx].tag] = idx
77 }
78 }
79
80 st.lastMatch = 0
81 if len(head.Sums) == 0 {
82 // fast path: send the whole file
83 err = st.sendFile(fileIndex, fileList.Files[fileIndex])
84 } else {
85 err = st.hashSearch(targets, tagTable, head, fileIndex, fileList.Files[fileIndex])
86 }
87 if err != nil {
88 if _, ok := err.(*os.PathError); ok {
89 // OpenFile() failed. Log the error (server side only) and
90 // proceed. Only starting with protocol 30, an I/O error flag is
91 // sent after the file transfer phase.
92 if os.IsNotExist(err) {
93 st.Logger.Debug("file has vanished", "file", fileList.Files[fileIndex])
94 } else {
95 st.Logger.Error("sendFiles", "err", err)
96 }
97 continue
98 } else {
99 return err
100 }
101 }
102 }
103
104 // phase done
105 if err := st.Conn.WriteInt32(-1); err != nil {
106 return err
107 }
108
109 return nil
110}
111
112// rsync/sender.c:receive_sums().
113func (st *Transfer) receiveSums() (rsync.SumHead, error) {
114 var head rsync.SumHead
115 if err := head.ReadFrom(st.Conn); err != nil {
116 return head, err
117 }
118 var offset int64
119 head.Sums = make([]rsync.SumBuf, int(head.ChecksumCount))
120 for i := int32(0); i < head.ChecksumCount; i++ {
121 shortChecksum, err := st.Conn.ReadInt32()
122 if err != nil {
123 return head, err
124 }
125 sb := rsync.SumBuf{
126 Index: i,
127 Offset: offset,
128 Sum1: uint32(shortChecksum),
129 }
130 if i == head.ChecksumCount-1 && head.RemainderLength != 0 {
131 sb.Len = int64(head.RemainderLength)
132 } else {
133 sb.Len = int64(head.BlockLength)
134 }
135 offset += sb.Len
136 n, err := io.ReadFull(st.Conn.Reader, sb.Sum2[:head.ChecksumLength])
137 if err != nil {
138 return head, err
139 }
140 _ = n
141 // log.Printf("chunk[%d] len=%d offset=%.0f sum1=%08x, sum2=%x",
142 // i, sb.len, float64(sb.offset), sb.sum1, sb.sum2[:n])
143 head.Sums[i] = sb
144 }
145 return head, nil
146}
147
148func (st *Transfer) sendFile(fileIndex int32, fl utils.SenderFile) error {
149 // rsync/rsync.h defines CHUNK_SIZE as 32 * 1024. openrsync (tridge)
150 // uses 256K, but standard rsync rejects tokens larger than 32K.
151 const chunkSize = 32 * 1024
152
153 fi, r, err := st.Files.Read(&fl)
154 if err != nil {
155 return err
156 }
157 defer func() { _ = r.Close() }()
158
159 if err := st.Conn.WriteInt32(fileIndex); err != nil {
160 return err
161 }
162
163 sh := rsynccommon.SumSizesSqroot(fi.Size())
164 // log.Printf("sh = %+v", sh)
165 if err := sh.WriteTo(st.Conn); err != nil {
166 return err
167 }
168
169 h := md4.New()
170 _ = binary.Write(h, binary.LittleEndian, st.Seed) // hash.Hash.Write never fails
171
172 buf := make([]byte, chunkSize)
173 for {
174 shouldBreak := false
175 n, err := r.Read(buf)
176 if err != nil {
177 if err == io.EOF {
178 shouldBreak = true
179 } else {
180 return err
181 }
182 }
183 chunk := buf[:n]
184
185 if len(chunk) == 0 {
186 break
187 }
188
189 _, err = h.Write(chunk)
190 if err != nil {
191 return err
192 }
193 // chunk size (“rawtok” variable in openrsync)
194 if err := st.Conn.WriteInt32(int32(len(chunk))); err != nil {
195 return err
196 }
197 if _, err := st.Conn.Writer.Write(chunk); err != nil {
198 return err
199 }
200
201 if shouldBreak {
202 break
203 }
204 }
205 // transfer finished:
206 if err := st.Conn.WriteInt32(0); err != nil {
207 return err
208 }
209
210 sum := h.Sum(nil)
211 // log.Printf("sum: %x (len = %d)", sum, len(sum))
212 if _, err := st.Conn.Writer.Write(sum); err != nil {
213 return err
214 }
215 return nil
216}