Eric Bower
·
2026-03-05
uploader.go
1package pgs
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "io"
8 "io/fs"
9 "log/slog"
10 "os"
11 "path"
12 "path/filepath"
13 "slices"
14 "strings"
15 "sync"
16 "time"
17
18 pgsdb "github.com/picosh/pico/pkg/apps/pgs/db"
19 "github.com/picosh/pico/pkg/db"
20 "github.com/picosh/pico/pkg/pobj"
21 sst "github.com/picosh/pico/pkg/pobj/storage"
22 "github.com/picosh/pico/pkg/pssh"
23 sendutils "github.com/picosh/pico/pkg/send/utils"
24 "github.com/picosh/pico/pkg/shared"
25 ignore "github.com/sabhiram/go-gitignore"
26)
27
// Context keys for per-connection state stashed on the ssh session.
// Each key is its own empty struct type so keys can never collide.
type (
	ctxBucketKey      struct{}
	ctxStorageSizeKey struct{}
	ctxProjectKey     struct{}
	ctxDenylistKey    struct{}
)
32
// DenyList wraps the raw contents of a project's `_pgs_ignore` file
// (gitignore-style patterns, newline-separated).
type DenyList struct {
	Denylist string
}
36
37func getDenylist(s *pssh.SSHServerConnSession) *DenyList {
38 v := s.Context().Value(ctxDenylistKey{})
39 if v == nil {
40 return nil
41 }
42 denylist := s.Context().Value(ctxDenylistKey{}).(*DenyList)
43 return denylist
44}
45
46func setDenylist(s *pssh.SSHServerConnSession, denylist string) {
47 s.SetValue(ctxDenylistKey{}, &DenyList{Denylist: denylist})
48}
49
50func getProject(s *pssh.SSHServerConnSession) *db.Project {
51 v := s.Context().Value(ctxProjectKey{})
52 if v == nil {
53 return nil
54 }
55 project := s.Context().Value(ctxProjectKey{}).(*db.Project)
56 return project
57}
58
// setProject caches the active project on the session context so repeated
// uploads within one connection can skip the db upsert.
func setProject(s *pssh.SSHServerConnSession, project *db.Project) {
	s.SetValue(ctxProjectKey{}, project)
}
62
63func getBucket(s *pssh.SSHServerConnSession) (sst.Bucket, error) {
64 bucket := s.Context().Value(ctxBucketKey{}).(sst.Bucket)
65 if bucket.Name == "" {
66 return bucket, fmt.Errorf("bucket not set on `ssh.Context()` for connection")
67 }
68 return bucket, nil
69}
70
71func getStorageSize(s *pssh.SSHServerConnSession) uint64 {
72 return s.Context().Value(ctxStorageSizeKey{}).(uint64)
73}
74
75func incrementStorageSize(s *pssh.SSHServerConnSession, fileSize int64) uint64 {
76 curSize := getStorageSize(s)
77 var nextStorageSize uint64
78 if fileSize < 0 {
79 nextStorageSize = curSize - uint64(fileSize)
80 } else {
81 nextStorageSize = curSize + uint64(fileSize)
82 }
83 s.SetValue(ctxStorageSizeKey{}, nextStorageSize)
84 return nextStorageSize
85}
86
87func shouldIgnoreFile(fp, ignoreStr string) bool {
88 object := ignore.CompileIgnoreLines(strings.Split(ignoreStr, "\n")...)
89 return object.MatchesPath(fp)
90}
91
// FileData bundles everything needed to validate and store one uploaded
// file: the transfer entry plus the owning user, target bucket, project,
// and the project's denylist contents.
type FileData struct {
	*sendutils.FileEntry
	User *db.User
	Bucket sst.Bucket
	Project *db.Project
	// DenyList holds the raw _pgs_ignore patterns used by validateAsset.
	DenyList string
}
99
// UploadAssetHandler implements the send (scp/sftp) handler hooks
// (Read/List/Validate/Write/Delete) for pgs uploads.
type UploadAssetHandler struct {
	Cfg *PgsConfig
	// CacheClearingQueue receives surrogate keys for sites whose cache
	// should be purged; drained by runCacheQueue.
	CacheClearingQueue chan string
}
104
// NewUploadAssetHandler builds the handler and starts the background
// cache-purge worker.
// NOTE(review): the worker drains cfg.CacheClearingQueue while ch is what
// gets stored on the handler — presumably callers pass the same channel
// for both; confirm at call sites.
func NewUploadAssetHandler(cfg *PgsConfig, ch chan string, ctx context.Context) *UploadAssetHandler {
	go runCacheQueue(cfg, ctx)
	return &UploadAssetHandler{
		Cfg: cfg,
		CacheClearingQueue: ch,
	}
}
112
// GetLogger returns the connection-scoped structured logger.
func (h *UploadAssetHandler) GetLogger(s *pssh.SSHServerConnSession) *slog.Logger {
	return pssh.GetLogger(s)
}
116
117func (h *UploadAssetHandler) Read(s *pssh.SSHServerConnSession, entry *sendutils.FileEntry) (os.FileInfo, sendutils.ReadAndReaderAtCloser, error) {
118 logger := pssh.GetLogger(s)
119 user := pssh.GetUser(s)
120
121 if user == nil {
122 err := fmt.Errorf("could not get user from ctx")
123 logger.Error("error getting user from ctx", "err", err)
124 return nil, nil, err
125 }
126
127 fileInfo := &sendutils.VirtualFile{
128 FName: filepath.Base(entry.Filepath),
129 FIsDir: false,
130 FSize: entry.Size,
131 FModTime: time.Unix(entry.Mtime, 0),
132 }
133
134 bucket, err := h.Cfg.Storage.GetBucket(shared.GetAssetBucketName(user.ID))
135 if err != nil {
136 return nil, nil, err
137 }
138
139 fname := shared.GetAssetFileName(entry)
140 contents, info, err := h.Cfg.Storage.GetObject(bucket, fname)
141 if err != nil {
142 return nil, nil, err
143 }
144
145 fileInfo.FSize = info.Size
146 fileInfo.FModTime = info.LastModified
147
148 reader := pobj.NewAllReaderAt(contents)
149
150 return fileInfo, reader, nil
151}
152
153func (h *UploadAssetHandler) List(s *pssh.SSHServerConnSession, fpath string, isDir bool, recursive bool) ([]os.FileInfo, error) {
154 var fileList []os.FileInfo
155
156 logger := pssh.GetLogger(s)
157 user := pssh.GetUser(s)
158
159 if user == nil {
160 err := fmt.Errorf("could not get user from ctx")
161 logger.Error("error getting user from ctx", "err", err)
162 return fileList, err
163 }
164
165 cleanFilename := fpath
166
167 bucketName := shared.GetAssetBucketName(user.ID)
168 bucket, err := h.Cfg.Storage.GetBucket(bucketName)
169 if err != nil {
170 return fileList, err
171 }
172
173 if cleanFilename == "" || cleanFilename == "." {
174 name := cleanFilename
175 if name == "" {
176 name = "/"
177 }
178
179 info := &sendutils.VirtualFile{
180 FName: name,
181 FIsDir: true,
182 }
183
184 fileList = append(fileList, info)
185 } else {
186 if cleanFilename != "/" && isDir {
187 cleanFilename += "/"
188 }
189
190 foundList, err := h.Cfg.Storage.ListObjects(bucket, cleanFilename, recursive)
191 if err != nil {
192 return fileList, err
193 }
194
195 fileList = append(fileList, foundList...)
196 }
197
198 return fileList, nil
199}
200
201func (h *UploadAssetHandler) Validate(s *pssh.SSHServerConnSession) error {
202 logger := pssh.GetLogger(s)
203 user := pssh.GetUser(s)
204
205 if user == nil {
206 err := fmt.Errorf("could not get user from ctx")
207 logger.Error("error getting user from ctx", "err", err)
208 return err
209 }
210
211 assetBucket := shared.GetAssetBucketName(user.ID)
212 bucket, err := h.Cfg.Storage.UpsertBucket(assetBucket)
213 if err != nil {
214 return err
215 }
216
217 s.SetValue(ctxBucketKey{}, bucket)
218
219 totalStorageSize, err := h.Cfg.Storage.GetBucketQuota(bucket)
220 if err != nil {
221 return err
222 }
223
224 s.SetValue(ctxStorageSizeKey{}, totalStorageSize)
225
226 logger.Info(
227 "bucket size",
228 "user", user.Name,
229 "bytes", totalStorageSize,
230 )
231
232 logger.Info(
233 "attempting to upload files",
234 "user", user.Name,
235 "txtPrefix", h.Cfg.TxtPrefix,
236 )
237
238 return nil
239}
240
241func (h *UploadAssetHandler) findDenylist(bucket sst.Bucket, project *db.Project, logger *slog.Logger) (string, error) {
242 fp, _, err := h.Cfg.Storage.GetObject(bucket, filepath.Join(project.ProjectDir, "_pgs_ignore"))
243 if err != nil {
244 return "", fmt.Errorf("_pgs_ignore not found")
245 }
246 defer func() {
247 _ = fp.Close()
248 }()
249
250 buf := new(strings.Builder)
251 _, err = io.Copy(buf, fp)
252 if err != nil {
253 logger.Error("io copy", "err", err.Error())
254 return "", err
255 }
256
257 str := buf.String()
258 return str, nil
259}
260
261func findPlusFF(dbpool pgsdb.PgsDB, cfg *PgsConfig, userID string) *db.FeatureFlag {
262 ff, _ := dbpool.FindFeature(userID, "plus")
263 // we have free tiers so users might not have a feature flag
264 // in which case we set sane defaults
265 if ff == nil {
266 ff = db.NewFeatureFlag(
267 userID,
268 "plus",
269 cfg.MaxSize,
270 cfg.MaxAssetSize,
271 cfg.MaxSpecialFileSize,
272 )
273 }
274 // this is jank
275 ff.Data.StorageMax = ff.FindStorageMax(cfg.MaxSize)
276 ff.Data.FileMax = ff.FindFileMax(cfg.MaxAssetSize)
277 ff.Data.SpecialFileMax = ff.FindSpecialFileMax(cfg.MaxSpecialFileSize)
278 return ff
279}
280
// Write handles one uploaded file: it resolves (or creates) the target
// project, enforces feature-flag and storage quotas, validates the file
// against the project denylist, streams it to object storage, and queues
// a cache purge. Returns a human-readable status line for the client.
func (h *UploadAssetHandler) Write(s *pssh.SSHServerConnSession, entry *sendutils.FileEntry) (string, error) {
	logger := pssh.GetLogger(s)
	user := pssh.GetUser(s)

	if user == nil {
		err := fmt.Errorf("could not get user from ctx")
		logger.Error("error getting user from ctx", "err", err)
		return "", err
	}

	// normalize a top-level directory entry like "/project" to "project"
	if entry.Mode.IsDir() && strings.Count(entry.Filepath, "/") == 1 {
		entry.Filepath = strings.TrimPrefix(entry.Filepath, "/")
	}

	logger = logger.With(
		"file", entry.Filepath,
		"size", entry.Size,
	)

	bucket, err := getBucket(s)
	if err != nil {
		logger.Error("could not find bucket in ctx", "err", err.Error())
		return "", err
	}

	project := getProject(s)
	projectName := shared.GetProjectName(entry)
	logger = logger.With("project", projectName)

	// find, create, or update project if we haven't already done it
	// we need to also check if the project stored in ctx is the same project
	// being uploaded since users can keep an ssh connection alive via sftp
	// and created many projects in a single session
	if project == nil || project.Name != projectName {
		project, err = h.Cfg.DB.UpsertProject(user.ID, projectName, projectName)
		if err != nil {
			logger.Error("upsert project", "err", err.Error())
			return "", err
		}
		setProject(s, project)
	}

	if project.Blocked != "" {
		msg := "project has been blocked and cannot upload files: %s"
		return "", fmt.Errorf(msg, project.Blocked)
	}

	// directories have no contents; store a placeholder object so the
	// empty "directory" survives in flat object storage
	if entry.Mode.IsDir() {
		_, _, err := h.Cfg.Storage.PutObject(
			bucket,
			path.Join(shared.GetAssetFileName(entry), "._pico_keep_dir"),
			bytes.NewReader([]byte{}),
			entry,
		)
		return "", err
	}

	featureFlag := findPlusFF(h.Cfg.DB, h.Cfg, user.ID)
	if !featureFlag.IsValid() && pgsdb.IsProjectPrivate(projectName) {
		return "", fmt.Errorf("private projects are only allowed for pico+ users")
	}

	// calculate the filsize difference between the same file already
	// stored and the updated file being uploaded
	assetFilename := shared.GetAssetFileName(entry)
	obj, info, _ := h.Cfg.Storage.GetObject(bucket, assetFilename)
	var curFileSize int64
	if info != nil {
		curFileSize = info.Size
	}
	if obj != nil {
		defer func() {
			_ = obj.Close()
		}()
	}

	// lazily load the project's _pgs_ignore rules once per connection;
	// when none exists fall back to ".*"
	// NOTE(review): as a gitignore pattern ".*" matches dotfiles —
	// confirm that is the intended default
	denylist := getDenylist(s)
	if denylist == nil {
		dlist, err := h.findDenylist(bucket, project, logger)
		if err != nil {
			logger.Info("failed to get denylist, setting default (.*)", "err", err.Error())
			dlist = ".*"
		}
		setDenylist(s, dlist)
		denylist = &DenyList{Denylist: dlist}
	}

	data := &FileData{
		FileEntry: entry,
		User: user,
		Bucket: bucket,
		DenyList: denylist.Denylist,
		Project: project,
	}

	valid, err := h.validateAsset(data)
	if !valid {
		return "", err
	}

	// SFTP does not report file size so the more performant way to
	// check filesize constraints is to try and upload the file to s3
	// with a specialized reader that raises an error if the filesize limit
	// has been reached
	storageMax := featureFlag.Data.StorageMax
	fileMax := featureFlag.Data.FileMax
	curStorageSize := getStorageSize(s)
	// credit the existing copy's size: overwriting a file only needs
	// quota for the growth, not the whole file again
	remaining := int64(storageMax) - int64(curStorageSize)
	sizeRemaining := min(remaining+curFileSize, fileMax)
	if sizeRemaining <= 0 {
		_, _ = fmt.Fprintln(s.Stderr(), "storage quota reached")
		_, _ = fmt.Fprintf(s.Stderr(), "\r")
		_ = s.Exit(1)
		_ = s.Close()
		return "", fmt.Errorf("storage quota reached")
	}
	logger = logger.With(
		"storageMax", storageMax,
		"currentStorageMax", curStorageSize,
		"fileMax", fileMax,
		"sizeRemaining", sizeRemaining,
	)

	// special routing files get their own, typically tighter, cap
	specialFileMax := featureFlag.Data.SpecialFileMax
	if isSpecialFile(entry.Filepath) {
		sizeRemaining = min(sizeRemaining, specialFileMax)
	}

	fsize, err := h.writeAsset(
		s,
		shared.NewMaxBytesReader(data.Reader, int64(sizeRemaining)),
		data,
	)
	if err != nil {
		logger.Error("could not write asset", "err", err.Error())
		cerr := fmt.Errorf(
			"%s: storage size %.2fmb, storage max %.2fmb, file max %.2fmb, special file max %.4fmb",
			err,
			shared.BytesToMB(int(curStorageSize)),
			shared.BytesToMB(int(storageMax)),
			shared.BytesToMB(int(fileMax)),
			shared.BytesToMB(int(specialFileMax)),
		)
		return "", cerr
	}

	// update the cached usage counter with (old stored size - new size)
	deltaFileSize := curFileSize - fsize
	nextStorageSize := incrementStorageSize(s, deltaFileSize)

	url := h.Cfg.AssetURL(
		user.Name,
		projectName,
		strings.Replace(data.Filepath, "/"+projectName+"/", "", 1),
	)

	maxSize := int(featureFlag.Data.StorageMax)
	str := fmt.Sprintf(
		"%s (space: %.2f/%.2fGB, %.2f%%)",
		url,
		shared.BytesToGB(int(nextStorageSize)),
		shared.BytesToGB(maxSize),
		(float32(nextStorageSize)/float32(maxSize))*100,
	)

	surrogate := getSurrogateKey(user.Name, projectName)
	h.Cfg.CacheClearingQueue <- surrogate

	return str, err
}
450
// isSpecialFile reports whether the path's basename is one of the
// routing/config files pgs treats specially.
func isSpecialFile(entry string) bool {
	switch filepath.Base(entry) {
	case "_headers", "_redirects", "_pgs_ignore":
		return true
	}
	return false
}
455
456func (h *UploadAssetHandler) Delete(s *pssh.SSHServerConnSession, entry *sendutils.FileEntry) error {
457 logger := pssh.GetLogger(s)
458 user := pssh.GetUser(s)
459
460 if user == nil {
461 err := fmt.Errorf("could not get user from ctx")
462 logger.Error("error getting user from ctx", "err", err)
463 return err
464 }
465
466 if entry.Mode.IsDir() && strings.Count(entry.Filepath, "/") == 1 {
467 entry.Filepath = strings.TrimPrefix(entry.Filepath, "/")
468 }
469
470 assetFilepath := shared.GetAssetFileName(entry)
471
472 logger = logger.With(
473 "file", assetFilepath,
474 )
475
476 bucket, err := getBucket(s)
477 if err != nil {
478 logger.Error("could not find bucket in ctx", "err", err.Error())
479 return err
480 }
481
482 projectName := shared.GetProjectName(entry)
483 logger = logger.With("project", projectName)
484
485 if assetFilepath == filepath.Join("/", projectName, "._pico_keep_dir") {
486 return os.ErrPermission
487 }
488
489 logger.Info("deleting file")
490
491 pathDir := filepath.Dir(assetFilepath)
492 fileName := filepath.Base(assetFilepath)
493
494 sibs, err := h.Cfg.Storage.ListObjects(bucket, pathDir+"/", false)
495 if err != nil {
496 return err
497 }
498
499 sibs = slices.DeleteFunc(sibs, func(sib fs.FileInfo) bool {
500 return sib.Name() == fileName
501 })
502
503 if len(sibs) == 0 {
504 _, _, err := h.Cfg.Storage.PutObject(
505 bucket,
506 filepath.Join(pathDir, "._pico_keep_dir"),
507 bytes.NewReader([]byte{}),
508 entry,
509 )
510 if err != nil {
511 return err
512 }
513 }
514 err = h.Cfg.Storage.DeleteObject(bucket, assetFilepath)
515
516 surrogate := getSurrogateKey(user.Name, projectName)
517 h.Cfg.CacheClearingQueue <- surrogate
518
519 if err != nil {
520 return err
521 }
522
523 return err
524}
525
526func (h *UploadAssetHandler) validateAsset(data *FileData) (bool, error) {
527 fname := filepath.Base(data.Filepath)
528
529 projectName := shared.GetProjectName(data.FileEntry)
530 if projectName == "" || projectName == "/" || projectName == "." {
531 return false, fmt.Errorf("ERROR: invalid project name, you must copy files to a non-root folder (e.g. pgs.sh:/project-name)")
532 }
533
534 // special files we use for custom routing
535 if isSpecialFile(fname) {
536 return true, nil
537 }
538
539 fpath := strings.Replace(data.Filepath, "/"+projectName, "", 1)
540 if shouldIgnoreFile(fpath, data.DenyList) {
541 err := fmt.Errorf(
542 "ERROR: (%s) file rejected, https://pico.sh/pgs#-pgs-ignore",
543 data.Filepath,
544 )
545 return false, err
546 }
547
548 return true, nil
549}
550
551func (h *UploadAssetHandler) writeAsset(s *pssh.SSHServerConnSession, reader io.Reader, data *FileData) (int64, error) {
552 assetFilepath := shared.GetAssetFileName(data.FileEntry)
553
554 logger := h.GetLogger(s)
555 logger.Info(
556 "uploading file to bucket",
557 "bucket", data.Bucket.Name,
558 "filename", assetFilepath,
559 )
560
561 _, fsize, err := h.Cfg.Storage.PutObject(
562 data.Bucket,
563 assetFilepath,
564 reader,
565 data.FileEntry,
566 )
567 return fsize, err
568}
569
570// runCacheQueue processes requests to purge the cache for a single site.
571// One message arrives per file that is written/deleted during uploads.
572// Repeated messages for the same site are grouped so that we only flush once
573// per site per 5 seconds.
574func runCacheQueue(cfg *PgsConfig, ctx context.Context) {
575 var pendingFlushes sync.Map
576 tick := time.Tick(5 * time.Second)
577 for {
578 select {
579 case host := <-cfg.CacheClearingQueue:
580 pendingFlushes.Store(host, host)
581 case <-tick:
582 go func() {
583 pendingFlushes.Range(func(key, value any) bool {
584 pendingFlushes.Delete(key)
585 err := purgeCache(cfg, cfg.Pubsub, key.(string))
586 if err != nil {
587 cfg.Logger.Error("failed to clear cache", "err", err.Error())
588 }
589 return true
590 })
591 }()
592 }
593 }
594}