repos / pico

pico services mono repo
git clone https://github.com/picosh/pico.git

pico / pkg / pobj / storage
Eric Bower  ·  2025-06-29

fs.go

  1package storage
  2
  3import (
  4	"crypto/md5"
  5	"encoding/hex"
  6	"fmt"
  7	"io"
  8	"io/fs"
  9	"log/slog"
 10	"net/http"
 11	"os"
 12	"path"
 13	"path/filepath"
 14	"strings"
 15	"time"
 16
 17	"github.com/google/renameio/v2"
 18	"github.com/picosh/pico/pkg/send/utils"
 19	"github.com/picosh/pico/pkg/shared/mime"
 20	putils "github.com/picosh/utils"
 21)
 22
 23// https://stackoverflow.com/a/32482941
 24func dirSize(path string) (int64, error) {
 25	var size int64
 26	err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error {
 27		if err != nil {
 28			return err
 29		}
 30		if !info.IsDir() {
 31			size += info.Size()
 32		}
 33		return err
 34	})
 35
 36	return size, err
 37}
 38
 39type StorageFS struct {
 40	Dir    string
 41	Logger *slog.Logger
 42}
 43
 44var _ ObjectStorage = &StorageFS{}
 45var _ ObjectStorage = (*StorageFS)(nil)
 46
 47func NewStorageFS(logger *slog.Logger, dir string) (*StorageFS, error) {
 48	return &StorageFS{Logger: logger, Dir: dir}, nil
 49}
 50
 51func (s *StorageFS) GetBucket(name string) (Bucket, error) {
 52	dirPath := filepath.Join(s.Dir, name)
 53	bucket := Bucket{
 54		Name: name,
 55		Path: dirPath,
 56	}
 57	// s.Logger.Info("get bucket", "dir", dirPath)
 58
 59	info, err := os.Stat(dirPath)
 60	if os.IsNotExist(err) {
 61		return bucket, fmt.Errorf("directory does not exist: %v %w", dirPath, err)
 62	}
 63
 64	if err != nil {
 65		return bucket, fmt.Errorf("directory error: %v %w", dirPath, err)
 66
 67	}
 68
 69	if !info.IsDir() {
 70		return bucket, fmt.Errorf("directory is a file, not a directory: %#v", dirPath)
 71	}
 72
 73	return bucket, nil
 74}
 75
 76func (s *StorageFS) UpsertBucket(name string) (Bucket, error) {
 77	s.Logger.Info("upsert bucket", "name", name)
 78	bucket, err := s.GetBucket(name)
 79	if err == nil {
 80		return bucket, nil
 81	}
 82
 83	dir := filepath.Join(s.Dir, name)
 84	s.Logger.Info("bucket not found, creating", "dir", dir, "err", err)
 85	err = os.MkdirAll(dir, os.ModePerm)
 86	if err != nil {
 87		return bucket, err
 88	}
 89
 90	return bucket, nil
 91}
 92
 93func (s *StorageFS) GetBucketQuota(bucket Bucket) (uint64, error) {
 94	dsize, err := dirSize(bucket.Path)
 95	return uint64(dsize), err
 96}
 97
 98// DeleteBucket will delete all contents regardless if files exist inside of it.
 99// This is different from minio impl which requires all files be deleted first.
100func (s *StorageFS) DeleteBucket(bucket Bucket) error {
101	return os.RemoveAll(bucket.Path)
102}
103
104func (s *StorageFS) GetObject(bucket Bucket, fpath string) (utils.ReadAndReaderAtCloser, *ObjectInfo, error) {
105	objInfo := &ObjectInfo{
106		Size:         0,
107		LastModified: time.Time{},
108		Metadata:     make(http.Header),
109		ETag:         "",
110	}
111
112	dat, err := os.Open(filepath.Join(bucket.Path, fpath))
113	if err != nil {
114		return nil, objInfo, err
115	}
116
117	info, err := dat.Stat()
118	if err != nil {
119		_ = dat.Close()
120		return nil, objInfo, err
121	}
122
123	etag := ""
124	// only generate etag if file is less than 10MB
125	if info.Size() <= int64(10*putils.MB) {
126		// calculate etag
127		h := md5.New()
128		if _, err := io.Copy(h, dat); err != nil {
129			_ = dat.Close()
130			return nil, objInfo, err
131		}
132		md5Sum := h.Sum(nil)
133		etag = hex.EncodeToString(md5Sum)
134
135		// reset os.File reader
136		_, err = dat.Seek(0, io.SeekStart)
137		if err != nil {
138			_ = dat.Close()
139			return nil, objInfo, err
140		}
141	}
142
143	objInfo.ETag = etag
144	objInfo.Size = info.Size()
145	objInfo.LastModified = info.ModTime()
146	objInfo.Metadata.Set("content-type", mime.GetMimeType(fpath))
147	return dat, objInfo, nil
148}
149
150func (s *StorageFS) PutObject(bucket Bucket, fpath string, contents io.Reader, entry *utils.FileEntry) (string, int64, error) {
151	loc := filepath.Join(bucket.Path, fpath)
152	err := os.MkdirAll(filepath.Dir(loc), os.ModePerm)
153	if err != nil {
154		return "", 0, err
155	}
156	out, err := renameio.NewPendingFile(loc)
157	if err != nil {
158		return "", 0, err
159	}
160
161	size, err := io.Copy(out, contents)
162	if err != nil {
163		return "", 0, err
164	}
165
166	if err := out.CloseAtomicallyReplace(); err != nil {
167		return "", 0, err
168	}
169
170	if entry.Mtime > 0 {
171		uTime := time.Unix(entry.Mtime, 0)
172		_ = os.Chtimes(loc, uTime, uTime)
173	}
174
175	return loc, size, nil
176}
177
178func (s *StorageFS) DeleteObject(bucket Bucket, fpath string) error {
179	loc := filepath.Join(bucket.Path, fpath)
180	err := os.Remove(loc)
181	if err != nil {
182		if os.IsNotExist(err) {
183			return nil
184		}
185		return err
186	}
187
188	// traverse up the folder tree and remove all empty folders
189	dir := filepath.Dir(loc)
190	for dir != "" {
191		f, err := os.Open(dir)
192		if err != nil {
193			s.Logger.Info("open dir", "dir", dir, "err", err)
194			break
195		}
196		defer func() {
197			_ = f.Close()
198		}()
199
200		// https://stackoverflow.com/a/30708914
201		contents, err := f.Readdirnames(-1)
202		if err != nil {
203			s.Logger.Info("read dir", "dir", dir, "err", err)
204			break
205		}
206		if len(contents) > 0 {
207			break
208		}
209
210		err = os.Remove(dir)
211		if err != nil {
212			s.Logger.Info("remove dir", "dir", dir, "err", err)
213			break
214		}
215		fp := strings.Split(dir, "/")
216		prefix := ""
217		if strings.HasPrefix(loc, "/") {
218			prefix = "/"
219		}
220		dir = prefix + filepath.Join(fp[:len(fp)-1]...)
221	}
222
223	return nil
224}
225
226func (s *StorageFS) ListBuckets() ([]string, error) {
227	entries, err := os.ReadDir(s.Dir)
228	if err != nil {
229		return []string{}, err
230	}
231
232	buckets := []string{}
233	for _, e := range entries {
234		if !e.IsDir() {
235			continue
236		}
237		buckets = append(buckets, e.Name())
238	}
239	return buckets, nil
240}
241
242func (s *StorageFS) ListObjects(bucket Bucket, dir string, recursive bool) ([]os.FileInfo, error) {
243	fileList := []os.FileInfo{}
244
245	fpath := path.Join(bucket.Path, dir)
246
247	info, err := os.Stat(fpath)
248	if err != nil {
249		if os.IsNotExist(err) {
250			return fileList, nil
251		}
252		return fileList, err
253	}
254
255	if info.IsDir() && !strings.HasSuffix(dir, "/") {
256		fileList = append(fileList, &utils.VirtualFile{
257			FName:    "",
258			FIsDir:   info.IsDir(),
259			FSize:    info.Size(),
260			FModTime: info.ModTime(),
261		})
262
263		return fileList, err
264	}
265
266	var files []utils.VirtualFile
267
268	if recursive {
269		err = filepath.WalkDir(fpath, func(s string, d fs.DirEntry, err error) error {
270			if err != nil {
271				return err
272			}
273			info, err := d.Info()
274			if err != nil {
275				return nil
276			}
277			fname := strings.TrimPrefix(s, fpath)
278			if fname == "" {
279				return nil
280			}
281			// rsync does not expect prefixed `/` so without this `rsync --delete` is borked
282			fname = strings.TrimPrefix(fname, "/")
283			files = append(files, utils.VirtualFile{
284				FName:    fname,
285				FIsDir:   info.IsDir(),
286				FSize:    info.Size(),
287				FModTime: info.ModTime(),
288			})
289			return nil
290		})
291		if err != nil {
292			fileList = append(fileList, info)
293			return fileList, nil
294		}
295	} else {
296		fls, err := os.ReadDir(fpath)
297		if err != nil {
298			fileList = append(fileList, info)
299			return fileList, nil
300		}
301		for _, d := range fls {
302			info, err := d.Info()
303			if err != nil {
304				continue
305			}
306			fp := info.Name()
307			files = append(files, utils.VirtualFile{
308				FName:    fp,
309				FIsDir:   info.IsDir(),
310				FSize:    info.Size(),
311				FModTime: info.ModTime(),
312			})
313		}
314	}
315
316	for _, f := range files {
317		fileList = append(fileList, &f)
318	}
319
320	return fileList, err
321}