Eric Bower
·
2026-05-31
1package rsyncreceiver
2
3import (
4 "bytes"
5 "encoding/binary"
6 "errors"
7 "fmt"
8 "io"
9 "sync"
10
11 "github.com/mmcloughlin/md4"
12 "github.com/picosh/pico/pkg/rsync-receiver/rsync"
13 "github.com/picosh/pico/pkg/rsync-receiver/utils"
14)
15
16// rsync/receiver.c:recv_files.
17func (rt *Transfer) RecvFiles(fileList []*utils.ReceiverFile) error {
18 phase := 0
19 for {
20 idx, err := rt.Conn.ReadInt32()
21 if err != nil {
22 return err
23 }
24 if idx == -1 {
25 if phase == 0 {
26 phase++
27 rt.Logger.Debug("recvFiles phase", "phase", phase)
28 // TODO: send done message
29 continue
30 }
31 break
32 }
33 rt.Logger.Debug("receiving file", "idx", idx, "file", fileList[idx])
34 if err := rt.recvFile1(fileList[idx]); err != nil {
35 return err
36 }
37 }
38 rt.Logger.Debug("recvFiles finished")
39 return nil
40}
41
42func (rt *Transfer) recvFile1(f *utils.ReceiverFile) error {
43 if rt.Opts.DryRun {
44 fmt.Println(f.Name)
45 return nil
46 }
47
48 localFile, err := rt.openLocalFile(f)
49 if err != nil {
50 rt.Logger.Error("opening local file failed, continuing", "err", err, "file", f)
51 } else {
52 defer func() { _ = localFile.Close() }()
53 }
54
55 err = rt.receiveData(f, localFile)
56 if err != nil {
57 rt.Logger.Error("receiving data failed, continuing", "err", err, "file", f)
58 }
59 return err
60}
61
62func (rt *Transfer) openLocalFile(f *utils.ReceiverFile) (utils.ReaderAtCloser, error) {
63 _, r, err := rt.Files.Read(&utils.SenderFile{
64 WPath: f.Name,
65 Regular: true,
66 })
67
68 if err != nil {
69 return nil, err
70 }
71
72 return r, nil
73}
74
75// rsync/receiver.c:receive_data.
76func (rt *Transfer) receiveData(f *utils.ReceiverFile, localFile utils.ReaderAtCloser) error {
77 var sh rsync.SumHead
78 if err := sh.ReadFrom(rt.Conn); err != nil {
79 return err
80 }
81
82 r, w := io.Pipe()
83
84 f.Reader = r
85
86 var wg sync.WaitGroup
87 wg.Add(1)
88
89 go func() {
90 defer func() {
91 wg.Done()
92 if err := r.Close(); err != nil {
93 return
94 }
95 }()
96
97 _, err := rt.Files.Put(f)
98 if err != nil {
99 return
100 }
101 }()
102
103 h := md4.New()
104 _ = binary.Write(h, binary.LittleEndian, rt.Seed) // hash.Hash.Write never fails
105
106 for {
107 token, data, err := rt.recvToken()
108 if err != nil {
109 return err
110 }
111 if token == 0 {
112 break
113 }
114 if token > 0 {
115 if _, err := h.Write(data); err != nil {
116 return err
117 }
118
119 if _, err := w.Write(data); err != nil {
120 if errors.Is(err, io.ErrClosedPipe) {
121 continue
122 }
123 return err
124 }
125 continue
126 }
127 if localFile == nil {
128 return fmt.Errorf("BUG: local file %s not open for copying chunk", localFile)
129 }
130 token = -(token + 1)
131 offset2 := int64(token) * int64(sh.BlockLength)
132 dataLen := sh.BlockLength
133 if token == sh.ChecksumCount-1 && sh.RemainderLength != 0 {
134 dataLen = sh.RemainderLength
135 }
136 data = make([]byte, dataLen)
137 if _, err := localFile.ReadAt(data, offset2); err != nil {
138 return err
139 }
140
141 if _, err := h.Write(data); err != nil {
142 return err
143 }
144
145 if _, err := w.Write(data); err != nil {
146 if errors.Is(err, io.ErrClosedPipe) {
147 continue
148 }
149 return err
150 }
151 }
152
153 if err := w.Close(); err != nil {
154 return err
155 }
156
157 wg.Wait()
158
159 localSum := h.Sum(nil)
160 remoteSum := make([]byte, len(localSum))
161 if _, err := io.ReadFull(rt.Conn.Reader, remoteSum); err != nil {
162 return err
163 }
164 if !bytes.Equal(localSum, remoteSum) {
165 return fmt.Errorf("file corruption in %s", f.Name)
166 }
167 rt.Logger.Debug("checksum matches!", "localSum", localSum)
168
169 return nil
170}