Eric Bower
·
2025-05-25
uploader.go
1package pgs
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "io"
8 "io/fs"
9 "log/slog"
10 "os"
11 "path"
12 "path/filepath"
13 "slices"
14 "strings"
15 "sync"
16 "time"
17
18 pgsdb "github.com/picosh/pico/pkg/apps/pgs/db"
19 "github.com/picosh/pico/pkg/db"
20 "github.com/picosh/pico/pkg/pobj"
21 sst "github.com/picosh/pico/pkg/pobj/storage"
22 "github.com/picosh/pico/pkg/pssh"
23 sendutils "github.com/picosh/pico/pkg/send/utils"
24 "github.com/picosh/pico/pkg/shared"
25 "github.com/picosh/utils"
26 ignore "github.com/sabhiram/go-gitignore"
27)
28
// Keys for values stashed on the ssh session context. Distinct empty
// struct types are used so each key is unique and allocation-free.
type ctxBucketKey struct{}      // -> sst.Bucket for the connected user
type ctxStorageSizeKey struct{} // -> uint64 running bucket size in bytes
type ctxProjectKey struct{}     // -> *db.Project currently being uploaded
type ctxDenylistKey struct{}    // -> *DenyList parsed from _pgs_ignore
33
// DenyList holds the raw contents of a project's _pgs_ignore file:
// newline-separated gitignore-style patterns for files that must not
// be uploaded (see shouldIgnoreFile).
type DenyList struct {
	Denylist string
}
37
38func getDenylist(s *pssh.SSHServerConnSession) *DenyList {
39 v := s.Context().Value(ctxDenylistKey{})
40 if v == nil {
41 return nil
42 }
43 denylist := s.Context().Value(ctxDenylistKey{}).(*DenyList)
44 return denylist
45}
46
// setDenylist caches the raw _pgs_ignore contents on the session so
// later writes in the same connection skip re-fetching the file.
func setDenylist(s *pssh.SSHServerConnSession, denylist string) {
	s.SetValue(ctxDenylistKey{}, &DenyList{Denylist: denylist})
}
50
51func getProject(s *pssh.SSHServerConnSession) *db.Project {
52 v := s.Context().Value(ctxProjectKey{})
53 if v == nil {
54 return nil
55 }
56 project := s.Context().Value(ctxProjectKey{}).(*db.Project)
57 return project
58}
59
// setProject caches the project record on the session so repeated
// writes for the same project skip the DB upsert (see Write).
func setProject(s *pssh.SSHServerConnSession, project *db.Project) {
	s.SetValue(ctxProjectKey{}, project)
}
63
64func getBucket(s *pssh.SSHServerConnSession) (sst.Bucket, error) {
65 bucket := s.Context().Value(ctxBucketKey{}).(sst.Bucket)
66 if bucket.Name == "" {
67 return bucket, fmt.Errorf("bucket not set on `ssh.Context()` for connection")
68 }
69 return bucket, nil
70}
71
// getStorageSize returns the running bucket-size tally stored on the
// session context. Validate must have run first (it seeds the value
// from GetBucketQuota); the bare type assertion deliberately panics if
// the key was never set, rather than silently reporting 0 bytes used.
func getStorageSize(s *pssh.SSHServerConnSession) uint64 {
	return s.Context().Value(ctxStorageSizeKey{}).(uint64)
}
75
76func incrementStorageSize(s *pssh.SSHServerConnSession, fileSize int64) uint64 {
77 curSize := getStorageSize(s)
78 var nextStorageSize uint64
79 if fileSize < 0 {
80 nextStorageSize = curSize - uint64(fileSize)
81 } else {
82 nextStorageSize = curSize + uint64(fileSize)
83 }
84 s.SetValue(ctxStorageSizeKey{}, nextStorageSize)
85 return nextStorageSize
86}
87
// shouldIgnoreFile reports whether fp matches the gitignore-style
// patterns in ignoreStr (one pattern per line, i.e. the contents of a
// _pgs_ignore file). The pattern set is recompiled on every call,
// which is acceptable at once-per-uploaded-file frequency.
func shouldIgnoreFile(fp, ignoreStr string) bool {
	object := ignore.CompileIgnoreLines(strings.Split(ignoreStr, "\n")...)
	return object.MatchesPath(fp)
}
92
// FileData bundles everything needed to validate and store one
// uploaded file: the transfer entry plus the owning user, destination
// bucket, project record, and the project's denylist patterns.
type FileData struct {
	*sendutils.FileEntry
	User     *db.User
	Bucket   sst.Bucket
	Project  *db.Project
	DenyList string
}
100
// UploadAssetHandler implements the send (scp/sftp/rsync) handler
// interface for pgs asset uploads.
type UploadAssetHandler struct {
	Cfg *PgsConfig
	// CacheClearingQueue receives surrogate keys for sites whose CDN
	// cache should be purged after an upload or delete.
	CacheClearingQueue chan string
}
105
// NewUploadAssetHandler builds the upload handler and starts the
// background cache-purge worker for the life of ctx.
//
// NOTE(review): the worker consumes cfg.CacheClearingQueue while ch is
// what gets stored on the handler — presumably callers pass the same
// channel for both; confirm at the call site. Go convention would also
// put ctx as the first parameter.
func NewUploadAssetHandler(cfg *PgsConfig, ch chan string, ctx context.Context) *UploadAssetHandler {
	go runCacheQueue(cfg, ctx)
	return &UploadAssetHandler{
		Cfg:                cfg,
		CacheClearingQueue: ch,
	}
}
113
// GetLogger exposes the session-scoped logger; part of the send
// handler interface.
func (h *UploadAssetHandler) GetLogger(s *pssh.SSHServerConnSession) *slog.Logger {
	return pssh.GetLogger(s)
}
117
118func (h *UploadAssetHandler) Read(s *pssh.SSHServerConnSession, entry *sendutils.FileEntry) (os.FileInfo, sendutils.ReadAndReaderAtCloser, error) {
119 logger := pssh.GetLogger(s)
120 user := pssh.GetUser(s)
121
122 if user == nil {
123 err := fmt.Errorf("could not get user from ctx")
124 logger.Error("error getting user from ctx", "err", err)
125 return nil, nil, err
126 }
127
128 fileInfo := &sendutils.VirtualFile{
129 FName: filepath.Base(entry.Filepath),
130 FIsDir: false,
131 FSize: entry.Size,
132 FModTime: time.Unix(entry.Mtime, 0),
133 }
134
135 bucket, err := h.Cfg.Storage.GetBucket(shared.GetAssetBucketName(user.ID))
136 if err != nil {
137 return nil, nil, err
138 }
139
140 fname := shared.GetAssetFileName(entry)
141 contents, info, err := h.Cfg.Storage.GetObject(bucket, fname)
142 if err != nil {
143 return nil, nil, err
144 }
145
146 fileInfo.FSize = info.Size
147 fileInfo.FModTime = info.LastModified
148
149 reader := pobj.NewAllReaderAt(contents)
150
151 return fileInfo, reader, nil
152}
153
154func (h *UploadAssetHandler) List(s *pssh.SSHServerConnSession, fpath string, isDir bool, recursive bool) ([]os.FileInfo, error) {
155 var fileList []os.FileInfo
156
157 logger := pssh.GetLogger(s)
158 user := pssh.GetUser(s)
159
160 if user == nil {
161 err := fmt.Errorf("could not get user from ctx")
162 logger.Error("error getting user from ctx", "err", err)
163 return fileList, err
164 }
165
166 cleanFilename := fpath
167
168 bucketName := shared.GetAssetBucketName(user.ID)
169 bucket, err := h.Cfg.Storage.GetBucket(bucketName)
170 if err != nil {
171 return fileList, err
172 }
173
174 if cleanFilename == "" || cleanFilename == "." {
175 name := cleanFilename
176 if name == "" {
177 name = "/"
178 }
179
180 info := &sendutils.VirtualFile{
181 FName: name,
182 FIsDir: true,
183 }
184
185 fileList = append(fileList, info)
186 } else {
187 if cleanFilename != "/" && isDir {
188 cleanFilename += "/"
189 }
190
191 foundList, err := h.Cfg.Storage.ListObjects(bucket, cleanFilename, recursive)
192 if err != nil {
193 return fileList, err
194 }
195
196 fileList = append(fileList, foundList...)
197 }
198
199 return fileList, nil
200}
201
202func (h *UploadAssetHandler) Validate(s *pssh.SSHServerConnSession) error {
203 logger := pssh.GetLogger(s)
204 user := pssh.GetUser(s)
205
206 if user == nil {
207 err := fmt.Errorf("could not get user from ctx")
208 logger.Error("error getting user from ctx", "err", err)
209 return err
210 }
211
212 assetBucket := shared.GetAssetBucketName(user.ID)
213 bucket, err := h.Cfg.Storage.UpsertBucket(assetBucket)
214 if err != nil {
215 return err
216 }
217
218 s.SetValue(ctxBucketKey{}, bucket)
219
220 totalStorageSize, err := h.Cfg.Storage.GetBucketQuota(bucket)
221 if err != nil {
222 return err
223 }
224
225 s.SetValue(ctxStorageSizeKey{}, totalStorageSize)
226
227 logger.Info(
228 "bucket size",
229 "user", user.Name,
230 "bytes", totalStorageSize,
231 )
232
233 logger.Info(
234 "attempting to upload files",
235 "user", user.Name,
236 "txtPrefix", h.Cfg.TxtPrefix,
237 )
238
239 return nil
240}
241
242func (h *UploadAssetHandler) findDenylist(bucket sst.Bucket, project *db.Project, logger *slog.Logger) (string, error) {
243 fp, _, err := h.Cfg.Storage.GetObject(bucket, filepath.Join(project.ProjectDir, "_pgs_ignore"))
244 if err != nil {
245 return "", fmt.Errorf("_pgs_ignore not found")
246 }
247 defer func() {
248 _ = fp.Close()
249 }()
250
251 buf := new(strings.Builder)
252 _, err = io.Copy(buf, fp)
253 if err != nil {
254 logger.Error("io copy", "err", err.Error())
255 return "", err
256 }
257
258 str := buf.String()
259 return str, nil
260}
261
262func findPlusFF(dbpool pgsdb.PgsDB, cfg *PgsConfig, userID string) *db.FeatureFlag {
263 ff, _ := dbpool.FindFeature(userID, "plus")
264 // we have free tiers so users might not have a feature flag
265 // in which case we set sane defaults
266 if ff == nil {
267 ff = db.NewFeatureFlag(
268 userID,
269 "plus",
270 cfg.MaxSize,
271 cfg.MaxAssetSize,
272 cfg.MaxSpecialFileSize,
273 )
274 }
275 // this is jank
276 ff.Data.StorageMax = ff.FindStorageMax(cfg.MaxSize)
277 ff.Data.FileMax = ff.FindFileMax(cfg.MaxAssetSize)
278 ff.Data.SpecialFileMax = ff.FindSpecialFileMax(cfg.MaxSpecialFileSize)
279 return ff
280}
281
// Write validates and persists a single uploaded file (or directory
// marker) for a project, enforcing the denylist and storage quotas,
// and returns a human-readable status line echoed back to the client.
// Called once per transferred file by the send (scp/sftp/rsync) layer.
func (h *UploadAssetHandler) Write(s *pssh.SSHServerConnSession, entry *sendutils.FileEntry) (string, error) {
	logger := pssh.GetLogger(s)
	user := pssh.GetUser(s)

	if user == nil {
		err := fmt.Errorf("could not get user from ctx")
		logger.Error("error getting user from ctx", "err", err)
		return "", err
	}

	// normalize a top-level directory entry ("/project") to a relative
	// path ("project") so project-name extraction behaves consistently
	if entry.Mode.IsDir() && strings.Count(entry.Filepath, "/") == 1 {
		entry.Filepath = strings.TrimPrefix(entry.Filepath, "/")
	}

	logger = logger.With(
		"file", entry.Filepath,
		"size", entry.Size,
	)

	bucket, err := getBucket(s)
	if err != nil {
		logger.Error("could not find bucket in ctx", "err", err.Error())
		return "", err
	}

	project := getProject(s)
	projectName := shared.GetProjectName(entry)
	logger = logger.With("project", projectName)

	// find, create, or update project if we haven't already done it
	// we need to also check if the project stored in ctx is the same project
	// being uploaded since users can keep an ssh connection alive via sftp
	// and created many projects in a single session
	if project == nil || project.Name != projectName {
		project, err = h.Cfg.DB.UpsertProject(user.ID, projectName, projectName)
		if err != nil {
			logger.Error("upsert project", "err", err.Error())
			return "", err
		}
		setProject(s, project)
	}

	if project.Blocked != "" {
		msg := "project has been blocked and cannot upload files: %s"
		return "", fmt.Errorf(msg, project.Blocked)
	}

	// directories carry no content; store a placeholder object so the
	// "folder" exists in the object store
	if entry.Mode.IsDir() {
		_, _, err := h.Cfg.Storage.PutObject(
			bucket,
			path.Join(shared.GetAssetFileName(entry), "._pico_keep_dir"),
			bytes.NewReader([]byte{}),
			entry,
		)
		return "", err
	}

	featureFlag := findPlusFF(h.Cfg.DB, h.Cfg, user.ID)
	// calculate the filsize difference between the same file already
	// stored and the updated file being uploaded
	assetFilename := shared.GetAssetFileName(entry)
	obj, info, _ := h.Cfg.Storage.GetObject(bucket, assetFilename)
	var curFileSize int64
	if info != nil {
		curFileSize = info.Size
	}
	if obj != nil {
		defer func() {
			_ = obj.Close()
		}()
	}

	// load the project's _pgs_ignore denylist once per session; if it
	// cannot be read, fall back to ".*" (ignore everything not special)
	denylist := getDenylist(s)
	if denylist == nil {
		dlist, err := h.findDenylist(bucket, project, logger)
		if err != nil {
			logger.Info("failed to get denylist, setting default (.*)", "err", err.Error())
			dlist = ".*"
		}
		setDenylist(s, dlist)
		denylist = &DenyList{Denylist: dlist}
	}

	data := &FileData{
		FileEntry: entry,
		User:      user,
		Bucket:    bucket,
		DenyList:  denylist.Denylist,
		Project:   project,
	}

	valid, err := h.validateAsset(data)
	if !valid {
		return "", err
	}

	// SFTP does not report file size so the more performant way to
	// check filesize constraints is to try and upload the file to s3
	// with a specialized reader that raises an error if the filesize limit
	// has been reached
	storageMax := featureFlag.Data.StorageMax
	fileMax := featureFlag.Data.FileMax
	curStorageSize := getStorageSize(s)
	remaining := int64(storageMax) - int64(curStorageSize)
	// replacing an existing file frees its current bytes first, hence
	// the +curFileSize credit; the per-file cap still applies
	sizeRemaining := min(remaining+curFileSize, fileMax)
	if sizeRemaining <= 0 {
		_, _ = fmt.Fprintln(s.Stderr(), "storage quota reached")
		_, _ = fmt.Fprintf(s.Stderr(), "\r")
		_ = s.Exit(1)
		_ = s.Close()
		return "", fmt.Errorf("storage quota reached")
	}
	logger = logger.With(
		"storageMax", storageMax,
		"currentStorageMax", curStorageSize,
		"fileMax", fileMax,
		"sizeRemaining", sizeRemaining,
	)

	// _headers/_redirects/_pgs_ignore get their own (smaller) size cap
	specialFileMax := featureFlag.Data.SpecialFileMax
	if isSpecialFile(entry.Filepath) {
		sizeRemaining = min(sizeRemaining, specialFileMax)
	}

	fsize, err := h.writeAsset(
		s,
		utils.NewMaxBytesReader(data.Reader, int64(sizeRemaining)),
		data,
	)
	if err != nil {
		logger.Error("could not write asset", "err", err.Error())
		cerr := fmt.Errorf(
			"%s: storage size %.2fmb, storage max %.2fmb, file max %.2fmb, special file max %.4fmb",
			err,
			utils.BytesToMB(int(curStorageSize)),
			utils.BytesToMB(int(storageMax)),
			utils.BytesToMB(int(fileMax)),
			utils.BytesToMB(int(specialFileMax)),
		)
		return "", cerr
	}

	// update the running bucket-size tally kept on the session
	deltaFileSize := curFileSize - fsize
	nextStorageSize := incrementStorageSize(s, deltaFileSize)

	url := h.Cfg.AssetURL(
		user.Name,
		projectName,
		strings.Replace(data.Filepath, "/"+projectName+"/", "", 1),
	)

	maxSize := int(featureFlag.Data.StorageMax)
	str := fmt.Sprintf(
		"%s (space: %.2f/%.2fGB, %.2f%%)",
		url,
		utils.BytesToGB(int(nextStorageSize)),
		utils.BytesToGB(maxSize),
		(float32(nextStorageSize)/float32(maxSize))*100,
	)

	// ask the background worker to purge this site's CDN cache
	surrogate := getSurrogateKey(user.Name, projectName)
	h.Cfg.CacheClearingQueue <- surrogate

	return str, err
}
447
// isSpecialFile reports whether the path's base name is one of the
// reserved pgs control files used for custom routing and ignore rules.
func isSpecialFile(entry string) bool {
	switch filepath.Base(entry) {
	case "_headers", "_redirects", "_pgs_ignore":
		return true
	default:
		return false
	}
}
452
453func (h *UploadAssetHandler) Delete(s *pssh.SSHServerConnSession, entry *sendutils.FileEntry) error {
454 logger := pssh.GetLogger(s)
455 user := pssh.GetUser(s)
456
457 if user == nil {
458 err := fmt.Errorf("could not get user from ctx")
459 logger.Error("error getting user from ctx", "err", err)
460 return err
461 }
462
463 if entry.Mode.IsDir() && strings.Count(entry.Filepath, "/") == 1 {
464 entry.Filepath = strings.TrimPrefix(entry.Filepath, "/")
465 }
466
467 assetFilepath := shared.GetAssetFileName(entry)
468
469 logger = logger.With(
470 "file", assetFilepath,
471 )
472
473 bucket, err := getBucket(s)
474 if err != nil {
475 logger.Error("could not find bucket in ctx", "err", err.Error())
476 return err
477 }
478
479 projectName := shared.GetProjectName(entry)
480 logger = logger.With("project", projectName)
481
482 if assetFilepath == filepath.Join("/", projectName, "._pico_keep_dir") {
483 return os.ErrPermission
484 }
485
486 logger.Info("deleting file")
487
488 pathDir := filepath.Dir(assetFilepath)
489 fileName := filepath.Base(assetFilepath)
490
491 sibs, err := h.Cfg.Storage.ListObjects(bucket, pathDir+"/", false)
492 if err != nil {
493 return err
494 }
495
496 sibs = slices.DeleteFunc(sibs, func(sib fs.FileInfo) bool {
497 return sib.Name() == fileName
498 })
499
500 if len(sibs) == 0 {
501 _, _, err := h.Cfg.Storage.PutObject(
502 bucket,
503 filepath.Join(pathDir, "._pico_keep_dir"),
504 bytes.NewReader([]byte{}),
505 entry,
506 )
507 if err != nil {
508 return err
509 }
510 }
511 err = h.Cfg.Storage.DeleteObject(bucket, assetFilepath)
512
513 surrogate := getSurrogateKey(user.Name, projectName)
514 h.Cfg.CacheClearingQueue <- surrogate
515
516 if err != nil {
517 return err
518 }
519
520 return err
521}
522
523func (h *UploadAssetHandler) validateAsset(data *FileData) (bool, error) {
524 fname := filepath.Base(data.Filepath)
525
526 projectName := shared.GetProjectName(data.FileEntry)
527 if projectName == "" || projectName == "/" || projectName == "." {
528 return false, fmt.Errorf("ERROR: invalid project name, you must copy files to a non-root folder (e.g. pgs.sh:/project-name)")
529 }
530
531 // special files we use for custom routing
532 if isSpecialFile(fname) {
533 return true, nil
534 }
535
536 fpath := strings.Replace(data.Filepath, "/"+projectName, "", 1)
537 if shouldIgnoreFile(fpath, data.DenyList) {
538 err := fmt.Errorf(
539 "ERROR: (%s) file rejected, https://pico.sh/pgs#-pgs-ignore",
540 data.Filepath,
541 )
542 return false, err
543 }
544
545 return true, nil
546}
547
// writeAsset streams reader into the object store at the entry's
// computed asset path and returns the number of bytes written. The
// reader is typically a MaxBytesReader installed by Write so the
// upload aborts once the caller's size budget is exhausted.
func (h *UploadAssetHandler) writeAsset(s *pssh.SSHServerConnSession, reader io.Reader, data *FileData) (int64, error) {
	assetFilepath := shared.GetAssetFileName(data.FileEntry)

	logger := h.GetLogger(s)
	logger.Info(
		"uploading file to bucket",
		"bucket", data.Bucket.Name,
		"filename", assetFilepath,
	)

	_, fsize, err := h.Cfg.Storage.PutObject(
		data.Bucket,
		assetFilepath,
		reader,
		data.FileEntry,
	)
	return fsize, err
}
566
567// runCacheQueue processes requests to purge the cache for a single site.
568// One message arrives per file that is written/deleted during uploads.
569// Repeated messages for the same site are grouped so that we only flush once
570// per site per 5 seconds.
571func runCacheQueue(cfg *PgsConfig, ctx context.Context) {
572 send := createPubCacheDrain(ctx, cfg.Logger)
573 var pendingFlushes sync.Map
574 tick := time.Tick(5 * time.Second)
575 for {
576 select {
577 case host := <-cfg.CacheClearingQueue:
578 pendingFlushes.Store(host, host)
579 case <-tick:
580 go func() {
581 pendingFlushes.Range(func(key, value any) bool {
582 pendingFlushes.Delete(key)
583 err := purgeCache(cfg, send, key.(string))
584 if err != nil {
585 cfg.Logger.Error("failed to clear cache", "err", err.Error())
586 }
587 return true
588 })
589 }()
590 }
591 }
592}