Eric Bower
·
2026-01-25
uploader.go
1package pgs
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "io"
8 "io/fs"
9 "log/slog"
10 "os"
11 "path"
12 "path/filepath"
13 "slices"
14 "strings"
15 "sync"
16 "time"
17
18 pgsdb "github.com/picosh/pico/pkg/apps/pgs/db"
19 "github.com/picosh/pico/pkg/db"
20 "github.com/picosh/pico/pkg/pobj"
21 sst "github.com/picosh/pico/pkg/pobj/storage"
22 "github.com/picosh/pico/pkg/pssh"
23 sendutils "github.com/picosh/pico/pkg/send/utils"
24 "github.com/picosh/pico/pkg/shared"
25 ignore "github.com/sabhiram/go-gitignore"
26)
27
// Context keys for values cached on the ssh session for the lifetime of a
// connection (set during Validate/Write, read by later handler calls).
type ctxBucketKey struct{}      // holds the user's sst.Bucket
type ctxStorageSizeKey struct{} // holds the running uint64 bucket usage in bytes
type ctxProjectKey struct{}     // holds the *db.Project currently being uploaded to
type ctxDenylistKey struct{}    // holds the *DenyList parsed from _pgs_ignore

// DenyList wraps the raw newline-separated ignore patterns for a project.
type DenyList struct {
	Denylist string
}
36
37func getDenylist(s *pssh.SSHServerConnSession) *DenyList {
38 v := s.Context().Value(ctxDenylistKey{})
39 if v == nil {
40 return nil
41 }
42 denylist := s.Context().Value(ctxDenylistKey{}).(*DenyList)
43 return denylist
44}
45
46func setDenylist(s *pssh.SSHServerConnSession, denylist string) {
47 s.SetValue(ctxDenylistKey{}, &DenyList{Denylist: denylist})
48}
49
50func getProject(s *pssh.SSHServerConnSession) *db.Project {
51 v := s.Context().Value(ctxProjectKey{})
52 if v == nil {
53 return nil
54 }
55 project := s.Context().Value(ctxProjectKey{}).(*db.Project)
56 return project
57}
58
59func setProject(s *pssh.SSHServerConnSession, project *db.Project) {
60 s.SetValue(ctxProjectKey{}, project)
61}
62
63func getBucket(s *pssh.SSHServerConnSession) (sst.Bucket, error) {
64 bucket := s.Context().Value(ctxBucketKey{}).(sst.Bucket)
65 if bucket.Name == "" {
66 return bucket, fmt.Errorf("bucket not set on `ssh.Context()` for connection")
67 }
68 return bucket, nil
69}
70
71func getStorageSize(s *pssh.SSHServerConnSession) uint64 {
72 return s.Context().Value(ctxStorageSizeKey{}).(uint64)
73}
74
75func incrementStorageSize(s *pssh.SSHServerConnSession, fileSize int64) uint64 {
76 curSize := getStorageSize(s)
77 var nextStorageSize uint64
78 if fileSize < 0 {
79 nextStorageSize = curSize - uint64(fileSize)
80 } else {
81 nextStorageSize = curSize + uint64(fileSize)
82 }
83 s.SetValue(ctxStorageSizeKey{}, nextStorageSize)
84 return nextStorageSize
85}
86
87func shouldIgnoreFile(fp, ignoreStr string) bool {
88 object := ignore.CompileIgnoreLines(strings.Split(ignoreStr, "\n")...)
89 return object.MatchesPath(fp)
90}
91
// FileData bundles everything needed to validate and store one uploaded file.
type FileData struct {
	*sendutils.FileEntry // the incoming file: path, size, mode, reader
	User *db.User // owner of the asset bucket
	Bucket sst.Bucket // destination bucket
	Project *db.Project // project the file belongs to
	DenyList string // raw _pgs_ignore contents used to reject files
}
99
// UploadAssetHandler implements the ssh/sftp upload hooks (Read, List,
// Validate, Write, Delete) for pgs static sites.
type UploadAssetHandler struct {
	Cfg *PgsConfig // app configuration plus storage and db handles
	// CacheClearingQueue receives surrogate keys for sites whose CDN cache
	// needs purging. NOTE(review): Write/Delete send on Cfg.CacheClearingQueue,
	// not this field — confirm both refer to the same channel.
	CacheClearingQueue chan string
}
104
105func NewUploadAssetHandler(cfg *PgsConfig, ch chan string, ctx context.Context) *UploadAssetHandler {
106 go runCacheQueue(cfg, ctx)
107 return &UploadAssetHandler{
108 Cfg: cfg,
109 CacheClearingQueue: ch,
110 }
111}
112
113func (h *UploadAssetHandler) GetLogger(s *pssh.SSHServerConnSession) *slog.Logger {
114 return pssh.GetLogger(s)
115}
116
117func (h *UploadAssetHandler) Read(s *pssh.SSHServerConnSession, entry *sendutils.FileEntry) (os.FileInfo, sendutils.ReadAndReaderAtCloser, error) {
118 logger := pssh.GetLogger(s)
119 user := pssh.GetUser(s)
120
121 if user == nil {
122 err := fmt.Errorf("could not get user from ctx")
123 logger.Error("error getting user from ctx", "err", err)
124 return nil, nil, err
125 }
126
127 fileInfo := &sendutils.VirtualFile{
128 FName: filepath.Base(entry.Filepath),
129 FIsDir: false,
130 FSize: entry.Size,
131 FModTime: time.Unix(entry.Mtime, 0),
132 }
133
134 bucket, err := h.Cfg.Storage.GetBucket(shared.GetAssetBucketName(user.ID))
135 if err != nil {
136 return nil, nil, err
137 }
138
139 fname := shared.GetAssetFileName(entry)
140 contents, info, err := h.Cfg.Storage.GetObject(bucket, fname)
141 if err != nil {
142 return nil, nil, err
143 }
144
145 fileInfo.FSize = info.Size
146 fileInfo.FModTime = info.LastModified
147
148 reader := pobj.NewAllReaderAt(contents)
149
150 return fileInfo, reader, nil
151}
152
153func (h *UploadAssetHandler) List(s *pssh.SSHServerConnSession, fpath string, isDir bool, recursive bool) ([]os.FileInfo, error) {
154 var fileList []os.FileInfo
155
156 logger := pssh.GetLogger(s)
157 user := pssh.GetUser(s)
158
159 if user == nil {
160 err := fmt.Errorf("could not get user from ctx")
161 logger.Error("error getting user from ctx", "err", err)
162 return fileList, err
163 }
164
165 cleanFilename := fpath
166
167 bucketName := shared.GetAssetBucketName(user.ID)
168 bucket, err := h.Cfg.Storage.GetBucket(bucketName)
169 if err != nil {
170 return fileList, err
171 }
172
173 if cleanFilename == "" || cleanFilename == "." {
174 name := cleanFilename
175 if name == "" {
176 name = "/"
177 }
178
179 info := &sendutils.VirtualFile{
180 FName: name,
181 FIsDir: true,
182 }
183
184 fileList = append(fileList, info)
185 } else {
186 if cleanFilename != "/" && isDir {
187 cleanFilename += "/"
188 }
189
190 foundList, err := h.Cfg.Storage.ListObjects(bucket, cleanFilename, recursive)
191 if err != nil {
192 return fileList, err
193 }
194
195 fileList = append(fileList, foundList...)
196 }
197
198 return fileList, nil
199}
200
201func (h *UploadAssetHandler) Validate(s *pssh.SSHServerConnSession) error {
202 logger := pssh.GetLogger(s)
203 user := pssh.GetUser(s)
204
205 if user == nil {
206 err := fmt.Errorf("could not get user from ctx")
207 logger.Error("error getting user from ctx", "err", err)
208 return err
209 }
210
211 assetBucket := shared.GetAssetBucketName(user.ID)
212 bucket, err := h.Cfg.Storage.UpsertBucket(assetBucket)
213 if err != nil {
214 return err
215 }
216
217 s.SetValue(ctxBucketKey{}, bucket)
218
219 totalStorageSize, err := h.Cfg.Storage.GetBucketQuota(bucket)
220 if err != nil {
221 return err
222 }
223
224 s.SetValue(ctxStorageSizeKey{}, totalStorageSize)
225
226 logger.Info(
227 "bucket size",
228 "user", user.Name,
229 "bytes", totalStorageSize,
230 )
231
232 logger.Info(
233 "attempting to upload files",
234 "user", user.Name,
235 "txtPrefix", h.Cfg.TxtPrefix,
236 )
237
238 return nil
239}
240
241func (h *UploadAssetHandler) findDenylist(bucket sst.Bucket, project *db.Project, logger *slog.Logger) (string, error) {
242 fp, _, err := h.Cfg.Storage.GetObject(bucket, filepath.Join(project.ProjectDir, "_pgs_ignore"))
243 if err != nil {
244 return "", fmt.Errorf("_pgs_ignore not found")
245 }
246 defer func() {
247 _ = fp.Close()
248 }()
249
250 buf := new(strings.Builder)
251 _, err = io.Copy(buf, fp)
252 if err != nil {
253 logger.Error("io copy", "err", err.Error())
254 return "", err
255 }
256
257 str := buf.String()
258 return str, nil
259}
260
261func findPlusFF(dbpool pgsdb.PgsDB, cfg *PgsConfig, userID string) *db.FeatureFlag {
262 ff, _ := dbpool.FindFeature(userID, "plus")
263 // we have free tiers so users might not have a feature flag
264 // in which case we set sane defaults
265 if ff == nil {
266 ff = db.NewFeatureFlag(
267 userID,
268 "plus",
269 cfg.MaxSize,
270 cfg.MaxAssetSize,
271 cfg.MaxSpecialFileSize,
272 )
273 }
274 // this is jank
275 ff.Data.StorageMax = ff.FindStorageMax(cfg.MaxSize)
276 ff.Data.FileMax = ff.FindFileMax(cfg.MaxAssetSize)
277 ff.Data.SpecialFileMax = ff.FindSpecialFileMax(cfg.MaxSpecialFileSize)
278 return ff
279}
280
// Write persists one uploaded file (or directory marker) to the user's asset
// bucket, enforcing project block status, the project's _pgs_ignore denylist,
// and per-file/total storage quotas. On success it returns a status line
// (asset URL plus quota usage) shown to the uploader.
func (h *UploadAssetHandler) Write(s *pssh.SSHServerConnSession, entry *sendutils.FileEntry) (string, error) {
	logger := pssh.GetLogger(s)
	user := pssh.GetUser(s)

	if user == nil {
		err := fmt.Errorf("could not get user from ctx")
		logger.Error("error getting user from ctx", "err", err)
		return "", err
	}

	// a top-level dir upload ("/project") names the project itself, so drop
	// the leading slash before deriving names from the path
	if entry.Mode.IsDir() && strings.Count(entry.Filepath, "/") == 1 {
		entry.Filepath = strings.TrimPrefix(entry.Filepath, "/")
	}

	logger = logger.With(
		"file", entry.Filepath,
		"size", entry.Size,
	)

	bucket, err := getBucket(s)
	if err != nil {
		logger.Error("could not find bucket in ctx", "err", err.Error())
		return "", err
	}

	project := getProject(s)
	projectName := shared.GetProjectName(entry)
	logger = logger.With("project", projectName)

	// find, create, or update project if we haven't already done it
	// we need to also check if the project stored in ctx is the same project
	// being uploaded since users can keep an ssh connection alive via sftp
	// and created many projects in a single session
	if project == nil || project.Name != projectName {
		project, err = h.Cfg.DB.UpsertProject(user.ID, projectName, projectName)
		if err != nil {
			logger.Error("upsert project", "err", err.Error())
			return "", err
		}
		setProject(s, project)
	}

	if project.Blocked != "" {
		msg := "project has been blocked and cannot upload files: %s"
		return "", fmt.Errorf(msg, project.Blocked)
	}

	// directories are represented by an empty placeholder object so the
	// prefix survives even with no real files under it
	if entry.Mode.IsDir() {
		_, _, err := h.Cfg.Storage.PutObject(
			bucket,
			path.Join(shared.GetAssetFileName(entry), "._pico_keep_dir"),
			bytes.NewReader([]byte{}),
			entry,
		)
		return "", err
	}

	featureFlag := findPlusFF(h.Cfg.DB, h.Cfg, user.ID)
	// calculate the filsize difference between the same file already
	// stored and the updated file being uploaded
	assetFilename := shared.GetAssetFileName(entry)
	obj, info, _ := h.Cfg.Storage.GetObject(bucket, assetFilename)
	var curFileSize int64
	if info != nil {
		curFileSize = info.Size
	}
	if obj != nil {
		defer func() {
			_ = obj.Close()
		}()
	}

	// lazily load the project's _pgs_ignore denylist once per session;
	// if it cannot be fetched, default to ".*" (hidden dotfiles pattern)
	denylist := getDenylist(s)
	if denylist == nil {
		dlist, err := h.findDenylist(bucket, project, logger)
		if err != nil {
			logger.Info("failed to get denylist, setting default (.*)", "err", err.Error())
			dlist = ".*"
		}
		setDenylist(s, dlist)
		denylist = &DenyList{Denylist: dlist}
	}

	data := &FileData{
		FileEntry: entry,
		User: user,
		Bucket: bucket,
		DenyList: denylist.Denylist,
		Project: project,
	}

	valid, err := h.validateAsset(data)
	if !valid {
		return "", err
	}

	// SFTP does not report file size so the more performant way to
	// check filesize constraints is to try and upload the file to s3
	// with a specialized reader that raises an error if the filesize limit
	// has been reached
	storageMax := featureFlag.Data.StorageMax
	fileMax := featureFlag.Data.FileMax
	curStorageSize := getStorageSize(s)
	remaining := int64(storageMax) - int64(curStorageSize)
	// replacing an existing file frees curFileSize bytes, so count them
	// toward the remaining budget; cap at the per-file maximum
	sizeRemaining := min(remaining+curFileSize, fileMax)
	if sizeRemaining <= 0 {
		_, _ = fmt.Fprintln(s.Stderr(), "storage quota reached")
		_, _ = fmt.Fprintf(s.Stderr(), "\r")
		_ = s.Exit(1)
		_ = s.Close()
		return "", fmt.Errorf("storage quota reached")
	}
	logger = logger.With(
		"storageMax", storageMax,
		"currentStorageMax", curStorageSize,
		"fileMax", fileMax,
		"sizeRemaining", sizeRemaining,
	)

	// special routing files (_headers/_redirects/_pgs_ignore) get a
	// tighter size cap
	specialFileMax := featureFlag.Data.SpecialFileMax
	if isSpecialFile(entry.Filepath) {
		sizeRemaining = min(sizeRemaining, specialFileMax)
	}

	fsize, err := h.writeAsset(
		s,
		shared.NewMaxBytesReader(data.Reader, int64(sizeRemaining)),
		data,
	)
	if err != nil {
		logger.Error("could not write asset", "err", err.Error())
		cerr := fmt.Errorf(
			"%s: storage size %.2fmb, storage max %.2fmb, file max %.2fmb, special file max %.4fmb",
			err,
			shared.BytesToMB(int(curStorageSize)),
			shared.BytesToMB(int(storageMax)),
			shared.BytesToMB(int(fileMax)),
			shared.BytesToMB(int(specialFileMax)),
		)
		return "", cerr
	}

	// positive when the replacement is smaller than the old object
	// (bytes freed), negative when it grew
	deltaFileSize := curFileSize - fsize
	nextStorageSize := incrementStorageSize(s, deltaFileSize)

	url := h.Cfg.AssetURL(
		user.Name,
		projectName,
		strings.Replace(data.Filepath, "/"+projectName+"/", "", 1),
	)

	maxSize := int(featureFlag.Data.StorageMax)
	str := fmt.Sprintf(
		"%s (space: %.2f/%.2fGB, %.2f%%)",
		url,
		shared.BytesToGB(int(nextStorageSize)),
		shared.BytesToGB(maxSize),
		(float32(nextStorageSize)/float32(maxSize))*100,
	)

	// NOTE(review): sends on h.Cfg.CacheClearingQueue while the handler also
	// has its own CacheClearingQueue field — confirm both are the same channel
	surrogate := getSurrogateKey(user.Name, projectName)
	h.Cfg.CacheClearingQueue <- surrogate

	return str, err
}
446
// isSpecialFile reports whether entry's basename is one of the reserved
// routing/config files (_headers, _redirects, _pgs_ignore).
func isSpecialFile(entry string) bool {
	switch filepath.Base(entry) {
	case "_headers", "_redirects", "_pgs_ignore":
		return true
	default:
		return false
	}
}
451
452func (h *UploadAssetHandler) Delete(s *pssh.SSHServerConnSession, entry *sendutils.FileEntry) error {
453 logger := pssh.GetLogger(s)
454 user := pssh.GetUser(s)
455
456 if user == nil {
457 err := fmt.Errorf("could not get user from ctx")
458 logger.Error("error getting user from ctx", "err", err)
459 return err
460 }
461
462 if entry.Mode.IsDir() && strings.Count(entry.Filepath, "/") == 1 {
463 entry.Filepath = strings.TrimPrefix(entry.Filepath, "/")
464 }
465
466 assetFilepath := shared.GetAssetFileName(entry)
467
468 logger = logger.With(
469 "file", assetFilepath,
470 )
471
472 bucket, err := getBucket(s)
473 if err != nil {
474 logger.Error("could not find bucket in ctx", "err", err.Error())
475 return err
476 }
477
478 projectName := shared.GetProjectName(entry)
479 logger = logger.With("project", projectName)
480
481 if assetFilepath == filepath.Join("/", projectName, "._pico_keep_dir") {
482 return os.ErrPermission
483 }
484
485 logger.Info("deleting file")
486
487 pathDir := filepath.Dir(assetFilepath)
488 fileName := filepath.Base(assetFilepath)
489
490 sibs, err := h.Cfg.Storage.ListObjects(bucket, pathDir+"/", false)
491 if err != nil {
492 return err
493 }
494
495 sibs = slices.DeleteFunc(sibs, func(sib fs.FileInfo) bool {
496 return sib.Name() == fileName
497 })
498
499 if len(sibs) == 0 {
500 _, _, err := h.Cfg.Storage.PutObject(
501 bucket,
502 filepath.Join(pathDir, "._pico_keep_dir"),
503 bytes.NewReader([]byte{}),
504 entry,
505 )
506 if err != nil {
507 return err
508 }
509 }
510 err = h.Cfg.Storage.DeleteObject(bucket, assetFilepath)
511
512 surrogate := getSurrogateKey(user.Name, projectName)
513 h.Cfg.CacheClearingQueue <- surrogate
514
515 if err != nil {
516 return err
517 }
518
519 return err
520}
521
522func (h *UploadAssetHandler) validateAsset(data *FileData) (bool, error) {
523 fname := filepath.Base(data.Filepath)
524
525 projectName := shared.GetProjectName(data.FileEntry)
526 if projectName == "" || projectName == "/" || projectName == "." {
527 return false, fmt.Errorf("ERROR: invalid project name, you must copy files to a non-root folder (e.g. pgs.sh:/project-name)")
528 }
529
530 // special files we use for custom routing
531 if isSpecialFile(fname) {
532 return true, nil
533 }
534
535 fpath := strings.Replace(data.Filepath, "/"+projectName, "", 1)
536 if shouldIgnoreFile(fpath, data.DenyList) {
537 err := fmt.Errorf(
538 "ERROR: (%s) file rejected, https://pico.sh/pgs#-pgs-ignore",
539 data.Filepath,
540 )
541 return false, err
542 }
543
544 return true, nil
545}
546
547func (h *UploadAssetHandler) writeAsset(s *pssh.SSHServerConnSession, reader io.Reader, data *FileData) (int64, error) {
548 assetFilepath := shared.GetAssetFileName(data.FileEntry)
549
550 logger := h.GetLogger(s)
551 logger.Info(
552 "uploading file to bucket",
553 "bucket", data.Bucket.Name,
554 "filename", assetFilepath,
555 )
556
557 _, fsize, err := h.Cfg.Storage.PutObject(
558 data.Bucket,
559 assetFilepath,
560 reader,
561 data.FileEntry,
562 )
563 return fsize, err
564}
565
566// runCacheQueue processes requests to purge the cache for a single site.
567// One message arrives per file that is written/deleted during uploads.
568// Repeated messages for the same site are grouped so that we only flush once
569// per site per 5 seconds.
570func runCacheQueue(cfg *PgsConfig, ctx context.Context) {
571 var pendingFlushes sync.Map
572 tick := time.Tick(5 * time.Second)
573 for {
574 select {
575 case host := <-cfg.CacheClearingQueue:
576 pendingFlushes.Store(host, host)
577 case <-tick:
578 go func() {
579 pendingFlushes.Range(func(key, value any) bool {
580 pendingFlushes.Delete(key)
581 err := purgeCache(cfg, cfg.Pubsub, key.(string))
582 if err != nil {
583 cfg.Logger.Error("failed to clear cache", "err", err.Error())
584 }
585 return true
586 })
587 }()
588 }
589 }
590}