Eric Bower
·
2026-06-17
1package pgs
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "io"
8 "io/fs"
9 "log/slog"
10 "os"
11 "path"
12 "path/filepath"
13 "slices"
14 "strings"
15 "sync"
16 "time"
17
18 pgsdb "github.com/picosh/pico/pkg/apps/pgs/db"
19 "github.com/picosh/pico/pkg/db"
20 "github.com/picosh/pico/pkg/pssh"
21 sendutils "github.com/picosh/pico/pkg/send/utils"
22 "github.com/picosh/pico/pkg/shared"
23 "github.com/picosh/pico/pkg/storage"
24 ignore "github.com/sabhiram/go-gitignore"
25)
26
// Context keys for the per-connection state this file stores on the SSH
// session. Each key is a distinct unexported empty struct type, so a value
// stored under one key can never be read back through another.
type (
	// ctxBucketKey is the key for the storage bucket of the connection.
	ctxBucketKey struct{}
	// ctxStorageSizeKey is the key for the tracked storage total in bytes.
	ctxStorageSizeKey struct{}
	// ctxProjectKey is the key for the project currently being uploaded to.
	ctxProjectKey struct{}
	// ctxDenylistKey is the key for the cached deny list.
	ctxDenylistKey struct{}
)
31
// DenyList wraps the ignore patterns that are cached on an SSH session.
type DenyList struct {
	// Denylist is the raw pattern text. When files are validated it is split
	// on newlines and compiled with the go-gitignore matcher.
	Denylist string
}
35
36func getDenylist(s *pssh.SSHServerConnSession) *DenyList {
37 v := s.Context().Value(ctxDenylistKey{})
38 if v == nil {
39 return nil
40 }
41 denylist := s.Context().Value(ctxDenylistKey{}).(*DenyList)
42 return denylist
43}
44
45func setDenylist(s *pssh.SSHServerConnSession, denylist string) {
46 s.SetValue(ctxDenylistKey{}, &DenyList{Denylist: denylist})
47}
48
49func getProject(s *pssh.SSHServerConnSession) *db.Project {
50 v := s.Context().Value(ctxProjectKey{})
51 if v == nil {
52 return nil
53 }
54 project := s.Context().Value(ctxProjectKey{}).(*db.Project)
55 return project
56}
57
// setProject stores project on the session so subsequent calls on the same
// connection can retrieve it with getProject.
func setProject(s *pssh.SSHServerConnSession, project *db.Project) {
	s.SetValue(ctxProjectKey{}, project)
}
61
62func getBucket(s *pssh.SSHServerConnSession) (storage.Bucket, error) {
63 bucket := s.Context().Value(ctxBucketKey{}).(storage.Bucket)
64 if bucket.Name == "" {
65 return bucket, fmt.Errorf("bucket not set on `ssh.Context()` for connection")
66 }
67 return bucket, nil
68}
69
70func getStorageSize(s *pssh.SSHServerConnSession) uint64 {
71 return s.Context().Value(ctxStorageSizeKey{}).(uint64)
72}
73
74func incrementStorageSize(s *pssh.SSHServerConnSession, fileSize int64) uint64 {
75 curSize := getStorageSize(s)
76 var nextStorageSize uint64
77 if fileSize < 0 {
78 nextStorageSize = curSize - uint64(fileSize)
79 } else {
80 nextStorageSize = curSize + uint64(fileSize)
81 }
82 s.SetValue(ctxStorageSizeKey{}, nextStorageSize)
83 return nextStorageSize
84}
85
86func shouldIgnoreFile(fp, ignoreStr string) bool {
87 object := ignore.CompileIgnoreLines(strings.Split(ignoreStr, "\n")...)
88 return object.MatchesPath(fp)
89}
90
// FileData bundles an uploaded file entry with the user, bucket, project and
// deny list that validateAsset and writeAsset need to process it.
type FileData struct {
	// FileEntry is the uploaded entry; its fields (e.g. Filepath, Reader) are
	// promoted onto FileData.
	*sendutils.FileEntry
	// User is the account performing the upload.
	User *db.User
	// Bucket is the storage bucket the file is written to.
	Bucket storage.Bucket
	// Project is the project the file belongs to.
	Project *db.Project
	// DenyList is the raw ignore-pattern text used to reject files.
	DenyList string
}
98
// UploadAssetHandler implements the read, list, write and delete operations
// for assets uploaded over an SSH session.
type UploadAssetHandler struct {
	// Cfg provides the storage backend, database and other settings used by
	// the handler's methods.
	Cfg *PgsConfig
	// CacheClearingQueue is the channel passed to NewUploadAssetHandler.
	// NOTE(review): the methods in this file send purge requests on
	// Cfg.CacheClearingQueue rather than this field — confirm both refer to
	// the same channel.
	CacheClearingQueue chan string
}
103
104func NewUploadAssetHandler(cfg *PgsConfig, ch chan string, ctx context.Context) *UploadAssetHandler {
105 go runCacheQueue(cfg, ctx)
106 return &UploadAssetHandler{
107 Cfg: cfg,
108 CacheClearingQueue: ch,
109 }
110}
111
// GetLogger returns the logger associated with the SSH session by delegating
// to pssh.GetLogger.
func (h *UploadAssetHandler) GetLogger(s *pssh.SSHServerConnSession) *slog.Logger {
	return pssh.GetLogger(s)
}
115
116func (h *UploadAssetHandler) Read(s *pssh.SSHServerConnSession, entry *sendutils.FileEntry) (os.FileInfo, sendutils.ReadAndReaderAtCloser, error) {
117 logger := pssh.GetLogger(s)
118 user := pssh.GetUser(s)
119
120 if user == nil {
121 err := fmt.Errorf("could not get user from ctx")
122 logger.Error("error getting user from ctx", "err", err)
123 return nil, nil, err
124 }
125
126 fileInfo := &sendutils.VirtualFile{
127 FName: filepath.Base(entry.Filepath),
128 FIsDir: false,
129 FSize: entry.Size,
130 FModTime: time.Unix(entry.Mtime, 0),
131 }
132
133 bucket, err := h.Cfg.Storage.GetBucket(shared.GetAssetBucketName(user.ID))
134 if err != nil {
135 return nil, nil, err
136 }
137
138 fname := shared.GetAssetFileName(entry)
139 contents, info, err := h.Cfg.Storage.GetObject(bucket, fname)
140 if err != nil {
141 return nil, nil, err
142 }
143
144 fileInfo.FSize = info.Size
145 fileInfo.FModTime = info.LastModified
146
147 return fileInfo, contents, nil
148}
149
150func (h *UploadAssetHandler) List(s *pssh.SSHServerConnSession, fpath string, isDir bool, recursive bool) ([]os.FileInfo, error) {
151 var fileList []os.FileInfo
152
153 logger := pssh.GetLogger(s)
154 user := pssh.GetUser(s)
155
156 if user == nil {
157 err := fmt.Errorf("could not get user from ctx")
158 logger.Error("error getting user from ctx", "err", err)
159 return fileList, err
160 }
161
162 cleanFilename := fpath
163
164 bucketName := shared.GetAssetBucketName(user.ID)
165 bucket, err := h.Cfg.Storage.GetBucket(bucketName)
166 if err != nil {
167 return fileList, err
168 }
169
170 if cleanFilename == "" || cleanFilename == "." {
171 name := cleanFilename
172 if name == "" {
173 name = "/"
174 }
175
176 info := &sendutils.VirtualFile{
177 FName: name,
178 FIsDir: true,
179 }
180
181 fileList = append(fileList, info)
182 } else {
183 if cleanFilename != "/" && isDir {
184 cleanFilename += "/"
185 }
186
187 foundList, err := h.Cfg.Storage.ListObjects(bucket, cleanFilename, recursive)
188 if err != nil {
189 return fileList, err
190 }
191
192 fileList = append(fileList, foundList...)
193 }
194
195 return fileList, nil
196}
197
198func (h *UploadAssetHandler) Validate(s *pssh.SSHServerConnSession) error {
199 logger := pssh.GetLogger(s)
200 user := pssh.GetUser(s)
201
202 if user == nil {
203 err := fmt.Errorf("could not get user from ctx")
204 logger.Error("error getting user from ctx", "err", err)
205 return err
206 }
207
208 assetBucket := shared.GetAssetBucketName(user.ID)
209 bucket, err := h.Cfg.Storage.UpsertBucket(assetBucket)
210 if err != nil {
211 return err
212 }
213
214 s.SetValue(ctxBucketKey{}, bucket)
215
216 totalStorageSize, err := h.Cfg.Storage.GetBucketQuota(bucket)
217 if err != nil {
218 return err
219 }
220
221 s.SetValue(ctxStorageSizeKey{}, totalStorageSize)
222
223 logger.Info(
224 "bucket size",
225 "user", user.Name,
226 "bytes", totalStorageSize,
227 )
228
229 logger.Info(
230 "attempting to upload files",
231 "user", user.Name,
232 "txtPrefix", h.Cfg.TxtPrefix,
233 )
234
235 return nil
236}
237
238func (h *UploadAssetHandler) findDenylist(bucket storage.Bucket, project *db.Project, logger *slog.Logger) (string, error) {
239 fp, _, err := h.Cfg.Storage.GetObject(bucket, filepath.Join(project.ProjectDir, "_pgs_ignore"))
240 if err != nil {
241 return "", fmt.Errorf("_pgs_ignore not found")
242 }
243 defer func() {
244 _ = fp.Close()
245 }()
246
247 buf := new(strings.Builder)
248 _, err = io.Copy(buf, fp)
249 if err != nil {
250 logger.Error("io copy", "err", err.Error())
251 return "", err
252 }
253
254 str := buf.String()
255 return str, nil
256}
257
258func findPlusFF(dbpool pgsdb.PgsDB, cfg *PgsConfig, userID string) *db.FeatureFlag {
259 ff, _ := dbpool.FindFeature(userID, "plus")
260 // we have free tiers so users might not have a feature flag
261 // in which case we set sane defaults
262 if ff == nil {
263 ff = db.NewFeatureFlag(
264 userID,
265 "plus",
266 cfg.MaxSize,
267 cfg.MaxAssetSize,
268 cfg.MaxSpecialFileSize,
269 )
270 }
271 // this is jank
272 ff.Data.StorageMax = ff.FindStorageMax(cfg.MaxSize)
273 ff.Data.FileMax = ff.FindFileMax(cfg.MaxAssetSize)
274 ff.Data.SpecialFileMax = ff.FindSpecialFileMax(cfg.MaxSpecialFileSize)
275 return ff
276}
277
278func mtimeToTime(entry *sendutils.FileEntry) time.Time {
279 var mtime time.Time
280 if entry.Mtime > 0 {
281 return time.Unix(entry.Mtime, 0)
282 }
283 return mtime
284}
285
286func (h *UploadAssetHandler) Write(s *pssh.SSHServerConnSession, entry *sendutils.FileEntry) (string, error) {
287 logger := pssh.GetLogger(s)
288 user := pssh.GetUser(s)
289
290 if user == nil {
291 err := fmt.Errorf("could not get user from ctx")
292 logger.Error("error getting user from ctx", "err", err)
293 return "", err
294 }
295
296 if entry.Mode.IsDir() && strings.Count(entry.Filepath, "/") == 1 {
297 entry.Filepath = strings.TrimPrefix(entry.Filepath, "/")
298 }
299
300 logger = logger.With(
301 "file", entry.Filepath,
302 "size", entry.Size,
303 )
304
305 bucket, err := getBucket(s)
306 if err != nil {
307 logger.Error("could not find bucket in ctx", "err", err.Error())
308 return "", err
309 }
310
311 project := getProject(s)
312 projectName := shared.GetProjectName(entry)
313 logger = logger.With("project", projectName)
314
315 // find, create, or update project if we haven't already done it
316 // we need to also check if the project stored in ctx is the same project
317 // being uploaded since users can keep an ssh connection alive via sftp
318 // and created many projects in a single session
319 if project == nil || project.Name != projectName {
320 project, err = h.Cfg.DB.UpsertProject(user.ID, projectName, projectName)
321 if err != nil {
322 logger.Error("upsert project", "err", err.Error())
323 return "", err
324 }
325 setProject(s, project)
326 }
327
328 if project.Blocked != "" {
329 msg := "project has been blocked and cannot upload files: %s"
330 return "", fmt.Errorf(msg, project.Blocked)
331 }
332
333 info := &storage.ObjectInfo{
334 LastModified: mtimeToTime(entry),
335 }
336 if entry.Mode.IsDir() {
337 _, _, err := h.Cfg.Storage.PutObject(
338 bucket,
339 path.Join(shared.GetAssetFileName(entry), "._pico_keep_dir"),
340 bytes.NewReader([]byte{}),
341 info,
342 )
343 return "", err
344 }
345
346 featureFlag := findPlusFF(h.Cfg.DB, h.Cfg, user.ID)
347 if !featureFlag.IsValid() && pgsdb.IsProjectPrivate(projectName) {
348 return "", fmt.Errorf("private projects are only allowed for pico+ users")
349 }
350
351 // calculate the filsize difference between the same file already
352 // stored and the updated file being uploaded
353 assetFilename := shared.GetAssetFileName(entry)
354 obj, info, _ := h.Cfg.Storage.GetObject(bucket, assetFilename)
355 var curFileSize int64
356 if info != nil {
357 curFileSize = info.Size
358 }
359 if obj != nil {
360 defer func() {
361 _ = obj.Close()
362 }()
363 }
364
365 denylist := getDenylist(s)
366 if denylist == nil {
367 dlist, err := h.findDenylist(bucket, project, logger)
368 if err != nil {
369 logger.Info("failed to get denylist, setting default (.*)", "err", err.Error())
370 dlist = ".*"
371 }
372 setDenylist(s, dlist)
373 denylist = &DenyList{Denylist: dlist}
374 }
375
376 data := &FileData{
377 FileEntry: entry,
378 User: user,
379 Bucket: bucket,
380 DenyList: denylist.Denylist,
381 Project: project,
382 }
383
384 valid, err := h.validateAsset(data)
385 if !valid {
386 return "", err
387 }
388
389 // SFTP does not report file size so the more performant way to
390 // check filesize constraints is to try and upload the file to s3
391 // with a specialized reader that raises an error if the filesize limit
392 // has been reached
393 storageMax := featureFlag.Data.StorageMax
394 fileMax := featureFlag.Data.FileMax
395 curStorageSize := getStorageSize(s)
396 remaining := int64(storageMax) - int64(curStorageSize)
397 sizeRemaining := min(remaining+curFileSize, fileMax)
398 if sizeRemaining <= 0 {
399 _, _ = fmt.Fprintln(s.Stderr(), "storage quota reached")
400 _, _ = fmt.Fprintf(s.Stderr(), "\r")
401 _ = s.Exit(1)
402 _ = s.Close()
403 return "", fmt.Errorf("storage quota reached")
404 }
405 logger = logger.With(
406 "storageMax", storageMax,
407 "currentStorageMax", curStorageSize,
408 "fileMax", fileMax,
409 "sizeRemaining", sizeRemaining,
410 )
411
412 specialFileMax := featureFlag.Data.SpecialFileMax
413 if isSpecialFile(entry.Filepath) {
414 sizeRemaining = min(sizeRemaining, specialFileMax)
415 }
416
417 fsize, err := h.writeAsset(
418 s,
419 shared.NewMaxBytesReader(data.Reader, int64(sizeRemaining)),
420 data,
421 )
422 if err != nil {
423 logger.Error("could not write asset", "err", err.Error())
424 cerr := fmt.Errorf(
425 "%s: storage size %.2fmb, storage max %.2fmb, file max %.2fmb, special file max %.4fmb",
426 err,
427 shared.BytesToMB(int(curStorageSize)),
428 shared.BytesToMB(int(storageMax)),
429 shared.BytesToMB(int(fileMax)),
430 shared.BytesToMB(int(specialFileMax)),
431 )
432 return "", cerr
433 }
434
435 deltaFileSize := curFileSize - fsize
436 nextStorageSize := incrementStorageSize(s, deltaFileSize)
437
438 url := h.Cfg.AssetURL(
439 user.Name,
440 projectName,
441 strings.Replace(data.Filepath, "/"+projectName+"/", "", 1),
442 )
443
444 maxSize := int(featureFlag.Data.StorageMax)
445 str := fmt.Sprintf(
446 "%s (space: %.2f/%.2fGB, %.2f%%)",
447 url,
448 shared.BytesToGB(int(nextStorageSize)),
449 shared.BytesToGB(maxSize),
450 (float32(nextStorageSize)/float32(maxSize))*100,
451 )
452
453 surrogate := getSurrogateKey(user.Name, projectName)
454 h.Cfg.CacheClearingQueue <- surrogate
455
456 return str, err
457}
458
// isSpecialFile reports whether the base name of entry is one of the special
// control files: "_headers", "_redirects" or "_pgs_ignore".
func isSpecialFile(entry string) bool {
	switch filepath.Base(entry) {
	case "_headers", "_redirects", "_pgs_ignore":
		return true
	default:
		return false
	}
}
463
// Delete removes the asset (or directory) identified by entry from the
// session user's bucket and queues a cache purge for the owning project.
//
// A path is treated as a directory when a "._pico_keep_dir" marker object
// exists beneath it. For directories, nested markers, the directory's own
// marker and finally the directory path itself are deleted. For regular
// files, a marker is written to the parent directory first if the file is its
// only listed entry, and then the file is deleted.
//
// An error is returned when the session has no user or bucket, or when a
// storage list, put or delete call fails.
func (h *UploadAssetHandler) Delete(s *pssh.SSHServerConnSession, entry *sendutils.FileEntry) error {
	logger := pssh.GetLogger(s)
	user := pssh.GetUser(s)

	if user == nil {
		err := fmt.Errorf("could not get user from ctx")
		logger.Error("error getting user from ctx", "err", err)
		return err
	}

	// a directory path with a single slash has that leading slash stripped
	if entry.Mode.IsDir() && strings.Count(entry.Filepath, "/") == 1 {
		entry.Filepath = strings.TrimPrefix(entry.Filepath, "/")
	}

	assetFilepath := shared.GetAssetFileName(entry)

	logger = logger.With(
		"file", assetFilepath,
	)

	bucket, err := getBucket(s)
	if err != nil {
		logger.Error("could not find bucket in ctx", "err", err.Error())
		return err
	}

	projectName := shared.GetProjectName(entry)
	logger = logger.With("project", projectName)

	logger.Info("deleting file")

	// Check if this path represents a directory (has a ._pico_keep_dir marker)
	keepDirPath := filepath.Join(assetFilepath, "._pico_keep_dir")
	keepDirReader, _, keepDirErr := h.Cfg.Storage.GetObject(bucket, keepDirPath)
	if keepDirReader != nil {
		defer func() {
			_ = keepDirReader.Close()
		}()
	}

	if keepDirErr == nil {
		// This is a directory being deleted. We must delete all nested
		// ._pico_keep_dir files first, otherwise os.Remove() on the
		// directory will fail with "directory not empty".
		nested, err := h.Cfg.Storage.ListObjects(bucket, assetFilepath+"/", true)
		if err != nil {
			return err
		}

		baseDepth := strings.Count(assetFilepath, string(os.PathSeparator))
		for _, nestedEntry := range nested {
			if filepath.Base(nestedEntry.Name()) != "._pico_keep_dir" {
				continue
			}
			// Only delete keep_dir files that are direct or nested children
			// NOTE(review): this compares the separator count of
			// nestedEntry.Name() with that of assetFilepath, which is only
			// meaningful if Name() is a full path — confirm what ListObjects
			// returns.
			nestedDepth := strings.Count(nestedEntry.Name(), string(os.PathSeparator))
			if nestedDepth <= baseDepth {
				continue
			}
			// NOTE(review): joining onto assetFilepath presumes Name() is
			// relative to it, which looks at odds with the depth check above
			// — verify.
			nestedKeepDirPath := filepath.Join(assetFilepath, nestedEntry.Name())
			if delErr := h.Cfg.Storage.DeleteObject(bucket, nestedKeepDirPath); delErr != nil {
				return delErr
			}
		}

		// Delete this directory's own ._pico_keep_dir
		err = h.Cfg.Storage.DeleteObject(bucket, keepDirPath)
		if err != nil {
			return err
		}

		// Delete the directory itself (no-op for S3-style storage, removes the dir for fs storage)
		_ = h.Cfg.Storage.DeleteObject(bucket, assetFilepath)

		surrogate := getSurrogateKey(user.Name, projectName)
		h.Cfg.CacheClearingQueue <- surrogate
		return nil
	}

	// Regular file deletion: create ._pico_keep_dir if the directory becomes empty
	pathDir := filepath.Dir(assetFilepath)
	fileName := filepath.Base(assetFilepath)

	sibs, err := h.Cfg.Storage.ListObjects(bucket, pathDir+"/", false)
	if err != nil {
		return err
	}

	// drop the file being deleted so only its siblings remain
	sibs = slices.DeleteFunc(sibs, func(sib fs.FileInfo) bool {
		return sib.Name() == fileName
	})

	if len(sibs) == 0 {
		info := &storage.ObjectInfo{
			LastModified: mtimeToTime(entry),
		}
		_, _, err := h.Cfg.Storage.PutObject(
			bucket,
			filepath.Join(pathDir, "._pico_keep_dir"),
			bytes.NewReader([]byte{}),
			info,
		)
		if err != nil {
			return err
		}
	}
	err = h.Cfg.Storage.DeleteObject(bucket, assetFilepath)

	// the purge is queued before err is inspected, so it happens even when
	// the delete failed
	surrogate := getSurrogateKey(user.Name, projectName)
	h.Cfg.CacheClearingQueue <- surrogate

	if err != nil {
		return err
	}

	// err is nil here
	return err
}
581
582func (h *UploadAssetHandler) validateAsset(data *FileData) (bool, error) {
583 fname := filepath.Base(data.Filepath)
584
585 projectName := shared.GetProjectName(data.FileEntry)
586 if projectName == "" || projectName == "/" || projectName == "." {
587 return false, fmt.Errorf("ERROR: invalid project name, you must copy files to a non-root folder (e.g. pgs.sh:/project-name)")
588 }
589
590 // special files we use for custom routing
591 if isSpecialFile(fname) {
592 return true, nil
593 }
594
595 fpath := strings.Replace(data.Filepath, "/"+projectName, "", 1)
596 if shouldIgnoreFile(fpath, data.DenyList) {
597 err := fmt.Errorf(
598 "ERROR: (%s) file rejected, https://pico.sh/pgs#-pgs-ignore",
599 data.Filepath,
600 )
601 return false, err
602 }
603
604 return true, nil
605}
606
607func (h *UploadAssetHandler) writeAsset(s *pssh.SSHServerConnSession, reader io.Reader, data *FileData) (int64, error) {
608 assetFilepath := shared.GetAssetFileName(data.FileEntry)
609
610 logger := h.GetLogger(s)
611 logger.Info(
612 "uploading file to bucket",
613 "bucket", data.Bucket.Name,
614 "filename", assetFilepath,
615 )
616
617 info := &storage.ObjectInfo{
618 LastModified: mtimeToTime(data.FileEntry),
619 }
620 _, fsize, err := h.Cfg.Storage.PutObject(
621 data.Bucket,
622 assetFilepath,
623 reader,
624 info,
625 )
626 return fsize, err
627}
628
629// runCacheQueue processes requests to purge the cache for a single site.
630// One message arrives per file that is written/deleted during uploads.
631// Repeated messages for the same site are grouped so that we only flush once
632// per site per 5 seconds.
633func runCacheQueue(cfg *PgsConfig, ctx context.Context) {
634 var pendingFlushes sync.Map
635 tick := time.NewTicker(5 * time.Second)
636 defer tick.Stop()
637 for {
638 select {
639 case <-ctx.Done():
640 return
641 case host := <-cfg.CacheClearingQueue:
642 pendingFlushes.Store(host, host)
643 case <-tick.C:
644 go func() {
645 pendingFlushes.Range(func(key, value any) bool {
646 pendingFlushes.Delete(key)
647 err := purgeCache(cfg, cfg.Pubsub, key.(string))
648 if err != nil {
649 cfg.Logger.Error("failed to clear cache", "err", err.Error())
650 }
651 return true
652 })
653 }()
654 }
655 }
656}