Eric Bower
·
2026-04-20
uploader.go
1package pgs
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "io"
8 "io/fs"
9 "log/slog"
10 "os"
11 "path"
12 "path/filepath"
13 "slices"
14 "strings"
15 "sync"
16 "time"
17
18 pgsdb "github.com/picosh/pico/pkg/apps/pgs/db"
19 "github.com/picosh/pico/pkg/db"
20 "github.com/picosh/pico/pkg/pssh"
21 sendutils "github.com/picosh/pico/pkg/send/utils"
22 "github.com/picosh/pico/pkg/shared"
23 "github.com/picosh/pico/pkg/storage"
24 ignore "github.com/sabhiram/go-gitignore"
25)
26
// Private context keys for values stashed on the SSH connection context.
type (
	ctxBucketKey      struct{}
	ctxStorageSizeKey struct{}
	ctxProjectKey     struct{}
	ctxDenylistKey    struct{}
)
31
// DenyList holds the gitignore-style patterns (one per line) used to
// reject uploaded files. It is cached on the connection context via
// setDenylist so the `_pgs_ignore` file is only fetched once per session.
type DenyList struct {
	Denylist string
}
35
36func getDenylist(s *pssh.SSHServerConnSession) *DenyList {
37 v := s.Context().Value(ctxDenylistKey{})
38 if v == nil {
39 return nil
40 }
41 denylist := s.Context().Value(ctxDenylistKey{}).(*DenyList)
42 return denylist
43}
44
45func setDenylist(s *pssh.SSHServerConnSession, denylist string) {
46 s.SetValue(ctxDenylistKey{}, &DenyList{Denylist: denylist})
47}
48
49func getProject(s *pssh.SSHServerConnSession) *db.Project {
50 v := s.Context().Value(ctxProjectKey{})
51 if v == nil {
52 return nil
53 }
54 project := s.Context().Value(ctxProjectKey{}).(*db.Project)
55 return project
56}
57
// setProject caches the active project on the session context so repeat
// uploads over the same connection can skip the DB upsert.
func setProject(s *pssh.SSHServerConnSession, project *db.Project) {
	s.SetValue(ctxProjectKey{}, project)
}
61
62func getBucket(s *pssh.SSHServerConnSession) (storage.Bucket, error) {
63 bucket := s.Context().Value(ctxBucketKey{}).(storage.Bucket)
64 if bucket.Name == "" {
65 return bucket, fmt.Errorf("bucket not set on `ssh.Context()` for connection")
66 }
67 return bucket, nil
68}
69
70func getStorageSize(s *pssh.SSHServerConnSession) uint64 {
71 return s.Context().Value(ctxStorageSizeKey{}).(uint64)
72}
73
74func incrementStorageSize(s *pssh.SSHServerConnSession, fileSize int64) uint64 {
75 curSize := getStorageSize(s)
76 var nextStorageSize uint64
77 if fileSize < 0 {
78 nextStorageSize = curSize - uint64(fileSize)
79 } else {
80 nextStorageSize = curSize + uint64(fileSize)
81 }
82 s.SetValue(ctxStorageSizeKey{}, nextStorageSize)
83 return nextStorageSize
84}
85
86func shouldIgnoreFile(fp, ignoreStr string) bool {
87 object := ignore.CompileIgnoreLines(strings.Split(ignoreStr, "\n")...)
88 return object.MatchesPath(fp)
89}
90
// FileData bundles everything needed to validate and store one uploaded
// file: the wire-level file entry plus the resolved user, bucket,
// project, and denylist for the current session.
type FileData struct {
	*sendutils.FileEntry
	User     *db.User
	Bucket   storage.Bucket
	Project  *db.Project
	DenyList string
}
98
// UploadAssetHandler implements the send (rsync/sftp/scp) hooks for
// writing, reading, listing, and deleting static-site assets.
type UploadAssetHandler struct {
	Cfg *PgsConfig
	// NOTE(review): Write/Delete push surrogate keys to
	// Cfg.CacheClearingQueue, not to this field — confirm both refer to
	// the same channel (NewUploadAssetHandler accepts it separately).
	CacheClearingQueue chan string
}
103
104func NewUploadAssetHandler(cfg *PgsConfig, ch chan string, ctx context.Context) *UploadAssetHandler {
105 go runCacheQueue(cfg, ctx)
106 return &UploadAssetHandler{
107 Cfg: cfg,
108 CacheClearingQueue: ch,
109 }
110}
111
// GetLogger returns the structured logger attached to the SSH session.
func (h *UploadAssetHandler) GetLogger(s *pssh.SSHServerConnSession) *slog.Logger {
	return pssh.GetLogger(s)
}
115
116func (h *UploadAssetHandler) Read(s *pssh.SSHServerConnSession, entry *sendutils.FileEntry) (os.FileInfo, sendutils.ReadAndReaderAtCloser, error) {
117 logger := pssh.GetLogger(s)
118 user := pssh.GetUser(s)
119
120 if user == nil {
121 err := fmt.Errorf("could not get user from ctx")
122 logger.Error("error getting user from ctx", "err", err)
123 return nil, nil, err
124 }
125
126 fileInfo := &sendutils.VirtualFile{
127 FName: filepath.Base(entry.Filepath),
128 FIsDir: false,
129 FSize: entry.Size,
130 FModTime: time.Unix(entry.Mtime, 0),
131 }
132
133 bucket, err := h.Cfg.Storage.GetBucket(shared.GetAssetBucketName(user.ID))
134 if err != nil {
135 return nil, nil, err
136 }
137
138 fname := shared.GetAssetFileName(entry)
139 contents, info, err := h.Cfg.Storage.GetObject(bucket, fname)
140 if err != nil {
141 return nil, nil, err
142 }
143
144 fileInfo.FSize = info.Size
145 fileInfo.FModTime = info.LastModified
146
147 return fileInfo, contents, nil
148}
149
150func (h *UploadAssetHandler) List(s *pssh.SSHServerConnSession, fpath string, isDir bool, recursive bool) ([]os.FileInfo, error) {
151 var fileList []os.FileInfo
152
153 logger := pssh.GetLogger(s)
154 user := pssh.GetUser(s)
155
156 if user == nil {
157 err := fmt.Errorf("could not get user from ctx")
158 logger.Error("error getting user from ctx", "err", err)
159 return fileList, err
160 }
161
162 cleanFilename := fpath
163
164 bucketName := shared.GetAssetBucketName(user.ID)
165 bucket, err := h.Cfg.Storage.GetBucket(bucketName)
166 if err != nil {
167 return fileList, err
168 }
169
170 if cleanFilename == "" || cleanFilename == "." {
171 name := cleanFilename
172 if name == "" {
173 name = "/"
174 }
175
176 info := &sendutils.VirtualFile{
177 FName: name,
178 FIsDir: true,
179 }
180
181 fileList = append(fileList, info)
182 } else {
183 if cleanFilename != "/" && isDir {
184 cleanFilename += "/"
185 }
186
187 foundList, err := h.Cfg.Storage.ListObjects(bucket, cleanFilename, recursive)
188 if err != nil {
189 return fileList, err
190 }
191
192 fileList = append(fileList, foundList...)
193 }
194
195 return fileList, nil
196}
197
198func (h *UploadAssetHandler) Validate(s *pssh.SSHServerConnSession) error {
199 logger := pssh.GetLogger(s)
200 user := pssh.GetUser(s)
201
202 if user == nil {
203 err := fmt.Errorf("could not get user from ctx")
204 logger.Error("error getting user from ctx", "err", err)
205 return err
206 }
207
208 assetBucket := shared.GetAssetBucketName(user.ID)
209 bucket, err := h.Cfg.Storage.UpsertBucket(assetBucket)
210 if err != nil {
211 return err
212 }
213
214 s.SetValue(ctxBucketKey{}, bucket)
215
216 totalStorageSize, err := h.Cfg.Storage.GetBucketQuota(bucket)
217 if err != nil {
218 return err
219 }
220
221 s.SetValue(ctxStorageSizeKey{}, totalStorageSize)
222
223 logger.Info(
224 "bucket size",
225 "user", user.Name,
226 "bytes", totalStorageSize,
227 )
228
229 logger.Info(
230 "attempting to upload files",
231 "user", user.Name,
232 "txtPrefix", h.Cfg.TxtPrefix,
233 )
234
235 return nil
236}
237
238func (h *UploadAssetHandler) findDenylist(bucket storage.Bucket, project *db.Project, logger *slog.Logger) (string, error) {
239 fp, _, err := h.Cfg.Storage.GetObject(bucket, filepath.Join(project.ProjectDir, "_pgs_ignore"))
240 if err != nil {
241 return "", fmt.Errorf("_pgs_ignore not found")
242 }
243 defer func() {
244 _ = fp.Close()
245 }()
246
247 buf := new(strings.Builder)
248 _, err = io.Copy(buf, fp)
249 if err != nil {
250 logger.Error("io copy", "err", err.Error())
251 return "", err
252 }
253
254 str := buf.String()
255 return str, nil
256}
257
258func findPlusFF(dbpool pgsdb.PgsDB, cfg *PgsConfig, userID string) *db.FeatureFlag {
259 ff, _ := dbpool.FindFeature(userID, "plus")
260 // we have free tiers so users might not have a feature flag
261 // in which case we set sane defaults
262 if ff == nil {
263 ff = db.NewFeatureFlag(
264 userID,
265 "plus",
266 cfg.MaxSize,
267 cfg.MaxAssetSize,
268 cfg.MaxSpecialFileSize,
269 )
270 }
271 // this is jank
272 ff.Data.StorageMax = ff.FindStorageMax(cfg.MaxSize)
273 ff.Data.FileMax = ff.FindFileMax(cfg.MaxAssetSize)
274 ff.Data.SpecialFileMax = ff.FindSpecialFileMax(cfg.MaxSpecialFileSize)
275 return ff
276}
277
278func mtimeToTime(entry *sendutils.FileEntry) time.Time {
279 var mtime time.Time
280 if entry.Mtime > 0 {
281 return time.Unix(entry.Mtime, 0)
282 }
283 return mtime
284}
285
// Write stores one uploaded file (or directory marker) into the user's
// asset bucket. It resolves/creates the target project, enforces the
// denylist and pico+ quota limits, streams the file to storage through a
// size-capped reader, updates the cached storage total, and queues a
// cache purge for the site. The returned string is a human-readable
// status line (asset URL plus space usage) echoed back to the client.
func (h *UploadAssetHandler) Write(s *pssh.SSHServerConnSession, entry *sendutils.FileEntry) (string, error) {
	logger := pssh.GetLogger(s)
	user := pssh.GetUser(s)

	if user == nil {
		err := fmt.Errorf("could not get user from ctx")
		logger.Error("error getting user from ctx", "err", err)
		return "", err
	}

	// normalize a top-level directory path like "/project" to "project"
	if entry.Mode.IsDir() && strings.Count(entry.Filepath, "/") == 1 {
		entry.Filepath = strings.TrimPrefix(entry.Filepath, "/")
	}

	logger = logger.With(
		"file", entry.Filepath,
		"size", entry.Size,
	)

	bucket, err := getBucket(s)
	if err != nil {
		logger.Error("could not find bucket in ctx", "err", err.Error())
		return "", err
	}

	project := getProject(s)
	projectName := shared.GetProjectName(entry)
	logger = logger.With("project", projectName)

	// find, create, or update project if we haven't already done it
	// we need to also check if the project stored in ctx is the same project
	// being uploaded since users can keep an ssh connection alive via sftp
	// and created many projects in a single session
	if project == nil || project.Name != projectName {
		project, err = h.Cfg.DB.UpsertProject(user.ID, projectName, projectName)
		if err != nil {
			logger.Error("upsert project", "err", err.Error())
			return "", err
		}
		setProject(s, project)
	}

	if project.Blocked != "" {
		msg := "project has been blocked and cannot upload files: %s"
		return "", fmt.Errorf(msg, project.Blocked)
	}

	info := &storage.ObjectInfo{
		LastModified: mtimeToTime(entry),
	}
	// directories are represented in the flat object store by an empty
	// `._pico_keep_dir` marker object
	if entry.Mode.IsDir() {
		_, _, err := h.Cfg.Storage.PutObject(
			bucket,
			path.Join(shared.GetAssetFileName(entry), "._pico_keep_dir"),
			bytes.NewReader([]byte{}),
			info,
		)
		return "", err
	}

	featureFlag := findPlusFF(h.Cfg.DB, h.Cfg, user.ID)
	if !featureFlag.IsValid() && pgsdb.IsProjectPrivate(projectName) {
		return "", fmt.Errorf("private projects are only allowed for pico+ users")
	}

	// calculate the filsize difference between the same file already
	// stored and the updated file being uploaded
	assetFilename := shared.GetAssetFileName(entry)
	obj, info, _ := h.Cfg.Storage.GetObject(bucket, assetFilename)
	var curFileSize int64
	if info != nil {
		curFileSize = info.Size
	}
	if obj != nil {
		defer func() {
			_ = obj.Close()
		}()
	}

	// fetch (and cache) the project's denylist; if `_pgs_ignore` cannot
	// be read, fall back to the default pattern ".*"
	denylist := getDenylist(s)
	if denylist == nil {
		dlist, err := h.findDenylist(bucket, project, logger)
		if err != nil {
			logger.Info("failed to get denylist, setting default (.*)", "err", err.Error())
			dlist = ".*"
		}
		setDenylist(s, dlist)
		denylist = &DenyList{Denylist: dlist}
	}

	data := &FileData{
		FileEntry: entry,
		User:      user,
		Bucket:    bucket,
		DenyList:  denylist.Denylist,
		Project:   project,
	}

	valid, err := h.validateAsset(data)
	if !valid {
		return "", err
	}

	// SFTP does not report file size so the more performant way to
	// check filesize constraints is to try and upload the file to s3
	// with a specialized reader that raises an error if the filesize limit
	// has been reached
	storageMax := featureFlag.Data.StorageMax
	fileMax := featureFlag.Data.FileMax
	curStorageSize := getStorageSize(s)
	remaining := int64(storageMax) - int64(curStorageSize)
	// re-uploads get credit for the bytes the old version already uses
	sizeRemaining := min(remaining+curFileSize, fileMax)
	if sizeRemaining <= 0 {
		// hard-stop the session: report to the client and close
		_, _ = fmt.Fprintln(s.Stderr(), "storage quota reached")
		_, _ = fmt.Fprintf(s.Stderr(), "\r")
		_ = s.Exit(1)
		_ = s.Close()
		return "", fmt.Errorf("storage quota reached")
	}
	logger = logger.With(
		"storageMax", storageMax,
		"currentStorageMax", curStorageSize,
		"fileMax", fileMax,
		"sizeRemaining", sizeRemaining,
	)

	// special routing files (_headers, _redirects, _pgs_ignore) have
	// their own, smaller size cap
	specialFileMax := featureFlag.Data.SpecialFileMax
	if isSpecialFile(entry.Filepath) {
		sizeRemaining = min(sizeRemaining, specialFileMax)
	}

	fsize, err := h.writeAsset(
		s,
		shared.NewMaxBytesReader(data.Reader, int64(sizeRemaining)),
		data,
	)
	if err != nil {
		logger.Error("could not write asset", "err", err.Error())
		cerr := fmt.Errorf(
			"%s: storage size %.2fmb, storage max %.2fmb, file max %.2fmb, special file max %.4fmb",
			err,
			shared.BytesToMB(int(curStorageSize)),
			shared.BytesToMB(int(storageMax)),
			shared.BytesToMB(int(fileMax)),
			shared.BytesToMB(int(specialFileMax)),
		)
		return "", cerr
	}

	// adjust the cached storage total by the difference between the old
	// and new object sizes
	deltaFileSize := curFileSize - fsize
	nextStorageSize := incrementStorageSize(s, deltaFileSize)

	url := h.Cfg.AssetURL(
		user.Name,
		projectName,
		strings.Replace(data.Filepath, "/"+projectName+"/", "", 1),
	)

	maxSize := int(featureFlag.Data.StorageMax)
	str := fmt.Sprintf(
		"%s (space: %.2f/%.2fGB, %.2f%%)",
		url,
		shared.BytesToGB(int(nextStorageSize)),
		shared.BytesToGB(maxSize),
		(float32(nextStorageSize)/float32(maxSize))*100,
	)

	// queue a cache purge for this site
	surrogate := getSurrogateKey(user.Name, projectName)
	h.Cfg.CacheClearingQueue <- surrogate

	return str, err
}
458
// isSpecialFile reports whether the path's base name is one of the
// special routing files (_headers, _redirects, _pgs_ignore).
func isSpecialFile(entry string) bool {
	switch filepath.Base(entry) {
	case "_headers", "_redirects", "_pgs_ignore":
		return true
	}
	return false
}
463
464func (h *UploadAssetHandler) Delete(s *pssh.SSHServerConnSession, entry *sendutils.FileEntry) error {
465 logger := pssh.GetLogger(s)
466 user := pssh.GetUser(s)
467
468 if user == nil {
469 err := fmt.Errorf("could not get user from ctx")
470 logger.Error("error getting user from ctx", "err", err)
471 return err
472 }
473
474 if entry.Mode.IsDir() && strings.Count(entry.Filepath, "/") == 1 {
475 entry.Filepath = strings.TrimPrefix(entry.Filepath, "/")
476 }
477
478 assetFilepath := shared.GetAssetFileName(entry)
479
480 logger = logger.With(
481 "file", assetFilepath,
482 )
483
484 bucket, err := getBucket(s)
485 if err != nil {
486 logger.Error("could not find bucket in ctx", "err", err.Error())
487 return err
488 }
489
490 projectName := shared.GetProjectName(entry)
491 logger = logger.With("project", projectName)
492
493 if assetFilepath == filepath.Join("/", projectName, "._pico_keep_dir") {
494 return os.ErrPermission
495 }
496
497 logger.Info("deleting file")
498
499 pathDir := filepath.Dir(assetFilepath)
500 fileName := filepath.Base(assetFilepath)
501
502 sibs, err := h.Cfg.Storage.ListObjects(bucket, pathDir+"/", false)
503 if err != nil {
504 return err
505 }
506
507 sibs = slices.DeleteFunc(sibs, func(sib fs.FileInfo) bool {
508 return sib.Name() == fileName
509 })
510
511 if len(sibs) == 0 {
512 info := &storage.ObjectInfo{
513 LastModified: mtimeToTime(entry),
514 }
515 _, _, err := h.Cfg.Storage.PutObject(
516 bucket,
517 filepath.Join(pathDir, "._pico_keep_dir"),
518 bytes.NewReader([]byte{}),
519 info,
520 )
521 if err != nil {
522 return err
523 }
524 }
525 err = h.Cfg.Storage.DeleteObject(bucket, assetFilepath)
526
527 surrogate := getSurrogateKey(user.Name, projectName)
528 h.Cfg.CacheClearingQueue <- surrogate
529
530 if err != nil {
531 return err
532 }
533
534 return err
535}
536
537func (h *UploadAssetHandler) validateAsset(data *FileData) (bool, error) {
538 fname := filepath.Base(data.Filepath)
539
540 projectName := shared.GetProjectName(data.FileEntry)
541 if projectName == "" || projectName == "/" || projectName == "." {
542 return false, fmt.Errorf("ERROR: invalid project name, you must copy files to a non-root folder (e.g. pgs.sh:/project-name)")
543 }
544
545 // special files we use for custom routing
546 if isSpecialFile(fname) {
547 return true, nil
548 }
549
550 fpath := strings.Replace(data.Filepath, "/"+projectName, "", 1)
551 if shouldIgnoreFile(fpath, data.DenyList) {
552 err := fmt.Errorf(
553 "ERROR: (%s) file rejected, https://pico.sh/pgs#-pgs-ignore",
554 data.Filepath,
555 )
556 return false, err
557 }
558
559 return true, nil
560}
561
562func (h *UploadAssetHandler) writeAsset(s *pssh.SSHServerConnSession, reader io.Reader, data *FileData) (int64, error) {
563 assetFilepath := shared.GetAssetFileName(data.FileEntry)
564
565 logger := h.GetLogger(s)
566 logger.Info(
567 "uploading file to bucket",
568 "bucket", data.Bucket.Name,
569 "filename", assetFilepath,
570 )
571
572 info := &storage.ObjectInfo{
573 LastModified: mtimeToTime(data.FileEntry),
574 }
575 _, fsize, err := h.Cfg.Storage.PutObject(
576 data.Bucket,
577 assetFilepath,
578 reader,
579 info,
580 )
581 return fsize, err
582}
583
// runCacheQueue processes requests to purge the cache for a single site.
// One message arrives per file that is written/deleted during uploads.
// Repeated messages for the same site are grouped so that we only flush once
// per site per 5 seconds. The loop exits when ctx is cancelled.
func runCacheQueue(cfg *PgsConfig, ctx context.Context) {
	var pendingFlushes sync.Map
	tick := time.NewTicker(5 * time.Second)
	defer tick.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case host := <-cfg.CacheClearingQueue:
			// dedupe: storing under the host key collapses repeated
			// purge requests for the same site into a single entry
			pendingFlushes.Store(host, host)
		case <-tick.C:
			// flush asynchronously so a slow purge does not block
			// draining of the queue channel above
			go func() {
				pendingFlushes.Range(func(key, value any) bool {
					// delete before purging so new requests arriving
					// mid-flush are picked up by the next tick
					pendingFlushes.Delete(key)
					err := purgeCache(cfg, cfg.Pubsub, key.(string))
					if err != nil {
						cfg.Logger.Error("failed to clear cache", "err", err.Error())
					}
					return true
				})
			}()
		}
	}
}