repos / pico

pico services monorepo
git clone https://github.com/picosh/pico.git

pico / pkg / apps / pgs
Eric Bower  ·  2026-04-20

uploader.go

  1package pgs
  2
  3import (
  4	"bytes"
  5	"context"
  6	"fmt"
  7	"io"
  8	"io/fs"
  9	"log/slog"
 10	"os"
 11	"path"
 12	"path/filepath"
 13	"slices"
 14	"strings"
 15	"sync"
 16	"time"
 17
 18	pgsdb "github.com/picosh/pico/pkg/apps/pgs/db"
 19	"github.com/picosh/pico/pkg/db"
 20	"github.com/picosh/pico/pkg/pssh"
 21	sendutils "github.com/picosh/pico/pkg/send/utils"
 22	"github.com/picosh/pico/pkg/shared"
 23	"github.com/picosh/pico/pkg/storage"
 24	ignore "github.com/sabhiram/go-gitignore"
 25)
 26
// Context keys for per-connection state cached on the ssh session.
type ctxBucketKey struct{}      // storage.Bucket for the user's asset bucket
type ctxStorageSizeKey struct{} // uint64 running total of bytes stored in the bucket
type ctxProjectKey struct{}     // *db.Project currently being uploaded to
type ctxDenylistKey struct{}    // *DenyList for the current project

// DenyList holds the raw contents of a project's _pgs_ignore file:
// newline-separated gitignore-style patterns.
type DenyList struct {
	Denylist string
}
 35
 36func getDenylist(s *pssh.SSHServerConnSession) *DenyList {
 37	v := s.Context().Value(ctxDenylistKey{})
 38	if v == nil {
 39		return nil
 40	}
 41	denylist := s.Context().Value(ctxDenylistKey{}).(*DenyList)
 42	return denylist
 43}
 44
 45func setDenylist(s *pssh.SSHServerConnSession, denylist string) {
 46	s.SetValue(ctxDenylistKey{}, &DenyList{Denylist: denylist})
 47}
 48
 49func getProject(s *pssh.SSHServerConnSession) *db.Project {
 50	v := s.Context().Value(ctxProjectKey{})
 51	if v == nil {
 52		return nil
 53	}
 54	project := s.Context().Value(ctxProjectKey{}).(*db.Project)
 55	return project
 56}
 57
// setProject caches the project currently being uploaded to on the
// session context so subsequent writes in the same ssh session can skip
// the upsert.
func setProject(s *pssh.SSHServerConnSession, project *db.Project) {
	s.SetValue(ctxProjectKey{}, project)
}
 61
 62func getBucket(s *pssh.SSHServerConnSession) (storage.Bucket, error) {
 63	bucket := s.Context().Value(ctxBucketKey{}).(storage.Bucket)
 64	if bucket.Name == "" {
 65		return bucket, fmt.Errorf("bucket not set on `ssh.Context()` for connection")
 66	}
 67	return bucket, nil
 68}
 69
 70func getStorageSize(s *pssh.SSHServerConnSession) uint64 {
 71	return s.Context().Value(ctxStorageSizeKey{}).(uint64)
 72}
 73
 74func incrementStorageSize(s *pssh.SSHServerConnSession, fileSize int64) uint64 {
 75	curSize := getStorageSize(s)
 76	var nextStorageSize uint64
 77	if fileSize < 0 {
 78		nextStorageSize = curSize - uint64(fileSize)
 79	} else {
 80		nextStorageSize = curSize + uint64(fileSize)
 81	}
 82	s.SetValue(ctxStorageSizeKey{}, nextStorageSize)
 83	return nextStorageSize
 84}
 85
 86func shouldIgnoreFile(fp, ignoreStr string) bool {
 87	object := ignore.CompileIgnoreLines(strings.Split(ignoreStr, "\n")...)
 88	return object.MatchesPath(fp)
 89}
 90
// FileData bundles everything needed to validate and store one uploaded
// file: the transfer entry plus the resolved user, bucket, project, and
// denylist for this session.
type FileData struct {
	*sendutils.FileEntry
	User     *db.User
	Bucket   storage.Bucket
	Project  *db.Project
	DenyList string // raw _pgs_ignore contents used to reject files
}
 98
// UploadAssetHandler implements the send (scp/sftp) hooks for pgs:
// listing, reading, writing, and deleting asset objects.
type UploadAssetHandler struct {
	Cfg                *PgsConfig
	CacheClearingQueue chan string // surrogate keys queued for cache purging
}
103
// NewUploadAssetHandler constructs the upload handler and starts the
// background cache-clearing worker, which runs until ctx is cancelled.
//
// NOTE(review): by Go convention ctx should be the first parameter; left
// as-is to preserve the call signature.
// NOTE(review): the worker started here drains cfg.CacheClearingQueue,
// while the handler stores the separate ch argument — confirm callers pass
// the same channel for both.
func NewUploadAssetHandler(cfg *PgsConfig, ch chan string, ctx context.Context) *UploadAssetHandler {
	go runCacheQueue(cfg, ctx)
	return &UploadAssetHandler{
		Cfg:                cfg,
		CacheClearingQueue: ch,
	}
}
111
112func (h *UploadAssetHandler) GetLogger(s *pssh.SSHServerConnSession) *slog.Logger {
113	return pssh.GetLogger(s)
114}
115
116func (h *UploadAssetHandler) Read(s *pssh.SSHServerConnSession, entry *sendutils.FileEntry) (os.FileInfo, sendutils.ReadAndReaderAtCloser, error) {
117	logger := pssh.GetLogger(s)
118	user := pssh.GetUser(s)
119
120	if user == nil {
121		err := fmt.Errorf("could not get user from ctx")
122		logger.Error("error getting user from ctx", "err", err)
123		return nil, nil, err
124	}
125
126	fileInfo := &sendutils.VirtualFile{
127		FName:    filepath.Base(entry.Filepath),
128		FIsDir:   false,
129		FSize:    entry.Size,
130		FModTime: time.Unix(entry.Mtime, 0),
131	}
132
133	bucket, err := h.Cfg.Storage.GetBucket(shared.GetAssetBucketName(user.ID))
134	if err != nil {
135		return nil, nil, err
136	}
137
138	fname := shared.GetAssetFileName(entry)
139	contents, info, err := h.Cfg.Storage.GetObject(bucket, fname)
140	if err != nil {
141		return nil, nil, err
142	}
143
144	fileInfo.FSize = info.Size
145	fileInfo.FModTime = info.LastModified
146
147	return fileInfo, contents, nil
148}
149
150func (h *UploadAssetHandler) List(s *pssh.SSHServerConnSession, fpath string, isDir bool, recursive bool) ([]os.FileInfo, error) {
151	var fileList []os.FileInfo
152
153	logger := pssh.GetLogger(s)
154	user := pssh.GetUser(s)
155
156	if user == nil {
157		err := fmt.Errorf("could not get user from ctx")
158		logger.Error("error getting user from ctx", "err", err)
159		return fileList, err
160	}
161
162	cleanFilename := fpath
163
164	bucketName := shared.GetAssetBucketName(user.ID)
165	bucket, err := h.Cfg.Storage.GetBucket(bucketName)
166	if err != nil {
167		return fileList, err
168	}
169
170	if cleanFilename == "" || cleanFilename == "." {
171		name := cleanFilename
172		if name == "" {
173			name = "/"
174		}
175
176		info := &sendutils.VirtualFile{
177			FName:  name,
178			FIsDir: true,
179		}
180
181		fileList = append(fileList, info)
182	} else {
183		if cleanFilename != "/" && isDir {
184			cleanFilename += "/"
185		}
186
187		foundList, err := h.Cfg.Storage.ListObjects(bucket, cleanFilename, recursive)
188		if err != nil {
189			return fileList, err
190		}
191
192		fileList = append(fileList, foundList...)
193	}
194
195	return fileList, nil
196}
197
198func (h *UploadAssetHandler) Validate(s *pssh.SSHServerConnSession) error {
199	logger := pssh.GetLogger(s)
200	user := pssh.GetUser(s)
201
202	if user == nil {
203		err := fmt.Errorf("could not get user from ctx")
204		logger.Error("error getting user from ctx", "err", err)
205		return err
206	}
207
208	assetBucket := shared.GetAssetBucketName(user.ID)
209	bucket, err := h.Cfg.Storage.UpsertBucket(assetBucket)
210	if err != nil {
211		return err
212	}
213
214	s.SetValue(ctxBucketKey{}, bucket)
215
216	totalStorageSize, err := h.Cfg.Storage.GetBucketQuota(bucket)
217	if err != nil {
218		return err
219	}
220
221	s.SetValue(ctxStorageSizeKey{}, totalStorageSize)
222
223	logger.Info(
224		"bucket size",
225		"user", user.Name,
226		"bytes", totalStorageSize,
227	)
228
229	logger.Info(
230		"attempting to upload files",
231		"user", user.Name,
232		"txtPrefix", h.Cfg.TxtPrefix,
233	)
234
235	return nil
236}
237
238func (h *UploadAssetHandler) findDenylist(bucket storage.Bucket, project *db.Project, logger *slog.Logger) (string, error) {
239	fp, _, err := h.Cfg.Storage.GetObject(bucket, filepath.Join(project.ProjectDir, "_pgs_ignore"))
240	if err != nil {
241		return "", fmt.Errorf("_pgs_ignore not found")
242	}
243	defer func() {
244		_ = fp.Close()
245	}()
246
247	buf := new(strings.Builder)
248	_, err = io.Copy(buf, fp)
249	if err != nil {
250		logger.Error("io copy", "err", err.Error())
251		return "", err
252	}
253
254	str := buf.String()
255	return str, nil
256}
257
258func findPlusFF(dbpool pgsdb.PgsDB, cfg *PgsConfig, userID string) *db.FeatureFlag {
259	ff, _ := dbpool.FindFeature(userID, "plus")
260	// we have free tiers so users might not have a feature flag
261	// in which case we set sane defaults
262	if ff == nil {
263		ff = db.NewFeatureFlag(
264			userID,
265			"plus",
266			cfg.MaxSize,
267			cfg.MaxAssetSize,
268			cfg.MaxSpecialFileSize,
269		)
270	}
271	// this is jank
272	ff.Data.StorageMax = ff.FindStorageMax(cfg.MaxSize)
273	ff.Data.FileMax = ff.FindFileMax(cfg.MaxAssetSize)
274	ff.Data.SpecialFileMax = ff.FindSpecialFileMax(cfg.MaxSpecialFileSize)
275	return ff
276}
277
278func mtimeToTime(entry *sendutils.FileEntry) time.Time {
279	var mtime time.Time
280	if entry.Mtime > 0 {
281		return time.Unix(entry.Mtime, 0)
282	}
283	return mtime
284}
285
// Write stores one uploaded file (or directory marker) in the user's asset
// bucket, enforcing project blocking, denylist, and storage-quota rules.
// It returns a human-readable status line (asset URL plus storage usage)
// echoed back to the uploading client.
func (h *UploadAssetHandler) Write(s *pssh.SSHServerConnSession, entry *sendutils.FileEntry) (string, error) {
	logger := pssh.GetLogger(s)
	user := pssh.GetUser(s)

	if user == nil {
		err := fmt.Errorf("could not get user from ctx")
		logger.Error("error getting user from ctx", "err", err)
		return "", err
	}

	// normalize a bare top-level directory path ("/project" -> "project")
	if entry.Mode.IsDir() && strings.Count(entry.Filepath, "/") == 1 {
		entry.Filepath = strings.TrimPrefix(entry.Filepath, "/")
	}

	logger = logger.With(
		"file", entry.Filepath,
		"size", entry.Size,
	)

	bucket, err := getBucket(s)
	if err != nil {
		logger.Error("could not find bucket in ctx", "err", err.Error())
		return "", err
	}

	project := getProject(s)
	projectName := shared.GetProjectName(entry)
	logger = logger.With("project", projectName)

	// find, create, or update project if we haven't already done it
	// we need to also check if the project stored in ctx is the same project
	// being uploaded since users can keep an ssh connection alive via sftp
	// and created many projects in a single session
	if project == nil || project.Name != projectName {
		project, err = h.Cfg.DB.UpsertProject(user.ID, projectName, projectName)
		if err != nil {
			logger.Error("upsert project", "err", err.Error())
			return "", err
		}
		setProject(s, project)
	}

	if project.Blocked != "" {
		msg := "project has been blocked and cannot upload files: %s"
		return "", fmt.Errorf(msg, project.Blocked)
	}

	info := &storage.ObjectInfo{
		LastModified: mtimeToTime(entry),
	}
	// directories are represented by an empty "._pico_keep_dir" marker object
	if entry.Mode.IsDir() {
		_, _, err := h.Cfg.Storage.PutObject(
			bucket,
			path.Join(shared.GetAssetFileName(entry), "._pico_keep_dir"),
			bytes.NewReader([]byte{}),
			info,
		)
		return "", err
	}

	featureFlag := findPlusFF(h.Cfg.DB, h.Cfg, user.ID)
	if !featureFlag.IsValid() && pgsdb.IsProjectPrivate(projectName) {
		return "", fmt.Errorf("private projects are only allowed for pico+ users")
	}

	// calculate the filsize difference between the same file already
	// stored and the updated file being uploaded
	assetFilename := shared.GetAssetFileName(entry)
	obj, info, _ := h.Cfg.Storage.GetObject(bucket, assetFilename)
	var curFileSize int64
	if info != nil {
		curFileSize = info.Size
	}
	if obj != nil {
		defer func() {
			_ = obj.Close()
		}()
	}

	// load (and cache on the session) the project's _pgs_ignore denylist;
	// when it cannot be read, ".*" is used, which makes validateAsset
	// reject everything except special files
	denylist := getDenylist(s)
	if denylist == nil {
		dlist, err := h.findDenylist(bucket, project, logger)
		if err != nil {
			logger.Info("failed to get denylist, setting default (.*)", "err", err.Error())
			dlist = ".*"
		}
		setDenylist(s, dlist)
		denylist = &DenyList{Denylist: dlist}
	}

	data := &FileData{
		FileEntry: entry,
		User:      user,
		Bucket:    bucket,
		DenyList:  denylist.Denylist,
		Project:   project,
	}

	valid, err := h.validateAsset(data)
	if !valid {
		return "", err
	}

	// SFTP does not report file size so the more performant way to
	// check filesize constraints is to try and upload the file to s3
	// with a specialized reader that raises an error if the filesize limit
	// has been reached
	storageMax := featureFlag.Data.StorageMax
	fileMax := featureFlag.Data.FileMax
	curStorageSize := getStorageSize(s)
	remaining := int64(storageMax) - int64(curStorageSize)
	// credit back the replaced file's current size before capping at the
	// per-file maximum
	sizeRemaining := min(remaining+curFileSize, fileMax)
	if sizeRemaining <= 0 {
		_, _ = fmt.Fprintln(s.Stderr(), "storage quota reached")
		_, _ = fmt.Fprintf(s.Stderr(), "\r")
		_ = s.Exit(1)
		_ = s.Close()
		return "", fmt.Errorf("storage quota reached")
	}
	logger = logger.With(
		"storageMax", storageMax,
		"currentStorageMax", curStorageSize,
		"fileMax", fileMax,
		"sizeRemaining", sizeRemaining,
	)

	// special routing files (_headers, _redirects, _pgs_ignore) get their
	// own, smaller size cap
	specialFileMax := featureFlag.Data.SpecialFileMax
	if isSpecialFile(entry.Filepath) {
		sizeRemaining = min(sizeRemaining, specialFileMax)
	}

	fsize, err := h.writeAsset(
		s,
		shared.NewMaxBytesReader(data.Reader, int64(sizeRemaining)),
		data,
	)
	if err != nil {
		logger.Error("could not write asset", "err", err.Error())
		cerr := fmt.Errorf(
			"%s: storage size %.2fmb, storage max %.2fmb, file max %.2fmb, special file max %.4fmb",
			err,
			shared.BytesToMB(int(curStorageSize)),
			shared.BytesToMB(int(storageMax)),
			shared.BytesToMB(int(fileMax)),
			shared.BytesToMB(int(specialFileMax)),
		)
		return "", cerr
	}

	// NOTE(review): deltaFileSize is old-minus-new, so a *positive* value
	// (file shrank) is passed to incrementStorageSize — confirm the sign
	// convention matches incrementStorageSize's accounting
	deltaFileSize := curFileSize - fsize
	nextStorageSize := incrementStorageSize(s, deltaFileSize)

	url := h.Cfg.AssetURL(
		user.Name,
		projectName,
		strings.Replace(data.Filepath, "/"+projectName+"/", "", 1),
	)

	// usage summary shown to the client after each upload
	maxSize := int(featureFlag.Data.StorageMax)
	str := fmt.Sprintf(
		"%s (space: %.2f/%.2fGB, %.2f%%)",
		url,
		shared.BytesToGB(int(nextStorageSize)),
		shared.BytesToGB(maxSize),
		(float32(nextStorageSize)/float32(maxSize))*100,
	)

	// queue a cache purge for this site
	// NOTE(review): this sends on h.Cfg.CacheClearingQueue while the struct
	// also carries h.CacheClearingQueue — confirm both refer to the same
	// channel
	surrogate := getSurrogateKey(user.Name, projectName)
	h.Cfg.CacheClearingQueue <- surrogate

	return str, err
}
458
// isSpecialFile reports whether the path's basename is one of the files
// pgs uses for custom routing/filtering.
func isSpecialFile(entry string) bool {
	switch filepath.Base(entry) {
	case "_headers", "_redirects", "_pgs_ignore":
		return true
	}
	return false
}
463
464func (h *UploadAssetHandler) Delete(s *pssh.SSHServerConnSession, entry *sendutils.FileEntry) error {
465	logger := pssh.GetLogger(s)
466	user := pssh.GetUser(s)
467
468	if user == nil {
469		err := fmt.Errorf("could not get user from ctx")
470		logger.Error("error getting user from ctx", "err", err)
471		return err
472	}
473
474	if entry.Mode.IsDir() && strings.Count(entry.Filepath, "/") == 1 {
475		entry.Filepath = strings.TrimPrefix(entry.Filepath, "/")
476	}
477
478	assetFilepath := shared.GetAssetFileName(entry)
479
480	logger = logger.With(
481		"file", assetFilepath,
482	)
483
484	bucket, err := getBucket(s)
485	if err != nil {
486		logger.Error("could not find bucket in ctx", "err", err.Error())
487		return err
488	}
489
490	projectName := shared.GetProjectName(entry)
491	logger = logger.With("project", projectName)
492
493	if assetFilepath == filepath.Join("/", projectName, "._pico_keep_dir") {
494		return os.ErrPermission
495	}
496
497	logger.Info("deleting file")
498
499	pathDir := filepath.Dir(assetFilepath)
500	fileName := filepath.Base(assetFilepath)
501
502	sibs, err := h.Cfg.Storage.ListObjects(bucket, pathDir+"/", false)
503	if err != nil {
504		return err
505	}
506
507	sibs = slices.DeleteFunc(sibs, func(sib fs.FileInfo) bool {
508		return sib.Name() == fileName
509	})
510
511	if len(sibs) == 0 {
512		info := &storage.ObjectInfo{
513			LastModified: mtimeToTime(entry),
514		}
515		_, _, err := h.Cfg.Storage.PutObject(
516			bucket,
517			filepath.Join(pathDir, "._pico_keep_dir"),
518			bytes.NewReader([]byte{}),
519			info,
520		)
521		if err != nil {
522			return err
523		}
524	}
525	err = h.Cfg.Storage.DeleteObject(bucket, assetFilepath)
526
527	surrogate := getSurrogateKey(user.Name, projectName)
528	h.Cfg.CacheClearingQueue <- surrogate
529
530	if err != nil {
531		return err
532	}
533
534	return err
535}
536
537func (h *UploadAssetHandler) validateAsset(data *FileData) (bool, error) {
538	fname := filepath.Base(data.Filepath)
539
540	projectName := shared.GetProjectName(data.FileEntry)
541	if projectName == "" || projectName == "/" || projectName == "." {
542		return false, fmt.Errorf("ERROR: invalid project name, you must copy files to a non-root folder (e.g. pgs.sh:/project-name)")
543	}
544
545	// special files we use for custom routing
546	if isSpecialFile(fname) {
547		return true, nil
548	}
549
550	fpath := strings.Replace(data.Filepath, "/"+projectName, "", 1)
551	if shouldIgnoreFile(fpath, data.DenyList) {
552		err := fmt.Errorf(
553			"ERROR: (%s) file rejected, https://pico.sh/pgs#-pgs-ignore",
554			data.Filepath,
555		)
556		return false, err
557	}
558
559	return true, nil
560}
561
562func (h *UploadAssetHandler) writeAsset(s *pssh.SSHServerConnSession, reader io.Reader, data *FileData) (int64, error) {
563	assetFilepath := shared.GetAssetFileName(data.FileEntry)
564
565	logger := h.GetLogger(s)
566	logger.Info(
567		"uploading file to bucket",
568		"bucket", data.Bucket.Name,
569		"filename", assetFilepath,
570	)
571
572	info := &storage.ObjectInfo{
573		LastModified: mtimeToTime(data.FileEntry),
574	}
575	_, fsize, err := h.Cfg.Storage.PutObject(
576		data.Bucket,
577		assetFilepath,
578		reader,
579		info,
580	)
581	return fsize, err
582}
583
// runCacheQueue processes requests to purge the cache for a single site.
// One message arrives per file that is written/deleted during uploads.
// Repeated messages for the same site are grouped so that we only flush once
// per site per 5 seconds. Runs until ctx is cancelled.
func runCacheQueue(cfg *PgsConfig, ctx context.Context) {
	// set of hostnames awaiting a purge; sync.Map because the per-tick
	// goroutine below ranges/deletes concurrently with this loop's Store
	var pendingFlushes sync.Map
	tick := time.NewTicker(5 * time.Second)
	defer tick.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case host := <-cfg.CacheClearingQueue:
			// Store de-duplicates repeated purge requests for a host
			pendingFlushes.Store(host, host)
		case <-tick.C:
			// NOTE(review): a fresh goroutine is spawned every tick, so if
			// a purge takes longer than 5s, flushes can overlap — confirm
			// purgeCache tolerates concurrent invocations
			go func() {
				pendingFlushes.Range(func(key, value any) bool {
					// the host is removed before purging; a failed purge is
					// only logged, not retried, until a new request re-adds it
					pendingFlushes.Delete(key)
					err := purgeCache(cfg, cfg.Pubsub, key.(string))
					if err != nil {
						cfg.Logger.Error("failed to clear cache", "err", err.Error())
					}
					return true
				})
			}()
		}
	}
}