main pico / pkg / rsync-receiver / rsyncsender / match.go
Eric Bower  ·  2026-05-31
  1package rsyncsender
  2
  3import (
  4	"bytes"
  5	"encoding/binary"
  6	"fmt"
  7	"hash"
  8	"os"
  9
 10	"github.com/mmcloughlin/md4"
 11	"github.com/picosh/pico/pkg/rsync-receiver/nofollow"
 12	"github.com/picosh/pico/pkg/rsync-receiver/rsync"
 13	"github.com/picosh/pico/pkg/rsync-receiver/rsyncchecksum"
 14	"github.com/picosh/pico/pkg/rsync-receiver/utils"
 15)
 16
 17type target struct {
 18	index int32
 19	tag   uint16
 20}
 21
 22// rsync/match.c:hash_search.
 23func (st *Transfer) hashSearch(targets []target, tagTable map[uint16]int, head rsync.SumHead, fileIndex int32, fl utils.SenderFile) error {
 24	st.Logger.Debug("hashSearch", "file", fl, "head", head)
 25	f, err := os.OpenFile(fl.Path, os.O_RDONLY|nofollow.Maybe, 0)
 26	if err != nil {
 27		return err
 28	}
 29	defer func() { _ = f.Close() }()
 30
 31	fi, err := f.Stat()
 32	if err != nil {
 33		return err
 34	}
 35
 36	readSize := max(3*head.BlockLength, 256*1024)
 37	ms := mapFile(f, fi.Size(), readSize, head.BlockLength)
 38
 39	if err := st.Conn.WriteInt32(fileIndex); err != nil {
 40		return err
 41	}
 42
 43	if err := head.WriteTo(st.Conn); err != nil {
 44		return err
 45	}
 46
 47	// sum_init()
 48	h := md4.New()
 49	_ = binary.Write(h, binary.LittleEndian, st.Seed) // hash.Hash.Write never fails
 50
 51	// The following quotes are citations from
 52	// https://www.samba.org/~tridge/phd_thesis.pdf, section 3.2.6 The
 53	// signature search algorithm (PDF page 64).
 54
 55	// “Once the sorted signature table and the index table have been formed the
 56	// signature search process can begin. For each byte offset in a_i the fast
 57	// signature is computed, along with the 16 bit hash of the fast
 58	// signature. The 16 bit hash is then used to lookup the signature index,
 59	// giving the index in the signature table of the first fast signature with
 60	// that hash.”
 61
 62	var k int
 63	var sum uint32
 64	var s1, s2 uint32
 65	var offset int64
 66	end := fi.Size() + 1 - head.Sums[len(head.Sums)-1].Len
 67	st.Logger.Debug("last block", "len", head.Sums[len(head.Sums)-1].Len, "end", end)
 68
 69	readChunk := func() error {
 70		k = int(head.BlockLength)
 71		if remaining := int(fi.Size() - offset); remaining < k {
 72			k = remaining
 73		}
 74
 75		chunk := ms.ptr(offset, int32(k))
 76		sum = rsyncchecksum.Checksum1(chunk)
 77		s1 = uint32(sum & 0xFFFF)
 78		s2 = uint32(sum >> 16)
 79		return nil
 80	}
 81	if err := readChunk(); err != nil {
 82		return err
 83	}
 84
 85	tagHits := 0
 86Outer:
 87	for {
 88		tag := rsyncchecksum.Tag2(uint16(s1), uint16(s2))
 89		var sum2 []byte
 90		doneCsum2 := false
 91		j, ok := tagTable[tag]
 92		if ok {
 93			// “A linear search is then performed through the signature table, stopping
 94			// when an entry is found with a 16 bit hash which doesn’t match. For each
 95			// entry the current 32 bit fast signature is compared to the entry in the
 96			// signature table, and if that matches then the full 128 bit strong
 97			// signature is computed at the current byte offset and compared to the
 98			// strong signature in the signature table”
 99			sum = (uint32(s1) & 0xFFFF) | (uint32(s2) << 16)
100			tagHits++
101			for ; j < int(head.ChecksumCount) && targets[j].tag == tag; j++ {
102				i := targets[j].index
103				if sum != head.Sums[i].Sum1 {
104					continue
105				}
106
107				l := int64(head.BlockLength)
108				if v := fi.Size() - offset; v < l {
109					l = v
110				}
111				if l != head.Sums[i].Len {
112					continue
113				}
114
115				// log.Printf("potential match at %d target=%d %d sum=%08x", offset, j, i, sum)
116
117				if !doneCsum2 {
118					buf := ms.ptr(offset, int32(l))
119					sum2 = rsyncchecksum.Checksum2(st.Seed, buf[:])
120					doneCsum2 = true
121				}
122
123				if local, remote := sum2[:head.ChecksumLength], head.Sums[i].Sum2[:head.ChecksumLength]; !bytes.Equal(local, remote) {
124					st.Logger.Debug("false alarm", "local", local, "remote", remote)
125					//falseAlarms++
126					continue
127				}
128
129				// TODO(optimization): tridge rsync locates adjacent matches
130				// here for better run-length encoding, but I’m not sure where
131				// (if at all) we currently use run-length encoding:
132				// https://github.com/WayneD/rsync/commit/923fa978088f4c044eec528d9472962d9c9d13c3
133
134				// “If the strong signature is found to match then A emits a
135				// token telling B that a match was found and which block in bi
136				// was matched12. The search then continues at the byte after
137				// the matching block.”
138
139				if err := st.matched(h, ms, head, offset, i); err != nil {
140					return err
141				}
142
143				// rsync doesn’t read the next chunk (offset+sums[i].len),
144				// rsync starts reading one byte before the next chunk
145				// (offset+sums[i].len-1), because the code path starting at
146				// “null_tag” removes the chunk’s first byte and adds the
147				// next byte after the chunk.
148				offset += head.Sums[i].Len - 1
149				if err := readChunk(); err != nil {
150					return fmt.Errorf("readChunk: %v", err)
151				}
152
153				if offset >= end {
154					break Outer
155				}
156
157				break
158			}
159		}
160
161		// Update the rolling checksum by removing the oldest byte (update[0])
162		// and adding the newest byte (update[k]).
163		backup := max(offset-st.lastMatch, 0)
164
165		more := offset+int64(k) < fi.Size()
166		mmore := int64(0)
167		if more {
168			mmore = 1
169		}
170		update := ms.ptr(offset-backup, int32(int64(k)+mmore+backup))
171		update = update[backup:]
172
173		s1 -= rsyncchecksum.SignExtend(update[0])
174		s2 -= uint32(k) * rsyncchecksum.SignExtend(update[0])
175
176		if more {
177			s1 += rsyncchecksum.SignExtend(update[k])
178			s2 += s1
179		} else {
180			k--
181		}
182		s1 = uint32(uint16(s1))
183		s2 = uint32(uint16(s2))
184
185		if backup >= int64(head.BlockLength)+chunkSize && end-offset > chunkSize {
186			// Prevent offset-st.lastMatch from growing too large by flushing
187			// intermediate chunks.
188			if err := st.matched(h, ms, head, offset-int64(head.BlockLength), -2); err != nil {
189				return err
190			}
191		}
192
193		offset++
194		if offset >= end {
195			break
196		}
197	}
198
199	if err := st.matched(h, ms, head, fi.Size(), -1); err != nil {
200		return err
201	}
202
203	{
204		sum := h.Sum(nil)
205		st.Logger.Debug("sum info", "sum", sum, "len", len(sum))
206		if _, err := st.Conn.Writer.Write(sum); err != nil {
207			return err
208		}
209	}
210
211	return nil
212
213}
214
215// rsync/match.c:matched.
216func (st *Transfer) matched(h hash.Hash, ms *mapStruct, head rsync.SumHead, offset int64, i int32) error {
217	n := offset - st.lastMatch
218
219	transmitAccumulated := i < 0
220
221	// if !transmitAccumulated {
222	// 	log.Printf("match at offset=%d last_match=%d i=%d len=%d n=%d",
223	// 		offset, st.lastMatch, i, head.Sums[i].Len, n)
224	// } else {
225	// 	log.Printf("transmit accumulated at offset=%d", offset)
226	// }
227
228	/* FIXME: this is not used
229	l := int64(0)
230	if !transmitAccumulated {
231		l = head.Sums[i].Len
232	}
233	*/
234
235	if err := st.sendToken(ms, i, st.lastMatch, n); err != nil {
236		return fmt.Errorf("sendToken: %v", err)
237	}
238	// TODO: data_transfer += n;
239
240	if !transmitAccumulated {
241		// stats.matched_data += s->sums[i].len;
242		n += head.Sums[i].Len
243	}
244
245	for j := int64(0); j < n; j += chunkSize {
246		n1 := min(int64(chunkSize), n-j)
247		chunk := ms.ptr(st.lastMatch+j, int32(n1))
248		h.Write(chunk)
249	}
250
251	if !transmitAccumulated {
252		st.lastMatch = offset + head.Sums[i].Len
253	} else {
254		st.lastMatch = offset
255	}
256	return nil
257}