repos / pico

pico services mono repo
git clone https://github.com/picosh/pico.git

pico / pkg / apps / pgs
Eric Bower  ·  2026-03-05

uploader.go

  1package pgs
  2
  3import (
  4	"bytes"
  5	"context"
  6	"fmt"
  7	"io"
  8	"io/fs"
  9	"log/slog"
 10	"os"
 11	"path"
 12	"path/filepath"
 13	"slices"
 14	"strings"
 15	"sync"
 16	"time"
 17
 18	pgsdb "github.com/picosh/pico/pkg/apps/pgs/db"
 19	"github.com/picosh/pico/pkg/db"
 20	"github.com/picosh/pico/pkg/pobj"
 21	sst "github.com/picosh/pico/pkg/pobj/storage"
 22	"github.com/picosh/pico/pkg/pssh"
 23	sendutils "github.com/picosh/pico/pkg/send/utils"
 24	"github.com/picosh/pico/pkg/shared"
 25	ignore "github.com/sabhiram/go-gitignore"
 26)
 27
 28type ctxBucketKey struct{}
 29type ctxStorageSizeKey struct{}
 30type ctxProjectKey struct{}
 31type ctxDenylistKey struct{}
 32
// DenyList wraps the ignore rules cached on the session context. Denylist is
// the raw rule text: newline-separated patterns that shouldIgnoreFile compiles
// with the go-gitignore package.
type DenyList struct {
	Denylist string
}
 36
 37func getDenylist(s *pssh.SSHServerConnSession) *DenyList {
 38	v := s.Context().Value(ctxDenylistKey{})
 39	if v == nil {
 40		return nil
 41	}
 42	denylist := s.Context().Value(ctxDenylistKey{}).(*DenyList)
 43	return denylist
 44}
 45
 46func setDenylist(s *pssh.SSHServerConnSession, denylist string) {
 47	s.SetValue(ctxDenylistKey{}, &DenyList{Denylist: denylist})
 48}
 49
 50func getProject(s *pssh.SSHServerConnSession) *db.Project {
 51	v := s.Context().Value(ctxProjectKey{})
 52	if v == nil {
 53		return nil
 54	}
 55	project := s.Context().Value(ctxProjectKey{}).(*db.Project)
 56	return project
 57}
 58
// setProject caches project on the session context so later calls to
// getProject on the same session can reuse it.
func setProject(s *pssh.SSHServerConnSession, project *db.Project) {
	s.SetValue(ctxProjectKey{}, project)
}
 62
 63func getBucket(s *pssh.SSHServerConnSession) (sst.Bucket, error) {
 64	bucket := s.Context().Value(ctxBucketKey{}).(sst.Bucket)
 65	if bucket.Name == "" {
 66		return bucket, fmt.Errorf("bucket not set on `ssh.Context()` for connection")
 67	}
 68	return bucket, nil
 69}
 70
 71func getStorageSize(s *pssh.SSHServerConnSession) uint64 {
 72	return s.Context().Value(ctxStorageSizeKey{}).(uint64)
 73}
 74
 75func incrementStorageSize(s *pssh.SSHServerConnSession, fileSize int64) uint64 {
 76	curSize := getStorageSize(s)
 77	var nextStorageSize uint64
 78	if fileSize < 0 {
 79		nextStorageSize = curSize - uint64(fileSize)
 80	} else {
 81		nextStorageSize = curSize + uint64(fileSize)
 82	}
 83	s.SetValue(ctxStorageSizeKey{}, nextStorageSize)
 84	return nextStorageSize
 85}
 86
 87func shouldIgnoreFile(fp, ignoreStr string) bool {
 88	object := ignore.CompileIgnoreLines(strings.Split(ignoreStr, "\n")...)
 89	return object.MatchesPath(fp)
 90}
 91
// FileData bundles an uploaded file entry with the per-session state used to
// validate and store it: the owning user, the destination bucket, the project
// the file belongs to, and the ignore-rule text (DenyList) checked against the
// file's path. The embedded FileEntry promotes its fields onto FileData.
type FileData struct {
	*sendutils.FileEntry
	User     *db.User
	Bucket   sst.Bucket
	Project  *db.Project
	DenyList string
}
 99
// UploadAssetHandler implements the read, list, validate, write and delete
// operations for assets uploaded over an SSH session. Cfg supplies the storage
// backend, database and size limits. CacheClearingQueue is set by
// NewUploadAssetHandler; note that the methods in this file send purge
// requests to Cfg.CacheClearingQueue rather than to this field.
type UploadAssetHandler struct {
	Cfg                *PgsConfig
	CacheClearingQueue chan string
}
104
105func NewUploadAssetHandler(cfg *PgsConfig, ch chan string, ctx context.Context) *UploadAssetHandler {
106	go runCacheQueue(cfg, ctx)
107	return &UploadAssetHandler{
108		Cfg:                cfg,
109		CacheClearingQueue: ch,
110	}
111}
112
// GetLogger returns the logger associated with the SSH session, as provided
// by pssh.GetLogger.
func (h *UploadAssetHandler) GetLogger(s *pssh.SSHServerConnSession) *slog.Logger {
	return pssh.GetLogger(s)
}
116
117func (h *UploadAssetHandler) Read(s *pssh.SSHServerConnSession, entry *sendutils.FileEntry) (os.FileInfo, sendutils.ReadAndReaderAtCloser, error) {
118	logger := pssh.GetLogger(s)
119	user := pssh.GetUser(s)
120
121	if user == nil {
122		err := fmt.Errorf("could not get user from ctx")
123		logger.Error("error getting user from ctx", "err", err)
124		return nil, nil, err
125	}
126
127	fileInfo := &sendutils.VirtualFile{
128		FName:    filepath.Base(entry.Filepath),
129		FIsDir:   false,
130		FSize:    entry.Size,
131		FModTime: time.Unix(entry.Mtime, 0),
132	}
133
134	bucket, err := h.Cfg.Storage.GetBucket(shared.GetAssetBucketName(user.ID))
135	if err != nil {
136		return nil, nil, err
137	}
138
139	fname := shared.GetAssetFileName(entry)
140	contents, info, err := h.Cfg.Storage.GetObject(bucket, fname)
141	if err != nil {
142		return nil, nil, err
143	}
144
145	fileInfo.FSize = info.Size
146	fileInfo.FModTime = info.LastModified
147
148	reader := pobj.NewAllReaderAt(contents)
149
150	return fileInfo, reader, nil
151}
152
153func (h *UploadAssetHandler) List(s *pssh.SSHServerConnSession, fpath string, isDir bool, recursive bool) ([]os.FileInfo, error) {
154	var fileList []os.FileInfo
155
156	logger := pssh.GetLogger(s)
157	user := pssh.GetUser(s)
158
159	if user == nil {
160		err := fmt.Errorf("could not get user from ctx")
161		logger.Error("error getting user from ctx", "err", err)
162		return fileList, err
163	}
164
165	cleanFilename := fpath
166
167	bucketName := shared.GetAssetBucketName(user.ID)
168	bucket, err := h.Cfg.Storage.GetBucket(bucketName)
169	if err != nil {
170		return fileList, err
171	}
172
173	if cleanFilename == "" || cleanFilename == "." {
174		name := cleanFilename
175		if name == "" {
176			name = "/"
177		}
178
179		info := &sendutils.VirtualFile{
180			FName:  name,
181			FIsDir: true,
182		}
183
184		fileList = append(fileList, info)
185	} else {
186		if cleanFilename != "/" && isDir {
187			cleanFilename += "/"
188		}
189
190		foundList, err := h.Cfg.Storage.ListObjects(bucket, cleanFilename, recursive)
191		if err != nil {
192			return fileList, err
193		}
194
195		fileList = append(fileList, foundList...)
196	}
197
198	return fileList, nil
199}
200
201func (h *UploadAssetHandler) Validate(s *pssh.SSHServerConnSession) error {
202	logger := pssh.GetLogger(s)
203	user := pssh.GetUser(s)
204
205	if user == nil {
206		err := fmt.Errorf("could not get user from ctx")
207		logger.Error("error getting user from ctx", "err", err)
208		return err
209	}
210
211	assetBucket := shared.GetAssetBucketName(user.ID)
212	bucket, err := h.Cfg.Storage.UpsertBucket(assetBucket)
213	if err != nil {
214		return err
215	}
216
217	s.SetValue(ctxBucketKey{}, bucket)
218
219	totalStorageSize, err := h.Cfg.Storage.GetBucketQuota(bucket)
220	if err != nil {
221		return err
222	}
223
224	s.SetValue(ctxStorageSizeKey{}, totalStorageSize)
225
226	logger.Info(
227		"bucket size",
228		"user", user.Name,
229		"bytes", totalStorageSize,
230	)
231
232	logger.Info(
233		"attempting to upload files",
234		"user", user.Name,
235		"txtPrefix", h.Cfg.TxtPrefix,
236	)
237
238	return nil
239}
240
241func (h *UploadAssetHandler) findDenylist(bucket sst.Bucket, project *db.Project, logger *slog.Logger) (string, error) {
242	fp, _, err := h.Cfg.Storage.GetObject(bucket, filepath.Join(project.ProjectDir, "_pgs_ignore"))
243	if err != nil {
244		return "", fmt.Errorf("_pgs_ignore not found")
245	}
246	defer func() {
247		_ = fp.Close()
248	}()
249
250	buf := new(strings.Builder)
251	_, err = io.Copy(buf, fp)
252	if err != nil {
253		logger.Error("io copy", "err", err.Error())
254		return "", err
255	}
256
257	str := buf.String()
258	return str, nil
259}
260
261func findPlusFF(dbpool pgsdb.PgsDB, cfg *PgsConfig, userID string) *db.FeatureFlag {
262	ff, _ := dbpool.FindFeature(userID, "plus")
263	// we have free tiers so users might not have a feature flag
264	// in which case we set sane defaults
265	if ff == nil {
266		ff = db.NewFeatureFlag(
267			userID,
268			"plus",
269			cfg.MaxSize,
270			cfg.MaxAssetSize,
271			cfg.MaxSpecialFileSize,
272		)
273	}
274	// this is jank
275	ff.Data.StorageMax = ff.FindStorageMax(cfg.MaxSize)
276	ff.Data.FileMax = ff.FindFileMax(cfg.MaxAssetSize)
277	ff.Data.SpecialFileMax = ff.FindSpecialFileMax(cfg.MaxSpecialFileSize)
278	return ff
279}
280
281func (h *UploadAssetHandler) Write(s *pssh.SSHServerConnSession, entry *sendutils.FileEntry) (string, error) {
282	logger := pssh.GetLogger(s)
283	user := pssh.GetUser(s)
284
285	if user == nil {
286		err := fmt.Errorf("could not get user from ctx")
287		logger.Error("error getting user from ctx", "err", err)
288		return "", err
289	}
290
291	if entry.Mode.IsDir() && strings.Count(entry.Filepath, "/") == 1 {
292		entry.Filepath = strings.TrimPrefix(entry.Filepath, "/")
293	}
294
295	logger = logger.With(
296		"file", entry.Filepath,
297		"size", entry.Size,
298	)
299
300	bucket, err := getBucket(s)
301	if err != nil {
302		logger.Error("could not find bucket in ctx", "err", err.Error())
303		return "", err
304	}
305
306	project := getProject(s)
307	projectName := shared.GetProjectName(entry)
308	logger = logger.With("project", projectName)
309
310	// find, create, or update project if we haven't already done it
311	// we need to also check if the project stored in ctx is the same project
312	// being uploaded since users can keep an ssh connection alive via sftp
313	// and created many projects in a single session
314	if project == nil || project.Name != projectName {
315		project, err = h.Cfg.DB.UpsertProject(user.ID, projectName, projectName)
316		if err != nil {
317			logger.Error("upsert project", "err", err.Error())
318			return "", err
319		}
320		setProject(s, project)
321	}
322
323	if project.Blocked != "" {
324		msg := "project has been blocked and cannot upload files: %s"
325		return "", fmt.Errorf(msg, project.Blocked)
326	}
327
328	if entry.Mode.IsDir() {
329		_, _, err := h.Cfg.Storage.PutObject(
330			bucket,
331			path.Join(shared.GetAssetFileName(entry), "._pico_keep_dir"),
332			bytes.NewReader([]byte{}),
333			entry,
334		)
335		return "", err
336	}
337
338	featureFlag := findPlusFF(h.Cfg.DB, h.Cfg, user.ID)
339	if !featureFlag.IsValid() && pgsdb.IsProjectPrivate(projectName) {
340		return "", fmt.Errorf("private projects are only allowed for pico+ users")
341	}
342
343	// calculate the filsize difference between the same file already
344	// stored and the updated file being uploaded
345	assetFilename := shared.GetAssetFileName(entry)
346	obj, info, _ := h.Cfg.Storage.GetObject(bucket, assetFilename)
347	var curFileSize int64
348	if info != nil {
349		curFileSize = info.Size
350	}
351	if obj != nil {
352		defer func() {
353			_ = obj.Close()
354		}()
355	}
356
357	denylist := getDenylist(s)
358	if denylist == nil {
359		dlist, err := h.findDenylist(bucket, project, logger)
360		if err != nil {
361			logger.Info("failed to get denylist, setting default (.*)", "err", err.Error())
362			dlist = ".*"
363		}
364		setDenylist(s, dlist)
365		denylist = &DenyList{Denylist: dlist}
366	}
367
368	data := &FileData{
369		FileEntry: entry,
370		User:      user,
371		Bucket:    bucket,
372		DenyList:  denylist.Denylist,
373		Project:   project,
374	}
375
376	valid, err := h.validateAsset(data)
377	if !valid {
378		return "", err
379	}
380
381	// SFTP does not report file size so the more performant way to
382	//   check filesize constraints is to try and upload the file to s3
383	//	 with a specialized reader that raises an error if the filesize limit
384	//	 has been reached
385	storageMax := featureFlag.Data.StorageMax
386	fileMax := featureFlag.Data.FileMax
387	curStorageSize := getStorageSize(s)
388	remaining := int64(storageMax) - int64(curStorageSize)
389	sizeRemaining := min(remaining+curFileSize, fileMax)
390	if sizeRemaining <= 0 {
391		_, _ = fmt.Fprintln(s.Stderr(), "storage quota reached")
392		_, _ = fmt.Fprintf(s.Stderr(), "\r")
393		_ = s.Exit(1)
394		_ = s.Close()
395		return "", fmt.Errorf("storage quota reached")
396	}
397	logger = logger.With(
398		"storageMax", storageMax,
399		"currentStorageMax", curStorageSize,
400		"fileMax", fileMax,
401		"sizeRemaining", sizeRemaining,
402	)
403
404	specialFileMax := featureFlag.Data.SpecialFileMax
405	if isSpecialFile(entry.Filepath) {
406		sizeRemaining = min(sizeRemaining, specialFileMax)
407	}
408
409	fsize, err := h.writeAsset(
410		s,
411		shared.NewMaxBytesReader(data.Reader, int64(sizeRemaining)),
412		data,
413	)
414	if err != nil {
415		logger.Error("could not write asset", "err", err.Error())
416		cerr := fmt.Errorf(
417			"%s: storage size %.2fmb, storage max %.2fmb, file max %.2fmb, special file max %.4fmb",
418			err,
419			shared.BytesToMB(int(curStorageSize)),
420			shared.BytesToMB(int(storageMax)),
421			shared.BytesToMB(int(fileMax)),
422			shared.BytesToMB(int(specialFileMax)),
423		)
424		return "", cerr
425	}
426
427	deltaFileSize := curFileSize - fsize
428	nextStorageSize := incrementStorageSize(s, deltaFileSize)
429
430	url := h.Cfg.AssetURL(
431		user.Name,
432		projectName,
433		strings.Replace(data.Filepath, "/"+projectName+"/", "", 1),
434	)
435
436	maxSize := int(featureFlag.Data.StorageMax)
437	str := fmt.Sprintf(
438		"%s (space: %.2f/%.2fGB, %.2f%%)",
439		url,
440		shared.BytesToGB(int(nextStorageSize)),
441		shared.BytesToGB(maxSize),
442		(float32(nextStorageSize)/float32(maxSize))*100,
443	)
444
445	surrogate := getSurrogateKey(user.Name, projectName)
446	h.Cfg.CacheClearingQueue <- surrogate
447
448	return str, err
449}
450
// isSpecialFile reports whether the base name of entry is one of the control
// files handled specially by this package: `_headers`, `_redirects` or
// `_pgs_ignore`.
func isSpecialFile(entry string) bool {
	switch filepath.Base(entry) {
	case "_headers", "_redirects", "_pgs_ignore":
		return true
	default:
		return false
	}
}
455
456func (h *UploadAssetHandler) Delete(s *pssh.SSHServerConnSession, entry *sendutils.FileEntry) error {
457	logger := pssh.GetLogger(s)
458	user := pssh.GetUser(s)
459
460	if user == nil {
461		err := fmt.Errorf("could not get user from ctx")
462		logger.Error("error getting user from ctx", "err", err)
463		return err
464	}
465
466	if entry.Mode.IsDir() && strings.Count(entry.Filepath, "/") == 1 {
467		entry.Filepath = strings.TrimPrefix(entry.Filepath, "/")
468	}
469
470	assetFilepath := shared.GetAssetFileName(entry)
471
472	logger = logger.With(
473		"file", assetFilepath,
474	)
475
476	bucket, err := getBucket(s)
477	if err != nil {
478		logger.Error("could not find bucket in ctx", "err", err.Error())
479		return err
480	}
481
482	projectName := shared.GetProjectName(entry)
483	logger = logger.With("project", projectName)
484
485	if assetFilepath == filepath.Join("/", projectName, "._pico_keep_dir") {
486		return os.ErrPermission
487	}
488
489	logger.Info("deleting file")
490
491	pathDir := filepath.Dir(assetFilepath)
492	fileName := filepath.Base(assetFilepath)
493
494	sibs, err := h.Cfg.Storage.ListObjects(bucket, pathDir+"/", false)
495	if err != nil {
496		return err
497	}
498
499	sibs = slices.DeleteFunc(sibs, func(sib fs.FileInfo) bool {
500		return sib.Name() == fileName
501	})
502
503	if len(sibs) == 0 {
504		_, _, err := h.Cfg.Storage.PutObject(
505			bucket,
506			filepath.Join(pathDir, "._pico_keep_dir"),
507			bytes.NewReader([]byte{}),
508			entry,
509		)
510		if err != nil {
511			return err
512		}
513	}
514	err = h.Cfg.Storage.DeleteObject(bucket, assetFilepath)
515
516	surrogate := getSurrogateKey(user.Name, projectName)
517	h.Cfg.CacheClearingQueue <- surrogate
518
519	if err != nil {
520		return err
521	}
522
523	return err
524}
525
526func (h *UploadAssetHandler) validateAsset(data *FileData) (bool, error) {
527	fname := filepath.Base(data.Filepath)
528
529	projectName := shared.GetProjectName(data.FileEntry)
530	if projectName == "" || projectName == "/" || projectName == "." {
531		return false, fmt.Errorf("ERROR: invalid project name, you must copy files to a non-root folder (e.g. pgs.sh:/project-name)")
532	}
533
534	// special files we use for custom routing
535	if isSpecialFile(fname) {
536		return true, nil
537	}
538
539	fpath := strings.Replace(data.Filepath, "/"+projectName, "", 1)
540	if shouldIgnoreFile(fpath, data.DenyList) {
541		err := fmt.Errorf(
542			"ERROR: (%s) file rejected, https://pico.sh/pgs#-pgs-ignore",
543			data.Filepath,
544		)
545		return false, err
546	}
547
548	return true, nil
549}
550
551func (h *UploadAssetHandler) writeAsset(s *pssh.SSHServerConnSession, reader io.Reader, data *FileData) (int64, error) {
552	assetFilepath := shared.GetAssetFileName(data.FileEntry)
553
554	logger := h.GetLogger(s)
555	logger.Info(
556		"uploading file to bucket",
557		"bucket", data.Bucket.Name,
558		"filename", assetFilepath,
559	)
560
561	_, fsize, err := h.Cfg.Storage.PutObject(
562		data.Bucket,
563		assetFilepath,
564		reader,
565		data.FileEntry,
566	)
567	return fsize, err
568}
569
570// runCacheQueue processes requests to purge the cache for a single site.
571// One message arrives per file that is written/deleted during uploads.
572// Repeated messages for the same site are grouped so that we only flush once
573// per site per 5 seconds.
574func runCacheQueue(cfg *PgsConfig, ctx context.Context) {
575	var pendingFlushes sync.Map
576	tick := time.Tick(5 * time.Second)
577	for {
578		select {
579		case host := <-cfg.CacheClearingQueue:
580			pendingFlushes.Store(host, host)
581		case <-tick:
582			go func() {
583				pendingFlushes.Range(func(key, value any) bool {
584					pendingFlushes.Delete(key)
585					err := purgeCache(cfg, cfg.Pubsub, key.(string))
586					if err != nil {
587						cfg.Logger.Error("failed to clear cache", "err", err.Error())
588					}
589					return true
590				})
591			}()
592		}
593	}
594}