repos / pico

pico services mono repo
git clone https://github.com/picosh/pico.git

commit
87b874c
parent
91dabf9
author
Eric Bower
date
2025-04-05 22:55:19 -0400 EDT
feat(pgs): lru cache for object info and special files
11 files changed, +172, -70
M go.mod
M go.sum
M pkg/apps/pgs/uploader.go
M pkg/apps/pgs/web.go
M pkg/apps/pgs/web_asset_handler.go
M pkg/apps/pgs/web_test.go
A pkg/cache/cache.go
M pkg/pobj/storage/minio.go
M pkg/pobj/util.go
M pkg/shared/storage/minio.go
M pkg/shared/storage/proxy.go
M go.mod
+1, -0
1@@ -35,6 +35,7 @@ require (
2 	github.com/google/uuid v1.6.0
3 	github.com/gorilla/feeds v1.2.0
4 	github.com/gorilla/websocket v1.5.3
5+	github.com/hashicorp/golang-lru/v2 v2.0.7
6 	github.com/jmoiron/sqlx v1.4.0
7 	github.com/lib/pq v1.10.9
8 	github.com/matryer/is v1.4.1
M go.sum
+2, -0
1@@ -451,6 +451,8 @@ github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/b
2 github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
3 github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c=
4 github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
5+github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
6+github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
7 github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
8 github.com/hashicorp/logutils v1.0.0 h1:dLEQVugN8vlakKOUE3ihGLTZJRB4j+M2cdTm/ORI65Y=
9 github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
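The only new dependency is hashicorp/golang-lru/v2, whose expirable package provides a generics-based LRU with a per-entry TTL. A minimal sketch of the API as it is used throughout this commit; the key and value types here are illustrative, not the pgs types:

package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/golang-lru/v2/expirable"
)

func main() {
	// capacity 2048, no eviction callback, entries expire after one minute
	lru := expirable.NewLRU[string, []string](2048, nil, time.Minute)

	lru.Add("alice-blog/_redirects", []string{"/old /new 301"})
	if rules, ok := lru.Get("alice-blog/_redirects"); ok {
		fmt.Println("cache hit:", rules)
	}
	lru.Remove("alice-blog/_redirects") // explicit invalidation
}

Expired or evicted entries simply fall out of the cache; callers treat a miss as the signal to rebuild the value.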
M pkg/apps/pgs/uploader.go
+5, -5
 1@@ -395,7 +395,7 @@ func (h *UploadAssetHandler) Write(s *pssh.SSHServerConnSession, entry *sendutil
 2 	)
 3 
 4 	specialFileMax := featureFlag.Data.SpecialFileMax
 5-	if isSpecialFile(entry) {
 6+	if isSpecialFile(entry.Filepath) {
 7 		sizeRemaining = min(sizeRemaining, specialFileMax)
 8 	}
 9 
10@@ -441,9 +441,9 @@ func (h *UploadAssetHandler) Write(s *pssh.SSHServerConnSession, entry *sendutil
11 	return str, err
12 }
13 
14-func isSpecialFile(entry *sendutils.FileEntry) bool {
15-	fname := filepath.Base(entry.Filepath)
16-	return fname == "_headers" || fname == "_redirects"
17+func isSpecialFile(entry string) bool {
18+	fname := filepath.Base(entry)
19+	return fname == "_headers" || fname == "_redirects" || fname == "_pgs_ignore"
20 }
21 
22 func (h *UploadAssetHandler) Delete(s *pssh.SSHServerConnSession, entry *sendutils.FileEntry) error {
23@@ -525,7 +525,7 @@ func (h *UploadAssetHandler) validateAsset(data *FileData) (bool, error) {
24 	}
25 
26 	// special files we use for custom routing
27-	if fname == "_pgs_ignore" || fname == "_redirects" || fname == "_headers" {
28+	if isSpecialFile(fname) {
29 		return true, nil
30 	}
31 
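The helper now takes a plain path instead of a *sendutils.FileEntry, so the uploader and the web router can share it, and _pgs_ignore joins the protected names. Roughly how the check behaves; the example paths below are made up for illustration:

package main

import (
	"fmt"
	"path/filepath"
)

// same shape as the refactored helper: a bare path in, a yes/no out
func isSpecialFile(path string) bool {
	fname := filepath.Base(path)
	return fname == "_headers" || fname == "_redirects" || fname == "_pgs_ignore"
}

func main() {
	fmt.Println(isSpecialFile("alice/blog/_redirects")) // true
	fmt.Println(isSpecialFile("docs/_pgs_ignore"))      // true
	fmt.Println(isSpecialFile("index.html"))            // false
}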
M pkg/apps/pgs/web.go
+26, -7
 1@@ -9,6 +9,7 @@ import (
 2 	"net/http"
 3 	"net/url"
 4 	"os"
 5+	"path/filepath"
 6 	"regexp"
 7 	"strings"
 8 	"time"
 9@@ -20,6 +21,8 @@ import (
10 	"github.com/darkweak/souin/plugins/souin/storages"
11 	"github.com/darkweak/storages/core"
12 	"github.com/gorilla/feeds"
13+	"github.com/hashicorp/golang-lru/v2/expirable"
14+	"github.com/picosh/pico/pkg/cache"
15 	"github.com/picosh/pico/pkg/db"
16 	sst "github.com/picosh/pico/pkg/pobj/storage"
17 	"github.com/picosh/pico/pkg/shared"
18@@ -74,7 +77,7 @@ func StartApiServer(cfg *PgsConfig) {
19 		routes:  routes,
20 	}
21 
22-	go routes.cacheMgmt(ctx, httpCache)
23+	go routes.cacheMgmt(ctx, httpCache, cfg.CacheClearingQueue)
24 
25 	portStr := fmt.Sprintf(":%s", cfg.WebPort)
26 	cfg.Logger.Info(
27@@ -92,19 +95,34 @@ func StartApiServer(cfg *PgsConfig) {
28 type HasPerm = func(proj *db.Project) bool
29 
30 type WebRouter struct {
31-	Cfg        *PgsConfig
32-	RootRouter *http.ServeMux
33-	UserRouter *http.ServeMux
34+	Cfg            *PgsConfig
35+	RootRouter     *http.ServeMux
36+	UserRouter     *http.ServeMux
37+	RedirectsCache *expirable.LRU[string, []*RedirectRule]
38+	HeadersCache   *expirable.LRU[string, []*HeaderRule]
39 }
40 
41 func NewWebRouter(cfg *PgsConfig) *WebRouter {
42 	router := &WebRouter{
43-		Cfg: cfg,
44+		Cfg:            cfg,
45+		RedirectsCache: expirable.NewLRU[string, []*RedirectRule](2048, nil, cache.CacheTimeout),
46+		HeadersCache:   expirable.NewLRU[string, []*HeaderRule](2048, nil, cache.CacheTimeout),
47 	}
48 	router.initRouters()
49+	go router.watchCacheClear()
50 	return router
51 }
52 
53+func (web *WebRouter) watchCacheClear() {
54+	for key := range web.Cfg.CacheClearingQueue {
55+		web.Cfg.Logger.Info("lru cache clear request", "key", key)
56+		rKey := filepath.Join(key, "_redirects")
57+		web.RedirectsCache.Remove(rKey)
58+		hKey := filepath.Join(key, "_headers")
59+		web.HeadersCache.Remove(hKey)
60+	}
61+}
62+
63 func (web *WebRouter) initRouters() {
64 	// ensure legacy router is disabled
65 	// GODEBUG=httpmuxgo121=0
66@@ -259,7 +277,7 @@ func (web *WebRouter) checkHandler(w http.ResponseWriter, r *http.Request) {
67 	w.WriteHeader(http.StatusNotFound)
68 }
69 
70-func (web *WebRouter) cacheMgmt(ctx context.Context, httpCache *middleware.SouinBaseHandler) {
71+func (web *WebRouter) cacheMgmt(ctx context.Context, httpCache *middleware.SouinBaseHandler, notify chan string) {
72 	storer := httpCache.Storers[0]
73 	drain := createSubCacheDrain(ctx, web.Cfg.Logger)
74 
75@@ -269,6 +287,7 @@ func (web *WebRouter) cacheMgmt(ctx context.Context, httpCache *middleware.Souin
76 		for scanner.Scan() {
77 			surrogateKey := strings.TrimSpace(scanner.Text())
78 			web.Cfg.Logger.Info("received cache-drain item", "surrogateKey", surrogateKey)
79+			notify <- surrogateKey
80 
81 			if surrogateKey == "*" {
82 				storer.DeleteMany(".+")
83@@ -426,7 +445,7 @@ func (web *WebRouter) ServeAsset(fname string, opts *storage.ImgProcessOpts, fro
84 		"host", r.Host,
85 	)
86 
87-	if fname == "_headers" || fname == "_redirects" || fname == "_pgs_ignore" {
88+	if isSpecialFile(fname) {
89 		logger.Info("special file names are not allowed to be served over http")
90 		http.Error(w, "404 not found", http.StatusNotFound)
91 		return
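The invalidation path is a small producer/consumer pair: cacheMgmt forwards every surrogate key it reads from the HTTP-cache drain onto cfg.CacheClearingQueue, and watchCacheClear drops the matching _redirects and _headers entries from the two LRUs. A self-contained sketch of that flow, assuming the queue is an ordinary chan string; the keys and capacities below are made up:

package main

import (
	"fmt"
	"path/filepath"
	"time"

	"github.com/hashicorp/golang-lru/v2/expirable"
)

func main() {
	redirects := expirable.NewLRU[string, []string](2048, nil, time.Minute)
	headers := expirable.NewLRU[string, []string](2048, nil, time.Minute)

	redirects.Add("alice-blog/_redirects", []string{"/old /new 301"})
	headers.Add("alice-blog/_headers", []string{"/* X-Frame-Options: DENY"})

	// producer: surrogate keys arriving from the cache drain
	notify := make(chan string, 100)
	go func() {
		notify <- "alice-blog"
		close(notify)
	}()

	// consumer: evict the per-project rule entries for each key
	for key := range notify {
		redirects.Remove(filepath.Join(key, "_redirects"))
		headers.Remove(filepath.Join(key, "_headers"))
	}
	fmt.Println(redirects.Len(), headers.Len()) // 0 0
}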
M pkg/apps/pgs/web_asset_handler.go
+63, -41
  1@@ -41,28 +41,39 @@ func hasProtocol(url string) bool {
  2 func (h *ApiAssetHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  3 	logger := h.Logger
  4 	var redirects []*RedirectRule
  5-	redirectFp, redirectInfo, err := h.Cfg.Storage.GetObject(h.Bucket, filepath.Join(h.ProjectDir, "_redirects"))
  6-	if err == nil {
  7-		defer redirectFp.Close()
  8-		if redirectInfo != nil && redirectInfo.Size > h.Cfg.MaxSpecialFileSize {
  9-			errMsg := fmt.Sprintf("_redirects file is too large (%d > %d)", redirectInfo.Size, h.Cfg.MaxSpecialFileSize)
 10-			logger.Error(errMsg)
 11-			http.Error(w, errMsg, http.StatusInternalServerError)
 12-			return
 13-		}
 14-		buf := new(strings.Builder)
 15-		lr := io.LimitReader(redirectFp, h.Cfg.MaxSpecialFileSize)
 16-		_, err := io.Copy(buf, lr)
 17-		if err != nil {
 18-			logger.Error("io copy", "err", err.Error())
 19-			http.Error(w, "cannot read _redirects file", http.StatusInternalServerError)
 20-			return
 21-		}
 22 
 23-		redirects, err = parseRedirectText(buf.String())
 24-		if err != nil {
 25-			logger.Error("could not parse redirect text", "err", err.Error())
 26+	redirectsCacheKey := filepath.Join(getSurrogateKey(h.UserID, h.ProjectDir), "_redirects")
 27+	logger.Info("looking for _redirects in lru cache", "key", redirectsCacheKey)
 28+	if cachedRedirects, found := h.RedirectsCache.Get(redirectsCacheKey); found {
 29+		logger.Info("_redirects found in lru cache", "key", redirectsCacheKey)
 30+		redirects = cachedRedirects
 31+	} else {
 32+		logger.Info("_redirects not found in lru cache", "key", redirectsCacheKey)
 33+		redirectFp, redirectInfo, err := h.Cfg.Storage.GetObject(h.Bucket, filepath.Join(h.ProjectDir, "_redirects"))
 34+		if err == nil {
 35+			defer redirectFp.Close()
 36+			if redirectInfo != nil && redirectInfo.Size > h.Cfg.MaxSpecialFileSize {
 37+				errMsg := fmt.Sprintf("_redirects file is too large (%d > %d)", redirectInfo.Size, h.Cfg.MaxSpecialFileSize)
 38+				logger.Error(errMsg)
 39+				http.Error(w, errMsg, http.StatusInternalServerError)
 40+				return
 41+			}
 42+			buf := new(strings.Builder)
 43+			lr := io.LimitReader(redirectFp, h.Cfg.MaxSpecialFileSize)
 44+			_, err := io.Copy(buf, lr)
 45+			if err != nil {
 46+				logger.Error("io copy", "err", err.Error())
 47+				http.Error(w, "cannot read _redirects file", http.StatusInternalServerError)
 48+				return
 49+			}
 50+
 51+			redirects, err = parseRedirectText(buf.String())
 52+			if err != nil {
 53+				logger.Error("could not parse redirect text", "err", err.Error())
 54+			}
 55 		}
 56+
 57+		h.RedirectsCache.Add(redirectsCacheKey, redirects)
 58 	}
 59 
 60 	routes := calcRoutes(h.ProjectDir, h.Filepath, redirects)
 61@@ -163,28 +174,39 @@ func (h *ApiAssetHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 62 	defer contents.Close()
 63 
 64 	var headers []*HeaderRule
 65-	headersFp, headersInfo, err := h.Cfg.Storage.GetObject(h.Bucket, filepath.Join(h.ProjectDir, "_headers"))
 66-	if err == nil {
 67-		defer headersFp.Close()
 68-		if headersInfo != nil && headersInfo.Size > h.Cfg.MaxSpecialFileSize {
 69-			errMsg := fmt.Sprintf("_headers file is too large (%d > %d)", headersInfo.Size, h.Cfg.MaxSpecialFileSize)
 70-			logger.Error(errMsg)
 71-			http.Error(w, errMsg, http.StatusInternalServerError)
 72-			return
 73-		}
 74-		buf := new(strings.Builder)
 75-		lr := io.LimitReader(headersFp, h.Cfg.MaxSpecialFileSize)
 76-		_, err := io.Copy(buf, lr)
 77-		if err != nil {
 78-			logger.Error("io copy", "err", err.Error())
 79-			http.Error(w, "cannot read _headers file", http.StatusInternalServerError)
 80-			return
 81-		}
 82 
 83-		headers, err = parseHeaderText(buf.String())
 84-		if err != nil {
 85-			logger.Error("could not parse header text", "err", err.Error())
 86+	headersCacheKey := filepath.Join(getSurrogateKey(h.UserID, h.ProjectDir), "_headers")
 87+	logger.Info("looking for _headers in lru cache", "key", headersCacheKey)
 88+	if cachedHeaders, found := h.HeadersCache.Get(headersCacheKey); found {
 89+		logger.Info("_headers found in lru", "key", headersCacheKey)
 90+		headers = cachedHeaders
 91+	} else {
 92+		logger.Info("_headers not found in lru cache", "key", headersCacheKey)
 93+		headersFp, headersInfo, err := h.Cfg.Storage.GetObject(h.Bucket, filepath.Join(h.ProjectDir, "_headers"))
 94+		if err == nil {
 95+			defer headersFp.Close()
 96+			if headersInfo != nil && headersInfo.Size > h.Cfg.MaxSpecialFileSize {
 97+				errMsg := fmt.Sprintf("_headers file is too large (%d > %d)", headersInfo.Size, h.Cfg.MaxSpecialFileSize)
 98+				logger.Error(errMsg)
 99+				http.Error(w, errMsg, http.StatusInternalServerError)
100+				return
101+			}
102+			buf := new(strings.Builder)
103+			lr := io.LimitReader(headersFp, h.Cfg.MaxSpecialFileSize)
104+			_, err := io.Copy(buf, lr)
105+			if err != nil {
106+				logger.Error("io copy", "err", err.Error())
107+				http.Error(w, "cannot read _headers file", http.StatusInternalServerError)
108+				return
109+			}
110+
111+			headers, err = parseHeaderText(buf.String())
112+			if err != nil {
113+				logger.Error("could not parse header text", "err", err.Error())
114+			}
115 		}
116+
117+		h.HeadersCache.Add(headersCacheKey, headers)
118 	}
119 
120 	userHeaders := []*HeaderLine{}
121@@ -236,7 +258,7 @@ func (h *ApiAssetHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
122 		return
123 	}
124 	w.WriteHeader(status)
125-	_, err = io.Copy(w, contents)
126+	_, err := io.Copy(w, contents)
127 
128 	if err != nil {
129 		logger.Error("io copy", "err", err.Error())
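Both special files now follow the same read-through pattern: check the LRU, on a miss fetch the object (size-capped by MaxSpecialFileSize), parse it, and cache whatever came out, including an empty rule set, so a project with no _redirects or _headers does not hit object storage on every request. A condensed sketch of that pattern; RedirectRule and fetchAndParse below are stand-ins, not the real pgs types:

package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/golang-lru/v2/expirable"
)

// hypothetical stand-ins for the pgs rule type and the GetObject + parse steps
type RedirectRule struct{ From, To string }

func fetchAndParse(key string) ([]*RedirectRule, error) {
	fmt.Println("storage fetch for", key)
	return []*RedirectRule{{From: "/old", To: "/new"}}, nil
}

// read-through lookup: LRU first, storage on miss, result cached either way
func loadRedirects(cache *expirable.LRU[string, []*RedirectRule], key string) []*RedirectRule {
	if rules, ok := cache.Get(key); ok {
		return rules
	}
	rules, err := fetchAndParse(key)
	if err != nil {
		// parse/storage errors are non-fatal; the (possibly nil) result is still cached
	}
	cache.Add(key, rules)
	return rules
}

func main() {
	cache := expirable.NewLRU[string, []*RedirectRule](2048, nil, time.Minute)
	loadRedirects(cache, "alice-blog/_redirects") // storage fetch
	loadRedirects(cache, "alice-blog/_redirects") // served from cache
}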
M pkg/apps/pgs/web_test.go
+1, -1
1@@ -329,7 +329,7 @@ func TestApiBasic(t *testing.T) {
2 
3 			ct := responseRecorder.Header().Get("content-type")
4 			if ct != tc.contentType {
5-				t.Errorf("Want status '%s', got '%s'", tc.contentType, ct)
6+				t.Errorf("Want content type '%s', got '%s'", tc.contentType, ct)
7 			}
8 
9 			body := strings.TrimSpace(responseRecorder.Body.String())
A pkg/cache/cache.go
+21, -0
 1@@ -0,0 +1,21 @@
 2+package cache
 3+
 4+import (
 5+	"log/slog"
 6+	"time"
 7+
 8+	"github.com/picosh/utils"
 9+)
10+
11+var CacheTimeout time.Duration
12+
13+func init() {
14+	cacheDuration := utils.GetEnv("STORAGE_MINIO_CACHE_DURATION", "1m")
15+	duration, err := time.ParseDuration(cacheDuration)
16+	if err != nil {
17+		slog.Error("Invalid STORAGE_MINIO_CACHE_DURATION value, using default 1m", "error", err)
18+		duration = 1 * time.Minute
19+	}
20+
21+	CacheTimeout = duration
22+}
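The new pkg/cache package centralizes the TTL: STORAGE_MINIO_CACHE_DURATION is parsed with time.ParseDuration and falls back to one minute if the value is invalid. For reference, the kinds of values that parse; the example values are mine, not from the repo:

package main

import (
	"fmt"
	"time"
)

func main() {
	// accepted forms: "300ms", "30s", "1m", "2h45m", ...
	d, err := time.ParseDuration("1m")
	fmt.Println(d, err) // 1m0s <nil>

	// a value like "1 minute" does not parse, which triggers the 1m fallback
	_, err = time.ParseDuration("1 minute")
	fmt.Println(err != nil) // true
}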
M pkg/pobj/storage/minio.go
+45, -12
  1@@ -5,27 +5,45 @@ import (
  2 	"errors"
  3 	"fmt"
  4 	"io"
  5+	"log/slog"
  6 	"net/url"
  7 	"os"
  8+	"path/filepath"
  9 	"strconv"
 10 	"strings"
 11 	"time"
 12 
 13+	"github.com/hashicorp/golang-lru/v2/expirable"
 14 	"github.com/minio/madmin-go/v3"
 15 	"github.com/minio/minio-go/v7"
 16 	"github.com/minio/minio-go/v7/pkg/credentials"
 17+	"github.com/picosh/pico/pkg/cache"
 18 	"github.com/picosh/pico/pkg/send/utils"
 19 )
 20 
 21 type StorageMinio struct {
 22-	Client *minio.Client
 23-	Admin  *madmin.AdminClient
 24+	Client      *minio.Client
 25+	Admin       *madmin.AdminClient
 26+	BucketCache *expirable.LRU[string, CachedBucket]
 27+	Logger      *slog.Logger
 28 }
 29 
 30-var _ ObjectStorage = &StorageMinio{}
 31-var _ ObjectStorage = (*StorageMinio)(nil)
 32+type CachedBucket struct {
 33+	Bucket
 34+	Error error
 35+}
 36+
 37+type CachedObjectInfo struct {
 38+	*ObjectInfo
 39+	Error error
 40+}
 41+
 42+var (
 43+	_ ObjectStorage = &StorageMinio{}
 44+	_ ObjectStorage = (*StorageMinio)(nil)
 45+)
 46 
 47-func NewStorageMinio(address, user, pass string) (*StorageMinio, error) {
 48+func NewStorageMinio(logger *slog.Logger, address, user, pass string) (*StorageMinio, error) {
 49 	endpoint, err := url.Parse(address)
 50 	if err != nil {
 51 		return nil, err
 52@@ -52,13 +70,21 @@ func NewStorageMinio(address, user, pass string) (*StorageMinio, error) {
 53 	}
 54 
 55 	mini := &StorageMinio{
 56-		Client: mClient,
 57-		Admin:  aClient,
 58+		Client:      mClient,
 59+		Admin:       aClient,
 60+		BucketCache: expirable.NewLRU[string, CachedBucket](2048, nil, cache.CacheTimeout),
 61+		Logger:      logger,
 62 	}
 63 	return mini, err
 64 }
 65 
 66 func (s *StorageMinio) GetBucket(name string) (Bucket, error) {
 67+	if cachedBucket, found := s.BucketCache.Get(name); found {
 68+		s.Logger.Info("bucket found in lru cache", "name", name)
 69+		return cachedBucket.Bucket, cachedBucket.Error
 70+	}
 71+	s.Logger.Info("bucket not found in lru cache", "name", name)
 72+
 73 	bucket := Bucket{
 74 		Name: name,
 75 	}
 76@@ -68,9 +94,13 @@ func (s *StorageMinio) GetBucket(name string) (Bucket, error) {
 77 		if err == nil {
 78 			err = errors.New("bucket does not exist")
 79 		}
 80+
 81+		s.BucketCache.Add(name, CachedBucket{bucket, err})
 82 		return bucket, err
 83 	}
 84 
 85+	s.BucketCache.Add(name, CachedBucket{bucket, nil})
 86+
 87 	return bucket, nil
 88 }
 89 
 90@@ -160,6 +190,9 @@ func (s *StorageMinio) GetObject(bucket Bucket, fpath string) (utils.ReadAndRead
 91 		ETag:         "",
 92 	}
 93 
 94+	cacheKey := filepath.Join(bucket.Name, fpath)
 95+
 96+	s.Logger.Info("object info not found in lru cache", "key", cacheKey)
 97 	info, err := s.Client.StatObject(context.Background(), bucket.Name, fpath, minio.StatObjectOptions{})
 98 	if err != nil {
 99 		return nil, objInfo, err
100@@ -171,11 +204,6 @@ func (s *StorageMinio) GetObject(bucket Bucket, fpath string) (utils.ReadAndRead
101 	objInfo.UserMetadata = info.UserMetadata
102 	objInfo.Size = info.Size
103 
104-	obj, err := s.Client.GetObject(context.Background(), bucket.Name, fpath, minio.GetObjectOptions{})
105-	if err != nil {
106-		return nil, objInfo, err
107-	}
108-
109 	if mtime, ok := info.UserMetadata["Mtime"]; ok {
110 		mtimeUnix, err := strconv.Atoi(mtime)
111 		if err == nil {
112@@ -183,6 +211,11 @@ func (s *StorageMinio) GetObject(bucket Bucket, fpath string) (utils.ReadAndRead
113 		}
114 	}
115 
116+	obj, err := s.Client.GetObject(context.Background(), bucket.Name, fpath, minio.GetObjectOptions{})
117+	if err != nil {
118+		return nil, objInfo, err
119+	}
120+
121 	return obj, objInfo, nil
122 }
123 
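GetBucket now caches negative results as well: CachedBucket carries both the Bucket and the lookup error, so a request for a bucket that does not exist is remembered for the TTL instead of re-querying minio every time. A small sketch of that negative-caching idea, with the types trimmed down for illustration:

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/hashicorp/golang-lru/v2/expirable"
)

// simplified stand-ins mirroring the CachedBucket shape
type Bucket struct{ Name string }

type CachedBucket struct {
	Bucket
	Error error
}

func main() {
	cache := expirable.NewLRU[string, CachedBucket](2048, nil, time.Minute)

	// a failed lookup is cached too, so repeated requests for a missing
	// bucket skip the minio API for the duration of the TTL
	cache.Add("missing", CachedBucket{Bucket{Name: "missing"}, errors.New("bucket does not exist")})

	if c, ok := cache.Get("missing"); ok {
		fmt.Println(c.Name, c.Error)
	}
}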
M pkg/pobj/util.go
+1, -1
1@@ -29,7 +29,7 @@ func EnvDriverDetector(logger *slog.Logger) (storage.ObjectStorage, error) {
2 			"url", url,
3 			"user", user,
4 		)
5-		return storage.NewStorageMinio(url, user, pass)
6+		return storage.NewStorageMinio(logger, url, user, pass)
7 	}
8 
9 	// implied driver == "fs"
M pkg/shared/storage/minio.go
+2, -3
 1@@ -13,15 +13,14 @@ import (
 2 
 3 type StorageMinio struct {
 4 	*sst.StorageMinio
 5-	Logger *slog.Logger
 6 }
 7 
 8 func NewStorageMinio(logger *slog.Logger, address, user, pass string) (*StorageMinio, error) {
 9-	st, err := sst.NewStorageMinio(address, user, pass)
10+	st, err := sst.NewStorageMinio(logger, address, user, pass)
11 	if err != nil {
12 		return nil, err
13 	}
14-	return &StorageMinio{st, logger}, nil
15+	return &StorageMinio{st}, nil
16 }
17 
18 func (s *StorageMinio) ServeObject(bucket sst.Bucket, fpath string, opts *ImgProcessOpts) (io.ReadCloser, *sst.ObjectInfo, error) {
M pkg/shared/storage/proxy.go
+5, -0
 1@@ -166,6 +166,7 @@ type ImgProcessOpts struct {
 2 	Ratio   *Ratio
 3 	Rotate  int
 4 	Ext     string
 5+	NoRaw   bool
 6 }
 7 
 8 func (img *ImgProcessOpts) String() string {
 9@@ -204,6 +205,10 @@ func (img *ImgProcessOpts) String() string {
10 		processOpts = fmt.Sprintf("%s/ext:%s", processOpts, img.Ext)
11 	}
12 
13+	if processOpts == "" && !img.NoRaw {
14+		processOpts = fmt.Sprintf("%s/raw:true", processOpts)
15+	}
16+
17 	return processOpts
18 }
19
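The NoRaw flag changes the default imgproxy option string: when no explicit processing options were requested, the string now becomes "/raw:true" so the object is passed through untouched, unless the caller sets NoRaw. A simplified sketch of just that branch; optString is a stand-in for the tail of ImgProcessOpts.String, not the real method:

package main

import "fmt"

// simplified sketch of the option-string fallback with the new NoRaw flag
func optString(processOpts string, noRaw bool) string {
	// when no explicit processing options were built up, fall back to
	// raw passthrough unless the caller opted out with NoRaw
	if processOpts == "" && !noRaw {
		processOpts = fmt.Sprintf("%s/raw:true", processOpts)
	}
	return processOpts
}

func main() {
	fmt.Println(optString("", false))          // "/raw:true"
	fmt.Println(optString("", true))           // ""
	fmt.Println(optString("/ext:webp", false)) // "/ext:webp"
}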