main pico / pkg / rsync-receiver / rsyncreceiver / receiver.go
Eric Bower  ·  2026-05-31
  1package rsyncreceiver
  2
  3import (
  4	"bytes"
  5	"encoding/binary"
  6	"errors"
  7	"fmt"
  8	"io"
  9	"sync"
 10
 11	"github.com/mmcloughlin/md4"
 12	"github.com/picosh/pico/pkg/rsync-receiver/rsync"
 13	"github.com/picosh/pico/pkg/rsync-receiver/utils"
 14)
 15
 16// rsync/receiver.c:recv_files.
 17func (rt *Transfer) RecvFiles(fileList []*utils.ReceiverFile) error {
 18	phase := 0
 19	for {
 20		idx, err := rt.Conn.ReadInt32()
 21		if err != nil {
 22			return err
 23		}
 24		if idx == -1 {
 25			if phase == 0 {
 26				phase++
 27				rt.Logger.Debug("recvFiles phase", "phase", phase)
 28				// TODO: send done message
 29				continue
 30			}
 31			break
 32		}
 33		rt.Logger.Debug("receiving file", "idx", idx, "file", fileList[idx])
 34		if err := rt.recvFile1(fileList[idx]); err != nil {
 35			return err
 36		}
 37	}
 38	rt.Logger.Debug("recvFiles finished")
 39	return nil
 40}
 41
 42func (rt *Transfer) recvFile1(f *utils.ReceiverFile) error {
 43	if rt.Opts.DryRun {
 44		fmt.Println(f.Name)
 45		return nil
 46	}
 47
 48	localFile, err := rt.openLocalFile(f)
 49	if err != nil {
 50		rt.Logger.Error("opening local file failed, continuing", "err", err, "file", f)
 51	} else {
 52		defer func() { _ = localFile.Close() }()
 53	}
 54
 55	err = rt.receiveData(f, localFile)
 56	if err != nil {
 57		rt.Logger.Error("receiving data failed, continuing", "err", err, "file", f)
 58	}
 59	return err
 60}
 61
 62func (rt *Transfer) openLocalFile(f *utils.ReceiverFile) (utils.ReaderAtCloser, error) {
 63	_, r, err := rt.Files.Read(&utils.SenderFile{
 64		WPath:   f.Name,
 65		Regular: true,
 66	})
 67
 68	if err != nil {
 69		return nil, err
 70	}
 71
 72	return r, nil
 73}
 74
 75// rsync/receiver.c:receive_data.
 76func (rt *Transfer) receiveData(f *utils.ReceiverFile, localFile utils.ReaderAtCloser) error {
 77	var sh rsync.SumHead
 78	if err := sh.ReadFrom(rt.Conn); err != nil {
 79		return err
 80	}
 81
 82	r, w := io.Pipe()
 83
 84	f.Reader = r
 85
 86	var wg sync.WaitGroup
 87	wg.Add(1)
 88
 89	go func() {
 90		defer func() {
 91			wg.Done()
 92			if err := r.Close(); err != nil {
 93				return
 94			}
 95		}()
 96
 97		_, err := rt.Files.Put(f)
 98		if err != nil {
 99			return
100		}
101	}()
102
103	h := md4.New()
104	_ = binary.Write(h, binary.LittleEndian, rt.Seed) // hash.Hash.Write never fails
105
106	for {
107		token, data, err := rt.recvToken()
108		if err != nil {
109			return err
110		}
111		if token == 0 {
112			break
113		}
114		if token > 0 {
115			if _, err := h.Write(data); err != nil {
116				return err
117			}
118
119			if _, err := w.Write(data); err != nil {
120				if errors.Is(err, io.ErrClosedPipe) {
121					continue
122				}
123				return err
124			}
125			continue
126		}
127		if localFile == nil {
128			return fmt.Errorf("BUG: local file %s not open for copying chunk", localFile)
129		}
130		token = -(token + 1)
131		offset2 := int64(token) * int64(sh.BlockLength)
132		dataLen := sh.BlockLength
133		if token == sh.ChecksumCount-1 && sh.RemainderLength != 0 {
134			dataLen = sh.RemainderLength
135		}
136		data = make([]byte, dataLen)
137		if _, err := localFile.ReadAt(data, offset2); err != nil {
138			return err
139		}
140
141		if _, err := h.Write(data); err != nil {
142			return err
143		}
144
145		if _, err := w.Write(data); err != nil {
146			if errors.Is(err, io.ErrClosedPipe) {
147				continue
148			}
149			return err
150		}
151	}
152
153	if err := w.Close(); err != nil {
154		return err
155	}
156
157	wg.Wait()
158
159	localSum := h.Sum(nil)
160	remoteSum := make([]byte, len(localSum))
161	if _, err := io.ReadFull(rt.Conn.Reader, remoteSum); err != nil {
162		return err
163	}
164	if !bytes.Equal(localSum, remoteSum) {
165		return fmt.Errorf("file corruption in %s", f.Name)
166	}
167	rt.Logger.Debug("checksum matches!", "localSum", localSum)
168
169	return nil
170}