repos / pico

pico services mono repo
git clone https://github.com/picosh/pico.git

pico / pkg / apps / pgs
Eric Bower  ·  2025-05-25

uploader.go

  1package pgs
  2
  3import (
  4	"bytes"
  5	"context"
  6	"fmt"
  7	"io"
  8	"io/fs"
  9	"log/slog"
 10	"os"
 11	"path"
 12	"path/filepath"
 13	"slices"
 14	"strings"
 15	"sync"
 16	"time"
 17
 18	pgsdb "github.com/picosh/pico/pkg/apps/pgs/db"
 19	"github.com/picosh/pico/pkg/db"
 20	"github.com/picosh/pico/pkg/pobj"
 21	sst "github.com/picosh/pico/pkg/pobj/storage"
 22	"github.com/picosh/pico/pkg/pssh"
 23	sendutils "github.com/picosh/pico/pkg/send/utils"
 24	"github.com/picosh/pico/pkg/shared"
 25	"github.com/picosh/utils"
 26	ignore "github.com/sabhiram/go-gitignore"
 27)
 28
// Context keys for per-connection state stashed on the SSH session
// context. Each key is a distinct empty struct type so the keys can
// never collide with one another or with keys from other packages.
type ctxBucketKey struct{}
type ctxStorageSizeKey struct{}
type ctxProjectKey struct{}
type ctxDenylistKey struct{}

// DenyList wraps the raw contents of a project's `_pgs_ignore` file:
// newline-separated gitignore-style patterns (see shouldIgnoreFile).
type DenyList struct {
	Denylist string
}
 37
 38func getDenylist(s *pssh.SSHServerConnSession) *DenyList {
 39	v := s.Context().Value(ctxDenylistKey{})
 40	if v == nil {
 41		return nil
 42	}
 43	denylist := s.Context().Value(ctxDenylistKey{}).(*DenyList)
 44	return denylist
 45}
 46
 47func setDenylist(s *pssh.SSHServerConnSession, denylist string) {
 48	s.SetValue(ctxDenylistKey{}, &DenyList{Denylist: denylist})
 49}
 50
 51func getProject(s *pssh.SSHServerConnSession) *db.Project {
 52	v := s.Context().Value(ctxProjectKey{})
 53	if v == nil {
 54		return nil
 55	}
 56	project := s.Context().Value(ctxProjectKey{}).(*db.Project)
 57	return project
 58}
 59
// setProject caches the active project on the session context so
// subsequent writes in the same connection can skip the DB upsert.
func setProject(s *pssh.SSHServerConnSession, project *db.Project) {
	s.SetValue(ctxProjectKey{}, project)
}
 63
 64func getBucket(s *pssh.SSHServerConnSession) (sst.Bucket, error) {
 65	bucket := s.Context().Value(ctxBucketKey{}).(sst.Bucket)
 66	if bucket.Name == "" {
 67		return bucket, fmt.Errorf("bucket not set on `ssh.Context()` for connection")
 68	}
 69	return bucket, nil
 70}
 71
 72func getStorageSize(s *pssh.SSHServerConnSession) uint64 {
 73	return s.Context().Value(ctxStorageSizeKey{}).(uint64)
 74}
 75
 76func incrementStorageSize(s *pssh.SSHServerConnSession, fileSize int64) uint64 {
 77	curSize := getStorageSize(s)
 78	var nextStorageSize uint64
 79	if fileSize < 0 {
 80		nextStorageSize = curSize - uint64(fileSize)
 81	} else {
 82		nextStorageSize = curSize + uint64(fileSize)
 83	}
 84	s.SetValue(ctxStorageSizeKey{}, nextStorageSize)
 85	return nextStorageSize
 86}
 87
 88func shouldIgnoreFile(fp, ignoreStr string) bool {
 89	object := ignore.CompileIgnoreLines(strings.Split(ignoreStr, "\n")...)
 90	return object.MatchesPath(fp)
 91}
 92
// FileData bundles everything needed to validate and store one
// uploaded file: the wire-level entry plus the resolved user, bucket,
// project, and the raw denylist contents used for ignore matching.
type FileData struct {
	*sendutils.FileEntry
	User     *db.User
	Bucket   sst.Bucket
	Project  *db.Project
	DenyList string
}
100
// UploadAssetHandler implements the send handler operations (Read,
// List, Write, Delete, Validate) for pgs asset uploads over SSH.
type UploadAssetHandler struct {
	Cfg                *PgsConfig
	// CacheClearingQueue receives surrogate keys for sites whose CDN
	// cache should be purged. NOTE(review): Write/Delete send on
	// Cfg.CacheClearingQueue rather than this field — confirm both
	// refer to the same channel.
	CacheClearingQueue chan string
}
105
106func NewUploadAssetHandler(cfg *PgsConfig, ch chan string, ctx context.Context) *UploadAssetHandler {
107	go runCacheQueue(cfg, ctx)
108	return &UploadAssetHandler{
109		Cfg:                cfg,
110		CacheClearingQueue: ch,
111	}
112}
113
114func (h *UploadAssetHandler) GetLogger(s *pssh.SSHServerConnSession) *slog.Logger {
115	return pssh.GetLogger(s)
116}
117
118func (h *UploadAssetHandler) Read(s *pssh.SSHServerConnSession, entry *sendutils.FileEntry) (os.FileInfo, sendutils.ReadAndReaderAtCloser, error) {
119	logger := pssh.GetLogger(s)
120	user := pssh.GetUser(s)
121
122	if user == nil {
123		err := fmt.Errorf("could not get user from ctx")
124		logger.Error("error getting user from ctx", "err", err)
125		return nil, nil, err
126	}
127
128	fileInfo := &sendutils.VirtualFile{
129		FName:    filepath.Base(entry.Filepath),
130		FIsDir:   false,
131		FSize:    entry.Size,
132		FModTime: time.Unix(entry.Mtime, 0),
133	}
134
135	bucket, err := h.Cfg.Storage.GetBucket(shared.GetAssetBucketName(user.ID))
136	if err != nil {
137		return nil, nil, err
138	}
139
140	fname := shared.GetAssetFileName(entry)
141	contents, info, err := h.Cfg.Storage.GetObject(bucket, fname)
142	if err != nil {
143		return nil, nil, err
144	}
145
146	fileInfo.FSize = info.Size
147	fileInfo.FModTime = info.LastModified
148
149	reader := pobj.NewAllReaderAt(contents)
150
151	return fileInfo, reader, nil
152}
153
154func (h *UploadAssetHandler) List(s *pssh.SSHServerConnSession, fpath string, isDir bool, recursive bool) ([]os.FileInfo, error) {
155	var fileList []os.FileInfo
156
157	logger := pssh.GetLogger(s)
158	user := pssh.GetUser(s)
159
160	if user == nil {
161		err := fmt.Errorf("could not get user from ctx")
162		logger.Error("error getting user from ctx", "err", err)
163		return fileList, err
164	}
165
166	cleanFilename := fpath
167
168	bucketName := shared.GetAssetBucketName(user.ID)
169	bucket, err := h.Cfg.Storage.GetBucket(bucketName)
170	if err != nil {
171		return fileList, err
172	}
173
174	if cleanFilename == "" || cleanFilename == "." {
175		name := cleanFilename
176		if name == "" {
177			name = "/"
178		}
179
180		info := &sendutils.VirtualFile{
181			FName:  name,
182			FIsDir: true,
183		}
184
185		fileList = append(fileList, info)
186	} else {
187		if cleanFilename != "/" && isDir {
188			cleanFilename += "/"
189		}
190
191		foundList, err := h.Cfg.Storage.ListObjects(bucket, cleanFilename, recursive)
192		if err != nil {
193			return fileList, err
194		}
195
196		fileList = append(fileList, foundList...)
197	}
198
199	return fileList, nil
200}
201
202func (h *UploadAssetHandler) Validate(s *pssh.SSHServerConnSession) error {
203	logger := pssh.GetLogger(s)
204	user := pssh.GetUser(s)
205
206	if user == nil {
207		err := fmt.Errorf("could not get user from ctx")
208		logger.Error("error getting user from ctx", "err", err)
209		return err
210	}
211
212	assetBucket := shared.GetAssetBucketName(user.ID)
213	bucket, err := h.Cfg.Storage.UpsertBucket(assetBucket)
214	if err != nil {
215		return err
216	}
217
218	s.SetValue(ctxBucketKey{}, bucket)
219
220	totalStorageSize, err := h.Cfg.Storage.GetBucketQuota(bucket)
221	if err != nil {
222		return err
223	}
224
225	s.SetValue(ctxStorageSizeKey{}, totalStorageSize)
226
227	logger.Info(
228		"bucket size",
229		"user", user.Name,
230		"bytes", totalStorageSize,
231	)
232
233	logger.Info(
234		"attempting to upload files",
235		"user", user.Name,
236		"txtPrefix", h.Cfg.TxtPrefix,
237	)
238
239	return nil
240}
241
242func (h *UploadAssetHandler) findDenylist(bucket sst.Bucket, project *db.Project, logger *slog.Logger) (string, error) {
243	fp, _, err := h.Cfg.Storage.GetObject(bucket, filepath.Join(project.ProjectDir, "_pgs_ignore"))
244	if err != nil {
245		return "", fmt.Errorf("_pgs_ignore not found")
246	}
247	defer func() {
248		_ = fp.Close()
249	}()
250
251	buf := new(strings.Builder)
252	_, err = io.Copy(buf, fp)
253	if err != nil {
254		logger.Error("io copy", "err", err.Error())
255		return "", err
256	}
257
258	str := buf.String()
259	return str, nil
260}
261
262func findPlusFF(dbpool pgsdb.PgsDB, cfg *PgsConfig, userID string) *db.FeatureFlag {
263	ff, _ := dbpool.FindFeature(userID, "plus")
264	// we have free tiers so users might not have a feature flag
265	// in which case we set sane defaults
266	if ff == nil {
267		ff = db.NewFeatureFlag(
268			userID,
269			"plus",
270			cfg.MaxSize,
271			cfg.MaxAssetSize,
272			cfg.MaxSpecialFileSize,
273		)
274	}
275	// this is jank
276	ff.Data.StorageMax = ff.FindStorageMax(cfg.MaxSize)
277	ff.Data.FileMax = ff.FindFileMax(cfg.MaxAssetSize)
278	ff.Data.SpecialFileMax = ff.FindSpecialFileMax(cfg.MaxSpecialFileSize)
279	return ff
280}
281
// Write stores a single uploaded file in the user's asset bucket.
// It resolves (or creates) the target project, rejects blocked
// projects and denylisted files, enforces storage/file-size quotas,
// uploads the object, refreshes the cached bucket size, and queues a
// CDN cache purge. Returns a status string echoed back to the client.
func (h *UploadAssetHandler) Write(s *pssh.SSHServerConnSession, entry *sendutils.FileEntry) (string, error) {
	logger := pssh.GetLogger(s)
	user := pssh.GetUser(s)

	if user == nil {
		err := fmt.Errorf("could not get user from ctx")
		logger.Error("error getting user from ctx", "err", err)
		return "", err
	}

	// a top-level directory arrives as "/name"; strip the slash so the
	// project name resolves consistently
	if entry.Mode.IsDir() && strings.Count(entry.Filepath, "/") == 1 {
		entry.Filepath = strings.TrimPrefix(entry.Filepath, "/")
	}

	logger = logger.With(
		"file", entry.Filepath,
		"size", entry.Size,
	)

	bucket, err := getBucket(s)
	if err != nil {
		logger.Error("could not find bucket in ctx", "err", err.Error())
		return "", err
	}

	project := getProject(s)
	projectName := shared.GetProjectName(entry)
	logger = logger.With("project", projectName)

	// find, create, or update project if we haven't already done it
	// we need to also check if the project stored in ctx is the same project
	// being uploaded since users can keep an ssh connection alive via sftp
	// and created many projects in a single session
	if project == nil || project.Name != projectName {
		project, err = h.Cfg.DB.UpsertProject(user.ID, projectName, projectName)
		if err != nil {
			logger.Error("upsert project", "err", err.Error())
			return "", err
		}
		setProject(s, project)
	}

	if project.Blocked != "" {
		msg := "project has been blocked and cannot upload files: %s"
		return "", fmt.Errorf(msg, project.Blocked)
	}

	// directories carry no contents; store an empty sentinel object so
	// the "folder" survives in the object store
	if entry.Mode.IsDir() {
		_, _, err := h.Cfg.Storage.PutObject(
			bucket,
			path.Join(shared.GetAssetFileName(entry), "._pico_keep_dir"),
			bytes.NewReader([]byte{}),
			entry,
		)
		return "", err
	}

	featureFlag := findPlusFF(h.Cfg.DB, h.Cfg, user.ID)
	// calculate the filesize difference between the same file already
	// stored and the updated file being uploaded
	assetFilename := shared.GetAssetFileName(entry)
	obj, info, _ := h.Cfg.Storage.GetObject(bucket, assetFilename)
	var curFileSize int64
	if info != nil {
		curFileSize = info.Size
	}
	if obj != nil {
		defer func() {
			_ = obj.Close()
		}()
	}

	// load the project's `_pgs_ignore` denylist once per connection;
	// when none can be read, default to denying dotfiles (".*")
	denylist := getDenylist(s)
	if denylist == nil {
		dlist, err := h.findDenylist(bucket, project, logger)
		if err != nil {
			logger.Info("failed to get denylist, setting default (.*)", "err", err.Error())
			dlist = ".*"
		}
		setDenylist(s, dlist)
		denylist = &DenyList{Denylist: dlist}
	}

	data := &FileData{
		FileEntry: entry,
		User:      user,
		Bucket:    bucket,
		DenyList:  denylist.Denylist,
		Project:   project,
	}

	valid, err := h.validateAsset(data)
	if !valid {
		return "", err
	}

	// SFTP does not report file size so the more performant way to
	//   check filesize constraints is to try and upload the file to s3
	//	 with a specialized reader that raises an error if the filesize limit
	//	 has been reached
	storageMax := featureFlag.Data.StorageMax
	fileMax := featureFlag.Data.FileMax
	curStorageSize := getStorageSize(s)
	// remaining quota; curFileSize is added back because overwriting an
	// existing object frees its old bytes
	remaining := int64(storageMax) - int64(curStorageSize)
	sizeRemaining := min(remaining+curFileSize, fileMax)
	if sizeRemaining <= 0 {
		// quota exhausted: tell the client and terminate the session
		_, _ = fmt.Fprintln(s.Stderr(), "storage quota reached")
		_, _ = fmt.Fprintf(s.Stderr(), "\r")
		_ = s.Exit(1)
		_ = s.Close()
		return "", fmt.Errorf("storage quota reached")
	}
	logger = logger.With(
		"storageMax", storageMax,
		"currentStorageMax", curStorageSize,
		"fileMax", fileMax,
		"sizeRemaining", sizeRemaining,
	)

	// special routing files (_headers, _redirects, _pgs_ignore) have
	// their own, separate size cap
	specialFileMax := featureFlag.Data.SpecialFileMax
	if isSpecialFile(entry.Filepath) {
		sizeRemaining = min(sizeRemaining, specialFileMax)
	}

	fsize, err := h.writeAsset(
		s,
		utils.NewMaxBytesReader(data.Reader, int64(sizeRemaining)),
		data,
	)
	if err != nil {
		logger.Error("could not write asset", "err", err.Error())
		cerr := fmt.Errorf(
			"%s: storage size %.2fmb, storage max %.2fmb, file max %.2fmb, special file max %.4fmb",
			err,
			utils.BytesToMB(int(curStorageSize)),
			utils.BytesToMB(int(storageMax)),
			utils.BytesToMB(int(fileMax)),
			utils.BytesToMB(int(specialFileMax)),
		)
		return "", cerr
	}

	// delta between the previously stored object size and the new
	// upload; used to refresh the cached bucket size for this session
	deltaFileSize := curFileSize - fsize
	nextStorageSize := incrementStorageSize(s, deltaFileSize)

	url := h.Cfg.AssetURL(
		user.Name,
		projectName,
		strings.Replace(data.Filepath, "/"+projectName+"/", "", 1),
	)

	// status line echoed to the client: asset URL plus storage usage
	maxSize := int(featureFlag.Data.StorageMax)
	str := fmt.Sprintf(
		"%s (space: %.2f/%.2fGB, %.2f%%)",
		url,
		utils.BytesToGB(int(nextStorageSize)),
		utils.BytesToGB(maxSize),
		(float32(nextStorageSize)/float32(maxSize))*100,
	)

	surrogate := getSurrogateKey(user.Name, projectName)
	h.Cfg.CacheClearingQueue <- surrogate

	return str, err
}
447
// isSpecialFile reports whether entry's base name is one of the
// files pgs reserves for custom routing and ignore rules.
func isSpecialFile(entry string) bool {
	switch filepath.Base(entry) {
	case "_headers", "_redirects", "_pgs_ignore":
		return true
	}
	return false
}
452
453func (h *UploadAssetHandler) Delete(s *pssh.SSHServerConnSession, entry *sendutils.FileEntry) error {
454	logger := pssh.GetLogger(s)
455	user := pssh.GetUser(s)
456
457	if user == nil {
458		err := fmt.Errorf("could not get user from ctx")
459		logger.Error("error getting user from ctx", "err", err)
460		return err
461	}
462
463	if entry.Mode.IsDir() && strings.Count(entry.Filepath, "/") == 1 {
464		entry.Filepath = strings.TrimPrefix(entry.Filepath, "/")
465	}
466
467	assetFilepath := shared.GetAssetFileName(entry)
468
469	logger = logger.With(
470		"file", assetFilepath,
471	)
472
473	bucket, err := getBucket(s)
474	if err != nil {
475		logger.Error("could not find bucket in ctx", "err", err.Error())
476		return err
477	}
478
479	projectName := shared.GetProjectName(entry)
480	logger = logger.With("project", projectName)
481
482	if assetFilepath == filepath.Join("/", projectName, "._pico_keep_dir") {
483		return os.ErrPermission
484	}
485
486	logger.Info("deleting file")
487
488	pathDir := filepath.Dir(assetFilepath)
489	fileName := filepath.Base(assetFilepath)
490
491	sibs, err := h.Cfg.Storage.ListObjects(bucket, pathDir+"/", false)
492	if err != nil {
493		return err
494	}
495
496	sibs = slices.DeleteFunc(sibs, func(sib fs.FileInfo) bool {
497		return sib.Name() == fileName
498	})
499
500	if len(sibs) == 0 {
501		_, _, err := h.Cfg.Storage.PutObject(
502			bucket,
503			filepath.Join(pathDir, "._pico_keep_dir"),
504			bytes.NewReader([]byte{}),
505			entry,
506		)
507		if err != nil {
508			return err
509		}
510	}
511	err = h.Cfg.Storage.DeleteObject(bucket, assetFilepath)
512
513	surrogate := getSurrogateKey(user.Name, projectName)
514	h.Cfg.CacheClearingQueue <- surrogate
515
516	if err != nil {
517		return err
518	}
519
520	return err
521}
522
523func (h *UploadAssetHandler) validateAsset(data *FileData) (bool, error) {
524	fname := filepath.Base(data.Filepath)
525
526	projectName := shared.GetProjectName(data.FileEntry)
527	if projectName == "" || projectName == "/" || projectName == "." {
528		return false, fmt.Errorf("ERROR: invalid project name, you must copy files to a non-root folder (e.g. pgs.sh:/project-name)")
529	}
530
531	// special files we use for custom routing
532	if isSpecialFile(fname) {
533		return true, nil
534	}
535
536	fpath := strings.Replace(data.Filepath, "/"+projectName, "", 1)
537	if shouldIgnoreFile(fpath, data.DenyList) {
538		err := fmt.Errorf(
539			"ERROR: (%s) file rejected, https://pico.sh/pgs#-pgs-ignore",
540			data.Filepath,
541		)
542		return false, err
543	}
544
545	return true, nil
546}
547
548func (h *UploadAssetHandler) writeAsset(s *pssh.SSHServerConnSession, reader io.Reader, data *FileData) (int64, error) {
549	assetFilepath := shared.GetAssetFileName(data.FileEntry)
550
551	logger := h.GetLogger(s)
552	logger.Info(
553		"uploading file to bucket",
554		"bucket", data.Bucket.Name,
555		"filename", assetFilepath,
556	)
557
558	_, fsize, err := h.Cfg.Storage.PutObject(
559		data.Bucket,
560		assetFilepath,
561		reader,
562		data.FileEntry,
563	)
564	return fsize, err
565}
566
567// runCacheQueue processes requests to purge the cache for a single site.
568// One message arrives per file that is written/deleted during uploads.
569// Repeated messages for the same site are grouped so that we only flush once
570// per site per 5 seconds.
571func runCacheQueue(cfg *PgsConfig, ctx context.Context) {
572	send := createPubCacheDrain(ctx, cfg.Logger)
573	var pendingFlushes sync.Map
574	tick := time.Tick(5 * time.Second)
575	for {
576		select {
577		case host := <-cfg.CacheClearingQueue:
578			pendingFlushes.Store(host, host)
579		case <-tick:
580			go func() {
581				pendingFlushes.Range(func(key, value any) bool {
582					pendingFlushes.Delete(key)
583					err := purgeCache(cfg, send, key.(string))
584					if err != nil {
585						cfg.Logger.Error("failed to clear cache", "err", err.Error())
586					}
587					return true
588				})
589			}()
590		}
591	}
592}