
pico services monorepo
git clone https://github.com/picosh/pico.git

pico / pkg / apps / pgs
Eric Bower  ·  2026-01-25

uploader.go

package pgs

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"io/fs"
	"log/slog"
	"os"
	"path"
	"path/filepath"
	"slices"
	"strings"
	"sync"
	"time"

	pgsdb "github.com/picosh/pico/pkg/apps/pgs/db"
	"github.com/picosh/pico/pkg/db"
	"github.com/picosh/pico/pkg/pobj"
	sst "github.com/picosh/pico/pkg/pobj/storage"
	"github.com/picosh/pico/pkg/pssh"
	sendutils "github.com/picosh/pico/pkg/send/utils"
	"github.com/picosh/pico/pkg/shared"
	ignore "github.com/sabhiram/go-gitignore"
)

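// context keys for per-connection state stored on the ssh session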
type ctxBucketKey struct{}
type ctxStorageSizeKey struct{}
type ctxProjectKey struct{}
type ctxDenylistKey struct{}

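// DenyList wraps the newline-delimited ignore patterns applied to a project's uploads.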
type DenyList struct {
	Denylist string
}

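// getDenylist returns the denylist cached on the session context, or nil if
// none has been set yet.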
func getDenylist(s *pssh.SSHServerConnSession) *DenyList {
	v := s.Context().Value(ctxDenylistKey{})
	if v == nil {
		return nil
	}
	return v.(*DenyList)
}

func setDenylist(s *pssh.SSHServerConnSession, denylist string) {
	s.SetValue(ctxDenylistKey{}, &DenyList{Denylist: denylist})
}

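// getProject returns the project cached on the session context, or nil if
// none has been set yet.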
func getProject(s *pssh.SSHServerConnSession) *db.Project {
	v := s.Context().Value(ctxProjectKey{})
	if v == nil {
		return nil
	}
	return v.(*db.Project)
}

func setProject(s *pssh.SSHServerConnSession, project *db.Project) {
	s.SetValue(ctxProjectKey{}, project)
}

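// getBucket returns the asset bucket stored on the session context by Validate.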
func getBucket(s *pssh.SSHServerConnSession) (sst.Bucket, error) {
	bucket := s.Context().Value(ctxBucketKey{}).(sst.Bucket)
	if bucket.Name == "" {
		return bucket, fmt.Errorf("bucket not set on `ssh.Context()` for connection")
	}
	return bucket, nil
}

func getStorageSize(s *pssh.SSHServerConnSession) uint64 {
	return s.Context().Value(ctxStorageSizeKey{}).(uint64)
}

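// incrementStorageSize updates the running storage total cached on the session
// with the size delta of the file just written and returns the new total.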
func incrementStorageSize(s *pssh.SSHServerConnSession, fileSize int64) uint64 {
	curSize := getStorageSize(s)
	var nextStorageSize uint64
	if fileSize < 0 {
		nextStorageSize = curSize - uint64(fileSize)
	} else {
		nextStorageSize = curSize + uint64(fileSize)
	}
	s.SetValue(ctxStorageSizeKey{}, nextStorageSize)
	return nextStorageSize
}

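// shouldIgnoreFile reports whether fp matches any of the newline-delimited
// gitignore-style patterns in ignoreStr.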
func shouldIgnoreFile(fp, ignoreStr string) bool {
	object := ignore.CompileIgnoreLines(strings.Split(ignoreStr, "\n")...)
	return object.MatchesPath(fp)
}

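// FileData bundles everything needed to validate and store a single uploaded file.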
type FileData struct {
	*sendutils.FileEntry
	User     *db.User
	Bucket   sst.Bucket
	Project  *db.Project
	DenyList string
}

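// UploadAssetHandler implements the pkg/send handlers for reading, listing,
// writing, and deleting pgs assets in object storage.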
type UploadAssetHandler struct {
	Cfg                *PgsConfig
	CacheClearingQueue chan string
}

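// NewUploadAssetHandler starts the cache-clearing worker in the background and
// returns a handler holding the provided purge channel.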
func NewUploadAssetHandler(cfg *PgsConfig, ch chan string, ctx context.Context) *UploadAssetHandler {
	go runCacheQueue(cfg, ctx)
	return &UploadAssetHandler{
		Cfg:                cfg,
		CacheClearingQueue: ch,
	}
}

func (h *UploadAssetHandler) GetLogger(s *pssh.SSHServerConnSession) *slog.Logger {
	return pssh.GetLogger(s)
}

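// Read fetches a single asset from the user's bucket and returns its file info
// along with a reader over the object contents.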
func (h *UploadAssetHandler) Read(s *pssh.SSHServerConnSession, entry *sendutils.FileEntry) (os.FileInfo, sendutils.ReadAndReaderAtCloser, error) {
	logger := pssh.GetLogger(s)
	user := pssh.GetUser(s)

	if user == nil {
		err := fmt.Errorf("could not get user from ctx")
		logger.Error("error getting user from ctx", "err", err)
		return nil, nil, err
	}

	fileInfo := &sendutils.VirtualFile{
		FName:    filepath.Base(entry.Filepath),
		FIsDir:   false,
		FSize:    entry.Size,
		FModTime: time.Unix(entry.Mtime, 0),
	}

	bucket, err := h.Cfg.Storage.GetBucket(shared.GetAssetBucketName(user.ID))
	if err != nil {
		return nil, nil, err
	}

	fname := shared.GetAssetFileName(entry)
	contents, info, err := h.Cfg.Storage.GetObject(bucket, fname)
	if err != nil {
		return nil, nil, err
	}

	fileInfo.FSize = info.Size
	fileInfo.FModTime = info.LastModified

	reader := pobj.NewAllReaderAt(contents)

	return fileInfo, reader, nil
}

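// List returns the objects stored under fpath in the user's asset bucket; the
// bucket root is reported as a single virtual directory entry.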
func (h *UploadAssetHandler) List(s *pssh.SSHServerConnSession, fpath string, isDir bool, recursive bool) ([]os.FileInfo, error) {
	var fileList []os.FileInfo

	logger := pssh.GetLogger(s)
	user := pssh.GetUser(s)

	if user == nil {
		err := fmt.Errorf("could not get user from ctx")
		logger.Error("error getting user from ctx", "err", err)
		return fileList, err
	}

	cleanFilename := fpath

	bucketName := shared.GetAssetBucketName(user.ID)
	bucket, err := h.Cfg.Storage.GetBucket(bucketName)
	if err != nil {
		return fileList, err
	}

	if cleanFilename == "" || cleanFilename == "." {
		name := cleanFilename
		if name == "" {
			name = "/"
		}

		info := &sendutils.VirtualFile{
			FName:  name,
			FIsDir: true,
		}

		fileList = append(fileList, info)
	} else {
		if cleanFilename != "/" && isDir {
			cleanFilename += "/"
		}

		foundList, err := h.Cfg.Storage.ListObjects(bucket, cleanFilename, recursive)
		if err != nil {
			return fileList, err
		}

		fileList = append(fileList, foundList...)
	}

	return fileList, nil
}

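// Validate prepares the session for uploads: it upserts the user's asset
// bucket and caches the bucket along with its current size on the session
// context.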
func (h *UploadAssetHandler) Validate(s *pssh.SSHServerConnSession) error {
	logger := pssh.GetLogger(s)
	user := pssh.GetUser(s)

	if user == nil {
		err := fmt.Errorf("could not get user from ctx")
		logger.Error("error getting user from ctx", "err", err)
		return err
	}

	assetBucket := shared.GetAssetBucketName(user.ID)
	bucket, err := h.Cfg.Storage.UpsertBucket(assetBucket)
	if err != nil {
		return err
	}

	s.SetValue(ctxBucketKey{}, bucket)

	totalStorageSize, err := h.Cfg.Storage.GetBucketQuota(bucket)
	if err != nil {
		return err
	}

	s.SetValue(ctxStorageSizeKey{}, totalStorageSize)

	logger.Info(
		"bucket size",
		"user", user.Name,
		"bytes", totalStorageSize,
	)

	logger.Info(
		"attempting to upload files",
		"user", user.Name,
		"txtPrefix", h.Cfg.TxtPrefix,
	)

	return nil
}

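// findDenylist reads a project's _pgs_ignore file from the bucket and returns
// its contents.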
func (h *UploadAssetHandler) findDenylist(bucket sst.Bucket, project *db.Project, logger *slog.Logger) (string, error) {
	fp, _, err := h.Cfg.Storage.GetObject(bucket, filepath.Join(project.ProjectDir, "_pgs_ignore"))
	if err != nil {
		return "", fmt.Errorf("_pgs_ignore not found")
	}
	defer func() {
		_ = fp.Close()
	}()

	buf := new(strings.Builder)
	_, err = io.Copy(buf, fp)
	if err != nil {
		logger.Error("io copy", "err", err.Error())
		return "", err
	}

	str := buf.String()
	return str, nil
}

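// findPlusFF loads the user's "plus" feature flag, falling back to the
// configured defaults for users without one, and normalizes the storage,
// file, and special-file limits.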
func findPlusFF(dbpool pgsdb.PgsDB, cfg *PgsConfig, userID string) *db.FeatureFlag {
	ff, _ := dbpool.FindFeature(userID, "plus")
	// we have free tiers so users might not have a feature flag
	// in which case we set sane defaults
	if ff == nil {
		ff = db.NewFeatureFlag(
			userID,
			"plus",
			cfg.MaxSize,
			cfg.MaxAssetSize,
			cfg.MaxSpecialFileSize,
		)
	}
	// this is jank
	ff.Data.StorageMax = ff.FindStorageMax(cfg.MaxSize)
	ff.Data.FileMax = ff.FindFileMax(cfg.MaxAssetSize)
	ff.Data.SpecialFileMax = ff.FindSpecialFileMax(cfg.MaxSpecialFileSize)
	return ff
}

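// Write stores a single uploaded file: it resolves the target project,
// enforces the denylist and storage quotas, streams the object into the
// bucket, and queues a cache purge for the site.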
func (h *UploadAssetHandler) Write(s *pssh.SSHServerConnSession, entry *sendutils.FileEntry) (string, error) {
	logger := pssh.GetLogger(s)
	user := pssh.GetUser(s)

	if user == nil {
		err := fmt.Errorf("could not get user from ctx")
		logger.Error("error getting user from ctx", "err", err)
		return "", err
	}

	if entry.Mode.IsDir() && strings.Count(entry.Filepath, "/") == 1 {
		entry.Filepath = strings.TrimPrefix(entry.Filepath, "/")
	}

	logger = logger.With(
		"file", entry.Filepath,
		"size", entry.Size,
	)

	bucket, err := getBucket(s)
	if err != nil {
		logger.Error("could not find bucket in ctx", "err", err.Error())
		return "", err
	}

	project := getProject(s)
	projectName := shared.GetProjectName(entry)
	logger = logger.With("project", projectName)

	// find, create, or update the project if we haven't already done it.
	// we also need to check that the project stored in ctx matches the project
	// being uploaded, since users can keep an ssh connection alive via sftp
	// and create many projects in a single session
	if project == nil || project.Name != projectName {
		project, err = h.Cfg.DB.UpsertProject(user.ID, projectName, projectName)
		if err != nil {
			logger.Error("upsert project", "err", err.Error())
			return "", err
		}
		setProject(s, project)
	}

	if project.Blocked != "" {
		msg := "project has been blocked and cannot upload files: %s"
		return "", fmt.Errorf(msg, project.Blocked)
	}

	if entry.Mode.IsDir() {
		_, _, err := h.Cfg.Storage.PutObject(
			bucket,
			path.Join(shared.GetAssetFileName(entry), "._pico_keep_dir"),
			bytes.NewReader([]byte{}),
			entry,
		)
		return "", err
	}

	featureFlag := findPlusFF(h.Cfg.DB, h.Cfg, user.ID)
	// calculate the filesize difference between the same file already
	// stored and the updated file being uploaded
	assetFilename := shared.GetAssetFileName(entry)
	obj, info, _ := h.Cfg.Storage.GetObject(bucket, assetFilename)
	var curFileSize int64
	if info != nil {
		curFileSize = info.Size
	}
	if obj != nil {
		defer func() {
			_ = obj.Close()
		}()
	}

	denylist := getDenylist(s)
	if denylist == nil {
		dlist, err := h.findDenylist(bucket, project, logger)
		if err != nil {
			logger.Info("failed to get denylist, setting default (.*)", "err", err.Error())
			dlist = ".*"
		}
		setDenylist(s, dlist)
		denylist = &DenyList{Denylist: dlist}
	}

	data := &FileData{
		FileEntry: entry,
		User:      user,
		Bucket:    bucket,
		DenyList:  denylist.Denylist,
		Project:   project,
	}

	valid, err := h.validateAsset(data)
	if !valid {
		return "", err
	}

	// SFTP does not report file size, so the more performant way to check
	// filesize constraints is to try to upload the file to s3 with a
	// specialized reader that raises an error once the filesize limit has
	// been reached
	storageMax := featureFlag.Data.StorageMax
	fileMax := featureFlag.Data.FileMax
	curStorageSize := getStorageSize(s)
	remaining := int64(storageMax) - int64(curStorageSize)
	sizeRemaining := min(remaining+curFileSize, fileMax)
	if sizeRemaining <= 0 {
		_, _ = fmt.Fprintln(s.Stderr(), "storage quota reached")
		_, _ = fmt.Fprintf(s.Stderr(), "\r")
		_ = s.Exit(1)
		_ = s.Close()
		return "", fmt.Errorf("storage quota reached")
	}
	logger = logger.With(
		"storageMax", storageMax,
		"currentStorageMax", curStorageSize,
		"fileMax", fileMax,
		"sizeRemaining", sizeRemaining,
	)

	specialFileMax := featureFlag.Data.SpecialFileMax
	if isSpecialFile(entry.Filepath) {
		sizeRemaining = min(sizeRemaining, specialFileMax)
	}

	fsize, err := h.writeAsset(
		s,
		shared.NewMaxBytesReader(data.Reader, int64(sizeRemaining)),
		data,
	)
	if err != nil {
		logger.Error("could not write asset", "err", err.Error())
		cerr := fmt.Errorf(
			"%s: storage size %.2fmb, storage max %.2fmb, file max %.2fmb, special file max %.4fmb",
			err,
			shared.BytesToMB(int(curStorageSize)),
			shared.BytesToMB(int(storageMax)),
			shared.BytesToMB(int(fileMax)),
			shared.BytesToMB(int(specialFileMax)),
		)
		return "", cerr
	}

	deltaFileSize := curFileSize - fsize
	nextStorageSize := incrementStorageSize(s, deltaFileSize)

	url := h.Cfg.AssetURL(
		user.Name,
		projectName,
		strings.Replace(data.Filepath, "/"+projectName+"/", "", 1),
	)

	maxSize := int(featureFlag.Data.StorageMax)
	str := fmt.Sprintf(
		"%s (space: %.2f/%.2fGB, %.2f%%)",
		url,
		shared.BytesToGB(int(nextStorageSize)),
		shared.BytesToGB(maxSize),
		(float32(nextStorageSize)/float32(maxSize))*100,
	)

	surrogate := getSurrogateKey(user.Name, projectName)
	h.Cfg.CacheClearingQueue <- surrogate

	return str, err
}

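// isSpecialFile reports whether the path refers to one of the special routing
// files (_headers, _redirects, _pgs_ignore).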
func isSpecialFile(entry string) bool {
	fname := filepath.Base(entry)
	return fname == "_headers" || fname == "_redirects" || fname == "_pgs_ignore"
}

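// Delete removes an asset from the user's bucket, writing a ._pico_keep_dir
// placeholder when the containing directory would otherwise become empty, and
// queues a cache purge for the site.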
func (h *UploadAssetHandler) Delete(s *pssh.SSHServerConnSession, entry *sendutils.FileEntry) error {
	logger := pssh.GetLogger(s)
	user := pssh.GetUser(s)

	if user == nil {
		err := fmt.Errorf("could not get user from ctx")
		logger.Error("error getting user from ctx", "err", err)
		return err
	}

	if entry.Mode.IsDir() && strings.Count(entry.Filepath, "/") == 1 {
		entry.Filepath = strings.TrimPrefix(entry.Filepath, "/")
	}

	assetFilepath := shared.GetAssetFileName(entry)

	logger = logger.With(
		"file", assetFilepath,
	)

	bucket, err := getBucket(s)
	if err != nil {
		logger.Error("could not find bucket in ctx", "err", err.Error())
		return err
	}

	projectName := shared.GetProjectName(entry)
	logger = logger.With("project", projectName)

	if assetFilepath == filepath.Join("/", projectName, "._pico_keep_dir") {
		return os.ErrPermission
	}

	logger.Info("deleting file")

	pathDir := filepath.Dir(assetFilepath)
	fileName := filepath.Base(assetFilepath)

	sibs, err := h.Cfg.Storage.ListObjects(bucket, pathDir+"/", false)
	if err != nil {
		return err
	}

	sibs = slices.DeleteFunc(sibs, func(sib fs.FileInfo) bool {
		return sib.Name() == fileName
	})

	if len(sibs) == 0 {
		_, _, err := h.Cfg.Storage.PutObject(
			bucket,
			filepath.Join(pathDir, "._pico_keep_dir"),
			bytes.NewReader([]byte{}),
			entry,
		)
		if err != nil {
			return err
		}
	}
	err = h.Cfg.Storage.DeleteObject(bucket, assetFilepath)

	surrogate := getSurrogateKey(user.Name, projectName)
	h.Cfg.CacheClearingQueue <- surrogate

	return err
}

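// validateAsset rejects uploads addressed to the bucket root and files matched
// by the project's denylist; special routing files are always accepted.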
func (h *UploadAssetHandler) validateAsset(data *FileData) (bool, error) {
	fname := filepath.Base(data.Filepath)

	projectName := shared.GetProjectName(data.FileEntry)
	if projectName == "" || projectName == "/" || projectName == "." {
		return false, fmt.Errorf("ERROR: invalid project name, you must copy files to a non-root folder (e.g. pgs.sh:/project-name)")
	}

	// special files we use for custom routing
	if isSpecialFile(fname) {
		return true, nil
	}

	fpath := strings.Replace(data.Filepath, "/"+projectName, "", 1)
	if shouldIgnoreFile(fpath, data.DenyList) {
		err := fmt.Errorf(
			"ERROR: (%s) file rejected, https://pico.sh/pgs#-pgs-ignore",
			data.Filepath,
		)
		return false, err
	}

	return true, nil
}

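// writeAsset streams the file contents into the bucket and returns the number
// of bytes written.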
func (h *UploadAssetHandler) writeAsset(s *pssh.SSHServerConnSession, reader io.Reader, data *FileData) (int64, error) {
	assetFilepath := shared.GetAssetFileName(data.FileEntry)

	logger := h.GetLogger(s)
	logger.Info(
		"uploading file to bucket",
		"bucket", data.Bucket.Name,
		"filename", assetFilepath,
	)

	_, fsize, err := h.Cfg.Storage.PutObject(
		data.Bucket,
		assetFilepath,
		reader,
		data.FileEntry,
	)
	return fsize, err
}

// runCacheQueue processes requests to purge the cache for a single site.
// One message arrives per file that is written/deleted during uploads.
// Repeated messages for the same site are grouped so that we only flush once
// per site per 5 seconds.
func runCacheQueue(cfg *PgsConfig, ctx context.Context) {
	var pendingFlushes sync.Map
	tick := time.Tick(5 * time.Second)
	for {
		select {
		case host := <-cfg.CacheClearingQueue:
			pendingFlushes.Store(host, host)
		case <-tick:
			go func() {
				pendingFlushes.Range(func(key, value any) bool {
					pendingFlushes.Delete(key)
					err := purgeCache(cfg, cfg.Pubsub, key.(string))
					if err != nil {
						cfg.Logger.Error("failed to clear cache", "err", err.Error())
					}
					return true
				})
			}()
		}
	}
}